# In [1]:
import cv2

import numpy as np

import mediapipe as mp

import math

from collections import deque
 
class AlcoholDetectionSystem:

    def __init__(self):
        """Create the face-mesh model, the rolling histories and the thresholds.

        The histories are bounded deques, so each one only ever holds the
        most recent frames; the thresholds are read by the analysis methods.
        """
        # MediaPipe Face Mesh, limited to a single face per frame.
        face_mesh_module = mp.solutions.face_mesh
        self.mp_face_mesh = face_mesh_module
        mesh_options = {
            "max_num_faces": 1,
            "min_detection_confidence": 0.5,
            "min_tracking_confidence": 0.5,
        }
        self.face_mesh = face_mesh_module.FaceMesh(**mesh_options)

        # Rolling per-frame histories used for movement tracking.
        short_window, long_window = 30, 60
        self.head_positions = deque(maxlen=short_window)     # last 30 head positions
        self.head_tilt_history = deque(maxlen=short_window)  # last 30 head tilt angles
        self.eye_blink_history = deque(maxlen=long_window)   # last 60 eye-state samples
        self.lip_positions = deque(maxlen=short_window)      # last 30 lip positions

        # Decision thresholds.
        self.HEAD_MOVEMENT_THRESHOLD = 0.05
        # NOTE(review): this was 0.25 originally; confirm 0.5 is the intended value.
        self.BLINK_RATIO_THRESHOLD = 0.5
        self.LIP_MOVEMENT_THRESHOLD = 0.03
        self.NORMAL_TILT_THRESHOLD = 6.0     # +/-6 degrees counts as a normal tilt
        self.IMPAIRED_TILT_THRESHOLD = 15.0  # +/-15 degrees indicates impairment

    def calculate_head_tilt(self, face_landmarks):

        """Calculate head tilt angle in degrees"""

        left_eye = np.array([face_landmarks.landmark[33].x, face_landmarks.landmark[33].y])

        right_eye = np.array([face_landmarks.landmark[263].x, face_landmarks.landmark[263].y])

        dx = right_eye[0] - left_eye[0]

        dy = right_eye[1] - left_eye[1]

        angle = math.degrees(math.atan2(dy, dx))

        self.head_tilt_history.append(angle)

        return angle

    def analyze_head_tilt(self):

        if len(self.head_tilt_history) < 2:

            return False, 0

        tilt_angles = list(self.head_tilt_history)

        mean_tilt = np.mean(tilt_angles)

        tilt_variance = np.var(tilt_angles)

        tilt_changes = [abs(tilt_angles[i] - tilt_angles[i-1]) for i in range(1, len(tilt_angles))]

        max_tilt_change = max(tilt_changes) if tilt_changes else 0

        is_impaired = (

            abs(mean_tilt) > self.NORMAL_TILT_THRESHOLD or

            max_tilt_change > self.IMPAIRED_TILT_THRESHOLD or

            tilt_variance > 25

        )

        tilt_score = min(1.0, (abs(mean_tilt) / self.IMPAIRED_TILT_THRESHOLD + 

                              max_tilt_change / self.IMPAIRED_TILT_THRESHOLD + 

                              tilt_variance / 100) / 3)

        return is_impaired, tilt_score
 
    def calculate_eye_aspect_ratio(self, eye_points):

        A = np.linalg.norm(eye_points[1] - eye_points[5])

        B = np.linalg.norm(eye_points[2] - eye_points[4])

        C = np.linalg.norm(eye_points[0] - eye_points[3])

        ear = (A + B) / (2.0 * C)

        return ear

    def detect_head_movement(self, face_landmarks):

        nose_tip = np.array([face_landmarks.landmark[4].x,

                             face_landmarks.landmark[4].y,

                             face_landmarks.landmark[4].z])

        self.head_positions.append(nose_tip)

        if len(self.head_positions) < 2:

            return False

        positions = list(self.head_positions)

        movement = np.var([np.linalg.norm(pos - positions[-1]) for pos in positions[:-1]])

        return movement > self.HEAD_MOVEMENT_THRESHOLD

    def analyze_gaze(self, face_landmarks):

        left_eye = np.array([[face_landmarks.landmark[p].x, face_landmarks.landmark[p].y] 

                              for p in [33, 160, 158, 133, 153, 144]])

        right_eye = np.array([[face_landmarks.landmark[p].x, face_landmarks.landmark[p].y] 

                               for p in [362, 385, 387, 263, 373, 380]])

        left_ear = self.calculate_eye_aspect_ratio(left_eye)

        right_ear = self.calculate_eye_aspect_ratio(right_eye)

        avg_ear = (left_ear + right_ear) / 2

        self.eye_blink_history.append(avg_ear)

        if len(self.eye_blink_history) >= 60:

            blink_rate = np.mean(list(self.eye_blink_history)) < self.BLINK_RATIO_THRESHOLD

            blink_variance = np.var(list(self.eye_blink_history))

            return blink_rate or blink_variance > 0.01

        return False

    def analyze_lip_movement(self, face_landmarks):

        lip_points = np.array([[face_landmarks.landmark[p].x, face_landmarks.landmark[p].y]

                               for p in [61, 291, 0, 17, 269, 39]])

        self.lip_positions.append(lip_points)

        if len(self.lip_positions) < 2:

            return False

        positions = list(self.lip_positions)

        movement = np.var([np.linalg.norm(pos - positions[-1]) for pos in positions[:-1]])

        return movement > self.LIP_MOVEMENT_THRESHOLD

    def process_frame(self, frame):
        """Analyse one video frame and return the per-feature results.

        The frame is converted from BGR to RGB and passed to the face-mesh
        model; only the first detected face is analysed.

        Args:
            frame: Image as accepted by ``cv2.cvtColor`` with
                ``cv2.COLOR_BGR2RGB``.

        Returns:
            ``None`` when no face landmarks are found. Otherwise a dict
            with ``'detection_score'`` (weighted sum: head 0.3, gaze 0.3,
            lips 0.15, tilt score 0.25) and ``'features'`` holding the
            individual flags and the head tilt angle/flag.
        """
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mesh_output = self.face_mesh.process(rgb_frame)
        if not mesh_output.multi_face_landmarks:
            return None

        face = mesh_output.multi_face_landmarks[0]

        # The tilt must be recorded before the tilt history is analysed.
        tilt_angle = self.calculate_head_tilt(face)
        tilt_flag, tilt_score = self.analyze_head_tilt()
        head_flag = self.detect_head_movement(face)
        gaze_flag = self.analyze_gaze(face)
        lips_flag = self.analyze_lip_movement(face)

        weighted_terms = (
            head_flag * 0.3,
            gaze_flag * 0.3,
            lips_flag * 0.15,
            tilt_score * 0.25,
        )
        feature_report = {
            'unsteady_head': head_flag,
            'unfocused_gaze': gaze_flag,
            'uncoordinated_lips': lips_flag,
            'head_tilt': {
                'angle': tilt_angle,
                'impaired': tilt_flag,
            },
        }
        return {
            'detection_score': sum(weighted_terms),
            'features': feature_report,
        }
 
    def run_detection(self, source=0):
        """Run the detector on a video source and show an annotated preview.

        Frames are read from ``cv2.VideoCapture(source)`` until the stream
        stops delivering frames or the user presses ``q`` in the preview
        window. For every frame in which a face is found, a headline
        banner and one line per feature are drawn onto the frame.

        The capture device and the preview window are released even when
        an exception interrupts the loop.

        Args:
            source: Value passed to ``cv2.VideoCapture``; defaults to 0.
        """
        cap = cv2.VideoCapture(source)
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                results = self.process_frame(frame)
                if results:
                    features = results['features']

                    # The headline only fires when every indicator is set.
                    all_impaired = (
                        features['unsteady_head'] and
                        features['unfocused_gaze'] and
                        features['uncoordinated_lips'] and
                        features['head_tilt']['impaired']
                    )
                    if all_impaired:
                        banner, banner_color = "ALCOHOL CONSUMPTION DETECTED", (0, 0, 255)
                    else:
                        banner, banner_color = "No Alcohol Detected", (0, 255, 0)
                    cv2.putText(frame, banner,
                                (50, 50), cv2.FONT_HERSHEY_SIMPLEX,
                                1.0, banner_color, 3)

                    # One status line per feature, stacked 30 px apart.
                    y_pos = 100
                    for feature, value in features.items():
                        if feature == 'head_tilt':
                            flagged = value['impaired']
                            # ASCII "deg" instead of the degree sign: the
                            # Hershey fonts cannot render non-ASCII symbols
                            # and draw question marks in their place.
                            label = (f"Head Tilt: {value['angle']:.1f} deg "
                                     f"({'true' if flagged else 'false'})")
                        else:
                            flagged = value
                            label = f"{feature}: {'true' if flagged else 'false'}"
                        color = (0, 0, 255) if flagged else (0, 255, 0)
                        cv2.putText(frame, label,
                                    (10, y_pos), cv2.FONT_HERSHEY_SIMPLEX,
                                    0.6, color, 2)
                        y_pos += 30

                cv2.imshow('Alcohol Detection', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Always free the capture device and close the preview window.
            cap.release()
            cv2.destroyAllWindows()
 
if __name__ == "__main__":
    # Entry point: build the detector and start it on capture source 0,
    # which is also the default of run_detection.
    detector = AlcoholDetectionSystem()
    detector.run_detection(source=0)

 