In [None]:
from datetime import datetime
import os
import json
import tensorflow as tf
import numpy as np
import pandas as pd
from keras.utils import to_categorical, Sequence
from keras.models import Model
from keras.layers import (
    Input,
    Conv2D,
    MaxPooling2D,
    Flatten,
    Dense,
    Dropout,
    concatenate,
    BatchNormalization,
)
from keras.callbacks import EarlyStopping
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import classification_report, confusion_matrix
from modules.PostgresDBHandler import PostgresDBHandler

In [None]:
# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all
# up front. This has to happen before the GPUs are first used.
gpus = tf.config.experimental.list_physical_devices("GPU")
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        # Report the device count once, after every GPU has been configured
        # (previously this was printed once per GPU from inside the loop).
        print("Number of avaliable GPUs: ", len(gpus))
    except RuntimeError as e:
        # Raised by TensorFlow when memory growth is changed after the
        # devices have already been initialised; report it and carry on.
        print(e)

In [None]:
# Connection settings for PostgresDBHandler.
# Each value can be overridden through the environment (the variable names are
# the conventional PostgreSQL client ones) so real credentials never have to be
# committed with the notebook. The fallbacks are the original local-development
# values and must not be used for a shared or production database.
DB_PARAMS = {
    "dbname": os.environ.get("PGDATABASE", "mydatabase"),
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "password"),
    "host": os.environ.get("PGHOST", "localhost"),
    "port": os.environ.get("PGPORT", "5432"),
}

# Training hyper-parameters.
EPOCHS = 100  # upper bound on epochs per fold
# NOTE(review): BATCH_SIZE is not referenced by the visible training loop,
# which passes batch_size=350 to its generators - confirm which is intended.
BATCH_SIZE = 128
KFOLD_SPLITS = 5  # number of cross-validation folds

In [None]:
# Create the project's database handler from DB_PARAMS and open the connection
# that the data-loading cells below read from.
dbConnect = PostgresDBHandler(**DB_PARAMS)
dbConnect.connect()

In [None]:
# Load the instrument mappings from the database.
# Presumably a DataFrame with a `name` column (the same call is followed by
# `.to_dict()['name']` further down) - confirm against PostgresDBHandler.
instruments_mappings = dbConnect.get_mappings_instruments()

In [None]:
# Collect the ids of every processed sample known to the database.
processedIDs = dbConnect.get_all_processed_ids()
# Debug aid: uncomment to train on a small slice of the ids only.
# processedIDs = processedIDs[50:120]

# Fetch the records for those ids; the next cell reads the "mfccPath",
# "spectrogramPath" and "instrumentID" entries of each item.
processed_data = dbConnect.get_processed_fit_data(processedIDs)

In [None]:
# Pull the three fields needed for training out of every processed record.
mfcc_paths = [item["mfccPath"] for item in processed_data]
spectrogram_paths = [item["spectrogramPath"] for item in processed_data]
instrument_ids = [item["instrumentID"] for item in processed_data]

# Reuse the mappings fetched into `instruments_mappings` earlier instead of
# issuing the same database query a second time. `mappings` ends up as the
# dict stored under the 'name' key of `to_dict()` and is exported with the
# final report.
mappings = instruments_mappings.to_dict()['name']

In [None]:
# Assemble one row per processed sample: the two feature-file paths plus the
# instrument label. Column order follows the order of the names below.
processed_df = pd.DataFrame(
    data=dict(
        zip(
            ("mfccPath", "spectrogramPath", "instrumentID"),
            (mfcc_paths, spectrogram_paths, instrument_ids),
        )
    )
)

In [None]:
# Release the database connection; no later visible cell uses `dbConnect`.
dbConnect.close()

In [None]:
# Display the assembled DataFrame as the cell output for a quick visual check.
processed_df

In [None]:
class DataGenerator(Sequence):
    """Keras ``Sequence`` serving batches of mel-spectrogram / MFCC arrays.

    Every row of ``df`` names two ``.npy`` files (``spectrogramPath`` and
    ``mfccPath``) and carries an ``instrumentID`` label. A batch is returned as
    ``({"mel_input": mels, "mfccs_input": mfccs}, one_hot_labels)``, where each
    feature array gets an extra trailing axis.

    Args:
        df: DataFrame with ``mfccPath``, ``spectrogramPath`` and
            ``instrumentID`` columns. It is not modified; the generator keeps
            its own label-encoded copy.
        fixed_length: Stored on the instance; not used by this class itself.
        batch_size: Number of rows per batch.
        shuffle: Whether the row order is reshuffled at the end of each epoch.
        classes: Optional iterable holding every possible label value. When
            given, the label encoder is fitted on it instead of on ``df``, so
            generators built from different subsets of the data share one
            encoding and one ``num_classes``. Labels in ``df`` that are missing
            from ``classes`` make the encoder raise ``ValueError``.
    """

    def __init__(
        self,
        df,
        fixed_length=128,
        batch_size=32,
        shuffle=True,
        classes=None,
    ):
        # Let the Keras base class set up its own state where it has any.
        super().__init__()
        self.fixed_length = fixed_length
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(
            df["instrumentID"] if classes is None else list(classes)
        )
        # `assign` returns a new frame, so the caller's DataFrame (often a
        # slice of a larger one) is left untouched.
        self.df = df.assign(
            instrumentID=self.label_encoder.transform(df["instrumentID"])
        )
        self.num_classes = len(self.label_encoder.classes_)
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch.

        Rounds up so the final, possibly smaller, batch is served too.
        Rounding down silently dropped up to ``batch_size - 1`` samples and
        produced zero batches for frames smaller than one batch.
        """
        return int(np.ceil(len(self.df) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(inputs, labels)`` pair."""
        indices = self.indices[index * self.batch_size : (index + 1) * self.batch_size]
        batch_df = self.df.iloc[indices]
        X, y = self.__data_generation(batch_df)
        return X, y

    def on_epoch_end(self):
        """Rebuild the row order, shuffling it when ``shuffle`` is set."""
        self.indices = np.arange(len(self.df))
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __data_generation(self, batch_df):
        """Load the feature files of ``batch_df`` and one-hot encode its labels."""
        X_mels = [np.load(path) for path in batch_df["spectrogramPath"]]
        X_mfccs = [np.load(path) for path in batch_df["mfccPath"]]
        y = batch_df["instrumentID"].to_numpy()

        # Append a trailing axis to each stacked feature array.
        # (presumably the channel axis expected by the Conv2D inputs - confirm)
        X_mels = np.expand_dims(np.array(X_mels), -1)
        X_mfccs = np.expand_dims(np.array(X_mfccs), -1)

        return (
            {"mel_input": X_mels, "mfccs_input": X_mfccs},
            to_categorical(y, num_classes=self.num_classes),
        )

In [None]:
def create_model(input_shape_mel, input_shape_mfccs, num_classes):
    """Build the two-branch CNN classifier (uncompiled).

    Args:
        input_shape_mel: Shape of one sample fed to the "mel_input" branch.
        input_shape_mfccs: Shape of one sample fed to the "mfccs_input" branch.
        num_classes: Width of the final softmax layer.

    Returns:
        A Keras ``Model`` taking ``[mel_input, mfccs_input]`` and producing
        ``num_classes`` softmax probabilities.
    """

    def conv_stack(tensor, filter_counts):
        # One 3x3 ReLU convolution followed by 2x2 max-pooling per entry,
        # flattened at the end so the branch can be concatenated.
        for n_filters in filter_counts:
            tensor = Conv2D(n_filters, (3, 3), activation="relu")(tensor)
            tensor = MaxPooling2D((2, 2))(tensor)
        return Flatten()(tensor)

    # Mel spectrogram branch: three conv/pool stages.
    mel_in = Input(shape=input_shape_mel, name="mel_input")
    mel_features = conv_stack(mel_in, (64, 128, 256))

    # MFCC branch: two conv/pool stages.
    mfccs_in = Input(shape=input_shape_mfccs, name="mfccs_input")
    mfccs_features = conv_stack(mfccs_in, (64, 128))

    # Merge both branches and classify with a small dense head.
    merged = concatenate([mel_features, mfccs_features])
    hidden = Dense(64, activation="relu")(merged)
    hidden = Dropout(0.25)(hidden)
    hidden = BatchNormalization()(hidden)
    probabilities = Dense(num_classes, activation="softmax")(hidden)

    return Model(inputs=[mel_in, mfccs_in], outputs=probabilities)

In [None]:
# Per-sample input shapes handed to create_model().
# NOTE(review): presumably (feature bins, time frames, channels), matching the
# saved .npy arrays after the generator appends a trailing axis - confirm.
input_shape_mel = (128, 128, 1)
input_shape_mfccs = (13, 128, 1)
# Number of distinct instrument labels in the whole data set; used as the
# width of the model's softmax output.
num_classes = len(np.unique(processed_df["instrumentID"]))

In [None]:
# Early-stopping callback passed to every model.fit() call: it watches
# "val_loss" with a patience of 10 epochs and is configured to restore the
# best weights.
early_stopping = EarlyStopping(
    monitor="val_loss", patience=10, restore_best_weights=True
)

# Cross-validation

In [None]:
# Per-fold results; the cross-validation loop appends one entry to each list
# per fold, and all of them are written to the final report.
accuracy_list = []  # accuracy returned by model.evaluate on the held-out fold
loss_list = []  # loss returned by model.evaluate on the held-out fold
classification_reports = []  # classification_report(..., output_dict=True)
confusion_matrices = []  # confusion matrices converted to nested lists
history_list = []  # history.history dict from model.fit

In [None]:
# K-fold splitter over the rows of processed_df.
# NOTE(review): shuffle=True without a random_state means the fold assignment
# differs on every run - pass a fixed random_state if reproducibility matters.
kf = KFold(n_splits=KFOLD_SPLITS, shuffle=True)

In [None]:
# Train and evaluate one freshly built model per fold.
for train_index, cross_index in kf.split(processed_df):

    # Split the fold's training rows once more into train / validation parts.
    X_train, X_val = train_test_split(
        processed_df.iloc[train_index], test_size=0.2, shuffle=True
    )

    # Create data generators.
    # NOTE(review): the batch size is hard-coded to 350 here although the
    # config cell defines BATCH_SIZE - confirm which value is intended.
    train_generator = DataGenerator(X_train, batch_size=350)
    val_generator = DataGenerator(X_val, batch_size=350)
    # The held-out fold is never shuffled so predictions and labels line up.
    cross_generator = DataGenerator(
        processed_df.iloc[cross_index], batch_size=350, shuffle=False
    )

    # Every generator fits its own label encoder on the rows it receives. If a
    # split happens to miss a class, its one-hot width no longer matches the
    # model's softmax layer, so fail right away with a clear message instead
    # of a shape error in the middle (or at the end) of training.
    for split_name, generator in (
        ("train", train_generator),
        ("validation", val_generator),
        ("cross-validation", cross_generator),
    ):
        if generator.num_classes != num_classes:
            raise ValueError(
                f"The {split_name} split contains {generator.num_classes} "
                f"classes but the model expects {num_classes}; every split "
                "must contain all instrument classes."
            )

    # Create and compile the model
    model = create_model(input_shape_mel, input_shape_mfccs, num_classes)

    # `learning_rate` is the supported argument name; the old `lr` alias is
    # deprecated and rejected by recent Keras releases.
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)

    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )

    # Train the model
    history = model.fit(
        train_generator,
        validation_data=val_generator,
        epochs=EPOCHS,
        callbacks=[early_stopping],
    )

    history_list.append(history.history)

    # Evaluate the model on the held-out fold
    loss, accuracy = model.evaluate(cross_generator)
    accuracy_list.append(accuracy)
    loss_list.append(loss)

    # Predict using the model
    y_pred = model.predict(cross_generator)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Extract the true labels batch by batch, in the same order that predict()
    # consumed them. Indexing explicitly avoids depending on how the Sequence
    # base class implements iteration.
    y_true = []
    for batch_index in range(len(cross_generator)):
        _, labels = cross_generator[batch_index]
        y_true.extend(np.argmax(labels, axis=1))
    y_true = np.array(y_true)

    # Generate classification report and confusion matrix
    report = classification_report(y_true, y_pred_classes, output_dict=True)
    classification_reports.append(report)

    # tolist() keeps the matrix JSON-serialisable for the final report.
    conf_matrix = confusion_matrix(y_true, y_pred_classes).tolist()
    confusion_matrices.append(conf_matrix)

# Save reports and model

In [None]:
# Make sure the output directory exists. Only the "already exists" case is
# expected and tolerated; any other failure (permissions, read-only disk, ...)
# now propagates instead of being hidden behind an "Unknown error" message and
# resurfacing later as a confusing failure inside model.save().
try:
    os.mkdir("models")
except FileExistsError:
    print("Folder already exists")

# `model` is the network left over from the last cross-validation fold; the
# models trained in earlier folds are not saved.
model.save("models/instrument_classifier_model.h5")


## Create training version folder

In [None]:
def next_version(existing_names, date_part):
    """Return the next unused version number for ``date_part``.

    Args:
        existing_names: Names already present in the models directory.
        date_part: Date tag (e.g. ``"2024_01_31"``) identifying today's runs.

    Returns:
        One more than the highest ``<...date_part...>_v<N>`` found, or 1 when
        there is none. Versions are compared numerically, so ``v10`` ranks
        above ``v9``; names whose suffix after the last ``"_v"`` is not a
        number are ignored.
    """
    versions = []
    for name in existing_names:
        suffix = name.rpartition("_v")[-1]
        if date_part in name and suffix.isdecimal():
            versions.append(int(suffix))
    return max(versions, default=0) + 1


# Today's date as YYYY_MM_DD, used as the prefix of the run folder.
date_part = datetime.now().strftime("%Y_%m_%d")
# A missing "models" directory is treated as "no versions yet".
existing_entries = os.listdir(path="models") if os.path.isdir("models") else []
# Highest version already stored for today (0 when there is none).
last_version = next_version(existing_entries, date_part) - 1
folder_name = f"{date_part}_v{last_version+1}"

In [None]:
# Create this run's versioned output folder inside "models".
# os.mkdir raises FileExistsError if the folder is already there.
os.mkdir(os.path.join("models", folder_name))

## Store data

In [None]:
# Bundle everything collected during cross-validation into one report;
# each list holds one entry per fold.
raport = {
    "accuracy_list": accuracy_list,
    "loss_list": loss_list,
    "classification_reports": classification_reports,
    "confusion_matrices": confusion_matrices,
    "histories": history_list,
    # Instrument mappings loaded from the database earlier in the notebook.
    "mappings": mappings
}

In [None]:
# Serialise the report as JSON into this run's versioned folder.
raport_path = os.path.join("models", folder_name, "raport.json")
with open(raport_path, "w") as raportFile:
    json.dump(raport, raportFile)