In [2]:
import sys, os
from pathlib import Path
sys.path.append("..")

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.nn.functional as F
from PIL import Image
import numpy as np
import csv

from src.models.face_cnn import FaceCNN
from src.video_to_frames import extract_frames

device = torch.device("mps") if torch.backends.mps.is_available() else "cpu"
print("Using device:", device)


Using device: mps


In [11]:
transform = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize((48, 48)),
    transforms.ToTensor()
])

train_data = datasets.ImageFolder("../data/raw/fer2013/train", transform=transform)
test_data  = datasets.ImageFolder("../data/raw/fer2013/test", transform=transform)

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_loader  = DataLoader(test_data,  batch_size=32, shuffle=False)

emotion_labels = train_data.classes  

print("Classes:", train_data.classes)


Classes: ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']


In [4]:
model = FaceCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    loss_sum = 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        out = model(images)
        loss = criterion(out, labels)
        loss.backward()
        optimizer.step()
        loss_sum += loss.item()
    print(f"Epoch {epoch+1} | Loss: {loss_sum/len(train_loader):.4f}")

# Save model ONCE
os.makedirs("../models", exist_ok=True)
torch.save(model.state_dict(), "../models/emotion_cnn.pth")
print("✅ Model saved!")


Epoch 1 | Loss: 1.6563
Epoch 2 | Loss: 1.4267
Epoch 3 | Loss: 1.2978
Epoch 4 | Loss: 1.1960
Epoch 5 | Loss: 1.1062
Epoch 6 | Loss: 1.0275
Epoch 7 | Loss: 0.9518
Epoch 8 | Loss: 0.8694
Epoch 9 | Loss: 0.7849
Epoch 10 | Loss: 0.7100
✅ Model saved!


In [5]:
model = FaceCNN().to(device)
model.load_state_dict(torch.load("../models/emotion_cnn.pth", map_location=device))
model.eval()
print("✅ Model loaded for inference")


✅ Model loaded for inference


In [6]:
def predict_emotion(frame_path):
    img = Image.open(frame_path)
    img = transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        out = model(img)
        probs = F.softmax(out, dim=1).cpu().numpy()[0]
    return probs


In [22]:
from src.body_language_yolo import extract_yolo_pose     # <-- NEW
from pathlib import Path
import numpy as np
import csv
import os

def process_video_to_features(video_path, class_label, out_csv, fps=2):
    """
    Extracts:
        - Emotion timeline
        - Body-language timeline (YOLO Pose)
    Then creates:
        - 23 emotion features
        - 7 body-language features
    Then writes one row to CSV.
    """

    video_name = Path(video_path).stem
    frame_dir = Path(f"../data/frames/{video_name}")
    frame_dir.mkdir(parents=True, exist_ok=True)

    # Extract frames from video
    extract_frames(str(video_path), str(frame_dir), fps=fps)

    # List frames
    frames = sorted([f for f in os.listdir(frame_dir) if f.lower().endswith((".jpg", ".png"))])

    if len(frames) == 0:
        print(f"⚠️ No frames extracted for {video_name}")
        return False

    emotion_list = []
    pose_list = []

    # ------------ PER FRAME FEATURE EXTRACTION ------------
    for f in frames:
        frame_path = str(frame_dir / f)

        # Emotion (your existing model)
        emot = predict_emotion(frame_path)
        emotion_list.append(emot)

        # Pose keypoints (YOLOv8)
        pose = extract_yolo_pose(frame_path)
        pose_list.append(pose)

    # Convert to numpy arrays
    emotion_timeline = np.array(emotion_list)   # (N_frames, 7)
    pose_timeline = np.array(pose_list)         # (N_frames, 51)

    # ------------ EMOTION FEATURES (YOUR EXISTING LOGIC) ------------
    dominant = np.argmax(emotion_timeline, axis=1)
    unique, counts = np.unique(dominant, return_counts=True)
    ratios = {e: 0 for e in emotion_labels}
    for idx, c in zip(unique, counts):
        ratios[emotion_labels[idx]] = c / len(dominant)

    transitions = int(np.sum(dominant[:-1] != dominant[1:]))
    volatility = float(np.mean(np.abs(np.diff(emotion_timeline, axis=0))))

    peak_vals = emotion_timeline.max(axis=0)
    var_vals = emotion_timeline.var(axis=0)

    emotion_features = []
    for e in emotion_labels:
        emotion_features.append(float(ratios[e]))

    emotion_features += [float(transitions), float(volatility)]
    emotion_features += [float(v) for v in peak_vals]
    emotion_features += [float(v) for v in var_vals]

    # ------------ BODY-LANGUAGE FEATURES (NEW) ------------
    # movement intensity: frame-to-frame change of all keypoints
    movement = np.linalg.norm(np.diff(pose_timeline, axis=0), axis=1)
    movement_mean = float(movement.mean())
    movement_var = float(movement.var())
    movement_max = float(movement.max())

    # shoulder width stability (keypoints 5 & 6)
    left_shoulder  = pose_timeline[:, 5*3:5*3+2]
    right_shoulder = pose_timeline[:, 6*3:6*3+2]
    shoulder_dist = np.linalg.norm(left_shoulder - right_shoulder, axis=1)
    shoulder_var = float(shoulder_dist.var())

    # head movement (nose index = 0)
    nose = pose_timeline[:, 0:3]
    head_speed = np.linalg.norm(np.diff(nose, axis=0), axis=1)
    head_var = float(head_speed.var())

    # hand-to-face (keypoints 9=left wrist, 10=right wrist)
    left_wrist = pose_timeline[:, 9*3:9*3+3]
    right_wrist = pose_timeline[:, 10*3:10*3+3]
    nose_3d = pose_timeline[:, 0:3]

    lw_dist = np.linalg.norm(left_wrist - nose_3d, axis=1)
    rw_dist = np.linalg.norm(right_wrist - nose_3d, axis=1)

    hand_face_min = float(min(lw_dist.min(), rw_dist.min()))
    hand_face_mean = float((lw_dist.mean() + rw_dist.mean()) / 2)

    body_features = [
        movement_mean, movement_var, movement_max,
        shoulder_var, head_var,
        hand_face_min, hand_face_mean
    ]

    # ------------ COMBINE ALL FEATURES ------------
    features = emotion_features + body_features

    # Write CSV header if necessary
    header = [f"f{i}" for i in range(len(features))] + ["label"]
    file_exists = os.path.exists(out_csv)

    if not file_exists:
        with open(out_csv, "w", newline="") as f:
            csv.writer(f).writerow(header)

    # Write new row
    row = features + [int(class_label)]
    with open(out_csv, "a", newline="") as f:
        csv.writer(f).writerow(row)

    print(f"✅ Processed {video_name} | Features: {len(features)} | Label: {class_label}")
    return features 


In [17]:
video_root = "../videos"
out_csv = "../data/deception_dataset.csv"

for class_name in ["lie", "truth"]:
    label = 0 if class_name == "lie" else 1
    folder = os.path.join(video_root, class_name)

    if not os.path.exists(folder):
        print("Folder missing:", folder)
        continue

    for video_file in os.listdir(folder):

        # ignore system files
        if video_file.startswith("."):
            continue

        video_path = os.path.join(folder, video_file)

        try:
            process_video_to_features(
                video_path=video_path,
                class_label=label,
                out_csv=out_csv,
                fps=2
            )
        except Exception as e:
            print(f"❌ Error processing {video_file}: {e}")


✅ Extracted 52 frames from ../videos/lie/trial_lie_041.mp4 → ../data/frames/trial_lie_041
✅ Processed trial_lie_041 | Features: 30 | Label: 0
✅ Extracted 64 frames from ../videos/lie/trial_lie_055.mp4 → ../data/frames/trial_lie_055
✅ Processed trial_lie_055 | Features: 30 | Label: 0
✅ Extracted 40 frames from ../videos/lie/trial_lie_054.mp4 → ../data/frames/trial_lie_054
✅ Processed trial_lie_054 | Features: 30 | Label: 0
✅ Extracted 50 frames from ../videos/lie/trial_lie_040.mp4 → ../data/frames/trial_lie_040
✅ Processed trial_lie_040 | Features: 30 | Label: 0
✅ Extracted 63 frames from ../videos/lie/trial_lie_056.mp4 → ../data/frames/trial_lie_056
✅ Processed trial_lie_056 | Features: 30 | Label: 0
✅ Extracted 48 frames from ../videos/lie/trial_lie_042.mp4 → ../data/frames/trial_lie_042
✅ Processed trial_lie_042 | Features: 30 | Label: 0
✅ Extracted 29 frames from ../videos/lie/trial_lie_043.mp4 → ../data/frames/trial_lie_043
✅ Processed trial_lie_043 | Features: 30 | Label: 0
✅ Extr

In [19]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# Load dataset
df = pd.read_csv("../data/deception_dataset.csv")

# Separate features & labels
X = df.drop("label", axis=1)
y = df["label"]

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train classifier
clf = RandomForestClassifier(n_estimators=200, random_state=42)
clf.fit(X_train, y_train)

# Test performance
preds = clf.predict(X_test)
print("✅ Model Trained!")
print("\nAccuracy:", accuracy_score(y_test, preds))
print("\nReport:\n", classification_report(y_test, preds))


✅ Model Trained!

Accuracy: 0.88

Report:
               precision    recall  f1-score   support

           0       0.87      0.93      0.90        14
           1       0.90      0.82      0.86        11

    accuracy                           0.88        25
   macro avg       0.88      0.87      0.88        25
weighted avg       0.88      0.88      0.88        25



In [20]:
import joblib
joblib.dump(clf, "../models/deception_classifier.pkl")
print("✅ Lie/Truth classifier saved!")


✅ Lie/Truth classifier saved!


In [29]:
X_sample = process_video_to_features(
    video_path="../videos/lie/trial_lie_001.mp4",
    class_label=1,     # TRUTH = 1
    out_csv="__temp.csv",
    fps=2
)



✅ Extracted 37 frames from ../videos/lie/trial_lie_001.mp4 → ../data/frames/trial_lie_001
✅ Processed trial_lie_001 | Features: 30 | Label: 1


In [30]:
clf = joblib.load("../models/deception_classifier.pkl")

# Convert X_sample to DataFrame with same column names as training data
X_input = pd.DataFrame([X_sample], columns=clf.feature_names_in_)

prediction = clf.predict(X_input)
print("Prediction:", "LIE" if prediction[0] == 0 else "TRUTH")



Prediction: LIE
