In [3]:
# ===================== IMPORTS/LIBRARIES =====================

import tensorflow as tf
from mtcnn import MTCNN
from pathlib import Path
import pandas as pd
import glob
import cv2
import csv
import os
import ast
import pydot
import pydotplus
import graphviz
from datetime import datetime
import matplotlib.pyplot as plt

import time

from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import Callback

In [4]:
def historyToCsv():
    # convert the history.history dict to a pandas DataFrame:     
    hist_df = pd.DataFrame(history.history) 
    
    current_datetime = datetime.now()

    # Convert the datetime object to a string
    filename_friendly_datetime_string = current_datetime.strftime("%Y-%m-%d_%H-%M-%S")
    
    # save to csv: 
    hist_csv_file = 'history' + filename_friendly_datetime_string + '.csv'
    with open(hist_csv_file, mode='w') as f:
        hist_df.to_csv(f)

def csvToHistory(csv_filename):
    # Read the CSV file into a pandas DataFrame
    hist_df = pd.read_csv(csv_filename, index_col=0)

    # Convert the DataFrame to a dictionary
    history_dict = hist_df.to_dict(orient='list')

    return history_dict

In [7]:
# ===================== MULTITASK MODEL SETUP =====================
base_model = tf.keras.applications.VGG16(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3)
)

flattened_features = tf.keras.layers.Flatten(name='flattened_features')(base_model.output)

embedding_layer = tf.keras.layers.Dense(512, activation='linear', name='embedding')(flattened_features)

additional_dense_layer1 = tf.keras.layers.Dense(64, name='additional_dense1')(embedding_layer)
additional_dense_layer2 = tf.keras.layers.Dense(64, name='additional_dense2')(additional_dense_layer1)
# additional_dense_layer3 = tf.keras.layers.Dense(64, activation='relu', name='additional_dense3')(flattened_features)

landmarks = tf.keras.layers.Dense(10, activation='linear', name='landmark_output')(additional_dense_layer2)
# illum = tf.keras.layers.Dense(1, activation='linear', name='previous_illuminance_output')(additional_dense_layer2)

# Reshape layer to the desired shape
# reshaped_features = tf.keras.layers.Reshape((8, 8, 8))(embedding_layer)

# Upsampling layers
# upsample1 = tf.keras.layers.UpSampling2D(size=(7, 7))(reshaped_features)
# upsample2 = tf.keras.layers.UpSampling2D(size=(2, 2))(upsample1)
# upsample3 = tf.keras.layers.UpSampling2D(size=(2, 2))(upsample2)

# retIllum = tf.keras.layers.Conv2D(3, kernel_size=(3, 3), activation='relu', padding='same', name='image_retinex_output')(upsample3)

task_outputs = None

task_outputs = [landmarks]

multi_task_model = tf.keras.Model(inputs=base_model.input, outputs=task_outputs)

# Compile the model with specific loss functions and metrics for each task
multi_task_model.compile(
    optimizer='adam',
    loss={
        'landmark_output': 'mean_squared_error'
#         'previous_illuminance_output': 'mean_squared_error',
#         'image_retinex_output': 'mean_squared_error'
    },
    metrics={
        'landmark_output': ['mse', "mae"]
#         'previous_illuminance_output': ['mse', "mae"],
#         'image_retinex_output': ['mse', "mae"]
    }
)

# Summary of the multi-task model
multi_task_model.summary()
# plot_model(multi_task_model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0   

In [None]:
# ===================== DROWSINESS MODEL =====================

# retain weights and remove top layer
output_layer = multi_task_model.get_layer('embedding').output

drowsiness_model = Model(inputs=multi_task_model.input, outputs=output_layer)

# drowsiness_model.summary()
# tf.keras.utils.plot_model(drowsiness_model)
from keras.utils.vis_utils import plot_model
# plot_model(drowsiness_model, to_file='drowsinessModel_plot.png', show_shapes=True, show_layer_names=True)

existing_output = drowsiness_model.output

reshaped_output = tf.keras.layers.Reshape((16, 16, 2))(existing_output)

spatial_attention = tf.keras.layers.Conv2D(2, (1, 1), activation='sigmoid', padding='same')(reshaped_output)
spatial_attention = tf.keras.layers.Softmax()(spatial_attention)
output_tensor = tf.keras.layers.Multiply()([reshaped_output, spatial_attention])

# Add Global Average Pooling layer
output_tensor = tf.keras.layers.GlobalAveragePooling2D()(output_tensor)

# Add output layer with two classes and softmax activation
predictions = tf.keras.layers.Dense(2, activation='softmax', name='drowsiness_output')(output_tensor)

# Create the new model with the modified top layers
drowsiness_model = Model(inputs=drowsiness_model.input, outputs=predictions)

drowsiness_model.compile(
    optimizer='adam',
    loss={
        'drowsiness_output': 'binary_crossentropy'
    },
    metrics={
        'drowsiness_output': ["accuracy"]
    }
)

drowsiness_model.summary()

In [8]:
# ===================== LANDMARKS ONLY DATA GEN =====================

class CustomDataGen(tf.keras.utils.Sequence):
    
    def __init__(self, df, X_col, y_col,
                 batch_size,
                 input_size=(224, 224, 3),
                 shuffle=True,
                 random_seed=None):  # Add a new parameter for random seed
        
        self.df = df.copy()
        self.X_col = X_col
        self.y_col = y_col
        self.batch_size = batch_size
        self.input_size = input_size
        self.shuffle = shuffle
        self.random_seed = random_seed  # Store the random seed
        
        self.n = len(self.df)
        self.n_coords = 2  # Assuming landmark coordinates are 2-dimensional
#         self.n_illuminance = 1  # Assuming a single illuminance value
    
    def on_epoch_end(self):
        if self.shuffle:
            self.df = self.df.sample(frac=1, random_state=self.random_seed).reset_index(drop=True)  # Use the random seed
    
    def __get_input(self, path, target_size):
    
        image = tf.keras.preprocessing.image.load_img(path)
        image_arr = tf.keras.preprocessing.image.img_to_array(image)

        image_arr = tf.image.resize(image_arr, (target_size[0], target_size[1])).numpy()

        return image_arr / 255.
    
    def __get_output(self, label, output_type):
        # Assuming output_type is 'coordinates', 'illuminance', or 'adjusted_image_path'
        if output_type == 'coordinates':
            # Assuming label is a string containing a dictionary-like structure
            # Safely evaluate the string as a literal dictionary using ast.literal_eval
            coordinates_dict = ast.literal_eval(label)
            
            # Extract x and y coordinates for each landmark
            landmarks = ['left_eye', 'right_eye', 'nose', 'mouth_left', 'mouth_right']
            coordinates_list = [coordinates_dict[landmark] for landmark in landmarks]
            
            # Flatten the list and convert to numpy array
            coordinates_array = np.array([coord for landmark_coords in coordinates_list for coord in landmark_coords])
            
#             print("Shape of landmarks_array:", coordinates_array.shape)
            
            # If there are exactly 10 values, return the array, otherwise raise an error
            if len(coordinates_array) == 10:
                return coordinates_array
            else:
                raise ValueError("Expected 10 coordinates, but found {}".format(len(coordinates_array)))
#         elif output_type == 'illuminance':
#             # Convert the illuminance value to a float
#             return float(label)
#         elif output_type == 'adjusted_image_path':
#             # Assuming label is the path to the adjusted image
#             return self.__get_input(label, self.input_size)
    
    def __get_data(self, batches):
        # Generates data containing batch_size samples

        path_batch = batches[self.X_col['path']]
        
        coords_batch = batches[self.y_col['coordinates']]
#         illuminance_batch = batches[self.y_col['illuminance']]
#         adjusted_image_path_batch = batches[self.y_col['adjusted_image_path']]

        X_batch = np.asarray([self.__get_input(x, self.input_size) for x in path_batch])

        y0_batch = np.asarray([self.__get_output(y, 'coordinates') for y in coords_batch])
#         y1_batch = np.asarray([self.__get_output(y, 'illuminance') for y in illuminance_batch])
#         y2_batch = np.asarray([self.__get_output(y, 'adjusted_image_path') for y in adjusted_image_path_batch])

        return X_batch, [y0_batch]
    
    def __getitem__(self, index):
        batches = self.df[index * self.batch_size:(index + 1) * self.batch_size]
        X, y = self.__get_data(batches)

        # Print a few examples of the data
#         print("Sample X:", tf.shape(X[0]))  # Print the first example in the batch
#         print("Sample y[0] (landmarks):", tf.shape(y[0][0]))  # Print the first example in the landmarks output
#         print("Sample y[1] (illum):", tf.shape(y[1][0]))  # Print the first example in the illum output
#         print("Sample y[2] (adjusted_image_path):", tf.shape(y[2][0]))  # Print the first example in the adjusted_image_path output

        return X, y
    
    def __len__(self):
        return self.n // self.batch_size

In [6]:
# ===================== DATA GEN SETUP (LANDMARKS ONLY TASK) =====================

train_df = pd.read_csv("") # path to train_data csv
train_df["Filename"] = "./data/Training/" + train_df["Filename"] + ".jpg"

# Define column indices or names for X and y
X_col = {'path': 'Filename'}
y_col = {'coordinates': 'LandmarksRaw'}

# Create an instance of CustomDataGen
train_gen = CustomDataGen(train_df, X_col, y_col, batch_size=32, input_size=(224, 224, 3))

eval_df = pd.read_csv("") # path to eval_data csv
eval_df["Filename"] = "./data/Evaluation/" + eval_df["Filename"] + ".jpg"

# Define column indices or names for X and y
eval_X_col = {'path': 'Filename'}
eval_y_col = {'coordinates': 'LandmarksRaw'}

val_gen = CustomDataGen(eval_df, eval_X_col, eval_y_col, batch_size=32, input_size=(224, 224, 3))

FileNotFoundError: [Errno 2] No such file or directory: ''

In [None]:
history = multi_task_model.fit(train_gen, epochs=50, validation_data=val_gen)