In [63]:
import tensorflow as tf
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Unzip the dataset archive into the current working directory.
# This should only be done once!

import zipfile

# The context manager closes the archive handle even when extraction fails.
with zipfile.ZipFile("10000_30.zip", 'r') as zer_ref:
    zer_ref.extractall()

In [3]:
# Load images and labels
# `images` and `labels` are built together so that images[i] always belongs to labels[i],
# even when some image files cannot be loaded.

label_csv = pd.read_csv("2speakers_10000_30.csv")
names = label_csv["Filename"].tolist()
all_labels = label_csv["Speakers"].tolist()
images = []
labels = []  # labels of the successfully loaded images only
directory = "./SPECTROGRAMS/"

count = 0    # number of images that could not be loaded
failed = []  # (file name, error) for every image that could not be loaded
for name, label in zip(names, all_labels):
    # The CSV lists the original file names; the spectrograms share the base name with a .png extension
    name = os.path.splitext(name)[0]
    try:
        # The context manager closes the file handle once the pixel data has been copied
        with Image.open(directory+name+".png") as image_file:
            image_array = np.asarray(image_file.convert('RGB'))
    except Exception as error:
        count += 1
        failed.append((name, repr(error)))
    else:
        # Only keep the label when its image was loaded, so both lists stay aligned
        images.append(image_array)
        labels.append(label)

if(count > 0):
    print("Something went wrong!!! Please check it out")
    print(count, "image(s) could not be loaded and were skipped together with their labels; first failures:", failed[:10])


In [4]:
# Split the data, convert it to float32, compute the class weights and one-hot encode the labels.
# Everything below is recomputed from `images` and `labels`, so this cell can be re-run safely.

# The split pairs images[i] with labels[i]; a length mismatch (e.g. images that failed to load)
# would silently shift every label, so fail loudly instead.
if len(images) != len(labels):
    raise ValueError(
        "images and labels are not aligned: %d images vs %d labels" % (len(images), len(labels))
    )

# create data here, split in 3 parts. 0.7 training, 0.2 validation, 0.1 testing is used for large datasets
split_points = [int(.7*len(labels)), int(.9*len(labels))]
x_train, x_val, x_test = np.split(images, split_points)
y_train, y_val, y_test = np.split(labels, split_points)

# Convert all data to float32 to avoid data type errors
x_train = np.float32(x_train)
y_train = np.float32(y_train)
x_val = np.float32(x_val)
y_val = np.float32(y_val)
x_test = np.float32(x_test)
y_test = np.float32(y_test)

# Assume three classes instead of two, since to_categorical expects label 0 to be a class as well
# Given that the training data only contains labels 1 and 2, the model will not learn to predict label 0 anyway!
num_classes = 3

# Compute class weights to balance data
# Scaling by total/(number of observed classes) helps keep the loss to a similar magnitude
unique, counts = np.unique(y_train, return_counts=True) # only balance train data!
print(unique, counts)
total = counts.sum()
# Start with a neutral weight for every class index 0..num_classes-1: newer tf.keras versions
# require the class_weight keys to be exactly 0..n-1. Classes without training samples
# (label 0 here) never contribute to the loss, so their weight has no effect.
class_weight = {cls: 1.0 for cls in range(num_classes)}
# The weights are keyed by the labels actually observed, not by their position in `counts`
for cls, n in zip(unique, counts):
    class_weight[int(cls)] = (1 / n) * total / len(unique)
weight_for_1 = class_weight[1]
weight_for_2 = class_weight[2]

# Compute the categorical one-hot encoded labels
y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
y_val = tf.keras.utils.to_categorical(y_val, num_classes=num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=num_classes)

[1. 2.] [2744 3903]


In [5]:
# Shape of the spectrogram arrays (height, width, RGB channels); used as the model input shape
specshape = (
    288,
    432,
    3,
)

In [6]:
# The original cifar-10 model as used by 
# https://towardsdatascience.com/automatic-speaker-recognition-using-transfer-learning-6fab63e34e74

def cifar_10(input_shape=None, num_classes=3):
    """Build the (uncompiled) CIFAR-10 style CNN.

    Two blocks of [Conv2D -> Conv2D -> MaxPooling2D -> Dropout] are followed by
    a 512-unit dense layer and a softmax output layer.

    Args:
        input_shape: Shape of a single input image. Defaults to the
            module-level `specshape`, looked up when the function is called.
        num_classes: Number of units of the softmax output layer (default 3,
            matching the one-hot labels used in this notebook).

    Returns:
        A `tf.keras.Model` that still has to be compiled.
    """
    if input_shape is None:
        input_shape = specshape

    layers = tf.keras.layers
    input_layer = tf.keras.Input(shape=input_shape)

    # First convolutional block: 32 filters
    conv1 = layers.Conv2D(32, kernel_size=(3,3), activation="relu", padding='same')(input_layer)
    conv2 = layers.Conv2D(32, kernel_size=(3,3), activation="relu")(conv1)
    max1 = layers.MaxPooling2D(pool_size=(2,2))(conv2)
    drop1 = layers.Dropout(0.25)(max1)

    # Second convolutional block: 64 filters
    conv3 = layers.Conv2D(64, kernel_size=(3,3), activation="relu", padding='same')(drop1)
    conv4 = layers.Conv2D(64, kernel_size=(3,3), activation="relu")(conv3)
    max2 = layers.MaxPooling2D(pool_size=(2,2))(conv4)
    drop2 = layers.Dropout(0.25)(max2)

    # Classifier head
    flat1 = layers.Flatten()(drop2)
    dense1 = layers.Dense(512, activation="relu")(flat1)
    drop3 = layers.Dropout(0.5)(dense1)
    output_layer = layers.Dense(num_classes, activation="softmax")(drop3)

    model = tf.keras.Model(inputs=input_layer, outputs=output_layer)

    return model

In [7]:
# The CNN taken from 
# https://medium.com/x8-the-ai-community/audio-classification-using-cnn-coding-example-f9cbd272269e

def build_model(input_shape=None, num_classes=3):
    """Build the (uncompiled) small CNN used for training in this notebook.

    Two Conv2D layers are followed by max pooling, dropout, a 128-unit dense
    layer, dropout and a softmax output layer.

    Args:
        input_shape: Shape of a single input image. Defaults to the
            module-level `specshape`, looked up when the function is called.
        num_classes: Number of units of the softmax output layer (default 3,
            matching the one-hot labels used in this notebook).

    Returns:
        A `tf.keras.Model` that still has to be compiled.
    """
    if input_shape is None:
        input_shape = specshape

    layers = tf.keras.layers
    input_layer = tf.keras.Input(shape=input_shape)
    conv1 = layers.Conv2D(32, kernel_size=(3,3), activation="relu")(input_layer)
    conv2 = layers.Conv2D(64, kernel_size=(3,3), activation="relu")(conv1)
    max1 = layers.MaxPooling2D(pool_size=(2,2))(conv2)
    drop1 = layers.Dropout(0.1)(max1) #originally 0.25, but 25% of 64 units is quite a lot
    flat1 = layers.Flatten()(drop1)
    dense1 = layers.Dense(128, activation="relu")(flat1)
    drop2 = layers.Dropout(0.2)(dense1) #originally 0.5, but 50% of 128 units is quite a lot
    output_layer = layers.Dense(num_classes, activation="softmax")(drop2)
    model = tf.keras.Model(inputs=input_layer, outputs=output_layer)

    return model

In [8]:
# Create the (not yet compiled) model using the build_model() architecture defined above
model = build_model()

In [9]:
# Compile the model
# Using the Adam optimizer with learning rate 0.001
# (passed as `learning_rate`; the `lr` keyword is a deprecated alias)
# Using categorical crossentropy loss to deal with the one-hot encoded labels
# Since there are three labels (0, 1, 2) rather than two (1, 2), binary crossentropy loss cannot be used
# Using accuracy as metric
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.categorical_crossentropy,
    metrics=['accuracy'],
)

In [10]:
# Two callbacks, both watching the validation loss:
#  - reduce_lr multiplies the learning rate by 0.1 (never going below 1e-5) when the loss stops improving
#  - early_stop ends training after 5 epochs without improvement and restores the best weights

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=0,
    min_lr=1e-5,
)

early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=5,
    verbose=0,
    mode='auto',
    baseline=None,
    restore_best_weights=True,
)

In [11]:
# Custom callback that prints a confusion matrix for the given data after every epoch

from sklearn.metrics import confusion_matrix as cm

class ConfusionMatrix(tf.keras.callbacks.Callback):
    """Keras callback that prints a confusion matrix after every epoch.

    Usage: create the callback, call `setup` with the model and the data to
    evaluate on, then pass the callback to `model.fit`.
    """

    # Placeholders until `setup` is called. `None` is used instead of shared
    # mutable lists, which were never valid values for these attributes.
    x = None
    y = None
    model = None

    def setup(self, model, validation_x, validation_y):
        """Store the model and the data the confusion matrix is computed on.

        Args:
            model: Model whose `predict` method is called after every epoch.
            validation_x: Input samples passed to `model.predict`.
            validation_y: One-hot encoded true labels belonging to `validation_x`.
        """
        self.model = model
        self.x = validation_x
        self.y = validation_y

    def on_epoch_end(self, epoch, logs=None):
        """Print the confusion matrix of the stored data for the current model state.

        Args:
            epoch: Index of the epoch that just finished (unused).
            logs: Metric values of the finished epoch (unused).

        Raises:
            RuntimeError: If `setup` has not been called before training.
        """
        if self.model is None or self.x is None or self.y is None:
            raise RuntimeError(
                "ConfusionMatrix.setup(model, validation_x, validation_y) "
                "must be called before training"
            )
        print("Confusion matrix")
        y_prob = self.model.predict(self.x)
        # Turn class probabilities and one-hot labels into class indices
        y_pred = np.argmax(y_prob, axis=1)
        y_true = np.argmax(self.y, axis=1)
        print(cm(y_true, y_pred))


In [12]:
# Initialize the confusion matrix callback with the model and validation data
# setup() has to be called before model.fit, since on_epoch_end predicts on the data stored here
cm_val = ConfusionMatrix()
cm_val.setup(model, x_val, y_val)

In [13]:
# Fit the model: batches of 16 samples, at most 20 epochs
# The callbacks and the class weights are passed along as well!
model.fit(
    x_train,
    y_train,
    batch_size=16,
    epochs=20,
    validation_data=(x_val, y_val),
    callbacks=[reduce_lr, early_stop, cm_val],
    class_weight=class_weight,
)

  ...
    to  
  ['...']
  ...
    to  
  ['...']
Train on 6647 samples, validate on 1900 samples
Epoch 1/20
[[592 185]
 [216 907]]
Epoch 2/20
[[615 162]
 [230 893]]
Epoch 3/20
[[621 156]
 [197 926]]
Epoch 4/20
[[623 154]
 [162 961]]
Epoch 5/20
[[622 155]
 [159 964]]
Epoch 6/20
[[615 162]
 [150 973]]
Epoch 7/20
[[615 162]
 [230 893]]
Epoch 8/20
[[615 162]
 [230 893]]
Epoch 9/20
[[615 162]
 [230 893]]
Epoch 10/20
[[615 162]
 [230 893]]
Epoch 11/20
[[615 162]
 [230 893]]
Epoch 12/20
[[615 162]
 [230 893]]
Epoch 13/20
[[615 162]
 [230 893]]
Epoch 14/20
[[615 162]
 [230 893]]
Epoch 15/20
[[615 162]
 [230 893]]
Epoch 16/20
[[615 162]
 [230 893]]
Epoch 17/20
[[615 162]
 [230 893]]
Epoch 18/20
[[615 162]
 [230 893]]
Epoch 19/20
[[615 162]
 [230 893]]
Epoch 20/20
[[615 162]
 [230 893]]


<tensorflow.python.keras.callbacks.History at 0x12b7eeb1848>

In [14]:
# Save the model to an .h5 file in the current working directory,
# so it can be reloaded later with tf.keras.models.load_model
model.save("asrmodel10000_30_cnn1_3.h5")

In [15]:
# Evaluate the model on the test data
# The result is [loss, accuracy], matching the loss and the metric passed to model.compile
model.evaluate(x_test, y_test, batch_size=16)



[0.545062582367345, 0.76842105]

In [7]:
# Reload the saved model from disk; the file name matches the one written by model.save(...) in this notebook
model = tf.keras.models.load_model("asrmodel10000_30_cnn1_3.h5")

In [8]:
# Make predictions for the entire test data to obtain the performance per class
# Label 2 (two speakers) is treated as the positive class, label 1 as the negative class

true_pos = 0 # true label 2 and prediction 2
true_neg = 0  # true label 1 and prediction 1
false_pos = 0 # true label 1 and prediction 2
false_neg = 0 # true label 2 and prediction 1
zeros = 0 # just checking if the model ever predicts 0 speakers

# Indices (into the test data) of the misclassified samples and of the zero predictions
false_pos_i = []
false_neg_i = []
zeros_i = []

# Predict the whole test set with one batched call instead of one predict() call per image
predicted_classes = np.argmax(model.predict(x_test, batch_size=16), axis=1)
true_classes = np.argmax(y_test, axis=1)

# Iterate over however many test samples there are, rather than a hard-coded count
for i, (pred, true) in enumerate(zip(predicted_classes, true_classes)):
    if pred == 1:
        if pred == true:
            true_neg += 1
        else:
            false_neg += 1
            false_neg_i.append(i)
    elif pred == 2:
        if pred == true:
            true_pos += 1
        else:
            false_pos += 1
            false_pos_i.append(i)
    elif pred == 0:
        zeros += 1
        zeros_i.append(i)

In [9]:
# Report the per-class totals derived from the counters computed above

score_lines = [
    ("actual ones:", true_neg + false_pos),
    ("correctly predicted ones:", true_neg),
    ("actual twos:", true_pos + false_neg),
    ("correctly predicted twos:", true_pos),
    ("zero predictions:", zeros),
]
for caption, amount in score_lines:
    print(caption, amount)

actual ones: 381
correctly predicted ones: 297
actual twos: 569
correctly predicted twos: 433
zero predictions: 0


In [68]:
# Precision, recall and F-score with label 2 (two speakers) as the positive class
# A ratio with an empty denominator is reported as 0.0 instead of raising ZeroDivisionError

predicted_pos = true_pos + false_pos  # everything the model labelled as 2
actual_pos = true_pos + false_neg     # the positives as counted by the cell above

precision = true_pos / predicted_pos if predicted_pos else 0.0
print(precision)
recall = true_pos / actual_pos if actual_pos else 0.0
print(recall)
# Harmonic mean of precision and recall; 0.0 when both are zero
f = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0.0
print(f)

0.8375241779497099
0.7609841827768014
0.7974217311233887
