# **BioCAS 2023 Grand Challenge**

### Team

This project is ...

<hr style="border:2px solid grey">

## Overall Working Flow <a id="list_of_content"></a>

## 1. [Initiate Framework](#initialization)
## 2. [Generate Dataloaders](#dataloaders)
## 3. [Pretrain SupCon Model](#supcon)
## 4. [Evaluate SupCon Model](#tsen)
## 5. [FineTune Overall Model](#finetune)
## 6. [Evaluate Fine-Tuned Model](#evaluate)

<hr style="border:2px solid grey">

## Step 1. Initiate Framework <a id="initialization"></a>

In [None]:
from __future__ import print_function

import os
import sys
import time
import math
import logging
import warnings

import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
import numpy as np
import matplotlib.pyplot as plt
import snntorch.functional as SF

from collections import Counter
from torchvision import transforms
from sklearn.metrics import confusion_matrix
from sklearn.manifold import TSNE
from sklearn.exceptions import UndefinedMetricWarning

import src.models as mdl
import src.data.dataset as ds
import src.data.dataloader as dl
import src.utils.visualizing as vs

from src.train import train_supcon, valid_supcon, train_model
from src.test import test_supcon, test_model
from src.utils.supcontrast import TwoCropTransform, SupConLoss
from src.utils.supcontrast import adjust_learning_rate, set_optimizer, save_model

import importlib
# Re-import the project modules so that edits made on disk are picked up
# without restarting the kernel. Names bound above with `from ... import ...`
# are not refreshed by this.
for _module in (mdl, ds, dl, vs):
    importlib.reload(_module)

print("Packages Loaded")

In [None]:
# -------------------- Globals --------------------#
# Device Config
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if DEVICE.type == "cuda":
    print(f"Using {DEVICE} on {torch.cuda.get_device_name(0)} :D ")
else:
    # torch.cuda.get_device_name() raises when no CUDA device is present,
    # so only the device itself is reported on CPU-only machines.
    print(f"Using {DEVICE} :D ")

# Feature Config
MAX_LENGTH = 3.0  # clip length; presumably seconds -- TODO confirm
SR = 8000  # sampling rate; presumably Hz -- TODO confirm
HOP_LENGTH = 128
# Number of hop-sized steps that fit in MAX_LENGTH * SR samples (truncated).
MAX_LENGTH_SAMPLES = int(MAX_LENGTH * SR / HOP_LENGTH)
# Same quantity under the name used for the CNN input size; kept as an alias
# so the two values cannot drift apart.
INPUT_X_DIM = MAX_LENGTH_SAMPLES
N_F_BIN = 64
N_FFT = 512
FEATURE = "mfcc"

# Log Config
# Record layout shared by the file handlers created later in the notebook:
# timestamp:level:logger name:message.
formatter = logging.Formatter(fmt="%(asctime)s:%(levelname)s:%(name)s:%(message)s")

# Warning Config
# Suppress scikit-learn's UndefinedMetricWarning for the whole session.
warnings.filterwarnings(action="ignore", category=UndefinedMetricWarning)

In [None]:
# -------------------- Define customized Argparse --------------------#
class Argparse:
    """Minimal option container: every keyword argument becomes an attribute."""

    def __init__(self, **kwargs):
        # Store the options directly as instance attributes, in call order.
        self.__dict__.update(kwargs)

    def print_args(self):
        """Print every attribute currently set on the instance, one per line."""
        for key in vars(self):
            value = getattr(self, key)
            print(f"{key}: {value}")

# All run options live on a single `opt` object that is passed to the
# project's training and optimizer helpers.
opt = Argparse(
    # Dataset Config
    task_in="Task_12",
    data_path="SPRSound/",
    batch_size=32,
    val_percent=0.2,
    # Model Config
    model="resnet18",
    embedding_size=512,
    head="linear",
    # NOTE(review): with epochs=100 and save_freq=50 the pretraining loop in
    # this notebook only writes ckpt_epoch_50.pth, ckpt_epoch_100.pth and
    # best.pth; ckpt_epoch_200.pth presumably comes from an earlier run --
    # confirm it exists before running the evaluation step.
    ckpt="ckpt_epoch_200.pth",
    # Train Config
    print_freq=50,
    save_freq=50,
    epochs=100,
    # Optim Config
    optimizer="SGD",
    learning_rate=0.001,
    momentum=0.9,
    lr_decay_rate=0.1,
    lr_decay_epochs="70,80,90",
    weight_decay=1e-4,
    dropout=0.25,
    # SupCon Config
    temperature=0.1,
    method="SupCon",
    # Other Config
    cosine=True,
    warm=False,
    verbose=False,
)

# Turn the comma-separated decay schedule into a list of integer epochs.
iterations = opt.lr_decay_epochs.split(",")
opt.lr_decay_epochs = [int(it) for it in iterations]

# warm-up for large-batch training
opt.warm = opt.warm or opt.batch_size > 256
if opt.warm:
    opt.warmup_from = 0.01
    opt.warm_epochs = 10
    if not opt.cosine:
        opt.warmup_to = opt.learning_rate
    else:
        # Value of the cosine schedule at the end of the warm-up epochs.
        eta_min = opt.learning_rate * (opt.lr_decay_rate ** 3)
        opt.warmup_to = eta_min + (opt.learning_rate - eta_min) * (
            1 + math.cos(math.pi * opt.warm_epochs / opt.epochs)) / 2

# set the path according to the environment
opt.model_path = "./ckpts/PreTrain-Models/{}".format(opt.task_in)
# The run name encodes the settings that distinguish one pretraining run
# from another, so each configuration gets its own checkpoint folder.
_name_fields = (
    opt.model,
    FEATURE,
    N_F_BIN,
    opt.head,
    opt.embedding_size,
    HOP_LENGTH,
    opt.optimizer,
    opt.learning_rate,
    opt.temperature,
    opt.dropout,
    opt.val_percent,
)
opt.model_name = "{}_{}{}_{}{}_hop{}_{}_lr{}_temp{}_drop{}_val{}".format(*_name_fields)
opt.save_folder = os.path.join(opt.model_path, opt.model_name)
os.makedirs(opt.save_folder, exist_ok=True)

# Print all arguments
opt.print_args()

In [None]:
# -------------------- User-defined functions --------------------#
def setupLogger(name, logPath, level=logging.INFO):
    """Return the logger called *name*, writing to the file *logPath*.

    The logger's level is set to *level*. A FileHandler using the module-level
    `formatter` is attached only if the logger does not already have a file
    handler for the same path, so re-running the notebook cell does not make
    every record appear several times in the log file.

    Args:
        name: Name passed to `logging.getLogger`.
        logPath: Path of the log file; its parent directory must exist.
        level: Logging level applied to the logger.

    Returns:
        The configured `logging.Logger`.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # FileHandler stores the absolute path in `baseFilename`.
    target = os.path.abspath(logPath)
    already_attached = any(
        isinstance(existing, logging.FileHandler) and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(logPath)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger

log_path = "logs/BioCAS-Notes.logs"
# The file handler creates the log file itself but not its parent directory,
# so make sure "logs/" exists before the handler opens the file.
os.makedirs(os.path.dirname(log_path), exist_ok=True)
logger = setupLogger("ResultsLogger", log_path)

### [Back to List of Content](#list_of_content)

<hr style="border:2px solid grey">

## Step 2. Generate Dataloaders <a id="dataloaders"></a>

Given the input task ```task_in``` and the data path ```data_path```, this step generates three ***Datasets*** and four ***Dataloaders***.

In [None]:
task_in = opt.task_in
data_path = opt.data_path

# Each split maps to a two-element list: [wav directory, json directory].
# Both test splits read from the same wav directory.
_test_wav_dir = os.path.join(data_path, "test_wav")
data_dict = {
    "train": [
        os.path.join(data_path, "train_wav"),
        os.path.join(data_path, "train_json"),
    ],
    "intra_test": [
        _test_wav_dir,
        os.path.join(data_path, "test_json/intra_test_json"),
    ],
    "inter_test": [
        _test_wav_dir,
        os.path.join(data_path, "test_json/inter_test_json"),
    ],
}

# The last two characters of the task name are digits, e.g. "Task_12" gives
# main task 1 and sub task 2.
main_task, sub_task = int(task_in[-2]), int(task_in[-1])

In [None]:
# Options forwarded unchanged to ds.genDatasets; the feature settings come
# from the notebook-wide constants.
_dataset_options = dict(
    task=main_task,
    data_dict=data_dict,
    resample=None,
    feature=FEATURE,
    pre_emph=False,
    pos_norm="zscore",
    n_mfcc=N_F_BIN,
    hop_length=HOP_LENGTH,
    n_fft=N_FFT,
)
trainDataset, intra_testDataset, inter_testDataset = ds.genDatasets(**_dataset_options)

In [None]:
# Prepare trainValLoader for SupCon pretraining
# Augmentation applied to each sample: random horizontal flip followed by a
# random crop to (N_F_BIN, MAX_LENGTH_SAMPLES), padding when the input is smaller.
_augmentations = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(size=(N_F_BIN, MAX_LENGTH_SAMPLES), padding=0, pad_if_needed=True),
]
train_transform = transforms.Compose(_augmentations)


def _supcon_collate(batch):
    """Collate one batch through dl.supcon_collate with a TwoCropTransform wrapper."""
    return dl.supcon_collate(
        batch, task_in, sub_task, transform=TwoCropTransform(train_transform)
    )


supcon_loader = dl.trainValLoader(
    trainDataset,
    sub_task,
    valid_size=opt.val_percent,
    batch_size=opt.batch_size,
    collate_fn=_supcon_collate,
    train_sampler="balanced",
    val_sampler="balanced",
)

In [None]:
# Create non-contrastive dataloaders
def _plain_collate(batch):
    """Collate one batch through dl.custom_collate with the notebook-wide settings."""
    return dl.custom_collate(batch, MAX_LENGTH_SAMPLES, task_in, sub_task)


# Train/validation loaders built from the training dataset.
dataloader = dl.trainValLoader(
    trainDataset,
    sub_task,
    valid_size=opt.val_percent,
    batch_size=opt.batch_size,
    collate_fn=_plain_collate,
    train_sampler="balanced",
    val_sampler="balanced",
)

# The two test loaders share the same collate function and are not shuffled.
print("\n\nGenerating Dataloader for Intra Dataset...")
intra_testloader = dl.testLoader(
    intra_testDataset,
    batch_size=opt.batch_size,
    collate_fn=_plain_collate,
    shuffle_in=False,
)

print("\nGenerating Dataloader for Inter Dataset...")
inter_testloader = dl.testLoader(
    inter_testDataset,
    batch_size=opt.batch_size,
    collate_fn=_plain_collate,
    shuffle_in=False,
)

In [None]:
# Sanity check: print sizes and the label histogram of the first six
# training batches, then stop iterating.
for batch_idx, (data, label, info) in enumerate(dataloader["train"]):
    if batch_idx > 5:
        break
    print(batch_idx, data.size(), info.size(), label.cpu().numpy().shape, Counter(label.cpu().numpy().flatten()))

In [None]:
# Number of target classes for the selected task, taken from the class
# table exposed by the dataloader module.
num_classes = len(dl.classes[task_in])
_class_count_msg = "The number of classes for " + task_in
_class_count_msg += f" is {num_classes}"
print(_class_count_msg)

### [Back to List of Content](#list_of_content)

<hr style="border:2px solid grey">

## Step 3. Pretrain SupCon Model <a id="supcon"></a>

In [None]:
# Setup model, criterion and optimizer
_supcon_kwargs = dict(
    name=opt.model,
    head=opt.head,
    feat_dim=opt.embedding_size,
    dropout=opt.dropout,
)
model = mdl.SupConResNet(**_supcon_kwargs)
criterion = SupConLoss(temperature=opt.temperature)

if torch.cuda.is_available():
    # Only the encoder is wrapped for multi-GPU execution.
    if torch.cuda.device_count() > 1:
        model.encoder = nn.DataParallel(model.encoder)
    model, criterion = model.cuda(), criterion.cuda()
    cudnn.benchmark = True

optimizer = set_optimizer(opt, model)

In [None]:
# training routine
# NOTE(review): the value returned by valid_supcon is treated as
# higher-is-better, and its reciprocal is what gets printed, plotted and
# logged as the "valid loss" -- presumably it is an inverse loss; confirm
# in src.train.
best_loss = 0
best_epoch = 1
hist_loss = []  # interleaved: train loss, valid loss, train loss, ...
best_file = os.path.join(opt.save_folder, "best.pth")
for epoch in range(1, opt.epochs + 1):
    adjust_learning_rate(opt, optimizer, epoch)

    # train for one epoch
    time1 = time.time()
    train_loss = train_supcon(supcon_loader["train"], model, criterion, optimizer, epoch, opt)
    hist_loss.append(train_loss)
    valid_loss = valid_supcon(supcon_loader["val"], model, criterion, opt)
    hist_loss.append(1/valid_loss)
    time2 = time.time()
    print("epoch {}, total time {:.2f}, train loss: {:.2f}, valid loss: {:.2f}".format(epoch, time2 - time1, train_loss, 1/valid_loss))

    if valid_loss > best_loss:
        best_loss = valid_loss
        best_epoch = epoch
        # Save the best model right away. Keeping only a reference to `model`
        # and saving it after the loop would store the last-epoch weights,
        # because training keeps updating the same object in place.
        save_model(model, optimizer, opt, epoch, best_file)

    if epoch % opt.save_freq == 0:
        save_file = os.path.join(
            opt.save_folder, "ckpt_epoch_{epoch}.pth".format(epoch=epoch))
        save_model(model, optimizer, opt, epoch, save_file)

# log results
# best_loss stays 0 when no epoch improved on the initial value; report NaN
# instead of dividing by zero in that case.
best_valid = 1/best_loss if best_loss else float("nan")
log_msg = (
    f"PreTrain - Model: {opt.model}_{FEATURE}{N_F_BIN}, Task: {opt.task_in}. Epoch: {opt.epochs}, "
    f"Last train loss: {train_loss:>0.2f}, "
    f"Best valid loss: {best_valid:>0.2f} at epoch {best_epoch}, "
    f"Method: {opt.method}. Temperature: {opt.temperature:>0.2f}, "
    f"Embedding size: {opt.head}{opt.embedding_size}, "
    f"Hop length: {HOP_LENGTH}, Optimizer: {opt.optimizer}, "
    f"Learning rate: {opt.learning_rate}, Dropout: {opt.dropout} "
)
logger.info(log_msg)

In [None]:
# Plot loss during training
# hist_loss interleaves the two curves: even positions hold the training
# values, odd positions the validation values.
_curves = (
    (hist_loss[0::2], "train_loss"),
    (hist_loss[1::2], "validation_loss"),
)
plt.figure(figsize=(8, 6))
for _values, _curve_label in _curves:
    plt.plot(_values, label=_curve_label)
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.draw()

### [Back to List of Content](#list_of_content)

<hr style="border:2px solid grey">

## Step 4. Evaluate SupCon Model <a id="tsen"></a>

Visualize the learned embeddings with a t-SNE plot.

In [None]:
# load SupCon model
model_path = os.path.join(opt.save_folder, opt.ckpt)
# map_location moves the stored tensors onto the active device, so a
# checkpoint written on a GPU machine can also be opened on a CPU-only one.
model_info = torch.load(model_path, map_location=DEVICE)
SupCon = mdl.SupConResNet(
    name=opt.model,
    head=opt.head,
    feat_dim=opt.embedding_size,
    dropout=opt.dropout,
).to(DEVICE)
# NOTE(review): if pretraining ran on several GPUs the encoder was wrapped
# in nn.DataParallel before saving; the stored keys may then not match this
# unwrapped model -- verify when loading such a checkpoint.
SupCon.load_state_dict(model_info["model"])
SupCon.eval()
print("SupCon model is switched to evaluation mode")

In [None]:
# get embedding array
inputloader = dataloader["train"]
targets = []
# Start from an empty (0, embedding_size) chunk so the result keeps that
# shape even when the loader yields no batches.
_embedding_chunks = [torch.zeros((0, opt.embedding_size), dtype=torch.float32)]
# Inference only: no_grad avoids building an autograd graph for every batch.
with torch.no_grad():
    for data, label, _ in inputloader:
        data = data.to(DEVICE)
        embedding = SupCon(data)
        targets.extend(label.detach().cpu().tolist())
        _embedding_chunks.append(embedding.detach().cpu())
# Concatenate once at the end instead of re-copying the growing tensor
# on every batch.
embeddings = torch.cat(_embedding_chunks, dim=0)

X = embeddings.numpy()
y = np.array(targets)

In [None]:
# Project the embeddings to 2-D with two t-SNE settings (same random seed)
# so the effect of perplexity and learning rate can be compared side by side.
tsne1 = TSNE(n_components=2, perplexity=30, learning_rate=200, random_state=42)
tsne2 = TSNE(n_components=2, perplexity=50, learning_rate=500, random_state=42)
X_embedded1 = tsne1.fit_transform(X)
X_embedded2 = tsne2.fit_transform(X)

# Visualize the t-SNE embeddings
# One panel per setting; points are coloured by their label.
_panels = (
    (1, X_embedded1, "t-SNE with perplexity=30,\n learning_rate=200"),
    (2, X_embedded2, "t-SNE with perplexity=50,\n learning_rate=500"),
)
plt.figure(figsize=(10, 5))
for _position, _points, _title in _panels:
    plt.subplot(1, 2, _position)
    plt.scatter(_points[:, 0], _points[:, 1], c=y, cmap="jet")
    plt.title(_title)

plt.show()

### [Back to List of Content](#list_of_content)

<hr style="border:2px solid grey">

## Step 5. FineTune Overall Model <a id="finetune"></a>

In [None]:
def get_model(model):
    """Build a fresh classifier for the requested architecture.

    Args:
        model: One of "SupCon", "ResNet", "CNN" or "SNN".

    Returns:
        A tuple ``(classifier, loss_fn, spike)``: the network moved to DEVICE,
        the loss object to train it with, and a flag that is True only for
        the "SNN" model.

    Raises:
        ValueError: If *model* is not one of the supported names.
    """
    if model == "SupCon":
        PRS_model = mdl.PRS_classifier(opt, num_classes=num_classes, pretrain=True).to(DEVICE)

        # Freeze the encoder; only the classifier part is trained.
        for param in PRS_model.encoder.parameters():
            param.requires_grad = False
        for param in PRS_model.classifier.parameters():
            param.requires_grad = True

        return PRS_model, nn.NLLLoss(), False

    if model == "ResNet":
        PRS_classifier = mdl.simpleResNet(
            num_classes=num_classes,
        ).to(DEVICE)
        return PRS_classifier, nn.NLLLoss(), False

    if model == "CNN":
        PRS_classifier = mdl.leanCNN(
            num_class=num_classes,
            input_fdim=N_F_BIN,
            input_tdim=INPUT_X_DIM,
        ).to(DEVICE)
        return PRS_classifier, nn.NLLLoss(), False

    if model == "SNN":
        PRS_classifier = mdl.customSNet(
            num_steps=10, beta=0.5, num_class=num_classes
        ).to(DEVICE)
        # The spiking model uses snntorch's count-based cross-entropy loss.
        return PRS_classifier, SF.loss.ce_count_loss(False, False), True

    # ValueError is a subclass of Exception, so existing `except Exception`
    # handlers keep working; the message now lists every supported name.
    raise ValueError(
        "No such model: {!r}, please select within (SupCon, ResNet, CNN, SNN)".format(model)
    )


In [None]:
def iteration(model_name, strategy, epoch_num=10, num_iters=2, save_model=True):
    """Train *model_name* several times and keep the best checkpoint per strategy.

    For each of the *num_iters* runs a fresh model is built with `get_model`,
    trained with `train_model`, and the checkpoint returned for every entry of
    *strategy* is evaluated on the inter and intra test loaders. The mean of
    the two test scores decides which checkpoint is kept for that strategy.
    One line per strategy and run is written to the results logger.

    Args:
        model_name: Architecture name understood by `get_model`.
        strategy: Iterable of selection criteria; each is used as a key into
            the dictionary returned by `train_model`.
        epoch_num: Number of training epochs per run.
        num_iters: Number of independent training runs.
        save_model: When true, the best checkpoints are written to
            "ckpts/FineTune-Models/<model_name>_<task_in>.pt".

    Returns:
        Dict mapping each strategy to its best mean test score (0 for a
        strategy that was never evaluated).
    """
    best_result = {item: 0 for item in strategy}
    best_dict = {}

    print('Perform', model_name, 'Model Architecture\n')

    for i in range(num_iters):
        print('\nModel iter: {}/{}...\n'.format(i+1, num_iters))
        PRS_classifier, loss_fn, spike = get_model(model_name)
        print('Model Reset')

        # Only parameters that are not frozen are handed to the optimizer.
        optimizer = optim.Adam(filter(lambda p: p.requires_grad, PRS_classifier.parameters()), lr=0.001, weight_decay=0.0001)

        best_iter_dict = train_model(
            device=DEVICE,
            task=task_in,
            dataloader=dataloader,
            model=PRS_classifier,
            criterion=loss_fn,
            optimizer=optimizer,
            n_epochs=epoch_num,
            print_every=1,
            verbose=True,
            plot_results=True,
            validation=True,
            save_ckpt=False,
            spike=spike,
            model_name=model_name,
            strategy=strategy,
        )
        ### End of model and task customization <<<<<
        print('')

        for item in strategy:
            # Evaluate on a fresh model loaded with the stored weights.
            test_classifier, _, spike = get_model(model_name)
            test_classifier.load_state_dict(best_iter_dict[item]["model_state_dict"])
            test_classifier.eval()

            inter_score, *_ = test_model(
                device=DEVICE,
                task=main_task,
                dataloader=inter_testloader,
                trained_model=test_classifier,
                spike=spike
            )
            intra_score, *_ = test_model(
                device=DEVICE,
                task=main_task,
                dataloader=intra_testloader,
                trained_model=test_classifier,
                spike=spike
            )
            test_score = (inter_score + intra_score)/2

            print('Iter best test score under', item, 'is:', test_score)

            # Always keep the first checkpoint seen for a strategy; otherwise
            # a score that never rises above the initial 0 would leave the
            # strategy missing from the saved file.
            if item not in best_dict or test_score > best_result[item]:
                best_result[item] = test_score
                best_dict[item] = best_iter_dict[item]

            # NOTE(review): this unpacking relies on the checkpoint dict
            # holding exactly six values in this order; verify against
            # train_model. `loss` is inverted before logging, so it is
            # presumably stored as an inverse loss -- confirm.
            epoch, _, _, score, acc, loss = best_iter_dict[item].values()
            valloss = 1/loss
            log_msg = (
                f"FineTune - Model: {model_name}_{FEATURE}, Task: {task_in}. Epoch: {epoch_num}, "
                f"Strategy: {item}, "
                f"TestScore: {test_score:>0.2f} at epoch {epoch}, "
                f"ValScore: {score:>0.2f}, "
                f"ValAccuracy: {acc:>0.2f}, "
                f"ValLoss: {valloss:>0.2f}, "
                f"IntraScore: {intra_score:>0.2f}, "
                f"InterScore: {inter_score:>0.2f} "
            )
            logger.info(log_msg)
        print('')

    if save_model:
        PATH = "ckpts/FineTune-Models/{}_{}.pt".format(model_name, task_in)
        # Create the target directory first so a long training run is not
        # lost to a missing folder at save time.
        os.makedirs(os.path.dirname(PATH), exist_ok=True)
        torch.save(best_dict, PATH)

    return best_result

In [None]:
# Fine-tune the CNN architecture once for 100 epochs and report the best
# test score obtained under each selection strategy.
strategy = ["score", "accuracy", "loss"]
best_results = iteration(model_name='CNN', strategy=strategy, epoch_num=100, num_iters=1)
print('\nBest results of iters: ', best_results)

### [Back to List of Content](#list_of_content)

<hr style="border:2px solid grey">

## Step 6. Evaluate Fine-Tuned Model <a id="evaluate"></a>

In [None]:
# Checkpoint selection: which strategy's weights to load and for which model.
# NOTE(review): the fine-tuning cell in this notebook calls
# iteration('CNN', ...), which writes CNN_<task>.pt; this cell expects
# SupCon_<task>.pt -- make sure that checkpoint exists or adjust MODEL.
STRATEGY = "score"
MODEL = "SupCon"
# Class index -> display label for each task, used on the confusion matrices.
CLASSES = {
    "Task_11": {0:'N', 1:'A'},
    "Task_12": {0:'N', 1:'R', 2:'W', 3:'S', 4:'C', 5:'F', 6:'W&C'},
    "Task_21": {0:'N', 1:'P', 2:'A'},
    "Task_22": {0:'N', 1:'P', 2:'C', 3:'D', 4:'C&D'},
}

PRS_classifier, _, spike = get_model(MODEL)
PATH = "ckpts/FineTune-Models/{}_{}.pt".format(MODEL, task_in)
# map_location moves the stored tensors onto the active device, so a
# checkpoint written on a GPU machine can also be opened on a CPU-only one.
CheckPoint = torch.load(PATH, map_location=DEVICE)
PRS_classifier.load_state_dict(
    CheckPoint[STRATEGY]["model_state_dict"]
)
PRS_classifier.eval()
print("PRS model is switched to evaluation mode")

In [None]:
print("\nTrain Testing...")

# Score the loaded classifier on the training split and plot the normalized
# confusion matrix of its predictions.
_eval_kwargs = dict(
    device=DEVICE,
    task=main_task,
    dataloader=dataloader["train"],
    trained_model=PRS_classifier,
    verbose=True,
    spike=spike,
)
train_score, truth, preds = test_model(**_eval_kwargs)
cm = confusion_matrix(truth, preds)
vs.plot_confusion_matrix(cm=cm, normalize=True, classes=CLASSES[task_in])

In [None]:
print("\nValid Testing...")

# Score the loaded classifier on the validation split and plot the
# normalized confusion matrix of its predictions.
_eval_kwargs = dict(
    device=DEVICE,
    task=main_task,
    dataloader=dataloader["val"],
    trained_model=PRS_classifier,
    verbose=True,
    spike=spike,
)
valid_score, truth, preds = test_model(**_eval_kwargs)
cm = confusion_matrix(truth, preds)
vs.plot_confusion_matrix(cm=cm, normalize=True, classes=CLASSES[task_in])

In [None]:
print("\nIntra Testing...")

# Score the loaded classifier on the intra test loader and plot the
# normalized confusion matrix of its predictions.
_eval_kwargs = dict(
    device=DEVICE,
    task=main_task,
    dataloader=intra_testloader,
    trained_model=PRS_classifier,
    verbose=True,
    spike=spike,
)
intra_score, truth, preds = test_model(**_eval_kwargs)
cm = confusion_matrix(truth, preds)
vs.plot_confusion_matrix(cm=cm, normalize=True, classes=CLASSES[task_in])

In [None]:
print("\nInter Testing...")

# Score the loaded classifier on the inter test loader and plot the
# normalized confusion matrix of its predictions.
_eval_kwargs = dict(
    device=DEVICE,
    task=main_task,
    dataloader=inter_testloader,
    trained_model=PRS_classifier,
    verbose=True,
    spike=spike,
)
inter_score, truth, preds = test_model(**_eval_kwargs)
cm = confusion_matrix(truth, preds)
vs.plot_confusion_matrix(cm=cm, normalize=True, classes=CLASSES[task_in])

In [None]:
# log results
# One line summarising the four split scores of the evaluation cells,
# followed by the pretraining run name and checkpoint from `opt`.
_log_parts = [
    f"Testing - Model: {opt.model}_{FEATURE}, Task: {opt.task_in}, ",
    f"Train Score: {train_score:>0.2f}, ",
    f"Valid Score: {valid_score:>0.2f}, ",
    f"Intra Score: {intra_score:>0.2f}, ",
    f"Inter Score: {inter_score:>0.2f} at {opt.model_name}/{opt.ckpt} ",
]
log_msg = "".join(_log_parts)
logger.info(log_msg)

If you decide to move the fine-tuned model into the `models` folder, run the next cell.

In [None]:
# Copy the "score"-strategy checkpoint of the fine-tuned model into
# models/<two task digits>/model.pt.
ckpt_path = "ckpts/FineTune-Models/{}_{}.pt".format(MODEL, task_in)
# map_location lets a GPU-saved checkpoint be opened on a CPU-only machine.
mdls = torch.load(ckpt_path, map_location=DEVICE)
save_path = "models/{}/model.pt".format(task_in[-2:])
# torch.save does not create missing directories.
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(mdls["score"], save_path)