# **Author**: Adwoa Asantewaa Bremang 
# **Project**: Speech recognition model training

---



**Description:**

The speech was converted into frames of utterances, and these utterances had unaligned phonemes as labels. The project therefore focused on predicting the phonemes mapped to each utterance of the test data, using a CNN-LSTM RNN model with CTC loss (CTCLoss) to help map utterances to phonemes.
The model was evaluated using the Levenshtein distance. Trained on approximately 22,000 training samples, the model was able to predict labels for the training data with an average Levenshtein distance of approximately 8.

In [2]:
from google.colab import drive
#drive.mount('/content/gdrive')


## Library installation

In [None]:
!pip install python-Levenshtein


In [None]:
!git clone --recursive https://github.com/parlance/ctcdecode.git
!pip install wget
%cd ctcdecode
!pip install .
%cd ..

In [None]:
!git clone https://github.com/1ytic/pytorch-edit-distance
%cd pytorch-edit-distance
!python setup.py install
%cd ..

## Imported classes

In [None]:
import os
import numpy as np
from PIL import Image

import torch
import Levenshtein
import torchvision   
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score
from skimage import io
from torchvision import transforms
import pandas as pd
from torch.nn.utils.rnn import pad_sequence
from ctcdecode import CTCBeamDecoder
#from torch_edit_distance import *
#from pytorch-edit-distance import torch_edit_distance
cuda = torch.cuda.is_available()
cuda

## loading data

In [None]:
# Load every split from .npy files in the working directory.
# allow_pickle=True lets NumPy unpickle object arrays; only use it on trusted
# files, because unpickling can execute arbitrary code.
train = np.load('train.npy', allow_pickle=True)
train_labels = np.load('train_labels.npy', allow_pickle=True)
dev = np.load('dev.npy', allow_pickle=True)
dev_labels = np.load('dev_labels.npy', allow_pickle=True)
test = np.load('test.npy', allow_pickle=True)

# Quick sanity check of what was loaded. Note that the dev-label line reports
# the shape of the first label array only, not of the whole collection.
print("dev.shape", dev.shape)
print("dev_label.shape", dev_labels[0].shape)
print("test.shape", test.shape)
print("train.shape", train.shape)
print("train_labels.shape", train_labels.shape)

# Wildcard import: PHONEME_MAP (below) and PHONEME_LIST (used when building
# the decoder) are expected to come from this module.
from phoneme_list import *

# Lookup table used by label_string() to turn a label index into a symbol.
label_map = PHONEME_MAP

## dataset loading

In [23]:
class LibriDataset(Dataset):
  """Map-style dataset over per-utterance features and optional labels.

  Args:
    X: indexable collection of per-utterance feature arrays.
    y: matching collection of label arrays, or None when the split has no
      labels (the test split is built this way).
  """

  def __init__(self, X, y):
    self.X = X
    self.y = y

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    """Return `features` alone, or `(features, labels)` when labels exist."""
    frames = torch.Tensor(self.X[idx])
    if self.y is None:
      return frames
    # Every label is shifted up by one before being wrapped in a tensor
    # (presumably to keep index 0 free for the CTC blank -- confirm).
    return frames, torch.Tensor(self.y[idx] + 1)


## collate function 

In [24]:
def collate(batch):
    """Pad a list of (features, labels) pairs into batch tensors.

    Returns:
        ((x_out, x_length), (y_out, y_length)) where
        - x_out is the padded feature tensor with the batch on dim 1
          (batch_first=False),
        - y_out is the padded label tensor with the batch on dim 0,
        - x_length / y_length are plain lists of the unpadded lengths.
    """
    features = [X for X, _ in batch]
    targets = [Y for _, Y in batch]

    x_length = [X.shape[0] for X in features]
    y_length = [len(Y) for Y in targets]

    x_out = pad_sequence(features, batch_first=False)
    y_out = pad_sequence(targets, batch_first=True)
    return (x_out, x_length), (y_out, y_length)

In [25]:
def collate_test(batch):
    """Pad a list of unlabeled feature tensors into one batch tensor.

    Returns:
        (x_out, x_length): the padded features with the batch on dim 1
        (batch_first=False) and a list of each sample's unpadded length.
    """
    features = [X for X in batch]
    x_length = [X.shape[0] for X in features]
    x_out = pad_sequence(features, batch_first=False)
    return (x_out, x_length)

In [None]:
batch_size = 16

train_dataset = LibriDataset(train, train_labels)

# Pass `collate` itself rather than wrapping it in a lambda: the wrapper adds
# nothing, and a lambda cannot be pickled, which breaks worker start-up on
# platforms where DataLoader workers are spawned instead of forked.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,  # reshuffle the training utterances every epoch
    num_workers=3,
    pin_memory=True,
    collate_fn=collate,
)

In [None]:
dev_dataset = LibriDataset(dev, dev_labels)
# `collate` is passed directly (no lambda wrapper) so the loader stays
# picklable when worker processes are spawned. Order is kept fixed
# (shuffle=False) for evaluation.
dev_loader = DataLoader(
    dev_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=3,
    pin_memory=True,
    collate_fn=collate,
)

In [None]:
# The test split has no labels, so the dataset yields features only and the
# label-free collate function is used.
test_dataset = LibriDataset(test, None)
# `collate_test` is passed directly (no lambda wrapper) so the loader stays
# picklable when worker processes are spawned. shuffle=False keeps the
# predictions in the same order as the input file.
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=3,
    pin_memory=True,
    collate_fn=collate_test,
)

## Model

---

  The model is a CNN-LSTM RNN model. The input data is passed through a 1D CNN layer whose output is average-pooled. The pooled output is passed to a bidirectional LSTM, and the LSTM output is passed to a linear layer followed by a log-softmax.


In [29]:
class model(nn.Module):
  """CNN + bidirectional LSTM network producing per-frame log-probabilities.

  Pipeline: 1x1 Conv1d -> BatchNorm1d -> ReLU -> AvgPool1d(3, stride=2) over
  time -> Linear back to `input` features -> packed bidirectional LSTM ->
  Dropout -> Linear -> log_softmax.

  The set of parameterised sub-modules (cnn, bn, linear, lstm, output) is the
  same as before, so state-dict keys are unchanged.

  Args:
    input: number of features per frame (also the LSTM input size).
    hidden_sizes: LSTM hidden size per direction.
    output: number of output classes.
    no_layers: number of stacked LSTM layers.
  """

  def __init__(self, input, hidden_sizes, output, no_layers):
    # Zero-argument super() keeps working even if the module-level name
    # `model` is later rebound to an instance of this class.
    super().__init__()

    # --- convolutional front-end -------------------------------------
    out_channels = 128
    self._pool_kernel = 3
    self._pool_stride = 2
    # bias=False because the convolution is followed by batch norm.
    self.cnn = nn.Conv1d(in_channels=input, out_channels=out_channels,
                         kernel_size=1, stride=1, padding=0, bias=False)
    self.bn = nn.BatchNorm1d(out_channels)
    self.avg_pool = nn.AvgPool1d(self._pool_kernel, stride=self._pool_stride)
    # Projects the CNN channels back to `input` features for the LSTM.
    self.linear = nn.Linear(out_channels, input)

    # --- recurrent back-end ------------------------------------------
    # batch_first has no effect on the packed sequences fed in forward().
    self.lstm = nn.LSTM(input, hidden_sizes, num_layers=no_layers,
                        bidirectional=True, batch_first=True, dropout=0.5)
    # forward() applies dropout to the LSTM output, so it must exist here.
    self.dropout = nn.Dropout(0.4)
    self.relu = nn.ReLU()
    self.output = nn.Linear(hidden_sizes * 2, output)

  def forward(self, x, lengths):
    """Compute log-probabilities for a padded batch.

    Args:
      x: padded features of shape (time, batch, input), i.e. the
        batch_first=False layout produced by the collate functions.
      lengths: unpadded length of every sequence (list or 1-D tensor).

    Returns:
      (log_probs, out_lens): log_probs has shape
      (pooled_time, batch, output); out_lens holds each sequence's length
      after pooling, which is what CTC loss / decoding must be given.

    Raises:
      RuntimeError: from the pooling layer if the padded time dimension is
        shorter than the pooling kernel.
    """
    # Conv1d / AvgPool1d work on (batch, channels, time).
    x = x.permute(1, 2, 0)
    x = self.relu(self.bn(self.cnn(x)))
    x = self.avg_pool(x)

    # Pooling shortens the time axis: L_out = floor((L - kernel) / stride) + 1.
    # Lengths stay on the CPU as pack_padded_sequence requires; sequences
    # shorter than the kernel are kept at one frame so packing does not fail.
    pooled_lens = torch.as_tensor(lengths, dtype=torch.int64).cpu()
    pooled_lens = (pooled_lens.clamp(min=self._pool_kernel)
                   - self._pool_kernel) // self._pool_stride + 1
    pooled_lens = pooled_lens.clamp(min=1)

    # Back to (time, batch, channels), then project to the LSTM input size.
    x = self.linear(x.permute(2, 0, 1))

    x = torch.nn.utils.rnn.pack_padded_sequence(
        x, pooled_lens, batch_first=False, enforce_sorted=False)
    x = self.lstm(x)[0]
    x, out_lens = torch.nn.utils.rnn.pad_packed_sequence(x, batch_first=False)

    x = self.dropout(x)
    x = self.output(x).log_softmax(dim=2)
    return x, out_lens


## training function 

In [33]:

def training(model,optimizer,criterion, data_loader,scheduler):
  """Run one training epoch and return the mean per-batch loss.

  Args:
    model: network called as `model(features, lengths)` and returning
      `(log_probs, out_lens)`.
    optimizer: optimizer stepped once per batch.
    criterion: loss called as
      `criterion(log_probs, targets, out_lens, target_lens)`.
    data_loader: yields `((features, feature_lens), (labels, label_lens))`
      batches, as produced by `collate`.
    scheduler: accepted for interface compatibility; it is intentionally
      not stepped here (the per-batch step was already disabled).

  Returns:
    The summed batch losses divided by the number of batches (0.0 when the
    loader yields nothing).
  """
  model.train()
  # Use the device the model lives on rather than relying on a global.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cpu")

  loss = 0.0
  n_batches = 0
  for train_data, train_label in data_loader:
    optimizer.zero_grad()
    train_d_ = train_data[0].to(device)
    train_d_lens = torch.as_tensor(train_data[1], dtype=torch.int)
    # CTC targets are class indices, so they must be an integer tensor.
    train_l_ = train_label[0].to(device).long()
    train_l_lens = torch.as_tensor(train_label[1], dtype=torch.int)

    train_output, out_lens = model(train_d_, train_d_lens)
    train_loss = criterion(train_output, train_l_, out_lens, train_l_lens)
    loss += train_loss.item()
    train_loss.backward()
    optimizer.step()
    n_batches += 1

  if n_batches:
    loss /= n_batches
  print("Training loss", loss)
  return loss

## Decoder function 

In [None]:
# Beam-search decoder shared by validation and testing.
# blank_id is not passed, so the library default is used (presumably index 0,
# matching the +1 label shift in the dataset -- confirm against ctcdecode).
decoder = CTCBeamDecoder(
    PHONEME_LIST,
    # No external language model: model_path is None and both LM weights are 0.
    model_path=None,
    alpha=0,
    beta=0,
    # Beam-search settings.
    beam_width=50,
    cutoff_top_n=40,
    cutoff_prob=1.0,
    num_processes=4,
    # The network emits log-probabilities (log_softmax), not probabilities.
    log_probs_input=True,
)

In [None]:
def label_string(i):
  """Return the `label_map` entry for a single label tensor `i`."""
  index = i.numpy().astype(int)
  return label_map[index]

In [None]:
def edit_distance_cal(pred,target):
  """Return the mean Levenshtein distance between paired strings.

  The previous version overwrote the distance on every iteration, so only
  the last pair in the batch was ever reported; the distances of all pairs
  are now averaged.

  Args:
    pred: sequence of predicted strings.
    target: sequence of reference strings. Predictions without a matching
      reference (when `pred` is longer than `target`) are ignored.

  Returns:
    The average distance over the compared pairs, or 0.0 when there is
    nothing to compare.
  """
  distances = [Levenshtein.distance(p, t) for p, t in zip(pred, target)]
  if not distances:
    return 0.0
  return sum(distances) / len(distances)

In [None]:
def decode_seq(pred):
  """Turn each sequence of label tensors in `pred` into one joined string.

  Every element of a sequence is mapped through `label_string` and the
  resulting pieces are concatenated without a separator.
  """
  return [''.join(label_string(token) for token in seq) for seq in pred]



## Validation

In [None]:
def validation_model(model,criterion,test_loader):
  """Evaluate `model` on a labeled loader.

  Fixes over the previous version: the loader passed in is the one that is
  iterated (not a global), labels are read from the batch (`dev_label`), the
  loss is accumulated instead of overwritten, and padded label positions are
  stripped before the edit distance is computed.

  Args:
    model: network called as `model(features, lengths)` and returning
      `(log_probs, out_lens)`.
    criterion: loss called as
      `criterion(log_probs, targets, out_lens, target_lens)`.
    test_loader: yields `((features, feature_lens), (labels, label_lens))`
      batches, as produced by `collate`.

  Returns:
    (dev_loss, dis_loss): the mean per-batch loss and the mean of the
    per-batch edit distances (both 0.0 when the loader yields nothing).
  """
  model.eval()
  # Use the device the model lives on rather than relying on a global.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cpu")

  total_loss = 0.0
  batch_distances = []
  with torch.no_grad():
    for dev_data, dev_label in test_loader:
      dev_d_ = dev_data[0].to(device)
      dev_d_lens = torch.as_tensor(dev_data[1], dtype=torch.int)
      # CTC targets are class indices, so they must be an integer tensor.
      dev_l_ = dev_label[0].to(device).long()
      dev_l_lens = torch.as_tensor(dev_label[1], dtype=torch.int)

      dev_output, out_lens = model(dev_d_, dev_d_lens)
      total_loss += criterion(dev_output, dev_l_, out_lens, dev_l_lens).item()

      # The beam decoder takes batch-first input; the model emits time-first.
      predicted = torch.transpose(dev_output, 0, 1)
      beam_results, _, _, beam_lens = decoder.decode(
          predicted.data.cpu(), out_lens.to(torch.int).cpu())
      # Keep the top beam of every utterance, cut to its decoded length.
      best_beams = [beam_results[j, 0, :beam_lens[j, 0]]
                    for j in range(beam_results.size(0))]
      # Drop the padding added by collate so it is not counted as labels.
      targets = [dev_l_[j, :n].cpu() for j, n in enumerate(dev_l_lens.tolist())]

      pred_seq = decode_seq(best_beams)
      target_seq = decode_seq(targets)
      batch_distances.append(edit_distance_cal(pred_seq, target_seq))

  n_batches = len(batch_distances)
  dis_loss = float(np.sum(np.array(batch_distances)) / n_batches) if n_batches else 0.0
  dev_loss = total_loss / n_batches if n_batches else 0.0

  print("edit_distance",dis_loss)
  print("Validation loss :",dev_loss)

  return dev_loss, dis_loss

In [31]:
def init_weights(layer):
    """Initialise `layer` in place; intended for use with `Module.apply`.

    - Exactly `nn.Linear` layers: Kaiming-normal weights (bias untouched).
    - Exactly `nn.LSTM` layers: the first layer's forward-direction weights
      and biases are drawn uniformly from [-0.1, 0.1]; other LSTM parameters
      are left as they are.
    - Any other layer type is left unchanged.
    """
    layer_type = type(layer)
    if layer_type == nn.Linear:
        torch.nn.init.kaiming_normal_(layer.weight.data)
    if layer_type == nn.LSTM:
        first_layer_params = ("weight_hh_l0", "weight_ih_l0",
                              "bias_hh_l0", "bias_ih_l0")
        for param_name in first_layer_params:
            torch.nn.init.uniform_(getattr(layer, param_name).data, a=-0.1, b=0.1)


In [None]:
# Run length and per-epoch history containers.
n_epochs = 30
Train_loss, Test_loss, Test_acc, predict = [], [], [], []

# Network hyper-parameters, passed positionally to the model constructor.
input = 13           # used as the Conv1d in_channels / LSTM input size
len_phonemes = 42    # size of the output layer
no_layers = 3        # stacked LSTM layers
hidden_sizes = 256   # LSTM hidden size per direction

device = torch.device("cuda" if cuda else "cpu")

# NOTE: this rebinds the name `model` from the class to an instance, so the
# cell cannot be re-run without first re-running the class definition.
model = model(input, hidden_sizes, len_phonemes, no_layers)
model.apply(init_weights)
model.to(device)

In [None]:
%cd /content/

In [None]:


criterion = nn.CTCLoss()
count = 0
weightDecay= 5e-6
optimizer = torch.optim.Adam(model.parameters(), lr=0.001,weight_decay=weightDecay)
# NOTE: the scheduler is handed to training(), but its step() call there is
# commented out, so the learning rate stays constant for the whole run.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 3, gamma =0.1)

# Optional mirror of the checkpoints on Google Drive. It is only used when
# the directory exists (i.e. Drive is mounted), so an unmounted Drive no
# longer aborts the run after a full epoch of training.
drive_dir = './gdrive/MyDrive/abremang_hw3p2'

for i in range(n_epochs):
    train_loss = training(model,optimizer,criterion, train_loader,scheduler)
    test_loss, test_dis = validation_model(model,criterion,dev_loader)

    Train_loss.append(train_loss)
    Test_loss.append(test_loss)
    Test_acc.append(test_dis)

    count = i + 1
    checkpoint_name = 'checkpoint'+str(count)+'.pth'
    # Save locally first so a Drive problem can never cost the checkpoint.
    torch.save(model.state_dict(), checkpoint_name)
    if os.path.isdir(drive_dir):
        torch.save(model.state_dict(), os.path.join(drive_dir, checkpoint_name))
    else:
        print("Drive folder not found, checkpoint saved locally only:", drive_dir)

    print("epoch",count)
    print('='*20)

In [None]:
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can still be loaded on a CPU-only runtime.
state_dict = torch.load('checkpoint14.pth', map_location=device)


In [None]:
# Copy the checkpoint's parameters into the existing model instance.
model.load_state_dict(state_dict)

## Testing 

In [None]:
def test_model(model,test_loader):
  """Decode predictions for every batch of an unlabeled loader.

  The model is switched to eval mode and run without gradient tracking, so
  dropout does not perturb the predictions and no autograd graph is kept.

  Args:
    model: network called as `model(features, lengths)` and returning
      `(log_probs, out_lens)`.
    test_loader: yields `(features, feature_lens)` batches, as produced by
      `collate_test`.

  Returns:
    A 1-D NumPy array with one decoded string per utterance, in loader
    order (empty when the loader yields nothing).
  """
  model.eval()
  # Use the device the model lives on rather than relying on a global.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cpu")

  predict = []
  with torch.no_grad():
    for test_values in test_loader:
      test_values_ = test_values[0].to(device)
      test_values_lens = torch.as_tensor(test_values[1], dtype=torch.int)
      test_output, out_lens = model(test_values_, test_values_lens)

      # The beam decoder takes batch-first input; the model emits time-first.
      predicted = torch.transpose(test_output, 0, 1)
      beam_results, _, _, beam_lens = decoder.decode(
          predicted.data.cpu(), out_lens.to(torch.int).cpu())
      # Keep the top beam of every utterance, cut to its decoded length.
      best_beams = [beam_results[j, 0, :beam_lens[j, 0]]
                    for j in range(beam_results.size(0))]
      predict.append(decode_seq(best_beams))

  if not predict:
    # np.concatenate rejects an empty list; return an empty result instead.
    return np.array([], dtype=str)
  return np.concatenate(predict)

In [None]:
# Decode the whole test split; `predict` holds the concatenated per-batch
# decoded strings returned by test_model.
predict = test_model(model,test_loader)


In [None]:
# Write the predictions to CSV with a running 0-based id per row.
store_c = np.arange(len(predict))
data = dict(id=store_c, label=predict)
df = pd.DataFrame(data)
df.to_csv("data1_epoch14.csv", index=False)