In [None]:
import os
import json
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch_itl import model, sampler, cost, kernel, estimator

### Set paths and load model config / ckpt

In [None]:
# Set trained model paths
base_experiment_path = './LS_Experiments/KDEF_single'
model_name = 'KDEF_NE_itl_model_split1_CF'

# Locate the model config (*config*.json) and checkpoint (*ckpt*.pt) files
# inside the model directory.
base_model_path = os.path.join(base_experiment_path, model_name, 'model/')
model_config_path = None
model_ckpt_path = None
# sorted() makes the selection deterministic when several files match
# (the last match in alphabetical order wins).
for fname in sorted(os.listdir(base_model_path)):
    if ('config' in fname) and fname.endswith('.json'):
        model_config_path = os.path.join(base_model_path, fname)
    elif ('ckpt' in fname) and fname.endswith('.pt'):
        model_ckpt_path = os.path.join(base_model_path, fname)
    else:
        print(fname, 'is neither a config (.json) nor a ckpt (.pt) file, skipping')

# Fail early with a clear message instead of a NameError further down.
if model_config_path is None or model_ckpt_path is None:
    raise FileNotFoundError(
        'Expected a "*config*.json" and a "*ckpt*.pt" file in {}, found config={} ckpt={}'.format(
            base_model_path, model_config_path, model_ckpt_path))
print(model_config_path, model_ckpt_path)

# Load the json config; the checkpoint itself is loaded later with torch.load.
with open(model_config_path, 'r') as f:
    model_config = json.load(f)

### Read data

In [None]:
# ----------------------------------
# Reading input/output data
# ----------------------------------
dataset = model_config['Data']['dataset']
theta_type = model_config['Data']['theta_type']
# NOTE(review): the keys 'inpu_emotion' and 'include_emotion' look like they may be
# typos / misnomers for the input-emotion and include-neutral settings; they are kept
# as-is here - confirm against the saved config json.
inp_emotion = model_config['Data']['inpu_emotion']
inc_neutral = model_config['Data']['include_emotion']
# Previously `input_data_version` was referenced below without ever being defined.
input_data_version = model_config['Data']['input_data_version']
use_facealigner = input_data_version == 'facealigner'

data_path = './datasets/' + dataset + '_Aligned/' + dataset + '_LANDMARKS'  # set data path
if dataset == 'Rafd':
    # dirty hack only used to get Rafd speaker ids, not continuously ordered
    # NOTE(review): machine-specific absolute path - adjust for your environment.
    data_csv_path = '/home/mlpboon/Downloads/Rafd/Rafd.csv'

print('Reading data')
if use_facealigner:
    if dataset == 'KDEF':
        from datasets.datasets import kdef_landmarks_facealigner
        x_train, y_train, x_test, y_test, train_list, test_list = \
            kdef_landmarks_facealigner(data_path, inp_emotion=inp_emotion, inc_emotion=inc_neutral)
    elif dataset == 'Rafd':
        from datasets.datasets import rafd_landmarks_facealigner
        x_train, y_train, x_test, y_test, train_list, test_list = \
            rafd_landmarks_facealigner(data_path, data_csv_path, inc_emotion=inc_neutral)
    else:
        # Without this guard x_train/y_train would simply be undefined below.
        raise ValueError('Unsupported dataset for facealigner data: {!r}'.format(dataset))
else:
    from datasets.datasets import import_kdef_landmark_synthesis
    x_train, y_train, x_test, y_test = import_kdef_landmark_synthesis(dtype=input_data_version)

# n: number of training inputs, m: size of axis 1 of y_train (used as the number of
# emotions/thetas by the sampler below), nf: size of axis 2 of y_train.
n = x_train.shape[0]
m = y_train.shape[1]
nf = y_train.shape[2]
print('data dimensions', n, m, nf, inp_emotion)

In [None]:
# set ITL model
# This notebook only rebuilds fixed Gaussian kernels, so a learnable input kernel
# cannot be restored from the config alone.
assert model_config['Kernels']['kernel_input_learnable'] == False, \
    'learnable input kernels are not supported by this notebook'
kernel_input = kernel.Gaussian(model_config['Kernels']['gamma_inp'])
kernel_output = kernel.Gaussian(model_config['Kernels']['gamma_out'])
kernel_freq = np.eye(nf)  # can be added to ckpt or manually set as np.load(kernel_file)

# define emotion sampler - this can also be added to ckpt
if theta_type == 'aff':
    from datasets.datasets import import_affectnet_va_embedding
    affect_net_csv_path = ''  # to be set if theta_type == 'aff'
    aff_emo_dict = import_affectnet_va_embedding(affect_net_csv_path)

    # Map dataset-specific emotion labels to the names used as keys of aff_emo_dict.
    if dataset == 'KDEF':
        aff_emo_match = {'NE': 'Neutral',
                         'HA': 'Happy',
                         'SA': 'Sad',
                         'SU': 'Surprise',
                         'AF': 'Fear',
                         'DI': 'Disgust',
                         'AN': 'Anger',
                         }
    elif dataset == 'Rafd':
        aff_emo_match = {'neutral': 'Neutral',
                         'happy': 'Happy',
                         'sad': 'Sad',
                         'surprised': 'Surprise',
                         'fearful': 'Fear',
                         'disgusted': 'Disgust',
                         'angry': 'Anger',
                         'contemptous': 'Contempt'
                         }
    else:
        # Previously an unknown dataset surfaced as a NameError on aff_emo_match.
        raise ValueError('No emotion-name mapping defined for dataset {!r}'.format(dataset))

    sampler_ = sampler.CircularSampler(data=dataset + theta_type,
                                       inp_emotion=aff_emo_match[inp_emotion],
                                       inc_emotion=inc_neutral,
                                       sample_dict=aff_emo_dict)
elif theta_type == '':
    sampler_ = sampler.CircularSampler(data=dataset,
                                       inc_neutral=inc_neutral)
else:
    # Previously an unknown theta_type surfaced as a NameError on sampler_.
    raise ValueError("Unsupported theta_type {!r}; expected 'aff' or ''".format(theta_type))
sampler_.m = m

itl_model = model.SpeechSynthesisKernelModel(kernel_input, kernel_output,
                                             kernel_freq=torch.from_numpy(kernel_freq).float())

### Load model and predict

In [None]:
# Map the checkpoint onto the CPU: the following cells call .numpy() on the predictions,
# which only works for CPU tensors, and a checkpoint saved from a GPU run would
# otherwise be restored onto a CUDA device (or fail to load on a CPU-only machine).
ckpt = torch.load(model_ckpt_path, map_location='cpu')
# Put the model in test mode with the training inputs, the sampler's thetas and the
# learned coefficients stored under 'itl_alpha', then predict on the test inputs.
itl_model.test_mode(x_train=x_train, thetas=sampler_.sample(m), alpha=ckpt['itl_alpha'])
pred_test = itl_model.forward(x_test, sampler_.sample(m))

In [None]:
# compute cost
cost_pred = cost.speech_synth_loss(y_test, pred_test, sampler_.sample(m))
print('cost test:', cost_pred)

# Per-emotion spread: for each index i along axis 1, sum over the feature axis of the
# variance across test samples, for both ground truth and predictions. The last line
# prints the averages over the m emotions.
var_gt = 0
var_test = 0
for i in range(m):
    var_gt_em = np.sum(np.var(y_test[:, i, :].numpy(), axis=0))
    # detach() keeps this working even if the prediction is attached to an autograd
    # graph (the generation cell below already detaches before calling .numpy()).
    var_test_em = np.sum(np.var(pred_test[:, i, :].detach().numpy(), axis=0))
    var_gt += var_gt_em
    var_test += var_test_em
    print('{:d}, {:.3f}, {:.3f}'.format(i, var_gt_em, var_test_em))
print('{:.3f}, {:.3f}'.format(var_gt/m, var_test/m))

### Continuous generation

In [None]:
def circular_sampling(theta1, theta2, num_samples):
    """Sample points on the unit circle between the directions of two 2-D points.

    The polar angles of ``theta1`` and ``theta2`` are computed with ``arctan2`` and
    mapped to ``[0, 2*pi)``; ``num_samples`` angles are then spaced linearly from the
    angle of ``theta1`` to the angle of ``theta2`` (both endpoints included). The sweep
    never crosses the 0 / 2*pi boundary, so it is not necessarily the shorter arc.

    Parameters
    ----------
    theta1, theta2 : sequence of two numbers
        ``(x, y)`` coordinates; only their direction from the origin is used.
    num_samples : int
        Number of points to generate.

    Returns
    -------
    sample_coords : np.ndarray of shape (num_samples, 2)
        ``(cos(angle), sin(angle))`` for every sampled angle, ordered from ``theta1``'s
        direction to ``theta2``'s direction.
    sampled_angles : np.ndarray of shape (num_samples,)
        The sampled angles in radians, in the same order as ``sample_coords``.
    """
    two_pi = 2 * np.pi
    angle1 = np.arctan2(theta1[1], theta1[0])
    angle2 = np.arctan2(theta2[1], theta2[0])
    # arctan2 returns values in [-pi, pi]; shift negative angles into [0, 2*pi).
    angle1 = angle1 if angle1 >= 0 else angle1 + two_pi
    angle2 = angle2 if angle2 >= 0 else angle2 + two_pi

    # linspace handles a descending range directly, so both orderings go through the
    # same code path and always return the (coords, angles) pair. The previous
    # implementation returned only the coordinates when angle1 > angle2, which broke
    # callers that unpack two values.
    sampled_angles = np.linspace(start=angle1, stop=angle2, num=num_samples, endpoint=True)
    sample_coords = np.vstack((np.cos(sampled_angles), np.sin(sampled_angles))).T
    return sample_coords, sampled_angles

def radial_sampling(theta, num_samples):
    """Sample points along the ray from the origin towards ``theta``.

    ``num_samples`` radii are spaced linearly over ``[0, 1]`` (both ends included) and
    placed along the direction given by the polar angle of the 2-D point ``theta``.

    Returns a pair ``(sample_coords, sampled_radii)`` where ``sample_coords`` has shape
    ``(num_samples, 2)`` and ``sampled_radii`` has shape ``(num_samples,)``.
    """
    direction = np.arctan2(theta[1], theta[0])
    radii = np.linspace(0, 1, num_samples)
    xs = radii * np.cos(direction)
    ys = radii * np.sin(direction)
    return np.stack((xs, ys), axis=1), radii


class EdgeMap(object):
    """Rasterise a set of landmark points into an edge image.

    Landmarks are connected by straight line segments within fixed index groups
    covering indices 0..67 (presumably the common 68-point facial landmark layout -
    TODO confirm). The last four groups repeat their first index at the end so the
    contour is closed.
    """

    # (start, stop) index ranges drawn as open polylines.
    _OPEN_RANGES = ((0, 17), (17, 22), (22, 27), (27, 31), (31, 36))
    # (start, stop) index ranges drawn as closed contours.
    _CLOSED_RANGES = ((36, 42), (42, 48), (48, 60), (60, 68))
    # Value passed to cv2.line as the line colour.
    _LINE_VALUE = 255

    def __init__(self, out_res, num_parts=3):
        """Store the output resolution / channel count and build the index groups.

        out_res   -- side length of the square output image.
        num_parts -- size of the last axis of the output image.
        """
        self.out_res = out_res
        self.num_parts = num_parts
        open_groups = [[np.arange(lo, hi, 1), self._LINE_VALUE]
                       for lo, hi in self._OPEN_RANGES]
        closed_groups = [[list(np.arange(lo, hi, 1)) + [lo], self._LINE_VALUE]
                         for lo, hi in self._CLOSED_RANGES]
        self.groups = open_groups + closed_groups

    def __call__(self, shape):
        """Draw the landmark groups of ``shape`` and return the image.

        ``shape`` is indexed as ``shape[k][0]`` / ``shape[k][1]`` for every landmark
        index ``k``; both values are truncated to int and passed to cv2.line as the
        point coordinates. Returns a float32 array of shape
        ``(out_res, out_res, num_parts)``.
        """
        canvas = np.zeros((self.out_res, self.out_res, self.num_parts), dtype=np.float32)
        for indices, value in self.groups:
            # Connect every landmark to its successor within the group.
            for src, dst in zip(indices[:-1], indices[1:]):
                pt_from = int(shape[src][0]), int(shape[src][1])
                pt_to = int(shape[dst][0]), int(shape[dst][1])
                cv2.line(canvas, pt_from, pt_to, value, 1)
        return canvas

In [None]:
import cv2  # must be imported before EdgeMap instances are called (EdgeMap uses cv2.line)

# 'circular': sweep the angle between two emotion embeddings on the unit circle.
# 'radial'  : move from the origin towards a single emotion embedding.
sampling_type = 'circular'
num_samples = 180

# Map the checkpoint onto the CPU; the generation loop converts predictions with .numpy().
ckpt = torch.load(model_ckpt_path, map_location='cpu')
itl_model.test_mode(x_train=x_train, thetas=sampler_.sample(m), alpha=ckpt['itl_alpha'])

# NOTE: aff_emo_dict is only defined when theta_type == 'aff'.
if sampling_type == 'circular':
    sampled_emotions, sampled_angles = circular_sampling(aff_emo_dict['Happy'], aff_emo_dict['Sad'], num_samples)
    print(sampled_angles)
elif sampling_type == 'radial':
    sampled_emotions, sampled_radii = radial_sampling(aff_emo_dict['Fear'], num_samples)
    print(sampled_radii)
else:
    # Previously an unknown value left sampled_emotions undefined for the next cell.
    raise ValueError("Unknown sampling_type {!r}; expected 'circular' or 'radial'".format(sampling_type))
EM = EdgeMap(out_res=128, num_parts=1)

In [None]:
# Optional inline preview of the generated sequence (uncomment to use):
# %matplotlib inline
# for i in range(len(sampled_emotions)):
#     pred_frame = itl_model.forward(x_test, torch.from_numpy(sampled_emotions[i][np.newaxis]).float())
#     im_em = EM(pred_frame[0, 0].detach().numpy().reshape(68,2)*128)
#     plt.imshow(np.squeeze(im_em))
#     plt.pause(0.5)

# Write one edge-map image per sampled emotion, named 000.jpg, 001.jpg, ...
output_path_cont_gen = './utils/plot_utils/visualizations/continuous_control/circ_video_kdef/happy_sad'
# exist_ok=True replaces the separate exists() check and is safe to re-run.
os.makedirs(output_path_cont_gen, exist_ok=True)
for i, emotion in enumerate(sampled_emotions):
    theta_i = torch.from_numpy(emotion[np.newaxis]).float()  # shape (1, 2)
    # Use a dedicated name so the `pred_test` computed in the evaluation cell is not
    # overwritten by the last generated frame.
    pred_frame = itl_model.forward(x_test, theta_i)
    # Only the first test input is rendered. Landmarks are reshaped to 68 (x, y) pairs
    # and scaled to the edge-map resolution (assumes coordinates normalised to
    # [0, 1] - TODO confirm).
    landmarks = pred_frame[0, 0].detach().numpy().reshape(68, 2) * EM.out_res
    im_em = EM(landmarks)
    cv2.imwrite(os.path.join(output_path_cont_gen, str(i).zfill(3) + '.jpg'), im_em)