In [3]:
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [4]:
def SplitDataset(classCSVPath):
    mfcc_df = pd.read_csv(classCSVPath)    
    label_encoder = LabelEncoder()
    mfcc_df["Label"] = label_encoder.fit_transform(mfcc_df["Label"])
    
    mfcc_features = mfcc_df.iloc[:, 3:].values  # Extract MFCC features
    spectrogram_paths = mfcc_df["Spectrogram_Path"].values
    labels = mfcc_df["Label"].values
    
    scaler = StandardScaler()
    mfcc_features = scaler.fit_transform(mfcc_features)
    mfcc_features = mfcc_features.reshape(mfcc_features.shape[0], mfcc_features.shape[1], 1)
    
    labels_one_hot = to_categorical(labels)
    X_train_mfcc, X_test_mfcc, y_train_mfcc, y_test_mfcc, train_indices, test_indices = train_test_split(
        mfcc_features, labels_one_hot, range(len(labels)), test_size=0.3, random_state=37
    )
    
    X_train_spectrogram_paths = spectrogram_paths[train_indices]
    X_test_spectrogram_paths = spectrogram_paths[test_indices]
    
    def load_spectrogram_images(paths):
        images = []
        labels = []
        BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), "../"))  # Get base directory (parent of "Code")

        for path in paths:
            corrected_path = os.path.abspath(os.path.join(BASE_DIR, path))  # Convert relative path to absolute
            if os.path.exists(corrected_path):
                img = load_img(corrected_path, target_size=(224, 224), color_mode='rgb')
                img = img_to_array(img) / 255.0
                images.append(img)
                labels.append(os.path.basename(os.path.dirname(corrected_path)))
            else:
                print(f"Warning: Spectrogram not found at {corrected_path}")  # Debugging print
        
        return np.array(images), labels

    
    X_train_spectrogram, spectrogram_train_labels = load_spectrogram_images(X_train_spectrogram_paths)
    X_test_spectrogram, spectrogram_test_labels = load_spectrogram_images(X_test_spectrogram_paths)
    
    spectrogram_train_labels = label_encoder.transform(spectrogram_train_labels)
    spectrogram_test_labels = label_encoder.transform(spectrogram_test_labels)
    
    y_train_spectrogram = to_categorical(spectrogram_train_labels)
    y_test_spectrogram = to_categorical(spectrogram_test_labels)
    
    return X_train_mfcc, y_train_mfcc, X_test_mfcc, y_test_mfcc, X_train_spectrogram, y_train_spectrogram, X_test_spectrogram, y_test_spectrogram, label_encoder


In [14]:
base_audio_path = "C:/Users/HP/Downloads/Project/Dataset/base_audio_mfcc_features_with_labels.csv"
X_train_mfcc, y_train_mfcc, X_test_mfcc, y_test_mfcc, X_train_spec, y_train_spec, X_test_spec, y_test_spec, label_encoder = SplitDataset(base_audio_path)

In [15]:
# Print sample output
print(f"Sample MFCC Features Shape: {X_test_mfcc.shape}")  # (samples, timesteps, features)
print(f"Sample Spectrogram Image Shape: {X_test_spec.shape}")  # (samples, 224, 224, 3)
print(f"Sample Label One-Hot: {y_test_mfcc[12]}")  # Example label
print(f"Sample Label One-Hot: {y_test_spec[12]}")  # Example label

Sample MFCC Features Shape: (144, 13, 1)
Sample Spectrogram Image Shape: (144, 224, 224, 3)
Sample Label One-Hot: [0. 1. 0. 0.]
Sample Label One-Hot: [0. 1. 0. 0.]


In [56]:
inc_audio_path = "C:/Users/HP/Downloads/Project/Dataset/inc_audio_mfcc_features_with_labels.csv"
new_X_train_mfcc, new_y_train_mfcc, new_X_test_mfcc, new_y_test_mfcc, new_X_train_spec, new_y_train_spec, new_X_test_spec, new_y_test_spec, new_label_encoder = SplitDataset(inc_audio_path)

In [7]:
# Print sample output
print(f"Sample MFCC Features Shape: {new_X_test_mfcc.shape}")  # (samples, timesteps, features)
print(f"Sample Spectrogram Image Shape: {new_X_test_spec.shape}")  # (samples, 224, 224, 3)
print(f"Sample Label One-Hot: {new_y_test_mfcc[12]}")  # Example label
print(f"Sample Label One-Hot: {new_y_test_spec[12]}")  # Example label


Sample MFCC Features Shape: (96, 13, 1)
Sample Spectrogram Image Shape: (96, 224, 224, 3)
Sample Label One-Hot: [0. 0. 1. 0.]
Sample Label One-Hot: [0. 0. 1. 0.]


In [22]:
# Load the trained MFCC models
mfcc_model_1 = tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\MFCC_H5\mfcc_BiLSTM_Model.h5")
mfcc_model_2 = tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\MFCC_H5\mfcc_cnn_model.h5")
mfcc_model_3 = tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\MFCC_H5\mfcc_LSTM_Model.h5")

mfcc_weights = [0.3, 0.2, 0.5] 

mfcc_models = [mfcc_model_1, mfcc_model_2, mfcc_model_3]

spec_model_1 = tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\Spec_H5\spec_mobilenet_model.h5")
spec_model_2 = tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\Spec_H5\spectrogram_cnn_model.h5")
spec_model_3 = tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\Spec_H5\spectrogram_resnet_model.h5")

spec_weights = [0.2, 0.2, 0.6]  

spec_models = [spec_model_1, spec_model_2, spec_model_3]



In [48]:
import tensorflow as tf
import numpy as np
from tqdm import tqdm

class EWC:
    def __init__(self, prior_model, data_samples, num_sample=30):
        self.prior_model = prior_model
        self.prior_weights = prior_model.get_weights()  # Store initial weights
        self.num_sample = num_sample
        self.data_samples = data_samples
        self.fisher_matrix = self.compute_fisher()

    def compute_fisher(self):
        fisher_accum = [np.zeros_like(w) for w in self.prior_model.trainable_weights]  # Initialize Fisher matrix

        for _ in tqdm(range(self.num_sample)):
            idx = np.random.randint(self.data_samples.shape[0])
            with tf.GradientTape() as tape:
                logits = tf.nn.log_softmax(self.prior_model(np.array([self.data_samples[idx]])))  # Forward pass
            grads = tape.gradient(logits, self.prior_model.trainable_weights)  # Compute gradients

            for i in range(len(grads)):
                if grads[i] is not None:  # Skip layers with no gradients
                    fisher_accum[i] += np.square(grads[i].numpy())  # Compute Fisher per layer

        fisher_accum = [f / self.num_sample for f in fisher_accum]  # Normalize Fisher matrix
        return fisher_accum
    
    def get_fisher(self):
        return self.fisher_matrix


In [49]:
class TrainEWC:
    def __init__(self, optimizer, loss_fn, prior_weights=None, lambda_=0.1):
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.prior_weights = prior_weights
        self.lambda_ = lambda_

    def train(self, model, train_data, train_labels, fisher_matrix, epochs=10):
        for epoch in tqdm(range(epochs)):
            for i in range(len(train_data)):  # Iterate through indices
                X = train_data[i: i+1]  # Get batch (single sample)
                y = train_labels[i: i+1]  # Get corresponding label
                
                with tf.GradientTape() as tape:
                    pred = model(X)  # Forward pass
                    loss = self.loss_fn(y, pred)  # Compute loss

                    # Add EWC penalty
                    ewc_loss = self.lambda_ * sum(
                        tf.reduce_sum(f * tf.square(w - w_old))
                        for f, w, w_old in zip(fisher_matrix, model.trainable_weights, self.prior_weights)
                    )
                    total_loss = loss + ewc_loss

                grads = tape.gradient(total_loss, model.trainable_weights)
                self.optimizer.apply_gradients(zip(grads, model.trainable_weights))


    def compute_penalty_loss(self, model, fisher_matrix):
        penalty = 0
        for f, w, p in zip(fisher_matrix, model.get_weights(), self.prior_weights):
            if f.shape == w.shape:  # Ensure Fisher matrix matches weight shape
                penalty += tf.reduce_sum(f * tf.square(w - p))
        return 0.5 * self.lambda_ * penalty


In [None]:
import os
import numpy as np
import tensorflow as tf

# Load all MFCC and Spectrogram models
mfcc_models = {
    "bilstm": tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\MFCC_H5\mfcc_BiLSTM_Model.h5"),
    "lstm": tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\MFCC_H5\mfcc_LSTM_Model.h5"),
}

spec_models = {
    "mbnet": tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\Spec_H5\spec_mobilenet_model.h5"),
    "cnn": tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\Spec_H5\spectrogram_cnn_model.h5"),
    "resnet": tf.keras.models.load_model(r"C:\Users\HP\Downloads\Project\Code\Spec_H5\spectrogram_resnet_model.h5"),
}

# Dummy EWC function (Replace with actual EWC implementation)
class EWC:
    def __init__(self, model, data):
        self.model = model
        self.data = data

    def get_fisher(self):
        return [tf.ones_like(w) for w in self.model.trainable_weights]  # Simulated Fisher matrix

# Compute Fisher matrices
data_samples_mfcc = np.random.rand(100, *mfcc_models["bilstm"].input_shape[1:])
data_samples_spec = np.random.rand(100, *spec_models["cnn"].input_shape[1:])
# new_X_train_spec = np.expand_dims(new_X_train_spec, axis=0)
# print("Fixed training data shape:", new_X_train_spec.shape)

# print("Expected input shape:", spec_models["cnn"].input_shape)
# print("New training data shape:", new_X_train_spec.shape)

fisher_mfcc = {name: EWC(model, data_samples_mfcc).get_fisher() for name, model in mfcc_models.items()}
fisher_spec = {name: EWC(model, data_samples_spec).get_fisher() for name, model in spec_models.items()}

# Ensure save directories exist
mfcc_save_dir = r"C:\Users\HP\Downloads\Project\Code\MFCC_H5\Updated"
spec_save_dir = r"C:\Users\HP\Downloads\Project\Code\Spec_H5\Updated"
os.makedirs(mfcc_save_dir, exist_ok=True)
os.makedirs(spec_save_dir, exist_ok=True)

# ✅ Adjust Fisher matrix shapes
def adjust_fisher_shapes(fisher_matrix, model):
    return [tf.broadcast_to(f, w.shape) if f.shape != w.shape else f for f, w in zip(fisher_matrix, model.trainable_weights)]

for name in mfcc_models.keys():
    fisher_mfcc[name] = adjust_fisher_shapes(fisher_mfcc[name], mfcc_models[name])

for name in spec_models.keys():
    fisher_spec[name] = adjust_fisher_shapes(fisher_spec[name], spec_models[name])

# ✅ Fix: Use fresh optimizer per model
class TrainEWC:
    def __init__(self, loss_fn, lambda_=0.1):
        self.loss_fn = loss_fn
        self.lambda_ = lambda_  # Regularization strength

    def train(self, model, X_train, y_train, fisher_matrix, epochs=10):
        if len(X_train.shape) == 3:  # If still (224, 224, 3), add batch dim
            X_train = np.expand_dims(X_train, axis=0)  # (1, 224, 224, 3)

        print("X_train shape:", X_train.shape)
        print("y_train shape:", y_train.shape)

        optimizer = tf.keras.optimizers.Adam()  # ✅ Fresh optimizer for each model
        fisher_matrix = adjust_fisher_shapes(fisher_matrix, model)

        model.compile(optimizer=optimizer, loss=self.loss_fn)  # ✅ Ensure model is compiled

        dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(32)
        prior_weights = model.get_weights()  # Store initial weights for EWC

        for epoch in range(epochs):
            for X, y in dataset:
                with tf.GradientTape() as tape:
                    pred = model(X, training=True)
                    loss = self.loss_fn(y, pred)

                    # ✅ Fix: Ensure EWC Loss uses correct shapes
                    ewc_loss = self.lambda_ * sum(
                        tf.reduce_sum(f * tf.square(w - w_old))
                        for f, w, w_old in zip(fisher_matrix, model.trainable_weights, prior_weights)
                    )

                    total_loss = loss + ewc_loss

                grads = tape.gradient(total_loss, model.trainable_weights)
                optimizer.apply_gradients(zip(grads, model.trainable_weights))  # ✅ Fresh optimizer applied here

            print(f"Epoch {epoch+1}/{epochs} - Loss: {loss.numpy():.4f}")

# Train & Save Updated MFCC Models
for name, model in mfcc_models.items():
    trainer = TrainEWC(loss_fn=tf.keras.losses.CategoricalCrossentropy(), lambda_=0.1)
    trainer.train(model, new_X_train_mfcc, new_y_train_mfcc, fisher_mfcc[name], epochs=2)

    model.save(os.path.join(mfcc_save_dir, f"updated_mfcc_{name}.h5"))
    print(f"✅ Updated MFCC model '{name}' saved.")

# Train & Save Updated Spectrogram Models
for name, model in spec_models.items():
    trainer = TrainEWC(loss_fn=tf.keras.losses.CategoricalCrossentropy(), lambda_=0.1)
    trainer.train(model, new_X_train_spec, new_y_train_spec, fisher_spec[name], epochs=2)

    model.save(os.path.join(spec_save_dir, f"updated_spec_{name}.h5"))
    print(f"✅ Updated Spectrogram model '{name}' saved.")




Fixed training data shape: (1, 1, 224, 224, 224, 3)
Expected input shape: (None, 224, 224, 3)
New training data shape: (1, 1, 224, 224, 224, 3)
X_train shape: (224, 13, 1)
y_train shape: (224, 4)
Epoch 1/2 - Loss: 0.4603




Epoch 2/2 - Loss: 0.2255
✅ Updated MFCC model 'bilstm' saved.
X_train shape: (224, 13, 1)
y_train shape: (224, 4)
Epoch 1/2 - Loss: 0.5110




Epoch 2/2 - Loss: 0.2868
✅ Updated MFCC model 'lstm' saved.
X_train shape: (1, 1, 224, 224, 224, 3)
y_train shape: (224, 4)


ValueError: Dimensions 1 and 224 are not compatible