In [11]:
import tensorflow as tf
import cv2
import numpy as np
import tensorflow as tf
import keras
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
import os
import scipy.io as sio
import utils
import random

In [12]:
BIN_NUM = 66
INPUT_SIZE = 224
BATCH_SIZE=16
EPOCHS=20

## Create HopeNet Model 

In [13]:

# Model create
class Hopenet:
    def __init__(self, dataset, class_num, batch_size, input_size):
        self.class_num = class_num
        self.batch_size = batch_size
        self.input_size = input_size
        self.idx_tensor = [idx for idx in range(self.class_num)]
        self.idx_tensor = tf.Variable(np.array(self.idx_tensor, dtype=np.float32))
        self.dataset = dataset
        self.model = self.__create_model()
        
    
    def multi_loss(self, y_true, y_pred, alpha=0.5):
        """ Calculate the multi-part loss: classification_loss + alpha * regression_loss
        Args:
          y_true: the true label
          y_pred: the predicted label
          alpha: the alpha value
        Returns:
          total_loss: the multipart loss
        """

        # classification loss
        y_true_bin = y_true[:, 0]
        y_true_bin = tf.cast(y_true_bin, tf.int64)
        y_true_bin = tf.one_hot(y_true_bin, 66)
        cls_loss = tf.losses.softmax_cross_entropy(y_true_bin, y_pred)

        # regression loss
        y_true_cont = y_true[:, 1]
        y_pred_cont = tf.nn.softmax(y_pred)
        y_pred_cont = tf.reduce_sum(y_pred_cont * self.idx_tensor, 1) * 3 - 99
        mse_loss = tf.compat.v1.losses.mean_squared_error(y_true_cont, y_pred_cont)

        total_loss = cls_loss + alpha * mse_loss
        return total_loss
    
        
    def __create_model(self):
        inputs = tf.keras.layers.Input(shape=(self.input_size, self.input_size, 3))

        resnet = ResNet50(weights='imagenet', include_top=False)

        feature = resnet(inputs)
        feature = tf.keras.layers.Flatten()(feature)
        feature = tf.keras.layers.Dropout(0.5)(feature)

        yaw = tf.keras.layers.Dense(name='yaw', units=self.class_num, activation=None)(feature)
        pitch = tf.keras.layers.Dense(name='pitch', units=self.class_num, activation=None)(feature)
        roll = tf.keras.layers.Dense(name='roll', units=self.class_num, activation=None)(feature)

        model = Model(inputs=inputs, outputs=[yaw, pitch, roll])
        
        #for layer in resnet.layers:
            #layer.trainable = False

        model.compile(
          optimizer='adam',
            loss={
              'yaw': self.multi_loss,
              'pitch': self.multi_loss,
              'roll': self.multi_loss,
            }
           , metrics=['accuracy']
        )
        
        return model

    
    
    def train(self, model_path, max_epoches=EPOCHS, load_weight=True):
        self.model.summary()
        
        if load_weight:
            self.model.load_weights(model_path)
        else:
            self.model.fit_generator(generator=self.dataset.data_generator(test=False),
                                    epochs=max_epoches,
                                    steps_per_epoch=self.dataset.train_num // self.batch_size,
                                    max_queue_size=10,
                                    workers=1,
                                    verbose=1)

            self.model.save(model_path)
            
    def test(self, save_dir):
        for i, (images, [batch_yaw, batch_pitch, batch_roll], names) in enumerate(self.dataset.data_generator(test=True)):
            predictions = self.model.predict(images, batch_size=self.batch_size, verbose=1)
            predictions = np.asarray(predictions)
            pred_cont_yaw = tf.reduce_sum(tf.nn.softmax(predictions[0,:,:]) * self.idx_tensor, 1) * 3 - 99
            pred_cont_pitch = tf.reduce_sum(tf.nn.softmax(predictions[1,:,:]) * self.idx_tensor, 1) * 3 - 99
            pred_cont_roll = tf.reduce_sum(tf.nn.softmax(predictions[2,:,:]) * self.idx_tensor, 1) * 3 - 99
            # print(pred_cont_yaw.shape)
            
            self.dataset.save_test(names[0], save_dir, [pred_cont_yaw[0], pred_cont_pitch[0], pred_cont_roll[0]])
            
    def test_online(self, face_imgs):
        batch_x = np.array(face_imgs, dtype=np.float32)
        predictions = self.model.predict(batch_x, batch_size=1, verbose=1)
        predictions = np.asarray(predictions)
        # print(predictions)
        pred_cont_yaw = tf.reduce_sum(tf.nn.softmax(predictions[0, :, :]) * self.idx_tensor, 1) * 3 - 99
        pred_cont_pitch = tf.reduce_sum(tf.nn.softmax(predictions[1, :, :]) * self.idx_tensor, 1) * 3 - 99
        pred_cont_roll = tf.reduce_sum(tf.nn.softmax(predictions[2, :, :]) * self.idx_tensor, 1) * 3 - 99
        
        return pred_cont_yaw[0], pred_cont_pitch[0], pred_cont_roll[0]

In [14]:
# split datasets function
def split_samples(samples_file, train_file, test_file, ratio=0.8):
    with open(samples_file) as samples_fp:
        lines = samples_fp.readlines()
        random.shuffle(lines)

        train_num = int(len(lines) * ratio)
        test_num = len(lines) - train_num
        count = 0
        data = []
        for line in lines:
            count += 1
            data.append(line)
            if count == train_num:
                with open(train_file, "w+") as train_fp:
                    for d in data:
                        train_fp.write(d)
                data = []

            if count == train_num + test_num:
                with open(test_file, "w+") as test_fp:
                    for d in data:
                        test_fp.write(d)
                data = []
    return train_num, test_num

def get_list_from_filenames(file_path):
    with open(file_path) as f:
        lines = f.read().splitlines()
    return lines

In [15]:
# datasets AFLW2000
class AFLW2000:
    def __init__(self, data_dir, data_file, batch_size=16, input_size=224):
        self.data_dir = data_dir
        self.data_file = data_file
        self.batch_size = batch_size
        self.input_size = input_size
        self.train_file = None
        self.test_file = None
        self.__gen_filename_list(os.path.join(self.data_dir, self.data_file))
        self.train_num, self.test_num = self.__gen_train_test_file(os.path.join(self.data_dir, '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/BIWI/train.txt'),
                                                                   os.path.join(self.data_dir, '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/BIWI/test.txt'))
    def __get_ypr_from_mat(self, mat_path):
        mat = sio.loadmat(mat_path)
        pre_pose_params = mat['Pose_Para'][0]
        pose_params = pre_pose_params[:3]
        return pose_params

    def __get_pt2d_from_mat(self, mat_path):
        mat = sio.loadmat(mat_path)
        pt2d = mat['pt2d']
        return pt2d
    
    def __get_input_img(self, data_dir, file_name, img_ext='.jpg', annot_ext='.mat'):
        img = cv2.imread(os.path.join(data_dir, file_name + img_ext))
        pt2d = self.__get_pt2d_from_mat(os.path.join(data_dir, file_name + annot_ext))
        
        # Crop the face loosely
        x_min = min(pt2d[0, :])
        y_min = min(pt2d[1, :])
        x_max = max(pt2d[0, :])
        y_max = max(pt2d[1, :])
        
        Lx = abs(x_max - x_min)
        Ly = abs(y_max - y_min)
        Lmax = max(Lx, Ly) * 1.5
        center_x = x_min + Lx // 2
        center_y = y_min + Ly // 2
        
        x_min = center_x - Lmax // 2
        x_max = center_x + Lmax // 2
        y_min = center_y - Lmax // 2
        y_max = center_y + Lmax // 2
        
        if x_min < 0:
            y_max -= abs(x_min)
            x_min = 0
        if y_min < 0:
            x_max -= abs(y_min)
            y_min = 0
        if x_max > img.shape[1]:
            y_min += abs(x_max - img.shape[1])
            x_max = img.shape[1]
        if y_max > img.shape[0]:
            x_min += abs(y_max - img.shape[0])
            y_max = img.shape[0]
        
        # print("x_min:{},x_max:{},y_min:{},y_max{}".format(x_min, x_max, y_min, y_max))
        crop_img = img[int(y_min):int(y_max), int(x_min):int(x_max)]
        
        # print(crop_img.shape)
        # cv2.imshow('crop_img', crop_img)
        # cv2.waitKey(0)
        crop_img = np.asarray(cv2.resize(crop_img, (self.input_size, self.input_size)))
        normed_img = (crop_img - crop_img.mean()) / crop_img.std()
        # print(normed_img)
        return normed_img
    
    def __get_input_label(self, data_dir, file_name, annot_ext='.mat'):
        # We get the pose in radians
        pose = self.__get_ypr_from_mat(os.path.join(data_dir, file_name + annot_ext))
        
        # And convert to degrees.
        yaw = pose[1] * 180.0 / np.pi
        pitch = pose[0] * 180.0 / np.pi
        roll = pose[2] * 180.0 / np.pi
        
        cont_labels = [yaw, pitch, roll]
        
        # print(cont_labels)
        # Bin values
        bins = np.array(range(-99, 99, 3))
        bin_labels = np.digitize([yaw, pitch, roll], bins) - 1
        
        return bin_labels, cont_labels

    def __gen_filename_list(self, filename_list_file):
        if not os.path.exists(filename_list_file):
            with open(filename_list_file, 'w+') as tlf:
                for root, dirs, files in os.walk(self.data_dir):
                    for f in files:
                        if os.path.splitext(f)[1] == '.jpg':
                            tlf.write(os.path.splitext(f)[0] + '\n')
                            
    def __gen_train_test_file(self, train_file, test_file):
        self.train_file = train_file
        self.test_file = test_file
        return split_samples(os.path.join(self.data_dir, self.data_file), self.train_file, self.test_file, ratio=0.8)

    def train_num(self):
        return self.train_num

    def test_num(self):
        return self.test_num
    
    def save_test(self, name, save_dir, prediction):
        img_path = os.path.join(self.data_dir, name + '.jpg')
        # print(img_path)
    
        cv2_img = cv2.imread(img_path)
        cv2_img = utils.draw_axis(cv2_img, prediction[0], prediction[1], prediction[2], tdx=200, tdy=200,
                            size=100)
        save_path = os.path.join(save_dir, name + '.jpg')
        # print(save_path)
        cv2.imwrite(save_path, cv2_img)
        
    def data_generator(self, shuffle=True, test=False):
        sample_file = self.train_file
        if test:
            sample_file = self.test_file
            
        filenames = get_list_from_filenames(sample_file)
        file_num = len(filenames)
        print(file_num)
        while True:
            if shuffle:
                idx = np.random.permutation(range(file_num))
                filenames = np.array(filenames)[idx]
            max_num = file_num - (file_num % self.batch_size)
            for i in range(0, max_num, self.batch_size):
                batch_x = []
                batch_yaw = []
                batch_pitch = []
                batch_roll = []
                names = []
                for j in range(self.batch_size):
                    img = self.__get_input_img(self.data_dir, filenames[i + j])
                    bin_labels, cont_labels = self.__get_input_label(self.data_dir, filenames[i + j])
                    # print(img.shape)
                    batch_x.append(img)
                    batch_yaw.append([bin_labels[0], cont_labels[0]])
                    batch_pitch.append([bin_labels[1], cont_labels[1]])
                    batch_roll.append([bin_labels[2], cont_labels[2]])
                    names.append(filenames[i + j])
                
                batch_x = np.array(batch_x, dtype=np.float32)
                batch_yaw = np.array(batch_yaw)
                batch_pitch = np.array(batch_pitch)
                batch_roll = np.array(batch_roll)
                
                if test:
                    yield (batch_x, [batch_yaw, batch_pitch, batch_roll], names)
                else:
                    yield (batch_x, [batch_yaw, batch_pitch, batch_roll])


In [16]:
# dataset BIWI
class Biwi:
    def __init__(self, data_dir, data_file, batch_size=16, input_size=224, ratio=0.8):
        self.data_dir = data_dir
        self.data_file = data_file
        self.batch_size = batch_size
        self.input_size = input_size
        self.train_file = None
        self.test_file = None
        self.__gen_filename_list(os.path.join(self.data_dir, self.data_file))
        self.train_num, self.test_num = self.__gen_train_test_file(os.path.join(self.data_dir, '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/BIWI/train.txt'),
                                                                   os.path.join(self.data_dir, '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/BIWI/test.txt'), ratio=ratio)
        
    def __get_input_img(self, data_dir, file_name, img_ext='.png', annot_ext='.txt'):
        img = cv2.imread(os.path.join(data_dir, file_name + '_rgb' + img_ext))
        bbox_path = os.path.join(data_dir, file_name.split('/')[0] + '/bbox.txt')
        
        # Load bounding box
        bbox = open(bbox_path, 'r')
        line = bbox.readline().split(' ')
        if len(line) < 4:
            x_min, x_max, y_min, y_max = 0, img.size[0], 0, img.size[1]
        else:
            x_min, x_max, y_min, y_max = [float(line[0]), float(line[1]), float(line[2]), float(line[3])]
        bbox.close()
    
        # Loosely crop face
        k = 0.3
        x_min -= k * abs(x_max - x_min)
        y_min -= k * abs(y_max - y_min)
        x_max += k * abs(x_max - x_min)
        y_max += k * abs(y_max - y_min)
        crop_img = img[int(y_min): int(y_max), int(x_min): int(x_max)]
        
        # print(crop_img.shape)
        # cv2.imshow('crop_img', crop_img)
        # cv2.waitKey(0)
        
        crop_img = cv2.resize(crop_img, (self.input_size, self.input_size))
        
        crop_img = np.asarray(crop_img)
        normed_img = (crop_img - crop_img.mean())/crop_img.std()
        
        return normed_img
        
    
    def __get_input_label(self, data_dir, file_name, annot_ext='.txt'):
        # Load pose in degrees
        pose_path = os.path.join(data_dir, file_name + '_pose' + annot_ext)
        pose_annot = open(pose_path, 'r')
        R = []
        for line in pose_annot:
            line = line.strip('\n').split(' ')
            l = []
            if line[0] != '':
                for nb in line:
                    if nb == '':
                        continue
                    l.append(float(nb))
                R.append(l)
        
        R = np.array(R)
        T = R[3, :]
        R = R[:3, :]
        pose_annot.close()
        
        R = np.transpose(R)
        
        roll = -np.arctan2(R[1][0], R[0][0]) * 180 / np.pi
        yaw = -np.arctan2(-R[2][0], np.sqrt(R[2][1] ** 2 + R[2][2] ** 2)) * 180 / np.pi
        pitch = np.arctan2(R[2][1], R[2][2]) * 180 / np.pi
        
        # Bin values
        bins = np.array(range(-99, 99, 3))
        binned_labels = np.digitize([yaw, pitch, roll], bins) - 1
    
        cont_labels = [yaw, pitch, roll]
    
        return binned_labels, cont_labels

    def __gen_filename_list(self, filename_list_file):
        if not os.path.exists(filename_list_file):
            with open(filename_list_file, 'w+') as tlf:
                for root, dirs, files in os.walk(self.data_dir):
                    for subdir in dirs:
                        subfiles = os.listdir(os.path.join(self.data_dir, subdir))
                    
                        for f in subfiles:
                            if os.path.splitext(f)[1] == '.png':
                                token = os.path.splitext(f)[0].split('_')
                                filename = token[0] + '_' + token[1]
                                # print(filename)
                                tlf.write(subdir + '/' + filename + '\n')
    
    def __gen_train_test_file(self, train_file, test_file, ratio=0.8):
        self.train_file = train_file
        self.test_file = test_file
        return split_samples(os.path.join(self.data_dir, self.data_file), self.train_file, self.test_file, ratio=ratio)
    
    def train_num(self):
        return self.train_num
    
    def test_num(self):
        return self.test_num
    
    def save_test(self, name, save_dir, prediction):
        img_path = os.path.join(self.data_dir, name + '_rgb.png')
        # print(img_path)
    
        cv2_img = cv2.imread(img_path)
        cv2_img = utils.draw_axis(cv2_img, prediction[0], prediction[1], prediction[2], tdx=200, tdy=200,
                            size=100)
        save_path = os.path.join(save_dir, name.split('/')[1] + '.png')
        # print(save_path)
        cv2.imwrite(save_path, cv2_img)
        
    def data_generator(self, shuffle=True, test=False):
        sample_file = self.train_file
        if test:
            sample_file = self.test_file
    
        filenames = get_list_from_filenames(sample_file)
        file_num = len(filenames)
        
        while True:
            if shuffle and not test:
                idx = np.random.permutation(range(file_num))
                filenames = np.array(filenames)[idx]
            max_num = file_num - (file_num % self.batch_size)
            for i in range(0, max_num, self.batch_size):
                batch_x = []
                batch_yaw = []
                batch_pitch = []
                batch_roll = []
                names = []
                for j in range(self.batch_size):
                    img = self.__get_input_img(self.data_dir, filenames[i + j])
                    bin_labels, cont_labels = self.__get_input_label(self.data_dir, filenames[i + j])
                    #print(img.shape)
                    batch_x.append(img)
                    batch_yaw.append([bin_labels[0], cont_labels[0]])
                    batch_pitch.append([bin_labels[1], cont_labels[1]])
                    batch_roll.append([bin_labels[2], cont_labels[2]])
                    names.append(filenames[i + j])
                    
                batch_x = np.array(batch_x, dtype=np.float32)
                batch_yaw = np.array(batch_yaw)
                batch_pitch = np.array(batch_pitch)
                batch_roll = np.array(batch_roll)
                
                if test:
                    yield (batch_x, [batch_yaw, batch_pitch, batch_roll], names)
                else:
                    yield (batch_x, [batch_yaw, batch_pitch, batch_roll])
            if test:
                break

## Create Model

In [17]:
AFLW2000_DATA_DIR = '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/AFLW2000'
AFLW2000_MODEL_FILE = 'models/aflw2000_model.h5'
AFLW2000_TEST_SAVE_DIR = '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/AFLW2000_test/'

BIWI_DATA_DIR = '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/BIWI'
BIWI_MODEL_FILE = 'models/biwi_model.h5'
BIWI_TEST_SAVE_DIR = '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/biwi_test/'

face_landmark_path = 'models/shape_predictor_68_face_landmarks.dat'


dataset = Biwi(BIWI_DATA_DIR, '/opt/Documents/I.F.I-Vietnam/COURS_IFI/TPE/My work/datasets/BIWI/filename_list.txt', batch_size=BATCH_SIZE, ratio=0.95)

net = Hopenet(dataset, BIN_NUM, batch_size=BATCH_SIZE, input_size=INPUT_SIZE)

net.train(BIWI_MODEL_FILE, max_epoches=EPOCHS, load_weight=False)

# net.test(BIWI_TEST_SAVE_DIR)

Instructions for updating:
Colocations handled automatically by placer.




Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
Instructions for updating:
Use tf.cast instead.
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 224, 224, 3)  0                                            
__________________________________________________________________________________________________
resnet50 (Model)                multiple             23587712    input_1[0][0]                    
__________________________________________________________________________________________________
flatten (Flatten)               (None, 100352)       0           resnet50[1][0]                   
__________________________________________________________________________________________________
dropout (Dropout)               (None, 100352

IndexError: list index out of range