# **Self-Supervised Learning of Music Representations for Recommendation Systems**

This work explores self-supervised learning of music representations, using the Audio Spectrogram Transformer (AST) as the encoder and the SimCLR contrastive framework.
It aims to enhance music recommendation systems by leveraging the learned embeddings. The project uses the Free Music Archive
(FMA) dataset and covers audio augmentation, the InfoNCE loss, and self-supervised fine-tuning.


## Group members:

*   Andreas Cisi Ramos (246932)
*   Bruno Amaral Teixeira de Freitas (246983)




### Table of Contents <a class="anchor" id="topo"></a>

* [Part 1: Main dependencies](#part_01).
* [Part 2: FMA Dataset: Loading and Analysis](#part_02).
* [Part 3: Contrastive learning: Data augmentation](#part_03).
* [Part 4: SimCLR Model](#part_04).
* [Part 5: Training the SimCLR Model](#part_05).
* [Part 6: Embedding Creation and Visualization](#part_06).
* [Part 7: Recommendation system](#part_07).
* [Part 8: Qualitative Tests and Results](#part_08).




# Part 1: Main dependencies <a class="anchor" id="part_01"></a>

Here we install dependencies, libraries, and perform the necessary imports for the project.









In [None]:
!pip install torch torchaudio transformers matplotlib torch_audiomentations sox
!apt-get install sox libsox-dev libsox-fmt-all -y

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
from transformers import ASTFeatureExtractor, ASTModel
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import os
import numpy as np
import math
from IPython.display import Audio, display
import pandas as pd
from sklearn.manifold import TSNE

# Part 2: FMA Dataset: Loading and Analysis <a class="anchor" id="part_02"></a>

The FMA (Free Music Archive) dataset is a collection of music tracks with metadata, widely used for music classification and genre prediction tasks. In this project, we use the FMA Small version, which contains 8,000 songs, each approximately 30 seconds long, spanning multiple genres.

The dataset provides:

*   **Audio files**: Stored in `.mp3` format.
*   **Metadata**: Includes `track_id` and `track_genres` information for mapping songs to their respective genres.
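
As a minimal sketch of how a genre title can be read from a `track_genres` cell, the snippet below assumes the field is a string holding a Python-style list of dictionaries with a `genre_title` key, which is what the loading code in this notebook expects. The sample string is made up for illustration and `first_genre_title` is a hypothetical helper name, not part of the dataset tooling.

```python
import ast


def first_genre_title(raw_value, default="Unknown"):
    """Return the first genre title found in a raw `track_genres` cell."""
    try:
        # literal_eval only parses Python literals, so it never executes code
        parsed = ast.literal_eval(raw_value)
    except (ValueError, SyntaxError, TypeError):
        return default
    if isinstance(parsed, list) and parsed and isinstance(parsed[0], dict):
        return parsed[0].get("genre_title", default)
    return default


# Illustrative value, not copied from the dataset
sample = "[{'genre_id': '21', 'genre_title': 'Hip-Hop'}]"
print(first_genre_title(sample))
print(first_genre_title("[]"))
print(first_genre_title("not a list"))
```

Falling back to a default keeps tracks with missing or malformed genre information in the dataset instead of raising an error.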

In [None]:
class BaseFMADataset(Dataset):
    """
    Base class for creating datasets from the FMA music dataset.

    Arguments:
    - audio_dir: Directory containing the audio files.
    - metadata_path: Path to the metadata CSV file.
    - file_list: List of audio file names.
    - song_duration: Desired duration of audio in seconds (default: 10).

    This class handles loading audio files, preprocessing them, and associating genres from the metadata.
    """

    def __init__(self, audio_dir, metadata_path, file_list, song_duration=10):
        """
        Initializes the dataset by loading metadata and setting parameters.

        Arguments:
        - audio_dir: Directory containing the audio files.
        - metadata_path: Path to the metadata CSV file.
        - file_list: List of audio file names.
        - song_duration: Duration of audio clips in seconds (default: 10).
        """
        self.audio_dir = audio_dir
        self.file_list = file_list
        self.genres = self._load_genres(metadata_path)
        self.song_duration = song_duration

    def _load_genres(self, metadata_path):
        """
        Loads genre information from the metadata file.

        Arguments:
        - metadata_path: Path to the metadata CSV file.

        Returns:
        - A dictionary mapping track IDs to their genre titles.
        """
        metadata = pd.read_csv(metadata_path)
        if 'track_id' not in metadata.columns or 'track_genres' not in metadata.columns:
            raise ValueError("The metadata table must contain 'track_id' and 'track_genres' columns.")

        def extract_genre_title(genre_list):
            """
            Extracts the genre title from a track's genre information.
            """
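            import ast  # Local import: parse literals safely instead of calling eval()

            try:
                # literal_eval only accepts Python literals and never executes code
                genre_data = ast.literal_eval(genre_list)
                if isinstance(genre_data, list) and genre_data:
                    return genre_data[0].get('genre_title', 'Unknown')
                return 'Unknown'
            except Exception:
                return 'Unknown'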

        metadata['genre_title'] = metadata['track_genres'].apply(extract_genre_title)
        return metadata.set_index('track_id')['genre_title'].to_dict()

    def get_genre(self, idx):
        """
        Retrieves the genre of a track based on its index in the file list.

        Arguments:
        - idx: Index of the track in the file list.

        Returns:
        - The genre title as a string.
        """
        track_id = int(self.file_list[idx].split('.')[0])
        return self.genres.get(track_id, "Unknown")

    def __len__(self):
        """
        Returns the total number of files in the dataset.

        Returns:
        - The number of files as an integer.
        """
        return len(self.file_list)

    def __getitem__(self, idx):
        """
        Loads and preprocesses an audio file by its index.

        Arguments:
        - idx: Index of the file in the file list.

        Returns:
        - A normalized and trimmed/padded waveform as a tensor.
        """
        file_name = self.file_list[idx]
        file_path = os.path.join(self.audio_dir, file_name[:3], file_name)

        waveform, sample_rate = torchaudio.load(file_path)

        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        # Resample to 16 kHz if needed
        if sample_rate != 16000:
            resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
            waveform = resampler(waveform)

        epsilon = 1e-8
        waveform = waveform / (waveform.abs().max() + epsilon)

        # Trim or pad waveform to target length
        target_length = 16000 * self.song_duration
        if waveform.shape[1] < target_length:
            padding = target_length - waveform.shape[1]
            waveform = torch.nn.functional.pad(waveform, (0, padding))
        else:
            waveform = waveform[:, :target_length]

        return waveform.squeeze(0)


class TrainFMADataset(BaseFMADataset):
    pass


class ValFMADataset(BaseFMADataset):
    pass


class TestFMADataset(BaseFMADataset):
    pass


def prepare_datasets(audio_dir, metadata_path, num_train, num_val, num_test):
    """
    Prepares training, validation, and test file lists from the dataset.

    Arguments:
    - audio_dir: Directory containing the audio files.
    - metadata_path: Path to the metadata CSV file.
    - num_train: Number of training samples.
    - num_val: Number of validation samples.
    - num_test: Number of test samples.

    Returns:
    - A tuple containing three lists: train_files, val_files, test_files.
    """
    train_files = []
    val_files = []
    test_files = []

    metadata = pd.read_csv(metadata_path)
    genres = set(metadata['track_id'].tolist())  # Known track IDs; a set gives fast membership checks

    count = 0
    for i in range(156):  # Iterate through subdirectories
        audio_dir_sub = os.path.join(audio_dir, f"{i:03d}")
        if not os.path.exists(audio_dir_sub):
            continue

        for f in os.listdir(audio_dir_sub):
            if f.endswith(".mp3"):
                track_id = int(f.split('.')[0])
                if track_id in genres:
                    if count < num_train:
                        train_files.append(f)
                    elif count < num_train + num_val:
                        val_files.append(f)
                    elif count < num_train + num_val + num_test:
                        test_files.append(f)
                    else:
                        break
                    count += 1

        if count >= num_train + num_val + num_test:
            break

    return train_files, val_files, test_files


### Download and Setup

1. **Download the dataset and metadata from the links below:**

fma_small : "https://os.unil.cloud.switch.ch/fma/fma_small.zip" (~7GB)

fma_metadata: "https://os.unil.cloud.switch.ch/fma/fma_metadata.zip"

2. **Extract the files and organize the folders in the following structure:**

```text
fma_dataset/
├── fma_small/
│   ├── 000/
│   ├── 001/
│   └── ...
└── fma_metadata/
    ├── raw_tracks.csv
    ├── tracks.csv
    ├── genres.csv
    └── features.csv
```

3. **Replace the google_drive_path variable in the code with the path to the fma_dataset folder.**

```python
google_drive_path = "/path/to/fma_dataset"
```
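
Before running the rest of the notebook, it can help to confirm that the folder layout is in place. The sketch below is one possible check and not part of the original project: `missing_entries` and `EXPECTED_ENTRIES` are hypothetical names, and the list of entries assumes the two paths this notebook reads (`fma_small` and `fma_metadata/raw_tracks.csv`).

```python
from pathlib import Path

# Entries this notebook reads; adjust if your copy is organized differently
EXPECTED_ENTRIES = ("fma_small", "fma_metadata/raw_tracks.csv")


def missing_entries(root, expected=EXPECTED_ENTRIES):
    """Return the expected sub-paths that do not exist under `root`."""
    root = Path(root)
    return [entry for entry in expected if not (root / entry).exists()]


missing = missing_entries("/path/to/fma_dataset")
if missing:
    print("Missing:", ", ".join(missing))
else:
    print("Dataset layout looks complete.")
```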



In [None]:
from google.colab import drive
drive.mount('/content/drive')

google_drive_path = "./drive/MyDrive/fma_dataset"

audio_dir = google_drive_path + "/fma_small"
metadata_path = google_drive_path + "/fma_metadata/raw_tracks.csv"

Mounted at /content/drive


### Dataset and DataLoader Setup

This section splits the dataset into training, validation, and test sets, then creates DataLoader instances for efficient batching and shuffling during training and evaluation.
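
The split used here is sequential: files are assigned to training first, then validation, then test, in the order they are found. The sketch below shows the same idea with plain list slicing. `split_by_counts` is a hypothetical helper and the file names are generated placeholders, so this only illustrates the bookkeeping and does not replace `prepare_datasets`.

```python
def split_by_counts(items, num_train, num_val, num_test):
    """Split a list into consecutive train/val/test chunks of the given sizes."""
    train_end = num_train
    val_end = train_end + num_val
    test_end = val_end + num_test
    return items[:train_end], items[train_end:val_end], items[val_end:test_end]


# Placeholder file names, generated only for this example
files = [f"{i:06d}.mp3" for i in range(1500)]
train, val, test = split_by_counts(files, 1000, 200, 100)
print(len(train), len(val), len(test))  # 1000 200 100
```

Because nothing is shuffled before splitting, tracks that sit in the same folder tend to land in the same split. Shuffling the file list first would be one way to avoid that, if it matters for the experiment.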


In [None]:
num_train = 1000
num_val   = 200
num_test  = 100

# Prepare Datasets and Dataloaders
train_files, val_files, test_files = prepare_datasets(audio_dir, metadata_path, num_train, num_val, num_test)

train_dataset = TrainFMADataset(audio_dir, metadata_path, train_files)
val_dataset = ValFMADataset(audio_dir, metadata_path, val_files)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

### Plotting Waveforms and Mel Spectrograms for Visualization

In this section, we visualize the waveform and Mel spectrogram of an audio sample. The waveform shows the raw audio signal over time, while the Mel spectrogram shows how the signal's energy is distributed across frequency over time, with frequencies spaced on the Mel scale. The Mel scale is roughly linear at low frequencies and logarithmic at higher ones, which approximates how listeners perceive pitch. We also play the audio for auditory inspection.
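
To make the Mel scale concrete, the sketch below converts a few frequencies using the HTK-style formula, which is the same one the plotting code in this notebook uses. The chosen frequencies are arbitrary examples.

```python
import math


def hz_to_mel(hz):
    """Convert a frequency in Hz to the Mel scale (HTK-style formula)."""
    return 2595.0 * math.log10(1.0 + hz / 700.0)


def mel_to_hz(mel):
    """Convert a Mel value back to Hz."""
    return 700.0 * (10.0 ** (mel / 2595.0) - 1.0)


for hz in (100, 1000, 4000, 8000):
    print(f"{hz:>5} Hz -> {hz_to_mel(hz):7.1f} mel")
```

With this formula, 1000 Hz maps to roughly 1000 mel, and the step from 4000 Hz to 8000 Hz covers far fewer mels than the step from 0 Hz to 4000 Hz. This is why a Mel spectrogram devotes more of its bins to low frequencies.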


In [None]:
# Imports repeated so this cell can run on its own
import numpy as np
import torchaudio
import matplotlib.pyplot as plt
import math
from IPython.display import Audio, display

def show_graphics(waveform):
    """
    Function to display the waveform and Mel spectrogram of an audio file.

    Arguments:
    waveform (Tensor): The waveform of the audio file (1D tensor of audio samples).
    """

    # Spectrogram parameters
    sample_rate = 16000
    n_fft = 512
    hop_length = 128
    n_mels = 128

    #  Generate the Mel spectrogram
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels
    )(waveform.unsqueeze(0))

    spectrogram_db = torchaudio.transforms.AmplitudeToDB()(mel_spectrogram)

    spectrogram_db = spectrogram_db.squeeze(0)

    num_frames = spectrogram_db.shape[1]
    time_axis = np.linspace(
        0,
        (num_frames - 1) * hop_length / sample_rate,
        num=num_frames
    )


    def hz_to_mel(hz):
        """Converts a frequency in Hertz (Hz) to the Mel scale."""
        return 2595 * math.log10(1 + hz / 700)

    def mel_to_hz(mel):
        """Converts a frequency in Mel scale to Hertz (Hz)."""
        return 700 * (10 ** (mel / 2595) - 1)


    # Equally spaced Mel frequencies
    mel_min = hz_to_mel(0)
    mel_max = hz_to_mel(sample_rate / 2)
    mel_points = np.linspace(mel_min, mel_max, n_mels)

    frequency_axis = mel_to_hz(mel_points)

    fig, axs = plt.subplots(1, 2, figsize=(15, 4))

    # Plot the waveform
    axs[0].plot(np.linspace(0, len(waveform) / sample_rate, num=len(waveform)), waveform.numpy())
    axs[0].set_title('Waveform')
    axs[0].set_xlabel('Time (s)')
    axs[0].set_ylabel('Amplitude')

    # Plot the Mel spectrogram
    im = axs[1].imshow(
        spectrogram_db.numpy(),
        origin='lower',
        aspect='auto',
        extent=[time_axis.min(), time_axis.max(), frequency_axis.min(), frequency_axis.max()],
        cmap='viridis'
    )
    axs[1].set_title('Mel Spectrogram')
    axs[1].set_xlabel('Time (s)')
    axs[1].set_ylabel('Frequency (Hz)')
    fig.colorbar(im, ax=axs[1], format='%+2.0f dB')

    plt.tight_layout()
    plt.show()

    # Play the audio
    display(Audio(waveform.numpy(), rate=sample_rate))

show_graphics(train_dataset[0])


# Part 3: Contrastive learning - Data augmentation <a class="anchor" id="part_03"></a>

In this section, we define the data augmentations used for contrastive learning. Each audio sample receives one randomly chosen augmentation: additive Gaussian noise, time stretching (without changing the pitch), or pitch shifting (without changing the duration). Two independently augmented versions of the same clip later form a positive pair, so the model learns representations that are robust to these transformations.
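
The arithmetic behind two of these augmentations can be sketched in a few lines. Pitch shifts are given in semitones and converted to cents (100 cents per semitone), and in equal temperament a shift of n semitones scales frequency by 2^(n/12). For time stretching, the sketch assumes a tempo factor `rate` produces roughly `n / rate` samples, which is why a stretched clip has to be padded or trimmed back to its original length. The helper names are hypothetical.

```python
def semitones_to_cents(n_steps):
    """Convert a pitch shift in semitones to cents."""
    return n_steps * 100


def semitones_to_ratio(n_steps):
    """Frequency ratio for a shift of n_steps semitones (equal temperament)."""
    return 2.0 ** (n_steps / 12.0)


def stretched_length(num_samples, rate):
    """Approximate number of samples after changing tempo by `rate`."""
    return round(num_samples / rate)


for n_steps in (-2, -1, 0, 1, 2):
    print(n_steps, semitones_to_cents(n_steps), round(semitones_to_ratio(n_steps), 4))

# A 10-second clip at 16 kHz played 25% faster becomes shorter
print(stretched_length(160000, 1.25))
```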



In [None]:
def time_stretch(audio_tensor, rate, sample_rate=16000):
    """
    Function to apply time stretching to an audio waveform without altering the pitch.

    Arguments:
    audio_tensor (Tensor): The input audio waveform (1D tensor representing audio samples).
    rate (float): The factor by which to stretch or compress the audio (1.0 means no change).
    sample_rate (int, optional): The sample rate of the audio (default is 16000).

    Returns:
    Tensor: The time-stretched audio waveform.
    """
    effects = [
        ['tempo', f'{rate}']
    ]
    stretched_waveform, _ = torchaudio.sox_effects.apply_effects_tensor(
        audio_tensor.cpu().unsqueeze(0), sample_rate, effects)

    original_length = audio_tensor.shape[-1]
    stretched_length = stretched_waveform.shape[-1]

    if stretched_length < original_length:
        padding = original_length - stretched_length
        stretched_waveform = torch.nn.functional.pad(stretched_waveform, (0, padding))
    elif stretched_length > original_length:
        stretched_waveform = stretched_waveform[:, :original_length]

    return stretched_waveform.squeeze(0).to(audio_tensor.device)


def pitch_shift(audio_tensor, sample_rate, n_steps):
    """
    Function to apply pitch shifting to an audio waveform without altering the duration.

    Arguments:
    audio_tensor (Tensor): The input audio waveform (1D tensor representing audio samples).
    sample_rate (int): The sample rate of the audio.
    n_steps (int): The number of semitones to shift the pitch (positive for higher pitch, negative for lower).

    Returns:
    Tensor: The pitch-shifted audio waveform.
    """
    n_steps_cents = n_steps * 100
    effects = [
        ['pitch', f'{n_steps_cents}'],
        ['rate', f'{sample_rate}']
    ]
    shifted_waveform, _ = torchaudio.sox_effects.apply_effects_tensor(
        audio_tensor.cpu().unsqueeze(0), sample_rate, effects)
    return shifted_waveform.squeeze(0).to(audio_tensor.device)


def data_augmentation(audio_batch, choice=-1):
    """
    Function to apply data augmentation on a batch of audio samples.

    Arguments:
    audio_batch (Tensor): A batch of audio tensors (shape: [batch_size, target_length]).
    choice (int, optional): The augmentation choice (default is -1, which picks randomly).

    Returns:
    Tensor: The batch of augmented audio tensors.
    """
    augmented_batch = []
    for audio_tensor in audio_batch:
        augmented_audio = data_augmentation_single(audio_tensor, choice)
        augmented_batch.append(augmented_audio)
    augmented_batch = torch.stack(augmented_batch)
    return augmented_batch


def data_augmentation_single(audio_tensor, choice=-1):
    """
    Function to apply a single data augmentation to a single audio sample.

    Arguments:
    audio_tensor (Tensor): A single audio tensor (1D).
    choice (int, optional): The augmentation choice (default is -1, which picks randomly).

    Returns:
    Tensor: The augmented audio tensor.
    """
    if choice == -1:
        choice = torch.randint(0, 3, (1,)).item()
    if choice == 0:
        # Add noise
        noise = torch.randn_like(audio_tensor) * 0.05
        return audio_tensor + noise
    elif choice == 1:
        # Time Stretching without changing pitch
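        rate = 1.0 + (torch.randn(1).item() * 0.2)
        rate = min(max(rate, 0.5), 1.5)  # Clamp so rare extreme draws cannot yield a zero or negative tempo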
        stretched_waveform = time_stretch(audio_tensor, rate)
        return stretched_waveform
    elif choice == 2:
        # Pitch Shifting without changing duration
        n_steps = torch.randint(-2, 3, (1,)).item()
        shifted_waveform = pitch_shift(audio_tensor, 16000, n_steps)
        return shifted_waveform
    else:
        return audio_tensor


### Plotting the Three Data Augmentation Effects on the Same Audio

In [None]:
print("Add Noise")
show_graphics(data_augmentation_single(train_dataset[0], 0))
print("Time Stretching")
show_graphics(data_augmentation_single(train_dataset[0], 1))
print("Pitch Shifting")
show_graphics(data_augmentation_single(train_dataset[0], 2))

# Part 4: SimCLR Model <a class="anchor" id="part_04"></a>

This section defines a **SimCLR model** using the AST model for feature extraction and a projection head for contrastive learning. The main components are:

1. **Encoder**:
   - If `train_from_scratch` is `True`, the AST model is initialized with random weights. Otherwise, it loads a pre-trained AST model from a specified checkpoint.
   - The AST model is used to extract features from the input data.

2. **Projection Head**:
   - A small multilayer perceptron (MLP) that projects the output of the AST encoder into a lower-dimensional space (`projection_dim`) for contrastive learning. It consists of two linear layers with a ReLU activation in between.

3. **Forward Pass**:
   - The `forward` method processes the input through the encoder, performs global average pooling on the output, and passes it through the projection head to generate the final projected embeddings.
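
The shapes involved in pooling and projection can be traced with a NumPy stand-in. This is only a sketch with random weights, not the PyTorch model itself: the hidden size of 768 is assumed here as the AST base configuration, and `num_tokens` is a small placeholder instead of the encoder's real sequence length.

```python
import numpy as np

rng = np.random.default_rng(0)

# hidden_size=768 is an assumption; num_tokens is a placeholder value
batch_size, num_tokens, hidden_size = 2, 10, 768
projection_dim = 128

# Stand-in for the encoder's last_hidden_state
hidden_states = rng.normal(size=(batch_size, num_tokens, hidden_size))

# Global average pooling over the token axis
pooled = hidden_states.mean(axis=1)  # (batch_size, hidden_size)

# Two linear layers with a ReLU in between (random weights for illustration)
w1 = rng.normal(scale=0.02, size=(hidden_size, 512))
b1 = np.zeros(512)
w2 = rng.normal(scale=0.02, size=(512, projection_dim))
b2 = np.zeros(projection_dim)

hidden = np.maximum(pooled @ w1 + b1, 0.0)  # ReLU
projections = hidden @ w2 + b2  # (batch_size, projection_dim)

print(pooled.shape, projections.shape)
```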

## InfoNCE Loss

The InfoNCE loss function calculates the contrastive loss between positive pairs (augmented versions of the same input) and negative pairs (embeddings from different samples in the batch). It computes the cosine similarity between all pairs, normalizes it by their L2 norms, and scales it using a temperature parameter. Masks are applied to identify positive pairs and exclude self-similarity, while negative pairs are all other embeddings. The loss is then computed by applying log softmax to the similarity matrix and averaging the log probabilities of the positive pairs.
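
The description above can be checked by hand with a small NumPy version of the loss. This is a sketch for intuition and not the training implementation: `info_nce_numpy` is a hypothetical name, and the toy embeddings are chosen so the result is easy to reason about. Rows `i` and `i + batch_size` are treated as a positive pair, matching the concatenation order used in this notebook.

```python
import numpy as np


def info_nce_numpy(embeddings, temperature=0.07):
    """InfoNCE loss for embeddings stacked as [view_1; view_2]."""
    n = embeddings.shape[0] // 2

    # L2-normalize rows so dot products become cosine similarities
    z = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    sim = z @ z.T / temperature

    # Exclude self-similarity from the softmax
    np.fill_diagonal(sim, -np.inf)

    # Numerically stable row-wise log-softmax
    row_max = sim.max(axis=1, keepdims=True)
    log_prob = sim - row_max - np.log(np.exp(sim - row_max).sum(axis=1, keepdims=True))

    # Row i pairs with row i + n, and row i + n pairs with row i
    rows = np.arange(2 * n)
    positives = (rows + n) % (2 * n)
    return -log_prob[rows, positives].mean()


# Two samples, two views each: positives are identical, negatives orthogonal
aligned = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0]])
# Same vectors, but each positive pair is now orthogonal
misaligned = np.array([[1.0, 0.0], [0.0, 1.0], [0.0, 1.0], [1.0, 0.0]])

print(info_nce_numpy(aligned), info_nce_numpy(misaligned))
```

The aligned case gives a loss close to zero, while the misaligned case gives a loss near `1 / temperature`, since each row's most similar neighbor is a negative.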










In [None]:
from transformers import ASTConfig

class SimCLRModel(nn.Module):
    """
    A model for SimCLR using the AST model as a feature extractor and a projection head
    for contrastive learning. This class implements a simple architecture for self-supervised
    learning with contrastive loss.

    Args:
        base_model_name (str, optional): Name of the pre-trained AST model to load (default is None).
        projection_dim (int): The dimensionality of the projection head's output (default is 128).
        train_from_scratch (bool): If True, the AST model is initialized with random weights;
                                    otherwise, a pre-trained model is loaded (default is False).
    """

    def __init__(self, base_model_name=None, projection_dim=128, train_from_scratch=False):
        """
        Initializes the SimCLRModel by setting up the encoder and the projection head.

        Args:
            base_model_name (str, optional): Name of the pre-trained AST model to load (default is None).
            projection_dim (int): The dimensionality of the projection head's output (default is 128).
            train_from_scratch (bool): Flag to initialize the AST model with random weights if True,
                                        or load a pre-trained model if False (default is False).
        """
        super(SimCLRModel, self).__init__()

        if train_from_scratch:
            self.encoder = ASTModel(ASTConfig())  # Default AST configuration
        else:
            self.encoder = ASTModel.from_pretrained(base_model_name)

        # Projection head for SimCLR (maps the encoder output to the projection space)
        self.projection_head = nn.Sequential(
            nn.Linear(self.encoder.config.hidden_size, 512),
            nn.ReLU(),
            nn.Linear(512, projection_dim)
        )

    def forward(self, x):
        """
        Performs a forward pass through the encoder and projection head.

        Args: x (dict): The input tensor in the form of a dictionary to be passed to the encoder.
              It must include the necessary inputs for the AST model (e.g., audio data).

        Returns: torch.Tensor: The projected embeddings after applying the projection head to the pooled output.
        """
        outputs = self.encoder(**x)
        pooled_output = outputs.last_hidden_state.mean(dim=1)
        projections = self.projection_head(pooled_output)

        return projections


In [None]:
def info_nce_loss(batch_embeddings, temperature=0.07):
    """
    Computes the InfoNCE loss considering the entire batch.

    Args:
        batch_embeddings (torch.Tensor): Concatenated embeddings from augmented versions.
                                         Shape: [2 * batch_size, embedding_dim].
        temperature (float): Temperature factor to scale similarity.

    Returns:
        torch.Tensor: InfoNCE loss value.
    """
    # Cosine similarity between all pairs in the batch:
    # L2-normalize every embedding first, then take pairwise dot products
    normalized = F.normalize(batch_embeddings, dim=1)
    cos_sim = torch.mm(normalized, normalized.T)

    # Temperature scaling
    cos_sim = cos_sim / temperature

    # Mask to avoid self-similarity
    batch_size = batch_embeddings.size(0) // 2
    self_mask = torch.eye(2 * batch_size, device=batch_embeddings.device).bool()
    cos_sim.masked_fill_(self_mask, -float('inf'))

    # Mask for positive pairs
    pos_mask = torch.zeros_like(cos_sim, dtype=torch.bool, device=batch_embeddings.device)
    for i in range(batch_size):
        pos_mask[i, i + batch_size] = True
        pos_mask[i + batch_size, i] = True

    # InfoNCE loss
    log_prob = F.log_softmax(cos_sim, dim=-1)
    loss = -log_prob[pos_mask].mean()
    return loss

# Part 5: Training the SimCLR Model <a class="anchor" id="part_05"></a>

The `train_simclr_model` function is designed to train a SimCLR model using the InfoNCE loss. It allows flexibility to train the model from scratch or fine-tune a pre-trained model.

## Workflow

1. **Feature Extractor**:
   An AST feature extractor is loaded to preprocess augmented audio data into suitable input tensors.

2. **Training Loop**:
   - The model is trained over the specified number of epochs.
   - For each batch in the training dataset:
     - Data augmentation is applied to generate two augmented versions of the audio input.
     - These augmented samples are processed using the feature extractor.
     - The model generates embeddings for both versions.
     - The embeddings are concatenated, and the InfoNCE loss is calculated.

3. **Validation**:
   - After each epoch, the model's performance is evaluated on the validation dataset.
   - Validation loss is computed similarly to the training process.

4. **Model Saving**:
   After training is completed, the model's weights are saved to the specified file.

## Outputs
The function returns a dictionary containing:
- **`train_losses`**: List of training losses for each epoch.
- **`val_losses`**: List of validation losses for each epoch.

## Example Usage
```python
results = train_simclr_model(
    epochs=10,
    lr=3e-5,
    save_model_name="simclr_trained_model.pth",
    train_from_scratch=True
)
```
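
Once training returns, the dictionary can be inspected directly. The sketch below uses made-up loss values purely for illustration (they are not results from this project) and a separate variable name, `example_results`, so it does not overwrite the real output.

```python
# Made-up numbers for illustration only; real values come from training
example_results = {
    "train_losses": [2.10, 1.62, 1.35, 1.20],
    "val_losses": [2.05, 1.70, 1.58, 1.61],
}

val_losses = example_results["val_losses"]

# Index of the smallest validation loss, converted to a 1-based epoch number
best_epoch = min(range(len(val_losses)), key=val_losses.__getitem__) + 1
print(f"Lowest validation loss at epoch {best_epoch}: {val_losses[best_epoch - 1]:.2f}")
```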

In [None]:
def train_simclr_model(epochs, lr, save_model_name = "simclr_trained_model.pth", train_from_scratch = True):
    """
    Trains the SimCLR model using the InfoNCE loss.

    Args:
        epochs (int): Number of training epochs.
        lr (float): Learning rate for the optimizer.
        save_model_name (str): File name for saving the trained model.
        train_from_scratch (bool): Whether to train the model from scratch or use a pre-trained base.


    Returns:
        dict: Dictionary containing training and validation losses.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Device selected:", device)

    # Initialize model
    model = SimCLRModel(
        base_model_name="MIT/ast-finetuned-audioset-10-10-0.4593",
        train_from_scratch= train_from_scratch
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=lr)
    feature_extractor = ASTFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0

        for i, batch in enumerate(train_loader):

            audio = batch.to(device)

            # Data augmentation for the entire batch
            augmented_1 = data_augmentation(audio)
            augmented_2 = data_augmentation(audio)

            # Convert to NumPy and apply feature extractor
            inputs_1 = feature_extractor(
                list(augmented_1.cpu().numpy()), sampling_rate=16000, return_tensors="pt", padding=True
            )
            inputs_2 = feature_extractor(
                list(augmented_2.cpu().numpy()), sampling_rate=16000, return_tensors="pt", padding=True
            )
            inputs_1 = {k: v.to(device) for k, v in inputs_1.items()}
            inputs_2 = {k: v.to(device) for k, v in inputs_2.items()}

            # Forward pass for both augmented versions
            projections_1 = model(inputs_1)
            projections_2 = model(inputs_2)

            # Concatenate projections of the two versions
            batch_embeddings = torch.cat([projections_1, projections_2], dim=0)

            # Calculate InfoNCE loss
            loss = info_nce_loss(batch_embeddings)

            # Optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        # Average over batches so training and validation losses share a scale
        epoch_loss /= len(train_loader)
        train_losses.append(epoch_loss)
        print(f"Epoch {epoch + 1}/{epochs}, Training Loss: {epoch_loss:.4f}")

        # Validation
        model.eval()
        val_epoch_loss = 0

        with torch.no_grad():
            for batch in val_loader:
                audio = batch.to(device)

                # Two independently augmented views, as in training.
                # Duplicating a single view would make every positive pair
                # identical and the validation loss uninformative.
                projections = []
                for _ in range(2):
                    augmented = data_augmentation(audio)
                    inputs = feature_extractor(
                        list(augmented.cpu().numpy()), sampling_rate=16000, return_tensors="pt", padding=True
                    )
                    inputs = {k: v.to(device) for k, v in inputs.items()}
                    projections.append(model(inputs))

                concat_projections = torch.cat(projections, dim=0)
                val_epoch_loss += info_nce_loss(concat_projections).item()

        val_epoch_loss /= len(val_loader)
        val_losses.append(val_epoch_loss)
        print(f"Epoch {epoch + 1}/{epochs}, Validation Loss: {val_epoch_loss:.4f}")

    # Save the model
    torch.save(model.state_dict(), save_model_name)
    print(f"Model saved as {save_model_name}")

    return {"train_losses": train_losses, "val_losses": val_losses}

## Training the model

In [None]:
# Call the function to train the model
results = train_simclr_model(
    epochs=10,
    lr=3e-5,
    save_model_name="simclr_trained_model.pth",
    train_from_scratch=True)

Device selected: cuda


## Analyzing Epoch Losses

The `plot_losses` function visualizes the training and validation losses across epochs, allowing users to monitor and compare the performance of their model during training.

## Visualization Purpose
- **Compare Training vs. Validation Loss**:
  The plot provides insights into how well the model is fitting the training data and how it generalizes to validation data.
  
- **Overfitting/Underfitting Detection**:
  By analyzing the trends, users can detect overfitting (validation loss stalling or rising while training loss keeps falling) or underfitting (both losses remaining high).
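
A rough way to spot the overfitting pattern in the loss lists is to flag epochs where the training loss went down but the validation loss went up. The sketch below is a simple heuristic under that assumption, with a hypothetical helper name and made-up loss values. It does not replace looking at the plot, since a single noisy epoch can trigger it.

```python
def diverging_epochs(train_losses, val_losses):
    """Return 1-based epochs where training loss fell but validation loss rose."""
    flagged = []
    for epoch in range(1, len(train_losses)):
        train_improved = train_losses[epoch] < train_losses[epoch - 1]
        val_worsened = val_losses[epoch] > val_losses[epoch - 1]
        if train_improved and val_worsened:
            flagged.append(epoch + 1)
    return flagged


# Made-up values for illustration only
example_train = [2.10, 1.62, 1.35, 1.20, 1.10]
example_val = [2.05, 1.70, 1.58, 1.61, 1.66]
print(diverging_epochs(example_train, example_val))  # [4, 5]
```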



In [None]:
def plot_losses(train_losses, val_losses):
    """
    Plots the training and validation losses over the epochs.

    Args:
        train_losses (list): List of training losses for each epoch.
        val_losses (list): List of validation losses for each epoch.

    Returns:
        None
    """

    plt.figure(figsize=(10, 6))

    epochs = range(1, len(train_losses) + 1)

    plt.plot(epochs, train_losses, label="Train", marker='o')
    plt.plot(epochs, val_losses, label="Validation", marker='o', linestyle='--')

    plt.xlabel("Epoch")
    plt.ylabel("Loss")

    plt.title("Train and Validation losses over the epochs")

    plt.legend()
    plt.grid()
    plt.show()

## Plotting the graph

In [None]:
plot_losses(results["train_losses"], results["val_losses"])

# Part 6: Embedding Creation and Visualization <a class="anchor" id="part_06"></a>

This code compares the embeddings produced by the trained SimCLR model with those of a baseline, using the validation dataset. The fine-tuned model is loaded with the weights saved after training. The original model is instantiated with `train_from_scratch=True`, so its encoder carries randomly initialized weights that have not been trained.

The `extract_embeddings` function processes the audio data batch by batch, producing `fine_tuned_embeddings` from the fine-tuned model and `original_embeddings` from the original model. The embeddings are taken from the encoder output, mean-pooled over tokens, and not from the projection head. These embeddings can be used for further analysis, such as evaluating training quality or visualizing differences between the two models.
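
One simple way to compare two embedding sets numerically is the average cosine similarity over all pairs of tracks. This is a suggested diagnostic and not part of the original analysis: a value near 1 would suggest that all tracks map to almost the same direction, while a lower value suggests the embeddings are more spread out. The sketch uses synthetic arrays with an assumed embedding width of 768, and `mean_pairwise_cosine` is a hypothetical helper.

```python
import numpy as np


def mean_pairwise_cosine(embeddings):
    """Average cosine similarity over all distinct pairs of rows."""
    normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    similarity = normalized @ normalized.T
    n = similarity.shape[0]
    # Drop the diagonal, which holds each row's similarity with itself
    off_diagonal_sum = similarity.sum() - np.trace(similarity)
    return off_diagonal_sum / (n * (n - 1))


rng = np.random.default_rng(0)

# Synthetic stand-ins for real embedding arrays
spread_out = rng.normal(size=(50, 768))
collapsed = np.ones((50, 768)) + 0.01 * rng.normal(size=(50, 768))

print(f"spread out: {mean_pairwise_cosine(spread_out):.3f}")
print(f"collapsed:  {mean_pairwise_cosine(collapsed):.3f}")
```

The same function could be applied to `fine_tuned_embeddings` and `original_embeddings` once they have been extracted.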



In [None]:
def preprocess_audio(audio_tensor, target_length=80000):
    """
    Preprocesses an audio tensor by converting it to mono, normalizing it, and adjusting its length.

    Args:
        audio_tensor (torch.Tensor): Input audio tensor.
        target_length (int): Desired length of the output audio tensor.

    Returns:
        torch.Tensor: Preprocessed audio tensor.
    """
    if audio_tensor.dim() > 1:
        audio_tensor = torch.mean(audio_tensor, dim=0, keepdim=True)  # Convert to mono if multi-channel

    audio_tensor = audio_tensor.squeeze(0)  # Ensure correct dimensionality
    epsilon = 1e-8
    audio_tensor = audio_tensor / (audio_tensor.abs().max() + epsilon)  # Normalize by maximum amplitude

    # Adjust the length of the audio tensor
    if audio_tensor.shape[-1] < target_length:
        padding = target_length - audio_tensor.shape[-1]
        audio_tensor = torch.nn.functional.pad(audio_tensor, (0, padding))  # Pad if shorter
    else:
        audio_tensor = audio_tensor[:target_length]  # Truncate if longer

    return audio_tensor

def extract_embeddings(model, loader, feature_extractor):
    """
    Extracts embeddings from audio data using a given model and feature extractor.

    Args:
        model (nn.Module): Pre-trained SimCLR model.
        loader (DataLoader): DataLoader with audio batches.
        feature_extractor (ASTFeatureExtractor): Pre-trained feature extractor.

    Returns:
        np.ndarray: Numpy array containing extracted embeddings.
    """
    model.eval()  # Set the model to evaluation mode
    embeddings = []
    with torch.no_grad():
        for batch in loader:
            audio = batch.to(device)
            # Preprocess each audio sample in the batch
            processed_audio = [preprocess_audio(a) for a in audio]
            audio_np = torch.stack(processed_audio).cpu().numpy()  # Stack into a single batch and convert to NumPy
            inputs = feature_extractor(
                list(audio_np), sampling_rate=16000, return_tensors="pt", padding=True
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}  # Move inputs to device
            outputs = model.encoder(**inputs)  # Forward pass through the model
            pooled_output = outputs.last_hidden_state.mean(dim=1)  # Perform global average pooling
            embeddings.append(pooled_output.cpu().numpy())  # Append embeddings to the list
    embeddings = np.concatenate(embeddings, axis=0)  # Concatenate all embeddings into a single array
    return embeddings


**If you have a saved model and want to use its pre-trained weights, run this cell.**


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
feature_extractor = ASTFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")


# Set the path to your saved model
model_path = './drive/MyDrive/simclr_pretrained_model.pth'

# Instantiate the model with the same pre-trained AST checkpoint used during training
fine_tuned_model = SimCLRModel("MIT/ast-finetuned-audioset-10-10-0.4593")

# Load the saved weights once, mapping them to the available device
try:
    model_weights = torch.load(model_path, map_location=device)
    print("File loaded successfully!")
except Exception as e:
    print(f"Error loading the file: {e}")
    raise

fine_tuned_model.load_state_dict(model_weights)

# Setting the model to evaluation mode (if you'll be using it for inference)
fine_tuned_model.eval()
fine_tuned_model = fine_tuned_model.to(device)


# Load the original model
original_model = SimCLRModel("MIT/ast-finetuned-audioset-10-10-0.4593", train_from_scratch=True)
original_model = original_model.to(device)
original_model.eval()

# Extract embeddings for the fine-tuned and original models
fine_tuned_embeddings = extract_embeddings(fine_tuned_model, val_loader, feature_extractor)
original_embeddings = extract_embeddings(original_model, val_loader, feature_extractor)


**If you want to use the trained model without uploading, run this cell.**

In [None]:
model_name = "simclr_trained_model.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
feature_extractor = ASTFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

# Load the fine-tuned model
fine_tuned_model = SimCLRModel("MIT/ast-finetuned-audioset-10-10-0.4593", train_from_scratch=False)
fine_tuned_model.load_state_dict(torch.load(model_name, map_location=device))  # Works on CPU-only machines too
fine_tuned_model = fine_tuned_model.to(device)
fine_tuned_model.eval()

# Load the original model
original_model = SimCLRModel("MIT/ast-finetuned-audioset-10-10-0.4593", train_from_scratch=True)
original_model = original_model.to(device)
original_model.eval()

# Extract embeddings for the fine-tuned and original models
fine_tuned_embeddings = extract_embeddings(fine_tuned_model, val_loader, feature_extractor)
original_embeddings = extract_embeddings(original_model, val_loader, feature_extractor)

## Embedding Visualization

This code visualizes the embeddings generated by both fine-tuned and original SimCLR models using t-SNE for dimensionality reduction. The embeddings from both models are combined, standardized, and then reduced to 2D for easier visualization.

Each data point (embedding) is colored based on its genre, allowing for a clearer comparison between the embeddings from the fine-tuned and original models.

The code also includes a function to map each genre to a unique color, and then it plots the embeddings for both models in side-by-side subplots, with a legend indicating the genres. The final plot is displayed with adjusted layout for clarity.

In [None]:
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

def apply_tsne(fine_tuned_embeddings, original_embeddings, perplexity=10, random_state=42):
    """
    Applies t-SNE for dimensionality reduction on the combined embeddings of both models.

    Parameters:
    - fine_tuned_embeddings: Embeddings generated by the fine-tuned model.
    - original_embeddings: Embeddings generated by the original model.
    - perplexity: The perplexity parameter for t-SNE.
    - random_state: Random seed for reproducibility.

    Returns:
    - embeddings_fine_tuned_2d: 2D embeddings for the fine-tuned model.
    - embeddings_original_2d: 2D embeddings for the original model.
    """
    combined_embeddings = np.concatenate([fine_tuned_embeddings, original_embeddings], axis=0)

    scaler = StandardScaler()
    combined_embeddings = scaler.fit_transform(combined_embeddings)

    # Apply t-SNE for dimensionality reduction (from high-dimensional to 2D)
    tsne_2d = TSNE(n_components=2, perplexity=perplexity, random_state=random_state)
    embeddings_2d = tsne_2d.fit_transform(combined_embeddings)

    # Split the reduced 2D embeddings back into fine-tuned and original sets
    embeddings_fine_tuned_2d = embeddings_2d[:len(fine_tuned_embeddings)]
    embeddings_original_2d = embeddings_2d[len(fine_tuned_embeddings):]

    return embeddings_fine_tuned_2d, embeddings_original_2d


def plot_embeddings(fine_tuned_embeddings_2d, original_embeddings_2d, fine_tuned_genres, original_genres):
    """
    Plots the 2D embeddings of both fine-tuned and original models with colors based on genres.

    Parameters:
    - fine_tuned_embeddings_2d: 2D embeddings for the fine-tuned model.
    - original_embeddings_2d: 2D embeddings for the original model.
    - fine_tuned_genres: List of genres for the fine-tuned model.
    - original_genres: List of genres for the original model.
    """
    # Map genres to unique colors for visualization
    all_genres = fine_tuned_genres + original_genres
    unique_genres = sorted(set(all_genres))
    genre_to_color = {genre: i for i, genre in enumerate(unique_genres)}
    num_genres = len(unique_genres)
    colors = plt.get_cmap('tab10', num_genres)  # plt.cm.get_cmap was removed in Matplotlib 3.9

    # Function to map genres to corresponding colors
    def get_colors(genres):
        return [colors(genre_to_color[genre]) for genre in genres]

    fine_tuned_colors = get_colors(fine_tuned_genres)
    original_colors = get_colors(original_genres)

    plt.figure(figsize=(16, 8))  # Adjust figure size

    # Plot fine-tuned embeddings
    plt.subplot(1, 2, 1)
    for genre in unique_genres:
        genre_indices = [i for i, g in enumerate(fine_tuned_genres) if g == genre]
        plt.scatter(
            fine_tuned_embeddings_2d[genre_indices, 0],
            fine_tuned_embeddings_2d[genre_indices, 1],
            label=genre,
            alpha=0.7
        )
    plt.title('Trained Embeddings')
    plt.xlabel('Dimension 1')
    plt.ylabel('Dimension 2')

    # Plot original embeddings
    plt.subplot(1, 2, 2)
    for genre in unique_genres:
        genre_indices = [i for i, g in enumerate(original_genres) if g == genre]
        plt.scatter(
            original_embeddings_2d[genre_indices, 0],
            original_embeddings_2d[genre_indices, 1],
            label=genre,
            alpha=0.7
        )
    plt.title('Untrained Embeddings')
    plt.xlabel('Dimension 1')
    plt.ylabel('Dimension 2')
    plt.legend(title='Genres', bbox_to_anchor=(1.05, 0.5), loc='center left', borderaxespad=0.)  # Adjust legend position

    # Final adjustments for better layout and show the plot
    plt.tight_layout()
    plt.show()


In [None]:
fine_tuned_embeddings_2d, original_embeddings_2d = apply_tsne(fine_tuned_embeddings, original_embeddings)

fine_tuned_genres = [val_dataset.get_genre(idx) for idx in range(len(fine_tuned_embeddings))]
original_genres = [val_dataset.get_genre(idx) for idx in range(len(original_embeddings))]

plot_embeddings(fine_tuned_embeddings_2d, original_embeddings_2d, fine_tuned_genres, original_genres)

# Part 7: Recommendation system <a class="anchor" id="part_07"></a>

This section implements a music recommendation system that compares embeddings from the fine-tuned and original models. The `recommend_song` function scores a query embedding against a set of embeddings using either cosine similarity or Euclidean distance and returns the `top_k` closest matches with their similarity or distance scores. The `display_audio` function plays audio samples directly in the notebook, so the recommendations can be judged by ear.

To demonstrate the system, an audio sample is selected from the validation dataset, and its embedding is computed using the fine-tuned model. Recommendations are then generated based on both fine-tuned and original embeddings, showing how model tuning affects the similarity scores and quality of suggestions. Results for each recommendation include the Music ID, Genre, and similarity/distance score, along with the audio playback of both the queried song and recommended tracks, allowing for a direct and intuitive comparison.
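
The choice of metric can change which track is ranked first. The sketch below uses three made-up 2-D vectors (illustrative values, not model outputs) to show how cosine similarity and Euclidean distance can disagree; the variable names are hypothetical and independent of the notebook's functions.

```python
import numpy as np

# Toy 2-D "embeddings" (illustrative values, not taken from the model).
catalog = np.array([
    [10.0, 0.0],   # track 0: same direction as the query, much larger norm
    [1.0, 0.5],    # track 1: close to the query, slightly different direction
    [0.0, 1.0],    # track 2: orthogonal to the query
])
query = np.array([1.0, 0.0])

# Cosine similarity: normalize to unit length, then take dot products.
catalog_unit = catalog / np.linalg.norm(catalog, axis=1, keepdims=True)
query_unit = query / np.linalg.norm(query)
cosine_scores = catalog_unit @ query_unit
cosine_ranking = np.argsort(-cosine_scores)        # higher similarity is better

# Euclidean distance: smaller distance is better.
euclidean_scores = np.linalg.norm(catalog - query, axis=1)
euclidean_ranking = np.argsort(euclidean_scores)

print("cosine ranking:   ", cosine_ranking.tolist())
print("euclidean ranking:", euclidean_ranking.tolist())
```

Cosine similarity ranks track 0 first because it ignores vector length, whereas Euclidean distance ranks track 1 first because it is closest in absolute terms.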


In [None]:
def recommend_song(query_embedding, embeddings, top_k=1, metric='cosine'):
    """
    Recommend songs based on the closest embeddings.

    Parameters:
    - query_embedding: The embedding of the query song.
    - embeddings: The embeddings of the dataset.
    - top_k: Number of recommendations to return.
    - metric: Similarity metric ('euclidean' or 'cosine').

    Returns:
    - List of tuples (index, distance/similarity) for the top_k nearest songs.
    """
    if metric == 'cosine':
        query_embedding = query_embedding / np.linalg.norm(query_embedding)
        embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        distances = np.dot(embeddings, query_embedding.T)
        nearest_indices = np.argsort(-distances)[:top_k]
        return [(index, distances[index]) for index in nearest_indices]
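    elif metric == 'euclidean':
        distances = np.linalg.norm(embeddings - query_embedding, axis=1)
        nearest_indices = np.argsort(distances)[:top_k]
        return [(index, distances[index]) for index in nearest_indices]
    else:
        # Fail loudly instead of silently returning None for an unsupported metric
        raise ValueError(f"Unknown metric '{metric}'; expected 'cosine' or 'euclidean'.")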

from IPython.display import Audio, display

def display_audio(audio_tensor, sample_rate=16000):
    """
    Display an audio player in the notebook.

    Parameters:
    - audio_tensor: The waveform tensor to play.
    - sample_rate: Sampling rate of the audio.
    """
    if torch.is_tensor(audio_tensor):
        audio_np = audio_tensor.cpu().numpy()
    else:
        audio_np = audio_tensor
    display(Audio(audio_np, rate=sample_rate))

In [None]:
def recommendation_systems(
    test_index, top_k_fine_tuned=2, top_k_original=1
):
    """
    Tests and compares recommendation systems using fine-tuned and original embeddings by leveraging a K-Nearest Neighbors (KNN) approach to identify and rank the most similar audio tracks.

    Args:
        test_index (int): Index of the test audio in the validation dataset.
        top_k_fine_tuned (int): Number of recommendations to return using fine-tuned embeddings.
        top_k_original (int): Number of recommendations to return using original embeddings.
    """
    # Select the test audio
    test_audio = val_dataset[test_index]

    # Extract embedding for the test audio
    test_embedding = extract_embeddings(
        fine_tuned_model,
        DataLoader([test_audio], batch_size=1),
        feature_extractor
    )

    # Recommendations using fine-tuned embeddings
    recommendations_fine_tuned = recommend_song(
        test_embedding[0], fine_tuned_embeddings, top_k=top_k_fine_tuned
    )

    # Recommendations using original embeddings
    recommendations_original = recommend_song(
        test_embedding[0], original_embeddings, top_k=top_k_original
    )

    # Display results
    print("Queried Audio:")
    print(f"Music ID: {test_index}, Genre: {val_dataset.get_genre(test_index)}")
    display_audio(test_audio.squeeze(0))

    print("\nRecommendations (Fine-Tuned Embeddings):")
    for rec_idx, distance in recommendations_fine_tuned:
        if rec_idx == test_index:
            continue
        print(f"Music ID: {rec_idx}, Genre: {val_dataset.get_genre(rec_idx)}, Cosine Similarity: {distance:.4f}")
        rec_audio = val_dataset[rec_idx]
        display_audio(rec_audio)

    print("\nRecommendations (Original Embeddings):")
    for rec_idx, distance in recommendations_original:
        print(f"Music ID: {rec_idx}, Genre: {val_dataset.get_genre(rec_idx)}, Cosine Similarity: {distance:.4f}")
        rec_audio = val_dataset[rec_idx]
        display_audio(rec_audio)
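
In `recommendation_systems`, the query track is skipped after the top-k search, so requesting `top_k_fine_tuned=2` typically yields a single recommendation. An alternative, sketched below under the assumption that the query is itself a row of the embedding matrix, is to mask the query before ranking so that exactly `top_k` other tracks are returned. The helper name `top_k_excluding_query` and the toy embeddings are hypothetical.

```python
import numpy as np

def top_k_excluding_query(query_index, embeddings, top_k=1):
    """Return (index, cosine similarity) for the top_k nearest tracks, never the query itself."""
    unit = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    scores = unit @ unit[query_index]
    scores[query_index] = -np.inf            # mask the query so it cannot be selected
    nearest = np.argsort(-scores)[:top_k]
    return [(int(i), float(scores[i])) for i in nearest]

# Toy embeddings (illustrative values only).
toy_embeddings = np.array([
    [1.0, 0.0],
    [0.9, 0.1],
    [0.0, 1.0],
    [-1.0, 0.0],
])
recommendations = top_k_excluding_query(0, toy_embeddings, top_k=2)
print(recommendations)
```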

Test our recommendation system with a music index between `0` and `len(val_dataset) - 1`.

In [None]:
recommendation_systems(test_index=0)

# Part 8: Qualitative Tests and Results <a class="anchor" id="part_08"></a>

In this section, we evaluate our music recommendation system by running **50 tests** (or `len(val_dataset)` tests, if the validation set is smaller) on random samples from the validation dataset. Each recommendation is assessed and assigned one of the following levels of satisfaction:

1. **Unsatisfactory**: The recommendations do not align with the queried audio or genre.
2. **Partially Satisfactory**: The recommendations partially match the queried audio or genre but lack precision.
3. **Satisfactory**: The recommendations closely match the queried audio or genre.

The evaluation is conducted manually, with human judgment serving as the benchmark for categorization. This approach ensures qualitative insights into the system's performance.
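
The manual ratings can be tallied with a few lines of Python. This is a minimal sketch that assumes the ratings are recorded as a list of label strings; the function name `summarize_ratings` and the four example ratings are placeholders, not our actual evaluation data.

```python
from collections import Counter

LEVELS = ("Satisfactory", "Partially Satisfactory", "Unsatisfactory")

def summarize_ratings(ratings):
    """Count each satisfaction level and express it as a percentage of all ratings."""
    if not ratings:
        raise ValueError("No ratings to summarize.")
    unknown = set(ratings) - set(LEVELS)
    if unknown:
        raise ValueError(f"Unknown rating labels: {sorted(unknown)}")
    counts = Counter(ratings)
    total = len(ratings)
    return {level: (counts[level], 100.0 * counts[level] / total) for level in LEVELS}

# Hypothetical ratings for four test queries (placeholder values, not our results).
example_ratings = [
    "Satisfactory",
    "Unsatisfactory",
    "Satisfactory",
    "Partially Satisfactory",
]
summary = summarize_ratings(example_ratings)
for level, (count, percent) in summary.items():
    print(f"{level}: {count} tracks ({percent:.0f}% of the total)")
```

Because every rating falls into exactly one level, the counts add up to the number of tests and the percentages add up to 100%, which gives a quick consistency check on the reported totals.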


In [None]:
import random

num_val = len(val_dataset)
max_iterations = min(50, num_val)
random_indices = random.sample(range(num_val), max_iterations)

for i, test_index in enumerate(random_indices):
    print(f"\n----------------- Test {i + 1} ----------------------")
    recommendation_systems(test_index=test_index)

# Results

Based on our manual qualitative analysis of the **50 evaluated music tracks**, we obtained the following results for the fine-tuned model:

- **Satisfactory**: 23 tracks (46% of the total)
- **Partially Satisfactory**: 15 tracks (30% of the total)
- **Unsatisfactory**: 17 tracks (34% of the total)

For comparison, the original model without our training produced the following results:

- Satisfactory: 5 tracks (10% of the total)
- Partially Satisfactory: 10 tracks (20% of the total)
- Unsatisfactory: 35 tracks (70% of the total)

These results suggest that self-supervised training improved the recommendations. The evaluation is subjective and there is still room for improvement, but the trained model clearly outperforms the untrained baseline even though no labeled data was used during training. The increase in "Satisfactory" and "Partially Satisfactory" tracks, together with the decrease in "Unsatisfactory" tracks, indicates that the model is learning useful representations, and further fine-tuning and refinement could improve its performance.