In [1]:
!pip install gradio sqlite3 opencv-python numpy pandas json pillow

[31mERROR: Could not find a version that satisfies the requirement sqlite3 (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for sqlite3[0m[31m
[0m

In [None]:
# -*- coding: utf-8 -*-

"""AIOnlineExamInvigilator.ipynb

Automatically generated by Colab.

Original file is located at
https://colab.research.google.com/drive/1HZNI1hZXpvm83fMiCBEP0zBBEddjxrhh

"""

import base64
import hashlib
import io
import json
import queue
import random
import secrets
import sqlite3
import string
import threading
import time
from datetime import datetime, timedelta

import cv2
import gradio as gr
import numpy as np
import pandas as pd
from PIL import Image

class ExamDatabase:
    """SQLite persistence layer for users, exams, exam sessions and behavior logs.

    One connection is opened with ``check_same_thread=False`` so it can be used
    from threads other than the one that created it.

    Passwords are stored as salted PBKDF2-HMAC-SHA256 hashes in the form
    ``pbkdf2_sha256$<iterations>$<salt>$<hex digest>``.  Rows written by earlier
    versions of this class (a bare, unsalted SHA-256 hex digest) are still
    accepted by ``verify_user`` so existing databases keep working.
    """

    # Prefix identifying the salted hash format in the ``password_hash`` column.
    _HASH_SCHEME = "pbkdf2_sha256"
    # PBKDF2 work factor for newly created hashes; the value used is stored with
    # each hash, so it can be raised later without invalidating old rows.
    _PBKDF2_ITERATIONS = 200_000

    def __init__(self, db_path='exam_platform.db'):
        """Open (or create) the database and make sure all tables exist.

        Args:
            db_path: SQLite database location. Defaults to the original
                hard-coded file name; ``':memory:'`` gives a throwaway database.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.create_tables()

    def create_tables(self):
        """Create the users, exams, exam_sessions and behavior_logs tables if missing."""
        cursor = self.conn.cursor()

        # Users table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                role TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Exams table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS exams (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT,
                duration_minutes INTEGER NOT NULL,
                questions TEXT NOT NULL,
                created_by INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (created_by) REFERENCES users (id)
            )
        """)

        # Exam sessions table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS exam_sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                exam_id INTEGER,
                user_id INTEGER,
                start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                end_time TIMESTAMP,
                answers TEXT,
                cheating_flags TEXT,
                behavior_score REAL DEFAULT 100.0,
                is_completed BOOLEAN DEFAULT FALSE,
                FOREIGN KEY (exam_id) REFERENCES exams (id),
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        """)

        # Behavior logs table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS behavior_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                event_type TEXT,
                severity REAL,
                details TEXT,
                FOREIGN KEY (session_id) REFERENCES exam_sessions (id)
            )
        """)

        self.conn.commit()

    @classmethod
    def _hash_password(cls, password, salt=None, iterations=None):
        """Return the salted storage string for ``password``.

        A fresh random salt and the class work factor are used unless explicit
        values are supplied (as done when re-deriving a hash for verification).
        """
        salt = secrets.token_hex(16) if salt is None else salt
        iterations = cls._PBKDF2_ITERATIONS if iterations is None else iterations
        digest = hashlib.pbkdf2_hmac(
            "sha256", password.encode(), salt.encode(), iterations
        ).hex()
        return f"{cls._HASH_SCHEME}${iterations}${salt}${digest}"

    @classmethod
    def _password_matches(cls, password, stored_hash):
        """Check ``password`` against a stored hash in either supported format."""
        if stored_hash.startswith(cls._HASH_SCHEME + "$"):
            try:
                _scheme, iterations, salt, _digest = stored_hash.split("$")
                candidate = cls._hash_password(password, salt, int(iterations))
            except (ValueError, OverflowError):
                # Malformed stored value: treat as a failed login, not a crash.
                return False
        else:
            # Legacy row: unsalted SHA-256 hex digest.
            candidate = hashlib.sha256(password.encode()).hexdigest()
        # Constant-time comparison so timing does not reveal how much matched.
        return secrets.compare_digest(candidate.encode(), stored_hash.encode())

    def create_user(self, username, password, role='student'):
        """Insert a new user.

        Returns:
            The new row id, or ``None`` when the username is already taken.
        """
        cursor = self.conn.cursor()
        password_hash = self._hash_password(password)
        try:
            cursor.execute(
                "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
                (username, password_hash, role)
            )
            self.conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None

    def verify_user(self, username, password):
        """Check a username/password pair.

        Returns:
            ``(id, role)`` for valid credentials, otherwise ``None``.
        """
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT id, role, password_hash FROM users WHERE username = ?",
            (username,)
        )
        row = cursor.fetchone()
        if row is None:
            return None

        user_id, role, stored_hash = row
        if not self._password_matches(password, stored_hash):
            return None
        return (user_id, role)


class BehaviorAnalyzer:
    """Frame-by-frame heuristics for suspicious exam behavior.

    Uses OpenCV Haar cascades to detect faces and eyes and keeps running
    counters across calls to ``analyze_frame``; a flag is only raised once a
    counter exceeds its threshold.
    """

    def __init__(self, face_absent_threshold=30, looking_away_threshold=45,
                 multiple_faces_threshold=10, movement_threshold=50):
        """Load the cascades and reset all counters.

        Args:
            face_absent_threshold: consecutive face-less frames tolerated
                before "face not detected" is flagged.
            looking_away_threshold: counter value above which "looking away"
                is flagged.
            multiple_faces_threshold: counter value above which "multiple
                faces" is flagged.
            movement_threshold: stored for callers; not used by
                ``analyze_frame``.
        """
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')

        # Behavior tracking variables
        self.face_absent_count = 0
        self.looking_away_count = 0
        self.multiple_faces_count = 0
        self.suspicious_movements = 0

        # Thresholds
        self.face_absent_threshold = face_absent_threshold
        self.looking_away_threshold = looking_away_threshold
        self.multiple_faces_threshold = multiple_faces_threshold
        self.movement_threshold = movement_threshold

    def analyze_frame(self, frame):
        """Analyze a single frame for suspicious behavior.

        Args:
            frame: BGR image as returned by ``cv2.VideoCapture.read``, or
                ``None`` when no image is available.

        Returns:
            dict with ``cheating_detected`` (bool), ``behavior_flags`` (list of
            str), ``severity`` (int) and ``faces_detected`` (int).  For a
            ``None`` frame the dict additionally carries ``reason``.
        """
        if frame is None:
            # Same keys as the normal result so consumers can read
            # 'faces_detected' / 'behavior_flags' without special-casing.
            return {
                "cheating_detected": True,
                "reason": "No camera feed",
                "behavior_flags": ["No camera feed"],
                "severity": 10,
                "faces_detected": 0,
            }

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)

        behavior_flags = []
        severity_score = 0

        # Check for face presence
        if len(faces) == 0:
            self.face_absent_count += 1
            if self.face_absent_count > self.face_absent_threshold:
                behavior_flags.append("Face not detected for extended period")
                severity_score += 8
        else:
            self.face_absent_count = 0

        # Check for multiple faces; the counter decays rather than resets so a
        # briefly missed detection does not wipe out the history.
        if len(faces) > 1:
            self.multiple_faces_count += 1
            if self.multiple_faces_count > self.multiple_faces_threshold:
                behavior_flags.append("Multiple faces detected")
                severity_score += 9
        else:
            self.multiple_faces_count = max(0, self.multiple_faces_count - 1)

        # Analyze eye movement and face orientation
        for (x, y, w, h) in faces:
            roi_gray = gray[y:y+h, x:x+w]
            eyes = self.eye_cascade.detectMultiScale(roi_gray)

            # Check if looking away (simplified heuristic): fewer than two
            # detected eyes inside the face region counts as looking away.
            if len(eyes) < 2:
                self.looking_away_count += 1
                if self.looking_away_count > self.looking_away_threshold:
                    behavior_flags.append("Frequently looking away from screen")
                    severity_score += 6
            else:
                self.looking_away_count = max(0, self.looking_away_count - 1)

        return {
            "cheating_detected": severity_score > 5,
            "behavior_flags": behavior_flags,
            "severity": severity_score,
            "faces_detected": len(faces)
        }


class SecureExamPlatform:
    """Application facade: authentication, exam lifecycle, monitoring and reports.

    All state (logged-in user, active session, monitoring thread) lives on the
    instance, so one instance represents one user/exam at a time.
    """

    # Roles accepted by register_user; the privileged ones can only be created
    # by a logged-in admin so nobody can grant themselves elevated access.
    VALID_ROLES = ("student", "teacher", "admin")
    PRIVILEGED_ROLES = ("teacher", "admin")

    def __init__(self):
        """Open the database, build the analyzer and ensure a default admin exists."""
        self.db = ExamDatabase()
        self.behavior_analyzer = BehaviorAnalyzer()
        self.current_user = None
        self.current_session = None
        self.exam_active = False
        self.behavior_queue = queue.Queue()
        self.monitoring_thread = None

        # Create default admin user (no-op if the username already exists).
        # NOTE(review): well-known default credentials; change them before any
        # real deployment.
        self.db.create_user("admin", "admin123", "admin")

    def login(self, username, password):
        """Authenticate a user.

        Returns:
            ``(message, success)``; on success ``current_user`` is set.
        """
        user = self.db.verify_user(username, password)
        if user:
            self.current_user = {"id": user[0], "role": user[1], "username": username}
            return f"Welcome, {username}! Role: {user[1]}", True
        return "Invalid credentials", False

    def register_user(self, username, password, role="student"):
        """Register a new user and return a status message.

        Students may self-register.  Teacher and admin accounts can only be
        created while an admin is logged in.
        """
        if not username or not password:
            return "Registration failed. Username and password are required."
        if role not in self.VALID_ROLES:
            return f"Registration failed. Unknown role: {role}"

        requester_is_admin = bool(self.current_user) and self.current_user["role"] == "admin"
        if role in self.PRIVILEGED_ROLES and not requester_is_admin:
            return "Access denied. Only admins can create teacher or admin accounts."

        user_id = self.db.create_user(username, password, role)
        if user_id:
            return f"User {username} registered successfully!"
        return "Registration failed. Username might already exist."

    def create_exam(self, title, description, duration, questions_json):
        """Create a new exam (admin/teacher only) and return a status message.

        ``questions_json`` must be valid JSON and ``duration`` a positive
        number of minutes; the JSON text is stored unchanged.
        """
        if not self.current_user or self.current_user["role"] not in ["admin", "teacher"]:
            return "Access denied. Only admins and teachers can create exams."

        try:
            json.loads(questions_json)  # validation only; the raw text is stored
            duration_minutes = int(duration)
            if duration_minutes <= 0:
                raise ValueError("duration must be a positive number of minutes")

            cursor = self.db.conn.cursor()
            cursor.execute(
                "INSERT INTO exams (title, description, duration_minutes, questions, created_by) VALUES (?, ?, ?, ?, ?)",
                (title, description, duration_minutes, questions_json, self.current_user["id"])
            )
            self.db.conn.commit()
            return f"Exam '{title}' created successfully!"
        except Exception as e:
            return f"Error creating exam: {str(e)}"

    def get_available_exams(self):
        """Return a human-readable listing of all exams."""
        cursor = self.db.conn.cursor()
        cursor.execute("SELECT id, title, description, duration_minutes FROM exams")
        exams = cursor.fetchall()

        if not exams:
            return "No exams available."

        exam_list = "Available Exams:\n\n"
        for exam in exams:
            exam_list += f"ID: {exam[0]}\nTitle: {exam[1]}\nDescription: {exam[2]}\nDuration: {exam[3]} minutes\n\n"

        return exam_list

    def start_exam(self, exam_id):
        """Start an exam session for the logged-in user.

        Returns:
            ``(message, questions)`` where ``questions`` is the parsed question
            JSON, or ``None`` when the exam could not be started.
        """
        if not self.current_user:
            return "Please login first.", None

        if self.exam_active:
            return "An exam is already in progress.", None

        try:
            cursor = self.db.conn.cursor()
            cursor.execute("SELECT title, questions, duration_minutes FROM exams WHERE id = ?", (int(exam_id),))
            exam = cursor.fetchone()

            if not exam:
                return "Exam not found.", None

            # Parse first: if the stored questions are unreadable we must fail
            # before a session row exists and before the exam is marked active.
            questions = json.loads(exam[1])

            # Create exam session
            cursor.execute(
                "INSERT INTO exam_sessions (exam_id, user_id) VALUES (?, ?)",
                (int(exam_id), self.current_user["id"])
            )
            self.db.conn.commit()

            self.current_session = cursor.lastrowid
            self.exam_active = True

            # Discard analyses left over from a previous session so the status
            # view never shows another exam's data.
            self._latest_analysis()

            # Start behavior monitoring
            self.start_behavior_monitoring()

            return f"Exam '{exam[0]}' started! Duration: {exam[2]} minutes", questions

        except Exception as e:
            return f"Error starting exam: {str(e)}", None

    def start_behavior_monitoring(self):
        """Start the monitoring thread unless one is already running."""
        if self.monitoring_thread is None or not self.monitoring_thread.is_alive():
            self.monitoring_thread = threading.Thread(target=self.monitor_behavior, daemon=True)
            self.monitoring_thread.start()

    def monitor_behavior(self):
        """Capture one webcam frame per second while the exam is active.

        Every analysis is queued for the status view; analyses that indicate
        cheating are also written to ``behavior_logs``.
        """
        cap = cv2.VideoCapture(0)
        try:
            while self.exam_active:
                ret, frame = cap.read()
                # A failed read (camera missing, unplugged or blocked) is passed
                # on as None so the analyzer reports "No camera feed" instead of
                # the gap going unnoticed.
                analysis = self.behavior_analyzer.analyze_frame(frame if ret else None)

                if analysis["cheating_detected"]:
                    # Log suspicious behavior
                    cursor = self.db.conn.cursor()
                    cursor.execute(
                        "INSERT INTO behavior_logs (session_id, event_type, severity, details) VALUES (?, ?, ?, ?)",
                        (self.current_session, "suspicious_behavior", analysis["severity"], json.dumps(analysis))
                    )
                    self.db.conn.commit()

                # Put analysis in queue for UI updates
                self.behavior_queue.put(analysis)

                time.sleep(1)  # Check every second
        finally:
            # Release the camera even if analysis or logging raised.
            cap.release()

    def submit_exam(self, answers_json):
        """Store the answers, end the session and return the behavior score message."""
        if not self.exam_active or not self.current_session:
            return "No active exam session."

        try:
            cursor = self.db.conn.cursor()
            cursor.execute(
                "UPDATE exam_sessions SET end_time = CURRENT_TIMESTAMP, answers = ?, is_completed = TRUE WHERE id = ?",
                (answers_json, self.current_session)
            )
            self.db.conn.commit()

            # Clearing the flag is what stops the monitoring loop.
            self.exam_active = False
            behavior_score = self.calculate_behavior_score()

            cursor.execute(
                "UPDATE exam_sessions SET behavior_score = ? WHERE id = ?",
                (behavior_score, self.current_session)
            )
            self.db.conn.commit()

            return f"Exam submitted successfully! Behavior Score: {behavior_score:.1f}/100"

        except Exception as e:
            return f"Error submitting exam: {str(e)}"

    def calculate_behavior_score(self):
        """Return ``100 - 5 * average severity`` of the session's logs, floored at 0.

        A session without any logged event scores 100.
        """
        cursor = self.db.conn.cursor()
        cursor.execute(
            "SELECT AVG(severity) FROM behavior_logs WHERE session_id = ?",
            (self.current_session,)
        )
        avg_severity = cursor.fetchone()[0]

        if avg_severity is None:  # AVG over zero rows
            return 100
        return max(0, 100 - (avg_severity * 5))

    def _latest_analysis(self):
        """Empty the behavior queue and return its newest item (``None`` if it was empty)."""
        latest = None
        while True:
            try:
                latest = self.behavior_queue.get_nowait()
            except queue.Empty:
                return latest

    def get_behavior_status(self):
        """Return a text summary of the most recent behavior analysis."""
        if not self.exam_active:
            return "No active exam session"

        # The monitor produces faster than the UI polls, so always jump to the
        # newest analysis instead of replaying an ever-growing backlog.
        analysis = self._latest_analysis()
        if analysis is None:
            return "Behavior monitoring active - No recent alerts"

        status = f"Behavior Monitoring Status:\n"
        status += f"Faces Detected: {analysis.get('faces_detected', 0)}\n"
        status += f"Cheating Detected: {'Yes' if analysis['cheating_detected'] else 'No'}\n"
        status += f"Severity Score: {analysis['severity']}\n"

        flags = analysis.get('behavior_flags')
        if not flags and analysis.get('reason'):
            # "No camera feed" style results may only carry a reason.
            flags = [analysis['reason']]
        if flags:
            status += f"Flags: {', '.join(flags)}\n"

        return status

    def get_exam_reports(self):
        """Return a text report of all exam sessions (admin only)."""
        if not self.current_user or self.current_user["role"] != "admin":
            return "Access denied. Admin only."

        cursor = self.db.conn.cursor()
        cursor.execute("""
            SELECT
                u.username,
                e.title,
                es.start_time,
                es.end_time,
                es.behavior_score,
                es.is_completed
            FROM exam_sessions es
            JOIN users u ON es.user_id = u.id
            JOIN exams e ON es.exam_id = e.id
            ORDER BY es.start_time DESC
        """)

        results = cursor.fetchall()

        if not results:
            return "No exam sessions found."

        report = "Exam Sessions Report:\n\n"
        for row in results:
            # behavior_score is nullable in the schema; don't let a NULL break
            # the whole report.
            score_text = f"{row[4]:.1f}/100" if row[4] is not None else "N/A"
            report += f"Student: {row[0]}\n"
            report += f"Exam: {row[1]}\n"
            report += f"Start Time: {row[2]}\n"
            report += f"End Time: {row[3] if row[3] else 'In Progress'}\n"
            report += f"Behavior Score: {score_text}\n"
            report += f"Completed: {'Yes' if row[5] else 'No'}\n"
            report += "-" * 40 + "\n"

        return report


# Initialize the platform
# Single module-level instance used by every Gradio callback defined below.
# Constructing it opens the SQLite database and attempts to create the default
# admin account.
# NOTE(review): login and exam state live on this one object, so they appear to
# be shared by everyone using the app — confirm whether per-session state is
# needed.
platform = SecureExamPlatform()

# Sample exam data, built from (question text, answer options, index of the
# correct option) rows; question ids are assigned sequentially from 1.
sample_exam = {
    "questions": [
        {"id": number, "question": text, "options": choices, "correct": answer}
        for number, (text, choices, answer) in enumerate(
            [
                (
                    "What is the capital of France?",
                    ["London", "Berlin", "Paris", "Madrid"],
                    2,
                ),
                (
                    "Which programming language is known for AI development?",
                    ["HTML", "Python", "CSS", "SQL"],
                    1,
                ),
                (
                    "What does CPU stand for?",
                    ["Central Processing Unit", "Computer Personal Unit", "Central Program Unit", "Computer Processing Unit"],
                    0,
                ),
            ],
            start=1,
        )
    ]
}

# Gradio Interface Functions
def login_interface(username, password):
    """Gradio callback for the Login button.

    Delegates to ``platform.login`` and returns only the status message; the
    success flag is not shown in the UI.
    """
    outcome = platform.login(username, password)
    return outcome[0]

def register_interface(username, password, role) -> str:
    """Gradio callback for the Register button; returns the status text from ``platform.register_user``."""
    return platform.register_user(username, password, role)

def create_exam_interface(title, description, duration, questions) -> str:
    """Gradio callback for the Create Exam button.

    ``questions`` is the JSON text typed into the questions box; it is passed
    through unchanged to ``platform.create_exam``, whose status text is returned.
    """
    return platform.create_exam(title, description, duration, questions)

def view_exams_interface() -> str:
    """Gradio callback for the Refresh Exam List button; returns the exam listing text."""
    return platform.get_available_exams()

def start_exam_interface(exam_id):
    """Gradio callback for the Start Exam button.

    Starts the exam through ``platform.start_exam`` and returns the status
    message followed by the formatted questions.  Question data may be either a
    bare list of question dicts or the documented ``{"questions": [...]}``
    wrapper; when no usable question list is available only the message is
    returned.
    """
    message, questions = platform.start_exam(exam_id)

    # The sample/help format wraps the list in a dict; iterating that dict
    # directly would yield its keys (strings) and crash on q['question'].
    if isinstance(questions, dict):
        questions = questions.get("questions")

    if not questions or not isinstance(questions, list):
        return message

    # Format questions for display
    formatted_questions = "EXAM QUESTIONS:\n\n"
    for i, q in enumerate(questions, 1):
        formatted_questions += f"Question {i}: {q['question']}\n"
        for j, option in enumerate(q.get('options', [])):
            # Options are lettered A, B, C, ...
            formatted_questions += f"  {chr(65+j)}) {option}\n"
        formatted_questions += "\n"
    return message + "\n\n" + formatted_questions

def submit_exam_interface(answers) -> str:
    """Gradio callback for the Submit Exam button.

    ``answers`` is the text from the answers box and is handed to
    ``platform.submit_exam`` as-is; the resulting status text is returned.
    """
    return platform.submit_exam(answers)

def behavior_status_interface() -> str:
    """Gradio callback used by both the status button and the periodic timer; returns the monitoring status text."""
    return platform.get_behavior_status()

def exam_reports_interface() -> str:
    """Gradio callback for the Generate Reports button; returns the report text or an access-denied message."""
    return platform.get_exam_reports()

# Create Gradio Interface
# Layout: one tab per workflow (authentication, exam management, taking an exam,
# behavior monitoring, reports, help). Components are created first; the
# callbacks are attached at the bottom of the Blocks context.
with gr.Blocks(title="AI-Powered Secure Online Exam Platform") as app:
    gr.Markdown("# 🎓 AI-Powered Secure Online Exam Platform")
    gr.Markdown("### Advanced behavior analysis and cheating detection system")

    with gr.Tabs():
        # Authentication Tab
        with gr.Tab("🔐 Authentication"):
            gr.Markdown("## Login / Register")

            with gr.Row():
                # Left column: login form
                with gr.Column():
                    gr.Markdown("### Login")
                    login_username = gr.Textbox(label="Username", placeholder="Enter username")
                    login_password = gr.Textbox(label="Password", type="password", placeholder="Enter password")
                    login_btn = gr.Button("Login", variant="primary")
                    login_output = gr.Textbox(label="Login Status", interactive=False)

                # Right column: registration form
                with gr.Column():
                    gr.Markdown("### Register New User")
                    reg_username = gr.Textbox(label="Username", placeholder="Choose username")
                    reg_password = gr.Textbox(label="Password", type="password", placeholder="Choose password")
                    # NOTE(review): "teacher" and "admin" are selectable by anyone
                    # filling in this form — confirm that is intended.
                    reg_role = gr.Dropdown(choices=["student", "teacher", "admin"], value="student", label="Role")
                    register_btn = gr.Button("Register", variant="secondary")
                    register_output = gr.Textbox(label="Registration Status", interactive=False)

        # Exam Management Tab
        with gr.Tab("📝 Exam Management"):
            with gr.Row():
                # Left column: exam creation form
                with gr.Column():
                    gr.Markdown("### Create New Exam (Admin/Teacher)")
                    exam_title = gr.Textbox(label="Exam Title", placeholder="Enter exam title")
                    exam_desc = gr.Textbox(label="Description", placeholder="Enter exam description")
                    exam_duration = gr.Number(label="Duration (minutes)", value=60)
                    # The sample exam is shown only as placeholder text; it is
                    # not the field's value.
                    exam_questions = gr.Textbox(
                        label="Questions (JSON format)",
                        placeholder=json.dumps(sample_exam, indent=2),
                        lines=10
                    )
                    create_exam_btn = gr.Button("Create Exam", variant="primary")
                    create_exam_output = gr.Textbox(label="Creation Status", interactive=False)

                # Right column: exam listing
                with gr.Column():
                    gr.Markdown("### Available Exams")
                    view_exams_btn = gr.Button("Refresh Exam List", variant="secondary")
                    exams_display = gr.Textbox(label="Available Exams", lines=15, interactive=False)

        # Take Exam Tab
        with gr.Tab("✍️ Take Exam"):
            gr.Markdown("### Start Exam Session")
            gr.Markdown("⚠️ **Warning**: This exam session will be monitored for suspicious behavior using AI")

            exam_id_input = gr.Number(label="Exam ID", value=1)
            start_exam_btn = gr.Button("Start Exam", variant="primary")
            exam_display = gr.Textbox(label="Exam Content", lines=20, interactive=False)

            gr.Markdown("### Submit Answers")
            gr.Markdown("Format your answers as JSON: `{\"1\": 2, \"2\": 1, \"3\": 0}` (question_id: answer_index)")

            answers_input = gr.Textbox(
                label="Answers (JSON format)",
                placeholder='{"1": 2, "2": 1, "3": 0}',
                lines=3
            )
            submit_exam_btn = gr.Button("Submit Exam", variant="primary")
            submit_output = gr.Textbox(label="Submission Status", interactive=False)

        # Behavior Monitoring Tab
        with gr.Tab("👁️ Behavior Monitoring"):
            gr.Markdown("### Real-time Behavior Analysis")
            gr.Markdown("This system monitors for:")
            gr.Markdown("- Face detection and presence")
            gr.Markdown("- Multiple faces (indicating help from others)")
            gr.Markdown("- Looking away from screen frequently")
            gr.Markdown("- Suspicious movements")

            behavior_btn = gr.Button("Check Behavior Status", variant="secondary")
            behavior_output = gr.Textbox(label="Behavior Status", lines=10, interactive=False)

            # Auto-refresh behavior status every 5 seconds. The tick handler is
            # the same callback as the button; when no exam is running it
            # reports that there is no active session.
            behavior_timer = gr.Timer(5)
            behavior_timer.tick(behavior_status_interface, outputs=[behavior_output])

        # Reports Tab
        with gr.Tab("📊 Reports & Analytics"):
            gr.Markdown("### Exam Session Reports (Admin Only)")
            reports_btn = gr.Button("Generate Reports", variant="primary")
            reports_output = gr.Textbox(label="Exam Reports", lines=20, interactive=False)

        # Help Tab (static usage instructions rendered as Markdown)
        with gr.Tab("❓ Help & Instructions"):
            gr.Markdown("""
## How to Use the Platform

### For Students:
1. **Login** with your credentials or register a new account
2. **View available exams** in the Exam Management tab
3. **Start an exam** by entering the exam ID
4. **Answer questions** and submit in JSON format
5. **Monitor your behavior** - the AI system tracks suspicious activities

### For Teachers/Admins:
1. **Create exams** using JSON format for questions
2. **Monitor student sessions** in real-time
3. **View detailed reports** of all exam sessions
4. **Check behavior scores** and cheating flags

### Sample Question Format:
```json
{
    "questions": [
        {
            "id": 1,
            "question": "Your question here?",
            "options": ["Option A", "Option B", "Option C", "Option D"],
            "correct": 2
        }
    ]
}
```

### Answer Format:
```json
{"1": 2, "2": 1, "3": 0}
```
(question_id: selected_option_index)

### Security Features:
- 🔒 User authentication and role-based access
- 📹 Real-time webcam monitoring
- 🤖 AI-powered behavior analysis
- 📊 Comprehensive cheating detection
- 📈 Behavior scoring system
- 📋 Detailed session reports
""")

    # Event handlers: each button is wired to its *_interface callback, with the
    # listed input components supplying arguments and the output textbox
    # receiving the returned string.
    login_btn.click(login_interface, inputs=[login_username, login_password], outputs=[login_output])
    register_btn.click(register_interface, inputs=[reg_username, reg_password, reg_role], outputs=[register_output])
    create_exam_btn.click(create_exam_interface, inputs=[exam_title, exam_desc, exam_duration, exam_questions], outputs=[create_exam_output])
    view_exams_btn.click(view_exams_interface, outputs=[exams_display])
    start_exam_btn.click(start_exam_interface, inputs=[exam_id_input], outputs=[exam_display])
    submit_exam_btn.click(submit_exam_interface, inputs=[answers_input], outputs=[submit_output])
    behavior_btn.click(behavior_status_interface, outputs=[behavior_output])
    reports_btn.click(exam_reports_interface, outputs=[reports_output])

# Launch the application
if __name__ == "__main__":
    # Create sample exam.
    # create_exam() refuses to run unless an admin/teacher is logged in, so sign
    # in as the default admin just long enough to seed it, and only seed when no
    # exam with this title exists yet (otherwise every run adds a duplicate).
    sample_title = "Sample Programming Quiz"
    _, admin_logged_in = platform.login("admin", "admin123")
    if admin_logged_in:
        seed_cursor = platform.db.conn.cursor()
        seed_cursor.execute("SELECT 1 FROM exams WHERE title = ?", (sample_title,))
        if seed_cursor.fetchone() is None:
            print(platform.create_exam(
                sample_title,
                "A basic quiz covering programming fundamentals",
                30,
                json.dumps(sample_exam)
            ))
        # Do not leave the app running with an admin session already open.
        platform.current_user = None
    else:
        print("Sample exam not created: default admin login failed.")

    print("🎓 AI-Powered Secure Online Exam Platform")
    print("=" * 50)
    print("Default Admin Login:")
    print("Username: admin")
    print("Password: admin123")
    print("=" * 50)

    # NOTE(review): share=True publishes a public URL while the default admin
    # credentials above are in effect — change them before sharing the link.
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        debug=True
    )


🎓 AI-Powered Secure Online Exam Platform
Default Admin Login:
Username: admin
Password: admin123
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://1b16d0931256ec3fe4.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


In [4]:
{"1": 2, "2": 1, "3": 0}

{'1': 2, '2': 1, '3': 0}