# Getting Dataset Information


Dataset Link : https://www.kaggle.com/datasets/ismailnasri20/driver-drowsiness-dataset-ddd/data


In [11]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import sys
import time
from tqdm import tqdm
import shutil

import sklearn
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import img_to_array, load_img

In [12]:
# Helper function to consolidate image paths
def consolidate_image_paths(input_path : str, subfolder_name: str = "") -> list[str]:
    """
    Lists every entry of a dataset folder as a joined path, in sorted order.

    Parameters:
        input_path (str): Base directory of the dataset.
        subfolder_name (str): Optional subfolder inside `input_path` (e.g. a class folder).

    Returns:
        list[str]: One path per directory entry, sorted by entry name.
        Entries are not filtered, so any nested directories are included too.
    """
    folder = os.path.join(input_path, subfolder_name)

    # os.listdir returns entries in arbitrary order; sorting them keeps any
    # later seeded split of these paths reproducible between runs and machines.
    return [os.path.join(folder, entry) for entry in sorted(os.listdir(folder))]

# Helper function to map image paths to labels
def map_image_paths_to_labels(image_paths: list[str], label: int) -> dict:
    """
    Builds a {path: label} lookup in which every given path carries the same label.

    Parameters:
        image_paths (list[str]): Paths that all belong to one class.
        label (int): Label assigned to each of those paths.

    Returns:
        dict: Mapping of each path to `label`, in the order the paths were given.
    """
    return dict.fromkeys(image_paths, label)

In [None]:
# Start of Dataset 2 ============================================================================================================
base_path_2 = "./Datasets/Dataset_2/"

# Collect the relative image paths of each class folder (drowsy first, then non-drowsy)
drowsy_paths, non_drowsy_paths = (
    consolidate_image_paths(base_path_2, class_folder)
    for class_folder in ("Drowsy", "Non Drowsy")
)

# Every image path in the dataset, drowsy images first
all_paths = [*drowsy_paths, *non_drowsy_paths]

# Label each path: 1 = drowsy, 0 = non-drowsy
drowsy_labels = map_image_paths_to_labels(drowsy_paths, 1)
non_drowsy_labels = map_image_paths_to_labels(non_drowsy_paths, 0)

# Single path -> label lookup covering both classes
all_labels = drowsy_labels | non_drowsy_labels

print(f"Total Number of Images: {len(all_paths)}")
print(f"Difference between Drowsy and Non-Drowsy: {len(drowsy_paths) - len(non_drowsy_paths)}")

# TODO : Add some visuals in seaborn to show the distribution of the labels
# End of Dataset 2 ============================================================================================================


### Insights from Dataset 2

- There are `2903` more drowsy images than non-drowsy images.
- The dataset is imbalanced.
- To balance the dataset, we can consider several techniques:
    - Oversampling
    - Undersampling
    - Data Augmentation

# Data Preprocessing

### `Steps`:
1. Image Resizing
2. Data Splitting
3. Reshuffling
4. Undersampling (Majority Class)
5. Data Augmentation (for training data)
6. Data Normalization

### Preprocessing Steps Methodology
1. **Image Resizing**:
    - The images should be resized first to ensure all images are of the same dimensions.
    - The images are resized to `224x224` pixels.

2. **Data Splitting**:
    - Dataset should be split before any form of augmentation or sampling to ensure that the model is evaluated on unseen data.
    - Otherwise, augmented copies of training images can leak into the validation and testing sets.
    - The dataset is split into `70%` training, `15%` validation and `15%` testing sets.

3. **Reshuffling**:
    - The dataset is reshuffled to ensure that the data is not ordered in any way.
    - This helps to prevent the model from learning any patterns in the data that may not be present in real-world scenarios.

4. **Undersampling**:
    - The majority class is undersampled to the number of images in the minority class.

5. **Data Augmentation**:
    - Data Augmentation is applied to the training set only to increase the variability of the training data. 
    - This helps to prevent overfitting and helps the model generalise to real-world scenarios.
    - Possible augmentations are:
        - Rotation
        - Horizontal Flip
        - Vertical Flip
        - Increasing the brightness

6. **Data Normalization**:
    - The pixel values are normalized to the range `[0, 1]` by dividing by `255`.
    

In [14]:
# Image Resizing. Default resized dim : 224x224
def resize_image(image_path : str, size : tuple[int, int] = (224, 224)) -> np.ndarray:
    """
    Loads an image with OpenCV, converts it to RGB and resizes it.

    Parameters:
        image_path (str): Path of the image file to load.
        size (tuple[int, int]): Target size handed to `cv2.resize`.

    Returns:
        np.ndarray: The resized image in RGB channel order.

    Raises:
        ValueError: If OpenCV cannot read an image from `image_path`.
    """
    bgr_pixels = cv2.imread(image_path)

    # cv2.imread reports an unreadable or missing file by returning None
    if bgr_pixels is None:
        raise ValueError(f"Error reading image from path: {image_path}")

    # OpenCV loads images as BGR; switch to RGB first, then scale to the target size
    return cv2.resize(cv2.cvtColor(bgr_pixels, cv2.COLOR_BGR2RGB), size)

# Function to save the resized image
def save_resized_image(image: np.ndarray, output_path: str) -> None:
    """
    Converts an RGB image back to BGR and writes it to the specified output path.

    Parameters:
        image (np.ndarray): Image in RGB channel order (as returned by `resize_image`).
        output_path (str): Destination file path. Its parent folder is created if missing.

    Raises:
        OSError: If OpenCV reports that the image could not be written.
    """
    # OpenCV writes images assuming BGR channel order, so undo the RGB conversion
    image_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the folder when there is one to create.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # cv2.imwrite signals failure through its boolean return value rather than
    # an exception; surface it so a failed save is not silently lost.
    if not cv2.imwrite(output_path, image_bgr):
        raise OSError(f"Failed to write image to path: {output_path}")

# Data Splitting
def split_data(X, y, val_size: float = 0.15, test_size: float = 0.15,
               random_state: int = 42, stratify: bool = False):
    """
    Splits samples and labels into training, validation and test sets.

    With the default arguments this is a 70% / 15% / 15% split with a fixed seed.

    Parameters:
        X: Samples as an indexable sequence (this notebook passes a list of image paths).
        y: Labels aligned with `X` (this notebook passes a list of ints). Must be the same length as `X`.
        val_size (float): Fraction of all samples placed in the validation set.
        test_size (float): Fraction of all samples placed in the test set.
        random_state (int): Seed used for both underlying splits.
        stratify (bool): When True, both splits are stratified on the labels. Defaults to False.

    Returns:
        tuple: (X_train, X_val, X_test, y_train, y_val, y_test)

    Raises:
        ValueError: If the fractions are not positive or leave no data for training.
    """
    holdout_size = val_size + test_size
    if val_size <= 0 or test_size <= 0 or holdout_size >= 1:
        raise ValueError(
            f"val_size and test_size must be positive and sum to less than 1, got {val_size} and {test_size}"
        )

    # First cut: training set versus a temporary hold-out (validation + test)
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y,
        test_size=holdout_size,
        random_state=random_state,
        stratify=y if stratify else None,
    )

    # Second cut: divide the hold-out so the test set ends up with `test_size` of the full data
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp,
        test_size=test_size / holdout_size,
        random_state=random_state,
        stratify=y_temp if stratify else None,
    )
    return X_train, X_val, X_test, y_train, y_val, y_test

# Shuffle Image Paths
def shuffle_paths(X_train: list[str], y_train: list[int]) -> tuple[list[tuple[str, int]], list[tuple[str, int]]]:
    """
    Groups the training samples by class and shuffles each group in place.

    Parameters:
        X_train (list[str]): Training image paths.
        y_train (list[int]): Label of each path (1 = drowsy, 0 = non-drowsy).

    Returns:
        tuple: (drowsy, non_drowsy) lists of shuffled (path, label) tuples.
        Samples whose label is neither 1 nor 0 are left out of both lists.
    """
    drowsy_samples = []
    non_drowsy_samples = []

    # One pass over the pairs, routing each sample to the list of its class
    for sample in zip(X_train, y_train):
        if sample[1] == 1:
            drowsy_samples.append(sample)
        elif sample[1] == 0:
            non_drowsy_samples.append(sample)

    # Each class is shuffled on its own, drowsy first
    for class_samples in (drowsy_samples, non_drowsy_samples):
        np.random.shuffle(class_samples)

    return drowsy_samples, non_drowsy_samples

# Undersampling the majority class
def undersample_majority_class(drowsy_data: list[tuple], non_drowsy_data: list[tuple]) -> list[tuple]:
    """
    Balances the two classes by truncating the larger one, then shuffles the result.

    Parameters:
        drowsy_data (list[tuple]): (path, label) tuples of the drowsy class.
        non_drowsy_data (list[tuple]): (path, label) tuples of the non-drowsy class.

    Returns:
        list[tuple]: A new shuffled list holding the same number of samples from each class.
    """
    # Both classes are cut to the size of the smaller one; slicing the
    # smaller class to its own length simply keeps all of it.
    per_class = min(len(drowsy_data), len(non_drowsy_data))
    balanced_train = drowsy_data[:per_class] + non_drowsy_data[:per_class]

    # Mix the classes so they are not grouped in the final training data
    np.random.shuffle(balanced_train)
    return balanced_train

# Data Augmentation : rotation, brightness shift, horizontal flip and zoom
def augment_and_save_images(balanced_train_data, save_dir, target_size=(224, 224), augment_count=5):
    """
    Applies data augmentation to each image in the dataset and saves the augmented images.

    Augmented files are written as JPEGs into `save_dir/Drowsy` or
    `save_dir/Non Drowsy`, depending on the label of the source image.

    WARNING: if `save_dir` already exists it is deleted, with everything in it,
    before any image is written.

    Parameters:
        balanced_train_data (list): List of (image_path, label) tuples for balanced training data.
            Label 1 is saved under "Drowsy"; any other label under "Non Drowsy".
        save_dir (str): Directory where augmented images will be saved.
        target_size (tuple): Target size for resizing images.
        augment_count (int): Number of augmented images to generate per original image.
            Zero or a negative value generates none.

    Returns:
        None
    """
    # Initialize ImageDataGenerator with augmentation parameters
    datagen = ImageDataGenerator(
        rotation_range=15,
        brightness_range=[0.8, 1.2],
        horizontal_flip=True,
        zoom_range=0.1,
        fill_mode='nearest'
    )

    # Start from an empty output tree so files from an earlier run cannot mix in.
    # NOTE(review): this is destructive - make sure save_dir never points at source data.
    if os.path.exists(save_dir):
        shutil.rmtree(save_dir)
    os.makedirs(os.path.join(save_dir, "Drowsy"), exist_ok=True)
    os.makedirs(os.path.join(save_dir, "Non Drowsy"), exist_ok=True)

    # Process each image in the balanced training data
    for image_path, label in balanced_train_data:
        # Load the image and add a leading batch axis of size 1 for the generator
        img = load_img(image_path, target_size=target_size)
        img_array = img_to_array(img)
        img_array = img_array.reshape((1,) + img_array.shape)

        # Determine save path based on label
        class_dir = "Drowsy" if label == 1 else "Non Drowsy"
        save_path = os.path.join(save_dir, class_dir)

        # splitext drops only the final extension, so dots inside the file name
        # stay part of the prefix (split('.')[0] cut the name at the first dot).
        save_prefix = os.path.splitext(os.path.basename(image_path))[0]

        flow = datagen.flow(img_array, batch_size=1, save_to_dir=save_path,
                            save_prefix=save_prefix,
                            save_format='jpg')

        # The flow never ends on its own. zip consults range() first, so exactly
        # `augment_count` batches are drawn (and saved) - none when it is <= 0.
        for _ in zip(range(augment_count), flow):
            pass

In [15]:
# Image Resizing
# Output folders for the resized copies, one per class
output_folder_drowsy, output_folder_non_drowsy = (
    os.path.join(base_path_2, "Resized_Images", class_folder)
    for class_folder in ("Drowsy", "Non Drowsy")
)

# NOTE(review): the resize loop below is disabled - presumably because the resized
# images already exist on disk; confirm before re-enabling. It expects `all_labels`
# to be the path -> label dict built in the Dataset 2 cell, not the list of labels
# that the data-splitting cell later assigns to the same name.

# Loop through all the images and resize them with tqdm progress bar
# for image_path, label in tqdm(all_labels.items(), desc="Resizing Images", ncols=100):
#     try:
#         resized_image = resize_image(image_path, (224, 224))

#         # Save in respective folders based on the label
#         if label == 1:  # Drowsy
#             output_file_name = os.path.basename(image_path)
#             output_path = os.path.join(output_folder_drowsy, output_file_name)
#         else:  # Non-Drowsy
#             output_file_name = os.path.basename(image_path)
#             output_path = os.path.join(output_folder_non_drowsy, output_file_name)

#         save_resized_image(resized_image, output_path)

#     except ValueError as e:
#         print(f"Skipping image {image_path}: {e}")

# # Verify the number of resized images
# print(f"Image Number Difference: {len(os.listdir(output_folder_drowsy)) + len(os.listdir(output_folder_non_drowsy)) - len(all_labels)}")

In [None]:
# Splitting the data

# Consolidate the paths of all resized images
drowsy_paths = consolidate_image_paths(output_folder_drowsy)
non_drowsy_paths = consolidate_image_paths(output_folder_non_drowsy)

# Create the labels for each class (1 = drowsy, 0 = non-drowsy)
drowsy_labels = [1] * len(drowsy_paths)
non_drowsy_labels = [0] * len(non_drowsy_paths)

# Combine the data and labels, keeping paths and labels aligned by position
all_images = drowsy_paths + non_drowsy_paths
all_labels = drowsy_labels + non_drowsy_labels

# Split the data into train, val, and test sets
X_train, X_val, X_test, y_train, y_val, y_test = split_data(all_images, all_labels)

# Convert the splits back to path -> label dictionaries.
# Each path is paired with its own label from the matching y_* list; labelling
# every path as 1 would mark all non-drowsy images as drowsy.
train_dict = dict(zip(X_train, y_train))
val_dict = dict(zip(X_val, y_val))
test_dict = dict(zip(X_test, y_test))

# Example: Print the first few training image paths and their labels
print("Training Data (First 5):", list(train_dict.items())[:5])
print("Validation Data (First 5):", list(val_dict.items())[:5])
print("Test Data (First 5):", list(test_dict.items())[:5])

In [17]:
# Reshuffling: group the training samples by class and shuffle each class
drowsy_train, non_drowsy_train = shuffle_paths(X_train, y_train)

# Undersampling: cut the larger class down to the size of the smaller one
balanced_train_data = undersample_majority_class(drowsy_train, non_drowsy_train)

# Augmented Data Generator
# `balanced_train_data` is a list of (image_path, label) tuples.
# The folder name previously carried a stray ")" and a redundant "./" prefix,
# which created a directory literally named "Augmented_Train_Data)".
augment_and_save_images(balanced_train_data, save_dir=os.path.join(base_path_2, "Augmented_Train_Data"), augment_count=5)

KeyboardInterrupt: 