# **Phoneme Classification**


# Download Data
Mount Google Drive, then unzip the dataset archive stored there.

- `libriphone/train_split.txt`: training metadata
- `libriphone/train_labels.txt`: training labels
- `libriphone/test_split.txt`: testing metadata
- `libriphone/feat/train/*.pt`: training feature
- `libriphone/feat/test/*.pt`:  testing feature





In [None]:

# Mount Google Drive into the runtime; `google.colab` is only importable inside a Colab session.
from google.colab import drive
drive.mount('/content/drive')
# Extract the dataset archive from Drive into /content/.
# NOTE(review): later cells read './libriphone', so this presumably relies on the
# notebook's working directory being /content — confirm when running outside Colab.
!unzip '/content/drive/MyDrive/libriphone.zip' -d '/content/'


In [2]:
# List the contents of the dataset directory (relative to the current working directory).
%ls libriphone

[0m[01;34mfeat[0m/  test_split.txt  train_labels.txt  train_split.txt



**Fix the random seeds for reproducibility.**

In [3]:
import numpy as np
import torch
import random

def same_seeds(seed):
    """Seed every random number generator used by the notebook.

    Seeds NumPy, PyTorch (CPU, and all CUDA devices when CUDA is available)
    and Python's `random` module with the same value, then switches cuDNN to
    its deterministic mode with auto-tuning disabled.

    Args:
        seed: Integer seed shared by all generators.
    """
    # Same order as before: NumPy, then torch, then the stdlib generator.
    for seed_fn in (np.random.seed, torch.manual_seed, random.seed):
        seed_fn(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Trade cuDNN's benchmark-driven kernel selection for repeatable results.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [4]:
import os
import torch
from tqdm import tqdm

def load_feat(path):
    """Load and return the object stored at `path` with `torch.load`.

    Args:
        path: Location of a file previously written with `torch.save`.

    Returns:
        Whatever object the file contains (the callers in this notebook index
        it as a 2-D tensor).
    """
    return torch.load(path)

def shift(x, n):
    """Shift the rows of a 2-D tensor by `n` positions, padding with edge rows.

    For `n > 0` row `t` of the result is row `t + n` of `x`, and the last `n`
    rows repeat the final row of `x`. For `n < 0` row `t` of the result is row
    `t + n` of `x`, and the first `-n` rows repeat the first row of `x`.
    For `n == 0` the input tensor itself is returned (no copy).

    Args:
        x: Tensor indexed as (rows, features).
        n: Signed number of rows to shift by.

    Returns:
        A tensor with the same number of rows as `x` (for |n| <= rows).
    """
    if n > 0:
        # Drop the first n rows and pad the tail with copies of the last row.
        tail_pad = x[-1].repeat(n, 1)
        return torch.cat((x[n:], tail_pad), dim=0)

    if n < 0:
        # Drop the last |n| rows and pad the head with copies of the first row.
        head_pad = x[0].repeat(-n, 1)
        return torch.cat((head_pad, x[:n]), dim=0)

    return x

def concat_feat(x, concat_n):
    """Attach neighbouring frames to every frame of an utterance.

    Each output row is the concatenation of `concat_n` frames centred on the
    corresponding input row: offsets run from `-(concat_n // 2)` to
    `+(concat_n // 2)`, and positions that fall outside the utterance are
    filled with the first / last frame (see `shift`).

    Args:
        x: Tensor of shape (seq_len, feature_dim).
        concat_n: Odd window size; a value below 2 returns `x` unchanged.

    Returns:
        Tensor of shape (seq_len, concat_n * feature_dim), or `x` itself when
        `concat_n < 2`.
    """
    assert concat_n % 2 == 1 # n should be odd, add the adjacent frames for learning
    if concat_n < 2:
        return x

    half = concat_n // 2
    # One shifted copy per window position, ordered from the leftmost context
    # frame (offset -half) to the rightmost (offset +half); offset 0 is x itself.
    window = [shift(x, offset) for offset in range(-half, half + 1)]

    # Laying the copies side by side yields, for every row, the window's frames
    # in order — the same layout the (seq_len, concat_n * feature_dim) view has.
    return torch.cat(window, dim=1)

def preprocess_data(split, feat_dir, phone_path, concat_nframes, train_ratio=0.8, random_seed=1213):
    """Load one split of the dataset into dense tensors of context-expanded frames.

    Args:
        split: 'train', 'val' or 'test'. 'train' and 'val' are both carved out
            of `train_split.txt` after a seeded shuffle.
        feat_dir: Directory containing the `train/` and `test/` feature folders.
        phone_path: Directory containing `train_labels.txt`, `train_split.txt`
            and `test_split.txt`.
        concat_nframes: Odd number of frames concatenated per sample
            (see `concat_feat`).
        train_ratio: Fraction of shuffled training utterances assigned to
            'train'; the remainder becomes 'val'.
        random_seed: Seed for the train/val shuffle. Note that this re-seeds
            Python's global `random` generator.

    Returns:
        `(X, y)` for 'train' / 'val' and `X` alone for 'test', where `X` has one
        row per frame and `y` holds the matching integer labels.

    Raises:
        ValueError: If `split` is not one of the three supported names.
    """
    class_num = 41 # NOTE: pre-computed, should not need change

    if split in ('train', 'val'):
        mode = 'train'
    elif split == 'test':
        mode = 'test'
    else:
        raise ValueError('Invalid \'split\' argument for dataset: PhoneDataset!')

    label_dict = {}
    if mode == 'train':
        # Each label line is "<utterance id> <label> <label> ...".
        with open(os.path.join(phone_path, f'{mode}_labels.txt')) as label_file:
            for line in label_file:
                line = line.strip('\n').split(' ')
                label_dict[line[0]] = [int(p) for p in line[1:]]

        # split training and validation data
        with open(os.path.join(phone_path, 'train_split.txt')) as split_file:
            usage_list = split_file.readlines()
        random.seed(random_seed)
        random.shuffle(usage_list)
        train_len = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:train_len] if split == 'train' else usage_list[train_len:]

    else:
        with open(os.path.join(phone_path, 'test_split.txt')) as split_file:
            usage_list = split_file.readlines()

    usage_list = [line.strip('\n') for line in usage_list]
    print('[Dataset] - # phone classes: ' + str(class_num) + ', number of utterances for ' + split + ': ' + str(len(usage_list)))

    # First pass: load the raw per-utterance features. They are concat_nframes
    # times smaller than the output, so holding them lets us size the output
    # exactly instead of relying on a fixed upper bound on the frame count.
    feats = [load_feat(os.path.join(feat_dir, mode, f'{fname}.pt')) for fname in tqdm(usage_list)]

    total_len = sum(len(feat) for feat in feats)
    # Fall back to 39 features per frame only when there is nothing to measure.
    feat_dim = feats[0].size(1) if feats else 39

    X = torch.empty(total_len, feat_dim * concat_nframes)
    if mode == 'train':
        y = torch.empty(total_len, dtype=torch.long)

    # Second pass: expand every utterance with its context frames and copy it
    # into its slot of the output buffer.
    idx = 0
    for fname, feat in zip(usage_list, feats):
        cur_len = len(feat)
        X[idx: idx + cur_len, :] = concat_feat(feat, concat_nframes)
        if mode == 'train':
            y[idx: idx + cur_len] = torch.LongTensor(label_dict[fname])
        idx += cur_len

    print(f'[INFO] {split} set')
    print(X.shape)
    if mode == 'train':
        print(y.shape)
        return X, y
    else:
        return X

# Dataset

In [5]:
import torch
from torch.utils.data import Dataset

class LibriDataset(Dataset):
    """Map-style dataset over frame features with optional labels.

    Indexing returns `(feature, label)` when labels were supplied and just
    `feature` otherwise.
    """

    def __init__(self, X, y=None):
        """Store the features and, when given, the labels as a LongTensor.

        Args:
            X: Indexable collection of samples (one sample per index).
            y: Optional labels aligned with `X`; omit for unlabeled data.
        """
        self.data = X
        self.label = None if y is None else torch.LongTensor(y)

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample at `idx`, paired with its label when available."""
        sample = self.data[idx]
        if self.label is None:
            return sample
        return sample, self.label[idx]


# Model
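The model is a recurrent neural network: a bidirectional LSTM reads the window of frames, and fully connected layers classify its output.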

In [10]:
import torch.nn as nn

class BasicBlock(nn.Module):
    """Fully connected building block: Linear -> BatchNorm1d -> ReLU -> Dropout(0.15)."""

    def __init__(self, input_dim, output_dim):
        """Build the four-layer block.

        Args:
            input_dim: Number of input features of the linear layer.
            output_dim: Number of output features (also the BatchNorm width).
        """
        super().__init__()
        layers = [
            nn.Linear(input_dim, output_dim),
            nn.BatchNorm1d(output_dim),
            nn.ReLU(),
            nn.Dropout(p=0.15),
        ]
        # Kept under the attribute name `block` so parameter names are unchanged.
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the block to a batch of feature vectors."""
        return self.block(x)

class Classifier(nn.Module):
    """Bidirectional LSTM over a window of frames, followed by a fully connected head.

    Args:
        input_dim: Size of the flattened input window. This class does not read
            it; the parameter is kept so existing callers keep working.
        output_dim: Number of output classes (logits per sample).
        hidden_layers: Number of `BasicBlock` layers in the head.
        hidden_dim: Width of every `BasicBlock` in the head.
        rnn_input_size: Number of features per frame fed to the LSTM.
        rnn_hidden_size: Hidden size of each LSTM direction.
        rnn_num_layers: Number of stacked LSTM layers.
        rnn_dropout: Dropout applied between stacked LSTM layers.

    The defaults of the four `rnn_*` parameters reproduce the previously
    hard-coded configuration, so existing checkpoints still load.
    """

    def __init__(self, input_dim, output_dim=41, hidden_layers=1, hidden_dim=256,
                 rnn_input_size=39, rnn_hidden_size=512, rnn_num_layers=6, rnn_dropout=0.3):
        super(Classifier, self).__init__()
        # Create BiLSTM
        self.input_size = rnn_input_size    # features per frame
        self.hidden_size = rnn_hidden_size  # hidden size per direction
        self.num_layers = rnn_num_layers    # stacked LSTM layers
        self.rnn = nn.LSTM(
            input_size=self.input_size,
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            batch_first=True,
            # Inter-layer dropout has nothing to act on with a single layer.
            dropout=rnn_dropout if self.num_layers > 1 else 0.0,
            bidirectional=True,
        )
        # Both directions are concatenated, hence 2 * hidden_size input features.
        self.fc = nn.Sequential(
            BasicBlock(2 * self.hidden_size, hidden_dim),
            *[BasicBlock(hidden_dim, hidden_dim) for _ in range(hidden_layers-1)],
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        """Classify each window in the batch.

        Args:
            x: Tensor of shape (batch_size, seq_len, rnn_input_size).

        Returns:
            Logits of shape (batch_size, output_dim).
        """
        x, _ = self.rnn(x)  # => (batch_size, seq_len, 2 * hidden_size)
        # NOTE(review): only the last time step is used. The windows built by
        # concat_feat are centred on the labelled frame, and at the last step
        # the backward direction has seen a single frame — confirm this is the
        # intended read-out rather than the centre step.
        x = x[:, -1]        # => (batch_size, 2 * hidden_size)
        x = self.fc(x)      # => (batch_size, output_dim)

        return x

# Hyper-parameters

In [7]:
# Data parameters
# TODO: change the value of "concat_nframes" for medium baseline
concat_nframes = 21 #61      # the number of frames to concat with, n must be odd (total 2k+1 = n frames)
train_ratio = 0.95           # the ratio of data used for training, the rest will be used for validation

# Training parameters
seed = 1213                  # random seed
batch_size = 512             # batch size
num_epoch = 15               # the number of training epochs
learning_rate =  1e-3        # learning rate
model_path = './model.ckpt'  # the path where the checkpoint will be saved

# Model parameters
# TODO: change the value of "hidden_layers" or "hidden_dim" for medium baseline
input_dim = 39 * concat_nframes  # flattened window size (39 features per frame); passed to Classifier, which does not read it
hidden_layers = 6            # the number of BasicBlock layers in the Classifier's fully connected head
hidden_dim = 512             # the width of each BasicBlock in that head


# Weight + bias count of a plain fully connected network with `hidden_layers`
# hidden layers of width `hidden_dim`, `input_dim` inputs and 41 outputs.
# NOTE(review): this formula does not include BatchNorm or LSTM parameters, so
# it does not describe the Classifier defined in the model cell.
total_params = (
    (input_dim+1) * hidden_dim +
    (hidden_dim + 1) * hidden_dim * (hidden_layers - 1) +
    (hidden_dim + 1) * 41
)
print(f'Total params: {total_params}')

def get_dest_dim(input_dim, output_dim, hidden_layers, dest_hidden_layers, hidden_dim):
    """Find the hidden width that keeps a fully connected net's parameter count
    unchanged when its number of hidden layers changes.

    A network with `l` hidden layers of width `d`, `i` inputs and `o` outputs
    has `(l - 1) * d**2 + (i + o + l) * d + o` weights and biases. Equating
    that count for `(hidden_layers, hidden_dim)` and
    `(dest_hidden_layers, d_d)` gives `a * d_d**2 + b * d_d + c = 0`, which is
    solved here for `d_d`.

    Args:
        input_dim: Number of network inputs.
        output_dim: Number of network outputs.
        hidden_layers: Current number of hidden layers.
        dest_hidden_layers: Target number of hidden layers.
        hidden_dim: Current hidden width.

    Returns:
        A tuple `(d_d_plus, d_d_minus)` with the two (unrounded) roots. When
        `dest_hidden_layers == 1` the equation is linear and its single
        solution is returned in both positions.
    """
    a = dest_hidden_layers - 1  # a = l_d - 1
    b = input_dim + output_dim + dest_hidden_layers  #  b = i + o + l_d
    c = - (hidden_layers - 1) * (hidden_dim ** 2) - (input_dim + output_dim + hidden_layers) * hidden_dim  # c = - (l - 1) * (d ** 2) - (i + o + l) * d

    if a == 0:
        # One target hidden layer: the quadratic term vanishes, leaving
        # b * d_d + c = 0. The quadratic formula would divide by zero here.
        d_d = -c / b
        return (d_d, d_d)

    # (-b ± sqrt(b^2 - 4ac)) / (2a)
    sqrt_part = ((b ** 2) - 4 * a * c) ** 0.5
    d_d_plus = (-b + sqrt_part) / (2 * a)
    d_d_minus = (-b - sqrt_part) / (2 * a)

    return (d_d_plus, d_d_minus)

# Target depth for which a parameter-matched hidden width is computed.
dest_hidden_layers = 2

# Keep the first value returned (the '+' branch of the quadratic formula) and
# discard the second; the width is rounded only for display.
dest_hidden_dim, _ = get_dest_dim(input_dim, 41, hidden_layers, dest_hidden_layers, hidden_dim)
print(f"dest_hidden_layers: {dest_hidden_layers}，dest_hidden_dim: {round(dest_hidden_dim)}",)

Total params: 1754153
若将隐藏层网络层数改为: 2，则维数应当改为: 962


# Dataloader

In [8]:
from torch.utils.data import DataLoader
import gc

# Seed all RNGs first, then pick the compute device.
same_seeds(seed)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'DEVICE: {device}')

# Preprocess data: both splits share every argument except `split`, and must
# use the same seed and ratio so they partition the utterance list consistently.
preprocess_kwargs = dict(
    feat_dir='./libriphone/feat',
    phone_path='./libriphone',
    concat_nframes=concat_nframes,
    train_ratio=train_ratio,
    random_seed=seed,
)
train_X, train_y = preprocess_data(split='train', **preprocess_kwargs)
val_X, val_y = preprocess_data(split='val', **preprocess_kwargs)

# Wrap the tensors in datasets
train_set = LibriDataset(train_X, train_y)
val_set = LibriDataset(val_X, val_y)

# Drop the notebook-level references to the raw tensors; the datasets keep their own.
del train_X, train_y, val_X, val_y
gc.collect()

# Only the training loader shuffles; validation order is left as is.
train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False)

DEVICE: cuda
[Dataset] - # phone classes: 41, number of utterances for train: 3257


3257it [00:11, 294.84it/s]


[INFO] train set
torch.Size([2007632, 819])
torch.Size([2007632])
[Dataset] - # phone classes: 41, number of utterances for val: 172


172it [00:00, 177.18it/s]


[INFO] val set
torch.Size([109162, 819])
torch.Size([109162])


# Training

In [9]:
## For plotting learning curve
import os

from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter()

# When True, continue from the checkpoint at `model_path` if one exists.
RESUME = True

# Create a model, and put it on the device specified.
model = Classifier(input_dim=input_dim, hidden_layers=hidden_layers, hidden_dim=hidden_dim).to(device)

if RESUME:
    if os.path.exists(model_path):
        # Map the checkpoint onto the active device so a checkpoint saved on a
        # GPU also loads on a CPU-only machine (and vice versa).
        model.load_state_dict(torch.load(model_path, map_location=device))
    else:
        # A fresh run has no checkpoint yet; train from scratch instead of crashing.
        print(f'[WARN] RESUME is set but no checkpoint exists at {model_path}; training from scratch.')

# Define a loss function, and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Create a learning rate scheduler that tracks validation accuracy (mode='max').
# NOTE(review): with the scheduler's default relative threshold mode,
# threshold=0.05 presumably means an epoch only counts as an improvement when
# accuracy beats the best value by 5% — confirm this is intended.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.8, patience=5, threshold=0.05)

# NOTE(review): best_acc starts at 0 even when resuming, so the first epoch
# always overwrites the existing checkpoint.
best_acc = 0.0
for epoch in range(num_epoch):
    train_acc = 0.0
    train_loss = 0.0
    val_acc = 0.0
    val_loss = 0.0

    # ---------- Training ----------
    model.train()

    for batch in tqdm(train_loader):

        # A batch consists of features data and corresponding labels.
        features, labels = batch  # feature.shape: (batch_size, seq_len * input_size)

        # Move to the model's device and restore the per-frame layout the LSTM expects.
        features = features.to(device).view(-1, concat_nframes, 39)  # (batch_size, seq_len, input_size)
        labels = labels.to(device)

        outputs = model(features) # (batch_size, labels)

        # Calculate the cross-entropy loss.
        # We don't need to apply softmax before computing cross-entropy as it is done automatically.
        loss = criterion(outputs, labels)

        # Gradients stored in the parameters in the previous step should be cleared out first.
        optimizer.zero_grad()

        # Compute the gradients for parameters.
        loss.backward()

        # Update the parameters with computed gradients.
        optimizer.step()

        # Get the index of the class with the highest probability
        _, train_pred = torch.max(outputs, 1)

        # Accumulate the number of correct predictions and the batch loss.
        train_acc += (train_pred.detach() == labels.detach()).sum().item()
        train_loss += loss.item()

    # ---------- Validation ----------
    model.eval()

    # We don't need gradient in validation.
    # Using torch.no_grad() accelerates the forward process.
    with torch.no_grad():
        for batch in tqdm(val_loader):
            features, labels = batch
            features = features.to(device).view(-1, concat_nframes, 39)
            labels = labels.to(device)
            outputs = model(features)

            loss = criterion(outputs, labels)

            _, val_pred = torch.max(outputs, 1)

            val_acc += (val_pred == labels).sum().item()
            val_loss += loss.item()

    current_lr = optimizer.param_groups[0]['lr']

    # Record the accuracy and lr.
    writer.add_scalar('Acc/train', train_acc/len(train_set), epoch)
    writer.add_scalar('Acc/valid', val_acc/len(val_set), epoch)
    writer.add_scalar('lr', current_lr, epoch)

    # Print the information.
    print(f'[{epoch+1:03d}/{num_epoch:03d}] Train Acc: {train_acc/len(train_set):3.5f} Loss: {train_loss/len(train_loader):3.5f} | Val Acc: {val_acc/len(val_set):3.5f} loss: {val_loss/len(val_loader):3.5f}')

    # If the model improves, save a checkpoint at this epoch
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), model_path)
        print(f'saving model with acc {best_acc/len(val_set):.5f}')

    print(f"{epoch+1} lr: {current_lr}")

    # Update the learning rate based on validation accuracy
    # (must match the scheduler's mode='max').
    scheduler.step(val_acc/len(val_set))

# Final summary: the best validation accuracy reached (nothing is saved here).
print(f'saving model with acc {best_acc/len(val_set):.5f}')


100%|██████████| 3922/3922 [49:50<00:00,  1.31it/s]
100%|██████████| 214/214 [00:48<00:00,  4.42it/s]


[001/015] Train Acc: 0.69548 Loss: 1.03859 | Val Acc: 0.74689 loss: 0.84968
saving model with acc 0.74689
1 lr: 0.001


100%|██████████| 3922/3922 [49:57<00:00,  1.31it/s]
100%|██████████| 214/214 [00:48<00:00,  4.42it/s]


[002/015] Train Acc: 0.82253 Loss: 0.58848 | Val Acc: 0.76062 loss: 0.85996
saving model with acc 0.76062
2 lr: 0.001


100%|██████████| 3922/3922 [50:04<00:00,  1.31it/s]
100%|██████████| 214/214 [00:48<00:00,  4.39it/s]


[003/015] Train Acc: 0.86893 Loss: 0.42690 | Val Acc: 0.76563 loss: 0.92360
saving model with acc 0.76563
3 lr: 0.001


 96%|█████████▋| 3781/3922 [48:18<01:48,  1.30it/s]


KeyboardInterrupt: 

In [None]:
# (Re)load the TensorBoard notebook extension and open the dashboard on ./runs/.
# NOTE(review): presumably ./runs/ is where the SummaryWriter() created in the
# training cell writes its logs — confirm.
%reload_ext tensorboard
%tensorboard --logdir=./runs/

In [None]:
# Training is done: drop the datasets and loaders so their memory can be
# reclaimed before the test data is loaded.
del train_set, val_set, train_loader, val_loader
gc.collect()

# Testing
Create a testing dataset, and load model from the saved checkpoint.

In [None]:
# load data
# The 'test' split returns features only (no labels), hence the single return value.
test_X = preprocess_data(split='test', feat_dir='./libriphone/feat', phone_path='./libriphone', concat_nframes=concat_nframes)
test_set = LibriDataset(test_X, None)
# shuffle=False keeps the frames in file order; the CSV cell uses the position
# of each prediction as its Id.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
# load model
model = Classifier(input_dim=input_dim, hidden_layers=hidden_layers, hidden_dim=hidden_dim).to(device)
# Map the checkpoint onto the active device so a checkpoint saved on a GPU also
# loads when this cell runs on a CPU-only machine.
model.load_state_dict(torch.load(model_path, map_location=device))

Make prediction.

In [None]:
# Collect the predictions of every batch and join them once at the end;
# concatenating inside the loop would re-copy all earlier predictions each time.
# The empty seed array keeps `pred` defined even if the loader yields nothing.
pred_chunks = [np.array([], dtype=np.int32)]

model.eval()
with torch.no_grad():
    for batch in tqdm(test_loader):
        # Restore the per-frame layout the LSTM expects, exactly as in training:
        # (batch_size, concat_nframes * 39) -> (batch_size, concat_nframes, 39)
        features = batch.to(device).view(-1, concat_nframes, 39)

        outputs = model(features)

        _, test_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
        pred_chunks.append(test_pred.cpu().numpy())

pred = np.concatenate(pred_chunks, axis=0)


Write prediction to a CSV file.

After this block finishes running, download the file `prediction.csv` from the files section on the left-hand side and submit it to Kaggle.

In [None]:
# Write one "Id,Class" row per prediction; the Id is the prediction's position in `pred`.
with open('prediction.csv', 'w') as csv_file:
    csv_file.write('Id,Class\n')
    csv_file.writelines(
        '{},{}\n'.format(row_id, label) for row_id, label in enumerate(pred)
    )