# AI-based driver activity monitoring

In [1]:
import numpy as np
import pandas as pd
from keras.models import load_model
import cv2
import time

from keras.preprocessing import image
import matplotlib.pyplot as plt

Using TensorFlow backend.


In [38]:
# Notebook configuration: model checkpoint, camera index, input size, dataset folders.

# Checkpoint handed to load_model further down (swap the comment for the VGG16 file).
#model_path = "models/vgg16_size128_best.h5"
model_path = "models/restnet50_size128_best.h5"

# OpenCV capture index.
camera_port = 0 # Camera 0 is the integrated web cam on my netbook
#camera_port = 1 # For usb camera

# Size handed to the resize helpers before an image reaches the network.
IMAGE_SIZE = (128, 128)

# Both test-image folders live under the same dataset root.
_dataset_root = 'D:/my_learning_dataset/state_farms_drivers/'
left_data_dir = _dataset_root + 'imgs/test/'
right_data_dir = _dataset_root + 'imgs_right/test/'

# Folder currently selected for test-data work.
data_dir = right_data_dir

In [39]:
# Class codes and their human-readable descriptions.
# `left_class_desc` describes the original (left-hand-drive) image set;
# `right_class_desc` swaps the left/right wording of c1-c4 for the right-hand-drive set.
class_list = ['c{}'.format(idx) for idx in range(10)]

left_class_desc = [
    'safe-driving',
    'texting-right',
    'talking-phone-right',
    'texting-left',
    'talking-phone-left',
    'operating-radio',
    'drinking',
    'reaching-behind',
    'hair-and-makeup',
    'talking-to-passenger',
]

right_class_desc = [
    'safe-driving',
    'texting-left',
    'talking-phone-left',
    'texting-right',
    'talking-phone-right',
    'operating-radio',
    'drinking',
    'reaching-behind',
    'hair-and-makeup',
    'talking-to-passenger',
]

# Side-by-side lookup table, displayed as the cell output.
_desc_columns = {
    'class': class_list,
    'left_desc': left_class_desc,
    'right_desc': right_class_desc,
}
df_desc = pd.DataFrame(_desc_columns)
df_desc

Unnamed: 0,class,left_desc,right_desc
0,c0,safe-driving,safe-driving
1,c1,texting-right,texting-left
2,c2,talking-phone-right,talking-phone-left
3,c3,texting-left,texting-right
4,c4,talking-phone-left,talking-phone-right
5,c5,operating-radio,operating-radio
6,c6,drinking,drinking
7,c7,reaching-behind,reaching-behind
8,c8,hair-and-makeup,hair-and-makeup
9,c9,talking-to-passenger,talking-to-passenger


In [40]:
# Label list read by getActivity / getActivityList when naming a predicted class.
# Swap the comment below to use the left-hand-drive descriptions instead.
class_desc = right_class_desc
#class_desc = left_class_desc

In [41]:
def getActivity(predictions, threshold=0.50):
    """Return the most probable activity for a single prediction row.

    Parameters
    ----------
    predictions : array-like
        Model output; only the first row (``predictions[0]``) is inspected.
    threshold : float, optional
        Minimum score the top class must reach to be reported.

    Returns
    -------
    tuple
        ``(index, description, score)`` of the top class, with the
        description looked up in the notebook-level ``class_desc`` list.
        When the top score is below ``threshold`` the sentinel
        ``(10, "no class", 0)`` is returned instead.
    """
    scores = predictions[0]
    # np.argmax gives the first index of the maximum, exactly like the
    # np.where(scores == scores.max())[0][0] lookup it replaces, but without
    # building a boolean mask. A NaN row now falls through to the sentinel
    # instead of raising IndexError on an empty np.where result.
    pos = np.argmax(scores)
    if scores[pos] >= threshold:
        return pos, class_desc[pos], scores[pos]
    return 10, "no class", 0
    
def getActivityList(predictions, top=3):
    """Return the ``top`` highest-scoring ``(score, description)`` pairs.

    Scores come from the first row of ``predictions`` and are paired
    position-wise with the notebook-level ``class_desc`` list; the result is
    ordered from highest to lowest score.
    """
    ranked = list(zip(predictions[0], class_desc))
    ranked.sort(reverse=True)
    return ranked[:top]

def getTextForDisplay(actlist):
    """Format ``(score, description)`` pairs as a multi-line caption.

    Each pair becomes one ``"<description>: <score with 2 decimals>"`` line;
    lines are separated by newlines with no trailing newline. An empty list
    yields an empty string.
    """
    return "\n".join(
        "{}: {:0.2f}".format(entry[1], entry[0]) for entry in actlist
    )

def plotImageWithActivityPredictions(file, actlist):
    """Show an image file with its activity predictions overlaid as text.

    Parameters
    ----------
    file : str
        Path of the image to read with OpenCV.
    actlist : list
        ``(score, description)`` pairs, formatted by ``getTextForDisplay``.

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read ``file`` (``cv2.imread`` returns ``None``).
    """
    displayText = getTextForDisplay(actlist)
    img = cv2.imread(file)
    if img is None:
        # cv2.imread signals a missing/unreadable file by returning None.
        raise FileNotFoundError("Could not read image file: {}".format(file))
    # OpenCV decodes to BGR while matplotlib renders RGB; without this
    # conversion the red and blue channels are swapped in the plot.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.text(x=0, y=0, s=displayText,
             bbox=dict(facecolor='orange', alpha=0.5),
             horizontalalignment='left',
             verticalalignment='top',
             fontsize=10)
    plt.imshow(img)
    
def plotImageWithActivityPredictionsExt(img, actlist):
    """Show an in-memory image with its activity predictions overlaid as text.

    ``img`` is handed to ``plt.imshow`` unchanged and ``actlist`` is formatted
    with ``getTextForDisplay`` into an orange caption box anchored at (0, 0).
    """
    # NOTE(review): matplotlib renders 3-channel arrays as RGB; a raw OpenCV
    # frame (BGR) passed here will show swapped colours - confirm with callers.
    caption_style = dict(
        bbox=dict(facecolor='orange', alpha=0.5),
        horizontalalignment='left',
        verticalalignment='top',
        fontsize=10,
    )
    plt.text(0, 0, getTextForDisplay(actlist), **caption_style)
    plt.imshow(img)

In [42]:
def read_img_cv2(filepath, size):
    """Read an image file with OpenCV, resize it and return a Keras array.

    Parameters
    ----------
    filepath : str
        Path of the image to load.
    size : tuple
        Target size passed to ``cv2.resize`` (OpenCV reads it as
        ``(width, height)``).

    Returns
    -------
    The resized image converted with Keras ``image.img_to_array``.

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read ``filepath``.
    """
    img = cv2.imread(filepath)
    if img is None:
        # cv2.imread returns None instead of raising; fail here with a clear
        # message rather than with an opaque error inside cv2.resize.
        raise FileNotFoundError("Could not read image file: {}".format(filepath))
    img = cv2.resize(img, size, interpolation=cv2.INTER_AREA)
    return image.img_to_array(img)

# input is image file
def cv2_pre_processing(filepath, size):
    """Load an image file and return it as a normalised single-image batch.

    The image is read and resized by ``read_img_cv2``, given a leading batch
    axis, cast to ``float32`` and divided by 255.
    """
    batch = np.expand_dims(read_img_cv2(filepath, size).copy(), axis=0)
    return batch.astype('float32') / 255

# input is image data
def cv2_pre_processing_data(iframe, size):
    """Turn an in-memory image into a normalised single-image batch.

    The frame is resized to ``size`` with ``cv2.INTER_AREA``, converted with
    Keras ``image.img_to_array``, given a leading batch axis, cast to
    ``float32`` and divided by 255.
    """
    resized = cv2.resize(iframe, size, interpolation=cv2.INTER_AREA)
    batch = np.expand_dims(image.img_to_array(resized).copy(), axis=0)
    return batch.astype('float32') / 255

def cv2_model_execution(model, filepath, size, threshold=0.5, top=3):
    """Predict the activity for an image file and plot the top results.

    Parameters
    ----------
    model :
        Object exposing ``predict``; it receives the preprocessed batch.
    filepath : str
        Image file to classify and display.
    size : tuple
        Target size forwarded to ``cv2_pre_processing``.
    threshold : float, optional
        Accepted for interface compatibility; not used by this function.
    top : int, optional
        Number of highest-scoring classes shown on the plot.
    """
    preproc = cv2_pre_processing(filepath, size)
    preds = model.predict(preproc)
    actlist = getActivityList(preds, top)
    # The original passed the undefined name `file` here (NameError);
    # the image to plot is the one that was just classified.
    plotImageWithActivityPredictions(filepath, actlist)

In [43]:
#using image library of keras
def read_img_grayscale(filepath, size):
    """Load an image file as grayscale at ``size`` and return a Keras array.

    Uses Keras ``image.load_img`` with ``color_mode="grayscale"`` followed by
    ``image.img_to_array``.
    """
    return image.img_to_array(
        image.load_img(filepath, target_size=size, color_mode="grayscale")
    )

# input is image file
def pre_processing(filepath, size):
    """Load a grayscale image file as a normalised single-image batch.

    The image comes from ``read_img_grayscale``; it is given a leading batch
    axis, cast to ``float32`` and divided by 255.
    """
    batch = np.expand_dims(read_img_grayscale(filepath, size).copy(), axis=0)
    return batch.astype('float32') / 255

# input is image data
def pre_processing_data(iframe, size):
    """Turn an in-memory frame into a normalised grayscale single-image batch.

    In-memory counterpart of ``pre_processing``: the frame is reduced to one
    channel, scaled to ``size``, converted with Keras ``image.img_to_array``,
    given a leading batch axis, cast to ``float32`` and divided by 255.

    Parameters
    ----------
    iframe : array-like
        Image data, either 2-D (already grayscale) or 3-channel.
    size : tuple
        Target ``(height, width)``, the same convention ``pre_processing``
        uses through Keras ``target_size``.
    """
    frame = np.asarray(iframe)
    if frame.ndim == 3 and frame.shape[2] == 3:
        # NOTE(review): assumes the frame is in OpenCV's BGR order (e.g. from
        # cv2.VideoCapture / cv2.imread) - confirm against callers.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # np.resize (used previously) does not scale an image: it truncates or
    # repeats the flattened pixel buffer. cv2.resize interpolates properly;
    # it expects (width, height), hence the swapped order.
    img = cv2.resize(frame, (size[1], size[0]), interpolation=cv2.INTER_AREA)
    img = image.img_to_array(img)
    img_preprocessed = np.expand_dims(img.copy(), axis=0)
    img_preprocessed = img_preprocessed.astype('float32') / 255
    return img_preprocessed
    
def model_execution(model, filepath, size, threshold=0.5, top=3):
    """Predict the activity for a grayscale image file and plot the top results.

    Parameters
    ----------
    model :
        Object exposing ``predict``; it receives the preprocessed batch.
    filepath : str
        Image file to classify and display.
    size : tuple
        Target size forwarded to ``pre_processing``.
    threshold : float, optional
        Accepted for interface compatibility; not used by this function.
    top : int, optional
        Number of highest-scoring classes shown on the plot.
    """
    preproc = pre_processing(filepath, size)
    preds = model.predict(preproc)
    actlist = getActivityList(preds, top)
    # The original passed the undefined name `file` here (NameError);
    # the image to plot is the one that was just classified.
    plotImageWithActivityPredictions(filepath, actlist)

In [44]:
# VGG
from keras.applications.vgg16 import preprocess_input

def vgg_read_image_cv2(filepath, size):
    """Read an image file with OpenCV and return it as a single-image batch.

    Parameters
    ----------
    filepath : str
        Path of the image to load.
    size : tuple
        Target size passed to ``cv2.resize`` (OpenCV reads it as
        ``(width, height)``).

    Returns
    -------
    The resized image converted with Keras ``image.img_to_array`` and given a
    leading batch axis, i.e. (batchsize, height, width, channels).

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read ``filepath``.
    """
    img = cv2.imread(filepath)
    if img is None:
        # cv2.imread returns None instead of raising; fail here with a clear
        # message rather than with an opaque error inside cv2.resize.
        raise FileNotFoundError("Could not read image file: {}".format(filepath))
    img = cv2.resize(img, size, interpolation=cv2.INTER_AREA)
    img = image.img_to_array(img)
    # The network takes batches, so add the batch dimension on axis 0.
    return np.expand_dims(img, axis=0)

def vgg_read_image(filepath, size):
    """Load an image file and prepare it for the VGG model.

    The batch from ``vgg_read_image_cv2`` is copied, passed through the VGG16
    ``preprocess_input`` and then cast to ``float32`` and divided by 255.
    """
    # NOTE(review): dividing by 255 after preprocess_input is unusual -
    # confirm it matches how the VGG checkpoint was trained.
    batch = vgg_read_image_cv2(filepath, size)
    prepared = preprocess_input(batch.copy())
    return prepared.astype('float32') / 255

def vgg_pre_processing_data(iframe, size):
    """Prepare an in-memory image for the VGG model.

    The frame is resized to ``size`` with ``cv2.INTER_AREA``, converted with
    Keras ``image.img_to_array`` and given a leading batch axis so the input
    has the form (batchsize, height, width, channels). A copy is then passed
    through the VGG16 ``preprocess_input``, cast to ``float32`` and divided
    by 255.
    """
    # NOTE(review): dividing by 255 after preprocess_input is unusual -
    # confirm it matches how the VGG checkpoint was trained.
    resized = cv2.resize(iframe, size, interpolation=cv2.INTER_AREA)
    batch = np.expand_dims(image.img_to_array(resized), axis=0)
    prepared = preprocess_input(batch.copy())
    return prepared.astype('float32') / 255

In [45]:
# For ResNet model
def dam_read_image(filepath, size):
    """Load and preprocess an image file for the currently selected model.

    Delegates to ``cv2_pre_processing`` (restnet50 and custom models).
    """
    # For a VGG checkpoint use: vgg_read_image(filepath, size)
    return cv2_pre_processing(filepath, size)

def dam_pre_processing_data(iframe, size):
    """Preprocess an in-memory frame for the currently selected model.

    Delegates to ``cv2_pre_processing_data`` (restnet50 and custom models).
    """
    # For a VGG checkpoint use: vgg_pre_processing_data(iframe, size)
    return cv2_pre_processing_data(iframe, size)

In [46]:
# Load the trained Keras model from `model_path`; the prediction cells below
# call `trainedModel.predict` on it.
trainedModel = load_model(model_path)

In [47]:
# Print the layer-by-layer architecture of the loaded model.
trainedModel.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
resnet50 (Model)             (None, 4, 4, 2048)        23587712  
_________________________________________________________________
flatten_1 (Flatten)          (None, 32768)             0         
_________________________________________________________________
dense_1 (Dense)              (None, 1024)              33555456  
_________________________________________________________________
dropout_1 (Dropout)          (None, 1024)              0         
_________________________________________________________________
dense_2 (Dense)              (None, 512)               524800    
_________________________________________________________________
dropout_2 (Dropout)          (None, 512)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 10)               

***Image file prediction***

In [50]:
def activity_prediction_for_test_file(testfile):
    """Classify one image file, print its top-3 activities and plot them.

    The file is preprocessed with ``dam_read_image`` at ``IMAGE_SIZE`` and fed
    to ``trainedModel``; the top-3 ``(score, description)`` pairs are printed
    and drawn over the image.
    """
    preds = trainedModel.predict(dam_read_image(testfile, IMAGE_SIZE))

    # The top-1 lookup is still evaluated, but only the top-3 list is reported.
    _pos, _classname, _prob = getActivity(preds, threshold=0.5)

    actList = getActivityList(preds, top=3)
    print(actList)

    plotImageWithActivityPredictions(testfile, actList)

**Live video: grab frames in real time and predict the activity**

In [51]:
def DAM_live_monitoring():
    """Run live driver-activity monitoring on the camera at ``camera_port``.

    Frames are read in a loop, preprocessed with ``dam_pre_processing_data``
    and classified by ``trainedModel``. The predicted label is drawn on the
    preview window once the same class has been predicted on more than
    ``stable_frames`` consecutive repeats; until then "no class" is shown.
    Press ``q`` in the preview window to stop.

    The function returns early (after printing a message) if the camera
    cannot be opened, and leaves the loop if a frame cannot be read. The
    camera and all OpenCV windows are always released on exit.
    """
    # Consecutive repeats of the same class required before its label is shown.
    stable_frames = 5

    print("opening camera and cv2 resources")
    # Created outside the try block so `cap` is always bound in `finally`.
    cap = cv2.VideoCapture(camera_port)

    try:
        # Check whether the selected camera opened successfully; without a
        # device every read would return no frame and crash in the resize.
        if not cap.isOpened():
            print("Could not open video device")
            return

        print("camera warm ups")
        # camera warm up before we begin looping over the frames
        time.sleep(2)

        # Request a 640x480 capture resolution.
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        print("Enter ‘q’ key, to break the loop and exit the application")

        # None (not 0) so that class 0 gets no head start on the first frame.
        prev_classid = None
        counter = 0
        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                # Camera unplugged / stream ended: stop instead of crashing.
                print("Could not read frame from video device")
                break

            processedImg = dam_pre_processing_data(frame, IMAGE_SIZE)
            preds = trainedModel.predict(processedImg)
            pos, classname, prob = getActivity(preds, 0.2)

            # Count how long the same class has been predicted in a row.
            if prev_classid == pos:
                counter += 1
            else:
                counter = 0
            prev_classid = pos

            if counter > stable_frames:
                label = "{}:{:0.2f}".format(classname, prob)
            else:
                label = "no class"

            dispframe = cv2.putText(frame, label,
                                    (10, 25),  # bottom-left corner of the text
                                    cv2.FONT_HERSHEY_SIMPLEX,  # font
                                    0.7,  # font scale
                                    (0, 255, 0),  # colour (BGR)
                                    2  # thickness
                                    )
            cv2.imshow('Driver Activity Monitoring', dispframe)  # show the output frame

            # Wait briefly for user input; `q` quits the application.
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
    finally:
        # Release the camera, then close all of the imshow() windows.
        print("Releasing camera and cv2 resources")
        cap.release()
        cv2.destroyAllWindows()
    return

In [21]:
# Start the live monitoring loop; it runs until `q` is pressed in the preview window.
DAM_live_monitoring()

opening camera and cv2 resources
camera warm ups
Enter ‘q’ key, to break the loop and exit the application
Releasing camera and cv2 resources


**Single-shot prediction using the live camera**

In [None]:
def dam_one_shot_live_prediction(output_path=None):
    """Capture one camera frame, plot its top-3 activities and save the frame.

    Parameters
    ----------
    output_path : str, optional
        File the captured frame is written to. When omitted, the
        notebook-level ``outfile`` variable is used if it exists (the
        original calling convention); if neither is available the frame is
        not saved.

    The function returns early (after printing a message) if the camera
    cannot be opened or no frame can be read. The camera and all OpenCV
    windows are always released on exit.
    """
    if output_path is None:
        # Backward compatibility: callers used to set a global `outfile`
        # before invoking this function without arguments.
        output_path = globals().get("outfile")

    print("opening camera and cv2 resources")
    # Created outside the try block so `cap` is always bound in `finally`.
    cap = cv2.VideoCapture(camera_port)

    try:
        # Check whether the selected camera opened successfully.
        if not cap.isOpened():
            print("Could not open video device")
            return

        # Capture a single frame.
        ret, frame = cap.read()
        if not ret or frame is None:
            print("Could not read frame from video device")
            return

        processedImg = dam_pre_processing_data(frame, IMAGE_SIZE)
        preds = trainedModel.predict(processedImg)

        actlist = getActivityList(preds, top=3)
        # OpenCV frames are BGR while matplotlib renders RGB; convert a copy
        # for display so the saved frame keeps OpenCV's native order.
        plotImageWithActivityPredictionsExt(
            cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), actlist)

        if output_path is None:
            print("No output file given; captured frame not saved")
        elif not cv2.imwrite(output_path, frame):
            # cv2.imwrite reports failure through its return value.
            print("Could not write image file: {}".format(output_path))

    finally:
        # Release the camera, then close all of the imshow() windows.
        print("Releasing camera and cv2 resources")
        cap.release()
        cv2.destroyAllWindows()
    return

In [52]:
#outfile = "capture_images/live_image1.png"
#dam_one_shot_live_prediction()

**Utility to capture a picture and save it to a file**

In [53]:
def click_picture(file):
    """Capture a single frame from the camera at ``camera_port`` and save it.

    Parameters
    ----------
    file : str
        Destination path handed to ``cv2.imwrite``.

    The function prints a message and returns without saving if the camera
    cannot be opened or no frame can be read, and reports a failed write.
    The camera and all OpenCV windows are always released on exit.
    """
    print("opening camera and cv2 resources")
    # Created outside the try block so `cap` is always bound in `finally`.
    cap = cv2.VideoCapture(camera_port)

    try:
        # Check whether the selected camera opened successfully.
        if not cap.isOpened():
            print("Could not open video device")
            return

        # Capture a single frame.
        ret, frame = cap.read()
        if not ret or frame is None:
            print("Could not read frame from video device")
            return

        # cv2.imwrite reports failure through its return value.
        if not cv2.imwrite(file, frame):
            print("Could not write image file: {}".format(file))
    finally:
        # Release the camera, then close all of the imshow() windows.
        print("Releasing camera and cv2 resources")
        cap.release()
        cv2.destroyAllWindows()
    return

In [54]:
#outfilename = "capture_images/test_image1.png"
#click_picture(outfilename)

***Predict test data***