# HW2

In [None]:
# for colab
!wget https://oss.shisheng.icu/ml2023spring-hw2.zip
!unzip ml2023spring-hw2.zip

## Libs

In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset
import random
import os
from tqdm import tqdm
import gc

## Utils

In [None]:
def same_seeds(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

## Feature Processing

In [None]:
def load_feature(feat_path):
    features = torch.load(feat_path)
    return features


def shift(x, n):
    if n < 0:
        left = x[0].repeat(-n, 1)
        right = x[:n]
    elif n > 0:
        right = x[-1].repeat(n, 1)
        left = x[n:]
    else:
        return x

    return torch.cat((left, right), dim=0)


def concat_feat(x, concat_n):
    assert concat_n % 2 == 1  # n must be odd
    if concat_n < 2:
        return x
    seq_len, feature_dim = x.size(0), x.size(1)
    x = x.repeat(1, concat_n)
    x = x.view(seq_len, concat_n, feature_dim).permute(1, 0, 2)  # concat_n, seq_len, feature_dim
    mid = (concat_n // 2)
    for r_idx in range(1, mid + 1):
        x[mid + r_idx, :] = shift(x[mid + r_idx], r_idx)
        x[mid - r_idx, :] = shift(x[mid - r_idx], -r_idx)

    return x.permute(1, 0, 2).view(seq_len, concat_n * feature_dim)

In [None]:
def preprocess_data(split, feat_dir, phone_path, concat_nframes, train_ratio=0.8):
    class_num = 41  # 41 different phonemes in dataset

    # determine mode
    if split == 'train' or split == 'val':
        mode = 'train'
    elif split == 'test':
        mode = 'test'

    label_dict = {}

    if mode == 'train':
        for line in open(os.path.join(phone_path, 'train_labels.txt')).readlines():
            line = line.strip('\n').split(' ')
            label_dict[line[0]] = [int(p) for p in line[1:]]

        # split training and validation data
        usage_list = open(os.path.join(phone_path, 'train_split.txt')).readlines()
        random.shuffle(usage_list)
        train_len = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:train_len] if split == 'train' else usage_list[train_len:]
    elif mode == 'test':
        usage_list = open(os.path.join(phone_path, 'test_split.txt')).readlines()

    usage_list = [line.strip('\n') for line in usage_list]
    print('[Dataset] phone classes: ' + str(class_num) + ', number of utterances for ' + split + ': ' + str(
        len(usage_list)))

    max_len = 3000000
    X = torch.empty(max_len, 39 * concat_nframes)  # X for features
    if mode == 'train':
        y = torch.empty(max_len, dtype=torch.long)  # y for labels(training mode only)

    idx = 0
    for i, feature_name in tqdm(enumerate(usage_list)):
        feat = load_feature(os.path.join(feat_dir, mode, f'{feature_name}.pt'))
        cur_len = len(feat)
        feat = concat_feat(feat, concat_nframes)
        if mode == 'train':
            label = torch.LongTensor(label_dict[feature_name])

        X[idx: idx + cur_len, :] = feat
        if mode == 'train':
            y[idx: idx + cur_len] = label

        idx += cur_len

    X = X[:idx, :]
    if mode == 'train':
        y = y[:idx]

    print(f'[INFO] {split} set')
    print(X.shape)
    if mode == 'train':
        print(y.shape)
        return X, y
    else:
        return X

## Dataset and Model

I tried to use LSTM

Reference:  
<https://wandb.ai/sauravmaheshkar/LSTM-PyTorch/reports/Using-LSTM-in-PyTorch-A-Tutorial-With-Examples--VmlldzoxMDA2NTA5>
<https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html>

In [None]:
class LibriphoneDataset(Dataset):
    def __init__(self, X, y=None):
        self.data = X
        if y is not None:
            self.label = torch.LongTensor(y)
        else:
            self.label = None

    def __getitem__(self, idx):
        if self.label is not None:
            return self.data[idx], self.label[idx]
        else:
            return self.data[idx]

    def __len__(self):
        return len(self.data)

In [None]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_dim, output_dim=41, hidden_layers=4, hidden_dim=256, batch_size = 8):
        super(LSTMClassifier, self).__init__()
        self.input_dim = input_dim
        self.batch_size = batch_size
        self.hidden_dim = hidden_dim
        self.hidden_layers = hidden_layers
        
        # input
        self.fc = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU()
        )
        
        # lstm
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, hidden_layers, batch_first=True)
        
        # output
        self.bc = nn.Sequential(
            nn.Linear(hidden_dim, output_dim)
        )


    def forward(self, x):
        x = self.fc(x)
        out, _ = self.lstm(x)
        out = self.bc(out)
        return out

## Hyperparameters

In [None]:
# data
concat_nframes = 11
train_ratio = 0.8

# model
input_dim = 39 * concat_nframes
hidden_layers = 4
hidden_dim = 64

# training
seed = 721
batch_size = 8
target_epochs = 1
learning_rate = 1e-3
model_path = './model.ckpt'

# others
same_seeds(seed)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'DEVICE: {device}')

## Load Data

In [None]:
# train set
train_X, train_y = preprocess_data(split='train', feat_dir='libriphone\\feat', phone_path='libriphone', concat_nframes=concat_nframes, train_ratio=train_ratio)
train_dataset = LibriphoneDataset(train_X, train_y)

# val set
val_X, val_y = preprocess_data(split='val', feat_dir='libriphone\\feat', phone_path='libriphone', concat_nframes=concat_nframes, train_ratio=train_ratio)
val_dataset = LibriphoneDataset(val_X, val_y)

# free memory
del train_X, train_y, val_X, val_y
gc.collect()

In [None]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

## Train

In [None]:
model = LSTMClassifier(input_dim=input_dim, hidden_layers=hidden_layers, hidden_dim=hidden_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

best_acc = 0.0
for epoch in range(target_epochs):
    train_acc, train_loss, val_acc, val_loss = 0.0, 0.0, 0.0, 0.0
    
    # training
    model.train()
    for i, batch in enumerate(tqdm(train_loader)):
        features, labels = batch
        features = features.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad() 
        outputs = model(features) 
        
        loss = criterion(outputs, labels)
        loss.backward() 
        optimizer.step() 
        
        _, train_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
        train_acc += (train_pred.detach() == labels.detach()).sum().item()
        train_loss += loss.item()
    
    # validation
    model.eval()
    with torch.no_grad():
        for i, batch in enumerate(tqdm(val_loader)):
            features, labels = batch
            features = features.to(device)
            labels = labels.to(device)
            outputs = model(features)
            
            loss = criterion(outputs, labels) 
            
            _, val_pred = torch.max(outputs, 1) 
            val_acc += (val_pred.cpu() == labels.cpu()).sum().item() # get the index of the class with the highest probability
            val_loss += loss.item()

    print('[Epoch %d/%d] Train Acc: %.3f, Train Loss: %.3f | Val Acc: %.3f, Val Loss: %.3f' % \
          (epoch + 1, target_epochs, train_acc/len(train_dataset), train_loss/len(train_loader), val_acc/len(val_dataset), val_loss/len(val_loader)))

    # if the model improves, save a checkpoint at this epoch
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), model_path)
        print(f'saving model with acc {best_acc/len(val_dataset):.5f}')

In [None]:
del train_dataset, val_dataset
del train_loader, val_loader
gc.collect()

## Test

In [None]:
# load test data
test_X = preprocess_data(split='test', feat_dir='/libriphone/feat', phone_path='/libriphone', concat_nframes=concat_nframes)
test_set = LibriphoneDataset(test_X)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

# load model
model = LSTMClassifier(input_dim=input_dim, hidden_layers=hidden_layers, hidden_dim=hidden_dim).to(device)
model.load_state_dict(torch.load(model_path))

# make prediction
model.eval()
pred = np.array([], dtype=np.int32)
with torch.no_grad():
    for i, batch in enumerate(tqdm(test_loader)):
        features = batch
        features = features.to(device)
        outputs = model(features)
        
        _, test_pred = torch.max(outputs, 1)
        pred = np.concatenate((pred, test_pred.cpu().numpy()), axis=0)
        
with open('prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    for i, y in enumerate(pred):
        f.write('{},{}\n'.format(i, y))