## Import Required Modules

In [46]:
import os
import sys
import numpy as np
import random
from audio import read_mfcc
from batcher import sample_from_mfcc
from constants import SAMPLE_RATE, NUM_FRAMES
from conv_models import DeepSpeakerModel
from test import batch_cosine_similarity
from scipy.stats import entropy
import tensorflow as tf
import torch
import torch.nn as nn
import torch.nn.functional as F
import logging
import matplotlib.pyplot as plt
from tabulate import tabulate
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.optim as optim

## Setup Environment

In [47]:
is_cli = 'nn.py' in sys.argv[0]

In [48]:
np.random.seed(1234)
random.seed(1234)

In [49]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
logging.getLogger('tensorflow').setLevel(logging.FATAL)

## Import Pre-trained Model  

In [50]:
if is_cli:
    print('Loading pre-trained model...')
model = DeepSpeakerModel()
model.m.load_weights('ResCNN_triplet_training_checkpoint_265.h5', by_name=True)

In [55]:
class NoneOfTheAboveDataset(Dataset):
    def __init__(self, in_dataset_dir, not_in_dataset_dir, limit=1000):
        outputs = []
        labels = []

        # iterate through true samples
        in_speakers = [f for f in os.listdir(in_dataset_dir) if f != '.DS_Store']
        for speaker in in_speakers:
            # only train on the first 5 clips for each speaker
            for clip in os.listdir(f'{in_dataset_dir}/{speaker}'):
                if 'npy' not in clip:
                    continue

                output = np.load(f'{in_dataset_dir}/{speaker}/{clip}')[0]
                outputs.append(output)
                labels.append(1)

        # iterate through false samples
        out_speakers = [f for f in os.listdir(not_in_dataset_dir) if f != '.DS_Store']

        random.shuffle(out_speakers)

        out_speakers = out_speakers[:limit]

        for speaker in out_speakers:
            for clip in os.listdir(f'{not_in_dataset_dir}/{speaker}'):
                if 'npy' not in clip:
                    continue

                output = np.load(f'{not_in_dataset_dir}/{speaker}/{clip}')[0]
                outputs.append(output)
                labels.append(0)

        self.outputs = np.array(outputs)
        self.labels = np.array(labels)

    def __len__(self):
        return len(self.outputs)

    def __getitem__(self, idx):
        return self.outputs[idx], self.labels[idx]      
    

class ClassifierDataset(Dataset):
    """Load numpy files from directory structure where each numpy file represents
    the extracted features from the pre-trained model"""
    
    def __init__(self, dirs, train):
        self.dir = dir

        outputs = []
        labels = []

        speakers = [f for f in os.listdir(dir) if f != '.DS_Store']
        for i, speaker in enumerate(speakers):
            for clip in os.listdir(f'{dir}/{speaker}'):
                if 'npy' not in clip:
                    continue

                # we want to only train on clips 0 and 1
                if train:
                    if '0' not in clip:
                        continue

                output = np.load(f'{dir}/{speaker}/{clip}')
                output = output[0]

                outputs.append(output)
                labels.append(i)

        self.outputs = np.array(outputs)
        self.labels = np.array(labels)

    def __len__(self):
        return len(self.outputs)

    def __getitem__(self, idx):
        return self.outputs[idx], self.labels[idx]

In [62]:
# note: this assumes you have run `split_audio()` and
# `create_and_store_mfcc()` from `audio_processing.ipynb`
dir = 'mfcc/split/SherlockHolmes'

classes = [f for f in os.listdir(dir) if f != '.DS_Store']

training_dataset = ClassifierDataset(dir, train=True)
testing_dataset = ClassifierDataset(dir, train=False)

print(len(training_dataset))

none_of_the_above_dataset = NoneOfTheAboveDataset('mfcc/split/SherlockHolmes', 'mfcc/split/Accents', limit=20)

17


In [57]:
batch_size = 16

# don't shuffle test data, we want clips 0->1 trained in order
train_loader = DataLoader(training_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(testing_dataset, batch_size=batch_size)
none_of_the_above_loader = DataLoader(none_of_the_above_dataset, batch_size=batch_size)

In [58]:
class NoneOfTheAboveClassifier(nn.Module):
    """Classifier to determine whether a speaker is included
    in the training set or not. Binary output
    """

    def __init__(self):
        super(NoneOfTheAboveClassifier, self).__init__()
        self.fc1 = nn.Linear(512, 1)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.fc1(x)

        return self.sigmoid(x)

class Classifier(nn.Module):
    """Define a simple linear neural network

    Args:
        num_classes: the number of classes we are classifying

    """
    
    def __init__(self, num_classes):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(512, num_classes)
        
    def forward(self, x):
        x = self.fc1(x)
        x = F.softmax(x, dim=1)
        
        return x

In [59]:
# train first network
none_of_the_above_classifier = NoneOfTheAboveClassifier()
num_epochs = 1000
lr = 0.003

optimizer = optim.Adam(none_of_the_above_classifier.parameters(), lr=lr)
criterion = nn.BCELoss()

losses = []
for epoch_num, epoch in enumerate(range(num_epochs)):
    none_of_the_above_classifier.train()

    running_loss = 0
    for batch_index, (inputs, labels) in enumerate(none_of_the_above_loader):
        optimizer.zero_grad()

        outputs = none_of_the_above_classifier(inputs)

        labels = labels.type(torch.FloatTensor)
        outputs = outputs.type(torch.FloatTensor)[:, 0]

        loss = criterion(outputs, labels)
        loss.backward()
        
        running_loss += loss.item()
        optimizer.step()

        running_loss += loss.item()

## Train Neural Network

In [60]:
if is_cli:
    print('Training...')

num_classes = len(classes)
classifier = Classifier(num_classes=num_classes)

num_epochs = 5000
lr = 0.003

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(classifier.parameters(), lr=lr)

for epoch_num, epoch in enumerate(range(num_epochs)):
    classifier.train()

    running_loss = 0
    for batch_index, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()

        outputs = classifier(inputs)

        loss = criterion(outputs, labels)
        loss.backward()
        
        running_loss += loss.item()
        optimizer.step()

        running_loss += loss.item()

## Test Neural Network

In [61]:
classes = [f for f in os.listdir(dir) if f != '.DS_Store']

class_correct = [0 for i in range(len(classes))]
class_total = [0 for i in range(len(classes))]

ARGMAX_THRESHOLD = 0.5

with torch.no_grad():
    for data in test_loader:
        images, labels = data
        outputs = classifier(images)
        _, predicted = torch.max(outputs, 1)
        c = (predicted == labels).squeeze()

        for i, (output, label) in enumerate(zip(outputs, labels)):
            class_total[label] += 1

            if c[i].item() and max(output) >= ARGMAX_THRESHOLD:
                class_correct[label] += 1

for i in range(len(classes)):
    print(f'Accuracy of {classes[i]}: {round(100 * class_correct[i] / class_total[i], 2)}% ({class_correct[i]}/{class_total[i]})')
print(sum(class_correct) / sum(class_total))


Accuracy of mikie: 100.0% (13/13)
Accuracy of hailey: 100.0% (10/10)
Accuracy of crystal: 100.0% (10/10)
Accuracy of mei: 100.0% (13/13)
Accuracy of changhan: 100.0% (17/17)
Accuracy of daphne: 81.82% (9/11)
Accuracy of swadhin: 100.0% (17/17)
Accuracy of stephanie: 100.0% (9/9)
Accuracy of cheryl: 100.0% (12/12)
Accuracy of chad: 50.0% (5/10)
Accuracy of ethan: 100.0% (10/10)
0.946969696969697


In [31]:
BINARY_THRESHOLD = 0.4
MAX_THRESHOLD = 0.4

def make_prediction(audio_path):
    mfcc = sample_from_mfcc(read_mfcc(audio_path, SAMPLE_RATE), NUM_FRAMES)
    predict = model.m.predict(np.expand_dims(mfcc, axis=0))
    predict = torch.from_numpy(predict)

    # first see if it passes thresholding/junk class
    binary_classification_value = float(none_of_the_above_classifier(predict)[0][0])
    if binary_classification_value < BINARY_THRESHOLD:
        return None

    # run through the actual classifier
    result = classifier(predict)
    argmax = torch.argmax(result)

    if result[0][argmax] < MAX_THRESHOLD:
        return None

    return classes[argmax]


def make_prediction_with_features(numpy_path):
    numpy_array = np.load(numpy_path)
    predict = torch.from_numpy(numpy_array)

    binary_classification_value = float(none_of_the_above_classifier(predict)[0][0])

    if binary_classification_value < BINARY_THRESHOLD:
        return None

    # run through the actual classifier
    result = classifier(predict)
    argmax = torch.argmax(result)

    if result[0][argmax] < MAX_THRESHOLD:
        return None
        
    return classes[argmax]

In [32]:
make_prediction('audio/split/SherlockHolmes/ethan/SherlockHolmes_2.wav')

tensor(0.4651, grad_fn=<SelectBackward>)


'cheryl'

In [35]:
def test_binary_classifier():
    y_pred = []
    y_true = []

    for dir in ['Accents', 'SherlockHolmes']:
        test_classes = os.listdir(f'mfcc/split/{dir}')
        for test_class in test_classes:
            upper, lower = (0, 20) if dir == 'Accents' else (5, 20)

            for i in range(upper, lower):
                try:
                    path = f'/Users/ethanzh/Code/automatic-speaker-recognition/mfcc/split/{dir}/{test_class}/{dir}_{i}.npy'
                    result = str(make_prediction_with_features(path))
                    y_pred.append(result)

                    if dir == 'Accents':
                        y_true.append(str(None))
                    else:
                        y_true.append(test_class)

                except Exception as e:
                    pass

    return y_pred, y_true

In [63]:
from sklearn.metrics import confusion_matrix, plot_confusion_matrix, f1_score

labels = [f for f in os.listdir(f'mfcc/split/SherlockHolmes') if f != '.DS_Store']
labels.append(str(None))

y_pred, y_true = test_binary_classifier()

print_confusion_matrix(y_true, y_pred, labels, hide_zeroes=True)

weighted_f1 = f1_score(y_true, y_pred, average='weighted')
micro_f1 = f1_score(y_true, y_pred, average='micro')
macro_f1 = f1_score(y_true, y_pred, average='macro')
print(f'Weighted F1 score: {weighted_f1}')
print(f'Micro F1 score: {micro_f1}')
print(f'Macro F1 score: {macro_f1}')

t/p    mikie     hailey    crystal   mei       changhan  daphne    swadhin   stephanie cheryl    chad      ethan     None      
    mikie           8.0                                                                                                               
    hailey                    5.0                                                                                                     
    crystal                             5.0                                                                                           
    mei                                           8.0                                                                                 
    changhan                                               12.0                                                                       
    daphne                                                            5.0                 1.0                                         
    swadhin                                                   

In [37]:
from typing import List, Optional

import numpy as np
from sklearn.metrics import confusion_matrix

def print_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    labels: Optional[List] = None,
    hide_zeroes: bool = False,
    hide_diagonal: bool = False,
    hide_threshold: Optional[float] = None,
):
    """Print a nicely formatted confusion matrix with labelled rows and columns.

    Predicted labels are in the top horizontal header, true labels on the vertical header.

    Args:
        y_true (np.ndarray): ground truth labels
        y_pred (np.ndarray): predicted labels
        labels (Optional[List], optional): list of all labels. If None, then all labels present in the data are
            displayed. Defaults to None.
        hide_zeroes (bool, optional): replace zero-values with an empty cell. Defaults to False.
        hide_diagonal (bool, optional): replace true positives (diagonal) with empty cells. Defaults to False.
        hide_threshold (Optional[float], optional): replace values below this threshold with empty cells. Set to None
            to display all values. Defaults to None.
    """
    if labels is None:
        labels = np.unique(np.concatenate((y_true, y_pred)))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    # find which fixed column width will be used for the matrix
    columnwidth = max(
        [len(str(x)) for x in labels] + [5]
    )  # 5 is the minimum column width, otherwise the longest class name
    empty_cell = ' ' * columnwidth

    # top-left cell of the table that indicates that top headers are predicted classes, left headers are true classes
    padding_fst_cell = (columnwidth - 3) // 2  # double-slash is int division
    fst_empty_cell = padding_fst_cell * ' ' + 't/p' + ' ' * (columnwidth - padding_fst_cell - 3)

    # Print header
    print('    ' + fst_empty_cell, end=' ')
    for label in labels:
        print(f'{label:{columnwidth}}', end=' ')  # right-aligned label padded with spaces to columnwidth

    print()  # newline
    # Print rows
    for i, label in enumerate(labels):
        print(f'    {label:{columnwidth}}', end=' ')  # right-aligned label padded with spaces to columnwidth
        for j in range(len(labels)):
            # cell value padded to columnwidth with spaces and displayed with 1 decimal
            cell = f'{cm[i, j]:{columnwidth}.1f}'
            if hide_zeroes:
                cell = cell if float(cm[i, j]) != 0 else empty_cell
            if hide_diagonal:
                cell = cell if i != j else empty_cell
            if hide_threshold:
                cell = cell if cm[i, j] > hide_threshold else empty_cell
            print(cell, end=' ')
        print()

In [None]:
test_classes = os.listdir('mfcc/split/Accents')

results = []

for test_class in test_classes:
    for i in range(0, 5):
        try:
            path = f'/Users/ethanzh/Code/automatic-speaker-recognition/mfcc/split/Accents/{test_class}/Accents_{i}.npy'
            result = make_prediction_with_features(path)
            results.append(result)
        except Exception as e:
            pass