# DMS - Driver Monitoring System

# Table of Contents
1. [Importing Needed Packages](#import)
2. [Declaring Constants](#constants)
3. [Loading and Processing Data](#load)
4. [Creating the Model](#create)
5. [Evaluating and Saving the Model](#evaluate)
6. [Testing the Model Using Testing Data](#test)
7. [Testing the Model Using a Webcam](#webcam)

## Importing Needed Packages <a name="import"></a>

In [None]:
# Import Needed Packages
import os
import time
import sys
import time
import shutil
from base64 import b64decode
import cv2
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.preprocessing.image import ImageDataGenerator

# True when the 'google.colab' module is already loaded, i.e. the notebook runs on Google Colab
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    # Helpers that only exist on Colab: Drive access, the JavaScript bridge
    # and inline image display
    from google.colab import drive
    from google.colab.output import eval_js
    from google.colab.patches import cv2_imshow
    from IPython.display import Javascript, display

    # Mount Google Drive at /content/drive
    drive.mount('/content/drive')

## Declaring Constants <a name="constants"></a>

In [None]:
# Constants

# Side length, in pixels, of the square images given to the model
IMG_SIZE = 224

# Images per batch and number of training epochs
BATCH_SIZE = 32
EPOCHS = 20

# Model score that predictions are compared against when choosing a class
MODEL_THRESHOLD = 0.8
# Seconds of elapsed eye-closure time after which drowsiness is reported
CLOSURE_DURATION_THRESHOLD = 3

# Define the classes present in the dataset
CLASSES = ['Closed_Eyes', 'Opened_Eyes']

# Dataset and model locations depend on where the notebook is running
if not IN_COLAB:
    # Running on a local environment: paths are relative to the working directory
    print("Running on a local environment")
    _input_root = 'Input'
    MODELS_DIRECTORY = 'models/'
else:
    # Running on Google Colab: everything lives under the mounted Drive folder
    print("Running on Google Colab")
    BASE_ROOT_DIRECTORY = '/content/drive/MyDrive/Colab Notebooks'
    _input_root = f'{BASE_ROOT_DIRECTORY}/Input'
    MODELS_DIRECTORY = f'{BASE_ROOT_DIRECTORY}/models/'

TRAINING_DATA_DIRECTORY = f'{_input_root}/training_data/'
VALIDATION_DATA_DIRECTORY = f'{_input_root}/validation_data/'
TESTING_DATA_DIRECTORY = f'{_input_root}/testing_data/'

# A timestamped file name is computed first, but the fixed name assigned right
# after it is the one that stays in effect for saving and loading the model.
timestr = time.strftime("%Y%m%d-%H%M")
MODEL_FILE_NAME = f'dms_model-{timestr}.h5'
MODEL_FILE_NAME = 'dms_model_final.h5'

# Haar cascade classifiers shipped with OpenCV, used to detect faces and eyes
FACE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
EYE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')

## Loading and Processing Data <a name="load"></a>

In [None]:
# Image generators created with default settings (no extra arguments)
train_datagen = ImageDataGenerator()
test_datagen = ImageDataGenerator()

# Options shared by both directory iterators: square target size, fixed batch
# size and binary labels
_flow_options = dict(
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
)

print('Loading training data...')
train_generator = train_datagen.flow_from_directory(TRAINING_DATA_DIRECTORY, **_flow_options)

print('Loading validation data...')
validation_generator = test_datagen.flow_from_directory(VALIDATION_DATA_DIRECTORY, **_flow_options)

## Creating the Model <a name="create"></a>

In [None]:
# Load the pre-trained MobileNet model from Keras applications
model = tf.keras.applications.mobilenet.MobileNet()

# Print a summary of the model architecture. summary() prints by itself and
# returns None, so it is not wrapped in print() (that would print "None" too).
model.summary()

# Retrieve the input of the pre-trained model and the output of its
# fourth-from-last layer; the last three layers are replaced by the head below
base_input = model.layers[0].input
base_output = model.layers[-4].output

# Flatten the output of the pre-trained model
flat_layer = tf.keras.layers.Flatten()(base_output)

# Add a single-unit dense layer followed by a sigmoid, giving one score per image
final_output = tf.keras.layers.Dense(1)(flat_layer)
final_output = tf.keras.layers.Activation('sigmoid')(final_output)

# Create a new model with the pre-trained layers and the added layers
result_model = tf.keras.Model(inputs=base_input, outputs=final_output)

# Print a summary of the new model architecture
result_model.summary()

# Compile the new model with a binary crossentropy loss function, Adam optimizer, and accuracy metric
result_model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# Fit the new model to the training data. batch_size is deliberately not
# passed to fit(): the generators already yield batches of BATCH_SIZE images,
# and Keras documents that batch_size must not be given for generator input.
# Only whole batches are counted per epoch (integer division).
train_iterator = result_model.fit(
    train_generator,
    steps_per_epoch=train_generator.n // BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=validation_generator,
    validation_steps=validation_generator.n // BATCH_SIZE,
)

## Evaluating and Saving the Model in Order to Choose the Best One <a name="evaluate"></a>

In [None]:
# Score the trained model. Note that the data evaluated here comes from the
# validation generator, and only whole batches are counted.
print("Evaluate on test data")
validation_batches = validation_generator.samples // BATCH_SIZE
results = result_model.evaluate(validation_generator, steps=validation_batches)
print("test loss, test acc:", results)

In [None]:
# Write the trained model to MODEL_FILE_NAME inside the models directory
model_save_path = f'{MODELS_DIRECTORY}/{MODEL_FILE_NAME}'
result_model.save(model_save_path)

## Testing the Model Using Testing Data <a name="test"></a>

In [None]:
# Load the trained DMS model
dms_model = tf.keras.models.load_model(f'{MODELS_DIRECTORY}/{MODEL_FILE_NAME}')

# 2x2 confusion matrix indexed as result[true_class][predicted_class], where
# index 0 is CLASSES[0] ('Closed_Eyes') and index 1 is CLASSES[1] ('Opened_Eyes')
result = np.zeros((2, 2), dtype=int)

# Diagnostic counters that use a fixed 0.5 cut-off (independent of
# MODEL_THRESHOLD): closed-eye images scored above 0.5 and open-eye images
# scored below 0.5
counter_closed = 0
counter_open = 0
# Number of images processed so far
cnt = 0

# Loop through each class; its position in CLASSES is the true class index
for class_index, category in enumerate(CLASSES):
    # Construct the path to the directory containing images for this category
    path = os.path.join(TESTING_DATA_DIRECTORY, category)

    # Loop through each image in the directory
    for img in os.listdir(path):
        image_path = os.path.join(path, img)

        # Read the image as grayscale; cv2.imread returns None when the file
        # cannot be decoded, in which case the file is skipped instead of
        # crashing the whole run
        img_array = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if img_array is None:
            print(f'Skipping unreadable file: {image_path}')
            continue

        cnt += 1

        # Expand the grayscale image to three channels and resize it to the expected size
        back_to_rgb = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
        new_array = cv2.resize(back_to_rgb, (IMG_SIZE, IMG_SIZE))

        # Add the leading batch dimension; pixel values are left unscaled
        np_array = np.array(new_array).reshape(1, IMG_SIZE, IMG_SIZE, 3)

        # Make a prediction on the image using the DMS model
        pred = dms_model.predict(np_array)
        score = pred[0, 0]
        # Binary decision for the confusion matrix, based on MODEL_THRESHOLD
        prediction = pred >= MODEL_THRESHOLD

        if category == 'Closed_Eyes' and score > 0.5:
            counter_closed += 1

        if category == 'Opened_Eyes' and score < 0.5:
            counter_open += 1

        # Periodic progress report
        if cnt % 200 == 0:
            print('counter_open = ', counter_open)
            print('counter_closed = ', counter_closed)

        # Update the result matrix based on the true and predicted classes of the image
        result[class_index][int(prediction[0, 0])] += 1

# The positive class is index 1 ('Opened_Eyes'). Rows are the true class and
# columns the predicted class, so a false positive is a true 0 predicted as 1
# and a false negative is a true 1 predicted as 0.
print(f"#True Pos = {result[1, 1]}")
print(f"#True Neg = {result[0, 0]}")
print(f"#False Pos = {result[0, 1]}")
print(f"#False Neg = {result[1, 0]}")

## Testing the Model Using a Webcam <a name="webcam"></a>
* `take_photo(...)` is a function that returns an image captured using JavaScript. (The code snippet is provided by Google Colab.)
* `test_on_webcam_browser()` and `test_on_webcam_local()` are two functions for trying out the model: `test_on_webcam_browser()` is meant to be used in a browser (in a Google Colab environment), while `test_on_webcam_local()` is meant to be used on a local machine with a Python environment.
* The `IN_COLAB` boolean decides which of the two functions is used.
* `test_model_on_single_image(...)` tests a single image manually.

In [None]:
def take_photo(filename='photo.jpg', quality=1):
    """Capture one webcam frame through the browser and return it as an image.

    A JavaScript snippet is displayed and then evaluated with ``eval_js``; it
    draws the current video frame onto a canvas and hands it back as a JPEG
    data URL, which is decoded here.

    Args:
        filename: Not used by this function.
        quality: Forwarded to ``canvas.toDataURL`` as the JPEG quality.

    Returns:
        The result of ``cv2.imdecode`` (``cv2.IMREAD_COLOR``) on the captured
        JPEG bytes.
    """
    capture_script = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
    display(capture_script)

    # Run the snippet; the answer is a data URL whose base64 payload follows the comma
    data_url = eval_js(f'takePhoto({quality})')
    encoded_jpeg = data_url.split(',')[1]
    jpeg_bytes = b64decode(encoded_jpeg)

    # Wrap the raw bytes in a uint8 buffer and let OpenCV decode the JPEG
    jpeg_buffer = np.frombuffer(jpeg_bytes, dtype=np.uint8)
    return cv2.imdecode(jpeg_buffer, cv2.IMREAD_COLOR)

In [None]:
def test_on_webcam_browser():
    """Run the drowsiness check on frames captured through the browser.

    Each iteration grabs a frame with ``take_photo()``, looks for eyes with the
    Haar cascade, classifies the last eye region found with the saved model,
    draws the status and face rectangles on the frame and shows it with
    ``cv2_imshow``. When the eyes are classified as closed for longer than
    ``CLOSURE_DURATION_THRESHOLD`` seconds, 'DROWSINESS Detected' is printed.

    The loop has no exit condition; it runs until the kernel is interrupted.
    """
    # Load the Deep Learning model for drowsiness detection
    dms_model = tf.keras.models.load_model(f'{MODELS_DIRECTORY}/{MODEL_FILE_NAME}')

    # Time of the last "eyes open" observation (or of the last alert)
    start = time.time()

    font = cv2.FONT_HERSHEY_SIMPLEX

    # Continuously capture frames from the webcam
    while True:
        # Capture a frame from the webcam
        frame = take_photo()
        if frame is None:
            # cv2.imdecode yields None when the captured data cannot be decoded
            print("Could not decode the captured frame")
            time.sleep(2)
            continue

        # Detect eyes in the frame
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        eyes = EYE_CASCADE.detectMultiScale(gray, 1.1, 4)

        # Last eye crop found in this frame, if any
        eyes_roi = None

        # Loop through the detected eyes
        for x, y, w, h in eyes:
            # Extract the region of interest (ROI) for the eyes
            roi_gray = gray[y:y + h, x:x + w]
            roi_color = frame[y:y + h, x:x + w]
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

            # Run the eye detector a second time inside the ROI
            detected_eyes = EYE_CASCADE.detectMultiScale(roi_gray)

            if len(detected_eyes) == 0:
                print("eyes not detected")
            else:
                # Keep the crop of the last eye detected inside the ROI
                for ex, ey, ew, eh in detected_eyes:
                    eyes_roi = roi_color[ey:ey + eh, ex:ex + ew]

        if eyes_roi is None:
            status = "No eyes detected"
        else:
            # OpenCV frames are BGR while the training generator feeds RGB
            # images, so the channel order is swapped before inference.
            final_image = cv2.cvtColor(eyes_roi, cv2.COLOR_BGR2RGB)
            final_image = cv2.resize(final_image, (IMG_SIZE, IMG_SIZE))
            # Add the batch dimension; pixel values stay unscaled
            final_image = np.expand_dims(final_image, axis=0)

            # Predict whether the eyes are open or closed
            prediction = dms_model.predict(final_image)
            print("Prediction ", prediction)

            # Scores above MODEL_THRESHOLD count as open eyes
            if prediction[0, 0] > MODEL_THRESHOLD:
                status = "Open Eyes"
                start = time.time()
            else:
                status = "Closed Eyes"
                current = time.time()
                if current - start > CLOSURE_DURATION_THRESHOLD:
                    print('DROWSINESS Detected')
                    start = time.time()

        # Detect faces in the frame
        faces = FACE_CASCADE.detectMultiScale(gray, 1.1, 4)

        # Draw rectangles around the detected faces
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Display the status on the frame: a thick white pass under a thinner
        # black pass gives the text an outline
        cv2.putText(frame,
                    status,
                    (10, 35),
                    font, 0.8,
                    (255, 255, 255),
                    thickness=3)

        cv2.putText(frame,
                    status,
                    (10, 35),
                    font, 0.8,
                    (0, 0, 0),
                    thickness=2)

        cv2_imshow(frame)
        time.sleep(2)

In [None]:
def test_on_webcam_local():
    """Run the drowsiness check on frames from a local webcam.

    Each iteration reads a frame, looks for eyes with the Haar cascade,
    classifies the last eye region found with the saved model, draws the
    status and face rectangles on the frame and shows it in an OpenCV window.
    When the eyes are classified as closed for longer than
    ``CLOSURE_DURATION_THRESHOLD`` seconds, an alert is printed, a beep is
    played and an alert text is drawn on the frame.

    The loop ends when 'q' is pressed or when a frame cannot be read. The
    camera and the window are released in all cases.

    Raises:
        IOError: If the webcam cannot be opened.
    """
    # Load the Deep Learning model for drowsiness detection
    dms_model = tf.keras.models.load_model(f'{MODELS_DIRECTORY}/{MODEL_FILE_NAME}')

    # Open the local webcam, trying index -1 first and index 0 as a fallback
    cap = cv2.VideoCapture(-1)
    if not cap.isOpened():
        cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        raise IOError("Cannot open webcam")

    # Time of the last "eyes open" observation (or of the last reset)
    start = time.time()

    font = cv2.FONT_HERSHEY_DUPLEX

    try:
        # Loop until 'q' is pressed or the webcam stops delivering frames
        while True:
            # Capture the frame from the webcam; ret is False when no frame was read
            ret, frame = cap.read()
            if not ret:
                print("Failed to read a frame from the webcam")
                break

            # Detect eyes in the frame
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            eyes = EYE_CASCADE.detectMultiScale(gray, 1.1, 4)

            # Last eye crop found in this frame, if any
            eyes_roi = None

            for x, y, w, h in eyes:
                roi_gray = gray[y:y + h, x:x + w]
                roi_color = frame[y:y + h, x:x + w]
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                # Run the eye detector a second time inside the ROI
                detected_eyes = EYE_CASCADE.detectMultiScale(roi_gray)

                if len(detected_eyes) == 0:
                    print("eyes not detected")
                else:
                    # Keep the crop of the last eye detected inside the ROI
                    for ex, ey, ew, eh in detected_eyes:
                        eyes_roi = roi_color[ey:ey + eh, ex:ex + ew]

            drowsiness_detected = False

            if eyes_roi is None:
                status = "No eyes detected"
            else:
                # OpenCV frames are BGR while the training generator feeds RGB
                # images, so the channel order is swapped before inference.
                final_image = cv2.cvtColor(eyes_roi, cv2.COLOR_BGR2RGB)
                final_image = cv2.resize(final_image, (IMG_SIZE, IMG_SIZE))
                # Add the batch dimension; pixel values stay unscaled
                final_image = np.expand_dims(final_image, axis=0)

                prediction = dms_model.predict(final_image)
                print("Prediction ", prediction)

                # Scores above MODEL_THRESHOLD count as open eyes and restart the timer
                if prediction[0, 0] > MODEL_THRESHOLD:
                    status = "Open Eyes"
                    start = time.time()
                else:
                    # Closed eyes: alert once the closure lasts longer than
                    # CLOSURE_DURATION_THRESHOLD seconds, and restart the timer
                    # two seconds after that
                    status = "Closed Eyes"
                    current = time.time()
                    if current - start > CLOSURE_DURATION_THRESHOLD:
                        print('DROWSINESS Detected')
                        drowsiness_detected = True
                        if current - start > CLOSURE_DURATION_THRESHOLD + 2:
                            start = time.time()

            # Detect faces in the frame and draw rectangles around them
            faces = FACE_CASCADE.detectMultiScale(gray, 1.1, 4)

            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

            # Status text: a thick white pass under a thinner black pass gives an outline
            cv2.putText(frame,
                        status,
                        (10, 35),
                        font, 0.8,
                        (255, 255, 255),
                        thickness=3)
            cv2.putText(frame,
                        status,
                        (10, 35),
                        font, 0.8,
                        (0, 0, 0),
                        thickness=2)

            if drowsiness_detected:
                try:
                    # winsound is only available on Windows
                    import winsound
                except ImportError:
                    # Fall back to the terminal bell on other platforms
                    print('\a', end='', flush=True)
                else:
                    frequency = 2500  # Set Frequency To 2500 Hertz
                    duration = 1000  # Set Duration To 1000 ms == 1 second
                    winsound.Beep(frequency, duration)
                cv2.putText(frame,
                            'DROWSINESS Detected',
                            (10, 70),
                            font, 0.8,
                            (0, 0, 255),
                            3, )

            cv2.imshow("DMS - Driver Monitoring System", frame)

            if cv2.waitKey(2) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close the window, even after an error
        cap.release()
        cv2.destroyAllWindows()


In [None]:
# Pick the webcam test that matches the runtime: browser capture on Colab,
# local capture otherwise
webcam_test = test_on_webcam_browser if IN_COLAB else test_on_webcam_local
webcam_test()

In [None]:
def test_model_on_single_image(image_path):
    """Run the saved model on one image file and report its prediction.

    The image is read as grayscale, expanded to three channels, resized to
    ``IMG_SIZE`` x ``IMG_SIZE``, shown with matplotlib and passed to the model
    loaded from the models directory.

    Args:
        image_path: Path of the image file to classify.

    Returns:
        The array returned by the model's ``predict`` call (it is also printed).

    Raises:
        IOError: If the image cannot be read.
    """
    # Read in a grayscale image using OpenCV; imread returns None on failure
    img_array = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img_array is None:
        raise IOError(f"Cannot read image: {image_path}")

    # Expand the grayscale image to three (identical) channels
    back_to_rgb = cv2.cvtColor(img_array, cv2.COLOR_GRAY2BGR)

    # Resize the image
    new_array = cv2.resize(back_to_rgb, (IMG_SIZE, IMG_SIZE))

    # Add the leading batch dimension
    x_input = np.array(new_array).reshape(1, IMG_SIZE, IMG_SIZE, 3)

    # Display the image
    plt.imshow(new_array)

    # Pixel values are deliberately left unscaled: the training generators and
    # the other inference paths in this notebook feed unscaled values, so
    # dividing by 255 here would not match what the model was trained on.

    # Load the trained model and make a prediction
    dms_model = tf.keras.models.load_model(f'{MODELS_DIRECTORY}/{MODEL_FILE_NAME}')
    prediction = dms_model.predict(x_input)

    # Print and return the prediction
    print(prediction)
    return prediction

In [None]:
# Try the model on one image taken from the Opened_Eyes folder of the training data
sample_image_path = f'{TRAINING_DATA_DIRECTORY}/Opened_Eyes/s0011_01518_0_0_1_0_0_01.png'
test_model_on_single_image(sample_image_path)