In [1]:
import importlib
import os
import time
import sys
sys.path.append(r"C:\Users\Dell\GazeTracking")  
from gaze_tracking import GazeTracking
from __future__ import division
import cv2
from gaze_tracking.pupil import Pupil
import math
import numpy as np
from gaze_tracking.eye import Eye
from gaze_tracking.calibration import Calibration
import serial
import threading 
import cv2
import time
import numpy as np
import dlib
import math
import matplotlib.pyplot as plt
from IPython.display import clear_output

In [2]:
class Calibration(object):
    def __init__(self):
        self.nb_frames = 20
        self.thresholds_left = []
        self.thresholds_right = []

    def is_complete(self):
        return len(self.thresholds_left) >= self.nb_frames and len(self.thresholds_right) >= self.nb_frames

    def threshold(self, side):
        if side == 0:
            return int(sum(self.thresholds_left) / len(self.thresholds_left))
        elif side == 1:
            return int(sum(self.thresholds_right) / len(self.thresholds_right))

    @staticmethod
    def iris_size(frame):
        frame = frame[5:-5, 5:-5]
        height, width = frame.shape[:2]
        nb_pixels = height * width
        nb_blacks = nb_pixels - cv2.countNonZero(frame)
        return nb_blacks / nb_pixels

    @staticmethod
    def find_best_threshold(eye_frame):
        average_iris_size = 0.48
        trials = {}

        for threshold in range(5, 100, 5):
            iris_frame = Pupil.image_processing(eye_frame, threshold)
            trials[threshold] = Calibration.iris_size(iris_frame)

        best_threshold, iris_size = min(trials.items(), key=(lambda p: abs(p[1] - average_iris_size)))
        return best_threshold

    def evaluate(self, eye_frame, side):
        threshold = self.find_best_threshold(eye_frame)
        if side == 0:
            self.thresholds_left.append(threshold)
        elif side == 1:
            self.thresholds_right.append(threshold)

In [3]:
class Eye(object):
    LEFT_EYE_POINTS = [36, 37, 38, 39, 40, 41]
    RIGHT_EYE_POINTS = [42, 43, 44, 45, 46, 47]

    def __init__(self, original_frame, landmarks, side, calibration):
        self.frame = None
        self.origin = None
        self.center = None
        self.pupil = None
        self.landmark_points = None
        self._analyze(original_frame, landmarks, side, calibration)

    @staticmethod
    def _middle_point(p1, p2):
        x = int((p1.x + p2.x) / 2)
        y = int((p1.y + p2.y) / 2)
        return (x, y)

    def _isolate(self, frame, landmarks, points):
        region = np.array([(landmarks.part(point).x, landmarks.part(point).y) for point in points])
        region = region.astype(np.int32)
        self.landmark_points = region

        # Applying a mask to get only the eye
        height, width = frame.shape[:2]
        black_frame = np.zeros((height, width), np.uint8)
        mask = np.full((height, width), 255, np.uint8)
        cv2.fillPoly(mask, [region], (0, 0, 0))
        eye = cv2.bitwise_not(black_frame, frame.copy(), mask=mask)

        # Cropping on the eye
        margin = 5
        min_x = np.min(region[:, 0]) - margin
        max_x = np.max(region[:, 0]) + margin
        min_y = np.min(region[:, 1]) - margin
        max_y = np.max(region[:, 1]) + margin

        self.frame = eye[min_y:max_y, min_x:max_x]
        self.origin = (min_x, min_y)

        height, width = self.frame.shape[:2]
        self.center = (width / 2, height / 2)

    def _blinking_ratio(self, landmarks, points):
        left = (landmarks.part(points[0]).x, landmarks.part(points[0]).y)
        right = (landmarks.part(points[3]).x, landmarks.part(points[3]).y)
        top = self._middle_point(landmarks.part(points[1]), landmarks.part(points[2]))
        bottom = self._middle_point(landmarks.part(points[5]), landmarks.part(points[4]))
        eye_width = math.hypot((left[0] - right[0]), (left[1] - right[1]))
        eye_height = math.hypot((top[0] - bottom[0]), (top[1] - bottom[1]))
        try:
            ratio = eye_width / eye_height
        except ZeroDivisionError:
            ratio = None

        return ratio

    def _analyze(self, original_frame, landmarks, side, calibration):
        if side == 0:
            points = self.LEFT_EYE_POINTS
        elif side == 1:
            points = self.RIGHT_EYE_POINTS
        else:
            return
        self.blinking = self._blinking_ratio(landmarks, points)
        self._isolate(original_frame, landmarks, points)

        if not calibration.is_complete():
            calibration.evaluate(self.frame, side)

        threshold = calibration.threshold(side)
        self.pupil = Pupil(self.frame, threshold)

In [4]:
class GazeTracking(object):
    def __init__(self):
        self.frame = None
        self.eye_left = None
        self.eye_right = None
        self.calibration = Calibration()
        self._face_detector = dlib.get_frontal_face_detector()
        cwd = os.getcwd()
        model_path = os.path.join(cwd, "shape_predictor_68_face_landmarks.dat")
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")
        self._predictor = dlib.shape_predictor(model_path)


    @property
    def pupils_located(self):
        try:
            int(self.eye_left.pupil.x)
            int(self.eye_left.pupil.y)
            int(self.eye_right.pupil.x)
            int(self.eye_right.pupil.y)
            return True
        except Exception:
            return False

    def _analyze(self):
        frame = cv2.cvtColor(self.frame, cv2.COLOR_BGR2GRAY)
        faces = self._face_detector(frame)

        try:
            landmarks = self._predictor(frame, faces[0])
            self.eye_left = Eye(frame, landmarks, 0, self.calibration)
            self.eye_right = Eye(frame, landmarks, 1, self.calibration)

        except IndexError:
            self.eye_left = None
            self.eye_right = None

    def refresh(self, frame):
        self.frame = frame
        self._analyze()

    def pupil_left_coords(self):
        if self.pupils_located:
            x = self.eye_left.origin[0] + self.eye_left.pupil.x
            y = self.eye_left.origin[1] + self.eye_left.pupil.y
            return (x, y)

    def pupil_right_coords(self):
        if self.pupils_located:
            x = self.eye_right.origin[0] + self.eye_right.pupil.x
            y = self.eye_right.origin[1] + self.eye_right.pupil.y
            return (x, y)

    def horizontal_ratio(self):
        if self.pupils_located:
            pupil_left = self.eye_left.pupil.x / (self.eye_left.center[0] * 2 - 10)
            pupil_right = self.eye_right.pupil.x / (self.eye_right.center[0] * 2 - 10)
            return (pupil_left + pupil_right) / 2

    def vertical_ratio(self):
        if self.pupils_located:
            pupil_left = self.eye_left.pupil.y / (self.eye_left.center[1] * 2 - 10)
            pupil_right = self.eye_right.pupil.y / (self.eye_right.center[1] * 2 - 10)
            return (pupil_left + pupil_right) / 2

    def is_right(self):
        if self.pupils_located:
            return self.horizontal_ratio() <= 0.35

    def is_left(self):
        if self.pupils_located:
            return self.horizontal_ratio() >= 0.65

    def is_center(self):
        if self.pupils_located:
            return self.is_right() is not True and self.is_left() is not True

    def is_blinking(self):
        if self.pupils_located:
            blinking_ratio = (self.eye_left.blinking + self.eye_right.blinking) / 2
            return blinking_ratio > 3.8

    def annotated_frame(self):
        frame = self.frame.copy()
        if self.pupils_located:
            color = (0, 255, 0)
            x_left, y_left = self.pupil_left_coords()
            x_right, y_right = self.pupil_right_coords()
            cv2.line(frame, (x_left - 5, y_left), (x_left + 5, y_left), color)
            cv2.line(frame, (x_left, y_left - 5), (x_left, y_left + 5), color)
            cv2.line(frame, (x_right - 5, y_right), (x_right + 5, y_right), color)
            cv2.line(frame, (x_right, y_right - 5), (x_right, y_right + 5), color)

        return frame

In [5]:
class Pupil(object):
    def __init__(self, eye_frame, threshold):
        self.iris_frame = None
        self.threshold = threshold
        self.x = None
        self.y = None

        self.detect_iris(eye_frame)

    @staticmethod
    def image_processing(eye_frame, threshold):
        kernel = np.ones((3, 3), np.uint8)
        new_frame = cv2.bilateralFilter(eye_frame, 10, 15, 15)
        new_frame = cv2.erode(new_frame, kernel, iterations=3)
        new_frame = cv2.threshold(new_frame, threshold, 255, cv2.THRESH_BINARY)[1]

        return new_frame

    def detect_iris(self, eye_frame):
        self.iris_frame = self.image_processing(eye_frame, self.threshold)

        contours, _ = cv2.findContours(self.iris_frame, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)[-2:]
        contours = sorted(contours, key=cv2.contourArea)

        try:
            moments = cv2.moments(contours[-2])
            self.x = int(moments['m10'] / moments['m00'])
            self.y = int(moments['m01'] / moments['m00'])
        except (IndexError, ZeroDivisionError):
            pass
            
    def diameter(self):
        if self.iris_frame is None:
            return None
    
        contours, _ = cv2.findContours(self.iris_frame, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)[-2:]
        contours = sorted(contours, key=cv2.contourArea)
    
        try:
            cnt = contours[-2]  
            (x, y), radius = cv2.minEnclosingCircle(cnt)
            return 2*radius  
        except (IndexError, ValueError):
            return None

In [6]:
class PupilDetector:
    def __init__(self):
        self.detector = dlib.get_frontal_face_detector()
        self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
    
    def detect_pupil(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.detector(gray)
        
        for face in faces:
            landmarks = self.predictor(gray, face)
            left_eye = self.get_eye_region(gray, landmarks, range(36, 42))
            right_eye = self.get_eye_region(gray, landmarks, range(42, 48))
            
            left_diameter = self.find_pupil_diameter(left_eye)
            right_diameter = self.find_pupil_diameter(right_eye)
            
            return left_diameter, right_diameter
        
        return None, None

    def get_eye_region(self, gray, landmarks, eye_indices):
        eye_pts = np.array([(landmarks.part(i).x, landmarks.part(i).y) for i in eye_indices], dtype=np.int32)
        x, y, w, h = cv2.boundingRect(eye_pts)
        eye_roi = gray[y:y+h, x:x+w]
        return eye_roi
    
    '''def find_pupil_diameter(self, eye_roi):
        blurred = cv2.GaussianBlur(eye_roi, (7, 7), 0)
        _, thresh = cv2.threshold(blurred, 30, 255, cv2.THRESH_BINARY_INV)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        if contours:
            largest_contour = max(contours, key=cv2.contourArea)
            (x, y), radius = cv2.minEnclosingCircle(largest_contour)
            return 2 * radius
        
        return None'''
    def find_pupil_diameter(self, eye_roi):
        if eye_roi.size == 0:
            return None
    
        blurred = cv2.GaussianBlur(eye_roi, (7, 7), 0)
        thresh = cv2.adaptiveThreshold(blurred, 255,
                                       cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                       cv2.THRESH_BINARY_INV, 11, 2)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
        if contours:
            largest_contour = max(contours, key=cv2.contourArea)
            if cv2.contourArea(largest_contour) < 10:
                return None
            (_, _), radius = cv2.minEnclosingCircle(largest_contour)
            return 2 * radius
    
        return None

In [7]:
arduino_port = 'COM8'
baud_rate = 9600

In [8]:
start_event = threading.Event()
stop_event = threading.Event()
stop_event.clear()

In [9]:
gsr_values = []
flex_values = []

In [10]:
def read_sensors():
    start_event.wait()
    try:
        with serial.Serial(arduino_port, baud_rate, timeout=1) as ser:
            time.sleep(2)
            print("Reading sensors...")

            while not stop_event.is_set():
                line = ser.readline().decode('utf-8', errors='ignore').strip()
                if line:
                    parts = line.split(',')
                    for part in parts:
                        pair = part.strip().split(':')
                        if len(pair) == 2:
                            key = pair[0].strip().lower()
                            try:
                                value = int(pair[1].strip())
                                if key == "gsr":
                                    gsr_values.append(value)
                                elif key == "flex":
                                    flex_values.append(value)
                            except ValueError:
                                print(f"Non-integer value for {key}")
    except Exception as e:
        print(f"Error in sensor thread: {e}")
    finally:
        print("Sensor thread exiting...")

In [11]:
blink_count = 0
blink_rate = 0
left_diameters = []
right_diameters = []
pupil_movements = []

In [12]:
def camera_feed():
    start_event.wait()
    global blink_count, blink_rate, left_diameters, right_diameters, pupil_movements

    gaze = GazeTracking()
    webcam = cv2.VideoCapture(0)
    start_time = time.time()
    pupil_detector = PupilDetector()

    try:
        while not stop_event.is_set():
            ret, frame = webcam.read()
            if not ret:
                break

            gaze.refresh(frame)
            frame_display = gaze.annotated_frame()

            direction_text = ""
            if gaze.is_blinking():
                direction_text = "Blinking"
                blink_count += 1
            elif gaze.is_right():
                direction_text = "Looking right"
            elif gaze.is_left():
                direction_text = "Looking left"
            elif gaze.is_center():
                direction_text = "Looking center"

            cv2.putText(frame_display, direction_text, (90, 60), cv2.FONT_HERSHEY_DUPLEX, 1.6, (147, 58, 31), 2)

            left_pupil = gaze.pupil_left_coords()
            right_pupil = gaze.pupil_right_coords()

            if left_pupil and right_pupil:
                pupil_movements.append((left_pupil, right_pupil))

            cv2.putText(frame_display, f"Left pupil:  {left_pupil}", (90, 130), cv2.FONT_HERSHEY_DUPLEX, 0.9, (147, 58, 31), 1)
            cv2.putText(frame_display, f"Right pupil: {right_pupil}", (90, 165), cv2.FONT_HERSHEY_DUPLEX, 0.9, (147, 58, 31), 1)

            left_d, right_d = pupil_detector.detect_pupil(frame)
            if left_d and not gaze.is_blinking():
                left_diameters.append(left_d)
            if right_d and not gaze.is_blinking():
                right_diameters.append(right_d)

            cv2.putText(frame_display, f"Blinks: {blink_count}", (90, 200), cv2.FONT_HERSHEY_DUPLEX, 0.9, (147, 58, 31), 1)

            cv2.imshow("Gaze Tracking", frame_display)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                stop_event.set()

    except Exception as e:
        print("Error in camera_feed:", str(e))

    finally:
        webcam.release()
        cv2.destroyAllWindows()
        elapsed_time = time.time() - start_time
        blink_rate = blink_count / elapsed_time if elapsed_time > 0 else 0

In [13]:
def camera_feed_2():
    global blink_count, blink_rate, left_diameters, right_diameters, pupil_movements

    gaze = GazeTracking()
    webcam = cv2.VideoCapture(0)
    start_time = time.time()
    pupil_detector = PupilDetector()

    try:
        while True:
            ret, frame = webcam.read()
            if not ret:
                break

            gaze.refresh(frame)
            frame_display = gaze.annotated_frame()

            direction_text = ""
            if gaze.is_blinking():
                direction_text = "Blinking"
                blink_count += 1
            elif gaze.is_right():
                direction_text = "Looking right"
            elif gaze.is_left():
                direction_text = "Looking left"
            elif gaze.is_center():
                direction_text = "Looking center"

            cv2.putText(frame_display, direction_text, (90, 60), cv2.FONT_HERSHEY_DUPLEX, 1.6, (147, 58, 31), 2)

            left_pupil = gaze.pupil_left_coords()
            right_pupil = gaze.pupil_right_coords()

            if left_pupil and right_pupil:
                pupil_movements.append((left_pupil, right_pupil))

            cv2.putText(frame_display, f"Left pupil:  {left_pupil}", (90, 130), cv2.FONT_HERSHEY_DUPLEX, 0.9, (147, 58, 31), 1)
            cv2.putText(frame_display, f"Right pupil: {right_pupil}", (90, 165), cv2.FONT_HERSHEY_DUPLEX, 0.9, (147, 58, 31), 1)

            left_d, right_d = pupil_detector.detect_pupil(frame)
            if left_d and not gaze.is_blinking():
                left_diameters.append(left_d)
            if right_d and not gaze.is_blinking():
                right_diameters.append(right_d)

            cv2.putText(frame_display, f"Blinks: {blink_count}", (90, 200), cv2.FONT_HERSHEY_DUPLEX, 0.9, (147, 58, 31), 1)

            cv2.imshow("Gaze Tracking", frame_display)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    except Exception as e:
        print("Error in camera_feed:", str(e))

    finally:
        webcam.release()
        cv2.destroyAllWindows()
        elapsed_time = time.time() - start_time
        blink_rate = blink_count / elapsed_time if elapsed_time > 0 else 0
        print(f"Blink count: {blink_count}")
        print(f"Blink rate: {blink_rate:.2f} blinks/sec")

#camera_feed_2()

In [14]:
#camera_feed_2()
'''print(blink_rate)
print(left_diameters)
print(right_diameters)
print(pupil_movements)'''

'print(blink_rate)\nprint(left_diameters)\nprint(right_diameters)\nprint(pupil_movements)'

In [15]:
threads = [
    threading.Thread(target=read_sensors),
    threading.Thread(target=camera_feed),
]

for t in threads:
    t.start()
start_event.set()
time.sleep(30)
stop_event.set()
for t in threads:
    t.join()

print("All modules stopped together.")

Error in sensor thread: could not open port 'COM8': FileNotFoundError(2, 'The system cannot find the file specified.', None, 2)
Sensor thread exiting...
All modules stopped together.


In [16]:
print(gsr_values)
print(flex_values)

[]
[]


In [17]:
def tremor_score(values, window_size=10, delta_thresh=3):
    scores = []
    for i in range(len(values) - window_size):
        window = values[i:i+window_size]
        changes = 0
        for j in range(1, len(window)):
            if abs(window[j] - window[j-1]) >= delta_thresh:
                changes += 1
        scores.append(changes)
    
    average_score = sum(scores) / len(scores)
    return average_score, scores

average_score, score_list = tremor_score(flex_values)
print(f"Tremor Score: {average_score:.2f}")

ZeroDivisionError: division by zero

In [None]:
print(blink_rate)

In [None]:
print(len(left_diameters))

In [None]:
print(len(right_diameters))

In [None]:
print(len(pupil_movements))

In [None]:
def get_input_gsr(gsr_values):
    return max(gsr_values)
def get_input_flex(values, window_size=10, delta_thresh=3):
    scores = []
    for i in range(len(values) - window_size):
        window = values[i:i+window_size]
        changes = 0
        for j in range(1, len(window)):
            if abs(window[j] - window[j-1]) >= delta_thresh:
                changes += 1
        scores.append(changes)
    
    average_score = sum(scores) / len(scores)
    return average_score
def get_input_dilation(left_diameters, right_diameters):
    def compute_avg_change(diameters):
        deltas = [j - i for i, j in zip(diameters[:-1], diameters[1:])]
        return sum(abs(d) for d in deltas) / len(deltas) if deltas else 0

    left_avg = compute_avg_change(left_diameters)
    right_avg = compute_avg_change(right_diameters)
    return (left_avg + right_avg) / 2
def get_gaze_aversion(pupil_movements):
    left_displacements = []
    right_displacements = []
    for i in range(1, len(pupil_movements)):
        x1_prev, y1_prev = pupil_movements[i - 1][0]
        x1_curr, y1_curr = pupil_movements[i][0]
        left_dist = math.hypot(x1_curr - x1_prev, y1_curr - y1_prev)
        left_displacements.append(left_dist)
    
        x2_prev, y2_prev = pupil_movements[i - 1][1]
        x2_curr, y2_curr = pupil_movements[i][1]
        right_dist = math.hypot(x2_curr - x2_prev, y2_curr - y2_prev)
        right_displacements.append(right_dist)
    
    avg_left = sum(left_displacements) / len(left_displacements)
    avg_right = sum(right_displacements) / len(right_displacements)
    return (avg_left + avg_right)/2
def get_blink_rate(blink_rate):
    return blink_rate

In [None]:
listt = [[get_input_gsr(gsr_values), get_input_flex(flex_values), get_gaze_aversion(pupil_movements), get_input_dilation(left_diameters, right_diameters), get_blink_rate(blink_rate)]]
print(listt)

In [None]:
import tensorflow as tf

In [None]:
'''interpreter = tf.lite.Interpreter(model_path="model.tflite")
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()'''

In [None]:
#input_data = np.array([[400, 8, 2.32, 18.747, 0.0195]], dtype = np.float32)

In [None]:
input_data = np.array([[get_input_gsr(gsr_values), get_input_flex(flex_values), get_gaze_aversion(pupil_movements), get_input_dilation(left_diameters, right_diameters), get_blink_rate(blink_rate)]], dtype = np.float32)

interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()

output = interpreter.get_tensor(output_details[0]['index'])
print("Prediction:", output)

In [None]:
'''def lie_probability(gsr_value, flex_score, blink_rate, pupil_dilation, gaze_aversion, gsr_range=(100, 200),flex_center=2.5,flex_max_deviation=2.5, blink_range=(0.05, 0.5),pupil_range=(0.5, 3.0)):
    gsr_norm = (gsr_value - gsr_range[0]) / (gsr_range[1] - gsr_range[0])
    flex_dev = abs(flex_score - flex_center)
    flex_norm = 1 - min(flex_dev / flex_max_deviation, 1)
    blink_norm = (blink_rate - blink_range[0]) / (blink_range[1] - blink_range[0])
    pupil_norm = (pupil_dilation - pupil_range[0]) / (pupil_range[1] - pupil_range[0])
    gaze_norm = min(max(gaze_aversion, 0), 1)
    prob = (0.4*gsr_norm +0.2*flex_norm +0.15*blink_norm +0.15*pupil_norm +0.1*gaze_norm)

    print(gsr_norm, flex_norm, blink_norm, pupil_norm, gaze_norm)

    return round(prob, 3)'''

In [None]:
#lie_probability(get_input_gsr(gsr_values), get_input_flex(flex_values), get_blink_rate(blink_rate), get_input_dilation(left_diameters, right_diameters), get_gaze_aversion(pupil_movements))

In [None]:
'''import math

def sigmoid(x):
    return 1 / (1 + math.exp(-x))

def lie_probability_raw(gsr_value, flex_score, blink_rate, pupil_dilation, gaze_aversion):
    gsr_score = sigmoid(0.05*(gsr_value - 100)) 
    flex_score_val = sigmoid(abs(flex_score - 2.5))  
    blink_score = sigmoid(-20 * (blink_rate - 0.2))  
    pupil_score = sigmoid(1.5 * (pupil_dilation - 1.5))  
    gaze_score = gaze_aversion if isinstance(gaze_aversion, (int, float)) else 0
    prob = (0.4*gsr_score + 0.2*flex_score_val + 0.15*blink_score + 0.15*pupil_score + 0.1*gaze_score)

    return round(prob, 3)'''

In [None]:
#lie_probability_raw(get_input_gsr(gsr_values), get_input_flex(flex_values), get_blink_rate(blink_rate), get_input_dilation(left_diameters, right_diameters), get_gaze_aversion(pupil_movements))