In [0]:
# NOTE(review): this loop has no exit condition, so the cell never finishes on
# its own and the list keeps growing. Presumably a deliberate way to exhaust the
# runtime's RAM -- confirm, and skip this cell when running the notebook top to bottom.
a = []
while(1):
    a.append('1')

In [0]:
from google.colab import files
# Install the Kaggle command-line client (-q keeps pip's output short).
!pip install -q kaggle

In [0]:
# Upload file(s) through Colab's upload helper. The next cell copies
# content/kaggle.json, so presumably the Kaggle API token is uploaded here -- confirm.
uploaded = files.upload()

In [0]:
%cd ..
!mkdir root/.kaggle/
!cp content/kaggle.json root/.kaggle/kaggle.json
!kaggle competitions download -c 11-785-s20-hw3p2

In [0]:
# List the current directory to check what was downloaded.
!ls

In [0]:
# Extract every .zip archive in the current directory; the backslash keeps the
# shell from expanding the wildcard so that unzip receives the pattern itself.
!unzip \*.zip

In [0]:
from google.colab import drive
# Mount Google Drive at /content/gdrive.
# NOTE(review): no cell visible here reads from or writes to that path.
drive.mount('/content/gdrive')

In [0]:
import pandas as pd
import numpy as np

# Load the train / dev / test arrays from files in the current working directory.
# allow_pickle=True lets np.load unpickle arbitrary objects -- only use it on
# files from a trusted source.
# NOTE(review): 'wsj0_train' and 'wsj0_test' carry no .npy suffix while the other
# three names do -- confirm these match the file names on disk.
train_data = np.load('wsj0_train', allow_pickle=True)
train_labels = np.load('wsj0_train_merged_labels.npy', allow_pickle = True)
val_data = np.load('wsj0_dev.npy', allow_pickle=True)
val_labels = np.load('wsj0_dev_merged_labels.npy', allow_pickle=True)
test_data = np.load('wsj0_test', allow_pickle=True)

In [0]:
# Sanity check: print the array shapes of each split (no labels are loaded for test).
print(train_data.shape, train_labels.shape)
print(val_data.shape, val_labels.shape)
print(test_data.shape)

In [0]:
import torch
import sys
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.utils.rnn import *

from torch.utils import data
from torch.utils.data import DataLoader, Dataset, TensorDataset

import matplotlib.pyplot as plt
import time

# True when a CUDA device is usable; later cells use this flag to choose the
# torch device and the number of DataLoader workers.
cuda = torch.cuda.is_available()
cuda

In [0]:
class MyDataset(data.Dataset):
    """Index-aligned pairing of feature sequences `X` with label sequences `Y`.

    Each item is returned as a `(features, labels)` tuple in which both members
    have been passed through `torch.Tensor(...)`.
    """

    def __init__(self, X, Y):
        # The dataset size is taken from X only; Y is assumed to be aligned with it.
        self.X, self.Y = X, Y
        self.length = len(X)

    def __len__(self):
        """Number of items, fixed at construction time."""
        return self.length

    def __getitem__(self, index):
        """Return the `index`-th `(features, labels)` pair as tensors."""
        features = torch.Tensor(self.X[index])
        labels = torch.Tensor(self.Y[index])
        return (features, labels)

In [0]:
from torch.utils.data import Dataset, DataLoader

#create dataloaders
def pad_collate(batch):
    """Collate `(features, labels)` pairs into padded tensors plus their lengths.

    Args:
        batch: iterable of `(features, labels)` tensor pairs, one per sample.

    Returns:
        Tuple `(xx_pad, yy_pad, x_lens, y_lens)`:
        features padded with `pad_sequence` defaults (sequence axis first),
        labels padded with `batch_first=True`, and two LongTensors holding the
        unpadded length of every feature / label sequence.
    """
    features, labels = zip(*batch)
    feature_lens = torch.LongTensor(list(map(len, features)))
    label_lens = torch.LongTensor(list(map(len, labels)))
    padded_features = pad_sequence(features)
    padded_labels = pad_sequence(labels, batch_first=True)
    return padded_features, padded_labels, feature_lens, label_lens

# Context size setting; no cell visible in this notebook reads it.
k = 12
# DataLoader worker processes: 8 when CUDA is available, none otherwise.
num_workers = 8 if cuda else 0

# Training split: shuffled batches of 64 items, padded by pad_collate.
train_dataset = MyDataset(train_data, train_labels)
train_loader_args = {
    'shuffle': True,
    'batch_size': 64,
    'num_workers': num_workers,
    'pin_memory': True,
    'collate_fn': pad_collate,
}
train_loader = data.DataLoader(train_dataset, **train_loader_args)

# Validation split: identical settings except that the order is kept.
val_dataset = MyDataset(val_data, val_labels)
val_loader_args = {**train_loader_args, 'shuffle': False}
val_loader = data.DataLoader(val_dataset, **val_loader_args)

# Optional: free the raw arrays to obtain more memory
#del train_data
#del train_labels
#del val_data
#del val_labels

In [0]:
class Model(nn.Module):
    """Conv1d + BatchNorm front end, 4-layer bidirectional LSTM, linear classifier.

    The network maps a padded batch of feature sequences to per-step
    log-probabilities over `out_vocab` classes.

    Args:
        in_vocab: number of input features per time step (conv input channels).
        out_vocab: number of output classes.
        hidden_size: LSTM hidden size per direction.
        conv_channels: number of channels produced by the convolution; this is
            also the LSTM input size. Defaults to 128.
    """

    def __init__(self, in_vocab, out_vocab, hidden_size, conv_channels=128):
        super(Model, self).__init__()
        self.cnn1 = nn.Conv1d(in_channels=in_vocab, out_channels=conv_channels,
                              kernel_size=3, stride=1, padding=1)
        self.batchnorm = nn.BatchNorm1d(conv_channels)
        # The LSTM consumes the conv/batchnorm output, so its input size must equal
        # the number of conv channels. It used to be hard-coded to 256 while the
        # convolution produced 128 channels, which made the forward pass fail.
        self.lstm = nn.LSTM(input_size=conv_channels, hidden_size=hidden_size,
                            num_layers=4, bidirectional=True)
        # Bidirectional LSTM -> forward and backward states are concatenated.
        self.output = nn.Linear(hidden_size * 2, out_vocab)

    def forward(self, X, lengths):
        """Compute log-probabilities for a padded batch.

        Args:
            X: padded input of shape (time, batch, in_vocab).
            lengths: unpadded length of every sequence in the batch; they do not
                need to be sorted (`enforce_sorted=False`).

        Returns:
            `(out, out_lens)`: `out` has shape (time, batch, out_vocab) and holds
            log-softmax scores over the last axis; `out_lens` are the sequence
            lengths reported by `pad_packed_sequence`.
        """
        # Conv1d/BatchNorm1d expect (batch, channels, time).
        X = self.cnn1(X.transpose(0, 1).transpose(1, 2))
        X = self.batchnorm(X)
        # Back to (time, batch, channels) for the LSTM.
        packed_X = pack_padded_sequence(X.transpose(1, 2).transpose(0, 1), lengths, enforce_sorted=False)
        packed_out = self.lstm(packed_X)[0]
        out, out_lens = pad_packed_sequence(packed_out)
        # Log softmax after output layer is required since `nn.CTCLoss` expects log probabilities.
        out = self.output(out).log_softmax(2)
        return out, out_lens

In [0]:
# Seed PyTorch's RNG before the model is constructed.
torch.manual_seed(11785)
# 40 input features per step, 47 output classes, LSTM hidden size 256.
model = Model(40,47,256)
device = torch.device("cuda" if cuda else "cpu")
print(device)
print(model)
model.to(device)
# Output index 46 (the last of the 47 classes) is the CTC blank.
criterion = nn.CTCLoss(blank = 46)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-5)
# ReduceLROnPlateau in 'min' mode with patience 2 and factor 0.8; the training
# cells pass the validation loss to scheduler.step().
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience = 2, factor = 0.8)

In [0]:
def train_epoch(model, train_loader, criterion, optimizer):
    """Run one optimisation pass over `train_loader`.

    Every batch `(X, Y, X_lens, Y_lens)` is moved to the notebook-level `device`
    (inputs and targets only), pushed through `model`, scored with `criterion`
    and followed by one optimizer step.

    Returns:
        The mean of the per-batch loss values; the same number is printed
        together with the elapsed wall-clock time.
    """
    tic = time.time()
    model.train()
    loss_sum = 0.0

    for X, Y, X_lens, Y_lens in train_loader:
        optimizer.zero_grad()
        # The length tensors are handed over unchanged; only X and Y are moved.
        log_probs, log_prob_lens = model(X.to(device), X_lens)
        batch_loss = criterion(log_probs, Y.to(device), log_prob_lens, Y_lens)
        loss_sum += batch_loss.item()
        batch_loss.backward()
        optimizer.step()

    mean_loss = loss_sum / len(train_loader)
    elapsed = time.time() - tic

    print('Training Loss: ', mean_loss, 'Time: ', elapsed, 's')
    return mean_loss

In [0]:
def test_model(model, val_loader, criterion, PHONEME_MAP):
    
    decoder = CTCBeamDecoder(PHONEME_MAP, beam_width=10, log_probs_input=True, blank_id=46)

    start_time = time.time()

    with torch.no_grad():
      model.eval()

      running_loss = 0.0
      D = 0.0

      for batch_idx, (X, Y, X_lens, Y_lens) in enumerate(val_loader):
        
        X = X.to(device)
        Y = Y.to(device)
        out, out_lens = model(X, X_lens)
        predict_Y, _, _, predict_Y_lens = decoder.decode(out.transpose(0, 1), out_lens)
        d = editDistance(Y, Y_lens, predict_Y, predict_Y_lens, PHONEME_MAP, decoder)
        D += d
        loss = criterion(out, Y, out_lens, Y_lens)
        running_loss += loss.item()

      running_loss /= len(val_loader)
      total_size = len(val_loader)*64
      D /= total_size
      end_time = time.time()
      print('Validation Loss: ', running_loss, 'Time: ', end_time - start_time, 's')
      print("Average Edit Distance", D)
      return running_loss

import stringdist

def editDistance(Y, Y_lens, predict_Y, predict_Y_lens, PHONEME_MAP, decoder):
    """Sum the Levenshtein distances between target and decoded strings of a batch.

    For every row of `Y` the first `Y_lens[row]` label ids are mapped to symbols
    through `PHONEME_MAP`; the prediction string is built the same way from
    beam 0 of `predict_Y`, truncated to `predict_Y_lens[row, 0]`.

    Args:
        Y: targets with the batch on the first axis.
        Y_lens: number of valid labels per target row.
        predict_Y: decoder output indexed as [row, beam, step].
        predict_Y_lens: decoded lengths indexed as [row, beam].
        PHONEME_MAP: sequence mapping a label id to a one-character symbol.
        decoder: unused; kept so existing callers keep working.

    Returns:
        The total (not averaged) distance over the batch.
    """
    total = 0

    for row in range(Y.shape[0]):
        target_ids = Y[row, :int(Y_lens[row])]
        target_str = ''.join(PHONEME_MAP[int(label)] for label in target_ids)

        best_len = predict_Y_lens[row, 0]
        best_ids = predict_Y[row, 0, :best_len]
        best_str = ''.join(PHONEME_MAP[label] for label in best_ids)

        total += stringdist.levenshtein(target_str, best_str)

    return total

In [0]:
# Per-epoch loss histories for this run.
Train_loss = []
Val_loss = []
# Left empty: test_model returns only the validation loss.
Val_acc = []

# NOTE: PHONEME_MAP and the decoder import used by test_model are defined in
# later cells of this notebook; those cells must be run before this one.
for epoch in range(20):
  train_loss = train_epoch(model, train_loader, criterion, optimizer)
  val_loss = test_model(model, val_loader, criterion, PHONEME_MAP)
  # Record the losses; previously the lists were created but never filled.
  Train_loss.append(train_loss)
  Val_loss.append(val_loss)
  # The scheduler monitors the validation loss.
  scheduler.step(val_loss)
  print('='*20)
  print('='*20)

In [0]:
# Ten further epochs with the same model, optimizer and scheduler objects.
# NOTE(review): the three lists are re-created empty here and nothing in this
# cell appends to them.
Train_loss = []
Val_loss = []
Val_acc = []

for epoch in range(10):
  train_loss = train_epoch(model, train_loader, criterion, optimizer)
  val_loss = test_model(model, val_loader, criterion, PHONEME_MAP)
  # The scheduler monitors the validation loss.
  scheduler.step(val_loss)
  print('='*20)

In [0]:
def predict_model(model, test_data, PHONEME_MAP):
    """Decode every item of `test_data` and write the result to 'result.csv'.

    Each item is converted to a tensor, reshaped to (length, 1, 40), run through
    `model` on the notebook-level `device` and decoded with a `CTCBeamDecoder`
    (beam width 10, blank id 46). Beam 0 of the decoder output is mapped to a
    string through `PHONEME_MAP`.

    Inference runs in eval mode with gradients disabled; the model's previous
    train/eval mode is restored afterwards.

    Args:
        model: `nn.Module` called as `model(X, X_lens)` returning `(out, out_lens)`.
        test_data: indexable collection of per-item feature arrays with 40
            features per step.
        PHONEME_MAP: sequence mapping a class id to a one-character symbol.

    Returns:
        None. The table with columns "id" and "Predicted" is printed and saved
        to 'result.csv' (no index column, with header).
    """
    decoder = CTCBeamDecoder(PHONEME_MAP, beam_width=10, log_probs_input=True, blank_id=46)

    ids = []
    predicted = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for utt_id in range(len(test_data)):
                X = torch.Tensor(test_data[utt_id])
                length = len(X)
                # Single-item batch: (length, 1, 40).
                X = X.reshape((length, 1, 40))
                # Lengths are integer counts, so use an integer tensor.
                X_lens = torch.LongTensor([length])

                out, out_lens = model(X.to(device), X_lens)
                predict_Y, _, _, predict_Y_lens = decoder.decode(out.transpose(0, 1), out_lens)
                best_ids = predict_Y[0, 0, :predict_Y_lens[0, 0]]

                ids.append(utt_id)
                predicted.append(''.join(PHONEME_MAP[label] for label in best_ids))
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    df = pd.DataFrame({"id": ids, "Predicted": predicted}, columns=["id", "Predicted"])
    print(df)
    df.to_csv('result.csv', index=False, header=True)

In [0]:
# Decode the test set; predict_model prints the table and writes result.csv.
# NOTE(review): PHONEME_MAP and CTCBeamDecoder are defined in later cells of this
# notebook, so those cells must be run before this one.
predict_model(model, test_data, PHONEME_MAP)

In [0]:
# Phoneme label names (46 entries). Presumably the position in this list is the
# integer label id used in the label arrays -- confirm against the dataset.
PHONEME_LIST = [
    "+BREATH+",
    "+COUGH+",
    "+NOISE+",
    "+SMACK+",
    "+UH+",
    "+UM+",
    "AA",
    "AE",
    "AH",
    "AO",
    "AW",
    "AY",
    "B",
    "CH",
    "D",
    "DH",
    "EH",
    "ER",
    "EY",
    "F",
    "G",
    "HH",
    "IH",
    "IY",
    "JH",
    "K",
    "L",
    "M",
    "N",
    "NG",
    "OW",
    "OY",
    "P",
    "R",
    "S",
    "SH",
    "SIL",
    "T",
    "TH",
    "UH",
    "UW",
    "V",
    "W",
    "Y",
    "Z",
    "ZH"
]

# One single-character symbol per phoneme, in the same order as PHONEME_LIST
# (the trailing comment on each entry names the phoneme). Other cells index this
# list with label ids to build the strings compared by edit distance.
PHONEME_MAP = [
    '_',  # "+BREATH+"
    '+',  # "+COUGH+"
    '~',  # "+NOISE+"
    '!',  # "+SMACK+"
    '-',  # "+UH+"
    '@',  # "+UM+"
    'a',  # "AA"
    'A',  # "AE"
    'h',  # "AH"
    'o',  # "AO"
    'w',  # "AW"
    'y',  # "AY"
    'b',  # "B"
    'c',  # "CH"
    'd',  # "D"
    'D',  # "DH"
    'e',  # "EH"
    'r',  # "ER"
    'E',  # "EY"
    'f',  # "F"
    'g',  # "G"
    'H',  # "HH"
    'i',  # "IH"
    'I',  # "IY"
    'j',  # "JH"
    'k',  # "K"
    'l',  # "L"
    'm',  # "M"
    'n',  # "N"
    'G',  # "NG"
    'O',  # "OW"
    'Y',  # "OY"
    'p',  # "P"
    'R',  # "R"
    's',  # "S"
    'S',  # "SH"
    '.',  # "SIL"
    't',  # "T"
    'T',  # "TH"
    'u',  # "UH"
    'U',  # "UW"
    'v',  # "V"
    'W',  # "W"
    '?',  # "Y"
    'z',  # "Z"
    'Z',  # "ZH"
]

In [0]:
# Add the CTC blank as the last entry of both tables: with 46 phonemes it lands
# at index 46, the blank id used by the loss and the decoder.
print(len(PHONEME_LIST), len(PHONEME_MAP))
# Guarded so that re-running this cell does not append a second blank entry.
if "BLANK" not in PHONEME_LIST:
    PHONEME_LIST.append("BLANK")
if ' ' not in PHONEME_MAP:
    PHONEME_MAP.append(' ')
print(len(PHONEME_LIST), len(PHONEME_MAP))

In [0]:
pip install python-Levenshtein

In [0]:
pip install StringDist

In [0]:
# Fetch the ctcdecode sources (with submodules) and install the package from the
# checkout; the next cell imports CTCBeamDecoder from it.
!git clone --recursive https://github.com/parlance/ctcdecode.git
!pip install wget
%cd ctcdecode
!pip install .
# Return to the directory the notebook was working in.
%cd ..

In [0]:
from ctcdecode import CTCBeamDecoder

# Smoke test of the decoder on a toy two-symbol vocabulary.
decoder = CTCBeamDecoder([' ', 'A'], beam_width=4)
# One sequence with two steps and two class scores per step; unsqueeze(0) adds
# the leading batch axis, giving shape (1, 2, 2).
probs = torch.Tensor([[0.2, 0.8], [0.8, 0.2]]).unsqueeze(0)
print(probs.size())
print(torch.LongTensor([2]))
# The second argument gives the number of valid steps of each sequence.
out, _, _, out_lens = decoder.decode(probs, torch.LongTensor([2]))
# Beam 0 of the first sequence, truncated to its decoded length.
print(out[0, 0, :out_lens[0, 0]])