# Phoneme Classification

## Data Preparation and Initialization

### Import Packages

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from warmup_scheduler import GradualWarmupScheduler  #https://github.com/ildoonet/pytorch-gradual-warmup-lr
import gc
import torch
import torch.nn as nn
import pytorch_warmup as warmup # https://github.com/ildoonet/pytorch-gradual-warmup-lr
import swats # https://github.com/Mrpatekful/swats
from random import randint
import statistics

### Download Data

In [None]:
!gdown "1VVtc1chpLGlEst3h65eN1aX5V6iTl8qT" --output data.zip
!unzip data.zip
!ls 

### Initialization

In [None]:
myseed = 666  # set a random seed for reproducibility

print('Loading data ...')

data_root='./timit_11/'
train = np.load(data_root + 'train_11.npy')
train_label = np.load(data_root + 'train_label_11.npy')
test = np.load(data_root + 'test_11.npy')

print('Size of training data: {}'.format(train.shape))
print('Size of testing data: {}'.format(test.shape))

### Dataset

In [None]:
class TIMITDataset(Dataset):
    def __init__(self, X, y=None):
        self.data = torch.from_numpy(X).float()
        if y is not None:
            y = y.astype(np.int)
            self.label = torch.LongTensor(y)
        else:
            self.label = None

    def __getitem__(self, idx):
        if self.label is not None:
            return self.data[idx], self.label[idx]
        else:
            return self.data[idx]

    def __len__(self):
        return len(self.data)

## Neural Network

### Neural Network Construction

In [None]:
class Classifier(nn.Module):
    def __init__(self, L1, L2, L3, L4):
        super(Classifier, self).__init__()
        self.layer1 = nn.Linear(429, L1)
        self.layer2 = nn.Linear(L1, L2)
        self.layer3 = nn.Linear(L2, L3)
        self.layer4 = nn.Linear(L3, L4)
        self.out = nn.Linear(L4, 39) 

        self.act_fn = nn.ReLU()
        self.dropout_2 = nn.Dropout(0.6)
        self.batchnorm_1 = nn.BatchNorm1d(L1)
        self.batchnorm_2 = nn.BatchNorm1d(L2)
        self.batchnorm_3 = nn.BatchNorm1d(L3)
        self.batchnorm_4 = nn.BatchNorm1d(L4)

    def forward(self, x):
        x = self.layer1(x)
        x = self.batchnorm_1(x)
        x = self.act_fn(x)
        x = self.dropout_2(x)

        x = self.layer2(x)
        x = self.batchnorm_2(x)
        x = self.act_fn(x)
        x = self.dropout_2(x)

        x = self.layer3(x)
        x = self.batchnorm_3(x)
        x = self.act_fn(x)
        x = self.dropout_2(x)

        x = self.layer4(x)
        x = self.batchnorm_4(x)
        x = self.act_fn(x)
        x = self.dropout_2(x)

        x = self.out(x)
        
        return x

## Model Training

### Hyperparameters

In [None]:
config = {
    'train_size': 0.8,
    'num_epoch': 2000,                # maximum number of epochs
    'BATCH_SIZE': 180000,               # mini-batch size for dataloader
    'optimizer': 'Adam',              # optimization algorithm (optimizer in torch.optim)
    'optim_hparas': {                # hyper-parameters for the optimizer (depends on which optimizer you are using)
        'lr': 0.001,                 # learning rate of optimizer
        'weight_decay': 0.00002,
    }
}

times = 10
preds=[]

# the path where checkpoint saved
model_path = './model.ckpt'

### Helper Function

In [None]:
#check device
def get_device():
  return 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# fix random seed
def same_seeds(seed):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  
    np.random.seed(seed)  
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

In [None]:
def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

### Construct Dataset

In [None]:
"""Split the labeled data into a training set and a validation set"""
train_x, val_x, train_y, val_y = train_test_split(train, train_label, random_state=myseed, train_size=config['train_size'])
print('Size of training set: {}'.format(train_x.shape))
print('Size of validation set: {}'.format(val_x.shape))


"""Create a data loader from the dataset"""
train_set = TIMITDataset(train_x, train_y)
val_set = TIMITDataset(val_x, val_y)
train_loader = DataLoader(train_set, batch_size=config['BATCH_SIZE'], shuffle=True) #only shuffle the training data
val_loader = DataLoader(val_set, batch_size=config['BATCH_SIZE'], shuffle=False)

"""Cleanup the unneeded variables to save memory.<br>"""
del train, train_label, train_x, train_y, val_x, val_y
gc.collect()

### Model Training

In [None]:
# fix random seed for reproducibility
same_seeds(myseed)

# get device 
device = get_device()
print(f'DEVICE: {device}')

In [None]:
for time in range(times):
    # create model, define a loss function, and optimizer
    model = Classifier(
        L1=randint(1024, 2048),
        L2=randint(512, 1024),
        L3=randint(256, 512),
        L4=randint(128, 256),
        ).to(device)
    criterion = nn.CrossEntropyLoss() 
    optimizer = getattr(torch.optim, config['optimizer'])(
            model.parameters(), **config['optim_hparas'])

    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5000)
    scheduler_warmup = GradualWarmupScheduler(optimizer, multiplier=1, total_epoch=100, after_scheduler=scheduler)

    # start training
    best_acc = 0.0
    for epoch in range(config['num_epoch']):
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        # training
        model.train() # set the model to training mode
        scheduler_warmup.step(epoch)
        for i, data in enumerate(train_loader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad() 
            outputs = model(inputs) 
            batch_loss = criterion(outputs, labels)
            _, train_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
            batch_loss.backward() 
            optimizer.step()
            scheduler.step()

            train_acc += (train_pred.cpu() == labels.cpu()).sum().item()
            train_loss += batch_loss.item()

            now_lr = get_lr(optimizer)

        # validation
        if len(val_set) > 0:
            model.eval() # set the model to evaluation mode
            with torch.no_grad():
                for i, data in enumerate(val_loader):
                    inputs, labels = data
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    batch_loss = criterion(outputs, labels) 
                    _, val_pred = torch.max(outputs, 1) 
                
                    val_acc += (val_pred.cpu() == labels.cpu()).sum().item() # get the index of the class with the highest probability
                    val_loss += batch_loss.item()

                print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f} | Val Acc: {:3.6f} loss: {:3.6f} | lr: {:3.6f}'.format(
                    epoch + 1, config['num_epoch'], train_acc/len(train_set), train_loss/len(train_loader), val_acc/len(val_set)
                    , val_loss/len(val_loader), now_lr
                ))

                # if the model improves, save a checkpoint at this epoch
                if val_acc > best_acc:
                    best_acc = val_acc
                    torch.save(model.state_dict(), model_path)
                    print('saving model with acc {:3.6f}'.format(best_acc/len(val_set)))

        else:
            print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f}'.format(
                epoch + 1, config['num_epoch'], train_acc/len(train_set), train_loss/len(train_loader)
            ))

    # if not validating, save the last epoch
    if len(val_set) == 0:
        torch.save(model.state_dict(), model_path)
        print('saving model at last epoch')

    """
    Testing
    Create a testing dataset, and load model from the saved checkpoint.
    """

    # create testing dataset
    test_set = TIMITDataset(test, None)
    test_loader = DataLoader(test_set, batch_size=config['BATCH_SIZE'], shuffle=False)

    """Make prediction"""
    model.eval() # set the model to evaluation mode
    predict = []
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            inputs = data
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, test_pred = torch.max(outputs, 1) # get the index of the class with the highest probability

            for y in test_pred.cpu().numpy():
                predict.append(y)
    preds.append(predict)

### Save Predictions

In [None]:
preds = np.asarray(preds)

# Ensemble predictions
mode_preds = []
for i in range(preds.shape[1]):
    mode_preds.append(statistics.mode(preds[:, i]))

with open('prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    for i, y in enumerate(mode_preds):
        f.write('{},{}\n'.format(i, y))