# Code for the experiment
By Loes (and Merel)

In [None]:
import tensorflow as tf
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Load every spectrogram listed in the label CSV as an RGB numpy array.
# `images[k]` must stay aligned with `label_csv.iloc[k]`, because later cells
# pair images and labels purely by position.
directory = 'spectograms/'
label_csv = pd.read_csv("secdata.csv")
names = label_csv["Filename"].tolist()

images = []
failed_rows = []  # positions in label_csv whose image could not be loaded
for i, filename in enumerate(names):
    # Replace the last four characters of the listed filename (presumably the
    # original file extension) with ".png" to get the spectrogram path.
    name = str(directory + filename[:-4] + ".png")
    try:
        # The context manager closes the file handle once the pixels are copied.
        with Image.open(name) as im:
            images.append(np.asarray(im.convert("RGB")))
    except Exception as err:  # keep going, but report every file that failed
        print("something went wrong!!")
        print("with name = ", filename)
        print("and index i =", i)
        print("error =", repr(err))
        failed_rows.append(i)

count = len(failed_rows)
print(count)
if count > 0:
    print("Something went wrong!!! Please check it out")
    # Drop the rows without an image so that images and label_csv stay aligned.
    # `names` is left untouched and still lists every file from the CSV.
    label_csv = label_csv.drop(label_csv.index[failed_rows])

In [None]:
# Do not use the data you do not need: discard every sample with more than
# 5 speakers. A boolean mask replaces the row-by-row delete/drop loop, which
# copied the whole DataFrame once per removed row.
too_many = (label_csv["Speakers"] > 5).to_numpy()
dropped_positions = set(np.flatnonzero(too_many).tolist())
images = [im for pos, im in enumerate(images) if pos not in dropped_positions]
label_csv = label_csv[~too_many]

print(len(images))
print(len(label_csv))

# Cap the dataset at the first `max_samples` remaining samples.
max_samples = 3000
images = images[:max_samples]
labels = label_csv["Speakers"].head(max_samples).tolist()

print(len(images))
print(len(labels))

In [None]:
# Count the total number of files per label (class-balance check).
# `unique` holds the distinct label values, `counts` how often each occurs.
unique, counts = np.unique(labels, return_counts=True) #Count total amount of files per label
print(unique, counts)

In [None]:
# Split images and labels into three consecutive parts:
# the first 70% for training, the next 20% for validation, the last 10% for testing.
# NOTE(review): the split is positional (no shuffling), so each part reflects
# the row order of the label CSV — confirm that this is intended.
n_samples = len(labels)
split_points = [int(.7 * n_samples), int(.9 * n_samples)]
x_train, x_val, x_test = np.split(images, split_points)
y_train, y_val, y_test = np.split(labels, split_points)

In [None]:
# Disabled cell: the string literal below holds optional code for randomly
# undersampling the training set with imbalanced-learn. Because it is only a
# string, none of it is executed.
# NOTE(review): if this is ever re-enabled, its last two lines call
# np.float(...) on whole arrays. np.float was just an alias of the builtin
# float and has been removed from recent NumPy releases; np.float32(...) is
# presumably what was intended — confirm before use.
"""
#IF YOU WANT TO UNDERSAMPLE THE TRAINING SET 
from imblearn.under_sampling import RandomUnderSampler

x_train = np.reshape(x_train, (x_train.shape[0], 288*432*3)) #for undersampling

undersample = RandomUnderSampler(sampling_strategy='not minority')
xtrainsample, ytrainsample = undersample.fit_resample(x_train, y_train)

xtrainsample = np.reshape(xtrainsample, (xtrainsample.shape[0], 288, 432, 3))

unique, counts = np.unique(ytrainsample, return_counts=True) # count everything in the labels for the report :) 
print(unique, counts)

x_train = np.float(xtrainsample)
y_train = np.float(ytrainsample)
"""

In [None]:
# Cast every split (inputs and labels) to float32 before it is fed to Keras.
x_train, x_val, x_test = (
    np.float32(part) for part in (x_train, x_val, x_test)
)
y_train, y_val, y_test = (
    np.float32(part) for part in (y_train, y_val, y_test)
)

In [None]:
# Compute class weights to balance the training data.
# Every class present in y_train gets weight (1 / count) * total / n_present,
# so rare classes weigh more. Scaling by total / n_present keeps the loss at a
# similar magnitude: the sum of the weights of all examples stays the same.
# The weights are derived from whatever labels actually occur instead of
# assuming exactly five classes labelled 1..5.
unique, counts = np.unique(y_train, return_counts=True)  # only balance train data
print(unique, counts)

total = counts.sum()
n_present = len(unique)
present_weights = {
    int(label): float((1 / count) * total / n_present)
    for label, count in zip(unique, counts)
}

# Keras expects the class_weight keys to be class indices, and TensorFlow 2.x
# releases validate that they run contiguously from 0. Indices that never
# occur in the training labels get a neutral weight of 1.0; it is never
# applied because no training sample carries that label.
class_weight = {
    cls: present_weights.get(cls, 1.0) for cls in range(max(present_weights) + 1)
}
print(class_weight)

In [None]:
# Turn the label vector of each split into one-hot rows with 6 columns.
y_train, y_val, y_test = (
    tf.keras.utils.to_categorical(split_labels, num_classes=6)
    for split_labels in (y_train, y_val, y_test)
)

In [None]:
# Input shape of a single spectrogram as passed to the first layer of every model.
# NOTE(review): presumably (height, width, RGB channels), matching the arrays
# produced from the RGB-converted images — confirm.
specshape=(288, 432, 3) #shape of the data

In [None]:
#Imports for the neural networks
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, Conv1D, MaxPooling2D, MaxPooling1D, Dropout, LSTM, Dense, TimeDistributed, Flatten, BatchNormalization
from tensorflow.keras.models import Sequential

In [None]:
# The CountNet model
# aka model A
def build_model_countnet():
    """Build the CountNet-style network (model A), uncompiled.

    Two convolutional stages (two 3x3 convolutions plus 3x3 max-pooling each,
    dropout after the second stage) feed an LSTM. `TimeDistributed(Flatten())`
    flattens every slice along the first non-batch axis so that the LSTM
    receives a sequence of feature vectors. The LSTM output is pooled,
    flattened and classified by a 6-way softmax layer.

    Returns:
        A `keras.Sequential` model taking inputs of shape `specshape`.
    """
    # One shared initializer instance is used for all convolution kernels.
    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)

    def conv3x3(filters, **extra):
        # 3x3 ReLU convolution with the shared kernel initializer.
        return Conv2D(
            filters=filters,
            kernel_size=(3, 3),
            activation='relu',
            kernel_initializer=initializer,
            **extra,
        )

    return keras.Sequential([
        # Stage 1
        conv3x3(64, input_shape=specshape),
        conv3x3(32),
        MaxPooling2D(pool_size=(3, 3)),
        # Stage 2
        conv3x3(128),
        conv3x3(64),
        MaxPooling2D(pool_size=(3, 3)),
        Dropout(0.25),
        # Recurrent part
        TimeDistributed(Flatten()),
        LSTM(40, return_sequences=True),
        MaxPooling1D(pool_size=2),
        # Classifier head; the Flatten was added on top of the original design.
        Flatten(),
        Dense(6, activation='softmax'),
    ])

In [None]:
#https://github.com/hamzag95/keras/blob/master/examples/cifar10_cnn.py
#aka TowardsDataScience model
#aka model B
def build_model_cifar10():
    """Build the CIFAR-10 style CNN (model B) with the functional API, uncompiled.

    Two stages of (conv 3x3, conv 3x3, 2x2 max-pool, dropout 0.25) with 32 and
    then 64 filters, followed by a 256-unit dense layer, dropout and a 6-way
    softmax output.

    Returns:
        A `tf.keras.Model` taking inputs of shape `specshape`.
    """
    layers = tf.keras.layers
    # One shared initializer instance is used for every kernel in the model.
    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)

    input_layer = tf.keras.Input(shape=specshape)
    x = input_layer

    for filters in (32, 64):
        for _ in range(2):
            x = layers.Conv2D(
                filters,
                kernel_size=(3, 3),
                activation="relu",
                kernel_initializer=initializer,
            )(x)
        x = layers.MaxPooling2D(pool_size=(2, 2))(x)
        x = layers.Dropout(0.25)(x)

    x = layers.Flatten()(x)
    # The reference model uses 512 units here.
    x = layers.Dense(256, activation="relu", kernel_initializer=initializer)(x)
    # The reference model uses a dropout rate of 0.5 here.
    x = layers.Dropout(0.25)(x)
    output_layer = layers.Dense(
        6, activation="softmax", kernel_initializer=initializer
    )(x)

    return tf.keras.Model(inputs=input_layer, outputs=output_layer)


In [None]:
#aka model C
def build_model_andrei():
    """Build model C, uncompiled.

    Three convolution + max-pooling pairs (kernel sizes 8, 6 and 4), dropout
    and batch normalisation, three dense layers of decreasing width (each
    followed by dropout), then a flatten and a 6-way softmax output.

    Returns:
        A `keras.Sequential` model taking inputs of shape `specshape`.
    """
    # One shared initializer instance is used for every kernel in the model.
    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=1.)
    model = keras.Sequential()

    # (filters, kernel size, pool size) of each convolutional pair.
    # NOTE(review): a (1, 1) pool size leaves the feature map unchanged —
    # confirm these pooling layers are meant to be no-ops.
    conv_specs = (
        (32, (8, 8), (1, 1)),
        (64, (6, 6), (2, 2)),
        (128, (4, 4), (1, 1)),
    )
    for position, (filters, kernel, pool) in enumerate(conv_specs):
        # Only the very first layer declares the input shape.
        extra = {'input_shape': specshape} if position == 0 else {}
        model.add(Conv2D(filters=filters, kernel_size=kernel, activation='relu',
                         kernel_initializer=initializer, **extra))
        model.add(MaxPooling2D(pool_size=pool))

    # The 2 extra conv blocks of the original design are not used.

    model.add(Dropout(0.25))
    model.add(BatchNormalization())

    # Dense widths were originally 1024 / 512 / 256, and the last dropout 0.5.
    # NOTE(review): these Dense layers run before Flatten, so they act on the
    # channel axis of the feature map — confirm this ordering is intended.
    for units in (128, 64, 32):
        model.add(Dense(units, activation='relu', kernel_initializer=initializer))
        model.add(Dropout(0.1))

    model.add(Flatten())
    model.add(Dense(6, activation='softmax', kernel_initializer=initializer))

    return model

In [None]:
# Create model
# Builds model C (build_model_andrei) and prints its layer-by-layer summary.
# Swap in another build_model_* function here to train a different architecture.
model = build_model_andrei()
model.summary()

In [None]:
# Compile for multi-class classification on one-hot targets.
# `learning_rate` is used instead of the deprecated `lr` alias, which newer
# Keras releases no longer accept. Class weights are supplied to fit(), not here.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)

In [None]:
#Callbacks
# Multiply the learning rate by 0.1 when val_loss stops improving
# (patience=0, i.e. no grace epochs), never going below 1e-5.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=0,
    min_lr=1e-5,
)

# Stop training after 3 epochs without val_loss improvement and restore the
# weights of the best epoch.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    min_delta=0,
    mode='auto',
    verbose=0,
    baseline=None,
    restore_best_weights=True,
)

In [None]:
#Print the confusion matrix after each epoch
from sklearn.metrics import confusion_matrix as cm


class ConfusionMatrix(tf.keras.callbacks.Callback):
    """Keras callback that prints a confusion matrix at the end of every epoch.

    The matrix is computed on a fixed data set (normally the validation split)
    given either to the constructor or, as before, through `setup`.

    Attributes are per instance: `x` holds the inputs to predict on and `y`
    the matching one-hot targets.
    """

    def __init__(self, validation_x=None, validation_y=None):
        """Create the callback; the data may also be supplied later via `setup`."""
        super().__init__()
        self.x = validation_x
        self.y = validation_y

    def setup(self, model, validation_x, validation_y):
        """Attach the model and the data the confusion matrix is computed on.

        Args:
            model: Keras model used for the predictions.
            validation_x: inputs passed to `model.predict`.
            validation_y: one-hot encoded targets matching `validation_x`.
        """
        # Go through the base-class hook rather than overwriting `model`
        # directly, so this works whether the base class stores the model as a
        # plain attribute or exposes it as a read-only property.
        self.set_model(model)
        self.x = validation_x
        self.y = validation_y

    def on_epoch_end(self, epoch, logs=None):
        """Print the confusion matrix of the current model on the stored data.

        Args:
            epoch: index of the epoch that just finished (unused).
            logs: metric values supplied by Keras (unused).
        """
        print("Confusion matrix")
        y_prob = self.model.predict(self.x)
        # Convert probabilities and one-hot targets back to class indices.
        y_pred = np.argmax(y_prob, axis=1)
        y_true = np.argmax(self.y, axis=1)
        print(cm(y_true, y_pred))


In [None]:
# Create the per-epoch confusion-matrix callback and give it the model and
# the validation split it should report on.
cm_val = ConfusionMatrix()
cm_val.setup(model, x_val, y_val)

In [None]:
# Print the per-class sample counts of the train, validation and test sets again.
# The targets are one-hot at this point, so argmax recovers the class index.
for header, onehot_targets in (
    ('train set = ', y_train),
    ('val set = ', y_val),
    ('test set = ', y_test),
):
    unique, counts = np.unique(onehot_targets.argmax(axis=1), return_counts=True)
    print(header)
    print(unique, counts)

In [None]:
# Train for at most 10 epochs in batches of 16, validating on the validation
# split after each epoch and weighting the loss per class.
model.fit(
    x_train,
    y_train,
    batch_size=16,
    epochs=10,
    validation_data=(x_val, y_val),
    callbacks=[reduce_lr, early_stop, cm_val],
    class_weight=class_weight,
)

In [None]:
# Predict class probabilities for the test set and convert them to one-hot
# rows: a 1 in the column of the most probable class, 0 everywhere else.
preds = model.predict(x_test)
onehot_preds = np.zeros(preds.shape)
# Vectorized: for every row, set the column given by that row's argmax.
onehot_preds[np.arange(len(preds)), np.argmax(preds, axis=1)] = 1

In [None]:
from sklearn.metrics import confusion_matrix

import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt

# Confusion matrix of the test set: rows are true classes, columns predictions.
y_true = y_test.argmax(axis=1)
y_pred = onehot_preds.argmax(axis=1)

# Use the classes that actually occur as row/column labels. A hard-coded
# range(5) fails whenever the matrix is not exactly 5x5 and labels the axes
# 0..4 instead of the real class indices.
class_labels = np.union1d(y_true, y_pred)
cmatrix = confusion_matrix(y_true, y_pred, labels=class_labels)

print(cmatrix)

df_cm = pd.DataFrame(cmatrix, index=class_labels, columns=class_labels)
# plt.figure(figsize=(10,7))
sn.set(font_scale=1.4) # for label size

sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}) # font size

plt.show()

In [None]:
# Final evaluation on the held-out test split, in batches of 16.
# Returns the loss followed by the metrics given at compile time (accuracy).
model.evaluate(x_test, y_test, batch_size=16)