In [1]:
from __future__ import print_function
import keras
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.datasets import fashion_mnist, cifar10
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
from functools import partial
from sklearn.model_selection import train_test_split

2022-11-11 13:36:48.599737: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Loading data

First, we load the images and their labels and split them into a training set (80%) and a test set (20%).

In [2]:
# Load the grayscale images and their two-integer [hour, minute] labels.
images = np.load('images.npy')
labels = np.load('labels.npy')

# Hold out 20% of the samples as the test set; the fixed seed keeps the split
# the same on every run.
X_train, X_test, y_train, y_test = train_test_split(
    images, labels, test_size=0.2, random_state=42
)

# Input image dimensions.
img_rows, img_cols = 150, 150

# Conv2D needs 4D input, so a single-channel axis is added in the position
# that the backend's image data format expects.
if K.image_data_format() == 'channels_first':
    input_shape = (1, img_rows, img_cols)
else:
    input_shape = (img_rows, img_cols, 1)
X_train = X_train.reshape((X_train.shape[0],) + input_shape)
X_test = X_test.reshape((X_test.shape[0],) + input_shape)

# Convert to float32 and divide by 255
# (assumes 8-bit pixel values, giving a [0, 1] range -- TODO confirm).
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255

print('X_train shape:', X_train.shape)
print(len(X_train), 'train samples')
print(len(X_test), 'test samples')

X_train shape: (14400, 150, 150, 1)
14400 train samples
3600 test samples


## Definition of common-sense difference

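The difference between the predicted time and the actual time is quantified as the 'common-sense' time difference: the shortest distance between the two times on a 12-hour clock face, so that 0:20 and 11:40 are 40 minutes apart rather than 680. The function that calculates this difference is shown below.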

In [3]:
def common_sense(time1, time2):
    """Return the 'common-sense' difference between two clock times, in minutes.

    The difference is measured the short way around a 12-hour dial, so
    [0, 20] and [11, 40] are 40 minutes apart rather than 680.

    Parameters
    ----------
    time1, time2 : sequence of two integers
        ``[hour, minute]`` pairs (list, tuple or a row of a NumPy array).

    Returns
    -------
    int
        Minutes between the two times; for valid dial times (hour 0-11,
        minute 0-59) the result lies between 0 and 360.
    """
    minutes_on_dial = 12 * 60

    # Cast to Python ints before doing arithmetic: with narrow or unsigned
    # NumPy dtypes, ``hour * 60`` or the subtraction below can wrap around
    # and silently produce a wrong difference.
    time1min = int(time1[0]) * 60 + int(time1[1])
    time2min = int(time2[0]) * 60 + int(time2[1])

    diff = abs(time1min - time2min)
    # Going the other way around the dial may be shorter.
    return min(diff, minutes_on_dial - diff)

# Sanity check of common_sense on two stored labels.
time_a, time_b = labels[500], labels[17500]
print('Time 1: ', time_a, '\nTime 2: ', time_b, '\nDifference in minutes: ', common_sense(time_a, time_b) )

Time 1:  [ 0 20] 
Time 2:  [11 40] 
Difference in minutes:  40


In [4]:
# Training hyperparameters used by the cells below.
epochs = 30
batch_size = 128
# Number of output classes for the classification model; at most 720
# (presumably one class per minute of a 12-hour dial).
num_classes = 24

## Regression

In [5]:
# Turn each two-integer [hour, minute] label into one float in fractional
# hours (hour + minute / 60), e.g. [3, 30] -> 3.5.
y_reg_train, y_reg_test = (
    hour_minute[:, 0] + hour_minute[:, 1] / 60
    for hour_minute in (y_train, y_test)
)

In [6]:
# the common-sense error metric for the regression model
def common_sense_reg(y_true, y_pred):
    """Element-wise 'common-sense' error between true and predicted times.

    Both arguments are taken to hold times in fractional hours (the unit of
    the regression targets), so the error wraps at 12: a raw gap of 11.5
    counts as 0.5.

    Parameters
    ----------
    y_true : tensor
        Ground-truth times.
    y_pred : tensor
        Predicted times.

    Returns
    -------
    tensor
        ``min(|y_pred - y_true|, 12 - |y_pred - y_true|)`` for each element.
    """
    differences = K.abs(y_pred - y_true)
    # Going the other way around the 12-hour dial may be shorter.
    return tf.math.minimum(differences, tf.subtract(12.0, differences))

In [11]:
# Conv2D factory with a 3x3 kernel, "SAME" padding and the input shape preset.
# NOTE(review): it is not used by the model built below.
DefaultConv2D = partial(keras.layers.Conv2D, kernel_size=3, padding="SAME", input_shape=input_shape)

model = keras.models.Sequential()

# Two convolutional stages (32 filters, then 64). Each stage is a 'same'-padded
# 3x3 convolution, a second 3x3 convolution with default padding, 2x2 max
# pooling and 25% dropout, with a ReLU after every convolution.
for stage_index, n_filters in enumerate((32, 64)):
    # Only the very first layer of the network declares the input shape.
    first_layer_kwargs = {'input_shape': input_shape} if stage_index == 0 else {}
    model.add(Conv2D(n_filters, (3, 3), padding='same', **first_layer_kwargs))
    model.add(Activation('relu'))
    model.add(Conv2D(n_filters, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

# Dense head: flatten, a 512-unit ReLU layer with 50% dropout, then a single
# linear unit that outputs the regression value.
for head_layer in (Flatten(), Dense(512), Activation('relu'), Dropout(0.5), Dense(1)):
    model.add(head_layer)

# Mean absolute error is the training loss; the wrap-around common-sense
# error is tracked as a metric.
model.compile(loss="mean_absolute_error", optimizer='sgd', metrics=[common_sense_reg])

In [None]:
# Train the regression model, passing the test split as validation data.
# The returned History object is kept so the training curves can be inspected
# afterwards.
history = model.fit(X_train, y_reg_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    verbose=1,
                    validation_data=(X_test, y_reg_test))

# score[0] is the loss the model was compiled with (mean absolute error) and
# score[1] is the common_sense_reg metric, a wrap-around error in hours.
# It is an error (lower is better), not an accuracy.
score = model.evaluate(X_test, y_reg_test, verbose=0)
print('Test loss:', score[0])
print('Test common-sense error (hours):', score[1])

Epoch 1/12

## Classification

In [111]:
# change the two-integer labels to 24 half-hour classes
def classification_label(y):
    """Map [hour, minute] labels to half-hour class indices.

    Each hour is split into two equal 30-minute bins, minutes 0-29 and 30-59,
    giving ``2 * hour`` for the first half and ``2 * hour + 1`` for the
    second. The indices start at 0, so hours 0-11 yield classes 0-23, which is
    the range Keras expects for a 24-class problem.

    Parameters
    ----------
    y : iterable of two-integer sequences
        ``[hour, minute]`` pairs, e.g. the rows of the label array.

    Returns
    -------
    list of int
        One class index per input label, in the same order.
    """
    return [2 * int(hour) + (1 if minute >= 30 else 0) for hour, minute in y]
# Encode both splits so the classifier can be trained and evaluated on labels
# produced the same way.
y_class_train = classification_label(y_train)
y_class_test = classification_label(y_test)

In [None]:
# wrap-around time error, defined again under the name of the regression metric
def common_sense_reg(y_true, y_pred):
    """Element-wise absolute difference between y_pred and y_true, wrapped at 12.

    NOTE(review): this reuses the name of the regression metric defined
    earlier in the notebook and therefore replaces it. The arithmetic treats
    its inputs as values on a 12-unit dial (hours) -- confirm that this is
    what the classification model needs.
    """
    gap = K.abs(y_pred - y_true)
    wrapped_gap = tf.subtract(12.0, gap)
    return tf.math.minimum(gap, wrapped_gap)

In [None]:
# Convert half-hour class indices to binary (one-hot) class matrices.
# The raw labels are [hour, minute] pairs whose minute values exceed
# num_classes - 1, so they cannot be passed to to_categorical directly.
# The class index is computed from the two columns first: 2 * hour, plus 1
# for the second half of the hour, giving 0..23 for hours 0-11.
# The results get their own names so y_train / y_test keep the raw labels and
# this cell can be re-run safely.
y_class_train_onehot = keras.utils.to_categorical(
    2 * y_train[:, 0] + (y_train[:, 1] >= 30), num_classes)
y_class_test_onehot = keras.utils.to_categorical(
    2 * y_test[:, 0] + (y_test[:, 1] >= 30), num_classes)

## Multi-head models

## Optimizing final model