In [None]:
import tensorflow as tf
# Enable on-demand memory growth for every GPU TensorFlow can see, then show the device list.
gpus=tf.config.experimental.list_physical_devices('GPU')
for physical_gpu in gpus:
    tf.config.experimental.set_memory_growth(physical_gpu,enable=True)
print(gpus)
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import os,glob, random
import tensorflow_models as tfm
from ptge import GazeModel, vgg16_processor
tf.__version__

### data pipeline

In [None]:
# Per-sample file paths, accumulated over all training subjects.
face=[]
lefteye=[]
righteye=[]
rotation_matrix=[]
flipped_rotation_matrix=[]
gaze=[]
gaze_flipped=[]
subject_id=[]
eye_coords=[]
# Maps a subject directory path to its integer subject index.
subject_map={}
data_path='processed_data/Image'
persons=sorted(os.listdir(data_path))
print(persons)
# Only the first two subjects (in sorted order) are used in this cell.
for subject_index,person in enumerate(persons[:2]):
    person_dir=f'{data_path}/{person}'
    person_faces=glob.glob(f'{person_dir}/face/*')
    face+=person_faces
    lefteye+=glob.glob(f'{person_dir}/lefteye/*')
    righteye+=glob.glob(f'{person_dir}/righteye/*')
    rotation_matrix+=glob.glob(f'{person_dir}/rotation_matrix/*')
    flipped_rotation_matrix+=glob.glob(f'{person_dir}/rotation_matrix_flipped/*')
    gaze+=glob.glob(f'{person_dir}/3d_gaze/*')
    gaze_flipped+=glob.glob(f'{person_dir}/3d_gaze_flipped/*')
    # One label per face image of THIS subject. Counting the cumulative `face` list
    # instead would over-label every subject after the first and mislabel samples
    # as soon as more than two subjects are loaded.
    subject_id+=[person_dir]*len(person_faces)
    eye_coords+=glob.glob(f'{person_dir}/eye_coords/*')
    subject_map[person_dir]=subject_index
# Each list is sorted on its own; the zip below therefore pairs entries by sorted position.
# NOTE(review): assumes every modality folder holds the same file names per subject - confirm.
face.sort()
lefteye.sort()
righteye.sort()
rotation_matrix.sort()
flipped_rotation_matrix.sort()
gaze.sort()
gaze_flipped.sort()
eye_coords.sort()
subject_id.sort()
data=list(zip(face,lefteye,righteye,rotation_matrix,flipped_rotation_matrix,eye_coords,gaze,gaze_flipped,subject_id))
# Fixed seed so the shuffled order is the same on every run.
random.seed(12)
random.shuffle(data)
data=tf.data.experimental.from_list(data)
print(subject_map)

In [3]:
# Replace the subject-path -> index dict with a static TensorFlow hash table built from
# its keys and values; keys that are not in the table resolve to -1.
subject_map=tf.lookup.StaticHashTable(
    initializer=tf.lookup.KeyValueTensorInitializer(
        keys=list(subject_map),
        values=list(subject_map.values()),
    ),
    default_value=-1,
)

### Load the GazeModel

In [None]:
# Reset the Keras session state, then instantiate the gaze network.
tf.keras.backend.clear_session()
gaze_model=GazeModel()
# Run one all-ones dummy batch through the model (printing the result) before the
# checkpoint weights are loaded.
print(gaze_model(dict(
    face=tf.ones((1,224,224,3)),
    flipped_face=tf.ones((1,224,224,3)),
    lefteye=tf.ones((1,36,60,3)),
    righteye=tf.ones((1,36,60,3)),
    rotation_matrix=tf.ones((1,9)),
    eye_coords=tf.ones((1,6)),
    id=tf.constant([1.]),
)))
gaze_model.load_weights('best_GazeModel.h5')

### Pipeline for tf.data, subject embedding from GazeModel used as ground truth 

In [5]:
@tf.function
def load_img(img):
    """Read the file at path `img` and decode it as a 3-channel JPEG image tensor."""
    return tf.io.decode_jpeg(tf.io.read_file(img),channels=3)
@tf.numpy_function(Tout=tf.float32)
def ld(x):
    """Load the NumPy file at path `x` and return its contents as a flat float32 array."""
    contents=np.load(x)
    return np.ravel(contents.astype(np.float32))
@tf.function
def map_fn(face,
            lefteye,
            righteye,
            rotation_matrix,
            flipped_rotation_matrix,
            eye_coords,
            gaze,
            gaze_flipped,
            subject_id,
            ):
    """Turn one tuple of file paths into a (features, target) training pair.

    The image arguments are decoded with `load_img`, the array arguments are read
    with `ld`, and `subject_id` is looked up in `subject_map` to get the subject index.

    Returns:
        A tuple `(features, target)`: `features` is a dict of the decoded inputs
        (including a left-right flipped copy of the face image), and `target` is
        `gaze_model.embedding` evaluated at the subject index.
    """
    face_img=load_img(face)
    mirrored_face=tf.image.flip_left_right(face_img)
    left_img=load_img(lefteye)
    right_img=load_img(righteye)
    rot=ld(rotation_matrix)
    rot_flipped=ld(flipped_rotation_matrix)
    eye_xyz=ld(eye_coords)
    subject_index=subject_map[subject_id]
    gaze_vec=ld(gaze)
    gaze_vec_flipped=ld(gaze_flipped)
    # The subject embedding of the trained gaze model is the regression target.
    target=gaze_model.embedding(subject_index)
    features={
            'face':face_img,
            'flipped_face':mirrored_face,
            'lefteye':left_img,
            'righteye':right_img,
            'rotation_matrix':rot,
            'flipped_rotation_matrix':rot_flipped,
            'eye_coords':eye_xyz,
            'gaze':gaze_vec,
            'gaze_flipped':gaze_vec_flipped,
            }
    return features,target


### stack of transformer blocks as described in the paper with 6 blocks each with 4 attention heads

In [6]:
# Encoder stack: 6 pre-norm layers with 4 attention heads each and all dropout disabled.
transformer=tfm.nlp.models.TransformerEncoder(
    num_layers=6,
    num_attention_heads=4,
    intermediate_size=2048,
    activation='relu',
    dropout_rate=0.0,
    attention_dropout_rate=0.0,
    use_bias=True,
    norm_first=True,
    norm_epsilon=1e-6,
    intermediate_dropout=0.0,
)

### Design of calibration model with transformers stacked between MLP

In [7]:
# Freeze the trained gaze model; its feature extractors are reused inside CalibrationModel.
gaze_model.trainable=False
class CalibrationModel(tf.keras.Model):
    """Predicts a length-6 person-specific preference vector from one calibration sample.

    Image features from the gaze model's `g_face` / `g_eye` extractors are concatenated
    with the geometric inputs and the gaze labels, passed through an MLP, the
    transformer encoder stack, a second MLP and a final 6-unit dense layer.
    """
    def __init__(self):
        super().__init__()
        # face feature extractor taken from the trained gaze_model
        self.g_face=gaze_model.g_face
        # eye feature extractor taken from the trained gaze_model
        self.g_eye=gaze_model.g_eye
        # transformer encoder stack defined at notebook level
        self.transformer_stack=transformer
        self.flat=tf.keras.layers.Flatten()
        # MLP applied before the transformer
        self.MLP1=tf.keras.Sequential([
            tf.keras.layers.Dense(1280,activation='relu'),
            tf.keras.layers.BatchNormalization(),
            ],name='MLP1')
        # MLP applied after the transformer
        self.MLP2=tf.keras.Sequential([
            tf.keras.layers.Dense(1280,activation='relu'),
            tf.keras.layers.BatchNormalization(),
            ],name='MLP2')
        # output layer predicting the person-specific preference vector of length 6
        self.output_layer=tf.keras.layers.Dense(6,name='subject_feature')
    def call(self,input_dict):
        """Map a dict of batched inputs to the predicted preference vectors."""
        # flattened features of the face and of its left-right flipped copy
        face_vec=self.flat(self.g_face(input_dict['face']))
        flipped_face_vec=self.flat(self.g_face(input_dict['flipped_face']))
        # flattened eye features; each eye image goes through vgg16_processor first
        left_vec=self.flat(self.g_eye(vgg16_processor(input_dict['lefteye'])))
        right_vec=self.flat(self.g_eye(vgg16_processor(input_dict['righteye'])))
        # image features followed by eye coordinates, rotation matrices and gaze vectors
        pieces=[
            face_vec,
            flipped_face_vec,
            left_vec,
            right_vec,
            input_dict['eye_coords'],
            input_dict['rotation_matrix'],
            input_dict['flipped_rotation_matrix'],
            input_dict['gaze'],
            input_dict['gaze_flipped'],
        ]
        fused=self.MLP1(tf.concat(pieces,axis=1))
        # the transformer receives the fused vector with an extra axis inserted at
        # position 1, which is squeezed out again afterwards
        tokens=self.transformer_stack(tf.expand_dims(fused,axis=1))
        fused=self.MLP2(tf.squeeze(tokens,axis=1))
        # predicted preference vector
        return self.output_layer(fused)
        

### Optimizer Configuration

In [8]:
# Adam optimizer with a learning rate of 1e-4.
optimizer=tf.keras.optimizers.Adam(
    learning_rate=1e-4,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-7,
)

In [14]:
# Instantiate the calibration network and train it with mean squared error.
model=CalibrationModel()
model.compile(optimizer=optimizer,loss='MSE')

In [None]:
# Run the model on one single-sample batch of features (the target is dropped).
model(next(iter(
    data.map(map_fn).batch(1).map(lambda features,_target:features)
)))

### Train Test split and train stopping callback

In [16]:
# Dataset.take/skip expect an integer element count. Truncate the 80% mark to an int
# once and reuse it, so the two splits are disjoint and together cover the dataset.
train_size=int(data.cardinality().numpy()*0.8)
train_data=data.take(train_size)
test_data=data.skip(train_size)
# Stop when val_loss has not improved for 10 epochs and restore the best weights.
cb=tf.keras.callbacks.EarlyStopping(monitor='val_loss',mode='min',patience=10,restore_best_weights=True)

### Training

In [None]:
# Train on batches of 32 and validate on batches of 100, for at most 200 epochs;
# the `cb` callback is passed to fit.
model.fit(
    x=train_data.map(map_fn,num_parallel_calls=tf.data.AUTOTUNE)
        .batch(32,num_parallel_calls=tf.data.AUTOTUNE)
        .prefetch(2),
    epochs=200,
    validation_data=test_data.map(map_fn).batch(100),
    callbacks=[cb],
)

### Saving the calibration model

In [13]:
# Persist the trained calibration model weights.
model.save_weights(filepath='calibr.h5')

### data for new person p02 for whom calibration has to be done to estimate its embeddings

In [None]:
# Per-sample file paths for the single new subject used for calibration.
face=[]
lefteye=[]
righteye=[]
rotation_matrix=[]
flipped_rotation_matrix=[]
gaze=[]
gaze_flipped=[]
subject_id=[]
eye_coords=[]
# Kept under its own name: `subject_map` is indexed with a string tensor inside
# `map_fn`, which a plain dict cannot support, so it must not be rebound to a dict here.
# NOTE(review): the lookup table has no entry for the new subject, so `map_fn` resolves
# it to the table's default index; the resulting target is dropped by the calibration
# cells, but confirm that `gaze_model.embedding` tolerates that index.
calibration_subject_map={'processed_data/Image/p00': 0, 'processed_data/Image/p01': 1}
data_path='processed_data/Image'
persons=sorted(os.listdir(data_path))
# The third subject in sorted order is the new person.
person=persons[2]
person_dir=f'{data_path}/{person}'
face+=glob.glob(f'{person_dir}/face/*')
lefteye+=glob.glob(f'{person_dir}/lefteye/*')
righteye+=glob.glob(f'{person_dir}/righteye/*')
rotation_matrix+=glob.glob(f'{person_dir}/rotation_matrix/*')
flipped_rotation_matrix+=glob.glob(f'{person_dir}/rotation_matrix_flipped/*')
gaze+=glob.glob(f'{person_dir}/3d_gaze/*')
gaze_flipped+=glob.glob(f'{person_dir}/3d_gaze_flipped/*')
# One label per face image of this subject.
subject_id+=[person_dir]*len(face)
eye_coords+=glob.glob(f'{person_dir}/eye_coords/*')
calibration_subject_map[person_dir]=2

# Each list is sorted on its own; the zip below pairs entries by sorted position.
face.sort()
lefteye.sort()
righteye.sort()
rotation_matrix.sort()
flipped_rotation_matrix.sort()
gaze.sort()
gaze_flipped.sort()
eye_coords.sort()
subject_id.sort()
data=list(zip(face,lefteye,righteye,rotation_matrix,flipped_rotation_matrix,eye_coords,gaze,gaze_flipped,subject_id))
# Fixed seed so the shuffled order is the same on every run.
random.seed(12)
random.shuffle(data)
data=tf.data.experimental.from_list(data)
print(calibration_subject_map)

### 8 batches of 16 calibration samples taken for estimating person specific embeddings

In [15]:
# Keep the first 16*8 = 128 shuffled samples as the calibration set.
new_data=data.take(count=16*8)

In [20]:
# Sanity check: number of samples kept in the calibration set.
new_data.cardinality()

<tf.Tensor: shape=(), dtype=int64, numpy=128>

In [None]:
# Run the model on one batch of 16 calibration samples (features only, target dropped).
model(next(iter(
    new_data.map(map_fn).batch(16).map(lambda features,_target:features)
)))

### Each batch of 16 calibration samples is passed forward through the calibration model, which predicts 16 preference vectors per batch. All samples belong to the same person, so these vectors should agree, i.e. the angular difference between them should be minimal. The loss is therefore the cumulative cosine similarity between the first predicted vector and each of the other 15. `tf.keras.losses.cosine_similarity` returns -1 for a perfect match, so the best possible loss is -15.

### training loop for sample calibration for new person

In [None]:
# Feature-only batches of 16 calibration samples. The pipeline does not change between
# epochs, so it is built once and re-iterated.
calibration_batches=new_data.map(map_fn).batch(16).map(lambda x,y:x)
best_loss=999
for e in range(100):
    losses=[]
    # The loop variable is deliberately not called `data`: that name holds the
    # module-level dataset, and overwriting it would break re-running earlier cells.
    for batch in calibration_batches:
        with tf.GradientTape() as tape:
            logits = model(batch, training=True)
            # Summed cosine-similarity loss between the first predicted vector and
            # every other vector in the batch; lower means better aligned.
            loss=tf.reduce_sum(tf.keras.losses.cosine_similarity(logits[0],logits[1:]))
        grads = tape.gradient(loss, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        losses.append(loss)
    mean_loss=tf.reduce_mean(losses)
    # Checkpoint only when the epoch's mean loss improves on the best seen so far.
    # NOTE(review): this writes to the same 'calibr.h5' file used after base training,
    # so the person-specific weights replace that checkpoint - confirm this is intended.
    if mean_loss<best_loss:
        model.save_weights('calibr.h5')
        tf.print(mean_loss)
        best_loss=mean_loss.numpy()
            