In [1]:
# Query GPU status through the shell. The recorded output of this cell shows that
# `nvidia-smi` was not found on the machine where the notebook was last run.
!nvidia-smi

zsh:1: command not found: nvidia-smi


In [6]:
import datetime
import pickle
import numpy as np
import matplotlib.pyplot as plt
import sys

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

# Extend the module search path so the `process_data` import in the next cell can
# resolve (presumably process_data.py lives under model/data — confirm). The path is
# relative, so it depends on the kernel's working directory.
sys.path.append("model/data")

In [5]:
from process_data import Image, DriverDrowsinessDataset

In [None]:
def plot_images(images, classes, class_true, class_pred=None,smooth=True):
    """Show up to nine images in a 3x3 grid, each labelled with its class name.

    Args:
        images: Sequence of images accepted by ``Axes.imshow``. Only the first
            nine are drawn; unused grid cells are left blank.
        classes: Sequence mapping a class index to its display name.
        class_true: True class index of every image (same length as ``images``).
        class_pred: Optional predicted class index of every image. When given,
            each label shows both the true and the predicted class name.
        smooth: Use 'spline16' interpolation when True, 'nearest' otherwise.

    Raises:
        AssertionError: If ``images`` and ``class_true`` differ in length.
    """
    assert len(images) == len(class_true)
    fig, axes = plt.subplots(3, 3)

    # Two-line labels (true + predicted) need more vertical room between rows.
    hspace = 0.3 if class_pred is None else 0.6
    fig.subplots_adjust(hspace=hspace, wspace=0.3)

    interpolation = 'spline16' if smooth else 'nearest'

    for i, ax in enumerate(axes.flat):
        if i < len(images):
            ax.imshow(images[i], interpolation=interpolation)
            cls_true_name = classes[class_true[i]]
            if class_pred is None:
                # Show the class *name*, consistent with the predicted-label branch
                # (previously the raw class index was printed here).
                xlabel = "True: {0}".format(cls_true_name)
            else:
                class_pred_name = classes[class_pred[i]]
                xlabel = "True: {0}\nPred: {1}".format(cls_true_name, class_pred_name)
            ax.set_xlabel(xlabel)
        # Ticks are removed from every cell, including the blank ones.
        ax.set_xticks([])
        ax.set_yticks([])
    plt.show()

def print_confusion_matrix(classes,class_test,class_pred):
    """
    Print the confusion matrix followed by an index -> class-name legend.

    class_test is the array of true classes and class_pred the array of
    predicted classes, one entry per image.
    """
    matrix = confusion_matrix(y_true=class_test, y_pred=class_pred)
    print("Confusion matrix:")
    print(matrix)

    # Legend: one "(index) name" line per class, in the order given by `classes`.
    position = 0
    for label in classes:
        print("({0}) {1}".format(position, label))
        position += 1

def plot_training_history(model):
    """Plot training and validation accuracy/loss curves of a Keras training run.

    Args:
        model: Either the ``History`` object returned by ``fit()`` (whose
            ``.history`` attribute is the per-epoch metrics dict) or the fitted
            Keras model itself (Keras stores that ``History`` object on
            ``model.history``). Both forms are accepted so the function keeps
            working regardless of which object the caller kept around.

    Raises:
        KeyError: If 'accuracy', 'loss', 'val_accuracy' or 'val_loss' is missing
            from the recorded metrics.
    """
    metrics = model.history
    if not isinstance(metrics, dict):
        # A fitted model was passed: unwrap its History object to reach the dict.
        metrics = metrics.history

    # Get the classification accuracy and loss-value
    # for the training-set.
    acc = metrics['accuracy']
    loss = metrics['loss']

    # Get it for the validation-set (we only use the test-set).
    val_acc = metrics['val_accuracy']
    val_loss = metrics['val_loss']

    # Plot the accuracy and loss-values for the training-set.
    plt.plot(acc, linestyle='-', color='b', label='Training Acc.')
    plt.plot(loss, 'o', color='b', label='Training Loss')

    # Plot it for the test-set.
    plt.plot(val_acc, linestyle='--', color='r', label='Test Acc.')
    plt.plot(val_loss, 'o', color='r', label='Test Loss')

    # Plot title and legend.
    plt.title('Training and Test Accuracy')
    plt.legend()

    # Ensure the plot shows correctly.
    plt.show()


In [10]:
# Model input size (height, width, channels) and mini-batch size used for training.
DIMS = (224,224,3)
BATCH_SIZE = 32

# SECURITY NOTE: pickle.load can execute arbitrary code embedded in the file.
# Only load data.pkl when it comes from a trusted source.
with open("../data/data.pkl","rb") as f:
    dataset = pickle.load(f)

# Every value of `dataset` is unpacked as a (frame, label) pair.
frames,labels = zip(*dataset.values())

# preprocess all frames
frames = np.array([Image.load_and_prep_image(frame,dimensions=(DIMS[0],DIMS[1])) for frame in frames])

# Fixed random_state keeps the 80/20 split reproducible across runs.
X_train, X_val, y_train, y_val = train_test_split(frames,labels,test_size=0.2, random_state=42)

# np.asarray instead of np.array: the frame splits are already ndarrays, so this
# avoids a second full copy of the image data, while the label splits (derived
# from a tuple) are still converted to arrays for index-based batching.
X_train, X_val = np.asarray(X_train), np.asarray(X_val)
y_train, y_val = np.asarray(y_train), np.asarray(y_val)

def data_generator(X,y,batch_size):
    """Endlessly yield shuffled ``(X_batch, y_batch)`` mini-batches.

    Each pass over the data draws a fresh random permutation and yields
    consecutive slices of it, so one pass produces ``ceil(len(X) / batch_size)``
    batches and the last batch of a pass may be smaller than ``batch_size``.

    Args:
        X: Samples; must support ``len()`` and indexing with an integer index
            array (e.g. a NumPy array).
        y: Targets aligned with ``X``; same length and indexing requirements.
        batch_size: Maximum number of samples per batch (positive integer).

    Returns:
        A generator that never terminates.

    Raises:
        ValueError: If ``X`` is empty, ``X`` and ``y`` differ in length, or
            ``batch_size`` is smaller than 1. These are checked eagerly at call
            time: with no samples (or a negative batch size) the endless loop
            would otherwise spin forever without yielding anything.
    """
    num_samples = len(X)
    if num_samples == 0:
        raise ValueError("data_generator requires at least one sample")
    if len(y) != num_samples:
        raise ValueError(
            "X and y must have the same length, got {0} and {1}".format(num_samples, len(y))
        )
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer, got {0}".format(batch_size))

    def _batches():
        # Inner generator so that the validation above runs at call time instead
        # of being deferred to the first next().
        while True:
            indices = np.arange(num_samples)
            np.random.shuffle(indices)

            for start_idx in range(0, num_samples, batch_size):
                batch_indices = indices[start_idx:start_idx + batch_size]
                X_batch, y_batch = X[batch_indices], y[batch_indices]
                yield X_batch, y_batch

    return _batches()

def create_tensorboard_callback(dir_name, experiment_name):
    """Create a TensorBoard callback writing to a timestamped log directory.

    The log directory is ``dir_name/experiment_name/<YYYYmmdd-HHMMSS>``, with the
    timestamp taken at call time; the chosen path is printed before returning.

    Args:
        dir_name: Root directory for the logs (string).
        experiment_name: Sub-directory naming the experiment (string).

    Returns:
        The ``tf.keras.callbacks.TensorBoard`` instance.
    """
    path_parts = [
        dir_name,
        experiment_name,
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    ]
    log_dir = "/".join(path_parts)
    callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
    print(f"Saving TensorBoard log files to: {log_dir}")
    return callback

(0, 0, 0, 0, 0)

In [None]:
# Endless shuffled batch streams for training and validation.
train_generator = data_generator(X_train, y_train, BATCH_SIZE)
val_generator = data_generator(X_val, y_val, BATCH_SIZE)

# Random image augmentations bundled as a reusable Keras sub-model.
# The stable `layers.Random*` classes are used instead of the deprecated
# `layers.experimental.preprocessing.*` aliases; the stable names are available in
# every TensorFlow release that also provides EfficientNetV2B0.
data_augmentation = keras.Sequential([
    layers.RandomFlip("horizontal", input_shape=DIMS),
    layers.RandomRotation(0.2),
    layers.RandomZoom(0.2),
    layers.RandomHeight(0.2),
    layers.RandomWidth(0.2),
], name="data_augmentation")

In [None]:
# Pretrained backbone: EfficientNetV2B0 with ImageNet weights and without its
# classification head (include_top=False), built for inputs of shape DIMS.
BASE_MODEL = tf.keras.applications.EfficientNetV2B0(
    include_top=False, weights="imagenet", input_shape=DIMS
)

def EfficientNet(input_shape=DIMS,base_model=BASE_MODEL, num_classes=2):
    """Build and compile a transfer-learning classifier on a frozen backbone.

    Pipeline: input -> data_augmentation -> base_model (inference mode) ->
    global average pooling -> Dense(1024, relu) -> Dropout(0.7) -> sigmoid head.

    Args:
        input_shape: Shape of one input image, without the batch dimension.
        base_model: Feature-extractor model. Its layers are frozen in place, so
            the object passed in (including the shared default) is modified.
        num_classes: Number of target classes. For the binary case (2) the head
            is a single sigmoid unit giving the probability of class 1; for any
            other value the head keeps one sigmoid unit per class.

    Returns:
        A compiled ``keras.Model`` named "EfficientNet" using Adam, binary
        cross-entropy, and accuracy / precision / recall / AUC metrics.
    """
    # freeze the base model layers
    for layer in base_model.layers:
        layer.trainable = False

    # create input layer
    inputs = keras.Input(shape=input_shape, name="input_layer")
    x = data_augmentation(inputs)
    # training=False keeps the backbone in inference mode inside this model.
    x = base_model(x, training=False)
    x = layers.GlobalAveragePooling2D(name="global_average_pooling_layer")(x)
    x = layers.Dense(1024, activation="relu", name="dense_layer")(x)
    x = layers.Dropout(0.7, name="dropout_layer")(x)

    # Binary classification with binary cross-entropy uses ONE sigmoid unit so the
    # prediction shape lines up with one 0/1 label per sample; a 2-unit head
    # does not match such labels in the loss and in Precision/Recall/AUC.
    # NOTE(review): assumes labels are scalar 0/1 values (not one-hot) — confirm
    # against the contents of data.pkl.
    output_units = 1 if num_classes == 2 else num_classes
    outputs = layers.Dense(output_units, activation="sigmoid", name="output_layer")(x)

    # Assemble the end-to-end model and compile it; performance is tracked with
    # accuracy, precision, recall and AUC.
    model = keras.Model(inputs, outputs, name="EfficientNet")
    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss="binary_crossentropy",
        metrics=["accuracy",keras.metrics.Precision(),keras.metrics.Recall(),keras.metrics.AUC()]
    )
    return model



In [None]:
# Build the compiled classifier with its default arguments and print a per-layer summary.
model = EfficientNet()
model.summary()

In [None]:
# Inspect the dtype and the dtype policy of the layer at index 3 of the model
# (one value per output line).
print(model.layers[3].dtype, model.layers[3].dtype_policy, sep="\n")

In [None]:
# Ceil division: the batch generators emit a final partial batch on every pass, so
# this makes one epoch cover each sample once and never yields a step count of 0
# for a split smaller than BATCH_SIZE.
train_steps = (len(X_train) + BATCH_SIZE - 1) // BATCH_SIZE
val_steps = (len(X_val) + BATCH_SIZE - 1) // BATCH_SIZE

# Keep the return value of fit() under its own name. Rebinding `model` to it would
# replace the Keras model with a History object, which has no save()/predict().
history = model.fit(train_generator,
                    epochs=10,
                    steps_per_epoch=train_steps,
                    validation_data=val_generator,
                    validation_steps=val_steps,
                    callbacks=[create_tensorboard_callback("logs","EfficientNetV2B0")])

In [None]:
# Persist the trained model to the file en_model_v1.h5 in the working directory.
# NOTE(review): `model` must be the Keras model at this point, not the object
# returned by fit() — a History object has no save() method.
model.save("en_model_v1.h5")