In [2]:
import cv2
import numpy as np
from tensorflow import keras
import dlib
import time
import socket

RASPBERRY_PI_IP = '192.168.0.137'
RASPBERRY_PI_PORT = 65432

MODEL: keras.Model = keras.models.load_model("models/eye_state_detection_model_0806_97k.h5")

DETECTOR = dlib.get_frontal_face_detector()
PREDICTOR = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')

eye_closed_time = None
eyes_previously_closed = False

def get_square_roi(start_x, start_y, end_x, end_y):
    """
    Adjusts the coordinates of the ROI to make the area square and centered around the eye
    """
    width = end_x - start_x
    height = end_y - start_y

    if width > height:
        diff = (width - height) // 2
        return start_x, start_y - diff, end_x, end_y + diff
    elif height > width:
        diff = (height - width) // 2
        return start_x - diff, start_y, end_x + diff, end_y
    else:
        return start_x, start_y, end_x, end_y


def detect_eyes(frame, gray):

    global eye_closed_time, eyes_previously_closed

    eyes_open = False
    mouth_center_x = None
    mouth_center_y = None

    faces = DETECTOR(gray)
    for face in faces:
        landmarks = PREDICTOR(gray, face)

        # Coordinates for the mouth
        mouth_left_x = landmarks.part(48).x
        mouth_top_y = min([landmarks.part(i).y for i in range(61, 66)])
        mouth_right_x = landmarks.part(54).x
        mouth_bottom_y = max([landmarks.part(i).y for i in range(65, 68)])

        cv2.rectangle(frame, (mouth_left_x, mouth_top_y), (mouth_right_x, mouth_bottom_y), (255, 0, 0), 2)

        mouth_center_x = (mouth_left_x + mouth_right_x) // 2
        mouth_center_y = (mouth_top_y + mouth_bottom_y) // 2
        cv2.circle(frame, (mouth_center_x, mouth_center_y), 2, (0, 255, 0), -1)

        # Coordinates for left eye
        left_x = landmarks.part(36).x
        left_y = landmarks.part(36).y
        right_x = landmarks.part(39).x
        right_y = landmarks.part(39).y

        # Adjust for square ROI
        left_x, left_y, right_x, right_y = get_square_roi(left_x, left_y, right_x, right_y)
        left_eye = frame[left_y:right_y, left_x:right_x]

        cv2.rectangle(frame, (left_x, left_y), (right_x, right_y), (255, 0, 0), 2)

        # Coordinates for right eye
        left_x_2 = landmarks.part(42).x
        left_y_2 = landmarks.part(42).y
        right_x_2 = landmarks.part(45).x
        right_y_2 = landmarks.part(45).y

        # Adjust for square ROI
        left_x_2, left_y_2, right_x_2, right_y_2 = get_square_roi(left_x_2, left_y_2, right_x_2, right_y_2)
        right_eye = frame[left_y_2:right_y_2, left_x_2:right_x_2]

        for eye, label, x, y in [(left_eye, "Left", left_x, left_y), (right_eye, "Right", left_x_2, left_y_2)]:
            if eye.size == 0:
                continue

            eye = cv2.resize(eye, (128, 128)) # The model was trained on 128x128 images
            eye = eye.astype("float") / 255.0 
            eye = np.expand_dims(eye, axis=0)
            prediction = MODEL.predict(eye)[0][0]

            if prediction > 0.5:
                text = f"{label} Open"
                eyes_open = True
            else:
                text = f"{label} Closed"

            cv2.putText(frame, text, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        if eyes_open:
            eye_closed_time = None
            eyes_previously_closed = False

        else:
            if eyes_previously_closed:
                if time.time() - eye_closed_time >= 3: # If eyes have been closed for 3 seconds
                    eye_closed_time = time.time() # Reset timer
            else:
                eye_closed_time = time.time()
                eyes_previously_closed = True

    return frame, mouth_center_x, mouth_center_y


def map_value(value, leftMin, leftMax, rightMin, rightMax):
    """  
    Maps a value from one range to another (window size to servo angle)
    the servo angles are between 30 and 90 degrees becuse of how the servos are mounted
    """
    leftSpan = leftMax - leftMin
    rightSpan = rightMax - rightMin
    valueScaled = float(value - leftMin) / float(leftSpan)
    return rightMin + (valueScaled * rightSpan)


def main():
    """
    stream video from webcam and detect eyes, then send servo angles to raspberry pi
    """
    cap = cv2.VideoCapture(0)
    cv2.namedWindow('Eye Detection', cv2.WINDOW_NORMAL)
    cv2.resizeWindow('Eye Detection', 640, 480)

    while True:
        try:
            ret, frame = cap.read()
            if not ret:
                continue

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            canvas, mouth_center_x, mouth_center_y = detect_eyes(frame, gray)
            cv2.imshow('Eye Detection', canvas)

            if mouth_center_x is not None and mouth_center_y is not None:
                servo_x_angle = map_value(mouth_center_x, 0, 640, 90, 30)
                servo_y_angle = map_value(mouth_center_y, 0, 480, 30, 90)

                servo_x_angle = int(servo_x_angle)
                servo_y_angle = int(servo_y_angle)

                print(f"x = {servo_x_angle}, y = {servo_y_angle}")

                send_data = f"{servo_x_angle},{servo_y_angle}\n"
                socket_connection.close().sendall(send_data.encode('utf-8'))
                time.sleep(0.2)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                cap.release()
                cv2.destroyAllWindows()
                socket_connection.close()
                break

        except KeyboardInterrupt:
            cap.release()
            cv2.destroyAllWindows()
            socket_connection.close()
            break

        except Exception as e:
            print(e)
            cap.release()
            cv2.destroyAllWindows()
            socket_connection.close()
            break

if __name__ == '__main__':
    socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket_connection.connect((RASPBERRY_PI_IP, RASPBERRY_PI_PORT))
    main()
