In [1]:
import os
import time
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Conv2D, Flatten, GRU, Reshape
from tensorflow.keras.optimizers import Adam
import pygame
import threading

# Initialize pygame for alarm sound
pygame.mixer.init()
alarm_sound = 'alarm.wav'  # Replace with your alarm sound file
alarm_playing = False

# Paths
data_dir = 'dataset'  # Update this to your dataset root
model_path = 'drowsiness_model.h5'
img_size = (224, 224)
batch_size = 32

# Data Augmentation
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

valid_datagen = ImageDataGenerator()
test_datagen = ImageDataGenerator()

# Load dataset
def load_data(folder):
    csv_path = os.path.join(folder, 'labels.csv')
    df = pd.read_csv(csv_path)
    df['label'] = df['drowsy']  # 1 if drowsy, 0 if awake
    img_paths = [os.path.join(folder, fname) for fname in df['filename']]
    return img_paths, df['label'].values

# Prepare dataset
train_imgs, train_labels = load_data(os.path.join(data_dir, 'train'))
valid_imgs, valid_labels = load_data(os.path.join(data_dir, 'valid'))
test_imgs, test_labels = load_data(os.path.join(data_dir, 'test'))

def process_images(img_paths, labels):
    images = [img_to_array(load_img(img, target_size=img_size)) / 255.0 for img in img_paths]
    return np.array(images), np.array(labels)

X_train, y_train = process_images(train_imgs, train_labels)
X_valid, y_valid = process_images(valid_imgs, valid_labels)
X_test, y_test = process_images(test_imgs, test_labels)

# Train model if not exists
if not os.path.exists(model_path):
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False
    model = Sequential([
        base_model,
        Conv2D(64, (3, 3), activation='relu'),
        Flatten(),
        Reshape((1, -1)),  # Preparing for GRU input
        GRU(64, return_sequences=False),
        Dense(128, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=10, batch_size=batch_size)
    model.save(model_path)
else:
    model = load_model(model_path)

# Real-time detection
cap = cv2.VideoCapture(0)
drowsy_start_time = None
drowsy_duration = 0

# Temporal smoothing buffer (store last 5 frame predictions)
prediction_history = []

def play_alarm():
    global alarm_playing
    if not alarm_playing:
        pygame.mixer.music.load(alarm_sound)
        pygame.mixer.music.play(-1)
        alarm_playing = True

def stop_alarm():
    global alarm_playing
    if alarm_playing:
        pygame.mixer.music.stop()
        alarm_playing = False

# Function to apply temporal smoothing
def apply_temporal_smoothing(prediction, history, window_size=5):
    # Add the new prediction to the history
    history.append(prediction)
    # Limit the history size to the window size
    if len(history) > window_size:
        history.pop(0)
    # Calculate the average of the predictions in the history
    smoothed_prediction = np.mean(history)
    return smoothed_prediction

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    # Resize the frame and normalize
    img = cv2.resize(frame, img_size) / 255.0
    img = np.expand_dims(img, axis=0)
    
    # Make prediction using the model
    prediction = model.predict(img)[0][0]

    # Apply temporal smoothing
    smoothed_prediction = apply_temporal_smoothing(prediction, prediction_history)

    # Determine state based on smoothed prediction
    state = 'Drowsy' if smoothed_prediction > 0.5 else 'Awake'
    confidence = int(smoothed_prediction * 100 if smoothed_prediction > 0.5 else (1 - smoothed_prediction) * 100)
    label = f"{state} ({confidence}%)"

    if state == 'Drowsy':
        if drowsy_start_time is None:
            drowsy_start_time = time.time()
        elif time.time() - drowsy_start_time >= drowsy_duration:
            threading.Thread(target=play_alarm).start()
    else:
        drowsy_start_time = None
        stop_alarm()

    # Display label and color the text based on the state
    cv2.putText(frame, label, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, 
                (0, 0, 255) if state == 'Drowsy' else (0, 255, 0), 2)
    
    # Show the frame
    cv2.imshow('Drowsiness Detection', frame)

    # Exit condition
    if cv2.waitKey(1) & 0xFF == ord('q'):
        cap.release()
        cv2.destroyAllWindows()
        stop_alarm()
        break


pygame 2.6.1 (SDL 2.28.4, Python 3.11.11)
Hello from the pygame community. https://www.pygame.org/contribute.html




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 96ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 81ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 84ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 84ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 81ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 84ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 78ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 84ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 113ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 83ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 89ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 95ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 93m