# Instrument recognition machine learning project

In [None]:
import os
import random
import re
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline 

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import classification_report
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam

## Parameters

In [None]:
# Root folder of the training set; its sub-folder names are used as labels.
trainPath = "../data/IRMAS-TrainingData"
# Common prefix of the test-set folders; a part number is appended to it.
testPath = "../data/IRMAS-TestingData-Part"


# Number of samples every clip is padded or truncated to before the FFT.
MAX_SIZE = 884000
# Number of shuffled training examples held out for validation.
VALIDATION_SIZE = 1000
EPOCHS = 200
BATCH_SIZE = 4
# Only the first half of the spectrum is kept, hence MAX_SIZE // 2 features.
# Derived from MAX_SIZE so the two values cannot drift apart.
SHAPE = (BATCH_SIZE, MAX_SIZE // 2)
# NOTE(review): 11 presumably equals the number of label folders - confirm.
OUTPUT_SHAPE = (BATCH_SIZE, 11)

## Utility functions

In [None]:
def path_to_audio(path):
    """Load a WAV file and return the magnitude spectrum of the padded clip.

    The file at *path* is decoded as a single-channel waveform, fitted to
    ``MAX_SIZE`` samples by ``add_padding`` and then handed to ``fft``.
    """
    raw_bytes = tf.io.read_file(path)
    # decode_wav returns (audio, sample_rate); only the audio is needed.
    decoded = tf.audio.decode_wav(raw_bytes, desired_channels=1)
    waveform = add_padding(decoded[0])
    return fft(waveform)


def get_audio_paths(directoryPath, files):
    """Return the joined path of every ``.wav`` name in *files*.

    Names that do not end in ``.wav`` are skipped; the order of *files* is
    preserved.
    """
    wav_paths = []
    for name in files:
        if not name.endswith(".wav"):
            continue
        wav_paths.append(os.path.join(directoryPath, name))
    return wav_paths
    
    
def get_labels(directoryPath, files):
    """Read the ``.txt`` annotation files and return a multi-hot label matrix.

    Args:
        directoryPath: Folder containing the annotation files.
        files: File names inside ``directoryPath``; only names ending in
            ``.txt`` are read, in the order given.

    Returns:
        The indicator matrix produced by ``MultiLabelBinarizer``: one row per
        annotation file, one column per entry of ``sorted(label_set())``.
    """
    tags_per_file = []
    for file in files:
        if not file.endswith(".txt"):
            continue
        # The context manager closes each handle instead of leaking it.
        with open(os.path.join(directoryPath, file)) as annotation:
            tags_per_file.append(annotation.read().split())

    # Fix the columns to the sorted training labels so every folder yields
    # the same width and order, whichever labels happen to occur in it.
    # NOTE(review): assumes annotation tokens use the training folder names
    # as their vocabulary - confirm against the dataset.
    mlb = MultiLabelBinarizer(classes=sorted(label_set()))
    return mlb.fit_transform(tags_per_file)

        
def add_padding(audio):
    """Fit *audio* to exactly ``MAX_SIZE`` samples.

    Clips with ``MAX_SIZE`` samples or more are truncated. Shorter clips are
    zero-padded, with the clip placed at a random offset inside the result.

    Args:
        audio: Waveform tensor. Assumed to have shape (samples, 1), which is
            what ``path_to_audio`` passes in - TODO confirm for other callers.

    Returns:
        A tensor with ``MAX_SIZE`` rows. Both branches now return a tensor of
        the input dtype (the padded branch used to return a float64 NumPy
        array).
    """
    # Convert once to a Python int so the arithmetic below and
    # random.randint work on plain integers rather than tensors.
    padding_size = MAX_SIZE - int(tf.size(audio))
    if padding_size <= 0:
        return audio[:MAX_SIZE]

    front_padding = random.randint(0, padding_size)
    back_padding = padding_size - front_padding
    # tf.pad fills with zeros by default; the channel axis is left untouched.
    return tf.pad(audio, [[front_padding, back_padding], [0, 0]])


def fft(audio):
    """Return the magnitudes of the first half of the FFT of *audio*.

    The trailing axis of *audio* is squeezed away, the signal is converted to
    ``complex64`` with a zero imaginary part, transformed, and the absolute
    value of the first half of the result is returned.
    """
    samples = tf.squeeze(audio, axis=-1)
    as_complex = tf.complex(real=samples, imag=tf.zeros_like(samples))
    spectrum = tf.signal.fft(tf.cast(as_complex, tf.complex64))
    half_length = spectrum.shape[0] // 2
    return tf.math.abs(spectrum[:half_length])


def shuffle(data, labels):
    """Reorder *data* and *labels* with one shared random permutation.

    Both inputs are converted to NumPy arrays, so the pairing between an
    item and its label is kept. Returns ``(shuffled_data, shuffled_labels)``.
    """
    order = np.random.permutation(len(data))
    shuffled_data = np.array(data)[order]
    shuffled_labels = np.array(labels)[order]
    return shuffled_data, shuffled_labels


def label_set(path=None):
    """Return the label names: the sub-folder names of a dataset folder.

    Args:
        path: Folder to scan. Defaults to the module-level ``trainPath``.

    Returns:
        The names of the directories directly inside the folder, sorted.
        ``os.listdir`` returns entries in arbitrary order, so sorting makes
        the result deterministic and matches the sorted class order that
        ``MultiLabelBinarizer`` uses when no explicit classes are given.
    """
    root = trainPath if path is None else path
    return sorted(
        entry
        for entry in os.listdir(root)
        if os.path.isdir(os.path.join(root, entry))
    )

## Getting paths

In [None]:
def training_paths_and_labels(subFolder, label):
    """Return the ``.wav`` paths in *subFolder* and a one-hot row for each.

    Args:
        subFolder: Folder whose ``.wav`` files all carry the same label.
        label: The label shared by every file in ``subFolder``.

    Returns:
        ``(audioPaths, labels)`` where ``labels`` has one row per audio path.
    """
    audioPaths = get_audio_paths(subFolder, os.listdir(subFolder))

    mlb = MultiLabelBinarizer()
    # The first row carries the full label set so the binarizer sees every
    # class; it is dropped again by the [1:] slice. Rows are counted from
    # audioPaths (not from the raw directory listing) so that non-.wav
    # entries cannot produce surplus label rows.
    labels = mlb.fit_transform([label_set()] + [[label]] * len(audioPaths))[1:]
    return audioPaths, labels


def testing_paths_and_labels(subFolder):
    """Return the ``.wav`` paths in *subFolder* and their annotation labels.

    Audio paths and label rows are both derived from one directory listing,
    filtered by extension, and are matched purely by position.

    Returns:
        ``(audioPaths, labels)`` as produced by ``get_audio_paths`` and
        ``get_labels``.
    """
    # os.listdir returns entries in arbitrary order. Sorting by (stem,
    # extension) puts the .wav files and the .txt files in the same stem
    # order so that the positional pairing is correct.
    # NOTE(review): assumes every .wav has a same-stem .txt - confirm.
    subFolderItems = sorted(os.listdir(subFolder), key=os.path.splitext)
    audioPaths = get_audio_paths(subFolder, subFolderItems)
    labels = get_labels(subFolder, subFolderItems)
    return audioPaths, labels


def path_to_paths_and_labels(path, folderIsLabel=True):
    """Collect audio paths and label rows from every sub-folder of *path*.

    Args:
        path: Folder whose sub-directories are scanned; plain files directly
            inside it are ignored.
        folderIsLabel: When true, each sub-folder name is the label of its
            files (``training_paths_and_labels``); otherwise labels are read
            from annotation files (``testing_paths_and_labels``).

    Returns:
        ``(audioPaths, labels)``: two lists, one label row per audio path.
    """
    audioPaths, labels = [], []

    for dir_ in os.listdir(path):
        dirPath = os.path.join(path, dir_)
        if not os.path.isdir(dirPath):
            continue

        if folderIsLabel:
            newAudioPaths, newLabels = training_paths_and_labels(dirPath, dir_)
        else:
            newAudioPaths, newLabels = testing_paths_and_labels(dirPath)

        audioPaths.extend(newAudioPaths)
        # extend() adds the rows one by one, replacing a list comprehension
        # that was evaluated only for its side effect.
        labels.extend(newLabels)

    return audioPaths, labels

## Generating datasets

In [None]:
def dataset_generator(paths, labels):
    """Yield ``(audio, labels)`` batches of exactly ``BATCH_SIZE`` examples.

    Args:
        paths: Sequence of audio file paths.
        labels: Label rows aligned with ``paths``.

    Yields:
        A list of ``BATCH_SIZE`` spectra from ``path_to_audio`` together with
        the matching slice of ``labels``. A trailing partial batch is dropped.
    """
    # Only full batches are produced, so the files of a partial final batch
    # are never decoded just to be thrown away.
    full_batches = len(paths) // BATCH_SIZE
    for i in range(full_batches):
        start, stop = BATCH_SIZE * i, BATCH_SIZE * (i + 1)
        audio = [path_to_audio(path) for path in paths[start:stop]]
        yield audio, labels[start:stop]

def get_training_dataset():
    """Build the training and validation datasets from ``trainPath``.

    All examples are shuffled once; the last ``VALIDATION_SIZE`` of them form
    the validation set and the rest the training set. Returns
    ``(training dataset, validation dataset)``, both backed by
    ``dataset_generator``.
    """
    all_paths, all_labels = shuffle(*path_to_paths_and_labels(trainPath))

    signature = (
        tf.TensorSpec(shape=SHAPE, dtype=tf.float32),
        tf.TensorSpec(shape=OUTPUT_SHAPE, dtype=tf.int32),
    )

    def build(paths, labels):
        # Wrap one (paths, labels) split in a generator-backed dataset.
        return tf.data.Dataset.from_generator(
            dataset_generator,
            output_signature=signature,
            args=(paths, labels),
        )

    training_set = build(all_paths[:-VALIDATION_SIZE],
                         all_labels[:-VALIDATION_SIZE])
    validation_set = build(all_paths[-VALIDATION_SIZE:],
                           all_labels[-VALIDATION_SIZE:])
    return training_set, validation_set

def get_testing_dataset():
    """Build the test dataset from the folders ``testPath`` + 1, 2 and 3.

    The examples of all three parts are gathered, shuffled once and wrapped
    in a dataset backed by ``dataset_generator``.
    """
    audioPaths, labels = [], []
    for part in (1, 2, 3):
        part_paths, part_labels = path_to_paths_and_labels(
            "{}{}".format(testPath, part), folderIsLabel=False)
        audioPaths.extend(part_paths)
        labels.extend(part_labels)

    audioPaths, labels = shuffle(audioPaths, labels)

    signature = (
        tf.TensorSpec(shape=SHAPE, dtype=tf.float32),
        tf.TensorSpec(shape=OUTPUT_SHAPE, dtype=tf.int32),
    )
    return tf.data.Dataset.from_generator(
        dataset_generator,
        output_signature=signature,
        args=(audioPaths, labels),
    )

In [None]:
# Build the generator-backed datasets used by the cells below: the training
# and validation splits come from trainPath, the test set from the testPath
# folders.
train_data, val_data = get_training_dataset()
test_data = get_testing_dataset()

## Deep learning model

In [None]:
# One batch of half-spectra per step; the width is derived from MAX_SIZE so
# it stays in sync with the padding length used during preprocessing.
inputs = Input(batch_size=BATCH_SIZE, shape=(MAX_SIZE // 2,), name="input")

# Stack of fully connected layers, each followed by a LeakyReLU activation.
x = Dense(64)(inputs)
x = LeakyReLU()(x)
x = Dense(100000)(x)
x = LeakyReLU()(x)
x = Dense(32)(x)
x = LeakyReLU()(x)
x = Dense(50000)(x)
x = LeakyReLU()(x)

# One sigmoid unit per label, so several labels can be active at once.
outputs = Dense(len(label_set()), activation="sigmoid", name="output")(x)


model = Model(inputs=inputs, outputs=outputs)

model.summary()

opt = Adam(learning_rate=0.0001)
# The predictions are thresholded per label (b_pred > 0.5) and the targets
# are multi-hot rows, so each sigmoid output is trained as an independent
# binary decision with binary cross-entropy rather than categorical
# cross-entropy.
# NOTE(review): the 'accuracy' metric Keras reports may be computed
# differently under this loss - confirm before comparing with older logs.
model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

## Fitting the model

In [None]:
# train_data already yields batches of BATCH_SIZE examples; Keras documents
# that batch_size must not be given for dataset inputs, so it is omitted.
model.fit(train_data, epochs=EPOCHS, validation_data=val_data)

In [None]:
# Compute the compiled loss and metrics on the test dataset.
model.evaluate(test_data)

In [None]:
# Ground-truth label rows, flattened from the batched test dataset.
# The comprehension keeps its loop variables local, so the global `x` from
# the model cell is no longer overwritten.
b_test = [
    label_row
    for _, batch_labels in test_data.as_numpy_iterator()
    for label_row in batch_labels
]

b_test = np.array(b_test)
# test_data is already batched, so no batch_size is passed to predict.
b_pred = model.predict(test_data, verbose=1)
# Turn the per-label sigmoid scores into 0/1 decisions.
b_pred = (b_pred > 0.5).astype(int)

In [None]:
# Build the report once and reuse it for both the console and the file.
# The target names are sorted because MultiLabelBinarizer orders its columns
# by sorted class name, whereas the directory listing order is arbitrary.
report = classification_report(
    b_test, b_pred, target_names=sorted(label_set()), zero_division=0)
print(report)

# The context manager closes the file even if the write fails.
with open("FFT classification report.txt", "w") as f:
    f.write(report)

In [None]:
# Persist the trained model under the Models/ folder.
model.save("Models/FFT model")

## Testing data visualisations

In [None]:
# Take the audio part of the first test batch as a NumPy array. The built-in
# iterator protocol replaces the direct __iter__().get_next() call.
temp = next(iter(test_data))[0].numpy()
print(temp.shape)

plt.figure(figsize=(20, 10))
# One point per value in the batch; temp.size is a plain int, so range()
# no longer depends on converting a tensor.
plt.scatter(range(temp.size), temp, s=1, alpha=1);
#plt.savefig("Figures/FFT transformed.png")

In [None]:
# Read the saved training log; the context manager guarantees the file is
# closed even if reading fails.
with open("Accuracy.txt", "r") as f:
    accuracy = f.read()

# Treat newlines like '-' separators, then cut the log into segments.
# This is the same result as the earlier split / join / split sequence.
data = accuracy.replace('\n', '-').split('-')

# Segments mentioning "accuracy", split into whitespace-separated fields;
# field 1 is read as the numeric value.
# NOTE(review): the [::2] / [1::2] slices assume the segments alternate
# between training and validation entries - confirm against the log format.
accuracy = [segment.split() for segment in data if "accuracy" in segment]
# Despite the name, test_accuracy is plotted as the training accuracy.
test_accuracy = np.array([fields[1] for fields in accuracy[::2]]).astype(float)
val_accuracy = np.array([fields[1] for fields in accuracy[1::2]]).astype(float)

loss = [segment.split() for segment in data if "loss" in segment]
# Despite the name, test_loss is plotted as the training loss.
test_loss = np.array([fields[1] for fields in loss[::2]]).astype(float)
val_loss = np.array([fields[1] for fields in loss[1::2]]).astype(float)

plt.figure(figsize=(20, 10))
plt.subplot(1, 2, 1)
plt.title("Training and validation accuracy")
plt.plot(test_accuracy, label="Training accuracy")
plt.plot(val_accuracy, label="Validation accuracy")
plt.xlabel("Epochs")
plt.ylabel("accuracy")
plt.legend()

plt.subplot(1, 2, 2)
plt.title("Training and validation loss")
plt.plot(test_loss, label="Training loss")
plt.plot(val_loss, label="Validation loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()

plt.savefig("Figures/FFT overfitting.png")
plt.show()

## Artifacts