In [None]:
# install torchsummary
!pip install -i https://pypi.tuna.tsinghua.edu.cn/simple torchsummary

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset
import torchaudio
from torch.utils.data import DataLoader
import pandas as pd
import os
from itertools import product
from collections import namedtuple
from collections import OrderedDict
from IPython.display import display,clear_output
import time
import json
from torchsummary import summary
import matplotlib.pyplot as plt

# Allow printed tensors to use up to 120 characters per line before wrapping
torch.set_printoptions(linewidth=120)

In [None]:
# Add a numeric "num_label" column to the test-set annotation file.
# Every distinct value of the "label" column is numbered by its position in the
# alphabetically sorted list of labels.
ANNOTATIONS_FILE = "/kaggle/input/projectdataset/GTZAN_TEST/GTZAN_TEST/features_30_sec_test.csv"
dataframe = pd.read_csv(ANNOTATIONS_FILE)

# Read the labels from the "label" column by name (the same column that is
# converted below) rather than assuming it is the last column of the file.
labels = set(dataframe["label"])
labels_list = list(labels)
sorted_labels = sorted(labels_list)
mapping = {label: index for index, label in enumerate(sorted_labels)}

# NOTE(review): the mapping is derived from this file alone, so it only agrees
# with the training-set mapping if both files contain the same set of labels -
# confirm before comparing predictions across the two sets.

# Series.map builds the integer column directly; this avoids copying the string
# labels first and then relying on DataFrame.replace to change the column dtype.
# `dataframe` itself is left unmodified, which keeps the cell idempotent.
new_dataframe = dataframe.assign(num_label=dataframe["label"].map(mapping))
new_dataframe

In [None]:
# The default index column is written on purpose: GTZANDataset reads the file
# name from column position 1, which presumably relies on the saved index
# occupying position 0 - confirm before passing index=False here.
new_dataframe.to_csv("features_30_sec_test_final.csv")

In [None]:
# Training dataset: add a numeric "num_label" column to the annotation file.
# Every distinct value of the "label" column is numbered by its position in the
# alphabetically sorted list of labels.
import pandas as pd
ANNOTATIONS_FILE = "/kaggle/input/projectdataset/GTZAN/GTZAN/features_30_sec.csv"
dataframe = pd.read_csv(ANNOTATIONS_FILE)

# Read the labels from the "label" column by name (the same column that is
# converted below) rather than assuming it is the last column of the file.
labels = set(dataframe["label"])
labels_list = list(labels)
sorted_labels = sorted(labels_list)
mapping = {label: index for index, label in enumerate(sorted_labels)}

# Series.map builds the integer column directly; this avoids copying the string
# labels first and then relying on DataFrame.replace to change the column dtype.
# `dataframe` itself is left unmodified, which keeps the cell idempotent.
new_dataframe = dataframe.assign(num_label=dataframe["label"].map(mapping))
new_dataframe

In [None]:
# The default index column is written on purpose: GTZANDataset reads the file
# name from column position 1, which presumably relies on the saved index
# occupying position 0 - confirm before passing index=False here.
new_dataframe.to_csv("features_30_sec_final.csv")

In [None]:
# RunBuilder class: manages the hyperparameters by building every combination
# of the predefined values, so each combination can be trained as its own run
class RunBuilder():
    """Expands a mapping of hyperparameter choices into concrete runs."""

    @staticmethod
    def get_runs(params):
        """Return one ``Run`` namedtuple per combination of hyperparameter values.

        Args:
            params: mapping from hyperparameter name to an iterable of the
                values to try; the key order defines the field order.

        Returns:
            A list of ``Run`` namedtuples covering the Cartesian product of
            all value lists, in ``itertools.product`` order.
        """
        run_type = namedtuple('Run', params.keys())
        return [run_type(*combination) for combination in product(*params.values())]

In [None]:
# Runtime data management classes
class RunManager():
    """Collects per-epoch and per-run statistics during a hyperparameter sweep.

    The manager accumulates the loss and the number of correct predictions for
    the training and the test set, writes them to TensorBoard, keeps one result
    row per epoch in ``run_data`` and can save those rows as CSV and JSON.
    """

    def __init__(self):
        # Training set: epoch counter, summed loss, number of correct
        # predictions and start time of the current epoch
        self.epoch_count = 0
        self.epoch_loss = 0
        self.epoch_correct_num = 0
        self.epoch_start_time = None
        # Value stored by get_num_workers()
        self.epoch_num_workers = None

        # Test set: the same counters as for the training set
        self.test_epoch_count = 0
        self.test_epoch_loss = 0
        self.test_epoch_correct_num = 0

        # Hyperparameters of the current run, run counter, collected result
        # rows and start time of the current run
        self.run_params = None
        self.run_count = 0
        self.run_data = []
        self.run_start_time = None

        self.network = None
        self.loader = None
        self.test_loader = None
        # TensorBoard writer, created in begin_run()
        self.tb = None

    def begin_run(self, run, network, loader, test_loader):
        """Start a run: store its settings and open a TensorBoard writer.

        Args:
            run: hyperparameter tuple of this run; ``run.device`` is read here
                and ``run._asdict()`` is read in ``end_epoch``.
            network: the model being trained.
            loader: training data loader.
            test_loader: test data loader.
        """
        self.run_start_time = time.time()
        self.run_params = run
        self.run_count += 1

        self.network = network
        self.loader = loader
        self.test_loader = test_loader
        self.tb = SummaryWriter(comment=f'-{run}')

        # Only the first element of the batch is needed to trace the network.
        # GTZANDataset yields (signal, label, path) per item.
        signal, _label, _path = next(iter(self.loader))

        # Write the network graph to TensorBoard
        self.tb.add_graph(
            self.network,
            signal.to(run.device)
        )

    def end_run(self):
        """Close the TensorBoard writer and reset the epoch counters."""
        self.tb.close()
        self.epoch_count = 0
        self.test_epoch_count = 0

    def begin_epoch(self):
        """Start an epoch: bump the epoch counters and reset the accumulators."""
        self.epoch_start_time = time.time()
        self.epoch_count += 1
        self.epoch_loss = 0
        self.epoch_correct_num = 0

        self.test_epoch_count += 1
        self.test_epoch_loss = 0
        self.test_epoch_correct_num = 0

    def end_epoch(self):
        """Finish an epoch: log the metrics and append a row to ``run_data``.

        Loss and accuracy are averaged over the number of samples in the
        respective dataset. Training and test values are written to
        TensorBoard; only the training values go into the result row.
        """
        epoch_duration = time.time() - self.epoch_start_time
        run_duration = time.time() - self.run_start_time

        # Training set: mean loss and accuracy
        train_size = len(self.loader.dataset)
        loss = self.epoch_loss / train_size
        accuracy = self.epoch_correct_num / train_size
        print(f'Accuracy Rate：{self.epoch_correct_num} / {len(self.loader.dataset)}')

        # Test set: mean loss and accuracy
        test_size = len(self.test_loader.dataset)
        test_loss = self.test_epoch_loss / test_size
        test_accuracy = self.test_epoch_correct_num / test_size

        self.tb.add_scalars('Loss', {"train_loss": loss,
                                    "test_loss": test_loss}, self.epoch_count)
        self.tb.add_scalars('Accuracy', {"train_accuracy": accuracy,
                                        "test_accuracy": test_accuracy}, self.epoch_count)

        for name, param in self.network.named_parameters():
            # Histogram of the parameter values
            self.tb.add_histogram(name, param, self.epoch_count)
            # Histogram of the gradients. A parameter that did not take part in
            # a backward pass (frozen, unused, or no backward call yet) has
            # grad None, which cannot be histogrammed, so it is skipped.
            if param.grad is not None:
                self.tb.add_histogram(f'{name}.grad', param.grad, self.epoch_count)

        results = OrderedDict()

        results['run'] = self.run_count
        results['epoch'] = self.epoch_count
        results['loss'] = loss
        results['accuracy'] = accuracy
        results['epoch duration'] = epoch_duration
        results['run duration'] = run_duration

        # Append the hyperparameters of this run to the row
        for k, v in self.run_params._asdict().items():
            results[k] = v

        self.run_data.append(results)

        df = pd.DataFrame.from_dict(self.run_data, orient='columns')

        # Replace the previously displayed table with the updated one
        clear_output(wait = True)
        display(df)

    def get_num_workers(self,num_workers):
        """Store ``num_workers`` on the manager (a setter, despite the name)."""
        self.epoch_num_workers = num_workers

    def track_loss(self,loss,batch):
        """Add a training batch loss, weighted by the batch size, to the epoch sum.

        ``batch[0].shape[0]`` is used as the batch size.
        """
        self.epoch_loss += loss.item()*batch[0].shape[0]

    def test_loss(self,test_loss, test_batch):
        """Add a test batch loss, weighted by the batch size, to the epoch sum."""
        self.test_epoch_loss += test_loss.item()*test_batch[0].shape[0]

    def test_num_correct(self, test_preds, test_labels):
        """Add the number of correct test predictions of a batch to the epoch sum."""
        self.test_epoch_correct_num += self.get_correct_num(test_preds, test_labels)

    def track_num_correct(self, preds, labels):
        """Add the number of correct training predictions of a batch to the epoch sum."""
        self.epoch_correct_num += self.get_correct_num(preds, labels)

    def get_correct_num(self, preds, labels):
        """Return how many rows of ``preds`` have their argmax (dim 1) equal to ``labels``."""
        return preds.argmax(dim=1).eq(labels).sum().item()

    def save(self, fileName):
        """Write the collected rows to ``<fileName>.csv`` and ``<fileName>.json``."""
        pd.DataFrame.from_dict(
            self.run_data, orient='columns'
        ).to_csv(f'{fileName}.csv')

        with open(f'{fileName}.json', 'w', encoding='utf-8') as f:
            json.dump(self.run_data, f, ensure_ascii=False, indent=4)

In [None]:
# Data pre-processing classes

class GTZANDataset(Dataset):
    """Dataset of audio clips described by an annotation CSV file.

    Each item is loaded from disk, resampled to ``target_sample_rate`` if
    needed, mixed down to one channel, cut or zero-padded to ``num_samples``
    samples and finally passed through ``transformation``.
    """

    def __init__(self,
                 annotations_file,
                 audio_dir,
                 transformation,
                 target_sample_rate,
                 num_samples,
                 device):
        """Read the annotations and move the transform to ``device``.

        Args:
            annotations_file: path (or file-like object) passed to ``pd.read_csv``.
            audio_dir: directory that holds one sub-directory per label.
            transformation: module applied to the prepared signal; it must
                provide ``.to(device)`` and be callable.
            target_sample_rate: sample rate every signal is brought to.
            num_samples: number of samples every signal is cut or padded to.
            device: device that signals and transforms are moved to.
        """
        # Annotation table
        self.annotations = pd.read_csv(annotations_file)
        # Root directory of the audio files
        self.audio_dir = audio_dir
        # Device used for the signal processing
        self.device = device
        # Feature transform, moved to the device
        self.transformation = transformation.to(self.device)
        # Target sampling frequency
        self.target_sample_rate = target_sample_rate
        # Target number of samples per item
        self.num_samples = num_samples
        # Resample modules keyed by source sample rate, so that a module is
        # built once per rate instead of once per loaded item
        self._resamplers = {}

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Return ``(transformed signal, label, audio path)`` for row ``index``."""
        audio_sample_path = self._get_audio_sample_path(index)
        label = self._get_audio_sample_label(index)
        # signal: loaded waveform, sr: its sampling frequency
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        # Bring the signal to the target sampling frequency
        signal = self._resample_if_necessary(signal, sr)
        # Several channels -> one channel
        signal = self._mix_down_if_necessary(signal)
        # Force exactly num_samples samples
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        # Apply the feature transform
        signal = self.transformation(signal)
        return signal, label, audio_sample_path

    def _cut_if_necessary(self, signal):
        """Keep only the first ``num_samples`` samples along dim 1 if the signal is longer."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad the end of the last dimension if dim 1 is shorter than ``num_samples``."""
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            # (left, right) padding of the last dimension
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` from ``sr`` to ``target_sample_rate`` when they differ."""
        if sr != self.target_sample_rate:
            resampler = self._resamplers.get(sr)
            if resampler is None:
                # First clip with this source rate: build the module once and
                # keep it for all later clips with the same rate
                resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate).to(self.device)
                self._resamplers[sr] = resampler
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        """Average over dim 0 (keeping the dimension) when it holds more than one channel."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _get_audio_sample_path(self, index):
        """Build ``audio_dir/<folder>/<file>`` from row ``index``.

        The folder is read from the second-to-last column and the file name
        from column position 1 of the annotation table.
        """
        fold = f"{self.annotations.iloc[index, -2]}"
        path = os.path.join(self.audio_dir, fold, self.annotations.iloc[
            index, 1])
        return path

    def _get_audio_sample_label(self, index):
        """Return the value of the last column of row ``index``."""
        return self.annotations.iloc[index, -1]
    

# Smoke test of GTZANDataset: build the dataset with an MFCC transform, load
# one item and show its feature matrix as an image.
if __name__ == "__main__":
    ANNOTATIONS_FILE = "./features_30_sec_final.csv"
    AUDIO_DIR = "/kaggle/input/projectdataset/GTZAN/GTZAN/genres_original"
    SAMPLE_RATE = 22050
    NUM_SAMPLES = 22050 * 5 # 5 seconds of audio at 22050 samples per second
    plot = True

    # Use the GPU when one is available
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"Using {device} device")

    # MFCC transform with 40 coefficients; this is the transform handed to the
    # dataset below
    mfcc = torchaudio.transforms.MFCC(
        sample_rate=SAMPLE_RATE,
        n_mfcc=40,
        log_mels=True
    )

    # Mel-spectrogram transform; it is defined here but not used in this cell
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=1024,
        # Hop between successive STFT frames, in samples
        hop_length=512,
        # Number of mel filter banks
        n_mels=64
    )

    # objects inside transforms module are callable!
    # ms = mel_spectrogram(signal)

    gtzan = GTZANDataset(
        ANNOTATIONS_FILE,
        AUDIO_DIR,
        mfcc,
        SAMPLE_RATE,
        NUM_SAMPLES,
        device
    )

    print(f"There are {len(gtzan)} samples in the dataset")

    if plot:
        # Hard-coded example row; the annotation file must have more than 666 rows
        signal, label, path = gtzan[666]
        print(f'path:{path}')
        # matplotlib needs the data on the CPU
        signal = signal.cpu()
        print(signal.shape)
        
        plt.figure(figsize=(16, 8), facecolor="white")
        # Show the first channel; with the MFCC transform the vertical axis is
        # the coefficient index, although it is labelled "Frequency" below
        plt.imshow(signal[0,:,:], origin='lower')
        plt.autoscale(False)
        plt.xlabel("Time")
        plt.ylabel("Frequency")
        plt.colorbar()
        plt.axis('auto')
        plt.show()


In [None]:
# Training-set annotation file written by an earlier cell of this notebook
ANNOTATIONS_FILE = "./features_30_sec_final.csv"
# Directory that holds the training audio files
AUDIO_DIR = "/kaggle/input/projectdataset/GTZAN/GTZAN/genres_original"
# Sampling frequency in samples per second
SAMPLE_RATE = 22050
# Samples kept per clip: 5 seconds at 22050 samples per second
NUM_SAMPLES = 22050  * 5

# NOTE: the three helper functions below are not used by the later cells and can be removed.
# Create a data loader for the training set
def create_data_loader(train_data, batch_size, num_workers=0, pin_memory=True):
    """Wrap ``train_data`` in a shuffling ``DataLoader``.

    Args:
        train_data: dataset to draw batches from.
        batch_size: number of items per batch.
        num_workers: number of loader worker processes (default 0, as before).
        pin_memory: whether the loader pins the batches in page-locked memory
            (default True, as before).
            NOTE(review): pinning is meant for CPU tensors; a dataset that
            already returns tensors on the GPU, as GTZANDataset does when its
            device is "cuda", presumably needs ``pin_memory=False`` - confirm.

    Returns:
        The configured ``DataLoader``.
    """
    train_dataloader = DataLoader(
        train_data,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    return train_dataloader


# Training for each epoch
def train_single_epoch(model, data_loader, loss_fn, optimiser, device):
    """Run one optimisation pass over ``data_loader`` and print the last batch loss.

    Args:
        model: network to train; called on the input batch.
        data_loader: iterable of batches whose first two entries are the input
            and the target. Extra entries are ignored, so the
            ``(signal, label, path)`` batches of GTZANDataset work as well.
        loss_fn: callable ``loss_fn(prediction, target)`` returning the loss.
        optimiser: optimiser that updates the parameters of ``model``.
        device: device the input and the target are moved to.
    """
    # Stays None when the loader yields no batch at all
    loss = None
    for batch in data_loader:
        inputs, target = batch[0].to(device), batch[1].to(device)

        # calculate loss
        prediction = model(inputs)
        loss = loss_fn(prediction, target)

        # backpropagate error and update weights
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

    if loss is None:
        print("loss: n/a (the data loader produced no batches)")
    else:
        print(f"loss: {loss.item()}")

# Training
def train(model, data_loader, loss_fn, optimiser, device, epochs):
    """Train ``model`` for ``epochs`` epochs, announcing each epoch on stdout.

    Every epoch is delegated to ``train_single_epoch`` with the same model,
    loader, loss function, optimiser and device.
    """
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_single_epoch(model, data_loader, loss_fn, optimiser, device)
        print("---------------------------")
    print("Finished training")

In [None]:
# AlexNet network
class AlexNet(nn.Module):
    """AlexNet-style convolutional classifier for single-channel inputs.

    Args:
        num_classes: number of output classes.
        flattened_features: number of values left after the convolutional
            stack has been flattened, i.e. the input width of the first fully
            connected layer. It depends on the input height and width. The
            default of 12288 (256 channels x 3 x 16) matches inputs of shape
            (1, 128, 555); pass another value for other input sizes.
    """

    def __init__(self, num_classes=10, flattened_features=12288):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            # Input channel 1, output channel 64, convolution kernel 11*11,
            # stride 4, zero padding 2
            nn.Conv2d(1, 64, kernel_size=11, stride=4, padding=2),
            # ReLU activation function
            nn.ReLU(inplace=True),
            # Maximum pooling
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        # Flatten everything except the batch dimension
        self.flatten = nn.Flatten()
        # Classifier
        self.classifier = nn.Sequential(
            # First fully connected layer; its input width must equal the
            # flattened size of the feature maps
            nn.Linear(flattened_features, 1024),
            nn.ReLU(inplace=True),
            # Dropout (random deactivation)
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(1024, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3, inplace=False),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        """Return the class scores for a batch ``x`` of single-channel inputs."""
        x = self.features(x)
        x = self.flatten(x)
        x = self.classifier(x)
        return x

In [None]:
# Print a layer-by-layer summary of AlexNet for an input of shape (1, 128, 555)
if __name__ == '__main__':
    from torchsummary import summary
    # Fall back to the CPU when no GPU is present instead of failing on
    # .to("cuda"); the same device is handed to summary(), whose device
    # argument otherwise defaults to "cuda"
    summary_device = "cuda" if torch.cuda.is_available() else "cpu"
    alex=AlexNet().to(summary_device)
    summary(alex, (1, 128, 111* 5), device=summary_device)

In [None]:
# Seed PyTorch's global random number generator with a fixed value
torch.manual_seed(128)

In [None]:
# Defining the dictionary of hyperparameters
# Every list holds the values to try for one hyperparameter
params = OrderedDict(
    lr = [.001, .0001]
    , batch_size = [64]
    , num_workers = [0]
    # Use the GPU when one is available and fall back to the CPU otherwise,
    # instead of failing on a machine without CUDA
    , device = ['cuda' if torch.cuda.is_available() else 'cpu']
)

# Training set: annotation file written by an earlier cell of this notebook
ANNOTATIONS_FILE = "./features_30_sec_final.csv"
# Training set: directory that holds the audio files
AUDIO_DIR = "/kaggle/input/projectdataset/GTZAN/GTZAN/genres_original"

# Test set: annotation file written by an earlier cell, and audio directory
# NOTE(review): the numeric labels of the two annotation files were derived
# separately; they only agree if both files contain the same set of labels - confirm
ANNOTATIONS_FILE_TEST = "./features_30_sec_test_final.csv"
AUDIO_DIR_TEST = "/kaggle/input/projectdataset/GTZAN_TEST/GTZAN_TEST/genres_original"