In [1]:
import glob
import os
from PIL import Image
import numpy as np
import torch
import torch.nn.functional as F
from torchvision import transforms 
import ot


2023-08-02 17:25:50.993354: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
def linear_scheduler(timesteps, start=0.0001, end=0.02):
    """
    Build a linearly spaced beta schedule.

    Args:
        timesteps: number of values in the schedule.
        start: first value of the schedule.
        end: last value of the schedule (inclusive).

    Returns:
        1-D tensor holding `timesteps` evenly spaced values from
        `start` to `end`.
    """
    betas = torch.linspace(start, end, steps=timesteps)
    return betas

def get_index_from_list(vals, t, x_shape):
    """
    Pick one entry of `vals` per batch element, shaped for broadcasting.

    Args:
        vals: 1-D tensor of per-timestep values (e.g. a schedule table).
        t: 1-D integer tensor of timestep indices, one per batch element.
        x_shape: shape of the tensor the result will be broadcast against;
            only its number of dimensions is used.

    Returns:
        Tensor of shape `(batch_size, 1, ..., 1)` with `len(x_shape)`
        dimensions, holding `vals[t[i]]` for each batch element, placed on
        the same device as `t`.
    """
    batch_size = t.shape[0]
    # gather needs the index on the same device as `vals`; follow `vals`
    # instead of assuming it lives on the CPU
    picked = vals.gather(-1, t.to(vals.device))
    # one leading batch axis, then singleton axes for every remaining dim
    broadcast_shape = (batch_size,) + (1,) * (len(x_shape) - 1)
    return picked.reshape(broadcast_shape).to(t.device)

def forward_diffusion_sample(x_0, t, device="cpu"):
    """
    Produce a noised version of `x_0` at timestep(s) `t` in a single step.

    Gaussian noise shaped like `x_0` is sampled and mixed with `x_0` using
    the per-timestep coefficients looked up in the module-level tables
    `sqrt_alphas_cumprod` and `sqrt_one_minus_alphas_cumprod`.

    Args:
        x_0: input tensor (the clean image).
        t: tensor of timestep indices, forwarded to `get_index_from_list`.
        device: device on which the result is computed and returned.

    Returns:
        Tuple `(noisy_x, noise)`, both on `device`.
    """
    noise = torch.randn_like(x_0)
    signal_scale = get_index_from_list(sqrt_alphas_cumprod, t, x_0.shape).to(device)
    noise_scale = get_index_from_list(sqrt_one_minus_alphas_cumprod, t, x_0.shape).to(device)
    noise = noise.to(device)

    # scaled signal (mean term) plus scaled noise
    noisy = signal_scale * x_0.to(device) + noise_scale * noise
    return noisy, noise


# Noise schedule: T diffusion steps with linearly spaced betas
T = 300
betas = linear_scheduler(timesteps=T)

# Quantities derived from the schedule, computed once up front
alphas = 1.0 - betas
alphas_cumprod = alphas.cumprod(dim=0)
# cumulative product shifted right by one step, with 1.0 in the first slot
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = (1.0 / alphas).sqrt()
sqrt_alphas_cumprod = alphas_cumprod.sqrt()
sqrt_one_minus_alphas_cumprod = (1.0 - alphas_cumprod).sqrt()
posterior_variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)

In [6]:
IMG_SIZE = 112
# PIL image -> IMG_SIZE x IMG_SIZE tensor rescaled to [-1, 1],
# with a random horizontal flip applied along the way
forward_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # scales data into [0, 1]
    transforms.Lambda(lambda t: t * 2 - 1),  # rescale to [-1, 1]
])
# Tensor in [-1, 1] laid out as CHW -> PIL image
backward_transform = transforms.Compose([
    # Noised samples can fall outside [-1, 1]; clamp first so the uint8
    # cast further down cannot wrap around and produce speckle artifacts
    transforms.Lambda(lambda t: torch.clamp(t, -1.0, 1.0)),
    transforms.Lambda(lambda t: (t + 1) / 2),  # back to [0, 1]
    transforms.Lambda(lambda t: t.permute(1, 2, 0)),  # CHW to HWC
    transforms.Lambda(lambda t: t * 255.),
    transforms.Lambda(lambda t: t.numpy().astype(np.uint8)),
    transforms.ToPILImage(),
])

In [11]:
# Smoke test: noise a single frame at timestep index 1 and write it to disk
t = torch.tensor([1], dtype=torch.int64)
img = forward_transform(
    Image.open('data/ucf101/ApplyEyeMakeup/v_ApplyEyeMakeup_g01_c01/00000.jpg')
)
img, noise = forward_diffusion_sample(img, t)
# convert the noised tensor back to a PIL image before saving
img = backward_transform(img)
img.save('test.jpg')

In [None]:

def sample_video(frames, num_segments=4, segment_len=16):
    """
    Randomly sample fixed-length runs of consecutive frames from a video.

    Args:
        frames: non-empty sequence of frames, all of the same shape
            (anything `np.stack` accepts). The caller's sequence is left
            unmodified.
        num_segments: number of segments to draw (default 4).
        segment_len: number of consecutive frames per segment (default 16).

    Returns:
        List of `num_segments` arrays, each of shape
        `(segment_len, *frame_shape)`. Start positions are drawn
        independently with `np.random.randint`, so segments may overlap.

    Raises:
        ValueError: if `frames` is empty (there is nothing to repeat).
    """
    # Work on a private copy so padding never grows the caller's list
    pool = list(frames)
    if not pool:
        raise ValueError("frames must contain at least one frame")
    # Clips shorter than one segment are repeated (doubled) until one fits
    while len(pool) < segment_len:
        pool = pool + pool
    start_idx = np.random.randint(len(pool) - segment_len + 1, size=num_segments)
    return [np.stack(pool[start:start + segment_len]) for start in start_idx]

def prep_video(video_path='data/ucf101/ApplyEyeMakeup/v_ApplyEyeMakeup_g01_c01'):
    """
    Load every `*.jpg` frame found directly inside a video directory.

    Args:
        video_path: directory containing the frame images. Defaults to the
            sample clip this notebook has been using.

    Returns:
        List of PIL images, one per `*.jpg` file, ordered by sorted path.
        Empty if the directory holds no matching files.
    """
    frames_path = sorted(glob.glob(os.path.join(video_path, '*.jpg')))
    frames = []
    for path in frames_path:
        # Image.open is lazy and keeps the file open; copy() loads the
        # pixels so the handle can be released before the next frame
        with Image.open(path) as frame:
            frames.append(frame.copy())
    return frames
    



In [4]:
# Toy Sinkhorn call: two uniform length-4 weight vectors and a random
# 4x4 matrix, with regularization 0.01 and up to 2000 iterations
x = torch.ones(4) / 4
y = torch.ones(4) / 4
z = torch.rand(4, 4)
z1 = ot.sinkhorn(x, y, z, reg=0.01, numItermax=2000)

In [5]:
# Show the 4x4 result returned by the ot.sinkhorn call (stored in z1)
print(z1)

tensor([[8.2536e-43, 2.5000e-01, 9.4096e-10, 2.8499e-13],
        [5.7574e-02, 2.3579e-05, 1.9240e-01, 5.3894e-31],
        [1.9246e-01, 4.0652e-20, 5.7543e-02, 2.5366e-15],
        [1.2835e-10, 4.6883e-06, 8.4780e-05, 2.4991e-01]])
