In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init
from torch.utils.data import DataLoader, Dataset
from torchaudio import transforms
import librosa
from sklearn.model_selection import train_test_split
import json
import warnings
# Silence FutureWarning messages for the rest of the notebook
warnings.simplefilter(action='ignore', category=FutureWarning)

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Directory prefixes (relative paths, trailing slash included) for the train and test audio files
train_filepath = 'data/train/'
test_filepath = 'data/test/'

In [2]:
# Load data (the context manager closes the file even if json.load raises)
with open('train.json') as f:
    data = json.load(f)

# Transform into dataframe: the JSON keys become the 'file' column and the
# values become the integer 'speaker' label column
df = pd.DataFrame.from_dict(data, orient = 'index')
df = df.reset_index(level=0)
df = df.set_axis(['file', 'speaker'], axis=1)
df = df.astype({'speaker': 'int64'})

In [3]:
# Directory prefix read by load_audiofile(). This is a module-level global that
# is re-pointed at test_filepath later in the notebook, before the test set is built.
filepath = train_filepath

# ----------------------------
# Preprocessing functions
# ----------------------------

def load_audiofile(filename):
    """Load one audio clip from the directory currently stored in ``filepath``.

    The clip is resampled to 16 kHz and at most its first 4 seconds are read.

    :param filename: file name (converted with ``str``) appended to ``filepath``
    :return: tuple ``(signal, sample_rate)`` as produced by ``librosa.load``
    """
    full_path = filepath + str(filename)
    signal, sample_rate = librosa.load(full_path, sr=16000, duration=4)
    return signal, sample_rate

def extract_mfcc(filename):
    """Compute 64 MFCC coefficients per frame for one audio file.

    :param filename: file name passed straight to ``load_audiofile``
    :return: array returned by ``librosa.feature.mfcc`` with ``n_mfcc=64``
    """
    sig, sr = load_audiofile(filename)
    # The audio must be passed as the keyword `y`: positional audio input was
    # deprecated (FutureWarning) and is rejected by librosa >= 0.10.
    mfcc = librosa.feature.mfcc(y=sig, sr=sr, n_mfcc=64)
    return mfcc

def padding(array, xx, yy):
    """
    Zero-pad (or centre-crop) a 2-D array so the result is exactly (xx, yy).

    Padding code found at: https://stackoverflow.com/questions/59241216/padding-numpy-arrays-to-a-specific-size

    The input is centred inside the output; when the amount of padding is odd
    the extra row/column goes to the bottom/right. An axis that is already
    larger than its target is centre-cropped instead, so the function never
    hands a negative pad width to ``np.pad`` (which would raise ValueError).

    :param array: 2-D numpy array (anything ``np.asarray`` accepts)
    :param xx: desired height
    :param yy: desired width
    :return: numpy array of shape (xx, yy)

    """
    array = np.asarray(array)

    h = array.shape[0]
    w = array.shape[1]

    # Centre-crop any axis that exceeds its target size
    if h > xx:
        top = (h - xx) // 2
        array = array[top:top + xx, :]
        h = xx
    if w > yy:
        left = (w - yy) // 2
        array = array[:, left:left + yy]
        w = yy

    # Split the remaining padding as evenly as possible around the data
    a = (xx - h) // 2
    aa = xx - a - h

    b = (yy - w) // 2
    bb = yy - b - w

    return np.pad(array, pad_width=((a, aa), (b, bb)), mode='constant')

def time_mask(spec):
    """Augment ``spec`` with torchaudio's ``TimeMasking`` (``time_mask_param=20``).

    A new transform object is built on every call.

    :param spec: tensor to augment
    :return: the tensor returned by the masking transform
    """
    return transforms.TimeMasking(time_mask_param=20)(spec)

def freq_mask(spec):
    """Augment ``spec`` with torchaudio's ``FrequencyMasking`` (``freq_mask_param=20``).

    A new transform object is built on every call.

    :param spec: tensor to augment
    :return: the tensor returned by the masking transform
    """
    return transforms.FrequencyMasking(freq_mask_param=20)(spec)

In [4]:
# ----------------------------
# Dataset objects
# ----------------------------
class SoundDS(Dataset):
    """Dataset without augmentation: padded MFCC features plus the speaker label.

    :param df: DataFrame with a 'file' column (audio file name) and a
        'speaker' column (class label)
    """

    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(features, class_id)`` for the row at position ``idx``.

        ``features`` is a numpy array of shape (1, 64, 126).
        """
        # Positional lookup: a DataLoader hands out positions 0..len-1, so this
        # works even when the frame does not carry a fresh 0..n-1 index.
        class_id = self.df['speaker'].iloc[idx]

        # MFCC features as a numpy array; no tensor round trip is needed
        # because padding() works on (and returns) numpy arrays.
        mfcc = extract_mfcc(self.df['file'].iloc[idx])

        # Pad to the fixed 64 x 126 size shared by every sample in a batch
        mfcc = padding(mfcc, 64, 126)

        # Add a leading channel dimension
        mfcc = np.expand_dims(mfcc, 0)

        return mfcc, class_id


class SoundDS_train(Dataset):
    """Training dataset: MFCC features with time/frequency masking, then padding.

    :param df: DataFrame with a 'file' column (audio file name) and a
        'speaker' column (class label)
    """

    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(features, class_id)`` for the row at position ``idx``.

        ``features`` is a numpy array of shape (1, 64, 126).
        """
        # Positional lookup: a DataLoader hands out positions 0..len-1, so this
        # works even when the frame does not carry a fresh 0..n-1 index.
        class_id = self.df['speaker'].iloc[idx]

        # The masking transforms operate on tensors, so convert first
        mfcc = torch.from_numpy(extract_mfcc(self.df['file'].iloc[idx]))

        # Time & frequency masking (augmentation applied before padding)
        mfcc = time_mask(mfcc)
        mfcc = freq_mask(mfcc)

        # Back to numpy explicitly, then pad to the fixed 64 x 126 size
        mfcc = padding(mfcc.numpy(), 64, 126)

        # Add a leading channel dimension
        mfcc = np.expand_dims(mfcc, 0)

        return mfcc, class_id

In [5]:
# Random split of 80:20 between training and validation.
# A fixed random_state makes the split identical on every Restart & Run All,
# so validation numbers are comparable between runs.
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

# Give each split a fresh 0..n-1 index
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

In [6]:
# Wrap the two splits: the training split uses the augmenting dataset class,
# the validation split uses the plain one
train_ds, val_ds = SoundDS_train(train_df), SoundDS(val_df)

# Batch both splits 64 samples at a time; only the training loader shuffles
train_dl = DataLoader(train_ds, batch_size=64, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=64, shuffle=False)

In [7]:
# ----------------------------
# Audio Classification Model
# Based on the network found at: https://towardsdatascience.com/audio-deep-learning-made-simple-sound-classification-step-by-step-cebc936bbe5
# ----------------------------
class AudioClassifier (nn.Module):
    """Four strided convolution blocks followed by global pooling and a linear head.

    Each block is Conv2d -> ReLU -> BatchNorm2d with stride 2; channel widths
    are 1 -> 8 -> 16 -> 32 -> 64. The blocks stay reachable both as individual
    attributes (``conv1``, ``relu1``, ``bn1``, ...) and through the wrapping
    ``conv`` Sequential, so state_dict keys match the hand-written layout.

    :param num_classes: number of output logits (defaults to 184)
    """

    # (in_channels, out_channels, kernel_size, padding) for each block, in order
    _BLOCK_SPECS = (
        (1, 8, (5, 5), (2, 2)),
        (8, 16, (3, 3), (1, 1)),
        (16, 32, (3, 3), (1, 1)),
        (32, 64, (3, 3), (1, 1)),
    )

    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self, num_classes=184):
        super().__init__()
        conv_layers = []

        # Convolution blocks with Relu and Batch Norm. Use Kaiming Initialization
        for number, (c_in, c_out, kernel, pad) in enumerate(self._BLOCK_SPECS, start=1):
            conv = nn.Conv2d(c_in, c_out, kernel_size=kernel, stride=(2, 2), padding=pad)
            relu = nn.ReLU()
            bn = nn.BatchNorm2d(c_out)
            init.kaiming_normal_(conv.weight, a=0.1)
            conv.bias.data.zero_()

            # Register under the historical attribute names (conv1, relu1, bn1, ...)
            setattr(self, f'conv{number}', conv)
            setattr(self, f'relu{number}', relu)
            setattr(self, f'bn{number}', bn)
            conv_layers += [conv, relu, bn]

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=self._BLOCK_SPECS[-1][1], out_features=num_classes)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        """Map a batch of single-channel inputs to ``(batch, num_classes)`` logits."""
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer: raw logits, no softmax applied here
        x = self.lin(x)

        # Final output
        return x

# Pick the compute device: first CUDA GPU when available, CPU otherwise
# (replace with torch.device("cpu") to force CPU execution)
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Build the classifier and move its parameters onto that device
myModel = AudioClassifier().to(device)

In [8]:
# ----------------------------
# Training Loop
# ----------------------------
def training(model, train_dl, num_epochs, verbose=False):
    """Train ``model`` in place with Adam and a linear one-cycle LR schedule.

    Every batch is standardised with its own mean and standard deviation
    before the forward pass. The scheduler is stepped once per batch.

    :param model: ``nn.Module`` to optimise; batches are moved to its device
    :param train_dl: sized iterable of batches; element 0 of a batch holds the
        inputs and element 1 the integer class labels
    :param num_epochs: number of passes over ``train_dl``
    :param verbose: when True, print loss and accuracy after every epoch
    :return: list with one ``(average_loss, accuracy)`` tuple per epoch
    """
    # Follow the model's own device instead of relying on a notebook-level global
    device = next(model.parameters()).device

    # Loss Function, Optimizer and Scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    history = []

    # Repeat for each epoch
    for epoch in range(num_epochs):
        # Make sure BatchNorm/Dropout layers are in training mode, even if an
        # evaluation pass switched the model to eval() before this call
        model.train()

        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for data in train_dl:
            # Get the input features and target labels, and put them on the model's device
            inputs, labels = data[0].to(device), data[1].to(device)

            # Normalize the inputs (per-batch statistics)
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

        # Stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction / total_prediction
        history.append((avg_loss, acc))
        if verbose:
            print(f'Epoch: {epoch+1}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    if verbose:
        print('Finished Training')

    return history
  
# Train the classifier for 45 full passes over the training loader
num_epochs = 45
training(myModel, train_dl, num_epochs=num_epochs)

KeyboardInterrupt: 

In [9]:
# Load test data (the context manager closes the file even if json.load raises)
with open('test.json') as f:
    data_test = json.load(f)

# Transform into dataframe with the same 'file' / 'speaker' layout as the
# training frame; the speaker column is overwritten with the placeholder 0
# so the dataset class can be reused for inference
test_df = pd.DataFrame.from_dict(data_test, orient = 'index')
test_df = test_df.reset_index(level=0)
test_df = test_df.set_axis(['file', 'speaker'], axis=1)
test_df['speaker'] = 0
test_df = test_df.astype({'speaker': 'int64'})

In [10]:
# Re-point the global directory prefix used by load_audiofile() at the test audio
filepath = test_filepath

# Wrap the test frame in the non-augmenting dataset and batch it 64 at a time,
# keeping the original row order
test_ds = SoundDS(test_df)
test_dl = DataLoader(test_ds, batch_size=64, shuffle=False)

In [11]:
# Keep load_audiofile() pointed at the test directory (repeats the assignment
# made in the previous cell, so this cell also works when run on its own)
filepath = test_filepath

def predictions(model, test_dl):
    """Predict a class for every sample in ``test_dl``.

    The model is switched to evaluation mode for the duration of the call, so
    BatchNorm layers use their running statistics and do not update them; the
    previous training/eval mode is restored afterwards. Every batch is
    standardised with its own mean and standard deviation, matching training.

    :param model: trained ``nn.Module``; batches are moved to its device
    :param test_dl: iterable of batches whose element 0 holds the inputs
    :return: list of predicted class indices as strings, in iteration order
    """
    # Follow the model's own device instead of relying on a notebook-level global
    device = next(model.parameters()).device

    predictions_list = []

    was_training = model.training
    model.eval()
    try:
        # Disable gradient tracking
        with torch.no_grad():
            for data in test_dl:
                # Only the features are needed; the labels are placeholders
                inputs = data[0].to(device)

                # Normalize the inputs (per-batch statistics)
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Get predictions
                outputs = model(inputs)

                # Get the predicted class with the highest score
                _, prediction = torch.max(outputs, 1)

                # Collect the class indices as strings
                predictions_list.extend(str(item) for item in prediction.cpu().tolist())
    finally:
        # Leave the model in whatever mode the caller had it in
        model.train(was_training)

    return predictions_list

# Predicted class index (as a string) for every test sample; test_dl is not
# shuffled, so the list follows the row order of test_df
test_preds = predictions(myModel, test_dl)

KeyboardInterrupt: 

In [12]:
# Store the predicted labels in the test frame as strings
test_df['speaker'] = test_preds
test_df = test_df.astype({'speaker': 'str'})

# Build the file -> predicted speaker mapping from the two columns and save it
pred_dict = {file_name: speaker for file_name, speaker in test_df.values}
with open('preds.json', 'w') as fp:
    json.dump(pred_dict, fp, indent=2)

NameError: name 'test_preds' is not defined