In [1]:
import torch
import torch.nn as nn
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix
import seaborn as sns
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import os
import glob
import random, shutil
import numpy as np
import IPython.display as display
import librosa
import librosa.display


In [None]:
def set_device():
    """Pick the compute device for this notebook and report the choice.

    Returns:
        str: "cuda" when `torch.cuda.is_available()` is true, otherwise "cpu".
    """
    if torch.cuda.is_available():
        print("GPU is enabled in this notebook.")
        return "cuda"

    # No CUDA device found: point the user at the runtime setting for a GPU.
    print("WARNING: For this notebook to perform best, "
          "if possible, in the menu under `Runtime` -> "
          "`Change runtime type.`  select `GPU` ")
    return "cpu"

In [None]:
# Build the spectrogram dataset: one folder per genre under 'Data/all', each
# filled with mel-spectrogram images of consecutive 3-second clips of a track.


genre_name = ['blues','classical','country','disco','hiphop','jazz','metal','pop','reggae','rock']


# Root folder that holds every generated spectrogram image.
parent_directory = 'Data/all'

# Create 'Data/all/<genre>' for every genre. makedirs also creates the missing
# parents ('Data', 'Data/all') and does nothing for folders that already exist.
for genre in genre_name:
    os.makedirs(os.path.join(parent_directory, genre), exist_ok=True)


# Generate the 3-second mel-spectrogram images for each genre.

original_data_dir = './archive/genres_original/'

SEGMENT_SECONDS = 3       # length of one clip, in seconds
SEGMENTS_PER_TRACK = 10   # clips cut from the start of every track
TRACKS_PER_GENRE = 100    # file indices 00000..00099 are probed for each genre

for genre in genre_name:

    for j in range(TRACKS_PER_GENRE):

        original_data = original_data_dir+genre+f"/{genre}.{j:05d}.wav"

        if not os.path.exists(original_data):
            continue

        try:
            y, sr = librosa.load(original_data)
        except Exception as exc:
            # NOTE(review): one unreadable file should not abort the whole run;
            # it is reported and skipped. Confirm this is the desired policy.
            print(f'Skipping unreadable file {original_data}: {exc}')
            continue

        segment_length = SEGMENT_SECONDS * sr  # number of samples in one clip
        for k in range(SEGMENTS_PER_TRACK):
            segment = y[k*segment_length:(k+1)*segment_length]
            if len(segment) == 0:
                # The track ended before this clip; later clips are empty too.
                break

            # 16 x 16 inches saved at 8 dpi gives a 128 x 128 pixel image.
            fig, ax = plt.subplots(figsize=(2**4, 2**4))
            try:
                ax.axis('off')
                fig.tight_layout()

                mel_3second = librosa.feature.melspectrogram(y=segment, sr=sr)
                # melspectrogram returns a power spectrogram by default, so the
                # dB conversion has to be power_to_db (amplitude_to_db would
                # square the values again and double the dB scale).
                mel_3second_db = librosa.power_to_db(mel_3second, ref=np.max)

                librosa.display.specshow(mel_3second_db, sr=sr, ax=ax)

                fig.savefig(f'{parent_directory}/{genre}/{j:05d}-{k}.png', dpi=2**3)
            finally:
                # Always release the figure; thousands are created in this loop.
                plt.close(fig)

In [None]:
# Split the spectrogram images into training, testing and validation folders.

spectrograms_dir = parent_directory
folder_names = ['Data/train/', 'Data/test/', 'Data/val/']
train_dir = folder_names[0]
test_dir = folder_names[1]
val_dir = folder_names[2]

# Start from empty split folders so images from an earlier split cannot linger.
for f in folder_names:
    if os.path.exists(f):
        shutil.rmtree(f)
    os.makedirs(f, exist_ok=True)

# A dedicated, seeded generator makes the split identical on every run without
# touching the global `random` state.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)

# Loop over all genres. Only sub-directories count as genres: a stray file in
# the spectrogram folder would otherwise become an empty class folder.
# Sorting makes the processing order independent of the file system.
genres = sorted(
    entry for entry in os.listdir(spectrograms_dir)
    if os.path.isdir(os.path.join(spectrograms_dir, entry))
)
for g in genres:
    # Find all images of this genre and split them: the first 100 shuffled
    # images go to test, the next 100 to validation, the remainder to train.
    # NOTE(review): clips are shuffled individually, so clips cut from the same
    # track ('<track>-<k>.png') can land in different splits. Consider
    # splitting by track if that overlap matters for the evaluation.
    src_file_paths = sorted(glob.glob(os.path.join(spectrograms_dir, g, "*.png")))
    split_rng.shuffle(src_file_paths)
    test_files = src_file_paths[0:100]
    val_files = src_file_paths[100:200]
    train_files = src_file_paths[200:]

    # Create the per-genre destination folder in every split, then copy the
    # images of that split into it under their original file names.
    for split_dir, split_files in ((train_dir, train_files),
                                   (test_dir, test_files),
                                   (val_dir, val_files)):
        destination = os.path.join(split_dir, g)
        os.makedirs(destination, exist_ok=True)
        for src in split_files:
            shutil.copy(src, os.path.join(destination, os.path.basename(src)))

In [None]:
# Make a CNN & train it to predict genres.
class Conv_2d(nn.Module):
    """Convolution block: Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2) -> Dropout.

    Args:
        input_channels: number of channels of the incoming feature map.
        output_channels: number of channels produced by the convolution.
        ksize: convolution kernel size (the padding is always 1).
        dropout: drop probability applied after pooling.
    """

    def __init__(self, input_channels, output_channels, ksize=7, dropout=0.1):
        super().__init__()
        self.conv = nn.Conv2d(input_channels, output_channels,
                              kernel_size=ksize, padding=1)
        self.bn = nn.BatchNorm2d(num_features=output_channels)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=2)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Run `x` through the block and return the pooled feature map."""
        activated = self.relu(self.bn(self.conv(x)))
        return self.dropout(self.maxpool(activated))
    

class CNN(nn.Module):
    """Five stacked `Conv_2d` blocks followed by a two-layer dense classifier.

    `dense1` takes `num_channels * 16` features, so the convolutional stack has
    to reduce the spatial size to 1x1. That is the case for 128x128 inputs
    (124 -> 62 -> 58 -> 29 -> 25 -> 12 -> 8 -> 4 -> 2 -> 1).

    Args:
        num_channels: channel count of the first block; doubled by every
            following block.
        num_classes: number of output logits.
    """

    def __init__(self, num_channels=8,
                       num_classes=10):
        super(CNN, self).__init__()

        # convolutional layers
        self.layer1 = Conv_2d(3, num_channels)
        self.layer2 = Conv_2d(num_channels, num_channels*2)
        self.layer3 = Conv_2d(num_channels*2, num_channels * 4)
        self.layer4 = Conv_2d(num_channels * 4, num_channels * 8)
        self.layer5 = Conv_2d(num_channels * 8, num_channels * 16,ksize=5)

        # dense layers
        self.dense1 = nn.Linear(num_channels * 16, 256)
        self.dense_bn = nn.BatchNorm1d(256)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return the class logits, shape (batch, num_classes), for image batch `x`."""
        # convolutional layers
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)

        # Flatten everything except the batch axis. A bare squeeze() would also
        # drop a batch axis of size 1, after which dense_bn rejects the input.
        out = out.flatten(start_dim=1)

        # dense layers
        out = self.dense1(out)
        out = self.dense_bn(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.dense2(out)

        return out

In [2]:

def train(model, device, train_loader, validation_loader, epochs,
          lr=0.0005, checkpoint_path='weight.pth'):
    """Train `model` with Adam and cross-entropy, validating after every epoch.

    The model state is saved to `checkpoint_path` whenever the summed
    validation loss of an epoch is lower than in every earlier epoch.

    Args:
        model: network to optimise (already moved to `device`).
        device: device the batches are moved to.
        train_loader: iterable of (data, target) training batches; must
            support `len()`.
        validation_loader: iterable of (data, target) validation batches; must
            support `len()`.
        epochs: number of passes over `train_loader`.
        lr: learning rate of the Adam optimiser.
        checkpoint_path: file the best model state dict is written to.

    Returns:
        Four lists with one entry per epoch: mean training batch loss,
        training accuracy, mean validation batch loss, validation accuracy.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    train_loss, validation_loss = [], []
    train_acc, validation_acc = [], []
    best_val_loss = float('inf')

    with tqdm(range(epochs), unit='epoch') as tepochs:
        tepochs.set_description('Training')
        for _ in tepochs:
            # ---- training pass ----
            model.train()
            running_loss = 0.
            correct, total = 0, 0

            for data, target in train_loader:
                data, target = data.to(device), target.to(device)

                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

                # Accumulate a plain float: adding the loss tensor itself would
                # keep autograd history alive for the whole epoch.
                batch_loss = loss.item()
                tepochs.set_postfix(loss=batch_loss)
                running_loss += batch_loss

                # Accuracy: the predicted class is the largest logit.
                predicted = output.argmax(dim=1)
                total += target.size(0)
                correct += (predicted == target).sum().item()

            train_loss.append(running_loss / len(train_loader))
            train_acc.append(correct / total)

            # ---- validation pass ----
            model.eval()
            running_loss = 0.
            correct, total = 0, 0

            # No gradients are needed for evaluation; disabling them avoids
            # building a graph for every validation batch.
            with torch.no_grad():
                for data, target in validation_loader:
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    batch_loss = criterion(output, target).item()
                    tepochs.set_postfix(loss=batch_loss)
                    running_loss += batch_loss

                    predicted = output.argmax(dim=1)
                    total += target.size(0)
                    correct += (predicted == target).sum().item()

            validation_loss.append(running_loss / len(validation_loader))
            validation_acc.append(correct / total)

            # Keep the weights of the epoch with the lowest validation loss.
            if running_loss < best_val_loss:
                best_val_loss = running_loss
                torch.save(model.state_dict(), checkpoint_path)

    return train_loss, train_acc, validation_loss, validation_acc



In [3]:
def plot_loss_accuracy(train_loss, train_acc, validation_loss, validation_acc):
    """Show loss and accuracy curves (training vs. validation) side by side.

    Each argument is a sequence with one value per epoch; the x axis is the
    epoch index, starting at 0.
    """
    epoch_axis = list(range(len(train_loss)))
    fig, axes = plt.subplots(1, 2)

    # One row per panel: training series/label, validation series/label,
    # y-axis label and panel title.
    panels = (
        (train_loss, 'Training Loss',
         validation_loss, 'Validation Loss',
         'Loss', 'Epoch vs Loss'),
        (train_acc, 'Training Accuracy',
         validation_acc, 'Validation Accuracy',
         'Accuracy', 'Epoch vs Accuracy'),
    )
    for ax, panel in zip(axes, panels):
        train_values, train_label, val_values, val_label, y_label, title = panel
        ax.plot(epoch_axis, train_values, label=train_label)
        ax.plot(epoch_axis, val_values, label=val_label)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.legend()

    fig.set_size_inches(15.5, 5.5)
    plt.show()



In [4]:
def plot_confusion_matrix(device,model,test_loader):
    """Load the saved weights into `model`, evaluate it and plot a confusion matrix.

    Reads the state dict from 'weight.pth', predicts every batch of
    `test_loader`, shows the confusion matrix as a heatmap and prints the
    overall accuracy.

    Args:
        device: device the weights and the batches are mapped to.
        model: network whose weights are replaced by the saved state dict.
        test_loader: iterable of (data, target) batches.
    """
    # NOTE(review): assumes class index i corresponds to GTZAN_GENRES[i]
    # (alphabetical order) -- confirm against the dataset's class_to_idx.
    GTZAN_GENRES = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']

    # load_state_dict is a method of the model; map_location lets weights that
    # were saved on a GPU be restored on whatever device is in use now.
    model.load_state_dict(torch.load('weight.pth', map_location=device))
    print('loaded!')

    model.eval()
    y_true = []
    y_pred = []

    with torch.no_grad():
        for data, target in test_loader:
            output = model(data.to(device))
            pred = output.argmax(dim=1)

            # append labels and predictions
            y_true.extend(target.tolist())
            y_pred.extend(pred.tolist())

    accuracy = accuracy_score(y_true, y_pred)
    # Passing the full label list keeps the matrix at one row/column per genre
    # even when a class never occurs, so the tick labels always line up.
    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(GTZAN_GENRES))))

    sns.set(font_scale=1.2)  # adjust the font size
    fig, ax = plt.subplots(figsize=(8, 6))
    # fmt='d' prints the integer counts as-is instead of a rounded float format.
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=GTZAN_GENRES,
                yticklabels=GTZAN_GENRES, cmap='YlGnBu', ax=ax)
    # Title and labels are applied after drawing: the heatmap call sets axis
    # labels itself, so labels set beforehand may be overwritten.
    ax.set_title('Confusion Matrix')
    ax.set_xlabel('Predicted Labels')
    ax.set_ylabel('True Labels')
    plt.show()
    print('Accuracy: %.4f' % accuracy)

In [None]:
# Data loading: one ImageFolder dataset and one DataLoader per split.

folder_names = ['Data/train/', 'Data/test/', 'Data/val/']
train_dir = folder_names[0]
test_dir = folder_names[1]
val_dir = folder_names[2]

BATCH_SIZE = 16
NUM_WORKERS = 4

# Every split uses the same preprocessing: convert the image to a tensor.
image_transform = transforms.Compose([
    transforms.ToTensor(),
])

train_dataset = datasets.ImageFolder(train_dir, image_transform)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

val_dataset = datasets.ImageFolder(val_dir, image_transform)

# Evaluation loaders are not shuffled, so the batch composition (and with it
# the per-batch averaged loss) is the same on every pass.
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

# The test split is read from test_dir and wrapped by its own loader; before,
# both the test dataset and the test loader pointed at the validation data.
test_dataset = datasets.ImageFolder(test_dir, image_transform)

test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

In [None]:
# Pick the device, build the network on it and train for NUM_EPOCHS epochs.
NUM_EPOCHS = 150

device = set_device()

cnn = CNN()
cnn = cnn.to(device)

history = train(cnn, device, train_loader, val_loader, NUM_EPOCHS)
train_loss, train_acc, validation_loss, validation_acc = history


In [None]:
# Plot the per-epoch loss and accuracy curves recorded during training.
plot_loss_accuracy(
    train_loss=train_loss,
    train_acc=train_acc,
    validation_loss=validation_loss,
    validation_acc=validation_acc,
)

In [None]:
# Evaluate the saved weights on the test loader and show the confusion matrix.
plot_confusion_matrix(device=device, model=cnn, test_loader=test_loader)