# Head pose estimation

In [2]:
import os
import cv2
import numpy as np
from scipy.io import loadmat

def load_images_from_folder(folder):
    """Collect the paths of the ``.jpg`` files located directly in ``folder``.

    Despite the name, no pixel data is read here: only file paths are
    gathered, and decoding happens later.

    Args:
        folder: Directory to scan (not recursive).

    Returns:
        A list of ``os.path.join(folder, filename)`` strings, sorted by file
        name. The list is empty (and a message is printed) when the folder
        does not exist or cannot be listed.
    """
    images = []
    try:
        if not os.path.exists(folder):
            print("Error: Folder '{}' does not exist.".format(folder))
            return images
        # os.listdir() returns entries in arbitrary, platform-dependent order.
        # Sorting makes the sample order (and therefore any seeded split that
        # is derived from it) reproducible between runs and machines.
        for filename in sorted(os.listdir(folder)):
            img_path = os.path.join(folder, filename)
            # isfile() filters out sub-directories whose name ends in ".jpg".
            if os.path.isfile(img_path) and filename.endswith('.jpg'):
                images.append(img_path)
    except Exception as e:
        # Deliberately best-effort: report the problem and return what was
        # collected so far instead of aborting the notebook.
        print("Error loading images from folder '{}': {}".format(folder, str(e)))
    return images

def preprocess_images_batch(image_paths, target_size):
    """Read, resize and normalise a batch of images.

    Args:
        image_paths: Iterable of image file paths to read with ``cv2.imread``.
        target_size: Size tuple passed straight to ``cv2.resize``.

    Returns:
        ``np.array`` built from the successfully processed images, each one
        converted to ``float32`` and divided by 255. Paths that cannot be read
        are reported and skipped. If an exception interrupts the loop, the
        images processed up to that point are returned.
    """
    batch = []
    try:
        for path in image_paths:
            raw = cv2.imread(path)
            if raw is None:
                # cv2.imread signals an unreadable file by returning None.
                print("Error loading image:", path)
                continue
            # Resize first, then scale pixel values into [0, 1].
            scaled = cv2.resize(raw, target_size).astype(np.float32) / 255.0
            batch.append(scaled)
    except MemoryError as mem_err:
        print("Memory error occurred during preprocessing:", str(mem_err))
    except Exception as e:
        print("Error preprocessing images: {}".format(str(e)))
    return np.array(batch)

# Dataset locations (relative to the notebook's working directory)
cew_folder = 'input/dataset_B_FacialImages'  # CEW dataset folder
aflw2000_lp_folder = 'input/AFLW2000'  # AFLW2000-LP dataset folder

# Size every image is resized to before being fed to the network
target_size = (224, 224)  # Example size, adjust as needed

# The CEW dataset keeps closed-eye and open-eye faces in separate sub-folders
closed_eye_folder = os.path.join(cew_folder, 'ClosedFace')
open_eye_folder = os.path.join(cew_folder, 'OpenFace')

# Gather image paths only; pixel data is read later, batch by batch
closed_eye_images = load_images_from_folder(closed_eye_folder)
open_eye_images = load_images_from_folder(open_eye_folder)
aflw2000_lp_images = load_images_from_folder(aflw2000_lp_folder)  # JPEG images

# Warn early about empty datasets so later shape errors are easier to trace
if not closed_eye_images:
    print("Warning: No images found in closed eye folder.")
if not open_eye_images:
    print("Warning: No images found in open eye folder.")
if not aflw2000_lp_images:
    print("Warning: No images found in AFLW2000-LP dataset folder.")

# Preprocess images in batches
batch_size = 32  # Adjust batch size based on available memory


def _preprocess_in_batches(image_paths, target_size, batch_size):
    """Preprocess ``image_paths`` in slices of ``batch_size`` and stack the results.

    Replaces three copy-pasted loops that differed only in the list they read
    from and the list they appended to.

    Args:
        image_paths: List of image file paths.
        target_size: Size tuple forwarded to ``preprocess_images_batch``.
        batch_size: Number of paths handed to ``preprocess_images_batch`` per call.

    Returns:
        ``np.array`` of every image that ``preprocess_images_batch`` returned,
        in the order of ``image_paths``.
    """
    processed = []
    for start in range(0, len(image_paths), batch_size):
        batch_paths = image_paths[start:start + batch_size]
        processed.extend(preprocess_images_batch(batch_paths, target_size))
    return np.array(processed)


closed_eye_processed_images = _preprocess_in_batches(closed_eye_images, target_size, batch_size)
open_eye_processed_images = _preprocess_in_batches(open_eye_images, target_size, batch_size)
aflw2000_lp_processed_images = _preprocess_in_batches(aflw2000_lp_images, target_size, batch_size)

print("Closed eye images preprocessed:", closed_eye_processed_images.shape)
print("Open eye images preprocessed:", open_eye_processed_images.shape)
print("AFLW2000-LP dataset images preprocessed:", aflw2000_lp_processed_images.shape)


Closed eye images preprocessed: (1192, 224, 224, 3)
Open eye images preprocessed: (1231, 224, 224, 3)
AFLW2000-LP dataset images preprocessed: (2000, 224, 224, 3)


# Split and train

In [3]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

# Requires closed_eye_processed_images and open_eye_processed_images from the
# preprocessing cell.

# Only the two CEW classes are used for the eye-state classifier.
X = np.concatenate((closed_eye_processed_images, open_eye_processed_images))

# Label encoding: 0 = closed eye, 1 = open eye
closed_labels = np.zeros(len(closed_eye_processed_images))
open_labels = np.ones(len(open_eye_processed_images))
y = np.concatenate((closed_labels, open_labels))

# Hold out 20% of the data for testing, then carve 15% of the remainder off
# as a validation set.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.15, random_state=42
)

# Build the model: two convolution/pooling stages followed by a small dense head
eye_state_layers = [
    Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(64, activation='relu'),
    # Single sigmoid unit for binary classification (closed/open)
    Dense(1, activation='sigmoid'),
]
model = Sequential(eye_state_layers)

# Compile the model
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Train the model one epoch at a time and remember the weights of the epoch
# with the highest validation accuracy.
#
# Each fit() call below runs a single epoch, so history.history['val_accuracy']
# only ever holds one value. The best epoch therefore has to be tracked here,
# across iterations, rather than looked up in `history` afterwards.
best_epoch = None  # 1-indexed epoch number of the best validation accuracy
best_val_accuracy = float("-inf")
best_weights = None  # weight snapshot taken at best_epoch

try:
    for epoch in range(10):
        print("Epoch", epoch + 1, "/ 10")
        history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=1, batch_size=32)
        val_accuracy = history.history['val_accuracy'][0]
        print("Validation Accuracy:", val_accuracy)
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_epoch = epoch + 1
            best_weights = model.get_weights()
except MemoryError as mem_err:
    print("Memory error occurred during training:", str(mem_err))
except Exception as e:
    print("Error occurred during training:", str(e))

# Save the complete model from the best epoch
if best_weights is None:
    # Training failed before a single epoch finished; there is nothing
    # meaningful to save.
    print("No epoch completed successfully; model was not saved.")
else:
    # Roll the in-memory model back to the best epoch so that both the saved
    # file and the evaluation below really describe the best model.
    model.set_weights(best_weights)
    print("Best epoch:", best_epoch, "- validation accuracy:", best_val_accuracy)
    try:
        model.save("eye_state_detection_model_best.h5")
        print("Best model saved successfully.")
    except Exception as e:
        print("Error occurred while saving best model:", str(e))

# Evaluate the model on test data
try:
    test_loss, test_accuracy = model.evaluate(X_test, y_test)
    print("Test Accuracy:", test_accuracy)
except MemoryError as mem_err:
    print("Memory error occurred during evaluation:", str(mem_err))
except Exception as e:
    print("Error occurred during evaluation:", str(e))


Epoch 1 / 10
Validation Accuracy: 0.6426116824150085
Epoch 2 / 10
Validation Accuracy: 0.7594501972198486
Epoch 3 / 10
Validation Accuracy: 0.8144329786300659
Epoch 4 / 10
Validation Accuracy: 0.7594501972198486
Epoch 5 / 10
Validation Accuracy: 0.8247422575950623
Epoch 6 / 10
Validation Accuracy: 0.8797250986099243
Epoch 7 / 10
Validation Accuracy: 0.8900343775749207
Epoch 8 / 10
Validation Accuracy: 0.8865979313850403
Epoch 9 / 10
Validation Accuracy: 0.8969072103500366
Epoch 10 / 10
Validation Accuracy: 0.9037800431251526
Best model saved successfully.
Test Accuracy: 0.8865979313850403


In [6]:
from tensorflow.keras.models import load_model
# Re-evaluate the model reloaded from disk on the held-out test set.
# Note: this rebinds `model` to the instance loaded from the .h5 file.
model = load_model("eye_state_detection_model_best.h5")
try:
    reloaded_metrics = model.evaluate(X_test, y_test)
    test_loss, test_accuracy = reloaded_metrics
    print("Test Accuracy:", test_accuracy)
except MemoryError as mem_err:
    print("Memory error occurred during evaluation:", str(mem_err))
except Exception as e:
    print("Error occurred during evaluation:", str(e))


Test Accuracy: 0.8865979313850403


# Test video capture

In [7]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Load the trained model
model = load_model("eye_state_detection_model_best.h5")

# Load Haar cascades for eye detection
eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
# CascadeClassifier does not raise when the XML file cannot be loaded; it
# yields an empty classifier that only fails later inside detectMultiScale.
# Fail here instead, with a message that names the real cause.
if eye_cascade.empty():
    raise OSError(
        "Could not load Haar cascade: " + cv2.data.haarcascades + 'haarcascade_eye.xml'
    )

# Define label colors for overlaying (OpenCV colours are in BGR order).
# Indexed by the overlay prediction code: 0 = open, 1 = closed.
label_colors = [(0, 255, 0), (0, 0, 255)]  # Green for open, Red for closed

def overlay_labels(frame, eye_regions, predictions):
    """Draw the majority eye-state label above every detected eye region.

    Args:
        frame: Image to draw on; it is modified in place by ``cv2.putText``.
        eye_regions: Iterable of ``(x, y, w, h)`` boxes.
        predictions: List of per-eye codes where ``0`` means "Open" and ``1``
            means "Closed". NOTE: this is the reverse of the label encoding
            used in the training cell (closed = 0, open = 1), so callers must
            convert model outputs to this convention.

    Returns:
        The same ``frame`` object that was passed in. When the open votes do
        not strictly outnumber the closed votes (including ties and an empty
        ``predictions`` list), the label is "Closed".
    """
    votes_open = predictions.count(0)
    votes_closed = predictions.count(1)

    # One label for the whole frame, decided by majority vote.
    if votes_open > votes_closed:
        majority_prediction = 0
        label_text = "Open"
    else:
        majority_prediction = 1
        label_text = "Closed"

    for (x, y, w, h) in eye_regions:
        # Text is anchored 10 px above the top-left corner of the box.
        cv2.putText(
            frame,
            label_text,
            (x, y - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            label_colors[majority_prediction],
            2,
        )

    return frame


def preprocess_eye_region(eye_region, target_size):
    """Prepare a cropped eye region for model inference.

    Args:
        eye_region: Image crop to process.
        target_size: Size tuple passed to ``cv2.resize``.

    Returns:
        The resized crop converted to ``float32`` and divided by 255, the same
        scaling that is applied to the training images.
    """
    return cv2.resize(eye_region, target_size).astype(np.float32) / 255.0

def process_video_stream(camera_index=0, open_threshold=0.5):
    """Run live eye-state detection on a camera stream until 'q' is pressed.

    For every frame, eyes are located with the Haar cascade, each crop is
    classified by the loaded model, and the majority label is drawn on a copy
    of the frame, which is shown in an OpenCV window.

    Args:
        camera_index: Index of the capture device passed to
            ``cv2.VideoCapture`` (default 0).
        open_threshold: Model output above which an eye counts as open
            (default 0.5).

    Returns:
        None. The capture device and all OpenCV windows are released on every
        exit path, including exceptions and KeyboardInterrupt.
    """
    cap = cv2.VideoCapture(camera_index)
    try:
        if not cap.isOpened():
            print("Error: Unable to open video capture device.")
            return

        while True:
            # Capture frame-by-frame
            ret, frame = cap.read()
            if not ret:
                print("Error: Unable to capture frame.")
                break

            # The cascade works on grayscale; the model gets the colour crop.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            eyes = eye_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

            predictions = []
            if len(eyes) > 0:
                # Classify all eyes of the frame in one batched predict call.
                batch = np.stack([
                    preprocess_eye_region(frame[y:y+h, x:x+w], (224, 224))
                    for (x, y, w, h) in eyes
                ])
                # The model has a single sigmoid output trained with
                # closed = 0 and open = 1, so the output is the probability of
                # "open". np.argmax over one output is always 0, so the value
                # has to be thresholded instead.
                open_probabilities = model.predict(batch, verbose=0).reshape(-1)
                # overlay_labels expects 0 = open and 1 = closed.
                predictions = [0 if p > open_threshold else 1 for p in open_probabilities]

            # Overlay labels on eye regions
            frame_with_labels = overlay_labels(frame.copy(), eyes, predictions)

            # Display the frame with labels
            cv2.imshow("Eye State Detection", frame_with_labels)

            # Check for 'q' key press to exit
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close windows, even when the loop is
        # interrupted (e.g. by the notebook's "interrupt kernel").
        cap.release()
        cv2.destroyAllWindows()

# Run the video stream processing function
process_video_stream()




KeyboardInterrupt: 

# Test images

IndexError: list index out of range