In [1]:
import os
os.chdir("..")  # go to project root
print(f"cwd: {os.getcwd()}")  # sanity check

cwd: /home/dude-desktop/dev/cs760


In [2]:
# import torch
# from diffusers import StableDiffusionInpaintPipeline
# from PIL import Image

# image = Image.open("outputs/debug_video_frame.png").convert("RGB")
# mask_image = Image.open("outputs/debug_mask_frame.png").convert("RGB")


# pipe = StableDiffusionInpaintPipeline.from_pretrained(
#     "stabilityai/stable-diffusion-2-inpainting",
#     torch_dtype=torch.float16,
# )
# pipe.to("cuda")
# prompt = "Face of a yellow cat, high resolution, sitting on a park bench"
# #image and mask_image should be PIL images.
# #The mask structure is white for inpainting and black for keeping as is
# image = pipe(prompt=prompt, image=image, mask_image=mask_image).images[0]
# image.save("./yellow_cat_on_park_bench.png")

In [3]:
import cv2
import numpy as np
import torch
from diffusers import StableDiffusionInpaintPipeline
from PIL import Image
from tqdm import tqdm
from lib import iter_dir_for_video_and_mask, get_video_stats, ensure_video_and_mask_match

In [4]:
pipe = StableDiffusionInpaintPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-inpainting",
    dtype=torch.float16,
)
pipe = pipe.to("cuda")

Keyword arguments {'dtype': torch.float16} are not expected by StableDiffusionInpaintPipeline and will be ignored.


Loading pipeline components...:   0%|          | 0/6 [00:00<?, ?it/s]

In [None]:
def inpaint_video_borders(pipe, video_path, mask_path, output_path, prompt, batch_size=64, seed=42):
    assert os.path.exists(video_path), f"Video path {video_path} does not exist."
    assert os.path.exists(mask_path), f"Mask path {mask_path} does not exist"
    
    # loading video and mask with opencv
    video_capture = cv2.VideoCapture(video_path)
    mask_capture = cv2.VideoCapture(mask_path)

    fps = video_capture.get(cv2.CAP_PROP_FPS)
    width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"Video FPS: {fps}, Width: {width}, Height: {height}")

    frames = []
    while True:
        ret, frame = video_capture.read()
        if not ret:
            break
        frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))

    mask_frames = []
    while True:
        ret, frame = mask_capture.read()
        if not ret:
            break
        frame = cv2.bitwise_not(frame)
        mask_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
        mask_frames.append(mask_frame)

    video_capture.release()
    mask_capture.release()

    if len(frames) != len(mask_frames):
        raise ValueError("The number of frames in the video and mask must be the same.")
    
    generator = torch.Generator(device="cuda").manual_seed(seed)

    output_frames = []
    for i in tqdm(range(0, len(frames), batch_size), total=(len(frames) + batch_size - 1) // batch_size):
        frame_batch = frames[i:i + batch_size]
        mask_batch = mask_frames[i:i + batch_size]
        prompts = [prompt] * len(frame_batch)

        with torch.no_grad():
            with torch.autocast("cuda"):
                output = pipe(
                    prompt=prompts,
                    image=frame_batch,
                    mask_image=mask_batch,
                    width=width,
                    height=height,
                    generator=generator,
                ).images

        output_frames.extend(output)
        torch.cuda.empty_cache()

    # combine outputted frames back into video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    for frame in output_frames:
        if isinstance(frame, Image.Image):
            frame = np.array(frame)
        frame = frame.astype(np.uint8)
        # Convert RGB (from PIL) to BGR (for OpenCV)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        out.write(frame)

    out.release()
    print(f"Inpainted video saved to {output_path}")


dummy_video_path = "out_pairs/stabilised/_Boom_Snap_Clap__challenge_clap_u_nm_np1_fr_med_1.avi"
dummy_mask_path = "out_pairs/masks/_Boom_Snap_Clap__challenge_clap_u_nm_np1_fr_med_1_mask.avi"
output_video_path = "outputs/output_video.mp4"
inpaint_video_borders(
    pipe=pipe,
    video_path=dummy_video_path,
    mask_path=dummy_mask_path,
    output_path=output_video_path,
    prompt="A realistic photo.",
    batch_size=64
)

Video FPS: 30.0, Width: 320, Height: 240


  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

 50%|█████     | 1/2 [00:42<00:42, 42.26s/it]

  0%|          | 0/50 [00:00<?, ?it/s]

100%|██████████| 2/2 [00:52<00:00, 26.29s/it]

Inpainted video saved to outputs/output_video.mp4





In [None]:
input_dir = "out_pairs"
output_dir = "outputs/StableDiffusion"
output_width, output_height = 320, 240

os.makedirs(output_dir, exist_ok=True)

for video in tqdm(iter_dir_for_video_and_mask(
    input_dir, video_dir="stabilised"
), desc="Checking video and mask"):
    video_stats = get_video_stats(video["video"])
    mask_stats = get_video_stats(video["mask"])
    mask_stats.frame_count = video_stats.frame_count  # TODO: HACK
    ensure_video_and_mask_match(video_stats, mask_stats)
    assert video_stats.width == output_width
    assert video_stats.height == output_height

for video in tqdm(iter_dir_for_video_and_mask(
    input_dir, video_dir="stabilised"
), desc="Inpainting videos"):
    video_name = os.path.basename(video["video"])
    output_path = os.path.join(output_dir, video_name)
    inpaint_video_borders(
        pipe=pipe,
        video_path=video["video"],
        mask_path=video["mask"],
        output_path=output_path,
        prompt="A realistic photo.",
        batch_size=32
    )

Checking video and mask: 100%|██████████| 500/500 [00:00<00:00, 566.46it/s]
Inpainting videos:   0%|          | 0/500 [00:00<?, ?it/s]

Video FPS: 30.0, Width: 320, Height: 240




  0%|          | 0/50 [00:00<?, ?it/s]



  0%|          | 0/50 [00:00<?, ?it/s]