<a href="https://colab.research.google.com/github/Faisal-NSU/CSE465/blob/main/Custom%20CNN%20model%20with%20Melspectogram.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Utils


In [23]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import sys


def plot_images(images, nrows = None, ncols = None, figsize = None, ax = None, 
                axis_style = 'on', bgr2rgb = True):
    '''
    Plots a given list of images and returns axes.Axes object
    
    Parameters
    -----------
    images: list
            A list of images to plot
            
    nrows: int
           Number of rows to arrange images into
    
    ncols: int
           Number of columns to arrange images into
    
    figsize: tuple
             Plot size (width, height) in inches
           
    ax: axes.Axes object
        The axis to plot the images on, new axis will be created if None
        
    axis_style: str
                'off' if axis are not to be displayed
    '''
    N = len(images)
    if not isinstance(images, (list, np.ndarray)):
        raise AttributeError("The images parameter should be a list of images, "
                             "if you want to plot a single image, pass it as a "
                             "list of single image")

    # Setting nrows and ncols as per parameter input
    if nrows is None:
        if ncols is None:
            nrows = N
            ncols = 1
        else:
            nrows = int(np.ceil(N / ncols))
    else:
        if ncols is None:
            ncols = int(np.ceil(N / nrows))
    
    if ax is None:
        _, ax = plt.subplots(nrows, ncols, figsize = figsize)
    
    if len(images) == 1:
        if bgr2rgb == True:
            images[0] = cv2.cvtColor(images[0], cv2.COLOR_BGR2RGB)
    
        ax.imshow(images[0])
        ax.axis(axis_style)
        
        return ax
    
    else:
        for i in range(nrows):
            for j in range(ncols):
                if (i * ncols + j) < N:
                    img = images[i * ncols + j]
                    
                    if bgr2rgb == True:
                            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    
                    # For this condition, ax is a 2d array else a 1d array
                    if nrows >1 and ncols > 1: 
                        ax[i][j].imshow(img)
                    
                    else:
                        ax[i + j].imshow(img)
                
                if nrows > 1 and ncols > 1:
                    ax[i][j].axis(axis_style)
                else:
                    ax[i + j].axis(axis_style)
        
        return ax


def drawProgressBar(current, total, string = '', barLen = 20):
    '''
    Draws a progress bar, something like [====>    ] 20%
    
    Parameters
    ------------
    current: int/float
             Current progress
    
    total: int/float
           The total from which the current progress is made
             
    string: str
            Additional details to write along with progress
    
    barLen: int
            Length of progress bar
    '''
    percent = current/total
    arrow = ">"
    if percent == 1:
        arrow = ""
    # Carriage return, returns to the begining of line to owerwrite
    sys.stdout.write("\r")
    sys.stdout.write("Progress: [{:<{}}] {}/{}".format("=" * int(barLen * percent) + arrow, 
                                                         barLen, current, total) + string)
    sys.stdout.flush()
    
def get_fixed_audio_len(wav, sr, audio_len):
    '''
    Converts a time-series audio to a fixed length either by padding or trimming
    
    Parameters
    -------------
    wav: Audio time-series
    
    sr: Sample rate
    
    audio_len: The fixed audio length needed in seconds
    '''
    if wav.shape[0] < audio_len * sr:
        wav = np.pad(wav, int(np.ceil((audio_len * sr - wav.shape[0])/2)), mode = 'reflect')
    wav = wav[:audio_len * sr]
    
    return wav

def get_melspectrogram_db(wav, sr, audio_len = 4, n_fft = 2048, hop_length = 512, 
                          n_mels = 128, fmin = 20, fmax = 8300, top_db = 80):
    '''
    Decomposes the audio sample into different frequencies using fourier transform 
    and converts frequencies to mel scale and amplitude to decibel scale.
    
    Parameters
    -------------------
    wav: Audio time-series
    
    sr: Sample rate
    
    audio_len: The fixed length of audio in seconds
    
    n_fft: Length of the Fast Fourier Transform window
    
    hop_length: Number of samples between successive frames
    
    n_mels: Number of mel filters, which make the height of spectrogram image
    
    fmin: Lowest frequency
    
    fmax: Heighest frequency
    
    top_db: Threashold of the decibel scale output
    '''
    wav = get_fixed_audio_len(wav, sr, audio_len)
        
    spec = librosa.feature.melspectrogram(wav, sr = sr, n_fft = n_fft, hop_length = hop_length, 
                                          n_mels = n_mels, fmin = fmin, fmax = fmax)
    
    spec = librosa.power_to_db(spec, top_db = top_db)
    return spec

def spec_to_image(spec):
    '''
    Converts the spectrogram to an image
    
    Parameters
    -------------
    spec: Spectrogram
    '''
    eps=1e-6
    
    # Z-score normalization
    mean = spec.mean()
    std = spec.std()
    spec_norm = (spec - mean) / (std + eps)
    spec_min, spec_max = spec_norm.min(), spec_norm.max()
    
    # Min-max scaling
    spec_scaled = 255 * (spec_norm - spec_min) / (spec_max - spec_min)
    spec_scaled = spec_scaled.astype(np.uint8)
    
    return spec_scaled

# Imports


In [11]:
import IPython.display as ipd
import librosa
import librosa.display

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

# Drive Mount


In [8]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


# Training an Artificial Neural Network on time-series audio data

In [13]:
!gdown --id 1-4S8AbAeKL7Tl_jlqT-7sA9sc8nmW2qa

Downloading...
From: https://drive.google.com/uc?id=1-4S8AbAeKL7Tl_jlqT-7sA9sc8nmW2qa
To: /content/dataDic
100% 2.47G/2.47G [00:32<00:00, 76.9MB/s]


In [14]:
#load pickle
import pickle
filename = 'dataDic'
infile = open(filename,'rb')
dataDic = pickle.load(infile)
infile.close()

In [15]:
dataDic['train_time_series'].shape,dataDic['val_time_series'].shape,dataDic['test_time_series'].shape

((4900, 88200), (700, 88200), (1400, 88200))

In [25]:
# Convert numpy arrays to torch tensors
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data_utils

BATCH_SIZE = 8
train_time_series = torch.from_numpy(dataDic['train_time_series'])
train_labels = torch.from_numpy(dataDic['train_labels']).long()

val_time_series = torch.from_numpy(dataDic['val_time_series'])
val_labels = torch.from_numpy(dataDic['val_labels']).long()

test_time_series = torch.from_numpy(dataDic['test_time_series'])
test_labels = torch.from_numpy(dataDic['test_labels']).long()

# Create data loaders
train_time_series = data_utils.TensorDataset(train_time_series, train_labels)
train_loader = data_utils.DataLoader(train_time_series, batch_size = BATCH_SIZE, shuffle = False)

val_time_series = data_utils.TensorDataset(val_time_series, val_labels)
val_loader = data_utils.DataLoader(val_time_series, batch_size = BATCH_SIZE, shuffle = False)

test_time_series = data_utils.TensorDataset(test_time_series, test_labels)
test_loader = data_utils.DataLoader(test_time_series, batch_size = BATCH_SIZE, shuffle = False)


In [26]:
classes ={0: 'ANGRY',
 1: 'DISGUST',
 2: 'FEAR',
 3: 'HAPPY',
 4: 'NEUTRAL',
 5: 'SAD',
 6: 'SURPRISE'}

In [27]:

NUM_CLASSES = len(classes)

N_FEATURES = train_time_series[0][0].shape[0]
N_FEATURES

88200

# Training a Convolutional neural network on spectrogram images

In [28]:
train_sr = 22050
val_sr = 22050
test_sr = 22050

def get_spec_loader(audio_time_series, sr, batch_size, shuffle = False):
    '''
    Returns data loader of spectrogram images
    
    Parameters
    ------------
    audio_time_series: Tensor Dataset with wav, label iterables
    
    sr: Sample rate
    
    batch_size: The batch size of data loader
    '''
    audio_spec_img = []
    labels = []
    curr = 0
    tot = len(audio_time_series)

    for wav, label in audio_time_series:
        spec_img = spec_to_image(get_melspectrogram_db(wav.numpy(), sr))
        spec_img = np.expand_dims(spec_img, axis = 0)
        audio_spec_img.append(spec_img)
        labels.append(label)

        curr += 1
        drawProgressBar(curr, tot, barLen = 40)

    audio_spec_img = torch.Tensor(audio_spec_img)
    audio_spec_img = audio_spec_img / 255
    
    labels = torch.Tensor(labels).long()

    audio_spec_img = data_utils.TensorDataset(audio_spec_img, labels)
    audio_loader = data_utils.DataLoader(audio_spec_img, batch_size = batch_size, shuffle = shuffle)
    
    return audio_loader

In [29]:
# Getting the spectrogram image for each audio in train set
import time
start_time = time.time()
train_loader = get_spec_loader(train_time_series, train_sr, BATCH_SIZE, shuffle = False)
val_loader = get_spec_loader(val_time_series, val_sr, BATCH_SIZE, shuffle = False)
test_loader = get_spec_loader(test_time_series, test_sr, BATCH_SIZE, shuffle = False)
print("\n--- %s seconds ---" % (time.time() - start_time))





--- 223.48922061920166 seconds ---


In [30]:
x, y = test_loader.dataset[0]
print(x.shape)

torch.Size([1, 128, 173])


## Model cnn

In [52]:
import torch.nn as nn
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        
        # Layer 1, Input shape (1, 128, 173) ->  Output shape (8, 62, 84)
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels = 1, out_channels = 8, kernel_size = (5, 6)), 
            nn.MaxPool2d(kernel_size = (2, 2)),
            nn.ReLU(inplace=True), 
            nn.BatchNorm2d(8)            
            )
        
        # Layer 2, Input shape (8, 62, 84) -> Output shape (16, 30, 41)
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels = 8, out_channels = 16, kernel_size = (3, 3)), 
            nn.MaxPool2d(kernel_size = (2, 2)),
            nn.ReLU(inplace=True), 
            nn.BatchNorm2d(16)
            ) 
        
        # Layer 3, Input shape (16, 30, 41) -> Output shape (64, 10, 15)
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels = 16, out_channels = 32, kernel_size = (6, 7)), 
            nn.MaxPool2d(kernel_size = (2, 2)),
            nn.ReLU(inplace=True), 
            nn.BatchNorm2d(32),   
            nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = (6, 6)), 
            nn.MaxPool2d(kernel_size = (2, 2)),
            nn.ReLU(inplace=True), 
            nn.BatchNorm2d(64)  
            )
        
        # Fully Connected layer 1, Input features 64 * 10 * 15 -> Output features 512
        self.fc1 = nn.Sequential(
            nn.Linear(1152, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(128, NUM_CLASSES)
        )
        self.softmax = nn.Softmax(dim=1)
        
        
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        
        x = x.view(-1, self.num_flat_features(x))    
        
        logits = self.fc1(x)
        predictions = self.softmax(logits)
        return predictions
    
    
    def num_flat_features(self, x):
        size = x.size()[1:]
        n_features = 1
        for s in size:
            n_features = n_features * s
        
        return n_features

In [53]:
# Defining loss and optimizer
NUM_CLASSES = len(classes)
model = ConvNet().to(device)

In [65]:
from torch import nn

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
#step_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.1)

In [66]:
def train_loop(dataloader, model, loss_fn, optimizer,lowest_loss):
    model.train()
    size = int(len(dataloader.dataset) / BATCH_SIZE)
    for batch, (X,y) in enumerate(dataloader):
        # Compute prediction and loss
        X, y = X.to(device), y.to(device)
        
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total = y.size(0)
        _, predicted = torch.max(pred, dim = 1)
        correct = (predicted == y).sum().item()
        accuracy = correct / total

        drawProgressBar((batch + 1), size, 
                              '\t loss: {:.4f} \t acc: {:.4f}'.format(round(loss.item(), 4), round(accuracy, 4)))
        
    if abs(lowest_loss - loss.item()) < THRESHOLD:
        #early_stop_epoch += 1
        print(' Loss did not decrease from ' + str(lowest_loss))
    
    else:
        print(' Loss decreased from {:.4f} to {:.4f}, saving model.'.format(
            round(lowest_loss, 4), round(loss.item(), 4)))
        
        lowest_loss = loss.item()
        early_stop_epoch = 0
        torch.save(model,'model.pth')
        
    acc_hist.append(accuracy)
    loss_hist.append(loss.item())
    return lowest_loss

def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for (X,y) in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
import math
import time
EPOCHS = 50
loss_hist = []
acc_hist = []
THRESHOLD = 0.001
lowest_loss = np.inf
start = time.time()
for t in range(EPOCHS):
    print(f"Epoch {t+1}\n-------------------------------")
    lowest_loss = train_loop(train_loader, model, criterion, optimizer,lowest_loss)
    test_loop(val_loader, model, criterion)
final = (time.time() - start)/60
print(f"Done for all {EPOCHS} epochs in {math.ceil(final)} minutes\n")

Epoch 1
-------------------------------
Progress: [===>                ] 122/612	 loss: 2.1654 	 acc: 0.0000

In [None]:
def test_single_epoch(model, dataloader, device):
  correct = 0
  size = len(dataloader.dataset)

  model.eval()
  for input,target in dataloader:
        input, target = input.to(device), target.to(device)
        # calculate loss
        prediction = model(input)
        correct += (prediction.argmax(1) == target).type(torch.float).sum().item()
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}% \n")

In [None]:
test_single_epoch(model,test_loader,device)