In [None]:
import os
import numpy as np
import librosa
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Dense, Dropout, Input, GlobalAveragePooling2D, Concatenate
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.applications import EfficientNetV2B1
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
from tensorflow.keras.optimizers import Adam
from sklearn.utils import class_weight
from tensorflow.keras import layers, models, regularizers


# Human-readable acoustic scene names for the dataset, one entry per class.
class_names = [
    'bus',
    'cafe/restaurant',
    'car',
    'city center',
    'forest path',
    'grocery store',
    'home',
    'lakeside beach',
    'library',
    'metro station',
    'office',
    'residential area',
    'train',
    'tram',
    'urban park',
]


In [None]:
 def extract_mfcc(audio_path, n_mfcc=20, n_fft=2048, hop_length=512):
     """Load an audio file and return its MFCC matrix.

     Args:
         audio_path: Path of the audio file to load.
         n_mfcc: Number of MFCC coefficients to compute.
         n_fft: FFT window size forwarded to the mel spectrogram.
         hop_length: Hop length (in samples) forwarded to the mel spectrogram.

     Returns:
         The array returned by ``librosa.feature.mfcc`` (documented by librosa
         as shape ``(n_mfcc, n_frames)``).

     Note:
         Earlier versions passed the MFCCs through ``librosa.power_to_db``.
         That function expects a non-negative power spectrogram; MFCCs are
         signed, so every non-positive coefficient was clamped to the dB floor
         and lost. Models trained on those features must be retrained.
     """
     # sr=None keeps the file's native sampling rate instead of resampling.
     y, sr = librosa.load(audio_path, sr=None)
     # librosa.feature.mfcc already applies the log (dB) scaling to the mel
     # spectrogram before the DCT, so no further dB conversion is needed.
     return librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)

In [None]:
# Parsed i-vector tables keyed by absolute CSV path. Each entry stores the
# file's modification time so an edited CSV is re-read instead of served stale.
_IVECTOR_TABLE_CACHE = {}


def _load_ivector_table(csv_path):
    """Return the parsed i-vector CSV, re-reading it only when the file changed."""
    try:
        cache_key = os.path.abspath(csv_path)
        modified = os.path.getmtime(csv_path)
    except (TypeError, OSError):
        # Not a local file path (buffer, URL, missing file): let pandas handle
        # it directly, exactly as an uncached read would.
        return pd.read_csv(csv_path)

    cached = _IVECTOR_TABLE_CACHE.get(cache_key)
    if cached is not None and cached[0] == modified:
        return cached[1]

    table = pd.read_csv(csv_path)
    _IVECTOR_TABLE_CACHE[cache_key] = (modified, table)
    return table


def extract_ivector(csv_path, audio_file_name):
    """Look up the i-vector stored for one audio file.

    The CSV must contain a ``Filename`` column identifying the audio file;
    every other column is treated as one i-vector component.

    Args:
        csv_path: Path of the i-vector CSV (anything ``pd.read_csv`` accepts).
        audio_file_name: Value to match against the ``Filename`` column.

    Returns:
        1-D numpy array with the values of all non-``Filename`` columns of the
        matching row(s), flattened.

    Raises:
        ValueError: If no row has ``Filename == audio_file_name``.
    """
    # The table is cached so that calling this once per audio file does not
    # re-parse the whole CSV every time.
    df = _load_ivector_table(csv_path)

    ivector_row = df[df['Filename'] == audio_file_name].drop('Filename', axis=1)
    if ivector_row.empty:
        raise ValueError(f"i-vector for {audio_file_name} not found in CSV.")

    # .flatten() returns a copy, so callers cannot mutate the cached table.
    return ivector_row.values.flatten()


In [None]:
def load_data(audio_dir, csv_path, n_fft=2048, hop_length=512, n_mfcc=20):
    """Build MFCC, i-vector and label arrays for every ``.wav`` file in a directory.

    Args:
        audio_dir: Directory scanned (non-recursively) for ``.wav`` files.
        csv_path: i-vector CSV passed to ``extract_ivector``.
        n_fft: FFT window size passed to ``extract_mfcc``.
        hop_length: Hop length passed to ``extract_mfcc``.
        n_mfcc: Number of MFCC coefficients passed to ``extract_mfcc``.

    Returns:
        Tuple ``(mfcc_array, ivector_array, labels_array, file_names)``.
        ``ivector_array`` stacks one ``(1, ivector_dim)`` row per file;
        ``file_names`` is a list in the same order as the arrays.

    Raises:
        ValueError: Propagated from ``extract_ivector`` when a file has no
            i-vector, or from ``int()`` when the label cannot be parsed.
    """
    mfccs = []
    ivectors = []
    labels = []
    file_names = []

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and therefore any seeded train/validation split) reproducible.
    for audio_file in sorted(os.listdir(audio_dir)):
        if not audio_file.endswith('.wav'):
            continue

        audio_path = os.path.join(audio_dir, audio_file)

        # NOTE(review): StandardScaler standardises each column of its input,
        # so this scales the MFCC matrix along its first axis. Confirm that is
        # the intended normalisation axis.
        mfcc = extract_mfcc(audio_path, n_mfcc, n_fft, hop_length)
        mfccs.append(StandardScaler().fit_transform(mfcc))

        # The CSV is keyed by the bare audio file name.
        ivector = np.asarray(extract_ivector(csv_path, audio_file), dtype=float)
        # Standardise across the vector's own components. Fitting a scaler on
        # the vector as a single (1, dim) sample makes every feature equal to
        # its own mean, which turns the whole i-vector into zeros.
        ivector = StandardScaler().fit_transform(ivector.reshape(-1, 1)).reshape(1, -1)
        ivectors.append(ivector)

        # The label is the integer left in the last '_'-separated token of the
        # file name after removing 'class' and the '.wav' extension.
        label = int(audio_file.split('_')[-1].replace('class', '').replace('.wav', ''))
        labels.append(label)
        file_names.append(audio_file)

    mfcc_array = np.array(mfccs)
    print(mfcc_array.shape)
    ivector_array = np.array(ivectors)
    print(ivector_array.shape)
    labels_array = np.array(labels)

    return mfcc_array, ivector_array, labels_array, file_names

In [None]:
def prepare_data(audio_dir, csv_path, test_size=0.2, random_state=42, mode='train'):
    """Load features and labels, encode the labels and optionally split them.

    Args:
        audio_dir: Directory of audio files handed to ``load_data``.
        csv_path: i-vector CSV handed to ``load_data``.
        test_size: Validation fraction used when ``mode == 'train'``.
        random_state: Seed for the train/validation split.
        mode: ``'train'`` returns a train/validation split; any other value
            returns the full, unsplit data.

    Returns:
        In ``'train'`` mode: ``(X_train_mfcc, X_val_mfcc, X_train_ivector,
        X_val_ivector, y_train, y_val, classes, file_names_train,
        file_names_val)``.
        Otherwise: ``(X_mfcc, X_ivector, y, classes, file_names)``.
        ``classes`` is the fitted ``LabelEncoder.classes_``.
    """
    features_mfcc, features_ivector, raw_labels, names = load_data(audio_dir, csv_path)

    # Map the raw labels onto consecutive integer ids.
    encoder = LabelEncoder()
    encoded_labels = encoder.fit_transform(raw_labels)

    # Anything other than training mode gets the whole set back unsplit.
    if mode != 'train':
        return features_mfcc, features_ivector, encoded_labels, encoder.classes_, names

    split = train_test_split(
        features_mfcc,
        features_ivector,
        encoded_labels,
        names,
        test_size=test_size,
        random_state=random_state,
    )
    mfcc_train, mfcc_val, ivec_train, ivec_val, y_train, y_val, names_train, names_val = split

    return (
        mfcc_train, mfcc_val,
        ivec_train, ivec_val,
        y_train, y_val,
        encoder.classes_,
        names_train, names_val,
    )

In [None]:
# def create_multi_branch_model(mfcc_input_shape, ivector_input_shape, num_classes):
#     # Input layers
#     mfcc_input = Input(shape=mfcc_input_shape, name='mfcc_input')
#     ivector_input = Input(shape=ivector_input_shape, name='ivector_input')
    
#     # MFCC branch (you can add more layers if needed)
#     x_mfcc = layers.Conv2D(32, (3, 3), activation='relu')(mfcc_input)
#     x_mfcc = layers.MaxPooling2D((2, 2))(x_mfcc)
#     x_mfcc = GlobalAveragePooling2D()(x_mfcc)
    
#     # i-vector branch (fully connected layers)
#     x_ivector = Dense(128, activation='relu')(ivector_input)
#     x_ivector = Dropout(0.3)(x_ivector)
    
#     # Concatenate branches
#     concatenated = Concatenate()([x_mfcc, x_ivector])
    
#     # Fully connected layers after concatenation
#     x = Dense(128, activation='relu')(concatenated)
#     x = Dropout(0.3)(x)
#     output = Dense(num_classes, activation='softmax')(x)
    
#     # Define the model
#     model = Model(inputs=[mfcc_input, ivector_input], outputs=output)
    
#     # Compile the model
#     model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    
#     return model


In [None]:
# from tensorflow.keras.layers import Input, Conv2D, GlobalAveragePooling2D, Dense, Concatenate, Dropout, Flatten
# from tensorflow.keras.models import Model
# from tensorflow.keras.optimizers import Adam

# def create_multi_branch_model(mfcc_input_shape, ivector_input_shape, num_classes):
#     # MFCC branch
#     mfcc_input = Input(shape=mfcc_input_shape, name='mfcc_input')
#     x_mfcc = Conv2D(32, (3, 3), activation='relu')(mfcc_input)
#     x_mfcc = GlobalAveragePooling2D()(x_mfcc)  # Global average pooling to reduce dimensions
    
#     # i-vector branch
#     ivector_input = Input(shape=ivector_input_shape, name='ivector_input')
#     x_ivector = Flatten()(ivector_input)  # Flatten to make it 2D
#     x_ivector = Dense(128, activation='relu')(x_ivector)
#     x_ivector = Dropout(0.3)(x_ivector)
    
#     # Concatenate the outputs of both branches
#     concatenated = Concatenate()([x_mfcc, x_ivector])
    
#     # Dense layers after concatenation
#     x = Dense(128, activation='relu')(concatenated)
#     x = Dropout(0.3)(x)
#     output = Dense(num_classes, activation='softmax')(x)

#     # Define the model
#     model = Model(inputs=[mfcc_input, ivector_input], outputs=output)
    
#     # Compile the model
#     model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    
#     return model


In [None]:
from tensorflow.keras.layers import Input, Conv2D, GlobalAveragePooling2D, Dense, Concatenate, Dropout, Flatten, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2

def create_multi_branch_model(mfcc_input_shape, ivector_input_shape, num_classes):
    """Build and compile the two-input (MFCC + i-vector) classifier.

    Args:
        mfcc_input_shape: Shape of one MFCC sample (without the batch axis).
        ivector_input_shape: Shape of one i-vector sample (without the batch axis).
        num_classes: Size of the softmax output layer.

    Returns:
        A compiled Keras ``Model`` taking ``[mfcc_input, ivector_input]``.
    """
    # --- MFCC branch: two Conv2D + BatchNorm stages, then global pooling ---
    mfcc_input = Input(shape=mfcc_input_shape, name='mfcc_input')
    mfcc_branch = mfcc_input
    for _ in range(2):
        mfcc_branch = Conv2D(64, (3, 3), activation='relu', padding='same')(mfcc_branch)
        mfcc_branch = BatchNormalization()(mfcc_branch)
    mfcc_branch = GlobalAveragePooling2D()(mfcc_branch)

    # --- i-vector branch: flatten, one L2-regularised dense layer, dropout ---
    ivector_input = Input(shape=ivector_input_shape, name='ivector_input')
    ivector_branch = Flatten()(ivector_input)
    ivector_branch = Dense(256, activation='relu', kernel_regularizer=l2(0.01))(ivector_branch)
    ivector_branch = Dropout(0.3)(ivector_branch)

    # --- Fusion head: concatenate both branches, then two dense + dropout stages ---
    head = Concatenate()([mfcc_branch, ivector_branch])
    for units in (256, 128):
        head = Dense(units, activation='relu', kernel_regularizer=l2(0.01))(head)
        head = Dropout(0.4)(head)
    predictions = Dense(num_classes, activation='softmax')(head)

    model = Model(inputs=[mfcc_input, ivector_input], outputs=predictions)
    model.compile(
        optimizer=Adam(learning_rate=0.00001),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model


In [None]:
# def evaluate_model(model, X_test_mfcc, X_test_ivector, y_test, file_names, class_names):
#     test_loss, test_accuracy = model.evaluate([X_test_mfcc, X_test_ivector], y_test)
#     print(f'Test Loss: {test_loss:.4f}')
#     print(f'Test Accuracy: {test_accuracy:.4f}')
    
#     # Generate predictions
#     y_pred = model.predict([X_test_mfcc, X_test_ivector])
#     y_pred_labels = np.argmax(y_pred, axis=1)
#     y_true_labels = np.argmax(y_test, axis=1)

#     # Print classification report
#     print("\nClassification Report:")
#     print_class_report = classification_report(y_true_labels, y_pred_labels, target_names=class_names)
#     print(print_class_report)
#     class_report_dict = classification_report(y_true_labels, y_pred_labels, target_names=class_names, output_dict=True)
#     save_classification_report(class_report_dict)
    
#     # Confusion matrix visualization
#     cm = confusion_matrix(y_true_labels, y_pred_labels)
#     plt.figure(figsize=(10, 8))
#     sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
#     plt.xlabel('Predicted')
#     plt.ylabel('True')
#     plt.title('Confusion Matrix')
#     plt.savefig("/kaggle/working/Confusion_Matrix.png")
#     plt.show()

#     # Identify and print mispredictions
#     mispredictions = np.where(y_pred_labels != y_true_labels)[0]
#     print(f'\nNumber of mispredictions: {len(mispredictions)}')
    
#     for idx in mispredictions:
#         true_label = class_names[y_true_labels[idx]]
#         predicted_label = class_names[y_pred_labels[idx]]
#         confidence_score = y_pred[idx][y_pred_labels[idx]]
        
#         print(f'Index: {idx}, File Name: {file_names[idx]}, True Label: {true_label}, Predicted Label: {predicted_label}, Confidence Score: {confidence_score:.4f}')
#         print('Confidence Scores for all classes:')
#         for class_idx, class_name in enumerate(class_names):
#             print(f'  {class_name}: {y_pred[idx][class_idx]:.4f}')

In [None]:
from tensorflow.keras.utils import to_categorical

def evaluate_model(model, X_test_mfcc, X_test_ivector, y_test, file_names, class_names):
    """Evaluate the model and report metrics, a confusion matrix and mispredictions.

    Prints loss/accuracy and a classification report, saves the report via
    ``save_classification_report``, saves and shows a confusion-matrix heatmap,
    and prints per-class confidence scores for every mispredicted sample.

    Args:
        model: Keras model accepting ``[X_test_mfcc, X_test_ivector]``.
        X_test_mfcc: MFCC inputs.
        X_test_ivector: i-vector inputs.
        y_test: Integer class ids (1-D) or one-hot labels (2-D).
        file_names: File name for each sample, in the same order as the inputs.
        class_names: One display name per class id; entries may be strings or
            any other objects (e.g. integer labels), which are converted
            with ``str``.
    """
    # classification_report calls len() on every target name, so non-string
    # names (such as the integer LabelEncoder.classes_) must be stringified.
    class_names = [str(name) for name in class_names]
    num_classes = len(class_names)
    # Pin the label set so the report and confusion matrix always have one
    # row/column per class name, even if a class is absent from this data.
    label_ids = np.arange(num_classes)

    # Convert y_test to one-hot encoding if it's not already
    y_test = np.asarray(y_test)
    if len(y_test.shape) == 1:
        y_test = to_categorical(y_test, num_classes=num_classes)

    # Evaluate the model on test data
    test_loss, test_accuracy = model.evaluate([X_test_mfcc, X_test_ivector], y_test)
    print(f'Test Loss: {test_loss:.4f}')
    print(f'Test Accuracy: {test_accuracy:.4f}')

    # Generate predictions
    y_pred = model.predict([X_test_mfcc, X_test_ivector])
    y_pred_labels = np.argmax(y_pred, axis=1)
    y_true_labels = np.argmax(y_test, axis=1)

    # Print classification report
    print("\nClassification Report:")
    print_class_report = classification_report(
        y_true_labels, y_pred_labels, labels=label_ids, target_names=class_names
    )
    print(print_class_report)
    class_report_dict = classification_report(
        y_true_labels, y_pred_labels, labels=label_ids, target_names=class_names, output_dict=True
    )
    save_classification_report(class_report_dict)

    # Confusion matrix visualization
    cm = confusion_matrix(y_true_labels, y_pred_labels, labels=label_ids)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.savefig("/kaggle/working/Confusion_Matrix.png")
    plt.show()

    # Identify and print mispredictions
    mispredictions = np.where(y_pred_labels != y_true_labels)[0]
    print(f'\nNumber of mispredictions: {len(mispredictions)}')

    for idx in mispredictions:
        true_label = class_names[y_true_labels[idx]]
        predicted_label = class_names[y_pred_labels[idx]]
        confidence_score = y_pred[idx][y_pred_labels[idx]]

        print(f'Index: {idx}, File Name: {file_names[idx]}, True Label: {true_label}, Predicted Label: {predicted_label}, Confidence Score: {confidence_score:.4f}')
        print('Confidence Scores for all classes:')
        for class_idx, class_name in enumerate(class_names):
            print(f'  {class_name}: {y_pred[idx][class_idx]:.4f}')


In [None]:
def save_classification_report(class_report_dict, output_path="/kaggle/working/classification_report.csv"):
    """Write a classification report dictionary to a CSV file.

    Args:
        class_report_dict: Report dictionary (e.g. from
            ``classification_report(..., output_dict=True)``). Its top-level
            keys become the rows of the CSV.
        output_path: Destination CSV path. Defaults to the Kaggle working
            directory location used previously.
    """
    # Transpose so each top-level key (class / average) is a row and each
    # metric is a column.
    report_df = pd.DataFrame(class_report_dict).transpose()
    report_df.to_csv(output_path, index=True)

In [None]:
def main(dev_audio_dir, eval_audio_dir, csv_path, eval_csv_path, output_dir, model_path, use_early_stopping=True):
    """Load or train the multi-branch model, then evaluate it on the evaluation set.

    If ``model_path`` exists the saved model is loaded; otherwise a new model
    is trained on the development set, with the best checkpoint written to
    ``output_dir``.

    Args:
        dev_audio_dir: Directory with the development (training) audio files.
        eval_audio_dir: Directory with the evaluation audio files.
        csv_path: i-vector CSV for the development set.
        eval_csv_path: i-vector CSV for the evaluation set.
        output_dir: Directory receiving ``best_model.keras`` during training.
        model_path: Path of a previously saved model to reuse if present.
        use_early_stopping: Whether to add an ``EarlyStopping`` callback.
    """
    if os.path.exists(model_path):
        print(f"Loading saved model from {model_path}...")
        model = load_model(model_path)

    else:
        # Prepare data for training and validation
        (X_train_mfcc, X_val_mfcc, X_train_ivector, X_val_ivector,
         y_train, y_val, label_classes,
         file_names_train, file_names_val) = prepare_data(dev_audio_dir, csv_path)

        # Conv2D needs a trailing channel axis on the MFCC input
        X_train_mfcc = np.expand_dims(X_train_mfcc, axis=-1)
        X_val_mfcc = np.expand_dims(X_val_mfcc, axis=-1)
        print(f'Input shape for MFCC training data: {X_train_mfcc.shape}')
        print(f'Input shape for MFCC validation data: {X_val_mfcc.shape}')

        # One-hot encode labels
        num_classes = len(label_classes)
        y_train_cat = to_categorical(y_train, num_classes=num_classes)
        y_val_cat = to_categorical(y_val, num_classes=num_classes)

        # Per-sample input shapes for both branches (batch axis dropped)
        mfcc_input_shape = X_train_mfcc.shape[1:]
        ivector_input_shape = X_train_ivector.shape[1:]

        print(f'MFCC input shape: {mfcc_input_shape}')
        print(f'i-vector input shape: {ivector_input_shape}')

        # Create multi-branch model
        model = create_multi_branch_model(mfcc_input_shape, ivector_input_shape, num_classes)

        # Make sure the checkpoint destination exists before training starts
        os.makedirs(output_dir, exist_ok=True)

        # Define callbacks
        callbacks = [
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6),
            ModelCheckpoint(os.path.join(output_dir, 'best_model.keras'), save_best_only=True, monitor='val_accuracy')
        ]

        if use_early_stopping:
            callbacks.append(EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True))

        # Compute class weights to handle class imbalance. Key each weight by
        # its actual class id: enumerate() would mis-assign the weights if a
        # class id were missing from the training split.
        present_classes = np.unique(y_train)
        class_weights = class_weight.compute_class_weight(
            class_weight='balanced', classes=present_classes, y=y_train
        )
        class_weights_dict = {
            int(class_id): float(weight)
            for class_id, weight in zip(present_classes, class_weights)
        }

        # Train the model on both MFCC and i-vector inputs
        model.fit(
            [X_train_mfcc, X_train_ivector], y_train_cat,
            validation_data=([X_val_mfcc, X_val_ivector], y_val_cat),
            epochs=100, batch_size=32,
            callbacks=callbacks,
            class_weight=class_weights_dict
        )

    # Prepare evaluation data
    # NOTE(review): prepare_data fits a fresh LabelEncoder on the evaluation
    # labels, so the ids only match training if both sets contain the same
    # label values — confirm for this dataset.
    X_eval_mfcc, X_eval_ivector, y_eval, eval_label_classes, file_names_eval = prepare_data(
        eval_audio_dir, eval_csv_path, mode='eval'
    )
    # Give the evaluation MFCCs the same channel axis as the training data.
    X_eval_mfcc = np.expand_dims(X_eval_mfcc, axis=-1)

    # The label classes are parsed integers; the evaluation report needs
    # string display names.
    eval_class_names = [str(label) for label in eval_label_classes]

    # Evaluate the model
    evaluate_model(model, X_eval_mfcc, X_eval_ivector, y_eval, file_names_eval, eval_class_names)

if __name__ == '__main__':
    # Audio directories (development = training data, evaluation = held-out data)
    dev_audio_dir, eval_audio_dir = (
        '/kaggle/input/tut-2016/TUT_2016/TUT_Acoustic_scenes_development_all_in_one',
        '/kaggle/input/tut-2016/TUT_2016/TUT_Acoustic_scenes_evaluation_all_in_one',
    )
    # i-vector tables matching the two audio directories
    csv_path, eval_csv_path = (
        '/kaggle/input/i-vectors/i_vectors-dev-tut.csv',
        '/kaggle/input/i-vectors/i_vectors-eval-tut.csv',
    )
    # Where checkpoints are written, and where a previously trained model is looked up
    output_dir = '/kaggle/working/'
    model_path = '/kaggle/input/models/best_model.keras'

    main(
        dev_audio_dir=dev_audio_dir,
        eval_audio_dir=eval_audio_dir,
        csv_path=csv_path,
        eval_csv_path=eval_csv_path,
        output_dir=output_dir,
        model_path=model_path,
        use_early_stopping=True,
    )
