<a href="https://colab.research.google.com/github/FionaZZhang/DeepLearnMuse/blob/main/pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from sklearn.model_selection import GroupShuffleSplit
from PIL import Image
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision.transforms import ToTensor
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch.nn as nn
import torchvision.models as models
from torchvision.transforms import Compose, Resize, ToTensor
from PIL import Image
from torch.utils.data import Dataset
import torch.nn.functional as F

In [None]:
# Mount Google Drive at /content/drive; the dataset and result paths used
# below all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# model settings
# Directory of spectrogram images and CSV of per-track features (Google Drive paths).
MUSIC = "/content/drive/MyDrive/ColabNotebooks/mel_spectrograms"
FEATURES = "/content/drive/MyDrive/ColabNotebooks/features_30_sec.csv"
EPOCH = 100  # upper bound on the number of training epochs
BATCH_SIZE = 64
MODEL_NAME = "testing"  # inserted into the file names of saved weights and figures
PATIENCE = 10  # early-stopping patience
# Prefer the GPU, but fall back to the CPU so the notebook still runs on a
# runtime without CUDA instead of failing at the first `.to(device)`.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# Data Loader and Model ------------------------------------------------------------------------------------------------
# Data loader

class MusicDataset(Dataset):
    """Dataset pairing image files on disk with their labels.

    Args:
        img_paths: Indexable collection of image file paths.
        labels: Indexable collection of labels, aligned with ``img_paths``.
        transform: Optional callable applied to the loaded RGB image.
    """

    def __init__(self, img_paths, labels, transform=None):
        self.img_paths, self.labels, self.transform = img_paths, labels, transform

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.img_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return it as an ``(image, label)`` pair."""
        image = Image.open(self.img_paths[idx]).convert('RGB')
        # Truthiness test (rather than ``is not None``) is kept on purpose so
        # that any falsy transform is treated as "no transform".
        image = self.transform(image) if self.transform else image
        return image, self.labels[idx]

# Preprocessing pipeline: resize every image to 224x224, then convert it to a tensor.
transform = Compose([Resize((224, 224)), ToTensor()])

class resnet18(nn.Module):
    """ResNet-18 backbone followed by dropout and a linear classifier.

    Args:
        num_classes: Number of output classes (width of the returned scores).
    """

    def __init__(self, num_classes):
        super().__init__()

        # Pre-trained torchvision ResNet-18; its own final layer is dropped
        # and replaced by ``_classifier`` below.
        backbone = models.resnet18(weights=True)
        feature_dim = backbone.fc.in_features
        self.base_model = nn.Sequential(*list(backbone.children())[:-1])
        self.dropout = nn.Dropout(p=0.5)
        self.leakyrelu = nn.LeakyReLU(0.1)  # registered but not applied in forward()
        self._classifier = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        features = self.base_model(x)
        features = features.view(features.size(0), -1)  # flatten per sample
        return self._classifier(self.dropout(features))

    def _init_weights(self) -> None:
        """Re-initialise backbone Conv2d (Kaiming) and Linear (Xavier) weights.

        Not invoked by ``__init__``; calling it overwrites the backbone's
        convolution weights.
        """
        initialisers = (
            (nn.Conv2d, nn.init.kaiming_uniform_),
            (nn.Linear, nn.init.xavier_uniform_),
        )
        for module in self.base_model.modules():
            for layer_type, init_fn in initialisers:
                if isinstance(module, layer_type):
                    init_fn(module.weight)
                    break

class ResNetLSTM(nn.Module):
    """Pre-trained ResNet-18 feature extractor followed by a bidirectional LSTM classifier.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super(ResNetLSTM, self).__init__()

        # Use a pre-trained ResNet model
        self.base_model = models.resnet18(pretrained=True)
        num_ftrs = self.base_model.fc.in_features
        self.base_model = nn.Sequential(*list(self.base_model.children())[:-1])  # remove last layer

        lstm_hidden = 512
        self.lstm = nn.LSTM(num_ftrs, lstm_hidden, batch_first=True, bidirectional=True)  # LSTM layer
        # A bidirectional LSTM concatenates both directions, so every timestep
        # carries 2 * hidden_size features; the classifier must accept that
        # width (a 512-wide classifier fails with a shape error in forward()).
        self._classifier = nn.Linear(2 * lstm_hidden, num_classes)

        self._init_weights()

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for an image batch."""
        x = self.base_model(x)

        # Collapse everything after the channel axis into one "position" axis
        # and use it as the sequence: (batch, channels, positions) ->
        # (batch, positions, channels).
        # NOTE(review): torchvision's ResNet-18 presumably ends in a global
        # average pool, which would make this a length-1 sequence - confirm.
        x = x.view(x.size(0), x.size(1), -1).transpose(1, 2)
        x, _ = self.lstm(x)
        x = x[:, -1, :]  # LSTM output (both directions) at the last timestep

        score = self._classifier(x)
        return score

    def _init_weights(self) -> None:
        """Initialise the layers added on top of the backbone.

        The backbone is skipped on purpose: re-initialising its Conv2d layers
        would throw away the pre-trained weights loaded in ``__init__``.
        Conv2d layers get Kaiming-uniform weights, Linear layers Xavier-uniform.
        """
        for name, child in self.named_children():
            if name == "base_model":
                continue
            for layer in child.modules():
                if isinstance(layer, nn.Conv2d):
                    nn.init.kaiming_uniform_(layer.weight)
                elif isinstance(layer, nn.Linear):
                    nn.init.xavier_uniform_(layer.weight)


class ResNetGRU(nn.Module):
    """Pre-trained ResNet-18 features -> Conv1d -> 3-layer bidirectional GRU -> classifier.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super(ResNetGRU, self).__init__()

        # Use a pre-trained ResNet model
        self.base_model = models.resnet18(pretrained=True)
        self.base_model = nn.Sequential(*list(self.base_model.children())[:-1])  # remove last layer

        # Probe the backbone once to learn how many features it produces.
        # Done in eval mode and without autograd so the probe neither updates
        # the pre-trained BatchNorm running statistics nor builds a graph.
        was_training = self.base_model.training
        self.base_model.eval()
        with torch.no_grad():
            dummy_output = self.base_model(torch.zeros(1, 3, 224, 224))
        self.base_model.train(was_training)
        num_ftrs = dummy_output.size(1) * dummy_output.size(2)

        self.conv1d = nn.Conv1d(in_channels=num_ftrs, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.rnn = nn.GRU(input_size=64, hidden_size=100, num_layers=3, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(200, num_classes)  # 200 = 2 directions * hidden_size
        # Not applied in forward(); kept so the module's state_dict layout is unchanged.
        self.batch_norm = nn.BatchNorm1d(num_features=100)

        self._init_weights()

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        x = self.base_model(x)
        # Merge channel and height axes; the last (width) axis is the Conv1d length.
        x = x.view(x.size(0), -1, x.size(3))
        x = self.conv1d(x)
        x = x.transpose(1, 2)  # (batch, length, 64) for the batch-first GRU
        x, _ = self.rnn(x)
        # Final state of each direction: last step for the forward half of the
        # features, first step for the backward half.
        x_forward = x[:, -1, :self.rnn.hidden_size]
        x_backward = x[:, 0, self.rnn.hidden_size:]
        x = torch.cat((x_forward, x_backward), dim=1)
        x = self.dropout(x)
        x = self.fc(x)
        return x

    def _init_weights(self):
        """Initialise the layers added on top of the backbone.

        The backbone is skipped on purpose: re-initialising its Conv2d layers
        would throw away the pre-trained weights loaded in ``__init__``.
        Conv2d layers get Kaiming-normal weights, Linear layers Xavier-normal.
        """
        for name, child in self.named_children():
            if name == "base_model":
                continue
            for m in child.modules():
                if isinstance(m, nn.Conv2d):
                    nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                elif isinstance(m, nn.Linear):
                    nn.init.xavier_normal_(m.weight)


class GRU(nn.Module):
    """Conv1d + 3-layer bidirectional GRU classifier applied directly to image tensors.

    The layer sizes (224 Conv1d input channels, GRU input size 672) are
    hard-coded; they fit a 3-channel 224x224 input after the reshape done in
    ``forward``.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Sequence branch
        self.conv1d = nn.Conv1d(in_channels=224, out_channels=512, kernel_size=3, stride=1, padding=1)
        self.rnn = nn.GRU(input_size=672, hidden_size=256, num_layers=3, batch_first=True, bidirectional=True)
        self.gru_fc = nn.Linear(512, 256)

        # Classification head
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for a 4-D batch ``x``."""
        batch, _channels, _height, width = x.size()

        # Swap the two spatial axes, then reinterpret the buffer as
        # (batch, width, everything-else).
        # NOTE(review): this is a raw reshape of a (batch, C, W, H) buffer, not
        # a per-column gather across channels - confirm the layout is intended.
        swapped = x.permute(0, 1, 3, 2).contiguous()
        seq = swapped.view(batch, width, -1)

        seq = self.conv1d(seq)
        seq, _ = self.rnn(seq)

        # Final state of each direction: last step of the forward half,
        # first step of the backward half.
        hidden = self.rnn.hidden_size
        summary = torch.cat((seq[:, -1, :hidden], seq[:, 0, hidden:]), dim=1)

        return self.fc(self.dropout(self.gru_fc(summary)))



class Attention(nn.Module):
    """Scores every timestep of a bidirectional RNN output and normalises over the sequence.

    Args:
        hidden_size: Hidden size of one RNN direction; inputs are expected to
            carry ``2 * hidden_size`` features per timestep.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.attention = nn.Linear(2 * hidden_size, 1)

    def forward(self, output):
        """Map ``[batch, seq_len, hidden_size*2]`` to weights ``[batch, seq_len, 1]``.

        The weights are a softmax over the sequence axis, so they sum to 1
        for every batch element.
        """
        scores = self.attention(output).squeeze(-1)  # [batch, seq_len]
        weights = F.softmax(scores, dim=1)
        return weights.unsqueeze(-1)


class ParallelResNetGRUAttention(nn.Module):
    """Two-branch classifier: a ResNet-18 image branch and a Conv1d + GRU + attention branch.

    Both branches receive the same input batch; their 256-dimensional outputs
    are concatenated and classified by a single linear layer.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Image branch: pre-trained ResNet-18 without its final layer.
        backbone = models.resnet18(pretrained=True)
        self.base_model = nn.Sequential(*list(backbone.children())[:-1])
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.resnet_fc = nn.Linear(512, 256)

        # Sequence branch: Conv1d -> bidirectional GRU -> attention pooling.
        self.conv1d = nn.Conv1d(in_channels=224, out_channels=512, kernel_size=3, stride=1, padding=1)
        self.rnn = nn.GRU(input_size=672, hidden_size=256, num_layers=1, batch_first=True, bidirectional=True)
        self.attention = Attention(hidden_size=256)
        self.gru_fc = nn.Linear(512, 256)

        # Fusion head (``batch_norm`` is registered but not applied in forward()).
        self.dropout = nn.Dropout(0.5)
        self.batch_norm = nn.BatchNorm1d(num_features=512)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for a 4-D batch ``x``."""
        batch_size, _c, h, _w = x.size()

        # Image branch
        image_feat = self.adaptive_pool(self.base_model(x))
        image_feat = self.resnet_fc(torch.flatten(image_feat, 1))

        # Sequence branch: view the raw buffer as (batch, height, rest).
        seq = self.conv1d(x.view(batch_size, h, -1))
        seq, _ = self.rnn(seq)

        # Attention-weighted sum over the sequence axis.
        weights = self.attention(seq)
        context = (seq * weights).sum(dim=1)
        seq_feat = self.gru_fc(context)

        # Fuse both branches and classify.
        fused = self.dropout(torch.cat((image_feat, seq_feat), dim=1))
        return self.fc(fused)



class BasicCNN(nn.Module):
    """Two conv/ReLU/max-pool stages followed by a two-layer fully connected head.

    Args:
        num_classes: Number of output classes.
        input_size: Spatial size of the input images, either a single int for
            square images or a ``(height, width)`` pair. Defaults to 224, which
            yields the previous hard-coded flatten size of 401408.
    """

    def __init__(self, num_classes, input_size=224):
        super(BasicCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        if isinstance(input_size, int):
            height, width = input_size, input_size
        else:
            height, width = input_size
        # The 3x3 / padding-1 convolutions keep the spatial size; each 2x2
        # max-pool halves it (rounding down). 128 is conv2's channel count.
        flat_features = 128 * (height // 2 // 2) * (width // 2 // 2)

        self.fc1 = nn.Linear(flat_features, 1024)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = x.view(x.size(0), -1)  # flatten per sample
        x = self.relu3(self.fc1(x))
        score = self.fc2(x)
        return score


class FcnCnn(nn.Module):
    """Three conv/ReLU/max-pool/dropout stages followed by a fully connected classifier.

    Args:
        num_classes: Number of output classes.
        input_size: Spatial size of the input images, either a single int for
            square images or a ``(height, width)`` pair. Defaults to 224.

    Attributes:
        output_size: Number of features produced by ``conv_layers`` for one
            image of ``input_size``; this is the input width of ``fc_layers``.
    """

    def __init__(self, num_classes, input_size=224):
        super(FcnCnn, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.25),
        )

        if isinstance(input_size, int):
            height, width = input_size, input_size
        else:
            height, width = input_size

        # Probe the conv stack once to learn the flattened feature count.
        # Only the output shape matters, so use a zero input, disable dropout
        # (eval mode) and skip autograd: the probe then neither draws from the
        # global RNG nor builds a graph.
        was_training = self.conv_layers.training
        self.conv_layers.eval()
        with torch.no_grad():
            dummy_output = self.conv_layers(torch.zeros(1, 3, height, width))
        self.conv_layers.train(was_training)
        self.output_size = int(dummy_output.numel())

        self.fc_layers = nn.Sequential(
            nn.Linear(self.output_size, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)  # flatten the tensor
        x = self.fc_layers(x)
        return x


class EarlyStopping:
    """Stop training when the validation loss stops improving, checkpointing the best model.

    Args:
        patience: Number of consecutive non-improving calls after which
            ``early_stop`` is set.
        verbose: If True, print a message whenever a checkpoint is written.
        delta: Margin added to the best score when deciding whether a call
            counts as "no improvement". With the default negative value, a
            loss up to 0.03 above the best one still resets the counter.

    Attributes:
        counter: Consecutive calls that counted as "no improvement".
        best_score: Negated best validation loss seen so far (None before the first call).
        early_stop: True once ``counter`` has reached ``patience``.
        val_loss_min: Validation loss of the most recently saved checkpoint.
    """

    def __init__(self, patience=10, verbose=False, delta=-0.03):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # ``np.Inf`` was removed in NumPy 2.0; ``np.inf`` works on every version.
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model):
        """Record one validation result and update the stopping state.

        Args:
            val_loss: Validation loss of the current epoch (lower is better).
            model: Model whose ``state_dict`` is checkpointed on improvement.
        """
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            return

        if score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
            return

        # Within tolerance: the counter resets, but the best score and the
        # checkpoint are only replaced when the loss did not get worse.
        # Otherwise a negative delta would let a worse model overwrite the best
        # checkpoint and let the reference loss drift upwards epoch after epoch.
        if score >= self.best_score:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Save the model's state_dict to 'checkpoint.pt' and remember its validation loss.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).')
        torch.save(model.state_dict(), 'checkpoint.pt')
        self.val_loss_min = val_loss


class ParallelResNetGRU(nn.Module):
    """Two-branch classifier: a ResNet-18 image branch and a bidirectional GRU branch.

    Both branches receive the same image batch. The GRU branch reads the image
    column by column; its ``input_size`` of 672 fits 3-channel inputs that are
    224 pixels high (3 * 224).

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super(ParallelResNetGRU, self).__init__()

        # ResNet part for image-like processing
        self.base_model = models.resnet18(pretrained=True)
        self.base_model = nn.Sequential(*list(self.base_model.children())[:-1])
        self.adaptive_pool = nn.AdaptiveMaxPool2d((1, 1))
        self.resnet_fc = nn.Linear(512, 256)
        self.batch_norm1 = nn.BatchNorm1d(256)

        # GRU part for sequence-like processing
        self.rnn = nn.GRU(input_size=672, hidden_size=256, num_layers=1, batch_first=True, bidirectional=True)
        self.gru_fc = nn.Linear(512, 256)

        self.dropout = nn.Dropout(0.5)
        # Not applied in forward(); kept so the module's state_dict layout is unchanged.
        self.batch_norm = nn.BatchNorm1d(num_features=512)
        self.fc = nn.Linear(512, num_classes)

        self._init_weight(self.fc)

    def _init_weight(self, layer):
        """Xavier-initialise ``layer`` if it is a Linear or Conv1d layer."""
        if isinstance(layer, nn.Linear) or isinstance(layer, nn.Conv1d):
            nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for a ``(batch, c, h, w)`` input."""
        batch_size, c, h, w = x.size()

        # ResNet forward pass
        resnet_out = self.base_model(x)
        resnet_out = self.adaptive_pool(resnet_out)
        resnet_out = torch.flatten(resnet_out, 1)
        resnet_out = self.resnet_fc(resnet_out)
        resnet_out = self.dropout(resnet_out)
        resnet_out = self.batch_norm1(resnet_out)

        # GRU forward pass: one timestep per image column, each step holding
        # every channel and row of that column -> (batch, w, c * h).
        # The width axis has to be moved in front of the channel axis before
        # flattening; permuting only the two spatial axes and calling view()
        # reinterprets the buffer channel by channel, so a timestep no longer
        # corresponds to one column. Weights trained with that earlier layout
        # are shape-compatible but should be retrained.
        x_gru = x.permute(0, 3, 1, 2).reshape(batch_size, w, c * h)
        x_gru, _ = self.rnn(x_gru)
        # Final state of each direction: last step of the forward half,
        # first step of the backward half.
        x_gru_forward = x_gru[:, -1, :self.rnn.hidden_size]
        x_gru_backward = x_gru[:, 0, self.rnn.hidden_size:]
        x_gru = torch.cat((x_gru_forward, x_gru_backward), dim=1)
        gru_out = self.gru_fc(x_gru)

        # Combine ResNet and GRU outputs
        out = torch.cat((resnet_out, gru_out), dim=1)
        out = self.dropout(out)
        result = self.fc(out)

        return result


class ParallelResNetGRUSimp(nn.Module):
    """Smaller two-branch classifier: ResNet-18 image branch plus a grayscale GRU branch.

    The image branch yields 100 features, the bidirectional GRU (hidden size
    50 per direction) another 100; the concatenation feeds one linear layer.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Image branch: pre-trained ResNet-18 without its final layer.
        backbone = models.resnet18(pretrained=True)
        self.base_model = nn.Sequential(*list(backbone.children())[:-1])
        self.adaptive_pool = nn.AdaptiveMaxPool2d((1, 1))
        self.resnet_fc = nn.Linear(512, 100)
        self.batch_norm1 = nn.BatchNorm1d(100)

        # Sequence branch: grayscale image read by a bidirectional GRU.
        self.rgb_to_grayscale = transforms.Grayscale(num_output_channels=1)
        self.rnn = nn.GRU(input_size=224, hidden_size=50, num_layers=1, batch_first=True, bidirectional = True)
        self.dropout = nn.Dropout(0.5)
        self.batch_norm2 = nn.BatchNorm1d(100)  # registered but not applied in forward()

        self.fc = nn.Linear(200, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for a 4-D batch ``x``."""
        batch_size, c, h, w = x.size()

        # Image branch: backbone -> pool -> flatten -> fc -> dropout -> batch norm.
        image_feat = torch.flatten(self.adaptive_pool(self.base_model(x)), 1)
        image_feat = self.batch_norm1(self.dropout(self.resnet_fc(image_feat)))

        # Sequence branch: grayscale, swap the two spatial axes, drop the
        # singleton channel axis by viewing as (batch, axis-2 length, rest).
        gray = self.rgb_to_grayscale(x).permute(0, 1, 3, 2).contiguous()
        seq, _ = self.rnn(gray.view(gray.size(0), gray.size(2), -1))

        # Final state of each direction: last step of the forward half,
        # first step of the backward half.
        hidden = self.rnn.hidden_size
        seq_feat = torch.cat((seq[:, -1, :hidden], seq[:, 0, hidden:]), dim=1)

        # Fuse both branches and classify.
        return self.fc(torch.cat((image_feat, seq_feat), dim=1))


# Here it starts ------------------------------------------------------------------------------------------------------
# Get music features
features_df = pd.read_csv(FEATURES)
# Encoded genres
genres = { 'rock': 0, 'pop': 1, 'classical': 2, 'reggae': 3, 'disco': 4, 'jazz': 5, 'metal': 6, 'country': 7, 'blues': 8, 'hiphop': 9}

# get music images
music_dataset = []
genre_target = []
song_ids = []
for name in os.listdir(MUSIC):
    parts = name.split(".")
    # Only "<genre>.<number>.<ext>" entries are samples. Anything else in the
    # folder (e.g. a ".ipynb_checkpoints" directory) is skipped instead of
    # crashing the three-way unpack below.
    if len(parts) != 3 or not parts[1].isdigit():
        print(f"Skipping unexpected entry in {MUSIC}: {name}")
        continue
    genre, song_id, _ext = parts
    music_dataset.append(os.path.join(MUSIC, name))
    genre_target.append(genre)
    # Files whose numeric ids share the same id // 5 form one group, so they
    # always end up on the same side of the train/test split.
    song_ids.append(genre + str(int(song_id) // 5))

if not music_dataset:
    raise RuntimeError(f"No spectrogram images found in {MUSIC}")

# Convert the genre labels to a tensor
genre_encoded = [genres[item] for item in genre_target]
genre_encoded_tensor = torch.tensor(genre_encoded, dtype=torch.long)

# Create a MusicDataset object to load the images and labels
music_dataset = MusicDataset(music_dataset, genre_encoded_tensor, transform)
music_dataloader = DataLoader(music_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Now, use the song IDs to perform a group-based train/test split
group_shuffle_split = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# We use the splitter to split the indices of our dataset
train_inds, test_inds = next(group_shuffle_split.split(music_dataset, genre_encoded, groups=song_ids))

# We use these indices to produce the final train and test sets
train_dataset = Subset(music_dataset, train_inds)
test_dataset = Subset(music_dataset, test_inds)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation does not depend on sample order, so the test loader is not shuffled.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Build the network and move it to the target device
model = ParallelResNetGRU(num_classes=10).to(device)

# Loss, optimizer and early-stopping monitor
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
early_stopping = EarlyStopping(patience=PATIENCE, verbose=True)


# Train the model
num_epochs = EPOCH
actual_epochs = 0  # epochs actually run (smaller than num_epochs after an early stop)
train_acc_list, test_acc_list = [], []
train_loss_list, test_loss_list = [], []
# NOTE: these two accumulate the test predictions of every epoch, not just the last one.
y_true_list, y_pred_list = [], []

for epoch in range(num_epochs):
    actual_epochs += 1

    # ---- training pass ----
    model.train()
    train_loss = 0.0
    train_correct = 0.0
    train_total = 0.0

    for image, labels in train_dataloader:
        targets = labels.to(device)
        optimizer.zero_grad()
        outputs = model(image.to(device))
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size
        train_loss += loss.item() * image.size(0)
        predicted = torch.max(outputs.data, 1).indices
        train_total += labels.size(0)
        train_correct += (predicted == targets).sum().item()

    train_loss /= len(train_dataloader.dataset)
    train_acc = train_correct / train_total

    # ---- evaluation pass on the test set ----
    model.eval()
    test_loss = 0.0
    test_correct = 0.0
    test_total = 0.0
    y_true = []
    y_pred = []

    with torch.no_grad():
        for image, labels in test_dataloader:
            targets = labels.to(device)
            outputs = model(image.to(device))
            test_loss += criterion(outputs, targets).item() * image.size(0)
            predicted = torch.max(outputs.data, 1).indices
            test_total += labels.size(0)
            test_correct += (predicted == targets).sum().item()
            # keep predictions and ground truth for the report
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    test_loss /= len(test_dataloader.dataset)
    test_acc = test_correct / test_total

    # ---- bookkeeping ----
    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)
    train_loss_list.append(train_loss)
    test_loss_list.append(test_loss)
    y_true_list.extend(y_true)
    y_pred_list.extend(y_pred)
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')

    # Early stopping: checkpoints on improvement, flags when patience runs out
    early_stopping(test_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# From here on num_epochs is the number of epochs that actually ran
num_epochs = actual_epochs

# Restore the weights checkpointed by early stopping and keep a copy on Drive
model.load_state_dict(torch.load('checkpoint.pt'))
torch.save(model.state_dict(), '/content/drive/MyDrive/ColabNotebooks/results/model_' + MODEL_NAME + '_best.pt')

# Plot training vs. testing accuracy, then training vs. testing loss, over time
for train_curve, test_curve, train_label, test_label, y_label, suffix in (
    (train_acc_list, test_acc_list, 'Training Accuracy', 'Testing Accuracy', 'Accuracy', '_acc.png'),
    (train_loss_list, test_loss_list, 'Training Loss', 'Testing Loss', 'Loss', '_loss.png'),
):
    plt.plot(range(num_epochs), train_curve, label=train_label)
    plt.plot(range(num_epochs), test_curve, label=test_label)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()
    plt.savefig('/content/drive/MyDrive/ColabNotebooks/results/model_' + MODEL_NAME + suffix)
    plt.close()

# Calculate evaluation metrics
# Class names in the order of their integer codes (0..9).
label_names = ['rock', 'pop', 'classical', 'reggae', 'disco', 'jazz', 'metal', 'country', 'blues', 'hiphop']
class_ids = list(range(len(label_names)))

# Score the test set once with the model as it is now (the checkpoint restored
# after training). Predictions gathered while training span every epoch, so a
# report built from them would mix early, poorly trained epochs into the result.
y_true_list, y_pred_list = [], []
model.eval()
with torch.no_grad():
    for image, labels in test_dataloader:
        outputs = model(image.to(device))
        y_pred_list.extend(outputs.argmax(dim=1).cpu().tolist())
        y_true_list.extend(labels.tolist())

print("\nClassification Report:")
print(classification_report(y_true_list, y_pred_list, labels=class_ids, target_names=label_names))
print("Confusion Matrix:")
# Passing the full label set keeps the matrix 10x10 even if a class never
# occurs, so it always lines up with the tick labels below.
cm = confusion_matrix(y_true_list, y_pred_list, labels=class_ids)
print(cm)

# Plot confusion matrix
fig, ax = plt.subplots(figsize=(8, 8))
im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
ax.set(xticks=np.arange(cm.shape[1]),
       yticks=np.arange(cm.shape[0]),
       xticklabels=label_names, yticklabels=label_names,
       title='Confusion matrix',
       ylabel='True label',
       xlabel='Predicted label')
plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
         rotation_mode="anchor")
# Write each count into its cell; white text on dark cells, black on light ones.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        ax.text(j, i, format(cm[i, j], 'd'),
                ha="center", va="center",
                color="white" if cm[i, j] > cm.max() / 2. else "black")
fig.tight_layout()
plt.savefig('/content/drive/MyDrive/ColabNotebooks/results/CM_model_' + MODEL_NAME + '.png')
plt.close()



AttributeError: ignored