In [1]:
import numpy as np
import moviepy.editor as e
import numpy.random as npr
import time
import os
import json
from tqdm import trange,tqdm
import gizeh
import torch
from random import choice
from pytorch_pretrained_biggan import BigGAN, truncated_noise_sample
from PIL import Image, ImageOps, ImageDraw, ImageEnhance
from PIL import ImageFilter
from pytorch_style_gan import *
import shutil
import gc
import glob

In [2]:
# Scratch directory (under the current working directory) that receives the
# intermediate per-segment clips; it is created on first use.
TEMP = os.path.join(os.getcwd(), 'temp')
os.makedirs(TEMP, exist_ok=True)

def random_one_hot(dim=1000,n=1):
    """Draw ``n`` random one-hot row vectors of length ``dim``.

    The hot index of every row is sampled independently with
    ``np.random.choice`` (uniform, with replacement).

    Args:
        dim: length of each one-hot vector (number of classes).
        n: number of vectors to draw.

    Returns:
        A float32 tensor of shape ``(n, dim)``. It lives on the current CUDA
        device when CUDA is available and on the CPU otherwise, so the helper
        no longer crashes on machines without a GPU.
    """
    hot = np.random.choice(dim, n)
    # Fill the hot entries directly instead of materialising a dim x dim identity.
    m = np.zeros((n, dim), dtype=np.float32)
    m[np.arange(n), hot] = 1.0
    target = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.from_numpy(m).to(target)


def tensor_to_images(obj):
    """Convert a 4-D batch of generator outputs into a list of uint8 image arrays.

    Args:
        obj: NumPy array or torch tensor with four axes. Axes are reordered
            from ``(0, 1, 2, 3)`` to ``(0, 2, 3, 1)``, i.e. the second axis
            (channels, for the generator outputs passed in this notebook) is
            moved last. Tensors may live on any device and may require grad.

    Returns:
        A list with one ``uint8`` array per batch entry. Values are mapped
        with ``(x + 1) / 2 * 256``, clipped to ``[0, 255]`` and truncated to
        integers.
    """
    if not isinstance(obj, np.ndarray):
        # .cpu() is required for CUDA tensors; it is a no-op for CPU tensors.
        obj = obj.detach().cpu().numpy()

    obj = obj.transpose((0, 2, 3, 1))
    # One vectorised pass over the whole batch; order='C' makes every frame
    # a contiguous slice.
    frames = np.clip(((obj + 1) / 2.0) * 256, 0, 255).astype(np.uint8, order='C')
    return [frame for frame in frames]

class NPEncoder(json.JSONEncoder):
    """JSON encoder that can serialise NumPy scalars and arrays.

    Use as ``json.dumps(obj, cls=NPEncoder)``. Every NumPy integer width
    (not only ``int32``), NumPy floats, NumPy booleans and ``ndarray``
    objects are converted to their plain Python equivalents.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``.

        Falls back to ``json.JSONEncoder.default`` (which raises
        ``TypeError``) for types that are not handled here.
        """
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            # np.integer covers int8..int64 and the unsigned variants.
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super(NPEncoder, self).default(obj)

# Use the first CUDA device when one is present, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'

# g_all is not defined in this notebook; presumably it is supplied by the
# star import from pytorch_style_gan (verify). Put it in inference mode and
# move it to the selected device.
g_all.eval()
g_all.to(device)

def kind_map(kind):
    """Return the one-hot header vector that identifies a signal kind.

    The slot order is ``color, cursor, bg, sg, move, close``. The result is
    a float array of length 6 with a single 1 at the slot of ``kind``.

    Raises:
        ValueError: if ``kind`` is not one of the six known names.
    """
    kinds = ('color', 'cursor', 'bg', 'sg', 'move', 'close')
    slot = kinds.index(kind)
    return (np.arange(len(kinds)) == slot).astype(float)

def annotate(kind=None, num_samples = None , data=None, c_speed = None, a_speed = None, truncation = None):
    """Build the per-frame annotation matrix for one segment.

    The result has ``num_samples`` rows and 784 columns, NaN wherever a
    field does not apply to ``kind``. Column layout, as written below:

    * ``0:6``     one-hot kind header (see ``kind_map``)
    * ``6:13``    ``data`` for kind ``'cursor'``
    * ``10:13``   ``data`` for kind ``'color'``
    * ``13:525``  ``data`` for kind ``'sg'``
    * ``525:781`` ``data`` for kind ``'bg'``
    * ``781``     ``truncation`` for kind ``'bg'``
    * ``782:784`` ``c_speed`` and ``a_speed`` (every kind except move/close)
    """
    mat = np.full((num_samples, 784), np.nan)

    # Header: the same one-hot kind vector on every row.
    mat[:, :6] = kind_map(kind)

    # The pre-recorded move/close segments carry no speed information.
    if kind not in ('move', 'close'):
        mat[:, 782:] = [c_speed, a_speed]

    # Payload columns; the kinds are mutually exclusive.
    if kind == 'cursor':
        mat[:, 6:13] = data
    elif kind == 'color':
        mat[:, 10:13] = data
    elif kind == 'sg':
        mat[:, 13:525] = data
    elif kind == 'bg':
        mat[:, 525:781] = data
        mat[:, 781] = truncation
    return mat

In [3]:
def colorScreen(data=None,dur=20,h=1024,w=1024,c_speed=1,fps=32, a_speed = np.nan, freeze_rate = 0.5):
    """Render a solid-colour clip whose colour drifts between random keyframes.

    Args:
        data: optional annotation matrix to replay. Columns ``10:13`` are
            multiplied by 256 to recover the per-frame colours. When given,
            no colours are sampled and ``data`` itself is returned as the
            annotation.
        dur: clip length in seconds (generation path only).
        h, w: frame height and width in pixels.
        c_speed: colour keyframes per second.
        fps: frames per second of the clip and of the annotation.
        a_speed: not used here; NaN is always recorded as the second speed.
        freeze_rate: per-channel probability that a keyframe repeats the
            value of the keyframe before it.

    Returns:
        ``(fn, mat, cdur)``: path of the mp4 written into ``TEMP``, the
        annotation matrix, and the clip duration reported by moviepy.
    """
    if data is not None:
        iM = 256*data[:,10:13]
        mat = data

    else:
        num_switches = int(dur*c_speed)+1
        switch_samples = int(fps/c_speed)
        num_samples = dur*fps

        M = 255*npr.rand(num_switches,3) # keyframe colours
        M_freez_msk = npr.binomial(1, freeze_rate, (num_switches,3))
        # Start at 1: keyframe 0 has no predecessor, and index -1 would wrap
        # around to the last keyframe.
        for i in range(1, num_switches):
            M[i] = np.where(M_freez_msk[i] == 1, M[i-1], M[i])

        iM = np.empty((num_samples,3))
        for i in range(num_switches-1):
            iM[i*switch_samples:(i+1)*switch_samples] = np.linspace(M[i],M[i+1],switch_samples,endpoint=False)
        # When fps/c_speed or dur*c_speed is not an integer the segments do not
        # cover every row; hold the last keyframe instead of leaving
        # uninitialised memory.
        iM[(num_switches-1)*switch_samples:] = M[-1]
        mat = annotate(kind = 'color', num_samples = num_samples, data = iM/256., c_speed = c_speed, a_speed = np.nan)

    imgs = [] # frames
    for c in tqdm(iM):
        img = np.empty((h, w, 3),dtype = np.uint8) # initialize the frame
        # Clip before the cast so a replayed value of 1.0 (-> 256) does not
        # wrap around to 0 in uint8.
        img[:, :, 0:3] = np.clip(c, 0, 255).astype(np.uint8)
        imgs.append(img)

    clip = e.ImageSequenceClip(imgs,fps = fps)
    fn = os.path.join(TEMP,time.strftime("%m-%d-%H-%M-%S")+'.mp4')
    clip.write_videofile(fn, fps = fps ,audio_codec='aac', logger = None)
    cdur = clip.duration
    # Drop the frame data promptly; the frame list can be large.
    del clip
    gc.collect()
    return fn,mat, cdur

In [4]:
def colorBoxScreen(data = None, dur=20,h=1024,a_speed=2,c_speed=1,fps=32,freeze_rate = 0.5):
    """Render a clip of a soft-edged rectangle that moves, resizes and changes colour.

    Args:
        data: optional annotation matrix to replay. Columns ``6:10`` hold the
            box extents and centre as fractions of ``h`` and columns
            ``10:13`` the fill colour. The matrix is returned unchanged as
            the annotation and is no longer modified in place.
        dur: clip length in seconds (generation path only).
        h: side length in pixels of the square frame.
        a_speed: geometry (size/position) keyframes per second.
        c_speed: colour keyframes per second.
        fps: frames per second of the clip and of the annotation.
        freeze_rate: per-component probability that a keyframe repeats the
            value of the keyframe before it.

    Returns:
        ``(fn, mat, cdur)``: path of the mp4 written into ``TEMP``, the
        annotation matrix, and the clip duration reported by moviepy.
    """
    if data is not None:
        # Copy first: the slice is a view, and scaling it in place would
        # overwrite the caller's matrix (which is also returned as `mat`).
        iM = data[:,6:13].copy()
        iM[:,0:4] = h * iM[:,0:4]
        mat = data

    else:
        num_switches_s = int(dur*a_speed)+1
        num_switches_c = int(dur*c_speed)+1
        switch_samples_s = int(fps/a_speed)
        switch_samples_c = int(fps/c_speed)
        num_samples = dur*fps

        size = np.array([h,h])
        Ss = npr.uniform([1,1],size/2,(num_switches_s,2)) #scales
        Ps = npr.uniform(Ss,size-Ss,(num_switches_s,2)) #positions
        C = npr.uniform(0,1,(num_switches_c,3)) #colors
        G = np.concatenate([Ss,Ps],axis=1)

        # Freezing starts at keyframe 1: keyframe 0 has no predecessor, and
        # index -1 would wrap around to the last keyframe.
        G_freez_msk = npr.binomial(1, freeze_rate, (num_switches_s,4))
        for i in range(1, num_switches_s):
            G[i] = np.where(G_freez_msk[i] == 1, G[i-1], G[i])

        C_freez_msk = npr.binomial(1, freeze_rate, (num_switches_c,3))
        for i in range(1, num_switches_c):
            C[i] = np.where(C_freez_msk[i] == 1, C[i-1], C[i])

        iG=np.empty((num_samples,4))
        iC=np.empty((num_samples,3))
        for i in range(num_switches_s-1):
            iG[i*switch_samples_s:(i+1)*switch_samples_s] = np.linspace(G[i],G[i+1],switch_samples_s,endpoint=False)
        for i in range(num_switches_c-1):
            iC[i*switch_samples_c:(i+1)*switch_samples_c] = np.linspace(C[i],C[i+1],switch_samples_c,endpoint=False)
        # Hold the last keyframe on rows the segments did not reach (non-integer
        # fps/speed or dur*speed) instead of leaving uninitialised memory.
        iG[(num_switches_s-1)*switch_samples_s:] = G[-1]
        iC[(num_switches_c-1)*switch_samples_c:] = C[-1]
        iM = np.concatenate([iG,iC],axis=1)

        # The annotation stores geometry as a fraction of the frame size.
        normalised = iM.copy()
        normalised[:,0:4] = normalised[:,0:4]/h
        mat = annotate(kind='cursor', num_samples = num_samples, data = normalised, c_speed=c_speed, a_speed= a_speed)

    imgs = [] # frames
    n_blur=20
    for m in tqdm(iM):
        surface = gizeh.Surface(h,h) # width, height
        s = m[0:2]
        p = m[2:4]
        c = m[4:]
        # Stack n_blur rectangles from largest/darkest to smallest/brightest
        # to get a soft edge.
        for i in range(n_blur):
            box = gizeh.rectangle(lx=s[0]+(n_blur-i),ly=s[1]+(n_blur-i), xy = p, fill=((i+1)/n_blur)*c)
            box.draw(surface)
        img = surface.get_npimage()
        imgs.append(img)
    clip = e.ImageSequenceClip(imgs,fps = fps)
    fn = os.path.join(TEMP,time.strftime("%m-%d-%H-%M-%S")+'.mp4')
    clip.write_videofile(fn, fps = fps ,audio_codec='aac', logger = None)
    cdur = clip.duration
    del clip
    gc.collect()
    return fn,mat, cdur

In [5]:
def faceScreen(data = None, dur=30,h=1024,w=1024,c_speed=1,fps=32, a_speed=np.nan,freeze_rate = 0.5):
    """Render a clip of images produced by ``g_all`` along a path of 512-d input vectors.

    Args:
        data: optional annotation matrix to replay. Columns ``13:525`` are
            used as the per-frame input vectors and ``data`` itself is
            returned as the annotation.
        dur: clip length in seconds (generation path only).
        h, w: not used here; the frame size is whatever ``g_all`` outputs.
        c_speed: keyframes per second.
        fps: frames per second of the clip and of the annotation.
        a_speed: not used here; NaN is always recorded as the second speed.
        freeze_rate: per-component probability that a keyframe repeats the
            value of the keyframe before it.

    Returns:
        ``(fn, mat, cdur)``: path of the mp4 written into ``TEMP``, the
        annotation matrix, and the clip duration reported by moviepy.
    """
    if data is not None:
        iM = data[:,13:525]
        mat = data

    else:
        num_switches = int(dur*c_speed)+1
        switch_samples = int(fps/c_speed)
        num_samples = dur*fps

        M = npr.uniform(low=-3,high=3,size=(num_switches,512)) # keyframe vectors
        M_freez_msk = npr.binomial(1, freeze_rate, (num_switches,512))
        # Start at 1: keyframe 0 has no predecessor, and index -1 would wrap
        # around to the last keyframe. Each row is updated in one vector op.
        for i in range(1, num_switches):
            M[i] = np.where(M_freez_msk[i] == 1, M[i-1], M[i])

        iM=np.empty((num_samples,512))
        for i in range(num_switches-1):
            iM[i*switch_samples:(i+1)*switch_samples] = np.linspace(M[i],M[i+1],switch_samples,endpoint=False)
        # Hold the last keyframe on rows the segments did not reach instead of
        # leaving uninitialised memory.
        iM[(num_switches-1)*switch_samples:] = M[-1]
        mat = annotate(kind = 'sg', num_samples = num_samples, data = iM, c_speed=c_speed, a_speed=np.nan)

    # Run on whatever device g_all was moved to rather than assuming CUDA.
    # Assumes g_all is an nn.Module with at least one parameter.
    model_device = next(g_all.parameters()).device
    iMt = torch.from_numpy(iM).to(model_device).float()
    imgs = []
    with torch.no_grad():
        for i in trange(0,len(iMt)):
            imgs += tensor_to_images(g_all(iMt[i].unsqueeze(0)).cpu())

    clip = e.ImageSequenceClip(imgs,fps = fps)
    fn = os.path.join(TEMP,time.strftime("%m-%d-%H-%M-%S")+'.mp4')
    clip.write_videofile(fn, fps = fps ,audio_codec='aac', logger = None)
    cdur = clip.duration
    del clip
    gc.collect()
    return fn,mat, cdur

In [6]:
def bigGANScreen(data= None, dur=30, h=1024, w=1024, c_speed=1, a_speed=2, truncation=0.6, fps=32,freeze_rate = 0.5):
    """Render a clip of BigGAN images along a path of conditioning vectors.

    Each frame is generated from a 256-d vector: 128 noise components
    followed by 128 class-embedding components.

    Args:
        data: optional annotation matrix to replay. Columns ``525:781`` are
            used as the conditioning vectors and ``data[0, 781]`` as the
            truncation; ``data`` itself is returned as the annotation.
        dur: clip length in seconds (generation path only).
        h, w: output frame height and width in pixels.
        c_speed: class keyframes per second.
        a_speed: noise keyframes per second.
        truncation: truncation value for noise sampling and the generator.
        fps: frames per second of the clip and of the annotation.
        freeze_rate: per-component probability that a keyframe repeats the
            value of the keyframe before it.

    Returns:
        ``(fn, mat, cdur)``: path of the mp4 written into ``TEMP``, the
        annotation matrix, and the clip duration reported by moviepy.
    """
    dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = BigGAN.from_pretrained('biggan-deep-512',cache_dir='cache')
    model.to(dev)

    if data is not None:
        condsnp = data[:,525:781]
        truncation = float(data[0,781])
        mat = data
    else:
        num_switches_y = int(dur*c_speed)+1
        num_switches_z = int(dur*a_speed)+1
        switch_samples_y = int(fps/c_speed)
        switch_samples_z = int(fps/a_speed)
        num_samples = dur*fps

        Ys=np.empty((num_samples,128))
        Zs=np.empty((num_samples,128))

        y_oh = random_one_hot(dim=1000, n=num_switches_y)
        y_freez_msk = npr.binomial(1, freeze_rate, (num_switches_y,1000))
        # Mask with the dtype/device of y_oh so each row is mixed in one
        # tensor op instead of 1000 scalar assignments.
        y_keep = torch.from_numpy(y_freez_msk).to(y_oh)
        for i in range(1,num_switches_y):
            y_oh[i] = (1-y_keep[i])*y_oh[i] + y_keep[i]*y_oh[i-1]
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            y = model.embeddings(y_oh.to(dev)).cpu().numpy()
        for i in range(num_switches_y-1):
            Ys[i*switch_samples_y:(i+1)*switch_samples_y] = np.linspace(y[i],y[i+1],switch_samples_y,endpoint=False)
        # Hold the last keyframe on rows the segments did not reach instead of
        # leaving uninitialised memory.
        Ys[(num_switches_y-1)*switch_samples_y:] = y[-1]

        z = truncated_noise_sample(truncation=truncation, batch_size=num_switches_z)
        z_freez_msk = npr.binomial(1, freeze_rate, (num_switches_z,128))
        for i in range(1,num_switches_z):
            z[i] = np.where(z_freez_msk[i] == 1, z[i-1], z[i])
        for i in range(num_switches_z-1):
            Zs[i*switch_samples_z:(i+1)*switch_samples_z] = np.linspace(z[i],z[i+1],switch_samples_z,endpoint=False)
        Zs[(num_switches_z-1)*switch_samples_z:] = z[-1]

        condsnp = np.concatenate((Zs,Ys),axis = 1)
        mat = annotate(kind='bg', num_samples= num_samples, data=condsnp, truncation=truncation, c_speed=c_speed, a_speed=a_speed)

    conds = torch.from_numpy(condsnp).type('torch.FloatTensor').to(dev)
    imgs = [] # frames
    batch = 3 # frames generated per forward pass
    with torch.no_grad():
        for i in trange(0,len(conds),batch):
            # Slicing past the end is safe, so the last (short) batch needs no
            # special case.
            tensors = model.generator(conds[i:i+batch],truncation=truncation).cpu().numpy()
            # PIL's resize takes (width, height).
            imgs += [np.array(Image.fromarray(im).resize((w,h), resample=Image.BICUBIC)) for im in tensor_to_images(tensors)]

    clip = e.ImageSequenceClip(imgs,fps = fps)
    fn = os.path.join(TEMP,time.strftime("%m-%d-%H-%M-%S")+'.mp4')
    clip.write_videofile(fn, fps = fps ,audio_codec='aac', logger = None)
    cdur = clip.duration
    del clip
    gc.collect()
    return fn,mat, cdur

In [7]:
def closeEyes(fps=32, dur=10):
    """Describe the pre-recorded "close eyes" segment.

    Nothing is rendered here. The function returns the relative path
    ``'close.mp4'`` together with an annotation matrix that carries only the
    ``'close'`` header (every other column is NaN).

    Args:
        fps: annotation rows per second; defaults to the previously
            hard-coded 32.
        dur: segment length in seconds reported to the caller; defaults to
            the previously hard-coded 10. The file is not inspected, so this
            should match the real length of ``close.mp4``.

    Returns:
        ``('close.mp4', mat, dur)`` where ``mat`` has ``int(fps*dur)`` rows.
    """
    num_samples = int(fps*dur)
    mat = annotate(kind = 'close', num_samples=num_samples)
    return 'close.mp4', mat, dur

In [8]:
def move(fps=32, dur=15):
    """Describe the pre-recorded "move" segment.

    Nothing is rendered here. The function returns the relative path
    ``'move.mp4'`` together with an annotation matrix that carries only the
    ``'move'`` header (every other column is NaN).

    Args:
        fps: annotation rows per second; defaults to the previously
            hard-coded 32.
        dur: segment length in seconds reported to the caller; defaults to
            the previously hard-coded 15. The file is not inspected, so this
            should match the real length of ``move.mp4``.

    Returns:
        ``('move.mp4', mat, dur)`` where ``mat`` has ``int(fps*dur)`` rows.
    """
    num_samples = int(fps*dur)
    mat = annotate(kind = 'move', num_samples=num_samples)
    return 'move.mp4', mat, dur

In [9]:
def combo(folder,total_dur=15,fps=32):
    """Assemble a random sequence of segments into one video plus its annotation log.

    Segments are drawn until at least ``total_dur`` minutes are collected,
    with probabilities 10% close-eyes, 20% move, 10% colour, 15% cursor,
    20% face and 25% BigGAN. The result is written to a new timestamped
    sub-folder of ``folder`` as ``vid_<timestamp>.mp4`` and
    ``log_<timestamp>.npy`` (float32, 784 columns). Afterwards every file in
    ``TEMP`` is deleted.

    Args:
        folder: directory that receives the timestamped output folder.
        total_dur: minimum total length in minutes.
        fps: frame rate of the final video; also forwarded to the rendered
            segments so their annotation rows match the video frames.
            closeEyes() and move() are called with their defaults, which
            annotate at 32 rows per second.

    Raises:
        ValueError: if no segment was generated (``total_dur <= 0``).
    """
    dur = 0
    clip_paths = []
    # Collect the per-segment matrices in a list: a fixed-size buffer can
    # overflow when a segment yields more rows than fps * seconds.
    mats = []

    while dur<total_dur*60:
        r = choice(range(20))
        if r in range(2): # 10% close eyes
            print('close ',dur/60)
            clip,mat, cdur = closeEyes()
        elif r in range(6) : # 20% move
            print('move ',dur/60)
            clip,mat, cdur = move()
        elif r in range(8): # 10% color
            print('color ',dur/60)
            c_speed = choice([0.5,1,2])
            clip,mat, cdur = colorScreen(c_speed=c_speed, fps=fps)
        elif r in range(11): # 15% cursor
            print('cursor ',dur/60)
            c_speed = choice([0.5,1])
            a_speed = choice([1,2,4])
            clip,mat, cdur = colorBoxScreen(c_speed=c_speed, a_speed = a_speed, fps=fps)
        elif r in range(15): # 20% sg
            print('face ',dur/60)
            c_speed = choice([0.2,0.5,1,2])
            clip,mat, cdur = faceScreen(c_speed=c_speed, fps=fps)
        else : # 25% bg
            print('biggan ',dur/60)
            c_speed = choice([0.2,0.5,1])
            a_speed = choice([0.2,0.5,1,2])
            clip,mat, cdur = bigGANScreen(c_speed=c_speed, a_speed = a_speed, fps=fps)
        dur += cdur
        clip_paths.append(clip)
        # The log is stored as float32; convert now to halve the memory held.
        mats.append(np.asarray(mat, dtype=np.float32))

    if not clip_paths:
        raise ValueError('total_dur must be positive: no segments were generated')

    print('reading the clips...')
    sources = [e.VideoFileClip(c) for c in clip_paths]
    try:
        final = e.concatenate_videoclips(sources)
        log = np.concatenate(mats, axis=0)
        del mats
        gc.collect()
        timestr = time.strftime("%m-%d-%H-%M-%S")
        folder_path = os.path.join(folder,timestr)
        os.makedirs(folder_path)
        vid_fn = os.path.join(folder_path,'vid_'+timestr+'.mp4')
        final.write_videofile(vid_fn, fps = fps ,audio_codec='aac')
        print('saving the log file ...')
        log_fn = os.path.join(folder_path,'log_'+timestr)
        np.save(log_fn,log)
        del final, log
    finally:
        # Release the ffmpeg readers before the temporary files are removed.
        for src in sources:
            src.close()
    print('deleting temporary files ...')
    del sources
    gc.collect()
    for path in os.listdir(TEMP):
        full_path = os.path.join(TEMP, path)
        if os.path.isfile(full_path):
            os.remove(full_path)
    print('All done!')

In [None]:
# Export settings: how many videos to produce and how long each one is.
num_files = 1
duration = 5 # minutes

# Output folder (under the current working directory), created on first use.
exports = os.path.join(os.getcwd(), 'exports')
os.makedirs(exports, exist_ok=True)

for _ in range(num_files):
    combo(exports, total_dur=duration)