In [None]:
import os
import librosa
import numpy as np
import csv
import torch
import random
import copy

# Number of MFCC coefficients extracted per audio file.
n_mfcc = 39
# Cache file that stores the per-file features and labels.
csv_file_save = './csv/features.csv'

# Create the cache directory from the CSV path itself so the two cannot drift
# apart; exist_ok replaces the separate "does it exist?" check.
os.makedirs(os.path.dirname(csv_file_save), exist_ok=True)

# Features are only (re)computed when the cache file is missing.
calculate_csv = not os.path.exists(csv_file_save)

if calculate_csv:
    print ("File " + csv_file_save + " doesn't exists")
    print ("Features will be calculated")
else:
    print ("File " + csv_file_save + " exists")
    print ("Features won't be recalculated")

In [None]:
# Read the speaker table and keep only its data lines.

file_speaker = "./LibriSpeech/SPEAKERS.TXT"
speakers = []

# The context manager closes the file even if reading fails part-way
# (the handle was previously left open).
with open(file_speaker, "r") as f:
    for line in f:
        # Lines starting with ";" are skipped.
        if line.startswith(";"):
            continue
        line = line.rstrip()  # remove the trailing newline / whitespace
        # A blank line has no fields and would break the field-indexing
        # done when the speaker lines are parsed, so it is dropped here.
        if line:
            speakers.append(line)

In [None]:
# Preview the first ten speaker lines that were kept, as a sanity check.
speakers[:10]

In [None]:
# Build two views of the dev-clean speakers:
#   dict_speakers : speaker id -> gender
#   list_speakers : speaker ids, later shuffled to choose the data splits

dict_speakers = {}
list_speakers = []
for entry in speakers:
    # Tokenise on whitespace and discard the "|" column separators.
    fields = [token for token in entry.split() if token != "|"]

    # field order: 0 = speaker id, 1 = gender, 2 = dataset subset
    speaker_id, gender, subset = fields[0], fields[1], fields[2]
    if subset != "dev-clean":
        continue

    dict_speakers[speaker_id] = gender
    list_speakers.append(speaker_id)

In [None]:
# Display the speaker-id -> gender mapping built for the dev-clean speakers.
dict_speakers

In [None]:
# Collect every .flac file under ./LibriSpeech as a {"dirpath", "filename"} record.

if calculate_csv:
    root = "./"
    path = os.path.join(root, "LibriSpeech")

    # Walk the tree once; records keep the order in which os.walk yields them.
    audio_files = [
        {"dirpath": folder, "filename": name}
        for folder, _subdirs, names in os.walk(path)
        for name in names
        if name.endswith(".flac")
    ]

In [None]:
if calculate_csv:
    # An expression nested inside `if` is not echoed by the notebook,
    # so the count has to be printed explicitly.
    print(len(audio_files))

In [None]:
if calculate_csv:
    # An expression nested inside `if` is not echoed by the notebook,
    # so the first ten records have to be printed explicitly.
    print(audio_files[:10])

In [None]:
# Derive the speaker id of each file from its name and look up the gender label.

if calculate_csv:
    gender_labels = []

    for audio_file in audio_files:
        # The speaker id is everything before the first "-" in the file name
        # (the whole name when there is no dash).
        id_speaker = audio_file["filename"].partition("-")[0]
        audio_file["id_speaker"] = id_speaker

        gender_labels.append(dict_speakers[id_speaker])

In [None]:
if calculate_csv:
    # An expression nested inside `if` is not echoed by the notebook,
    # so the count has to be printed explicitly.
    print(len(gender_labels))

In [None]:
# Compute one feature vector per audio file: the MFCCs averaged over time frames.

if calculate_csv:

    input_features = []
    hop_length = 512
    n_files = len(audio_files)

    for counter, audio_file in enumerate(audio_files, start=1):

        # Progress report every 100 files.
        if counter % 100 == 0:
            print("Extracting features file " + str(counter) + "/" + str(n_files))

        flac_path = audio_file["dirpath"] + "/" + audio_file["filename"]

        y, sr = librosa.load(flac_path)
        mfccs = librosa.feature.mfcc(y=y, sr=sr, hop_length=hop_length, n_mfcc=n_mfcc)
        # Transposing puts frames on axis 0, so the mean leaves one value per coefficient.
        input_features.append(np.mean(mfccs.T, axis=0))

    print("End of extracting features")

In [None]:
if calculate_csv:
    # An expression nested inside `if` is not echoed by the notebook,
    # so the count has to be printed explicitly.
    print(len(input_features))

In [None]:
if calculate_csv:
    # An expression nested inside `if` is not echoed by the notebook,
    # so the feature-vector length has to be printed explicitly.
    print(len(input_features[0]))

In [None]:
# Persist the features to CSV: numeric label, raw gender label, speaker id, then f0..f{n_mfcc-1}.

if calculate_csv:

    with open(csv_file_save, 'w', newline='') as csvfile:
        feat_writer = csv.writer(csvfile, delimiter=',',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)

        # Header row.
        header = ['label', 'label_value', 'id_speaker']
        header.extend('f' + str(i) for i in range(n_mfcc))
        feat_writer.writerow(header)

        n_rows = len(audio_files)

        # `index` is 1-based so it doubles as the progress counter.
        for index, input_feature in enumerate(input_features, start=1):
            gender = gender_labels[index - 1]
            # "M" is encoded as 0; any other label as 1.
            class_label = 0 if gender == "M" else 1

            row_to_write = [class_label, gender, audio_files[index - 1]["id_speaker"]]

            if index % 500 == 0:
                print("Writing row " + str(index) + "/" + str(n_rows))

            row_to_write.extend(input_feature)
            feat_writer.writerow(row_to_write)
    print("End of writing csv")

In [None]:
# Assign whole speakers (not single files) to the train / validation / test sets.

random.shuffle(list_speakers)

speakers_train = []
speakers_val   = []
speakers_test  = []
n_speakers = len(list_speakers)
# Per-gender cut-offs, computed from half the speaker count:
# ranks below the first go to train, below the second to validation, the rest to test.
n_speakers_for_gender_train = (n_speakers / 2) * 0.7
n_speakers_for_gender_train_val = (n_speakers / 2) * 0.85

# How many speakers of each gender have been assigned so far.
seen_per_gender = {'M': 0, 'F': 0}

for speaker in list_speakers:
    gender = dict_speakers[speaker]
    if gender not in seen_per_gender:
        # Any other label is left out of every set.
        continue

    rank = seen_per_gender[gender]
    if rank < n_speakers_for_gender_train:
        target_set = speakers_train
    elif rank < n_speakers_for_gender_train_val:
        target_set = speakers_val
    else:
        target_set = speakers_test

    target_set.append(int(speaker))
    seen_per_gender[gender] += 1

count_males   = seen_per_gender['M']
count_females = seen_per_gender['F']

print(speakers_train)
print(speakers_val)
print(speakers_test)

In [None]:
# Load the cached CSV and build shuffled train / validation / test arrays.
# Columns: 0 = numeric label, 1 = text label, 2 = speaker id, 3.. = features.

my_data = np.genfromtxt(csv_file_save, delimiter=',', skip_header=1)


def _empty_split():
    """Return fresh accumulators for one data split."""
    return {"labels": [], "data": [], "labels_svm": []}


def _shuffled_arrays(split):
    """Shuffle the rows of a split jointly and return (labels, data, labels_svm) arrays."""
    rows = list(zip(split["labels"], split["data"], split["labels_svm"]))
    random.shuffle(rows)
    labels, data, labels_svm = zip(*rows)
    return np.array(labels), np.array(data), np.array(labels_svm)


split_train = _empty_split()
split_val   = _empty_split()
split_test  = _empty_split()

for row in my_data:
    # Route the row by speaker id; speakers in neither list end up in test.
    if row[2] in speakers_train:
        split = split_train
    elif row[2] in speakers_val:
        split = split_val
    else:
        split = split_test

    split["labels"].append([row[0]])
    split["data"].append(row[3:])
    # The SVM uses labels in {-1, +1} instead of {0, 1}.
    split["labels_svm"].append([1] if row[0] == 1 else [-1])

# Shuffle order (train, val, test) is kept so the random stream is consumed identically.
labels_train, data_train, labels_train_svm = _shuffled_arrays(split_train)
labels_val, data_val, labels_val_svm       = _shuffled_arrays(split_val)
labels_test, data_test, labels_test_svm    = _shuffled_arrays(split_test)

for array in (labels_train, data_train,
              labels_val, data_val,
              labels_test, data_test,
              labels_train_svm, labels_val_svm, labels_test_svm):
    print(array.shape)

In [None]:
# Convert every numpy array to a float tensor.

def _as_float_tensor(array):
    """Wrap a numpy array as a float32 torch tensor."""
    return torch.from_numpy(array).float()


data_train_tensor, labels_train_tensor = _as_float_tensor(data_train), _as_float_tensor(labels_train)
data_val_tensor, labels_val_tensor     = _as_float_tensor(data_val), _as_float_tensor(labels_val)
data_test_tensor, labels_test_tensor   = _as_float_tensor(data_test), _as_float_tensor(labels_test)

for features, targets in ((data_train_tensor, labels_train_tensor),
                          (data_val_tensor, labels_val_tensor),
                          (data_test_tensor, labels_test_tensor)):
    print(features.shape, targets.shape)

# {-1, +1} targets used by the SVM.
labels_train_svm_tensor = _as_float_tensor(labels_train_svm)
labels_val_svm_tensor   = _as_float_tensor(labels_val_svm)
labels_test_svm_tensor  = _as_float_tensor(labels_test_svm)

In [None]:
# kNN MODEL WITH RESULTS

def _knn_mean_label(queries, k=3):
    """Return, for each query row, the mean training label of its k nearest training rows.

    Distances are Euclidean norms against `data_train_tensor`; the result is a
    tensor with one single-element row per query.
    """
    scores = []
    for query in queries:
        dist = torch.norm(data_train_tensor - query, dim=1, p=None)
        nearest = dist.topk(k, largest=False)
        votes = sum(int(labels_train_tensor[idx]) for idx in nearest.indices)
        scores.append([votes / k])
    return torch.as_tensor(scores)


# Validation accuracy: a mean neighbour label above 0.5 predicts class 1.
label_pred_val = _knn_mean_label(data_val_tensor)
output_val  = (label_pred_val > 0.5).float()
correct_val = (output_val == labels_val_tensor).float().sum()

print("Accuracy_Val: {:.3f}".format(correct_val/labels_val_tensor.shape[0]))

# Test accuracy, same rule.
label_pred_test = _knn_mean_label(data_test_tensor)
output_test  = (label_pred_test > 0.5).float()
correct_test = (output_test == labels_test_tensor).float().sum()

print("Accuracy_Test: {:.3f}".format(correct_test/labels_test_tensor.shape[0]))

In [None]:
# NN MODEL WITH RESULTS
# Two-hidden-layer network with dropout, trained full-batch with Adam and
# early stopping on validation accuracy. The best weights are checkpointed to disk.

hidden_neurons = 32
n_outputs = 1
num_epochs = 10000

model = torch.nn.Sequential(
    torch.nn.Linear(n_mfcc, hidden_neurons),
    torch.nn.ReLU(),
    torch.nn.Dropout(p = 0.2),
    torch.nn.Linear(hidden_neurons, hidden_neurons),
    torch.nn.ReLU(),
    torch.nn.Dropout(p = 0.2),
    torch.nn.Linear(hidden_neurons, n_outputs),
    torch.nn.Sigmoid()
)

criterion = torch.nn.BCELoss(reduction='sum')

learning_rate = 5e-5
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Best validation accuracy seen so far (despite the name).
minimum = 0
# Not used below: the best model is stored in the checkpoint file instead.
best_model = None

checkpoint_path = 'model.pth.tar'
# Tracks whether this run wrote a checkpoint, so a stale file is never loaded.
checkpoint_saved = False

no_improve = 0
early_stopping_steps = 49

model.train()
for epoch in range(num_epochs):
    y_pred = model(data_train_tensor)

    loss = criterion(y_pred, labels_train_tensor)

    # Evaluate on the validation set every 15 epochs.
    if epoch % 15 == 14:

        # Validation must run with dropout disabled and without building an
        # autograd graph; otherwise the scores used for model selection are noisy.
        model.eval()
        with torch.no_grad():
            y_pred_val = model(data_val_tensor)
            loss_val = criterion(y_pred_val, labels_val_tensor)
        model.train()

        # Accuracy (the training figure comes from the dropout-enabled forward pass above)
        output       = (y_pred > 0.5).float()
        correct      = (output == labels_train_tensor).float().sum()
        output_val   = (y_pred_val > 0.5).float()
        correct_val  = (output_val == labels_val_tensor).float().sum()
        accuracy_val = correct_val/labels_val_tensor.shape[0]

        if accuracy_val > minimum:
            minimum = accuracy_val
            torch.save({'state_dict':model.state_dict(), 'optimizer': optimizer.state_dict()}, checkpoint_path)
            checkpoint_saved = True
            no_improve = 0
        else:
            no_improve += 1

        print("Epoch {}/{}, Loss: {:.3f}, Accuracy: {:.3f}, Loss_Val: {:.3f}, Accuracy_Val: {:.3f}".format(epoch+1,
                        num_epochs, loss, correct/labels_train_tensor.shape[0], loss_val, accuracy_val))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # early stopping: too many consecutive evaluations without improvement
    if no_improve > early_stopping_steps:
        break

# Restore the best checkpoint written during this run; if none was written,
# the current weights are evaluated as they are.
if checkpoint_saved:
    checkpoint = torch.load(checkpoint_path)

    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])

model.eval()

with torch.no_grad():
    y_pred_val = model(data_val_tensor)
    loss_val = criterion(y_pred_val, labels_val_tensor)
    output_val  = (y_pred_val > 0.5).float()
    correct_val = (output_val == labels_val_tensor).float().sum()

    print("Final Model, Loss_Val: {:.3f}, Accuracy_Val: {:.3f}".format(loss_val, correct_val/labels_val_tensor.shape[0]))

    y_pred_test = model(data_test_tensor)
    loss_test = criterion(y_pred_test, labels_test_tensor)
    output_test  = (y_pred_test > 0.5).float()
    correct_test = (output_test == labels_test_tensor).float().sum()

    print("Final Model, Loss_Test: {:.3f}, Accuracy_Test: {:.3f}".format(loss_test, correct_test/labels_test_tensor.shape[0]))

In [None]:
# SVM MODEL WITH RESULTS
# Linear classifier sign(w.x - b), trained with per-sample hinge-loss updates.

# Weight width follows the feature matrix instead of a hard-coded 39.
dim = data_train_tensor.shape[1]
# Plain tensors with requires_grad=True replace the deprecated autograd.Variable wrapper.
w = torch.rand(dim, requires_grad=True)
b = torch.rand(1, requires_grad=True)

step_size = 3e-5
num_epochs = 5000
minibatch_size = 20

# Copies of the parameters with the best validation accuracy.
w_best = None
b_best = None

# Best validation accuracy seen so far (despite the name).
minimum = 0
no_improve = 0
early_stopping_steps = 29 

def accuracy(X, y):
    """Return the fraction of rows of X whose predicted sign matches y.

    The prediction for row i is sign(w . X[i] - b), using the module-level
    `w` and `b`. `y` holds the expected signs, one entry per row.
    """
    n_correct = 0
    for i, target in enumerate(y):
        margin = torch.dot(w, X[i]) - b
        predicted_sign = int(np.sign(margin.detach().numpy()[0]))
        if predicted_sign == target:
            n_correct += 1
    return float(n_correct) / len(y)


# Per-sample sub-gradient descent on the hinge loss, with early stopping on
# validation accuracy.
for epoch in range(num_epochs):
    for i in range(len(data_train_tensor)):
        margin = labels_train_svm_tensor[i] * (torch.dot(w, data_train_tensor[i]) - b)
        # Hinge loss; max() returns the plain int 0 when the sample is already
        # outside the margin, in which case there is nothing to back-propagate.
        L = max(0, 1 - margin)
        if L != 0:
            L.backward()
            # Update the parameters outside of autograd tracking.
            with torch.no_grad():
                w -= step_size * w.grad
                b -= step_size * b.grad
            w.grad.zero_()
            b.grad.zero_()

    # Evaluate every 5 epochs.
    if epoch % 5 == 4:
        accuracy_val = accuracy(data_val_tensor, labels_val_svm_tensor)

        if accuracy_val > minimum:
            minimum = accuracy_val
            w_best = copy.deepcopy(w)
            b_best = copy.deepcopy(b)
            no_improve = 0
        else:
            no_improve += 1

        # accuracy_val is reused here: w and b have not changed since it was computed.
        print("Epoch {}/{}, Accuracy: {:.3f}, Accuracy_Val: {:.3f}".format(epoch+1, num_epochs, 
                        accuracy(data_train_tensor, labels_train_svm_tensor), accuracy_val))

    # early stopping: too many consecutive evaluations without improvement
    if no_improve > early_stopping_steps:
        break

# Restore the best parameters; if no evaluation ever improved on the initial
# score, keep the current ones rather than replacing them with None.
if w_best is not None:
    w = w_best
    b = b_best

print("Best Result, Accuracy_Val: {:.3f}".format(accuracy(data_val_tensor, labels_val_svm_tensor)))  
print("Best Result, Accuracy_Test: {:.3f}".format(accuracy(data_test_tensor, labels_test_svm_tensor)))  
    