# Real-time Proctoring System

This notebook implements a real-time proctoring system using computer vision to monitor user attention and detect distractions.

In [4]:
# Install the packages this notebook imports into the running kernel's environment.
# NOTE(review): versions are unpinned, and `streamlit` is imported below but never
# referenced in the visible cells - confirm whether it is still needed.
%pip install streamlit opencv-python-headless mediapipe numpy plotly ipywidgets

Note: you may need to restart the kernel to use updated packages.


In [1]:
import logging
import threading
import time
from datetime import datetime
from typing import Optional, Tuple

import cv2
import ipywidgets as widgets
import mediapipe as mp
import numpy as np
import plotly.graph_objects as go
import streamlit as st
from IPython.display import display, HTML, clear_output

# Configure logging: emit INFO and above via the root logger, and create the
# module-level `logger` that the cells below use for error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [2]:
class ProctorSystem:
    """Analyse webcam frames and accumulate focus / distraction statistics.

    Each call to :meth:`process_frame` runs MediaPipe FaceMesh and Hands on the
    frame, derives three per-frame flags (``face_detected``, ``looking_away``,
    ``phone_detected``) from simple landmark heuristics, and attributes the
    wall-clock time elapsed since the previous call to either focus or
    distraction time.

    The instance owns native MediaPipe graphs; call :meth:`close` (or use the
    instance as a context manager) when it is no longer needed.
    """

    # Heuristic thresholds, expressed in MediaPipe's normalised image
    # coordinates (landmark x / y are reported relative to the frame size).
    LOOK_AWAY_THRESHOLD = 0.2       # max horizontal nose offset from frame centre
    HAND_NEAR_FACE_THRESHOLD = 0.2  # max vertical wrist-to-nose distance
    MAX_EVENTS = 100                # number of most recent events retained

    # Landmark indices used by the heuristics.
    NOSE_TIP_INDEX = 4
    WRIST_INDEX = 0

    def __init__(self):
        self.setup_mediapipe()
        self.reset_metrics()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def setup_mediapipe(self):
        """Initialize MediaPipe components with error handling.

        Raises:
            Exception: whatever MediaPipe raised, after it has been logged.
        """
        try:
            self.mp_face_mesh = mp.solutions.face_mesh
            self.mp_hands = mp.solutions.hands
            self.mp_drawing = mp.solutions.drawing_utils

            self.face_mesh = self.mp_face_mesh.FaceMesh(
                max_num_faces=1,
                refine_landmarks=True,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5
            )

            self.hands = self.mp_hands.Hands(
                max_num_hands=2,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5
            )
        except Exception as e:
            logger.error(f"Failed to initialize MediaPipe: {str(e)}")
            raise

    def close(self):
        """Release the MediaPipe graphs held by this instance.

        Safe to call more than once, and safe on an instance whose
        initialisation failed part-way. After closing, :meth:`process_frame`
        can no longer be used.
        """
        for attr_name in ("face_mesh", "hands"):
            solution = getattr(self, attr_name, None)
            if solution is None:
                continue
            try:
                solution.close()
            except Exception as e:
                # Best effort: a failure to release one graph must not
                # prevent releasing the other.
                logger.warning(f"Failed to close MediaPipe {attr_name}: {str(e)}")
            setattr(self, attr_name, None)

    def reset_metrics(self):
        """Reset all tracking metrics and restart the elapsed-time clock."""
        self.focus_time = 0.0
        self.distraction_time = 0.0
        self.last_update = time.time()
        self.face_detected = False
        self.looking_away = False
        self.phone_detected = False
        self.events = []
        self.last_face_time = time.time()

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, dict]:
        """Process a single frame and return the annotated frame and metrics.

        Args:
            frame: image in OpenCV's BGR channel order. Landmarks are drawn
                directly onto this array, so the caller's frame is modified.

        Returns:
            ``(frame, metrics)`` where ``frame`` is the same (now annotated)
            array and ``metrics`` is the dict produced by :meth:`get_metrics`.

        Raises:
            ValueError: if ``frame`` is ``None`` or contains no pixels.
            Exception: any error raised while processing, after logging it.
        """
        # A zero-size array would otherwise fail deep inside OpenCV with an
        # opaque error; reject it up front with the same error as None.
        if frame is None or getattr(frame, "size", None) == 0:
            raise ValueError("Invalid frame received")

        # Reset status flags
        self.face_detected = False
        self.looking_away = False
        self.phone_detected = False

        try:
            # MediaPipe expects RGB input; OpenCV delivers BGR.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process with MediaPipe
            face_results = self.face_mesh.process(frame_rgb)
            hands_results = self.hands.process(frame_rgb)

            faces = face_results.multi_face_landmarks

            # Process face detection
            if faces:
                self.face_detected = True
                self.last_face_time = time.time()

                mesh_spec = self.mp_drawing.DrawingSpec(
                    color=(0, 255, 0), thickness=1, circle_radius=1
                )
                for face_landmarks in faces:
                    self.mp_drawing.draw_landmarks(
                        image=frame,
                        landmark_list=face_landmarks,
                        connections=self.mp_face_mesh.FACEMESH_TESSELATION,
                        landmark_drawing_spec=None,
                        connection_drawing_spec=mesh_spec
                    )

                    # Head-pose heuristic: the nose drifting far from the
                    # horizontal centre of the frame counts as looking away.
                    nose_tip = face_landmarks.landmark[self.NOSE_TIP_INDEX]
                    if abs(nose_tip.x - 0.5) > self.LOOK_AWAY_THRESHOLD:
                        self.looking_away = True

            # Process hand detection
            if hands_results.multi_hand_landmarks:
                nose = faces[0].landmark[self.NOSE_TIP_INDEX] if faces else None
                for hand_landmarks in hands_results.multi_hand_landmarks:
                    self.mp_drawing.draw_landmarks(
                        frame,
                        hand_landmarks,
                        self.mp_hands.HAND_CONNECTIONS
                    )

                    # Phone heuristic: a wrist at roughly the same height as
                    # the nose is treated as a hand held up near the face.
                    # Only the vertical distance is compared.
                    if nose is not None:
                        wrist = hand_landmarks.landmark[self.WRIST_INDEX]
                        if abs(wrist.y - nose.y) < self.HAND_NEAR_FACE_THRESHOLD:
                            self.phone_detected = True

            # Attribute the time since the previous update to the current state.
            current_time = time.time()
            time_diff = current_time - self.last_update

            if self.face_detected and not (self.looking_away or self.phone_detected):
                self.focus_time += time_diff
                event_type = "Focused"
            else:
                self.distraction_time += time_diff
                event_type = "Distracted"

            self.events.append((event_type, datetime.now()))
            self.last_update = current_time

            # Keep only the most recent MAX_EVENTS events
            overflow = len(self.events) - self.MAX_EVENTS
            if overflow > 0:
                del self.events[:overflow]

            return frame, self.get_metrics()

        except Exception as e:
            logger.error(f"Error processing frame: {str(e)}")
            raise

    def get_metrics(self) -> dict:
        """Return a snapshot of the current metrics.

        The ``events`` entry is a copy of the internal event list, so callers
        may keep or modify it without affecting this instance.
        """
        # Floor the denominator so the percentage is defined before any time
        # has been accumulated.
        total_time = max(self.focus_time + self.distraction_time, 0.001)
        return {
            "focus_time": self.focus_time,
            "distraction_time": self.distraction_time,
            "focus_percentage": (self.focus_time / total_time) * 100,
            "face_detected": self.face_detected,
            "looking_away": self.looking_away,
            "phone_detected": self.phone_detected,
            "events": list(self.events)
        }

In [3]:
def create_focus_chart(events):
    """Build a Plotly line chart of the focus state over time.

    ``events`` is a sequence of ``(label, timestamp)`` pairs. A label equal to
    "Focused" is plotted at 1 and any other label at 0. Returns ``None`` when
    ``events`` is empty or falsy.
    """
    if not events:
        return None

    timestamps = [stamp for _label, stamp in events]
    levels = [int(label == "Focused") for label, _stamp in events]

    trace = go.Scatter(
        x=timestamps,
        y=levels,
        mode='lines',
        name='Focus Status'
    )

    # Label the two possible y values and pad the range slightly so the line
    # is not drawn on the plot border.
    y_axis = dict(
        ticktext=["Distracted", "Focused"],
        tickvals=[0, 1],
        range=[-0.1, 1.1]
    )
    tight_margin = dict(l=0, r=0, t=30, b=0)

    fig = go.Figure()
    fig.add_trace(trace)
    fig.update_layout(
        title="Focus Timeline",
        yaxis=y_axis,
        height=200,
        margin=tight_margin
    )
    return fig

In [4]:
def run_proctoring_system():
    """Display the proctoring dashboard and drive it from a background thread.

    The function builds the widget UI, shows it, wires up the buttons and then
    returns. Frame capture runs on a daemon thread rather than in the calling
    cell: a loop that blocks the cell would keep the kernel busy, and the
    kernel cannot deliver button clicks while a cell is still executing.

    Controls:
        Start          - open the camera and begin a new monitoring session.
        Stop           - end the session and release the camera / video file.
        Reset Stats    - clear the accumulated metrics and the focus plot.
        Record Session - while toggled on, annotated frames are written to
                         ``proctoring_session_<timestamp>.avi``.

    Errors raised on the capture thread are logged and shown in a message
    area below the plot.
    """
    FRAME_INTERVAL = 0.1     # seconds to wait between frames
    MAX_PLOT_SAMPLES = 600   # about 60 s of history at one sample per 0.1 s

    # ---- Widgets ---------------------------------------------------------
    header = widgets.HTML(
        value='<h1 style="text-align:center;color:#2962FF">AI Proctoring System</h1>'
    )

    # Control panel
    start_button = widgets.Button(description='Start', button_style='success', icon='play')
    stop_button = widgets.Button(description='Stop', button_style='danger', icon='stop')
    reset_button = widgets.Button(description='Reset Stats', button_style='warning', icon='refresh')
    record_toggle = widgets.ToggleButton(description='Record Session', icon='video-camera')
    control_panel = widgets.HBox([start_button, stop_button, reset_button, record_toggle])

    # Status indicators
    face_badge = widgets.HTML(value='<h3>Face Detection</h3>')
    focus_badge = widgets.HTML(value='<h3>Focus Status</h3>')
    phone_badge = widgets.HTML(value='<h3>Phone Detection</h3>')
    eye_badge = widgets.HTML(value='<h3>Eye Tracking</h3>')
    status_panel = widgets.GridBox(
        [face_badge, focus_badge, phone_badge, eye_badge],
        layout=widgets.Layout(grid_template_columns='repeat(4, 1fr)', grid_gap='20px')
    )

    # Metrics dashboard
    focus_bar = widgets.FloatProgress(description='Focus %', max=100)
    attention_bar = widgets.FloatProgress(description='Attention %', max=100)
    focus_time_label = widgets.Label(value='Focus Time: 0s')
    distraction_time_label = widgets.Label(value='Distraction Time: 0s')
    metrics_panel = widgets.VBox([
        widgets.HTML(value='<h2>Session Metrics</h2>'),
        widgets.GridBox(
            [focus_bar, attention_bar, focus_time_label, distraction_time_label],
            layout=widgets.Layout(grid_template_columns='repeat(2, 1fr)', grid_gap='10px')
        )
    ])

    # Video feed
    video_widget = widgets.Image(format='jpeg', width=640, height=480)

    # Messages from the capture thread; print() from a background thread is
    # not reliably routed to this cell's output, so they are appended here.
    message_area = widgets.Output()

    # Initialize plots
    focus_fig = go.FigureWidget()
    focus_fig.add_scatter(name='Focus Level')
    focus_fig.update_layout(
        title='Real-time Focus Tracking',
        xaxis_title='Time',
        yaxis_title='Focus Level',
        yaxis_range=[0, 1]
    )

    # Layout everything
    display(widgets.VBox([
        header,
        control_panel,
        widgets.HBox([
            video_widget,
            widgets.VBox([status_panel, metrics_panel])
        ]),
        focus_fig,
        message_area
    ]))

    # ---- Session state ---------------------------------------------------
    proctor = ProctorSystem()
    state_lock = threading.Lock()    # guards proctor metrics and focus_data
    stop_event = threading.Event()   # set => the capture thread should exit
    stop_event.set()                 # no session is running yet
    worker = None
    recording = False
    start_time = time.time()
    focus_data = {'times': [], 'values': []}

    def show_message(text):
        message_area.append_stdout(text + "\n")

    def calculate_attention_score(metrics):
        # Weighted scoring system: each satisfied condition contributes its
        # weight; the result is a percentage in [0, 100].
        weights = {
            'face_detected': 0.4,
            'looking_away': 0.3,
            'phone_detected': 0.3
        }
        score = (
            weights['face_detected'] * (1 if metrics['face_detected'] else 0) +
            weights['looking_away'] * (0 if metrics['looking_away'] else 1) +
            weights['phone_detected'] * (0 if metrics['phone_detected'] else 1)
        ) * 100
        return score

    def create_status_badge(label, status):
        # Green check when the condition is satisfied, red cross otherwise.
        color = '#4CAF50' if status else '#F44336'
        icon = '✓' if status else '✗'
        return f'''
        <div style="
            padding: 10px;
            border-radius: 5px;
            background: {color};
            color: white;
            text-align: center;
            margin: 5px;
        ">
            <h4>{label}</h4>
            <span style="font-size: 24px">{icon}</span>
        </div>
        '''

    def update_ui(metrics):
        face_ok = metrics["face_detected"]
        gaze_ok = not metrics["looking_away"]
        phone_ok = not metrics["phone_detected"]

        # Update status indicators
        face_badge.value = create_status_badge("Face Detection", face_ok)
        focus_badge.value = create_status_badge("Focus Status", gaze_ok)
        phone_badge.value = create_status_badge("Phone Detection", phone_ok)
        eye_badge.value = create_status_badge("Eye Tracking", face_ok and gaze_ok)

        # Update metrics
        focus_bar.value = metrics["focus_percentage"]
        attention_bar.value = calculate_attention_score(metrics)
        focus_time_label.value = f'Focus Time: {metrics["focus_time"]:.1f}s'
        distraction_time_label.value = f'Distraction Time: {metrics["distraction_time"]:.1f}s'

        # Update focus plot data; Reset may clear it concurrently from the
        # kernel thread, so mutate and snapshot it under the lock.
        with state_lock:
            focus_data['times'].append(time.time() - start_time)
            focus_data['values'].append(1 if face_ok and gaze_ok and phone_ok else 0)

            # Keep only the most recent MAX_PLOT_SAMPLES samples
            if len(focus_data['times']) > MAX_PLOT_SAMPLES:
                focus_data['times'].pop(0)
                focus_data['values'].pop(0)

            times = list(focus_data['times'])
            values = list(focus_data['values'])

        # Send x and y to the front end as a single update.
        with focus_fig.batch_update():
            focus_fig.data[0].x = times
            focus_fig.data[0].y = values

    def capture_loop():
        """Read, analyse and display frames until stop_event is set."""
        cap = cv2.VideoCapture(0)
        video_writer = None
        try:
            if not cap.isOpened():
                logger.error("Could not open camera")
                show_message("Error: Could not open camera")
                return

            while not stop_event.is_set():
                ret, frame = cap.read()
                if not ret:
                    show_message("Error: Failed to read frame")
                    break

                # Process frame
                with state_lock:
                    frame, metrics = proctor.process_frame(frame)

                # Record video if enabled; finalise the file as soon as
                # recording is switched off so it is playable immediately.
                if recording:
                    if video_writer is None:
                        fourcc = cv2.VideoWriter_fourcc(*'XVID')
                        video_writer = cv2.VideoWriter(
                            f'proctoring_session_{int(time.time())}.avi',
                            fourcc, 10.0, (frame.shape[1], frame.shape[0])
                        )
                    video_writer.write(frame)
                elif video_writer is not None:
                    video_writer.release()
                    video_writer = None

                # Update display. imencode expects BGR input, which is what
                # the frame already is; converting to RGB first would swap
                # the red and blue channels in the preview.
                encoded, jpeg = cv2.imencode('.jpg', frame)
                if encoded:
                    video_widget.value = jpeg.tobytes()

                # Update UI elements
                update_ui(metrics)

                # Prevent excessive CPU usage, but wake immediately on Stop.
                stop_event.wait(FRAME_INTERVAL)

        except Exception as e:
            show_message(f"An error occurred: {str(e)}")
            logger.error(f"Runtime error: {str(e)}", exc_info=True)

        finally:
            cap.release()
            if video_writer is not None:
                video_writer.release()
            # Mark the session as ended so Start can launch a new one.
            stop_event.set()

    def on_start_button_clicked(b):
        nonlocal worker, start_time
        if worker is not None and worker.is_alive():
            return  # a session is already running (or still shutting down)

        stop_event.clear()
        start_time = time.time()
        with state_lock:
            # Restart the proctor's clock so idle time before Start is not
            # counted as distraction time.
            proctor.last_update = start_time
            focus_data['times'].clear()
            focus_data['values'].clear()

        worker = threading.Thread(
            target=capture_loop, name='proctor-capture', daemon=True
        )
        worker.start()

    def on_stop_button_clicked(b):
        stop_event.set()

    def on_reset_button_clicked(b):
        with state_lock:
            proctor.reset_metrics()
            focus_data['times'].clear()
            focus_data['values'].clear()

    def on_record_button_clicked(change):
        # `observe` delivers a change notification, not the widget itself;
        # the toggle's new state is under the 'new' key.
        nonlocal recording
        recording = bool(change['new'])

    # Connect button callbacks
    start_button.on_click(on_start_button_clicked)
    stop_button.on_click(on_stop_button_clicked)
    reset_button.on_click(on_reset_button_clicked)
    record_toggle.observe(on_record_button_clicked, 'value')


## Run the Proctoring System

Run the cell below to display the dashboard, then click the 'Start' button to begin monitoring. Click the 'Stop' button to end the session.

In [None]:
# Install anywidget if not already installed
# NOTE(review): presumably needed by plotly's FigureWidget on recent plotly releases - confirm
# %pip install anywidget

# Run the system: builds and displays the dashboard. Monitoring does not begin
# until the 'Start' button is clicked.
run_proctoring_system()