In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
# Load the pre-computed feature table and its companion metadata from the
# current working directory.
df = pd.read_parquet('./audio_dataset_bytes_normalized.parquet')
# .item() unwraps the object that np.load returns into a plain Python value;
# later code indexes it with 'max_frames', 'n_features' and 'classes'
# (presumably a dict -- confirm against the script that wrote the file).
# NOTE(review): allow_pickle=True unpickles the file, which can execute
# arbitrary code -- only load metadata files from a trusted source.
metadata = np.load('./metadata_normalized.npy', allow_pickle=True).item()


In [None]:

max_frames = metadata['max_frames']
n_features = metadata['n_features']

# Decode every stored byte blob back into its 2D float32 feature array.
# Iterating the columns directly avoids the per-row Series that iterrows() builds.
X_list = [
    np.frombuffer(raw, dtype=np.float32).reshape(shape)
    for raw, shape in zip(df['feature_bytes'], df['feature_shape'])
]
y_list = df['label'].tolist()

# np.stack requires every feature array to have the same shape, so a ragged
# dataset fails here with a clear error instead of producing an object array.
X = np.stack(X_list)
y = np.array(y_list)

# The model input is built from metadata['max_frames'] / metadata['n_features'];
# catch a mismatch now rather than as a shape error deep inside model.fit().
if X.shape[1:] != (max_frames, n_features):
    raise ValueError(
        f"Decoded feature shape {X.shape[1:]} does not match metadata "
        f"(max_frames={max_frames}, n_features={n_features})"
    )

# add channel (CNN needs a 4D tensor: samples, frames, features, channels)
X = np.expand_dims(X, -1)

# Integer-encode the labels, then expand them to one-hot rows.
# NOTE(review): LabelEncoder orders classes by sorted label value, while
# evaluate_model labels its outputs with metadata['classes'] -- confirm that
# both orderings agree.
le = LabelEncoder()
y_encoded = le.fit_transform(y)
num_classes = len(le.classes_)
y_onehot = tf.keras.utils.to_categorical(y_encoded, num_classes)

# split train:test in ratio of 80:20 (fixed seed for a reproducible split)
X_train, X_test, y_train, y_test = train_test_split(X, y_onehot, test_size=0.2, random_state=42)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras import models, layers, callbacks, regularizers
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# residual block (dilated convolution) function
def residual_block(x, filters, kernel_size, dilation_rate=1, dropout_rate=0.3, regularizer=regularizers.l2(0.001)):
    """Residual block made of two dilated Conv2D + BatchNormalization stages.

    The block input is added back to the output of the second stage (skip
    connection), then ReLU and Dropout are applied.

    Args:
        x: Input feature-map tensor (assumes channels-last layout, the Keras
            default -- none of the layers here override it).
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size passed to both convolutions.
        dilation_rate: Dilation rate passed to both convolutions.
        dropout_rate: Rate of the final Dropout layer.
        regularizer: Kernel regularizer applied to every convolution in the block.

    Returns:
        The output tensor of the block, with `filters` channels.
    """
    shortcut = x
    # Add() needs both branches to have the same channel count. When the input
    # does not already have `filters` channels, project it with a 1x1
    # convolution; when it does, the shortcut is the untouched input.
    if shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, (1, 1), padding='same',
                                 kernel_regularizer=regularizer)(shortcut)
    # Convolution 1 with dilated convolution
    x = layers.Conv2D(filters, kernel_size, padding='same', activation='relu',
                      dilation_rate=dilation_rate, kernel_regularizer=regularizer)(x)
    x = layers.BatchNormalization()(x)
    # Convolution 2 is left linear: its ReLU is applied after the skip addition
    x = layers.Conv2D(filters, kernel_size, padding='same', activation=None,
                      dilation_rate=dilation_rate, kernel_regularizer=regularizer)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Add()([shortcut, x])
    x = layers.Activation('relu')(x)
    x = layers.Dropout(dropout_rate)(x)
    return x

# Attention Block Function
def attention_block(inputs):
    """Collapse a sequence into one vector with learned attention over time.

    A two-layer scorer (Dense+tanh, then Dense(1)) yields one score per time
    step; the scores are softmax-normalised along the time axis and used as
    weights for a weighted sum of the input steps.

    Args:
        inputs: Tensor of shape (batch_size, time_steps, features).

    Returns:
        Context tensor of shape (batch_size, features).
    """
    # inputs shape: (batch_size, time_steps, features)
    score = layers.Dense(inputs.shape[-1], activation='tanh')(inputs)  # (batch_size, time_steps, features)
    score = layers.Dense(1)(score)  # (batch_size, time_steps, 1)
    attention_weights = layers.Softmax(axis=1)(score)  # (batch_size, time_steps, 1)
    # Contracting the time axis of the weights with the time axis of the inputs
    # gives sum_t weight[t] * inputs[t]. Built-in layers are used instead of a
    # Lambda wrapping a Python lambda so the model stays portable to save/load.
    context_vector = layers.Dot(axes=1)([attention_weights, inputs])  # (batch_size, 1, features)
    context_vector = layers.Flatten()(context_vector)  # (batch_size, features)
    return context_vector

def build_hybrid_model(metadata):
    """Assemble the CNN -> BiLSTM -> attention classifier as a functional model.

    Args:
        metadata: Mapping providing 'max_frames' and 'n_features' (input size)
            and 'classes' (its length sets the number of softmax outputs).

    Returns:
        An uncompiled Keras Model taking inputs of shape
        (max_frames, n_features, 1).
    """
    def l2_penalty():
        # A fresh regularizer per layer, as each layer gets its own instance.
        return regularizers.l2(0.001)

    def conv_stage(tensor, n_filters, drop_rate):
        # Conv -> BatchNorm -> 2x2 max-pool -> Dropout
        tensor = layers.Conv2D(n_filters, (3,3), activation='relu', padding='same',
                               kernel_regularizer=l2_penalty())(tensor)
        tensor = layers.BatchNormalization()(tensor)
        tensor = layers.MaxPooling2D((2,2))(tensor)
        return layers.Dropout(drop_rate)(tensor)

    n_frames = metadata['max_frames']
    n_feats = metadata['n_features']
    inputs = layers.Input(shape=(n_frames, n_feats, 1))

    # Convolutional front-end with a dilated residual block in the middle
    feature_maps = conv_stage(inputs, 64, 0.4)
    feature_maps = residual_block(feature_maps, filters=64, kernel_size=(3,3),
                                  dilation_rate=2, dropout_rate=0.4)
    feature_maps = conv_stage(feature_maps, 128, 0.5)

    # Flatten each step along axis 1 into a vector so the LSTMs see a sequence
    sequence = layers.TimeDistributed(layers.Flatten())(feature_maps)

    # Stacked bidirectional LSTMs; every one returns the full sequence
    for units in (256, 128, 64):
        recurrent = layers.LSTM(units, return_sequences=True,
                                kernel_regularizer=l2_penalty())
        sequence = layers.Bidirectional(recurrent)(sequence)

    pooled = attention_block(sequence)

    # Classifier head
    hidden = layers.Dense(256, activation='relu', kernel_regularizer=l2_penalty())(pooled)
    hidden = layers.Dropout(0.5)(hidden)
    outputs = layers.Dense(len(metadata['classes']), activation='softmax')(hidden)

    return models.Model(inputs=inputs, outputs=outputs)

def get_callbacks():
    """Return the training callbacks: early stopping and LR reduction.

    Both callbacks watch the validation loss. Early stopping waits 10 epochs
    and restores the best weights; the learning rate is halved after 5 epochs
    without improvement, never going below 1e-6.
    """
    watched_metric = 'val_loss'

    early_stop = callbacks.EarlyStopping(
        monitor=watched_metric,
        patience=10,
        restore_best_weights=True,
        verbose=1,
    )
    lr_decay = callbacks.ReduceLROnPlateau(
        monitor=watched_metric,
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1,
    )
    return [early_stop, lr_decay]

def train_model(X_train, y_train, X_test, y_test, metadata,
                epochs=100, batch_size=32, learning_rate=0.0005):
    """Build, compile and fit the hybrid model.

    Args:
        X_train, y_train: Training inputs and one-hot targets.
        X_test, y_test: Held-out inputs and one-hot targets, passed to fit()
            as validation data.
        metadata: Forwarded to build_hybrid_model.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size.
        learning_rate: Initial learning rate of the Adam optimizer.

    Returns:
        Tuple (model, history) with the fitted model and the History object
        returned by fit().

    NOTE(review): the callbacks from get_callbacks() monitor 'val_loss' on
    (X_test, y_test), so the held-out set steers early stopping and the
    learning-rate schedule. Metrics later reported on that same set are
    therefore optimistic; consider a separate validation split.
    """
    model = build_hybrid_model(metadata)

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
        callbacks=get_callbacks()
    )

    return model, history

def evaluate_model(model, X_test, y_test, metadata):
    """Show a confusion-matrix heatmap and print a classification report.

    Args:
        model: Object whose predict(X_test) returns per-class scores with one
            row per sample.
        X_test: Inputs to evaluate.
        y_test: One-hot targets; the true class is the argmax of each row.
        metadata: Mapping whose 'classes' entry lists the class names.
            NOTE(review): the names are assumed to be ordered by encoded class
            index -- confirm against the label encoding used for y_test.
    """
    class_names = metadata['classes']
    # Pin the label set to every known class. Otherwise a class that occurs in
    # neither y_true nor y_pred shrinks the confusion matrix (misaligning the
    # tick labels) and leaves classification_report with target_names that do
    # not match the classes it found.
    label_ids = list(range(len(class_names)))

    y_pred = model.predict(X_test).argmax(axis=1)
    y_true = y_test.argmax(axis=1)

    # Show Confusion Matrix
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plt.figure(figsize=(10,8))
    sns.heatmap(cm, annot=True, fmt='d',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

    # Show Classification Report
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names))

def plot_learning_rate(history):
    """Plot the per-epoch learning rate recorded during training, if any.

    Args:
        history: Object whose `.history` mapping may hold the learning-rate
            series under 'lr' or 'learning_rate'.

    Nothing is drawn when neither key holds a non-empty series.
    """
    logs = history.history
    # The key depends on the Keras version: older releases log 'lr', while
    # Keras 3 logs 'learning_rate'. Accept either so the plot is not silently
    # skipped after an upgrade.
    lr_history = logs.get('lr') or logs.get('learning_rate')
    if not lr_history:
        return

    plt.plot(lr_history)
    plt.title('Learning Rate Schedule')
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.show()

def plot_history(history):
    """Draw training vs. validation loss and accuracy curves side by side.

    Args:
        history: Object whose `.history` mapping holds the 'loss', 'val_loss',
            'accuracy' and 'val_accuracy' series.
    """
    curves = history.history
    # (metric key, display name, train colour, validation colour, fixed y-range)
    panels = (
        ('loss', 'Loss', 'green', 'orange', None),
        ('accuracy', 'Accuracy', 'blue', 'red', [0, 1]),
    )

    plt.figure(figsize=(15, 6))

    for position, (key, name, train_color, val_color, y_range) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)
        plt.plot(curves[key], label=f'Train {name}', color=train_color, linestyle='-')
        plt.plot(curves[f'val_{key}'], label=f'Val {name}', color=val_color, linestyle='--')
        plt.title(f'{name} Evolution')
        plt.xlabel('Epoch')
        plt.ylabel(name)
        # Only the accuracy panel has a fixed axis range
        if y_range is not None:
            plt.ylim(y_range)
        plt.legend()

    plt.tight_layout()
    plt.show()


# Train the hybrid model; (X_test, y_test) is passed to fit() as validation data.
model, history = train_model(X_train, y_train, X_test, y_test, metadata)
# Confusion matrix and classification report on that same held-out set.
evaluate_model(model, X_test, y_test, metadata)
# Training diagnostics: learning-rate schedule, then loss/accuracy curves.
plot_learning_rate(history)
plot_history(history)