In [3]:
# this code implements a deep neural network for feature learning
# this code comes from Vishwa's class on constructing a data loader and creating a model

# Some common system imports
import os
import sys
import importlib
import time
import csv

# Numeric computing
import numpy as np

# Sklearn functions are useful for generating train/test splits, and metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier

from scipy.io import wavfile

# pytorch
import torch
import torch.utils.data as tdata
from torchaudio import transforms

# Plotting (if we want it)
import matplotlib.pyplot as plt

# importing our own modules
import audio_datasets as ads

In [4]:
# Load the raw audio clips and their emotion labels for feature learning.
training_path = os.path.join(os.getcwd(), "..", "training_data/data") #need to change this back before pushing ***
# os.listdir returns entries in arbitrary order; sort so the clip order is repeatable.
files = sorted(os.listdir(training_path))
wav_files = []
file_type = "wav"
LABELS = {"neutral": 0, "calm": 1, "happy": 2, "sad": 3, "angry": 4, "fearful": 5, "disgust": 6, "surprised": 7}

# Every clip is forced to SAMPLE_RATE * CLIP_SECONDS samples below.
SAMPLE_RATE = 48000
CLIP_SECONDS = 5

for file in files:
    curr_path = os.path.join(training_path, file)
    # Test the extension rather than a substring, so names that merely
    # contain "wav" (e.g. "wavelet.txt") are not picked up.
    if not (os.path.isfile(curr_path) and file.lower().endswith("." + file_type)):
        continue
    for label, label_id in LABELS.items():
        if label in file:
            wav_files.append((file, label_id))
            break  # one label per clip: stop at the first label found in the name

if not wav_files:
    raise FileNotFoundError(f"no labelled .{file_type} files found in {training_path}")

data_array = []
label_array = []
for file_name, label_id in wav_files:
    # wavfile.read returns (sample_rate, samples); only the samples are kept.
    _, samples = wavfile.read(os.path.join(training_path, file_name))
    data_array.append(samples)
    label_array.append(label_id)

max_len = max(len(clip) for clip in data_array)
# np.resize truncates long clips and fills short ones with repeated copies of
# the clip (it does not zero-pad); it also flattens its input.
# NOTE(review): a multi-channel clip would be flattened with its channels
# interleaved -- confirm the recordings are mono.
data_array = [np.resize(clip, SAMPLE_RATE * CLIP_SECONDS) for clip in data_array]
print(len(data_array[0]))

  data_array.append(wavfile.read(os.path.join(training_path, data[0]))[1])


240000


In [5]:
# Split the clips 50/50 into training and testing sets.
# random_state pins the shuffle so the split is the same on every run.
train_data, test_data, train_labels, test_labels = train_test_split(
    data_array, label_array, train_size=0.5, test_size=0.5, random_state=0
)

# Stack each list of equal-length clips into one ndarray before building the
# tensor: torch.tensor on a list of ndarrays takes a slow path and emits a
# UserWarning.
train_ten, test_ten = torch.tensor(np.stack(train_data)), torch.tensor(np.stack(test_data))
train_y_ten, test_y_ten = torch.tensor(train_labels), torch.tensor(test_labels)

  train_ten, test_ten = torch.tensor(train_data), torch.tensor(test_data)


In [6]:
# Compute Mel-frequency cepstral coefficients (MFCCs) for every training clip
# and store them as flat feature vectors, one row per clip.
mfcc = transforms.MFCC(sample_rate=48000, n_mfcc=40)
print(train_ten.shape)
first_coef = mfcc(train_ten[0].float())
print(first_coef.shape)

# Size the feature matrix from the data instead of hard-coding 562 x 48040,
# so the cell keeps working if the dataset or the split size changes.
n_train = train_ten.shape[0]
mfcc_features = np.zeros((n_train, first_coef.numel()))
for row in range(n_train):
    mel_coef = mfcc(train_ten[row].float()).numpy()
    # Flatten the 2-D coefficient matrix into a single feature row.
    mel_coef = mel_coef.reshape((1, -1))
    mfcc_features[row] = mel_coef



torch.Size([562, 240000])
torch.Size([40, 1201])


In [7]:
# Compute Mel-frequency cepstral coefficients (MFCCs) for every test clip,
# using the same transform settings as for the training clips.
mfcc2 = transforms.MFCC(sample_rate=48000, n_mfcc=40)

# Derive both dimensions from the test tensor itself; the test split is not
# guaranteed to have the same number of clips as the training split.
n_test = test_ten.shape[0]
n_test_coef = mfcc2(test_ten[0].float()).numel()
mfcc_test_features = np.zeros((n_test, n_test_coef))
for row in range(n_test):
    mel_coef = mfcc2(test_ten[row].float()).numpy()
    # Flatten the 2-D coefficient matrix into a single feature row.
    mel_coef = mel_coef.reshape((1, -1))
    mfcc_test_features[row] = mel_coef

In [21]:
# convolution layer
# structure comes from IBM website
# will use mel_coef as feature vectors

# Build the tensor under a new name instead of rebinding mfcc_features:
# re-running the cell then stays idempotent (no "copy construct from a tensor"
# warning) and later cells still see the original feature matrix. The cast to
# float32 matches the dtype of the Conv1d weights.
mfcc_features_ten = torch.as_tensor(mfcc_features, dtype=torch.float32)

CONV_KERNEL = 3
# NOTE(review): Conv1d reads a 2-D input as one unbatched (channels, length)
# signal, so each clip acts as an input channel here and the layer mixes
# clips together. Confirm this is intended rather than a per-clip convolution.
conv_layer = torch.nn.Conv1d(mfcc_features_ten.shape[0], 50, CONV_KERNEL)

conv_out = conv_layer(mfcc_features_ten)

print(conv_out.shape)



  mfcc_features = torch.tensor(mfcc_features)


torch.Size([50, 48038])


In [22]:
# Max-pool the convolution output, then collapse the result into one 1-D vector.
KERNEL_SIZE = 3
pooling_layer = torch.nn.MaxPool1d(KERNEL_SIZE)

pooled_out = torch.flatten(pooling_layer(conv_out))
pooled_out.shape

torch.Size([800600])

In [None]:
# fully connected layer
# multilayer perceptron used as the classification head
class MLP(torch.nn.Module):
  """Six-layer fully connected classifier that outputs class probabilities.

  Args:
    in_features: length of each input feature vector.
    hidden_sizes: widths of the five hidden layers (fc1..fc5 outputs).
    n_classes: number of output classes (fc6 output width).

  Raises:
    ValueError: if hidden_sizes does not contain exactly five widths.

  The defaults reproduce the layer widths of the original notebook. With those
  defaults fc1 alone holds 800600 x 400000 (about 3.2e11) weights, so pass
  smaller widths to instantiate the model on ordinary hardware.
  """

  def __init__(self, in_features=800600, hidden_sizes=(400000, 200000, 2000, 200, 20), n_classes=8):
    super().__init__()

    # Validate before any layer is allocated.
    if len(hidden_sizes) != 5:
      raise ValueError("hidden_sizes must contain exactly 5 layer widths")
    widths = [in_features, *hidden_sizes, n_classes]

    self.fc1 = torch.nn.Linear(in_features=widths[0], out_features=widths[1])
    self.fc2 = torch.nn.Linear(in_features=widths[1], out_features=widths[2])
    self.fc3 = torch.nn.Linear(in_features=widths[2], out_features=widths[3])
    self.fc4 = torch.nn.Linear(in_features=widths[3], out_features=widths[4])
    self.fc5 = torch.nn.Linear(in_features=widths[4], out_features=widths[5])
    self.fc6 = torch.nn.Linear(in_features=widths[5], out_features=widths[6])
    # Softmax module over the last (class) dimension; dim=-1 works for both
    # batched (batch, classes) and single-vector (classes,) outputs.
    self.classifier = torch.nn.Softmax(dim=-1)

  def forward(self, x):
    """Return class probabilities for x, whose last dimension is in_features."""
    x1 = torch.relu(self.fc1(x))
    x2 = torch.relu(self.fc2(x1))
    x3 = torch.relu(self.fc3(x2))
    x4 = torch.relu(self.fc4(x3))
    x5 = torch.relu(self.fc5(x4))
    x6 = self.fc6(x5) #usually left linear
    # NOTE(review): torch.nn.CrossEntropyLoss applies log-softmax itself; if
    # this model is trained with that loss, consider returning x6 directly.
    return self.classifier(x6) #for classification

In [None]:
n_epochs = 10 # number of full passes over the training data

# instantiating a model
model = MLP()

# Multi-class cross-entropy loss: takes (batch, n_classes) scores and integer
# class indices as targets.
# NOTE(review): this loss applies log-softmax internally and is normally fed
# raw logits -- confirm what MLP.forward returns.
criterion = torch.nn.CrossEntropyLoss()

# optimizer mechanism
optimizer = torch.optim.Adam(lr=1e-2, params=model.parameters())

# Mean training loss per epoch.
loss_array = np.zeros(n_epochs)

# NOTE(review): train_loader is not defined in the visible cells; it must be
# created (e.g. a tdata.DataLoader yielding (features, labels) batches) before
# this cell runs.
for epoch_idx in range(n_epochs):
  n_batches = 0
  for data_batch, labels_batch in train_loader:
    # move to GPU if available - data_batch = data_batch.cuda() etc.

    # predict
    labels_pred = model(data_batch.to(torch.float32))

    # Compute the loss on the un-flattened (batch, n_classes) predictions
    # against integer class labels (not flattened scores vs. float labels).
    loss = criterion(labels_pred, labels_batch.to(torch.long))

    # The loss is already averaged over the batch (default reduction="mean"),
    # so accumulate it as-is and average over batches after the inner loop.
    loss_array[epoch_idx] += loss.item()
    n_batches += 1

    # backprop
    optimizer.zero_grad()
    loss.backward() # gradients via autograd
    optimizer.step() # update all parameters

  if n_batches:
    loss_array[epoch_idx] /= n_batches

In [None]:
# Baseline: random-forest classifier trained directly on the flattened MFCC features.
# random_state pins the forest's randomness so the score is repeatable.
forest2 = RandomForestClassifier(random_state=0)
print(mfcc_features.shape, train_y_ten.shape)

# mfcc_features may be a NumPy array or a torch tensor depending on which
# cells have run; np.asarray gives scikit-learn a plain array either way.
train_features = np.asarray(mfcc_features)
y_pred2 = forest2.fit(train_features, train_labels).predict(mfcc_test_features)
y_pred2 = list(y_pred2)

# NOTE(review): the CSV export of predictions that used to sit here was
# disabled and referenced names that are not defined in this notebook
# (y_pred, test_wav_files); restore it once those exist.

# Count predictions that match the true test labels.
correct = int(sum(pred == true for pred, true in zip(y_pred2, test_labels)))
print("Number of correct labels: ", correct)