In [None]:
import torch.nn as nn
from torch.optim import Adam
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler, random_split
from torchvision import datasets, transforms, models
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
import numpy as np
import os
import random
import warnings
import tqdm.notebook as tqdm
import matplotlib.pyplot as plt
import torchvision
from google.colab import drive
import copy
import pandas as pd


# looked at pytorch documentation

# Mount Google Drive so the dataset and output folders under /content/drive are reachable.
drive.mount("/content/drive")

# Compute device used by the rest of the notebook: the first CUDA device when
# one is available, otherwise the CPU. Defined once, as a torch.device.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


device(type='cuda', index=0)

In [None]:
# Hyperparameters
# learning_rate / weight_decay are passed to the AdamW optimizer,
# batch_size to the DataLoaders, num_epochs to the training loop.
learning_rate, weight_decay = 1e-4, 5e-2
batch_size, num_epochs = 30, 50


In [None]:
# Google Drive locations
data_path = '/content/drive/MyDrive/DL_FP'  # root folder containing the 'spectrograms' image directory
path = '/content/drive/MyDrive/DL_FP_ViT'   # output folder for checkpoints, saved weights and metrics

# Pretrained weight set for ViT-B/16 (torchvision's DEFAULT entry)
weights = models.ViT_B_16_Weights.DEFAULT



In [None]:
# Preprocessing shared by the train, validation and test loaders (no
# augmentation): resize, convert to a tensor, then normalise each channel with
# the standard ImageNet mean / std.
# Resize receives an explicit (height, width) pair: a single int only rescales
# the shorter edge, which would leave non-square spectrograms non-square, and
# the ViT-B/16 model only accepts 224x224 inputs.
transforms_ = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])


In [None]:
percent_data_used = 1  # fraction of the dataset to keep, in (0, 1]
split_seed = 0         # seed for the train/valid/test partition

if not 0 < percent_data_used <= 1:
    raise ValueError(f"percent_data_used must be in (0, 1], got {percent_data_used}")

# Load dataset. The shared preprocessing pipeline is attached at construction
# time; all three loaders below read from this single dataset object.
full_dataset = datasets.ImageFolder(root=data_path + '/spectrograms/', transform=transforms_)

# Select the dataset to use
dataset = full_dataset

# Number of samples kept after applying percent_data_used
num_samples = len(dataset)
num_subset = int(percent_data_used * num_samples)

# Define the indices: a seeded permutation truncated to the requested fraction.
# A dedicated generator makes the partition identical across kernel restarts,
# so weights reloaded from Drive in a later session are evaluated on the same
# held-out samples instead of a reshuffled split that overlaps the old
# training set.
indices = np.random.default_rng(split_seed).permutation(num_samples)[:num_subset].tolist()

# Split the data into training (60%), validation (20%) and testing (20%)
train_split = int(np.floor(0.6 * len(indices)))
valid_split = int(np.floor(0.8 * len(indices)))

train_indices = indices[:train_split]
valid_indices = indices[train_split:valid_split]
test_indices = indices[valid_split:]

# Create Samplers
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(valid_indices)
test_sampler = SubsetRandomSampler(test_indices)

# Create DataLoaders
dataloaders = {
    'train': DataLoader(dataset, batch_size=batch_size, sampler=train_sampler, num_workers=4),
    'valid': DataLoader(dataset, batch_size=batch_size, sampler=valid_sampler, num_workers=4),
    'test': DataLoader(dataset, batch_size=batch_size, sampler=test_sampler, num_workers=4)
}

# Number of samples each loader draws from
dataset_sizes = {x: len(dataloaders[x].sampler) for x in dataloaders.keys()}



In [None]:
# Short handles for the three loaders. 'vlid_loader' keeps its spelling because
# the training loop refers to it by that name.
train_loader, test_loader, vlid_loader = (
    dataloaders[split_name] for split_name in ('train', 'test', 'valid')
)

In [None]:
# construct the model

# Preprocessing preset bundled with the pretrained weights
# (not referenced by the other cells visible in this notebook).
preprocess = weights.transforms()

model = torchvision.models.vit_b_16(weights=weights)

# Freeze every pretrained parameter first ...
for param in model.parameters():
  param.requires_grad = False

# ... then re-enable gradients for the last ViT encoder block and the final
# encoder layer norm, the only pretrained parts that get fine-tuned.
fine_tuned_modules = (model.encoder.layers.encoder_layer_11, model.encoder.ln)
for fine_tuned in fine_tuned_modules:
  for param in fine_tuned.parameters():
    param.requires_grad = True

# Replace the classification head with a new 8-output linear layer. It is
# created after the freeze, so its parameters keep the default requires_grad.
num_inft = model.heads.head.in_features
model.heads.head = nn.Linear(num_inft, 8)
model = model.to(device)


In [None]:
# Dropout probability written into the last encoder block
new_p = 0.85

# The block-level dropout plus entries 2 and 4 of its MLP
# (presumably the MLP's two dropout layers; confirm against the installed torchvision).
last_encoder_block = model.encoder.layers.encoder_layer_11
for dropout_layer in (last_encoder_block.dropout, last_encoder_block.mlp[2], last_encoder_block.mlp[4]):
  dropout_layer.p = new_p

In [None]:
def get_loss(model: nn.Module, test_loader: torch.utils.data.DataLoader):
  """Evaluate `model` on every batch of `test_loader`.

  The model is switched to evaluation mode for the duration of the call so
  that dropout is disabled and the metrics are deterministic; whatever mode it
  was in on entry is restored before returning.

  The loss function is the notebook-level `criterion` and tensors are moved to
  the notebook-level `device`.

  Args:
      model: network to evaluate.
      test_loader: iterable yielding (images, labels) batches.

  Returns:
      A tuple (mean loss per batch, accuracy), where accuracy is the number of
      correct predictions divided by the number of samples seen.

  Raises:
      ValueError: if `test_loader` yields no samples.
  """
  total_loss = 0
  n_batches = 0
  correct = 0
  total = 0

  was_training = model.training
  model.eval()
  try:
    # since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
      for images, labels in tqdm.tqdm(test_loader, colour='green', desc='test', leave=False):
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        total_loss += criterion(outputs, labels).item()
        n_batches += 1
        total += len(labels)

        pred_labels = outputs.argmax(dim=1)
        correct += (pred_labels == labels).sum().item()
  finally:
    # Hand the model back in the mode the caller had it in
    model.train(was_training)

  if n_batches == 0 or total == 0:
    raise ValueError("get_loss received a loader with no samples")

  return (total_loss / n_batches), (correct / total)


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

def percision_recall_F1(model, dataloader, num_classes):
    """Print macro-averaged precision, recall and F1 for `model` on `dataloader`.

    The model is evaluated in eval mode (dropout disabled) and its previous
    training/eval mode is restored afterwards. Tensors are moved to the
    notebook-level `device`.

    Args:
        model: network to evaluate.
        dataloader: iterable yielding (inputs, labels) batches.
        num_classes: accepted for interface compatibility; not used by the
            computation.

    Returns:
        A tuple (precision, recall). F1 is printed but not returned.
    """
    total_labels = []
    total_predictions = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                predicted = model(inputs).argmax(dim=1)
                total_labels.extend(labels.cpu().numpy())
                total_predictions.extend(predicted.cpu().numpy())
    finally:
        # Hand the model back in the mode the caller had it in
        model.train(was_training)

    precision = precision_score(total_labels, total_predictions, average="macro")
    recall = recall_score(total_labels, total_predictions, average="macro")
    f1 = f1_score(total_labels, total_predictions, average="macro")

    print(f'Precision: {precision} \n Recall: {recall} \n F1: {f1}')

    return precision, recall

In [None]:


from typing import Tuple
import tqdm.notebook as tqdm
import torch.optim as optim


# Loss function and optimizer. They are created BEFORE the optional resume
# below: the optimizer has to exist for its saved state to be loaded into it,
# and it must not be re-created afterwards or the restored state is lost.
# All model parameters are passed, frozen ones included.
# NOTE(review): filtering out frozen parameters would change the param-group
# layout stored in existing checkpoints, so it is left as is.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Optional resume: set `checkpoint` to any non-empty string and `prev_epoch` to
# the epoch number in the checkpoint file name to restore a saved run.
checkpoint = ""
prev_epoch = 0

if checkpoint != "":
  # map_location lets a checkpoint written on a GPU be opened on the current device
  checkpoint = torch.load(path + f"/checkpoints/checkpoint_{prev_epoch}.pth", map_location=device)
  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  epoch = checkpoint['epoch']
  loss = checkpoint['loss']

In [None]:
log_freq = 100  # NOTE(review): not read anywhere in this cell
checkpoint_interval = 20  # a resumable checkpoint is written when epoch % checkpoint_interval == 0

# Per-epoch history; each list receives one value per epoch
train_losses = []
valid_losses = []
train_accs = []
valid_accs = []
total_steps = 0  # optimizer steps taken across all epochs

best_acc = 0.0  # NOTE(review): never updated; model selection below uses validation loss
best_loss = float('inf')  # lowest validation loss seen so far
best_model_wts = copy.deepcopy(model.state_dict())  # snapshot of the weights before training starts

# Checkpoints go under the same `path` root the resume code reads from. The
# folder is created up front so the first save cannot fail on a missing directory.
checkpoint_dir = path + "/checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

for epoch in tqdm.trange(num_epochs, desc='Epoch', colour='pink'):  # loop over the dataset multiple times
    # Training mode (dropout active); it is switched off again for validation below
    model.train()

    running_loss = 0.0
    running_loss_steps = 0
    num_train_predictions_correct = 0
    num_train_predictions_total = 0

    for inputs, labels in tqdm.tqdm(train_loader, desc='batch', colour='blue', leave=False):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # compute train accuracy
        pred_labels = outputs.argmax(dim=1)
        num_train_predictions_correct += (pred_labels == labels).sum().item()
        num_train_predictions_total += len(pred_labels)

        # accumulate statistics
        total_steps += 1
        running_loss += loss.item()
        running_loss_steps += 1

    # Validate with dropout disabled so the loss used for model selection is
    # not perturbed by random masking. The model is left in eval mode; the next
    # epoch switches back to training mode at its start.
    model.eval()
    valid_loss, valid_acc = get_loss(model, vlid_loader)

    avg_train_loss = running_loss / running_loss_steps
    avg_train_acc = (num_train_predictions_correct / num_train_predictions_total)
    print(f'[Step {total_steps}] train_loss: {avg_train_loss:.3f} || valid_loss = {valid_loss:.3f}')
    print(f'\t\t train_acc={avg_train_acc*100:.1f}% || valid_acc={valid_acc*100:.1f}%')

    train_losses.append(avg_train_loss)
    valid_losses.append(valid_loss)
    train_accs.append(avg_train_acc)
    valid_accs.append(valid_acc)

    # Keep the weights with the lowest validation loss on disk
    if valid_loss < best_loss:
        best_loss = valid_loss
        print(f"model saved. loss:{valid_loss}")
        torch.save(model.state_dict(), path + "/best_model")

    # Periodic resumable checkpoint (epochs 0, checkpoint_interval, 2*checkpoint_interval, ...)
    if epoch % checkpoint_interval == 0:
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            # last training-batch loss, stored as a plain float rather than a live tensor
            'loss': loss.item(),
        }, checkpoint_dir + f"/checkpoint_{epoch}.pth")


print('Finished Training')




Epoch:   0%|          | 0/50 [00:00<?, ?it/s]

In [None]:
import pandas as pd

# Training history as a table: one column per tracked curve, filled from the
# lists the training loop appends to once per epoch.
history_columns = {
    "train_losses": train_losses,
    "valid_losse": valid_losses,  # column name kept as is; the plotting cell reads it by this spelling
    "train_accs": train_accs,
    "valid_accs": valid_accs,
}
performance = pd.DataFrame(history_columns)

In [None]:
# Write the training history to the output folder
performance_csv = path + "/performances_may_17_12pm.csv"
performance.to_csv(performance_csv)
# To reload a previously saved history instead:
# performance = pd.read_csv(path + "/performances_may_17_12pm.csv")

In [None]:
# Save the model's current weights to Drive
current_weights = model.state_dict()
torch.save(current_weights, "/content/drive/MyDrive/DL_FP_ViT/model")

In [None]:
# Macro precision / recall / F1 of the current model on the test loader
percision_recall_F1(model=model, dataloader=test_loader, num_classes=8)

In [None]:
# Load the weights the training loop saved at the lowest validation loss.
# map_location lets a file written on a GPU be opened on the current device.
best_model_wts = torch.load("/content/drive/MyDrive/DL_FP_ViT/best_model", map_location=device)
model.load_state_dict(best_model_wts)

# Evaluation mode: dropout must be off for the reported test loss / accuracy
model.eval()
get_loss(model, test_loader)

In [None]:
def plot_data(train_accuracy, test_accuracy, train_loss, test_loss):
    """Show two figures: accuracy curves, then loss curves.

    Each figure plots a training series (magenta) and a validation series
    (turquoise) against the epoch number, starting at 1.

    Args:
        train_accuracy: per-epoch training accuracy; its length sets the x-axis.
        test_accuracy: per-epoch accuracy drawn as the 'Validation Accuracy' line.
        train_loss: per-epoch training loss.
        test_loss: per-epoch loss drawn as the 'Validation Loss' line.
    """
    epochs = range(1, len(train_accuracy) + 1)

    # (train series, validation series, train label, validation label, title, y-axis label)
    panels = (
        (train_accuracy, test_accuracy, 'Train Accuracy', 'Validation Accuracy', 'ViT Accuracy', 'Accuracy'),
        (train_loss, test_loss, 'Train Loss', 'Validation Loss', 'ViT Loss', 'Loss'),
    )

    for train_series, valid_series, train_label, valid_label, title, y_label in panels:
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, train_series, color='magenta', label=train_label, linestyle='-')
        plt.plot(epochs, valid_series, color='turquoise', label=valid_label, linestyle='-')
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)
        plt.show()

# Plot the recorded training history
plot_data(
    train_accuracy=performance["train_accs"],
    test_accuracy=performance["valid_accs"],
    train_loss=performance["train_losses"],
    test_loss=performance["valid_losse"],
)

In [None]:
import seaborn as sns

def plot_confusion_matrix(model, dataloader, num_classes):
    """Draw a confusion-matrix heatmap of `model`'s predictions on `dataloader`.

    The model is evaluated in eval mode (dropout disabled) and its previous
    training/eval mode is restored afterwards. Tensors are moved to the
    notebook-level `device`.

    Args:
        model: network to evaluate.
        dataloader: iterable yielding (inputs, labels) batches.
        num_classes: accepted for interface compatibility; the matrix size is
            taken from the hard-coded genre list below.
    """
    total_labels = []
    total_predictions = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                predicted = model(inputs).argmax(dim=1)
                total_labels.extend(labels.cpu().numpy())
                total_predictions.extend(predicted.cpu().numpy())
    finally:
        # Hand the model back in the mode the caller had it in
        model.train(was_training)

    # Tick labels for the axes.
    # NOTE(review): assumes class index i corresponds to genres[i]; confirm
    # against the dataset's class ordering.
    genres = ["Electronic", "Experimental", "Folk", "Hip-Hop", "Instrumental", "International", "Pop", "Rock"]

    # Compute the confusion matrix. Passing `labels` pins its shape to
    # len(genres) x len(genres) even when a class is missing from this split,
    # so the tick labels always line up with the rows and columns.
    cm = confusion_matrix(total_labels, total_predictions, labels=list(range(len(genres))))
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="cool", xticklabels=genres, yticklabels=genres)
    plt.xticks(rotation=30)

    plt.title("ViT_B_16 Confusion Matrix")
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

# Confusion matrix of the current model on the test loader
plot_confusion_matrix(model=model, dataloader=test_loader, num_classes=8)

