In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import os
import pytorch_lightning as pl
from torch.utils.data import Dataset, DataLoader
from sklearn import model_selection
import torchvision.transforms as transforms
import torchvision.io 
import librosa
from PIL import Image
import albumentations as alb

In [None]:
class TransformationType:
    """String identifiers for the image-transformation libraries."""
    TORCHVISION, ALB = "torchvision", "albumentations"

class Models:
    """Architecture names that can be selected through Config.MODEL_TO_USE."""
    # plain ResNet variants
    RESNET34, RESNET50 = "resnet34", "resnet50"
    # ResNeXt variant
    RESNEXT50 = "resnext50_32x4d"

class ImgStats:
    """Per-channel image statistics, one value per colour channel."""
    # channel means
    IMAGENET_MEAN = [
        0.485,
        0.456,
        0.406,
    ]
    # channel standard deviations
    IMAGENET_STD = [
        0.229,
        0.224,
        0.225,
    ]

# CONSTANTS
class Config:
    """Data, training and model settings shared by the cells of this notebook."""

    # --- data ---
    DATA_ROOT_FOLDER = "./data/"
    NUM_CLASSES = 5  # initial value; reassigned from the training dataframe further down
    INPUT_IMAGE_SIZE = (128, 128)
    IMG_MEAN = ImgStats.IMAGENET_MEAN
    IMG_STD = ImgStats.IMAGENET_STD
    # not referenced in the visible part of the notebook
    TRAIN_ON_SUBSET = True
    SUBSET_ROWS_FRAC = 0.1

    # --- cross validation / reproducibility ---
    NUM_FOLDS = 5
    RANDOM_SEED = 42

    # --- data loading ---
    BATCH_SIZE = 96
    NUM_WORKERS = 8

    # --- training loop ---
    NUM_EPOCHS = 5
    UNFREEZE_EPOCH_NO = 1  # handed to the BackboneFinetuning callback
    PATIENCE = 5  # handed to the EarlyStopping callback
    PRECISION = 16
    FAST_DEV_RUN = True

    # --- model ---
    MODEL_TO_USE = Models.RESNET34
    # model hyperparameters
    MODEL_PARAMS = dict(drop_out=0.25, lr=0.00036)

In [None]:
class AudioConfig:
    """Audio / mel-spectrogram settings; derived values are computed from the base ones."""
    # --- base settings ---
    sampling_rate = 22050
    duration = 10  # sec
    hop_length = 512  # number of samples per time-step in spectrogram
    n_mels = 128  # number of bins in spectrogram. Height of image
    time_steps = 384  # number of time-steps. Width of image
    fmin = 20
    padmode = 'constant'
    # --- derived settings ---
    fmax = sampling_rate // 2
    n_fft = 2 * hop_length
    samples = duration * sampling_rate

In [None]:

# Split the training dataframe into k folds for cross validation. We do this before any processing
# is done on the data. Stratified k-fold is used so that every fold sees a similar target distribution.
def strat_kfold_dataframe(df, target_col_name, num_folds=Config.NUM_FOLDS):
    """Return a shuffled copy of ``df`` with a ``kfold`` column holding each row's validation fold.

    Args:
        df: dataframe to split; it is NOT modified.
        target_col_name: name of the column used for stratification.
        num_folds: number of folds to create.

    Returns:
        A new dataframe (rows shuffled, index reset to 0..n-1) with an extra integer
        ``kfold`` column in ``[0, num_folds)``.
    """
    # Shuffle first: sample() + reset_index() yield a new frame, so adding the "kfold" column
    # below never writes into the caller's dataframe (which may itself be a filtered slice).
    df = df.sample(frac=1, random_state=Config.RANDOM_SEED).reset_index(drop=True)
    # every row starts unassigned (-1) and is overwritten with its fold number below
    df["kfold"] = -1
    # get the target data
    y = df[target_col_name].values
    skf = model_selection.StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=Config.RANDOM_SEED)
    # split() yields positional indices; they match the labels because the index was reset above
    for fold, (_, val_index) in enumerate(skf.split(X=df, y=y)):
        df.loc[val_index, "kfold"] = fold
    return df

# Load the training metadata
df_train = pd.read_csv(Config.DATA_ROOT_FOLDER + "train.csv")
# Flag, per record, whether the corresponding mel spectrogram image is present on disk
df_train["mspec_exists"] = [
    os.path.exists(Config.DATA_ROOT_FOLDER + "processed_train/mel_spec/" + fname.split(".")[0] + ".jpg")
    for fname in df_train["filename"]
]
# Drop the records without an image, then assign the cross-validation folds
df_train = df_train.loc[df_train["mspec_exists"]]
df_train = strat_kfold_dataframe(df_train, target_col_name="genre_id")
df_train.head()

In [None]:
# Number of distinct genre ids present in the (filtered) training data.
# NOTE(review): presumably the ids are contiguous 0..K-1, otherwise K outputs are not enough — confirm.
Config.NUM_CLASSES = df_train["genre_id"].nunique(dropna=False)

In [None]:
# A dataset contains the logic to fetch, load and if required transform data to bring it to a format
# that can be used by dataloaders for training. Image size is (128, 385, 3)
class AudioMelSpecImgDataset(Dataset):
    """Dataset that serves (mel spectrogram image, label) pairs described by a dataframe.

    Args:
        df: dataframe with one row per sample.
        file_name_col: column holding the file name; the part before the first "."
            is used as the image's base name.
        target_col: column holding the label.
        img_root_folder: folder containing the ``<base name>.jpg`` images.
        transform: optional callable applied to the opened PIL image.
        target_transform: optional callable applied to the label.
    """

    def __init__(self, df, file_name_col, target_col, img_root_folder, transform=None, target_transform=None):
        self.df = df
        self.file_name_col = file_name_col
        self.target_col = target_col
        self.img_root_folder = img_root_folder
        self.transform = transform
        self.target_transform = target_transform

    def __getitem__(self, index):
        """Return the ``index``-th (image, label) pair, with the optional transforms applied."""
        # Positional lookup (.iloc): ``index`` is a position in [0, len(self)), so this also
        # works when the dataframe index is not 0..n-1 (e.g. a filtered frame).
        file_name_noext = self.df[self.file_name_col].iloc[index].split(".")[0]
        img_path = self.img_root_folder + "/" + file_name_noext + ".jpg"
        img = Image.open(img_path)
        img_label = self.df[self.target_col].iloc[index]
        # Without a transform the untouched PIL image is returned (previously this path
        # raised UnboundLocalError because the result variable was never assigned).
        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            img_label = self.target_transform(img_label)
        return img, img_label

    def __len__(self):
        """Number of samples, i.e. rows in the dataframe."""
        return len(self.df)

In [None]:
from torchvision.transforms import ToTensor, RandomResizedCrop, ToPILImage

# Training-time augmentation: a random crop of the spectrogram resized to the network input size.
train_transform = transforms.Compose([
    RandomResizedCrop(size=Config.INPUT_IMAGE_SIZE),
    ToTensor(),
])

# Validation must be deterministic: a random crop here would make val_loss / val_f1 change from
# run to run for the same weights, which in turn perturbs checkpoint selection, early stopping
# and the LR scheduler. The whole image is therefore simply resized to the network input size.
val_transform = transforms.Compose([
    transforms.Resize(size=Config.INPUT_IMAGE_SIZE),
    ToTensor(),
])

In [None]:
def get_fold_dls(fold, df_imgs, img_root_folder):
    """Build the train / validation datasets and dataloaders for one cross-validation fold.

    Rows whose ``kfold`` value equals ``fold`` form the validation set, all other rows the
    training set. Returns ``(dl_train, dl_val, ds_train, ds_val)``.
    """
    held_out = df_imgs["kfold"] == fold

    def build_dataset(rows, image_transform):
        # each subset gets a fresh 0..n-1 index before being wrapped in a dataset
        return AudioMelSpecImgDataset(
            rows.reset_index(drop=True),
            file_name_col="filename",
            target_col="genre_id",
            img_root_folder=img_root_folder,
            transform=image_transform,
            target_transform=torch.as_tensor,
        )

    ds_train = build_dataset(df_imgs[~held_out], train_transform)
    ds_val = build_dataset(df_imgs[held_out], val_transform)

    loader_kwargs = dict(batch_size=Config.BATCH_SIZE, num_workers=Config.NUM_WORKERS)
    # only the training loader shuffles
    dl_train = DataLoader(ds_train, shuffle=True, **loader_kwargs)
    dl_val = DataLoader(ds_val, **loader_kwargs)
    return dl_train, dl_val, ds_train, ds_val

In [None]:
# display randomly chosen images of a dataset along with their labels
# if predictions are provided along with labels, these are displayed too
def show_batch(img_ds, num_items, num_rows, num_cols, predict_arr=None):
    """Plot ``num_items`` randomly sampled items of ``img_ds`` on a ``num_rows`` x ``num_cols`` grid.

    Args:
        img_ds: indexable dataset returning ``(image, label)``; images given as tensors or
            numpy arrays are expected channel-first and are transposed for display.
        num_items: number of items to draw (sampled with replacement); should not exceed
            ``num_rows * num_cols``.
        num_rows: number of rows in the subplot grid.
        num_cols: number of columns in the subplot grid.
        predict_arr: optional sequence / array of predictions, indexed by dataset position.
    """
    fig = plt.figure(figsize=(12, 6))
    # randint's upper bound is exclusive, so len(img_ds) makes every item (including the
    # last one) eligible and also supports a dataset holding a single item
    sampled_indices = np.random.randint(0, len(img_ds), num_items)
    for plot_pos, img_index in enumerate(sampled_indices):
        img, lb = img_ds[img_index]
        ax = fig.add_subplot(num_rows, num_cols, plot_pos + 1, xticks=[], yticks=[])
        if isinstance(img, torch.Tensor):
            # .cpu() so that tensors living on an accelerator can be converted too
            img = img.detach().cpu().numpy()
        if isinstance(img, np.ndarray):
            # the image data has channels at dim 0, e.g. a shape of 3, 64, 64 needs to be 64, 64, 3 for display
            img = img.transpose(1, 2, 0)
            ax.imshow(img)
        if isinstance(lb, torch.Tensor):
            # extract the label from label tensor
            lb = lb.item()
        title = f"Actual: {lb}"
        # "is not None" rather than truthiness: the truth value of a multi-element array is ambiguous
        if predict_arr is not None:
            title += f", Pred: {predict_arr[img_index]}"
        ax.set_title(title)

In [None]:
# img_path = "./data/processed_train/mel_spec/000001.jpg"
# img = Image.open(img_path)
# print(type(img))
# img_arr = np.array(img)
# img_arr = np.stack([img_arr]*3, axis=-1)
# plt.imshow(img)
# print(img_arr.shape)

In [None]:
# print(img_arr.shape)
# img = transforms.ToPILImage()(img_arr)
# print(type(img))
# img_tfmd = val_transform(img)
# img_np = img_tfmd.detach().numpy()
# if isinstance(img_np, np.ndarray):
#     # the image data has RGB channels at dim 0, the shape of 3, 64, 64 needs to be 64, 64, 3 for display            
#     img_np = img_np.transpose(1, 2, 0)
#     plt.imshow(img_np)
#     print(img_np.shape)

In [None]:
# Build the loaders for fold 0 and preview a few validation images.
# os.path.join avoids the doubled "/" that plain concatenation produced, because
# Config.DATA_ROOT_FOLDER already ends with a path separator.
dl_train, dl_val, ds_train, ds_val = get_fold_dls(
    0, df_train, os.path.join(Config.DATA_ROOT_FOLDER, "processed_train", "mel_spec")
)
show_batch(ds_val, 8, 2, 4)

In [32]:
import torchvision.models as models
import torch.nn as nn
from torch.nn.functional import cross_entropy
import torchmetrics
import timm

class ImageClassificationLitModel(pl.LightningModule):
    """Image classifier made of a pretrained timm backbone and a dropout + linear head.

    Args:
        num_classes: number of output classes.
        hparams: dict with the keys ``"lr"`` (learning rate) and ``"drop_out"``
            (dropout probability of the head).
        model_to_use: timm architecture name; must be one of the ``Models`` constants.
    """

    def __init__(self, num_classes, hparams, model_to_use):
        super().__init__()
        self.save_hyperparameters()
        # kept as a plain attribute so that it can be read (and overwritten) from outside
        self.lr = hparams["lr"]
        self.num_classes = num_classes
        self.backbone, self.classifier = self.get_backbone_classifier(model_to_use, hparams["drop_out"], num_classes)

    @staticmethod
    def get_backbone_classifier(model_to_use, drop_out, num_classes):
        """Create the pretrained backbone and a fresh classification head.

        Returns:
            ``(backbone, classifier)`` as two ``nn.Sequential`` modules.

        Raises:
            ValueError: if ``model_to_use`` is not a supported architecture. Previously
                ``(None, None)`` was returned and the failure only surfaced in ``forward``.
        """
        supported_models = (Models.RESNET34, Models.RESNET50, Models.RESNEXT50)
        # validate before building the model so that no pretrained weights are loaded needlessly
        if model_to_use not in supported_models:
            raise ValueError(
                f"Unsupported model '{model_to_use}'; expected one of {list(supported_models)}"
            )
        pt_model = timm.create_model(model_to_use, pretrained=True)
        # every child module except the last one is kept as the feature extractor
        backbone = nn.Sequential(*list(pt_model.children())[:-1])
        # the new head takes as many inputs as the pretrained model's own `fc` layer
        in_features = pt_model.fc.in_features
        classifier = nn.Sequential(
            nn.Dropout(drop_out),
            nn.Linear(in_features, num_classes)
        )
        return backbone, classifier

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        features = self.backbone(x)
        # collapse everything after the batch dimension before the linear head
        features = torch.flatten(features, 1)
        x = self.classifier(features)
        return x

    def configure_optimizers(self):
        """Adam over the currently trainable parameters, plus ReduceLROnPlateau on ``val_loss``."""
        # only parameters with requires_grad=True are handed to the optimizer
        model_optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, self.parameters()), lr=self.lr)
        lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(model_optimizer, "min")
        return {
            "optimizer": model_optimizer,
            "lr_scheduler": {
                "scheduler": lr_scheduler,
                "monitor": "val_loss",
                "frequency": 1
            }
        }

    def _loss_and_f1(self, batch):
        """Run the model on ``batch`` and return ``(cross-entropy loss, micro-averaged F1)``."""
        X, y = batch
        y_pred = self(X)
        loss = cross_entropy(y_pred, y)
        # NOTE(review): `torchmetrics.functional.f1` presumably only exists in older torchmetrics
        # releases — confirm against the pinned version before upgrading.
        f1 = torchmetrics.functional.f1(preds=y_pred, target=y, num_classes=self.num_classes, average="micro")
        return loss, f1

    def training_step(self, batch, batch_idx):
        """Log ``train_loss`` / ``train_f1`` and return the loss to optimise."""
        loss, train_f1 = self._loss_and_f1(batch)
        self.log("train_loss", loss, on_step=True, on_epoch=True, logger=True, prog_bar=True)
        self.log("train_f1", train_f1, on_step=True, on_epoch=True, logger=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` / ``val_f1`` and return both values for this batch."""
        val_loss, val_f1 = self._loss_and_f1(batch)
        self.log("val_loss", val_loss, on_step=True, on_epoch=True, logger=True, prog_bar=True)
        self.log("val_f1", val_f1, on_step=True, on_epoch=True, logger=True, prog_bar=True)
        return {"loss": val_loss, "val_f1": val_f1}

In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint, BackboneFinetuning, EarlyStopping

# For reproducible results, seed the random number generators once, before any model or
# dataloader is created.
# Sets seeds for numpy, torch, python.random and PYTHONHASHSEED.
# NOTE(review): workers=True presumably also seeds the dataloader worker processes — confirm
# against the installed pytorch_lightning version.
pl.seed_everything(Config.RANDOM_SEED, workers=True)

In [None]:
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.callbacks import Callback

class MetricsAggCallback(Callback):
    """Collect one monitored validation metric per epoch and track its best value.

    Args:
        metric_to_monitor: key of the metric in ``trainer.callback_metrics``.
        mode: ``"max"`` if larger is better, ``"min"`` if smaller is better. With any other
            value the metric is still recorded but no best value is tracked.

    Attributes set while training: ``metrics`` (value per validation epoch), ``best_metric``
    and ``best_metric_epoch`` (position of the best value inside ``metrics``).
    """

    def __init__(self, metric_to_monitor, mode):
        super().__init__()
        self.metric_to_monitor = metric_to_monitor
        self.metrics = []
        self.best_metric = None
        self.mode = mode
        self.best_metric_epoch = None

    def on_validation_epoch_end(self, trainer: Trainer, pl_module: LightningModule):
        """Record the monitored metric for the finished validation epoch and update the best."""
        # The pre-training sanity check also runs validation; recording it would add a bogus
        # entry and shift best_metric_epoch by one.
        if getattr(trainer, "sanity_checking", False):
            return
        logged = trainer.callback_metrics
        if self.metric_to_monitor not in logged:
            # fail with the list of available keys instead of a bare KeyError
            raise KeyError(
                f"Metric '{self.metric_to_monitor}' was not logged; available metrics: {sorted(logged.keys())}"
            )
        metric_value = logged[self.metric_to_monitor].cpu().detach().item()
        val_loss = logged["val_loss"].cpu().detach().item()
        print(f"metric {self.metric_to_monitor} = {metric_value}, val_loss={val_loss}")
        self.metrics.append(metric_value)
        if self.mode == "max":
            best_value = max(self.metrics)
        elif self.mode == "min":
            best_value = min(self.metrics)
        else:
            # unknown mode: keep the history only
            return
        self.best_metric = best_value
        # index() returns the first epoch that reached the best value
        self.best_metric_epoch = self.metrics.index(best_value)

In [None]:
def run_training(fold, dl_train, dl_val, fold_loss, fold_acc, find_lr=True):
    """Train one model on the given loaders and append its best scores to the result lists.

    Args:
        fold: fold number, used for the logger version and the checkpoint file name;
            ``None`` lets the logger choose its own version.
        dl_train: training dataloader.
        dl_val: validation dataloader.
        fold_loss: list that receives the best ``val_loss`` (skipped when ``Config.FAST_DEV_RUN``).
        fold_acc: list that receives the best ``val_f1`` (skipped when ``Config.FAST_DEV_RUN``).
        find_lr: when True, the trainer's learning-rate finder is run before fitting.
    """
    fold_str = f"fold{fold}"
    print(f"Running training for {fold_str}")
    chkpt_file_name = "best_model_{epoch}_{val_loss:.4f}"
    # the multiplicative function handed to BackboneFinetuning returns a constant 1.5
    multiplicative = lambda epoch: 1.5
    backbone_finetuning = BackboneFinetuning(Config.UNFREEZE_EPOCH_NO, multiplicative, verbose=True)
    early_stopping_callback = EarlyStopping(monitor="val_loss", patience=Config.PATIENCE, mode="min", verbose=True)
    if fold is not None:
        chkpt_file_name = fold_str + "_" + chkpt_file_name
        tb_logger = pl.loggers.TensorBoardLogger(save_dir="logs", version=fold_str)
    else:
        tb_logger = pl.loggers.TensorBoardLogger(save_dir="logs")
    model = ImageClassificationLitModel(
        num_classes=Config.NUM_CLASSES,
        hparams=Config.MODEL_PARAMS,
        model_to_use=Config.MODEL_TO_USE
    )
    loss_chkpt_callback = ModelCheckpoint(dirpath="./model", verbose=True, monitor="val_loss", mode="min", filename=chkpt_file_name)
    # The model logs "val_f1" (micro-averaged) and never "val_acc", so monitoring "val_acc"
    # failed with a KeyError. For single-label multiclass targets micro-averaged F1 coincides
    # with accuracy, hence the "acc" naming is kept.
    acc_chkpt_callback = MetricsAggCallback(metric_to_monitor="val_f1", mode="max")
    trainer = pl.Trainer(
        gpus=1,
        # For results reproducibility
        deterministic=True,
        auto_select_gpus=True,
        progress_bar_refresh_rate=20,
        max_epochs=Config.NUM_EPOCHS,
        logger=tb_logger,
        # follow the caller's flag instead of always enabling the finder
        auto_lr_find=find_lr,
        precision=Config.PRECISION,
        weights_summary=None,
        fast_dev_run=Config.FAST_DEV_RUN,
        callbacks=[loss_chkpt_callback, acc_chkpt_callback, backbone_finetuning, early_stopping_callback]
    )
    if find_lr:
        trainer.tune(model=model, train_dataloaders=dl_train)
        print(model.lr)
    trainer.fit(model, train_dataloaders=dl_train, val_dataloaders=dl_val)
    if not Config.FAST_DEV_RUN:
        fold_loss.append(loss_chkpt_callback.best_model_score.cpu().detach().item())
        fold_acc.append(acc_chkpt_callback.best_metric)
        # report the values just appended; indexing by `fold` broke whenever the fold number
        # was not the list position (e.g. fold=None or a run starting at a later fold)
        print(f"Loss for {fold_str} = {fold_loss[-1]}, accuracy = {fold_acc[-1]}")

In [None]:
import statistics

def print_exp_statistics(fold_loss, fold_acc):
    """Print the per-fold losses and accuracies followed by their summary statistics.

    The mean is printed once at least one value is available and the standard deviation once
    at least two are, so the function is safe for empty or single-fold runs. ``None`` entries
    are ignored in the summary.

    Args:
        fold_loss: list with one loss value per trained fold.
        fold_acc: list with one accuracy value per trained fold.
    """
    import statistics  # local import keeps the function self-contained

    print("Loss across folds")
    print(fold_loss)
    print("Accuracy across folds")
    print(fold_acc)
    for label, values in (("loss", fold_loss), ("accuracy", fold_acc)):
        usable = [value for value in values if value is not None]
        if not usable:
            # statistics.mean raises on an empty sequence
            continue
        summary = f"mean {label} across folds = {statistics.mean(usable)}"
        # statistics.stdev needs at least two data points
        if len(usable) >= 2:
            summary += f", {label} stdev across folds = {statistics.stdev(usable)}"
        print(summary)

In [None]:
find_lr = True
fold_loss = []
fold_acc = []
for fold in range(Config.NUM_FOLDS):
    # get_fold_dls requires the image folder as its third argument; omitting it raised a TypeError
    dl_train, dl_val, ds_train, ds_val = get_fold_dls(
        fold, df_train, os.path.join(Config.DATA_ROOT_FOLDER, "processed_train", "mel_spec")
    )
    run_training(fold, dl_train, dl_val, fold_loss, fold_acc, find_lr)
    # only the first fold is trained; remove this break to run the full cross validation
    break
print_exp_statistics(fold_loss, fold_acc)