In [1]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torch import optim
import random

# for accesing files in the directory
import glob
import errno

In [2]:
# Input batch X (acoustic) to be passed for training
path = './asr_data/train/*.npy'
files = glob.glob(path)

# List of input batches
X = []

cnt = 0
acoustics = []
maxL = 0
for i,name in enumerate(files):    
    ac = np.load(name)
    acoustics.append(np.transpose(ac))
    maxL = max(maxL, ac.shape[1])
    if cnt==31:
        # Acoustic inputs should be FloatTensors: B x L x 20 
        batch_X = torch.zeros(32,maxL,20)
        for j,ac in enumerate(acoustics):
            batch_X[j, :ac.shape[0], :] = torch.from_numpy(ac)
        X.append(batch_X)
        cnt = 0
        acoustics = []
        maxL = 0
    cnt += 1

print(len(X))

44


In [3]:
# Test Input batch to be passed for calculating testing CER
path = './asr_data/test/*.npy'
files = glob.glob(path)

# List of input batches
cer_testX = []

cnt = 0
acoustics = []
maxL = 0
for i,name in enumerate(files):    
    ac = np.load(name)
    acoustics.append(np.transpose(ac))
    maxL = max(maxL, ac.shape[1])
    if cnt==31:
        # Acoustic inputs should be FloatTensors: B x L x 20 
        batch_X = torch.zeros(32,maxL,20)
        for j,ac in enumerate(acoustics):
            batch_X[j, :ac.shape[0], :] = torch.from_numpy(ac)
        cer_testX.append(batch_X)
        cnt = 0
        acoustics = []
        maxL = 0
    cnt += 1

print(len(cer_testX))

16


In [4]:
# Label/Character encoding
chars = ['<PAD>','<SOS>', '<EOS>',' ',"a","b","c","d","e","f","g","h","i","j","k", \
         "l","m","n","o","p","q","r","s","t","u", "v","w","x","y","z"]
dict(enumerate(chars))
int2char = dict(enumerate(chars))
char2int = {ch:i for i,ch in int2char.items()}

In [5]:
# Input Label batch to be passed for Training
path = './asr_data/train/*.txt'
files = glob.glob(path)

# List of label baatches and associated masks
Y, M = [], []

labels = []
cnt = 0
maxL_Y = 0
for name in files:
    with open(name) as f:
        text = f.read()
    lb = ['<SOS>']+list(text)+['<EOS>']
    labels.append(lb)
    maxL_Y = max(maxL_Y, len(lb))
    if cnt==31:
        # As <PAD> == 0,
        batch_Y = torch.zeros(32,maxL_Y)
        mask_Y = torch.zeros(32,maxL_Y)
        for i,lb in enumerate(labels):
            for j,ch in enumerate(lb):
                batch_Y[i,j] = char2int[ch]
                mask_Y[i,j] = 1
        # Labels should be LongTensors: B x maxL 
        batch_Y = batch_Y.type(torch.LongTensor)
        Y.append(batch_Y)
        M.append(mask_Y)
        cnt = 0
        maxL_Y = 0
        labels = []
    cnt+=1  

print(len(M))

44


In [6]:
# Test Label batch to be used for testing-CER calculation
path = './asr_data/test/*.txt'
files = glob.glob(path)

# List of label baatches and associated masks
cer_Y, cer_M = [], []

labels = []
cnt = 0
maxL_Y = 0
for name in files:
    with open(name) as f:
        text = f.read()
    lb = ['<SOS>']+list(text)+['<EOS>']
    labels.append(lb)
    maxL_Y = max(maxL_Y, len(lb))
    if cnt==31:
        # As <PAD> == 0,
        batch_Y = torch.zeros(32,maxL_Y)
        mask_Y = torch.zeros(32,maxL_Y)
        for i,lb in enumerate(labels):
            for j,ch in enumerate(lb):
                batch_Y[i,j] = char2int[ch]
                mask_Y[i,j] = 1
        # Labels should be LongTensors: B x maxL 
        batch_Y = batch_Y.type(torch.LongTensor)
        cer_Y.append(batch_Y)
        cer_M.append(mask_Y)
        cnt = 0
        maxL_Y = 0
        labels = []
    cnt+=1  

print(len(M))

44


In [7]:
# Create a class Encoder, which inherits the properties and methods from the parent class nn.Module
class Encoder(nn.Module):
    def __init__(self, vocab_size):
        super(Encoder, self).__init__()
        self.input_size = 20   # given 20 x L acoustic inputs
        self.hidden_size = 128
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, batch_first = True)
        
    def forward(self, inp):
        # input to LSTM = B x L x 20
        output, hidden = self.lstm(inp)
        # output, (h_n, c_n) = self.lstm(embedding, (h, c)) ----- (h,c) initialized to zero
        # output size = B x Lx 128
        # (h,c) are from the last time step: both have size [1,B,128]
        # return the last hidden output 1 x B x H
        return (hidden[0][0,:,:],hidden[1][0,:,:])
        

class Decoder(nn.Module):
    def __init__(self, vocab_size):
        super(Decoder, self).__init__()
        self.embedding_size = 256
        self.decoder_hidden_size = 128
        self.embedding = nn.Embedding(vocab_size, self.embedding_size)        
        # NOTE: Use LSTM Cell here instead if you want to control the hidden state at each time step.
        self.lstm = nn.LSTMCell(self.embedding_size, self.decoder_hidden_size)
        self.lin = nn.Linear(self.decoder_hidden_size, vocab_size)
       
    def forward_step(self, word_embedding, hidden):
        output, new_cell_state = self.lstm(word_embedding, hidden)
        new_hidden = output
        vocab_distrbtn = F.softmax(self.lin(output), dim=1)
        return vocab_distrbtn, (new_hidden, new_cell_state)
        
    def forward(self, inpt, encoder_hidden, mask_Y, beta):
        t_max = inpt.shape[1]
        loss = 0
        # SOS = 1
        word = inpt[:,0]
        word_embedding = self.embedding(word)

        hidden = encoder_hidden
        for t in range(t_max-1):
            vocab_dist, hidden = self.forward_step(word_embedding, hidden)  # vocab_dist = B x V = 10 x 30
            word = torch.argmax(vocab_dist, dim=1)   # word = B x 1
            
            # DAgger policy = beta*oracle + (1-beta)*model
            u = random.uniform(0, 1)
            if u<=beta:
                # Teacher Forcing
                word_embedding = self.embedding(inpt[:,t+1])
            else:
                # Model's output as next input
                word_embedding = self.embedding(word)
            
            # Cross Entropy Loss
            # ground truth B x 1 is the char at time step t+1 or t+1th column in B x L = 32 x L
            true_label = inpt[:,t+1]            
            # one hot encode the true label # B x 1 = 32 x 1 --> 32 x 30
            onehot = torch.zeros((32,30))
            for i in range(32):
                onehot[i][true_label[i]]=1
            # Cross entropy loss: vocab_dist 32 x 30, onehot 32 x 30
            NLL = (-1)*torch.log(vocab_dist)
            ce_loss = torch.sum(NLL*onehot, dim=1)
            loss += torch.sum(ce_loss*mask_Y[:,t])
            
        # averaged loss over the entire batch (except padding)
        return loss/torch.sum(mask_Y)

In [8]:
def train(encoder, decoder, n_epochs, learning_rate, X, Y, M, cer_testX, cer_Y, cer_M):
    encoder_optimizer = torch.optim.Adam(encoder.parameters())
    decoder_optimizer = torch.optim.Adam(decoder.parameters())
    # Default parameters: lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
    
    train_cerloss = []
    test_cerloss = []
    
#     beta = 1   # All oracle
#     beta = 0   # All Model
#     beta = 0.75
    for epoch in range(EPOCHS):
        epoch_loss = 0.0
        num_batches = len(X)
        num_test_batches = len(cer_testX)
        
        
        # beta = beta - 0.05
        beta = np.exp(-epoch)

        for i, batch in enumerate(X):
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()

            encoder_hidden = encoder(X[i])
            loss = decoder(Y[i], encoder_hidden, M[i], beta)

            loss.backward(retain_graph=True)
            encoder_optimizer.step()
            decoder_optimizer.step()

            epoch_loss += loss.item()
            
        # Testing Loss
        test_loss = 0
        for j, batch in enumerate(cer_testX):
            encoder_hidden = encoder(cer_testX[j])
            t_loss = decoder(cer_Y[j], encoder_hidden, cer_M[j], beta)
            test_loss+=t_loss
                      
        print("Epoch {}: Training Loss: {}, Testing Loss: {}".format(epoch, epoch_loss/num_batches, \
                                                                     test_loss/num_test_batches))
        train_cerloss.append(epoch_loss/num_batches)
        test_cerloss.append(test_loss/num_test_batches)
    return train_cerloss, test_cerloss


def testDecoder(decoder, encoder_hidden):
    t_max = 80
    prediction = []
    # SOS = 1
    word = torch.ones(1)
    word = word.type(torch.LongTensor)
    word_embedding = decoder.embedding(word)

    # Feed in the encoder_hidden
    hidden = encoder_hidden
    for t in range(t_max-1):
        vocab_dist, hidden = decoder.forward_step(word_embedding, hidden)  # vocab_dist = B x V = 10 x 30
        word = torch.argmax(vocab_dist, dim=1)   # word = B x 1
        if word==2:
            break
        prediction.append(word)
        # Model's output as next input
        word_embedding = decoder.embedding(word)

    return prediction


def evaluate(encoder, decoder, testX, testY):

    num_batches = len(testX)
    ret = []
    with torch.no_grad():
        for i, batch in enumerate(testX):
            encoder_hidden = encoder(testX[i])
            prediction = testDecoder(decoder, encoder_hidden)
            sentence = []
            for ii in prediction:
                sentence.append(int2char[int(ii)])
            ret.append(''.join(sentence))
#             print(''.join(sentence))
    return ret
#     print(total_acc / num_batches)

In [None]:
EMB_SIZE = 256
HIDDEN_SIZE = 128
EPOCHS = 10
LEARNING_RATE = 0.01
VOCAB_SIZE = 30    #26 + space,sos,eos,pad

encoder = Encoder(VOCAB_SIZE)
decoder = Decoder(VOCAB_SIZE)
cer_tr, cer_ts = train(encoder, decoder, EPOCHS, LEARNING_RATE, X, Y, M, cer_testX, cer_Y, cer_M)

Epoch 0: Training Loss: 2.7066415602510627, Testing Loss: 2.3830313682556152
Epoch 1: Training Loss: 2.8563515652309763, Testing Loss: 2.791280508041382
Epoch 2: Training Loss: 2.8817535042762756, Testing Loss: 2.8771705627441406
Epoch 3: Training Loss: 2.902845165946267, Testing Loss: 2.9045560359954834


In [None]:
import matplotlib.pyplot as plt
# Training and Testing CER Plots
fig, ax = plt.subplots(figsize = (8,5));


# plt.ylim(0,3.2)
# plt.xlim(0.5,10)
plt.plot(range(EPOCHS), cer_tr, label = 'Training')
plt.plot(range(EPOCHS), cer_ts, label = 'Testing')
ax.legend()
ax.set_title('CER loss, beta = 0 (All Model)', fontsize=16)
ax.set_xlabel('Epoch', fontsize=16);
ax.set_ylabel('CE loss', fontsize=16);
# plt.show()

In [12]:
from nltk.metrics.distance import edit_distance
# Test Input(acoustic)
path = './asr_data/test/*.npy'
files = glob.glob(path)

# List of input batches
testX = []
acoustics = []
cnt = 0
for i,name in enumerate(files):    
    ac = np.load(name)
#     print(ac.shape)
    # Acoustic inputs should be FloatTensors: B x L x 20 
    a = torch.transpose(torch.from_numpy(ac), 0,1)
#     print(a.shape)
    a = a.view(1,a.shape[0], a.shape[1])
#     print(a.shape)
    testX.append(a)
#     if cnt==4: break
#     cnt+=1
print(len(testX))

# Training Label batch
path = './asr_data/test/*.txt'
files = glob.glob(path)

true_label = []
cnt = 0
for name in files:
    with open(name) as f:
        text = f.read()
    true_label.append(text)
#     if cnt==4: break
#     cnt+=1
print(len(true_label))

504
504


In [13]:
pred = evaluate(encoder, decoder, testX, 0)

cer = 0
for i in range(len(testX)):
#     print(pred[i])
#     print(true_label[i])
    cer += edit_distance(pred[i], true_label[i]) / len(pred[i])
avg_cer = cer/len(testX)
print(avg_cer)

0.7907876230661046
