# Classify Audio using CNNs

Content:
1. Load GTZAN dataset
2. Explore the dataset
3. Train a default model 
4. Alter the model to achieve better results
5. Train a model with your own music
6. Using your own model

In [None]:
%pip install -q tensorflow===2.8.0 librosa===0.9.1

In [None]:
import glob
import os
import warnings

from pathlib import Path
from random import randint
from typing import List, Optional, Tuple

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import shutil
import tensorflow as tf

from IPython.display import Audio, Image
from IPython.core.display import display
from google.colab import files
from sklearn.preprocessing import minmax_scale
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, LeakyReLU, ReLU, Activation, BatchNormalization, Rescaling
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2
from tensorflow.keras import utils

# 1. Load GTZAN dataset

Obtain the dataset by running the following cell, which clones the repository containing the .wav files and the training data.


In [None]:
# Clone only the most recent commit (--depth 1) of the workshop repository.
!git clone --depth 1 https://github.com/AvisiLabs/techday-music.git
# Folder with the GTZAN audio files, one sub-folder per genre.
gtzan_path = "/content/techday-music/gtzan-genres"
# Folder with the spectrogram images (.png), one sub-folder per genre.
gtzan_processed_path = "/content/techday-music/gtzan-processed"

# 2. Explore the dataset

First we define some functions that will help us while exploring and preprocessing our data.

In [None]:
# Hop length handed to librosa.display.specshow when drawing spectrograms.
HOP_LENGTH = 512

def load_audio(path: str) -> Tuple[np.ndarray, int]:
  """
    Read the audio file at `path` with Librosa's default settings.
    With those defaults the audio is resampled to 22050Hz,
    mixed down to a single (mono) channel
    and returned as floating point data normalised between -1 and 1.

    Returns the samples together with the sample rate.
  """
  samples, rate = librosa.load(path)
  return samples, rate

def show_audio_player(audio: np.ndarray, sample_rate: int) -> None:
  """
    Render an inline audio player that plays `audio` at `sample_rate`.
  """
  player = Audio(audio, rate=sample_rate)
  display(player)

def show_waveform(audio: np.ndarray, sample_rate: int) -> None:
  """
    Plot the waveform of `audio` (sampled at `sample_rate`) in a wide figure
    and release the figure afterwards.
  """
  figure_size = (16, 6)
  plt.figure(figsize=figure_size)
  librosa.display.waveshow(y=audio, sr=sample_rate)
  plt.show()
  plt.close()

def create_mel_spectrogram(audio: np.ndarray, sample_rate: int, file_name: Optional[str] = None) -> None:
  """
    Render a mel spectrogram (128 mel bands, frequencies up to 8000Hz) of the given audio.

    When `file_name` is given, the spectrogram is saved to that path as a
    grayscale image without axes or padding.
    When `file_name` is None, the spectrogram is shown inline with time and mel axes.

    The figure is always closed afterwards, also when rendering or saving fails.
  """
  S = librosa.feature.melspectrogram(y=audio, sr=sample_rate, n_mels=128,
                                    fmax=8000)
  # NOTE(review): melspectrogram yields a power spectrogram by default, for which
  # librosa documents power_to_db. amplitude_to_db is kept on purpose so new images
  # stay comparable with images rendered earlier by this function - confirm before changing.
  DB = librosa.amplitude_to_db(S, ref=np.max)
  fig, _ = plt.subplots(figsize=(16, 6))

  try:
    if file_name is not None:
      # The axes are switched off before drawing so only the spectrogram itself is saved.
      plt.axis('off')
      librosa.display.specshow(DB, sr=sample_rate, hop_length=HOP_LENGTH, x_axis='time', y_axis='mel', cmap='gray')
      plt.savefig(file_name, bbox_inches='tight', pad_inches=0, transparent=False)
    else:
      librosa.display.specshow(DB, sr=sample_rate, hop_length=HOP_LENGTH, x_axis='time', y_axis='mel')
      plt.show()
  finally:
    # Release the figure even on errors, so figures do not pile up when processing many files.
    plt.close(fig)

def inspect_audio_file(file_path: str) -> None:
  """
    Load the audio file and show, in this order, an audio player,
    the waveform and the mel spectrogram, each preceded by a heading.
  """
  waveform, rate = load_audio(file_path)
  views = (
    ("Audio player:", show_audio_player),
    ("Waveform:", show_waveform),
    ("Mel spectrogram:", create_mel_spectrogram),
  )
  for heading, render in views:
    print(heading)
    render(waveform, rate)


After that, we are able to load some example files.


In [None]:
# First pop example: shows the audio player, waveform and mel spectrogram.
inspect_audio_file(os.path.join(gtzan_path, "pop/pop.00051.wav"))

In [None]:
# Second pop example, to compare with the first one.
inspect_audio_file(os.path.join(gtzan_path, "pop/pop.00038.wav"))

In [None]:
# A metal example, to compare with the pop examples.
inspect_audio_file(os.path.join(gtzan_path, "metal/metal.00010.wav"))

To see that the spectrograms for different genres do differ, we are displaying 5 examples for both metal and classical songs.

Note that the images are grayscale from now on. This improves performance while training the model.

In [None]:
def display_examples_for_genre(genre: str):
  """
    Print the path of, and display, the spectrogram images numbered 10 up to
    and including 14 (first sample of each) for the given genre.
  """
  genre_directory = os.path.join(gtzan_processed_path, genre)

  for number in range(10, 15):
    image_path = os.path.join(genre_directory, f"{genre}.000{number}-0.png")
    print(image_path)
    display(Image(image_path))

In [None]:
# Show the example spectrograms for metal.
display_examples_for_genre("metal")

In [None]:
# Show the example spectrograms for classical.
display_examples_for_genre("classical")

# 3. Train a default model

First we change the runtime type of our Colab Notebook to a GPU one: Runtime -> Change Runtime Type -> choose GPU under "Hardware Accelerator". This ensures that we have enough performance to train the model quickly.

Because this connects us to a new runtime, we have to rerun the previous cells. To do so, select this cell first and then press the shortcut CMD/Ctrl + F8, or choose Runtime -> Run before. This can take a while, so this might be a good time to grab a new beer 🍺

In [None]:
#@title Genres
#@markdown Choose three distinct genres that will be used for training and inference.
#@markdown Make sure you run this cell after selecting your genres.
first_genre = 'jazz' #@param ["blues","classical", "country", "disco", "hiphop", "jazz", "metal", "pop", "reggae", "rock"]
second_genre = 'hiphop' #@param ["blues","classical", "country", "disco", "hiphop", "jazz", "metal", "pop", "reggae", "rock"]
third_genre = 'pop' #@param ["blues","classical", "country", "disco", "hiphop", "jazz", "metal", "pop", "reggae", "rock"]

# Kept in alphabetical order: later cells use this list to translate a predicted class index into a genre name.
genres_to_copy = sorted([first_genre, second_genre, third_genre])

# Copy dataset
We copy the folders of your selected genres to our dataset folder.

The dataset is structured as follows:

- dataset
  - blues
    - blues.00000-0.png
    - blues.00001-0.png
    - ...
  - classical
    - classical.00000-0.png
    - classical.00001-0.png
    - ...

In [None]:
dataset_folder = "/content/dataset"
!mkdir -p {dataset_folder}
dataset_path = os.path.join(dataset_folder, "gtzan_processed")
if not os.path.exists(dataset_path):
  for genre in genres_to_copy:
    genre_git_path = os.path.join(gtzan_processed_path, genre)
    genre_path = os.path.join(dataset_path, genre)
    shutil.copytree(genre_git_path, genre_path)

# Create dataset from files
Since our images are already grouped by genre, we can use the `image_dataset_from_directory` function to load everything into a dataset. 

We split the dataset into two parts: one dataset for training the model and a smaller dataset (30% of all data) for evaluating how our model performs.

In [None]:
# Height, width and number of channels of the images fed to the model.
# NOTE(review): 326 and 892 are presumably the size of the rendered spectrogram images - TODO confirm.
INPUT_SHAPE = (326 // 2, 892 // 2, 1)
COLOR_MODE = "grayscale"
VALIDATION_SPLIT = 0.3
SEED = 123
BATCH_SIZE = 16

# Options shared by the training and the validation subset; both subsets use
# the same seed and split fraction.
_split_options = dict(
    color_mode=COLOR_MODE,
    validation_split=VALIDATION_SPLIT,
    seed=SEED,
    image_size=INPUT_SHAPE[:2],
    batch_size=BATCH_SIZE,
)

# Training data: cached, shuffled and prefetched.
train_dataset = (
    utils.image_dataset_from_directory(dataset_path, subset="training", **_split_options)
    .cache()
    .shuffle(1000)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation data: cached and prefetched, but not shuffled.
validation_dataset = (
    utils.image_dataset_from_directory(dataset_path, subset="validation", **_split_options)
    .cache()
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Define model

We define our model, which consists of several blocks:
* Rescaling, which normalizes our image data by converting 8-bit color to a floating point number
* Convolution, which has kernels that move over the image to obtain information about the image (features)
* Pooling, which reduces the number of parameters by combining information from previous layers
* Output, which connects the features from the final layers to the labels we can assign to the input

In [None]:
def define_model(number_of_classes: int):
    """
    Build the (uncompiled) 'SoundClassifier' convolutional network.

    Layout: rescaling of pixel values by 1/255, two 5x5 convolution blocks,
    pooling + dropout, one strided 3x3 convolution block, pooling + dropout
    and a softmax output layer with `number_of_classes` units.
    The model summary is printed before the model is returned.
    """
    base_kernels = 8
    weight_decay = 1e-4

    def conv_block(filters, kernel_size, strides):
        # Convolution (L2-regularised) -> ReLU -> batch normalisation
        return [
            Conv2D(filters, kernel_size=kernel_size, strides=strides, padding='same',
                   kernel_regularizer=l2(weight_decay)),
            ReLU(),
            BatchNormalization(),
        ]

    def pool_block():
        # Max pooling followed by dropout
        return [MaxPooling2D(pool_size=3, strides=3), Dropout(rate=0.4)]

    layers = [Rescaling(1./255, input_shape=INPUT_SHAPE)]
    layers += conv_block(base_kernels, kernel_size=5, strides=1)      # CONV1
    layers += conv_block(base_kernels, kernel_size=5, strides=1)      # CONV2
    layers += pool_block()
    layers += conv_block(base_kernels * 2, kernel_size=3, strides=3)  # CONV3
    layers += pool_block()
    # FC layers
    layers += [Flatten(), Dense(number_of_classes, activation = 'softmax')]

    result_model = Sequential(name='SoundClassifier')
    for layer in layers:
        result_model.add(layer)

    result_model.summary()
    return result_model

# One output class per selected genre.
model = define_model(number_of_classes=len(genres_to_copy))

# Sparse categorical cross-entropy: the loss takes integer class labels.
model.compile(
    optimizer=Adam(learning_rate=0.01, decay=1e-6),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Train model

We train the defined model using the dataset we have loaded. The training runs for 30 epochs, which means the dataset is presented to the model 30 times. We also shuffle the data every epoch. By doing so, the model can learn from all samples regardless of the order in which they are presented to the model.

During training you will see various metrics in the log-output. The loss metric represents how far the model is from giving a perfect prediction. The accuracy metric tells you how many labels were correctly predicted. 

The loss and accuracy metrics are for the training dataset (from which the model is learning) and the val_loss and val_accuracy metrics are for the validation dataset. Since the validation dataset contains samples which the model has not seen yet, these will be the most important metrics to watch.

In [None]:
# Folder in which the training callback stores the best model found so far.
checkpoint_folder = "/content/soundclassifier_checkpoint"
!mkdir -p {checkpoint_folder}
# File name of the checkpoint; also reused for the custom model later on.
checkpoint_filename = "model.weights.best.hdf5"
checkpoint_path = os.path.join(checkpoint_folder, checkpoint_filename)

def train_model(model):
    """
    Train `model` for 30 epochs on `train_dataset`, validating on
    `validation_dataset` after every epoch.

    A checkpoint is written to `checkpoint_path` whenever the monitored
    metric improves (`save_best_only=True`).

    Returns the Keras History object produced by `model.fit`.
    """
    checkpointer = ModelCheckpoint(filepath=checkpoint_path, verbose=1,
                                   save_best_only=True)

    # No batch_size here: the datasets are already batched when they are created,
    # and Keras documents that batch_size must not be given for dataset input.
    hist = model.fit(
        train_dataset,
        validation_data=validation_dataset,
        epochs=30,
        callbacks=[checkpointer],
        verbose=1)
    return hist

def print_training_history(history):
    """Plot the training and validation accuracy stored in a Keras History object."""
    curves = (
        ('accuracy', 'train accuracy'),
        ('val_accuracy', 'validation accuracy'),
    )
    for metric_key, legend_label in curves:
        plt.plot(history.history[metric_key], label=legend_label)
    plt.legend()
    plt.show()


# Train the model and plot how the accuracy developed.
history = train_model(model)
print_training_history(history)

In [None]:
def get_prediction(img):
    """
    Classify a single image with the current global `model`.

    `img` is reshaped to INPUT_SHAPE and wrapped in a batch of one.
    Works for any number of output classes, so it can also be used with a
    model trained on a custom set of genres.

    Returns a tuple (class index as string, confidence of that class).
    """
    batch = np.expand_dims(np.array(img).reshape(INPUT_SHAPE), axis=0)
    scores = model.predict(batch)[0]

    # argmax returns the first index of the highest score.
    best_index = int(np.argmax(scores))
    return (str(best_index), scores[best_index])

def visualize_prediction(image, sorted_labels):
    """
    Show `image` in grayscale without axes and print the predicted genre and
    its confidence. `sorted_labels` maps a class index to a genre name.
    """
    plt.figure(figsize=(16, 8), dpi=80)
    plt.axis('off')
    pixels = image.numpy().astype("uint8").squeeze(axis=2)
    plt.imshow(pixels, cmap="gray")

    class_index, confidence = get_prediction(image)
    predicted_genre = sorted_labels[int(class_index)]
    print(f"Predicted genre: {predicted_genre}, confidence: {confidence}")

    plt.show()
    plt.close()

def visualize_random_prediction(dataset):
    """
    Take one batch from the shuffled `dataset`, pick a random image from it,
    print its actual genre and visualize the model's prediction for it.
    Class indices are translated with `genres_to_copy`.
    """
    for batch_images, batch_labels in dataset.shuffle(1000).take(1):
        pick = randint(0, len(batch_images) - 1)
        actual_index = batch_labels[pick].numpy()
        print(f"Actual genre: {genres_to_copy[actual_index]}")
        visualize_prediction(batch_images[pick], genres_to_copy)


# Try the trained model on a random image from the validation data.
visualize_random_prediction(validation_dataset)

Want to see if the model also performs on your own music? Run the following cells to upload a file and pass it to the trained model.

In [None]:
def read_image(file_path):
  """
    Read a PNG file as a single-channel image and resize it to the
    height and width the model expects (taken from INPUT_SHAPE).
  """
  raw_bytes = tf.io.read_file(file_path)
  pixels = tf.io.decode_png(raw_bytes, channels=1)
  target_size = [INPUT_SHAPE[0], INPUT_SHAPE[1]]
  return tf.image.resize(pixels, target_size)

def predict_for_uploaded(uploaded, labels):
  """
    Classify an uploaded audio file.

    Only the first entry of `uploaded` is used; the file is read from /content.
    For every sample taken from the audio an audio player is shown, a
    spectrogram image is written next to the upload and the prediction for
    that image is visualized. `labels` maps a class index to a genre name.
  """
  upload_base_path = "/content"
  uploaded_file_name = next(iter(uploaded))
  name_without_extension = os.path.splitext(uploaded_file_name)[0]

  audio, rate = load_audio(os.path.join(upload_base_path, uploaded_file_name))

  for index, sample in enumerate(take_samples_from_audio(audio, rate)):
    show_audio_player(sample, rate)
    spectrogram_path = os.path.join(upload_base_path, name_without_extension + "-" + str(index) + ".png")
    create_mel_spectrogram(audio=sample, sample_rate=rate, file_name=spectrogram_path)
    visualize_prediction(read_image(spectrogram_path), labels)
    print()

def take_samples_from_audio(audio: np.ndarray, sample_rate: int) -> List[np.ndarray]:
  """
    Cut samples of 30 seconds out of the middle of the audio.

    - 60 seconds or longer: two samples, the 30 seconds before and the
      30 seconds after the midpoint.
    - 30 up to 60 seconds: one sample, centred on the midpoint. Audio of
      exactly 30 seconds therefore yields the whole audio once.
    - Shorter than 30 seconds: no samples; a message is printed.

    Returns the list of samples (possibly empty).
  """
  sample_length = 30 * sample_rate
  total_length = len(audio)
  mid = total_length // 2

  if total_length >= 2 * sample_length:
    return [audio[mid - sample_length: mid], audio[mid: mid + sample_length]]

  if total_length >= sample_length:
    half_length = 15 * sample_rate
    return [audio[mid - half_length: mid + half_length]]

  print("Audio must be equal to or longer than 30 seconds.")
  return []

In [None]:
# Opens Colab's upload dialog; the result is iterated by file name in predict_for_uploaded.
uploaded = files.upload()
# Classify the upload with the model trained on the selected GTZAN genres.
predict_for_uploaded(uploaded, genres_to_copy)

# 4. Alter the model to achieve better results

The model isn't that good at predicting the correct genre (yet!).
Change the hyperparameters of the model above, then retrain the model by running the cells again.

Think of:
- Number of epochs
- Number of layers
- Number of kernels, kernel size or stride
- Pooling type and size
- Optimizer and learning rate


# 5. Train a model with your own music

In [None]:
#@markdown Choose if you want the data to be read from and persisted to a Google Drive folder (which is created for you). 
#@markdown In this way, you don't have to process your music files every time you run this notebook.
#@markdown Check or uncheck the checkbox and run this cell.
persist_processed_data_on_drive = True #@param {type:"boolean"}

# music_path holds the audio files, music_processed_path the spectrograms created from them.
music_path = None
music_processed_path = None
if persist_processed_data_on_drive:
  print("Data will be persisted on Drive")
  # Imported here because Drive access is only needed in this branch.
  from google.colab import drive
  mount_path = "/content/drive"
  drive.mount(mount_path, force_remount=True)
  music_path = os.path.join(mount_path, "MyDrive/music")
  music_processed_path =  os.path.join(mount_path, "MyDrive/music-processed")
else:
  print("Data will NOT be persisted on Drive")
  music_path = "/content/music"
  music_processed_path = "/content/music-processed"

# Make sure both folders exist, whichever location was chosen.
!mkdir -p {music_path} {music_processed_path}

Earlier we used the GTZAN dataset, which consists of samples that are 30 seconds long. To use our own music, we also want to take samples of 30 seconds.

The code below searches for all files inside the music path, takes samples from the files and creates spectrograms for them. These spectrograms are written to the music-processed path.

Depending on whether you chose to persist data in Google Drive, you can upload your music files either to the created Google Drive directory (named "music") or to the music directory, which can be found in the file explorer.

The expected structure for the files in the music path is:
- One directory per different genre
- Inside a genre directory, file names must be unique

The supported audio formats are:
- .aifc
- .aiff / .aif
- .au
- .flac
- .ogg
- .opus
- .wav
- .mp3
- .m4a / .mp4



In [None]:
# Only silence librosa's "UserWarning: PySoundFile failed. Trying audioread instead.";
# all other warnings stay visible. `message` is matched against the start of the warning text.
warnings.filterwarnings('ignore', message='PySoundFile failed', category=UserWarning)

def spectrograms_exists_for_file(file: str):
  """
    Tell whether spectrograms have already been created for the given music file.

    Spectrograms are written as "<name>-<index>.png" with the index starting
    at 0, so the check looks for exactly "<name>-0.png" in the processed folder.
    An exact check (instead of a "<name>*.png" wildcard) avoids matching the
    spectrograms of another file whose name merely starts with this file's
    name, and avoids misreading characters such as "[" or "*" in file names.
  """
  processed_base_path = os.path.splitext(file.replace(music_path, music_processed_path))[0]
  return os.path.exists(processed_base_path + "-0.png")

def create_spectrogram_for_file(file: str):
  """
    Create a spectrogram image for every sample taken from the given music file.

    Files that already have spectrograms are skipped. The images are written
    to the same relative location under the processed folder, named
    "<name>-<sample index>.png"; missing folders are created.
  """
  if spectrograms_exists_for_file(file):
    print("Skipping because spectrograms already exist for", file)
    return

  print("Processing", file)

  waveform, rate = load_audio(file)
  target_base_path = os.path.splitext(file.replace(music_path, music_processed_path))[0]

  for index, chunk in enumerate(take_samples_from_audio(waveform, rate)):
    target_path = Path(target_base_path + "-" + str(index) + ".png")
    os.makedirs(target_path.parent, exist_ok=True)
    create_mel_spectrogram(audio=chunk, sample_rate=rate, file_name=target_path)
 

# Every entry one level below a genre folder is treated as a music file.
music_search_pattern = os.path.join(music_path, '*/*')
print(music_search_pattern)
music_files = glob.glob(music_search_pattern)

for file in music_files:
  create_spectrogram_for_file(file)

print("Finished creating spectrograms.")

# The sub-folders of the processed folder are the genres, kept in sorted order.
custom_labels = sorted(next(os.walk(music_processed_path))[1])

In [None]:
VALIDATION_SPLIT = 0.3
SEED = 420
BATCH_SIZE = 16

# Options shared by the training and the validation subset; both subsets use
# the same seed and split fraction.
_custom_split_options = dict(
    color_mode=COLOR_MODE,
    validation_split=VALIDATION_SPLIT,
    seed=SEED,
    image_size=INPUT_SHAPE[:2],
    batch_size=BATCH_SIZE,
)

# Training data: cached, shuffled and prefetched.
custom_train_dataset = (
    utils.image_dataset_from_directory(music_processed_path, subset="training", **_custom_split_options)
    .cache()
    .shuffle(1000)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation data: cached and prefetched, but not shuffled.
custom_validation_dataset = (
    utils.image_dataset_from_directory(music_processed_path, subset="validation", **_custom_split_options)
    .cache()
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

We define a new model, which you can tweak again so it performs well for your own dataset.

In [None]:
def define_model(number_of_classes: int):
    """
    Build the (uncompiled) 'SoundClassifier' network for the custom dataset.

    Layout: rescaling of pixel values by 1/255, two 5x5 convolution blocks,
    pooling + dropout, one strided 3x3 convolution block, pooling + dropout
    and a softmax output layer with `number_of_classes` units.
    The model summary is printed before the model is returned.
    """
    base_kernels = 8
    weight_decay = 1e-4

    def conv_block(filters, kernel_size, strides):
        # Convolution (L2-regularised) -> ReLU -> batch normalisation
        return [
            Conv2D(filters, kernel_size=kernel_size, strides=strides, padding='same',
                   kernel_regularizer=l2(weight_decay)),
            ReLU(),
            BatchNormalization(),
        ]

    def pool_block():
        # Max pooling followed by dropout
        return [MaxPooling2D(pool_size=3, strides=3), Dropout(rate=0.4)]

    layers = [Rescaling(1./255, input_shape=INPUT_SHAPE)]
    layers += conv_block(base_kernels, kernel_size=5, strides=1)      # CONV1
    layers += conv_block(base_kernels, kernel_size=5, strides=1)      # CONV2
    layers += pool_block()
    layers += conv_block(base_kernels * 2, kernel_size=3, strides=3)  # CONV3
    layers += pool_block()
    # FC layers
    layers += [Flatten(), Dense(number_of_classes, activation = 'softmax')]

    result_model = Sequential(name='SoundClassifier')
    for layer in layers:
        result_model.add(layer)

    result_model.summary()
    return result_model

# One output class per genre folder found in the processed music folder.
model = define_model(number_of_classes=len(next(os.walk(music_processed_path))[1]))

# Sparse categorical cross-entropy: the loss takes integer class labels.
model.compile(
    optimizer=Adam(learning_rate=0.01, decay=1e-6),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Checkpoint folder for the custom model, separate from /content/soundclassifier_checkpoint.
custom_checkpoint_folder = "/content/soundclassifier_custom_checkpoint"
!mkdir -p {custom_checkpoint_folder}
# Reuses checkpoint_filename from the earlier training cell.
custom_checkpoint_path = os.path.join(custom_checkpoint_folder, checkpoint_filename)

def train_model(model):
    """
    Train `model` for 30 epochs on `custom_train_dataset`, validating on
    `custom_validation_dataset` after every epoch.

    A checkpoint is written to `custom_checkpoint_path` whenever the
    monitored metric improves (`save_best_only=True`).

    Returns the Keras History object produced by `model.fit`.
    """
    checkpointer = ModelCheckpoint(filepath=custom_checkpoint_path, verbose=1,
                                   save_best_only=True)

    # No batch_size here: the datasets are already batched when they are created,
    # and Keras documents that batch_size must not be given for dataset input.
    hist = model.fit(
        custom_train_dataset,
        validation_data=custom_validation_dataset,
        epochs=30,
        callbacks=[checkpointer],
        verbose=1)
    return hist

# Train the custom model and plot how the accuracy developed.
history = train_model(model)
print_training_history(history)

# 6. Using your own model

Now that we've trained a model for your own data, we can use the model to classify new samples. You can upload a music file using the upload form below, after which the file is classified using the trained model.

In [None]:
# Opens Colab's upload dialog; the result is iterated by file name in predict_for_uploaded.
uploaded = files.upload()
# Classify the upload with the custom model, using the genre folder names as labels.
predict_for_uploaded(uploaded, custom_labels)