In [None]:
# One-time setup: run this cell once per environment. The dlib wheel below is built for CPython 3.11 on 64-bit Windows.
pip install cmake
pip install dlib-19.24.1-cp311-cp311-win_amd64.whl

In [3]:
# Imports
import cv2
import base64
import numpy as np
import io
from PIL import Image, ExifTags
import keras
from keras import backend as k
from keras.models import Sequential, load_model
from keras.preprocessing.image import ImageDataGenerator
from flask import request, jsonify, Flask
import matplotlib.pyplot as plt
import dlib


# App
# Flask application object; the @app.route decorators further down register
# the API endpoints on it, and app.run() at the bottom of the cell serves it.
app = Flask(__name__)

# ------------------------------------------------------------

# function to load the models
def load_model_fn(path):
    """Load a saved Keras model and report completion on stdout.

    Args:
        path: Location of the saved model file; forwarded unchanged to
            ``keras.models.load_model``.

    Returns:
        Whatever ``load_model`` returns for ``path``.
    """
    loaded = load_model(path)
    print('Model loaded!')
    return loaded
# ------------------------------------------------------------

# function to resize our image
def preprocess_image(image, target_size):
    """Turn an image into a single-item batch array for a model.

    Args:
        image: Image object exposing ``mode``, ``convert`` and ``resize``
            (a PIL image in this file).
        target_size: Size passed straight to ``image.resize``.

    Returns:
        The resized RGB image as a NumPy array with a new leading axis of
        length 1. Pixel values are not rescaled here.
    """
    # Anything that is not already RGB is converted before resizing.
    rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
    pixels = np.array(rgb_image.resize(target_size))
    # Prepend the batch axis.
    return pixels[np.newaxis, ...]
# ------------------------------------------------------------
def exif_transpose(img):
    """Return ``img`` turned upright according to its EXIF Orientation tag.

    All eight EXIF orientation values are handled: value 1 (and any missing or
    unknown value) leaves the image untouched, the others are undone with the
    matching ``Image.transpose`` operation. This is best-effort: any error is
    printed and the image is returned as it was received.

    Args:
        img: PIL image. Images whose class does not provide ``_getexif`` are
            returned unchanged.

    Returns:
        The transposed image, or ``img`` itself when no correction applies.
    """
    # EXIF orientation value -> transpose operation that undoes it.
    corrections = {
        2: Image.FLIP_LEFT_RIGHT,
        3: Image.ROTATE_180,
        4: Image.FLIP_TOP_BOTTOM,
        5: Image.TRANSPOSE,
        6: Image.ROTATE_270,
        7: Image.TRANSVERSE,
        8: Image.ROTATE_90,
    }
    try:
        # Look the tag id up by name; None if this Pillow build does not know it,
        # instead of silently falling back to whatever tag was iterated last.
        orientation_tag = next(
            (tag for tag, name in ExifTags.TAGS.items() if name == 'Orientation'),
            None,
        )
        # Not every image class exposes _getexif; treat that as "no EXIF data"
        # rather than logging an error for each such request.
        read_exif = getattr(img, '_getexif', None)
        exif = read_exif() if read_exif is not None else None
        if exif:
            method = corrections.get(dict(exif.items()).get(orientation_tag))
            if method is not None:
                img = img.transpose(method)
    except Exception as err:
        print(f"Error in exif_transpose: {err}")
    return img
# ------------------------------------------------------------
# dlib shape-predictor model file (68 face landmarks, going by its name).
# The path is relative, so it is resolved against the current working directory.
predictor_path = "shape_predictor_68_face_landmarks.dat"
# The face detector and the landmark predictor are built once at module level
# and reused by crop_mouth() and crop_eye() on every call.
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(predictor_path)

# Landmark indices of dlib's 68-point model that bound each region.
# Mouth: 48..67 inclusive (the previous range(48, 67) dropped point 67).
_MOUTH_LANDMARKS = tuple(range(48, 68))
# Eye region: eyebrow points 18..21 plus eye points 36..41 of one side of the face.
_EYE_LANDMARKS = (18, 19, 20, 21, 36, 37, 38, 39, 40, 41)


def _crop_landmark_region(img, landmark_ids, Target_Size, pad):
    """Crop the padded bounding box of some landmarks on the first detected face.

    Args:
        img: Image array indexed as ``img[row, column]``.
        landmark_ids: Indices of the landmarks whose bounding box is cropped.
        Target_Size: Size passed to ``cv2.resize`` for the returned crop.
        pad: Pixels added on every side of the landmark bounding box.

    Returns:
        The resized crop, or ``None`` when no face is detected or the padded
        box does not overlap the image.
    """
    # The 1 asks the detector to upsample the image once, which makes
    # everything bigger and lets it find more (smaller) faces.
    dets = detector(img, 1)
    if len(dets) == 0:
        return None

    # Only the first detection is used; the original loop also returned on its
    # first iteration.
    shape = predictor(img, dets[0])
    xs = [shape.part(i).x for i in landmark_ids]
    ys = [shape.part(i).y for i in landmark_ids]

    # Clamp to 0: a negative slice bound is read by NumPy as "counted from the
    # end", which would yield a wrong or empty crop for faces near the top/left
    # edge. Bounds past the bottom/right edge are already truncated by slicing.
    top = max(min(ys) - pad, 0)
    left = max(min(xs) - pad, 0)
    bottom = max(max(ys) + pad, 0)
    right = max(max(xs) + pad, 0)

    crop_image = img[top:bottom, left:right]
    if crop_image.size == 0:
        return None

    # NOTE(review): COLOR_BGR2RGB swaps the first and third channels. The
    # /drowsiness endpoint passes an array built from a PIL image; if that is
    # already RGB the result here is BGR. Left as-is because the channel order
    # the model was trained on is not visible in this file - confirm.
    RGB_image = cv2.cvtColor(crop_image, cv2.COLOR_BGR2RGB)
    return cv2.resize(RGB_image, Target_Size)


def crop_mouth(img, Target_Size=(224, 224), pad=10):
    """Crop the mouth of the first detected face and resize it.

    Args:
        img: Image array indexed as ``img[row, column]``.
        Target_Size: Output size handed to ``cv2.resize``.
        pad: Pixels of margin around the mouth landmarks (default 10, the
            value that used to be hard-coded).

    Returns:
        The resized mouth crop, or ``None`` if no face was found.
    """
    return _crop_landmark_region(img, _MOUTH_LANDMARKS, Target_Size, pad)


def crop_eye(img, Target_Size=(224, 224), pad=10):
    """Crop one eye (with its eyebrow) of the first detected face and resize it.

    Args:
        img: Image array indexed as ``img[row, column]``.
        Target_Size: Output size handed to ``cv2.resize``.
        pad: Pixels of margin around the eye landmarks (default 10, the value
            that used to be hard-coded).

    Returns:
        The resized eye crop, or ``None`` if no face was found.
    """
    return _crop_landmark_region(img, _EYE_LANDMARKS, Target_Size, pad)
# ------------------------------------------------------------


# Loading the models !!!
# Each progress message is printed immediately before its own load, so the log
# shows which model is being read when a load is slow or fails. The models are
# still loaded in the same order as before.
print('Distraction Model is being loaded!')
distraction_CNN_model = load_model_fn('CNNModelwithDataAugmentation.h5')

print('Drowsiness Model is being loaded!')
drowsiness_model = load_model_fn('drowsinessmodel.h5')

print('Road Model is being loaded!')
road_model = load_model_fn('vehicle.h5')

# ###################################################################################################
#         mapping_list = ['yawn', 'no_yawn', 'Closed', 'Open']

# 1- #Drowsiness API
# Labels indexed by the position of the highest score in the drowsiness
# model's output.
_DROWSINESS_CLASSES = ["yawn", "no_yawn", "Closed", "Open"]


def _classify_face_region(crop_fn, image_arr, failure_label):
    """Crop one face region and label it with the drowsiness model.

    Args:
        crop_fn: ``crop_eye`` or ``crop_mouth``; called with ``image_arr``.
        image_arr: Image array handed to ``crop_fn``.
        failure_label: Value returned when the region cannot be cropped or
            classified.

    Returns:
        The class name of the highest score when that score is above 0.50,
        ``'Weak Prediction'`` when it is not, or ``failure_label`` on any
        failure (including ``crop_fn`` returning ``None``).
    """
    try:
        region = crop_fn(image_arr)
        if region is None:
            # No face detected: nothing to classify.
            return failure_label
        # Add the batch axis and scale pixel values by 1/255.
        batch = np.expand_dims(region, axis=0) / 255
        scores = drowsiness_model.predict(batch).tolist()[0]
        best_index = int(np.argmax(scores))
        if scores[best_index] > 0.50:
            return _DROWSINESS_CLASSES[best_index]
        return 'Weak Prediction'
    except Exception as err:
        # Best-effort by design: report the problem and fall back to the label.
        print(f"{failure_label}: {err}")
        return failure_label


@app.route('/drowsiness', methods=['POST'])
def predict_drowsiness():
    """Classify the driver in the posted image as "Drowsy" or "Awake".

    Expects a JSON body whose ``'image'`` key holds a base64-encoded image
    file. Responds with a JSON string: ``"Drowsy"`` when the mouth is
    classified as ``"yawn"`` or the eye as ``"Closed"``, ``"Awake"`` otherwise,
    or ``'Unable to decode message'`` when the request cannot be decoded (the
    same message the other endpoints use).
    """
    try:
        message = request.get_json(force=True)
        decoded = base64.b64decode(message['image'])
        image_received = Image.open(io.BytesIO(decoded))

        # Correct the orientation before convert(), which returns a new image
        # object.
        image_received = exif_transpose(image_received)
        # The crop helpers run a 3-channel colour conversion, so palette,
        # grayscale and alpha images are normalised to RGB first.
        if image_received.mode != 'RGB':
            image_received = image_received.convert('RGB')
        image_arr = np.array(image_received)
    except Exception as err:
        print(f"Unable to decode message: {err}")
        return jsonify('Unable to decode message')

    result_right = _classify_face_region(crop_eye, image_arr, "Couldn't Crop the Eye")
    result_mouth = _classify_face_region(crop_mouth, image_arr, "Couldn't Crop the Mouth")

    # NOTE(review): a failed crop or a weak prediction also ends up as 'Awake';
    # kept because clients may depend on the two-value response.
    if result_mouth == "yawn" or result_right == "Closed":
        final_result = "Drowsy"
    else:
        final_result = 'Awake'

    return jsonify(final_result)

# ###################################################################################################

# Most recent label produced by the /road endpoint (None until it has been
# called). predict_road() writes it and predict_driver_distraction() reads it.
# It is a single module-level value, so every request handled by this process
# sees the same one. A `global` statement is only needed inside the functions
# that assign it, not here.
road_result = None

# 2- Distraction API
# Labels indexed by the position of the highest score in the distraction
# model's output.
_DISTRACTION_LABELS = ['normal driving',
                       'texting - right',
                       'talking on the phone - right',
                       'texting - left',
                       'talking on the phone - left',
                       'operating the radio',
                       'drinking',
                       'reaching behind',
                       'hair and makeup',
                       'talking to passenger']


@app.route('/distraction', methods=['POST'])
def predict_driver_distraction():
    """Classify the driver's activity in the posted image.

    Expects a JSON body whose ``'image'`` key holds a base64-encoded image
    file. Responds with a JSON string:

    * ``'Unable to decode message'`` / ``'The model could not predict the
      Image'`` on failure;
    * when the last /road result is ``'Vehicles'``: ``'Safe Driving'`` for
      ``'normal driving'``, otherwise ``'Warning!!! There is an object'``;
    * otherwise the activity label, or ``'Unknown'`` when the best score is
      below 0.50.
    """
    try:
        message = request.get_json(force=True)
        encoded = message['image']
        decoded = base64.b64decode(encoded)
        image = Image.open(io.BytesIO(decoded))
        # Image.open is lazy, so pixel decoding errors only surface while
        # converting/resizing; keep that inside the same guard.
        processed_image = preprocess_image(image, target_size=(224, 224))
    except Exception:
        return jsonify('Unable to decode message')

    try:
        prediction = distraction_CNN_model.predict(processed_image).tolist()
        print(prediction)
    except Exception:
        return jsonify('The model could not predict the Image')

    scores = prediction[0]
    # Plain else (not `elif < .50`) so `result` is always bound, even if the
    # best score is NaN and both comparisons are false.
    if max(scores) >= .50:
        result = _DISTRACTION_LABELS[int(np.argmax(scores))]
    else:
        result = 'Unknown'

    # road_result is only read here, so no `global` declaration is required.
    if road_result == 'Vehicles':
        if result != 'normal driving':
            final_result = 'Warning!!! There is an object'
        else:
            final_result = 'Safe Driving'
        return jsonify(final_result)

    # If road prediction is not 'Vehicles', return 'result' of distraction model
    return jsonify(result)

        

# ###################################################################################################

# 3- Road API
@app.route('/road', methods=['POST'])
def predict_road():
    """Classify the posted road image as 'Vehicles' or 'Non-Vehicles'.

    Expects a JSON body whose ``'image'`` key holds a base64-encoded image
    file. The winning label is stored in the module-level ``road_result`` (read
    by the /distraction endpoint) and returned as a JSON string. On failure the
    response is ``'Unable to decode message'`` or ``'The model could not
    predict the Image'`` and ``road_result`` keeps its previous value.
    """
    global road_result

    try:
        message = request.get_json(force=True)
        encoded = message['image']
        decoded = base64.b64decode(encoded)
        image = Image.open(io.BytesIO(decoded))
        # Image.open is lazy; decoding errors surface during preprocessing.
        processed_image_road = preprocess_image(image, target_size=(150, 150))
    except Exception:
        return jsonify('Unable to decode message')

    # Make prediction for road
    try:
        prediction_road = road_model.predict(processed_image_road).tolist()
        scores = {
            'Non-Vehicles': prediction_road[0][0],
            'Vehicles': prediction_road[0][1],
        }
    except Exception:
        # Same message as the /distraction endpoint uses for model failures.
        return jsonify('The model could not predict the Image')

    # Label with the higher score; on a tie the first key ('Non-Vehicles') wins.
    road_result = max(scores, key=scores.get)

    return jsonify(road_result)

# Start Flask's built-in server when this code runs as the main module (which
# is the case inside a notebook cell). host='0.0.0.0' listens on all network
# addresses, port 5000; the call keeps running until it is interrupted.
# The server's own startup log recommends a production WSGI server for real
# deployments.
if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)

Drowsiness Model is being loaded!
Distraction Model is being loaded!
Road Model is being loaded!
Model loaded!
Model loaded!
Model loaded!
 * Serving Flask app '__main__' (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.1.11:5000
Press CTRL+C to quit




192.168.1.11 - - [15/Feb/2024 15:03:10] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:15] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:16] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:18] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:19] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:20] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:22] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:03:50] "POST /drowsiness HTTP/1.1" 200 -




192.168.1.11 - - [15/Feb/2024 15:04:01] "POST /drowsiness HTTP/1.1" 200 -
