## Imports

In [1]:

import math
import os
import random
from collections import defaultdict

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset
import numpy as np
from scipy.io.wavfile import read
import librosa
from matplotlib import pyplot as plt

cuda = True if torch.cuda.is_available() else False

Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor


In [2]:
def set_seed(seed_value):
    """Set seed for reproducibility.
    """
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)

set_seed(42)

In [3]:
class SpeechCommandsDataset(Dataset):
    """Google Speech Commands dataset."""

    def __init__(self, root_dir, split):
        """
        Args:
            root_dir (string): Directory with all the data files.
            split    (string): In ["train", "valid", "test"].
        """
        self.root_dir = root_dir
        self.split = split

        self.number_of_classes = len(self.get_classes())

        self.class_to_file = defaultdict(list)

        self.valid_filenames = self.get_valid_filenames()
        self.test_filenames = self.get_test_filenames()

        for c in self.get_classes():
            file_name_list = sorted(os.listdir(self.root_dir + "data_speech_commands_v0.02/" + c))
            for filename in file_name_list:
                if split == "train":
                    if (filename not in self.valid_filenames[c]) and (filename not in self.test_filenames[c]):
                        self.class_to_file[c].append(filename)
                elif split == "valid":
                    if filename in self.valid_filenames[c]:
                        self.class_to_file[c].append(filename)
                elif split == "test":
                    if filename in self.test_filenames[c]:
                        self.class_to_file[c].append(filename)
                else:
                    raise ValueError("Invalid split name.")

        self.filepath_list = list()
        self.label_list = list()
        for cc, c in enumerate(self.get_classes()):
            f_extension = sorted(list(self.class_to_file[c]))
            l_extension = [cc for i in f_extension]
            f_extension = [self.root_dir + "data_speech_commands_v0.02/" + c + "/" + filename for filename in f_extension]
            self.filepath_list.extend(f_extension)
            self.label_list.extend(l_extension)
        self.number_of_samples = len(self.filepath_list)

    def __len__(self):
        return self.number_of_samples

    def __getitem__(self, idx):
        sample = np.zeros((16000, ), dtype=np.float32)

        sample_file = self.filepath_list[idx]

        sample_from_file = read(sample_file)[1]
        sample[:sample_from_file.size] = sample_from_file
        sample = sample.reshape((16000, ))
        
        sample = librosa.feature.mfcc(y=sample, sr=16000, hop_length=512, n_fft=2048).transpose().astype(np.float32)

        label = self.label_list[idx]

        return sample, label

    def get_classes(self):
        return ['one', 'two', 'three']

    def get_valid_filenames(self):
        class_names = self.get_classes()

        class_to_filename = defaultdict(set)
        with open(self.root_dir + "data_speech_commands_v0.02/validation_list.txt", "r") as fp:
            for line in fp:
                clean_line = line.strip().split("/")

                if clean_line[0] in class_names:
                    class_to_filename[clean_line[0]].add(clean_line[1])

        return class_to_filename

    def get_test_filenames(self):
        class_names = self.get_classes()

        class_to_filename = defaultdict(set)
        with open(self.root_dir + "data_speech_commands_v0.02/testing_list.txt", "r") as fp:
            for line in fp:
                clean_line = line.strip().split("/")

                if clean_line[0] in class_names:
                    class_to_filename[clean_line[0]].add(clean_line[1])

        return class_to_filename

In [18]:

train_dataset = SpeechCommandsDataset(dataset_folder,
                                      "train")
valid_dataset = SpeechCommandsDataset(dataset_folder,
                                      "valid")

test_dataset = SpeechCommandsDataset(dataset_folder,
                                     "test")

batch_size = 100


num_epochs = 5
valid_every_n_steps = 20
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=valid_dataset,
                                           batch_size=batch_size,
                                           shuffle=False)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)

## LSTM and GRU Cells

In [19]:
class LSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size, bias=True):
        super(LSTMCell, self).__init__()
        self.input_size = input_size 
        self.hidden_size = hidden_size 
        self.bias = bias

        self.x2h = nn.Linear(self.input_size, self.hidden_size*4, bias=self.bias)
        self.h2h = nn.Linear(self.hidden_size, self.hidden_size*4, bias=self.bias)

        self.reset_parameters()

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, input, hx=None):
        if hx is None:
            hx = input.new_zeros(input.size(0), self.hidden_size, requires_grad=False)
            hx = (hx, hx)
            
        # We used hx to pack both the hidden and cell states
        hx, cx = hx

        # Given h_t-1, c_t-1 (hx, cx):
        # x2h layer packs all weight matrices and biases

        all = self.x2h(input) + self.h2h(hx)
        fy, iy, oy, c_tilda_y = torch.chunk(all, 4, dim=1)

        fy = nn.Sigmoid()(fy)
        iy = nn.Sigmoid()(iy)
        oy = nn.Sigmoid()(oy)
        c_tilda_y = nn.Tanh()(c_tilda_y)
        
        cy = (fy * cx) + (iy * c_tilda_y)
        hy = oy * nn.Tanh()(cy)

        return (hy, cy)

class BasicRNNCell(nn.Module):
    def __init__(self, input_size, hidden_size, bias=True, nonlinearity="tanh"):
        super(BasicRNNCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        self.nonlinearity = nonlinearity
        if self.nonlinearity not in ["tanh", "relu"]:
            raise ValueError("Invalid nonlinearity selected for RNN.")

        self.x2h = nn.Linear(input_size, hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, hidden_size, bias=bias)

        self.reset_parameters()
        

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

            
    def forward(self, input, hx=None):
        if hx is None:
            hx = input.new_zeros(input.size(0), self.hidden_size, requires_grad=False)

        activation = getattr(nn.functional, self.nonlinearity)
        hy = activation(self.x2h(input) + self.h2h(hx))

        return hy

    
    
class GRUCell(nn.Module):
    def __init__(self, input_size, hidden_size, bias=True):
        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size 
        self.bias = bias

        self.x2h = nn.Linear(self.input_size, self.hidden_size*2, bias=self.bias)
        self.h2h = nn.Linear(self.hidden_size, self.hidden_size*2, bias=self.bias)

        self.x2r = nn.Linear(self.input_size, hidden_size, bias=bias)       
        self.h2r = nn.Linear(self.hidden_size, hidden_size, bias=self.bias)

        self.reset_parameters()
        

    def reset_parameters(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, input, hx=None):
        if hx is None:
            hx = input.new_zeros(input.size(0), self.hidden_size, requires_grad=False)
        
        all = self.x2h(input) + self.h2h(hx)
        zy, ry = torch.chunk(all, 2, dim=1)
        zy = nn.Sigmoid()(zy)
        ry = nn.Sigmoid()(ry)
        
        input_h2r = hx * ry
        h_tilda_y = nn.Tanh()(self.x2r(input) + self.h2r(input_h2r))
        hy = (1 - zy) * hx + zy * h_tilda_y
        return hy

## RNN and Bidirectional RNN Models

In [55]:
class RNNModel(nn.Module):
    def __init__(self, mode, input_size, hidden_size, num_layers, bias, output_size):
        super(RNNModel, self).__init__()
        self.mode = mode
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.output_size = output_size
        
        self.rnn_cell_list = nn.ModuleList()
        
        if mode == 'LSTM':

            # Forward
            self.rnn_cell_list.append(LSTMCell(self.input_size, self.hidden_size, bias=self.bias))      # bottom layer from input and across
      
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(LSTMCell(self.hidden_size, self.hidden_size, bias=self.bias))  # forward layer from prev layer and across

        elif mode == 'GRU':

            # Forward
            self.rnn_cell_list.append(GRUCell(self.input_size, self.hidden_size, bias=self.bias))      # bottom layer from input and across
        
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(GRUCell(self.hidden_size, self.hidden_size, bias=self.bias))  # forward layer from prev layer and across    
        
        elif mode == 'RNN_TANH':
            
            # Forward
            self.rnn_cell_list.append(BasicRNNCell(self.input_size, self.hidden_size, bias=self.bias, nonlinearity="tanh"))      # bottom layer from input and across
        
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(BasicRNNCell(self.hidden_size, self.hidden_size, bias=self.bias, nonlinearity="tanh"))  # forward layer from prev layer and across
                
        elif mode == 'RNN_RELU':

           # Forward
            self.rnn_cell_list.append(BasicRNNCell(self.input_size, self.hidden_size, bias=self.bias, nonlinearity="relu"))      # bottom layer from input and across
        
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(BasicRNNCell(self.hidden_size, self.hidden_size, bias=self.bias, nonlinearity="relu"))  # forward layer from prev layer and across

        else:
            raise ValueError("Invalid RNN mode selected.")


        self.att_fc = nn.Linear(self.hidden_size, 1)
        self.fc = nn.Linear(self.hidden_size, self.output_size)
 

        
    def forward(self, input, hx=None):

        outs = []
        h0 = [None] * self.num_layers if hx is None else list(hx)
        
        # In this forward pass we want to create our RNN from the rnn cells,
        # ..taking the hidden states from the final RNN layer and passing these 
        # ..through our fully connected layer (fc).
        
        # The multi-layered RNN should be able to run when the mode is either 
        # .. LSTM, GRU, RNN_TANH or RNN_RELU.
        
        if self.mode == 'LSTM':
            # Iterate over each time step
            for j in range(input.shape[1]):
                # Iterate over each layer step
                for i, cell in enumerate(self.rnn_cell_list):
                    # First on h0
                    if i == 0:
                        hx_new = cell.forward(input[:,j,:], hx=h0[i])
                    # Go through other cells
                    else:
                        hx_new = cell.forward(hx_new[0], hx=h0[i])

                    h0[i] = hx_new
                    outs.append(hx_new[0])
                
        else:
            # Iterate over each time step
            for j in range(input.shape[1]):
                # Iterate over each layer step
                for i, cell in enumerate(self.rnn_cell_list):
                    # First on h0
                    if i == 0:
                        hx_new = cell.forward(input[:,j,:], hx=h0[i])
                    # Go through other cells
                    else:
                        hx_new = cell.forward(hx_new, hx=h0[i])
                    
                    h0[i] = hx_new
                    outs.append(hx_new)

        out = outs[-1].squeeze()

        out = self.fc(out)
        
        
        return out



class BidirRecurrentModel(nn.Module):
    def __init__(self, mode, input_size, hidden_size, num_layers, bias, output_size):
        super(BidirRecurrentModel, self).__init__()
        self.mode = mode
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.output_size = output_size
        
        self.rnn_cell_list = nn.ModuleList()
        self.rnn_cell_list_rev = nn.ModuleList()
        
        if mode == 'LSTM':
            # Forward
            self.rnn_cell_list.append(LSTMCell(self.input_size, self.hidden_size, bias=self.bias))      # bottom layer from input and across
      
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(LSTMCell(self.hidden_size, self.hidden_size, bias=self.bias))  # forward layer from prev layer and across

            # Backward
            self.rnn_cell_list_rev.append(LSTMCell(self.input_size, self.hidden_size, bias=self.bias))      # from input to first backward and across

            for i in range(self.num_layers - 1):
                self.rnn_cell_list_rev.append(LSTMCell(self.hidden_size, self.hidden_size, bias=self.bias))  # backward layer from prev layer and across
            
        elif mode == 'GRU':

            # Forward
            self.rnn_cell_list.append(GRUCell(self.input_size, self.hidden_size, bias=self.bias))      # bottom layer from input and across
        
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(GRUCell(self.hidden_size, self.hidden_size, bias=self.bias))  # forward layer from prev layer and across
            
            # Backward
            self.rnn_cell_list_rev.append(GRUCell(self.input_size, self.hidden_size, bias=self.bias))      # from input to first backward and across

            for i in range(self.num_layers - 1):
                self.rnn_cell_list_rev.append(GRUCell(self.hidden_size, self.hidden_size, bias=self.bias))  # backward layer from prev layer and across 

        elif mode == 'RNN_TANH':
            # Forward
            self.rnn_cell_list.append(BasicRNNCell(self.input_size, self.hidden_size, bias=self.bias, nonlinearity="tanh"))      # bottom layer from input and across
        
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(BasicRNNCell(self.hidden_size, self.hidden_size, bias=self.bias, nonlinearity="tanh"))  # forward layer from prev layer and across


            # Backward
            self.rnn_cell_list_rev.append(BasicRNNCell(self.input_size, self.hidden_size, bias=self.bias, nonlinearity="tanh"))      # from input to first backward and across

            for i in range(self.num_layers - 1):
                self.rnn_cell_list_rev.append(BasicRNNCell(self.hidden_size, self.hidden_size, bias=self.bias, nonlinearity="tanh"))  # backward layer from prev layer and across
  
        elif mode == 'RNN_RELU':

           # Forward
            self.rnn_cell_list.append(BasicRNNCell(self.input_size, self.hidden_size, bias=self.bias, nonlinearity="relu"))      # bottom layer from input and across
        
            for i in range(self.num_layers - 1): 
                self.rnn_cell_list.append(BasicRNNCell(self.hidden_size, self.hidden_size, bias=self.bias, nonlinearity="relu"))  # forward layer from prev layer and across


            # Backward
            self.rnn_cell_list_rev.append(BasicRNNCell(self.input_size, self.hidden_size, bias=self.bias, nonlinearity="relu"))      # last forward layer layer to first backward and across

            for i in range(self.num_layers - 1):
                self.rnn_cell_list_rev.append(BasicRNNCell(self.hidden_size, self.hidden_size, bias=self.bias, nonlinearity="relu"))  # backward layer from prev layer and across


        else:
            raise ValueError("Invalid RNN mode selected.")
        
        self.fc = nn.Linear(self.hidden_size*2, self.output_size)  # multiply by 2 since using concat of forward and back outputs 
 
        
        
    def forward(self, input, hx=None):
        
        # In this forward pass we want to create our Bidirectional RNN from the rnn cells,
        # .. taking the hidden states from the final RNN layer with their reversed counterparts
        # .. before concatening these and running them through the fully connected layer (fc)
        
        # The multi-layered RNN should be able to run when the mode is either 
        # .. LSTM, GRU, RNN_TANH or RNN_RELU.
        
        outs = []
        outs_rev = []

        # Keep separate lists because do not use output of forward layers as input to backward layers
        h0 = [None] * self.num_layers if hx is None else list(hx)
        h0_rev = [None] * self.num_layers if hx is None else list(hx)#.reverse()


        if self.mode == 'LSTM':

            # Iterate through forward direction layers
            # Iterate over each  time step
            for j in range(input.shape[1]):
                # Iterate over each layer step
                for i, cell in enumerate(self.rnn_cell_list):
                    # Forward direction layers
                    # First on input
                    if i == 0:
                        hx_new = cell.forward(input[:,j,:], hx=h0[i])
                    # Go through other forward layers
                    else:
                        # Input is output h of previous cell, and h is h of previous layer
                        hx_new = cell.forward(hx_new[0], hx=h0[i])
  
                    # Backward direction layers
                    # First on input
                    if i == 0:
                        hx_rev = cell.forward(input[:,-(j+1),:], hx=h0_rev[i])     # Reverse order of input   
                    # Go through other forward layers
                    else:
                        hx_rev = cell.forward(hx_rev[0], hx=h0_rev[i])

                    # Update h0 to store previous layers outputs
                    h0[i] = hx_new
                    outs.append(hx_new[0])

                    # Update h0_rev to store previous layers outputs
                    h0_rev[i] = hx_rev
                    outs_rev.append(hx_rev[0])

        else:
            # Iterate over each time step
            for j in range(input.shape[1]):
                # Iterate over each layer step
                for i, cell in enumerate(self.rnn_cell_list):
                    # Forward directio layers
                    # First on h0
                    if i == 0:
                        hx_new = cell.forward(input[:,j,:], hx=h0[i])
                    # Go through other forward layers
                    else:
                        hx_new = cell.forward(hx_new, hx=h0[i])

                    # Backward direction layers
                    # First on h0
                    if i == 0:
                        hx_rev = cell.forward(input[:,-(j+1),:], hx=h0_rev[i])   # reverse order of input
                    # Go through other forward layers
                    else:
                        hx_rev = cell.forward(hx_rev, hx=h0_rev[i])


                    h0[i] = hx_new
                    outs.append(hx_new)
                    h0_rev[i] = hx_rev
                    outs_rev.append(hx_rev)

        out = outs[-1].squeeze()
        out_rev = outs_rev[0].squeeze()
        out = torch.cat((out, out_rev), 1)
        out = self.fc(out)
        return out

## Training

In [56]:

seq_dim, input_dim = train_dataset[0][0].shape
output_dim = 3

hidden_dim = 32
layer_dim = 1
bias = True

### Change the code below to try running different models:
#model = RNNModel("LSTM", input_dim, hidden_dim, layer_dim, bias, output_dim)
#model = RNNModel("GRU", input_dim, hidden_dim, layer_dim, bias, output_dim)
#model = RNNModel("RNN_RELU", input_dim, hidden_dim, layer_dim, bias, output_dim)
#model = RNNModel("RNN_TANH", input_dim, hidden_dim, layer_dim, bias, output_dim)

model = BidirRecurrentModel("LSTM", input_dim, hidden_dim, layer_dim, bias, output_dim)
#model = BidirRecurrentModel("GRU", input_dim, hidden_dim, layer_dim, bias, output_dim)
#model = BidirRecurrentModel("RNN_RELU", input_dim, hidden_dim, layer_dim, bias, output_dim)
#model = BidirRecurrentModel("RNN_TANH", input_dim, hidden_dim, layer_dim, bias, output_dim)


if torch.cuda.is_available():
    model.cuda()
    
criterion = nn.CrossEntropyLoss()

learning_rate = 0.01
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

loss_list = []
iter = 0
max_v_accuracy = 0
reported_t_accuracy = 0
max_t_accuracy = 0
for epoch in range(num_epochs):
    for i, (audio, labels) in enumerate(train_loader):
        if torch.cuda.is_available():
            audio = Variable(audio.view(-1, seq_dim, input_dim).cuda())
            labels = Variable(labels.cuda())
        else:
            audio = Variable(audio.view(-1, seq_dim, input_dim))
            labels = Variable(labels)

        optimizer.zero_grad()

        outputs = model(audio)

        loss = criterion(outputs, labels)

        if torch.cuda.is_available():
            loss.cuda()

        loss.backward()

        optimizer.step()

        loss_list.append(loss.item())
        iter += 1

        if iter % valid_every_n_steps == 0:
            correct = 0
            total = 0
            for audio, labels in valid_loader:
                if torch.cuda.is_available():
                    audio = Variable(audio.view(-1, seq_dim, input_dim).cuda())
                else:
                    audio = Variable(audio.view(-1, seq_dim, input_dim))

                outputs = model(audio)

                _, predicted = torch.max(outputs.data, 1)

                total += labels.size(0)

                if torch.cuda.is_available():
                    correct += (predicted.cpu() == labels.cpu()).sum()
                else:
                    correct += (predicted == labels).sum()

            v_accuracy = 100 * correct // total
            
            is_best = False
            if v_accuracy >= max_v_accuracy:
                max_v_accuracy = v_accuracy
                is_best = True

            if is_best:
                for audio, labels in test_loader:
                    if torch.cuda.is_available():
                        audio = Variable(audio.view(-1, seq_dim, input_dim).cuda())
                    else:
                        audio = Variable(audio.view(-1, seq_dim, input_dim))

                    outputs = model(audio)

                    _, predicted = torch.max(outputs.data, 1)

                    total += labels.size(0)

                    if torch.cuda.is_available():
                        correct += (predicted.cpu() == labels.cpu()).sum()
                    else:
                        correct += (predicted == labels).sum()

                t_accuracy = 100 * correct // total
                reported_t_accuracy = t_accuracy

            print('Iteration: {}. Loss: {}. V-Accuracy: {}  T-Accuracy: {}'.format(iter, loss.item(), v_accuracy, reported_t_accuracy))



Iteration: 20. Loss: 1.0907610654830933. V-Accuracy: 33  T-Accuracy: 35
Iteration: 40. Loss: 0.9884630441665649. V-Accuracy: 61  T-Accuracy: 60
Iteration: 60. Loss: 0.8351296782493591. V-Accuracy: 68  T-Accuracy: 68
Iteration: 80. Loss: 0.7208311557769775. V-Accuracy: 67  T-Accuracy: 68
Iteration: 100. Loss: 0.6368197798728943. V-Accuracy: 70  T-Accuracy: 68
Iteration: 120. Loss: 0.6104040741920471. V-Accuracy: 70  T-Accuracy: 68
Iteration: 140. Loss: 0.6348065733909607. V-Accuracy: 69  T-Accuracy: 68
Iteration: 160. Loss: 0.6063405275344849. V-Accuracy: 73  T-Accuracy: 71
Iteration: 180. Loss: 0.5756142735481262. V-Accuracy: 71  T-Accuracy: 71
Iteration: 200. Loss: 0.7492965459823608. V-Accuracy: 69  T-Accuracy: 71
Iteration: 220. Loss: 0.7609963417053223. V-Accuracy: 73  T-Accuracy: 71
Iteration: 240. Loss: 0.6040704846382141. V-Accuracy: 76  T-Accuracy: 73
Iteration: 260. Loss: 0.5462620854377747. V-Accuracy: 75  T-Accuracy: 73
Iteration: 280. Loss: 0.5252678394317627. V-Accuracy: 7