In [17]:
import sys
import torch
import numpy as np
# import gradio as gr
import cv2
from PIL import Image
from omegaconf import OmegaConf
from einops import repeat, rearrange
from pytorch_lightning import seed_everything
import os
from imwatermark import WatermarkEncoder
import imgviz
sys.path.append("/home/boyili/share_data/Stable_Videos_run/stablediffusion")
from stablediffusion.scripts.txt2img import put_watermark
from stablediffusion.ldm.util import instantiate_from_config
from stablediffusion.ldm.models.diffusion.ddim import DDIMSampler
from stablediffusion.ldm.data.util import AddMiDaS

# Turn autograd off for the whole kernel session; nothing in this notebook calls backward().
torch.set_grad_enabled(False)

<torch.autograd.grad_mode.set_grad_enabled at 0x7f5fa36ff820>

In [19]:
def make_batch_sd(
        image,
        txt,
        device,
        num_samples=1,
        model_type="dpt_hybrid"
):
    """Assemble the input batch dictionary for depth-conditioned sampling.

    Args:
        image: PIL image; it is converted to RGB before use.
        txt: Prompt string, replicated ``num_samples`` times under ``"txt"``.
        device: Torch device the ``"jpg"`` and ``"midas_in"`` tensors are moved to.
        num_samples: Size of the leading batch axis created by repetition.
        model_type: Forwarded to ``AddMiDaS(model_type=...)``.

    Returns:
        dict with ``"txt"`` (list of prompts), ``"jpg"`` (image tensor laid out
        ``n c h w``, values scaled by ``/ 127.5 - 1.0``) and ``"midas_in"``
        (the array written by ``AddMiDaS``, converted to a tensor with a
        leading batch axis of length ``num_samples``).
    """
    rgb = np.array(image.convert("RGB"))
    # Rescale pixel values with x / 127.5 - 1.0; the layout is still h w c here,
    # which is what the AddMiDaS transform receives under "jpg".
    pixels = torch.from_numpy(rgb).to(dtype=torch.float32) / 127.5 - 1.0

    prompts = num_samples * [txt]
    add_midas = AddMiDaS(model_type=model_type)
    batch = add_midas({"jpg": pixels, "txt": prompts})

    # h w c -> 1 c h w, move to the target device, then repeat along the batch axis.
    jpg = rearrange(batch["jpg"], 'h w c -> 1 c h w').to(device=device)
    batch["jpg"] = repeat(jpg, "1 ... -> n ...", n=num_samples)

    # "midas_in" goes through torch.from_numpy, so it is a NumPy array at this point;
    # give it a leading axis and repeat it the same way as "jpg".
    midas_in = torch.from_numpy(batch["midas_in"][None, ...]).to(device=device)
    batch["midas_in"] = repeat(midas_in, "1 ... -> n ...", n=num_samples)
    return batch


def paint(sampler, image, prompt, t_enc, seed, scale, num_samples=1, callback=None,
          do_full_sample=False):
    """Run one depth-conditioned sampling pass and return the depth preview plus results.

    Args:
        sampler: Object exposing ``.model``, ``.stochastic_encode`` and ``.decode``.
        image: PIL image handed to ``make_batch_sd``.
        prompt: Text prompt; encoded with ``model.cond_stage_model.encode``.
        t_enc: Step count passed to ``sampler.stochastic_encode`` and ``sampler.decode``.
        seed: Passed to ``seed_everything`` before any sampling happens.
        scale: Forwarded as ``unconditional_guidance_scale``.
        num_samples: Batch size used for the batch, the unconditional conditioning
            and the ``t_enc`` tensor.
        callback: Forwarded unchanged to ``sampler.decode``.
        do_full_sample: When true, start from ``torch.randn_like(z)`` instead of a
            stochastically encoded version of the input latent.

    Returns:
        list: ``[depth_image]`` followed by one watermarked PIL image per sample.
    """
    device = torch.device(
        "cuda") if torch.cuda.is_available() else torch.device("cpu")
    model = sampler.model
    seed_everything(seed)

    # A fresh watermark encoder is built (and this message printed) on every call.
    print("Creating invisible watermark encoder (see https://github.com/ShieldMnt/invisible-watermark)...")
    wm = "SDV2"
    wm_encoder = WatermarkEncoder()
    wm_encoder.set_watermark('bytes', wm.encode('utf-8'))

    # NOTE(review): autocast is hard-coded to "cuda" even though `device` above may
    # fall back to CPU — confirm this is intended for CPU-only runs.
    with torch.no_grad(),\
            torch.autocast("cuda"):
        batch = make_batch_sd(
            image, txt=prompt, device=device, num_samples=num_samples)
        z = model.get_first_stage_encoding(model.encode_first_stage(
            batch[model.first_stage_key]))  # move to latent space
        c = model.cond_stage_model.encode(batch["txt"])
        c_cat = list()
        for ck in model.concat_keys:
            cc = batch[ck]
            cc = model.depth_model(cc)
            # Per-sample min/max over all non-batch axes, used for the preview image.
            depth_min, depth_max = torch.amin(cc, dim=[1, 2, 3], keepdim=True), torch.amax(cc, dim=[1, 2, 3],
                                                                                           keepdim=True)
            # NOTE(review): if depth_max == depth_min this divides by zero — confirm
            # a constant depth map cannot occur.
            display_depth = (cc - depth_min) / (depth_max - depth_min)
            # Preview uses the first sample and first channel only, scaled to uint8.
            # It is overwritten on each iteration, so the returned preview belongs
            # to the last concat key.
            depth_image = Image.fromarray(
                (display_depth[0, 0, ...].cpu().numpy() * 255.).astype(np.uint8))
            # Resize the depth prediction to the latent's spatial size.
            cc = torch.nn.functional.interpolate(
                cc,
                size=z.shape[2:],
                mode="bicubic",
                align_corners=False,
            )
            # Min/max are recomputed after resizing, then the map is rescaled to [-1, 1].
            depth_min, depth_max = torch.amin(cc, dim=[1, 2, 3], keepdim=True), torch.amax(cc, dim=[1, 2, 3],
                                                                                           keepdim=True)
            cc = 2. * (cc - depth_min) / (depth_max - depth_min) - 1.
            c_cat.append(cc)
        c_cat = torch.cat(c_cat, dim=1)
        # conditional branch: depth maps as concat conditioning, prompt as cross-attention
        cond = {"c_concat": [c_cat], "c_crossattn": [c]}

        # unconditional branch: same depth conditioning, empty-string prompt
        uc_cross = model.get_unconditional_conditioning(num_samples, "")
        uc_full = {"c_concat": [c_cat], "c_crossattn": [uc_cross]}
        if not do_full_sample:
            # encode (scaled latent)
            z_enc = sampler.stochastic_encode(
                z, torch.tensor([t_enc] * num_samples).to(model.device))
        else:
            # start from pure noise with the latent's shape
            z_enc = torch.randn_like(z)
        # decode it
        samples = sampler.decode(z_enc, cond, t_enc, unconditional_guidance_scale=scale,
                                 unconditional_conditioning=uc_full, callback=callback)
        x_samples_ddim = model.decode_first_stage(samples)
        # Shift from [-1, 1] to [0, 1], clamp, then convert to n h w c scaled by 255.
        result = torch.clamp((x_samples_ddim + 1.0) / 2.0, min=0.0, max=1.0)
        result = result.cpu().numpy().transpose(0, 2, 3, 1) * 255
    return [depth_image] + [put_watermark(Image.fromarray(img.astype(np.uint8)), wm_encoder) for img in result]


def pad_image(input_image):
    """Edge-pad an image up to multiples of 64 and centre it on a black square.

    Each side is first grown (right/bottom only, replicating border pixels) to
    the next multiple of 64, with a minimum of 128 pixels. If the padded image
    is not square, it is pasted centred onto a black canvas whose side equals
    the longer edge.

    Args:
        input_image: PIL image whose array form has three axes (the pad spec
            below names exactly three axes).

    Returns:
        The padded PIL image, square in all cases.
    """
    size = np.array(input_image.size)
    # Number of 64-px tiles needed per axis, but never fewer than two.
    tiles = np.maximum(2, np.ceil(size / 64).astype(int))
    pad_w, pad_h = tiles * 64 - size

    pixels = np.array(input_image)
    # Array axes are (rows, cols, channels), hence the (pad_h, pad_w) order.
    im_padded = Image.fromarray(
        np.pad(pixels, ((0, pad_h), (0, pad_w), (0, 0)), mode='edge'))

    w, h = im_padded.size
    if w == h:
        return im_padded

    # Centre along the shorter axis; the offset along the longer axis is 0.
    side = max(w, h)
    square = Image.new(im_padded.mode, (side, side), (0, 0, 0))
    square.paste(im_padded, ((side - w) // 2, (side - h) // 2))
    return square


def predict(input_image, prompt, steps, num_samples, scale, seed, eta, strength):
    """Prepare one frame and run depth-conditioned sampling on it.

    Relies on the module-level ``sampler``.

    Args:
        input_image: PIL image for the frame.
        prompt: Text prompt forwarded to ``paint``.
        steps: Number of DDIM steps used to build the sampler schedule.
        num_samples: Accepted for interface compatibility; always overridden to 1.
        scale: Guidance scale forwarded to ``paint``.
        seed: Random seed forwarded to ``paint``.
        eta: Forwarded to ``sampler.make_schedule`` as ``ddim_eta``.
        strength: Value in [0, 1]; scaled by ``steps`` to get ``t_enc``, and a
            value of exactly 1 switches ``paint`` to sampling from pure noise.

    Returns:
        The list returned by ``paint`` (depth preview first, then samples).
    """
    num_samples = 1
    rgb_image = input_image.convert("RGB")
    # pad_image pads to multiples of 64 and squares the canvas; the result is
    # then scaled to the fixed 512x512 working resolution.
    model_input = pad_image(rgb_image).resize((512, 512))

    sampler.make_schedule(steps, ddim_eta=eta, verbose=True)
    assert 0. <= strength <= 1., 'can only work with strength in [0.0, 1.0]'

    # Cap at steps-1 so strength == 1 still yields a valid step count.
    t_enc = min(int(strength * steps), steps-1)
    return paint(
        sampler=sampler,
        image=model_input,
        prompt=prompt,
        t_enc=t_enc,
        seed=seed,
        scale=scale,
        num_samples=num_samples,
        callback=None,
        do_full_sample=(strength == 1.),
    )




def initialize_model(config, ckpt):
    """Build the diffusion model from a config and checkpoint and wrap it in a DDIM sampler.

    Args:
        config: Path to the OmegaConf YAML file; its ``model`` node is instantiated.
        ckpt: Path to a checkpoint file containing a ``"state_dict"`` entry.

    Returns:
        DDIMSampler wrapping the model, which has been moved to CUDA when
        available and to CPU otherwise.
    """
    config = OmegaConf.load(config)
    model = instantiate_from_config(config.model)

    # Load the weights onto the CPU first. Without map_location, a checkpoint
    # saved from a GPU cannot be deserialized on a CPU-only machine, which would
    # defeat the CPU fallback below.
    # NOTE: torch.load unpickles the file; only load checkpoints from trusted sources.
    checkpoint = torch.load(ckpt, map_location="cpu")
    # strict=False: missing or unexpected keys are tolerated rather than raising.
    model.load_state_dict(checkpoint["state_dict"], strict=False)

    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model = model.to(device)
    return DDIMSampler(model)

# Create the directory the rendered video is written to ("out/..."); the video
# writer does not create missing directories itself.
os.makedirs("out", exist_ok=True)

In [20]:
# Model config and checkpoint locations (relative paths).
fpath = "stablediffusion/configs/stable-diffusion/v2-midas-inference.yaml"
wpath = "weights/512-depth-ema.ckpt"
# `predict` reads this module-level `sampler`, so this cell must run before any sampling.
sampler = initialize_model(fpath, wpath)

No module 'xformers'. Proceeding without it.
LatentDepth2ImageDiffusion: Running in eps-prediction mode
DiffusionWrapper has 865.91 M params.
making attention of type 'vanilla' with 512 in_channels
Working with z of shape (1, 4, 32, 32) = 4096 dimensions.
making attention of type 'vanilla' with 512 in_channels


In [21]:
# Sampling parameters, passed positionally to `predict` for every frame.
prompt = "A high resolution photo of a woman with a red dress and a black hat."
ddim_steps = 50   # number of DDIM steps used to build the sampler schedule
num_samples = 1   # `predict` overrides this to 1 regardless of the value given
scale = 5.0       # forwarded as the unconditional guidance scale
seed = 42         # the same seed is re-applied for every frame
eta = 0.0         # forwarded to the sampler schedule as ddim_eta
strength = 0.9    # must lie in [0.0, 1.0]; exactly 1.0 samples from pure noise

In [23]:
video_path = "assets/videos/youtube_run_000/img/"
save_path = "out/v0_test1.mp4"

# Frames are processed in lexicographic filename order.
list_of_frames = sorted(i for i in os.listdir(video_path) if ".jpg" in i)
if not list_of_frames:
    # Fail with a clear message instead of a NameError on the unbound writer below.
    raise FileNotFoundError(f"No .jpg frames found in {video_path!r}")

# cv2.VideoWriter does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)

video_file = None
try:
    for fid, fname in enumerate(list_of_frames):
        # Read the frame and close the file handle right away; converting to RGB
        # also guarantees a 3-channel array for the tiled preview.
        with Image.open(os.path.join(video_path, fname)) as frame_file:
            input_image = frame_file.convert("RGB")

        # result[0] is the depth preview, result[1] the generated sample.
        result = predict(input_image, prompt, ddim_steps, num_samples, scale, seed, eta, strength)

        # Tile input | depth | output side by side, all at the sample's resolution.
        out_size = result[1].size
        output_image = imgviz.tile(
            [
                np.array(input_image.resize(out_size)),
                np.array(result[0].resize(out_size)),
                np.array(result[1]),
            ],
            shape=(1, 3),
            border=(255, 255, 255),
        )
        # RGB -> BGR for OpenCV; make the reversed view contiguous before writing.
        output_image = np.ascontiguousarray(output_image[:, :, ::-1])

        if video_file is None:
            # The writer is created lazily because the frame size is only known
            # once the first tiled frame exists.
            video_file = cv2.VideoWriter(
                save_path,
                cv2.VideoWriter_fourcc(*'mp4v'),
                30,
                frameSize=(output_image.shape[1], output_image.shape[0]),
            )
            if not video_file.isOpened():
                # An unopened writer drops frames silently; surface the problem instead.
                raise RuntimeError(f"Could not open video writer for {save_path!r}")
        video_file.write(output_image)
finally:
    # Always finalize the container, even if sampling fails part-way through.
    if video_file is not None:
        video_file.release()

In [1]:
from IPython.display import HTML

# Embed the rendered video with a plain HTML5 <video> tag. The path is a literal
# so that this cell can run on a fresh kernel without the sampling cells.
HTML("""
<video width="80%" controls>
  <source src="out/v0_test1.mp4" type="video/mp4">
</video>
""")

In [3]:
# Import only Video. Importing ipywidgets' Image as well would rebind the
# notebook-level name `Image` away from PIL.Image, which the sampling cells use
# (Image.open / Image.fromarray / Image.new), and break them on re-run.
from ipywidgets import Video

# `autoplay` is the Video widget's trait for starting playback automatically.
video = Video.from_file("out/v0_test1.mp4", autoplay=True, width=1400, height=1400)
video

Video(value=b'\x00\x00\x00\x1cftypisom\x00\x00\x02\x00isomiso2mp41\x00\x00\x00\x08free\x00:I\xbf...', height='…