<a href="https://colab.research.google.com/github/mcui5/dl-final/blob/main/CNNLSTM_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf 
from tensorflow.keras import Model
import numpy as np
import pickle 
import os
import pandas as pd 


from google.colab import drive #Can ignore if done locally
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
def preprocess(filepath): 
  """
    Load the pickled dataset and separate it into features, labels and folders.

    1. Unpickle file
    2. Separate every row into its three fields (row[0], row[1], row[2])
    3. Keep only the first 173 entries (axis 0) of every feature matrix

    Labels are returned exactly as stored; no one-hot encoding is done here.

    :inputs: 
    filepath: filepath to the pickle file in Drive 

    :returns: 
    (inputs, labels, folders) as numpy arrays with one entry per pickled row
  """
  # NOTE(security): pickle.load can execute arbitrary code -- only open trusted files.
  with open(filepath, 'rb') as fo:
    pickle_output = pickle.load(fo, encoding='bytes')

  # Pull each column out on its own. Converting the whole row list with
  # np.array(...) would mix feature matrices with scalar fields in one ragged
  # array, which recent NumPy versions reject and older ones turn into
  # object-dtype columns.
  inputs = np.array([row[0][:173] for row in pickle_output])
  labels = np.array([row[1] for row in pickle_output])
  folders = np.array([row[2] for row in pickle_output])

  return (inputs, labels, folders)

def split(inputs, labels, folders, test_folder_idx):
  """
    Hold out one folder for testing and train on all remaining folders.

    :inputs: 
    inputs, labels, folders: the outputs from preprocess (aligned arrays)
    test_folder_idx: value in `folders` marking the rows used for testing

    :return: 
    one quadruple, (train_inputs, train_labels, test_inputs, test_labels)
  """
  # Row positions of the held-out folder and of everything else.
  held_out_rows = np.nonzero(folders == test_folder_idx)
  remaining_rows = np.nonzero(folders != test_folder_idx)

  train_inputs, train_labels = inputs[remaining_rows], labels[remaining_rows]
  test_inputs, test_labels = inputs[held_out_rows], labels[held_out_rows]

  return (train_inputs, train_labels, test_inputs, test_labels)

In [3]:
def shuffle(inputs, labels, test_fraction):
  '''
  Randomly permute the whole data set (inputs and labels together), then cut
  it into a testing part and a training part.

  :inputs: 
    inputs, labels: the outputs from preprocess (aligned along axis 0)
    test_fraction: fraction of the rows that will be used for testing
                   (e.g. 0.15 gives a 15%:85% test:train split)
  
  :return: 
    one quadruple, (train_inputs, train_labels, test_inputs, test_labels)
  '''
  sample_count = labels.shape[0]

  # One shared random order keeps every input paired with its label.
  order = np.arange(sample_count)
  np.random.shuffle(order)
  shuffled_inputs = np.take(inputs, order, axis=0)
  shuffled_labels = np.take(labels, order, axis=0)

  # The first `cut` shuffled rows are the test set, the rest is training data.
  cut = int(test_fraction * sample_count)

  return (shuffled_inputs[cut:], shuffled_labels[cut:],
          shuffled_inputs[:cut], shuffled_labels[:cut])

In [6]:
class CNNLSTMModel(tf.keras.Model):
  """
    Model based on LSTM3 in Table III of the paper

    Convolution / max-pooling / dropout stages feed an LSTM, followed by a
    10-way softmax layer. The model owns its optimizer, loss and accuracy
    helpers, so an external training loop only needs this object.
  """
  def __init__(self, num_batches):
    """
      :param num_batches: number of examples per batch (stored as batch_size)
    """
    super(CNNLSTMModel, self).__init__()

    self.lstm_dropout = 0.2
    self.dropout_rate = 0.8 
    self.lstm_size = 256
    self.batch_size = num_batches

    #adam optimizer (`learning_rate` is the supported keyword; `lr` is a
    #deprecated alias that newer Keras releases reject)
    self.optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

    #initialize layers
    
    self.lstm1 = tf.keras.layers.LSTM(self.lstm_size, dropout=self.lstm_dropout)
    self.dense1 = tf.keras.layers.Dense(10, activation='softmax')

    self.conv1 = tf.keras.layers.Conv2D(filters=4, kernel_size=(5,5), strides=(4,1), activation='relu')
    # NOTE(review): conv2 is created but never used in call().
    self.conv2 = tf.keras.layers.Conv2D(filters=16, kernel_size=(3,3), strides=(2,1), activation='relu')
    self.conv3 = tf.keras.layers.Conv2D(filters=64, kernel_size=(2,2), strides=(2,1), activation='relu')
   
    self.conv4 = tf.keras.layers.Conv2D(filters=300, kernel_size=(2,2), strides=(1,1), activation='relu')

    self.maxpool1 = tf.keras.layers.MaxPooling2D(pool_size=(3, 1), strides=(2,1))
    self.maxpool2 = tf.keras.layers.MaxPooling2D(pool_size=(3, 1), strides=(2,1))

    self.dropout1 = tf.keras.layers.Dropout(self.dropout_rate)
    self.dropout2 = tf.keras.layers.Dropout(self.dropout_rate)
    self.dropout3 = tf.keras.layers.Dropout(self.dropout_rate)
    
  def call(self, inputs, training=None):
    '''
      Forward pass though layers 

      :inputs: 
      inputs: batch of sound-file feature matrices; a channel axis is added
              at position 3, so a rank-3 tensor is expected
      training: optional flag forwarded to the dropout layers and the LSTM.
                None (the default) leaves the decision to Keras, which is
                the behaviour of a plain `model.call(inputs)`.

      :return: 
      probs: The batch element probabilities as a tensor (softmax output)
    '''
    inputs = tf.expand_dims(inputs, axis=3)
    convlayer1 = self.conv1(inputs)
    maxpool1 = self.maxpool1(convlayer1)

    drop1 = self.dropout1(maxpool1, training=training)
    convlayer3 = self.conv3(drop1)
    maxpool2 = self.maxpool2(convlayer3)

    drop2 = self.dropout2(maxpool2, training=training)
    convlayer4 = self.conv4(drop2)
    drop3 = self.dropout3(convlayer4, training=training)

    # Use the actual batch dimension instead of self.batch_size so batches of
    # any size (e.g. a trailing partial batch) can pass through.
    # NOTE(review): this is a plain reshape, not a transpose. If conv4 emits
    # channels last, the 300 filters are not what ends up on axis 1 -- confirm
    # the intended (timesteps, features) layout for the LSTM.
    reshape = tf.reshape(drop3, (tf.shape(drop3)[0], 300, -1))

    lstm = self.lstm1(reshape, training=training)
    dense = self.dense1(lstm)

    return dense

  def loss(self, logits, labels):
    """
    Calculates average cross entropy loss of the prediction

    :param logits: the model output as a tensor. Despite the name these are
                   softmax probabilities, because dense1 applies softmax.
    :param labels: matrix of one-hot labels
    :return: the loss of the model as a tensor of size 1

    As cited in the paper, Table IV Experimental Results, authors used Categorical
    cross entropy loss to measure their CNN+LSTM models
    """

    # from_logits=False: the inputs are already probabilities, so softmax
    # must not be applied a second time inside the loss.
    losses = tf.keras.losses.categorical_crossentropy(labels, logits, from_logits=False)
    return tf.reduce_mean(losses)

  def accuracy(self, probabilities, labels):
    """
    returns TOTAL NUMBER correct over a batch (does not average)
    """ 
    correct_predictions = tf.equal(tf.argmax(probabilities, 1), tf.argmax(labels, 1))    
    return tf.reduce_sum(tf.cast(correct_predictions, tf.float32))
  
  def accuracy_2(self, probabilities, labels): 
    """
      returns the TOTAL NUMBER correct for our binary classification 
      (dangerous versus non-dangerous sounds)
    """
    # dangerous labels include car_horn, dog_bark, drilling, gun_shot, jackhammer, siren
    DANGEROUS_LABELS = [1, 3, 4, 6, 7, 8]  
    dangerous = tf.constant(DANGEROUS_LABELS, dtype=tf.int64)

    def is_dangerous(class_ids):
      # Compare every class id with every dangerous id at once:
      # (batch, 1) against (6,) broadcasts to (batch, 6); any hit in a row
      # marks that example as dangerous.
      return tf.reduce_any(tf.equal(tf.expand_dims(class_ids, 1), dangerous), axis=1)

    # predicted class -> dangerous / not dangerous
    classified_binary = is_dangerous(tf.argmax(probabilities, 1))
    # true class -> dangerous / not dangerous
    labels_binary = is_dangerous(tf.argmax(labels, 1))

    # count the overlap and return the number correct in the given batch 
    correct_predictions = tf.equal(classified_binary, labels_binary)
    return tf.reduce_sum(tf.cast(correct_predictions, tf.float32))

In [7]:
def train(model, train_inputs, train_labels):
  """
  Runs through one epoch - all training examples.

  The data is consumed in consecutive batches of model.batch_size; a trailing
  partial batch is skipped.

  :param model: the initialized model to use for forward and backward pass
  :param train_inputs: input train data (all data for training) 
  :param train_labels: labels train data (all data for training)
  :returns: None
  """
  for i in range(len(train_inputs) // model.batch_size):
      # getting the proper batch 
      start = i * model.batch_size 
      inputs = train_inputs[start : start + model.batch_size]
      labels = train_labels[start : start + model.batch_size]

      with tf.GradientTape() as tape:
          # forward pass. Going through model(...) with training=True sets
          # Keras' training mode for the nested layers; a direct
          # model.call(inputs) leaves the Dropout layers and the LSTM dropout
          # in inference mode, i.e. switched off during training.
          predictions = model(inputs, training=True)
          loss = model.loss(predictions, labels)

      # backprop 
      gradients = tape.gradient(loss, model.trainable_variables)
      model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

def test(model, test_inputs, test_labels, binary=False): 
  """
    :param model: the trained model to use for prediction
    :param test_input: test inputs (all inputs for testing) 
    :param test_input: test labels (all labels for testing)
    returns the TOTAL accuracy for a single FOLDER 
  """
  num_batches = len(test_inputs) // model.batch_size
  total_right = 0

  for i in range(num_batches):
      # getting the proper batch 
      start = i * model.batch_size 
      inputs = test_inputs[start : start + model.batch_size]
      labels = test_labels[start : start + model.batch_size]

      # calling the model to get our probabilities 
      probabilities = model.call(inputs)
      if binary: 
        total_right += model.accuracy_2(probabilities, labels)
      else: 
        total_right += model.accuracy(probabilities, labels)
  
  return total_right

In [8]:
# 10-fold evaluation: each folder (1..10) is held out once for testing while a
# fresh model is trained on the remaining folders.
pickled_path = '/content/gdrive/Shared drives/CS1470-Final/mfccs.pkl'
inputs, labels, folders = preprocess(pickled_path)

batch_size, num_epochs = 100, 250
  
# running totals of correct predictions / evaluated examples across all folds
accuracy, binary_accuracy = 0, 0 
total_tested = 0 
for fold in range(1, 11): 
  print("Split/test folder: ", fold) 
  model = CNNLSTMModel(batch_size)
  tr_in, tr_lb, te_in, te_lb = split(inputs, labels, folders, fold)
  tr_in = tf.convert_to_tensor(tr_in, dtype=tf.float32)
  te_in = tf.convert_to_tensor(te_in, dtype=tf.float32)
  tr_lb = tf.one_hot(tr_lb, 10, dtype=tf.int64)
  te_lb = tf.one_hot(te_lb, 10, dtype=tf.int64)

  # The epoch counter gets its own name so it no longer overwrites the fold
  # loop's variable.
  for epoch in range(num_epochs): 
    train(model, tr_in, tr_lb)
    print(epoch)

  per_fold_acc = test(model, te_in, te_lb)
  binary_fold_acc = test(model, te_in, te_lb, True)
  accuracy += per_fold_acc
  binary_accuracy += binary_fold_acc
     
  # test() only evaluates whole batches, so the denominator leaves out the
  # examples of a trailing partial batch.
  per_fold_tested = (len(te_lb) - (len(te_lb) % batch_size))
  total_tested += per_fold_tested
  print('per-fold acc (10-class): ' + str(per_fold_acc / per_fold_tested))
  print('per-fold acc (BINARY):   ' + str(binary_fold_acc / per_fold_tested))
  
print("Total Average Accuracy (10-class): ", accuracy / total_tested)
print("Total Average Accuracy (BINARY):   ", binary_accuracy / total_tested)

Split/test folder:  1


KeyboardInterrupt: ignored

In [None]:
# Shuffled version: extracted 15% for testing (follows paper) - Inflated scores
pickled_path = '/content/gdrive/Shared drives/CS1470-Final/mfccs.pkl'
inputs, labels, folders = preprocess(pickled_path)

batch_size, num_epochs = 100, 250

model = CNNLSTMModel(batch_size)
print(np.shape(inputs))

# random 15% / 85% test / train split over the whole data set
tr_in, tr_lb, te_in, te_lb = shuffle(inputs, labels, 0.15)

# features as float32 tensors, labels one-hot encoded over the 10 classes
tr_in = tf.convert_to_tensor(tr_in, dtype=tf.float32)
tr_lb = tf.one_hot(tr_lb, 10, dtype=tf.int64)
te_in = tf.convert_to_tensor(te_in, dtype=tf.float32)
te_lb = tf.one_hot(te_lb, 10, dtype=tf.int64)

for i in range(num_epochs): 
  train(model, tr_in, tr_lb)
  print(i)

# total number of correct predictions, 10-class first, then binary
acc = test(model, te_in, te_lb)
binary_acc = test(model, te_in, te_lb, True)
print(np.shape(te_in))
print(np.shape(te_lb))

# test() only evaluates whole batches, so divide by the number actually scored
tested = batch_size * (len(te_lb) // batch_size)
print("10 class Accuracy: ", acc / tested)
print("BINARY Accuracy: ", binary_acc / tested)