In [1]:
import requests

API_BASE_URL = "http://127.0.0.1:8000/mediapipe"
STREAM_ID = None

In [2]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import os

# Check if model exists
model_path = "../models/face_landmarker.task"
if not os.path.exists(model_path):
    print(f"ERROR: Move {model_path} into this folder first!")
else:
    try:
        base_options = python.BaseOptions(model_asset_path=model_path)
        options = vision.FaceLandmarkerOptions(base_options=base_options)
        detector = vision.FaceLandmarker.create_from_options(options)
        print("SUCCESS: MediaPipe is installed and model loaded!")
    except Exception as e:
        print(f"FAILED: {e}")

SUCCESS: MediaPipe is installed and model loaded!


## MediaPipe setup

In [3]:
# -------- MediaPipe setup --------
BaseOptions = mp.tasks.BaseOptions
FaceLandmarker = mp.tasks.vision.FaceLandmarker
FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
FaceLandmarkerResult = mp.tasks.vision.FaceLandmarkerResult
VisionRunningMode = mp.tasks.vision.RunningMode

model_path = "../models/face_landmarker.task"


## Variables

todo: Move all the variables to constant file

In [4]:
# -------- landmark indices --------
LEFT_EYE = [33, 160, 158, 133, 153, 144]
RIGHT_EYE = [362, 385, 387, 263, 373, 380]
NOSE_INDEX = 1

EAR_THRESHOLD = 0.25
HEAD_UP_THRESHOLD = -0.03
HEAD_LEFT_THRESHOLD = -0.03
HEAD_RIGHT_THRESHOLD = 0.03

In [5]:
# -------- State variables --------
right_blink_count = 0
left_blink_count = 0

right_eye_closed = False
left_eye_closed = False

head_up_detected = False
head_left_detected = False
head_right_detected = False

In [6]:

# Instruction control
instruction_queue= [
    "left_eye_blink",
    "right_eye_blink",
    "head_up"
]

current_instruction_index = 0

session_complete = False

### Creating a function for stream id

In [7]:
def create_stream():
    global STREAM_ID
    try:
        response = requests.post(f"{API_BASE_URL}/create-stream")
        data = response.json()
        STREAM_ID = data["stream_id"]
        print("Strean created:", STREAM_ID)
    except Exception as e:
        print("Failed to create stream: ", e)

In [8]:
def send_instruction_update(instruction_name):
    if STREAM_ID is None:
        return
    
    try:
        requests.post(f"{API_BASE_URL}/update",
                      params={
                            "stream_id": STREAM_ID, 
                            "instruction": instruction_name
                            }
                    )
        
    except Exception as e:
        print("API update failed", e)
        

In [9]:

def get_current_instruction():
    if current_instruction_index < len(instruction_queue):
        return instruction_queue[current_instruction_index]
    return None

# HELPER FUNCTIONS

In [10]:
import math

In [11]:
def get_relative_position(landmark, reference):
    return landmark.y - reference.y, landmark.x - reference.x

In [12]:
# -------- Helper functions --------
def distance(p1, p2):
    return math.dist([p1.x, p1.y], [p2.x, p2.y])

def eye_aspect_ratio(landmarks, eye_indices):
    p1, p2, p3, p4, p5, p6 = [landmarks[i] for i in eye_indices]
    vertical1 = distance(p2, p6)
    vertical2 = distance(p3, p5)
    horizontal = distance(p1, p4)
    return (vertical1 + vertical2) / (2.0 * horizontal)




In [13]:
def advance_instruction():
    global current_instruction_index, session_complete
    current_instruction_index += 1
    if current_instruction_index >= len(instruction_queue):
        session_complete = True
        print("✅ LIVENESS SUCCESS")

# CALLBACK FUNCTION

In [14]:
# -------- Callback --------
def print_result(result: FaceLandmarkerResult, output_image, timestamp_ms):
    global right_blink_count, left_blink_count
    global right_eye_closed, left_eye_closed, instruction
    global head_up_detected, head_left_detected, head_right_detected

    if session_complete:
        return

    if not result.face_landmarks:
        return

    landmarks = result.face_landmarks[0]

    #-------HEAD MOVEMENTS----------

    nose = landmarks[NOSE_INDEX]

    # using eyes midpoint as reference
    left_eye_center = landmarks[33]
    right_eye_center = landmarks[263]

    ref_x = (left_eye_center.y + right_eye_center.y) / 2
    ref_y = (left_eye_center.x + right_eye_center.x) / 2

    delta_y = nose.y - ref_y
    delta_x = nose.x - ref_x

    current_instruction = get_current_instruction()

    #HEAD UP
    if (current_instruction == 'head_up' and delta_y < HEAD_UP_THRESHOLD and not head_up_detected):

        head_up_detected = True
        print("Head up detected")
        send_instruction_update("head_up")
        advance_instruction()
    
    # HEAD LEFT
    elif (current_instruction == 'head_left' and delta_x < HEAD_LEFT_THRESHOLD and not head_left_detected):
        head_left_detected = True
        print("Head left detected")
        send_instruction_update("head_left")
        advance_instruction()

    # HEAD RIGHT
    elif (current_instruction == 'head_right' and delta_x > HEAD_RIGHT_THRESHOLD and not head_right_detected):
        head_right_detected = True
        print("Head Right detected")
        send_instruction_update("head_right")
        advance_instruction()

# ---------------BLINK LOGIC-----------------------------


    left_ear = eye_aspect_ratio(landmarks, LEFT_EYE)
    right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE)

    left_closed = left_ear < EAR_THRESHOLD
    right_closed = right_ear < EAR_THRESHOLD

    # -------- Detect RIGHT eye blink only --------
    if (current_instruction == "right_eye_blink" and right_closed and not left_closed and not right_eye_closed):
            right_eye_closed = True

    elif (current_instruction == "right_eye_blink" and not right_closed and right_eye_closed and not left_closed):
        right_blink_count += 1
        right_eye_closed = False
        print("Right eye blink detected")
        send_instruction_update("right_eye_blink")
        advance_instruction()

    # -------- Detect LEFT eye blink only --------
    if (current_instruction == "left_eye_blink" and left_closed and not right_closed and not left_eye_closed):
        left_eye_closed = True

    elif (current_instruction == "left_eye_blink" and not left_closed and left_eye_closed and not right_closed):
        left_blink_count += 1
        left_eye_closed = False
        print("Left eye blink detected")
        send_instruction_update("left_eye_blink")
        advance_instruction()

    # -------- Reset if both eyes blink --------
    if left_closed and right_closed:
        left_eye_closed = False
        right_eye_closed = False

    # ---------Reset head logic----------------
    if abs(delta_x) < 0.01:
        head_left_detected = False
        head_right_detected = False
    
    if abs(delta_y) < 0.01:
        head_up_detected = False


# OPTIONS MEDIAPIPE

In [15]:
# -------- Options --------
options = FaceLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.LIVE_STREAM,
    result_callback=print_result
)



# MEDIAPIPE WEBCAM

In [16]:
import cv2
import mediapipe as mp
import time

# -------- Webcam loop --------
create_stream()

with FaceLandmarker.create_from_options(options) as landmarker:
    cap = cv2.VideoCapture(0)

    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            continue

        frame = cv2.flip(frame, 1)  # Mirror image
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        mp_image = mp.Image(
            image_format=mp.ImageFormat.SRGB,
            data=frame_rgb
        )

        timestamp_ms = int(time.time() * 1000)
        landmarker.detect_async(mp_image, timestamp_ms)


        if session_complete:
            cv2.putText(
                frame,
                "LIVENESS SUCCESS",
                (30,50),
                cv2.FONT_HERSHEY_SIMPLEX,
                1.0,
                (0, 255, 0),
                3
            )

            cv2.imshow("Eye Blink Liveness Check", frame)
            cv2.waitKey(2000)
            break

        # ----SHOW CURRENT INSTRUCTION-----
        current_instruction = get_current_instruction()
        if current_instruction:
            cv2.putText(
                frame,
                f"Instruction: {current_instruction.replace('_', ' ').upper()}",
                (30, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.9,
                (0, 255, 255),
                2
            )

        # -------- COUNTERS --------
        cv2.putText(
            frame,
            f"Step {current_instruction_index + 1} / {len(instruction_queue)}",
            (30,80),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (255,255,255),
            2
        )
       
        cv2.imshow("Eye Blink Liveness Check", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()


Failed to create stream:  HTTPConnectionPool(host='127.0.0.1', port=8000): Max retries exceeded with url: /mediapipe/create-stream (Caused by NewConnectionError("HTTPConnection(host='127.0.0.1', port=8000): Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it"))
Left eye blink detected
Right eye blink detected
Head up detected
✅ LIVENESS SUCCESS
