# 1. Prepare model

In [10]:
import os
import torch
from pipelines.pipeline_animatediff import *
from diffusers.schedulers import DDIMInverseScheduler
from diffusers.utils import export_to_gif, export_to_video, load_image
from utils.attn_utils import *

# Route outbound HTTP(S) traffic through the proxy; hosts listed in no_proxy bypass it.
os.environ.update({
    'http_proxy': 'http://oversea-squid5.sgp.txyun:11080',
    'https_proxy': 'http://oversea-squid5.sgp.txyun:11080',
    'no_proxy': 'localhost,127.0.0.1,localaddress,localdomain.com,internal,corp.kuaishou.com,test.gifshow.com,staging.kuaishou.com',
})

# Echo the effective values so the cell output confirms what was set.
for proxy_var in ('http_proxy', 'https_proxy', 'no_proxy'):
    print(f"{proxy_var}:", os.environ.get(proxy_var))


# Load the motion adapter
# NOTE(review): `adapter` is not referenced again in this cell — the pipeline below is
# created with motion_adapter=None. Confirm whether this load is still needed.
adapter = MotionAdapter.from_pretrained("/home/wangluozhou/projects/AnimateDiff/models/Motion_Module/animatediff-motion-adapter-v1-5-2", torch_dtype=torch.float32)
# Load the controlnet
# controlnet = ControlNetModel.from_pretrained('/home/wangluozhou/pretrained_models/sd-controlnet-depth', torch_dtype=torch.float16)
# Base checkpoint for the pipeline (local zeroscope_v2_576w directory).
model_id = "/home/wangluozhou/pretrained_models/zeroscope_v2_576w"
# Build the pipeline in float32 without a motion adapter or ControlNet.
# VideoDiffPipeline is presumably provided by the wildcard import from pipelines.pipeline_animatediff.
pipe = VideoDiffPipeline.from_pretrained(
    model_id, 
    motion_adapter=None, 
    controlnet=None, 
    use_motion_mid_block=True,
    torch_dtype=torch.float32
)

# Replace the default scheduler with the DDIM scheduler config stored next to the checkpoint.
pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')
# `device` is reused by later cells (encode_frames / prepare_latents calls).
device = torch.device('cuda')

# enable memory savings
pipe.enable_vae_slicing()

pipe.enable_model_cpu_offload()

The config attributes {'motion_activation_fn': 'geglu', 'motion_attention_bias': False, 'motion_cross_attention_dim': None} were passed to MotionAdapter, but are not expected and will be ignored. Please verify your config.json configuration file.


http_proxy: http://oversea-squid5.sgp.txyun:11080
https_proxy: http://oversea-squid5.sgp.txyun:11080
no_proxy: localhost,127.0.0.1,localaddress,localdomain.com,internal,corp.kuaishou.com,test.gifshow.com,staging.kuaishou.com


Loading pipeline components...: 100%|██████████| 5/5 [00:33<00:00,  6.61s/it]


In [None]:
import os
import torch
from pipelines.pipeline_animatediff import *
from diffusers.schedulers import DDIMInverseScheduler
from diffusers.utils import export_to_gif, export_to_video, load_image
from utils.attn_utils import *

# Alternative setup: base checkpoint directory (local path) for an AnimateDiff-style pipeline.
# Running this cell rebinds `model_id` and `pipe` from the first setup cell.
model_id = "/home/wangluozhou/pretrained_models/Realistic_Vision_V6.0_B1_noVAE"

# NOTE(review): tokenizer, text_encoder and vae are loaded here but are not passed to the
# pipeline constructed later in this cell — confirm whether they are still needed.
tokenizer = CLIPTokenizer.from_pretrained(
    model_id, subfolder="tokenizer", revision=None)

# Motion adapter weights, loaded as the fp16 variant.
motion_adapter = MotionAdapter.from_pretrained(
    "/home/wangluozhou/projects/AnimateDiff/models/Motion_Module/animatediff-motion-adapter-v1-5-2",
    variant="fp16",
    torch_dtype=torch.float16
)
text_encoder = CLIPTextModel.from_pretrained(
    model_id, subfolder="text_encoder", revision=None
)
vae = AutoencoderKL.from_pretrained(
    model_id, subfolder="vae", revision=None)

# unet = UNet2DConditionModel.from_pretrained(
#     model_id,
#     subfolder="unet",
#     low_cpu_mem_usage=True,
# )
# Build the motion UNet from the checkpoint's 2D UNet combined with the motion adapter.
unet = UNetMotionModel.from_unet2d(UNet2DConditionModel.from_pretrained(
    model_id,
    subfolder="unet",
    low_cpu_mem_usage=True,
), motion_adapter)


# The pipeline is created without a motion adapter; the motion UNet built above is
# swapped in afterwards.
pipe = AnimateDiffPipeline.from_pretrained(
    model_id, 
    motion_adapter=None, 
    controlnet=None, 
    use_motion_mid_block=True,
    use_safetensors=True,
    torch_dtype=torch.float16)
# Replace the pipeline's UNet with the motion UNet, moved to the GPU in half precision.
# NOTE(review): `device` is not defined in this cell; later cells rely on the value set
# in the first setup cell.
pipe.unet = unet.to(device='cuda',dtype=torch.float16)
# pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')

In [None]:
# Sanity check: display the motion-mid-block flag stored in the active UNet config.
pipe.unet.config['use_motion_mid_block']

# 2. Text-to-Video Generation

In [None]:
# Scene description, assembled from comma-separated fragments.
t2v_prompt = (
    "orange sky, warm lighting, fishing boats, ocean waves seagulls, "
    "rippling water, wharf, silhouette, serene atmosphere, dusk, evening glow, "
    "golden hour, coastal landscape, seaside scenery"
)
# CPU generator with a fixed seed so the sampled clip is repeatable.
t2v_generator = torch.Generator("cpu").manual_seed(42)

output = pipe(
    prompt=t2v_prompt,
    negative_prompt="bad quality, worse quality",
    height=256,
    width=256,
    num_frames=16,
    guidance_scale=7.5,
    num_inference_steps=25,
    generator=t2v_generator,
)

# Take the first (only) video of the batch and write it out as a GIF.
frames = output.frames[0]
export_to_gif(frames, "animation_16.gif")

# 3. Video Editing

## 3.1 Load Source Video

In [None]:
# Load the source video and split it into frames.
frames = load_video('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/locomotive_run.mp4')

# 1. encode frames into batch of latents
# NOTE(review): `device` comes from the first setup cell; run that cell beforehand.
latents_frames = pipe.encode_frames(frames, device=device)

## 3.1.1 Inverse with noise

In [None]:
# Option A: obtain the starting latent by adding seeded noise to the encoded source
# frames. The second return value is discarded here.
inv_latent, _ = pipe.add_noise_to_latents(
    init_latents=latents_frames, 
    strength=0.8,
    generator=torch.Generator("cpu").manual_seed(42),
    num_inference_steps=25,
)

## 3.1.2 Inverse with DDIM

In [None]:
# Option B: swap in the inverse DDIM scheduler and run the pipeline with inverse=True on
# the encoded source frames, asking for latents as output.
# This overwrites `inv_latent` from the noise-based cell if both cells are run.
pipe.scheduler = DDIMInverseScheduler.from_pretrained(model_id, subfolder='scheduler')
inv_latent = pipe(
        prompt="", 
        negative_prompt="",
        num_frames=16,
        guidance_scale=7.5,
        output_type='latent', 
        num_inference_steps=25,
        strength=0.8, 
        latents=latents_frames,
        inverse=True,
    ).frames

## 3.2 Generation

In [None]:
# Restore the forward DDIM scheduler (the DDIM-inversion cell replaces it with the inverse one).
pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')
# Generate the edited video starting from the inverted/noised latent `inv_latent`.
# No generator is passed here (the seeded one is commented out).
output = pipe(
    prompt="a pretty girl, white singlet, dark pants, on the stage",
    negative_prompt="",
    num_frames=16,
    guidance_scale=7.5,
    num_inference_steps=25,
    latents=inv_latent,
    # frames=frames_controlnet,
    strength=0.8,
    # generator=torch.Generator("cpu").manual_seed(42),
)
# First video of the batch, saved as a GIF.
frames = output.frames[0]
export_to_gif(frames, "/home/wangluozhou/projects/VideoDiffusion_Playground/resources/Human/sample_3_edit.gif")

# 4. Text-to-Video Generation with ControlNets

## 4.1 Load Control Signal

In [None]:
from controlnet_aux.processor import Processor
# controlnet_aux processor selected by the "depth_midas" id; it produces the control images.
processor = Processor("depth_midas")

# Decode the source clip into individual frames.
frames = load_video('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/Animals/sample_0_src.mp4')

# One control image per source frame, requested as PIL output.
frames_controlnet = [processor(src_frame, to_pil=True) for src_frame in frames]

In [None]:
# Display the size of the first source frame (useful when choosing height/width below).
frames[0].size

## 4.2 Generation

In [None]:
# Use the forward DDIM scheduler for generation.
pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')
# Generate a 16-frame 512x320 clip, passing the per-frame control images.
# NOTE(review): both setup cells build the pipeline with controlnet=None — confirm a
# ControlNet is attached before relying on `frames_controlnet`.
output = pipe(
    prompt="a sea lion, lying on the ice, winter, snow",
    negative_prompt="",
    num_frames=16,
    height=320,
    width=512,
    guidance_scale=7.5,
    num_inference_steps=25,
    frames_controlnet=frames_controlnet,
    strength=1.0,
    generator=torch.Generator("cpu").manual_seed(42),
)
# First video of the batch, saved as a GIF.
frames = output.frames[0]
export_to_gif(frames, "/home/wangluozhou/projects/VideoDiffusion_Playground/resources/Animals/sample_0_edit_2.gif")

# 5. Image-to-Video Generation

In [None]:
def build_curve_tensor(max_value, min_value, length, frames, strategy='linear'):
    """
    Build a per-frame weighting curve and return it as a PyTorch tensor.

    The curve starts at ``max_value`` on the first frame, decays to ``min_value``
    over the first ``length`` frames, and then stays at ``min_value`` for the
    remaining ``frames - length`` frames.

    Parameters:
    max_value (float): Value of the curve at the first frame.
    min_value (float): Value reached after ``length`` frames and held afterwards.
    length (int): Number of frames over which the curve moves from max to min.
    frames (int): The total number of frames in the curve (must be >= length).
    strategy (str): Shape of the decay. Options: 'linear', 'exponential', 'logarithmic'.
        'exponential' uses np.geomspace, which rejects zero endpoints.

    Returns:
    torch.Tensor: A 1-D tensor with ``frames`` entries (NumPy's default float dtype,
    i.e. float64 for Python float inputs).

    Raises:
    ValueError: If ``strategy`` is unknown, or ``length`` is not in ``[0, frames]``.
    """
    if strategy not in ('linear', 'exponential', 'logarithmic'):
        raise ValueError("Unknown strategy: Choose from 'linear', 'exponential', 'logarithmic'")
    if not 0 <= length <= frames:
        # np.pad would reject the negative pad width anyway, but with an opaque message.
        raise ValueError(f"length must be between 0 and frames ({frames}), got {length}")

    if strategy == 'linear':
        # Evenly spaced steps from max_value down to min_value.
        ramp = np.linspace(max_value, min_value, length)
    elif strategy == 'exponential':
        # Geometric progression from max_value down to min_value.
        ramp = np.geomspace(max_value, min_value, length)
    else:
        # Logarithmic progress: 0 on the first frame, 1 on frame ``length - 1``.
        steps = np.linspace(1, length + 1, length)
        progress = np.log(steps) / np.log(length + 1)
        # Anchor at max_value so the ramp ends exactly at min_value, matching the other
        # strategies (anchoring at min_value would undershoot to 2*min - max).
        ramp = max_value + progress * (min_value - max_value)

    # Hold the final value for the frames after the ramp.
    curve = np.pad(ramp, (0, frames - length), mode='constant', constant_values=min_value)

    # Convert the numpy array to a PyTorch tensor
    return torch.from_numpy(curve)

# Example usage
# curve_tensor = build_curve_tensor(1, 0.5, 3, 16, strategy='linear')
# curve_tensor  # Display the generated tensor curve
# Result
# tensor([1.0000, 0.7500, 0.5000, 0.5000, 0.5000, 0.5000, 0.5000, 0.5000, 0.5000,
#         0.5000, 0.5000, 0.5000, 0.5000, 0.5000, 0.5000, 0.5000],
#        dtype=torch.float64)
# (The output previously shown here, 0.5 rising to 1.0, came from a reversed helper,
#  build_curve_tensor_reversed, which is not defined in this notebook.)

## 5.1 load source image

In [None]:
# from video
# Use the first frame of the clip as the still image and repeat it 16 times.
frames = load_video('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/sample_5_src.mp4')
frames_inpaint = [frames[0]] * 16
latents_frames_inpaint = pipe.encode_frames(frames_inpaint, device=device)

In [None]:
# from image
# Alternative to the "from video" cell: load a still image and repeat it 16 times.
# Running this cell overwrites `frames`, `frames_inpaint` and `latents_frames_inpaint`.
frames = load_image('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/4.png')
frames_inpaint = [frames] * 16
latents_frames_inpaint = pipe.encode_frames(frames_inpaint, device=device)

## 5.2 Prepare inputs

In [None]:
# Prepare initial latents for 16 frames at the source image's resolution.
# assumes `.size` is PIL-style (width, height) — TODO confirm
latents = pipe.prepare_latents(
    batch_size=1,
    num_channels_latents=4,
    num_frames=16,
    height=frames_inpaint[0].size[1],
    width=frames_inpaint[0].size[0],
    dtype=torch.float16,
    device=device,
    generator=torch.Generator("cpu").manual_seed(42)
)

# Mask with the same shape as the encoded frames, all zeros to begin with.
mask_inpaint = torch.zeros_like(latents_frames_inpaint)

# # Values to assign along the frames dimension
# frame_values = build_curve_tensor(
#     max_value=1.0,
#     min_value=0.5,
#     length=8,
#     frames=16,
# )
# frame_values[-1]=1

# Set the mask to 1 on the first and last index of dim 2 (the frame axis, per the
# loop in the commented block below); all other frames stay 0.
mask_inpaint[:,:,0,:,:]=1
mask_inpaint[:,:,-1,:,:]=1

# # Assign the values to each frame in the mask
# for i, value in enumerate(frame_values):
#     mask_inpaint[:, :, i, :, :] = value

## 5.3 generation

In [None]:
# Use the forward DDIM scheduler for generation.
pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')
# Generate 16 frames from the prepared latents, passing the encoded source frames,
# the same latents as inpainting noise, and the first/last-frame mask.
output = pipe(
    prompt="Sunny seaside with blue sky",
    negative_prompt="",
    num_frames=16,
    guidance_scale=7.5,
    num_inference_steps=25,
    latents=latents,
    frames_inpaint=latents_frames_inpaint,
    noise_inpaint=latents,
    mask_inpaint=mask_inpaint,
    strength=1.0,
    generator=torch.Generator("cpu").manual_seed(42),
)
# First video of the batch, saved as a GIF in the working directory.
frames = output.frames[0]
export_to_gif(frames, "sample_4_animation.gif")

# 6. Image Animation - Noise Rectification

In [None]:
# from image
# Load the still image, repeat it 16 times and encode the repeated frames.
frames = load_image('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/4.png')
frames_inpaint = [frames] * 16
latents_frames_inpaint = pipe.encode_frames(frames_inpaint, device=device)

# Single seeded generator object, shared by the noising and generation cells below.
generator = torch.Generator("cpu").manual_seed(42)

In [None]:
# Noise the encoded image frames at full strength; both return values are kept
# (`init_noise` is only referenced by a commented-out argument in the generation cell).
inv_latent, init_noise = pipe.add_noise_to_latents(
    init_latents=latents_frames_inpaint, 
    strength=1.0,
    generator=generator,
    num_inference_steps=25,
)

In [None]:
# Start from an all-ones mask shaped like the noised latents.
mask_inpaint = torch.ones_like(inv_latent)

# Values to assign along the frames dimension.
# build_curve_tensor returns a single 1-D tensor (one weight per frame), so it must be
# bound to one name; unpacking it into two names raises ValueError.
frame_values = build_curve_tensor(
    max_value=1.0,
    min_value=0.5,
    length=8,
    frames=16,
)

# Assign the values to each frame in the mask (dim 2 is indexed per frame).
for i, value in enumerate(frame_values):
    mask_inpaint[:, :, i, :, :] = value

In [None]:
# Use the forward DDIM scheduler for generation.
pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')
# Generate from the noised image latents. The rectification arguments (noise_rect,
# mask_inpaint, rect_scheduled_sampling_beta) are commented out, so the mask built in
# the previous cell is not used by this call.
# The shared `generator` object created earlier is passed again, not a freshly seeded one.
output = pipe(
    prompt="Sunny seaside with blue sky",
    negative_prompt="",
    num_frames=16,
    guidance_scale=7.5,
    num_inference_steps=25,
    # rect_scheduled_sampling_beta=0.6,
    latents=inv_latent,
    # noise_rect=init_noise,
    # mask_inpaint=mask_inpaint,
    strength=1.0,
    generator=generator,
)
# First video of the batch, saved as a GIF in the working directory.
frames = output.frames[0]
export_to_gif(frames, "sample_4_animation_noise1.0.gif")

# 7. Video Outpainting

## 7.1 Load source video

In [None]:
# from video
# Load the clip to be outpainted and encode its frames.
frames_inpaint = load_video('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/Outpainting/sample_7_src.mp4')

latents_frames_inpaint = pipe.encode_frames(frames_inpaint, device=device)

In [None]:
# Display the second size component of the first frame (used as `height` below).
frames_inpaint[0].size[1]

## 7.2 Prepare noise and mask

In [None]:
# Prepare initial latents for 16 frames at the source video's resolution.
# assumes `.size` is PIL-style (width, height) — TODO confirm
latents = pipe.prepare_latents(
    batch_size=1,
    num_channels_latents=4,
    num_frames=16,
    height=frames_inpaint[0].size[1],
    width=frames_inpaint[0].size[0],
    dtype=torch.float16,
    device=device,
    generator=torch.Generator("cpu").manual_seed(42)
)

In [None]:
# w/o motion adapter
# NOTE(review): this cell does not look runnable as written — `num_frames` is not
# assigned anywhere in the visible notebook, and `frames_inpaint` was assigned from
# load_video() and indexed as `frames_inpaint[0].size` earlier, which suggests a list
# of images rather than a tensor with .permute(). Possibly `latents_frames_inpaint`
# was intended — confirm before use.

# [bs, channels, frames, height, width] -> [bs * frames, channels, height, width]
frames_inpaint = frames_inpaint.permute(0,2,1,3,4).reshape((latents.shape[0] * num_frames, -1) + frames_inpaint.shape[3:])

# [bs * frames, channels, height, width]
mask_inpaint = torch.zeros_like(frames_inpaint)

In [None]:
# with motion adapter
# [bs, channels, frames, height, width]

# Start with ones everywhere, then zero the middle half along dim 3 (rows from 1/4 to
# 3/4 of the height); the top and bottom quarter bands keep the value 1.
mask_inpaint = torch.ones_like(latents_frames_inpaint)
mask_inpaint[:, :, :, mask_inpaint.shape[3]//4:mask_inpaint.shape[3]//4 * 3, :] = 0

## 7.3 Generation

In [None]:
# Use the forward DDIM scheduler for generation.
pipe.scheduler = DDIMScheduler.from_pretrained(model_id, subfolder='scheduler')
# Generate 16 frames from the prepared latents, passing the encoded source frames,
# the same latents as inpainting noise, and the band mask built above.
output = pipe(
    prompt="a pretty girl, grey t-shirt",
    negative_prompt="",
    num_frames=16,
    guidance_scale=7.5,
    num_inference_steps=25,
    latents=latents,
    frames_inpaint=latents_frames_inpaint,
    noise_inpaint=latents,
    mask_inpaint=mask_inpaint,
    strength=1.0,
    generator=torch.Generator("cpu").manual_seed(42),
)
# First video of the batch, saved as a GIF.
frames = output.frames[0]
export_to_gif(frames, "/home/wangluozhou/projects/VideoDiffusion_Playground/resources/Outpainting/sample_7_edit.gif")

# 8. Frames Attention Analysis

## 8.1 Prepare Inputs

In [None]:
# Seeded (42) initial latents: 16 frames for a 512x512 output.
latents = pipe.prepare_latents(
    batch_size=1,
    num_channels_latents=4,
    num_frames=16,
    height=512,
    width=512,
    dtype=torch.float16,
    device=device,
    generator=torch.Generator("cpu").manual_seed(42)
)

In [None]:
# Eight additional frames of latents, drawn with a different seed (0).
latents_2 = pipe.prepare_latents(
    batch_size=1,
    num_channels_latents=4,
    num_frames=8,
    height=512,
    width=512,
    dtype=torch.float16,
    device=device,
    generator=torch.Generator("cpu").manual_seed(0)
)
# Append the extra frames along dim 2 (the axis on which the two tensors differ).
# Slicing to the first 16 frames keeps this cell idempotent: re-running it no longer
# keeps growing `latents` by another 8 frames each time.
latents = torch.cat([latents[:, :, :16], latents_2], dim=2)

## 8.2 Prepare Attention Controller

In [None]:
# Create an attention store and register it on the pipeline.
# AttentionStore / register_attention_control presumably come from utils.attn_utils.
controller = AttentionStore()
register_attention_control(pipe, controller=controller)

In [None]:
# Select which attention key and which resolution the controller targets.
controller.target_keys = ('down_self',)
controller.target_resolutions = [16]
# prompts = ["a man is surfing", "a cat is climbing", "a dog is running"]

## 8.3 Generation

In [None]:
# controller.reset()
# Run the pipeline on a one-prompt batch using the latents prepared above; the other
# prompt entries are commented out.
output = pipe(
    prompt=[
            (
                "a spiderman is surfing"
            ),
            # (
            #     "masterpiece, bestquality, highlydetailed, ultradetailed, sunset, "
            #     "orange sky, warm lighting, fishing boats, ocean waves seagulls, "
            #     "rippling water, wharf, silhouette, serene atmosphere, dusk, evening glow, "
            #     "golden hour, coastal landscape, seaside scenery"
            # )
            # (
            #     "a man is surfing"
            # )
        ],
    negative_prompt=[
        "bad quality, worse quality",
        # "bad quality, worse quality"
        ],
    guidance_scale=7.5,
    num_inference_steps=25,
    latents=latents
)
# frames = output.frames[1]
# export_to_gif(frames, "animation_24_animatediff.gif")

In [None]:
# Save the second entry of the output batch as a GIF.
# NOTE(review): the generation cell above passes a single prompt — confirm that
# output.frames actually has an entry at index 1 (index 0 may be intended).
export_to_gif(output.frames[1], "outputs/animation_16_ad_seed42_bs1_attn_down_16_32.gif")

## 8.4 Visualization

In [None]:
# Display the shape of the first stored 'down_self' attention entry.
controller.attention_store['down_self'][0].shape

In [None]:
# Show element [1] of compute_average_map's result for the stored 'down_self' maps.
compute_average_map(controller.attention_store['down_self'], frames=16, pixel_size=5, reduction='spatial')[1]

In [None]:
# Render an image grid from the stored 'up_self' attention maps.
# NOTE(review): target_keys was set to ('down_self',) above — confirm that 'up_self'
# is populated in the store.
build_image_grid(controller.attention_store['up_self'], frames=16, pixel_size=20)

## 8.5 Batch Run

In [None]:
from itertools import chain, combinations
import random

def all_combinations(lst):
    """Return an iterator over every subset of ``lst`` as tuples, smallest first.

    Sizes run from 0 (the empty tuple) to ``len(lst)``; within one size the order is
    that of ``itertools.combinations``.
    """
    # Build one combinations iterator per subset size up front, then chain them.
    per_size = [combinations(lst, size) for size in range(len(lst) + 1)]
    return chain.from_iterable(per_size)

# Search space for the batch run.
controller_target_keys = ['down_self', 'mid_self', 'up_self']
controller_target_resolutions = [16, 32, 64, 128]
prompts = ["a man is surfing", "a cat is climbing", "a dog is running"]

# Every subset of attention keys, the empty subset included.
key_combinations = list(all_combinations(controller_target_keys))
# Cumulative prefixes of the resolution list: [16], [16, 32], [16, 32, 64], ...
resolution_combinations = [
    controller_target_resolutions[:end]
    for end in range(1, len(controller_target_resolutions) + 1)
]

# One (keys, resolutions, prompt) tuple per run.
parameter_combinations = []
for keys in key_combinations:
    # Without any key the resolution choice is irrelevant: use a single empty placeholder.
    resolutions_combinations = resolution_combinations if keys else [[]]
    for resolutions in resolutions_combinations:
        parameter_combinations.extend((keys, resolutions, prompt) for prompt in prompts)

def generate_name(combination):
    """Build an output file stem for one batch-run parameter combination.

    Parameters:
    combination (tuple): ``(keys, resolutions, prompt)`` where ``keys`` and
        ``resolutions`` are sequences and ``prompt`` is a string.

    Returns:
    str: ``"<keys>_<resolutions>_<prompt>"`` with keys/resolutions joined by ``_`` and
    spaces in the prompt replaced by ``_``. An empty ``keys`` yields ``nokey``; the
    resolution part is ``noresolution`` when there are no keys or no resolutions.
    """
    keys, resolutions, prompt = combination
    keys_name = '_'.join(keys) if keys else 'nokey'
    # Also fall back when resolutions is empty; otherwise the joined part is an empty
    # string and the name contains a double underscore.
    resolutions_name = '_'.join(map(str, resolutions)) if keys and resolutions else 'noresolution'
    prompt_name = prompt.replace(' ', '_')
    return f"{keys_name}_{resolutions_name}_{prompt_name}"

# output_names = [generate_name(combination) for combination in parameter_combinations]

# # Example output names
# print(output_names[:5])  # Displaying first 5 names for brevity


In [None]:
for combination in parameter_combinations:
    keys, resolutions, prompt = combination
    # Point the attention controller at this run's keys and resolutions.
    controller.target_keys, controller.target_resolutions = keys, resolutions

    # Two-prompt batch: a fixed reference prompt plus the prompt under test.
    output = pipe(
        prompt=["a spiderman is surfing", prompt],
        negative_prompt=["bad quality, worse quality"] * 2,
        guidance_scale=7.5,
        num_inference_steps=25,
        latents=latents,
    )
    # Export the second video of the batch (the prompt under test), then reset the
    # controller before the next combination.
    export_to_gif(output.frames[1], f"outputs/{generate_name(combination)}.gif")
    controller.reset()

# 9. PE Inversion Testing

In [11]:
from utils.pe_utils import *

In [None]:
# Replace positional embeddings in the UNet's down/mid/up modules for the listed sizes.
# (helper presumably from utils.pe_utils; exact semantics not visible here)
replace_positional_embedding_unet3d(pipe.unet, target_size=[320, 640, 1280], target_module=['down','mid','up'])

In [None]:
# Load saved positional embeddings from disk into the UNet.
pe_path = '/home/wangluozhou/projects/VideoDiffusion_Playground/outputs/size_1280_unet3d/pos_embed.pt'
load_positional_embedding(pipe.unet, pe_path)

In [12]:
# Initialise the pipeline's filter for 16-frame, 512x320 generation.
# presumably required by the freeinit=True call further down — TODO confirm
pipe.init_filter(
    video_length=16,
    height=320,
    width=512
)

  num_channels_latents = self.unet.in_channels


In [13]:
# Load the source clip used for the PE inversion test.
src_frames = load_video('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/Objects/sample_0_src.mp4')

In [14]:
# Encode the source frames; this rebinds `latents_frames` from the video-editing section.
latents_frames = pipe.encode_frames(src_frames, device=device)

In [15]:
# Display the shape of the encoded latents (recorded output: [1, 4, 16, 40, 64]).
latents_frames.shape

torch.Size([1, 4, 16, 40, 64])

In [16]:
# Generate a 16-frame 512x320 clip with freeinit enabled, passing the encoded source
# video as `frames_video`. Unlike the earlier cells, the generator lives on "cuda".
output = pipe(
    prompt=(
        "Amazing quality, masterpiece, a man rides a bicycle in the snow field"
        # "orange sky, warm lighting, fishing boats, ocean waves seagulls, "
        # "rippling water, wharf, silhouette, serene atmosphere, dusk, evening glow, "
        # "golden hour, coastal landscape, seaside scenery"
    ),
    negative_prompt="bad quality, distortions, unrealistic, distorted image, watermark, signature",
    height=320,
    width=512,
    num_frames=16,
    guidance_scale=10,
    num_inference_steps=50,
    generator=torch.Generator("cuda").manual_seed(0),
    freeinit=True,
    frames_video=latents_frames,
)
# First video of the batch, saved as a GIF; the recorded cell output is the path string.
frames = output.frames[0]
export_to_gif(frames, "/home/wangluozhou/projects/VideoDiffusion_Playground/outputs/test/animation.gif")

100%|██████████| 50/50 [01:05<00:00,  1.32s/it]


'/home/wangluozhou/projects/VideoDiffusion_Playground/outputs/test/animation.gif'

# 10. Miscellaneous

In [None]:
import os
import torch
from pipelines.pipeline_animatediff import *
from diffusers.schedulers import DDIMInverseScheduler
from diffusers.utils import export_to_gif, export_to_video, load_image
from utils.attn_utils import *

In [None]:
# Load a clip and display how many frames it contains.
frames = load_video('/home/wangluozhou/projects/VideoDiffusion_Playground/resources/dog_jump_water.mp4')
len(frames)

In [None]:
import os
# Dump every loaded frame as a zero-padded PNG (0000.png, 0001.png, ...).
# NOTE(review): the target directory is named 'car' while the clip loaded above is
# dog_jump_water.mp4 — confirm the destination is the intended one.
frame_dir = '/home/wangluozhou/projects/diffusion-motion-transfer/data/car'
# Create the directory first; writing into a missing directory fails.
os.makedirs(frame_dir, exist_ok=True)
for idx, frame in enumerate(frames):
    frame.save(os.path.join(frame_dir, f'{str(idx).zfill(4)}.png'))