In [None]:
# Reinstall the TensorFlow GPU build and tensorflow-io from scratch.
# tensorflow-io is installed with --no-deps so pip does not resolve/replace its dependencies.
# NOTE(review): no versions are pinned, so the installed builds can differ between runs — consider pinning.
!pip uninstall -y tensorflow-gpu
!pip uninstall -y tensorflow-io
!pip install tensorflow-gpu
!pip install --no-deps tensorflow-io

In [None]:
import os
import csv
import pandas as pd
import datetime
from itertools import groupby
from matplotlib import pyplot as plt

import tensorflow as tf 
import tensorflow_io as tfio
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten

In [None]:
#Mounting Google Drive to a Colab instance. Only to be executed if running on Colab.
#The dataset paths used later in this notebook are located under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#This function loads an audio clip and resamples it to 16000 samples/second

def load_wav_16k_mono(filename):
    """Read a WAV file and return it as a single-channel waveform resampled to 16000 Hz.

    Args:
        filename: path of the WAV file to read.

    Returns:
        A 1-D float tensor holding the resampled mono waveform.
    """
    # Raw, still-encoded bytes of the file
    raw_bytes = tf.io.read_file(filename)
    # Decode to (samples, 1); the file's own sample rate is returned alongside
    decoded, source_rate = tf.audio.decode_wav(raw_bytes, desired_channels=1)
    # Drop the trailing channel axis -> (samples,)
    mono = tf.squeeze(decoded, axis=-1)
    # Resample from whatever rate the file was recorded at down/up to 16 kHz
    return tfio.audio.resample(
        mono,
        rate_in=tf.cast(source_rate, dtype=tf.int64),
        rate_out=16000,
    )

In [None]:
#Storing one sample path from each class for test runs
#TIGER_FILE is a positive example, NOT_TIGER_FILE a negative one; both are only used for the quick waveform check below.
TIGER_FILE = "/content/drive/MyDrive/dataset/TIGER/SMM01167_20220929_155802_1146_reduced_16BIT.wav"
NOT_TIGER_FILE = "/content/drive/MyDrive/dataset/NO_TIGER/SMM01167_20220929_155802_5_reduced_16BIT.wav"

In [None]:
#Decoding both reference clips (positive first, then negative) into 16 kHz mono waveforms
wave, nwave = (load_wav_16k_mono(path) for path in (TIGER_FILE, NOT_TIGER_FILE))

In [None]:
#Plotting example positive and negative samples on the same axes.
#A legend, title and axis labels are added so the two overlaid curves can be told apart.
plt.plot(wave, label='Tiger')
plt.plot(nwave, label='No tiger')
plt.title('Example waveforms')
plt.xlabel('Sample index')
plt.ylabel('Amplitude')
plt.legend()
plt.show()

In [None]:
#Building one dataset of file paths per class from the matching .wav files on Drive
pos = tf.data.Dataset.list_files(
    file_pattern='/content/drive/MyDrive/dataset/TIGER/*.wav'
)
neg = tf.data.Dataset.list_files(
    file_pattern='/content/drive/MyDrive/dataset/NO_TIGER/*.wav'
)

In [None]:
#Attaching labels to each sample set: positive examples get label 1, negative examples get label 0
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(pos)))
negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(neg)))
positives = tf.data.Dataset.zip((pos, positive_labels))
negatives = tf.data.Dataset.zip((neg, negative_labels))
#Joining both sets into one dataset: all positives first, followed by all negatives
data = positives.concatenate(negatives)

In [None]:
#This function loads an audio clip and converts it to a spectrogram for further processing

def preprocess(file_path, label):
    """Load one audio file and convert it to a magnitude spectrogram.

    Args:
        file_path: path of the WAV file to load.
        label: class label, passed through unchanged.

    Returns:
        Tuple ``(spectrogram, label)`` where the spectrogram has a trailing
        channel axis added at position 2.
    """
    waveform = load_wav_16k_mono(file_path)
    # Non-overlapping 320-sample frames, no window function applied
    stft = tf.signal.stft(waveform, frame_length=320, frame_step=320, window_fn=None)
    magnitude = tf.abs(stft)
    return tf.expand_dims(magnitude, axis=2), label

In [None]:
#Extracting one positive sample and plotting its spectrogram for analysis

sample_iter = positives.shuffle(buffer_size=10000).as_numpy_iterator()
filepath, label = next(sample_iter)
spectrogram, label = preprocess(filepath, label)

plt.figure(figsize=(7,4))
#Transposing and taking index 0 drops the channel axis and puts frequency on the vertical axis
plt.imshow(tf.transpose(spectrogram)[0])
plt.show()

In [None]:
#Creating a data loading pipeline that shuffles the dataset and creates batches of 16 spectrograms

data = data.map(preprocess)
data = data.cache()
#reshuffle_each_iteration=False keeps the shuffled order fixed across iterations.
#With the default (True) the take()/skip() train/test split below would select different
#examples on every epoch, letting test examples leak into the training batches.
data = data.shuffle(buffer_size=1000, reshuffle_each_iteration=False)
data = data.batch(16)
data = data.prefetch(8)

In [None]:
#Splitting the batched dataset: the first 12 batches are used for training, the next 4 for testing
train = data.take(12)
remaining_batches = data.skip(12)
test = remaining_batches.take(4)

In [None]:
#Pulling the first batch from the train set as an example
samples, labels = next(train.as_numpy_iterator())

In [None]:
#Outputs the shape of the example training batch (the leading dimension is the batch size).
#The remaining dimensions give the input_shape for the Convolutional Neural Network
samples.shape

In [None]:
#Sequential CNN: two 3x3 convolutions, then a dense head ending in a single sigmoid unit

model = Sequential([
    Conv2D(16, (3,3), activation='relu', input_shape=(150,257,1)),
    Conv2D(16, (3,3), activation='relu'),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(1, activation='sigmoid'),
])

In [None]:
#Model compilation
#The metrics are named explicitly so the training history always uses the keys
#'recall' / 'precision' (and their 'val_' variants) that the plotting cells read.
#Auto-generated metric names can get a numeric suffix when this cell is re-run in the same session.
model.compile(
    'Adam',
    loss='BinaryCrossentropy',
    metrics=[
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.Precision(name='precision'),
    ],
)

In [None]:
#Outputs the model summary (layers and parameter counts) for the network defined above
model.summary()

In [None]:
#Training for 3 epochs with the test batches as validation data; per-epoch metrics are kept in 'hist'
hist = model.fit(
    train,
    validation_data=test,
    epochs=3,
)

In [None]:
#Plotting Loss (red = training, blue = validation); legend and axis label added for readability
plt.title('Loss')
plt.plot(hist.history['loss'], 'r', label='train')
plt.plot(hist.history['val_loss'], 'b', label='validation')
plt.xlabel('Epoch')
plt.legend()
plt.show()

In [None]:
#Plotting Precision (red = training, blue = validation); legend and axis label added for readability
plt.title('Precision')
plt.plot(hist.history['precision'], 'r', label='train')
plt.plot(hist.history['val_precision'], 'b', label='validation')
plt.xlabel('Epoch')
plt.legend()
plt.show()

In [None]:
#Plotting Recall (red = training, blue = validation); legend and axis label added for readability
plt.title('Recall')
plt.plot(hist.history['recall'], 'r', label='train')
plt.plot(hist.history['val_recall'], 'b', label='validation')
plt.xlabel('Epoch')
plt.legend()
plt.show()

In [None]:
#Pulling one batch from the test set to check model performance
X_test, y_test = next(test.as_numpy_iterator())

In [None]:
#Predicting on the extracted test batch and thresholding the scores at 0.5
test_scores = model.predict(X_test)
yhat = [1 if score > 0.5 else 0 for score in test_scores]
#Predicted classes first, ground-truth labels second
print(yhat)
print(y_test)

In [None]:
#Trying the slicing helpers on a single recording to ensure correctness
#NOTE(review): this path is relative, unlike the /content/drive/MyDrive/... paths used elsewhere — confirm the working directory.
RECORDING = os.path.join('dataset','RECORDING','SOUND_16BIT.wav')
test_wav = load_wav_16k_mono(RECORDING)
#Non-overlapping windows of 48000 samples, one window per batch
audio_slices = tf.keras.utils.timeseries_dataset_from_array(
    test_wav,
    test_wav,
    sequence_length=48000,
    sequence_stride=48000,
    batch_size=1,
)
samples, index = next(audio_slices.as_numpy_iterator())
samples.shape

In [None]:
#This function performs required preprocessing for test dataset
def preprocess_dataset(sample, index):
    """Convert one audio slice into a magnitude spectrogram for inference.

    Intended to be mapped over a dataset whose elements are ``(sample, index)``
    pairs with a leading batch dimension of one.

    Args:
        sample: batched audio slice; only ``sample[0]`` is used.
        index: second element of the dataset pair; unused.

    Returns:
        The magnitude spectrogram with a trailing channel axis added at position 2.
    """
    clip = sample[0]
    # Left-pad with zeros so the clip is 48000 samples long
    zero_padding = tf.zeros([48000] - tf.shape(clip), dtype=tf.float32)
    wav = tf.concat([zero_padding, clip], 0)
    # window_fn=None matches preprocess(), which builds the training spectrograms.
    # Leaving the default (Hann) window here would feed the model inputs that were
    # computed differently from the ones it was trained on.
    spectrogram = tf.signal.stft(wav, frame_length=320, frame_step=320, window_fn=None)
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.expand_dims(spectrogram, axis=2)
    return spectrogram

In [None]:
#Slicing the recording into 48000-sample clips, converting each to a spectrogram, then batching by 64
audio_slices = (
    tf.keras.utils.timeseries_dataset_from_array(
        test_wav,
        test_wav,
        sequence_length=48000,
        sequence_stride=48000,
        batch_size=1,
    )
    .map(preprocess_dataset)
    .batch(64)
)

In [None]:
#Scoring every slice of the recording and thresholding at 0.99 to build the class prediction array
slice_scores = model.predict(audio_slices)
yhat = [1 if score > 0.99 else 0 for score in slice_scores]

In [None]:
#Collapsing runs of identical consecutive predictions to one value each;
#the sum of what remains (the number of runs of 1s) is stored as the call density in 'calls'
yhat = [key for key, _ in groupby(yhat)]
calls = tf.math.reduce_sum(yhat).numpy()

In [None]:
#This code window runs the model over every recording in the folder and stores the raw
#prediction scores in a dict keyed by file name

RECORDINGS_DIR = '/content/drive/MyDrive/dataset/RECORDING'

results = {}
#sorted() gives a deterministic processing order; os.listdir returns entries in arbitrary order
for file in sorted(os.listdir(RECORDINGS_DIR)):
    #Skip anything that is not a WAV file so stray folder entries do not break decoding
    if not file.lower().endswith('.wav'):
        continue
    FILEPATH = os.path.join(RECORDINGS_DIR, file)

    wav = load_wav_16k_mono(FILEPATH)
    audio_slices = tf.keras.utils.timeseries_dataset_from_array(wav, wav, sequence_length=48000, sequence_stride=48000, batch_size=1)
    #Fixed: this previously mapped `preprocess_new`, which is not defined in this notebook
    audio_slices = audio_slices.map(preprocess_dataset)
    audio_slices = audio_slices.batch(16)

    results[file] = model.predict(audio_slices)

In [None]:
#Printing how many slices were scored for each recording
for file_scores in results.values():
    print(len(file_scores))


In [None]:
#Converts the scores stored in 'results' to 0/1 class predictions (1 when the score exceeds 0.99)
class_preds = {
    file: [1 if score > 0.99 else 0 for score in scores]
    for file, scores in results.items()
}

In [None]:
class_preds.items()

In [None]:
#Helpers for writing the time-stamped predictions of each recording to a spreadsheet
def convert(n):
    """Format ``n`` seconds as a ``datetime.timedelta`` string, e.g. ``3`` -> ``'0:00:03'``."""
    elapsed = datetime.timedelta(seconds=n)
    return str(elapsed)

def save_results(directory, dictionary, clip_seconds=3):
    """Write one Excel sheet of time-stamped predictions per recording.

    Args:
        directory: output folder; created (including parents) if it does not exist.
        dictionary: mapping of recording file name -> sequence of per-clip predictions.
        clip_seconds: duration in seconds that each prediction covers; used to
            build the 'Timestep' column. Defaults to 3.

    Raises:
        OSError: if the output directory cannot be created.

    Each recording is written to ``<directory>/<file name without extension>_output.xlsx``.
    """
    # exist_ok=True tolerates an existing folder while still surfacing real failures
    # (e.g. permission errors)
    os.makedirs(directory, exist_ok=True)

    # Iterate over the mapping that was passed in, not a notebook-level global
    for file, predictions in dictionary.items():
        # One timestamp per clip: 0, clip_seconds, 2*clip_seconds, ...
        timesteps = [convert(step * clip_seconds) for step in range(len(predictions))]

        df = pd.DataFrame({'Timestep': timesteps, 'Prediction': predictions}).set_index('Timestep')
        # splitext drops the extension whatever its length
        stem = os.path.splitext(file)[0]
        path = os.path.join(directory, stem + '_output.xlsx')
        df.to_excel(path)

In [None]:
save_results('/content/drive/MyDrive/TIMED_RESULTS', class_preds)

In [None]:
#For each file, runs of identical consecutive predictions are collapsed to a single value
#and the remaining values are summed. This gives the call density for each file.
postprocessed = {
    file: tf.math.reduce_sum([key for key, _ in groupby(scores)]).numpy()
    for file, scores in class_preds.items()
}

In [None]:
#Stores the call density of every recording in a csv file for further study
with open('/content/drive/MyDrive/results.csv', 'w', newline='') as csv_file:
    writer = csv.writer(csv_file, delimiter=',')
    #Header row first, then one [recording, call count] row per file
    writer.writerow(['Recording', 'Main Calls'])
    writer.writerows([name, count] for name, count in postprocessed.items())