In [None]:
pip install opensmile

In [None]:
import os
import pandas as pd
import opensmile
import re
from tqdm import tqdm
import numpy as np

In [None]:
# Read the two preprocessed CSV tables (RAVDESS and CREMA-D, going by the file
# names); later cells concatenate them into a single training frame.
ravdess_csv = '/content/drive/My Drive/BECU Capstone_Duress/Data_preprocessed/RAVDESS DATA.csv'
cremad_csv = '/content/drive/My Drive/BECU Capstone_Duress/Data_preprocessed/CREMAD DATA.csv'

data_1 = pd.read_csv(ravdess_csv)
data_2 = pd.read_csv(cremad_csv)

In [None]:
# Scikit-learn
from sklearn.feature_selection import RFE
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight

# XGBoost
import xgboost as xgb

# SHAP
import shap

# TensorFlow/Keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam

In [None]:
# ================== key features ==================
def feature_selection(X, y, method='shap', n_features=30, random_state=42):
    """Return the positional indices of the most informative feature columns.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Feature matrix; columns are addressed by position.
    y : array-like of shape (n_samples,)
        Class labels.
    method : {'shap', 'rfe'}, default='shap'
        ``'shap'`` fits an XGBoost classifier and ranks columns by their mean
        absolute SHAP value. ``'rfe'`` runs recursive feature elimination
        around a 100-tree random forest, discarding 10% of the columns per
        iteration (``step=0.1``).
    n_features : int, default=30
        Number of columns to keep.
    random_state : int or None, default=42
        Seed passed to the underlying estimators so repeated calls on the same
        data select the same columns. Pass ``None`` for unseeded behaviour.

    Returns
    -------
    numpy.ndarray
        Column positions of the selected features. For ``'shap'`` they are
        ordered from least to most important among the kept ones; for
        ``'rfe'`` they are in ascending column order.

    Raises
    ------
    ValueError
        If ``method`` is not ``'shap'`` or ``'rfe'``.
    """
    # Validate first: previously an unknown method fell through to the return
    # statement and failed with an UnboundLocalError on `top_indices`.
    if method not in ('shap', 'rfe'):
        raise ValueError(
            f"Unknown feature selection method {method!r}; expected 'shap' or 'rfe'."
        )

    if method == 'shap':
        # SHAP
        model = xgb.XGBClassifier(eval_metric='logloss', random_state=random_state)
        model.fit(X, y)

        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X)

        # Mean |SHAP| per column.
        # NOTE(review): assumes a 2-D (samples x features) SHAP matrix, as
        # produced for a binary target -- confirm before using multi-class labels.
        shap_importance = np.abs(shap_values).mean(axis=0)
        # argsort is ascending, so the tail holds the highest-ranked columns.
        top_indices = np.argsort(shap_importance)[-n_features:]

    else:
        # Recursive feature elimination
        selector = RFE(
            estimator=RandomForestClassifier(n_estimators=100, random_state=random_state),
            n_features_to_select=n_features,
            step=0.1
        )
        selector.fit(X, y)
        top_indices = selector.get_support(indices=True)

    return top_indices

# ================== Load and Preprocess Data ==================
# Stack both corpora into one frame and separate the target column.
data1 = pd.concat([data_1, data_2], axis=0).reset_index(drop=True)
X = data1.drop(columns=['duress_label'])
y = data1['duress_label']

# Split before scaling to avoid data leakage into the held-out test set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# ================== Feature Selection ==================
# NOTE(review): scaling and feature selection are fitted on the whole training
# split, so the cross-validation folds built from it later have already
# influenced both -- CV scores may be optimistic. The test split is unaffected.
selected1 = feature_selection(X_train_scaled, y_train, method='shap', n_features=100)
selected2 = feature_selection(X_train_scaled, y_train, method='rfe', n_features=100)

feature_names = X.columns.tolist()
# Keep the columns both selectors agree on. Intersecting the positions directly
# and sorting them gives a stable column order; going through a set of name
# strings made the order change between kernel sessions and could collapse
# duplicate column names onto the first match.
selected_idx = sorted(set(map(int, selected1)) & set(map(int, selected2)))
if not selected_idx:
    raise ValueError("SHAP and RFE selections share no features; nothing left to train on.")
intersect_names = [feature_names[i] for i in selected_idx]

X_train_sel = X_train_scaled[:, selected_idx]
X_test_sel = X_test_scaled[:, selected_idx]

# ================== LSTM Helper ==================
def reshape_for_lstm(X, timesteps=1):
    """Reshape a 2-D feature matrix into the 3-D layout an LSTM layer expects.

    Parameters
    ----------
    X : numpy.ndarray of shape (n_samples, n_values)
        One flat feature vector per row.
    timesteps : int, default=1
        Number of time steps per sample. Each row is split into ``timesteps``
        consecutive, equally sized chunks, so ``n_values`` must be divisible
        by ``timesteps``. With the default of 1 the whole row is one step.

    Returns
    -------
    numpy.ndarray of shape (n_samples, timesteps, n_values // timesteps)

    Raises
    ------
    ValueError
        If ``timesteps`` is smaller than 1 or does not divide ``n_values``.
    """
    n_samples, n_values = X.shape[0], X.shape[1]
    if timesteps < 1:
        raise ValueError(f"timesteps must be >= 1, got {timesteps}.")
    # The old target shape (n_samples, timesteps, n_values) asked for more
    # elements than X holds whenever timesteps > 1, so only the default worked.
    if n_values % timesteps != 0:
        raise ValueError(
            f"Cannot split {n_values} features into {timesteps} equal timesteps."
        )
    return X.reshape((n_samples, timesteps, n_values // timesteps))

# ================== Cross Validation & Training ==================
# Every fold trains a fresh LSTM and a random forest on the selected features
# and blends their validation probabilities with AUC-proportional weights. The
# fold with the highest blended AUC supplies the models -- and the blend
# weights -- that are evaluated on the held-out test split below.

# Local import keeps this cell self-contained. The call seeds Python, NumPy and
# TensorFlow so LSTM initialisation and dropout repeat across runs.
from tensorflow.keras.utils import set_random_seed

set_random_seed(42)

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
best_auc = -np.inf
best_lstm_model = None
best_rf_model = None
best_weights = (0.5, 0.5)  # (lstm, rf) blend weights of the winning fold
auc_scores = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X_train_sel, y_train), 1):
    print(f"\n===== Fold {fold} =====")
    X_tr, X_val = X_train_sel[train_idx], X_train_sel[val_idx]
    y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]

    X_tr_lstm = reshape_for_lstm(X_tr)
    X_val_lstm = reshape_for_lstm(X_val)

    lstm_model = Sequential([
        Input(shape=(1, X_tr.shape[1])),
        LSTM(64, return_sequences=True),
        Dropout(0.5),
        LSTM(32),
        Dense(16, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    lstm_model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

    # Key the weights by the actual class labels rather than by their position
    # in the array, so the mapping stays right even if a label were missing.
    classes = np.unique(y_tr)
    class_weights = compute_class_weight('balanced', classes=classes, y=y_tr)
    class_weight_dict = {int(c): float(w) for c, w in zip(classes, class_weights)}

    lstm_model.fit(X_tr_lstm, y_tr, validation_data=(X_val_lstm, y_val),
                   epochs=10, batch_size=32, verbose=0,
                   callbacks=[EarlyStopping(patience=5, restore_best_weights=True)],
                   class_weight=class_weight_dict)

    lstm_val_proba = lstm_model.predict(X_val_lstm).flatten()

    rf_model = RandomForestClassifier(n_estimators=300, class_weight='balanced', random_state=42)
    rf_model.fit(X_tr, y_tr)
    rf_val_proba = rf_model.predict_proba(X_val)[:, 1]

    # Blend the two models in proportion to their individual validation AUC.
    # NOTE(review): the weights come from the same fold the blend is scored on,
    # so the per-fold AUC is slightly optimistic.
    lstm_auc = roc_auc_score(y_val, lstm_val_proba)
    rf_auc = roc_auc_score(y_val, rf_val_proba)
    lstm_weight = lstm_auc / (lstm_auc + rf_auc)
    rf_weight = rf_auc / (lstm_auc + rf_auc)
    final_val_proba = lstm_val_proba * lstm_weight + rf_val_proba * rf_weight
    final_auc = roc_auc_score(y_val, final_val_proba)
    print(f"Fold-{fold} AUC: {final_auc:.4f}")
    auc_scores.append(final_auc)

    if final_auc > best_auc:
        best_auc = final_auc
        best_lstm_model = lstm_model
        best_rf_model = rf_model
        best_weights = (lstm_weight, rf_weight)

# ================== Final Evaluation ==================
print("\n=== Cross-Validation Summary ===")
print(f"Mean AUC: {np.mean(auc_scores):.4f}")
print(f"Std AUC: {np.std(auc_scores):.4f}")
print(f"Best validation AUC: {best_auc:.4f}")

if best_lstm_model is None or best_rf_model is None:
    raise RuntimeError("Cross-validation did not produce a model to evaluate.")

X_test_lstm = reshape_for_lstm(X_test_sel)
lstm_test_proba = best_lstm_model.predict(X_test_lstm).flatten()
rf_test_proba = best_rf_model.predict_proba(X_test_sel)[:, 1]
# Score the test split with the same weighted blend that was validated and
# selected above; a plain 50/50 average would be a different ensemble.
best_lstm_weight, best_rf_weight = best_weights
final_proba = lstm_test_proba * best_lstm_weight + rf_test_proba * best_rf_weight
final_pred = (final_proba > 0.5).astype(int)

print("\n=== Final Test Evaluation ===")
print(classification_report(y_test, final_pred))
print(f"Test AUC: {roc_auc_score(y_test, final_proba):.4f}")

In [None]:
# ================== Load and Preprocess Data ==================
# Stack both corpora into one frame and separate the target column.
data1 = pd.concat([data_1, data_2], axis=0).reset_index(drop=True)
X = data1.drop(columns=['duress_label'])
y = data1['duress_label']

# Split before scaling to avoid data leakage into the held-out test set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
                                                    stratify=y, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# ================== Feature Selection ==================
# NOTE(review): scaling and feature selection are fitted on the whole training
# split, so the cross-validation folds built from it later have already
# influenced both -- CV scores may be optimistic. The test split is unaffected.
selected1 = feature_selection(X_train_scaled, y_train, method='shap', n_features=100)
selected2 = feature_selection(X_train_scaled, y_train, method='rfe', n_features=100)

feature_names = X.columns.tolist()
# Keep the columns both selectors agree on. Intersecting the positions directly
# and sorting them gives a stable column order; going through a set of name
# strings made the order change between kernel sessions and could collapse
# duplicate column names onto the first match.
selected_idx = sorted(set(map(int, selected1)) & set(map(int, selected2)))
if not selected_idx:
    raise ValueError("SHAP and RFE selections share no features; nothing left to train on.")
intersect_names = [feature_names[i] for i in selected_idx]

X_train_sel = X_train_scaled[:, selected_idx]
X_test_sel = X_test_scaled[:, selected_idx]

# ================== LSTM Helper ==================
def reshape_for_lstm(X, timesteps=1):
    """Reshape a 2-D feature matrix into the 3-D layout an LSTM layer expects.

    Parameters
    ----------
    X : numpy.ndarray of shape (n_samples, n_values)
        One flat feature vector per row.
    timesteps : int, default=1
        Number of time steps per sample. Each row is split into ``timesteps``
        consecutive, equally sized chunks, so ``n_values`` must be divisible
        by ``timesteps``. With the default of 1 the whole row is one step.

    Returns
    -------
    numpy.ndarray of shape (n_samples, timesteps, n_values // timesteps)

    Raises
    ------
    ValueError
        If ``timesteps`` is smaller than 1 or does not divide ``n_values``.
    """
    n_samples, n_values = X.shape[0], X.shape[1]
    if timesteps < 1:
        raise ValueError(f"timesteps must be >= 1, got {timesteps}.")
    # The old target shape (n_samples, timesteps, n_values) asked for more
    # elements than X holds whenever timesteps > 1, so only the default worked.
    if n_values % timesteps != 0:
        raise ValueError(
            f"Cannot split {n_values} features into {timesteps} equal timesteps."
        )
    return X.reshape((n_samples, timesteps, n_values // timesteps))

# ================== Cross Validation & Training ==================
# Every fold trains a fresh LSTM and a random forest on the selected features
# and blends their validation probabilities with AUC-proportional weights. The
# fold with the highest blended AUC supplies the models -- and the blend
# weights -- that are evaluated on the held-out test split below.

# Local import keeps this cell self-contained. The call seeds Python, NumPy and
# TensorFlow so LSTM initialisation and dropout repeat across runs.
from tensorflow.keras.utils import set_random_seed

set_random_seed(42)

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
best_auc = -np.inf
best_lstm_model = None
best_rf_model = None
best_weights = (0.5, 0.5)  # (lstm, rf) blend weights of the winning fold
auc_scores = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X_train_sel, y_train), 1):
    print(f"\n===== Fold {fold} =====")
    X_tr, X_val = X_train_sel[train_idx], X_train_sel[val_idx]
    y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]

    X_tr_lstm = reshape_for_lstm(X_tr)
    X_val_lstm = reshape_for_lstm(X_val)

    lstm_model = Sequential([
        Input(shape=(1, X_tr.shape[1])),
        LSTM(64, return_sequences=True),
        Dropout(0.5),
        LSTM(32),
        Dense(16, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    lstm_model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

    # Key the weights by the actual class labels rather than by their position
    # in the array, so the mapping stays right even if a label were missing.
    classes = np.unique(y_tr)
    class_weights = compute_class_weight('balanced', classes=classes, y=y_tr)
    class_weight_dict = {int(c): float(w) for c, w in zip(classes, class_weights)}

    lstm_model.fit(X_tr_lstm, y_tr, validation_data=(X_val_lstm, y_val),
                   epochs=10, batch_size=32, verbose=0,
                   callbacks=[EarlyStopping(patience=5, restore_best_weights=True)],
                   class_weight=class_weight_dict)

    lstm_val_proba = lstm_model.predict(X_val_lstm).flatten()

    rf_model = RandomForestClassifier(n_estimators=300, class_weight='balanced', random_state=42)
    rf_model.fit(X_tr, y_tr)
    rf_val_proba = rf_model.predict_proba(X_val)[:, 1]

    # Blend the two models in proportion to their individual validation AUC.
    # NOTE(review): the weights come from the same fold the blend is scored on,
    # so the per-fold AUC is slightly optimistic.
    lstm_auc = roc_auc_score(y_val, lstm_val_proba)
    rf_auc = roc_auc_score(y_val, rf_val_proba)
    lstm_weight = lstm_auc / (lstm_auc + rf_auc)
    rf_weight = rf_auc / (lstm_auc + rf_auc)
    final_val_proba = lstm_val_proba * lstm_weight + rf_val_proba * rf_weight
    final_auc = roc_auc_score(y_val, final_val_proba)
    print(f"Fold-{fold} AUC: {final_auc:.4f}")
    auc_scores.append(final_auc)

    if final_auc > best_auc:
        best_auc = final_auc
        best_lstm_model = lstm_model
        best_rf_model = rf_model
        best_weights = (lstm_weight, rf_weight)

# ================== Final Evaluation ==================
print("\n=== Cross-Validation Summary ===")
print(f"Mean AUC: {np.mean(auc_scores):.4f}")
print(f"Std AUC: {np.std(auc_scores):.4f}")
print(f"Best validation AUC: {best_auc:.4f}")

if best_lstm_model is None or best_rf_model is None:
    raise RuntimeError("Cross-validation did not produce a model to evaluate.")

X_test_lstm = reshape_for_lstm(X_test_sel)
lstm_test_proba = best_lstm_model.predict(X_test_lstm).flatten()
rf_test_proba = best_rf_model.predict_proba(X_test_sel)[:, 1]
# Score the test split with the same weighted blend that was validated and
# selected above; a plain 50/50 average would be a different ensemble.
best_lstm_weight, best_rf_weight = best_weights
final_proba = lstm_test_proba * best_lstm_weight + rf_test_proba * best_rf_weight
final_pred = (final_proba > 0.5).astype(int)

print("\n=== Final Test Evaluation ===")
print(classification_report(y_test, final_pred))
print(f"Test AUC: {roc_auc_score(y_test, final_proba):.4f}")


===== Fold 1 =====
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 12ms/step
Fold-1 AUC: 0.8438

===== Fold 2 =====
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 9ms/step
Fold-2 AUC: 0.8241

===== Fold 3 =====
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 13ms/step
Fold-3 AUC: 0.8230

===== Fold 4 =====
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 9ms/step
Fold-4 AUC: 0.8095

===== Fold 5 =====
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 12ms/step
Fold-5 AUC: 0.8000

=== Cross-Validation Summary ===
Mean AUC: 0.8201
Std AUC: 0.0149
Best validation AUC: 0.8438
[1m56/56[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step

=== Final Test Evaluation ===
              precision    recall  f1-score   support

           0       0.73      0.79      0.76       899
           1       0.77      0.70      0.73       878

    accuracy                           0.75      1777
   macro avg       0

In [None]:
# Persist everything needed to reproduce predictions: both models of the
# selected fold, the fitted scaler, and the selected feature columns.
import os
import joblib

# joblib.dump opens its target path directly, so create the folder up front
# instead of failing on a fresh Drive.
os.makedirs("/content/drive/My Drive/BECU Capstone_Duress/Model", exist_ok=True)

# Save the LSTM model
best_lstm_model.save("/content/drive/My Drive/BECU Capstone_Duress/Model/lstm_model.keras")

# Save the RandomForest model
joblib.dump(best_rf_model, "/content/drive/My Drive/BECU Capstone_Duress/Model/rf_model.pkl")
joblib.dump(scaler, "/content/drive/My Drive/BECU Capstone_Duress/Model/scaler.pkl")

# Column positions are only meaningful for the exact column order used in
# training, so the matching column names are stored next to them; previously
# `selected_features` was computed but never written out.
selected_features = [feature_names[i] for i in selected_idx]
joblib.dump(selected_idx, "/content/drive/My Drive/BECU Capstone_Duress/Model/selected_idx.pkl")
joblib.dump(selected_features, "/content/drive/My Drive/BECU Capstone_Duress/Model/selected_features.pkl")
