In [1]:
!python3 -V

Python 3.9.0


In [2]:
import cv2
import torch
from torchvision import models, transforms
from PIL import Image
import numpy as np
import torch.nn as nn
import os
import time
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from typing import Tuple, Union, Optional
import math
from IPython.display import display
import ipywidgets as widgets


# Configuration
# Central settings read by the loaders and the preprocessing pipeline below.
CONFIG = {
    # State-dict file handed to load_model().
    'model_path': 'models/resnet18-e_20-d_10k.pth',
    # MediaPipe face-detector asset handed to load_face_detector().
    'detector_path': 'models/detector.tflite',
    # Target size used by the torchvision Resize step in `transform`.
    'image_size': (224, 224),
    # Label names by class index; predict_frame() sums the predicted indices,
    # so index 1 ('open') counts as one open eye.
    # NOTE(review): presumably matches the label order used in training - confirm.
    'class_names': ['close', 'open'],
    # Output width of the replaced fully connected layer in load_model().
    'num_classes': 2,
}

# Constants
# Half-height / half-width (in pixels) of the square cropped around each eye
# keypoint in eyes_detection(); the resulting crop is up to 70x70.
EYE_VERTICAL_OFFSET = 35
EYE_HORIZONTAL_OFFSET = 35

# Define the device to use
# Prefer CUDA when PyTorch reports it as available, otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def load_model(model_path: str, num_classes: int) -> nn.Module:
    """Build a ResNet-18 classifier and restore its weights for inference.

    Args:
        model_path: Path to a saved state dict compatible with a ResNet-18
            whose final layer has ``num_classes`` outputs.
        num_classes: Number of output classes for the replaced ``fc`` layer.

    Returns:
        The network moved to the module-level ``device`` and switched to
        evaluation mode.
    """
    network = models.resnet18(pretrained=False)

    # Swap the stock classification head for one sized to our label set
    # before loading, so the state-dict shapes line up.
    network.fc = nn.Linear(network.fc.in_features, num_classes)

    state_dict = torch.load(model_path, map_location=device)
    network.load_state_dict(state_dict)

    network = network.to(device)
    return network.eval()

def load_face_detector(detector_path: str) -> vision.FaceDetector:
    """Create a MediaPipe face detector from a model asset on disk.

    Args:
        detector_path: Path to the face-detection model asset file.

    Returns:
        A ``vision.FaceDetector`` built with otherwise default options.
    """
    detector_options = vision.FaceDetectorOptions(
        base_options=python.BaseOptions(model_asset_path=detector_path)
    )
    return vision.FaceDetector.create_from_options(detector_options)

# Load models
# Both objects are created once when the cell runs and are used as module-level
# globals by eyes_detection() (detector) and predict_frame() (model).
model = load_model(CONFIG['model_path'], CONFIG['num_classes'])
detector = load_face_detector(CONFIG['detector_path'])

# Image preprocessing
# NOTE: the triple-quoted block below is disabled code kept as a bare string
# literal. It is evaluated and discarded when the cell runs, so neither
# apply_histogram_equalization nor apply_white_balance is actually defined.
'''
def apply_histogram_equalization(image):
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    equalized = cv2.equalizeHist(gray)
    return cv2.cvtColor(equalized, cv2.COLOR_GRAY2BGR)

def apply_white_balance(image):
    result = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    avg_a = np.average(result[:, :, 1])
    avg_b = np.average(result[:, :, 2])
    result[:, :, 1] = result[:, :, 1] - ((avg_a - 128) * (result[:, :, 0] / 255.0) * 1.1)
    result[:, :, 2] = result[:, :, 2] - ((avg_b - 128) * (result[:, :, 0] / 255.0) * 1.1)
    return cv2.cvtColor(result, cv2.COLOR_LAB2BGR)
'''
def apply_clahe(image):
    """Apply CLAHE contrast equalization and return a 3-channel result.

    The input is converted from BGR to grayscale, equalized with a CLAHE
    operator (clip limit 2.0, 8x8 tiles), and converted back to BGR, so the
    three output channels carry the same equalized intensity.
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    luminance = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return cv2.cvtColor(equalizer.apply(luminance), cv2.COLOR_GRAY2BGR)

def adjust_gamma(image, gamma=1.0):
    """Gamma-correct an image through a 256-entry lookup table.

    Each intensity ``i`` maps to ``((i / 255) ** (1 / gamma)) * 255``
    truncated to ``uint8``; ``gamma`` above 1 therefore brightens the image
    and ``gamma`` below 1 darkens it.
    """
    exponent = 1.0 / gamma
    levels = np.arange(0, 256)
    # Build the table entry by entry so every value is computed exactly as a
    # scalar before the final truncation to uint8.
    mapped = [((level / 255.0) ** exponent) * 255 for level in levels]
    lookup = np.array(mapped).astype("uint8")
    return cv2.LUT(image, lookup)

def normalize_image(image):
    """Min-max stretch the image intensities to the range 0..255."""
    return cv2.normalize(
        image,
        None,
        alpha=0,
        beta=255,
        norm_type=cv2.NORM_MINMAX,
    )

def preprocess_image(image):
    """Run the full enhancement chain on an image.

    The steps are applied in order: min-max normalization, CLAHE contrast
    equalization, then gamma correction with gamma 1.2 (a slight brightening).
    """
    stretched = normalize_image(image)
    contrast_enhanced = apply_clahe(stretched)
    return adjust_gamma(contrast_enhanced, 1.2)

# Define the transformation for input images
# Applied to each PIL eye crop in predict_frame() before it is fed to the model.
transform = transforms.Compose([
    # Resize the crop to the configured (height, width).
    transforms.Resize(CONFIG['image_size']),
    # Convert the PIL image to a float tensor.
    transforms.ToTensor(),
    # Per-channel standardization.
    # NOTE(review): these look like the usual ImageNet statistics; presumably
    # the classifier was trained with the same values - confirm.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def _normalized_to_pixel_coordinates(
    normalized_x: float, normalized_y: float, image_width: int,
    image_height: int) -> Union[None, Tuple[int, int]]:
    """Map a normalized (x, y) pair onto integer pixel coordinates.

    Returns ``None`` when either coordinate lies outside ``[0, 1]`` (the two
    endpoints themselves are accepted via ``math.isclose``). Otherwise the
    scaled position is floored and clamped to the last valid column/row.
    """

    def _in_unit_range(coord: float) -> bool:
        not_below_zero = coord > 0 or math.isclose(0, coord)
        not_above_one = coord < 1 or math.isclose(1, coord)
        return not_below_zero and not_above_one

    for coord in (normalized_x, normalized_y):
        if not _in_unit_range(coord):
            return None

    column = math.floor(normalized_x * image_width)
    row = math.floor(normalized_y * image_height)
    return min(column, image_width - 1), min(row, image_height - 1)

def eyes_detection(frame: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """Detect a face in the frame and crop a patch around each eye.

    Only the first detection is used (a single face is assumed). Each patch
    is centered on the eye keypoint and extends EYE_VERTICAL_OFFSET /
    EYE_HORIZONTAL_OFFSET pixels in each direction, clamped to the image
    bounds so that a keypoint close to a border still yields a valid,
    possibly smaller, crop.

    Args:
        frame: 3-channel uint8 image handed to MediaPipe as an SRGB image.

    Returns:
        ``(left_eye, right_eye)``; an entry is ``None`` when no face was
        detected or the corresponding keypoint falls outside the image.
    """
    image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
    detection_result = detector.detect(image)

    image_np = np.copy(image.numpy_view())
    height, width, _ = image_np.shape

    def crop_around(keypoint) -> Optional[np.ndarray]:
        center = _normalized_to_pixel_coordinates(keypoint.x, keypoint.y, width, height)
        if center is None:
            return None
        x_px, y_px = center
        # Clamp explicitly: a negative slice start would wrap around in NumPy
        # and silently produce an empty (or wrong) crop.
        top = max(y_px - EYE_VERTICAL_OFFSET, 0)
        bottom = min(y_px + EYE_VERTICAL_OFFSET, height)
        left = max(x_px - EYE_HORIZONTAL_OFFSET, 0)
        right = min(x_px + EYE_HORIZONTAL_OFFSET, width)
        patch = image_np[top:bottom, left:right]
        return patch if patch.size else None

    lefteye_img = righteye_img = None

    if detection_result.detections:
        # Assume only one face for simplicity.
        detection = detection_result.detections[0]
        # Keypoint 0 is treated as the right eye and keypoint 1 as the left.
        righteye_img = crop_around(detection.keypoints[0])
        lefteye_img = crop_around(detection.keypoints[1])

    return lefteye_img, righteye_img

def predict_frame(frame: np.ndarray) -> Tuple[Union[int, str], Optional[np.ndarray], Optional[np.ndarray]]:
    """Predict the state of both eyes in the given frame.

    The frame is enhanced, the eyes are located and cropped, each crop is
    enhanced again, and both crops are classified in one forward pass.

    Args:
        frame: Camera frame as a 3-channel uint8 array.

    Returns:
        ``(result, left_eye, right_eye)``. ``result`` is the sum of the two
        predicted class indices (0, 1 or 2); with the configured label order
        this is the number of open eyes. When either eye cannot be cropped,
        the tuple is ``("Eyes not detected", None, None)``.
    """
    preprocessed_frame = preprocess_image(frame)

    left_frame, right_frame = eyes_detection(preprocessed_frame)

    # Treat an empty crop like a missing one: the OpenCV/PIL steps below
    # cannot process zero-sized arrays.
    if any(eye is None or eye.size == 0 for eye in (left_frame, right_frame)):
        return "Eyes not detected", None, None

    # Further preprocess the eye images
    left_frame = preprocess_image(left_frame)
    right_frame = preprocess_image(right_frame)

    # Stack both eyes into a single batch so the network runs once per frame.
    batch = torch.stack([
        transform(Image.fromarray(left_frame)),
        transform(Image.fromarray(right_frame)),
    ]).to(device)

    with torch.no_grad():
        predicted = torch.argmax(model(batch), dim=1)

    result = int(predicted.sum().item())

    return result, left_frame, right_frame

# Create widgets for displaying results
# result_text shows the prediction and computeSpeed_text the per-frame time;
# both are updated from main() on every processed frame.
# NOTE(review): the style dicts use CSS-like keys ('font-size', 'color');
# whether ipywidgets honors these keys depends on the installed version - confirm.
result_text = widgets.Label(style={'font-size': '30px', 'color': 'red'})
computeSpeed_text = widgets.Label(style={'font-size': '30px'})
# image_widget = widgets.Image(format='jpeg', width=640, height=480)
# PNG-encoded eye crops are pushed into these two image widgets by main().
left_eye_image = widgets.Image(format='png')
right_eye_image = widgets.Image(format='png')

# Each eye image is stacked under its caption.
right_eye_widget = widgets.VBox([
    widgets.Label(value='Right Eye', layout=widgets.Layout(align_items='center')),
    right_eye_image
])

left_eye_widget = widgets.VBox([
    widgets.Label(value='Left Eye', layout=widgets.Layout(align_items='center')),
    left_eye_image
])

# Top-level layout: the two text labels above a row holding both eye panels.
output_widget = widgets.HBox([
    # image_widget,
    widgets.VBox([widgets.VBox([result_text, computeSpeed_text]),
                widgets.HBox([left_eye_widget, right_eye_widget])
    ])
])

def _closed_run_detected(history, run_length: int = 3) -> bool:
    """Return True if `history` contains `run_length` consecutive 0 entries."""
    streak = 0
    for state in history:
        streak = streak + 1 if state == 0 else 0
        if streak >= run_length:
            return True
    return False


def _show_eye(eye_img, target, is_safe) -> None:
    """Draw a status border on an eye crop and push it to `target` as PNG."""
    if eye_img is None:
        return
    height, width = eye_img.shape[:2]
    # Colors are BGR, as interpreted by cv2.imencode: green = safe, red = danger.
    color = (0, 255, 0) if is_safe else (0, 0, 255)
    # Use the crop's real size: OpenCV points are (x, y), and a crop taken
    # near the frame border can be smaller than the nominal patch.
    cv2.rectangle(eye_img, (0, 0), (width - 1, height - 1), color, 5, cv2.LINE_AA)
    encoded_ok, buffer = cv2.imencode('.png', eye_img)
    if encoded_ok:
        target.value = buffer.tobytes()


def main():
    """Run the webcam loop: classify the eyes per frame and update the widgets.

    Frames are read from camera 0 (V4L2 backend, MJPG, 640x480). A sliding
    window of the last five frames is kept; once the window is full, the
    state becomes "danger" (0) when it contains three consecutive frames
    with no open eye, and "safe" (1) otherwise. The loop ends on
    KeyboardInterrupt (kernel interrupt) and the camera is always released.
    """
    from collections import deque  # local import keeps this block self-contained

    history_len = 5
    eyes_states_his = deque(maxlen=history_len)
    eyes_states = 1  # 1 for safe, 0 for danger

    cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
    try:
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        if not cap.isOpened():
            # Say why nothing is displayed instead of returning silently.
            print("Unable to open camera 0; nothing to display.")
            return

        display(output_widget)

        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                continue

            num_opened_eyes, left_img, right_img = predict_frame(frame)

            end_time = time.time()
            compute_time = end_time - start_time
            start_time = end_time

            # The state is only re-evaluated once the window was already full
            # before this frame; until then the initial "safe" state is kept.
            # A frame without detected eyes (string result) counts as "open".
            window_was_full = len(eyes_states_his) == history_len
            eyes_states_his.append(0 if num_opened_eyes == 0 else 1)
            if window_was_full:
                eyes_states = 0 if _closed_run_detected(eyes_states_his) else 1

            if isinstance(num_opened_eyes, str):
                # predict_frame reports detection failure as a message.
                result_text.value = num_opened_eyes
            else:
                result_text.value = f'Predicted: {num_opened_eyes} eyes opened!'
            computeSpeed_text.value = f'Compute Time: {compute_time:.3f} s'

            _show_eye(left_img, left_eye_image, eyes_states)
            _show_eye(right_img, right_eye_image, eyes_states)

            # NOTE(review): no OpenCV window is created here, so this key
            # check presumably never fires - confirm; interrupt the kernel
            # to stop the loop.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    except KeyboardInterrupt:
        pass
    finally:
        cap.release()

# Start the capture loop when this cell/script is executed directly.
if __name__ == "__main__":
    main()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1722396989.988731   10220 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


HBox(children=(VBox(children=(VBox(children=(Label(value=''), Label(value=''))), HBox(children=(VBox(children=…

