In [2]:
import numpy as np
import pandas as pd
import os
import json
import librosa
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
import jsonlines
import pickle

In [3]:
sr = 22050
TOTAL_SAMPLES = 29 * sr

NUM_SLICES = 10
SAMPLES_PER_SLICE = int(TOTAL_SAMPLES / NUM_SLICES)

In [4]:

def preprocess_data_MFCC(source_path, tfrecord_path):

    def _bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[tf.io.serialize_tensor(value).numpy()]))

    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

    def serialize_example(mfcc, label):
        feature = {
            'mfcc': _bytes_feature(tf.convert_to_tensor(mfcc, dtype=tf.float32)),
            'label': _int64_feature(label)
        }
        example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
        return example_proto.SerializeToString()

    with tf.io.TFRecordWriter(tfrecord_path) as writer:

        # Browse each file, slice it, and generate the 128-band MFCC for each slice.
        for i, (dirpath, dirname, filenames) in enumerate(os.walk(source_path)):
            for file in filenames:
                # Exclude a corrupted wav file that makes everything crash.
                if os.path.join(dirpath, file) != 'genres_original/jazz/jazz.00054.wav':
                    song, sr = librosa.load(os.path.join(dirpath, file), duration=29)
                    for s in range(NUM_SLICES):
                        start_sample = SAMPLES_PER_SLICE * s
                        end_sample = start_sample + SAMPLES_PER_SLICE

                        slice_song = song[start_sample:end_sample]

                        # Compute MFCC
                        mfcc = librosa.feature.mfcc(y=slice_song, sr=sr, n_mfcc=128)
                        mfcc = mfcc.T

                        # Serialize example and write to TFRecord file
                        label = i - 1  # Adjust label indexing if necessary
                        example = serialize_example(mfcc, label)
                        writer.write(example)
                else:
                    pass

In [5]:
def preprocess_data_STFT(source_path, tfrecord_path):
    
    def _bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[tf.io.serialize_tensor(value).numpy()]))

    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

    def serialize_example(stft, label):
        feature = {
            'stft': _bytes_feature(stft),
            'label': _int64_feature(label)
        }
        example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
        return example_proto.SerializeToString()


    with tf.io.TFRecordWriter(tfrecord_path) as writer:

        # Browse each file, slice it, and generate the STFT for each slice.
        for i, (dirpath, dirname, filenames) in enumerate(os.walk(source_path)):
            for file in filenames:
                # Exclude a corrupted wav file that makes everything crash.
                if os.path.join(dirpath, file) != 'genres_original/jazz/jazz.00054.wav':
                    song, sr = librosa.load(os.path.join(dirpath, file), duration=29)
                    for s in range(NUM_SLICES):
                        start_sample = SAMPLES_PER_SLICE * s
                        end_sample = start_sample + SAMPLES_PER_SLICE

                        slice_song = song[start_sample:end_sample]

                        # Compute STFT
                        stft = librosa.stft(slice_song, n_fft=2048, hop_length=512)
                        stft_magnitude = librosa.amplitude_to_db(abs(stft))
                        stft_magnitude = stft_magnitude.T

                        # Serialize example and write to TFRecord file
                        label = i - 1  # Adjust label indexing if necessary
                        example = serialize_example(stft_magnitude, label)
                        writer.write(example)
                else:
                    pass

In [6]:
def load_data_MFCC(json_path):
    # Load the JSON data from the file
    with open(json_path, 'r') as f:
        data = json.load(f)

    x = np.array(data["mfcc"])
    y = np.array(data["labels"])

    return x, y

In [7]:
def load_data_STFT(json_path):
    # Load the JSON data from the file
    with open(json_path, 'r') as f:
        data = json.load(f)

    x = np.array(data["stft"])
    y = np.array(data["labels"])

    return x, y

In [8]:
def prepare_datasets(inputs, targets, split_size):
    
    # Creating a validation set and a test set.
    inputs_train, inputs_val, targets_train, targets_val = train_test_split(inputs, targets, test_size=split_size)
    inputs_train, inputs_test, targets_train, targets_test = train_test_split(inputs_train, targets_train, test_size=split_size)
    
    # Our CNN model expects 3D input shape.
    inputs_train = inputs_train[..., np.newaxis]
    inputs_val = inputs_val[..., np.newaxis]
    inputs_test = inputs_test[..., np.newaxis]
    
    return inputs_train, inputs_val, inputs_test, targets_train, targets_val, targets_test

In [9]:
def design_model(input_shape):

    # Define the first input
    input_1 = tf.keras.Input(shape=input_shape)
    
    # Branch 1
    x1 = tf.keras.layers.Conv2D(20, (16, 16), activation='relu')(input_1)
    x1 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x1)
    x1 = tf.keras.layers.BatchNormalization()(x1)
    
    x1 = tf.keras.layers.Conv2D(22, (13, 13), activation='relu', padding='same')(x1)
    x1 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x1)
    x1 = tf.keras.layers.BatchNormalization()(x1)
    
    x1 = tf.keras.layers.Conv2D(26, (8, 8), activation='relu', padding='same')(x1)
    x1 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x1)
    x1 = tf.keras.layers.BatchNormalization()(x1)
    
    x1 = tf.keras.layers.Conv2D(38, (7, 7), activation='relu', padding='same')(x1)
    x1 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x1)
    x1 = tf.keras.layers.BatchNormalization()(x1)
    
    x1 = tf.keras.layers.Conv2D(39, (4, 4), activation='relu', padding='same')(x1)
    x1 = tf.keras.layers.AveragePooling2D((2, 2), padding='same')(x1)
    x1 = tf.keras.layers.BatchNormalization()(x1)
    x1 = tf.keras.layers.Dropout(0.4)(x1)
    
    x1 = tf.keras.layers.Flatten()(x1)
    x1 = tf.keras.layers.Dense(200, activation='relu')(x1)

    # Define the second input
    input_2 = tf.keras.Input(shape=input_shape)
    
    # Branch 2
    x2 = tf.keras.layers.Conv2D(31, (15, 15), activation='relu')(input_2)
    x2 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x2)
    x2 = tf.keras.layers.BatchNormalization()(x2)
    
    x2 = tf.keras.layers.Conv2D(38, (11, 11), activation='relu', padding='same')(x2)
    x2 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x2)
    x2 = tf.keras.layers.BatchNormalization()(x2)
    
    x2 = tf.keras.layers.Conv2D(43, (9, 9), activation='relu', padding='same')(x2)
    x2 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x2)
    x2 = tf.keras.layers.BatchNormalization()(x2)
    
    x2 = tf.keras.layers.Conv2D(57, (6, 6), activation='relu', padding='same')(x2)
    x2 = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(x2)
    x2 = tf.keras.layers.BatchNormalization()(x2)
    
    x2 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x2)
    x2 = tf.keras.layers.AveragePooling2D((2, 2), padding='same')(x2)
    x2 = tf.keras.layers.BatchNormalization()(x2)
    x2 = tf.keras.layers.Dropout(0.4)(x2)
    
    x2 = tf.keras.layers.Flatten()(x2)
    x2 = tf.keras.layers.Dense(200, activation='relu')(x2)

    # Concatenate the two branches
    merged = tf.keras.layers.Concatenate()([x1, x2])

    # Final output layer
    output = tf.keras.layers.Dense(10, activation='softmax')(merged)

    # Create the model
    model = tf.keras.models.Model(inputs=[input_1, input_2], outputs=output)
    
    return model


In [10]:
def design_model(input_shape):

    # Let's design the model architecture.
    x1 = tf.keras.models.Sequential([

        tf.keras.layers.Conv2D(20, (16, 16), activation='relu', input_shape=input_shape),
        tf.keras.layers.MaxPooling2D((2, 2), padding='same'),
        tf.keras.layers.BatchNormalization(),
        
        tf.keras.layers.Conv2D(22, (13, 13), activation='relu',padding='same'),
        tf.keras.layers.MaxPooling2D((2, 2), padding='same'),
        tf.keras.layers.BatchNormalization(),

        tf.keras.layers.Conv2D(26, (8, 8), activation='relu',padding='same'),
        tf.keras.layers.MaxPooling2D((2, 2), padding='same'),
        tf.keras.layers.BatchNormalization(),

        tf.keras.layers.Conv2D(38, (7, 7), activation='relu',padding='same'),
        tf.keras.layers.MaxPooling2D((2, 2),  padding='same'),
        tf.keras.layers.BatchNormalization(),

        tf.keras.layers.Conv2D(39, (4, 4), activation='relu',padding='same'),
        tf.keras.layers.AveragePooling2D((2, 2), padding='same'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.4),
        
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(200, activation='relu'), 
    ])

    return x1

In [11]:
def make_prediction(model, X, y, idx):
    
    genre_dict = {
        0 : "blues",
        1 : "classical",
        2 : "country",
        3 : "disco",
        4 : "hiphop",
        5 : "jazz",
        6 : "metal",
        7 : "pop",
        8 : "reggae",
        9 : "rock",
        }
        
    predictions = model.predict(X)
    genre = np.argmax(predictions[idx])
    
    print("\n---Now testing the model for one audio file---\nThe model predicts: {}, and ground truth is: {}.\n".format(genre_dict[genre], genre_dict[y[idx]]))

In [12]:
def plot_performance(hist):
    
    acc = hist.history['acc']
    val_acc = hist.history['val_acc']
    loss = hist.history['loss']
    val_loss = hist.history['val_loss']

    epochs = range(len(acc))

    plt.plot(epochs, acc, 'r', label='Training accuracy')
    plt.plot(epochs, val_acc, 'b', label='Validation accuracy')
    plt.title('Training and validation accuracy')
    plt.legend()
    plt.figure()

    plt.plot(epochs, loss, 'r', label='Training Loss')
    plt.plot(epochs, val_loss, 'b', label='Validation Loss')
    plt.title('Training and validation loss')
    plt.legend()

    plt.show()

def plot_history(hist):
    plt.figure(figsize=(20,15))
    fig, axs = plt.subplots(2)
    # accuracy subplot
    axs[0].plot(hist.history["acc"], label="train accuracy")
    axs[0].plot(hist.history["val_acc"], label="test accuracy")    
    axs[0].set_ylabel("Accuracy")
    axs[0].legend(loc="lower right")
    axs[0].set_title("Accuracy eval")
    
    # Error subplot
    axs[1].plot(hist.history["loss"], label="train error")
    axs[1].plot(hist.history["val_loss"], label="test error")    
    axs[1].set_ylabel("Error")
    axs[1].set_xlabel("Epoch")
    axs[1].legend(loc="upper right")
    axs[1].set_title("Error eval")
    
    plt.show()

In [13]:
preprocess_data_MFCC(source_path="genres_original", tfrecord_path="mfcc.tfrecord")

I0000 00:00:1727281060.856230    1800 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-09-25 10:17:41.017376: W tensorflow/core/common_runtime/gpu/gpu_device.cc:2343] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


KeyboardInterrupt: 

In [None]:
preprocess_data_STFT(source_path="genres_original", tfrecord_path="stft.tfrecord")

In [69]:
feature_description = {
    'stft': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64),
}

def _parse_function(example_proto):
    parsed_example = tf.io.parse_single_example(example_proto, feature_description)
    stft = tf.io.parse_tensor(parsed_example['stft'], out_type=tf.float32)
    label = parsed_example['label']
    return stft, label

def create_dataset(tfrecord_path, batch_size=32, shuffle_buffer_size=1000):
    raw_dataset = tf.data.TFRecordDataset(tfrecord_path)
    parsed_dataset = raw_dataset.map(_parse_function)
    dataset = parsed_dataset.shuffle(shuffle_buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

stft_data = create_dataset('stft.tfrecord')

# Example of data from the dataset
for stft, label in stft_data.take(1):
    print("STFT:", stft.numpy())
    print("Label:", label.numpy())
    print("STFT shape:", stft.shape)

STFT: [[[ 14.802731    14.823547    17.585005   ... -24.201668   -24.221752
   -24.237051  ]
  [  5.893271     7.070983     3.1723175  ... -30.234993   -30.245562
   -30.344965  ]
  [-39.340614   -21.47533      7.953444   ... -39.75795    -39.75795
   -39.75795   ]
  ...
  [-29.203302   -16.962463    -9.829614   ... -39.75795    -39.75795
   -39.75795   ]
  [-39.419315   -24.786474   -35.671097   ... -39.75795    -39.75795
   -39.75795   ]
  [-20.615211   -26.535805   -14.911647   ... -39.75795    -39.75795
   -39.75795   ]]

 [[-20.596478   -12.092114     1.0525944  ... -41.2703     -41.2703
   -41.2703    ]
  [-23.937958   -17.40458      2.8702223  ... -41.2703     -41.2703
   -41.2703    ]
  [-41.2703     -16.770346     3.8446593  ... -41.2703     -41.2703
   -41.2703    ]
  ...
  [-40.61619    -40.331684   -37.865555   ... -41.2703     -41.2703
   -41.2703    ]
  [-36.025414   -39.69497    -39.904434   ... -41.2703     -41.2703
   -41.2703    ]
  [-13.753759   -14.458856   -16.1262

In [131]:
# Feature description to parse TFRecord files
feature_description = {
    'mfcc': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64),
}

def _parse_function(example_proto):
    parsed_example = tf.io.parse_single_example(example_proto, feature_description)
    mfcc = tf.io.parse_tensor(parsed_example['mfcc'], out_type=tf.float32)
    label = parsed_example['label']
    return mfcc, label

def create_dataset(tfrecord_path, batch_size=32, shuffle_buffer_size=1000):
    raw_dataset = tf.data.TFRecordDataset(tfrecord_path)
    parsed_dataset = raw_dataset.map(_parse_function)
    dataset = parsed_dataset.shuffle(shuffle_buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

def split_dataset(dataset, split_ratio=0.8):
    dataset_size = len(list(dataset))  # Note: This approach is memory-intensive for large datasets
    train_size = int(dataset_size * split_ratio)
    test_size = dataset_size - train_size
    
    train_dataset = dataset.take(train_size)
    test_dataset = dataset.skip(train_size).take(test_size)
    
    return train_dataset, test_dataset

mfcc_data = create_dataset('mfcc.tfrecord')

mfcc_train, mfcc_test = split_dataset(mfcc_data)

for mfcc, label in mfcc_train.take(1):
    mfcc_shape=mfcc.shape

In [132]:
input_shape = (mfcc_shape[1], mfcc_shape[2], 1)
model = design_model(input_shape)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [133]:
model.compile(optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.0025),
                     loss='sparse_categorical_crossentropy',
                     metrics = ['acc']
                     )

In [134]:
model.summary() 

In [136]:
def prepare_dataset(dataset):
    dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.experimental.AUTOTUNE)
    return dataset

mfcc_train = prepare_dataset(mfcc_train)
mfcc_test = prepare_dataset(mfcc_test)


In [138]:
history = model.fit(mfcc_train,
                    validation_data=mfcc_test,
                    epochs=75
                    )

Epoch 1/75


ValueError: Exception encountered when calling Sequential.call().

[1mCannot take the length of shape with unknown rank.[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=<unknown>, dtype=float32)
  • training=True
  • mask=None

In [141]:
sample_batch = next(iter(mfcc_train))
try:
    model(sample_batch[0])  # Pass a batch of features through the model
except Exception as e:
    print("Error during model call:", e)

Error during model call: Exception encountered when calling Sequential.call().

[1mInput 0 of layer "conv2d_15" is incompatible with the layer: expected axis -1 of input shape to have value 1, but received input with shape (32, 32, 125, 128)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(32, 32, 125, 128), dtype=float32)
  • training=None
  • mask=None


In [None]:
plot_performance(history)

In [None]:
train_accuracy = history.history['acc'][-1]  # Last epoch accuracy
test_accuracy = history.history['val_acc'][-1]  # Last epoch validation accuracy

# Print accuracies
print('Training accuracy:', train_accuracy)
print('Test accuracy:', test_accuracy)

In [None]:
plot_history(history)

In [None]:
make_prediction(model, Xtest_MFCC, ytest_MFCC, 24)

In [None]:
# Save the model to a file
with open('CNN.pkl', 'wb') as file:
    pickle.dump(model, file)