In [5]:
!pip -q install --upgrade mediapipe opencv-python numpy



[notice] A new release of pip is available: 25.3 -> 26.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
import urllib.request
from pathlib import Path

url = "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task"
out = Path("face_landmarker.task")

urllib.request.urlretrieve(url, out)
print("Downloaded to:", out.resolve(), "size:", out.stat().st_size, "bytes")


Downloaded to: D:\Scripts\Python Scripts\AI Interview Cheating Detector\face_landmarker.task size: 3758596 bytes


In [7]:
import time
import cv2
import numpy as np
import mediapipe as mp
from pathlib import Path

from mediapipe.tasks import python
from mediapipe.tasks.python import vision


In [8]:
# ---------- Landmark drawing (fast & simple) ----------
def draw_landmarks(frame_bgr, face_landmarks, step=6):
    """
    Draw a subset of landmarks for speed.
    step=6 means draw every 6th landmark.
    """
    h, w = frame_bgr.shape[:2]
    for i in range(0, len(face_landmarks), step):
        lm = face_landmarks[i]
        x, y = int(lm.x * w), int(lm.y * h)
        cv2.circle(frame_bgr, (x, y), 1, (0, 255, 0), -1)

# ---------- Eye Aspect Ratio (EAR) helpers ----------
def _pt(lm, w, h):
    return np.array([lm.x * w, lm.y * h], dtype=np.float32)

def eye_ear(lms, idx, w, h):
    """
    EAR = (||p2-p6|| + ||p3-p5||) / (2*||p1-p4||)
    idx: 6 landmark indices around the eye [p1,p2,p3,p4,p5,p6]
    """
    p1, p2, p3, p4, p5, p6 = [_pt(lms[i], w, h) for i in idx]
    v1 = np.linalg.norm(p2 - p6)
    v2 = np.linalg.norm(p3 - p5)
    hdist = np.linalg.norm(p1 - p4)
    if hdist < 1e-6:
        return 0.0
    return float((v1 + v2) / (2.0 * hdist))


In [9]:
MODEL_PATH = "face_landmarker.task"
if not Path(MODEL_PATH).exists():
    raise FileNotFoundError(f"Model file not found: {MODEL_PATH}. Run the download cell first.")

latest_result = {"res": None}

def _callback(result: vision.FaceLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    latest_result["res"] = (result, timestamp_ms)

base_options = python.BaseOptions(model_asset_path=MODEL_PATH)

options = vision.FaceLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.LIVE_STREAM,
    num_faces=1,
    output_face_blendshapes=False,
    output_facial_transformation_matrixes=False,
    result_callback=_callback
)

landmarker = vision.FaceLandmarker.create_from_options(options)
print("FaceLandmarker created")


FaceLandmarker created


In [10]:
# ------------------- CONFIG -------------------
MAX_RUNTIME_SEC = 25.0

# Face disappearance timer
MISSING_THRESHOLD_SEC = 2.0
missing_start = None
max_missing = 0.0

# Eye closure detection (EAR)
LEFT_EYE_EAR_IDX  = [33, 160, 158, 133, 153, 144]
RIGHT_EYE_EAR_IDX = [263, 387, 385, 362, 380, 373]

EAR_CLOSED_THR = 0.18        # tune per camera/face
EYE_CLOSED_MIN_SEC = 0.25    # persistence to avoid blink false positives
eye_closed_start = {"L": None, "R": None}

# "Looking away" proxy thresholds
DX_THR = 0.03   # left/right
DY_THR = 0.05   # down

# ------------------- RUN -------------------
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    landmarker.close()
    raise RuntimeError("Could not open webcam.")

t0 = cv2.getTickCount()
freq = cv2.getTickFrequency()
start_time = time.time()

def persistent_closed(is_closed, key, now):
    if is_closed:
        if eye_closed_start[key] is None:
            eye_closed_start[key] = now
        return (now - eye_closed_start[key]) >= EYE_CLOSED_MIN_SEC
    else:
        eye_closed_start[key] = None
        return False

try:
    while True:
        # Auto-stop after MAX_RUNTIME_SEC
        if time.time() - start_time >= MAX_RUNTIME_SEC:
            print(f"Auto-stopping after {MAX_RUNTIME_SEC} seconds ✅")
            break

        ok, frame = cap.read()
        if not ok:
            break

        h, w = frame.shape[:2]

        # timestamp in ms (monotonic-ish)
        t = cv2.getTickCount()
        timestamp_ms = int(((t - t0) / freq) * 1000)

        # Convert OpenCV BGR -> RGB and wrap as mp.Image
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

        # Async detect (results come via callback)
        landmarker.detect_async(mp_image, timestamp_ms)

        # Defaults
        face_detected = False
        status = "NO FACE"
        color = (0, 0, 255)

        # Read latest result (may lag by a frame)
        res_pack = latest_result["res"]
        if res_pack is not None:
            result, _ts = res_pack
            if result.face_landmarks and len(result.face_landmarks) > 0:
                face_detected = True
                status = "FACE"
                color = (0, 255, 0)

                # Landmarks for first face
                lms = result.face_landmarks[0]
                draw_landmarks(frame, lms, step=6)

                # --- Looking-away proxy (dx/dy) ---
                try:
                    nose = lms[1]
                    left_eye = lms[133]
                    right_eye = lms[362]

                    eye_mid_x = (left_eye.x + right_eye.x) / 2.0
                    eye_mid_y = (left_eye.y + right_eye.y) / 2.0

                    dx = nose.x - eye_mid_x
                    dy = nose.y - eye_mid_y

                    looking_away = (abs(dx) > DX_THR) or (dy > DY_THR)
                    if looking_away:
                        status = "LOOKING AWAY (proxy)"
                        color = (0, 0, 255)
                    else:
                        status = "LOOKING AT SCREEN (proxy)"
                        color = (0, 255, 0)

                    cv2.putText(frame, f"dx:{dx:.3f} dy:{dy:.3f}", (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                except Exception:
                    pass

                # --- Eye closure / wink detection (EAR) ---
                try:
                    now = time.time()
                    left_ear = eye_ear(lms, LEFT_EYE_EAR_IDX, w, h)
                    right_ear = eye_ear(lms, RIGHT_EYE_EAR_IDX, w, h)

                    left_closed = persistent_closed(left_ear < EAR_CLOSED_THR, "L", now)
                    right_closed = persistent_closed(right_ear < EAR_CLOSED_THR, "R", now)

                    eye_state = "EYES OPEN"
                    eye_color = (0, 255, 0)

                    if left_closed and right_closed:
                        eye_state = "BOTH EYES CLOSED"
                        eye_color = (0, 0, 255)
                    elif left_closed and not right_closed:
                        eye_state = "LEFT EYE CLOSED (WINK)"
                        eye_color = (0, 165, 255)
                    elif right_closed and not left_closed:
                        eye_state = "RIGHT EYE CLOSED (WINK)"
                        eye_color = (0, 165, 255)

                    cv2.putText(frame, f"L_EAR:{left_ear:.3f} R_EAR:{right_ear:.3f}",
                                (10, 95), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
                    cv2.putText(frame, eye_state,
                                (10, 125), cv2.FONT_HERSHEY_SIMPLEX, 0.8, eye_color, 2)
                except Exception:
                    pass

        # --- Face disappearance timer ---
        now = time.time()

        if face_detected:
            missing_start = None
        else:
            if missing_start is None:
                missing_start = now

        missing_duration = 0.0 if missing_start is None else (now - missing_start)
        max_missing = max(max_missing, missing_duration)

        if missing_duration >= MISSING_THRESHOLD_SEC:
            status = f"FACE MISSING > {MISSING_THRESHOLD_SEC:.1f}s"
            color = (0, 0, 255)

        # --- Overlay text ---
        cv2.putText(frame, status, (10, 65),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

        cv2.putText(frame, f"Missing: {missing_duration:.2f}s (max {max_missing:.2f}s)",
                    (10, 155), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

        remaining = max(0.0, MAX_RUNTIME_SEC - (time.time() - start_time))
        cv2.putText(frame, f"Auto-stop in: {remaining:.1f}s",
                    (10, 185), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

        cv2.imshow("MediaPipe Tasks - Starter (Face + Missing + Eyes)", frame)

        # Press q to stop early
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

finally:
    cap.release()
    cv2.destroyAllWindows()
    landmarker.close()
    print("Clean shutdown")


Auto-stopping after 25.0 seconds ✅
Clean shutdown
