# 🎓 Optimized Exam Proctoring System (OEPS) v2.0: Colab Edition

## Optimized Pipeline (No LSTM, No DeepSORT)

```
Stage 1: Video Input → OpenCV Frame Extraction (every frame)
Stage 2: Pose Estimation → YOLOv11s-Pose (Detect + Extract 13 Keypoints)
Stage 3: Lightweight IoU Tracking (Assign Track IDs)
Stage 4: Skeleton ROI → Generate Bones (224×224×3), IN MEMORY, no disk I/O
Stage 5: Feature Extraction → ResNet50V2 → Binary Classification (BATCHED)
Stage 6: Temporal Voting → Sliding Window Majority Vote
Stage 7: Output → Color-Coded Boxes + Write to Video File at original FPS
```

### Speed Optimizations vs v1.0:
- ❌ Removed LSTM (useless at low frame rates; with sequence_length=2 it was just counting)
- ❌ Removed DeepSORT (overkill for a few students, replaced with an IoU tracker)
- ✅ `model(x, training=False)` instead of `model.predict()`: 10-50x faster per call on small batches
- ✅ Batched ResNet inference (all students in one forward pass)
- ✅ Fixed preprocessing mismatch (now uses the correct `resnet_v2.preprocess_input`)
- ✅ No matplotlib display during processing (write the video, watch it afterwards)
- ✅ No disk I/O during inference (skeleton images stay in memory)
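
The preprocessing item above is easier to reason about with the arithmetic written out. `resnet_v2.preprocess_input` rescales pixel values from [0, 255] to [-1, 1], unlike the mean-subtraction used by some other Keras application preprocessors. The sketch below reproduces only that scaling in plain Python; the helper name `resnet_v2_scale` is hypothetical and stands in for the Keras function, and it assumes skeleton images contain only 0 (background) and 255 (bones).

```python
def resnet_v2_scale(pixel):
    """Map one 8-bit pixel value from [0, 255] to [-1, 1] (illustrative stand-in)."""
    return pixel / 127.5 - 1.0

# A skeleton image is assumed to be black background with white bones.
background = resnet_v2_scale(0)
bone = resnet_v2_scale(255)
mid_gray = resnet_v2_scale(127.5)

print(background, bone, mid_gray)
```

If inference feeds the classifier values in a different range than training did (for example raw 0-255 values), the predictions are unlikely to be meaningful even though nothing raises an error.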

---

## 📦 1. Installation and Setup

In [None]:
# Install required packages
!pip install -q -U ultralytics
!pip install -q opencv-python-headless
!pip install -q tensorflow
!pip install -q numpy matplotlib seaborn

print('✅ All packages installed!')

✅ All packages installed!


## 📁 2. Platform Setup (Colab)

Mount Google Drive to access your model and video files.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 📚 3. Import Libraries

In [None]:
import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.applications.resnet_v2 import preprocess_input as resnet_preprocess
import ultralytics
from ultralytics import YOLO
import os
import time
import warnings
warnings.filterwarnings('ignore')

# Check GPU
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"✅ GPU detected: {gpus[0].name}")
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
else:
    print("⚠️ No GPU detected! Running on CPU (will be slow)")

print(f"TensorFlow: {tf.__version__}")
print(f"OpenCV: {cv2.__version__}")
print(f"TF GPU in use: {len(tf.config.list_physical_devices('GPU')) > 0}")

✅ GPU detected: /physical_device:GPU:0
TensorFlow: 2.19.0
OpenCV: 4.13.0
TF GPU in use: True


## ⚙️ 4. Configuration

In [None]:
##############################################################################
# Configuration
#
# Changes:
#   1. Added UPPER_BODY_KPTS, KPT_CONF_THRESHOLD, bone thickness constants
#   2. Added (0,5) and (0,6) neck connections to SKELETON_CONNECTIONS
#   3. Added ROI_SIZE for skeleton normalization
##############################################################################

class Config:
    # === MODEL PATHS (UPDATE THESE) ===
    RESNET_MODEL_PATH = "/content/drive/MyDrive/Model/resnet50v2_final.keras"
    VIDEO_PATH = "/content/drive/MyDrive/Video_Collection/Testing video's /test4.mp4"
    OUTPUT_DIR = "/content"

    # Image processing
    IMG_SIZE = (224, 224)
    ROI_SIZE = 224

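    # Temporal voting
    # NOTE: with VOTE_WINDOW = 1 and VOTE_THRESHOLD = 1 each decision uses only
    # the latest prediction, so no smoothing happens; raise both to smooth.
    VOTE_WINDOW = 1
    SUSPICIOUS_THRESHOLD = 0.5
    VOTE_THRESHOLD = 1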

    # YOLO detection
    POSE_CONF_THRESHOLD = 0.5
    YOLO_POSE_MODEL = 'yolo11s-pose.pt'

    # IoU tracker
    IOU_THRESHOLD = 0.3
    MAX_LOST_FRAMES = 30

    # Skeleton parameters: MUST match training extraction code
    KPT_CONF_THRESHOLD = 0.4
    BODY_BONE_THICKNESS = 3
    HEAD_BONE_THICKNESS = 2

    UPPER_BODY_KPTS = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}

    KEYPOINT_NAMES = [
        'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
        'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist', 'left_hip', 'right_hip'
    ]

    # FIXED: 14 connections (old had 12, was missing neck bones)
    SKELETON_CONNECTIONS = [
        (0, 1), (0, 2), (1, 3), (2, 4),       # head
        (0, 5), (0, 6),                         # neck  ← WERE MISSING
        (5, 6),                                  # shoulders
        (5, 7), (7, 9),                          # left arm
        (6, 8), (8, 10),                         # right arm
        (5, 11), (6, 12), (11, 12)               # torso
    ]

config = Config()
print("✅ Configuration loaded!")
print(f"   ResNet model: {config.RESNET_MODEL_PATH}")
print(f"   Video: {config.VIDEO_PATH}")
print(f"   Skeleton connections: {len(config.SKELETON_CONNECTIONS)} (should be 14)")

✅ Configuration loaded!
   ResNet model: /content/drive/MyDrive/Model/resnet50v2_final.keras
   Video: /content/drive/MyDrive/Video_Collection/Testing video's /test4.mp4
   Skeleton connections: 14 (should be 14)
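
Because the connection list has to match the training extraction code exactly, a slightly stronger check than counting entries can help. The following is a minimal sketch, not part of the original notebook: `find_connection_problems` is a hypothetical helper that looks for out-of-range indices, self-loops, and duplicated bones in a copy of the configuration above.

```python
KEYPOINT_NAMES = [
    'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
    'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
]

SKELETON_CONNECTIONS = [
    (0, 1), (0, 2), (1, 3), (2, 4),   # head
    (0, 5), (0, 6),                   # neck
    (5, 6),                           # shoulders
    (5, 7), (7, 9),                   # left arm
    (6, 8), (8, 10),                  # right arm
    (5, 11), (6, 12), (11, 12),       # torso
]

def find_connection_problems(connections, num_keypoints):
    """Return a list of human-readable problems; an empty list means the edges look sane."""
    problems = []
    seen = set()
    for i, j in connections:
        if not (0 <= i < num_keypoints and 0 <= j < num_keypoints):
            problems.append(f"({i}, {j}) references a keypoint outside 0..{num_keypoints - 1}")
        if i == j:
            problems.append(f"({i}, {j}) connects a keypoint to itself")
        edge = frozenset((i, j))  # treat (a, b) and (b, a) as the same bone
        if edge in seen:
            problems.append(f"({i}, {j}) is listed twice")
        seen.add(edge)
    return problems

problems = find_connection_problems(SKELETON_CONNECTIONS, len(KEYPOINT_NAMES))
print(len(SKELETON_CONNECTIONS), problems)
```

This only checks internal consistency; it cannot confirm that the training code used the same 14 bones.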


## 🤖 5. Load Models

Only **2 models** now (was 3):
1. **YOLOv11s-Pose**: pose estimation + keypoint extraction
2. **ResNet50V2**: direct binary classification (normal vs suspicious)

~~LSTM~~: removed. Temporal voting handles smoothing.

In [None]:
import torch
import os
import functools

# Download YOLO if needed
if not os.path.exists('yolo11s-pose.pt'):
    print('Downloading yolo11s-pose.pt...')
    !wget -q https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11s-pose.pt

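# PyTorch 2.6 changed torch.load to default to weights_only=True, which can
# reject pickled YOLO checkpoints. This override restores the old behavior
# globally, so only load checkpoint files you trust.
torch.load = functools.partial(torch.load, weights_only=False)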

# === LOAD YOLO ===
print("Loading YOLOv11s-Pose...")
device = "cuda" if torch.cuda.is_available() else "cpu"
yolo_model = YOLO('yolo11s-pose.pt').to(device)
print(f"YOLO on: {device}")
print("✅ YOLO loaded!")

# === LOAD RESNET (as CLASSIFIER, not feature extractor) ===
print(f"Loading ResNet from {config.RESNET_MODEL_PATH}...")
if not os.path.exists(config.RESNET_MODEL_PATH):
    raise FileNotFoundError(f"Model not found: {config.RESNET_MODEL_PATH}")

resnet_model = load_model(config.RESNET_MODEL_PATH)
print(f"✅ ResNet loaded! Output shape: {resnet_model.output_shape}")

# Warm up models
print("\nWarming up models...")
dummy_img = np.random.rand(1, 224, 224, 3).astype(np.float32)
_ = resnet_model(dummy_img, training=False)
print("✅ ResNet warmed up!")

dummy_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
_ = yolo_model(dummy_frame, conf=0.5, verbose=False)
print("✅ YOLO warmed up!")

print("\n" + "="*50)
print("✅ ALL MODELS LOADED AND READY!")
print("="*50)

Loading YOLOv11s-Pose...
YOLO on: cuda
✅ YOLO loaded!
Loading ResNet from /content/drive/MyDrive/Model/resnet50v2_final.keras...
✅ ResNet loaded! Output shape: (None, 1)

Warming up models...
✅ ResNet warmed up!
✅ YOLO warmed up!

✅ ALL MODELS LOADED AND READY!


## 🦴 6. Skeleton Generator (In-Memory, No Disk I/O)

In [None]:
##############################################################################
# SkeletonGenerator
#
# This is THE critical fix. Old code used YOLO bbox crop + resize.
# New code matches training: center+scale normalization, grayscale,
# bones only, dilation, variable thickness, validation.
##############################################################################

class SkeletonGenerator:
    """Generates skeleton images matching training pipeline exactly."""

    def __init__(self, roi_size=224):
        self.roi_size = roi_size
        self.dilate_kernel = np.ones((3, 3), np.uint8)

    def is_valid_skeleton(self, conf):
        """Relaxed validation for inference, more lenient than training."""

        th = config.KPT_CONF_THRESHOLD
        # Count how many upper body keypoints are confident
        visible = sum(1 for i in config.UPPER_BODY_KPTS if i < len(conf) and conf[i] > th)
        # Need at least 5 visible keypoints AND at least one shoulder
        return visible >= 5 and (conf[5] > th or conf[6] > th)

    def create_skeleton_image(self, keypoints, bbox):
        """
        Generate normalized skeleton image identical to training.
        Returns 224x224 grayscale image, or None if invalid.
        """
        if len(keypoints) < 13:
            return None

        xy = keypoints[:, :2]
        conf = keypoints[:, 2]

        if not self.is_valid_skeleton(conf):
            return None

        # Collect valid upper body points
        valid_pts = [
            xy[i] for i in config.UPPER_BODY_KPTS
            if i < len(conf) and conf[i] > config.KPT_CONF_THRESHOLD
        ]
        if len(valid_pts) < 3:
            return None

        pts = np.array(valid_pts)
        min_x, min_y = pts.min(axis=0)
        max_x, max_y = pts.max(axis=0)
        skel_w = max_x - min_x
        skel_h = max_y - min_y
        if max(skel_w, skel_h) < 1e-6:
            return None

        # Center + scale to fill 90% of canvas (same as training)
        scale = 0.9 * self.roi_size / max(skel_w, skel_h)
        cx = (min_x + max_x) / 2
        cy = (min_y + max_y) / 2

        # Grayscale canvas (same as training)
        skeleton_img = np.zeros((self.roi_size, self.roi_size), dtype=np.uint8)

        def norm(pt):
            x = (pt[0] - cx) * scale + self.roi_size / 2
            y = (pt[1] - cy) * scale + self.roi_size / 2
            return int(x), int(y)

        # Bones only, NO circles (same as training)
        for i, j in config.SKELETON_CONNECTIONS:
            if i >= len(conf) or j >= len(conf):
                continue
            if conf[i] < config.KPT_CONF_THRESHOLD or conf[j] < config.KPT_CONF_THRESHOLD:
                continue
            if i not in config.UPPER_BODY_KPTS or j not in config.UPPER_BODY_KPTS:
                continue
            p1 = norm(xy[i])
            p2 = norm(xy[j])
            if not (0 <= p1[0] < self.roi_size and 0 <= p1[1] < self.roi_size and
                    0 <= p2[0] < self.roi_size and 0 <= p2[1] < self.roi_size):
                continue
            thickness = config.HEAD_BONE_THICKNESS if (i == 0 or j == 0) else config.BODY_BONE_THICKNESS
            cv2.line(skeleton_img, p1, p2, 255, thickness)

        # Dilation (same as training)
        skeleton_img = cv2.dilate(skeleton_img, self.dilate_kernel, iterations=1)
        return skeleton_img

    def preprocess_batch(self, skeleton_images):
        """
        Match Keras ImageDataGenerator behavior:
        grayscale → RGB (3-channel) → resnet_v2.preprocess_input
        """
        batch = []
        for img in skeleton_images:
            img_3ch = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
            batch.append(img_3ch.astype(np.float32))
        return resnet_preprocess(np.array(batch))

skeleton_gen = SkeletonGenerator()
print("✅ Skeleton generator ready (FIXED: matches training pipeline)")


✅ Skeleton generator ready (FIXED: matches training pipeline)
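
To make the center+scale step in `create_skeleton_image` concrete, here is a small worked example in plain Python. It is a sketch of the same arithmetic (scale the larger skeleton dimension to 90% of a 224-pixel canvas, then center); the function name `normalize_points` and the three sample points are made up for illustration.

```python
ROI_SIZE = 224

def normalize_points(points, roi_size=ROI_SIZE, fill=0.9):
    """Center the points on the canvas and scale the larger extent to `fill` of it."""
    xs = [p[0] for p in points]
    ys = [p[1] for p in points]
    extent = max(max(xs) - min(xs), max(ys) - min(ys))
    if extent < 1e-6:
        return None  # degenerate skeleton: all points coincide
    scale = fill * roi_size / extent
    cx = (min(xs) + max(xs)) / 2
    cy = (min(ys) + max(ys)) / 2
    return [
        (int((x - cx) * scale + roi_size / 2), int((y - cy) * scale + roi_size / 2))
        for x, y in points
    ]

# Hypothetical detections in frame coordinates: two shoulders and one hip.
sample = [(100, 200), (300, 200), (200, 600)]
normalized = normalize_points(sample)
print(normalized)  # [(61, 11), (162, 11), (112, 212)]
```

The skeleton here is 200 px wide and 400 px tall, so the height drives the scale (0.9 × 224 / 400 = 0.504) and the result spans about 201 of the 224 rows. The output no longer depends on where the student sits in the frame or how large they appear, which is the point of replacing the bbox crop.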


## 🎯 7. Lightweight IoU Tracker (Replaces DeepSORT)

For 3 students filmed by a fixed camera, simple IoU matching is sufficient and much faster.

In [None]:
from collections import deque

class SimpleIoUTracker:
    """
    Lightweight tracker using IoU matching.
    No deep features, no Kalman filter: just bounding box overlap.
    """

    def __init__(self, iou_threshold=0.3, max_lost=30):
        self.iou_threshold = iou_threshold
        self.max_lost = max_lost
        self.tracks = {}
        self.next_id = 1
        self.vote_history = {}
        self.colors = {}

    def _iou(self, boxA, boxB):
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])
        inter = max(0, xB - xA) * max(0, yB - yA)
        areaA = (boxA[2]-boxA[0]) * (boxA[3]-boxA[1])
        areaB = (boxB[2]-boxB[0]) * (boxB[3]-boxB[1])
        union = areaA + areaB - inter
        return inter / union if union > 0 else 0

    def update(self, detections):
        det_bboxes = [d[0] for d in detections]
        matched = []
        used_dets = set()
        used_tracks = set()

        if self.tracks and detections:
            iou_matrix = []
            track_ids = list(self.tracks.keys())
            for tid in track_ids:
                row = [self._iou(self.tracks[tid]['bbox'], db) for db in det_bboxes]
                iou_matrix.append(row)

            iou_matrix = np.array(iou_matrix) if iou_matrix else np.array([])
            if iou_matrix.size > 0:
                while True:
                    max_val = iou_matrix.max()
                    if max_val < self.iou_threshold:
                        break
                    ti, di = np.unravel_index(iou_matrix.argmax(), iou_matrix.shape)
                    tid = track_ids[ti]
                    self.tracks[tid]['bbox'] = det_bboxes[di]
                    self.tracks[tid]['lost'] = 0
                    matched.append((tid, det_bboxes[di], detections[di][2]))
                    used_dets.add(di)
                    used_tracks.add(tid)
                    iou_matrix[ti, :] = -1
                    iou_matrix[:, di] = -1

        for di, det in enumerate(detections):
            if di not in used_dets:
                tid = self.next_id
                self.next_id += 1
                self.tracks[tid] = {'bbox': det[0], 'lost': 0}
                self.vote_history[tid] = deque(maxlen=config.VOTE_WINDOW)
                matched.append((tid, det[0], det[2]))

        for tid in list(self.tracks.keys()):
            if tid not in used_tracks and tid in self.tracks:
                self.tracks[tid]['lost'] += 1
                if self.tracks[tid]['lost'] > self.max_lost:
                    del self.tracks[tid]
                    self.vote_history.pop(tid, None)
                    self.colors.pop(tid, None)

        return matched

    def add_vote(self, track_id, is_suspicious):
        if track_id not in self.vote_history:
            self.vote_history[track_id] = deque(maxlen=config.VOTE_WINDOW)
        self.vote_history[track_id].append(int(is_suspicious))

    def get_decision(self, track_id):
        if track_id not in self.vote_history or len(self.vote_history[track_id]) == 0:
            return 'Normal', 0.0
        history = self.vote_history[track_id]
        suspicious_count = sum(history)
        ratio = suspicious_count / len(history)
        if suspicious_count >= config.VOTE_THRESHOLD:
            return 'Suspicious', ratio
        return 'Normal', ratio

    def get_color(self, track_id, label):
        if label == 'Suspicious':
            self.colors[track_id] = (0, 0, 255)
        elif label == 'Normal':
            self.colors[track_id] = (0, 255, 0)
        return self.colors.get(track_id, (0, 255, 255))

print("✅ IoU Tracker ready (replaces DeepSORT)")

✅ IoU Tracker ready (replaces DeepSORT)
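
A worked example may help when tuning `IOU_THRESHOLD`. The sketch below repeats the intersection-over-union formula from `_iou` as a standalone function and applies it to made-up boxes in `(x1, y1, x2, y2)` format; the box values are illustrative only.

```python
def iou(box_a, box_b):
    """Intersection over union of two (x1, y1, x2, y2) boxes."""
    x_a = max(box_a[0], box_b[0])
    y_a = max(box_a[1], box_b[1])
    x_b = min(box_a[2], box_b[2])
    y_b = min(box_a[3], box_b[3])
    inter = max(0, x_b - x_a) * max(0, y_b - y_a)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0

previous = (0, 0, 10, 10)
shifted_half = (5, 0, 15, 10)   # moved by half a box width
shifted_most = (8, 0, 18, 10)   # moved by 80% of a box width
elsewhere = (50, 50, 60, 60)    # no overlap at all

IOU_THRESHOLD = 0.3
for name, box in [("half", shifted_half), ("most", shifted_most), ("elsewhere", elsewhere)]:
    score = iou(previous, box)
    print(name, round(score, 3), "match" if score >= IOU_THRESHOLD else "new track")
```

A box that shifts by half its width still scores 1/3 and keeps its track ID at the 0.3 threshold, while an 80% shift scores about 0.11 and would start a new track. With seated students and a fixed camera, frame-to-frame shifts are normally far smaller than that, though processing only every Nth frame increases the shift between updates.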


## 🔄 8. Optimized Processing Pipeline

Key speed improvements:
- `model(x, training=False)` instead of `model.predict()`: **10-50x faster** per call
- **Batched** ResNet inference: all students in one forward pass
- **Correct** preprocessing matching training
- No LSTM: direct ResNet classification + temporal voting
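
The temporal voting mentioned in the last item can be sketched in isolation. The example below mirrors the tracker's `add_vote` / `get_decision` logic with a `deque`, comparing the notebook's configured values (window 1, threshold 1) against a hypothetical window of 5 with a threshold of 3; the probability sequence is invented for illustration.

```python
from collections import deque

def run_votes(probabilities, window, threshold, suspicious_at=0.5):
    """Return the label emitted after each prediction for one tracked student."""
    history = deque(maxlen=window)  # oldest votes fall out automatically
    labels = []
    for prob in probabilities:
        history.append(int(prob >= suspicious_at))
        labels.append('Suspicious' if sum(history) >= threshold else 'Normal')
    return labels

# Made-up classifier outputs for eight consecutive processed frames.
probs = [0.9, 0.2, 0.3, 0.8, 0.7, 0.1, 0.2, 0.3]

unsmoothed = run_votes(probs, window=1, threshold=1)
smoothed = run_votes(probs, window=5, threshold=3)
print(unsmoothed)
print(smoothed)
```

With a window of 1 the label simply follows each prediction, so the isolated 0.9 on the first frame is flagged immediately. With a window of 5 and a threshold of 3, only the fifth frame is flagged, once three of the last five votes are suspicious. Note that the decision compares a count to `VOTE_THRESHOLD` rather than computing a strict majority, so the two settings need to be chosen together.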

In [None]:

class OptimizedPipeline:
    def __init__(self, yolo_model, resnet_model, skeleton_gen):
        self.yolo = yolo_model
        self.resnet = resnet_model
        self.skeleton_gen = skeleton_gen
        self.tracker = SimpleIoUTracker(
            iou_threshold=config.IOU_THRESHOLD,
            max_lost=config.MAX_LOST_FRAMES
        )
        self.frame_count = 0
        self.total_suspicious = 0
        self.skipped_invalid = 0
        self.timing = {'yolo': [], 'skeleton': [], 'resnet': [], 'tracking': [], 'total': []}

    def process_frame(self, frame):
        t_total = time.perf_counter()
        self.frame_count += 1

        # === STAGE 1: YOLO Pose Detection ===
        t0 = time.perf_counter()
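        # NOTE: Ultralytics documents NumPy inputs as BGR; this RGB conversion is kept
        # from the original code and should match whatever the training extraction did.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Use the device chosen at load time; a hard-coded device=0 fails on CPU-only runtimes.
        results = self.yolo(rgb_frame, conf=config.POSE_CONF_THRESHOLD, verbose=False, device=device)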
        self.timing['yolo'].append(time.perf_counter() - t0)

        detections = []
        if results[0].keypoints is not None and results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            confs = results[0].boxes.conf.cpu().numpy()
            kpts = results[0].keypoints.data.cpu().numpy()
            for b, c, k in zip(boxes, confs, kpts):
                detections.append((b, c, k))

        # === STAGE 2: IoU Tracking ===
        t0 = time.perf_counter()
        matched_tracks = self.tracker.update(detections)
        self.timing['tracking'].append(time.perf_counter() - t0)

        if not matched_tracks:
            self.timing['skeleton'].append(0)
            self.timing['resnet'].append(0)
            self.timing['total'].append(time.perf_counter() - t_total)
            return frame, []

        # === STAGE 3: Generate skeleton images ===
        t0 = time.perf_counter()
        skeleton_batch = []
        track_info = []
        skipped_tracks = []

        for track_id, bbox, keypoints in matched_tracks:
            skeleton_img = self.skeleton_gen.create_skeleton_image(keypoints, bbox)
            if skeleton_img is None:
                self.skipped_invalid += 1
                skipped_tracks.append((track_id, bbox))
                continue
            skeleton_batch.append(skeleton_img)
            track_info.append((track_id, bbox))

        self.timing['skeleton'].append(time.perf_counter() - t0)

        detections_info = []

        # === STAGE 4: ResNet Classification ===
        if skeleton_batch:
            t0 = time.perf_counter()
            batch_preprocessed = self.skeleton_gen.preprocess_batch(skeleton_batch)
            with tf.device('/GPU:0'):
                batch_tensor = tf.constant(batch_preprocessed, dtype=tf.float32)
                predictions = self.resnet(batch_tensor, training=False).numpy().flatten()
            self.timing['resnet'].append(time.perf_counter() - t0)

            # === STAGE 5: Temporal Voting + Draw for classified students ===
            for i, (track_id, bbox) in enumerate(track_info):
                prob = float(predictions[i]) if i < len(predictions) else 0.0
                is_suspicious = 1 if prob >= config.SUSPICIOUS_THRESHOLD else 0
                self.tracker.add_vote(track_id, is_suspicious)

                label, vote_ratio = self.tracker.get_decision(track_id)
                color = self.tracker.get_color(track_id, label)

                if label == 'Suspicious':
                    self.total_suspicious += 1

                x1, y1, x2, y2 = map(int, bbox)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                label_text = f"ID:{track_id} | {label} ({prob:.2f})"
                (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
                cv2.rectangle(frame, (x1, y1 - 30), (x1 + tw + 10, y1), color, -1)
                cv2.putText(frame, label_text, (x1 + 5, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

                detections_info.append({
                    'student_id': track_id,
                    'label': label,
                    'resnet_prob': prob,
                    'vote_ratio': vote_ratio
                })
        else:
            self.timing['resnet'].append(0)

        # === STAGE 6: Draw skipped students with last known label ===
        for track_id, bbox in skipped_tracks:
            label, vote_ratio = self.tracker.get_decision(track_id)
            color = self.tracker.get_color(track_id, label)
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            label_text = f"ID:{track_id} | {label} (skip)"
            (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(frame, (x1, y1 - 30), (x1 + tw + 10, y1), color, -1)
            cv2.putText(frame, label_text, (x1 + 5, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            detections_info.append({
                'student_id': track_id,
                'label': label,
                'resnet_prob': -1,
                'vote_ratio': vote_ratio
            })

        total_students = len(track_info) + len(skipped_tracks)
        info = f"Frame: {self.frame_count} | Students: {total_students} | Suspicious: {self.total_suspicious}"
        cv2.putText(frame, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        self.timing['total'].append(time.perf_counter() - t_total)
        return frame, detections_info

    def print_timing_stats(self):
        print("\n" + "="*60)
        print("⏱️  PIPELINE TIMING BREAKDOWN (averages)")
        print("="*60)
        for stage, times in self.timing.items():
            if times:
                avg_ms = np.mean(times) * 1000
                print(f"   {stage:12s}: {avg_ms:7.1f} ms")
        if self.timing['total']:
            avg_total = np.mean(self.timing['total'])
            print(f"   {'FPS':12s}: {1.0/avg_total:7.1f}")
        print(f"   Skipped (invalid skeleton): {self.skipped_invalid}")
        print("="*60)

print("✅ Optimized pipeline ready!")

✅ Optimized pipeline ready!


## 🧪 9. Quick Test

In [None]:
# Test the pipeline with a dummy frame
pipeline = OptimizedPipeline(yolo_model, resnet_model, skeleton_gen)

test_frame = np.random.randint(0, 255, (720, 1280, 3), dtype=np.uint8)
processed, dets = pipeline.process_frame(test_frame)
print(f"✅ Pipeline test passed! Detections: {len(dets)}")
print(f"   Output frame shape: {processed.shape}")

for _ in range(5):
    _, _ = pipeline.process_frame(test_frame)
pipeline.print_timing_stats()
print("\n(Timing on random frames; real video will differ)")

✅ Pipeline test passed! Detections: 0
   Output frame shape: (720, 1280, 3)

⏱️  PIPELINE TIMING BREAKDOWN (averages)
   yolo        :    19.2 ms
   skeleton    :     0.0 ms
   resnet      :     0.0 ms
   tracking    :     0.0 ms
   total       :    19.3 ms
   FPS         :    51.8
   Skipped (invalid skeleton): 0

(Timing on random frames; real video will differ)


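## 🎬 10. Process Video → Save Output Video

Reads every frame, runs YOLO+ResNet on every Nth frame (`process_every_n`), and writes every frame to an output video at the original FPS, reusing the last annotations in between.
No real-time display: watch the output video afterwards.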

In [None]:
import cv2
import os
import time

def save_processed_video_exact_match(video_path, output_path, process_every_n=10):
    """
    Save an annotated copy of the video using the same drawing style and
    frame-skipping logic as the live preview.
    Runs OptimizedPipeline on every Nth frame and reuses the cached
    annotations (including 'resnet_prob') on the frames in between.
    """
    if not os.path.exists(video_path):
        print(f"❌ Source video not found: {video_path}")
        return

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # 1. Initialize Video Writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Fresh pipeline (and tracker state) for this run
    pipe = OptimizedPipeline(yolo_model, resnet_model, skeleton_gen)
    cached_annotations = []

    print("=" * 60)
    print("💾 EXACT-MATCH SAVING STARTED")
    print(f"📁 Target: {output_path}")
    print("=" * 60)

    frame_num = 0
    start_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        # Same frame-skipping logic as the preview loop: run the pipeline on
        # frames 1, N+1, 2N+1, ... (or on every frame when N == 1)
        if frame_num % process_every_n == 1 or process_every_n == 1:
            processed_frame, dets = pipe.process_frame(frame.copy())

            cached_annotations = []
            for d in dets:
                track_id = d['student_id']
                label = d['label']
                prob = d['resnet_prob']  # -1 for students whose skeleton was skipped

                # Use the tracker's latest bbox for this track
                if track_id in pipe.tracker.tracks:
                    bbox = pipe.tracker.tracks[track_id]['bbox']
                else:
                    bbox = None

                color = pipe.tracker.get_color(track_id, label)
                if bbox is not None:
                    cached_annotations.append((bbox, label, prob, color, track_id))

        # --- Replicating 'draw_annotations' and 'info' overlay ---
        save_frame = frame.copy()

        # Draw Annotations
        for bbox, label, prob, color, track_id in cached_annotations:
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(save_frame, (x1, y1), (x2, y2), color, 2)
            label_text = f"ID:{track_id} | {label} ({prob:.2f})"
            (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(save_frame, (x1, y1 - 30), (x1 + tw + 10, y1), color, -1)
            cv2.putText(save_frame, label_text, (x1 + 5, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        # Draw Info Bar (using pipe.total_suspicious)
        info = f"Frame: {frame_num} | Students: {len(cached_annotations)} | Suspicious: {pipe.total_suspicious}"
        cv2.putText(save_frame, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Write the full-quality frame
        out.write(save_frame)

        if frame_num % 100 == 0:
            print(f"Processed {frame_num}/{total_frames} frames...")

    cap.release()
    out.release()
    print(f"\n✅ Video saved successfully: {output_path}")

# Run the export
final_output = os.path.join(config.OUTPUT_DIR, "proctored_exact_match.mp4")
save_processed_video_exact_match(config.VIDEO_PATH, final_output, process_every_n=10)

💾 EXACT-MATCH SAVING STARTED
📁 Target: /content/proctored_exact_match.mp4
Processed 100/1807 frames...
Processed 200/1807 frames...
Processed 300/1807 frames...
Processed 400/1807 frames...
Processed 500/1807 frames...
Processed 600/1807 frames...
Processed 700/1807 frames...
Processed 800/1807 frames...
Processed 900/1807 frames...
Processed 1000/1807 frames...
Processed 1100/1807 frames...
Processed 1200/1807 frames...
Processed 1300/1807 frames...
Processed 1400/1807 frames...
Processed 1500/1807 frames...
Processed 1600/1807 frames...
Processed 1700/1807 frames...
Processed 1800/1807 frames...

✅ Video saved successfully: /content/proctored_exact_match.mp4


## 🖥️ 10b. Process Video with Live Preview (Fast Display)

Displays every frame with cached annotations for smooth playback.
Only processes every Nth frame through YOLO+ResNet. No video file saved.
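
To see which frames actually reach YOLO+ResNet, the skip condition used in the loop (`frame_num % process_every_n == 1 or process_every_n == 1`, with frames counted from 1) can be evaluated on its own. This is a small sketch; the helper name `processed_frame_numbers` is hypothetical, and 1793 is the frame count reported for the sample clip in this notebook's run.

```python
def processed_frame_numbers(total_frames, process_every_n):
    """List the 1-based frame numbers that would be sent through the pipeline."""
    return [
        n for n in range(1, total_frames + 1)
        if n % process_every_n == 1 or process_every_n == 1
    ]

short_clip = processed_frame_numbers(25, 10)
sample_clip = processed_frame_numbers(1793, 10)
every_frame = processed_frame_numbers(25, 1)

print(short_clip)                        # [1, 11, 21]
print(len(sample_clip), sample_clip[-1]) # 180 1791
print(len(every_frame))                  # 25
```

Frames between two processed frames reuse the cached boxes and labels, so with `process_every_n=10` on a 30 FPS clip an annotation can lag the picture by up to nine frames (about 0.3 s).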

In [None]:
import cv2
import numpy as np
import base64
from IPython.display import display, HTML
import time
import os

def process_video_smooth_preview(video_path, process_every_n=10, preview_width=480):
    """
    Process every Nth frame through YOLO+ResNet, but DISPLAY every frame
    with the last processed frame's labels applied to fresh pixels.
    """
    if not os.path.exists(video_path):
        print(f"❌ Video not found: {video_path}")
        return None

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps if fps > 0 else 0
    preview_height = int(preview_width * height / width)
    frame_delay = 1.0 / fps if fps > 0 else 1.0 / 30.0

    print("=" * 60)
    print(f"📹 Input: {width}x{height} @ {fps:.1f} FPS | {total_frames} frames | {duration:.1f}s")
    print(f"📝 Processing every {process_every_n} frame(s), displaying ALL frames")
    print("=" * 60)

    pipe = OptimizedPipeline(yolo_model, resnet_model, skeleton_gen)
    cached_annotations = []

    def draw_annotations(frame, annotations):
        for bbox, label, prob, color, track_id in annotations:
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            label_text = f"ID:{track_id} | {label} ({prob:.2f})"
            (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(frame, (x1, y1 - 30), (x1 + tw + 10, y1), color, -1)
            cv2.putText(frame, label_text, (x1 + 5, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        return frame

    frame_num = 0
    processed_count = 0
    start_time = time.time()

    display_handle = display(HTML(""), display_id=True)

    while True:
        loop_start = time.perf_counter()

        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        if frame_num % process_every_n == 1 or process_every_n == 1:
            processed_frame, dets = pipe.process_frame(frame.copy())
            processed_count += 1

            cached_annotations = []
            for d in dets:
                track_id = d['student_id']
                label = d['label']
                prob = d['resnet_prob']
                if track_id in pipe.tracker.tracks:
                    bbox = pipe.tracker.tracks[track_id]['bbox']
                else:
                    bbox = None
                color = pipe.tracker.get_color(track_id, label)
                if bbox is not None:
                    cached_annotations.append((bbox, label, prob, color, track_id))

        display_frame = frame.copy()
        draw_annotations(display_frame, cached_annotations)

        info = f"Frame: {frame_num} | Students: {len(cached_annotations)} | Suspicious: {pipe.total_suspicious}"
        cv2.putText(display_frame, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        preview = cv2.resize(display_frame, (preview_width, preview_height))
        _, jpeg = cv2.imencode('.jpg', preview, [cv2.IMWRITE_JPEG_QUALITY, 70])
        b64 = base64.b64encode(jpeg.tobytes()).decode('utf-8')

        elapsed = time.time() - start_time
        display_fps = frame_num / elapsed if elapsed > 0 else 0
        progress = frame_num / total_frames * 100
        mins = int(elapsed) // 60
        secs = int(elapsed) % 60

        html = f"""
        <div style="font-family: monospace; background: #1a1a1a; padding: 10px; border-radius: 8px; display: inline-block;">
            <img src="data:image/jpeg;base64,{b64}" style="border-radius: 4px;"/>
            <div style="color: #00ff88; margin-top: 6px; font-size: 14px;">
                Frame: {frame_num}/{total_frames} |
                Progress: {progress:.1f}% |
                Output FPS: {display_fps:.1f} |
                Time: {mins:02d}:{secs:02d} |
                Students: {len(cached_annotations)} |
                Suspicious: {pipe.total_suspicious}
            </div>
        </div>
        """
        display_handle.update(HTML(html))

        elapsed_frame = time.perf_counter() - loop_start
        wait = frame_delay - elapsed_frame
        if wait > 0:
            time.sleep(wait)

    cap.release()

    total_time = time.time() - start_time
    final_display_fps = frame_num / total_time if total_time > 0 else 0
    total_mins = int(total_time) // 60
    total_secs = int(total_time) % 60

    display_handle.update(HTML(
        f'<div style="font-family:monospace; color:#00ff88; font-size:16px; padding:10px;">'
        f'✅ DONE! {frame_num} frames | '
        f'Output: {final_display_fps:.1f} FPS | '
        f'Total Time: {total_mins:02d}:{total_secs:02d} | '
        f'Suspicious: {pipe.total_suspicious}</div>'
    ))

    pipe.print_timing_stats()

    return {
        'total_frames': frame_num,
        'processed_frames': processed_count,
        'total_time': total_time,
        'display_fps': final_display_fps,
        'suspicious_count': pipe.total_suspicious,
        'pipeline': pipe
    }

print("✅ Smooth preview function ready!")

✅ Smooth preview function ready!
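
The preview loop keeps playback close to the source frame rate by sleeping for whatever remains of each frame's time budget. A minimal standalone sketch of that pacing step follows; the helper name `remaining_delay` is hypothetical and not part of the notebook.

```python
import time

def remaining_delay(frame_delay, loop_start, now=None):
    """Return the seconds left in this frame's time budget (0.0 if over budget)."""
    if now is None:
        now = time.perf_counter()
    return max(0.0, frame_delay - (now - loop_start))

# Example: a 30 FPS source gives each displayed frame a budget of 1/30 s.
frame_delay = 1.0 / 30.0
loop_start = time.perf_counter()
# ... decode, annotate, and display the frame here ...
time.sleep(remaining_delay(frame_delay, loop_start))
```

Clamping at zero means a slow frame is simply shown late rather than raising an error from a negative sleep.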


## ▶️ 11. Run Live Preview

In [None]:
# === LIVE PREVIEW (no file saved) ===
VIDEO_PATH = config.VIDEO_PATH

results = process_video_smooth_preview(
    video_path=VIDEO_PATH,
    process_every_n=10,
    preview_width=480
)

📹 Input: 1280x720 @ 30.0 FPS | 1793 frames | 59.8s
📝 Processing every 10 frame(s), displaying ALL frames

⏱️  PIPELINE TIMING BREAKDOWN (averages)
   yolo        :    20.3 ms
   skeleton    :     0.6 ms
   resnet      :   241.9 ms
   tracking    :     0.2 ms
   total       :   264.0 ms
   FPS         :     3.8
   Skipped (invalid skeleton): 0
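
The timing breakdown can be sanity-checked with a little arithmetic. The sketch below converts the average per-frame latency into FPS and estimates the smallest `process_every_n` that keeps up with a 30 FPS source. The helper names are hypothetical, and the estimate ignores the cost of decoding and drawing the skipped frames.

```python
import math

def fps_from_latency(total_ms):
    """Frames per second achievable when each processed frame costs total_ms."""
    return 1000.0 / total_ms

def min_skip_for_realtime(total_ms, source_fps):
    """Smallest N such that processing 1 frame in every N fits in N frame intervals."""
    return max(1, math.ceil(total_ms * source_fps / 1000.0))

# Averages copied from the timing breakdown of the run above.
resnet_ms = 241.9
total_ms = 264.0

print(f"{fps_from_latency(total_ms):.1f} FPS")       # 3.8 FPS
print(f"ResNet share: {resnet_ms / total_ms:.0%}")   # ResNet share: 92%
print(min_skip_for_realtime(total_ms, 30.0))         # 8
```

With these timings the estimate is 8, so the `process_every_n=10` used in this run leaves some headroom. ResNet inference accounts for most of the per-frame cost.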


## 📊 14. Results Analysis

In [None]:
import math

def generate_report(results):
    """Print a summary of a processing run and a frame-skip recommendation."""
    if not results:
        print("No results to analyze.")
        return

    # Use 'avg_fps' when present; otherwise fall back to the preview's 'display_fps'.
    fps = results.get('avg_fps', results.get('display_fps', 0))

    print("="*60)
    print("📊 EXAM PROCTORING ANALYSIS REPORT")
    print("="*60)
    print(f"   Frames processed: {results['processed_frames']}/{results['total_frames']}")
    print(f"   Processing speed: {fps:.1f} FPS")
    print(f"   Total time: {results['total_time']:.1f}s")
    print(f"   Suspicious events: {results['suspicious_count']}")

    if fps >= 30:
        print("\n   🟢 REAL-TIME CAPABLE (≥30 FPS)")
    elif fps >= 15:
        print("\n   🟡 NEAR REAL-TIME (15-30 FPS)")
    else:
        print(f"\n   🔴 BELOW REAL-TIME ({fps:.1f} FPS)")
        # Round up so the processed-frame rate (30 / N) does not exceed the measured FPS.
        recommended_skip = max(1, math.ceil(30 / fps)) if fps > 0 else 10
        print(f"   💡 Recommendation: use process_every_n={recommended_skip}")

    print("="*60)

if results:
    generate_report(results)

## 📊 14b. Full Pipeline Evaluation (Ground Truth Comparison)

Upload a `ground_truth.csv` to your Google Drive in the following format:
```
frame_start,frame_end,student_id,label
1,240,1,normal
241,450,1,suspicious
```
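
If you keep your annotations in Python, a file in this format can be produced with the standard `csv` module. This is a minimal sketch: the function name `write_ground_truth` is hypothetical, and the `annotations` rows simply mirror the example rows above rather than real labels.

```python
import csv
import io

FIELDS = ["frame_start", "frame_end", "student_id", "label"]

def write_ground_truth(rows, fileobj):
    """Write annotation rows as CSV, rejecting unknown labels and reversed ranges."""
    writer = csv.DictWriter(fileobj, fieldnames=FIELDS, lineterminator="\n")
    writer.writeheader()
    for row in rows:
        if row["label"] not in ("normal", "suspicious"):
            raise ValueError(f"Unknown label: {row['label']!r}")
        if row["frame_end"] < row["frame_start"]:
            raise ValueError(f"frame_end before frame_start: {row}")
        writer.writerow(row)

# Placeholder annotations matching the format shown above.
annotations = [
    {"frame_start": 1, "frame_end": 240, "student_id": 1, "label": "normal"},
    {"frame_start": 241, "frame_end": 450, "student_id": 1, "label": "suspicious"},
]

buffer = io.StringIO()
write_ground_truth(annotations, buffer)
print(buffer.getvalue())
```

To write an actual file, pass a handle opened with `open(path, "w", newline="")` instead of the in-memory buffer.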

In [None]:
import csv
from sklearn.metrics import classification_report, confusion_matrix

def evaluate_pipeline_on_video(video_path, ground_truth_csv, process_every_n=1):
    """Compare pipeline predictions against manually annotated ground truth."""
    if not os.path.exists(video_path):
        print(f"❌ Video not found: {video_path}")
        return
    if not os.path.exists(ground_truth_csv):
        print(f"❌ Ground truth CSV not found: {ground_truth_csv}")
        return

    gt = {}
    with open(ground_truth_csv, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            start = int(row['frame_start'])
            end = int(row['frame_end'])
            sid = int(row['student_id'])
            label = row['label'].strip().lower()
            for frame in range(start, end + 1):
                gt[(frame, sid)] = label

    cap = cv2.VideoCapture(video_path)
    pipe = OptimizedPipeline(yolo_model, resnet_model, skeleton_gen)

    frame_num = 0
    y_true = []
    y_pred = []

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Evaluating {total_frames} frames...")

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        if frame_num % process_every_n == 1 or process_every_n == 1:
            _, dets = pipe.process_frame(frame)

            for d in dets:
                key = (frame_num, d['student_id'])
                if key in gt:
                    true_label = 1 if gt[key] == 'suspicious' else 0
                    pred_label = 1 if d['label'] == 'Suspicious' else 0
                    y_true.append(true_label)
                    y_pred.append(pred_label)

        if frame_num % 200 == 0:
            print(f"  Frame {frame_num}/{total_frames}...")

    cap.release()

    if not y_true:
        print("❌ No matching ground truth found. Check that your student IDs match the tracker IDs.")
        return

    print("\n" + "=" * 60)
    print("📊 FULL PIPELINE EVALUATION RESULTS")
    print("=" * 60)
    print(f"   Total matched predictions: {len(y_true)}")
    print()
    # Pass labels explicitly so the report still works when only one class appears.
    print(classification_report(y_true, y_pred, labels=[0, 1],
                                target_names=["Normal", "Suspicious"], zero_division=0))
    print("Confusion Matrix:")
    print(confusion_matrix(y_true, y_pred, labels=[0, 1]))
    print("=" * 60)

# === USAGE ===
# 1. First run live preview to see which track IDs match which students
# 2. Create ground_truth.csv and upload to Google Drive
# 3. Uncomment and run:
#
# evaluate_pipeline_on_video(
#     video_path=config.VIDEO_PATH,
#     ground_truth_csv=os.path.join(MODEL_DIR, 'ground_truth.csv'),
#     process_every_n=1
# )

## 📖 16. System Usage Guide

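### Architecture (v2.0, Optimized):

| Component | Purpose | Estimated Time per Frame |
|-----------|---------|-------------|
| **YOLOv11s-Pose** | Detects students, extracts 13 keypoints | ~15-25ms |
| **IoU Tracker** | Tracks students across frames | <1ms |
| **Skeleton Generator** | Creates 224×224 skeleton images in memory | ~2-5ms |
| **ResNet50V2** | Classifies normal vs suspicious (batched) | ~5-15ms |
| **Temporal Voting** | Majority vote over last N predictions | <0.1ms |

These figures are rough estimates and depend on hardware. The run in section 11 measured about 20 ms for YOLO, 0.6 ms for skeleton generation, 0.2 ms for tracking, and 242 ms for ResNet, so check `pipe.print_timing_stats()` on your own setup.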
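
The IoU tracker relies on intersection-over-union between bounding boxes in consecutive frames. As an illustration only (the notebook's tracker may differ in its details), here is a minimal sketch that assumes boxes in `(x1, y1, x2, y2)` pixel format:

```python
def iou(box_a, box_b):
    """Intersection-over-union of two axis-aligned boxes given as (x1, y1, x2, y2)."""
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b

    # Width and height of the overlap rectangle, clamped at zero when disjoint.
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h

    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter
    return inter / union if union > 0 else 0.0

# Two 10x10 boxes that overlap by half: 50 / 150.
print(f"{iou((0, 0, 10, 10), (5, 0, 15, 10)):.2f}")  # 0.33
```

A tracker built on this would typically assign a detection to the existing track with the highest IoU above some threshold and open a new track otherwise.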

### Output:
- 🟩 **Green Box**: Normal behavior
- 🟥 **Red Box**: Suspicious behavior
- 🟨 **Yellow Box**: New student (not enough votes yet)
- **Label**: `ID:X | Label (ResNet_probability)`
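
The color scheme follows from the temporal vote, which can be illustrated with a small sliding-window majority vote. This is a sketch, not the notebook's implementation: the class name `VoteWindow`, the window size, the minimum vote count, the `"New"` state name, and the tie-breaking rule (ties count as normal) are all assumptions.

```python
from collections import deque

# BGR colors, the channel order used by OpenCV drawing functions.
GREEN, RED, YELLOW = (0, 255, 0), (0, 0, 255), (0, 255, 255)

class VoteWindow:
    """Sliding-window majority vote over one student's recent predictions."""

    def __init__(self, window=5, min_votes=3):
        self.votes = deque(maxlen=window)  # oldest votes drop off automatically
        self.min_votes = min_votes

    def add(self, suspicious):
        self.votes.append(bool(suspicious))

    def state(self):
        """Return (label, box_color) for the current window."""
        if len(self.votes) < self.min_votes:
            return "New", YELLOW
        if sum(self.votes) * 2 > len(self.votes):
            return "Suspicious", RED
        return "Normal", GREEN

window = VoteWindow()
for prediction in [True, False, True, True]:
    window.add(prediction)
print(window.state())  # ('Suspicious', (0, 0, 255))
```

Requiring a strict majority means a single noisy frame cannot flip a student's box from green to red.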

### Tips:
1. Run the benchmark first to learn your actual FPS
2. Use `process_every_n=1` if FPS ≥ 30; otherwise increase N
3. The output video always plays at the original FPS, regardless of processing speed
4. For a live preview, use the cell under "11. Run Live Preview"
5. For a saved video, use the cell under "12. Save Processed Video"

---
## ✅ System Ready! 🎓