In [None]:
# Standard library
import logging
import os
import warnings
from glob import glob

# Seed shared by every seeding call in the notebook.
seed = 100

# These variables are consulted by TensorFlow / Matplotlib while they are being
# imported, so they are exported BEFORE the third-party imports below
# (setting them afterwards, as was done previously, is too late to take effect).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# NOTE(review): hash randomisation is fixed when the interpreter starts, so this
# assignment presumably only reaches child processes, not the running kernel.
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

# Third-party
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
from sklearn.metrics import mean_squared_error, accuracy_score, f1_score, precision_score, recall_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils import shuffle
from skimage.color import rgb2gray

In [None]:
# Notebook-wide settings: plot font size, warning filters, RNG seeds and TensorFlow log verbosity.
plt.rc('font', size=16)

# Ignore FutureWarning first, then every other Warning subclass.
for warning_category in (FutureWarning, Warning):
    warnings.simplefilter(action='ignore', category=warning_category)

# Seed NumPy and TensorFlow (both the TF2 API and the v1 compatibility API) with the same value.
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Only let TensorFlow report errors.
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

print(tf.__version__)

# Dataset loading

In [None]:
# Starting from a preprocessed dataset in which every audio has already been cut into 11 parts,
# this cell loads the parts of each genre in order.
genre_dict = {"blues":0,"classical":1,"country":2,"disco":3,"hiphop":4,"jazz":5,"metal":6,"pop":7,"reggae":8,"rock":9}
path_image_files = "/kaggle/input/mel_spectrogram_imgs/"
genres = list(genre_dict.keys())

# Number of images to read per genre: 1100 for every genre except jazz, which has 1089.
default_images_per_genre = 1100
images_per_genre = {"jazz": 1089}

# Start from empty lists so the cell runs on a fresh kernel and re-running it
# does not append the dataset a second time.
inputs = []
labels = []

for genre in genres:
    print("\t",genre)
    path = path_image_files + genre + "/" + genre + "_"
    for i in range(images_per_genre.get(genre, default_images_per_genre)):
        # Keep only the first three channels of the PNG and convert them to grayscale.
        inputs.append(rgb2gray(mpimg.imread(path + str(i) +".png")[:,:,0:3]))
        # The label is the genre name; it is one-hot encoded in a later cell.
        labels.append(genre)

    # Running totals after each genre.
    print(len(inputs))
    print(len(labels))

In [None]:
# Convert the list of loaded images into a single NumPy array.
inputs = np.asarray(inputs)

# For the prediction, the labels are turned into one-hot notation.
# The encoder expects a 2-D column, hence the reshape to (n_samples, 1).
enc = OneHotEncoder(sparse_output = False)
labels = enc.fit_transform(np.asarray(labels).reshape(-1, 1))

In [None]:
# Report the array shapes and keep the per-sample shapes used as model defaults.
for loaded_array in (labels, inputs):
    print(loaded_array.shape)
input_shape, output_shape = inputs.shape[1:], labels.shape[1:]

# Indices of the first slice of each audio. Audios are loaded in order with 11 slices each,
# so every 11th position starts a new audio; these indices are used as the per-audio reference.
original_indices = np.arange(start=0, stop=len(inputs), step=11)
original_labels = labels[original_indices]
for per_audio_array in (original_labels, original_indices):
    print(per_audio_array.shape)

# Network

In [None]:
# This is the RGLU block.
def RGLU(x, filters, name):
    """Gated 1-D convolution block with an optional residual connection.

    Two parallel Conv1D branches are applied to ``x``; each is batch-normalised,
    the first is squashed with a sigmoid and used as a gate that multiplies the
    second. When the gated output has the same shape as ``x`` the input is added
    back (residual connection).

    Parameters:
    - x: Input Keras tensor in ``channels_first`` layout (the layout every
      convolution here is configured for).
    - filters: Number of filters of both convolutions.
    - name: Name of the gate convolution; the second convolution is named ``name + 'B'``.

    Returns:
    - The gated (and, when shapes match, residual) output tensor.
    """
    conv_kwargs = dict(filters=filters, kernel_size=3, strides=1, padding="same", data_format="channels_first")
    x1 = tfkl.Conv1D(name=name, **conv_kwargs)(x)
    x2 = tfkl.Conv1D(name=name + 'B', **conv_kwargs)(x)
    # The convolutions are channels_first, so the feature axis is 1. The default
    # axis=-1 would normalise over the length axis instead of the channels.
    x1 = tfkl.BatchNormalization(axis=1)(x1)
    x2 = tfkl.BatchNormalization(axis=1)(x2)
    # Gate values in (0, 1); applied as a layer so it is part of the model graph.
    x1 = tfkl.Activation('sigmoid')(x1)
    y = tfkl.Multiply()([x1,x2])
    # The add operation is only performed when the input and output sizes match.
    if x.shape == y.shape:
        y = tfkl.Add()([x, y])
    return y

In [None]:
# This class implements a positional encoding layer.
class PositionalEncoding(tfkl.Layer):
    """Adds a fixed sinusoidal positional encoding to its input.

    The encoding table has shape ``(1, sequence_length, d_model)`` and is built
    once in the constructor; ``call`` adds its first ``tf.shape(inputs)[1]``
    rows to the input.
    """

    def __init__(self, sequence_length, d_model, **kwargs):
        """
        Parameters:
        - sequence_length: Number of positions in the encoding table.
        - d_model: Size of the last axis of the encoding table.
        - **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...).
          They must be accepted because ``get_config`` returns them and
          ``from_config`` passes the whole config back to the constructor.
        """
        super(PositionalEncoding, self).__init__(**kwargs)
        # Keep the constructor arguments so get_config can return them verbatim.
        self.sequence_length = sequence_length
        self.d_model = d_model
        self.pos_encoding = self.positional_encoding(sequence_length, d_model)
    
    def get_config(self):
        """Return the layer config, including the two constructor arguments."""
        config = super().get_config().copy()
        config.update({
            'sequence_length': self.sequence_length,
            'd_model': self.d_model,
        })
        return config

    def positional_encoding(self, sequence_length, d_model):
        """Build the ``(1, sequence_length, d_model)`` float32 encoding table."""
        angle_rads = self.get_angles(np.arange(sequence_length)[:, np.newaxis],
                                     np.arange(d_model)[np.newaxis, :],
                                     d_model)
        
        # apply sin to even indices in the array; 2i
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        
        # apply cos to odd indices in the array; 2i+1
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        
        # Leading axis of size 1 so the table broadcasts over the batch.
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)
    
    def get_angles(self, pos, i, d_model):
        """Return ``pos / 10000 ** (2 * (i // 2) / d_model)`` (broadcast over ``pos`` and ``i``)."""
        angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
        return pos * angle_rates
    
    def call(self, inputs):
        """Add the encoding rows matching the length of axis 1 of ``inputs``."""
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]


In [None]:
# This is the transformer block, formed by the positional encoding, multi-head self-attention and the final
# feed-forward layers.
def transformer_encoder(x, num_heads, ff_dim, dropout_rate):
    """Apply one transformer encoder block to ``x``.

    Parameters:
    - x: Rank-3 Keras tensor; axis 1 is treated as the sequence axis and the
      last axis as the embedding axis.
    - num_heads: Number of attention heads.
    - ff_dim: Width of the hidden feed-forward layer.
    - dropout_rate: Dropout rate used after attention and after each dense layer.

    Returns:
    - Tensor with the same shape as ``x``.
    """
    # Size the positional encoding from the incoming tensor instead of assuming
    # a 256 x 10 feature map. The previous constants are kept as a fallback for
    # dimensions that are not statically known.
    seq_dim, feat_dim = x.shape[1], x.shape[-1]
    sequence_length = 256 if seq_dim is None else int(seq_dim)
    d_model = 10 if feat_dim is None else int(feat_dim)  # Embedding dimension
    
    x = PositionalEncoding(sequence_length, d_model)(x)
    
    # Multihead self-attention with a residual connection and layer normalisation
    attention_output = tfkl.MultiHeadAttention(num_heads=num_heads, key_dim=x.shape[-1])(x, x)
    attention_output = tfkl.Dropout(dropout_rate)(attention_output)
    x = tfkl.LayerNormalization(epsilon=1e-6)(x + attention_output)
    
    # Feed-forward: expand to ff_dim, project back to the embedding size, residual + normalisation
    y = tfkl.Dense(units=ff_dim, activation = 'relu')(x)
    y = tfkl.Dropout(dropout_rate)(y)
    y = tfkl.Dense(units=x.shape[-1])(y)
    y = tfkl.Dropout(dropout_rate)(y)
    x = tfkl.LayerNormalization(epsilon=1e-6)(x + y)
    
    return x

In [None]:
# Build the actual model
def build_model(input_shape=input_shape, output_shape=output_shape):
    """Build and compile the convolution + transformer classifier.

    Parameters:
    - input_shape: Shape of one sample (without the batch axis). The default is
      the notebook-level ``input_shape`` captured when this cell is executed.
    - output_shape: Shape of one label vector, e.g. ``(n_classes,)``, or the
      number of classes as an int. The default is the notebook-level
      ``output_shape`` captured when this cell is executed.

    Returns:
    - A compiled ``tfk.Model`` (categorical cross-entropy, Adam, accuracy).
    """
    tf.random.set_seed(seed)

    # Size the output layer from output_shape instead of a hard-coded 10.
    try:
        n_classes = int(output_shape[-1])
    except TypeError:
        # A plain integer number of classes was passed.
        n_classes = int(output_shape)

    input_layer = tfkl.Input(shape=input_shape, name='Input')
    filters = 64
    
    # This is the convolutional part of the model.
    x = tfkl.Conv1D(filters=filters, kernel_size=3, strides=1, padding="same", data_format="channels_first", name='conv')(input_layer)
    for i in range(5):
        x = RGLU(x, filters, 'conv_' + str(i))
        # The filter count grows between the two RGLU blocks of stages 1 and 3:
        # 64 -> 128 at i == 1 and 128 -> 256 at i == 3.
        if i == 1:
            filters += 64
        elif i == 3:
            filters += 128
        x = RGLU(x, filters, 'conv2_' + str(i))
        x = tfkl.MaxPooling1D(pool_size=2, strides=2, padding="valid", data_format="channels_first",name = 'mp' + str(i))(x)
    
    # This is the encoder part, implemented by just calling the transformer block.
    x = transformer_encoder(x, 8, 2048, 0.1)
    
    # This is the decoder part: both GAP and GMP are computed, concatenated and fed to the dense classifier.
    x1 = tfkl.GlobalAveragePooling1D(data_format='channels_first')(x)
    x2 = tfkl.GlobalMaxPooling1D(data_format='channels_first')(x)
    x = tfkl.Concatenate()([x1, x2])
    
    x = tfkl.Dense(units = 200, activation='relu', name = 'dense1')(x)
    x = tfkl.Dropout(0.2)(x)
    x = tfkl.Dense(units = 100, activation='relu', name = 'dense2')(x)
    x = tfkl.Dropout(0.2)(x)
    output_layer = tfkl.Dense(units=n_classes, activation='softmax',name='Output')(x)

    # Connect input and output through the Model class
    model = tfk.Model(inputs=input_layer, outputs=output_layer, name='CNN')

    # Compile the model using Categorical Crossentropy as loss, Adam as optimizer and accuracy as the main metric.
    model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(), metrics=['accuracy'])

    # Return the model
    return model

In [None]:
# Build one model instance, print its layer summary and render its graph
# (the plot is the cell output, so it must stay the last expression).
model = build_model()
model.summary()
tfk.utils.plot_model(model, show_shapes=True, expand_nested=True)

# Training and Testing

In [None]:
# This function implements k-fold splitting that keeps the distribution of each class equal across the folds.
def stratified_multilabel_kfold(X, y, n_splits=10, shuffle_data=True, random_state=None):
    """
    Stratified K-Fold cross-validation for multilabel data with 80/10/10 train/validation/test split
    (for the default n_splits=10).

    Every returned index refers to a row of the X / y arrays exactly as they were
    passed in, so callers can index their own arrays with it directly.

    Parameters:
    - X: Feature matrix. Only its length is used (it must match y).
    - y: Multilabel binary matrix (samples x labels).
    - n_splits: Number of folds (default is 10).
    - shuffle_data: Whether to visit the samples in random order and to shuffle the
      non-test samples before cutting them into validation and train parts.
    - random_state: Seed for shuffling. With None the global NumPy random state is
      used (and therefore honours an earlier np.random.seed call). The global state
      is never reseeded by this function.

    Returns:
    - folds: List of (train_index, val_index, test_index) tuples for each fold;
      each element is a list of Python ints.

    Raises:
    - ValueError: If y is not 2-D or X and y have different lengths.
    """
    y = np.asarray(y)
    if y.ndim != 2:
        raise ValueError("y must be a 2-D (samples x labels) matrix")
    n_samples, n_labels = y.shape
    if len(X) != n_samples:
        raise ValueError("X and y must contain the same number of samples")

    # A private generator when a seed is given; the global one otherwise.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    # Desired sizes for train, validation, and test sets
    n_test = n_val = n_samples // n_splits
    n_train = n_samples - n_test - n_val

    # Shuffle the visiting ORDER rather than the data, so indices stay valid for the caller.
    order = rng.permutation(n_samples) if shuffle_data else np.arange(n_samples)

    # Samples carrying the most labels are placed first (stable, so the order above is kept among ties).
    label_matrix = y.astype(float)
    labels_per_sample = label_matrix.sum(axis=1)
    order = order[np.argsort(-labels_per_sample[order], kind="stable")]

    fold_indices = [[] for _ in range(n_splits)]
    label_counts_per_fold = np.zeros((n_splits, n_labels))
    fold_sizes = np.zeros(n_splits)

    # Assign each sample to the fold that so far holds the fewest of THAT sample's labels.
    for sample_index in order:
        sample_labels = label_matrix[sample_index]
        load = label_counts_per_fold @ sample_labels
        # Primary key: label load; ties go to the smaller fold, then to the lower fold number.
        min_fold = np.lexsort((fold_sizes, load))[0]
        fold_indices[min_fold].append(int(sample_index))
        label_counts_per_fold[min_fold] += sample_labels
        fold_sizes[min_fold] += 1

    # Create the train/validation/test splits
    folds = []
    for i in range(n_splits):
        test_indices = fold_indices[i]
        remaining_indices = [idx for j in range(n_splits) if j != i for idx in fold_indices[j]]

        # Shuffle remaining indices to randomize the validation/train cut
        if shuffle_data:
            rng.shuffle(remaining_indices)

        val_indices = remaining_indices[:n_val]
        train_indices = remaining_indices[n_val:n_val + n_train]

        folds.append((train_indices, val_indices, test_indices))

    return folds

In [None]:
# Slice-level cross-validation: every spectrogram slice is treated as an independent sample.
custom_folds = stratified_multilabel_kfold(inputs, labels, random_state=42)

# Test metrics of each fold, appended in fold order by the loop below.
accuracies, precisions, recalls, f1_scores = [], [], [], []

# This is the training part of the model.
for fold, (train_indices, val_indices, test_indices) in enumerate(custom_folds):

    # Materialise the three subsets of this fold.
    X_train, y_train = inputs[train_indices], labels[train_indices]
    X_val, y_val = inputs[val_indices], labels[val_indices]
    X_test, y_test = inputs[test_indices], labels[test_indices]

    print(f"Fold {fold + 1}")
    print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
    print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
    print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")
    print()

    # A fresh model per fold so that the weights are reinitialised every time.
    model = build_model()
    # Early stopping on validation accuracy limits overfitting; the best weights are restored at the end.
    early_stopping = tfk.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=80,
        mode='auto',
        restore_best_weights=True,
    )

    # Train the model and keep its history
    fit_result = model.fit(
        x=X_train,
        y=y_train,
        batch_size=32,
        epochs=500,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping]
    )
    history = fit_result.history

    # Predict the test set and turn probabilities / one-hot rows into class ids
    y_pred = model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Metrics of this fold (precision, recall and F1 are support-weighted averages)
    accuracies.append(accuracy_score(y_true, y_pred_classes))
    precisions.append(precision_score(y_true, y_pred_classes, average='weighted'))
    recalls.append(recall_score(y_true, y_pred_classes, average='weighted'))
    f1_scores.append(f1_score(y_true, y_pred_classes, average='weighted'))

In [None]:
# The final result is the average of each metric over the folds.
mean_accuracy, mean_precision, mean_recall, mean_f1_score = (
    np.mean(fold_scores) for fold_scores in (accuracies, precisions, recalls, f1_scores)
)

# One line per metric, four decimals each.
print(f'Mean Accuracy: {mean_accuracy:.4f}')
print(f'Mean Precision: {mean_precision:.4f}')
print(f'Mean Recall: {mean_recall:.4f}')
print(f'Mean F1-Score: {mean_f1_score:.4f}')

In [None]:
# This is the plot of each metric across the folds.
# The fold count is taken from the collected results rather than hard-coded to 10,
# so the plot still works when a different number of folds was evaluated.
n_folds = len(accuracies)
folds = range(1, n_folds + 1)

plt.figure(figsize=(10, 6))

# One line per metric, all sharing the fold number as x axis.
for metric_label, metric_values in (
    ('Accuracy', accuracies),
    ('Precision', precisions),
    ('Recall', recalls),
    ('F1-Score', f1_scores),
):
    plt.plot(folds, metric_values, marker='o', label=metric_label)

plt.title(f'Metrics Across {n_folds} Folds')
plt.xlabel('Fold')
plt.ylabel('Score')
plt.xticks(folds)
plt.legend()
plt.grid(True)
plt.show()

# Voting-based testing

In [None]:
# The 10 folds are created using the indices of the first slice of each audio.
custom_folds = stratified_multilabel_kfold(original_indices, original_labels, random_state=42)

accuracies = []
precisions = []
recalls = []
f1_scores = []

# Every audio contributes this many consecutive slices to inputs / labels.
slices_per_audio = 11
slice_offsets = np.arange(slices_per_audio)

for fold, (train_indices, val_indices, test_indices) in enumerate(custom_folds):

    # Each set of each fold is composed of all the slices belonging to the same audios.
    # The fold indices select audios; adding the offsets 0..10 to the position of an
    # audio's first slice yields the positions of all its slices (audio order and
    # slice order are preserved by the row-major ravel).
    train_slices = (original_indices[train_indices][:, np.newaxis] + slice_offsets).ravel()
    val_slices = (original_indices[val_indices][:, np.newaxis] + slice_offsets).ravel()
    test_slices = (original_indices[test_indices][:, np.newaxis] + slice_offsets).ravel()

    X_train, y_train = inputs[train_slices], labels[train_slices]
    X_val, y_val = inputs[val_slices], labels[val_slices]
    X_test = inputs[test_slices]
    # Test labels are per audio (one row per audio, not per slice).
    y_test = original_labels[test_indices]

    print(f"Fold {fold + 1}")
    print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
    print(f"X_val shape: {X_val.shape}, y_val shape: {y_val.shape}")
    print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")
    print()

    model = build_model()
    early_stopping = tfk.callbacks.EarlyStopping(monitor='val_accuracy', patience=100, mode='auto', restore_best_weights=True)
    # The training and validation sets are shuffled so that slices of one audio are not consecutive.
    X_train, y_train = shuffle(X_train, y_train, random_state=0)
    X_val, y_val = shuffle(X_val, y_val, random_state=0)

    # Train the model and save its history
    history = model.fit(
        x=X_train,
        y=y_train,
        batch_size=32,
        epochs=700,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping]
    ).history

    # Predict every slice of the test audios (X_test is NOT shuffled, so the
    # slices of one audio are still consecutive).
    y_pred_temp = model.predict(X_test)
    y_true = np.argmax(y_test, axis=1)

    # Sum the predicted class scores of the slices of each audio to count the votes
    # for each class, then pick the class with the maximum votes.
    votes = y_pred_temp.reshape(-1, slices_per_audio, y_pred_temp.shape[-1]).sum(axis=1)
    y_pred_classes = np.argmax(votes, axis=1)

    # Calculate metrics for this fold
    accuracies.append(accuracy_score(y_true, y_pred_classes))
    precisions.append(precision_score(y_true, y_pred_classes, average='weighted'))
    recalls.append(recall_score(y_true, y_pred_classes, average='weighted'))
    f1_scores.append(f1_score(y_true, y_pred_classes, average='weighted'))
    print(accuracy_score(y_true, y_pred_classes))

In [None]:
# This is the plot of each metric across the folds.
# The fold count is taken from the collected results rather than hard-coded to 10,
# so the plot still works when a different number of folds was evaluated.
n_folds = len(accuracies)
folds = range(1, n_folds + 1)

plt.figure(figsize=(10, 6))

# One line per metric, all sharing the fold number as x axis.
for metric_label, metric_values in (
    ('Accuracy', accuracies),
    ('Precision', precisions),
    ('Recall', recalls),
    ('F1-Score', f1_scores),
):
    plt.plot(folds, metric_values, marker='o', label=metric_label)

plt.title(f'Metrics Across {n_folds} Folds')
plt.xlabel('Fold')
plt.ylabel('Score')
plt.xticks(folds)
plt.legend()
plt.grid(True)
plt.show()