In [1]:
!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task
!pip install mediapipe opencv-python matplotlib --quiet
!pip install requests



##Importations

In [7]:
import cv2
import json
import torch
import numpy as np
import mediapipe as mp
import matplotlib.pyplot as plt

from mediapipe.tasks.python import BaseOptions
from scipy.spatial.transform import Rotation as R
from mediapipe.tasks.python.vision import (
    FaceLandmarker,
    FaceLandmarkerOptions,
    RunningMode
)



## Vision Layer

In [8]:
class VisionLayer:
    def __init__(self, model_path='face_landmarker_v2_with_blendshapes.task', fps=5):
        base_options = BaseOptions(model_asset_path=model_path)
        options = FaceLandmarkerOptions(
            base_options=base_options,
            running_mode=RunningMode.IMAGE,
            num_faces=1
        )
        self.landmarker = FaceLandmarker.create_from_options(options)
        self.fps = fps

        # 3D head model points for solvePnP
        self.model_points = np.array([
            [0.0, 0.0, 0.0],          # Nose tip
            [0.0, -63.6, -12.5],      # Chin
            [-43.3, 32.7, -26.0],     # Left eye corner
            [43.3, 32.7, -26.0],      # Right eye corner
            [-28.9, -28.9, -24.1],    # Left mouth corner
            [28.9, -28.9, -24.1]      # Right mouth corner
        ], dtype=np.float64)
            # ---------------------------
    # Frame Sampler
    # ---------------------------
    def sample_frames(self, video_path):
        cap = cv2.VideoCapture(video_path)
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        interval = max(1, int(video_fps / self.fps))
        frames = []
        timestamps = []
        count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if count % interval == 0:
                frames.append(frame)
                timestamps.append(count / video_fps)
            count += 1
        cap.release()
        return frames, timestamps

    # ---------------------------
    # Head Pose (yaw, pitch, roll)
    # ---------------------------
    def get_head_pose(self, image, landmarks):
        image_points = np.array([
            [landmarks[1].x * image.shape[1], landmarks[1].y * image.shape[0]],   # Nose tip
            [landmarks[152].x * image.shape[1], landmarks[152].y * image.shape[0]], # Chin
            [landmarks[33].x * image.shape[1], landmarks[33].y * image.shape[0]],   # Left eye
            [landmarks[263].x * image.shape[1], landmarks[263].y * image.shape[0]], # Right eye
            [landmarks[61].x * image.shape[1], landmarks[61].y * image.shape[0]],   # Left mouth
            [landmarks[291].x * image.shape[1], landmarks[291].y * image.shape[0]]  # Right mouth
        ], dtype=np.float64)

        focal_length = image.shape[1]
        center = (image.shape[1]/2, image.shape[0]/2)
        camera_matrix = np.array([[focal_length,0,center[0]],
                                  [0,focal_length,center[1]],
                                  [0,0,1]], dtype=np.float64)
        dist_coeffs = np.zeros((4,1))
        success, rotation_vector, translation_vector = cv2.solvePnP(
            self.model_points, image_points, camera_matrix, dist_coeffs, flags=cv2.SOLVEPNP_ITERATIVE
        )
        rmat, _ = cv2.Rodrigues(rotation_vector)
        sy = np.sqrt(rmat[0,0]**2 + rmat[1,0]**2)
        yaw = np.arctan2(rmat[1,0], rmat[0,0])
        pitch = np.arctan2(-rmat[2,0], sy)
        roll = np.arctan2(rmat[2,1], rmat[2,2])
        return np.degrees(yaw), np.degrees(pitch), np.degrees(roll)

    # ---------------------------
    # Gaze Direction (3D Approx)
    # ---------------------------
    # def get_gaze_vector(self, landmarks, head_pose):
    #     # iris x avg
    #     left_iris_x = np.mean([lm.x for lm in landmarks[468:473]])
    #     right_iris_x = np.mean([lm.x for lm in landmarks[473:478]])
    #     avg_x = (left_iris_x + right_iris_x)/2
    #     # gaze 3D approximation
    #     yaw, pitch, _ = head_pose
    #     gaze_vector = [avg_x, pitch/90, yaw/90]  # normalized
    #     return gaze_vector

    def get_gaze_vector(self, landmarks, head_pose):
        """
        Compute a more accurate gaze vector using iris landmarks and head pose.

        landmarks: list of mediapipe landmarks
        head_pose: tuple (yaw, pitch, roll) in degrees
        """
        # 1′′ Eye centers approximation
        left_eye_center = np.mean([[lm.x, lm.y, lm.z] for lm in landmarks[33:42]], axis=0)
        right_eye_center = np.mean([[lm.x, lm.y, lm.z] for lm in landmarks[133:142]], axis=0)
        eye_center = (left_eye_center + right_eye_center) / 2

        # 2′′ Iris center
        left_iris_center = np.mean([[lm.x, lm.y, lm.z] for lm in landmarks[468:473]], axis=0)
        right_iris_center = np.mean([[lm.x, lm.y, lm.z] for lm in landmarks[473:478]], axis=0)
        iris_center = (left_iris_center + right_iris_center) / 2

        # 3′′ Raw gaze vector (from eye center to iris center)
        raw_gaze = iris_center - eye_center

        # 4′′ Rotate gaze according to head pose (yaw, pitch, roll)
        yaw, pitch, roll = head_pose  # degrees
        r = R.from_euler('xyz', [pitch, yaw, roll], degrees=True)
        gaze_vector = r.apply(raw_gaze)

        # 5′′ Normalize
        gaze_vector /= np.linalg.norm(gaze_vector) + 1e-8

        return gaze_vector


    # ---------------------------
    # Extract Signals per Frame
    # ---------------------------
    def extract_signals(self, frames):
        head_pose_series = []
        gaze_vectors = []
        timestamps = []
        for idx, frame in enumerate(frames):
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            result = self.landmarker.detect(mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb))
            if result.face_landmarks:
                face = result.face_landmarks[0]
                head_pose = {}
                yaw, pitch, roll = self.get_head_pose(frame, face)
                head_pose = {"yaw": yaw, "pitch": pitch, "roll": roll}
                gaze = self.get_gaze_vector(face, (yaw,pitch,roll))

                head_pose_series.append(head_pose)
                gaze_vectors.append(gaze)
            else:
                head_pose_series.append({"yaw":0,"pitch":0,"roll":0})
                gaze_vectors.append([0,0,0])
        return head_pose_series, gaze_vectors

In [11]:
class TemporalAggregator:
    def __init__(self, window_size=5, fps=5):
        self.window_size = window_size  # for smoothing
        self.fps = fps

    # ---------------------------
    # Simple moving average smoothing
    # ---------------------------
    def smooth_series(self, series, key=None):
        smoothed = []
        half_w = self.window_size // 2
        for i in range(len(series)):
            start = max(0, i - half_w)
            end = min(len(series), i + half_w + 1)
            if key:  # dict series (head_pose)
                avg = {k: np.mean([series[j][k] for j in range(start, end)]) for k in series[0].keys()}
            else:   # list/array series (gaze vector)
                avg = np.mean(series[start:end], axis=0).tolist()
            smoothed.append(avg)
        return smoothed

    # ---------------------------
    # Duration analysis
    # ---------------------------
    def duration_analysis(self, head_pose_series, gaze_series, timestamps):
        gaze_off_flags = []
        head_turn_flags = []

        for h, g in zip(head_pose_series, gaze_series):
            yaw = h["yaw"]
            pitch = h["pitch"]
            gx = g[0]

            # gaze off camera if |x| > 0.4
            gaze_off_flags.append(abs(gx) > 0.4)
            # head turned if |yaw| > 15°
            head_turn_flags.append(abs(yaw) > 15)

        def streaks(flags):
            streaks_list = []
            start_idx = None
            for idx, val in enumerate(flags + [False]):  # add False at end to flush last streak
                if val and start_idx is None:
                    start_idx = idx
                elif not val and start_idx is not None:
                    duration = timestamps[idx-1] - timestamps[start_idx] + 1/self.fps
                    streaks_list.append(duration)
                    start_idx = None
            return streaks_list

        gaze_off_durations = streaks(gaze_off_flags)
        head_turn_durations = streaks(head_turn_flags)

        return {"gaze_off": gaze_off_durations, "head_turn": head_turn_durations}

    # Full Aggregation
    # ---------------------------
    def aggregate(self, head_pose_series, gaze_series, timestamps):
        head_smoothed = self.smooth_series(head_pose_series, key="yaw")
        gaze_smoothed = self.smooth_series(gaze_series)

        durations = self.duration_analysis(head_smoothed, gaze_smoothed, timestamps)
        # Vision Evidence
        evidence = {
            "head_pose_series": head_smoothed,
            "gaze_series": gaze_smoothed,
            "durations": durations,
        }
        return evidence

Testing Video path

In [12]:
video_path = "/content/WIN_20251230_23_07_56_Pro.mp4"

# 1️⃣ Vision Layer
vision = VisionLayer(fps=5)
frames, timestamps = vision.sample_frames(video_path)
head_pose_series, gaze_series = vision.extract_signals(frames)

# 2️⃣ Temporal Aggregator
aggregator = TemporalAggregator(window_size=5, fps=5)
vision_evidence = aggregator.aggregate(head_pose_series, gaze_series, timestamps)

# 3️⃣ Output
print("=== Vision Evidence ===")
print("Head Pose (first 5 frames):", head_pose_series[:5])
print("Gaze Vectors (first 5 frames):", gaze_series[:5])
print("Durations:", vision_evidence["durations"])

=== Vision Evidence ===
Head Pose (first 5 frames): [{'yaw': np.float64(-1.3155129896432056), 'pitch': np.float64(18.178739447173847), 'roll': np.float64(-173.39582807747377)}, {'yaw': np.float64(-2.197778235398298), 'pitch': np.float64(16.33270941386751), 'roll': np.float64(-173.189405732221)}, {'yaw': np.float64(-1.3865676248183163), 'pitch': np.float64(22.63606521260831), 'roll': np.float64(-173.97837422808524)}, {'yaw': np.float64(-1.428939702351513), 'pitch': np.float64(14.244271738967104), 'roll': np.float64(-178.74785877919754)}, {'yaw': np.float64(-1.5184295958794545), 'pitch': np.float64(-26.02123126524632), 'roll': np.float64(-175.97438399057637)}]
Gaze Vectors (first 5 frames): [array([-0.45080724,  0.81019013, -0.37465265]), array([-0.45212392,  0.8262496 , -0.33599908]), array([-0.42864197,  0.78795826, -0.44202677]), array([-0.36904176,  0.87735788, -0.30667761]), array([-0.38993666,  0.84786006,  0.35928061])]
Durations: {'gaze_off': [0.798519024390244, 2.394569756097561

#EvaluationAgent

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
class LLMVisionAgent:
    def __init__(self, model_id="microsoft/phi-3-mini-4k-instruct"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.float16,
            device_map="auto"
        )

    def analyze(self, vision_evidence):
        prompt = f"""
You are a Vision Analysis Agent for an AI Interview system.

You receive structured visual evidence extracted from a candidate video.
Your job is NOT to accuse cheating.
Your job is to assess focus, attentiveness, and confidence.

Vision Evidence:
{json.dumps(vision_evidence, indent=2)}

Return a JSON object with:
- focus_assessment (short text)
- confidence_level (low / medium / high)
- vision_score (0.0 to 1.0)
- explanation (1-2 sentences)

Be conservative. Avoid strong claims.
"""

        inputs = self.tokenizer(
            prompt,
            return_tensors="pt"
        ).to(self.model.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=300,
                temperature=0.3
            )

        response_text = self.tokenizer.decode(
            outputs[0],
            skip_special_tokens=True
        )

        # مهم: اساءراء الـ JSON من الراء
        # The model often echoes the prompt. We need to extract only the generated JSON.
        decoded_prompt = self.tokenizer.decode(inputs['input_ids'][0], skip_special_tokens=True)

        generated_content = ""
        # Check if the response_text starts with the decoded prompt
        if response_text.startswith(decoded_prompt):
            generated_content = response_text[len(decoded_prompt):].strip()
        else:
            # Fallback: if not starting with prompt, assume the relevant JSON is the last one
            # or try to find it within the whole response_text
            generated_content = response_text.strip()

        json_start = generated_content.find("{")
        json_end = generated_content.rfind("}") + 1

        if json_start != -1 and json_end != -1 and json_end > json_start:
            try:
                json_string = generated_content[json_start:json_end]
                return json.loads(json_string)
            except json.JSONDecodeError as e:
                print(f"JSON Decode Error: {e}")
                print(f"Attempted to decode: {json_string}")
                raise
        else:
            print("No valid JSON found in LLM's generated response.")
            print(f"Generated Content: {generated_content}")
            # As a last resort, if no valid JSON is found, return an error or a default structure
            raise ValueError("LLM did not return a parseable JSON analysis.")

In [None]:
vision_agent = LLMVisionAgent(
    model_id="microsoft/phi-3-mini-4k-instruct"  # Corrected argument name
)

result = vision_agent.analyze(vision_evidence)
print(json.dumps(result, indent=2))