In [None]:
# Need to install these if running on Google Colab as they don't come automatically installed
!pip3 install torchaudio
!pip3 install torchinfo

In [None]:
import matplotlib.pyplot as plt
from os import listdir
from os.path import isdir, join
import pathlib
from pathlib import Path
import numpy as np
import tensorflow as tf
from torch.autograd import Variable
import torchaudio
import torch
import torchinfo
from tqdm.notebook import tqdm


def plot_tensor(x, ar=8):
    """Draw each slice ``x[i]`` along the first axis as an image in its own figure.

    Args:
        x: Object exposing ``shape`` and integer indexing; one figure is shown
            for every index in ``range(x.shape[0])``.
        ar: Aspect ratio set on the axes after drawing. It replaces the
            ``'auto'`` aspect that is passed to ``imshow``.
    """
    n_images = x.shape[0]
    for idx in range(n_images):
        fig, ax = plt.subplots()
        ax.imshow(x[idx], aspect='auto')
        # Figure size is given in inches (width 100, height 200).
        fig.set_figwidth(100)
        fig.set_figheight(200)
        ax.set_aspect(ar)
        ax.axis('off')
        plt.show()

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Download and unpack Speech Commands v0.02 unless the dataset directory already
# exists (needed on Colab or on a first run).
data_dir = pathlib.Path('/content/data')
if not data_dir.exists():
    # Point the Keras cache at data_dir itself so the archive lands under the same
    # directory the following cells read from, whatever the working directory is.
    tf.keras.utils.get_file(
        'speech_commands_v0.02.tar.gz',
        origin="http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz",
        extract=True,
        cache_dir=str(data_dir.parent), cache_subdir=data_dir.name)

In [None]:
# Every sub-directory of the dataset is treated as one keyword, except
# '_background_noise_', which is not a keyword. Filtering it out here (instead of
# calling list.remove) does not raise when that folder is missing, and sorting makes
# the list independent of the order in which the filesystem lists directories.
keywords = sorted(
    name for name in listdir(data_dir)
    if isdir(join(data_dir, name)) and name != '_background_noise_'
)
print(keywords)

In [None]:
# Keyword -> class index. The index of a word is simply its position in this
# alphabetically ordered list of core words.
word2index = {
    word: position
    for position, word in enumerate([
        "backward", "bed", "bird", "cat", "dog",
        "down", "eight", "five", "follow", "forward",
        "four", "go", "happy", "house", "learn",
        "left", "marvin", "nine", "no", "off",
        "on", "one", "right", "seven", "sheila",
        "six", "stop", "three", "tree", "two",
        "up", "visual", "wow", "yes", "zero",
    ])
}

# Inverse lookup: index2word[i] is the keyword whose class index is i.
index2word = list(word2index)

In [None]:
num_classes = len(keywords)
speech_commands_dataset_basepath = Path(data_dir)

samples = []   # path of every accepted clip
classes = []   # class index of the clip at the same position in `samples`

for word_class, class_index in word2index.items():
    # One folder per keyword, e.g. <data_dir>/backward
    folder = speech_commands_dataset_basepath / word_class
    # sorted(): iterdir() yields entries in arbitrary filesystem order, which would
    # make the seeded train/test split differ from one machine to another.
    for file in sorted(folder.iterdir()):
        # A few clips in the dataset are not exactly 1 s long; keep only files of
        # the expected size.
        # NOTE(review): 32044 bytes presumably is 1 s of 16 kHz 16-bit mono PCM
        # (32000 bytes) plus a 44-byte WAV header - confirm.
        if file.stat().st_size == 32044:
            samples.append(file)
            classes.append(class_index)

# np.int was removed in NumPy 1.24; use an explicit fixed-width integer dtype.
classes = np.array(classes, dtype=np.int64)

In [None]:
# Hold out 20% of the samples as the test set. The fixed random_state keeps the
# shuffled split repeatable between runs.
from sklearn.model_selection import train_test_split

train_data, test_data, train_classes, test_classes = train_test_split(
    samples,
    classes,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

In [None]:
# Two example training clips (as path strings), used further down for plotting and
# for inspecting the model. Note the order: sample 1 first, then sample 0.
s = [str(train_data[position]) for position in (1, 0)]

In [None]:
def transform(files):
    """Load audio files and convert them to a batch of mel spectrograms.

    Each waveform is shifted to zero mean and scaled so that its largest absolute
    sample is 1 before a 40-band mel spectrogram (win_length=101, hop_length=8) is
    computed.

    Args:
        files: Iterable of audio file paths accepted by ``torchaudio.load``.

    Returns:
        The per-file spectrograms concatenated along dimension 0.
        NOTE(review): for mono clips of equal length this is presumably
        (n_files, 40, n_frames) - confirm.

    Raises:
        Whatever ``torch.cat`` raises when ``files`` is empty.
    """
    mel_specgrams = []
    # Build the MelSpectrogram transform once per sample rate instead of once per file.
    mel_transforms = {}
    for fp in files:
        waveform, sample_rate = torchaudio.load(fp)

        # Normalise: remove the DC offset, then scale by the peak amplitude.
        waveform = waveform - waveform.mean()
        peak = waveform.abs().max()  # same value as max(waveform.max(), -waveform.min())
        if peak > 0:
            # An all-constant (e.g. silent) clip has peak 0; skipping the division
            # keeps it as zeros instead of turning it into NaNs.
            waveform = waveform / peak

        if sample_rate not in mel_transforms:
            mel_transforms[sample_rate] = torchaudio.transforms.MelSpectrogram(
                sample_rate=sample_rate, win_length=101, hop_length=8, n_mels=40)
        mel_specgrams.append(mel_transforms[sample_rate](waveform))

    return torch.cat(mel_specgrams)


In [None]:
# Compute the mel spectrograms of the two example clips and show them as images.
plot = transform(s)
plot_tensor(plot)

In [None]:
# Pairs every sample with its ground-truth label.
# Needed to create the DataLoader that feeds the model.
def combinelabel(dataset, labels):
    """Pair each dataset entry with its label.

    Args:
        dataset: Sequence of sample identifiers (e.g. file paths); each entry is
            converted with ``str``.
        labels: NumPy array (or any array-like) of numeric class labels, one per
            dataset entry.

    Returns:
        A list of ``[str(entry), label]`` pairs where ``label`` is a 0-d float
        tensor. Labels stay float here; callers cast them to long before use.

    Raises:
        ValueError: If ``dataset`` and ``labels`` have different lengths.
    """
    if len(dataset) != len(labels):
        raise ValueError(
            f"dataset has {len(dataset)} entries but labels has {len(labels)}")
    # torch.autograd.Variable is a deprecated wrapper that just returns the tensor,
    # so a plain tensor is used directly.
    label_tensor = torch.from_numpy(np.asarray(labels)).float()
    return [[str(entry), label] for entry, label in zip(dataset, label_tensor)]

In [None]:
# [path, label] pairs for the training split.
train_data_comb = combinelabel(train_data, train_classes)

In [None]:
# Take 10% of the training data (rounded down) as a validation set.
val_length = int(len(train_data_comb) * 0.1)
print(val_length)

In [None]:
# The last val_length pairs become the validation set; the rest stay for training.
# The split point is computed explicitly instead of slicing with -val_length,
# because with val_length == 0 the slice [-0:] would select every sample for
# validation and [:-0] would leave the training set empty.
# Note: re-running this cell removes another val_length pairs from train_data_comb.
validation_data_combine = train_data_comb[len(train_data_comb) - val_length:]
train_data_comb = train_data_comb[:len(train_data_comb) - val_length]

In [None]:
# Number of training pairs left after the validation pairs were split off.
len(train_data_comb)

In [None]:
# [path, label] pairs for the held-out test split.
test_data_comb = combinelabel(test_data, test_classes)

Create DataLoaders

In [None]:
# Training batches of 128 [path, label] pairs. shuffle=True draws the samples in a
# new random order every epoch, so the batches differ from epoch to epoch instead
# of being replayed in one fixed order.
trainloader = torch.utils.data.DataLoader(train_data_comb, batch_size=128,
                                          shuffle=True, num_workers=2)

In [None]:
# Number of batches per training epoch.
len(trainloader)

In [None]:
# Validation batches of 128 pairs, served in a fixed order (shuffle=False).
validationloader = torch.utils.data.DataLoader(validation_data_combine, batch_size=128,
                                          shuffle=False, num_workers=2)

In [None]:
# Test batches of 128 pairs, served in a fixed order (shuffle=False).
testloader = torch.utils.data.DataLoader(test_data_comb, batch_size=128,
                                          shuffle=False, num_workers=2)

Define Model

In [None]:
def DSConvLayer(c_in, c, k, s):
    """Build one depthwise-separable 1-D convolution stage.

    The stage chains, in order: a depthwise ``Conv1d`` (one group per input
    channel, kernel ``k``, stride fixed at 2), a pointwise 1x1 ``Conv1d`` that
    maps to ``c`` channels, ``ReLU``, ``BatchNorm1d``, ``AvgPool1d(s)`` and
    ``Dropout(0.1)``.

    Args:
        c_in: Number of input channels.
        c: Number of output channels.
        k: Kernel size of the depthwise convolution.
        s: Kernel size of the average pooling (not the convolution stride).

    Returns:
        A ``torch.nn.Sequential`` holding the six modules above.
    """
    stages = [
        # Depthwise: every input channel is filtered on its own.
        torch.nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=k, stride=2, groups=c_in),
        # Pointwise: mixes the channels and sets the output width.
        torch.nn.Conv1d(in_channels=c_in, out_channels=c, kernel_size=1, stride=1),
        torch.nn.ReLU(),
        torch.nn.BatchNorm1d(c),
        torch.nn.AvgPool1d(s),
        torch.nn.Dropout(0.1),
    ]
    return torch.nn.Sequential(*stages)

In [None]:
# Keyword classifier: input batch-norm over the 40 input channels, five
# depthwise-separable convolution stages, average pooling, then a linear layer
# that produces one score per keyword (35 classes).
model = torch.nn.Sequential(
    torch.nn.BatchNorm1d(40),
    DSConvLayer(40,160,25,2),
    DSConvLayer(160,160,9,1),
    DSConvLayer(160,160,9,1),
    DSConvLayer(160,160,9,1),
    DSConvLayer(160,160,9,1),
    # NOTE(review): presumably 24 time steps remain here for the 1 s inputs, so this
    # pooling leaves a single step per channel for Linear(160, ...) - confirm.
    torch.nn.AvgPool1d(24),
    torch.nn.Flatten(),
    # The network ends in raw logits on purpose: torch.nn.CrossEntropyLoss applies
    # log-softmax internally, so a trailing Softmax would apply softmax twice and
    # flatten the gradients. argmax over logits gives the same prediction as argmax
    # over probabilities; apply torch.softmax to the output when probabilities are
    # needed. Dropping the parameter-free Softmax leaves the state_dict keys as-is.
    torch.nn.Linear(160,35),
)

In [None]:
# Move the model's parameters and buffers to the selected device.
model=model.to(device)

View the Model using torchinfo

In [None]:
# Spectrograms of the two example clips; only their shape is used below.
samp = transform(s)

In [None]:
# Layer-by-layer summary of the model for an input shaped like the example batch.
torchinfo.summary(model, input_size=samp.shape)

In [None]:
# TensorBoard writer used by the training loop to log the losses.
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter()

In [None]:
# Enable the %tensorboard notebook magic.
%load_ext tensorboard

In [None]:
import torch.optim as optim

# Loss function and optimizer: cross-entropy over the class scores, and Adam over
# all model parameters with a learning rate of 0.001.
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Number of passes over the training set.
epochs=10

In [None]:
# Train for `epochs` epochs. After every epoch the model is evaluated on the
# validation set, and its weights are saved whenever the validation loss improves.
# Both reported losses are per-sample means over the whole epoch (not the loss of
# the last batch), and each is logged to TensorBoard once per epoch.
min_valid_loss = np.inf
for e in range(epochs):
    # ---- training ----
    model.train()  # required: the model contains Dropout and BatchNorm layers
    train_loss_sum = 0.0
    train_seen = 0
    for data, target in trainloader:
        # The loader yields file paths; turn them into spectrograms here.
        data = transform(data).to(device)
        target = target.long().to(device)  # CrossEntropyLoss needs integer class ids

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size.
        train_loss_sum += loss.item() * target.size(0)
        train_seen += target.size(0)
    train_loss = train_loss_sum / max(train_seen, 1)
    writer.add_scalar("Loss/train", train_loss, e)

    # ---- validation ----
    model.eval()  # required: switches Dropout off and BatchNorm to running statistics
    valid_loss_sum = 0.0
    valid_seen = 0
    with torch.no_grad():  # no gradients needed for evaluation
        for data, target in validationloader:
            data = transform(data).to(device)
            target = target.long().to(device)
            loss = criterion(model(data), target)
            valid_loss_sum += loss.item() * target.size(0)
            valid_seen += target.size(0)
    valid_loss = valid_loss_sum / max(valid_seen, 1)
    writer.add_scalar("Loss/val", valid_loss, e)

    print(f'Epoch {e+1} \t\t Training Loss: {train_loss} \t\t Validation Loss: {valid_loss}')
    if min_valid_loss > valid_loss:
        print(f'Validation Loss Decreased({min_valid_loss:.6f}--->{valid_loss:.6f}) \t Saving The Model')
        min_valid_loss = valid_loss
        # Saving State Dict
        torch.save(model.state_dict(), 'saved_model_melspec.pth')

# Make sure every logged scalar is written to disk before TensorBoard reads it.
writer.flush()

In [None]:
# Open TensorBoard on the `runs` log directory.
%tensorboard --logdir runs

# Testing Trained Model:

In [None]:
# Restore the best weights written by the training loop. The path matches the one
# used by torch.save there, so the notebook runs top to bottom without moving the
# file by hand. map_location lets a checkpoint saved on a GPU load on a CPU-only
# machine. Note that model_load is the same object as model, not a copy.
model_load = model
model_load.load_state_dict(torch.load('saved_model_melspec.pth', map_location=device))
model_load.eval()

In [None]:
def test(model, epoch):
    """Evaluate ``model`` on the test loader and print its accuracy.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        epoch: Value printed as the epoch label in the report line.

    Returns:
        None. The result is printed as ``correct/total (percent%)``.
    """
    model.eval()
    correct = 0
    with torch.no_grad():  # inference only: skip building the autograd graph
        for data, target in testloader:
            # The loader yields file paths; turn them into spectrograms here.
            data = transform(data).to(device)
            target = target.to(device)

            # Predicted class = index of the largest output score.
            pred = model(data).argmax(dim=1)
            # .item() keeps `correct` a plain Python number, so it prints as a
            # count rather than as a tensor repr.
            correct += (pred == target).sum().item()

    total = len(testloader.dataset)
    accuracy = 100. * correct / total if total else 0.0
    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{total} ({accuracy:.2f}%)\n")

In [None]:
# Report the accuracy of the restored model on the test set.
test(model_load, 1)

# Example to Show how a convolution can be used as the first layer of 'pre-processing'

In [None]:
# 1-D convolution over a single input channel: 40 output channels, kernel size 101,
# stride 8.
input_conv = torch.nn.Conv1d(
    in_channels=1,
    out_channels=40,
    kernel_size=101,
    stride=8,
)

In [None]:
# `p` and `x` were never assigned at notebook scope, so this cell raised NameError
# on a fresh kernel. They are defined here from the two example clips in `s`.
# NOTE(review): reconstructed meaning - `p` is the batch of raw waveforms stacked to
# (clips, channels, samples) and `x` is the mel-spectrogram batch of the same
# clips; confirm this matches the original intent.
p = torch.stack([torchaudio.load(fp)[0] for fp in s]).to(input_conv.weight.device)
x = transform(s)

# Compare the shape produced by the convolutional front end with the spectrogram's.
with torch.no_grad():
    print(input_conv(p).shape)
print(x.shape)

In [None]:
# Chain the convolution in front of the classifier so the combined model is fed by
# the convolution's output. Both modules are shared with, not copied from, the
# originals.
model_s = torch.nn.Sequential(input_conv, model)

In [None]:
# Summary of the combined model for an input shaped like `p`.
# NOTE(review): relies on `p` and `x` already existing in the kernel namespace;
# verify they are assigned by an earlier cell before running this one.
print(x.shape)
torchinfo.summary(model_s, input_size=p.shape)