In [None]:
# ==================================================
# 🔁 Reproducibility (best-effort) + Warning Suppression Setup
# ==================================================
import os
import random
import logging
import warnings

# === ENVIRONMENT VARIABLES (SET BEFORE TF IMPORT) ===
# These assignments run before `import tensorflow` further down in this cell.
SEED = 42
# NOTE(review): Python reads PYTHONHASHSEED at interpreter start-up, so setting
# it here should only affect child processes, not this kernel -- confirm it is
# also exported before the kernel is launched if hash determinism matters.
os.environ['PYTHONHASHSEED'] = str(SEED)           # Hash seed
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'           # Suppress TensorFlow logs
# '0' explicitly opts out of deterministic kernels, so runs on GPU are not
# guaranteed to be bit-for-bit repeatable despite the seeds set in this cell.
os.environ['TF_DETERMINISTIC_OPS'] = '0'           # Allow non-deterministic ops to avoid UnimplementedError
os.environ['CUDA_VISIBLE_DEVICES'] = '0'           # Set GPU ID (or "" to force CPU)

# === PYTHON SEED SETTINGS ===
random.seed(SEED)

# === SUPPRESS WARNINGS & LOGGING ===
# Silences every FutureWarning and UserWarning process-wide, including ones
# raised by libraries imported later in this cell.
logging.getLogger('absl').setLevel(logging.ERROR)
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)

# === IMPORT LIBRARIES AFTER SEED SETTINGS ===
import numpy as np
import tensorflow as tf
import pandas as pd
import librosa
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
from tqdm import tqdm

# === NUMPY & TENSORFLOW SEEDS ===
# Seeds the legacy global NumPy RNG (used by the augmentation helpers via
# np.random.*) and TensorFlow's global seed.
np.random.seed(SEED)
tf.random.set_seed(SEED)

# === SINGLE-THREADING FOR FULL REPRODUCIBILITY ===
# One thread inside each op and one op at a time: trades speed for a fixed
# execution order.
tf.config.threading.set_intra_op_parallelism_threads(1)
tf.config.threading.set_inter_op_parallelism_threads(1)

# ‚úÖ CHECK GPU AVAILABILITY
print("‚úÖ GPU Available:", tf.config.list_physical_devices('GPU'))

# ‚úÖ DATASET PATH
DATASET_PATH = r'D:\498R\BanglaSER'  # root directory walked by create_dataset_info

# Emotion labels decoded from the file name.
# Format: Mode-StatementType-Emotion-Intensity-Statement-Repetition-Actor.wav
# Keys are the two-digit code found in the third '-'-separated field.
EMOTION_MAPPING = {
    '01': 'happy',
    '02': 'sad',
    '03': 'angry',
    '04': 'surprise',
    '05': 'neutral'
}

In [None]:
# Create dataset information from file structure
def create_dataset_info(root_path):
    """Scan ``root_path`` recursively and build a table of labelled ``.wav`` files.

    File names are expected to have seven ``-``-separated fields
    (``Mode-StatementType-Emotion-Intensity-Statement-Repetition-Actor.wav``);
    the third field is the emotion code looked up in ``EMOTION_MAPPING``.
    Files with a different number of fields are skipped, and codes missing
    from the mapping are labelled ``'unknown'``.

    Args:
        root_path: Directory to walk.

    Returns:
        pandas.DataFrame with columns ``file_path``, ``emotion`` and
        ``emotion_code``, one row per accepted file. The columns exist even
        when nothing matches, so ``df['emotion']`` does not raise ``KeyError``
        on an empty or mis-configured dataset directory.
    """
    columns = ['file_path', 'emotion', 'emotion_code']
    records = []
    for root, dirs, files in os.walk(root_path):
        # os.walk yields entries in arbitrary, filesystem-dependent order.
        # Sorting `dirs` in place fixes the traversal order and sorting the
        # files fixes the row order, so the seeded split that consumes this
        # table sees the same sequence on every machine.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith('.wav'):
                continue
            parts = file.split('-')
            if len(parts) != 7:
                continue
            emotion_code = parts[2]
            records.append({
                'file_path': os.path.join(root, file),
                'emotion': EMOTION_MAPPING.get(emotion_code, 'unknown'),
                'emotion_code': emotion_code
            })

    return pd.DataFrame(records, columns=columns)

# Build the file table once for the whole notebook; later cells read the
# module-level `df`. Then report the total and the per-emotion sample counts.
df = create_dataset_info(DATASET_PATH)
print(f"Total samples: {len(df)}")
print(df['emotion'].value_counts())

In [None]:
import matplotlib.pyplot as plt

# Per-emotion sample counts from the scanned file table
emotion_counts = df['emotion'].value_counts()

# Shared text style for the wedge labels
label_style = {'fontsize': 19, 'fontweight': 'bold'}

# Pie chart on a transparent figure and transparent axes
fig, ax = plt.subplots(figsize=(8,8), facecolor='none')
ax.set_facecolor('none')

wedges, texts, autotexts = ax.pie(
    emotion_counts,
    labels=emotion_counts.index,
    autopct='%1.1f%%',
    startangle=90,
    colors=plt.cm.Set3.colors,
    textprops=label_style
)

# Bolden the percentages (applied to every percentage label in one call)
plt.setp(autotexts, fontsize=19, fontweight='bold')

plt.show()


In [None]:
import numpy as np
import librosa
from tqdm import tqdm
from collections import defaultdict
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf

# ---------------------------
# Audio loading & utilities
# ---------------------------
TARGET_SR = 16000                            # sample rate passed to librosa.load (Hz)
DURATION_SEC = 4.0                           # clip duration passed to librosa.load (seconds)
TARGET_LEN = int(TARGET_SR * DURATION_SEC)   # fixed clip length in samples (64000)

def load_audio_fixed(file_path, sr=TARGET_SR, duration=DURATION_SEC):
    """Load an audio file and force it to a fixed number of samples.

    The file is read with ``librosa.load`` (resampled to ``sr`` and limited to
    ``duration`` seconds), then zero-padded at the end or truncated so the
    result has exactly ``int(sr * duration)`` samples.

    Args:
        file_path: Path of the audio file to read.
        sr: Sample rate requested from ``librosa.load``.
        duration: Number of seconds to read; also defines the output length.
            When ``None`` the module-level ``TARGET_LEN`` is used.

    Returns:
        Tuple ``(audio, sr)`` of the fixed-length waveform and the sample rate
        reported by ``librosa.load``.
    """
    audio, sr = librosa.load(file_path, sr=sr, duration=duration)
    # Derive the length from the rate and duration actually in effect; the
    # global TARGET_LEN is only correct for the default arguments.
    target_len = TARGET_LEN if duration is None else int(sr * duration)
    missing = target_len - len(audio)
    if missing > 0:
        audio = np.pad(audio, (0, missing), mode='constant')
    else:
        audio = audio[:target_len]
    return audio, sr

def ensure_length(audio, target_len=TARGET_LEN):
    """Return ``audio`` zero-padded at the end or truncated to ``target_len`` samples.

    An input that already has ``target_len`` samples is returned unchanged.
    """
    deficit = target_len - len(audio)
    if deficit > 0:
        # too short: append zeros
        return np.pad(audio, (0, deficit), mode='constant')
    if deficit < 0:
        # too long: keep the leading samples
        return audio[:target_len]
    return audio

# ---------------------------
# Audio-domain augmentations
# ---------------------------
def aug_noise(audio, noise_std=0.005):
    """Return ``audio`` plus zero-mean Gaussian noise with standard deviation ``noise_std``.

    The noise is drawn from the global NumPy RNG and has the same shape as
    ``audio``.
    """
    noise = np.random.normal(loc=0.0, scale=noise_std, size=audio.shape)
    return audio + noise

def aug_shift(audio, max_shift_ratio=0.1):
    """Shift ``audio`` in time by a random offset, filling the gap with zeros.

    The offset is drawn uniformly from ``[-m, m]`` where
    ``m = int(len(audio) * max_shift_ratio)``. Samples pushed past either end
    are dropped (no wrap-around). A zero offset returns the input object
    itself.
    """
    limit = int(len(audio) * max_shift_ratio)
    offset = np.random.randint(-limit, limit + 1)
    if offset == 0:
        return audio

    silence = np.zeros(abs(offset), dtype=audio.dtype)
    if offset > 0:
        # delay: silence first, then the signal minus its tail
        return np.concatenate([silence, audio[:-offset]])
    # advance: signal minus its head, then silence
    return np.concatenate([audio[-offset:], silence])

def aug_pitch(audio, sr=TARGET_SR, semitone_range=(-2, 2)):
    """Pitch-shift ``audio`` by a random number of semitones.

    The step count is drawn uniformly between ``semitone_range[0]`` and
    ``semitone_range[1]`` from the global NumPy RNG; the result is passed
    through ``ensure_length`` so it has ``TARGET_LEN`` samples.
    """
    low, high = semitone_range[0], semitone_range[1]
    n_steps = np.random.uniform(low, high)
    shifted = librosa.effects.pitch_shift(y=audio, sr=sr, n_steps=n_steps)
    return ensure_length(shifted, TARGET_LEN)

def aug_stretch(audio, rate_range=(0.8, 1.25)):
    """Time-stretch ``audio`` by a random rate and restore the fixed length.

    The rate is drawn uniformly between ``rate_range[0]`` and
    ``rate_range[1]`` from the global NumPy RNG. Stretching changes the number
    of samples, so the result is padded/truncated back to ``TARGET_LEN``.
    """
    low, high = rate_range[0], rate_range[1]
    stretch_rate = np.random.uniform(low, high)
    # Keyword arguments on purpose: in some librosa versions 'rate' is keyword-only
    stretched = librosa.effects.time_stretch(y=audio, rate=stretch_rate)
    return ensure_length(stretched, TARGET_LEN)

# Names of the available augmentations (strings, resolved by apply_aug)
AUG_FUNCS = ['noise', 'shift', 'pitch', 'stretch']

def apply_aug(audio, method):
    """Apply the augmentation named ``method`` to ``audio``.

    Recognised names are 'noise', 'shift', 'pitch' and 'stretch'; each is
    called with its default parameters. Any other value returns ``audio``
    unchanged.
    """
    dispatch = (
        ('noise', aug_noise),
        ('shift', aug_shift),
        ('pitch', aug_pitch),
        ('stretch', aug_stretch),
    )
    for name, augment in dispatch:
        if method == name:
            return augment(audio)
    return audio

# ---------------------------
# Feature extraction (unchanged logic, but from audio)
# ---------------------------
def extract_features_from_audio(audio, sr=TARGET_SR):
    """Compute the stacked frame-level feature matrix for one waveform.

    Rows are stacked in this order before transposing: 13 MFCCs, 128 log-mel
    bands (mel power spectrogram converted with ``librosa.power_to_db``), the
    zero-crossing rate, 12 chroma bins and the RMS energy (155 rows in total).

    Returns:
        Array of shape ``(T, 155)`` with one row per analysis frame.
    """
    feature_rows = [
        librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13),
        librosa.power_to_db(
            librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=128)
        ),
        librosa.feature.zero_crossing_rate(audio),
        librosa.feature.chroma_stft(y=audio, sr=sr, n_chroma=12),
        librosa.feature.rms(y=audio),
    ]
    stacked = np.vstack(feature_rows)  # (155, T)
    return stacked.T  # (T, 155)

# If you need the same signature as before:
def extract_features(file_path):
    """Load ``file_path`` as a fixed-length clip and return its ``(T, 155)`` features.

    Convenience wrapper chaining ``load_audio_fixed`` (with ``TARGET_SR`` and
    ``DURATION_SEC``) and ``extract_features_from_audio``.
    """
    waveform, rate = load_audio_fixed(file_path, sr=TARGET_SR, duration=DURATION_SEC)
    return extract_features_from_audio(waveform, rate)

# ---------------------------
# Label mapping (same as yours)
# ---------------------------
# Integer ids are assigned in order of first appearance in `df`, so the
# numbering follows the row order of the scanned file table.
emotions = df['emotion'].unique()
label_mapping = dict(zip(emotions, range(len(emotions))))   # name -> id
reverse_mapping = dict(enumerate(emotions))                 # id -> name
num_labels = len(emotions)

print("\nEmotion Label Mapping:")
for emotion, idx in label_mapping.items():
    print(f"{emotion}: {idx}")

# ---------------------------------------------
# PREPROCESS + AUDIO AUGMENTATION (before split)
# ---------------------------------------------
print("üîÅ Extracting ORIGINAL features for ALL files ...")
# X_all / y_all collect one (T, 155) feature matrix and one integer label per
# file; audios_by_class keeps every fixed-length waveform in memory, grouped by
# label id, so the augmentation step can reuse them without re-reading files.
X_all, y_all = [], []
audios_by_class = defaultdict(list)

for _, row in tqdm(df.iterrows(), total=len(df), desc="All data"):
    audio, sr = load_audio_fixed(row['file_path'], sr=TARGET_SR, duration=DURATION_SEC)
    # keep raw audio per class for later augmentation
    label = label_mapping[row['emotion']]
    audios_by_class[label].append(audio)

    # original features
    feats = extract_features_from_audio(audio, sr)
    X_all.append(feats)
    y_all.append(label)

print(f"üìä Original samples: {len(X_all)}")

# Class-balanced augmentation over WHOLE set (audio-domain)
target_per_class = 1000  # your target
aug_X_all, aug_y_all = [], []

print("üéõÔ∏è Augmenting (audio-domain) to balance classes ...")
for label, audios in audios_by_class.items():
    count = len(audios)
    # Number of synthetic samples required to reach the per-class target;
    # zero for classes that already meet or exceed it.
    needed = max(0, target_per_class - count)
    for i in range(needed):
        # Cycle through the class's original recordings in order and apply one
        # randomly chosen augmentation to each pick.
        base = audios[i % count]
        method = AUG_FUNCS[np.random.randint(0, len(AUG_FUNCS))]
        aug_audio = apply_aug(base, method)
        aug_audio = ensure_length(aug_audio, TARGET_LEN)  # safety
        feats = extract_features_from_audio(aug_audio, TARGET_SR)
        aug_X_all.append(feats)
        aug_y_all.append(label)

# Combine originals + augmented (BEFORE split)
X_all = np.array(X_all)
y_all = np.array(y_all)
if aug_X_all:
    # Only concatenate when something was generated: an empty list converts to
    # a 1-D array, which cannot be joined with the 3-D feature array and would
    # raise ValueError whenever every class already meets the target.
    X_all = np.concatenate([X_all, np.array(aug_X_all)], axis=0)
    y_all = np.concatenate([y_all, np.array(aug_y_all)], axis=0)

print(f"‚úÖ Final samples after class-balanced augmentation: {X_all.shape[0]} (shape per sample: {X_all.shape[1:]} )")

# -----------------
# Split (as you want)
# -----------------
# 80 % train, then the remaining 20 % is halved into validation and test
# (10 % each); both splits are stratified on the label.
# NOTE(review): X_all already contains the augmented samples at this point, so
# augmented variants of one recording (and the recording itself) can land in
# different splits. Validation/test scores may therefore be optimistic --
# confirm this is intended, or split the original files first and augment the
# training portion only.
X_train, X_temp, y_train, y_temp = train_test_split(
    X_all, y_all, test_size=0.2, random_state=42, stratify=y_all
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

print(f"Train set: {len(X_train)} | Val set: {len(X_val)} | Test set: {len(X_test)}")

# ---------------------------------------
# Feature scaling (fit on TRAIN only)
# ---------------------------------------
# Flatten across time to fit a scaler on feature dims
# T = frames per sample, F = features per frame. Every frame of every training
# sample becomes one row, so the scaler learns one mean/std per feature column.
T, F = X_train.shape[1], X_train.shape[2]
scaler = StandardScaler()
scaler.fit(X_train.reshape(-1, F))  # train-only fit

def transform_with_scaler(X, scaler):
    """Apply a fitted per-feature scaler to a 3-D batch.

    Args:
        X: Array of shape ``(N, T, F)``.
        scaler: Object whose ``transform`` accepts a 2-D ``(rows, F)`` array.

    Returns:
        The transformed data reshaped back to ``(N, T, F)``.
    """
    n_samples, n_frames, n_features = X.shape
    flat = scaler.transform(X.reshape(-1, n_features))
    return flat.reshape(n_samples, n_frames, n_features)

# Standardize all three splits with the statistics learned from the training
# split only.
X_train = transform_with_scaler(X_train, scaler)
X_val   = transform_with_scaler(X_val, scaler)
X_test  = transform_with_scaler(X_test, scaler)

# -------------------------
# Build TensorFlow datasets
# -------------------------
# Only the training pipeline is shuffled (buffer = whole training set).
# NOTE(review): these datasets capture the arrays as they are here. The later
# duplicate-cleaning cell reassigns X_val/X_test (and possibly X_train) without
# rebuilding train_ds/val_ds/test_ds, so they go stale -- rebuild them after
# cleaning if they are used for training or evaluation.
batch_size = 32
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(X_train)).batch(batch_size).prefetch(tf.data.AUTOTUNE)
val_ds   = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(batch_size).prefetch(tf.data.AUTOTUNE)
test_ds  = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(batch_size).prefetch(tf.data.AUTOTUNE)

print("\n‚úÖ Dataset preparation complete (AUGMENT ‚Üí SPLIT).")

In [None]:
import matplotlib.pyplot as plt
import os

# Pick one sample (e.g., first training sample)
idx = 0

# X_train[idx] is a standardized (T, 155) feature matrix, not a waveform, so it
# must not be flattened and fed back into librosa. Undo the scaling instead and
# slice the rows using the stacking order of extract_features_from_audio:
# 13 MFCC | 128 log-mel | 1 ZCR | 12 chroma | 1 RMS.
sample_feats = scaler.inverse_transform(X_train[idx]).T  # (155, T)

# Extract individual features (each is 2-D: rows = feature bins, cols = frames)
mfccs   = sample_feats[0:13]
log_mel = sample_feats[13:141]
zcr     = sample_feats[141:142]
chroma  = sample_feats[142:154]
rms     = sample_feats[154:155]

# Display name -> (2-D feature array, matplotlib colormap name)
features_dict = {
    "MFCCs": (mfccs, "magma"),
    "Log-Mel": (log_mel, "inferno"),
    "ZCR": (zcr, "plasma"),
    "Chroma": (chroma, "cividis"),
    "RMS": (rms, "coolwarm")
}

# Folder to save images (relative to the current working directory)
os.makedirs("feature_heatmaps_v2", exist_ok=True)

# Save each feature as a separate heatmap with new color + name
for name, (feat, cmap) in features_dict.items():
    plt.figure(figsize=(6,4))
    plt.imshow(feat, aspect='auto', origin='lower', cmap=cmap)
    plt.title(f"{name}")
    plt.xlabel("Time")
    plt.ylabel("Features")
    # Only the log-mel colorbar gets a dB tick format; the others use the default
    plt.colorbar(format="%+2.1f dB" if name=="Log-Mel" else None)
    plt.tight_layout()
    plt.savefig(
        f"feature_heatmaps_v2/{name}_v2.png",  # different filenames
        transparent=True,
        dpi=300
    )
    # Close each figure after saving so figures do not accumulate in memory
    plt.close()

print("‚úÖ Saved 5 new heatmaps with different colors in 'feature_heatmaps_v2/' folder")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Per-class sample counts before and after augmentation. The augmentation step
# adds max(0, target_per_class - count) samples per class, so the resulting
# size is max(count, target_per_class) rather than a fixed 1000.
before_counts = [len(audios) for audios in audios_by_class.values()]
after_counts  = [max(count, target_per_class) for count in before_counts]
class_labels  = [reverse_mapping[label] for label in audios_by_class.keys()]

x = np.arange(len(class_labels))
width = 0.5  # bar thickness; each pair of bars sits at x -/+ width/2

# Transparent figure + axes
fig, ax = plt.subplots(figsize=(10,6), facecolor='none')
ax.set_facecolor('none')

# Plot bars
rects1 = ax.bar(x - width/2, before_counts, width, 
                label="Before Augmentation", color='skyblue')
rects2 = ax.bar(x + width/2, after_counts, width, 
                label="After Augmentation", color='lightcoral')

# Titles and labels
ax.set_ylabel("Number of Samples", fontsize=16, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(class_labels, rotation=0, fontsize=14, fontweight='bold')  # no tilt
ax.tick_params(axis='y', labelsize=14, width=2)
# Add value labels above bars
def autolabel(rects, color):
    """Write each bar's height just above it on the module-level ``ax``.

    Args:
        rects: Iterable of bar patches (as returned by ``ax.bar``).
        color: Text colour of the labels.
    """
    for bar in rects:
        bar_height = bar.get_height()
        x_center = bar.get_x() + bar.get_width()/2
        ax.annotate(
            f'{bar_height}',
            xy=(x_center, bar_height),
            xytext=(0, 5),               # 5 points above the bar top
            textcoords="offset points",
            ha='center', va='bottom',
            fontsize=14, fontweight='bold', color=color,
        )

autolabel(rects1, 'black')
autolabel(rects2, 'black')

# Legend (anchored just outside the right edge of the axes, without a frame)
ax.legend(
    loc='upper left',
    bbox_to_anchor=(1.02, 1),
    borderaxespad=0,
    frameon=False,
    fontsize=14
)

# Remove outer box (all spines)
for spine in ax.spines.values():
    spine.set_visible(False)

plt.tight_layout()

# Show transparent plot
plt.show()

# üîπ If saving, keep transparency
# plt.savefig("augmentation_counts.png", dpi=300, transparent=True)


In [None]:
# Per-sample shape (frames, features) taken from the prepared training data.
# NOTE(review): the model-summary cell further down reassigns `input_shape`
# with a hard-coded tuple.
input_shape =X_train[0].shape

In [None]:
# ============================
# Duplicate detection/cleaning + GRAPHS
# ============================
import hashlib
import matplotlib.pyplot as plt
import seaborn as sns

# Changes seaborn's plotting context for every later figure in the session.
sns.set_context("talk")

# --- snapshot "before" for graphs ---
# Copies, so the in-place reassignment of X_*/y_* below cannot alter them.
X_train_bef, y_train_bef = X_train.copy(), y_train.copy()
X_val_bef,   y_val_bef   = X_val.copy(),   y_val.copy()
X_test_bef,  y_test_bef  = X_test.copy(),  y_test.copy()

def hash_sample(arr, decimals=6):
    """Stable hash for a (T,F) float array after rounding (avoids tiny float jitter).

    Args:
        arr: Float array to fingerprint.
        decimals: Number of decimal places kept before hashing.

    Returns:
        40-character SHA-1 hex digest of the rounded array's raw bytes.
    """
    rounded = np.round(np.asarray(arr), decimals)
    # Rounding small negative values yields -0.0, whose byte pattern differs
    # from +0.0. Adding 0.0 maps -0.0 to +0.0 so arrays that are equal after
    # rounding always produce the same digest.
    canonical = np.ascontiguousarray(rounded + 0.0)
    return hashlib.sha1(canonical.tobytes()).hexdigest()

def hashes_for_split(X):
    """Return a NumPy array holding ``hash_sample(x)`` for every sample ``x`` in ``X``."""
    digests = list(map(hash_sample, X))
    return np.array(digests)

def find_within_split_dups(hashes):
    """Group sample positions by hash and keep only the repeated hashes.

    Returns:
        Dict mapping each hash that occurs more than once to the list of
        positions (in ascending order) where it occurs.
    """
    positions = {}
    for pos, digest in enumerate(hashes):
        positions.setdefault(digest, []).append(pos)

    repeated = {}
    for digest, where in positions.items():
        # a group is a duplicate group only if it has at least two members
        if len(where) > 1:
            repeated[digest] = where
    return repeated

def count_within_extra(dups_dict):
    """Count redundant samples: every member of each duplicate group except one."""
    total_members = sum(map(len, dups_dict.values()))
    # one member per group is the "kept" copy, the rest are extras
    return total_members - len(dups_dict)

def keep_first_indices(hashes):
    """Return the positions of the first occurrence of each distinct hash.

    The result is an integer NumPy array in ascending order; positions of
    later repeats are omitted.
    """
    first_seen = {}
    for pos, digest in enumerate(hashes):
        # setdefault records a position only the first time a hash is met
        first_seen.setdefault(digest, pos)
    return np.array(list(first_seen.values()), dtype=int)

def apply_mask(X, y, keep_idx):
    """Select the rows of ``X`` and ``y`` whose positions are listed in ``keep_idx``.

    Returns:
        Tuple ``(X_kept, y_kept, mask)`` where ``mask`` is the boolean
        selection vector of length ``len(X)``. Rows keep their original order.
    """
    selected = np.full(len(X), False)
    selected[keep_idx] = True
    X_kept, y_kept = X[selected], y[selected]
    return X_kept, y_kept, selected

# --- compute hashes BEFORE cleaning ---
# One hash per sample, computed on the snapshot arrays.
train_h_bef = hashes_for_split(X_train_bef)
val_h_bef   = hashes_for_split(X_val_bef)
test_h_bef  = hashes_for_split(X_test_bef)

# Duplicate groups inside each split (hash -> positions, groups of size >= 2)
dups_train_bef = find_within_split_dups(train_h_bef)
dups_val_bef   = find_within_split_dups(val_h_bef)
dups_test_bef  = find_within_split_dups(test_h_bef)

# Redundant copies per split (group size minus one, summed over groups)
within_train_bef = count_within_extra(dups_train_bef)
within_val_bef   = count_within_extra(dups_val_bef)
within_test_bef  = count_within_extra(dups_test_bef)

# cross-split overlaps BEFORE cleaning
train_set_bef = set(train_h_bef)
val_set_bef   = set(val_h_bef)
test_set_bef  = set(test_h_bef)

# Naming: <a>_in_<b>_bef holds the positions in split <a> whose hash also
# occurs somewhere in split <b>.
val_in_train_bef  = np.where(np.isin(val_h_bef,  list(train_set_bef)))[0]
test_in_train_bef = np.where(np.isin(test_h_bef, list(train_set_bef)))[0]
train_in_val_bef  = np.where(np.isin(train_h_bef, list(val_set_bef)))[0]
train_in_test_bef = np.where(np.isin(train_h_bef, list(test_set_bef)))[0]
val_in_test_bef   = np.where(np.isin(val_h_bef,  list(test_set_bef)))[0]
test_in_val_bef   = np.where(np.isin(test_h_bef, list(val_set_bef)))[0]

print("\n=== DUPLICATE SUMMARY (before cleaning) ===")
print(f"[TRAIN] within groups: {len(dups_train_bef)} | extras: {within_train_bef}")
print(f"[VAL]   within groups: {len(dups_val_bef)}   | extras: {within_val_bef}")
print(f"[TEST]  within groups: {len(dups_test_bef)}  | extras: {within_test_bef}")
print(f"[VAL]   duplicates in TRAIN: {len(val_in_train_bef)}")
print(f"[TEST]  duplicates in TRAIN: {len(test_in_train_bef)}")
print(f"[TRAIN] duplicates in VAL:   {len(train_in_val_bef)}")
print(f"[TRAIN] duplicates in TEST:  {len(train_in_test_bef)}")
print(f"[VAL]   duplicates in TEST:  {len(val_in_test_bef)}")
print(f"[TEST]  duplicates in VAL:   {len(test_in_val_bef)}")

# -----------------------------
# Remove duplicates (toggle)
# -----------------------------
REMOVE_WITHIN_SPLIT = True          # drop duplicates within each split (keep first)
REMOVE_VALTEST_IF_IN_TRAIN = True   # drop val/test items that also appear in train

# Set to True as soon as either step actually removes something.
changed = False

# 1) within-split dedupe
# The keep-indices come from the pre-cleaning hashes, which describe the same
# arrays as X_train/X_val/X_test at this point.
if REMOVE_WITHIN_SPLIT:
    keep_tr = keep_first_indices(train_h_bef)
    keep_va = keep_first_indices(val_h_bef)
    keep_te = keep_first_indices(test_h_bef)

    if len(keep_tr) < len(X_train) or len(keep_va) < len(X_val) or len(keep_te) < len(X_test):
        X_train, y_train, _ = apply_mask(X_train, y_train, keep_tr)
        X_val,   y_val,   _ = apply_mask(X_val,   y_val,   keep_va)
        X_test,  y_test,  _ = apply_mask(X_test,  y_test,  keep_te)
        changed = True

# re-hash after within-split
train_hashes = hashes_for_split(X_train)
val_hashes   = hashes_for_split(X_val)
test_hashes  = hashes_for_split(X_test)
train_set    = set(train_hashes)

# 2) remove any VAL/TEST samples that appear in TRAIN
# The training split is left untouched here; the leaked copies are dropped
# from validation/test instead.
# NOTE(review): overlaps between VAL and TEST are reported in the summary
# above but are not removed by either step.
if REMOVE_VALTEST_IF_IN_TRAIN:
    val_keep_idx  = np.where(~np.isin(val_hashes,  list(train_set)))[0]
    test_keep_idx = np.where(~np.isin(test_hashes, list(train_set)))[0]
    if len(val_keep_idx) < len(X_val) or len(test_keep_idx) < len(X_test):
        X_val,  y_val,  _ = apply_mask(X_val,  y_val,  val_keep_idx)
        X_test, y_test, _ = apply_mask(X_test, y_test, test_keep_idx)
        changed = True

print("\n" + ("Duplicates removed." if changed else "No duplicates removed."))
print(f"Sizes ‚Äî Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")

# --- POST-clean for graphs ---
# Same within-split statistics as before cleaning, recomputed on the cleaned
# arrays so the two can be plotted side by side.
train_h_aft = hashes_for_split(X_train)
val_h_aft   = hashes_for_split(X_val)
test_h_aft  = hashes_for_split(X_test)

dups_train_aft = find_within_split_dups(train_h_aft)
dups_val_aft   = find_within_split_dups(val_h_aft)
dups_test_aft  = find_within_split_dups(test_h_aft)

within_train_aft = count_within_extra(dups_train_aft)
within_val_aft   = count_within_extra(dups_val_aft)
within_test_aft  = count_within_extra(dups_test_aft)

# ==================
# GRAPHS
# ==================

# 1) Counts before vs after
sizes_before = [len(X_train_bef), len(X_val_bef), len(X_test_bef)]
sizes_after  = [len(X_train),     len(X_val),     len(X_test)]
# `splits` is also read as a global by plot_class_dist below.
splits = ['Train', 'Val', 'Test']

plt.figure(figsize=(8,5))
# x positions and bar width are reused by the second chart.
x = np.arange(len(splits))
w = 0.38
plt.bar(x - w/2, sizes_before, width=w, label='Before')
plt.bar(x + w/2, sizes_after,  width=w, label='After')
plt.xticks(x, splits)
plt.ylabel('Num samples')
plt.title('Split sizes: before vs after duplicate cleaning')
plt.legend()
plt.tight_layout()
# Figures in this section are written to the working directory and closed
# without being displayed inline.
plt.savefig('dup_sizes_before_after.png')
plt.close()

# 2) Within-split duplicates (extras) before vs after
within_bef = [within_train_bef, within_val_bef, within_test_bef]
within_aft = [within_train_aft, within_val_aft, within_test_aft]

plt.figure(figsize=(8,5))
plt.bar(x - w/2, within_bef, width=w, label='Before')
plt.bar(x + w/2, within_aft, width=w, label='After')
plt.xticks(x, splits)
plt.ylabel('Num duplicate extras')
plt.title('Within-split duplicate extras: before vs after')
plt.legend()
plt.tight_layout()
plt.savefig('dup_within_extras_before_after.png')
plt.close()

# 3) Cross-split overlap heatmap (BEFORE cleaning)
# Entry [row, col] off the diagonal = number of samples of the row split whose
# hash also occurs in the column split; the diagonal holds within-split extras.
overlap_mat = np.array([
    [within_train_bef, len(train_in_val_bef), len(train_in_test_bef)],
    [len(val_in_train_bef), within_val_bef,   len(val_in_test_bef)],
    [len(test_in_train_bef), len(test_in_val_bef), within_test_bef]
], dtype=int)

plt.figure(figsize=(7,6))
sns.heatmap(overlap_mat, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Train','Val','Test'],
            yticklabels=['Train','Val','Test'])
plt.title('Duplicate overlap (BEFORE cleaning)\n(diagonal = within extras)')
plt.tight_layout()
plt.savefig('dup_overlap_heatmap_before.png')
plt.close()

# 4) Class distribution per split ‚Äî before and after (stacked bars)
def plot_class_dist(y_train, y_val, y_test, title, fname):
    """Save a stacked bar chart of per-class sample counts for the three splits.

    One bar per split (x labels come from the module-level ``splits``), one
    stacked segment per class (names come from ``reverse_mapping``).

    Args:
        y_train, y_val, y_test: Integer label arrays of the three splits.
        title: Figure title.
        fname: Output image path passed to ``plt.savefig``.
    """
    n_classes = len(reverse_mapping)
    classes = [reverse_mapping[i] for i in range(n_classes)]

    # counts[s][c] = number of samples of class c in split s
    counts = [
        np.array([np.sum(labels == c) for c in range(n_classes)])
        for labels in (y_train, y_val, y_test)
    ]

    plt.figure(figsize=(10,6))
    bottoms = np.zeros(3)  # running top of each split's stack
    for c, cls in enumerate(classes):
        vals = np.array([split_counts[c] for split_counts in counts])
        plt.bar(splits, vals, bottom=bottoms, label=cls)
        bottoms += vals
    plt.ylabel('Num samples')
    plt.title(title)
    plt.legend(bbox_to_anchor=(1.02, 1), loc='upper left')
    plt.tight_layout()
    plt.savefig(fname)
    plt.close()

# Labels from the pre-cleaning snapshot
plot_class_dist(y_train_bef, y_val_bef, y_test_bef,
                'Class distribution per split (BEFORE cleaning)',
                'class_dist_before.png')

# Labels of the splits as they are after duplicate removal
plot_class_dist(y_train, y_val, y_test,
                'Class distribution per split (AFTER cleaning)',
                'class_dist_after.png')

print("üìà Saved figures:",
      "dup_sizes_before_after.png,",
      "dup_within_extras_before_after.png,",
      "dup_overlap_heatmap_before.png,",
      "class_dist_before.png,",
      "class_dist_after.png")

In [None]:
import os
import numpy as np
import tensorflow as tf

# ===============================
# üîß Configuration
# ===============================
SAVE_PATH = r"D:\498R"   # Adjust path
batch_size = 32

# ===============================
# Save arrays to disk (once)
# ===============================
# Writes the current X_train/y_train, X_val/y_val and X_test/y_test arrays.

# np.savez_compressed raises FileNotFoundError when the directory is missing,
# so make sure it exists first (a no-op if it is already there).
os.makedirs(SAVE_PATH, exist_ok=True)

# np.savez_compressed saves multiple arrays under chosen keys ('X' and 'y')
np.savez_compressed(os.path.join(SAVE_PATH, "train(1000)(Cleaned).npz"), X=X_train, y=y_train)
np.savez_compressed(os.path.join(SAVE_PATH, "val(1000)(Cleaned).npz"), X=X_val, y=y_val)
np.savez_compressed(os.path.join(SAVE_PATH, "test(1000)(Cleaned).npz"), X=X_test, y=y_test)

print("‚úÖ Saved NPZ files.")

# # ===============================
# # üìÇ Load datasets from disk
# # ===============================
# def load_dataset(filename):
#     path = os.path.join(SAVE_PATH, filename)
#     if not os.path.exists(path):
#         raise FileNotFoundError(f"File not found: {path}")
#     data = np.load(path)
#     X, y = data['X'], data['y']
#     # Ensure correct dtype for TensorFlow
#     X = X.astype('float32')
#     y = y.astype('int64')
#     return X, y

# X_train, y_train = load_dataset("train_hybrid_2000.npz")
# X_val, y_val     = load_dataset("val_hybrid.npz")
# X_test, y_test   = load_dataset("test_hybrid.npz")

# # ===============================
# # üîÅ Create tf.data pipelines
# # ===============================
# train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)) \
#     .shuffle(buffer_size=len(X_train), seed=42) \
#     .batch(batch_size) \
#     .prefetch(tf.data.AUTOTUNE)

# val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val)) \
#     .batch(batch_size) \
#     .cache() \
#     .prefetch(tf.data.AUTOTUNE)

# test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)) \
#     .batch(batch_size) \
#     .cache() \
#     .prefetch(tf.data.AUTOTUNE)

# # ===============================
# # üõ† Sanity checks
# # ===============================
# print(f"Train: {X_train.shape} {y_train.shape}")
# print(f"Val:   {X_val.shape} {y_val.shape}")
# print(f"Test:  {X_test.shape} {y_test.shape}")

# print("Train classes:", np.unique(y_train))
# print("Val classes:  ", np.unique(y_val))
# print("Test classes: ", np.unique(y_test))


In [None]:
def create_cnn_model(input_shape, num_labels):
    """Build and compile a 1-D CNN classifier.

    Four Conv1D -> BatchNorm -> MaxPool(2) -> Dropout(0.3) stages with
    128/256/512/1024 filters, global average pooling over time, a 512-unit
    dense layer with dropout 0.4 and a linear output layer.

    Args:
        input_shape: Per-sample input shape passed to the Input layer.
        num_labels: Number of output units (class logits).

    Returns:
        A compiled ``tf.keras.Model`` (Adam, lr 0.001; sparse categorical
        cross-entropy on logits; accuracy metric).
    """
    layers = tf.keras.layers
    inputs = layers.Input(shape=input_shape)

    # (filters, kernel_size) of each convolutional stage, in order
    conv_stages = ((128, 5), (256, 5), (512, 3), (1024, 3))

    x = inputs
    for filters, kernel_size in conv_stages:
        x = layers.Conv1D(filters, kernel_size, padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling1D(pool_size=2)(x)
        x = layers.Dropout(0.3, seed=SEED)(x)

    x = layers.GlobalAveragePooling1D()(x)

    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    # No activation: the loss below is configured with from_logits=True
    outputs = layers.Dense(num_labels)(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model


def create_cnn_lstm_model(input_shape, num_labels):
    """Build and compile a CNN + bidirectional LSTM classifier.

    Three Conv1D -> BatchNorm -> MaxPool(2) -> Dropout(0.3) stages with
    128/256/512 filters feed a bidirectional LSTM (256 units, full sequence
    output) with dropout 0.4, followed by global average pooling, a 512-unit
    dense layer with dropout 0.4 and a linear output layer.

    Args:
        input_shape: Per-sample input shape passed to the Input layer.
        num_labels: Number of output units (class logits).

    Returns:
        A compiled ``tf.keras.Model`` (Adam, lr 0.001; sparse categorical
        cross-entropy on logits; accuracy metric).
    """
    layers = tf.keras.layers
    inputs = layers.Input(shape=input_shape)

    x = inputs
    for filters, kernel_size in ((128, 5), (256, 5), (512, 3)):
        x = layers.Conv1D(filters, kernel_size, padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling1D(pool_size=2)(x)
        x = layers.Dropout(0.3, seed=SEED)(x)

    # Recurrent stage over the pooled sequence
    x = layers.Bidirectional(layers.LSTM(256, return_sequences=True))(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    x = layers.GlobalAveragePooling1D()(x)

    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    # No activation: the loss below is configured with from_logits=True
    outputs = layers.Dense(num_labels)(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model


def create_cnn_gru_model(input_shape, num_labels):
    """Build and compile a CNN + bidirectional GRU classifier.

    Three Conv1D -> BatchNorm -> MaxPool(2) -> Dropout(0.3) stages with
    128/256/512 filters feed a bidirectional GRU (256 units, full sequence
    output) with dropout 0.4, followed by global average pooling, a 512-unit
    dense layer with dropout 0.4 and a linear output layer.

    Args:
        input_shape: Per-sample input shape passed to the Input layer.
        num_labels: Number of output units (class logits).

    Returns:
        A compiled ``tf.keras.Model`` (Adam, lr 0.001; sparse categorical
        cross-entropy on logits; accuracy metric).
    """
    layers = tf.keras.layers
    inputs = layers.Input(shape=input_shape)

    x = inputs
    for filters, kernel_size in ((128, 5), (256, 5), (512, 3)):
        x = layers.Conv1D(filters, kernel_size, padding='same', activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling1D(pool_size=2)(x)
        x = layers.Dropout(0.3, seed=SEED)(x)

    # Recurrent stage over the pooled sequence
    x = layers.Bidirectional(layers.GRU(256, return_sequences=True))(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    x = layers.GlobalAveragePooling1D()(x)

    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    # No activation: the loss below is configured with from_logits=True
    outputs = layers.Dense(num_labels)(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model

In [None]:
# ======================
# Pure LSTM Model
# ======================
def create_lstm_model(input_shape, num_labels):
    """Build and compile a stacked bidirectional LSTM classifier.

    Two bidirectional LSTM layers (256 then 128 units, both returning full
    sequences), each followed by dropout 0.4; then global average pooling, a
    512-unit dense layer with dropout 0.4 and a linear output layer.

    Args:
        input_shape: Per-sample input shape passed to the Input layer.
        num_labels: Number of output units (class logits).

    Returns:
        A compiled ``tf.keras.Model`` (Adam, lr 0.001; sparse categorical
        cross-entropy on logits; accuracy metric).
    """
    layers = tf.keras.layers
    inputs = layers.Input(shape=input_shape)

    x = inputs
    for units in (256, 128):
        x = layers.Bidirectional(layers.LSTM(units, return_sequences=True))(x)
        x = layers.Dropout(0.4, seed=SEED)(x)

    x = layers.GlobalAveragePooling1D()(x)

    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    # No activation: the loss below is configured with from_logits=True
    outputs = layers.Dense(num_labels)(x)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model


# ======================
# Pure GRU Model
# ======================
def create_gru_model(input_shape, num_labels):
    """Build and compile a stacked bidirectional GRU classifier.

    Two bidirectional GRU layers (256 then 128 units, both returning full
    sequences), each followed by dropout 0.4; then global average pooling, a
    512-unit dense layer with dropout 0.4 and a linear output layer.

    Args:
        input_shape: Per-sample input shape passed to the Input layer.
        num_labels: Number of output units (class logits).

    Returns:
        A compiled ``tf.keras.Model`` (Adam, lr 0.001; sparse categorical
        cross-entropy on logits; accuracy metric).
    """
    layers = tf.keras.layers
    inputs = layers.Input(shape=input_shape)

    x = inputs
    for units in (256, 128):
        x = layers.Bidirectional(layers.GRU(units, return_sequences=True))(x)
        x = layers.Dropout(0.4, seed=SEED)(x)

    x = layers.GlobalAveragePooling1D()(x)

    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.4, seed=SEED)(x)

    # No activation: the loss below is configured with from_logits=True
    outputs = layers.Dense(num_labels)(x)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model


# ======================
# GRU + LSTM Hybrid Model
# ======================
def create_gru_lstm_model(input_shape, num_labels):
    """Build and compile a hybrid GRU -> LSTM classifier.

    Architecture: a Bidirectional GRU (256 units) followed by a
    Bidirectional LSTM (256 units), both returning full sequences and each
    followed by dropout 0.3; global average pooling; a 512-unit ReLU dense
    layer with dropout 0.4; and a final linear dense layer with
    ``num_labels`` units (logits).

    Args:
        input_shape: Shape of a single input sample (no batch axis).
        num_labels: Number of output classes.

    Returns:
        The compiled ``tf.keras.Model`` (Adam with learning rate 0.001,
        sparse categorical cross-entropy on logits, ``accuracy`` metric).
    """
    layers = tf.keras.layers

    net_input = layers.Input(shape=input_shape)

    # GRU block first, then LSTM block; both use 256 units and dropout 0.3.
    features = net_input
    for cell_type in (layers.GRU, layers.LSTM):
        features = layers.Bidirectional(cell_type(256, return_sequences=True))(features)
        features = layers.Dropout(0.3, seed=SEED)(features)

    # Collapse the sequence axis, then classify.
    pooled = layers.GlobalAveragePooling1D()(features)
    hidden = layers.Dense(512, activation='relu')(pooled)
    hidden = layers.Dropout(0.4, seed=SEED)(hidden)
    logits = layers.Dense(num_labels)(hidden)

    model = tf.keras.Model(net_input, logits)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])
    return model


In [None]:
# Inspection cell: build each architecture once and print its layer summary.
# Example input shape (adjust to your dataset).
# For speech features: (timesteps, feature_dim), e.g. (126, 155)
# NOTE(review): create_cnn_model / create_cnn_lstm_model / create_cnn_gru_model
# are defined outside this section — confirm they accept the same arguments.
# The model variables bound here are re-created by the training cell further
# down, so these instances are only used for the summaries printed below.
input_shape = (126, 155)
num_labels = 5  # number of emotion classes

# ==== CNN ====
print("\n===== CNN Model =====")
cnn_model = create_cnn_model(input_shape, num_labels)
cnn_model.summary()

# ==== CNN + LSTM ====
print("\n===== CNN + LSTM Model =====")
cnn_lstm_model = create_cnn_lstm_model(input_shape, num_labels)
cnn_lstm_model.summary()

# ==== CNN + GRU ====
print("\n===== CNN + GRU Model =====")
cnn_gru_model = create_cnn_gru_model(input_shape, num_labels)
cnn_gru_model.summary()

# ==== Pure LSTM ====
print("\n===== LSTM Model =====")
lstm_model = create_lstm_model(input_shape, num_labels)
lstm_model.summary()

# ==== Pure GRU ====
print("\n===== GRU Model =====")
gru_model = create_gru_model(input_shape, num_labels)
gru_model.summary()

# ==== GRU + LSTM Hybrid ====
print("\n===== GRU + LSTM Model =====")
gru_lstm_model = create_gru_lstm_model(input_shape, num_labels)
gru_lstm_model.summary()

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import (
    f1_score,
    classification_report,
    confusion_matrix
)

# --- callbacks: monitor val_accuracy instead of val_f1 ---
def create_callbacks(model_name, val_ds=None):
    """Return the Keras callbacks used for every training run.

    Early stopping and checkpointing both track ``val_accuracy`` (higher is
    better); the learning-rate schedule tracks ``val_loss``.

    Args:
        model_name: Tag used in the checkpoint file name
            ``best_<model_name>_model.h5``.
        val_ds: Unused; accepted only so existing call sites keep working.

    Returns:
        ``[EarlyStopping, ReduceLROnPlateau, ModelCheckpoint]`` in that order.
    """
    keras_callbacks = tf.keras.callbacks
    # Shared by the two callbacks that follow validation accuracy.
    accuracy_tracking = {'monitor': 'val_accuracy', 'mode': 'max'}

    stop_when_stalled = keras_callbacks.EarlyStopping(
        patience=10,
        restore_best_weights=True,
        verbose=1,
        **accuracy_tracking
    )

    # Halve the learning rate after 5 epochs without val_loss improvement.
    shrink_lr = keras_callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    )

    keep_best = keras_callbacks.ModelCheckpoint(
        filepath=f'best_{model_name}_model.h5',
        save_best_only=True,
        verbose=1,
        **accuracy_tracking
    )

    return [stop_when_stalled, shrink_lr, keep_best]


# --- plot curves after training completes ---
def plot_training_curves(history, model_name):
    """Save train/validation accuracy and loss curves as PNG files.

    For each of ``accuracy`` and ``loss``, a figure is written to
    ``<model_name>_<metric>_curve.png`` only when both the training series
    and its ``val_`` counterpart are present in ``history.history``.

    Args:
        history: Object exposing a ``history`` dict of per-epoch series
            (as returned by ``model.fit``).
        model_name: Prefix for the figure titles and output file names.
    """
    series = history.history

    # (history key, axis label) for each figure, in the order they are drawn.
    for metric, axis_label in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
        val_metric = f'val_{metric}'
        if metric not in series or val_metric not in series:
            continue

        plt.figure(figsize=(7,5))
        plt.plot(series[metric], label=f'train_{metric}')
        plt.plot(series[val_metric], label=val_metric)
        plt.xlabel('Epoch')
        plt.ylabel(axis_label)
        plt.title(f'{model_name} - {axis_label}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(f'{model_name}_{metric}_curve.png')
        plt.close()


# --- training: same as yours, with callbacks + plotting at the end ---
def train_model(model, model_name, train_ds, val_ds, epochs=100):
    """Fit ``model``, reload its best checkpoint and save its training curves.

    The model must be compiled with ``metrics=['accuracy']`` so that
    ``val_accuracy`` exists for the callbacks and the plots.

    Args:
        model: Compiled Keras model to train.
        model_name: Tag used for the checkpoint and figure file names.
        train_ds: Training data passed to ``model.fit``.
        val_ds: Validation data passed to ``model.fit``.
        epochs: Maximum number of epochs.

    Returns:
        ``(model, history)`` where ``model`` carries the weights loaded from
        ``best_<model_name>_model.h5``.
    """
    best_weights_path = f'best_{model_name}_model.h5'
    fit_callbacks = create_callbacks(model_name, val_ds)

    print(f"\nTraining {model_name} model...")
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        callbacks=fit_callbacks,
        verbose=1,
    )

    # The checkpoint callback wrote the best-by-val_accuracy weights here.
    model.load_weights(best_weights_path)

    plot_training_curves(history, model_name)
    return model, history


# --- evaluation: unchanged logic, with a tiny robustness tweak ---
def evaluate_model(model, model_name, test_ds, reverse_mapping):
    """Evaluate ``model`` on ``test_ds`` and save its confusion matrix.

    Prints loss/accuracy from ``model.evaluate``, per-class accuracies, a
    classification report and the macro F1 score, and writes the confusion
    matrix heatmap to ``<model_name>_confusion_matrix.png``.

    The value printed as "Weighted Average Accuracy (WAA)" is the plain
    (unweighted) mean of the per-class accuracies over the classes that
    occur in the test labels.

    Args:
        model: Trained Keras model.
        model_name: Tag used in log lines and the output file name.
        test_ds: Iterable of ``(batch_x, batch_y)``; ``batch_y`` must expose
            ``.numpy()`` and hold integer class ids.
        reverse_mapping: Maps class id ``0..C-1`` to the class name.

    Returns:
        ``(test_acc, weighted_avg_accuracy, macro_f1, class_accuracies,
        y_pred, y_true, cm)``.  ``weighted_avg_accuracy`` is NaN when no
        class is present in the test labels.
    """
    print(f"\nEvaluating {model_name} model...")

    # Be robust to metric list shape
    eval_out = model.evaluate(test_ds, return_dict=True, verbose=1)
    test_loss = float(eval_out.get('loss', np.nan))
    test_acc  = float(eval_out.get('accuracy', np.nan))
    print(f"Test loss: {test_loss:.4f}")
    print(f"Test accuracy: {test_acc:.4f}")

    # Predictions and labels are collected in the same pass so they stay
    # aligned even if the dataset order differs between iterations.
    y_pred, y_true = [], []
    for batch_x, batch_y in test_ds:
        batch_prob = model.predict(batch_x, verbose=0)
        y_pred.extend(np.argmax(batch_prob, axis=1))
        y_true.extend(batch_y.numpy())

    y_pred = np.array(y_pred)
    y_true = np.array(y_true)

    # Fix the label set explicitly so reports and the confusion matrix keep
    # one row/column per known class even when a class is missing from the
    # test split (otherwise target_names no longer matches and sklearn raises).
    label_ids = list(range(len(reverse_mapping)))
    target_names = [reverse_mapping[i] for i in label_ids]

    # Class-wise accuracies (only for classes that appear in y_true)
    class_accuracies = {}
    for i in label_ids:
        class_indices = np.where(y_true == i)[0]
        if len(class_indices) > 0:
            class_correct = np.sum(y_pred[class_indices] == i)
            class_accuracies[reverse_mapping[i]] = class_correct / len(class_indices)

    # Unweighted mean over the observed classes; NaN instead of
    # ZeroDivisionError when the test set is empty.
    if class_accuracies:
        weighted_avg_accuracy = sum(class_accuracies.values()) / len(class_accuracies)
    else:
        weighted_avg_accuracy = float('nan')

    print(f"\nWeighted Average Accuracy (WAA): {weighted_avg_accuracy:.4f}")
    print("\nClass-wise accuracies:")
    for emotion, acc in class_accuracies.items():
        print(f"{emotion}: {acc:.4f}")

    # Classification report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=target_names, digits=4))

    macro_f1 = f1_score(y_true, y_pred, average='macro')
    print(f"Macro F1-score: {macro_f1:.4f}")

    # Confusion matrix
    plt.figure(figsize=(10, 8))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=target_names, yticklabels=target_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix - {model_name}')
    plt.tight_layout()
    plt.savefig(f'{model_name}_confusion_matrix.png')
    plt.close()

    return test_acc, weighted_avg_accuracy, macro_f1, class_accuracies, y_pred, y_true, cm

In [None]:
# ==============================
# Train all models
# ==============================

# Each step builds a fresh model and trains it with train_model, which writes
# best_<name>_model.h5 plus <name>_accuracy_curve.png / <name>_loss_curve.png.
# NOTE(review): train_ds, val_ds, test_ds and reverse_mapping come from cells
# outside this section — confirm they are defined before this cell runs.

# CNN
cnn_model = create_cnn_model(input_shape, num_labels)
cnn_model, cnn_history = train_model(cnn_model, 'cnn', train_ds, val_ds)

# CNN-LSTM
cnn_lstm_model = create_cnn_lstm_model(input_shape, num_labels)
cnn_lstm_model, cnn_lstm_history = train_model(cnn_lstm_model, 'cnn_lstm', train_ds, val_ds)

# CNN-GRU
cnn_gru_model = create_cnn_gru_model(input_shape, num_labels)
cnn_gru_model, cnn_gru_history = train_model(cnn_gru_model, 'cnn_gru', train_ds, val_ds)

# Pure LSTM
lstm_model = create_lstm_model(input_shape, num_labels)
lstm_model, lstm_history = train_model(lstm_model, 'lstm', train_ds, val_ds)

# Pure GRU
gru_model = create_gru_model(input_shape, num_labels)
gru_model, gru_history = train_model(gru_model, 'gru', train_ds, val_ds)

# GRU + LSTM Hybrid
gru_lstm_model = create_gru_lstm_model(input_shape, num_labels)
gru_lstm_model, gru_lstm_history = train_model(gru_lstm_model, 'gru_lstm', train_ds, val_ds)


# ==============================
# Evaluate all models
# ==============================
# Each *_results value is the 7-tuple returned by evaluate_model:
# (test_acc, mean per-class accuracy, macro_f1, class_accuracies, y_pred, y_true, cm).
# evaluate_model also saves <name>_confusion_matrix.png for each model.

cnn_results       = evaluate_model(cnn_model, 'cnn', test_ds, reverse_mapping)
cnn_lstm_results  = evaluate_model(cnn_lstm_model, 'cnn_lstm', test_ds, reverse_mapping)
cnn_gru_results   = evaluate_model(cnn_gru_model, 'cnn_gru', test_ds, reverse_mapping)
lstm_results      = evaluate_model(lstm_model, 'lstm', test_ds, reverse_mapping)
gru_results       = evaluate_model(gru_model, 'gru', test_ds, reverse_mapping)
gru_lstm_results  = evaluate_model(gru_lstm_model, 'gru_lstm', test_ds, reverse_mapping)

In [None]:
def compare_training_curves(histories, model_names):
    """Overlay train/validation accuracy and loss curves of several models.

    Draws a two-panel figure (accuracy on the left, loss on the right),
    saves it to ``all_models_training_curves.png`` at 300 dpi and shows it.

    Args:
        histories: One entry per model; either an object with a ``history``
            dict (as returned by ``model.fit``) or the dict itself.  Each
            dict must contain ``accuracy``, ``val_accuracy``, ``loss`` and
            ``val_loss``.
        model_names: Legend labels, paired with ``histories`` by position.
    """
    def _curves(record):
        # Accept both History-like objects and plain dicts.
        return record.history if hasattr(record, "history") else record

    # (subplot index, train key, validation key, y-axis label, panel title)
    panels = (
        (1, 'accuracy', 'val_accuracy', "Accuracy", "Train vs Validation Accuracy"),
        (2, 'loss', 'val_loss', "Loss", "Train vs Validation Loss"),
    )

    plt.figure(figsize=(14,6))

    for position, train_key, val_key, axis_label, heading in panels:
        plt.subplot(1, 2, position)
        for record, name in zip(histories, model_names):
            curves = _curves(record)
            plt.plot(curves[train_key], label=f'{name} Train')
            plt.plot(curves[val_key], linestyle='--', label=f'{name} Val')
        plt.xlabel("Epoch")
        plt.ylabel(axis_label)
        plt.title(heading)
        plt.legend()
        plt.grid(alpha=0.3)

    plt.tight_layout()
    plt.savefig("all_models_training_curves.png", dpi=300)
    plt.show()

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# Function to create ensemble predictions for any combination of models
def create_combination_predictions(models, dataset):
    """Average the softmax outputs of ``models`` on ``dataset``.

    Each model's ``predict`` output is passed through a softmax and the
    resulting arrays are averaged element-wise across models.

    Args:
        models: Sequence of models exposing ``predict(dataset)``.
        dataset: Input forwarded unchanged to every ``predict`` call.

    Returns:
        NumPy array holding the mean of the per-model softmax outputs.
    """
    member_probs = []
    for member in models:
        raw_scores = member.predict(dataset)
        member_probs.append(tf.nn.softmax(raw_scores).numpy())
    return np.mean(member_probs, axis=0)


# ======================================================
# Model combinations to compare: the three single models, every pair, and
# the full triple.  Entries are paired by position with `model_names` below.
# ======================================================
model_combinations = [
    [cnn_model],                   # Single model (CNN)
    [lstm_model],                  # Single model (LSTM)
    [gru_model],                   # Single model (GRU)
    [cnn_model, lstm_model],       # Pairwise (CNN + LSTM)
    [cnn_model, gru_model],        # Pairwise (CNN + GRU)
    [lstm_model, gru_model],       # Pairwise (LSTM + GRU)
    [cnn_model, lstm_model, gru_model]  # Triple ensemble (CNN + LSTM + GRU)
]

# Display names, in the same order as model_combinations
model_names = [
    'CNN',
    'LSTM',
    'GRU',
    'CNN + LSTM',
    'CNN + GRU',
    'LSTM + GRU',
    'CNN + LSTM + GRU'   # triple ensemble
]


# ======================================================
# Calculate accuracy and WAA for each combination
# ======================================================
accuracies = []
waas = []

# NOTE(review): labels are read in one pass over test_ds while predictions
# come from separate model.predict(test_ds) passes; this assumes test_ds
# yields samples in the same order on every iteration — confirm it is not
# reshuffled.
true_labels = np.concatenate([y for x, y in test_ds], axis=0)  # ground truth once

for models in model_combinations:
    ensemble_preds = create_combination_predictions(models, test_ds)
    ensemble_pred_classes = np.argmax(ensemble_preds, axis=1)

    # Accuracy
    ensemble_acc = np.mean(ensemble_pred_classes == true_labels)
    accuracies.append(ensemble_acc)

    # "WAA": unweighted mean of per-class accuracy over the classes that
    # occur in true_labels.
    ensemble_class_accuracies = {}
    for cls in range(num_labels):
        mask = true_labels == cls
        if np.sum(mask) > 0:
            class_acc = np.mean(ensemble_pred_classes[mask] == true_labels[mask])
            ensemble_class_accuracies[reverse_mapping[cls]] = class_acc
    ensemble_waa = np.mean(list(ensemble_class_accuracies.values()))
    waas.append(ensemble_waa)


# ======================================================
# Visualization (Horizontal Bar Plots)
# ======================================================
plt.figure(figsize=(14, 8))

# --- Accuracy ---
plt.subplot(1, 2, 1)
bars1 = plt.barh(model_names, accuracies, color=plt.cm.Set2.colors, alpha=0.9)
plt.xlabel('Accuracy', fontsize=16, fontweight='bold')
plt.ylabel('Model', fontsize=16, fontweight='bold')
plt.title('Test Accuracy Comparison', fontsize=18, fontweight='bold')
# Write each value just past the end of its bar
for i, bar in enumerate(bars1):
    plt.text(bar.get_width() + 0.001, bar.get_y() + bar.get_height()/2,
             f'{accuracies[i]:.4f}', va='center', fontsize=12, fontweight='bold')

# --- WAA ---
plt.subplot(1, 2, 2)
bars2 = plt.barh(model_names, waas, color=plt.cm.Set2.colors, alpha=0.9)
plt.xlabel('Weighted Average Accuracy', fontsize=16, fontweight='bold')
plt.ylabel('Model', fontsize=16, fontweight='bold')
plt.title('Weighted Average Accuracy Comparison', fontsize=18, fontweight='bold')
for i, bar in enumerate(bars2):
    plt.text(bar.get_width() + 0.001, bar.get_y() + bar.get_height()/2,
             f'{waas[i]:.4f}', va='center', fontsize=12, fontweight='bold')

plt.tight_layout()
plt.savefig("model_comparison.png", dpi=300, bbox_inches="tight")
plt.show()


# ======================================================
# Save results to file
# ======================================================
# Writes a plain-text summary (best combination by each metric, then one
# entry per combination) to model_results.txt, overwriting any previous file.
with open('model_results.txt', 'w') as f:
    f.write("Speech Emotion Recognition Model Results\n")
    f.write("=======================================\n\n")
    f.write("Summary of Results:\n")
    f.write("-----------------\n")
    f.write(f"Best model by accuracy: {model_names[np.argmax(accuracies)]} ({max(accuracies):.4f})\n")
    f.write(f"Best model by WAA: {model_names[np.argmax(waas)]} ({max(waas):.4f})\n\n")
    f.write("Detailed Model Performance:\n")
    f.write("-------------------------\n")
    for i, model_name in enumerate(model_names):
        f.write(f"{model_name} Model:\n")
        f.write(f"  Test Accuracy: {accuracies[i]:.4f}\n")
        f.write(f"  Weighted Avg Accuracy (WAA): {waas[i]:.4f}\n")
        f.write("\n")

In [None]:
import matplotlib.pyplot as plt

# =====================
# Example Data (replace with your real results)
# =====================
# NOTE(review): these hard-coded values overwrite the `model_names` and
# `accuracies` globals computed by the ensemble-comparison cell above, so the
# figure below reflects the literals here rather than that cell's results.
model_names = ['CNN', 'CNN+LSTM', 'CNN+GRU', 'LSTM', 'GRU', 'LSTM+GRU']
accuracies = [0.9680, 0.9620, 0.9680, 0.9380, 0.9500, 0.9600]
# waas = [0.9680, 0.9620, 0.9680, 0.9380, 0.9500, 0.9600]

# =====================
# Plot 1: Test Accuracy
# =====================
plt.figure(figsize=(8, 6))
bars1 = plt.barh(model_names, accuracies, color=plt.cm.Set2.colors, alpha=0.9)

# Labels and Title
plt.xlabel('Accuracy', fontsize=16, fontweight='bold')
plt.ylabel('Model', fontsize=16, fontweight='bold')
plt.title('Test Accuracy Comparison', fontsize=18, fontweight='bold')

# Show each value just past the end of its bar
for i, bar in enumerate(bars1):
    plt.text(bar.get_width() + 0.001, bar.get_y() + bar.get_height()/2,
             f'{accuracies[i]:.4f}', va='center', fontsize=13, fontweight='bold')

plt.box(False)  # Remove plot box
plt.tight_layout()
plt.savefig("test_accuracy_comparison.png", dpi=300, bbox_inches="tight")
plt.show()


# # =====================
# # Plot 2: Weighted Average Accuracy
# # =====================
# plt.figure(figsize=(8, 6))
# bars2 = plt.barh(model_names, waas, color=plt.cm.Set2.colors, alpha=0.9)

# # Labels and Title
# plt.xlabel('Weighted Average Accuracy', fontsize=16, fontweight='bold')
# plt.ylabel('Model', fontsize=16, fontweight='bold')
# plt.title('Weighted Average Accuracy Comparison', fontsize=18, fontweight='bold')

# # Show values on bars
# for i, bar in enumerate(bars2):
#     plt.text(bar.get_width() + 0.001, bar.get_y() + bar.get_height()/2,
#              f'{waas[i]:.4f}', va='center', fontsize=13, fontweight='bold')

# plt.box(False)  # Remove plot box
# plt.tight_layout()
# plt.savefig("waa_comparison.png", dpi=300, bbox_inches="tight")
# plt.show()


In [None]:
# Rebuild every architecture and restore the checkpoint written during
# training (best_<name>_model.h5), so the evaluation cells below can run
# without retraining.  The files are read from the current working directory.
# === Define input shape and number of labels ===
input_shape = (126, 155)
num_labels = 5

# === Load CNN ===
cnn_model = create_cnn_model(input_shape, num_labels)
cnn_model.load_weights('best_cnn_model.h5')

# === Load LSTM ===
lstm_model = create_lstm_model(input_shape, num_labels)
lstm_model.load_weights('best_lstm_model.h5')

# === Load GRU ===
gru_model = create_gru_model(input_shape, num_labels)
gru_model.load_weights('best_gru_model.h5')

# === Load CNN + LSTM ===
cnn_lstm_model = create_cnn_lstm_model(input_shape, num_labels)
cnn_lstm_model.load_weights('best_cnn_lstm_model.h5')

# === Load CNN + GRU ===
cnn_gru_model = create_cnn_gru_model(input_shape, num_labels)
cnn_gru_model.load_weights('best_cnn_gru_model.h5')

# === Load GRU + LSTM ===
gru_lstm_model = create_gru_lstm_model(input_shape, num_labels)
gru_lstm_model.load_weights('best_gru_lstm_model.h5')

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_curve,
    auc
)
from sklearn.preprocessing import label_binarize

# ---------- helpers ----------
def collect_preds_and_labels(model, test_ds, num_labels):
    """Run ``model`` over ``test_ds`` and return labels and probabilities.

    Args:
        model: Object exposing ``predict(x, verbose=0)`` that returns an
            ``[batch, num_labels]`` array of either logits or probabilities.
        test_ds: Iterable of ``(x, y)`` batches; ``y`` must expose ``.shape``
            and ``.numpy()`` and hold integer class ids or one-hot rows.
        num_labels: Number of classes; used to recognise one-hot labels.

    Returns:
        ``(y_true, y_prob)``: integer labels of shape ``[N]`` and class
        probabilities of shape ``[N, num_labels]``.

    Raises:
        ValueError: If ``test_ds`` yields no batches.
    """
    y_true_list, y_prob_list = [], []
    for x, y in test_ds:
        # Convert one-hot labels to int if necessary
        if len(y.shape) > 1 and y.shape[-1] == num_labels:
            y_true_batch = np.argmax(y.numpy(), axis=1)
        else:
            y_true_batch = y.numpy().astype(int).reshape(-1)

        y_true_list.append(y_true_batch)
        y_prob_list.append(model.predict(x, verbose=0))

    if not y_true_list:
        raise ValueError("test_ds yielded no batches; nothing to evaluate")

    y_true = np.concatenate(y_true_list, axis=0)
    y_prob = np.concatenate(y_prob_list, axis=0)

    # Treat the output as probabilities only when every entry is
    # non-negative AND every row sums to 1.  Checking the row sum alone
    # would accept logits such as [2, -1, 0] unchanged.
    already_probs = bool(
        np.all(y_prob >= 0.0)
        and np.allclose(np.sum(y_prob, axis=1), 1.0, atol=1e-3)
    )
    if not already_probs:
        # Numerically stable softmax: subtract the row max before exp.
        shifted = y_prob - np.max(y_prob, axis=1, keepdims=True)
        exp_scores = np.exp(shifted)
        y_prob = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    return y_true, y_prob


def plot_confusion_matrix(cm, class_names, model_name, normalize=False):
    """Show a confusion matrix as a green heatmap with bold labels.

    Args:
        cm: Square NumPy array of counts (rows = true, columns = predicted).
        class_names: Tick labels, one per row/column.
        model_name: Name appended to the figure title.
        normalize: When True, each row is divided by its total (clipped to
            at least 1e-12 to avoid dividing by zero) and cells are printed
            with two decimals; otherwise raw integer counts are shown.
    """
    shown = cm
    cell_format = "d"
    heading = f"Confusion Matrix - {model_name}"
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True).clip(min=1e-12)
        shown = cm.astype('float') / row_totals
        cell_format = ".2f"
        heading = f"Confusion Matrix (Normalized) - {model_name}"

    plt.figure(figsize=(8, 6))
    image = plt.imshow(shown, interpolation='nearest', cmap='Greens')
    plt.title(heading, fontweight='bold')
    plt.colorbar(image, fraction=0.046, pad=0.04)

    positions = np.arange(len(class_names))
    plt.xticks(positions, class_names, rotation=45, ha='right', fontweight='bold')
    plt.yticks(positions, class_names, fontweight='bold')

    # Cells darker than half the maximum get white text so it stays legible.
    midpoint = shown.max() / 2.0
    for row, col in np.ndindex(shown.shape[0], shown.shape[1]):
        value = shown[row, col]
        text_color = "white" if value > midpoint else "black"
        plt.text(col, row, format(value, cell_format),
                 ha="center", va="center",
                 color=text_color,
                 fontweight='bold')

    plt.ylabel('True label', fontweight='bold')
    plt.xlabel('Predicted label', fontweight='bold')
    plt.tight_layout()
    plt.show()


def plot_roc_curves(y_true, y_prob, class_names, model_name):
    """Plot one-vs-rest ROC curves per class plus micro and macro averages.

    Args:
        y_true: Integer class ids, one per sample.
        y_prob: Array of per-class scores with one column per entry of
            ``class_names``.
        class_names: Class names; position ``i`` names class id ``i``.
        model_name: Name appended to the figure title.

    Side effects:
        Shows the figure and prints the per-class, micro-average and
        macro-average AUC values.
    """
    num_labels = len(class_names)

    # One-hot encode directly: column i is 1 exactly where y_true == i.
    # (label_binarize collapses the two-class case to a single column whose
    # positive class is id 1, so zero-padding it would put that indicator in
    # column 0 and leave column 1 empty.)
    y_true = np.asarray(y_true).reshape(-1)
    y_true_bin = (y_true[:, None] == np.arange(num_labels)).astype(int)

    fpr, tpr, roc_auc = {}, {}, {}

    # Per-class one-vs-rest curves
    for i in range(num_labels):
        fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Micro-average: pool every (sample, class) decision into one curve
    fpr["micro"], tpr["micro"], _ = roc_curve(y_true_bin.ravel(), y_prob.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # Macro-average: interpolate each class curve onto the union of all FPR
    # points, then average the TPR values with equal weight per class
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(num_labels)]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(num_labels):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= num_labels
    fpr["macro"], tpr["macro"], roc_auc["macro"] = all_fpr, mean_tpr, auc(all_fpr, mean_tpr)

    plt.figure(figsize=(8, 6))
    # Chance diagonal for reference
    plt.plot([0, 1], [0, 1], linestyle='--', linewidth=1, color='gray')

    for i in range(num_labels):
        plt.plot(fpr[i], tpr[i], label=f'{class_names[i]} (AUC = {roc_auc[i]:.3f})')

    plt.plot(fpr["micro"], tpr["micro"], linewidth=2, label=f'micro-average (AUC = {roc_auc["micro"]:.3f})')
    plt.plot(fpr["macro"], tpr["macro"], linewidth=2, label=f'macro-average (AUC = {roc_auc["macro"]:.3f})')

    plt.xlabel('False Positive Rate', fontweight='bold')
    plt.ylabel('True Positive Rate', fontweight='bold')
    plt.title(f'ROC Curves - {model_name}', fontweight='bold')
    plt.legend(loc='lower right', fontsize='small')
    plt.tight_layout()
    plt.show()

    print("\nAUC per class:")
    for i in range(num_labels):
        print(f"  {class_names[i]}: {roc_auc[i]:.4f}")
    print(f"Micro-average AUC: {roc_auc['micro']:.4f}")
    print(f"Macro-average AUC: {roc_auc['macro']:.4f}")


def evaluate_full(model, model_name, test_ds, class_names):
    """Print a classification report and plot diagnostics for one model.

    Collects predictions with ``collect_preds_and_labels``, prints the
    classification report, shows the raw and row-normalized confusion
    matrices, and plots the ROC curves.

    Args:
        model: Trained model passed to ``collect_preds_and_labels``.
        model_name: Name used in the report header and figure titles.
        test_ds: Dataset of ``(x, y)`` batches.
        class_names: Class names; position ``i`` names class id ``i``.
    """
    num_labels = len(class_names)
    # Explicit label list keeps the report and the confusion matrix aligned
    # with class_names even when a class is absent from the test split
    # (classification_report raises on the size mismatch otherwise).
    label_ids = list(range(num_labels))

    y_true, y_prob = collect_preds_and_labels(model, test_ds, num_labels)
    y_pred = np.argmax(y_prob, axis=1)

    print(f"\n=== {model_name} : Classification Report ===")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, digits=4))

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plot_confusion_matrix(cm, class_names, model_name, normalize=False)
    plot_confusion_matrix(cm, class_names, model_name, normalize=True)

    plot_roc_curves(y_true, y_prob, class_names, model_name)


# ---------- usage ----------
# NOTE(review): position i in class_names is used as the name of class id i;
# confirm this order matches the label encoding used to build test_ds.
class_names = ["happy", "sad", "angry", "surprise", "neutral"]

# Evaluate all six models (report, two confusion matrices and ROC plot each)
evaluate_full(cnn_model,       "CNN",        test_ds, class_names)
evaluate_full(lstm_model,      "LSTM",       test_ds, class_names)
evaluate_full(gru_model,       "GRU",        test_ds, class_names)
evaluate_full(cnn_lstm_model,  "CNN+LSTM",   test_ds, class_names)
evaluate_full(cnn_gru_model,   "CNN+GRU",    test_ds, class_names)
evaluate_full(gru_lstm_model,  "GRU+LSTM",   test_ds, class_names)


In [None]:
import itertools
from collections import Counter

# ---------- Ensemble helpers ----------
def get_model_preds(models, test_ds, num_labels):
    """Collect ``(y_true, y_prob)`` for every model on ``test_ds``.

    Args:
        models: Iterable of models accepted by ``collect_preds_and_labels``.
        test_ds: Dataset of ``(x, y)`` batches.
        num_labels: Number of classes.

    Returns:
        List with one ``(y_true, y_prob)`` tuple per model, in input order.
    """
    return [
        (labels, probabilities)
        for labels, probabilities in (
            collect_preds_and_labels(member, test_ds, num_labels) for member in models
        )
    ]


def ensemble_predictions(all_preds, method="mean"):
    """Combine the predictions of several models into one.

    Args:
        all_preds: Non-empty list of ``(y_true, y_prob)`` pairs, one per
            model, where ``y_prob`` has shape ``[N, C]``.  Every ``y_true``
            must be identical.
        method: How to combine the models:
            ``'mean'``       - element-wise average of the probabilities;
            ``'maxvote'``    - majority vote over per-model argmax, returned
                               as one-hot rows (ties go to the class voted by
                               the earliest model in ``all_preds``);
            ``'confidence'`` - per sample, the probability row of the model
                               whose top probability is highest (ties go to
                               the earliest model).

    Returns:
        ``(y_true, y_prob)`` with the combined ``[N, C]`` scores.

    Raises:
        ValueError: If ``method`` is unknown, ``all_preds`` is empty, or the
            models' label vectors differ.
    """
    if method not in ("mean", "maxvote", "confidence"):
        raise ValueError(
            f"Unknown method: {method!r} (expected 'mean', 'maxvote' or 'confidence')"
        )
    if len(all_preds) == 0:
        raise ValueError("all_preds is empty; need predictions from at least one model")

    y_true = all_preds[0][0]
    probs = [p for (_, p) in all_preds]

    # Combining row i of different models is only meaningful if row i is the
    # same sample everywhere; differing labels mean the orders diverged.
    for other_true, _ in all_preds[1:]:
        if not np.array_equal(other_true, y_true):
            raise ValueError(
                "y_true differs between models; predictions are not aligned sample-by-sample"
            )

    if method == "mean":
        y_prob = np.mean(probs, axis=0)

    elif method == "maxvote":
        # votes[i, m] = class chosen by model m for sample i
        votes = np.stack([np.argmax(p, axis=1) for p in probs], axis=1)
        y_pred = np.array(
            [Counter(row.tolist()).most_common(1)[0][0] for row in votes],
            dtype=int,
        )
        # make one-hot like probabilities
        y_prob = np.zeros_like(probs[0])
        y_prob[np.arange(len(y_pred)), y_pred] = 1.0

    else:  # "confidence"
        stacked = np.stack(probs)                            # [M, N, C]
        best_model = np.argmax(stacked.max(axis=2), axis=0)  # [N]
        y_prob = stacked[best_model, np.arange(stacked.shape[1])]

    return y_true, y_prob


def evaluate_ensemble(all_preds, method, class_names, combo_name):
    """Report and plot the performance of one ensemble.

    Args:
        all_preds: List of ``(y_true, y_prob)`` pairs, one per member model.
        method: Combination rule passed to ``ensemble_predictions``.
        class_names: Class names; position ``i`` names class id ``i``.
        combo_name: Label for this model combination, used in the printed
            header and the figure titles.
    """
    y_true, y_prob = ensemble_predictions(all_preds, method=method)
    y_pred = np.argmax(y_prob, axis=1)

    # Explicit label list keeps the report and the confusion matrix aligned
    # with class_names even when a class never occurs (classification_report
    # raises on the size mismatch otherwise).
    label_ids = list(range(len(class_names)))
    run_name = f"Ensemble-{method}-{combo_name}"

    print(f"\n=== Ensemble ({method}) on {combo_name} ===")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, digits=4))

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plot_confusion_matrix(cm, class_names, run_name, normalize=False)
    plot_confusion_matrix(cm, class_names, run_name, normalize=True)

    plot_roc_curves(y_true, y_prob, class_names, run_name)


# ---------- Usage ----------
class_names = ["happy", "sad", "angry", "surprise", "neutral"]

# list of models (already loaded with weights)
models = [
    cnn_model,
    lstm_model,
    gru_model,
    cnn_lstm_model,
    cnn_gru_model,
    gru_lstm_model
]

# Run every model over test_ds once; the ensembles below reuse these arrays
num_labels = len(class_names)
all_preds = get_model_preds(models, test_ds, num_labels)

# Try ensembles on ALL models together
for method in ["mean", "maxvote", "confidence"]:
    evaluate_ensemble(all_preds, method, class_names, combo_name="All_6")

# Every combination of 2..len(models) models; combos are named by 1-based
# position in `models`, e.g. "M1+M3".
# NOTE(review): the r == len(models) iteration evaluates the full set again,
# repeating the "All_6" run above under the name "M1+M2+M3+M4+M5+M6".
for r in range(2, len(models) + 1):
    for subset_idx in itertools.combinations(range(len(models)), r):
        subset_models = [all_preds[i] for i in subset_idx]
        combo_name = "+".join([f"M{i+1}" for i in subset_idx])
        for method in ["mean", "maxvote", "confidence"]:
            evaluate_ensemble(subset_models, method, class_names, combo_name)


In [None]:
import numpy as np
import itertools
from collections import Counter
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

# ---------- Ensemble helpers ----------
def get_model_preds(models, test_ds, num_labels):
    """Run each model over ``test_ds`` and gather its labels and scores.

    Args:
        models: Iterable of models accepted by ``collect_preds_and_labels``.
        test_ds: Dataset of ``(x, y)`` batches.
        num_labels: Number of classes.

    Returns:
        List of ``(y_true, y_prob)`` tuples, one per model, in input order.
    """
    collected = []
    for member in models:
        labels, probabilities = collect_preds_and_labels(member, test_ds, num_labels)
        collected += [(labels, probabilities)]
    return collected


def ensemble_predictions(all_preds, method="mean"):
    """Combine predictions from multiple models.

    Args:
        all_preds: Non-empty list of ``(y_true, y_prob)`` pairs, one per
            model, with ``y_prob`` of shape ``[N, C]`` and identical
            ``y_true`` in every pair.
        method: ``'mean'`` averages the probabilities; ``'maxvote'`` takes a
            majority vote over per-model argmax and returns one-hot rows
            (ties go to the class voted by the earliest model);
            ``'confidence'`` keeps, per sample, the row of the model with the
            highest top probability (ties go to the earliest model).

    Returns:
        ``(y_true, y_prob)`` with the combined ``[N, C]`` scores.

    Raises:
        ValueError: If ``method`` is unknown, ``all_preds`` is empty, or the
            label vectors of the models differ.
    """
    if method not in ("mean", "maxvote", "confidence"):
        raise ValueError(
            f"Unknown method: {method!r} (expected 'mean', 'maxvote' or 'confidence')"
        )
    if len(all_preds) == 0:
        raise ValueError("all_preds is empty; need predictions from at least one model")

    y_true = all_preds[0][0]
    probs = [p for (_, p) in all_preds]

    # Row i must refer to the same sample for every model.
    if any(not np.array_equal(labels, y_true) for labels, _ in all_preds[1:]):
        raise ValueError(
            "y_true differs between models; predictions are not aligned sample-by-sample"
        )

    if method == "mean":
        return y_true, np.mean(probs, axis=0)

    if method == "maxvote":
        per_model_votes = [np.argmax(p, axis=1) for p in probs]
        winners = np.array(
            [Counter(int(v[i]) for v in per_model_votes).most_common(1)[0][0]
             for i in range(len(y_true))],
            dtype=int,
        )
        y_prob = np.zeros_like(probs[0])
        y_prob[np.arange(len(winners)), winners] = 1.0
        return y_true, y_prob

    # "confidence": pick, per sample, the most confident model's row.
    stacked = np.stack(probs)                            # [M, N, C]
    best_model = np.argmax(stacked.max(axis=2), axis=0)  # [N]
    return y_true, stacked[best_model, np.arange(stacked.shape[1])]


def evaluate_ensemble(all_preds, method, class_names, combo_name):
    """Evaluate one ensemble: print metrics and show diagnostic plots.

    Prints accuracy and macro-averaged recall, precision and F1, then the
    full classification report; shows the raw and row-normalized confusion
    matrices and the ROC curves.

    Args:
        all_preds: List of ``(y_true, y_prob)`` pairs, one per member model.
        method: Combination rule passed to ``ensemble_predictions``.
        class_names: Class names; position ``i`` names class id ``i``.
        combo_name: Label for this model combination, used in the printed
            header and the figure titles.
    """
    y_true, y_prob = ensemble_predictions(all_preds, method=method)
    y_pred = np.argmax(y_prob, axis=1)

    # Explicit label list keeps the report and the confusion matrix aligned
    # with class_names even when a class never occurs (classification_report
    # raises on the size mismatch otherwise).
    label_ids = list(range(len(class_names)))
    run_name = f"Ensemble-{method}-{combo_name}"

    acc  = accuracy_score(y_true, y_pred)
    rec  = recall_score(y_true, y_pred, average="macro")
    prec = precision_score(y_true, y_pred, average="macro")
    f1   = f1_score(y_true, y_pred, average="macro")

    print(f"\n=== Ensemble ({method}) on {combo_name} ===")
    print(f"Accuracy:  {acc:.4f}")
    print(f"Recall:    {rec:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"F1 score:  {f1:.4f}")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, digits=4))

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plot_confusion_matrix(cm, class_names, run_name, normalize=False)
    plot_confusion_matrix(cm, class_names, run_name, normalize=True)

    plot_roc_curves(y_true, y_prob, class_names, run_name)


# ---------- Usage ----------
class_names = ["happy", "sad", "angry", "surprise", "neutral"]

# Only using CNN, CNN-GRU, CNN-LSTM
models = [cnn_model, cnn_gru_model, cnn_lstm_model]
num_labels = len(class_names)

# Collect predictions once; every combination below reuses these arrays
all_preds = get_model_preds(models, test_ds, num_labels)

# Display names, in the same order as `models`
model_names = ["CNN", "CNN_GRU", "CNN_LSTM"]

# Iterate over all combinations (pairs and all 3), trying each combination
# rule on each subset
for r in range(2, len(models) + 1):
    for subset_idx in itertools.combinations(range(len(models)), r):
        subset_preds = [all_preds[i] for i in subset_idx]
        combo_name = "+".join([model_names[i] for i in subset_idx])
        for method in ["mean", "maxvote", "confidence"]:
            evaluate_ensemble(subset_preds, method, class_names, combo_name)

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import os

# ---------- Helper to collect preds ----------
def collect_model_preds(model, test_ds, num_labels):
    """Return true labels and hard (argmax) predictions for ``model``.

    Args:
        model: Model passed to ``collect_preds_and_labels``.
        test_ds: Dataset of ``(x, y)`` batches.
        num_labels: Number of classes.

    Returns:
        ``(y_true, y_pred)`` where ``y_pred`` is the argmax over the class
        axis of the scores returned by ``collect_preds_and_labels``.
    """
    labels, scores = collect_preds_and_labels(model, test_ds, num_labels)
    return labels, np.argmax(scores, axis=1)

def save_conf_matrix_norm(y_true, y_pred, class_names, title, save_dir="conf_matrices_norm"):
    """Save a row-normalized confusion matrix heatmap as a PNG.

    Args:
        y_true: Integer class ids of the true labels.
        y_pred: Integer class ids of the predictions.
        class_names: Class names; position ``i`` names class id ``i``.
        title: Used in the figure title and the file name
            ``<title>_cm_norm.png``.
        save_dir: Output directory; created if it does not exist.
    """
    os.makedirs(save_dir, exist_ok=True)

    # Fix the label set so the matrix always has one row/column per entry of
    # class_names, even when a class is missing from y_true and y_pred.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    # Rows without any true samples would divide by zero and fill the row
    # with NaN; clipping the denominator leaves those rows at 0 instead.
    row_totals = cm.sum(axis=1, keepdims=True).clip(min=1e-12)
    cm_norm = cm.astype("float") / row_totals

    plt.figure(figsize=(6, 5))
    sns.heatmap(cm_norm, annot=True, fmt=".2f", cmap="Blues",
                xticklabels=class_names, yticklabels=class_names)
    plt.title(f"Normalized Confusion Matrix - {title}")
    plt.xlabel("Predicted")
    plt.ylabel("True")

    # bbox_inches="tight" keeps the tick labels from being cut off
    plt.savefig(os.path.join(save_dir, f"{title}_cm_norm.png"), bbox_inches="tight")
    plt.close()

# ---------- Class names ----------
class_names = ["happy", "sad", "angry", "surprise", "neutral"]
num_labels = len(class_names)

# ---------- Your 6 models (already loaded with .h5 weights) ----------
# NOTE(review): this rebinds `models`, which the ensemble cells above define
# as a list, to a dict keyed by display name; re-running those cells after
# this one will see the dict unless they reassign `models` first.
models = {
    "CNN": cnn_model,
    "CNN_LSTM": cnn_lstm_model,
    "CNN_GRU": cnn_gru_model,
    "GRU": gru_model,
    "LSTM": lstm_model,
    "GRU_LSTM": gru_lstm_model
}

# ---------- Evaluate & Save ----------
# The dict key doubles as the output file stem: <name>_cm_norm.png
for name, model in models.items():
    print(f"\n=== Evaluating {name} ===")
    y_true, y_pred = collect_model_preds(model, test_ds, num_labels)

    # Print report to console
    print(classification_report(y_true, y_pred, target_names=class_names, digits=4))

    # Save only normalized confusion matrix
    save_conf_matrix_norm(y_true, y_pred, class_names, name)
