In [5]:
import pandas as pd
import numpy as np
import os
import torch
import numpy as np
import torch.nn.functional as F
import librosa
from sklearn.model_selection import train_test_split
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
from torchvision import models
import torch.nn as nn
import warnings
from tqdm.notebook import tqdm

In [27]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print(device)

In [28]:
esc_10 = False


In [None]:
trainPath = '/esc/audio/audio/16000/'
trainData = pd.read_csv('/esc/esc50.csv')
if esc_10:
    new_df = trainData[trainData["esc10"]==True].reset_index()
    categories = list(set(list(new_df.category.values)))
    for i in range(len(new_df)):
        cat = new_df.loc[i, "category"]
        new_df.loc[i, "target"] = categories.index(cat)
    trainData = new_df
print(len(trainData))
trainData.head()

In [29]:
class Dataset(Dataset):
    def __init__(self, dataframe, fold=None, val=False, test=False):
        
        self.fold = fold  
        all_folds = [1, 2, 3, 4, 5]
        test_fold = 5-fold
        all_folds.remove(test_fold)
        if test==False:
            df = dataframe[dataframe['fold'].isin(all_folds)]
            train_df, val_df = train_test_split(df, test_size=0.1, random_state=42)
            if val ==True:
                self.dataframe = val_df
            else:
                self.dataframe = train_df
               
        elif test==True:
            self.dataframe = dataframe[dataframe['fold'] == test_fold]
    def __getitem__(self, index):
        path_to_file = self.get_path_to_file(index)
        signal = self.preprocess_signal(path_to_file)

        x = np.stack([cv2.resize(signal, (224, 224)) for _ in range(3)])
        y = self.dataframe.target.values[index]
        return torch.tensor(x, dtype=torch.float), int(y)
    
    def get_path_to_file(self, index):
        return f'{trainPath}/{self.dataframe.filename.values[index]}'
    def preprocess_signal(self, path_to_file):
        signal, _ = librosa.load(path_to_file, sr=16000)
        signal = librosa.feature.melspectrogram(y=signal)
        signal = librosa.power_to_db(signal)
        return signal

    def __len__(self):
        return self.dataframe.shape[0]

In [30]:
# Add a linear layer for classification (after training SimCLR model)
class Classifier(torch.nn.Module):
    def __init__(self, feature_dim, num_classes):
        super(Classifier, self).__init__()
        self.fc = torch.nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        x = self.fc(x)
        return x


num_classes = 10  # Set the number of classes in UrbanSound8K
feature_dim= 2048
classifier = Classifier(feature_dim, num_classes).to(device)

In [None]:
counts = trainData.target.value_counts()
num_classes = len(counts)
counts

In [None]:
batch_size = 32
fold = 0
trainSet = Dataset(trainData, fold=fold)
valSet = Dataset(trainData, fold=fold, val=True)
testSet = Dataset(trainData, fold=fold, test=True)
trainLoader = DataLoader(trainSet, batch_size=batch_size, shuffle=True)
valLoader = DataLoader(valSet , batch_size=batch_size)
testLoader = DataLoader(testSet , batch_size=batch_size)

print('Training set: {}, Validation set: {}, Test Set: {}'.format(len(trainSet), len(valSet), len(testSet)))

print("Folds of training set:", set(list(trainSet.dataframe["fold"].values)))
print("Folds of validation set:", set(list(valSet.dataframe["fold"].values)))
print("Folds of test set:", set
(list(testSet.dataframe["fold"].values)))

In [None]:
class CustomResNet50(nn.Module):
    def __init__(self, num_classes=10):
        super(CustomResNet50, self).__init__()
        # Load a pre-trained ResNet50
        self.resnet50 = models.resnet50(pretrained=True)
        # Freeze all layers in ResNet50
        for param in self.resnet50.parameters():
            param.requires_grad = True  # Set true to unfreeze

        # Get the input features of the original fully connected layer
        in_features = self.resnet50.fc.in_features

        # Replace the fully connected layer with custom layers
        self.resnet50.fc = nn.Sequential(
            nn.Linear(in_features, 512),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.ReLU(),
            #nn.Dropout(0.2),
            nn.Linear(256, num_classes),
        )

        # Unfreeze the custom fully connected layers
        for param in self.resnet50.fc.parameters():
            param.requires_grad = True

    def forward(self, x):
        return self.resnet50(x)

def get_pretext_loaded(model):
    backbone_weights = torch.load('/kaggle/input/weights/resnet_pretext_model_esc50_3classes1.pth', map_location='cpu')
    model_dict = dict(model.resnet50.state_dict())
    #print(backbone_weights["model_state"].keys())
    for key, weights in backbone_weights["model_state"].items():
        a = key[9:]
        if a.find('fc') == -1: #key should not contain fc
            model_dict[a] = weights #update all keys except fc
    
    model.resnet50.load_state_dict(model_dict)
    a = model_dict['conv1.weight'][0]
    b = backbone_weights["model_state"]["resnet50.conv1.weight"][0]
    
    if np.array_equal(np.array(a), np.array(b)):
        print("Backbone Loaded")
    else:
        print("Backbone not loaded")
    return model

In [None]:
try:
    df = pd.read_csv("training_metadata.csv")
except:

    df = pd.DataFrame({"Fold": [], "Epoch": [], "Train Loss": [], "Val Loss": [], "Train Acc": [], "Val Acc": [], "Test Acc": []})

In [None]:
from time import time 
warnings.filterwarnings("ignore")

start_time = time()
folds = 5
for fold in range(2, folds):
    batch_size = 32
    trainSet = Dataset(trainData, fold=fold)
    valSet = Dataset(trainData, fold=fold, val=True)
    testSet = Dataset(trainData, fold=fold, test=True)
    trainLoader = DataLoader(trainSet, batch_size=batch_size, shuffle=True, num_workers = 2)
    valLoader = DataLoader(valSet , batch_size=batch_size, num_workers = 2)
    testLoader = DataLoader(testSet , batch_size=batch_size, num_workers = 2)

    print('Training set: {}, Validation set: {}, Test Set: {}'.format(len(trainSet), len(valSet), len(testSet)))
    print("Folds of training set:", set(list(trainSet.dataframe["fold"].values)))
    print("Folds of validation set:", set(list(valSet.dataframe["fold"].values)))
    print("Folds of test set:", set(list(testSet.dataframe["fold"].values)))

    model = get_pretext_loaded(CustomResNet50(num_classes=50))
    
    if torch.cuda.device_count() >= 2:
        print(f"Using {torch.cuda.device_count()} GPUs!")
        device = torch.device("cuda")
        model = torch.nn.DataParallel(model)  # Wrap the model for multi-GPU use
        model.to(device)
    else:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        model.to(device)
        print("Using single GPU or CPU")
        
    epochs = 50
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    cost = torch.nn.CrossEntropyLoss()
    best_val_accuracy = 0.0
    best_val_loss = 10000
    checkpoint_path = f"resnet_proposed_esc_50_model_fold{fold+1}.pth"
    

    if os.path.isfile(checkpoint_path):
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state'])
        optimizer.load_state_dict(checkpoint['optimizer_state'])
        best_val_accuracy = checkpoint['best_val_accuracy']
        start_epoch = checkpoint['epoch'] + 1
        print("Resuming training from epoch: "+str(start_epoch) +" with best_val_accuracy: "+str(best_val_accuracy))
    else:
        start_epoch = 0
    
    for epoch in range(start_epoch, epochs):
        train_loss = 0
        val_loss = 0
        train_correct = 0
        val_correct = 0
        
        model.train()
        for x, y in tqdm(trainLoader):
            optimizer.zero_grad()
            x,y = x.to(device),y.to(device)
            pred = model(x)
            loss = cost(pred, y)
            train_loss += cost(pred, y).item()
            train_correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            for x, y in tqdm(valLoader):
                x,y = x.to(device),y.to(device)
                pred = model(x)
                loss = cost(pred, y)
                val_loss += cost(pred, y).item()
                val_correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        train_loss = train_loss/len(trainLoader)
        val_loss = val_loss/len(valLoader)
        train_accuracy = train_correct / len(trainData)
        val_accuracy = val_correct / len(valSet)
        
        if val_loss < best_val_loss:
            print(f"Validation loss has improved from {best_val_loss} to {val_loss}")
            best_val_loss = val_loss
            
        print("epoch = %d, train_loss = %.5f, val_loss = %.5f, train_accuracy = %.5f, val_accuracy = %.5f" % (epoch, train_loss, val_loss, train_accuracy, val_accuracy))
        
        #saving the model when the val_accuracy improves
        if val_accuracy > best_val_accuracy:
            print(f"Validation Accuracy improved from {best_val_accuracy:.5f} to {val_accuracy:.5f}. Saving checkpoint.")
            best_val_accuracy = val_accuracy
            torch.save({
                'epoch': epoch,
                'model_state': model.state_dict(),
                'optimizer_state': optimizer.state_dict(),
                'best_val_accuracy': best_val_accuracy
            }, f"resnet_proposed_esc_50_model_fold{fold+1}.pth")
        df2 = pd.DataFrame({"Fold": [fold+1], "Epoch": [epoch], "Train Loss": [train_loss], "Val Loss": [val_loss], "Train Acc": [train_accuracy], "Val Acc": [val_accuracy], "Test Acc": [0]})
        df = pd.concat([df, df2], ignore_index=True)
        df.to_csv("training_metadata.csv", index=False)
        print(df.tail(1))
    #getting final best validation accuracy here
    
    checkpoint_path = f"resnet_proposed_esc_50_model_fold{fold+1}.pth"
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state'])
    model.eval()
    test_correct = 0
    for x, y in tqdm(testLoader):
        x,y = x.to(device),y.to(device)
        pred = model(x)
        pred = torch.softmax(pred, axis=1)       
        test_correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_accuracy = test_correct / len(testSet)
    print(f"Test accuracy for fold: {fold+1} is: {test_accuracy*100}")
    #change test accuracy in dataframe where fold = fold and epoch is max
    df.loc[(df.Fold == fold) & (df.Epoch == df[df.Fold == fold].Epoch.max()), "Test Acc"] = test_accuracy
    df.to_csv("training_metadata.csv", index=False)
        
end_time = time()
total_time = end_time - start_time
print(f'Total Training Time: {total_time:.2f} seconds')