In [1]:
%load_ext autoreload
%autoreload 2

# Classifying Music Note sounds using Few Shot Deep Learning

In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

In [3]:
import os
import math

# Load various imports 
import librosa
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split 
from sklearn import preprocessing

from matplotlib import pyplot as plt
from tqdm.notebook import tqdm

from tensorflow.keras import backend as K
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Input, Lambda
from tensorflow.keras.losses import binary_crossentropy

#### utils

In [199]:
def fft(f):
    """Recursive radix-2 (decimation-in-time) fast Fourier transform.

    Computes the forward, unnormalised DFT
    ``F[k] = sum_n f[n] * exp(-2j*pi*k*n/N)``.

    Parameters
    ----------
    f : sequence of numbers
        Input samples. ``len(f)`` must be a power of two (1, 2, 4, 8, ...).

    Returns
    -------
    list
        The ``len(f)`` DFT coefficients. For lengths above 4 every
        coefficient is rounded to 10 decimals at each recursion level.

    Raises
    ------
    ValueError
        If ``len(f)`` is zero or not a power of two. The even/odd split
        below is only valid for power-of-two lengths.
    """
    Ni = len(f)
    if Ni == 0 or Ni & (Ni - 1):
        raise ValueError(
            "fft requires a non-empty input whose length is a power of two, "
            "got length %d" % Ni
        )

    # Closed-form base cases; the 4-point butterfly is spelled out directly.
    if Ni == 1:
        return [f[0]]
    if Ni == 2:
        return [f[0] + f[1], f[0] - f[1]]
    if Ni == 4:
        return [f[0] + f[1] + f[2] + f[3],
                f[0] - 1j*f[1] - f[2] + 1j*f[3],
                f[0] - f[1] + f[2] - f[3],
                f[0] + 1j*f[1] - f[2] - 1j*f[3]]

    Mi = Ni // 2
    # Primitive N-th root of unity, exp(-2j*pi/N).
    wn = math.cos(2*math.pi/Ni) - 1j*math.sin(2*math.pi/Ni)

    # Transform the even- and odd-indexed samples separately.
    Fe = fft([f[i] for i in range(0, Ni, 2)])
    Fo = fft([f[i] for i in range(1, Ni, 2)])

    # Each twiddle product is shared by the lower and upper half of the output.
    twiddled = [(wn**i) * Fo[i] for i in range(Mi)]
    lower = [np.around(Fe[i] + twiddled[i], decimals=10) for i in range(Mi)]
    upper = [np.around(Fe[i] - twiddled[i], decimals=10) for i in range(Mi)]
    return lower + upper

def get_audio_data(filename):
    """Load a fixed-length mono clip from an audio file.

    The file is loaded with librosa at a 4096 Hz sample rate, limited to
    2 seconds, and then zero-padded (or trimmed) to exactly 8192 samples so
    that the length is a power of two, as required by ``fft``.

    Parameters
    ----------
    filename : path of the audio file, passed straight to ``librosa.load``.

    Returns
    -------
    tuple
        ``(x, tp, n)`` where ``x`` is the list of samples rounded to
        10 decimals, ``tp`` is the clip duration in whole seconds
        (``int(n / fs)``) and ``n`` is the number of samples in ``x``.
    """
    fs = 2**12  # sample rate
    tp = 2  # sampling duration in seconds
    N = fs * tp  # target number of samples

    # Extract data and sampling rate from file
    recording, fs = librosa.load(filename, sr=fs, duration=tp, mono=True)

    if len(recording) < N:
        # Short clip: right-pad with silence up to the target length.
        pad_width = N - recording.shape[0]
        recording = np.pad(recording, pad_width=((0, pad_width),), mode='constant')
    else:
        # Guard against the loader returning a few extra samples.
        recording = recording[:N]

    n = len(recording)
    tp = int(n / fs)

    x = [np.round(float(sample), 10) for sample in recording]  # input sequence
    return x, tp, n

def get_frequency_amplitude(x, tp, N):
    """Compute the single-sided amplitude spectrum of a sampled signal.

    Parameters
    ----------
    x : sequence of samples, handed to ``fft``.
    tp : duration of the signal (used to build the time and frequency axes).
    N : number of samples; the spectrum is normalised by this value.

    Returns
    -------
    tuple
        ``(ti, fi, X_amp)``: ``ti`` is the list of ``N`` sample times
        ``i*tp/N``, ``fi`` the list of ``N/2`` frequencies ``i/tp`` and
        ``X_amp`` a numpy array with the first ``N/2`` normalised
        magnitudes, doubled.
    """
    coefficients = fft(x)  # discrete Fourier transform

    # Normalise each coefficient by N, round to 10 decimals, take its modulus.
    magnitudes = []
    for coeff in coefficients:
        scaled = np.round(coeff/N, 10)
        magnitudes.append(np.absolute(scaled))

    half = int(N/2)
    time_axis = [k*tp/N for k in range(N)]
    freq_axis = [k/tp for k in range(half)]

    # Keep only the first half of the spectrum and double it.
    one_sided = np.array(magnitudes[:half])*2
    return time_axis, freq_axis, one_sided

def extract_fft_features(filepath):
    """Return the single-sided FFT amplitude spectrum of an audio file.

    This was previously also named ``extract_features`` and was silently
    shadowed by the MFCC version below; it now has its own name so both
    feature extractors stay callable.

    Parameters
    ----------
    filepath : path of the audio file, passed to ``get_audio_data``.

    Returns
    -------
    The ``X_amp`` array produced by ``get_frequency_amplitude``.
    """
    x, tp, N = get_audio_data(filepath)
    _, _, X_amp = get_frequency_amplitude(x, tp, N)
    return X_amp


def extract_features(file_name):
    """Return the time-averaged MFCC vector of an audio file.

    Loads up to 3 seconds of audio with librosa (``kaiser_fast``
    resampling), computes 40 MFCCs and averages them over axis 1, giving
    one value per coefficient.

    Parameters
    ----------
    file_name : path of the audio file, passed to ``librosa.load``.

    Returns
    -------
    The per-coefficient mean of the MFCC matrix (40 values).
    """
    audio, sample_rate = librosa.load(file_name, res_type='kaiser_fast', duration=3)
    mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=40)

    # Averaging over frames gives a fixed-size vector regardless of clip
    # length, so no padding of the MFCC matrix is needed.
    return mfccs.mean(1)

def sampling(args):
    """Reparameterisation step of the VAE, used inside a ``Lambda`` layer.

    Draws ``z = z_mean + exp(z_log_sigma) * epsilon`` where ``epsilon`` is
    Gaussian noise with mean 0 and standard deviation 0.1.

    Parameters
    ----------
    args : pair ``(z_mean, z_log_sigma)`` of tensors with identical shape.

    Returns
    -------
    Tensor with the same shape as ``z_mean``.
    """
    z_mean, z_log_sigma = args
    # Read both dimensions from the tensor itself instead of the notebook-level
    # `latent_dim`, which is only defined in a later cell; the function no
    # longer depends on cell execution order.
    dims = K.shape(z_mean)
    epsilon = K.random_normal(shape=(dims[0], dims[1]),
                              mean=0., stddev=0.1)
    return z_mean + K.exp(z_log_sigma) * epsilon

#### Load Preprocessed data 

In [200]:
# Path to the guitar note samples: one sub-folder per note class
DATA_DIR = os.path.join("data", "guitar_sample")
# DATA_DIR = os.path.join("/content/drive/My Drive/Colab Notebooks/data", "guitar_sample")

# Class labels are the sub-folder names. os.listdir returns entries in
# arbitrary order and may include stray files (e.g. .DS_Store), so keep only
# directories and sort them to make the run reproducible.
labels = sorted(
    entry for entry in os.listdir(DATA_DIR)
    if os.path.isdir(os.path.join(DATA_DIR, entry))
)

# feature list: one [feature_vector, class_label] pair per sound file
features = []

# Iterate through each sound file and extract the features
for folder in tqdm(labels):
    class_label = folder
    class_dir = os.path.join(DATA_DIR, folder)
    for file in sorted(os.listdir(class_dir)):
        file_name = os.path.join(class_dir, file)

        data = extract_features(file_name)
        if data is None:
            continue

        data = np.array(data).ravel()
        features.append([data, class_label])

# Convert into a Pandas dataframe
featuresdf = pd.DataFrame(features, columns=['feature','class_label'])

print('Finished feature extraction from ', len(featuresdf), ' files')

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=12.0), HTML(value='')))


Finished feature extraction from  251  files


In [201]:
# featuresdf.feature = featuresdf.feature.apply(lambda xx: xx.reshape((4096, 2)))
featuresdf.feature.iloc[0].shape

(40,)

In [202]:
featuresdf.head()

Unnamed: 0,feature,class_label
0,"[-631.5046, 177.96318, 42.935505, 5.7489667, 9...",0A
1,"[-611.00885, 173.89738, 43.356262, 10.194802, ...",0A
2,"[-600.1429, 145.31587, 36.65796, 10.907211, 15...",0A
3,"[-560.3416, 150.13954, 44.98096, 10.299185, 16...",0A
4,"[-590.608, 129.81804, 34.128117, 1.7992443, 8....",0A


In [203]:
labels

['0A', '0B', '0D', '0EH', '0EL', '0G', '1A', '1B', '1D', '1EH', '1EL', '1G']

In [204]:
# Convert features and corresponding classification labels into numpy arrays
input_data = np.array(featuresdf.feature.tolist())
output_label = np.array(featuresdf.class_label.tolist())

# split train and test data
x_train, x_test, y_train, y_test = train_test_split(input_data, output_label, test_size=0.2, random_state = 42)

# Keep the original string labels next to the integer-encoded ones
y_train_label = y_train.copy()
y_test_label = y_test.copy()

# Fit the encoder on the full label set, not just the training split:
# otherwise a class that only lands in the test split makes
# `le.transform(y_test)` raise on an unseen label. The integer codes are
# unchanged whenever every class is present in the training split.
le = preprocessing.LabelEncoder()
le.fit(output_label)

y_train = le.transform(y_train)
y_test = le.transform(y_test)

In [205]:
x_train.shape, y_train.shape

((200, 40), (200,))

In [206]:
x_test.shape, y_test.shape

((51, 40), (51,))

In [207]:
# Inverted per-row min-max scaling, (row_max - x) / (row_max - row_min), applied
# to every row at once via transposes; shows the first scaled row. The row
# maximum maps to 0 and the row minimum to 1.
((x_train.max(1) - x_train.T) / (x_train.max(1) - x_train.min(1))).T[0]

array([1.        , 0.        , 0.093688  , 0.06795602, 0.11969754,
       0.07446596, 0.10145702, 0.08712867, 0.13352035, 0.07851177,
       0.09668222, 0.12599875, 0.11257131, 0.13458844, 0.13495412,
       0.11362012, 0.11853404, 0.12017162, 0.09987698, 0.10854585,
       0.09915395, 0.10913622, 0.1282056 , 0.13809265, 0.13347577,
       0.10030044, 0.06778458, 0.03148885, 0.02292104, 0.06596652,
       0.11285329, 0.1440286 , 0.13892572, 0.09551609, 0.06534543,
       0.08508629, 0.11815202, 0.13592488, 0.12234236, 0.08062435],
      dtype=float32)

In [208]:
# Same inverted min-max scaling computed for the first row only; used to
# cross-check the vectorised expression in the previous cell.
(x_train[0].max() - x_train[0]) / (x_train[0].max() - x_train[0].min())

array([1.        , 0.        , 0.093688  , 0.06795602, 0.11969754,
       0.07446596, 0.10145702, 0.08712867, 0.13352035, 0.07851177,
       0.09668222, 0.12599875, 0.11257131, 0.13458844, 0.13495412,
       0.11362012, 0.11853404, 0.12017162, 0.09987698, 0.10854585,
       0.09915395, 0.10913622, 0.1282056 , 0.13809265, 0.13347577,
       0.10030044, 0.06778458, 0.03148885, 0.02292104, 0.06596652,
       0.11285329, 0.1440286 , 0.13892572, 0.09551609, 0.06534543,
       0.08508629, 0.11815202, 0.13592488, 0.12234236, 0.08062435],
      dtype=float32)

In [209]:
# Show the integer class code next to its original string label for the
# first three training samples.
for i in range(3):
    print(f"class = {y_train[i]:>3d}, label = {y_train_label[i]:3s}")

class =   7, label = 1B 
class =  10, label = 1EL
class =   4, label = 0EL


### VAE model architecture 

In [None]:
# NOTE(review): `x_train_encoded` is only assigned in the Evaluation section
# further down (from `encoder.predict`), so this cell raises NameError under
# Restart Kernel -> Run All. Move it after that cell or remove it.
x_train_encoded[:, :, 0].shape

In [None]:
# from tensorflow.keras.datasets import mnist
# (x_train, y_train), (x_test, y_test) = mnist.load_data()

# Flatten every sample to a 1-D feature vector
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

# Capture the training maximum ONCE, before x_train is rescaled. Previously
# x_test was divided by `x_train.max()` after x_train had already been
# divided by it, so the test set ended up on a different scale from the
# training set.
train_max = x_train.max()
x_train = x_train.astype('float32') / train_max
x_test = x_test.astype('float32') / train_max

# NOTE(review): the MFCC features contain negative values, so dividing by the
# maximum does not bring them into [0, 1], while the decoder ends in a sigmoid
# and the loss is binary cross-entropy. Consider a min-max scaling fitted on
# the training set - confirm the intended normalisation.
# x_train = ((x_train.max(1) - x_train.T) / (x_train.max(1) - x_train.min(1))).T
# x_test = ((x_test.max(1) - x_test.T) / (x_test.max(1) - x_test.min(1))).T

In [None]:
# Layer sizes: input width is taken from the data, followed by one hidden
# layer and a 2-dimensional latent space.
original_dim = x_train[0].size
intermediate_dim = 64
latent_dim = 2

# Encoder trunk: input -> hidden ReLU layer -> two linear heads
inputs = Input(shape=(original_dim,))
h = Dense(intermediate_dim, activation='relu')(inputs)
z_mean = Dense(latent_dim)(h)
z_log_sigma = Dense(latent_dim)(h)

# Draw the latent sample from the two heads via the `sampling` helper
z = Lambda(sampling)([z_mean, z_log_sigma])

### Compiling the model 

Next we assemble the encoder, the decoder and the end-to-end VAE model, then attach the VAE loss and compile with the Adam optimizer:

In [None]:
# Create encoder: exposes z_mean, z_log_sigma and the sampled z, in that order
encoder = Model(inputs, [z_mean, z_log_sigma, z], name='encoder')

# Create decoder: latent vector -> hidden ReLU layer -> sigmoid reconstruction
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = Dense(original_dim, activation='sigmoid')(x)
decoder = Model(latent_inputs, outputs, name='decoder')

# instantiate VAE model: decode the sampled z (index 2 of the encoder outputs).
# `outputs` is rebound here to the end-to-end reconstruction tensor.
outputs = decoder(encoder(inputs)[2])
vae = Model(inputs, outputs, name='vae_mlp')

In [None]:
# Reconstruction term: binary cross-entropy between input and reconstruction,
# multiplied by the input width.
reconstruction_loss = binary_crossentropy(inputs, outputs)
reconstruction_loss *= original_dim
# KL term: -0.5 * sum(1 + z_log_sigma - z_mean^2 - exp(z_log_sigma)) over the
# latent axis.
# NOTE(review): this expression has the form of the KL divergence with
# `z_log_sigma` read as a log-variance, whereas `sampling` multiplies the noise
# by exp(z_log_sigma) (i.e. reads it as a log-std) and uses noise stddev 0.1 -
# confirm which parameterisation is intended.
kl_loss = 1 + z_log_sigma - K.square(z_mean) - K.exp(z_log_sigma)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5
# Total loss is the batch mean of both terms, attached via add_loss, so
# compile() is called without a `loss` argument.
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')

In [None]:
# Display model architecture summary 
vae.summary()

In [None]:
encoder.summary()

In [None]:
decoder.summary()

### Training 

Here we will train the model. As training the VAE can take a significant amount of time, we will start with a low number of epochs and a low batch size. If we can see from the output that the model is converging, we will increase both numbers.  

In [None]:
# Train the autoencoder to reproduce its own input: the same array is passed
# as input and as target. The loss was attached with add_loss.
# The test split is used as validation data.
vae.fit(
    x_train, 
    x_train,
    epochs=100,
    batch_size=32,
    validation_data=(x_test, x_test)
)

#### Evaluation

In [None]:
# The encoder returns three outputs (z_mean, z_log_sigma, z); wrapping the
# result in np.array stacks them along a new leading axis, so index 0 on that
# axis selects z_mean, 1 z_log_sigma and 2 the sampled z.
x_train_encoded = np.array(encoder.predict(x_train, batch_size=1000))
x_test_encoded = np.array(encoder.predict(x_test, batch_size=1000))

In [None]:
# Scatter the training samples in the 2-D latent space (z_mean is index 0 of
# the stacked encoder outputs), coloured by encoded class. Title and axis
# labels are set so the figure can be read on its own.
fig, ax = plt.subplots(figsize=(6, 6))
points = ax.scatter(x_train_encoded[0, :, 0], x_train_encoded[0, :, 1], c=y_train)
ax.set(
    title="Training samples in the VAE latent space",
    xlabel="z_mean[0]",
    ylabel="z_mean[1]",
)
fig.colorbar(points, ax=ax, label="encoded class")
plt.show()

### Cluster close points

In [None]:
# NOTE(review): numpy is already imported in the first import cell, and
# StandardScaler is imported here but never used - the pipeline below wraps
# only the SVC. Confirm whether a scaling step was intended, and consider
# moving these imports to the top import cell.
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from sklearn.svm import SVC
# Polynomial-kernel SVM classifier on the 2-D latent codes
clf = make_pipeline(SVC(kernel="poly", gamma='auto'))

# Index 0 of the stacked encoder outputs is z_mean; use it as the feature matrix
xx_train = x_train_encoded[0, :, :]
xx_test = x_test_encoded[0, :, :]

# Fit on the training codes and report training accuracy
clf.fit(xx_train, y_train)
clf.score(xx_train, y_train)

In [None]:
from sklearn.metrics import confusion_matrix

# Predict the test latent codes and display the confusion matrix.
# `y_pred` is reused by the classification report cell below.
y_pred = clf.predict(xx_test)
confusion_matrix(y_test, y_pred)

In [None]:
clf.score(xx_test, y_test)

In [None]:
from sklearn.metrics import classification_report

# Per-class report for the test predictions; the classes are the integer
# codes produced by the label encoder.
print(classification_report(y_test, y_pred))