# In [None]:
# Step 1: Mount Google Drive
# Mounts the user's Drive at /content/drive so the clip folders below can be
# read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

# Step 2: Define Folder Paths
# POS holds the clips that are labelled 1 (violence) and NEG the clips that are
# labelled 0 (not violence); both folders are later globbed for '*.wav'.
POS = '/content/drive/MyDrive/audio/data/Parsed_Violence_Clips'
NEG = '/content/drive/MyDrive/audio/data/Parsed_Not_Violence_Clips'

# Step 3: Install TensorFlow and TensorFlow I/O
# NOTE(review): tensorflow is pinned to 2.4.1 but tensorflow-io is not pinned.
# tensorflow-io releases are built against specific TensorFlow versions, so an
# unpinned install may not match 2.4.1 — check the tensorflow-io compatibility
# table and pin a matching release.
# NOTE(review): later cells call tf.keras.utils.timeseries_dataset_from_array;
# confirm that name exists in the pinned TensorFlow version.
%pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 matplotlib
%pip install tensorflow-io

# Step 4: Import Necessary Libraries and Define the Data Loading Function
import tensorflow as tf
import tensorflow_io as tfio
from matplotlib import pyplot as plt
import os

def load_wav_16k_mono(filename):
    """Load a WAV file as a single-channel waveform resampled to 16 kHz.

    Args:
        filename: Path to the WAV file (Python string or scalar string tensor).

    Returns:
        A 1-D float tensor holding the samples at 16000 Hz.
    """
    # Read the raw bytes of the file
    file_contents = tf.io.read_file(filename)
    # Decode the WAV data; desired_channels=1 requests a single channel
    wav, sample_rate = tf.audio.decode_wav(file_contents, desired_channels=1)
    # Drop the trailing channel axis: (samples, 1) -> (samples,)
    wav = tf.squeeze(wav, axis=-1)
    # The resampler is given the source rate as int64
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    # Resample to 16 kHz (the original line had a misplaced digit: `1600)0`)
    wav = tfio.audio.resample(wav, rate_in=sample_rate, rate_out=16000)
    return wav

# Step 5: Define Paths to Files and Plot Waves
VIOLENT_FILE = os.path.join(POS, 'XC3776-3.wav')
NOT_VIOLENT_FILE = os.path.join(NEG, 'CVBD8.wav')

# Load one example clip from each class and overlay the two waveforms
wave = load_wav_16k_mono(VIOLENT_FILE)
nwave = load_wav_16k_mono(NOT_VIOLENT_FILE)
plt.plot(wave)
plt.plot(nwave)
plt.show()

# Step 6: Create TensorFlow Dataset
# One dataset of file paths per class, matched by the '.wav' extension
violent_pattern = os.path.join(POS, '*.wav')
non_violent_pattern = os.path.join(NEG, '*.wav')
pos = tf.data.Dataset.list_files(violent_pattern)
neg = tf.data.Dataset.list_files(non_violent_pattern)

# Step 7: Add labels and Combine Positive and Negative Samples
# Each violence path is paired with a 1 and each non-violence path with a 0
pos_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(pos)))
neg_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(neg)))
positives = tf.data.Dataset.zip((pos, pos_labels))
negatives = tf.data.Dataset.zip((neg, neg_labels))
# The combined dataset yields every positive pair first, then every negative pair
data = positives.concatenate(negatives)

# Step 8: Determine Length Statistics of the Violence Clips
# Only '.wav' entries are measured, matching the '*.wav' glob used to build the
# dataset; any other directory entry would be handed to the WAV decoder.
lengths = [
    len(load_wav_16k_mono(os.path.join(POS, file)))
    for file in os.listdir(POS)
    if file.endswith('.wav')
]

# lengths is a list of Python ints, so TensorFlow infers an integer tensor and
# the mean below is an integer as well.
mean_length = tf.math.reduce_mean(lengths)
min_length = tf.math.reduce_min(lengths)
max_length = tf.math.reduce_max(lengths)

print(f"Mean Length: {mean_length}")
print(f"Min Length: {min_length}")
print(f"Max Length: {max_length}")

# Step 9: Build Preprocessing Function to Convert to Spectrogram
def preprocess(file_path, label):
    """Turn one labelled WAV path into a fixed-size magnitude spectrogram.

    The clip is loaded at 16 kHz mono, cut to at most 48000 samples and
    left-padded with zeros up to 48000 samples before the STFT is taken.

    Args:
        file_path: Path of the WAV file.
        label: Class label; returned unchanged.

    Returns:
        A (spectrogram, label) pair; spectrogram has static shape (1491, 257, 1).
    """
    clip = load_wav_16k_mono(file_path)[:48000]
    # Zeros go in front of the audio so every clip is exactly 48000 samples long
    pad = tf.zeros([48000] - tf.shape(clip), dtype=tf.float32)
    padded = tf.concat([pad, clip], 0)
    # Keep only the magnitude of the complex STFT
    magnitudes = tf.abs(tf.signal.stft(padded, frame_length=320, frame_step=32))
    # Add a trailing channel axis for the Conv2D layers
    spectrogram = tf.expand_dims(magnitudes, axis=2)
    # 1491 = 1 + (48000 - 320) // 32 frames; 257 frequency bins come from the
    # STFT's default FFT length for frame_length=320
    spectrogram.set_shape([1491, 257, 1])
    return spectrogram, label

# Step 10: Test Out the Function and Viz the Spectrogram
# Pull one (path, label) pair from the shuffled positive examples
example_iter = positives.shuffle(buffer_size=10000).as_numpy_iterator()
filepath, label = next(example_iter)
spectrogram, label = preprocess(filepath, label)
plt.figure(figsize=(30, 20))
# Transposing (frames, bins, 1) gives (1, bins, frames); [0] picks the single channel
image = tf.transpose(spectrogram)[0]
plt.imshow(image)
plt.show()

# Step 11: Create Training and Testing Partitions
data = data.map(preprocess)
data = data.cache()
# reshuffle_each_iteration=False pins a single shuffled order. With the default
# (True) the order changes every time the dataset is iterated, so the
# take()/skip() split below would pick different clips on every epoch and
# test clips would leak into the training partition.
# NOTE(review): the source dataset lists all positives before all negatives; if
# it holds more than 1000 clips, a buffer of 1000 will not mix the classes
# fully — consider a buffer at least as large as the dataset.
data = data.shuffle(buffer_size=1000, reshuffle_each_iteration=False)
data = data.batch(16)
data = data.prefetch(8)

# The first 36 batches are used for training and the next 15 for validation.
# NOTE(review): these counts are hard-coded — confirm they match the number of
# batches this dataset actually produces.
train = data.take(36)
test = data.skip(36).take(15)

# Peek at one training batch to check the tensor shape
samples, labels = train.as_numpy_iterator().next()
samples.shape

# Step 12: Build Deep Learning Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten

# Two 3x3 convolution layers over the (1491, 257, 1) spectrogram, flattened
# into a dense layer and a single sigmoid output unit.
model = Sequential()
model.add(Conv2D(16, (3,3), activation='relu', input_shape=(1491, 257,1)))
model.add(Conv2D(16, (3,3), activation='relu'))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
# The metrics are named explicitly. Left unnamed, Keras numbers repeated
# instances ('recall_1', 'precision_1', ...) when this cell is re-run in the
# same session, which breaks the hist.history['recall'] / ['precision']
# lookups used for plotting.
model.compile(
    'Adam',
    loss='BinaryCrossentropy',
    metrics=[
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.Precision(name='precision'),
    ],
)
model.summary()

# Train for 4 epochs, validating on the held-out batches after each epoch
hist = model.fit(train, epochs=4, validation_data=test)

# One figure per metric: training curve in red, validation curve in blue
curves = (
    ('Loss', 'loss', 'val_loss'),
    ('Precision', 'precision', 'val_precision'),
    ('Recall', 'recall', 'val_recall'),
)
for title, train_key, val_key in curves:
    plt.title(title)
    plt.plot(hist.history[train_key], 'r')
    plt.plot(hist.history[val_key], 'b')
    plt.show()

# Step 13: Make a Prediction on a Single Batch
X_test, y_test = next(test.as_numpy_iterator())
probabilities = model.predict(X_test)
# Threshold the sigmoid outputs at 0.5 to obtain hard 0/1 labels
yhat = [1 if p > 0.5 else 0 for p in probabilities]

# Step 14: Build Recording Parsing Functions
def load_mp3_16k_mono(filename):
    """Load an audio file through tensorflow-io as a 1-D waveform at 16 kHz.

    Args:
        filename: Path to the audio file.

    Returns:
        The waveform tensor resampled to 16000 Hz.
    """
    audio = tfio.audio.AudioIOTensor(filename)
    # Sum across the channel axis and halve it, i.e. the average of two channels.
    # NOTE(review): assumes two-channel input — a single-channel file would come
    # out at half amplitude.
    mixed = tf.math.reduce_sum(audio.to_tensor(), axis=1) / 2
    source_rate = tf.cast(audio.rate, dtype=tf.int64)
    return tfio.audio.resample(mixed, rate_in=source_rate, rate_out=16000)
# Load one recording and cut it into consecutive, non-overlapping windows of
# 48000 samples (one window per batch element).
# NOTE(review): this path is relative to the working directory, unlike the
# Drive paths used for the training clips — confirm the recordings live there.
mp3 = os.path.join('data', 'Environment Recordings', 'recording_00.mp3')
wav = load_mp3_16k_mono(mp3)
audio_slices = tf.keras.utils.timeseries_dataset_from_array(
    wav,
    wav,
    sequence_length=48000,
    sequence_stride=48000,
    batch_size=1,
)
# Peek at the first (window, target) pair
samples, index = next(audio_slices.as_numpy_iterator())
## 9.2 Build Function to Convert Clips into Windowed Spectrograms
def preprocess_mp3(sample, index):
    """Convert one window of a sliced recording into a magnitude spectrogram.

    Args:
        sample: Batch holding the audio window; only element 0 is used.
        index: Second element of the dataset pair; not used.

    Returns:
        The magnitude spectrogram with a trailing channel axis.
    """
    window = sample[0]
    # Left-pad with zeros so the STFT always sees 48000 samples
    pad = tf.zeros([48000] - tf.shape(window), dtype=tf.float32)
    padded = tf.concat([pad, window], 0)
    magnitudes = tf.abs(tf.signal.stft(padded, frame_length=320, frame_step=32))
    return tf.expand_dims(magnitudes, axis=2)
## 9.3 Convert Longer Clips into Windows and Make Predictions
# Windows are 48000 samples long, the same clip length preprocess() feeds the
# model during training and the same value the per-recording loop uses. The
# previous value of 16000 produced windows that were two-thirds zero padding.
audio_slices = tf.keras.utils.timeseries_dataset_from_array(
    wav,
    wav,
    sequence_length=48000,
    sequence_stride=48000,
    batch_size=1,
)
audio_slices = audio_slices.map(preprocess_mp3)
audio_slices = audio_slices.batch(64)
yhat = model.predict(audio_slices)
# Threshold the sigmoid outputs at 0.5 to obtain hard 0/1 labels
yhat = [1 if prediction > 0.5 else 0 for prediction in yhat]
## 9.4 Group Consecutive Detections
from itertools import groupby
# Collapse every run of identical predictions to a single entry, so a detection
# spanning several adjacent windows is counted once; then add up the 1s.
yhat = [value for value, _run in groupby(yhat)]
calls = tf.math.reduce_sum(yhat).numpy()
calls
# 10. Make Predictions
## 10.1 Loop over all recordings and make predictions
results = {}
recordings_dir = os.path.join('data', 'Environment Recordings')
# Entries are visited in sorted order so the results (and the exported CSV)
# come out in a reproducible order. Anything that is not an '.mp3' file
# (e.g. notebook checkpoint folders) is skipped rather than handed to the
# MP3 loader.
for file in sorted(os.listdir(recordings_dir)):
    if not file.lower().endswith('.mp3'):
        continue
    FILEPATH = os.path.join(recordings_dir, file)

    # Slice the recording into non-overlapping 48000-sample windows
    wav = load_mp3_16k_mono(FILEPATH)
    audio_slices = tf.keras.utils.timeseries_dataset_from_array(wav, wav, sequence_length=48000, sequence_stride=48000, batch_size=1)
    audio_slices = audio_slices.map(preprocess_mp3)
    audio_slices = audio_slices.batch(64)

    # One model output per window
    yhat = model.predict(audio_slices)

    results[file] = yhat
results
## 10.2 Convert Predictions into Classes
# A window counts as a detection only when the model output exceeds 0.99
class_preds = {
    file: [1 if prediction > 0.99 else 0 for prediction in outputs]
    for file, outputs in results.items()
}
class_preds
## 10.3 Group Consecutive Detections
# Per recording: collapse runs of identical labels to one entry and add up the 1s
postprocessed = {
    file: tf.math.reduce_sum([value for value, _run in groupby(scores)]).numpy()
    for file, scores in class_preds.items()
}
postprocessed
# 11. Export Results
import csv
# Write one header row, then one row per recording with its detection count
with open('results.csv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter=',')
    writer.writerow(['recording', 'violence_calls'])
    writer.writerows([key, value] for key, value in postprocessed.items())