Description

Task Description
* Phoneme Classification
* Training data: 3429 preprocessed audio features w/ labels (total 2116794 frames)
* Testing data: 857 preprocessed audio features w/o labels (total 527364 frames)
* Label: 41 classes, each class represents a phoneme

Evaluation metric
* Categorical accuracy

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import random
import os
import tqdm
import gc


# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'device: {device}')

class MyDataset(Dataset):
    """In-memory dataset of feature rows with optional integer labels.

    With labels, indexing yields ``(feature, label)``; without labels
    (``y=None``) it yields the feature alone.
    """

    def __init__(self, x, y=None):
        self.data = x
        # ``None`` marks an unlabeled dataset; otherwise labels become a LongTensor.
        self.label = None if y is None else torch.LongTensor(y)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        sample = self.data[index]
        if self.label is None:
            return sample
        return sample, self.label[index]

def load_feat(path):
    """Return whatever object ``torch.load`` deserializes from ``path``."""
    return torch.load(path)

def shift(x, n):
    """Shift the rows of a 2-D tensor by ``n``, padding with the edge row.

    Row ``i`` of the result equals row ``i + n`` of ``x``, with the index
    clamped to the valid range. A positive ``n`` therefore repeats the last
    row at the end, and a negative ``n`` repeats the first row at the start.

    Args:
        x: 2-D tensor whose first dimension is shifted.
        n: Signed number of rows to shift by.

    Returns:
        ``x`` itself when ``n == 0`` or ``x`` has no rows; otherwise a new
        tensor with the same shape as ``x``.
    """
    seq_len = x.size(0)
    if n == 0 or seq_len == 0:
        # Nothing to move, or no edge row available to pad with.
        return x

    # Clamp so that the output keeps the input length even when |n| exceeds
    # the number of rows (every row then becomes the edge row).
    steps = min(abs(n), seq_len)
    if n < 0:
        pad = x[0].repeat(steps, 1)
        return torch.cat((pad, x[:seq_len - steps]), dim=0)

    pad = x[-1].repeat(steps, 1)
    return torch.cat((x[steps:], pad), dim=0)

def concat_feat(x, concat_n):
    """Concatenate each frame with its neighbouring frames.

    For every row of ``x`` the result holds ``concat_n`` frames side by side:
    the frame itself in the middle, earlier frames to its left and later
    frames to its right. Neighbours are produced by ``shift``.

    Args:
        x: 2-D tensor of shape ``(seq_len, feature_dim)``.
        concat_n: Odd number of frames per output row.

    Returns:
        ``x`` unchanged when ``concat_n < 2``; otherwise a tensor of shape
        ``(seq_len, concat_n * feature_dim)``.
    """
    assert concat_n % 2 == 1 # n must be odd
    if concat_n < 2:
        return x

    seq_len, feature_dim = x.size(0), x.size(1)
    half = concat_n // 2

    # Tile the features concat_n times, then expose the copies as the leading
    # axis: (concat_n, seq_len, feature_dim).
    stacked = x.repeat(1, concat_n)
    stacked = stacked.view(seq_len, concat_n, feature_dim).permute(1, 0, 2)

    # Copy `half + k` looks k frames ahead, copy `half - k` looks k frames back.
    for offset in range(1, half + 1):
        stacked[half + offset, :] = shift(stacked[half + offset], offset)
        stacked[half - offset, :] = shift(stacked[half - offset], -offset)

    return stacked.permute(1, 0, 2).view(seq_len, concat_n * feature_dim)

def _read_lines(path):
    """Return the lines of the text file at ``path`` without trailing newlines."""
    with open(path) as f:
        return [line.strip('\n') for line in f]


def preprocess_data(split, feat_dir, phone_dir, concat_nframes, train_ratio=0.8, seed=0):
    """Load per-utterance features (and labels) into one flat tensor per split.

    Files read:
        * ``{phone_dir}/train_labels.txt`` ('train'/'val' only): one utterance
          per line, ``<name> <label> <label> ...`` separated by single spaces.
        * ``{phone_dir}/train_split.txt`` or ``{phone_dir}/test_split.txt``:
          one utterance name per line.
        * ``{feat_dir}/{train|test}/<name>.pt``: features of one utterance.

    Args:
        split: ``'train'``, ``'val'`` or ``'test'``.
        feat_dir: Directory holding the ``train``/``test`` feature folders.
        phone_dir: Directory holding the label and split text files.
        concat_nframes: Number of frames concatenated per sample (odd).
        train_ratio: Fraction of the training utterances assigned to
            ``'train'``; the remainder goes to ``'val'``.
        seed: Seed of the private RNG that shuffles the utterance list before
            the train/val cut. ``'train'`` and ``'val'`` must be requested
            with the same seed, otherwise the two splits overlap.

    Returns:
        ``(X, y)`` for ``'train'``/``'val'`` and ``X`` alone for ``'test'``.
        ``X`` has ``39 * concat_nframes`` columns and one row per frame;
        ``y`` is a ``torch.long`` vector with one label per frame.

    Raises:
        ValueError: If ``split`` is not one of the three accepted values.
    """
    class_num = 41  # NOTE: pre-computed, should not need change

    if split in ('train', 'val'):
        mode = 'train'
    elif split == 'test':
        mode = 'test'
    else:
        raise ValueError('Invalid \'split\' argument for dataset: PhoneDataset!')

    label_dict = {}
    if mode == 'train':
        for line in _read_lines(os.path.join(phone_dir, f'{mode}_labels.txt')):
            fields = line.split(' ')
            label_dict[fields[0]] = [int(p) for p in fields[1:]]

        # Split training and validation data. A dedicated, seeded RNG yields
        # the same permutation on every call, so the 'train' head and the
        # 'val' tail are disjoint (and the global RNG state is left alone).
        usage_list = _read_lines(os.path.join(phone_dir, 'train_split.txt'))
        random.Random(seed).shuffle(usage_list)
        train_len = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:train_len] if split == 'train' else usage_list[train_len:]
    else:
        usage_list = _read_lines(os.path.join(phone_dir, 'test_split.txt'))

    print('[Dataset] - # phone classes: ' + str(class_num) + ', number of utterances for ' + split + ': ' + str(len(usage_list)))

    # Pre-allocated output buffers, trimmed to the rows actually filled below.
    max_len = 3000000
    X = torch.empty(max_len, 39 * concat_nframes)
    y = torch.empty(max_len, dtype=torch.long) if mode == 'train' else None

    idx = 0
    # Iterate the list itself so tqdm knows the total and can show progress.
    for fname in tqdm.tqdm(usage_list):
        feat = load_feat(os.path.join(feat_dir, mode, f'{fname}.pt'))
        cur_len = len(feat)
        X[idx: idx + cur_len, :] = concat_feat(feat, concat_nframes)
        if mode == 'train':
            y[idx: idx + cur_len] = torch.LongTensor(label_dict[fname])
        idx += cur_len

    X = X[:idx, :]

    print(f'[INFO] {split} set')
    print(X.shape)
    if mode == 'train':
        y = y[:idx]
        print(y.shape)
        return X, y
    return X

# Model

In [None]:

class BasicBlock(torch.nn.Module):
    """A fully connected layer followed by a ReLU activation."""

    def __init__(self, input_dim, output_dim):
        super().__init__()

        linear = torch.nn.Linear(input_dim, output_dim)
        activation = torch.nn.ReLU()
        # Keep the attribute name and layer order: state_dict keys depend on them.
        self.block = torch.nn.Sequential(linear, activation)

    def forward(self, x):
        return self.block(x)
    
class Classifier(torch.nn.Module):
    """Feed-forward classifier built from ``BasicBlock`` layers.

    The network is one input block (``input_dim -> hidden_dim``),
    ``hidden_layers`` further blocks (``hidden_dim -> hidden_dim``) and a
    final linear layer (``hidden_dim -> output_dim``) that emits raw scores.
    """

    def __init__(self, input_dim, output_dim=41, hidden_layers=1, hidden_dim=256):
        super().__init__()

        # Layers are created in forward order so parameter initialisation and
        # state_dict indices stay the same.
        layers = [BasicBlock(input_dim, hidden_dim)]
        for _ in range(hidden_layers):
            layers.append(BasicBlock(hidden_dim, hidden_dim))
        layers.append(torch.nn.Linear(hidden_dim, output_dim))

        self.fc = torch.nn.Sequential(*layers)

    def forward(self, x):
        return self.fc(x)


# Hyperparameters

In [None]:
# --- data preparation ---
# Frames concatenated per sample; concat_feat asserts that this is odd.
concat_nframes = 3
# Fraction of the training utterances used for training (rest: validation).
train_ratio = 0.8

# --- optimisation ---
batch_size = 512
num_epoch = 10
learning_rate = 0.0001
# Where the best checkpoint is written during training.
model_path = './model.ckpt'

# --- model architecture ---
# 39 features per frame, matching the buffer width used in preprocess_data.
input_dim = 39 * concat_nframes
# Number of hidden blocks added after the input block of the classifier.
hidden_layers = 2
hidden_dim = 64

# DataLoader

In [None]:
# Build the flat feature/label tensors for the training and validation splits.
train_X, train_y = preprocess_data(
    'train',
    feat_dir='./data/feat',
    phone_dir='./data/',
    concat_nframes=concat_nframes,
    train_ratio=train_ratio,
)
val_X, val_y = preprocess_data(
    'val',
    feat_dir='./data/feat',
    phone_dir='./data/',
    concat_nframes=concat_nframes,
    train_ratio=train_ratio,
)

train_set = MyDataset(train_X, train_y)
val_set = MyDataset(val_X, val_y)

# remove raw features to save memory
# del train_X, train_y, val_X, val_y
# gc.collect()

# Only the training batches are shuffled; validation keeps a fixed order.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

# Training
* Macbook Air M1: 
  * 2m10s, public score 0.50048

* i5-13600kf + 4090: 
  * 1m57s

In [None]:
model = Classifier(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    hidden_layers=hidden_layers,
).to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

best_acc = 0
for epoch in range(num_epoch):
    # ---- training pass: one optimizer step per mini-batch ----
    model.train()
    for features, labels in tqdm.tqdm(train_loader):
        features = features.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        pred = model(features)
        loss = criterion(pred, labels)
        loss.backward()
        optimizer.step()

        # TODO: train_acc, train_loss

    # ---- validation pass: frame-level accuracy, no gradients ----
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for features, labels in val_loader:
            features = features.to(device)
            labels = labels.to(device)
            pred = model(features)
            # Index of the highest score along the class dimension.
            predicted = pred.max(dim=1).indices
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    acc = correct / total
    print(f'epoch: {epoch}, acc: {acc}')

    # Checkpoint only when validation accuracy strictly improves.
    if acc > best_acc:
        best_acc = acc
        torch.save(model.state_dict(), model_path)
        print(f'save model at {model_path}')
    print(f'best acc: {best_acc}')

In [None]:
# gc
# Drop every training-time name before the test data is loaded. The raw
# train_X/train_y/val_X/val_y tensors are included: while those names exist,
# removing only the dataset and loader wrappers leaves the tensors referenced.
# pop(..., None) tolerates names that were already removed, so this cell can be
# re-run safely.
for _name in ('train_set', 'val_set', 'train_loader', 'val_loader', 'model',
              'train_X', 'train_y', 'val_X', 'val_y'):
    globals().pop(_name, None)
del _name
gc.collect()

# Testing

In [None]:
# Load the unlabeled test split; order is preserved so that row i of the
# predictions corresponds to frame i of the test data.
test_X = preprocess_data(split='test',
    feat_dir='./data/feat', 
    phone_dir='./data/',
    concat_nframes=concat_nframes,
)
test_set = MyDataset(test_X)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

# Rebuild the network with the training hyperparameters and restore the best
# checkpoint. map_location remaps the stored tensors onto the current device,
# so a checkpoint written on a GPU also loads on a CPU-only machine.
model = Classifier(input_dim=input_dim, hidden_layers=hidden_layers, hidden_dim=hidden_dim).to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

## github copilot generated code

In [None]:
# # test dataset
# preds = []
# with torch.no_grad():
#     for i, features in enumerate(tqdm.tqdm(test_loader)):
#         features = features.to(device)
#         pred = model(features)
#         _, predicted = torch.max(pred, 1)
#         preds.append(predicted.cpu().numpy())

# # write preds to CSV file
# with open('prediction-copilot.csv', 'w') as f:
#     f.write('id,class\n')
#     for i, pred in enumerate(preds):
#         f.write(f'{i},{pred}\n')

## sample code

In [None]:
# Predict a class for every test frame and write them to prediction.csv.
# Per-batch results are collected in a list and joined once at the end;
# concatenating inside the loop would re-copy all earlier predictions on
# every batch. The empty int32 seed array keeps the result well-defined
# (and identical to growing an int32 array) when the loader yields nothing.
batch_preds = [np.array([], dtype=np.int32)]

model.eval()
with torch.no_grad():
    for features in tqdm.tqdm(test_loader):
        features = features.to(device)
        outputs = model(features)
        _, test_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
        batch_preds.append(test_pred.cpu().numpy())

pred = np.concatenate(batch_preds, axis=0)

with open('prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    f.writelines(f'{i},{y}\n' for i, y in enumerate(pred))