In [25]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle
import pretty_midi
import librosa
import librosa.display
import gc
from sklearn.preprocessing import StandardScaler
import warnings
from collections import Counter



import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset


from sklearn.preprocessing import StandardScaler

from Preprocessing import *
from ExtractGenre import *

import DatasetLoader as DL

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
InputPath = os.path.realpath('YAMF/genres_original')

GenreMapping = {'metal': 0, 'disco': 1, 'classical': 2, 'hiphop': 3, 'jazz': 4,
          'country': 5, 'pop': 6, 'blues': 7, 'reggae': 8, 'rock': 9}

In [9]:
def NormalizeSpectrogram(X):
   mean = np.mean(X)
   std = np.std(X)
   return (X - mean) / (std + 1e-6)


def DataCNN(InputPath = os.path.realpath('YAMF/genres_original'), length = 256):

   numErr = 0

   TrainDataList, ValDataList, DataList = [], [], []
   for dir in tqdm(os.listdir(InputPath)):
      
      DirPath = os.path.join(InputPath, dir)

      if not os.path.isdir(DirPath):
         continue

      genre = GenreMapping[dir]

      trainSong = 0
      for song in os.listdir(DirPath):
         warnings.filterwarnings('ignore')

         trainSong += 1
         SongPath = os.path.join(DirPath, song)

         #Train data
         if trainSong <= 80:
            try:
               y, sr = librosa.load(SongPath, sr=22050)
               mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
               S_db = librosa.amplitude_to_db(mel_spec, ref=np.max)
            except:
               numErr += 1
               continue 

            for _ in range(20):

               rIDX = np.random.randint(0, np.shape(S_db)[1] - length)
               indexs = np.arange(rIDX, rIDX + length)

               X = S_db[:, indexs]

               NormX = NormalizeSpectrogram(X)
               TrainDataList.append((NormX, genre))

         #Validation data
         elif trainSong > 80:
            try:
               y, sr = librosa.load(SongPath, sr=22050)
               mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
               S_db = librosa.amplitude_to_db(mel_spec, ref=np.max)
            except:
               numErr += 1
               continue 

            
            #SeparateSong = np.shape(S_db)[1] // length
            #for i in range(SeparateSong):
               # indexs = np.arange(i * length, i * length + length)
               # X = S_db[:, indexs]
               # NormX = NormalizeSpectrogram(X)
               # ValDataList.append((NormX, genre))
            
            for _ in range(8):

               rIDX = np.random.randint(0, np.shape(S_db)[1] - length)
               indexs = np.arange(rIDX, rIDX + length)

               X = S_db[:, indexs]

               NormX = NormalizeSpectrogram(X)
               ValDataList.append((NormX, genre))


   DataList.extend((TrainDataList, ValDataList))
   return DataList


In [None]:
#DataList = DataCNN()

100%|██████████| 11/11 [00:29<00:00,  2.72s/it]


In [None]:
# with open('YAMF/Test1.pkl', 'wb') as f:
#    pickle.dump(DataList, f)

In [5]:
from torch.utils.data import Dataset
import torch

class GenreDataset(Dataset):
    def __init__(self, path='YAMF/test.pkl', Train = True, transform=None):

        with open(path, 'rb') as f:
            TD = pickle.load(f)

        if Train:
            self.X = np.array([TD[0][i][0] for i in range(len(TD[0]))])
            self.Y = np.array([TD[0][i][1] for i in range(len(TD[0]))])

        else:
            self.X = np.array([TD[1][i][0] for i in range(len(TD[1]))])
            self.Y = np.array([TD[1][i][1] for i in range(len(TD[1]))])

        del TD
        gc.collect()

    def __len__(self):
        return len(self.Y)

    def __getitem__(self, idx):

        xTensor = self.X[idx]
        yTensor = self.Y[idx]

        xTensor = torch.tensor(xTensor, dtype=torch.float32).unsqueeze(0)
        return xTensor, torch.tensor(yTensor)

In [7]:
# trainData = GenreDataset(Train = True)
# valData = GenreDataset(Train = False)
# trainLoader = DataLoader(trainData, batch_size = 32, shuffle=True, num_workers=0)
# valLoader = DataLoader(valData, batch_size = 32, shuffle=True, num_workers=0)

In [26]:
class GenreCNN(nn.Module):
    def __init__(self, n_classes=10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((4, 4)),  # Force 4×4 output
            nn.Flatten(),
            nn.Linear(4*4*64, 128),      # 4*4*128 = 2048
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, n_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)  # AdaptiveAvgPool2d and Flatten are inside classifier
        return x

In [9]:
# from torch.optim import Adam
# from torch.nn import CrossEntropyLoss


# from torch.optim import Adam
# from torch.nn import CrossEntropyLoss

# device = "cuda" if torch.cuda.is_available() else "cpu"
# model = GenreCNN()
# opt = Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, patience=3, factor=0.5)
# loss_fn = CrossEntropyLoss()
# model.to(device)

# print(device)

# epochs = 30

# train_losses = []
# val_losses = []
# train_accuracies = []
# val_accuracies = []

# for epoch in range(epochs):
#     print(f"Epoch: {epoch+1}")

#     # Training phase
#     model.train()
#     train_loss = 0
#     train_correct = 0
#     train_total = 0

#     for batch_x, batch_y in tqdm(trainLoader):
#         batch_x = batch_x.to(device)
#         batch_y = batch_y.to(device)

#         y_pred = model(batch_x)
#         loss = loss_fn(y_pred, batch_y)

#         opt.zero_grad()
#         loss.backward()
#         opt.step()

#         train_loss += loss.item()

#         _, predicted = torch.max(y_pred.data, 1)
#         train_total += batch_y.size(0)
#         train_correct += (predicted == batch_y).sum().item()

#     avg_train_loss = train_loss / len(trainLoader)
#     train_acc = train_correct / train_total

#     model.eval()
#     with torch.no_grad():
#         predictions = []
#         true = []
#         for batch_x, batch_y in tqdm(valLoader):
#             batch_x = batch_x.to(device)
#             batch_y = batch_y.to(device)
#             y_pred = model(batch_x)
#             predictions.append(y_pred)
#             true.append(batch_y)

#         predictions = torch.cat(predictions, axis=0)
#         true = torch.cat(true, axis=0)
#         val_loss = loss_fn(predictions, true)
#         predicted_classes = torch.argmax(predictions, dim=1)
#         val_acc = (predicted_classes == true).float().mean()

#     # Store metrics
#     train_losses.append(avg_train_loss)
#     val_losses.append(val_loss.item())
#     train_accuracies.append(train_acc)
#     val_accuracies.append(val_acc.item())

#     print(f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f}")
#     print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

### Another dataset with genre recognition

In [9]:
InputPath = os.path.realpath('clean_midi')
#GenreDict = Classifier(InputPath)

In [7]:
GenreMapping = {'metal': 0, 'disco': 1, 'classical': 2, 'hiphop': 3, 'jazz': 4,
          'country': 5, 'pop': 6, 'blues': 7, 'reggae': 8, 'rock': 9}

In [28]:
from joblib import Parallel, delayed
from collections import Counter
import os, numpy as np
import pretty_midi
import librosa
import torch
import warnings

ClassificationModel = torch.load('GenreCNN_Working.pth', map_location=torch.device('cpu'))

ClassificationModel.eval()  # Move outside for efficiency

def Count(PredictedClass):
    count = Counter(np.array(np.ravel(PredictedClass))).most_common(1)[0]
    out = (count[0], np.round(count[1]/12, 2))
    return out

def LoadMidi(path):
    try:
        return pretty_midi.PrettyMIDI(path)
    except:
        return None

def ProcessFile(dir, file, input_path):
    FilePath = os.path.join(input_path, dir, file)

    midi_data = LoadMidi(FilePath)
    if midi_data is None:
        return (f'{dir}/{file}', None)

    audio = midi_data.fluidsynth(fs= 22050, sf2_path = 'FluidR3_GM/FluidR3_GM.sf2')
    mel_spec = librosa.feature.melspectrogram(y=audio, sr=22050, n_mels=128)
    S_db = librosa.amplitude_to_db(mel_spec, ref=np.max)

    PredictedClass = []
    for _ in range(12):
        rIDX = np.random.randint(0, S_db.shape[1] - 128)
        X = S_db[:, rIDX:rIDX+128]

        NormX = NormalizeSpectrogram(X)
        xTensor = torch.tensor(NormX, dtype=torch.float32).unsqueeze(0).unsqueeze(0)

        with torch.no_grad():
            y_pred = ClassificationModel(xTensor)
            predictions = torch.softmax(y_pred, dim=1)
            PredictedClass.append(torch.argmax(predictions, dim=1))

    del S_db
    gc.collect()

    return (f'{dir}/{file[:-4]}', Count(PredictedClass))

AttributeError: 'collections.OrderedDict' object has no attribute 'eval'

In [57]:
from tqdm import tqdm

def Classifier(InputPath, n_jobs=-1):  # -1 = use all CPUs

    tasks = []
    #50
    for dir in os.listdir(InputPath)[1900:-1]:
        DirPath = os.path.join(InputPath, dir)
        if not os.path.isdir(DirPath):
            continue
        for file in os.listdir(DirPath):
            if file.endswith('.mid'):
                tasks.append((dir, file))

    results = Parallel(n_jobs=n_jobs)(
        delayed(ProcessFile)(dir, file, InputPath) for dir, file in tqdm(tasks)
    )

    GenreDict = {}
    numErr = 0

    for key, result in results:
        if result is None:
            numErr += 1
        else:
            GenreDict[key] = result

    print(f"Files with errors: {numErr}")
    return GenreDict

In [None]:
InputPath = os.path.realpath('clean_midi')

GenreDict = Classifier(InputPath)

In [59]:
np.save('GenreDict9_1900-2077.npy', GenreDict)

In [60]:
import glob

DatasetList = glob.glob('GenreDict*')
GenreDataset = {}
for file in DatasetList:
   F = np.load(file, allow_pickle=True)
   F = F.item()
   GenreDataset.update(F)

In [61]:
len(GenreDataset)

13148

In [63]:
np.save('GenreDict.npy', GenreDataset)