In [3]:
import cv2 as cv
import numpy as np
import mediapipe as mp
import pygame
from pygame import mixer
import datetime
import csv
import os

# Initialize CSV file
csv_filename = 'attentiveness_data.csv'

def initialize_csv_file():
    if not os.path.exists(csv_filename):
        with open(csv_filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["Time", "Blink Count", "Yawn Count", "Drowsiness Count", "Out of Frame", "Fatigue Status"])

def append_data_to_csv(time, blink_count, yawn_count, drowsiness_count, outof_frame, fatigue_status):
    with open(csv_filename, 'a', newline='') as file:  
        writer = csv.writer(file)
        writer.writerow([time, blink_count, yawn_count, drowsiness_count, outof_frame, fatigue_status])
        
initialize_csv_file()

# Initialize sound
mixer.init()
watch_out = mixer.Sound('watch_out.wav')
eyes_blink = mixer.Sound('wake_up_alarm.mp3')
yawn = mixer.Sound('coffe.wav')
welcome_sound = mixer.Sound('welcome.wav')

# Function to calculate eye openness
def open_len(arr):
    y_arr = [y for _, y in arr]
    min_y = min(y_arr)
    max_y = max(y_arr)
    return max_y - min_y

# Mediapipe initialization
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)

RIGHT_EYE = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398]
LEFT_EYE = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246]
UPPER_LIP = [61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291]
LOWER_LIP = [146, 91, 181, 84, 17, 314, 405, 321, 375, 291, 308, 324, 318, 402, 317, 14]

cap = cv.VideoCapture(0)
welcome_sound.play()

start_time = datetime.datetime.now()

with mp_face_mesh.FaceMesh(
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.3,
        min_tracking_confidence=0.3
) as face_mesh:

    drowsy_frames = 0
    max_left = 0
    max_right = 0
    blink_count = 0
    eye_closed = False
    eye_open_frames = []
    yawn_frames = 0
    yawn_count = 0
    drowsiness_count = 0
    fatigue_status = "Normal"
    left_frames = 0
    right_frames = 0
    down_frames = 0
    threshold_frames = 90
    outof_frame = 0
    screenshot_counter = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv.flip(frame, 1)
        rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
        img_h, img_w = frame.shape[:2]
        cv.putText(frame, "Hi I'm HIMOQ! your AI Attentive assistant.", (50, img_h - 50), cv.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
        results = face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                all_landmarks = np.array(
                    [np.multiply([p.x, p.y], [img_w, img_h]).astype(int) for p in face_landmarks.landmark])

                right_eye = all_landmarks[RIGHT_EYE]
                left_eye = all_landmarks[LEFT_EYE]
                upper_lip = all_landmarks[UPPER_LIP]
                lower_lip = all_landmarks[LOWER_LIP]

                cv.polylines(frame, [left_eye], True, (0, 255, 0), 1, cv.LINE_AA)
                cv.polylines(frame, [right_eye], True, (0, 255, 0), 1, cv.LINE_AA)
                cv.polylines(frame, [upper_lip], True, (255, 0, 0), 1, cv.LINE_AA)
                cv.polylines(frame, [lower_lip], True, (255, 0, 0), 1, cv.LINE_AA)

                len_left = open_len(right_eye)
                len_right = open_len(left_eye)
                mouth_openness = open_len(upper_lip) + open_len(lower_lip)

                current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                cv.putText(frame, current_time, (400, 30), cv.FONT_HERSHEY_SIMPLEX, .5, (255, 255, 255), 2)

                # Use a moving average for eye openness
                eye_open_frames.append((len_left + len_right) / 2)
                if len(eye_open_frames) > 10:  # consider more frames for a smoother average
                    eye_open_frames.pop(0)
                avg_eye_openness = sum(eye_open_frames) / len(eye_open_frames)

                if avg_eye_openness <= (max_left + max_right) / 7:
                    if not eye_closed:
                        blink_count += 1
                        eye_closed = True
                else:
                    eye_closed = False

                if len_left > max_left:
                    max_left = len_left
                if len_right > max_right:
                    max_right = len_right

                # Enhanced drowsiness detection logic
                if (len_left <= max_left / 2 and len_right <= max_right / 2):
                    drowsy_frames += 1
                else:
                    drowsy_frames = 0

                if (drowsy_frames > 60):  # Reduce the threshold for quicker detection
                    drowsiness_count += 1
                    drowsy_frames = 0
                    eyes_blink.play()
                    screenshot_filename = f"drowsiness_screenshot_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{screenshot_counter}.jpg"
                    cv.imwrite(screenshot_filename, frame)
                    screenshot_counter += 1

                if mouth_openness > 50:
                    yawn_frames += 1
                    if yawn_frames > 40:
                        yawn_count += 1
                        yawn_frames = 0
                        yawn.play()
                else:
                    yawn_frames = 0

                if drowsiness_count < 5 and yawn_count < 5:
                    fatigue_status = "Normal"
                elif 5 <= drowsiness_count <= 10 or 5 <= yawn_count <= 10:
                    fatigue_status = "Tired"
                else:
                    fatigue_status = "Very Tired"

                face_2d = []
                face_3d = []
                for idx, lm in enumerate(face_landmarks.landmark):
                    if idx in [33, 263, 1, 61, 291, 199]:
                        x, y = int(lm.x * img_w), int(lm.y * img_h)
                        face_2d.append([x, y])
                        if idx == 1:
                            nose_2d = (x, y)
                            nose_3d = (x, y, lm.z * 3000)
                        face_3d.append([x, y, lm.z])

                face_2d = np.array(face_2d, dtype=np.float64)
                face_3d = np.array(face_3d, dtype=np.float64)

                focal_length = 1 * img_w
                cam_matrix = np.array([[focal_length, 0, img_h / 2],
                                       [0, focal_length, img_w / 2],
                                       [0, 0, 1]])
                distortion_matrix = np.zeros((4, 1), dtype=np.float64)

                success, rotation_vec, translation_vec = cv.solvePnP(face_3d, face_2d, cam_matrix, distortion_matrix)
                rmat, jac = cv.Rodrigues(rotation_vec)
                angles, mtxR, mtxQ, Qx, Qy, Qz = cv.RQDecomp3x3(rmat)

                x = angles[0] * 360
                y = angles[1] * 360
                z = angles[2] * 360

                if y >= -10:
                    left_frames = 0
                if y <= 10:
                    right_frames = 0
                if x >= -10:
                    down_frames = 0

                if y < -10:
                    left_frames += 1
                    text = "Looking Left!!!"
                    if left_frames > threshold_frames:
                        watch_out.play()
                        left_frames = 0
                        outof_frame += 1
                elif y > 10:
                    right_frames += 1
                    text = "Looking Right!!!"
                    if right_frames > threshold_frames:
                        watch_out.play()
                        right_frames = 0
                        outof_frame += 1
                elif x < -10:
                    down_frames += 1
                    text = "Looking Down!!!"
                    if down_frames > threshold_frames:
                        watch_out.play()
                        down_frames = 0
                        outof_frame += 1
                else:
                    text = "Forward!!!"

                cv.putText(frame, text, (40, 300), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv.putText(frame, f'Blinks: {blink_count}', (50, 50), cv.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv.putText(frame, f'Yawns: {yawn_count}', (50, 100), cv.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv.putText(frame, f'Drowsiness: {drowsiness_count}', (50, 150), cv.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv.putText(frame, f'Fatigue: {fatigue_status}', (50, 200), cv.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv.putText(frame, f'Out of frame: {outof_frame}', (50, 250), cv.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        cv.imshow('img', frame)
        current_time = datetime.datetime.now()
        if (current_time - start_time).total_seconds() >= 20:
            append_data_to_csv(current_time.strftime("%Y-%m-%d %H:%M:%S"), blink_count, yawn_count, drowsiness_count, outof_frame, fatigue_status)
            start_time = current_time  # Reset start time
        if cv.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()
cv.destroyAllWindows()

pygame 2.5.2 (SDL 2.28.3, Python 3.11.7)
Hello from the pygame community. https://www.pygame.org/contribute.html




In [2]:
import csv
from collections import defaultdict
from datetime import datetime
from sklearn.metrics import mean_squared_error
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
import matplotlib.dates as mdates


def read_and_aggregate_data(csv_filename):
    aggregated_data = defaultdict(lambda: {'blink_count': 0, 'yawn_count': 0, 'drowsiness_count': 0, 'out_of_frame': 0, 'fatigue_status_counts': defaultdict(int)})
    
    with open(csv_filename, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            date = datetime.strptime(row['Time'].split(' ')[0], '%Y-%m-%d').date()
            aggregated_data[date]['blink_count'] += int(row['Blink Count'])
            aggregated_data[date]['yawn_count'] += int(row['Yawn Count'])
            aggregated_data[date]['drowsiness_count'] += int(row['Drowsiness Count'])
            aggregated_data[date]['out_of_frame'] += int(row['Out of Frame'])
            aggregated_data[date]['fatigue_status_counts'][row['Fatigue Status']] += 1
    
    for date, data in aggregated_data.items():
        data['fatigue_status'] = max(data['fatigue_status_counts'], key=data['fatigue_status_counts'].get)
        del data['fatigue_status_counts']  
    
    return aggregated_data

def generate_attentiveness_insights(date, data):
    print(f"Generating insights for {date}:")
    print(f"daily_summary: On {date}, you blinked {data['blink_count']} times, yawned {data['yawn_count']} times, had {data['drowsiness_count']} drowsiness incidents, and were out of frame {data['out_of_frame']} times.")
    if data['yawn_count'] > data['blink_count']:
        print("trend_insight: You tend to yawn more frequently than blink, which might indicate tiredness.")
    else:
        print("trend_insight: Your blinking rate is higher than yawn count, indicating general alertness.")
    if data['drowsiness_count'] > 5:
        print("recommendation: Consider taking more frequent breaks or adjusting your driving schedule to avoid drowsiness.")
    else:
        print("recommendation: Keep up the good work! Your attentiveness levels are commendable.")
    if data['fatigue_status'] == "Very Tired":
        print("fatigue_status_overview: You were very tired on this day. Ensure you get some rest before your next drive.")
    elif data['fatigue_status'] == "Tired":
        print("fatigue_status_overview: You showed signs of tiredness. Stay hydrated and take short breaks.")
    else:
        print("fatigue_status_overview: Your fatigue status is normal. Continue driving safely.")
    print("\n")  

csv_filename = 'attentiveness_data.csv'
aggregated_data = read_and_aggregate_data(csv_filename)

for date in sorted(aggregated_data.keys()):
    generate_attentiveness_insights(date, aggregated_data[date])

def convert_to_dataframe(aggregated_data):
    df = pd.DataFrame.from_dict(aggregated_data, orient='index')
    df.index = pd.to_datetime(df.index)
    df['day_of_week'] = df.index.day_name()
    df['month'] = df.index.month
    return df

def perform_correlation_analysis(df):
    correlation_matrix = df[['blink_count', 'yawn_count', 'drowsiness_count']].corr()
    sns.heatmap(correlation_matrix, annot=True)
    plt.show()

def train_predictive_model(df):
    X = df[['blink_count', 'yawn_count']]
    y = df['drowsiness_count']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    model = RandomForestRegressor(n_estimators=100)
    model.fit(X_train, y_train)
    return model

def detect_anomalies(df):
    clf = IsolationForest(random_state=42)
    df['anomaly'] = clf.fit_predict(df[['drowsiness_count']])
    anomalies = df[df['anomaly'] == -1]
    return anomalies

def visualize_data(df):
    plt.figure(figsize=(10, 7))
    plt.scatter(df.index, df['blink_count'], color='blue', label='Blink Count')
    plt.scatter(df.index, df['yawn_count'], color='red', label='Yawn Count')
    plt.scatter(df.index, df['drowsiness_count'], color='green', label='Drowsiness Count')
    plt.gca().xaxis.set_major_locator(mdates.DayLocator())
    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    plt.legend()
    plt.xlabel('Date')
    plt.ylabel('Count')
    plt.title('Driver Behavior Metrics Over Time')
    plt.gcf().autofmt_xdate()
    plt.tight_layout()
    plt.show()

df = convert_to_dataframe(aggregated_data)


model = train_predictive_model(df)
anomalies = detect_anomalies(df)

visualize_data(df)


Generating insights for 2024-07-19:
daily_summary: On 2024-07-19, you blinked 66 times, yawned 5 times, had 12 drowsiness incidents, and were out of frame 13 times.
trend_insight: Your blinking rate is higher than yawn count, indicating general alertness.
recommendation: Consider taking more frequent breaks or adjusting your driving schedule to avoid drowsiness.
fatigue_status_overview: Your fatigue status is normal. Continue driving safely.




ValueError: With n_samples=1, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.