# Import libraries

In [1]:
import os, glob
import librosa as lr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import h5py
import keras
import pickle
import keras.initializers

from pathlib import Path
from glob import glob
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Dropout
from keras.models import Sequential
from keras.regularizers import l2
from keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.preprocessing import StandardScaler

In [2]:
import tensorflow as tf

# Report how many GPUs TensorFlow can see. Uses the stable
# tf.config.list_physical_devices API instead of the deprecated
# tf.config.experimental alias.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


In [9]:
# --- Audio / feature-extraction settings ---
sr = 22050           # sample rate handed to the MFCC extraction
TARGET_SECONDS = 3   # clip length (seconds) every signal is brought to
n_mfcc = 13          # number of MFCC coefficients per frame
# NOTE(review): n_fft and hop_length are defined here but not referenced by
# the visible cells - confirm whether calc_mfcc should receive them.
n_fft = 1024
hop_length = 512

# --- Source data location ---
ROOT = "all_samples_wav"
folders = os.listdir(ROOT)  # names of the entries directly below ROOT
file_data = []              # per-folder file listings, filled in by a later cell

# Preprocessing labels

In [10]:
# Collect the directory listing of every folder below ROOT.
# The list is rebuilt from scratch (instead of appending to the list created in
# the configuration cell) so that re-running this cell cannot duplicate entries.
file_data = [os.listdir(f'{ROOT}/{folder}') for folder in tqdm(folders)]

# Disabled alternative: because there are multiple subfolders inside
# 'percussion', list the contents of each of those subfolders instead.
# file_data = []
# for folder in folders:
#     if folder == 'percussion':
#         for sub in os.listdir(f'{ROOT}/percussion'):
#             file_data.append(os.listdir(f'{ROOT}/percussion/{sub}'))
#     else:
#         file_data.append(os.listdir(f'{ROOT}/{folder}'))

100%|█████████████████████████████████████████████████████████████████████████████████| 19/19 [00:00<00:00, 635.04it/s]


In [11]:
# Disabled: promote the 'percussion' subfolders to top-level labels.
#
# removed = folders.pop(13)
# # folders.insert(13, os.listdir(f'{ROOT}/percussion'))
# perc_sub_dir = os.listdir(f'{ROOT}/percussion')
# for i in range(len(perc_sub_dir)):
#     folders.insert(13+i, perc_sub_dir[i])
#
print(f'Number of folders: {len(file_data)}')

# With the expansion above enabled, folders 13 - 51 are the subfolders of the
# 'Percussion' directory (the original sentence was cut off after "with distinct").
# They will be trained individually, but summed up as percussion in the end.
# file_data[13:51]

# Number of entries found in each folder, in the same order as `folders`
amounts = [len(entries) for entries in file_data]

# Overview table: one row per folder
col1, col2 = np.array(folders), np.array(amounts)
merge = dict(folder=col1, amount=col2)
df = pd.DataFrame(merge, columns=['folder', 'amount'])

print(f'Total amount of samples: {sum(amounts)}')

df

Number of folders: 19
Total amount of samples: 13533


Unnamed: 0,folder,amount
0,banjo,74
1,bassoon,720
2,bass_clarinet,944
3,cello,889
4,clarinet,846
5,contrabassoon,710
6,cor_anglais,691
7,double_bass,852
8,flute,878
9,french_horn,652


Load the audio data. Every file is loaded at full length here; the signals are cut or zero-padded to 3 seconds in a later step.

In [None]:
# Load every file below ROOT. Each entry of audio_data is the
# (signal, samplerate) tuple returned by librosa.load.
# The files are visited in os.walk order; the overview cell walks ROOT again
# and relies on getting the same order.
audio_data = []

for dirname, _, filenames in os.walk(ROOT):
    for filename in filenames:
        src = f'{dirname}/{filename}'
        # Resample explicitly to the configured rate so loading stays in sync
        # with the `sr` used for feature extraction, rather than relying on
        # librosa's default happening to match.
        audio_data.append(lr.load(src, sr=sr))

Save/Load audio_data array

In [None]:
# Save: cache the loaded (signal, samplerate) tuples so the WAV files
# do not have to be read again.
with open('audio_data.pickle', 'wb') as cache_file:
    pickle.dump(audio_data, cache_file)

In [None]:
# Load (disabled): uncomment to restore the cached audio instead of re-reading the files.
# NOTE: pickle.load can execute arbitrary code - only open pickle files you created yourself.
# with open('audio_data.pickle', 'rb') as f:
#     audio_data = pickle.load(f)

Create dataframe overview

In [None]:
# One row per loaded file: the raw signal and the sample rate stored with it.
df1 = pd.DataFrame(np.array(audio_data, dtype=object), columns=['signal', 'samplerate'])

# Length of every signal in samples
num_samples = df1['signal'].map(len).to_numpy()

# Walk ROOT again (same order as when the audio was loaded) to recover the
# file name and class label that belong to each row.
fname = []
classID = []
for dirname, _, filenames in os.walk(ROOT):
    # Label = folder path relative to ROOT. This replaces the former
    # dirname[16:] slice, which silently broke whenever ROOT changed length.
    # (Files lying directly in ROOT now get the label '.' instead of ''.)
    label = os.path.relpath(dirname, ROOT)
    for filename in filenames:
        fname.append(filename)
        classID.append(label)
fname = np.array(fname)
classID = np.array(classID)

# Fail loudly if the directory content no longer matches the loaded audio;
# otherwise labels would be attached to the wrong signals.
if len(fname) != len(df1):
    raise ValueError(
        f'Found {len(fname)} files below {ROOT!r} but {len(df1)} loaded signals; '
        'reload the audio data so that labels and signals line up.'
    )

df1['num samples'] = num_samples
df1['fname'] = fname
df1['classID'] = classID

# Duration in seconds, rounded to one decimal. 'samplerate' is an object
# column, so the quotient is converted to a numeric dtype before rounding.
df1['seconds'] = pd.to_numeric(df1['num samples'] / df1['samplerate'], errors='coerce').round(1)

df1

Bring all soundfiles to the desired length of 3 seconds

In [None]:
# Bring every signal to exactly TARGET_SECONDS seconds.
# Fixes compared to the previous version:
#   * uses the TARGET_SECONDS constant (the lower-case `target_seconds` was never defined),
#   * keeps signals whose length already equals the target (they used to be
#     skipped, which shifted every following signal against its label),
#   * truncates to target_num_samples instead of a hard-coded sr*3.
target_num_samples = sr * TARGET_SECONDS

processed_audio = []
for entry in audio_data:
    signal = entry[0]  # entry is (signal, samplerate)

    if len(signal) >= target_num_samples:
        # long enough: keep only the first target_num_samples samples
        clip = signal[:target_num_samples]
    else:
        # too short: right-pad with zeros up to the target length
        num_missing_samples = target_num_samples - len(signal)
        clip = np.pad(signal, (0, num_missing_samples), mode='constant')

    processed_audio.append(clip)

# All clips now share one length, so this is a regular 2-D array (files x samples)
processed_audio = np.array(processed_audio)

# Feature extraction

In [None]:
def mfcc_scale(mfcc):
    """Standardise an MFCC matrix with a freshly fitted StandardScaler.

    A new scaler is fitted on every call, so each matrix is scaled
    independently of all others. StandardScaler works column-wise.
    NOTE(review): for a (n_mfcc, frames) input this standardises every frame
    across the coefficients - confirm that this is the intended axis.

    Parameters
    ----------
    mfcc : array-like, 2-D
        MFCC matrix to scale.

    Returns
    -------
    numpy.ndarray
        Scaled copy with the same shape as the input.
    """
    return StandardScaler().fit_transform(np.array(mfcc))

def calc_mfcc(signal):
    """Compute the MFCCs of one audio signal via librosa.

    Uses the notebook-level settings `sr` and `n_mfcc`. The notebook-level
    `n_fft` and `hop_length` are not forwarded, so librosa's own defaults
    apply for those.

    Parameters
    ----------
    signal : numpy.ndarray
        Audio time series.

    Returns
    -------
    The value returned by librosa.feature.mfcc for this signal.
    """
    mfcc_kwargs = dict(y=signal, sr=sr, n_mfcc=n_mfcc)
    return lr.feature.mfcc(**mfcc_kwargs)

In [None]:
# Scaled MFCC matrix for every fixed-length clip, stacked into one array
mfcc_features = np.array(
    [mfcc_scale(calc_mfcc(clip)) for clip in tqdm(processed_audio)]
)

Save/load mfcc_features

In [None]:
# Save
with open('mfcc_features.pickle', 'wb') as f:
    pickle.dump(mfcc_features, f)

In [None]:
# Load (disabled): uncomment to restore cached features instead of recomputing them.
# NOTE(review): this reads 'norm_mfcc_features.pickle', but the save cell writes
# 'mfcc_features.pickle' - confirm which file is meant.
# NOTE: pickle.load can execute arbitrary code - only open pickle files you created yourself.
# with open('norm_mfcc_features.pickle', 'rb') as f:
#     mfcc_features = pickle.load(f)

In [None]:
# Sanity check: array shapes of the clips and of the extracted features
for array in (processed_audio, mfcc_features):
    print(array.shape)

# Extract and plot a single sound file

In [None]:
test_nr = 8100  # index of the example to inspect

# Waveform of the selected clip
plt.figure(figsize=(12,2))
plt.plot(processed_audio[test_nr])
plt.title(classID[test_nr])
plt.show()

# MFCC matrix of the same clip
# NOTE(review): the colour range is clipped to [0, 1], but mfcc_features holds
# standard-scaled values at this point - confirm the clipping is intended.
plt.figure(figsize=(15, 2))
plt.imshow(mfcc_features[test_nr], vmin=0, vmax=1)
plt.title(classID[test_nr])
plt.show()

# File name of the selected example (was hard-coded to index 8100, so it
# went stale as soon as test_nr changed)
print(fname[test_nr])

Encoding the labels as integers<br>
'cel' -> 0,<br>
'cla' -> 1 etc.

In [None]:
# Map every class name to an integer index and shape the result as a single
# column (n_samples, 1), which is what the one-hot encoder is fitted on.
label_encoder = LabelEncoder()
label_encoded = label_encoder.fit_transform(classID).reshape(-1, 1)

In [None]:
# Dense (non-sparse) one-hot encoding of the integer labels.
# scikit-learn renamed `sparse` to `sparse_output` in 1.2 and removed the old
# keyword in 1.4, so try the new name first and fall back for older releases.
try:
    one_hot_encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    one_hot_encoder = OneHotEncoder(sparse=False)
one_hot_encoded = one_hot_encoder.fit_transform(label_encoded)

# Create train and test sets

In [None]:
X = mfcc_features
y = one_hot_encoded

# Min-max scale the features to [0, 1].
# NOTE(review): min and max are taken over the whole data set, i.e. including
# the samples that end up in the test split.
X = (X - X.min()) / (X.max() - X.min())

# A fixed random_state makes the 80/20 split (and every metric reported
# further down) reproducible across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Per-sample input shape for the CNN: the two feature axes plus one channel axis
input_shape = (*X_train.shape[1:3], 1)

In [None]:
# Display the input shape that is passed to the first Conv2D layer
input_shape

In [None]:
# Append a trailing channel axis of size 1, as expected by Conv2D.
# Only the first three axes are read, so re-running this cell is harmless.
X_train = X_train.reshape(*X_train.shape[:3], 1)
X_test = X_test.reshape(*X_test.shape[:3], 1)
print(X_train.shape)
print(X_test.shape)

# Training

In [None]:
# Training parameters

num_epochs = 100
# num_steps = 1000
activation = 'relu'            # activation of all hidden layers
# Canonical lower-case identifier; the capitalised 'Softmax' is not a
# registered activation name in every Keras release.
last_act = 'softmax'
kernel_init = 'he_normal'      # initializer of the convolution kernels
dense_init = keras.initializers.HeNormal()  # initializer of the dense layers
regularizer = l2(0.01)         # L2 penalty on the dense kernels
padding = 'same'
loss = 'categorical_crossentropy'
optimizer = 'adam'
metrics = 'acc'                # also the key used to read the training history
dropout_prob = 0.3
filter_dim = (3, 3)

# Early stopping parameters
# NOTE(review): this watches the training loss; consider monitoring 'val_loss'
# if the goal is to stop once the model stops generalising.
callback = keras.callbacks.EarlyStopping(monitor='loss', patience=3)

CNN Architecture

In [None]:
# Size the output layer from the one-hot labels instead of hard-coding 58,
# so the network always matches the number of classes actually present.
num_classes = y_train.shape[1]

model = Sequential()

# Convolutional stack: four 3x3 conv layers with a growing number of filters.
# Only the first layer declares the input shape. Pooling after the first three
# layers is intentionally left out; one max-pool follows the last conv layer.
for layer_idx, num_filters in enumerate((16, 32, 64, 128)):
    first_layer_kwargs = {'input_shape': input_shape} if layer_idx == 0 else {}
    model.add(Conv2D(num_filters, filter_dim, activation=activation, strides=(1, 1),
                     padding=padding, kernel_initializer=kernel_init, **first_layer_kwargs))
model.add(MaxPool2D((2, 2), padding=padding))

# Classifier head: three L2-regularised dense layers with dropout in between
model.add(Flatten())
model.add(Dense(512, activation=activation, kernel_initializer=dense_init, kernel_regularizer=regularizer))
model.add(Dropout(dropout_prob))
model.add(Dense(1024, activation=activation, kernel_initializer=dense_init, kernel_regularizer=regularizer))
model.add(Dropout(0.2))
model.add(Dense(512, activation=activation, kernel_initializer=dense_init, kernel_regularizer=regularizer))
model.add(Dense(num_classes, activation=last_act))

model.compile(loss=loss,
              optimizer=optimizer,
              metrics=[metrics])

In [None]:
# Print the layer-by-layer overview of the compiled model
model.summary()

Train

In [None]:
# Alternative callbacks that were tried (disabled):
# EarlyStopping(monitor='val_loss', patience=7),
#             ModelCheckpoint(filepath='best_model.h5', monitor='val_loss', save_best_only=True)

# Train the network; the held-out split doubles as validation data here.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=num_epochs,
    initial_epoch=0,
    shuffle=True,
    callbacks=[callback],
    # steps_per_epoch=num_steps,
)

# Evaluation

In [None]:
# One figure per tracked quantity: training curve vs. validation curve,
# plus the value reached in the last epoch printed as text.
# Each spec is (figure title, training key, validation key, printed prefix of the training value).
curve_specs = (
    ('Loss Value', 'loss', 'val_loss', 'loss'),
    ('Accuracy', 'acc', 'val_acc', 'acc:'),
)

for title, train_key, val_key, train_prefix in curve_specs:
    plt.figure(figsize=(8,8))
    plt.title(title)
    for key in (train_key, val_key):
        plt.plot(history.history[key])
    plt.legend([train_key, val_key])
    print(train_prefix, history.history[train_key][-1])
    print(f'{val_key}:', history.history[val_key][-1])
    plt.show()

In [None]:
# Model output for every test sample (one score per output unit)
predictions = model.predict(X_test)

In [None]:
# Index of the highest-scoring output unit per sample = predicted class index
predictions = predictions.argmax(axis=1)
# Undo the one-hot encoding to get back the integer labels.
# Both names are overwritten in place, so run this cell only once after predicting.
y_test = one_hot_encoder.inverse_transform(y_test)

In [None]:
# Fix the label set explicitly so the matrix has one row/column per known
# class, in encoder order, even if a class is missing from the test split or
# is never predicted. Without this the tick labels could be attached to the
# wrong rows and columns.
class_indices = np.arange(len(label_encoder.classes_))
cm = confusion_matrix(y_test, predictions, labels=class_indices)

fig, ax = plt.subplots(figsize=(8, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap=plt.cm.Blues, cbar=False,
            xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_,
            ax=ax)
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
plt.show()

In [None]:
from sklearn.metrics import recall_score, precision_score, accuracy_score
from sklearn.metrics import confusion_matrix, f1_score, classification_report

In [None]:
# Confusion matrix (rows: true labels, columns: predicted labels)
matrix_text = confusion_matrix(y_test, predictions)
print("\nConfusion matrix:\n", matrix_text)

# Classification report: per-class precision, recall and F1
report_text = classification_report(y_test, predictions)
print("\nClassification report:\n", report_text)