In [1]:
import pdb
import librosa
import numpy as np
import sys
import string
import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch import nn
from torch.nn.utils.rnn import pad_sequence
from dataset import AsrDataset
from model import LSTM_ASR
import librosa
import os
import pandas as pd


In [2]:
import string
import torch
import librosa
import os
import pandas as pd
from torch.utils.data import Dataset

class AsrDataset(Dataset):
    """Dataset of quantized acoustic-label sequences, optionally paired with word spellings.

    Each sample is a 1-D index tensor built from one line of the label file.
    When a transcript file is supplied, every sample also carries the spelling
    of its word as a 1-D tensor of letter indices, wrapped in the '_' marker at
    both ends.
    """

    def __init__(self, file_lbls, lbl_names='data/clsp.lblnames', text=None):
        """
        :param file_lbls: path to the per-utterance label file (e.g. clsp.trainlbls or
                          clsp.devlbls); the first line is a header and is skipped
        :param lbl_names: path to the label inventory (clsp.lblnames), one label per line
                          after a header line
        :param text: optional path to the transcript file (e.g. clsp.trnscr), one word per
                     line after a header line; when omitted, samples have no target
        """
        # Only the quantized-label pipeline is built by this constructor.
        self.feature_type = 'discrete'

        self.blank = "<blank>"
        self.silence = "<sil>"

        # Letter <-> index tables: 'a'..'z' map to 1..26 and the boundary marker '_' to 27.
        phones = {'_': 27}
        phones.update({letter: pos for pos, letter in enumerate(string.ascii_lowercase, start=1)})
        self.phones = phones
        self.phones_rev = {v: k for k, v in phones.items()}
        self.text = text

        # Label name -> index, numbered from 1 in file order (index 0 stays unassigned,
        # which leaves it free for padding).
        lblnames = [line.strip() for line in self._read_body_lines(lbl_names)]
        self.vocab = {name: pos for pos, name in enumerate(lblnames, start=1)}

        # word_labels is filled by load_quantized_features (used for train/test splitting).
        self.word_labels = []
        self.dataset = self.load_quantized_features(file_lbls, text=text)
        self.word_labels = np.array(self.word_labels)

    @staticmethod
    def _read_body_lines(path):
        """Return every line of ``path`` except the first (header) line."""
        with open(path, 'r') as handle:
            return handle.readlines()[1:]

    def load_quantized_features(self, file_lbls, text=None):
        """
        Build the sample dictionary from the label file and, optionally, the transcripts.

        :param file_lbls: path to the per-utterance label file
        :param text: optional path to the transcript file
        :return: dict mapping sample index -> {'feats': tensor} plus 'target_tokens'
                 when ``text`` is given
        :raises ValueError: on a label or letter missing from the lookup tables, or when
                            there are fewer transcripts than utterances
        Side effect: when ``text`` is given, ``self.word_labels`` is replaced by a list
        holding one integer key per sample.
        """
        dataset = {}

        # Extract labels for each utterance and convert to index tensors.
        lbl_seqs = []
        for line_no, line in enumerate(self._read_body_lines(file_lbls), start=2):
            indices = []
            # split() copes with lines that do or do not end in a trailing space.
            for lbl in line.split():
                l_idx = self.vocab.get(lbl)
                if l_idx is None:
                    raise ValueError(
                        "unknown label %r on line %d of %s" % (lbl, line_no, file_lbls))
                indices.append(l_idx)
            lbl_seqs.append(torch.tensor(indices, dtype=torch.long))

        if text is None:
            for idx, feats in enumerate(lbl_seqs):
                dataset[idx] = {'feats': feats}
            return dataset

        # Read in the words and convert each to a tensor of letter indices.
        words = []
        for line_no, line in enumerate(self._read_body_lines(text), start=2):
            spelled = '_' + line.strip() + '_'
            indices = []
            for letter in spelled:
                w_idx = self.phones.get(letter)
                if w_idx is None:
                    raise ValueError(
                        "unsupported character %r on line %d of %s" % (letter, line_no, text))
                indices.append(w_idx)
            words.append(torch.tensor(indices, dtype=torch.long))

        if len(words) < len(lbl_seqs):
            raise ValueError(
                "%s has %d transcripts but %s has %d utterances"
                % (text, len(words), file_lbls, len(lbl_seqs)))

        word_labels = []
        for idx, feats in enumerate(lbl_seqs):
            dataset[idx] = {'feats': feats, 'target_tokens': words[idx]}
            # One integer per word: the decimal letter indices concatenated together.
            word_labels.append(int(''.join(map(str, words[idx].tolist()))))
        self.word_labels = word_labels

        return dataset

    def __len__(self):
        """
        :return: num_of_samples
        """
        return len(self.dataset)

    def __getitem__(self, idx):
        """
        Get one sample. Targets already include the leading and trailing '_' marker.
        :param idx: index of sample (int or index tensor)
        :return: (feature, spelling_of_word); the spelling is None when the dataset
                 was built without transcripts
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()
        if self.text is None:
            return self.dataset[idx]['feats'], None
        return self.dataset[idx]['feats'], self.dataset[idx]['target_tokens']

    # This function is provided
    def compute_mfcc(self, wav_scp, wav_dir):
        """
        Compute MFCC acoustic features (dim=40) for each wav file.
        :param wav_scp: path to a text file listing one wav filename per line
        :param wav_dir: directory the listed filenames are resolved against
        :return: features: List[np.ndarray, ...], one transposed MFCC matrix per file
        """
        features = []
        with open(wav_scp, 'r') as f:
            for wavfile in f:
                wavfile = wavfile.strip()
                if wavfile == 'jhucsp.trnwav':  # skip header
                    continue
                # NOTE(review): audio is loaded at its native rate (sr=None) while mfcc is
                # told sr=16e3 — assumes the wav files are 16 kHz; confirm.
                wav, sr = librosa.load(os.path.join(wav_dir, wavfile), sr=None)
                feats = librosa.feature.mfcc(y=wav, sr=16e3, n_mfcc=40, hop_length=160, win_length=400).transpose()
                features.append(feats)
        return features


In [3]:
import torch


class LSTM_ASR_Discrete(torch.nn.Module):
    """Embedding -> Linear/ReLU -> bidirectional LSTM -> Linear -> log-softmax over letters."""

    def __init__(self, embedding_dim, hidden_dim, vocab_size=257, alphabet_size=28):
        """
        :param embedding_dim: size of the label embedding (and of the first linear layer)
        :param hidden_dim: total BiLSTM width; each direction gets ``hidden_dim // 2`` units
        :param vocab_size: number of embedding rows; row 0 is the padding index
        :param alphabet_size: number of output classes per frame
        :raises ValueError: if ``hidden_dim`` is too small to give each direction one unit
        """
        super().__init__()
        # Build your own neural network. Play with different hyper-parameters and architectures.
        per_direction = int(hidden_dim) // 2
        if per_direction < 1:
            raise ValueError("hidden_dim must be at least 2, got %r" % (hidden_dim,))

        self.embedding_dim = embedding_dim

        self.word_embeddings = torch.nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        self.lin1 = torch.nn.Linear(embedding_dim, embedding_dim)
        self.relu = torch.nn.ReLU()

        self.lstm = torch.nn.LSTM(embedding_dim, per_direction, batch_first=True, bidirectional=True)

        # Size the decoder from what the BiLSTM actually emits, so an odd hidden_dim
        # does not produce a shape mismatch at the first forward pass.
        self.decoder = torch.nn.Linear(2 * per_direction, alphabet_size)

    def forward(self, batch_features, feature_lengths=None):
        """
        :param batch_features: integer tensor of label indices, shape (batch, time),
                               padded with the embedding's padding index (0)
        :param feature_lengths: unpadded length of each sequence (list or 1-D tensor).
                                When omitted, lengths are taken as the number of
                                non-padding entries per row — this assumes padding is
                                trailing and never used as a real label.
        :return: log probabilities of shape (batch, longest_length, alphabet_size).
                 torch.nn.CTCLoss expects (time, batch, classes), so transpose before use.
        """
        if feature_lengths is None:
            pad_id = self.word_embeddings.padding_idx
            # clamp keeps an all-padding row from producing an invalid zero length
            feature_lengths = (batch_features != pad_id).sum(dim=1).clamp(min=1)
        # pack_padded_sequence requires the lengths on the CPU as int64.
        lengths = torch.as_tensor(feature_lengths, dtype=torch.int64).cpu()

        embeds = self.word_embeddings(batch_features)

        out = self.relu(self.lin1(embeds))

        packed = torch.nn.utils.rnn.pack_padded_sequence(
            out, lengths, batch_first=True, enforce_sorted=False)
        lstm_out, _ = self.lstm(packed)
        lstm_out, _ = torch.nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)

        letter_scores = self.decoder(lstm_out)
        log_probs = torch.nn.functional.log_softmax(letter_scores, dim=2)

        return log_probs


In [4]:
#!/usr/bin/env python

# 2022 Dongji Gao
# 2022 Yiwen Shao

import os
import sys
import string
import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch import nn
from torch.nn.utils.rnn import pad_sequence
from dataset import AsrDataset
from model import LSTM_ASR


def collate_fn(batch):
    """
    This function will be passed to your dataloader.
    It pads word spellings and features in the same batch to equal length with 0.

    :param batch: list of (feature, word_spelling) samples, each a 1-D index sequence;
                  word_spelling may be None for unlabeled data
    :return: padded_word_spellings (batch, max_spelling_len) or None if unlabeled,
             padded_features (batch, max_feature_len),
             list_of_unpadded_word_spelling_length (for CTCLoss) or None if unlabeled,
             list_of_unpadded_feature_length (for CTCLoss)
    :raises ValueError: if ``batch`` is empty
    """
    if len(batch) == 0:
        raise ValueError("collate_fn received an empty batch")

    features = [torch.as_tensor(sample[0]) for sample in batch]
    feature_lengths = [int(feat.size(0)) for feat in features]
    padded_features = pad_sequence(features, batch_first=True, padding_value=0)

    spellings = [sample[1] for sample in batch]
    if any(spelling is None for spelling in spellings):
        # Unlabeled samples carry no target, so there is nothing to pad.
        return None, padded_features, None, feature_lengths

    spellings = [torch.as_tensor(spelling) for spelling in spellings]
    spelling_lengths = [int(spelling.size(0)) for spelling in spellings]
    padded_spellings = pad_sequence(spellings, batch_first=True, padding_value=0)

    return padded_spellings, padded_features, spelling_lengths, feature_lengths


def train(train_dataloader, model, ctc_loss, optimizer):
    """Placeholder for one training pass; currently does no work and returns None."""
    # === write your code here ===
    return None


def decode():
    """Placeholder for decoding; currently does no work and returns None."""
    # === write your code here ===
    return None

def compute_accuracy():
    """Placeholder for evaluation; currently does no work and returns None."""
    # === write your code here ===
    return None

def main():
    """Template driver: build data and model, train for a number of epochs, then decode and score.

    The upper-case names below (YOUR_TRAINING_SET, TRAIN_DATALOADER, CTC_LOSS_FUNCTION,
    YOUR_NUM_EPOCHS, ...) are not defined anywhere in this file; they are placeholders
    that must be replaced with real objects before this function can run.
    """
    # Placeholders for the dataset objects.
    training_set = YOUR_TRAINING_SET
    test_set = YOUR_TEST_SET

    # Placeholders for the data loaders built on top of those datasets.
    train_dataloader = TRAIN_DATALOADER
    test_dataloader = TEST_DATALOADER

    # NOTE(review): this binds the imported name LSTM_ASR itself rather than calling it;
    # presumably it is a class and must be instantiated before model.parameters() works.
    model = LSTM_ASR

    # you can simply import ctc_loss from torch.nn
    loss_function = CTC_LOSS_FUNCTION

    # optimizer is provided
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-3)

    # Training: train() is called once per epoch with the same loader, model, loss and optimizer.
    num_epochs = YOUR_NUM_EPOCHS
    for epoch in range(num_epochs):
        train(train_dataloader, model, loss_function, optimizer)

    # Testing (totally by yourself)
    decode()

    # Evaluate (totally by yourself)
    compute_accuracy()


# Entry point: run main() when this code is executed as the top-level program.
# Inside a notebook kernel __name__ is also "__main__", so executing this cell calls main().
if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] [--stage STAGE] [--features {Discrete,MFCC}]
                             [--max-epochs MAX_EPOCHS] [--lr LR]
                             [--custom-ctc]
ipykernel_launcher.py: error: unrecognized arguments: -f /Users/whhqund/Library/Jupyter/runtime/kernel-ec5c60bf-4276-4ca4-84f3-4be12947280e.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
