In [None]:
# Fetch the Da-TACOS repository; the following cells change into it and use
# its requirements.txt and download_da-tacos.py.
!git clone https://github.com/MTG/da-tacos.git

In [None]:
import os

# Work from inside the cloned repository so the relative paths used by the
# next cells (requirements.txt, download_da-tacos.py) resolve.
# Guarded so the cell can be re-run: a second plain os.chdir('da-tacos') would
# look for da-tacos/da-tacos and raise FileNotFoundError.
if os.path.basename(os.getcwd()) != 'da-tacos':
    os.chdir('da-tacos')

In [None]:
# %pip (instead of !pip) installs into the environment of the running kernel.
%pip install -r requirements.txt

In [None]:
#Running directly from Colab UI
# Download the "coveranalysis" subset (single-file layout) from Google Drive and
# unpack the archive into the same directory.
output_dir = '/content/coveranalysis_single_files_output'
os.makedirs(output_dir, exist_ok=True)
!python3 download_da-tacos.py --dataset coveranalysis --type single_files --source gdrive --outputdir {output_dir}
# -n: never overwrite already-extracted files, so re-running the cell does not
#     stop at unzip's interactive "replace?" prompt.
# -q: do not print one line per extracted file.
!unzip -q -n {output_dir}/da-tacos_coveranalysis_subset_single_files.zip -d {output_dir}


In [None]:
#Running through my gdrive
from google.colab import drive
drive.mount('/content/drive')
# -n: never overwrite already-extracted files, so re-running the cell does not
#     stop at unzip's interactive "replace?" prompt.
# -q: do not print one line per extracted file.
!unzip -q -n /content/drive/MyDrive/da-tacos_coveranalysis_subset_single_files.zip -d /content/drive/MyDrive/datacos

In [None]:
#Necessary Libraries
import torch
import torch.nn as nn
import torch.optim as optim
import h5py
import numpy as np
from torch.utils.data import Dataset, DataLoader
import random
from tqdm import tqdm
import os

In [None]:
# Prefer the GPU when one is visible to PyTorch, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
#Dataset Class - Creates song pairs labelled Y=0 (cover-related) and Y=1 (not cover-related) and returns one combined feature tensor per song

import deepdish as dd

class DATACOSDataset(Dataset):
    """Song pairs built from a directory of per-work subfolders.

    Every subfolder of ``data_dir`` that holds exactly two ``.h5`` files
    contributes one pair labelled 0 (cover-related). The same number of pairs
    labelled 1 (not cover-related) is then drawn by combining one song from
    each of two different subfolders. ``self.pairs`` lists all label-0 pairs
    first, followed by all label-1 pairs.

    Args:
        data_dir: Directory whose immediate subfolders contain the ``.h5`` files.
        transform: Optional callable applied to each song's combined feature
            tensor in ``__getitem__``.
        seed: Optional seed for drawing the label-1 pairs. ``None`` (default)
            uses the global ``random`` state, as before.

    Raises:
        ValueError: If label-1 pairs are needed but fewer than two subfolders
            contain any ``.h5`` file.
    """

    # Placeholder tag vocabulary ("map your tags to integers"); tags that are
    # not in it are encoded as -1.
    TAG_MAP = {'tag1': 0, 'tag2': 1, 'tag3': 2}

    def __init__(self, data_dir, transform=None, seed=None):
        self.data_dir = data_dir
        self.transform = transform
        self.pairs = []  # (song1_path, song2_path, label)

        # Sorted so that pair indices do not depend on the order in which the
        # filesystem happens to list entries.
        self.subfolders = sorted(f.path for f in os.scandir(data_dir) if f.is_dir())
        all_songs_by_folder = {
            folder: sorted(
                os.path.join(folder, name)
                for name in os.listdir(folder)
                if name.endswith('.h5')
            )
            for folder in self.subfolders
        }

        # Label 0: the two songs of a folder that holds exactly two songs.
        for folder in self.subfolders:
            songs = all_songs_by_folder[folder]
            if len(songs) == 2:
                first_song, second_song = songs
                self.pairs.append((first_song, second_song, 0))

        # Label 1: as many pairs as label-0 pairs, each mixing two different
        # folders. Folders without songs are excluded up front; sampling one
        # of them would make random.choice fail on an empty list.
        num_positive = len(self.pairs)
        folders_with_songs = [f for f in self.subfolders if all_songs_by_folder[f]]
        if num_positive and len(folders_with_songs) < 2:
            raise ValueError(
                f"Need at least two subfolders with .h5 files in {data_dir!r} "
                "to build non-cover pairs"
            )
        rng = random if seed is None else random.Random(seed)
        for _ in range(num_positive):
            folder1, folder2 = rng.sample(folders_with_songs, 2)
            song1 = rng.choice(all_songs_by_folder[folder1])
            song2 = rng.choice(all_songs_by_folder[folder2])
            self.pairs.append((song1, song2, 1))

    def __len__(self):
        """Return the total number of pairs (label 0 plus label 1)."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(song1_features, song2_features, label)`` for pair ``idx``.

        The label is a float tensor holding 0.0 or 1.0.
        """
        song1_path, song2_path, label = self.pairs[idx]

        song1_data = self.load_h5(song1_path)
        song2_data = self.load_h5(song2_path)

        if self.transform:
            song1_data = self.transform(song1_data)
            song2_data = self.transform(song2_data)

        return song1_data, song2_data, torch.tensor(label, dtype=torch.float)

    def load_h5(self, file_path):
        """Load one ``.h5`` file with deepdish and return its combined features.

        Returns:
            A 2-D float tensor: the available features are zero-padded along
            dim 0 to the longest one and concatenated along the last dim.

        Raises:
            ValueError: If the file yields no usable feature.
        """
        data = dd.io.load(file_path)
        return self._combine_features(data, file_path)

    @staticmethod
    def _to_2d_tensor(values):
        """Convert ``values`` to a float tensor with exactly two dimensions."""
        tensor = torch.tensor(values, dtype=torch.float)
        if tensor.dim() == 0:
            return tensor.reshape(1, 1)
        if tensor.dim() == 1:
            return tensor.unsqueeze(-1)
        if tensor.dim() > 2:
            # Keep dim 0 as the padded axis and merge the remaining dims.
            return tensor.flatten(start_dim=1)
        return tensor

    @staticmethod
    def _unwrap(value, key):
        """Return ``value[key]`` (or None) for dict values, else ``value`` itself."""
        if isinstance(value, dict):
            return value.get(key, None)
        return value

    def _combine_features(self, data, file_path):
        """Build the padded, concatenated feature tensor from a loaded file dict."""
        # NOTE(review): the nested keys 'tonnetz' and 'tempo' were marked by the
        # author as "adjust based on actual structure"; when a dict does not
        # contain them that feature is silently skipped - confirm against the files.
        raw_features = [
            data.get('chroma_cens', None),
            data.get('crema', None),
            data.get('hpcp', None),
            self._unwrap(data.get('key_extractor', None), 'tonnetz'),
            self._unwrap(data.get('madmom_features', None), 'tempo'),
            data.get('mfcc_htk', None),
        ]
        feature_list = [
            self._to_2d_tensor(values) for values in raw_features if values is not None
        ]

        # Tags: a single string or a list of tags, each mapped through TAG_MAP.
        tags_data = data.get('tags', None)
        if isinstance(tags_data, str):
            tags_data = [tags_data]
        if isinstance(tags_data, list):
            tag_values = [self.TAG_MAP.get(tag, -1) for tag in tags_data]
            feature_list.append(
                torch.tensor(tag_values, dtype=torch.float).unsqueeze(-1)
            )

        if not feature_list:
            raise ValueError(f"No usable features found in {file_path!r}")

        # Zero-pad every feature along dim 0 to the longest one so they can be
        # concatenated column-wise.
        max_length = max(feature.shape[0] for feature in feature_list)
        padded = [
            torch.nn.functional.pad(feature, (0, 0, 0, max_length - feature.shape[0]))
            if feature.shape[0] < max_length
            else feature
            for feature in feature_list
        ]
        return torch.cat(padded, dim=-1)

In [None]:
#Example of DatacosClass usage
dataset = DATACOSDataset(data_dir='/content/drive/MyDrive/datacos/da-tacos_coveranalysis_subset_single_files')

# The dataset stores all Y=0 pairs first and the same number of Y=1 pairs after
# them, so the two samples around the midpoint are the last cover pair and the
# first non-cover pair (indices 4999 and 5000 for 10000 pairs).
boundary = len(dataset) // 2
for i in range(max(boundary - 1, 0), min(boundary + 1, len(dataset))):
    song1_data, song2_data, label = dataset[i]
    print(f"Pair {i+1}:")
    print(f"Song 1 Data: {song1_data.shape}, Song 2 Data: {song2_data.shape}, Label: {label}")

In [None]:
# The dataset holds all Y=0 pairs first, then the same number of Y=1 pairs.
# Split each half 80/20 so both sets stay class-balanced; for 10000 pairs this
# gives exactly the index ranges 0-3999 / 5000-8999 (train) and
# 4000-4999 / 9000-9999 (validation).
num_positive = len(dataset) // 2
num_train_per_class = num_positive * 4 // 5

# NOTE(review): every sample is loaded eagerly and kept in memory here.
train_data_1 = [dataset[i] for i in range(num_train_per_class)]  # Y=0 training pairs
train_data_2 = [dataset[i] for i in range(num_positive, num_positive + num_train_per_class)]  # Y=1 training pairs

val_data_1 = [dataset[i] for i in range(num_train_per_class, num_positive)]  # Y=0 validation pairs
val_data_2 = [dataset[i] for i in range(num_positive + num_train_per_class, 2 * num_positive)]  # Y=1 validation pairs

# Combine the training and validation parts
train_data = train_data_1 + train_data_2
val_data = val_data_1 + val_data_2

# Shuffle both the training and validation datasets
random.shuffle(train_data)
random.shuffle(val_data)

In [None]:
#Checking in a small set of the dataset
# NOTE(review): this cell rebinds train_data / val_data, replacing whatever an
# earlier cell stored under those names - confirm that is intended before
# running the notebook top to bottom.

# Three training samples (indices 0, 1, 5) and two validation samples (4, 9).
train_data_1 = [dataset[0], dataset[1]]
train_data_2 = [dataset[5]]

val_data_1 = [dataset[4]]
val_data_2 = [dataset[9]]

# Merge the two parts of each split into a single list.
train_data = [*train_data_1, *train_data_2]
val_data = [*val_data_1, *val_data_2]

# Randomize the sample order of both splits in place.
random.shuffle(train_data)
random.shuffle(val_data)

In [None]:
# Report how much memory the two feature tensors of the first pair occupy.
song1_data, song2_data, label = dataset[0]
bytes_per_gib = 1024 ** 3
song1_bytes = song1_data.element_size() * song1_data.nelement()
song2_bytes = song2_data.element_size() * song2_data.nelement()
print(f"Memory usage of song1_data: {song1_bytes / bytes_per_gib} GB")
print(f"Memory usage of song2_data: {song2_bytes / bytes_per_gib} GB")

In [None]:
# Create DataLoader objects for both train and validation sets.
# NOTE(review): the default collate function stacks samples, which requires all
# feature tensors in a batch to have the same shape - confirm the samples (or
# the dataset's transform) guarantee that.
trainloader = DataLoader(train_data, batch_size=128, shuffle=True)
# Validation is not shuffled: metrics are averaged per batch, so a fixed batch
# composition keeps the reported validation loss/accuracy comparable across epochs.
validloader = DataLoader(val_data, batch_size=128, shuffle=False)

# Check the size of trainloader and validloader
print(f"Trainloader size: {len(trainloader)} batches")
print(f"Validloader size: {len(validloader)} batches")

In [None]:
#Model Architecture
import torch
import torch.nn as nn
class DrLIM(nn.Module):
    """Small convolutional encoder producing a 2-value embedding per sample.

    Pipeline: Conv(1->15, k=6) -> ReLU -> MaxPool(3, stride 3) -> Conv(15->30, k=9)
    -> reshape to (batch, 2, 15) -> ReLU -> Linear(15->1).

    The second convolution must yield exactly 30 values per sample (30 channels
    at 1x1 spatial size), which is the case for 32x32 up to 34x34 inputs.
    """

    def __init__(self):
        super().__init__()
        self.layer1 = nn.Conv2d(in_channels=1, out_channels=15, kernel_size=6, padding=0, stride=1)
        self.relu = nn.ReLU()
        self.subsampling = nn.MaxPool2d(kernel_size=3, stride=3)
        self.layer2 = nn.Conv2d(in_channels=15, out_channels=30, kernel_size=9, padding=0, stride=1)
        self.fc = nn.Linear(15, 1)

    def forward(self, x):
        """Embed a batch of single-channel 2-D inputs.

        Args:
            x: Tensor of shape (batch, 1, H, W), or (batch, H, W) in which case
                the channel dimension is added here.

        Returns:
            Tensor of shape (batch, 2, 1).

        Raises:
            ValueError: If the convolutional stack does not yield 30 values per
                sample for the given input size.
        """
        if x.dim() == 3:
            # Insert the single input channel. A (1, H, W) input is treated as a
            # batch of one, which gives the same values as the unbatched path.
            x = x.unsqueeze(1)
        x = self.layer1(x)
        x = self.relu(x)
        x = self.subsampling(x)
        x = self.layer2(x)
        values_per_sample = x.shape[1] * x.shape[2] * x.shape[3]
        if values_per_sample != 30:
            # reshape(-1, 2, 15) would silently spread one sample over several
            # rows here, so fail loudly instead.
            raise ValueError(
                f"DrLIM expects 30 values per sample after the second convolution, "
                f"got {values_per_sample} (feature map shape {tuple(x.shape)})"
            )
        x = x.reshape(x.shape[0], 2, 15)
        x = self.relu(x)
        x = self.fc(x)
        return x

In [None]:
def CL_Loss(x1, x2, Y, m = 1, eps = 1e-9):
    """Contrastive loss over a batch of embedding pairs.

    Per pair, with D the Euclidean distance between the two embeddings:
    ``(1 - Y) * 1/2 * D**2 + Y * 1/2 * max(0, m - D)**2``; the batch mean is returned.

    Args:
        x1, x2: Embeddings of identical shape whose first dim is the batch;
            all remaining dims are treated as the embedding coordinates.
        Y: One label per pair, 0 for similar and 1 for dissimilar.
        m: Margin beyond which dissimilar pairs stop contributing.
        eps: Small constant added under the square root so the gradient stays
            finite when both embeddings coincide.

    Returns:
        Scalar tensor with the mean loss.
    """
    diff = x1 - x2
    if diff.dim() == 1:
        diff = diff.unsqueeze(-1)
    # One squared distance per pair: sum over every embedding coordinate.
    sq_dist = diff.flatten(start_dim=1).pow(2).sum(dim=1)
    dist = torch.sqrt(sq_dist + eps)
    Y = Y.reshape(-1)
    similar_term = (1 - Y) * 0.5 * sq_dist
    dissimilar_term = Y * 0.5 * torch.clamp(m - dist, min=0).pow(2)
    return torch.mean(similar_term + dissimilar_term)

In [None]:
from torch import optim
import numpy as np
from tqdm.auto import tqdm

In [None]:
#Training parameters

epochs = 200 # maximum number of training epochs
cnt = 0      # early stopping counter (epochs since the last checkpoint)

model = DrLIM().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.001) # plain SGD

# Use all GPUs when more than one is available.
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model).to(device)

# per-epoch train and val loss
train_loss = torch.zeros(epochs)
val_loss = torch.zeros(epochs)

# per-epoch train and val accuracy
train_acc = torch.zeros(epochs)
val_acc = torch.zeros(epochs)

# Best values seen so far. np.inf is used because the np.Inf alias was removed
# in NumPy 2.0.
valid_loss_min = np.inf
valid_acc_max = 0

In [None]:
#Training the Model

def _pair_accuracy(emb1, emb2, y):
    """Fraction of pairs in the batch whose predicted label equals ``y``.

    A pair is predicted dissimilar (1) when the order of the two embedding
    values differs between the two songs, and similar (0) otherwise.
    NOTE(review): this heuristic does not use the margin distance that the loss
    optimizes - confirm it is the intended accuracy measure.
    """
    predicted = ((emb1[:, 0] < emb1[:, 1]) != (emb2[:, 0] < emb2[:, 1])).to(torch.float64)
    return (predicted == y.reshape(predicted.shape)).float().mean().item()


for epoch in range(epochs):

    model.train() # train mode

    for x1, x2, y in tqdm(trainloader):
        # .to() converts dtype and device in one step without re-wrapping the tensor
        y = y.to(device=device, dtype=torch.float64)
        x1, x2 = x1.to(device), x2.to(device)
        optimizer.zero_grad() # reset gradients from the previous step

        Gw_x1 = model(x1) # model output of x1
        Gw_x2 = model(x2) # model output of x2

        loss = CL_Loss(Gw_x1, Gw_x2, y)
        loss.backward() # backward
        optimizer.step() # optimizer step
        train_loss[epoch] += loss.item()
        train_acc[epoch] += _pair_accuracy(Gw_x1, Gw_x2, y)

    # average over batches
    train_loss[epoch] /= len(trainloader)
    train_acc[epoch] /= len(trainloader)

    # validation
    model.eval()
    with torch.no_grad():
        for x1, x2, y in tqdm(validloader):
            y = y.to(device=device, dtype=torch.float64)
            x1, x2 = x1.to(device), x2.to(device)

            Gw_x1 = model(x1)
            Gw_x2 = model(x2)

            loss = CL_Loss(Gw_x1, Gw_x2, y)
            val_loss[epoch] += loss.item()
            val_acc[epoch] += _pair_accuracy(Gw_x1, Gw_x2, y)

    # average over batches
    val_loss[epoch] /= len(validloader)
    val_acc[epoch] /= len(validloader)

    # print loss and accuracy
    print(f"Epoch {epoch+1}/{epochs}.. "
          f"Train loss: {train_loss[epoch]:.3f}.. "
          f"Train acc: {train_acc[epoch]:.3f}.. "
          f"val loss: {val_loss[epoch]:.3f}.. "
          f"val accuracy: {val_acc[epoch]:.3f}")

    if val_acc[epoch] >= valid_acc_max:
        print('Validation acc increased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_acc_max,
        val_acc[epoch].item()))
        # Only a DataParallel wrapper has .module; unwrap when present so the
        # checkpoint keys are the same on single- and multi-GPU runs.
        model_to_save = model.module if isinstance(model, nn.DataParallel) else model
        torch.save(model_to_save.state_dict(), 'DrLIM.pt')
        valid_acc_max = val_acc[epoch].item()

        # restart the early-stopping counter after a checkpoint
        cnt = 0

    # Early stopping: give up once the counter reaches 10
    if cnt >= 10:
        print("Early Stopping")
        break

    cnt+=1


In [None]:
#Potential Transforms - 1.FFT-Downsampling

import numpy as np

def fft_lowpass(x, keep=100):
    """Low-pass filter a real 2-D array in the frequency domain.

    Frequencies whose absolute index is below ``keep`` are kept along both
    axes; all others are zeroed. In the unshifted FFT layout the low
    frequencies sit at both ends of each axis (positive at the start, negative
    at the end), so both ends are kept; keeping only the first ``keep`` indices
    would drop the negative half and produce a complex-valued result.

    Args:
        x: Real 2-D array.
        keep: Number of lowest frequency indices to keep per axis.

    Returns:
        Real array with the same shape as ``x`` (filtered, not resized).
    """
    x = np.asarray(x)
    rows, cols = x.shape
    spectrum = np.fft.fft2(x)
    # Absolute frequency index of every FFT bin: 0, 1, 2, ..., 2, 1
    row_freq = np.minimum(np.arange(rows), rows - np.arange(rows))
    col_freq = np.minimum(np.arange(cols), cols - np.arange(cols))
    spectrum[row_freq >= keep, :] = 0
    spectrum[:, col_freq >= keep] = 0
    # The mask is symmetric, so the imaginary part is only rounding noise.
    return np.fft.ifft2(spectrum).real


# Small random stand-in for the DatasetClass output. A 20890 x 41760 float64
# array takes about 7 GB and its complex spectrum about 14 GB, so the demo uses
# a size that fits in memory.
tensor = np.random.random((512, 1024))

# Keep only the 100 lowest frequencies per axis (or whatever works for the task)
downsampled_tensor = fft_lowpass(tensor, keep=100)


In [None]:
#Potential Transforms - 2.PCA
from sklearn.decomposition import PCA
import numpy as np

# Small random stand-in for the original 2D tensor. A 20890 x 41760 float64
# array takes about 7 GB and its complex spectrum about 14 GB, so the demo uses
# a size that fits in memory.
tensor = np.random.random((512, 1024))

# Apply FFT to the tensor to get the frequency domain representation
fft_tensor = np.fft.fft2(tensor)

# PCA needs real-valued input with at least n_components features per sample,
# so use the magnitude spectrum as a (rows = samples, columns = features)
# matrix instead of flattening the complex spectrum into a single column.
fft_magnitude = np.abs(fft_tensor)

# Perform PCA to reduce dimensionality (keeping the top 10 components)
pca = PCA(n_components=10)
pca_result = pca.fit_transform(fft_magnitude)  # shape: (rows, 10)

# Map the reduced representation back to the original feature space
# (rank-10 approximation of the magnitude spectrum, same shape as the input)
downsampled_tensor = pca.inverse_transform(pca_result)