In [None]:
# Dataloader.

import os
import tensorflow as tf
from ops import *
#from tensorflow.data.experimental import shuffle_and_repeat, map_and_batch
from tensorflow.data import Dataset
class ImageData(object):

    """ Dataloader for loading images.

    Groups the '.jpg' files of a directory by (identity, head pose, side)
    and builds every ordered (input, target) pair inside each group,
    together with the gaze angles parsed from the file names.

    """

    def __init__(self, load_size, channels, data_path, ids):

        """ Init.

        Parameters
        ----------
        load_size: int, input image size.
        channels: int, number of channels.
        data_path: str, path of input images.
        ids: int, train/test split point. Identities whose numeric id is
            <= ids go to the training lists, the others to the test lists.

        """

        self.load_size = load_size
        self.channels = channels
        self.ids = ids

        self.data_path = data_path
        # os.listdir returns entries in arbitrary order; sorting makes the
        # order of the generated pairs reproducible across runs.
        file_names = sorted(f for f in os.listdir(data_path)
                            if f.endswith('.jpg'))

        # key '<identity>_<head_pose>_<side>' -> list of file names.
        self.file_dict = dict()
        for f_name in file_names:
            # The name (without extension) is split on '_': field 0 is the
            # identity, field 2 the head pose and the last field the side.
            fields = f_name.split('.')[0].split('_')
            identity = fields[0]
            head_pose = fields[2]
            side = fields[-1]
            key = '_'.join([identity, head_pose, side])
            self.file_dict.setdefault(key, []).append(f_name)

        self._reset_samples()

    def _reset_samples(self):

        """ Start the train/test sample lists from scratch. """

        self.train_images = []
        self.train_angles_r = []
        self.train_labels = []
        self.train_images_t = []
        self.train_angles_g = []

        self.test_images = []
        self.test_angles_r = []
        self.test_labels = []
        self.test_images_t = []
        self.test_angles_g = []

    @staticmethod
    def _parse_angles(file_name, flip):

        """ Return the scaled (horizontal, vertical) angles of a file name.

        The second-to-last '_' field carries the horizontal angle ('<n>H')
        and the third-to-last the vertical angle ('<n>V'). The horizontal
        angle is multiplied by `flip` and divided by 15, the vertical one
        is divided by 10.

        """

        parts = file_name.split('_')
        h_angle = flip * float(parts[-2].split('H')[0]) / 15.0
        v_angle = float(parts[-3].split('V')[0]) / 10.0

        return h_angle, v_angle

    def image_processing(
        self,
        filename,
        angles_r,
        labels,
        filename_t,
        angles_g
    ):

        """ Process input images.

        Parameters
        ----------
        filename: str, path of input image.
        angles_r: list, gaze direction of input image.
        labels: int, subject id. (deprecated!)
        filename_t: str, path of target image.
        angles_g: list, gaze direction of target image.

        Returns
        -------
        image: tensor, float32, normalized input image.
        angles_r: angles_r, passed through unchanged.
        labels: labels, passed through unchanged.
        image_t: tensor, float32, normalized target image.
        angles_g: angles_g, passed through unchanged.

        """

        def _to_image(file_name):

            """ Load image, normalize it and convert it into tf.tensor.

            Parameters
            ----------
            file_name: str, image path.

            Returns
            -------
            img: tf.tensor, tf.float32. Image tensor resized to
                load_size x load_size and scaled with x / 127.5 - 1.

            """

            x = tf.io.read_file(file_name)
            img = tf.io.decode_jpeg(x, channels=self.channels)
            img = tf.image.resize(img, [self.load_size, self.load_size])
            img = tf.cast(img, tf.float32) / 127.5 - 1.0

            return img

        image = _to_image(filename)
        image_t = _to_image(filename_t)

        return image, angles_r, labels, image_t, angles_g

    def preprocess(self):

        """ Build the train/test sample lists from the grouped files.

        For every group with more than one file, each ordered pair of
        files (including a file paired with itself) becomes one sample.
        The lists are rebuilt from scratch on every call, so running this
        method again does not duplicate samples.

        """

        self._reset_samples()

        for key, names in self.file_dict.items():

            # A single image has no other image to be paired with.
            if len(names) == 1:
                continue

            key_fields = key.split('_')
            idx = int(key_fields[0])
            # Groups whose side is 'R' get their horizontal angle negated.
            flip = -1 if key_fields[-1] == 'R' else 1

            # Parse every file once instead of once per pair.
            samples = [(os.path.join(self.data_path, name),
                        self._parse_angles(name, flip))
                       for name in names]

            if idx <= self.ids:
                images, angles_r = self.train_images, self.train_angles_r
                labels = self.train_labels
                images_t, angles_g = self.train_images_t, self.train_angles_g
            else:
                images, angles_r = self.test_images, self.test_angles_r
                labels = self.test_labels
                images_t, angles_g = self.test_images_t, self.test_angles_g

            for path_r, angle_r in samples:
                for path_g, angle_g in samples:
                    images.append(path_r)
                    # Fresh list per sample so entries never alias.
                    angles_r.append(list(angle_r))
                    labels.append(idx - 1)
                    images_t.append(path_g)
                    angles_g.append(list(angle_g))

        print('\nFinished preprocessing the dataset...')


In [None]:
def data_loader(data_path='/home/user/Downloads/dataset/0P',
                load_size=64,
                channels=3,
                ids=50,
                batch_size=4,
                shuffle_buffer=32):

    """ Build the batched training dataset.

    Parameters
    ----------
    data_path: str, directory holding the '.jpg' images.
    load_size: int, side length the images are resized to.
    channels: int, number of image channels.
    ids: int, train/test split point passed to ImageData.
    batch_size: int, number of samples per batch.
    shuffle_buffer: int, buffer size of the shuffle step. Pass a value
        close to the dataset size for a more thorough shuffle.

    Returns
    -------
    train_dataset: tf.data.Dataset yielding batches of
        (image, angles_r, labels, image_t, angles_g).
        Only the training split is returned.

    """

    image_data_class = ImageData(load_size=load_size,
                                 channels=channels,
                                 data_path=data_path,
                                 ids=ids)
    image_data_class.preprocess()

    train_dataset = tf.data.Dataset.from_tensor_slices(
        (image_data_class.train_images,
         image_data_class.train_angles_r,
         image_data_class.train_labels,
         image_data_class.train_images_t,
         image_data_class.train_angles_g))

    # Dataset transformations return a new dataset, so the result has to be
    # assigned. Shuffling happens before `map` so the buffer holds file
    # paths rather than decoded images.
    train_dataset = train_dataset.shuffle(shuffle_buffer)
    train_dataset = train_dataset.map(image_data_class.image_processing)
    train_dataset = train_dataset.batch(batch_size)

    return train_dataset

In [None]:
# Build the batched training dataset once; later cells reuse `data`.
data = data_loader()

In [None]:
# Size of the batched dataset returned by data_loader().
len(data)

In [None]:
def discriminator(x_init, reuse=False):

    """ Discriminator with an adversarial head and a gaze-regression head.

    Parameters
    ----------
    x_init: input tensor.
    reuse: bool, reuse the net if True (forwarded to the variable scope).

    Returns
    -------
    x_gan: tensor, outputs for adversarial training.
    x_reg: tensor, outputs for gaze estimation, reshaped to [-1, 2].

    """

    num_layers = 5
    base_channel = 64
    # NOTE(review): the head kernel size below is derived from this
    # hard-coded image size - confirm inputs are always 64x64.
    image_size = 64

    # Settings shared by every stride-2 feature layer.
    down_kwargs = dict(conv_filters_dim=4, d_h=2, d_w=2, pad=1,
                       use_bias=True)
    # Kernel size of both output heads.
    head_size = image_size // 2 ** num_layers

    with tf.compat.v1.variable_scope('discriminator', reuse=reuse):

        # 64 3 -> 32 64 -> 16 128 -> 8 256 -> 4 512 -> 2 1024
        features = x_init
        for level in range(num_layers):
            # Output channels double at every level: 64, 128, ..., 1024.
            features = conv2d(features, base_channel * 2 ** level,
                              scope='conv_%d' % level, **down_kwargs)
            features = lrelu(features)

        # Adversarial head (padded).
        x_gan = conv2d(features, 1, conv_filters_dim=head_size,
                       d_h=1, d_w=1, pad=1, scope='conv_logit_gan',
                       use_bias=False)

        # Gaze head (unpadded), flattened to two values per sample.
        gaze_logits = conv2d(features, 2, conv_filters_dim=head_size,
                             d_h=1, d_w=1, pad=0, scope='conv_logit_reg',
                             use_bias=False)
        x_reg = tf.reshape(gaze_logits, [-1, 2])

        return x_gan, x_reg


In [None]:
# Iterator over `data` that yields NumPy values; only the commented-out
# scratch cell further down refers to it.
itr=data.as_numpy_iterator()

In [None]:


def generator(input_, angles, reuse=False):

    """ Generator.

    The target angles are tiled over the spatial dimensions of the input
    and concatenated to it as extra channels. The result goes through an
    input convolution, two stride-2 encoder convolutions, six residual
    blocks, two stride-2 deconvolutions and a tanh output convolution.

    Parameters
    ----------
    input_: tensor, input images.
    angles: tensor, target gaze direction.
    reuse: bool, reuse the net if True.

    Returns
    -------
    x: tensor, generated image.

    """

    def _norm(tensor, scope):
        # BatchNormalization is used where instance_norm calls were
        # commented out in the original code.
        # NOTE(review): a new layer is created on every call and no
        # `training` argument is passed - confirm this is intended.
        with tf.compat.v1.variable_scope(scope):
            return tf.keras.layers.BatchNormalization()(tensor)

    width = 64
    num_angles = angles.get_shape().as_list()[-1]

    # Broadcast the angles to one value set per pixel.
    in_shape = tf.shape(input_)
    angle_map = tf.tile(tf.reshape(angles, [-1, 1, 1, num_angles]),
                        [1, in_shape[1], in_shape[2], 1])
    x = tf.concat([input_, angle_map], axis=3)

    with tf.compat.v1.variable_scope('generator', reuse=reuse):

        # input layer
        x = conv2d(x, width, conv_filters_dim=7, d_h=1, d_w=1, pad=3,
                   use_bias=False, scope='conv2d_input')
        x = relu(_norm(x, 'in_input'))

        # encoder: each step doubles the channel count
        for step in range(2):
            width = 2 * width
            x = conv2d(x, width, conv_filters_dim=4, d_h=2, d_w=2, pad=1,
                       use_bias=False, scope='conv2d_%d' % step)
            x = relu(_norm(x, 'in_conv_%d' % step))

        # bottleneck: residual blocks at constant channel count
        for block in range(6):
            branch = conv2d(x, width, conv_filters_dim=3, d_h=1, d_w=1,
                            pad=1, use_bias=False,
                            scope='conv_res_a_%d' % block)
            branch = relu(_norm(branch, 'in_res_a_%d' % block))
            branch = conv2d(branch, width, conv_filters_dim=3, d_h=1,
                            d_w=1, pad=1, use_bias=False,
                            scope='conv_res_b_%d' % block)
            branch = _norm(branch, 'in_res_b_%d' % block)

            x = x + branch

        # decoder: each step halves the channel count
        for step in range(2):
            width = width // 2
            x = deconv2d(x, width, conv_filters_dim=4, d_h=2, d_w=2,
                         use_bias=False, scope='deconv_%d' % step)
            x = relu(_norm(x, 'in_decon_%d' % step))

        x = conv2d(x, 3, conv_filters_dim=7, d_h=1, d_w=1, pad=3,
                   use_bias=False, scope='output')
        x = tanh(x)

    return x

In [None]:
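# Disabled scratch: pull one batch from `itr` and feed it to a network.
# (If re-enabled, note that `all` shadows the Python builtin.)
#all=next(itr)
#output=discriminator(all[0])
#output=generator(all[0],all[1])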

In [32]:
# Smoke test: run the generator on the first batch only.
# Each batch is (image, angles_r, labels, image_t, angles_g), so t[0] holds
# the input images and t[1] their gaze angles.
for t in data:
    output=generator(t[0],t[1])
    print(type(t[1]))
    # Keep the batch around for the step-by-step shape checks in later cells.
    input_,angles=t[0],t[1]
    break

<class 'tensorflow.python.framework.ops.EagerTensor'>


  conv = tf.compat.v1.layers.conv2d(
  deconv = tf.compat.v1.layers.conv2d_transpose(


In [33]:
# Shape of the generator output for the batch kept above.
output.shape

TensorShape([4, 64, 64, 3])

In [None]:
# NOTE(review): if `output` is still the generator result, these are the
# shapes of the first two images of the batch; the indexing looks written
# for the discriminator's (x_gan, x_reg) pair - confirm which was intended.
print(output[0].shape,output[1].shape)

In [None]:
# Mean of output[0][0] as a NumPy scalar.
# NOTE(review): what output[0][0] selects depends on which network produced
# `output` - confirm.
tf.reduce_mean(output[0][0]).numpy()

In [16]:
# Scratch: replay the angle-conditioning steps of generator() on the saved
# batch so the intermediate shapes can be inspected.
style_dim = angles.get_shape().as_list()[-1]

# One angle vector per sample, with singleton spatial dimensions...
angles_reshaped = tf.reshape(angles, [-1, 1, 1, style_dim])
# ...repeated over the height and width of the input images.
angles_tiled = tf.tile(angles_reshaped, [1, tf.shape(input_)[1],
                                             tf.shape(input_)[2], 1])

In [30]:
# Compare the angle shapes before and after the reshape.
print(angles.shape,angles_reshaped.shape)

(4, 2) (4, 1, 1, 2)


In [17]:
# The tiled angles should match the spatial size of the input images.
angles_tiled.shape

TensorShape([4, 64, 64, 2])

In [18]:
# Shape of the input image batch, for comparison with the tiled angles.
input_.shape

TensorShape([4, 64, 64, 3])

In [19]:
# Append the tiled angles to the images along the channel axis, as
# generator() does before its first convolution.
x = tf.concat([input_, angles_tiled], axis=3)

In [20]:
# Channel count should be image channels plus the number of angles.
x.shape

TensorShape([4, 64, 64, 5])