<center> <h1> <b> Speech Systems (EE6307) </b> </h1> </center>


<dt> <h4>
 <b> Programming Assignment - 07 - End to End Automatic Speech Recognition </b> 

Welcome to the seventh programming assignment in the Speech Systems (EE6307) course. In this assignment, you will implement a system that recognizes the text spoken in a speech signal, i.e., Automatic Speech Recognition (ASR). The assignment focuses on an end-to-end ASR system that takes a speech signal and its transcription as input and trains the model without any explicit alignment between the two. The system is trained with the Connectionist Temporal Classification (CTC) loss, and its performance is quantified by the Character Error Rate (CER) and the Word Error Rate (WER). Please train the model on the "train" partition of the TIDIGITS database and evaluate it on the "test" partition.


</h4> </dt> 




<b> Instructions </b>
1. Plagiarism is strictly prohibited.
2. Delayed submissions will be penalized with a scaling factor of 0.5 per day.
3. Please DO NOT use any machine learning libraries unless otherwise specified.





<h4> <b> End to End Automatic Speech Recognition : </b>  In this part of the assignment, you need to code an algorithm that recognizes the text information in a speech signal.   </h4> 


[link to TIDIGITS website](https://catalog.ldc.upenn.edu/LDC93S10)

[link to TIDIGITS dataset](https://drive.google.com/file/d/1E_rcPI6RfyfyKgy-MTT7WN5Nd3VWNj7T/view?usp=sharing)

<dt> <h4> 1. Dataset </h4> </dt> 
<dd> <h4>  - TIDIGITS corpus contains speech originally designed and collected at Texas Instruments, Inc. (TI) to design and evaluate algorithms for speaker-independent recognition of connected digit sequences. The database includes 326 speakers (111 men, 114 women, 50 boys, and 51 girls), each pronouncing 77 digit sequences. Each speaker group is partitioned into test and training subsets. </h4> </dd> 

<dd> <h4> - The ASR system in this assignment needs to be trained using the "train" partition of the TIDIGITS database. Please go through the above link for finer details of the dataset. We have also provided the "readme.doc" file in the shared folder. Please go through it to understand how the data is organized, which you need in order to prepare the labels, i.e., the text information.   </h4> </dd> 
<dd> <h4> - Testing data : Please evaluate the performance of the ASR system on the "test" partition of the TIDIGITS database.  </h4> </dd> 

<dt> <h4> 2. Data preprocessing </h4> </dt> 
<dd> <h4> - Convert the target text ( the sequence of digits being spoken ) into a sequence of integers, where each integer is the position of the character in our predefined character set.    </h4> </dd> 
<dd> <h4> - Extract log-mel filter bank energies from the speech signal.    </h4> </dd> 
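The text-to-integer step can be sketched in plain Python. This is a minimal illustration, assuming the character set is an apostrophe, a space, and the letters that occur in the spoken digit words; the names `DIGIT_WORDS`, `CHARSET`, `encode`, and `decode` are hypothetical and not part of the assignment code.

```python
# Hypothetical helper names; the character set is derived from the digit words.
DIGIT_WORDS = ["zero", "oh", "one", "two", "three", "four",
               "five", "six", "seven", "eight", "nine"]

# Apostrophe and space first, then the letters in alphabetical order.
LETTERS = sorted(set("".join(DIGIT_WORDS)))
CHARSET = ["'", " "] + LETTERS
CHAR_TO_INT = {ch: i for i, ch in enumerate(CHARSET)}

def encode(text):
    """Map every character to its position in CHARSET."""
    return [CHAR_TO_INT[ch] for ch in text]

def decode(indices):
    """Map positions back to characters."""
    return "".join(CHARSET[i] for i in indices)

print(len(CHARSET))        # number of symbols, excluding the CTC blank
print(encode("one two"))
print(decode(encode("six nine")))
```

With this ordering, one extra index after the last character can be reserved for the CTC blank symbol.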


<dt> <h4> 3. Automatic Speech Recognition (ASR) - Architecture </h4> </dt> 

<img src ="https://raw.githubusercontent.com/SpeechPublications/SpeechSystems/main/model.png" >
<center> Block Diagram of Automatic Speech Recognition Model </center>

<dd> <h4> - Create the appropriate architecture ( number of layers, number of filters, hidden dimensions, etc. ) following the block diagram </h4> </dd>
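One detail worth checking when choosing the architecture is how many time frames reach the recurrent layers, since CTC needs that count for every utterance. The model in this notebook begins with a 3x3 convolution with stride 2 and padding 1, which roughly halves the time axis. The sketch below applies the standard convolution output-length formula; the helper name `conv_out_len` is hypothetical.

```python
def conv_out_len(n_frames, kernel=3, stride=2, padding=1):
    """Output length along one axis of a convolution (dilation 1)."""
    return (n_frames + 2 * padding - kernel) // stride + 1

# Compare the exact output length with the simpler estimate n_frames // 2.
for n_frames in (100, 101):
    print(n_frames, conv_out_len(n_frames), n_frames // 2)
```

The estimate `n_frames // 2` never exceeds the exact value, so it is a safe input length to pass to the CTC loss.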

<dt> <h4> 4. Training ASR system </h4> </dt> 
<dd> <h4> -  Issue : The input ( speech signal ) and outputs ( text ) are not time aligned. Aligning the data using Hidden Markov Model (HMM) may not be a suitable option.   </h4> </dd> 
<dd> <h4> -  The ASR system is trained using the Connectionist Temporal Classification (CTC) loss, which sums over all possible alignments between the input frames and the target characters  </h4> </dd> 
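To see how CTC avoids explicit alignment, the sketch below computes the total probability of all frame-level paths that collapse to a target, using the CTC forward recursion in the probability domain. It is a toy illustration for tiny inputs, not the notebook's training code: `nn.CTCLoss` works with the negative log of this quantity and operates on log-probabilities. The function name `ctc_probability` is hypothetical.

```python
def ctc_probability(probs, target, blank):
    """Probability that the per-frame distributions `probs` emit `target`.

    probs  : list of frames, each a list of class probabilities
    target : list of class indices (without blanks)
    blank  : index of the CTC blank symbol
    """
    # Interleave blanks: [a, b] -> [blank, a, blank, b, blank]
    ext = [blank]
    for label in target:
        ext += [label, blank]
    n_states = len(ext)

    # A path may start in the leading blank or in the first label.
    alpha = [0.0] * n_states
    alpha[0] = probs[0][blank]
    if n_states > 1:
        alpha[1] = probs[0][ext[1]]

    for frame in probs[1:]:
        new_alpha = [0.0] * n_states
        for s in range(n_states):
            total = alpha[s]                      # stay in the same state
            if s >= 1:
                total += alpha[s - 1]             # advance by one state
            # Skip over a blank, allowed only between two different labels.
            if s >= 2 and ext[s] != blank and ext[s] != ext[s - 2]:
                total += alpha[s - 2]
            new_alpha[s] = total * frame[ext[s]]
        alpha = new_alpha

    # A path may end in the last label or in the trailing blank.
    return alpha[-1] + (alpha[-2] if n_states > 1 else 0.0)


# Two frames, classes {0: "a", 1: blank}, uniform probabilities.
# The paths "a-", "-a" and "aa" all collapse to "a".
print(ctc_probability([[0.5, 0.5], [0.5, 0.5]], target=[0], blank=1))
```

In the two-frame example each of the three valid paths has probability 0.25, so the result is 0.75.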


<dt> <h4> 5. Evaluation Metrics </h4> </dt> 
<dd> <h4> - The performance of an ASR system is typically quantified in terms of Word Error Rate (WER) or Character Error Rate (CER). The code to compute the WER and CER is provided to you; use it to evaluate the performance.   </h4> </dd> 
<dd> <h4> -  CER : Number of character substitutions, deletions, and insertions needed to turn the prediction into the reference, divided by the number of characters in the reference      </h4> </dd> 
<dd> <h4> -  WER : The same ratio computed over words instead of characters      </h4> </dd> 
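As a worked example of the two metrics, the sketch below scores one hypothetical prediction against its reference. It uses a compact edit-distance routine written for this illustration; the names `edit_distance`, `toy_wer`, and `toy_cer` are assumptions and are not the evaluation functions supplied with the assignment.

```python
def edit_distance(ref, hyp):
    """Levenshtein distance between two sequences (strings or lists)."""
    prev = list(range(len(hyp) + 1))
    for i, r in enumerate(ref, 1):
        cur = [i]
        for j, h in enumerate(hyp, 1):
            cur.append(min(prev[j] + 1,              # deletion
                           cur[j - 1] + 1,           # insertion
                           prev[j - 1] + (r != h)))  # substitution or match
        prev = cur
    return prev[-1]

def toy_wer(reference, hypothesis):
    """Word-level edit distance divided by the number of reference words."""
    ref_words = reference.split()
    return edit_distance(ref_words, hypothesis.split()) / len(ref_words)

def toy_cer(reference, hypothesis):
    """Character-level edit distance divided by the reference length."""
    return edit_distance(reference, hypothesis) / len(reference)

reference = "one two three"
hypothesis = "one too three"
print(toy_wer(reference, hypothesis))  # 1 wrong word out of 3
print(toy_cer(reference, hypothesis))  # 1 wrong character out of 13
```

A single wrong character costs a whole word in WER but only one character in CER, which is why the two numbers can differ widely on short digit strings.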


<dt> <h4> 6. Evaluate the performance of ASR system </h4> </dt> 
<dd> <h4> -  Use the trained ASR system and predict the characters and words using the basic GreedyDecoder and evaluate the performance in terms of CER and WER  </h4> </dd> 
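Greedy decoding picks the most likely class in every frame, merges consecutive repeats, and then removes blanks. The sketch below shows that rule on a hand-written path; the helper names are hypothetical, and the blank index 17 and the character indices follow the character map used later in this notebook.

```python
def argmax_path(frame_scores):
    """Index of the highest-scoring class in every frame."""
    return [max(range(len(frame)), key=frame.__getitem__)
            for frame in frame_scores]

def collapse_ctc_path(path, blank):
    """Merge consecutive repeats, then drop blanks."""
    decoded = []
    previous = None
    for index in path:
        if index != previous and index != blank:
            decoded.append(index)
        previous = index
    return decoded

BLANK = 17
# t t h r r e <blank> e e  ->  t h r e e
path = [11, 11, 5, 9, 9, 2, BLANK, 2, 2]
print(collapse_ctc_path(path, BLANK))
```

The blank between the two runs of `2` is what lets the decoder output the double "e" in "three"; without it the repeats would be merged into one character.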



In [1]:
###############################################################################
#Mount the drive
###############################################################################
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
###############################################################################
#Download and unzip the dataset
###############################################################################
!ls "/content/drive/MyDrive/TIDIGITS.zip"
!cp -r "/content/drive/MyDrive/TIDIGITS.zip" .
!unzip "TIDIGITS.zip"

Streaming output truncated to the last 5000 lines.
  inflating: TIDIGITS/tidigits/test/woman/pl/5883228a.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/8512455a.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/5z41736a.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/8134a.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/z65a.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/ooo766oa.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/62o1a.wav  
  inflating: TIDIGITS/tidigits/test/woman/pl/5a.wav  
   creating: TIDIGITS/tidigits/test/woman/bj/
  inflating: TIDIGITS/tidigits/test/woman/bj/4b.wav  
  inflating: TIDIGITS/tidigits/test/woman/bj/8a.wav  
  inflating: TIDIGITS/tidigits/test/woman/bj/923a.wav  
  inflating: TIDIGITS/tidigits/test/woman/bj/86194a.wav  
  inflating: TIDIGITS/tidigits/test/woman/bj/oa.wav  
  inflating: TIDIGITS/tidigits/test/woman/bj/9357a.wav  
  inflating: TIDIGITS/tidigits/test/woman/bj/64881a.wav  
  inflating: TIDIGITS/tidigits/tes

In [3]:
#All imports
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import soundfile
import numpy
import pandas as pd
from tqdm import tqdm
import time
from typing import List
import IPython
from torchaudio.models.decoder import ctc_decoder
from torchaudio.utils import download_asset
from torchaudio.models.decoder import download_pretrained_files

In [31]:
###############################################################################
#Pre-processing the data
###############################################################################
class TextTransform:
    """Maps characters to integers and vice versa"""
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        e 2
        f 3
        g 4
        h 5
        i 6
        n 7
        o 8
        r 9
        s 10
        t 11
        u 12
        v 13
        w 14
        x 15
        z 16
        """
        self.char_map = {}
        self.index_map = {}
        for line in char_map_str.strip().split('\n'):
            ch, index = line.split()
            self.char_map[ch] = int(index)
            self.index_map[int(index)] = ch
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """ Use a character map and convert text to an integer sequence """
        int_sequence = []
        for c in text:
            if c == ' ':
                ch = self.char_map['<SPACE>']
            else:
                ch = self.char_map[c]
            int_sequence.append(ch)
        return int_sequence

    def int_to_text(self, labels):
        """ Use a character map and convert integer labels to a text sequence """
        string = []
        for i in labels:
            string.append(self.index_map[i])

        return ''.join(string).replace('<SPACE>', ' ')

class Features(nn.Module):
    def __init__(self,sample_rate=16000,n_mels=64,n_fft=512,log_input=True):
        super(Features, self).__init__()
        self.n_mels = n_mels 
        self.log_input = log_input
        self.n_fft = n_fft
        self.sample_rate = sample_rate
        
        #Use the constructor arguments instead of hard-coded values
        self.melspectrogram = torch.nn.Sequential( torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate,n_mels=n_mels,n_fft=n_fft) )        
       
    def forward(self,x_input):
        #The small offset avoids log(0) on silent frames
        x_mfb = self.melspectrogram(x_input) + 1e-6
        #Return log-mel energies, or linear mel energies when log_input is False
        return x_mfb.log() if self.log_input else x_mfb

train_audio_transforms = nn.Sequential(
    Features(),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=15),
    torchaudio.transforms.TimeMasking(time_mask_param=50)
)

#######################
#Create object of Features class for training and testing data.
#######################

valid_audio_transforms = Features()

#######################
#Create object of TextTransform() class
#######################
text_transform = TextTransform()

def data_processing(data, data_type="train"):
    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []
    # utts = []
    for (waveform, utterance) in data:
        if data_type == 'train':
            spec = train_audio_transforms(waveform).squeeze(0).transpose(0, 1)
        elif data_type == 'valid':
            spec = valid_audio_transforms(waveform).squeeze(0).transpose(0, 1)
        else:
            raise Exception('data_type should be train or valid')
        spectrograms.append(spec)
        label = torch.Tensor(text_transform.text_to_int(utterance.lower()))
        labels.append(label)
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))
        # utts.append(utterance)

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths

In [25]:
###############################################################################
#Speech Recognition Model
###############################################################################
#Create SpeechRecognitionModel following the block diagram provided.

class CNNLayerNorm(nn.Module):
    """Layer normalization built for CNN inputs"""
    def __init__(self, n_feats):
        super(CNNLayerNorm, self).__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # x (batch, channel, feature, time)
        x = x.transpose(2, 3).contiguous() # (batch, channel, time, feature)
        x = self.layer_norm(x)
        return x.transpose(2, 3).contiguous() # (batch, channel, feature, time) 


class ResidualCNN(nn.Module):
    """Residual CNN inspired by https://arxiv.org/pdf/1603.05027.pdf
        except with layer norm instead of batch norm
    """
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super(ResidualCNN, self).__init__()

        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=kernel//2)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=kernel//2)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        residual = x  # (batch, channel, feature, time)
        x = self.layer_norm1(x)
        x = F.gelu(x)
        x = self.dropout1(x)
        x = self.cnn1(x)
        x = self.layer_norm2(x)
        x = F.gelu(x)
        x = self.dropout2(x)
        x = self.cnn2(x)
        x += residual
        return x # (batch, channel, feature, time)


class BidirectionalGRU(nn.Module):

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super(BidirectionalGRU, self).__init__()

        self.BiGRU = nn.GRU(
            input_size=rnn_dim, hidden_size=hidden_size,
            num_layers=1, batch_first=batch_first, bidirectional=True)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.layer_norm(x)
        x = F.gelu(x)
        x, _ = self.BiGRU(x)
        x = self.dropout(x)
        return x


class SpeechRecognitionModel(nn.Module):
    
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting hierarchical features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats) 
            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i==0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=i==0)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x

class IterMeter(object):
    """keeps track of total iterations"""
    def __init__(self):
        self.val = 0

    def step(self):
        self.val += 1

    def get(self):
        return self.val

In [26]:
###############################################################################
#Training Script
###############################################################################
def train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter):
    #Set the model in training mode
    model.train()

    #Get the data length
    data_len = len(train_loader.dataset)

    #Iterate through the training dataset and train the model
    for batch_idx, _data in enumerate(train_loader):

        #Get the spectrograms (input data), labels (targets)      
        spectrograms, labels, input_lengths, label_lengths = _data 
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        #Set the gradients of the parameters to zero
        optimizer.zero_grad()

        #Obtain the predictions 
        output = model(spectrograms)  # (batch, time, n_class)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1) # (time, batch, n_class)

        #Compute the loss
        loss = criterion(output, labels, input_lengths, label_lengths)

        #Backpropagate the loss to get the gradients of the parameters
        loss.backward()

        #Update the parameters
        optimizer.step()

        #Update the learning rate scheduler
        scheduler.step()
        iter_meter.step()

        #Print the training metrics every 100 iterations and on the last batch
        if batch_idx % 100 == 0 or batch_idx == len(train_loader) - 1:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(spectrograms), data_len,
                100. * batch_idx / len(train_loader), loss.item()))

In [32]:
###############################################################################
#Evaluation Metrics
###############################################################################

def avg_wer(wer_scores, combined_ref_len):
    return float(sum(wer_scores)) / float(combined_ref_len)


def _levenshtein_distance(ref, hyp):
    """Levenshtein distance is a string metric for measuring the difference
    between two sequences. Informally, the Levenshtein distance is defined as
    the minimum number of single-character edits (substitutions, insertions or
    deletions) required to change one word into the other. We can naturally
    extend the edits to word level when calculating the Levenshtein distance
    for two sentences.
    """
    m = len(ref)
    n = len(hyp)

    # special case
    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m

    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m

    # use O(min(m, n)) space
    distance = np.zeros((2, n + 1), dtype=np.int32)

    # initialize distance matrix
    for j in range(0,n + 1):
        distance[0][j] = j

    # calculate levenshtein distance
    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]


def word_errors(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Compute the levenshtein distance between reference sequence and
    hypothesis sequence in word-level.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param delimiter: Delimiter of input sentences.
    :type delimiter: char
    :return: Levenshtein distance and word number of reference sentence.
    :rtype: list
    """
    if ignore_case == True:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

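    #Drop empty items so that repeated delimiters are not counted as words
    ref_words = list(filter(None, reference.split(delimiter)))
    hyp_words = list(filter(None, hypothesis.split(delimiter)))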

    edit_distance = _levenshtein_distance(ref_words, hyp_words)
    return float(edit_distance), len(ref_words)


def char_errors(reference, hypothesis, ignore_case=False, remove_space=False):
    """Compute the levenshtein distance between reference sequence and
    hypothesis sequence in char-level.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param remove_space: Whether remove internal space characters
    :type remove_space: bool
    :return: Levenshtein distance and length of reference sentence.
    :rtype: list
    """
    if ignore_case == True:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    join_char = ' '
    if remove_space == True:
        join_char = ''

    reference = join_char.join(filter(None, reference.split(' ')))
    hypothesis = join_char.join(filter(None, hypothesis.split(' ')))

    edit_distance = _levenshtein_distance(reference, hypothesis)
    return float(edit_distance), len(reference)


def wer(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Calculate word error rate (WER). WER compares reference text and
    hypothesis text in word-level. WER is defined as:
    .. math::
        WER = (Sw + Dw + Iw) / Nw
    where
    .. code-block:: text
        Sw is the number of words substituted,
        Dw is the number of words deleted,
        Iw is the number of words inserted,
        Nw is the number of words in the reference
    We can use levenshtein distance to calculate WER. Please note that
    empty items will be removed when splitting sentences by delimiter.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param delimiter: Delimiter of input sentences.
    :type delimiter: char
    :return: Word error rate.
    :rtype: float
    :raises ValueError: If word number of reference is zero.
    """
    edit_distance, ref_len = word_errors(reference, hypothesis, ignore_case,
                                         delimiter)

    if ref_len == 0:
        raise ValueError("Reference's word number should be greater than 0.")

    wer = float(edit_distance) / ref_len
    return wer


def cer(reference, hypothesis, ignore_case=False, remove_space=False):
    """Calculate character error rate (CER). CER compares reference text and
    hypothesis text in char-level. CER is defined as:
    .. math::
        CER = (Sc + Dc + Ic) / Nc
    where
    .. code-block:: text
        Sc is the number of characters substituted,
        Dc is the number of characters deleted,
        Ic is the number of characters inserted
        Nc is the number of characters in the reference
    We can use levenshtein distance to calculate CER. Chinese input should be
    encoded to unicode. Please note that the leading and trailing
    space characters will be truncated and multiple consecutive space
    characters in a sentence will be replaced by one space character.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param remove_space: Whether remove internal space characters
    :type remove_space: bool
    :return: Character error rate.
    :rtype: float
    :raises ValueError: If the reference length is zero.
    """
    edit_distance, ref_len = char_errors(reference, hypothesis, ignore_case,
                                         remove_space)

    if ref_len == 0:
        raise ValueError("Length of reference should be greater than 0.")

    cer = float(edit_distance) / ref_len
    return cer

In [166]:
###############################################################################
#Decoder
###############################################################################
def GreedyDecoder(output, labels, label_lengths, blank_label=17, collapse_repeated=True):
	arg_maxes = torch.argmax(output, dim=2)
	decodes = []
	targets = []
	for i, args in enumerate(arg_maxes):
		decode = []
		targets.append(text_transform.int_to_text(labels[i][:label_lengths[i]].tolist()))
		for j, index in enumerate(args):
			if index != blank_label:
				if collapse_repeated and j != 0 and index == args[j -1]:
					continue
				decode.append(index.item())
		decodes.append(text_transform.int_to_text(decode))
	return decodes, targets

In [165]:
###############################################################################
#Testing Script
###############################################################################
def test(model, device, test_loader, criterion, epoch, iter_meter, decoder="greedy"):
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_cer, test_wer = [], []
    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            #Load data and labels
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            #Get the predictions
            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            #Compute the test loss
            loss = criterion(output, labels, input_lengths, label_lengths)
            test_loss += loss.item() / len(test_loader)

            if decoder=="greedy":
              #Decode the text information using GreedyDecoder
              decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)
              
            #Compute CER and WER
            for j in range(len(decoded_preds)):
                test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
                test_wer.append(wer(decoded_targets[j], decoded_preds[j]))

    #Compute average CER and WER
    avg_cer = sum(test_cer)/len(test_cer)
    avg_wer = sum(test_wer)/len(test_wer)

    print('Test set: Average loss: {:.4f}, Average CER: {:.4f}, Average WER: {:.4f}\n'.format(test_loss, avg_cer, avg_wer))

In [164]:
#################################################################
#Function to load the speech signal
#################################################################
def loadWAV(filename):
    signal, sr = torchaudio.load(filename)
    resampler = torchaudio.transforms.Resample(sr, 16000)
    return resampler(signal)

def textProc(textString):
    mapDict = {
            "z" : "zero",
            "o" : "oh",
            "1" : "one",
            "2" : "two",
            "3" : "three",
            "4" : "four",
            "5" : "five",
            "6" : "six",
            "7" : "seven",
            "8" : "eight",
            "9" : "nine"
    }
    label = [mapDict[x] for x in textString]
    return " ".join(label)

#################################################################
#Dataset loader to create labels from the text files
#################################################################
class TDigitsDataset(Dataset):
    def __init__(self,train_list,base_dir):

        #Create the properties of class
        with open(train_list) as list_file:
            self.read_files = [line.rstrip('\n') for line in list_file]
        self.wavs = []
        self.labels = []

        #Read training files
        for sample in tqdm(self.read_files):
          filepath = sample.split(' ')[1]
          label = sample.split(' ')[0][3:-1]
          label = textProc(label)
          y = loadWAV(filepath)
          self.wavs.append(y)
          self.labels.append(label)
        
    def __getitem__(self, index):
        #Load wavefiles and labels and return them
        return (self.wavs[index], self.labels[index])

    def __len__(self):
        #Return the length of the dataset
        return len(self.wavs)

In [36]:
BATCH_SIZE = 10
use_cuda = torch.cuda.is_available()

#Define training and testing lists
train_list = "TIDIGITS/train_list.txt"
test_list = "TIDIGITS/test_list.txt"
base_dir = "TIDIGITS/"

#Create dataset generators and loaders for training and testing data
print("Creating Training Dataset :")
train_dataset = TDigitsDataset(train_list, base_dir)

print("Creating Test Dataset :")
test_dataset = TDigitsDataset(test_list, base_dir)

#Create dataset loaders for training
kwargs = {'num_workers': 2, 'pin_memory': True} if use_cuda else {}
train_loader = data.DataLoader(dataset=train_dataset,
                                batch_size=BATCH_SIZE,
                                shuffle=True,
                                collate_fn=lambda x: data_processing(x, 'train'),
                                **kwargs)
    
#Create dataset loaders for testing
test_loader = data.DataLoader(dataset=test_dataset,
                                batch_size=BATCH_SIZE,
                                shuffle=False,
                                collate_fn=lambda x: data_processing(x, 'valid'),
                                **kwargs)

Creating Training Dataset :


100%|██████████| 8613/8613 [00:13<00:00, 646.85it/s]


Creating Test Dataset :


100%|██████████| 8700/8700 [00:13<00:00, 640.95it/s]


In [37]:
###############################################################################
#Main script to train and evaluate the model
###############################################################################
def main(hparams, train_loader, test_loader, state=None):
   
    #Set the device
    use_cuda = torch.cuda.is_available()
    torch.manual_seed(7)
    device = torch.device("cuda" if use_cuda else "cpu")

    #Create model object
    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)

    #Print model properties i.e layers, parameters etc
    print(model)
    print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))

    #Create optimizer
    optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])

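    #Take the number of epochs from hparams rather than from a global variable
    start_epoch = 1
    end_epoch = hparams['epochs'] + 1

    #Resume from a saved checkpoint, if one is given
    if state:
      model.load_state_dict(state['state_dict'])
      optimizer.load_state_dict(state['optimizer'])
      start_epoch = state['epoch']+1
      end_epoch = hparams['epochs'] + state['epoch']+1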

    #Define loss function i.e nn.CTCLoss()
    criterion = nn.CTCLoss(blank=17).to(device)

    #Define scheduler
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=hparams['learning_rate'], 
                                            steps_per_epoch=int(len(train_loader)),
                                            epochs=hparams['epochs'],
                                            anneal_strategy='linear')
    
    #Iterate through the epochs and train and test the model
    iter_meter = IterMeter()
    for epoch in range(start_epoch, end_epoch):
        #Train the model using train(.) function
        train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter)

        #Evaluate the model using test(.) function
        test(model, device, test_loader, criterion, epoch, iter_meter)

    state = {
      'epoch': epoch,
      'state_dict': model.state_dict(),
      'optimizer': optimizer.state_dict(),
    }

    return state

In [167]:
###############################################################################
#Execute the main script
###############################################################################
learning_rate = 5e-4
batch_size = 10
epochs = 5
hparams = {
          "n_cnn_layers": 2,
          "n_rnn_layers": 1,
          "rnn_dim": 128,
          "n_class": 18,
          "n_feats": 64,
          "stride":2,
          "dropout": 0.1,
          "learning_rate": learning_rate,
          "batch_size": batch_size,
          "epochs": epochs
}

filepath = "/content/drive/MyDrive/state_0.pth"
prevState = torch.load(filepath)

state  = main(hparams, train_loader, test_loader, state=prevState)
torch.save(state, "/content/drive/MyDrive/state.pth")

SpeechRecognitionModel(
  (cnn): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (rescnn_layers): Sequential(
    (0): ResidualCNN(
      (cnn1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (cnn2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (dropout1): Dropout(p=0.1, inplace=False)
      (dropout2): Dropout(p=0.1, inplace=False)
      (layer_norm1): CNNLayerNorm(
        (layer_norm): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      )
      (layer_norm2): CNNLayerNorm(
        (layer_norm): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
      )
    )
    (1): ResidualCNN(
      (cnn1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (cnn2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (dropout1): Dropout(p=0.1, inplace=False)
      (dropout2): Dropout(p=0.1, inplace=False)
      (layer_norm1): CNNLayerNorm(
        (layer_norm): LayerNorm((32,),

<h4> <b> Improvements : </b> </h4>

<dd> <h4> - Experiment with the architecture and come up with the best architecture 
  </h4> </dd> 

<dd> <h4> - Use different decoders and report your observations  </h4> </dd> 
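One alternative to greedy decoding is CTC prefix beam search, which keeps several candidate transcripts and adds up the probability of all frame-level paths that lead to the same transcript. The sketch below is a minimal pure-Python version without a language model, intended only to show the idea on toy inputs; the function name `prefix_beam_search` and its arguments are hypothetical.

```python
from collections import defaultdict

def prefix_beam_search(probs, blank, beam_size):
    """Return the most probable label sequence and its probability.

    probs : list of frames, each a list of class probabilities
    Each prefix stores two scores: the probability of paths that end in a
    blank and the probability of paths that end in a non-blank label.
    """
    beams = {(): (1.0, 0.0)}
    for frame in probs:
        next_beams = defaultdict(lambda: [0.0, 0.0])
        for prefix, (p_blank, p_label) in beams.items():
            for c, p in enumerate(frame):
                if c == blank:
                    # A blank keeps the prefix unchanged.
                    next_beams[prefix][0] += (p_blank + p_label) * p
                elif prefix and prefix[-1] == c:
                    # Repeating the last label without a blank merges into it.
                    next_beams[prefix][1] += p_label * p
                    # After a blank, the same label starts a new character.
                    next_beams[prefix + (c,)][1] += p_blank * p
                else:
                    next_beams[prefix + (c,)][1] += (p_blank + p_label) * p
        # Keep only the `beam_size` most probable prefixes.
        ranked = sorted(next_beams.items(),
                        key=lambda kv: kv[1][0] + kv[1][1], reverse=True)
        beams = {prefix: tuple(scores) for prefix, scores in ranked[:beam_size]}
    best_prefix, best_scores = max(beams.items(),
                                   key=lambda kv: kv[1][0] + kv[1][1])
    return list(best_prefix), best_scores[0] + best_scores[1]


# Classes {0: "a", 1: blank}. The blank is the most likely class in both frames.
probs = [[0.4, 0.6], [0.4, 0.6]]
print(prefix_beam_search(probs, blank=1, beam_size=4))
```

In this example greedy decoding picks the blank in both frames and outputs an empty transcript (probability 0.36), whereas the prefix search returns `[0]`, because the three paths "a-", "-a", and "aa" together have probability 0.64.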

## Improvements & Different decoders in architecture

In [100]:
import torch
import torchaudio
import time
from typing import List
import IPython
import matplotlib.pyplot as plt
from torchaudio.models.decoder import ctc_decoder
from torchaudio.utils import download_asset

tokens = ["'", "<SPACE>", "e" ,"f" ,"g" ,"h" ,"i" ,"n" ,"o" ,"r" ,"s" ,"t" ,"u" ,"v" ,"w" ,"x" ,"z", "17", "|"]

################################################################################
#Beam Search Decoder
################################################################################

beam_search_decoder = ctc_decoder(
    lexicon=None,
    tokens=tokens,
    nbest=3,
    beam_size=1500,
    blank_token="17"
)
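
The decoder refers to tokens by their position in `tokens`, so the blank label `"17"` sits at index 17 and the silence token `"|"` (the default `sil_token` of `ctc_decoder`) at index 18. That is why the cells below filter out ids 17 and 18 before converting a hypothesis to text. A minimal sketch of that filtering, assuming the label order matches `tokens`; `ids_to_text` is a hypothetical stand-in for the notebook's `text_transform.int_to_text`, not its actual implementation:

```python
tokens = ["'", "<SPACE>", "e", "f", "g", "h", "i", "n", "o", "r", "s", "t", "u", "v", "w", "x", "z", "17", "|"]

BLANK_ID = tokens.index("17")  # position of the CTC blank label
SIL_ID = tokens.index("|")     # position of the silence token


def ids_to_text(ids):
    """Hypothetical helper: drop blank/silence ids and join the remaining characters."""
    chars = []
    for i in ids:
        if i in (BLANK_ID, SIL_ID):
            continue
        # "<SPACE>" is the word separator in this label set
        chars.append(" " if tokens[i] == "<SPACE>" else tokens[i])
    return "".join(chars)


# Ids for "|one three|" using the positions in `tokens`
example_ids = [SIL_ID, 8, 7, 2, 1, 11, 5, 9, 2, 2, SIL_ID]
print(BLANK_ID, SIL_ID, repr(ids_to_text(example_ids)))
```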

In [138]:
def test(hparams, state, device, test_loader, decoder="greedy"):

    model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)
    model.load_state_dict(state['state_dict'])

    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_cer, test_wer = [], []

    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            #Load data and labels
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            #Get the predictions
            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            if decoder=="greedy":
              #Decode the text information using GreedyDecoder
              decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)

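            elif decoder=="beam":
              decoded_preds = []
              decoded_targets = []

              # Keep every token id except the CTC blank (17) and the silence token (18)
              def rem(x):
                return x not in [17, 18]

              # Decode one utterance at a time; the decoder expects a CPU tensor
              # of shape (batch, time, n_class)
              for emission in output.transpose(0, 1):
                emission = emission[None, :].cpu()
                beam_search_result = beam_search_decoder(emission)
                # beam_search_result[0][0] is the best hypothesis for this utterance
                final_tokens = list(filter(rem, beam_search_result[0][0].tokens.tolist()))
                beam_search_transcript = text_transform.int_to_text(final_tokens)
                decoded_preds.append(beam_search_transcript)

              # Convert the padded label tensors back to reference text
              for k in range(len(labels)):
                decoded_targets.append(text_transform.int_to_text(labels[k][:label_lengths[k]].tolist()))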
            
            #Compute CER and WER
            for j in range(len(decoded_preds)):
                test_cer.append(cer(decoded_targets[j], decoded_preds[j]))
                test_wer.append(wer(decoded_targets[j], decoded_preds[j]))

    #Compute average CER and WER
    avg_cer = sum(test_cer)/len(test_cer)
    avg_wer = sum(test_wer)/len(test_wer)

    print('Test set: Average CER: {:4f} Average WER: {:.4f}\n'.format(avg_cer, avg_wer))

In [121]:
hparams = {
          "n_cnn_layers": 2,
          "n_rnn_layers": 1,
          "rnn_dim": 128,
          "n_class": 18,
          "n_feats": 64,
          "stride":2,
          "dropout": 0.1
}

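# NOTE: testset_1 (the first 1000 test utterances) is created here, but the loader
# below is built from the full test_dataset, so the subset is not actually used.
indexes = list(range(0, 1000, 1))
testset_1 = torch.utils.data.Subset(test_dataset, indexes)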
decoder_test_loader = data.DataLoader(dataset=test_dataset,
                                batch_size=BATCH_SIZE,
                                shuffle=False,
                                collate_fn=lambda x: data_processing(x, 'valid'))

filepath = "/content/drive/MyDrive/state.pth"
state = torch.load(filepath)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

test(hparams, state, device, decoder_test_loader, decoder="greedy")


evaluating...
Test set: Average CER: 0.007433 Average WER: 0.0228



In [162]:
# The beam search decoder operates on CPU tensors, so this evaluation runs on the CPU
device = "cpu"

test(hparams, state, device, decoder_test_loader, decoder="beam")


evaluating...
Test set: Average CER: 0.007433 Average WER: 0.0228
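
Both decoders report the same error rates here. For reference, greedy (best-path) CTC decoding takes the highest-scoring label at every frame, merges consecutive repeats, and then removes blanks. The following is a minimal sketch of that rule in plain Python; `greedy_ctc_decode` is an illustrative name and not the notebook's `GreedyDecoder`, whose exact behaviour is defined earlier in the notebook.

```python
def greedy_ctc_decode(frame_scores, blank):
    """Best-path CTC decoding: arg-max per frame, collapse repeats, drop blanks."""
    # Index of the highest-scoring label in every frame
    best_path = [max(range(len(frame)), key=frame.__getitem__) for frame in frame_scores]

    decoded = []
    previous = None
    for label in best_path:
        # Emit a label only when it differs from the previous frame and is not blank
        if label != previous and label != blank:
            decoded.append(label)
        previous = label
    return decoded


# Toy scores for 3 labels (0 = blank); the arg-max path is [1, 1, 0, 1, 2, 2, 0]
frames = [
    [0.10, 0.80, 0.10],
    [0.20, 0.70, 0.10],
    [0.90, 0.05, 0.05],
    [0.10, 0.60, 0.30],
    [0.10, 0.20, 0.70],
    [0.10, 0.30, 0.60],
    [0.80, 0.10, 0.10],
]
print(greedy_ctc_decode(frames, blank=0))
```

The blank between the two runs of label 1 is what allows the same label to appear twice in a row in the decoded output.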



## Understanding beam sizes and their impact

In [148]:
model = SpeechRecognitionModel(
        hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
        hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
        ).to(device)
model.load_state_dict(state['state_dict'])
model.eval()

speech_file = "/content/1396z33a.wav"
IPython.display.Audio(speech_file)

In [149]:
waveform, sample_rate = torchaudio.load(speech_file)
spec = valid_audio_transforms(waveform).squeeze(0)
output = model(spec[None, None, :])
output = F.log_softmax(output, dim=2)

beam_search_result = beam_search_decoder(output)

# Keep every token id except the CTC blank (17) and the silence token (18)
def rem(x):
    return x not in [17, 18]

final_tokens = list(filter(rem, beam_search_result[0][0].tokens.tolist()))
beam_search_transcript = text_transform.int_to_text(final_tokens)

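# Resampling at this point does not affect the result above, because the
# emission was already computed from the original waveform.
if sample_rate != 16000:
    waveform = torchaudio.functional.resample(waveform, sample_rate, 16000)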

actual_transcript = "one three nine six zero three three"

# Character-level edit distance divided by the reference length in characters,
# i.e. a character error rate (CER) for this utterance.
beam_search_cer = torchaudio.functional.edit_distance(actual_transcript, beam_search_transcript) / len(
    actual_transcript
)

print("Actual Transcript : ", actual_transcript)
print(f"CER: {beam_search_cer}")

Actual Transcript :  one three nine six zero three three
CER: 0.02857142857142857
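
The ratio printed above divides a character-level edit distance by the number of characters in the reference (1/35), so it behaves like a character error rate. A word-level rate for the same utterance is much larger, because the single missing space merges two words. The sketch below shows both computations with a plain Levenshtein distance; `edit_distance` here is a small illustrative helper rather than the notebook's `cer`/`wer` functions, which may normalise text differently, and the hypothesis string is the best beam search output listed further down.

```python
def edit_distance(ref, hyp):
    """Levenshtein distance between two sequences (strings or lists of words)."""
    prev = list(range(len(hyp) + 1))
    for i, r in enumerate(ref, start=1):
        curr = [i]
        for j, h in enumerate(hyp, start=1):
            cost = 0 if r == h else 1
            curr.append(min(prev[j] + 1,          # deletion
                            curr[j - 1] + 1,      # insertion
                            prev[j - 1] + cost))  # substitution or match
        prev = curr
    return prev[-1]


reference = "one three nine six zero three three"
hypothesis = "one three ninesix zero three three"

# Character level: compare the strings symbol by symbol
example_cer = edit_distance(reference, hypothesis) / len(reference)
# Word level: compare the lists of whitespace-separated words
example_wer = edit_distance(reference.split(), hypothesis.split()) / len(reference.split())

print(f"CER: {example_cer:.4f}  WER: {example_wer:.4f}")
```

At the word level, "nine six" versus "ninesix" costs one substitution and one deletion, giving 2 errors over 7 reference words.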


In [150]:
timesteps = beam_search_result[0][0].timesteps
predicted_tokens = beam_search_decoder.idxs_to_tokens(beam_search_result[0][0].tokens)

print(predicted_tokens, len(predicted_tokens))
print(timesteps, timesteps.shape[0])

['|', 'o', 'n', 'e', '<SPACE>', 't', 'h', 'r', 'e', 'e', '<SPACE>', 'n', 'i', 'n', 'e', 's', 'i', 'x', '<SPACE>', 'z', 'e', 'r', 'o', '<SPACE>', 't', 'h', 'r', 'e', 'e', '<SPACE>', 't', 'h', 'r', 'e', 'e', '|'] 36
tensor([  0,   1,   2,  16,  18,  21,  23,  25,  27,  30,  34,  36,  37,  39,
         48,  56,  57,  80,  84,  86,  87,  97, 101, 108, 110, 111, 114, 115,
        119, 121, 123, 124, 127, 129, 134, 155], dtype=torch.int32) 36
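
`timesteps` gives, for each predicted token, the emission frame it is aligned to. Assuming the emission frames are evenly spaced over the utterance, a frame index can be mapped to an approximate time in seconds as sketched below. The sample count, frame count, and sample rate used here are hypothetical placeholders; in the notebook they would come from `waveform.shape[1]`, `output.shape[1]`, and `sample_rate`.

```python
def frames_to_seconds(timesteps, num_samples, num_frames, sample_rate):
    """Map emission frame indices to approximate times in seconds."""
    # Duration covered by one emission frame, assuming evenly spaced frames
    seconds_per_frame = num_samples / num_frames / sample_rate
    return [t * seconds_per_frame for t in timesteps]


# Hypothetical values: a 2-second utterance at 16 kHz mapped to 160 emission frames
times = frames_to_seconds([0, 16, 155], num_samples=32000, num_frames=160, sample_rate=16000)
print([round(t, 4) for t in times])
```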


In [157]:
def print_decoded(decoder, emission, param, param_value):
    start_time = time.monotonic()

    result = decoder(emission)
    decode_time = time.monotonic() - start_time

    # Use the hypotheses returned by this decoder call, not the earlier global result
    final_tokens = list(filter(rem, result[0][0].tokens.tolist()))
    transcript = text_transform.int_to_text(final_tokens)

    score = result[0][0].score
    print(f"{param} {param_value:<3}: {transcript} (score: {score:.2f}; {decode_time:.4f} secs)")

In [154]:
for i in range(3):
    final_tokens = list(filter(rem, beam_search_result[0][i].tokens.tolist()))
    transcript = text_transform.int_to_text(final_tokens)
    score = beam_search_result[0][i].score
    print(f"{transcript} (score: {score})")

one three ninesix zero three three (score: -10.97612489641432)
one three oinesix zero three three (score: -11.78038354099516)
one three nineix zero three three (score: -11.96708570302252)


In [159]:
beam_sizes = [1, 5, 50, 500]

for beam_size in beam_sizes:
    beam_search_decoder = ctc_decoder(
        lexicon=None,
        tokens=tokens,
        nbest=3,
        beam_size=beam_size,
        blank_token="17"
    )

    print_decoded(beam_search_decoder, output, "beam size", beam_size)

beam size 1  : one three ninesix zero three three (score: -10.98; 0.0018 secs)
beam size 5  : one three ninesix zero three three (score: -10.98; 0.0024 secs)
beam size 50 : one three ninesix zero three three (score: -10.98; 0.0291 secs)
beam size 500: one three ninesix zero three three (score: -10.98; 0.2853 secs)
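
On this utterance every beam size returns the same transcript, but that is not guaranteed in general. The sketch below is a textbook CTC prefix beam search in plain Python, given only to illustrate how a wider beam can change the result. It is not the implementation behind `torchaudio`'s `ctc_decoder`, which may merge and score hypotheses differently, and the function names and toy probabilities are assumptions made for the example.

```python
import math
from collections import defaultdict

NEG_INF = float("-inf")


def logsumexp(*xs):
    """Numerically stable log(sum(exp(x))) that tolerates -inf entries."""
    m = max(xs)
    if m == NEG_INF:
        return NEG_INF
    return m + math.log(sum(math.exp(x - m) for x in xs))


def ctc_prefix_beam_search(log_probs, blank, beam_size):
    """Return (best label sequence, its log probability summed over alignments)."""
    # Each prefix keeps two scores: paths ending in blank and paths ending in a label
    beams = {(): (0.0, NEG_INF)}
    for frame in log_probs:
        next_beams = defaultdict(lambda: (NEG_INF, NEG_INF))
        for prefix, (p_b, p_nb) in beams.items():
            for c, lp in enumerate(frame):
                if c == blank:
                    # A blank keeps the prefix unchanged
                    nb, nnb = next_beams[prefix]
                    next_beams[prefix] = (logsumexp(nb, p_b + lp, p_nb + lp), nnb)
                    continue
                new_prefix = prefix + (c,)
                nb, nnb = next_beams[new_prefix]
                if prefix and prefix[-1] == c:
                    # A repeated label extends the prefix only after a blank ...
                    next_beams[new_prefix] = (nb, logsumexp(nnb, p_b + lp))
                    # ... otherwise it is merged into the same prefix
                    sb, snb = next_beams[prefix]
                    next_beams[prefix] = (sb, logsumexp(snb, p_nb + lp))
                else:
                    next_beams[new_prefix] = (nb, logsumexp(nnb, p_b + lp, p_nb + lp))
        # Prune to the beam_size most probable prefixes
        ranked = sorted(next_beams.items(), key=lambda kv: logsumexp(*kv[1]), reverse=True)
        beams = dict(ranked[:beam_size])
    best_prefix, scores = max(beams.items(), key=lambda kv: logsumexp(*kv[1]))
    return list(best_prefix), logsumexp(*scores)


# Two frames, two labels (0 = blank, 1 = "a"), with P(blank) = 0.6 in each frame
toy = [[math.log(0.6), math.log(0.4)]] * 2
for size in (1, 2):
    labels, score = ctc_prefix_beam_search(toy, blank=0, beam_size=size)
    print(size, labels, round(math.exp(score), 2))
```

In this toy case a beam of size 1 keeps only the empty prefix (probability 0.36), while a beam of size 2 finds the sequence `[1]`, whose three alignments sum to 0.64.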


In [160]:
num_tokens = len(tokens)
beam_size_tokens = [1, 5, 10, num_tokens]

for beam_size_token in beam_size_tokens:
    beam_search_decoder = ctc_decoder(
        lexicon=None,
        tokens=tokens,
        nbest=3,
        beam_size_token=beam_size_token,
        blank_token="17"
    )

    print_decoded(beam_search_decoder, output, "beam size token", beam_size_token)

beam size token 1  : one three ninesix zero three three (score: -10.98; 0.0015 secs)
beam size token 5  : one three ninesix zero three three (score: -10.98; 0.0419 secs)
beam size token 10 : one three ninesix zero three three (score: -10.98; 0.0107 secs)
beam size token 19 : one three ninesix zero three three (score: -10.98; 0.0183 secs)


In [161]:
beam_thresholds = [1, 5, 10, 25]

for beam_threshold in beam_thresholds:
    beam_search_decoder = ctc_decoder(
        lexicon=None,
        tokens=tokens,
        nbest=3,
        beam_threshold=beam_threshold,
        blank_token="17"
    )

    print_decoded(beam_search_decoder, output, "beam threshold", beam_threshold)

beam threshold 1  : one three ninesix zero three three (score: -10.98; 0.0043 secs)
beam threshold 5  : one three ninesix zero three three (score: -10.98; 0.0074 secs)
beam threshold 10 : one three ninesix zero three three (score: -10.98; 0.0074 secs)
beam threshold 25 : one three ninesix zero three three (score: -10.98; 0.0150 secs)


## Report

The speech recognition model above, with 400K parameters, gives 0.58% CER and 1.7% WER on the test dataset. On the TIDIGITS dataset, greedy and beam search decoding give the same performance. As observed on TIDIGITS, larger beam sizes do not bring much performance improvement.