In [None]:
# Colab-only: mount Google Drive at /content/drive. Later cells read the
# annotations CSV and audio through relative 'drive/MyDrive/...' paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from torch.utils.data import Dataset
import torchaudio
import torch
from torch import nn
from torchsummary import summary
from torch.utils.data import DataLoader

from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import classification_report

import pandas as pd
import numpy as np

In [None]:
class MusicDataset(Dataset):
    """Audio dataset yielding ``(transformed_signal, encoded_label)`` pairs.

    Rows come from the annotations CSV: column 0 holds the label key, and
    column 3 is appended to ``audio_dir`` to form the audio file path
    (column 2 is exposed the same way through ``_get_lyric``).

    Every clip is resampled to ``target_sample_rate``, mixed down to a single
    channel, cut / right-padded with zeros to exactly ``num_samples`` samples
    and finally passed through ``transformation``.
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 label_encoder,
                 device):
        """Store the configuration and move ``transformation`` to ``device``.

        Args:
            annotations_file: Path of the CSV describing the clips.
            audio_dir: Prefix prepended (by plain string concatenation) to
                the file names found in the CSV.
            transformation: Module applied to the fixed-length waveform.
            target_sample_rate: Sample rate every clip is converted to.
            num_samples: Exact number of samples kept per clip.
            label_encoder: Mapping from the CSV label value to a class index.
            device: Torch device on which signals are processed.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples
        self.label_encoder = label_encoder
        # One resampler per source sample rate, built lazily and reused so
        # the resampling kernel is not recomputed for every item.
        self._resamplers = {}

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load, normalise and transform the clip at ``index``.

        Returns:
            Tuple ``(signal, label)`` where ``signal`` is the output of
            ``transformation`` and ``label`` is a 0-d ``torch.long`` tensor.
        """
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transformation(signal)
        return signal, self._encode_label(label)

    def _encode_label(self, label):
        """Map a raw label to a 0-d ``torch.long`` tensor.

        Uses the encoder passed to the constructor (not a module-level
        global) and pins the dtype to ``long`` so the result does not depend
        on the platform's default NumPy integer width.
        """
        return torch.tensor(self.label_encoder[label], dtype=torch.long)

    def _cut_if_necessary(self, signal):
        """Truncate ``signal`` along dim 1 to at most ``num_samples``."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad ``signal`` on the right of dim 1 up to ``num_samples``."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to ``target_sample_rate`` if they differ."""
        if sr != self.target_sample_rate:
            resampler = self._resamplers.get(sr)
            if resampler is None:
                resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
                resampler = resampler.to(self.device)
                self._resamplers[sr] = resampler
            signal = resampler(signal)
        return signal.to(self.device)

    def _mix_down_if_necessary(self, signal):
        """Average all channels (dim 0) into one, keeping the channel dim."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        """Return ``audio_dir`` concatenated with column 3 of row ``index``."""
        return self.audio_dir + str(self.annotations.iloc[index, 3])

    def _get_audio_sample_label(self, index):
        """Return the raw label stored in column 0 of row ``index``."""
        return self.annotations.iloc[index, 0]

    def _get_lyric(self, index):
        """Return ``audio_dir`` concatenated with column 2 of row ``index``."""
        return self.audio_dir + str(self.annotations.iloc[index, 2])

In [None]:
class CNNNetwork(nn.Module):
    """Four conv blocks followed by a flatten and a linear classifier head.

    ``forward`` returns raw, unnormalised logits. ``nn.CrossEntropyLoss``
    applies log-softmax itself, so feeding it already-softmaxed values
    squashes the gradients and stalls training. ``argmax`` over logits equals
    ``argmax`` over probabilities, so class predictions are unaffected; call
    ``predict_proba`` when normalised probabilities are needed.

    The submodule names (``conv1`` .. ``conv4``, ``linear``, ``softmax``) and
    their layout are unchanged, so previously saved state dicts still load.
    """

    def __init__(self, num_classes=10):
        """Build the layers.

        Args:
            num_classes: Width of the output layer (defaults to 10).
        """
        super().__init__()
        # 4 conv blocks / flatten / linear
        self.conv1 = self._conv_block(1, 16)
        self.conv2 = self._conv_block(16, 32)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 128)
        self.flatten = nn.Flatten()
        # 128 channels x 5 x 4 is what the four blocks produce for a
        # 1 x 64 x 44 input (64 mel bands, 44 frames); other input sizes
        # need a different in_features value.
        self.linear = nn.Linear(128 * 5 * 4, num_classes)
        # Not used by forward(); kept for predict_proba() and so the module
        # structure matches earlier checkpoints / printed summaries.
        self.softmax = nn.Softmax(dim=1)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Return a 3x3 conv (stride 1, padding 2) -> ReLU -> 2x2 max-pool block."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=3,
                stride=1,
                padding=2
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

    def forward(self, input_data):
        """Return class logits of shape ``(batch, num_classes)``."""
        x = self.conv1(input_data)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.flatten(x)
        logits = self.linear(x)
        return logits

    def predict_proba(self, input_data):
        """Return softmax-normalised class probabilities for ``input_data``."""
        return self.softmax(self(input_data))

In [None]:
# --- Data locations (relative paths into the mounted Drive folder) ---
audio_dir = 'drive/MyDrive/khoa_luan/data_03/'
annotations_file = 'drive/MyDrive/khoa_luan/data_03/crawl_data.csv'

# --- Audio pre-processing ---
SAMPLE_RATE = 22050
# Samples kept per clip; equal to SAMPLE_RATE, i.e. one second of audio.
NUM_SAMPLES = 22050

# --- Training hyper-parameters ---
LEARNING_RATE = 0.001
EPOCHS = 8
BATCH_SIZE = 128

# Waveform -> mel spectrogram transform handed to the dataset classes.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE,
    n_fft=1024,
    hop_length=512,
    n_mels=64,
)

def create_data_loader(train_data, batch_size, shuffle=True):
    """Wrap ``train_data`` in a ``DataLoader`` for training.

    Args:
        train_data: Dataset (anything ``DataLoader`` accepts).
        batch_size: Number of samples per batch.
        shuffle: Reshuffle the samples every epoch. Defaults to ``True``
            because an annotations file grouped by label would otherwise
            yield batches dominated by a single class, which hurts SGD.
            Pass ``False`` for a deterministic, in-order loader.

    Returns:
        The configured ``DataLoader``.
    """
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)
    return train_dataloader


def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: Network being trained; it is switched to training mode first,
            so a preceding ``model.eval()`` (e.g. from ``predict``) does not
            leak into training.
        data_loader: Iterable yielding ``(inputs, targets)`` batches.
        loss_fn: Callable computing the loss from ``(prediction, targets)``.
        optimiser: Optimiser updating the model parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The loss of the last batch as a float, or ``None`` when the loader
        yielded no batches (previously this case raised on the final print).
    """
    model.train()
    last_loss = None

    for inputs, targets in data_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # calculate loss
        prediction = model(inputs)
        loss = loss_fn(prediction, targets)

        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        # Keep a detached handle only; .item() is deferred to the end so the
        # loop does not force a device sync on every batch.
        last_loss = loss.detach()

    if last_loss is None:
        print("loss: n/a (data loader yielded no batches)")
        return None

    last_loss = last_loss.item()
    print(f"loss: {last_loss}")
    return last_loss


def train(model, data_loader, loss_fn, optimiser, epochs, device):
    """Call ``train_single_epoch`` ``epochs`` times with progress banners.

    Prints ``Epoch <n>`` (1-based) before each pass, a dashed separator
    after it, and ``Finished training`` once all passes are done.
    """
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_single_epoch(model, data_loader, loss_fn, optimiser, device)
        print("---------------------------")
    print("Finished training")

# Label name (as stored in the annotations CSV) -> integer class id.
# Ids are assigned by position in the list below, starting at 0.
label_encoder = {
    name: class_id
    for class_id, name in enumerate([
        'khắc hưng',
        'châu đăng khoa',
        'khắc việt',
        'phúc trường',
        'nguyễn đình vũ',
        'mr siro',
        'vương anh tú',
        'trịnh công sơn',
        'tiên cookie',
        'dương vỹ phúc',
    ])
}

# CNN


In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device {device}")

# Mel-spectrogram dataset built from the annotations CSV.
Music = MusicDataset(
    annotations_file=annotations_file,
    audio_dir=audio_dir,
    transformation=mel_spectrogram,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    label_encoder=label_encoder,
    device=device,
)

train_dataloader = create_data_loader(Music, BATCH_SIZE)
cnn = CNNNetwork().to(device)
print(cnn)

# initialise loss function + optimiser
loss_fn = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(cnn.parameters(), lr=LEARNING_RATE)

# train model
train(cnn, train_dataloader, loss_fn, optimiser, EPOCHS, device)

# save only the weights (state dict), not the whole module
torch.save(cnn.state_dict(), "cnn.pth")
print("Trained CNNNetwork saved at cnn.pth")

Using device cuda
CNNNetwork(
  (conv1): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv4): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear): Linear(in_features=2560, out_features=10, bias=True)
  (softmax): Softmax(dim=1)
)
Epoch 1
loss: 2.38285

In [None]:
# Index -> label name lookup used to turn a predicted class id back into a
# name. Derived from ``label_encoder`` (defined in an earlier cell) instead of
# being typed out a second time, so the two mappings cannot drift apart.
assert sorted(label_encoder.values()) == list(range(len(label_encoder))), \
    "label_encoder ids must be exactly 0..N-1 to be usable as list indices"
class_mapping = sorted(label_encoder, key=label_encoder.get)

def predict(model, input, target, class_mapping):
    """Classify one sample and look up the predicted and expected names.

    The model is put in eval mode and run without gradient tracking. Only
    the first row of the model output is considered.

    Args:
        model: Network to evaluate.
        input: Batch passed to the model as-is.
        target: Index of the true class in ``class_mapping``.
        class_mapping: Sequence mapping class index -> class name.

    Returns:
        Tuple ``(predicted, expected)`` of entries from ``class_mapping``.
    """
    model.eval()
    with torch.no_grad():
        first_row = model(input)[0]
        best_index = first_row.argmax(0)
        return class_mapping[best_index], class_mapping[target]

In [None]:
# load back the model
# load back the model
cnn = CNNNetwork().to(device)
# map_location makes a checkpoint saved on a GPU runtime loadable when the
# current runtime only has a CPU (and vice versa).
state_dict = torch.load("cnn.pth", map_location=device)
cnn.load_state_dict(state_dict)

# load music dataset
Music = MusicDataset(annotations_file,
                     audio_dir,
                     mel_spectrogram,
                     SAMPLE_RATE,
                     NUM_SAMPLES,
                     label_encoder,
                     device)

# NOTE: despite its name, true_rate is the COUNT of correct predictions;
# the next cell converts it to a percentage.
true_rate = 0
for i in range(len(Music)):
    # Index the dataset once: every Music[i] decodes and transforms the
    # audio file, so fetching [0] and [1] separately doubled the work.
    sample, target = Music[i]
    # The model expects a leading batch dimension.
    predicted, expected = predict(cnn, sample.unsqueeze(0), target, class_mapping)
    if predicted == expected:
        true_rate += 1

In [None]:
# true_rate counts correct predictions; express it as a percentage of the dataset.
accuracy_percent = true_rate * 100 / len(Music)
print(f"True predict rate: '{accuracy_percent}'")

True predict rate: '13.807531380753138'


# SVM

In [None]:
class MusicDataset2(Dataset):
    """Raw-waveform variant of the audio dataset.

    ``__getitem__`` returns ``(signal, label)`` where ``signal`` is the
    resampled, mono, fixed-length waveform (no ``transformation`` applied)
    and ``label`` is the raw value from column 0 of the annotations CSV
    (not passed through ``label_encoder``). ``transformation`` and
    ``label_encoder`` are still accepted and stored so the constructor
    signature matches ``MusicDataset``.
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 label_encoder,
                 device):
        """Store the configuration and move ``transformation`` to ``device``.

        Args:
            annotations_file: Path of the CSV describing the clips.
            audio_dir: Prefix prepended (by plain string concatenation) to
                the file names found in the CSV.
            transformation: Stored but not applied by ``__getitem__``.
            target_sample_rate: Sample rate every clip is converted to.
            num_samples: Exact number of samples kept per clip.
            label_encoder: Stored but not applied by ``__getitem__``.
            device: Torch device on which signals are processed.
        """
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples
        self.label_encoder = label_encoder
        # One resampler per source sample rate, built lazily and reused so
        # the resampling kernel is not recomputed for every item.
        self._resamplers = {}

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return the fixed-length mono waveform and raw label at ``index``."""
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        return signal, label

    def _cut_if_necessary(self, signal):
        """Truncate ``signal`` along dim 1 to at most ``num_samples``."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad ``signal`` on the right of dim 1 up to ``num_samples``."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to ``target_sample_rate`` if they differ."""
        if sr != self.target_sample_rate:
            resampler = self._resamplers.get(sr)
            if resampler is None:
                resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
                resampler = resampler.to(self.device)
                self._resamplers[sr] = resampler
            signal = resampler(signal)
        return signal.to(self.device)

    def _mix_down_if_necessary(self, signal):
        """Average all channels (dim 0) into one, keeping the channel dim."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        """Return ``audio_dir`` concatenated with column 3 of row ``index``."""
        return self.audio_dir + str(self.annotations.iloc[index, 3])

    def _get_audio_sample_label(self, index):
        """Return the raw label stored in column 0 of row ``index``."""
        return self.annotations.iloc[index, 0]

    def _get_lyric(self, index):
        """Return ``audio_dir`` concatenated with column 2 of row ``index``."""
        return self.audio_dir + str(self.annotations.iloc[index, 2])

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device {device}")

# Raw-waveform dataset feeding the SVM below.
Music2 = MusicDataset2(
    annotations_file=annotations_file,
    audio_dir=audio_dir,
    transformation=mel_spectrogram,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    label_encoder=label_encoder,
    device=device,
)

Using device cpu


In [None]:
# Materialise the whole dataset as NumPy feature vectors for scikit-learn.
signal2 = []
label2 = []
for i in range(len(Music2)):
    waveform, label = Music2[i]
    # .cpu(): Tensor.numpy() raises for tensors living on a CUDA device.
    # .ravel(): the dataset yields (1, num_samples) tensors; scikit-learn
    # needs one flat feature vector per sample, otherwise X ends up 3-D and
    # SVC.fit rejects it.
    signal2.append(waveform.cpu().numpy().ravel())
    label2.append(label)

In [None]:
# Hold out 20% of the clips for evaluation. random_state pins the shuffle so
# the split (and the classification report below) is reproducible on re-run.
X_train, X_test, y_train, y_test = train_test_split(
    signal2, label2, test_size=0.2, random_state=42
)


In [None]:
# Fit a linear-kernel SVM on the training split, then report per-class
# metrics on the held-out split.
svmclassifier = SVC(kernel='linear').fit(X_train, y_train)
y_pred = svmclassifier.predict(X_test)
print(classification_report(y_test, y_pred))

In [None]:
# check = []
# for i in range(len(signal)):
#   check.append(signal[i].shape)

In [None]:
# len(set(check))

In [None]:
# def unique(list1):
 
#     # initialize a null list
#     unique_list = []
 
#     # traverse for all elements
#     for x in list1:
#         # check if exists in unique_list or not
#         if x not in unique_list:
#             unique_list.append(x)
#     # print list
#     for x in unique_list:
#         print(x)

In [None]:
# temp = np.array(X_train)
# unique(temp.shape)

In [None]:
# type(y_train)

In [None]:
# signal[0].shape