In [28]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import PIL
import cv2
import torchvision
import torch as th
import scipy
import scipy.ndimage
import os

def save_video(list_frame, name='vid', fps=15):
    """Write a sequence of frames to ``./{name}.mp4``.

    Args:
        list_frame: Iterable of equally-shaped frames (anything ``np.stack``
            accepts, e.g. NumPy arrays or PIL images). Presumably each frame
            is H x W x 3 uint8, as ``torchvision.io.write_video`` expects --
            confirm against the callers.
        name: Output file stem; the video is written to ``./{name}.mp4``.
        fps: Frame rate of the written video. Defaults to 15, the value that
            was previously hard-coded.

    Raises:
        ValueError: If there are no frames, or if any frame is ``None``
            (e.g. a frame producer that returned nothing for a missing file).
    """
    frames = list(list_frame)
    if not frames:
        raise ValueError('save_video: no frames to write')

    # Fail early with a readable message instead of an opaque stacking /
    # tensor-conversion error further down.
    missing = [idx for idx, frame in enumerate(frames) if frame is None]
    if missing:
        raise ValueError(f'save_video: frames at indices {missing} are None')

    video = th.tensor(np.stack(frames))
    torchvision.io.write_video(f'./{name}.mp4', video, fps=fps)

def draw_face_landmarks(image, landmarks, landmarks2=None):
    """Draw filled circles for one or two landmark sets onto ``image``.

    The image is modified in place and also returned.

    Args:
        image: Image buffer handed to ``cv2.circle``.
        landmarks: Iterable of (x, y) pairs, drawn with colour (0, 255, 0).
        landmarks2: Optional second set, indexed in parallel with
            ``landmarks`` and drawn with colour (255, 0, 0). It must have at
            least as many entries as ``landmarks``.

    Returns:
        The same ``image`` object that was passed in.
    """
    radius = 10
    filled = -1  # negative thickness asks cv2.circle for a filled disc
    first_colour = (0, 255, 0)
    second_colour = (255, 0, 0)
    draw_second = landmarks2 is not None

    for idx, (px, py) in enumerate(landmarks):
        cv2.circle(image, (int(px), int(py)), radius, first_colour, filled)
        if draw_second:
            # Drawn right after its counterpart, so it sits on top of it.
            qx, qy = landmarks2[idx]
            cv2.circle(image, (int(qx), int(qy)), radius, second_colour, filled)
    return image

def image_align(src_file,
                face_landmarks,
                output_size=1024,
                transform_size=4096,
                enable_padding=True):
    """Crop and rotate a face image into an FFHQ-style aligned square.

    Adapted from the FFHQ dataset pre-processing step:
    https://github.com/NVlabs/ffhq-dataset/blob/master/download_ffhq.py

    Args:
        src_file: Path of the image to align.
        face_landmarks: Sequence of (x, y) landmark coordinates in the pixel
            space of ``src_file``. Rows 36-41, 42-47 and 48-59 are read as the
            left eye, right eye and outer mouth (presumably the 68-point
            layout listed below -- confirm against the landmark detector).
        output_size: Side length of the returned image when it is smaller
            than ``transform_size``.
        transform_size: Side length of the intermediate image produced by the
            quad transform.
        enable_padding: If True, reflect-pad and blur the borders when the
            crop quad extends beyond the image.

    Returns:
        The aligned RGB ``PIL.Image.Image``, or ``None`` (after printing a
        message) when ``src_file`` does not exist.
    """
    # Landmark index layout used by the slices below:
    #   0-16 chin, 17-21 left eyebrow, 22-26 right eyebrow, 27-30 nose,
    #   31-35 nostrils, 36-41 left eye, 42-47 right eye,
    #   48-59 outer mouth, 60-67 inner mouth.
    lm = np.array(face_landmarks)
    lm_eye_left = lm[36:42]  # left-clockwise
    lm_eye_right = lm[42:48]  # left-clockwise
    lm_mouth_outer = lm[48:60]  # left-clockwise

    # Calculate auxiliary vectors.
    eye_left = np.mean(lm_eye_left, axis=0)
    eye_right = np.mean(lm_eye_right, axis=0)
    eye_avg = (eye_left + eye_right) * 0.5
    eye_to_eye = eye_right - eye_left
    mouth_left = lm_mouth_outer[0]
    mouth_right = lm_mouth_outer[6]
    mouth_avg = (mouth_left + mouth_right) * 0.5
    eye_to_mouth = mouth_avg - eye_avg

    # Choose oriented crop rectangle.
    x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1]
    x /= np.hypot(*x)
    x *= max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8)
    y = np.flipud(x) * [-1, 1]
    c = eye_avg + eye_to_mouth * 0.1
    quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y])
    qsize = np.hypot(*x) * 2

    # Load in-the-wild image.
    if not os.path.isfile(src_file):
        print(f'\nCannot find source image: {src_file}')
        return
    img = PIL.Image.open(src_file)
    img = img.convert('RGB')

    # Shrink.
    shrink = int(np.floor(qsize / output_size * 0.5))
    if shrink > 1:
        rsize = (int(np.rint(float(img.size[0]) / shrink)),
                 int(np.rint(float(img.size[1]) / shrink)))
        # LANCZOS is the filter ANTIALIAS used to alias; ANTIALIAS itself was
        # removed in Pillow 10.
        img = img.resize(rsize, PIL.Image.LANCZOS)
        quad /= shrink
        qsize /= shrink

    # Crop.
    border = max(int(np.rint(qsize * 0.1)), 3)
    crop = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
            int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
    crop = (max(crop[0] - border, 0), max(crop[1] - border, 0),
            min(crop[2] + border,
                img.size[0]), min(crop[3] + border, img.size[1]))
    if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]:
        img = img.crop(crop)
        quad -= crop[0:2]

    # Pad.
    pad = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
           int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
    pad = (max(-pad[0] + border,
               0), max(-pad[1] + border,
                       0), max(pad[2] - img.size[0] + border,
                               0), max(pad[3] - img.size[1] + border, 0))
    if enable_padding and max(pad) > border - 4:
        pad = np.maximum(pad, int(np.rint(qsize * 0.3)))
        img = np.pad(np.float32(img),
                     ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'reflect')
        h, w, _ = img.shape
        # Separate names so the crop-rectangle vectors x / y are not shadowed.
        grid_y, grid_x, _ = np.ogrid[:h, :w, :1]
        # mask is 1 at the outer edge and falls to 0 once `pad` pixels inside.
        mask = np.maximum(
            1.0 -
            np.minimum(np.float32(grid_x) / pad[0],
                       np.float32(w - 1 - grid_x) / pad[2]), 1.0 -
            np.minimum(np.float32(grid_y) / pad[1],
                       np.float32(h - 1 - grid_y) / pad[3]))
        blur = qsize * 0.02
        # Blend towards a blurred copy, then towards the per-channel median,
        # with weights driven by the mask.
        img += (scipy.ndimage.gaussian_filter(img, [blur, blur, 0]) -
                img) * np.clip(mask * 3.0 + 1.0, 0.0, 1.0)
        img += (np.median(img, axis=(0, 1)) - img) * np.clip(mask, 0.0, 1.0)
        img = PIL.Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)),
                                  'RGB')
        quad += pad[:2]

    # Transform.
    img = img.transform((transform_size, transform_size), PIL.Image.QUAD,
                        (quad + 0.5).flatten(), PIL.Image.BILINEAR)
    if output_size < transform_size:
        img = img.resize((output_size, output_size), PIL.Image.LANCZOS)

    return img


# Visualize Keypoints

In [2]:
# Overlay the detected landmarks on every frame and export a preview video.
data_dir = '/data/mint/DPM_Dataset/Videos/joker_3/images/'
# NOTE: allow_pickle=True unpickles the file contents; only load trusted files.
kpts = np.load('./joker_3_align_params.npy', allow_pickle=True).item()

# Order frames by the integer found between characters 5 and -4 of each key.
frames = list(kpts.keys())
frames.sort(key=lambda x: int(x[5:-4]))

kpts_vis = []
for f in frames:
    frame_path = data_dir + f
    img = np.array(Image.open(frame_path))
    frame_landmarks = kpts[f]['face_landmark']
    # draw_face_landmarks draws onto `img` in place and returns it.
    lmk = draw_face_landmarks(img, frame_landmarks)
    kpts_vis.append(lmk)

save_video(kpts_vis, name='lmk')



# Aligning function given Landmarks

In [3]:
# Align every frame of the clip with its detected landmarks and export a video.
data_dir = '/data/mint/DPM_Dataset/Videos/joker_2/images/'
# NOTE: allow_pickle=True unpickles the file contents; only load trusted files.
kpts = np.load('./joker_2_align_params.npy', allow_pickle=True).item()

# Order frames by the integer found between characters 5 and -4 of each key.
frames = list(kpts.keys())
frames.sort(key=lambda x: int(x[5:-4]))

aligned_vis = []
for f in frames:
    align_kwargs = dict(src_file=data_dir + f,
                        face_landmarks=kpts[f]['face_landmark'],
                        output_size=256)
    # image_align returns None when the source file is missing.
    aligned_img = image_align(**align_kwargs)
    aligned_vis.append(aligned_img)

save_video(aligned_vis, name='aligned')


# Optical flow on face landmarks

In [34]:
# Gather the per-frame landmarks of the clip into one array, in frame order.
data_dir = '/data/mint/DPM_Dataset/Videos/joker_3/images/'
# NOTE: allow_pickle=True unpickles the file contents; only load trusted files.
kpts = np.load('./joker_3_align_params.npy', allow_pickle=True).item()

# Order frames by the integer found between characters 5 and -4 of each key.
frames = list(kpts.keys())
frames.sort(key=lambda x: int(x[5:-4]))

per_frame_landmarks = []
for f in frames:
    per_frame_landmarks.append(kpts[f]['face_landmark'])
all_kpts = np.stack(per_frame_landmarks)   # T x #N-points x 2

def compute_flow(kpts):
    """Return frame-to-frame landmark displacements in both directions.

    Args:
        kpts: Array whose first axis indexes frames.

    Returns:
        ``(fw_flows, bw_flows)``, each one entry shorter than ``kpts`` along
        axis 0, where ``fw_flows[i] = kpts[i + 1] - kpts[i]`` and
        ``bw_flows[i] = kpts[i] - kpts[i + 1]``.

    Side effects:
        Prints the shapes of both flow arrays.
    """
    earlier, later = kpts[:-1], kpts[1:]
    fw_flows = later - earlier
    bw_flows = earlier - later
    print(fw_flows.shape, bw_flows.shape)
    return fw_flows, bw_flows

def flow_smooth_kpts(kpts, n=2):
    """Average each frame's landmarks with flow-propagated neighbours.

    For frame ``i`` the candidate set is the frame itself, one candidate per
    forward-flow entry in ``[i, i + n)`` and one per forward-flow entry in
    ``[i - n, i)`` (both windows clipped to the sequence). The result for
    the frame is the mean of its candidates.

    Args:
        kpts: Array whose first axis indexes frames (presumably
            T x N-points x 2 -- confirm with the caller).
        n: Window half-width in frames.

    Returns:
        Array with the same shape as ``kpts`` holding the averaged landmarks.

    NOTE(review): ``bw_flows`` is computed but never used; the backward window
    is sliced from ``fw_flows``.
    NOTE(review): each candidate adds the summed flow from its own frame to
    the end of its window, so (up to floating-point rounding) the forward
    candidates all land on the last frame of the forward window and the
    backward candidates all land on frame ``i``. Confirm this weighting is
    what was intended.
    """
    fw_flows, bw_flows = compute_flow(kpts)
    num_frames = kpts.shape[0]

    smoothed = []
    for t, current in enumerate(kpts):
        upper = np.clip(t + n, 0, num_frames)
        lower = np.clip(t - n, 0, num_frames)

        # Landmarks and flows of the frames ahead of / behind frame t.
        ahead_pts, ahead_flow = kpts[t:upper], fw_flows[t:upper]
        behind_pts, behind_flow = kpts[lower:t], fw_flows[lower:t]

        candidates = [current]
        candidates.extend(
            ahead_pts[k] + np.sum(ahead_flow[k:], axis=0)
            for k in range(ahead_flow.shape[0])
        )
        candidates.extend(
            behind_pts[k] + np.sum(behind_flow[k:], axis=0)
            for k in range(behind_flow.shape[0])
        )

        smoothed.append(np.mean(np.stack(candidates), axis=0))
    return np.stack(smoothed)
    
# Smooth the landmark track, then export two videos: raw vs. smoothed
# landmarks overlaid on the frames, and frames aligned with the smoothed ones.
smooth_kpts = flow_smooth_kpts(all_kpts, n=5)
print(smooth_kpts.shape)

kpts_vis = []
for i, f in enumerate(frames):
    frame_path = data_dir + f
    img = np.array(Image.open(frame_path))
    raw_landmarks = kpts[f]['face_landmark']
    # Raw landmarks are drawn first, the smoothed ones on top of them.
    lmk = draw_face_landmarks(img, raw_landmarks, smooth_kpts[i])
    kpts_vis.append(lmk)

save_video(kpts_vis, name='smooth_lmk')


aligned_vis = []
for i, f in enumerate(frames):
    align_kwargs = dict(src_file=data_dir + f,
                        face_landmarks=smooth_kpts[i],
                        output_size=256)
    # image_align returns None when the source file is missing.
    aligned_img = image_align(**align_kwargs)
    aligned_vis.append(aligned_img)

save_video(aligned_vis, name='smooth_aligned')


(141, 68, 2) (141, 68, 2)
(142, 68, 2)
