In [None]:
import numpy as np
import cv2
import os
import glob
from os.path import join
import json
#from data_utility import image_normalization
import random


# normalize all data
def normalize(data):
    """Scale a whole batch to [0, 1] and subtract the per-feature batch mean.

    The input is viewed as one flat feature row per sample (first axis =
    samples), divided by 255, centred column-wise on the mean taken over the
    samples, and finally returned in the input's original shape as float32.
    The input array itself is not modified.
    """
    print("Data normalization...")
    original_shape = data.shape

    # one row per sample; astype() yields a fresh float32 copy we can edit in place
    flat = np.reshape(data, (original_shape[0], -1)).astype('float32')
    flat /= 255.

    # centre every feature (column) on its mean over the batch
    flat -= flat.mean(axis=0)

    print("Done.")
    return flat.reshape(original_shape)


# normalize a single image
def image_normalization(img):
    """Return a float32 copy of ``img`` scaled by 1/255 and centred on its own mean.

    Unlike ``normalize``, the mean is a single scalar computed over every
    element of this one image.
    """
    scaled = img.astype('float32') / 255.
    return scaled - scaled.mean()


# prepare all data (npz version)
def prepare_data(data):
    """Normalize the image streams and cast the remaining streams to float32.

    ``data`` is unpacked as ``(eye_left, eye_right, face, face_mask, y)``.
    The three image streams go through ``normalize``; the face mask is
    flattened to one row per sample and cast to float32; the labels are only
    cast to float32. A new list in the same order is returned.
    """
    print("Data preparing...")
    left_eyes, right_eyes, faces, masks, labels = data

    # image streams: scaled and mean-centred batch-wise
    left_eyes = normalize(left_eyes)
    right_eyes = normalize(right_eyes)
    faces = normalize(faces)

    # mask: one flat float32 row per sample, no scaling
    flat_masks = np.reshape(masks, (masks.shape[0], -1))
    masks = flat_masks.astype('float32')

    labels = labels.astype('float32')
    print("Done.")
    return [left_eyes, right_eyes, faces, masks, labels]


# shuffle data
def shuffle_data(data):
    """Reorder every array in ``data`` with one shared random permutation.

    The permutation length comes from the first array. The list is updated
    in place (each slot is replaced by its reindexed array) and the same list
    object is returned, so samples stay aligned across all streams.
    """
    order = np.arange(data[0].shape[0])
    np.random.shuffle(order)

    # replace slot by slot so the caller's list object is the one mutated
    for slot, stream in enumerate(data):
        data[slot] = stream[order]
    return data

# Loading Data below

In [None]:
def load_data_from_npz(file):
    """Load the training and validation splits from an ``.npz`` archive.

    Parameters
    ----------
    file : str or file-like
        Passed unchanged to ``np.load``; must be an ``.npz`` archive holding
        the keys ``{train,val}_{eye_left,eye_right,face,face_mask,y}``.

    Returns
    -------
    tuple of two lists
        ``([train_eye_left, train_eye_right, train_face, train_face_mask,
        train_y], [val_eye_left, val_eye_right, val_face, val_face_mask,
        val_y])``.

    Raises
    ------
    KeyError
        If one of the expected keys is missing from the archive.
    """
    streams = ("eye_left", "eye_right", "face", "face_mask", "y")

    print("Loading dataset from npz file...", end='')
    # np.load returns a lazily-read NpzFile that keeps the archive open;
    # the context manager closes it once every array has been materialized.
    with np.load(file) as npzfile:
        print(npzfile.files)
        train_data = [npzfile["train_" + name] for name in streams]
        val_data = [npzfile["val_" + name] for name in streams]
    print("Done.")

    return train_data, val_data




def load_batch(data, img_ch, img_cols, img_rows):
    """Convert one slice of raw samples into normalized, channels-first batches.

    Parameters
    ----------
    data : sequence of five arrays
        ``[left eyes, right eyes, faces, face grids, labels]``; the batch
        size is taken from ``data[0].shape[0]``. The image entries must be
        3-D (they are transposed with ``(2, 0, 1)`` after resizing) —
        presumably H x W x C uint8 images, TODO confirm against the dataset.
        Face grids are 2-D and must fit the fixed ``(1, 25, 25)`` slot.
    img_ch, img_cols, img_rows : int
        Target layout of each image: ``(img_ch, img_cols, img_rows)``.

    Returns
    -------
    tuple
        ``([right_eye_batch, left_eye_batch, face_batch, face_grid_batch],
        y_batch)``, all float32. Note the right eye comes first, which is the
        input order used by ``get_eye_tracker_model``.
    """
    # useful for debug: dump every resized image / grid to disk
    save_images = False

    # if save images, create the related directory
    img_dir = "images"
    if save_images:
        # was os.makedir, which does not exist
        os.makedirs(img_dir, exist_ok=True)

    n_samples = data[0].shape[0]

    def load_images(images, prefix):
        """Resize, optionally dump, normalize and transpose one image stream."""
        batch = np.zeros(shape=(n_samples, img_ch, img_cols, img_rows), dtype=np.float32)
        for i, img in enumerate(images):
            # cv2.resize takes dsize as (width, height) and returns an array of
            # shape (height, width, ch). The batch slot is (ch, img_cols,
            # img_rows), so height must be img_cols and width img_rows. The
            # previous (img_cols, img_rows) order only worked for square sizes.
            img = cv2.resize(img, (img_rows, img_cols))
            if save_images:
                cv2.imwrite(join(img_dir, prefix + str(i) + ".png"), img)
            batch[i] = image_normalization(img).transpose(2, 0, 1)
        return batch

    left_eye_batch = load_images(data[0], "left")
    right_eye_batch = load_images(data[1], "right")
    face_batch = load_images(data[2], "face")

    # face grids are stored as-is, with a leading singleton channel axis
    face_grid_batch = np.zeros(shape=(n_samples, 1, 25, 25), dtype=np.float32)
    for i, grid in enumerate(data[3]):
        if save_images:
            cv2.imwrite(join(img_dir, "grid" + str(i) + ".png"), grid)
        face_grid_batch[i] = grid.reshape((1, grid.shape[0], grid.shape[1]))

    # labels: two values per sample
    y_batch = np.zeros((n_samples, 2), dtype=np.float32)
    for i, labels in enumerate(data[4]):
        y_batch[i] = labels

    return [right_eye_batch, left_eye_batch, face_batch, face_grid_batch], y_batch

# Model Definition

In [None]:
from keras.layers import Layer
from keras.layers import Input, Conv2D, Dense, Flatten, MaxPool2D, concatenate
from keras.models import Model


class ScaledSigmoid(Layer):
    """Element-wise activation ``alpha / (1 + exp(-x / beta))``.

    ``alpha`` scales the output range of the sigmoid and ``beta`` stretches
    its input. The layer has no weights and keeps the input shape.
    """

    def __init__(self, alpha, beta, **kwargs):
        super(ScaledSigmoid, self).__init__(**kwargs)
        self.alpha = alpha
        self.beta = beta

    def build(self, input_shape):
        # nothing to create: the layer has no trainable parameters
        super(ScaledSigmoid, self).build(input_shape)

    def call(self, x, mask=None):
        # Use the backend op rather than np.exp: NumPy cannot evaluate a
        # symbolic Keras tensor, so the original expression failed as soon as
        # the layer was placed in a model. Imported locally so this block does
        # not depend on any other import cell.
        from keras import backend as K
        return self.alpha / (1 + K.exp(-x / self.beta))

    def compute_output_shape(self, input_shape):
        # Keras 2 name of the shape-inference hook; the op is element-wise
        return input_shape

    def get_output_shape_for(self, input_shape):
        # legacy (pre-Keras 2) name, kept for existing callers
        return self.compute_output_shape(input_shape)

    def get_config(self):
        # expose the constructor arguments so a saved model can rebuild the layer
        config = super(ScaledSigmoid, self).get_config()
        config.update({'alpha': self.alpha, 'beta': self.beta})
        return config


# activation functions
# `activation` is passed to every Conv2D and hidden Dense layer built by the
# model functions below; `last_activation` is used only by the final 2-unit
# Dense layer of get_eye_tracker_model.
activation = 'relu'
last_activation = 'linear'


# eye model
def get_eye_model(img_ch, img_cols, img_rows):
    """Build the convolutional feature extractor applied to an eye image.

    Parameters
    ----------
    img_ch, img_cols, img_rows : int
        Channels-first input shape ``(img_ch, img_cols, img_rows)``.

    Returns
    -------
    keras.models.Model
        Maps one image tensor to a 64-channel feature map. It is instantiated
        once by ``get_eye_tracker_model`` and called on both eye inputs.

    Notes
    -----
    The pooling layers now receive ``data_format='channels_first'`` like the
    convolutions. Previously they used the global Keras default, which, when
    that default is ``channels_last``, pools over the channel and height axes
    instead of the two spatial axes. When the global default is
    ``channels_last`` this changes intermediate shapes, so weights trained
    with the old graph will presumably not load into the corrected one.
    """
    data_format = 'channels_first'

    eye_img_input = Input(shape=(img_ch, img_cols, img_rows))

    h = Conv2D(96, (11, 11), activation=activation, data_format=data_format)(eye_img_input)
    h = MaxPool2D(pool_size=(2, 2), data_format=data_format)(h)
    h = Conv2D(256, (5, 5), activation=activation, data_format=data_format)(h)
    h = MaxPool2D(pool_size=(2, 2), data_format=data_format)(h)
    h = Conv2D(384, (3, 3), activation=activation, data_format=data_format)(h)
    h = MaxPool2D(pool_size=(2, 2), data_format=data_format)(h)
    # 1x1 convolution: reduces to 64 channels without touching spatial size
    out = Conv2D(64, (1, 1), activation=activation, data_format=data_format)(h)

    model = Model(inputs=eye_img_input, outputs=out)

    return model


# face model
def get_face_model(img_ch, img_cols, img_rows):
    """Build the convolutional feature extractor applied to the face image.

    Parameters
    ----------
    img_ch, img_cols, img_rows : int
        Channels-first input shape ``(img_ch, img_cols, img_rows)``.

    Returns
    -------
    keras.models.Model
        Maps one face image tensor to a 64-channel feature map.

    Notes
    -----
    The pooling layers now receive ``data_format='channels_first'`` like the
    convolutions. Previously they used the global Keras default, which, when
    that default is ``channels_last``, pools over the channel and height axes
    instead of the two spatial axes. When the global default is
    ``channels_last`` this changes intermediate shapes, so weights trained
    with the old graph will presumably not load into the corrected one.
    """
    data_format = 'channels_first'

    face_img_input = Input(shape=(img_ch, img_cols, img_rows))

    h = Conv2D(96, (11, 11), activation=activation, data_format=data_format)(face_img_input)
    h = MaxPool2D(pool_size=(2, 2), data_format=data_format)(h)
    h = Conv2D(256, (5, 5), activation=activation, data_format=data_format)(h)
    h = MaxPool2D(pool_size=(2, 2), data_format=data_format)(h)
    h = Conv2D(384, (3, 3), activation=activation, data_format=data_format)(h)
    h = MaxPool2D(pool_size=(2, 2), data_format=data_format)(h)
    # 1x1 convolution: reduces to 64 channels without touching spatial size
    out = Conv2D(64, (1, 1), activation=activation, data_format=data_format)(h)

    model = Model(inputs=face_img_input, outputs=out)

    return model


# final model
def get_eye_tracker_model(img_ch, img_cols, img_rows):
    """Assemble the four-input gaze model and return it uncompiled.

    Inputs, in order: right eye, left eye, face (each of shape
    ``(img_ch, img_cols, img_rows)``) and a ``(1, 25, 25)`` face grid.
    Both eyes go through the same eye network instance; eye, face and grid
    branches each end in dense layers whose outputs are concatenated and
    reduced to a single 2-unit output.
    """
    image_shape = (img_ch, img_cols, img_rows)

    # convolutional sub-networks (one shared by both eyes, one for the face)
    eye_tower = get_eye_model(img_ch, img_cols, img_rows)
    face_tower = get_face_model(img_ch, img_cols, img_rows)

    # right eye branch
    right_eye_input = Input(shape=image_shape)
    right_eye_features = eye_tower(right_eye_input)

    # left eye branch (same tower, hence shared weights)
    left_eye_input = Input(shape=image_shape)
    left_eye_features = eye_tower(left_eye_input)

    # face branch
    face_input = Input(shape=image_shape)
    face_features = face_tower(face_input)

    # face grid enters the network directly, without convolutions
    face_grid = Input(shape=(1, 25, 25))

    # eyes: merge both feature maps, then one dense layer
    eyes = concatenate([left_eye_features, right_eye_features])
    eyes = Flatten()(eyes)
    eyes_dense = Dense(128, activation=activation)(eyes)

    # face: two dense layers
    face_flat = Flatten()(face_features)
    face_dense = Dense(128, activation=activation)(face_flat)
    face_dense = Dense(64, activation=activation)(face_dense)

    # face grid: two dense layers
    grid_flat = Flatten()(face_grid)
    grid_dense = Dense(256, activation=activation)(grid_flat)
    grid_dense = Dense(128, activation=activation)(grid_dense)

    # head: fuse the three branches and regress the two outputs
    merged = concatenate([eyes_dense, face_dense, grid_dense])
    hidden = Dense(128, activation=activation)(merged)
    prediction = Dense(2, activation=last_activation)(hidden)

    return Model(
        inputs=[right_eye_input, left_eye_input, face_input, face_grid],
        outputs=[prediction])


# Training and Testing

In [None]:
import os
from keras.optimizers import SGD, Adam
from keras.callbacks import  EarlyStopping, ModelCheckpoint



# generator for data loaded from the npz file
def generator_npz(data, batch_size, img_ch, img_cols, img_rows):
    """Yield ``(inputs, labels)`` batches from in-memory arrays, forever.

    Each pass walks ``data`` front to back in steps of ``batch_size`` (the
    last batch of a pass may be shorter), converting every slice with
    ``load_batch``; after the last slice it starts over from the beginning.
    """
    while True:
        total = data[0].shape[0]
        for start in range(0, total, batch_size):
            stop = start + batch_size
            chunk = [stream[start:stop] for stream in data]
            inputs, targets = load_batch(chunk, img_ch, img_cols, img_rows)
            yield inputs, targets





def train(dataset_path='eye_tracker_train_and_val.npz', weights_dir='weights',
          n_epoch=100, batch_size=128, patience=15):
    """Train the eye tracker model on the npz dataset.

    Parameters
    ----------
    dataset_path : str
        ``.npz`` archive read by ``load_data_from_npz``.
    weights_dir : str
        Directory receiving the checkpoints; created if missing.
    n_epoch : int
        Maximum number of epochs.
    batch_size : int
        Samples per generator batch.
    patience : int
        ``EarlyStopping`` patience, in epochs.

    All defaults reproduce the previously hard-coded values, so ``train()``
    behaves as before. Nothing is returned; the best weights (by validation
    loss) are written to ``weights_dir``.
    """
    import math  # local import: only needed for the step computation below

    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    # '-1' matches no device id, which hides every CUDA device from this process
    os.environ["CUDA_VISIBLE_DEVICES"] = '-1'

    # image parameter
    img_cols = 64
    img_rows = 64
    img_ch = 3

    # model
    model = get_eye_tracker_model(img_ch, img_cols, img_rows)

    # model summary
    model.summary()

    # To resume from a checkpoint, call model.load_weights(<path>) here.

    # optimizer (the previously constructed but unused SGD instance was dropped)
    adam = Adam(lr=1e-3)

    # compile model
    model.compile(optimizer=adam, loss='mse')

    # data
    train_data, val_data = load_data_from_npz(dataset_path)

    # last dataset checks: all five sources of a split should have equal length
    print("train data sources of size: {} {} {} {} {}".format(
        train_data[0].shape[0], train_data[1].shape[0], train_data[2].shape[0],
        train_data[3].shape[0], train_data[4].shape[0]))
    print("validation data sources of size: {} {} {} {} {}".format(
        val_data[0].shape[0], val_data[1].shape[0], val_data[2].shape[0],
        val_data[3].shape[0], val_data[4].shape[0]))

    # The generator emits ceil(n / batch_size) batches per pass (the last one
    # may be short). Use that as an integer step count; `/` produced a float.
    train_steps = int(math.ceil(train_data[0].shape[0] / float(batch_size)))
    val_steps = int(math.ceil(val_data[0].shape[0] / float(batch_size)))

    # Make sure the checkpoint directory exists before the first save; older
    # Keras releases do not create it themselves.
    os.makedirs(weights_dir, exist_ok=True)
    checkpoint_path = os.path.join(weights_dir, "weights.{epoch:03d}-{val_loss:.5f}.hdf5")

    model.fit_generator(
        generator=generator_npz(train_data, batch_size, img_ch, img_cols, img_rows),
        steps_per_epoch=train_steps,
        epochs=n_epoch,
        verbose=1,
        validation_data=generator_npz(val_data, batch_size, img_ch, img_cols, img_rows),
        validation_steps=val_steps,
        callbacks=[EarlyStopping(patience=patience),
                   ModelCheckpoint(checkpoint_path, save_best_only=True)
                   ]
    )

In [None]:
import numpy as np

def test_small(dataset_path="", weights_path=""):
    """Evaluate saved weights on the validation split and print error statistics.

    Parameters
    ----------
    dataset_path : str
        ``.npz`` archive read by ``load_data_from_npz``; only its validation
        split is used.
    weights_path : str
        Weights file passed to ``model.load_weights``.

    Raises
    ------
    ValueError
        If either path is empty. Both used to be hard-coded to ``""``, which
        always failed later inside the loaders; the check fails early with a
        clear message instead.

    Prints every prediction next to its ground truth, followed by the mean
    and standard deviation of the absolute error for each of the two outputs.
    Nothing is returned.
    """
    if not dataset_path or not weights_path:
        raise ValueError("test_small requires both dataset_path and weights_path")

    print("Dataset: {}".format(dataset_path))
    print("Weights: {}".format(weights_path))

    # image parameter
    img_cols = 64
    img_rows = 64
    img_ch = 3

    # test parameter
    batch_size = 128

    # model
    model = get_eye_tracker_model(img_ch, img_cols, img_rows)

    # model summary
    model.summary()

    # weights
    print("Loading weights...")
    model.load_weights(weights_path)

    # data: the training split is not needed here
    _, val_data = load_data_from_npz(dataset_path)

    print("Loading testing data...")
    # the whole validation split is converted in one go
    x, y = load_batch(val_data, img_ch, img_cols, img_rows)
    print("Done.")

    predictions = model.predict(x=x, batch_size=batch_size, verbose=1)

    # print predictions next to the ground truth
    for prediction, target in zip(predictions, y):
        print("PR: {} {}".format(prediction[0], prediction[1]))
        print("GT: {} {} \n".format(target[0], target[1]))

    # absolute error per sample and per output, shape (n_samples, 2)
    abs_err = np.abs(predictions - y)

    # mean absolute error
    mae_x, mae_y = abs_err.mean(axis=0)

    # standard deviation
    std_x, std_y = abs_err.std(axis=0)

    # final results
    print("MAE: {} {} ({} samples)".format(mae_x, mae_y, len(y)))
    print("STD: {} {} ({} samples)".format(std_x, std_y, len(y)))
