In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
from torchvision import datasets, models, transforms
import sys
from IPython.display import Audio
import matplotlib.pyplot as plt
from PIL import Image
import io

import os as os
import glob

import numpy as np
import pandas as pd

from pathlib import Path
import librosa

from audiomentations import Compose, AddGaussianNoise, PitchShift, Shift
from scipy.io.wavfile import read, write
import soundfile as sf
from librosa.display import specshow

## Functions

In [6]:
#import soundfile as sf
def wav_to_spec(path):
    """Render a fixed-length spectrogram image for one audio file.

    The clip is loaded at 16 kHz, zero-padded or truncated to exactly 16000
    samples (one second), converted to a dB-scaled STFT magnitude and saved as
    ``<path_spectrograms>/<parent folder of path>/<file stem>.png``.

    Args:
        path: Location of the source audio file (``str`` or ``Path``).
    """
    sr = 16000
    # librosa.load resamples to 22050 Hz unless a rate is given; request the
    # rate that the padding/truncation below assumes.
    wav, _ = librosa.load(path, sr=sr)
    if len(wav) < sr:
        wav = np.pad(wav, (0, sr - len(wav)))
    elif len(wav) > sr:
        wav = wav[:sr]

    # Mirror the class folder of the source file under the spectrogram root.
    source = Path(path)
    out_dir = Path(path_spectrograms) / source.parent.name
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / (source.stem + '.png')

    # Generating the spectrogram and saving it as 'png'
    plt.figure()
    try:
        src_ft = librosa.stft(wav)
        src_db = librosa.amplitude_to_db(np.abs(src_ft))
        plt.axis('off')
        specshow(src_db, sr=sr)
        plt.savefig(out_file)
    finally:
        # Always release the figure, even if rendering or saving fails.
        plt.close()

# Convert every listed audio file of both classes into a spectrogram image
def load_data(dir_list_walkout, dir_list_not_walkout):
    """Create a spectrogram image for every listed audio file of both classes.

    Each file name is resolved against the notebook-level ``path_walkout`` /
    ``path_not_walkout`` folder and handed to ``wav_to_spec``.

    Args:
        dir_list_walkout: File names located in ``path_walkout``.
        dir_list_not_walkout: File names located in ``path_not_walkout``.
    """
    sources = (
        (path_walkout, dir_list_walkout),
        (path_not_walkout, dir_list_not_walkout),
    )
    for folder, file_names in sources:
        for file_name in file_names:
            # pathlib inserts the platform's separator instead of a
            # hard-coded backslash.
            wav_to_spec(Path(folder) / file_name)
    

# WAV augmentation to increase the size of the audio dataset
def _write_augmented_copies(folder, file_names, augment, copies):
    """Write ``copies`` randomly augmented variants of each file next to it."""
    for file_name in file_names:
        source = Path(folder) / file_name
        # Decode once per file; every copy is augmented from the same samples.
        samples, sr = librosa.load(source)
        for i in range(copies):
            augmented_samples = augment(samples=samples, sample_rate=sr)
            # "<stem><i>.wav", e.g. clip.wav -> clip0.wav, clip1.wav
            target = source.with_name("{}{}.wav".format(source.stem, i))
            write(target, sr, augmented_samples)


def augmentation_wav(walkout_copies=2, not_walkout_copies=1):
    """Enlarge the audio dataset with randomly augmented copies of each clip.

    Every file listed in the notebook-level ``dir_list_walkout`` and
    ``dir_list_not_walkout`` is passed through a random chain of Gaussian
    noise, pitch shift and time shift (each applied with probability 0.5),
    and the results are written into the same folder as the source file.

    Args:
        walkout_copies: Augmented copies written per "walkout" file.
        not_walkout_copies: Augmented copies written per "not walkout" file.
    """
    augment = Compose([
        AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.005, p=0.5),
        PitchShift(min_semitones=-2, max_semitones=2, p=0.5),
        Shift(min_fraction=-0.2, max_fraction=0.2, p=0.5),
    ])

    # The copy index is now always defined by the helper's own loop, and the
    # second folder uses the list name that actually exists in the notebook.
    _write_augmented_copies(path_walkout, dir_list_walkout, augment, walkout_copies)
    _write_augmented_copies(
        path_not_walkout, dir_list_not_walkout, augment, not_walkout_copies
    )

### Dataset preparation

In [None]:
# Getting list of files
# All dataset folders hang off one configurable root, resolved against the
# notebook's working directory, instead of a user-specific absolute path.
# Point DATASET_DIR elsewhere if the data lives in another location.
DATASET_DIR = Path("dataset")

# Plain strings, so code that concatenates them with file names keeps working.
path_walkout = str(DATASET_DIR / "walkout")
path_not_walkout = str(DATASET_DIR / "not walkout")
path_spectrograms = str(DATASET_DIR / "spectograms")  # folder name as spelled on disk

# Sorted so the file order does not depend on the directory listing order.
dir_list_walkout = sorted(os.listdir(path_walkout))
dir_list_not_walkout = sorted(os.listdir(path_not_walkout))

files = dir_list_walkout + dir_list_not_walkout

In [47]:
# Generate the spectrogram images, then load them back with transformations

load_data(dir_list_walkout, dir_list_not_walkout)

# Read the images from the very folder the spectrograms are written to, so the
# writer and the reader cannot point at different locations.
data_path = path_spectrograms  # each sub-folder becomes one class

yes_no_dataset = datasets.ImageFolder(
    root=data_path,
    transform=transforms.Compose([
        transforms.Resize((201, 81)),  # (height, width) of every image tensor
        transforms.ToTensor(),
    ]),
)
print(yes_no_dataset)

Dataset ImageFolder
    Number of datapoints: 172
    Root location: .\dataset\spectograms
    StandardTransform
Transform: Compose(
               Resize(size=(201, 81), interpolation=bilinear, max_size=None, antialias=None)
               ToTensor()
           )


In [48]:
# Mapping from class folder name to the integer label assigned by ImageFolder
class_map = yes_no_dataset.class_to_idx

print(f"\nClass category and index of the images: {class_map}\n")


Class category and index of the images: {'not walkout': 0, 'walkout': 1}



In [49]:
#split data to test and train
#use 80% to train

# Fixed seed so the same samples land in the train/test sets on every run,
# which keeps the reported accuracy comparable between runs.
SPLIT_SEED = 42

train_size = int(0.8 * len(yes_no_dataset))
test_size = len(yes_no_dataset) - train_size
yes_no_train_dataset, yes_no_test_dataset = torch.utils.data.random_split(
    yes_no_dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

print("Training size:", len(yes_no_train_dataset))
print("Testing size:",len(yes_no_test_dataset))

Training size: 137
Testing size: 35


In [50]:
from collections import Counter

# labels in training set
# Look the labels up in the underlying dataset through the subset's indices;
# iterating the subset itself would decode and transform every image just to
# read its label.
train_classes = [
    yes_no_train_dataset.dataset.targets[i] for i in yes_no_train_dataset.indices
]
Counter(train_classes)

Counter({1: 70, 0: 67})

In [52]:
#train/test dataloaders

# Training batches are reshuffled every epoch.
train_dataloader = torch.utils.data.DataLoader(
    yes_no_train_dataset,
    batch_size=16,
    num_workers=2,
    shuffle=True
)

# Evaluation metrics do not depend on sample order; a fixed order keeps any
# per-sample inspection of a test batch reproducible.
test_dataloader = torch.utils.data.DataLoader(
    yes_no_test_dataset,
    batch_size=16,
    num_workers=2,
    shuffle=False
)



In [3]:
# CNN Architecture
# Pick the compute device: use the GPU when CUDA is available, else the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using {device} device')

class CNNet(nn.Module):
    """Two-stage convolutional classifier for 3-channel spectrogram images.

    The first fully connected layer is sized for inputs of shape
    ``(N, 3, 201, 81)``: two 5x5 convolutions, each followed by 2x2 max
    pooling, leave ``64 * 47 * 17 = 51136`` features per sample.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(51136, 50)  # 64 channels * 47 * 17 after the second pooling
        self.fc2 = nn.Linear(50, 2)

    def forward(self, x):
        """Return per-class log-probabilities of shape ``(N, 2)``.

        Args:
            x: Image batch of shape ``(N, 3, 201, 81)``.
        """
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        # No ReLU on the output layer: clamping negative logits to zero would
        # stop the network from pushing a class score below the other one's
        # floor and can leave both outputs stuck at 0.
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

# Build the network, then move its parameters onto the selected device.
model = CNNet()
model = model.to(device)

Using cpu device


In [69]:
# Loss minimised during training: cross-entropy against integer class labels
cost = torch.nn.CrossEntropyLoss()

# Augmentations applied to the spectrogram batches during training:
# a random frequency mask followed by a random time mask
freq_mask = torchaudio.transforms.FrequencyMasking(freq_mask_param=20)
time_mask = torchaudio.transforms.TimeMasking(time_mask_param=20)
aug = transforms.Compose([freq_mask, time_mask])

# Adam optimiser over all model parameters
learning_rate = 0.0001
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

# Create the training function

def train(dataloader, model, loss, optimizer, augment=None):
    """Run one training epoch over ``dataloader``.

    Args:
        dataloader: Yields ``(inputs, labels)`` batches and exposes ``.dataset``.
        model: Network to optimise; it is switched to training mode here.
        loss: Callable ``loss(predictions, labels)`` returning a scalar tensor.
        optimizer: Optimiser that updates ``model``'s parameters.
        augment: Optional callable applied to every input batch before the
            forward pass. Defaults to the notebook-level ``aug`` transform.
    """
    if augment is None:
        augment = aug

    # Send batches to wherever the model lives rather than relying on a global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    model.train()
    size = len(dataloader.dataset)
    for batch, (X, Y) in enumerate(dataloader):
        X = augment(X)
        X, Y = X.to(target_device), Y.to(target_device)
        optimizer.zero_grad()
        pred = model(X)
        # Use the criterion that was passed in; it must not be rebound to a
        # number, because it is called again on the next batch.
        batch_loss = loss(pred, Y)
        batch_loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            current = batch * len(X)
            print(f'loss: {batch_loss.item():>7f}  [{current:>5d}/{size:>5d}]')


# Create the validation/test function

def test(dataloader, model):
    best = 0
    size = len(dataloader.dataset)
    model.eval()
    test_loss, correct = 0, 0

    with torch.no_grad():
        for batch, (X, Y) in enumerate(dataloader):
            X, Y = X.to(device), Y.to(device)
            pred = model(X)

            test_loss += cost(pred, Y).item()
            correct += (pred.argmax(1)==Y).type(torch.float).sum().item()

    test_loss /= size
    correct /= size
    acc = 100*correct
    print(f'\nTest Error:\nacc: {(100*correct):>0.1f}%, avg loss: {test_loss:>8f}\n')
    

## Training

In [None]:
# Training: one pass over the training set, then one evaluation, per epoch
epochs = 200

for epoch_number in range(1, epochs + 1):
    print(f'Epoch {epoch_number}\n-------------------------------')
    train(train_dataloader, model, cost, optimizer)
    test(test_dataloader, model)

print('Done!')

In [193]:
from torchinfo import summary

# Layer-by-layer report for a batch of 16 three-channel 201x81 images
summary_input_shape = (16, 3, 201, 81)
summary(model, input_size=summary_input_shape)

Layer (type:depth-idx)                   Output Shape              Param #
CNNet                                    [8, 2]                    --
├─Conv2d: 1-1                            [8, 32, 197, 77]          2,432
├─Conv2d: 1-2                            [8, 64, 94, 34]           51,264
├─Dropout2d: 1-3                         [8, 64, 94, 34]           --
├─Flatten: 1-4                           [8, 51136]                --
├─Linear: 1-5                            [8, 50]                   2,556,850
├─Linear: 1-6                            [8, 2]                    102
Total params: 2,610,648
Trainable params: 2,610,648
Non-trainable params: 0
Total mult-adds (G): 1.63
Input size (MB): 1.56
Forward/backward pass size (MB): 44.16
Params size (MB): 10.44
Estimated Total Size (MB): 56.17

In [None]:
# Model evaluation: compare prediction and ground truth for one test sample
model.eval()
class_map = ['not_walkout', 'walkout']  # list position = class index
sample_index = 4  # which sample of the first test batch to inspect

with torch.no_grad():
    # Only the first batch is needed
    X, Y = next(iter(test_dataloader))
    X, Y = X.to(device), Y.to(device)
    print(X.shape)
    print(Y)
    pred = model(X)

    # Convert to plain ints once, so they print as numbers and index the list
    predicted_label = pred[sample_index].argmax(0).item()
    actual_label = Y[sample_index].item()

    print("Predicted:\nvalue={}, class_name= {}\n".format(predicted_label, class_map[predicted_label]))
    print("Actual:\nvalue={}, class_name= {}\n".format(actual_label, class_map[actual_label]))

### Saving the model

In [25]:
# Switch to evaluation mode before exporting, then compile the model to
# TorchScript and write it to disk.
model.eval()
export_path = 'cnn_97_accuracy.pt'
scripted_model = torch.jit.script(model)
scripted_model.save(export_path)