In [None]:
from __future__ import print_function
import tensorflow as tf
import tensorflow.keras
from tensorflow.keras.datasets import mnist, fashion_mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras import backend as K
import numpy as np


In [None]:
# # Define common sense loss described in the assignment description
# def common_sense_loss_hours(y_true, y_pred):
#     return K.mean(K.abs(K.minimum(K.abs(y_true - y_pred), K.ones_like(y_true) * 12 - K.abs(y_true - y_pred))))

# def common_sense_loss_minutes(y_true, y_pred):
#     return K.mean(K.abs(K.minimum(K.abs(y_true - y_pred), K.ones_like(y_true) * 60 - K.abs(y_true - y_pred))))

# Define ResNet model
def resnet():
    """
    - ResNet model pretrained on 1000 class imagenet data set
    - ResNet 34 is loaded (34 deep convolutional layers)
    - The top of the ResNet model is excluded
    - Three layers are added to end of ResNet, a GAPooling and two dense layers
    - Last layer has linear activation to predict continuous regression value
    """
    ResNet, preprocess_input = Classifiers.get('resnet34')

    # Build ResNet model with correct input shape, weights loaded from 1000 class imagenet, top is exluced
    base_model = ResNet(input_shape=(150, 150, 3), weights='imagenet', include_top=False)
    x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
    flatten = tf.keras.layers.Dense(256, activation='relu')(x)
    output = tf.keras.layers.Dense(1, activation='linear')(flatten)
    
    # Add extra layers and create Keras model
    model = tf.keras.models.Model(inputs=[base_model.input], outputs=[output])
    
    # Compile model with adam optimizer and custom loss/MSE
    model.compile(optimizer="adam", loss=tf.keras.losses.MeanSquaredError(), metrics=[tf.keras.losses.MeanAbsoluteError()])
    
    return model

In [None]:
def train_test_split(X, y, test_size=0.2, shuffle=False):
    if shuffle:
        data = [(x, y) for x, y in zip(X, y)]
        np.random.shuffle(data)
        X, y = [d[0] for d in data], [d[1] for d in data]
        num_samples_train = int((1 - test_size) * len(data))
        return np.array(X[:num_samples_train]), np.array(X[num_samples_train:]), np.array(y[:num_samples_train]), np.array(y[num_samples_train:])
    
    num_samples_train = int((1 - test_size) * len(X))
    return X[:num_samples_train], X[num_samples_train:], y[:num_samples_train], y[num_samples_train:]
    
        
# Data processing hours
data = np.load("../input/time-data/images.npy")
labels = np.load("../input/time-data/labels.npy")

X = np.stack([data]*3, axis=-1)


x_train, x_test, y_train, y_test = train_test_split(X, labels, test_size=0.2, shuffle=True)

del X, data

y_train_hours = np.expand_dims(np.array([x[0] for x in y_train], dtype="float32"), axis=-1)
y_train_minutes = np.expand_dims(np.array([x[1] for x in y_train], dtype="float32"), axis=-1)

y_test_hours = np.expand_dims(np.array([x[0] for x in y_test], dtype="float32"), axis=-1)
y_test_minutes = np.expand_dims(np.array([x[1] for x in y_test], dtype="float32"), axis=-1)

def cv_generator(X, y, n_folds=5):
    folds = np.array_split([(x, y) for x, y in zip(X, y)], n_folds)
    fold = 0
    while True:
        train_samples = [item for sublist in [f for i, f in enumerate(folds) if i != fold] for item in sublist]
        x_train = np.array([x[0] for x in train_samples])
        y_train = np.array([x[1] for x in train_samples])
        x_test = np.array([x[0] for x in folds[fold]])
        y_test = np.array([x[1] for x in folds[fold]])
        yield x_train, x_test, y_train, y_test
        fold += 1
        
def evaluate_common_sense(model_hours, model_minutes, x_test, y_test):
    y_pred_hours = model_hours.predict(x_test)
    y_pred_minutes = model_minutes.predict(x_test)

    y_pred = [round(y_pred_hours[i][0]) + round(y_pred_minutes[i][0]) / 60 for i in range(len(x_test))]
    y_true = [y_test_hours[i][0] + y_test_minutes[i][0] / 60 for i in range(len(y_test_hours))]

    print(y_pred_hours[:20])
    print(y_pred_minutes[:20])
    print(y_test[:20])

    common_sense_error = sum([min(abs(y_true[i] - y_pred[i]), 12.0 - abs(y_true[i] - y_pred[i])) for i in range(len(y_true))]) / len(y_true) * 60
    print(f"Mean Common Sense Error for test samples is {common_sense_error}")
    return common_sense_error
    
    


In [None]:
cv_results = []
cv_data = cv_generator(np.concatenate((x_train, x_test), axis=0), np.concatenate((y_train, y_test), axis=0))

for _ in range(5):
    x_train, x_test, y_train, y_test = next(cv_data)
    
    model_hours = resnet()
    model_minutes = resnet()
    
    y_train_hours = np.expand_dims(np.array([x[0] for x in y_train], dtype="float32"), axis=-1)
    y_train_minutes = np.expand_dims(np.array([x[1] for x in y_train], dtype="float32"), axis=-1)

    y_test_hours = np.expand_dims(np.array([x[0] for x in y_test], dtype="float32"), axis=-1)
    y_test_minutes = np.expand_dims(np.array([x[1] for x in y_test], dtype="float32"), axis=-1)

    # Fit and evaluate model
    model_hours.fit(x_train, y_train_hours,
              batch_size=256,
              epochs=19,
              verbose=1,
              validation_data=(x_test, y_test_hours))

    score = model_hours.evaluate(x_test, y_test_hours, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    model_minutes.fit(x_train, y_train_minutes,
              batch_size=256,
              epochs=19,
              validation_data=(x_test, y_test_minutes))

    score = model_minutes.evaluate(x_test, y_test_minutes, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    del x_train
    cv_results.append(evaluate_common_sense(model_hours, model_minutes, x_test, y_test))
    del model_hours, model_minutes

print(f"Mean of all Mean Common Sense Errors is {sum(cv_results) / len(cv_results)}")
