In [1]:
# import os
# data_dir = "/home/thomasyim/.cache/kagglehub/datasets/andradaolteanu/"\
#     + "gtzan-dataset-music-genre-classification/versions/1"

# genre_dirs = os.listdir(f"{data_dir}/Data/genres_original/")

# filenames = []
# genres = []

# for genre in genre_dirs:
#     for file in os.listdir(f"{data_dir}/Data/genres_original/{genre}"):
#         filenames.append(file)
#         genres.append(genre)

In [2]:
# import pandas as pd

# data = {
#     "filename": filenames,
#     "genres": genres
# }

# df = pd.DataFrame(data)
# df = df.sample(frac = 1)
# df.head

In [3]:
# from sklearn.model_selection import train_test_split

# train, test = train_test_split(df, test_size=0.2)

In [4]:
# train.to_csv("classification_train.csv")
# len(train)

In [5]:
# test.to_csv("classification_test.csv")
# len(test)

In [6]:
# Root of the locally cached GTZAN download (kagglehub cache, dataset version 1).
# Every audio path in this notebook is built relative to this directory.
data_dir = (
    "/home/thomasyim/.cache/kagglehub/datasets/andradaolteanu/"
    "gtzan-dataset-music-genre-classification/versions/1"
)


In [25]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from transformers import AutoModel, Wav2Vec2FeatureExtractor, T5Tokenizer, T5ForConditionalGeneration, AutoProcessor
from transformers import Wav2Vec2Processor, Wav2Vec2Model
import torchaudio.transforms as T
from datasets import load_dataset

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")


# Load Pre-trained Models
# encoder_model = AutoModel.from_pretrained("m-a-p/MERT-v1-330M", trust_remote_code=True)
# processor = AutoProcessor.from_pretrained("m-a-p/MERT-v1-330M", trust_remote_code=True)
# final_layer_dimension = 1024

# Active configuration: CLAP music checkpoint. The encoder and its processor are
# loaded from the same hub id so they always match.
clap_checkpoint = "laion/larger_clap_music"
encoder_model = AutoModel.from_pretrained(clap_checkpoint)
processor = AutoProcessor.from_pretrained(clap_checkpoint)
# Read by GenreClassificationModel as the input width of its first linear layer.
final_layer_dimension = 512

# processor = Wav2Vec2FeatureExtractor.from_pretrained("facebook/wav2vec2-base")
# encoder_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base")
# final_layer_dimension = 768

# Freeze the encoder (currently disabled, so the encoder is fine-tuned together with the classifier head)
# for param in encoder_model.parameters():
#     param.requires_grad = False

Using device: cuda


In [26]:
# Class names in label order: the position of a genre in this list is its integer class id.
genres = [
    "blues",
    "classical",
    "country",
    "disco",
    "hiphop",
    "jazz",
    "metal",
    "pop",
    "reggae",
    "rock",
]
# Forward (name -> id) and reverse (id -> name) lookup tables.
genre_to_idx = dict(zip(genres, range(len(genres))))
idx_to_genre = dict(enumerate(genres))

In [27]:
genre_to_idx

{'blues': 0,
 'classical': 1,
 'country': 2,
 'disco': 3,
 'hiphop': 4,
 'jazz': 5,
 'metal': 6,
 'pop': 7,
 'reggae': 8,
 'rock': 9}

In [28]:
def _load_mono_clip(filename, target_rate, clip_seconds=3):
    """Load an audio file as a mono waveform at `target_rate`, cut to its first seconds.

    Args:
        filename: Path of the audio file, passed to `torchaudio.load`.
        target_rate: Sampling rate (Hz) the returned waveform must have.
        clip_seconds: Maximum length of the returned clip in seconds. Shorter
            recordings are returned as-is (no padding is applied).

    Returns:
        A tensor of shape (1, num_samples) with
        num_samples <= clip_seconds * target_rate.
    """
    # Imported here because the notebook's `import torchaudio` lives in a later
    # cell than these function definitions; this keeps the functions usable
    # regardless of cell execution order.
    import torchaudio

    waveform, sample_rate = torchaudio.load(filename)

    # Down-mix multi-channel audio to mono by averaging the channels.
    if waveform.size(0) > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    # Resample only when the file's rate differs from what the processor expects.
    if sample_rate != target_rate:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_rate)
        waveform = resampler(waveform)

    return waveform[:, :int(clip_seconds * target_rate)]


def process_audio(filename, processor, clip_seconds=3):
    """Turn an audio file into model inputs using a feature-extractor style processor.

    Args:
        filename: Path of the audio file to load.
        processor: Callable exposing a `sampling_rate` attribute; it is called
            with the 1-D waveform array plus `sampling_rate` and `return_tensors`.
        clip_seconds: Length (in seconds) of the leading clip that is kept.
            Defaults to 3, the value previously hard-coded.

    Returns:
        Whatever `processor` returns for the clip (requested with
        `return_tensors="pt"`).
    """
    rate = processor.sampling_rate
    clip = _load_mono_clip(filename, rate, clip_seconds)
    return processor(clip.squeeze().numpy(), sampling_rate=rate, return_tensors="pt")


def process_audio_clap(filename, processor, clip_seconds=3):
    """Turn an audio file into model inputs using a CLAP-style processor.

    Same pipeline as `process_audio`, except that the sampling rate is read
    from `processor.feature_extractor` and the waveform is passed through the
    `audios=` keyword.

    Args:
        filename: Path of the audio file to load.
        processor: Callable exposing `feature_extractor.sampling_rate` and
            accepting `audios`, `sampling_rate` and `return_tensors` keywords.
        clip_seconds: Length (in seconds) of the leading clip that is kept.
            Defaults to 3, the value previously hard-coded.

    Returns:
        Whatever `processor` returns for the clip (requested with
        `return_tensors="pt"`).
    """
    rate = processor.feature_extractor.sampling_rate
    clip = _load_mono_clip(filename, rate, clip_seconds)
    return processor(audios=clip.squeeze().numpy(), sampling_rate=rate, return_tensors="pt")

In [29]:
import torchaudio
from torch.utils.data import Dataset

class _GenreAudioDataset(Dataset):
    """Shared implementation for the genre-classification datasets.

    Each item pairs the processed audio of one file with the integer id of its
    genre. Subclasses only decide which audio-processing function is used.

    Args:
        file_paths: File names (not full paths) of the audio clips.
        genres: Genre name of each clip, aligned with `file_paths`; the genre
            is also the sub-directory the file is read from.
        processor: Processor object handed to the audio-processing function.
        data_root: Optional dataset root. When None (the default) the
            module-level `data_dir` is used, looked up at item-access time.
    """

    def __init__(self, file_paths, genres, processor, data_root=None):
        self.file_paths = file_paths
        self.genres = genres
        self.processor = processor
        self.data_root = data_root

    def __len__(self):
        """Number of clips in the dataset."""
        return len(self.file_paths)

    def _load_audio(self, path):
        """Return the processed model input for the audio file at `path`."""
        raise NotImplementedError

    def __getitem__(self, idx):
        """Return {"audio_input": processed audio, "label": genre id} for clip `idx`."""
        cur_genre = self.genres[idx]
        label = genre_to_idx[cur_genre]

        root = data_dir if self.data_root is None else self.data_root
        full_path = f"{root}/Data/genres_original/{cur_genre}/{self.file_paths[idx]}"

        return {"audio_input": self._load_audio(full_path), "label": label}


class MusicDataset(_GenreAudioDataset):
    """Genre dataset whose audio is prepared with `process_audio`."""

    def _load_audio(self, path):
        return process_audio(path, self.processor)


class MusicDatasetCLAP(_GenreAudioDataset):
    """Genre dataset whose audio is prepared with `process_audio_clap`."""

    def _load_audio(self, path):
        return process_audio_clap(path, self.processor)
    


In [30]:
class GenreClassificationModel(nn.Module):
    """Genre classifier: encoder -> mean pooling -> small MLP head.

    The encoder's `last_hidden_state` is averaged over its second-to-last axis
    (the time axis, assuming a (batch, time, features) layout -- TODO confirm
    for each encoder used) and fed through Linear -> ReLU -> BatchNorm1d and a
    final Linear layer that produces the class logits.

    Args:
        encoder: Module called as `encoder(**audio_input, output_hidden_states=True)`
            whose result exposes `last_hidden_state`.
        input_dim: Feature width of the encoder output. When None (the default)
            the module-level `final_layer_dimension` is used, as before.
        hidden_dim: Width of the hidden layer (default 512).
        num_classes: Number of output logits (default 10).
    """

    def __init__(self, encoder, input_dim=None, hidden_dim=512, num_classes=10):
        super().__init__()
        if input_dim is None:
            # Backward-compatible fallback to the notebook-level configuration value.
            input_dim = final_layer_dimension
        self.encoder = encoder
        # Attribute names and Sequential order are kept so saved state dicts stay loadable.
        self.layer_1 = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_dim),
        )
        self.layer_2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, audio_input):
        """Return class logits for a mapping of encoder keyword inputs."""
        hidden_states = self.encoder(**audio_input, output_hidden_states=True).last_hidden_state
        # Average across the time dimension to get one vector per example.
        pooled = hidden_states.mean(dim=-2)
        return self.layer_2(self.layer_1(pooled))
    
class GenreClassificationModelCLAP(nn.Module):
    """Genre classifier on top of a CLAP-style audio encoder.

    The encoder's `get_audio_features` output is fed through
    Linear -> ReLU -> BatchNorm1d and a final Linear layer that produces the
    class logits.

    Args:
        encoder: Object exposing `get_audio_features(**audio_input)`; its result
            is passed directly to the first linear layer.
        embed_dim: Width of the audio embedding (default 512).
        hidden_dim: Width of the hidden layer (default 512).
        num_classes: Number of output logits (default 10).
    """

    def __init__(self, encoder, embed_dim=512, hidden_dim=512, num_classes=10):
        super().__init__()
        self.encoder = encoder
        # Attribute names and Sequential order are kept so saved state dicts stay loadable.
        self.layer_1 = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_dim),
        )
        self.layer_2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, audio_input):
        """Return class logits for a mapping of encoder keyword inputs."""
        audio_embedding = self.encoder.get_audio_features(**audio_input)
        return self.layer_2(self.layer_1(audio_embedding))

In [31]:
# Wrap the loaded encoder in the CLAP classification head, then move the whole
# module (encoder + head) to the selected device.
model = GenreClassificationModelCLAP(encoder=encoder_model)
model = model.to(device)

In [32]:
def collate_fn(batch):
    """Merge dataset items into one batch.

    Args:
        batch: Sequence of items shaped like
            {"audio_input": mapping of tensors, "label": int}.

    Returns:
        A tuple `(audio_inputs, labels)`: a dict holding, for every key of the
        first item's "audio_input", the per-item tensors concatenated along
        dim 0, and a tensor built from the items' labels.
    """
    merged = {}
    for name in batch[0]["audio_input"]:
        pieces = [sample["audio_input"][name] for sample in batch]
        merged[name] = torch.cat(pieces, dim=0)

    targets = torch.tensor([sample["label"] for sample in batch])
    return merged, targets


In [33]:
import pandas as pd
# Read the train/test splits from the CSV files in the working directory
# (the files the commented-out split cells at the top of the notebook write).
df_train = pd.read_csv("classification_train.csv")
df_test = pd.read_csv("classification_test.csv")

dataset_train = MusicDatasetCLAP(list(df_train["filename"]), list(df_train["genres"]), processor)
dataset_test = MusicDatasetCLAP(list(df_test["filename"]), list(df_test["genres"]), processor)

# Reshuffle the training set every epoch so the model does not see the exact
# same batches in the exact same order each time; evaluation order is left fixed.
dataloader_train = DataLoader(dataset_train, batch_size=8, shuffle=True, collate_fn=collate_fn)
dataloader_test = DataLoader(dataset_test, batch_size=8, collate_fn=collate_fn)

In [34]:
import numpy as np
# Smoke test before the training cell: push the first training clip through the
# model and print its true label followed by the predicted class index.
model.eval()
first_sample = dataset_train[0]
with torch.no_grad():
    res = model(first_sample["audio_input"].to(device))

print(first_sample["label"])
print(np.argmax(res.cpu().numpy()))

5
8


In [35]:
# import librosa
# from IPython.display import Audio

# # Load and preprocess the audio file
# file_path = "/home/thomasyim/.cache/kagglehub/datasets/andradaolteanu/gtzan-dataset-music-genre-classification/versions/1/Data/genres_original/jazz/jazz.00054.wav"
# y, sr = librosa.load(file_path, sr=24000)  # Resample to 24 kHz

# # Play the audio
# Audio(data=y, rate=sr)


In [36]:
import torch.nn as nn
import torch.optim as optim

# Multi-class objective applied to the genre logits.
criterion = nn.CrossEntropyLoss()

# Adam over every parameter of `model` (the encoder included, since nothing is frozen).
learning_rate = 1e-5
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [37]:
import torch
from tqdm import tqdm

def train(model, dataloader, criterion, optimizer, epochs=10, device="cuda"):
    """Train `model` for a number of epochs and print per-epoch metrics.

    Args:
        model: Module called as `model(audio_inputs)` and returning logits of
            shape (batch, num_classes).
        dataloader: Iterable yielding `(audio_inputs, labels)` where
            `audio_inputs` is a dict of tensors and `labels` a tensor of class ids.
        criterion: Loss callable taking `(outputs, labels)`.
        optimizer: Optimizer updating the model's parameters.
        epochs: Number of passes over `dataloader`.
        device: Device the model and every batch are moved to.

    Prints, after each epoch, the loss summed over all batches and the
    accuracy over all seen examples (nan if the dataloader yielded nothing).
    """
    model.to(device)
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        correct = 0
        total = 0

        for audio_inputs, labels in tqdm(dataloader):
            # Move the batch as-is. A former `.squeeze(0)` here was a no-op for
            # batches larger than one and silently dropped the batch axis for a
            # batch of exactly one example.
            audio_inputs = {key: val.to(device) for key, val in audio_inputs.items()}
            labels = labels.to(device)

            # Forward pass
            optimizer.zero_grad()
            outputs = model(audio_inputs)

            # Calculate loss
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Calculate accuracy
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        # Epoch results; guard against an empty dataloader instead of dividing by zero.
        accuracy = correct / total if total else float("nan")
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}, Accuracy: {accuracy:.4f}")


In [38]:
# Fine-tune the classifier on the training loader for ten epochs.
train(
    model=model,
    dataloader=dataloader_train,
    criterion=criterion,
    optimizer=optimizer,
    epochs=10,
    device=device,
)

100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:25<00:00,  1.17it/s]


Epoch 1/10, Loss: 207.0818, Accuracy: 0.2675


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:37<00:00,  1.02it/s]


Epoch 2/10, Loss: 189.2716, Accuracy: 0.3337


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [02:39<00:00,  1.60s/it]


Epoch 3/10, Loss: 177.0466, Accuracy: 0.3650


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [02:39<00:00,  1.60s/it]


Epoch 4/10, Loss: 165.9648, Accuracy: 0.4113


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [02:39<00:00,  1.60s/it]


Epoch 5/10, Loss: 156.0376, Accuracy: 0.4650


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [02:06<00:00,  1.27s/it]


Epoch 6/10, Loss: 144.2216, Accuracy: 0.5162


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:26<00:00,  1.15it/s]


Epoch 7/10, Loss: 133.5374, Accuracy: 0.5463


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:25<00:00,  1.17it/s]


Epoch 8/10, Loss: 122.3390, Accuracy: 0.5913


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:27<00:00,  1.15it/s]


Epoch 9/10, Loss: 111.9715, Accuracy: 0.6650


100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:26<00:00,  1.16it/s]

Epoch 10/10, Loss: 99.3605, Accuracy: 0.7200





In [39]:
def evaluate(model, dataloader, criterion, device="cuda"):
    """Evaluate `model` on `dataloader` without gradient tracking and print metrics.

    Args:
        model: Module called as `model(audio_inputs)` and returning logits of
            shape (batch, num_classes).
        dataloader: Iterable yielding `(audio_inputs, labels)` where
            `audio_inputs` is a dict of tensors and `labels` a tensor of class ids.
        criterion: Loss callable taking `(outputs, labels)`.
        device: Device the model and every batch are moved to.

    Prints the loss summed over all batches and the accuracy over all seen
    examples (nan if the dataloader yielded nothing). The model is left in
    eval mode.
    """
    model.to(device)
    model.eval()

    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for audio_inputs, labels in tqdm(dataloader):
            # Move the batch as-is. A former `.squeeze(0)` here was a no-op for
            # batches larger than one and silently dropped the batch axis for a
            # batch of exactly one example.
            audio_inputs = {key: val.to(device) for key, val in audio_inputs.items()}
            labels = labels.to(device)

            outputs = model(audio_inputs)

            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Guard against an empty dataloader instead of dividing by zero.
    accuracy = correct / total if total else float("nan")
    print(f"Validation Loss: {total_loss:.4f}, Accuracy: {accuracy:.4f}")


In [40]:
# Score the trained model on the training loader (the function labels its
# output "Validation" regardless of which split it is given).
evaluate(
    model=model,
    dataloader=dataloader_train,
    criterion=criterion,
    device=device,
)

100%|████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:06<00:00,  1.50it/s]

Validation Loss: 105.2952, Accuracy: 0.6675





In [41]:
# Score the trained model on the held-out test loader.
evaluate(
    model=model,
    dataloader=dataloader_test,
    criterion=criterion,
    device=device,
)

100%|██████████████████████████████████████████████████████████████████████████████████████████| 25/25 [00:16<00:00,  1.53it/s]

Validation Loss: 40.5602, Accuracy: 0.4100





In [42]:
# Persist the model's weights (state dict only) to a file in the working directory.
checkpoint_path = 'clap_unfrozen.pth'
torch.save(model.state_dict(), checkpoint_path)