# Dataset Processing

## Frame Generating From Videos

In [None]:
import cv2
import os

# Root folder that is walked for video files, and the folder that receives the
# extracted frames (created here if it does not exist yet).
# NOTE(review): absolute, machine-specific paths — adjust before running elsewhere.
dataset_path = r"D:\fundamentals\AI NTI\Second Final Project\RAVDESS dataset"
output_path = r"D:\fundamentals\AI NTI\Second Final Project\frames_output"
os.makedirs(output_path, exist_ok=True)

def extract_frames(video_path, output_folder, frames_per_sec=3):
    """Sample frames from a video at a fixed rate and save them as JPEG files.

    Frames are written to ``<output_folder>/<video name>/`` and named
    ``<video name>_frame<k>.jpg``, where ``k`` counts the saved frames from 0.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory under which the per-video folder is created.
        frames_per_sec: Target number of saved frames per second of video.
            Must be positive.

    Returns:
        None. Progress and unreadable videos are reported with ``print``.

    Raises:
        ValueError: If ``frames_per_sec`` is not positive.
    """
    if frames_per_sec <= 0:
        raise ValueError("frames_per_sec must be positive")

    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # `not fps > 0` rejects 0 as before, and also NaN or negative values.
        if not fps > 0:
            print(f"Can not read: {video_path}")
            return

        # Keep every `frame_interval`-th frame. Clamp to 1 so that a video
        # whose fps is lower than `frames_per_sec` keeps every frame instead
        # of triggering a modulo-by-zero below.
        frame_interval = max(1, int(fps / frames_per_sec))
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        video_folder = os.path.join(output_folder, video_name)
        os.makedirs(video_folder, exist_ok=True)

        count = 0
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_interval == 0:
                frame_filename = f"{video_name}_frame{count}.jpg"
                frame_path = os.path.join(video_folder, frame_filename)
                # imwrite returns False when the file could not be written;
                # only successful writes are counted so the summary is honest.
                if cv2.imwrite(frame_path, frame):
                    count += 1

            frame_idx += 1

        print(f"{count} frames saved for {video_name}")
    finally:
        # Always free the capture handle, including on early return or error.
        cap.release()


# Walk the whole dataset tree and extract frames from every video file found.
VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov")

for folder, _subfolders, filenames in os.walk(dataset_path):
    for filename in filenames:
        if not filename.endswith(VIDEO_EXTENSIONS):
            continue
        extract_frames(
            os.path.join(folder, filename),
            output_path,
            frames_per_sec=3,
        )


## Audio Generating From Videos

In [None]:
from moviepy.video.io.VideoFileClip import VideoFileClip
import tqdm

# Folder (relative to the current working directory) that receives the
# extracted .wav files; created here if it does not exist yet.
OUT_AUDIO_DIR = "ravdess_wavs"
os.makedirs(OUT_AUDIO_DIR, exist_ok=True)

def parse_label_from_filename(filename):
    """Return the emotion name encoded in a dash-separated file name.

    The base name (text before the first ``.``) is split on ``-`` and the
    third field is looked up in the emotion-code table.

    Args:
        filename: File name or path, e.g. ``"01-01-03-01-01-01-01.mp4"``.

    Returns:
        The emotion name, or ``'unknown'`` when the name has fewer than three
        dash-separated fields or the code is not in the table.
    """
    emotion_map = {
        '01':'neutral','02':'calm','03':'happy','04':'sad',
        '05':'angry','06':'fearful','07':'disgust','08':'surprised'
    }
    parts = os.path.basename(filename).split('.')[0].split('-')
    # Names that do not follow the expected pattern fall back to 'unknown'
    # instead of raising IndexError.
    if len(parts) < 3:
        return 'unknown'
    return emotion_map.get(parts[2], 'unknown')

# Walk the dataset and write one .wav file per .mp4 that has not been
# converted yet (existing .wav files are left untouched).
for root, _, files in os.walk(r"RAVDESS dataset"):
    mp4_files = [f for f in files if f.endswith('.mp4')]
    # `tqdm` is imported as a module in this cell, so the progress bar is
    # `tqdm.tqdm`; calling the module object itself raises TypeError.
    for fname in tqdm.tqdm(mp4_files):
        video_path = os.path.join(root, fname)
        base = os.path.splitext(fname)[0]

        # 1) Extract audio
        wav_out = os.path.join(OUT_AUDIO_DIR, base + ".wav")
        if os.path.exists(wav_out):
            continue

        clip = VideoFileClip(video_path)
        try:
            if clip.audio is None:
                # A video without an audio track has nothing to export.
                print(f"No audio track: {video_path}")
                continue
            clip.audio.write_audiofile(wav_out, verbose=False, logger=None)
        finally:
            # close() releases both the video and the audio readers, also
            # when writing fails, without relying on reader internals.
            clip.close()


# Feature Extraction

## Audio Features Extraction

In [None]:
import librosa
import numpy as np
import os
import pandas as pd


# Folder whose audio files are turned into feature vectors below.
# NOTE(review): the audio-extraction cell writes to the relative folder
# "ravdess_wavs"; confirm this absolute path points at that same directory.
dataset_path = r"D:\fundamentals\AI NTI\Second Final Project\ravdess_wavs"


def extract_features(file_path):
    """Summarise one audio file as a single 1-D feature vector.

    The file is loaded at 22050 Hz. Six librosa descriptors are computed and
    each is averaged over its frames; the averages are concatenated in this
    order: MFCC (n_mfcc=40), zero-crossing rate, spectral centroid, spectral
    bandwidth, spectral rolloff, RMS energy.

    Args:
        file_path: Path of the audio file to load.

    Returns:
        The concatenated NumPy vector, or ``None`` if anything fails (the
        error is printed together with the file path).
    """
    try:
        signal, rate = librosa.load(file_path, sr=22050)
        feat = librosa.feature

        # One (n_coefficients, n_frames) matrix per descriptor.
        per_frame = (
            feat.mfcc(y=signal, sr=rate, n_mfcc=40),
            feat.zero_crossing_rate(signal),
            feat.spectral_centroid(y=signal, sr=rate),
            feat.spectral_bandwidth(y=signal, sr=rate),
            feat.spectral_rolloff(y=signal, sr=rate),
            feat.rms(y=signal),
        )

        # Average each descriptor across frames, then join the averages.
        return np.hstack([np.mean(matrix.T, axis=0) for matrix in per_frame])

    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None


# ========== Emotion-code mapping ==========
# Maps the two-digit code found in the third dash-separated field of a file
# name to its emotion label; codes missing here are labelled "unknown" below.
emotion_map = {
    "01": "neutral",
    "02": "calm",
    "03": "happy",
    "04": "sad",
    "05": "angry",
    "06": "fearful",
    "07": "disgust",
    "08": "surprised"
}

# ========== Apply Function ==========
# Parallel lists: entry i of each list describes the same audio file.
all_features = []
file_names = []
labels = []
video_ids = []

# Sorted so the row order of the CSV does not depend on the order in which
# the file system happens to list the directory.
for file in sorted(os.listdir(dataset_path)):
    if not file.endswith((".wav", ".mp3")):
        continue
    file_path = os.path.join(dataset_path, file)

    # features extraction (returns None when the file could not be processed)
    features = extract_features(file_path)
    if features is None:
        continue

    # The emotion code is the third dash-separated field of the file name.
    # Names without that field are labelled "unknown" instead of raising
    # IndexError after the expensive extraction has already run.
    name_fields = file.split("-")
    code = name_fields[2] if len(name_fields) > 2 else None
    label = emotion_map.get(code, "unknown")

    # The file name without extension is the key used to join with the
    # per-video visual features later on.
    video_id = os.path.splitext(file)[0]

    all_features.append(features)
    file_names.append(file)
    labels.append(label)
    video_ids.append(video_id)

# ========== Save CSV ==========
# One row per audio file: three leading metadata columns followed by the
# numeric feature columns.
df = pd.DataFrame(all_features)
leading_columns = (
    ("video_id", video_ids),
    ("file_name", file_names),
    ("label", labels),
)
for position, (column_name, column_values) in enumerate(leading_columns):
    df.insert(position, column_name, column_values)
df.to_csv("ravdess_features.csv", index=False)

print("Feature extraction done!")


## Frames Features Extraction

In [None]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from tensorflow.keras.utils import load_img, img_to_array
from keras_vggface.vggface import VGGFace
from keras_vggface.utils import preprocess_input
from tensorflow.keras.layers import Dense, Activation
import tensorflow as tf

# VGGFace (ResNet50 backbone), built without the top classification layers
# (include_top=False) and with average pooling (pooling='avg').
# NOTE(review): presumably this yields one embedding vector per image —
# confirm against the keras_vggface documentation.
model = VGGFace(model='resnet50', include_top=False, pooling='avg')


# Folder that holds one sub-folder of extracted frames per video.
frames_path = r"D:\fundamentals\AI NTI\Second Final Project\frames_output"

def extract_image_features(img_path):
    """Return the flattened VGGFace embedding of a single image file.

    The image is loaded at 224x224, turned into a batch of one, preprocessed
    with ``preprocess_input(version=2)`` and passed through the module-level
    ``model``.

    Args:
        img_path: Path of the image file.

    Returns:
        The model output for the image, flattened to 1-D.
    """
    pixels = img_to_array(load_img(img_path, target_size=(224, 224)))
    batch = preprocess_input(np.expand_dims(pixels, axis=0), version=2)
    embedding = model.predict(batch, verbose=0)
    return embedding.flatten()

# One scoring layer per feature dimension, created on first use and then
# reused, so that every video is scored with the same weights.
_ATTENTION_SCORERS = {}


def attention_pooling(frame_feats):
    """Collapse per-frame feature vectors into one video-level vector.

    Each frame gets a scalar score from a ``Dense(1)`` layer; the scores are
    softmax-normalised across frames and used as weights in a weighted sum of
    the frame features.

    The scoring layer is NOT trained anywhere in this notebook. It is created
    once per feature dimension with a fixed initializer seed, so all videos
    share the same projection and repeated runs give the same result. Creating
    a fresh randomly initialised layer on every call would score each video
    with a different random projection.

    Args:
        frame_feats: Sequence of T feature vectors of equal length D.

    Returns:
        NumPy array of shape (D,): the weighted sum of the frame features.

    Raises:
        ValueError: If ``frame_feats`` is empty or not a (T, D) array.
    """
    frame_feats = np.asarray(frame_feats, dtype=np.float32)  # (T, D)
    if frame_feats.ndim != 2 or frame_feats.shape[0] == 0:
        raise ValueError("frame_feats must be a non-empty (T, D) array")

    feat_dim = frame_feats.shape[1]
    scorer = _ATTENTION_SCORERS.get(feat_dim)
    if scorer is None:
        scorer = Dense(
            1,
            kernel_initializer=tf.keras.initializers.GlorotUniform(seed=42),
        )
        _ATTENTION_SCORERS[feat_dim] = scorer

    scores = scorer(frame_feats)  # (T, 1)
    # Normalise over the frame axis so the weights sum to 1 per video.
    weights = tf.nn.softmax(scores, axis=0).numpy()

    # weighted sum over frames
    video_feat = np.sum(frame_feats * weights, axis=0)  # (D,)
    return video_feat


# Parallel lists: pooled feature vector and folder name of each video.
video_features = []
video_ids = []

# Every sub-folder of `frames_path` holds the frames of one video.
for video_folder in tqdm(os.listdir(frames_path)):
    folder_path = os.path.join(frames_path, video_folder)
    if not os.path.isdir(folder_path):
        continue

    # Embed every .jpg/.png frame found in the folder.
    frame_feats = [
        extract_image_features(os.path.join(folder_path, image_name))
        for image_name in os.listdir(folder_path)
        if image_name.endswith((".jpg", ".png"))
    ]

    # Folders without any frame are skipped entirely.
    if not frame_feats:
        continue

    # attention pooling: many frame vectors -> one vector per video
    video_features.append(attention_pooling(frame_feats))
    video_ids.append(video_folder)


# One row per video: the pooled feature vector, with the folder name added as
# the leading "video_id" column.
df = pd.DataFrame(video_features)
df.insert(0, "video_id", video_ids)


# Written without the index column; read back by the fusion step.
df.to_csv("video_features_vggface_attention.csv", index=False)
print("Feature extraction done!")


# Feature Fusion

In [None]:
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Concatenate, MultiHeadAttention
from tensorflow.keras.models import Model

# ========= 1) Load the audio and the visual feature tables =========
audio_df = pd.read_csv("ravdess_features.csv")
video_df = pd.read_csv("video_features_vggface_attention.csv")

# ========= 2) Join both tables on the shared video_id =========
# Inner join: only videos present in both tables are kept.
df = pd.merge(audio_df, video_df, on="video_id")

# ========= 3) Split the merged table back into its parts =========
# The merged frame starts with all columns of `audio_df` (video_id, file_name,
# label, then the audio features), followed by the visual features. The
# boundary is derived from `audio_df` instead of being hard-coded, so a
# change in the number of audio features cannot silently shift the split.
n_meta_cols = 3                    # video_id, file_name, label
audio_end = audio_df.shape[1]      # first column index after the audio block
audio_feats = df.iloc[:, n_meta_cols:audio_end].values  # audio features
video_feats = df.iloc[:, audio_end:].values             # video features
labels = df["label"]
video_ids = df["video_id"]

# ========= 4) Normalization =========
# Each modality gets its own scaler (zero mean, unit variance per column).
# NOTE(review): the scalers and the PCA below are fitted on every merged row,
# i.e. before the train/validation/test split done in the modelling step, so
# held-out rows influence these transforms.
scaler_audio = StandardScaler().fit(audio_feats)
scaler_video = StandardScaler().fit(video_feats)
audio_scaled = scaler_audio.transform(audio_feats)
video_scaled = scaler_video.transform(video_feats)

# ========= 5) PCA =========
# Reduce the scaled visual features to 300 components.
pca = PCA(n_components=300, random_state=42)
video_reduced = pca.fit_transform(video_scaled)

# ========= 6) Fusion Models =========
# Builds one Keras graph that turns (audio, video) feature pairs into a single
# fused vector made of three concatenated "views".
# NOTE(review): no compile()/fit() call is visible in this notebook, so the
# Dense and MultiHeadAttention layers appear to be used with their initial
# weights — confirm this is intended.

# --- Inputs ---
# One flat vector per modality; widths follow the scaled/reduced arrays.
audio_in = Input(shape=(audio_scaled.shape[1],))
video_in = Input(shape=(video_reduced.shape[1],))

# --- (1) Concatenation Fusion ---
# Plain concatenation of both input vectors.
fusion_concat = Concatenate()([audio_in, video_in])

# --- (2) Cross Attention Fusion ---
# Project both modalities to a common width of 128.
audio_proj = Dense(128, activation="relu")(audio_in)
video_proj = Dense(128, activation="relu")(video_in)

# Insert a sequence axis of length 1 so the vectors can be fed to attention.
audio_seq = tf.expand_dims(audio_proj, axis=1)
video_seq = tf.expand_dims(video_proj, axis=1)

# Two separate attention layers, one per direction (first argument = query,
# second = value per the Keras call signature): audio attends to video and
# video attends to audio.
attn_audio = MultiHeadAttention(num_heads=4, key_dim=32)(audio_seq, video_seq)
attn_video = MultiHeadAttention(num_heads=4, key_dim=32)(video_seq, audio_seq)

# Drop the length-1 sequence axis again.
attn_audio = tf.squeeze(attn_audio, axis=1)
attn_video = tf.squeeze(attn_video, axis=1)

fusion_attention = Concatenate()([attn_audio, attn_video])

# --- (3) Gated Fusion ---
# One scalar gate per modality, computed from that modality's own input.
gate_audio = Dense(1, activation="sigmoid")(audio_in)
gate_video = Dense(1, activation="sigmoid")(video_in)

# Softmax across the two gates, so alpha + beta == 1 for every sample.
gates = Concatenate()([gate_audio, gate_video])
gates = tf.keras.layers.Softmax(axis=-1)(gates)

# Slices keep the trailing axis so the gates broadcast over the features.
alpha = gates[:, 0:1]
beta = gates[:, 1:2]

# Each modality is scaled by its gate, then both are concatenated.
fusion_gated = Concatenate()([alpha * audio_in, beta * video_in])

# --- (Final Multi-View Fusion) ---
# Final vector = concatenation view + attention view + gated view.
fusion_final = Concatenate()([fusion_concat, fusion_attention, fusion_gated])

# ========= 7) features Extraction =========
# Wrap the graph in a Model and run it once over all rows.
fusion_model = Model(inputs=[audio_in, video_in], outputs=fusion_final)
fusion_features = fusion_model.predict([audio_scaled, video_reduced], verbose=0)

# ========= 8) DataFrame =========
# One row per video: fused feature vector preceded by video_id and label.
fusion_df = pd.DataFrame(fusion_features)
fusion_df.insert(0, "video_id", video_ids)
fusion_df.insert(1, "label", labels)

# Written without the index column; read back by the modelling step.
fusion_df.to_csv("fusion_features.csv", index=False)
print("Multi-View Fusion Done!")




✅ Multi-View Fusion Done! The file fusion_features.csv was created.


# Modelling

## Model train

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from xgboost import XGBClassifier

# ========= 1) Load the fused features =========
fusion_df = pd.read_csv("fusion_features.csv")

# ========= 2) Separate identifiers, labels and feature columns =========
video_ids, labels = fusion_df["video_id"], fusion_df["label"]  # video_ids: optional
features = fusion_df.drop(["video_id", "label"], axis=1)

# ========= 3) Label Encoding =========
# Emotion names -> integer class ids; the encoder is kept to map ids back.
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

# ========= 4) Train-Test-Validation Split =========
# First hold out 20% as the test set, then take 10% of the remainder as the
# validation set. Both splits are stratified by class and use the same seed.
X_temp, X_test, y_temp, y_test = train_test_split(
    features,
    labels_encoded,
    test_size=0.2,
    random_state=42,
    stratify=labels_encoded,
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp,
    y_temp,
    test_size=0.1,
    random_state=42,
    stratify=y_temp,
)

# ========= 5) Scaling =========
# Statistics come from the training part only and are applied to all parts.
scaler = StandardScaler().fit(X_train)
X_train_scaled, X_val_scaled, X_test_scaled = (
    scaler.transform(part) for part in (X_train, X_val, X_test)
)

# ========= 6) Base Classifiers =========
svm_clf = SVC(
    kernel='rbf', C=10, gamma='scale', probability=True, random_state=42
)
rf_clf = RandomForestClassifier(
    n_estimators=300, max_depth=None, random_state=42
)
xgb_clf = XGBClassifier(
    n_estimators=300,
    max_depth=6,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42,
)

# ========= 7) Stacking Classifier =========
# The class probabilities of the three base models feed a linear SVM.
base_estimators = [('svm', svm_clf), ('rf', rf_clf), ('xgb', xgb_clf)]
meta_learner = SVC(kernel='linear', probability=True, random_state=42)
stack_clf = StackingClassifier(
    estimators=base_estimators,
    final_estimator=meta_learner,
    stack_method="predict_proba",
    n_jobs=-1,
)

# ========= 8) Training =========
stack_clf.fit(X_train_scaled, y_train)

# ========= 9) Evaluation =========
# Predict both held-out parts first, then score them.
y_val_pred = stack_clf.predict(X_val_scaled)
y_test_pred = stack_clf.predict(X_test_scaled)

val_acc = accuracy_score(y_val, y_val_pred)
test_acc = accuracy_score(y_test, y_test_pred)

# Map integer class ids back to emotion names for a readable report.
to_names = label_encoder.inverse_transform
y_val_pred_labels = to_names(y_val_pred)
y_test_pred_labels = to_names(y_test_pred)
y_val_labels = to_names(y_val)
y_test_labels = to_names(y_test)

print(f"✅ Validation Accuracy: {val_acc*100:.2f}%")
print(f"✅ Test Accuracy: {test_acc*100:.2f}%\n")

print("Classification Report (Test):")
print(classification_report(y_test_labels, y_test_pred_labels))


✅ Validation Accuracy: 93.97%
✅ Test Accuracy: 93.06%

Classification Report (Test):
              precision    recall  f1-score   support

       angry       0.97      0.89      0.93        38
        calm       0.93      0.97      0.95        38
     disgust       0.95      0.95      0.95        38
     fearful       0.95      0.90      0.92        39
       happy       0.97      1.00      0.99        39
     neutral       0.94      0.89      0.92        19
         sad       0.87      0.89      0.88        38
   surprised       0.88      0.92      0.90        39

    accuracy                           0.93       288
   macro avg       0.93      0.93      0.93       288
weighted avg       0.93      0.93      0.93       288



## Model Evaluation

In [7]:
# Re-evaluate the fitted stacking classifier on the validation and test parts.
y_val_pred = stack_clf.predict(X_val_scaled)
val_acc = accuracy_score(y_val, y_val_pred)

y_test_pred = stack_clf.predict(X_test_scaled)
test_acc = accuracy_score(y_test, y_test_pred)

print(f"✅ Validation Accuracy: {val_acc*100:.2f}%")
print(f"✅ Test Accuracy: {test_acc*100:.2f}%\n")

# y_test / y_test_pred hold integer class ids. Map them back to emotion names
# so the report rows read "angry", "calm", ... instead of 0, 1, ...
print("Classification Report (Test):")
print(classification_report(
    label_encoder.inverse_transform(y_test),
    label_encoder.inverse_transform(y_test_pred),
))

✅ Validation Accuracy: 93.97%
✅ Test Accuracy: 93.06%

Classification Report (Test):
              precision    recall  f1-score   support

       angry       0.97      0.89      0.93        38
        calm       0.93      0.97      0.95        38
     disgust       0.95      0.95      0.95        38
     fearful       0.95      0.90      0.92        39
       happy       0.97      1.00      0.99        39
     neutral       0.94      0.89      0.92        19
         sad       0.87      0.89      0.88        38
   surprised       0.88      0.92      0.90        39

    accuracy                           0.93       288
   macro avg       0.93      0.93      0.93       288
weighted avg       0.93      0.93      0.93       288



## Model Saving

In [None]:
# ========== 7) Save Everything ==========
# Persist every fitted object needed to reproduce the pipeline at inference.
import joblib
import os
os.makedirs("models", exist_ok=True)

# Fusion step
joblib.dump(scaler_audio, "models/scaler_audio.joblib")
joblib.dump(scaler_video, "models/scaler_video.joblib")
joblib.dump(pca, "models/pca_video.joblib")
fusion_model.save("models/fusion_model.h5")

# Training step
joblib.dump(scaler, "models/fusion_scaler.joblib")

# StackingClassifier fits clones of the estimators it is given, so the
# `svm_clf`, `rf_clf` and `xgb_clf` objects are never fitted themselves.
# Save the fitted clones held by the stack instead of the unfitted originals.
fitted_base = stack_clf.named_estimators_
joblib.dump(fitted_base["svm"], "models/svm_model.joblib")
joblib.dump(fitted_base["rf"], "models/rf_model.joblib")
joblib.dump(fitted_base["xgb"], "models/xgb_model.joblib")

joblib.dump(stack_clf, "models/stacking_model.joblib")
joblib.dump(label_encoder, "models/label_encoder.joblib")

print("All Fusion + Training models & scalers & encoders saved!")

✅ All Fusion + Training models & scalers & encoders saved!
