## Music genre classification

In [25]:
import librosa
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import os

In [26]:
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from segment import OUTPUT_FOLDER

In [27]:
import multiprocessing

In [28]:
class DataExtractor:
    def __init__(self, n_mfcc = 20):
        self.n_mfcc = n_mfcc

    def load_data(self, filename, genre):
        self.genre = genre

        self.y, self.sr = librosa.load(filename)
        self.feature_extract()

    def feature_extract(self):
        # Tempo information
        self.tempo = librosa.feature.tempo(y=self.y, sr=self.sr).round()

        # Separate harmonic and percussive components, Tonnetz
        self.y_harmonic, self.y_percussive = librosa.effects.hpss(self.y)
        self.tonnetz = librosa.feature.tonnetz(y=self.y, sr=self.sr)

        # Mathematical features
        features_list = {}

        features_list['tempo'] = [self.tempo.min(), self.tempo.mean(), self.tempo.max(), self.tempo.var()]
        features_list['y_harmoic'] = [self.y_harmonic.min(), self.y_harmonic.mean(), self.y_harmonic.max(), self.y_harmonic.var()]
        features_list['y_percussive'] = [self.y_percussive.min(), self.y_percussive.mean(), self.y_percussive.max(), self.y_percussive.var()]
        features_list['tonnetz'] = [self.tonnetz.min(), self.tonnetz.mean(), self.tonnetz.max(), self.tonnetz.var()]

        # Other Sound features
        cstft=librosa.feature.chroma_stft(y=self.y, sr=self.sr)
        features_list['cstft'] = [cstft.min(), cstft.mean(), cstft.max(), cstft.var()]

        srms=librosa.feature.rms(y=self.y)
        features_list['srms'] = [srms.min(), srms.mean(), srms.max(), srms.var()]

        specband=librosa.feature.spectral_bandwidth(y=self.y, sr=self.sr)
        features_list['specband'] = [specband.min(), specband.mean(), specband.max(), specband.var()]

        speccent=librosa.feature.spectral_centroid(y=self.y, sr=self.sr)
        features_list['speccent'] = [speccent.min(), speccent.mean(), speccent.max(), speccent.var()]

        rolloff = librosa.feature.spectral_rolloff(y=self.y, sr=self.sr)
        features_list['rolloff'] = [rolloff.min(), rolloff.mean(), rolloff.max(), rolloff.var()]

        zero_crossing_rate = librosa.feature.zero_crossing_rate(y=self.y)
        features_list['zero_crossing_rate'] = [zero_crossing_rate.min(), zero_crossing_rate.mean(), zero_crossing_rate.max(), zero_crossing_rate.var()]

        mfcc = librosa.feature.mfcc(y=self.y, sr=self.sr, n_mfcc= self.n_mfcc)
        for i in range(self.n_mfcc):
            features_list[f'mfcc_{i}'] = [mfcc[i].min(), mfcc[i].mean(), mfcc[i].max(), mfcc[i].var()]

        self.features_df = pd.DataFrame(features_list).transpose()
        self.features_df.columns = ['min', 'mean', 'max', 'var']

    def get_data(self, data_print = False):
        # print the data  
        if(data_print):
            self.print_features()

        # Get the dataframe with label
        np_data = self.features_df.to_numpy()
        np_data = np_data.reshape((1, 120))

        return (np_data, self.genre)

    def print_features(self):
        print(f"Genre: {self.genre}")
        print(f"Tempo: {self.tempo}")

        print(self.features_df)

    def plot_waveform(self):
        # Plot
        plt.figure(figsize=(14, 5))
        librosa.display.waveshow(y= self.y, sr=self.sr)
        plt.title("Waveform")
        plt.show()

    def plot_tonnetz(self):
        pass

    def plot_Harmonic_Percussion(self):
        # Plot
        fig = plt.figure(figsize=(14, 5))
        ax1 = fig.subplots() #Creates the Axis
        ax2 = ax1.twinx()    #Creates twin axis

        librosa.display.waveshow(self.y_harmonic, sr=self.sr, color='r', ax= ax1)
        librosa.display.waveshow(self.y_percussive, sr=self.sr, color='b', ax = ax2)
        plt.title("Harmonic and Percussive Component")
        plt.show()

In [29]:
DataExtrc = DataExtractor()

DataExtrc.load_data(r'data\bengal\bengali1_fullsong.mp3', 'bengali')
DataExtrc.print_features()

Genre: bengali
Tempo: [99.]
                           min         mean           max           var
tempo                99.000000    99.000000     99.000000  0.000000e+00
y_harmoic            -1.001796    -0.000002      0.995477  2.023340e-02
y_percussive         -1.037359    -0.000128      1.043433  9.070307e-03
tonnetz              -0.653017    -0.012017      0.653477  2.396049e-02
cstft                 0.000000     0.324342      1.000000  9.598421e-02
srms                  0.000000     0.152839      0.568597  1.512722e-02
specband              0.000000  2522.219075   3843.048791  2.782880e+05
speccent              0.000000  2339.426118   5839.908943  8.044558e+05
rolloff               0.000000  5035.855931  10271.337891  4.493367e+06
zero_crossing_rate    0.000000     0.098357      0.450684  3.068565e-03
mfcc_0             -461.054596  -134.229691     49.051128  8.494335e+03
mfcc_1              -59.865063    82.586563    195.069305  1.313935e+03
mfcc_2              -72.737030    -3

In [30]:
# class DataExtractor:
#     # Multiprocessing
#     lock = multiprocessing.Lock()

#     # Librosa predefined features
#     feature_functions = {}
#     feature_functions['harmonic'] = librosa.effects.harmonic
#     feature_functions['percussive'] = librosa.effects.percussive

#     feature_functions['tempo'] = librosa.feature.chroma_stft
#     feature_functions['tonnnetz'] = librosa.feature.tonnetz
#     feature_functions['cstft'] = librosa.feature.chroma_stft
#     feature_functions['rms'] = librosa.feature.rms
#     feature_functions['spec_band'] = librosa.feature.spectral_bandwidth
#     feature_functions['spec_cent'] = librosa.feature.spectral_centroid
#     feature_functions['spec_rolloff'] = librosa.feature.spectral_rolloff
#     feature_functions['zero_cr'] = librosa.feature.zero_crossing_rate
#     feature_functions['mfcc'] = librosa.feature.mfcc

#     def __init__(self, n_mfcc = 20):
#         self.n_mfcc = n_mfcc

#     def load_data(self, filename, genre):
#         self.genre = genre

#         self.y, self.sr = librosa.load(filename)
#         self.feature_extract()

#     def feature_extract(self):
#         processes = []

#         # Mathematical features
#         features_list = {}
#         for key in list(self.feature_functions.keys()):
#             p = multiprocessing.Process(target= self.parallel_extract, args= (key, features_list))
            
#             processes.append(p)
#             p.start()

#         for p in processes:
#             p.join()

#         print(features_list)
#         # self.features_df = pd.DataFrame(features_list).transpose()
#         # self.features_df.columns = ['min', 'mean', 'max', 'var']

#     # For features
#     # Special case for mfcc
#     def parallel_extract(self, feature_name, features_list):
#         feature_data = self.feature_functions[feature_name](self.y, self.sr)

#         # store in same features using lock
#         with self.lock():
#             if(feature_name != 'mfcc'):
#                 features_list[feature_name] = [feature_data.min(), feature_data.mean(), 
#                                                feature_data.max(), feature_data.var()]
#             else:
#                 print('mfcc time')


#     def get_data(self, data_print = False):
#         # print the data  
#         if(data_print):
#             self.print_features()

#         # Get the dataframe with label
#         np_data = self.features_df.to_numpy()
#         np_data = np_data.reshape((1, 120))

#         return (np_data, self.genre)

#     def print_features(self):
#         print(f"Genre: {self.genre}")
#         print(f"Tempo: {self.tempo}")

#         print(self.features_df)

#     def plot_waveform(self):
#         # Plot
#         plt.figure(figsize=(14, 5))
#         librosa.display.waveshow(y= self.y, sr=self.sr)
#         plt.title("Waveform")
#         plt.show()

#     def plot_tonnetz(self):
#         pass

#     def plot_Harmonic_Percussion(self):
#         # Plot
#         fig = plt.figure(figsize=(14, 5))
#         ax1 = fig.subplots() #Creates the Axis
#         ax2 = ax1.twinx()    #Creates twin axis


#         librosa.display.waveshow(y_harmonic, sr=self.sr, color='r', ax= ax1)
#         librosa.display.waveshow(y_percussive, sr=self.sr, color='b', ax = ax2)
#         plt.title("Harmonic and Percussive Component")
#         plt.show()

In [31]:
# DataExtrc = DataExtractor()

# DataExtrc.load_data(r'data\bengal\bengali1_fullsong.mp3', 'bengali')

In [32]:
np_data, genre = DataExtrc.get_data()
print(np_data.shape)

(1, 120)


In [33]:
datapath= OUTPUT_FOLDER
genres, num_genre = None, 0

try:
    genres= sorted(os.listdir(datapath))
    num_genre = len(genres)
    print(num_genre)
except:
    print("No folder found")
    print("Run segment.py")

2


In [34]:
# Torch
MY_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(MY_DEVICE)

SAVE_MODEL = False

D_TYPE = torch.float

# Training Parameters
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 0.001

EPOCHS = 10
BATCH_SIZE = 2

RAND_SHUFFLE = False

cpu


In [35]:
class genreDataSet(Dataset):
    n_mfcc = 20

    def __init__(self, dataPath):
        super().__init__()

        self.data_extractor = DataExtractor(self.n_mfcc)
        self.label = []
        self.X = []

        self.genres = sorted(os.listdir(datapath))
        self.num_genres = len(self.genres)

        for genre in genres:
            genre_path = os.path.join(datapath, genre)
            # print("Genre:",genre)
            for _,_,files in os.walk(genre_path):
                for file in files:
                    # print(file)
                    
                    self.data_extractor.load_data(dataPath + '/' + genre + '/' + file, genre)
                    x, x_label = self.data_extractor.get_data()

                    self.label.append(x_label)
                    self.X.append(x)

        self.length = len(self.label)

    def __len__(self):
        return self.length
    
    def __getitem__(self, index):
        label = self.label[index]
        X = self.X[index]

        one_hot = torch.zeros(self.num_genres, dtype= D_TYPE)
        if label in self.genres:
            hot_index = self.genres.index(label)
        one_hot[hot_index] = 1

        return (
            torch.tensor(X, dtype= D_TYPE),
            one_hot
        )
    
genre_loader = DataLoader(genreDataSet(datapath), 
                          batch_size= BATCH_SIZE, shuffle= RAND_SHUFFLE,
                          
                          # experimental for gpu
                          # pin_memory= True
                        )

In [36]:
###############################
# Model parameters

IN_DIM = 120
EMBED_DIM = 64

###############################
class GenreClassifier(nn.Module):
    name = "GenreClassifier.pth"

    base_dim_med = 64
    base_dim_large = 128

    num_heads = 4
    drop_out = 0.1

    def load_model(self):
        self.load_state_dict(torch.load(self.name, weights_only= True))

    def __init__(self, indim, embed_dim, outdim):
        super().__init__()

        self.indim = indim
        self.embed_dim = embed_dim
        self.outdim = outdim

        self.transform_layer = nn.Sequential(
             # Transform pass
             nn.Linear(self.indim, self.embed_dim),
             nn.LayerNorm(self.embed_dim)
        )

        self.self_attention_layer = nn.MultiheadAttention(self.embed_dim, self.num_heads, batch_first= True)

        self.base_layers= nn.Sequential(
            nn.LayerNorm(self.embed_dim),
            nn.Dropout(self.drop_out),
            
            # Deep pass 1
            nn.Linear(self.embed_dim, self.base_dim_large),
            nn.Dropout(self.drop_out),
            nn.ReLU(self.base_dim_large),
            
            # Deep Pass 2
            nn.Linear(self.base_dim_large, self.base_dim_med),
            nn.Dropout(self.drop_out),
            nn.ReLU(self.base_dim_med),

            # Final Pass
            nn.Linear(self.base_dim_med, self.outdim),
            nn.Softmax(dim= -1)
        )

    # For debug
    '''def forward(self, x):
        print(x.shape)
        
        for layer in self.transform_layer:
            x = layer(x)
            print(x.shape, layer.type)

        x_attn, _ = self.self_attention_layer(x, x, x)
        print(x_attn.shape, self.self_attention_layer.type)

        for layer in self.base_layers:
            x = layer(x)
            print(x.shape, layer.type)

        return x'''

    def forward(self, x):
        x = self.transform_layer(x)

        x_attn, _ = self.self_attention_layer(x, x, x)

        # Self Normalisation (for experiments)
        # x_attn = x_attn + x

        return self.base_layers(x_attn)
    
model = GenreClassifier(IN_DIM, EMBED_DIM, num_genre)

In [37]:
output = model(torch.tensor(np_data, dtype= D_TYPE))
print(output)

tensor([[0.5372, 0.4628]], grad_fn=<SoftmaxBackward0>)


In [38]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr= LEARNING_RATE, weight_decay= WEIGHT_DECAY)

In [39]:
# Training function
def train_model(model: GenreClassifier, dataloader: DataLoader, num_epochs):
    model.to(MY_DEVICE)
    model.train()  # Set model to train mode

    least_loss = torch.inf
    for epoch in range(num_epochs):
        running_loss = 0.0
        for (genre_data, genre) in dataloader:

            # Send to Device
            Xs = genre_data.squeeze(1).to(MY_DEVICE)
            labels = genre.squeeze(1).to(MY_DEVICE)

            # Forward pass
            outputs = model(Xs)
            
            # print(outputs, type(outputs))
            # print(labels, type(labels))
            # print(outputs.shape, labels.shape)

            # Loss
            loss = criterion(outputs, labels)
            # print(loss)

            # Zero the gradients
            optimizer.zero_grad()

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Print loss at the end of epoch
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader)}")

        # Save model at the end of each epoch (best only)
        if(running_loss <= least_loss and SAVE_MODEL):
            least_loss = running_loss
            torch.save(model.state_dict(), model.name)
            print(f"Model saved after epoch {epoch+1}")

# Train the model
train_model(model, genre_loader, num_epochs= EPOCHS)

Epoch [1/10], Loss: 0.6965186297893524
Epoch [2/10], Loss: 0.7266868054866791
Epoch [3/10], Loss: 0.6118471920490265
Epoch [4/10], Loss: 0.5707031786441803
Epoch [5/10], Loss: 0.5687529891729355
Epoch [6/10], Loss: 0.5750177502632141
Epoch [7/10], Loss: 0.5689035654067993
Epoch [8/10], Loss: 0.5701285004615784
Epoch [9/10], Loss: 0.5610448569059372
Epoch [10/10], Loss: 0.5720008909702301


In [40]:
test_loader = DataLoader(genreDataSet(datapath), batch_size= 1)
model = GenreClassifier(IN_DIM, EMBED_DIM, num_genre)

In [41]:
# Test function
def test_model(model: GenreClassifier, dataloader: DataLoader):
    model.to(MY_DEVICE)
    model.eval()  

    for (genre_data, genre) in dataloader:
        # Send to Device
        Xs = genre_data.squeeze(1).to(MY_DEVICE)
        labels = genre.squeeze(1).to(MY_DEVICE)

        # Forward pass
        outputs = model(Xs)

        # Loss
        loss = criterion(outputs, labels)

        pred = outputs.argmax(dim=-1).item()
        actual = labels.argmax(dim=-1).item()

        print(f"model: {genres[pred]}, label: {genres[actual]}")
        print(loss)

# load weights if exists
if(os.path.exists(model.name)):
    model.load_model()

test_model(model, test_loader)

model: bengal, label: bengal
tensor(0.6661, grad_fn=<DivBackward1>)
model: bengal, label: bengal
tensor(0.6652, grad_fn=<DivBackward1>)
model: bengal, label: bengal
tensor(0.6659, grad_fn=<DivBackward1>)
model: bengal, label: tamilnadu
tensor(0.7217, grad_fn=<DivBackward1>)


In [42]:
# from transformers import AutoProcessor, MusicgenForConditionalGeneration
# from transformers import Trainer, TrainingArguments

# from peft import LoraConfig, get_peft_model

# model_id = "facebook/musicgen-small"
# mydevice = "cpu"

In [43]:
# Processor
# processor = AutoProcessor.from_pretrained(model_id)

# Model
# model = MusicgenForConditionalGeneration.from_pretrained(model_id)

# print(f"Memory footprint: {model.get_memory_footprint() / 1e6:.2f} MB")
# print(model)