In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
%cd '/content/drive/MyDrive/MLDL1/MLDL1_hw4'

/content/drive/MyDrive/MLDL1/MLDL1_hw4


In [None]:
# Extract the audio files. This may take up to several minutes.
!tar -xf 'data/audio.tar.xz' -C 'data'

In [None]:
# Import required libraries
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import glob
import librosa
import IPython.display as ipd
!pip install sentencepiece
import sentencepiece
import numpy as np
import random
from tqdm import tqdm
import math
import pickle

SEED = 1234

np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
random.seed(SEED)
torch.backends.cudnn.deterministic = True

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sentencepiece
  Downloading sentencepiece-0.1.99-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m54.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentencepiece
Successfully installed sentencepiece-0.1.99
cuda


In [None]:
sentencepiece.SentencePieceTrainer.Train('--input=data/text.csv --model_prefix=tokenizer --model_type=bpe --vocab_size=1144 --pad_id=0 --unk_id=1 --bos_id=2 --eos_id=3')
tokenizer = sentencepiece.SentencePieceProcessor()
tokenizer.load('tokenizer.model')

True

In [None]:
BATCH_SIZE = 64
VOCAB_SIZE = 1144
PAD_TOKEN_ID = 0
BOS_TOKEN_ID = 2  # beginning of sentence token
EOS_TOKEN_ID = 3  # end of sentence token
INPUT_DIM = 1024
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
EPOCHS = 5
LR = 0.0005

In [None]:
def train(model, data_loader, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for audio, text in tqdm(data_loader):
      audio = audio.to(device)
      text = text.to(device)
      output = model(audio, text)
      output_size = output.shape[-1]
      output = output[:,1:].reshape(-1, output_size)
      target = text[:,1:-1].reshape(-1).long()
      optimizer.zero_grad()
      loss = criterion(output, target)
      loss.backward()
      torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
      optimizer.step()
      epoch_loss += loss.item()
    return epoch_loss / len(data_loader)

def evaluate(model, data_loader, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
      for audio, text in tqdm(data_loader):
        audio = audio.to(device)
        text = text.to(device)
        output = model(audio, text)
        output_size = output.shape[-1]
        output = output[:,1:].reshape(-1, output_size)
        target = text[:,1:-1].reshape(-1).long()
        loss = criterion(output, target)
        epoch_loss += loss.item()
    return epoch_loss / len(data_loader)

In [None]:
class KSSDataset(Dataset):
    def __init__(self, mode='train'):
        self.mode = mode
        audio_list = sorted(glob.glob('data/audio/*'))
        with open("data/text.csv", "r") as f:
          text_list = f.read().splitlines()
        if mode =='train':
          self.audio_list = audio_list[:-2000]
          self.text_list = text_list[:-2000]
        elif mode == 'val':
          self.audio_list = audio_list[-2000:-1000]
          self.text_list = text_list[-2000:-1000]
        else:
          self.audio_list = audio_list[-1000:]
          self.text_list = text_list[-1000:]

    def __len__(self):
        return len(self.audio_list)

    def __getitem__(self, idx):
        y, sr = librosa.load(self.audio_list[idx])
        audio_feature = librosa.feature.mfcc(y=y, sr=sr, hop_length=160, n_mfcc=33, n_fft=400, window='hamming').swapaxes(0, 1)
        text_encoded = tokenizer.encode(self.text_list[idx], out_type=int)
        return audio_feature, text_encoded

In [None]:
def collate_fn(list_items):
    # 1. Pad sequence so that the length is same with the longest sequence in a batch.
    # For audio, pad with zeros and for text, pad with PAD_TOKEN_ID.
    # 2. Append BOS_TOKEN_ID and EOS_TOKEN_ID to each the beginning and end of the text sequence.
    # audio_batch shape: [batch_size, audio_sequence_length, input_size]
    # text_batch shape: [batch_size, text_sequence_length(including bos, eos tokens)]
    audio_padded = []
    text_padded = []
    ################### YOUR CODE ###################
    max_len_audio, max_len_text = 0, 0
    max_len_audio = max(x[0].shape[0] for x in list_items)
    max_len_text = max(len(x[1]) for x in list_items) + 2

    audio_batch= []
    text_batch= []

    for (audio_feature, text_encoded) in list_items:
      audio_padded = torch.tensor(audio_feature)
      audio_padded = torch.cat([audio_padded, torch.zeros((max_len_audio - audio_padded.shape[0], audio_padded.shape[1]))])
      text= torch.tensor([BOS_TOKEN_ID] + text_encoded + [EOS_TOKEN_ID])
      text_padded = torch.cat([text, torch.full((max_len_text - len(text),), PAD_TOKEN_ID)])

      text_batch.append(text_padded)
      audio_batch.append(audio_padded)

    audio_batch= torch.stack(audio_batch) # must be [batch_size, audio_sequence_length, input_size] # batch = 64, input size == 33
    text_batch= torch.stack(text_batch) # must be [batch_size, text_sequence_length] # batch = 64
    ##################################################
    return audio_batch, text_batch

In [None]:
train_dataset = KSSDataset(mode='train')
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
val_dataset = KSSDataset(mode='val')
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
test_dataset = KSSDataset(mode='test')
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

In [None]:
# Implement each of the RNN cells that form the building blocks of the encoder and decoder.
# You may refer to the Pytorch documentation and implementation.
# https://pytorch.org/docs/stable/generated/torch.nn.RNNCell.html
# https://pytorch.org/docs/stable/generated/torch.nn.GRUCell.html
# https://pytorch.org/docs/stable/generated/torch.nn.LSTMCell.html

class RNNCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(RNNCell, self).__init__()
        self.hidden_size = hidden_size
        ################### YOUR CODE ###################
        self.Wx = nn.Linear(input_size, hidden_size)
        self.Wh = nn.Linear(hidden_size, hidden_size)
        ##################################################

    def forward(self, x, h=None):
        if h is None:
          h = torch.zeros(x.shape[0], self.hidden_size).to(device)
        # x shape: [batch_size, embedding_size]
        # h shape: [batch_size, hidden_size]
        ################### YOUR CODE ###################
        h = F.tanh(self.Wx(x) + self.Wh(h))
        ##################################################
        return h


class GRUCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(GRUCell, self).__init__()
        self.hidden_size = hidden_size
        ################### YOUR CODE ###################
        self.Wxr = nn.Linear(input_size, hidden_size,)
        self.Whr = nn.Linear(hidden_size, hidden_size)
        self.Wxz = nn.Linear(input_size, hidden_size)
        self.Whz = nn.Linear(hidden_size, hidden_size)
        self.Wx = nn.Linear(input_size, hidden_size)
        self.Wh = nn.Linear(hidden_size, hidden_size)
        ##################################################

    def forward(self, x, h=None):
        if h is None:
          h = torch.zeros(x.shape[0], self.hidden_size).to(device)
        # x shape: [batch_size, embedding_size]
        # h shape: [batch_size, hidden_size]
        ################### YOUR CODE ###################
        r = F.sigmoid(self.Wxr(x) + self.Whr(h)) # (hidden_size) + (hidden_size)
        z = F.sigmoid(self.Wxz(x) + self.Whz(h)) # (hidden_size) + (hidden_size)
        h = (1 - z) * h + z * F.tanh(self.Wx(x) + self.Wh(r*h))
        ##################################################
        return h


class LSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTMCell, self).__init__()
        self.hidden_size = hidden_size
        ################### YOUR CODE ###################
        self.Wxi = nn.Linear(input_size, hidden_size)
        self.Whi = nn.Linear(hidden_size, hidden_size)
        self.Wxf = nn.Linear(input_size, hidden_size)
        self.Whf = nn.Linear(hidden_size, hidden_size)
        self.Wxc = nn.Linear(input_size, hidden_size)
        self.Whc = nn.Linear(hidden_size, hidden_size)
        self.Wxo = nn.Linear(input_size, hidden_size)
        self.Who = nn.Linear(hidden_size, hidden_size)
        ##################################################

    def forward(self, x, h=None, c=None):
        if h is None:
          h = torch.zeros(x.shape[0], self.hidden_size).to(device)
        if c is None:
          c = torch.zeros(x.shape[0], self.hidden_size).to(device)
        # x shape: [batch_size, embedding_size]
        # h shape: [batch_size, hidden_size]
        # c shape: [batch_size, hidden_size]
        ################### YOUR CODE ###################
        i = F.sigmoid(self.Wxi(x) + self.Whi(h))
        f = F.sigmoid(self.Wxf(x) + self.Whf(h))
        c = f * c + i * F.tanh(self.Wxc(x) + self.Whc(h))
        o = F.sigmoid(self.Wxo(x) + self.Who(h))
        h = o * F.tanh(c)
        ##################################################
        return h, c

In [None]:
# Do not modify this cell.
# This class extracts the audio features to input to the Seq2Seq model.
class FeatureExtractor(nn.Module):
    def __init__(self):
        super(FeatureExtractor, self).__init__()
        self.conv = nn.Sequential(
          nn.Conv2d(1, 32, kernel_size=3, padding=1),
          nn.ReLU(),
          nn.BatchNorm2d(32),
          nn.MaxPool2d(2, 2),
          nn.Conv2d(32, 64, kernel_size=3, padding=1),
          nn.ReLU(),
          nn.BatchNorm2d(64),
          nn.Conv2d(64, 128, kernel_size=3, padding=1),
          nn.ReLU(),
          nn.BatchNorm2d(128),
          nn.MaxPool2d(2, 2)
      )

    def forward(self, inputs):
        x = self.conv(inputs.unsqueeze(1))
        x = x.transpose(1, 2)
        x = x.reshape(x.shape[0], x.shape[1], -1)
        return x

In [None]:
# Implement the LSTMEncoder and AttentionLSTMDecoder as shown in Figure 1.
# LSTMEncoder returns hidden states h1, h2.. and the final cell state.
# AttentionLSTMDecoder attends to the encoder's outputs using dot product attention.
# It takes in the final hidden state of the encoder s0, calculates the attention score and attention coefficients to get a0.
# s0 and a0 are concatenated and goes through fc and tanh layer to form s0_.
# The hidden states from the decoder goes through fc1 for final predictions.
# The inputs to the decoder should go through the embedding layer to be able to input to the LSTM.

class LSTMEncoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size):
        super(LSTMEncoder, self).__init__()
        self.embedding_size = embedding_size
        self.LSTMCell = LSTMCell(input_size, hidden_size)

    def forward(self, inputs):
        # inputs shape: [batch_size, sequence length, input_size]
        # hidden_states shape: [batch_size, sequence_length, hidden_size]
        # cell_state shape: [batch_size, hidden_size]
        hidden_states = []
        ################### YOUR CODE ###################
        sequence_length = inputs.shape[1]
        batch_size = inputs.shape[0]
        cell_state = None
        hidden_state = None

        for t in range(sequence_length):
            x = inputs[:, t, :]
            hidden_state, cell_state = self.LSTMCell(x, hidden_state, cell_state)
            hidden_states.append(hidden_state)
        hidden_states = torch.stack(hidden_states, dim=1)
        ##################################################
        return hidden_states, cell_state


class AttentionLSTMDecoder(nn.Module):
    def __init__(self, embedding_size, hidden_size, output_size):
        super(AttentionLSTMDecoder, self).__init__()
        self.embedding = nn.Embedding(output_size, embedding_size)
        self.LSTMCell = LSTMCell(embedding_size, hidden_size)
        self.fc0 = nn.Linear(hidden_size * 2, hidden_size)
        self.fc1 = nn.Linear(hidden_size, output_size)

    def forward(self, inputs, hidden_states, cell_state):
        # You should use teacher forcing. (Instead of the model's prediction, take the next step ground truth as input.)
        # Originally, teacher forcing is used in training and turned off in validation and testing. For this assignment, we will keep it simple and use teacher forcing for train/validation/test.
        # inputs shape: [batch_size, target sequence length(including bos, eos tokens)]
        # predictions shape: [batch_size, target seq len-1, target_vocab_size]
        ################### YOUR CODE ###################
        inputs = self.embedding(inputs)
        sequence_length = inputs.shape[1]
        s = hidden_states[:,-1,:]

        predictions = []
        for t in range(0, sequence_length - 1):
          score_attn = torch.unsqueeze(s, dim = 1) @ hidden_states.transpose(-2,-1)
          score_attn = score_attn.softmax(dim = -1)
          token_context = score_attn @ hidden_states
          token_context = torch.squeeze(token_context, dim = 1)
          s = F.tanh(self.fc0(torch.cat([s, token_context], dim = 1)))
          x = inputs[:,t,:]
          s, cell_state = self.LSTMCell(x, s, cell_state)
          output = self.fc1(s)
          predictions.append(output)

        predictions = torch.stack(predictions, dim=1)
        ##################################################
        return predictions


class Seq2Seq(nn.Module):
    def __init__(self, feature_extractor, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.feature_extractor = feature_extractor
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target):
        source_feats = self.feature_extractor(source)
        enc_out = self.encoder(source_feats)
        dec_out = self.decoder(target, *enc_out)
        return dec_out  # shape: [batch_size, target seq len-1, target_vocab_size]

In [None]:
feature_extractor = FeatureExtractor()
encoder = LSTMEncoder(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM)
decoder = AttentionLSTMDecoder(EMBEDDING_DIM, HIDDEN_DIM, VOCAB_SIZE)

model = Seq2Seq(feature_extractor, encoder, decoder)
model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

In [None]:
# Train your model. All cell outputs should be shown below.
best_val_loss = float('inf')
for epoch in range(EPOCHS):
  train_loss = train(model, train_loader, optimizer, criterion)
  val_loss = evaluate(model, val_loader, criterion)

  if val_loss < best_val_loss:
    best_valid_loss = val_loss
    torch.save(model.state_dict(), 'model.pt')

  print(f'Epoch: {epoch} | \tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
  print(f'Epoch: {epoch} | \t Val. Loss: {val_loss:.3f} |  Val. PPL: {math.exp(val_loss):7.3f}')

  mel_basis = filters.mel(sr=sr, n_fft=n_fft, **kwargs)
100%|██████████| 170/170 [05:32<00:00,  1.95s/it]
100%|██████████| 16/16 [00:19<00:00,  1.19s/it]


Epoch: 0 | 	Train Loss: 2.761 | Train PPL:  15.814
Epoch: 0 | 	 Val. Loss: 1.121 |  Val. PPL:   3.069


100%|██████████| 170/170 [05:22<00:00,  1.90s/it]
100%|██████████| 16/16 [00:20<00:00,  1.28s/it]


Epoch: 1 | 	Train Loss: 0.641 | Train PPL:   1.899
Epoch: 1 | 	 Val. Loss: 0.352 |  Val. PPL:   1.422


100%|██████████| 170/170 [05:24<00:00,  1.91s/it]
100%|██████████| 16/16 [00:18<00:00,  1.16s/it]


Epoch: 2 | 	Train Loss: 0.234 | Train PPL:   1.264
Epoch: 2 | 	 Val. Loss: 0.163 |  Val. PPL:   1.177


100%|██████████| 170/170 [05:26<00:00,  1.92s/it]
100%|██████████| 16/16 [00:19<00:00,  1.22s/it]


Epoch: 3 | 	Train Loss: 0.116 | Train PPL:   1.122
Epoch: 3 | 	 Val. Loss: 0.092 |  Val. PPL:   1.096


100%|██████████| 170/170 [05:33<00:00,  1.96s/it]
100%|██████████| 16/16 [00:19<00:00,  1.23s/it]

Epoch: 4 | 	Train Loss: 0.066 | Train PPL:   1.069
Epoch: 4 | 	 Val. Loss: 0.058 |  Val. PPL:   1.060





In [None]:
# Test your model. All cell outputs should be shown below.
model.load_state_dict(torch.load('model.pt'))
test_loss = evaluate(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}')

100%|██████████| 16/16 [00:19<00:00,  1.24s/it]

Test Loss: 0.056 | Test PPL:   1.058





In [None]:
idx = 0
y, sr = librosa.load(test_dataset.audio_list[idx])
audio_feature = librosa.feature.mfcc(y=y, sr=sr, hop_length=160, n_mfcc=33, n_fft=400, window='hamming').swapaxes(0, 1)
text_encoded = tokenizer.encode(test_dataset.text_list[idx], out_type=int)

In [None]:
ipd.Audio(test_dataset.audio_list[idx], rate=8000, autoplay=False)

In [None]:
sample_output = model(torch.Tensor(audio_feature).unsqueeze(0).to(device), torch.IntTensor(text_encoded).unsqueeze(0).to(device))[0]
sample_output = torch.argmax(sample_output, dim=-1)
tokenizer.decode(sample_output.tolist())

'악수는 서양의 풍습입니다'