In [None]:
!pip install torchao
!pip install git+https://github.com/huggingface/diffusers

In [None]:
import torch
from diffusers import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel, CogVideoXImageToVideoPipeline
from diffusers.utils import export_to_video, load_image
from transformers import T5EncoderModel
from torchao.quantization import quantize_, int8_weight_only

# int8 weight-only quantization (torchao) is applied to each sub-model after loading.
quantization = int8_weight_only

# Hub repository that provides every component loaded below.
MODEL_ID = "THUDM/CogVideoX1.5-5B-I2V"

text_encoder = T5EncoderModel.from_pretrained(MODEL_ID, subfolder="text_encoder",
                                              torch_dtype=torch.bfloat16)
quantize_(text_encoder, quantization())

transformer = CogVideoXTransformer3DModel.from_pretrained(MODEL_ID, subfolder="transformer",
                                                          torch_dtype=torch.bfloat16)
quantize_(transformer, quantization())

vae = AutoencoderKLCogVideoX.from_pretrained(MODEL_ID, subfolder="vae", torch_dtype=torch.bfloat16)
quantize_(vae, quantization())

# Assemble the image-to-video pipeline from the quantized components. The generation
# code further down refers to this object as `pipe`; without it a fresh kernel fails
# with NameError on the first call.
pipe = CogVideoXImageToVideoPipeline.from_pretrained(
    MODEL_ID,
    text_encoder=text_encoder,
    transformer=transformer,
    vae=vae,
    torch_dtype=torch.bfloat16,
)
# Keep only the sub-model that is currently running on the GPU.
pipe.enable_model_cpu_offload()


In [None]:
import os
import numpy as np
import torch
import imageio
from PIL import Image
import cv2

# Settings forwarded to every pipeline call below.
NUM_FRAMES = 16      # passed as `num_frames`
NUM_INF_STEPS = 50   # passed as `num_inference_steps`
NUM_VIDEOS = 1       # passed as `num_videos_per_prompt`
GUIDANCE = 6         # passed as `guidance_scale`
HEIGHT = 256
WIDTH = 256

# Gaussian noise added to the conditioning frames before they are encoded.
mean = 0      # Mean of the Gaussian noise
std_dev = 25  # Standard deviation of the Gaussian noise


def _add_pixel_noise(frame_array):
    """Return `frame_array` plus Gaussian noise (module-level `mean` / `std_dev`), clipped to uint8."""
    noise = np.random.normal(mean, std_dev, frame_array.shape)
    return np.clip(frame_array + noise, 0, 255).astype(np.uint8)


def _window_to_init_latents(window, alpha, beta):
    """Turn a (NUM_FRAMES, HEIGHT, WIDTH, 3) frame window into initial latents for `pipe`.

    The result is `alpha * (VAE posterior mean) + beta * (Gaussian noise seeded with 42)`,
    laid out as (1, NUM_FRAMES // 4, 16, HEIGHT // 8, WIDTH // 8).
    """
    frames = torch.from_numpy(window).to('cuda', dtype=torch.bfloat16)

    # (F, H, W, C) -> (1, C, F, H, W). A plain reshape keeps the element count but
    # scrambles pixels across axes, so the axes are permuted explicitly.
    vae_input = frames.permute(3, 0, 1, 2).unsqueeze(0).contiguous()

    # NOTE(review): frames are fed to the VAE on the 0-255 scale, as in the original
    # code; confirm whether they should be normalized first.
    # Only the posterior mean is needed, so no autograd graph is built.
    with torch.no_grad():
        posterior = pipe.vae.encode(vae_input).latent_dist

    # The VAE output is channel-first (1, C, F, h, w); the pipeline takes frame-first
    # latents (1, F, C, h, w), so swap those two axes instead of reshaping.
    encoded = alpha * posterior.mean.permute(0, 2, 1, 3, 4)

    generator = torch.Generator(device="cuda").manual_seed(42)
    noise_shape = (1, NUM_FRAMES // 4, 16, HEIGHT // 8, WIDTH // 8)
    random_tensor = beta * torch.randn(noise_shape, generator=generator, device="cuda").to(dtype=torch.bfloat16)

    return encoded + random_tensor


def _run_pipeline(first_frame_pil, prompt, latents):
    """Run `pipe` once with the notebook-wide settings and return the frames of the first video."""
    return pipe(
        prompt=prompt,
        image=first_frame_pil,
        num_videos_per_prompt=NUM_VIDEOS,
        num_inference_steps=NUM_INF_STEPS,
        num_frames=NUM_FRAMES,
        guidance_scale=GUIDANCE,
        latents=latents,
        generator=torch.Generator(device="cuda").manual_seed(42),
        height=HEIGHT,
        width=WIDTH
    ).frames[0]


def process_frame_and_generate_video(first_frame_pil, output_video_path, prompt):
    """Generate a video from one conditioning frame using image-informed initial latents.

    A first video is generated from latents built out of NUM_FRAMES noisy copies of the
    first frame and saved next to `output_video_path` with an `_init.mp4` suffix. Then,
    NUM_FRAMES - 1 times, the oldest frame of the window is dropped, a noisy copy of the
    last generated frame is appended, and the pipeline is run again. Only the video of
    the final iteration is written to `output_video_path`.

    Args:
        first_frame_pil: PIL image of size WIDTH x HEIGHT with 3 channels; also passed to
            the pipeline as the conditioning image on every run.
        output_video_path: Destination of the final video.
        prompt: Text prompt given to the pipeline.

    Raises:
        ValueError: If the first frame is not (HEIGHT, WIDTH, 3).
    """
    alpha = 1/100000  # adjust alpha: weight of the encoded frames in the initial latents
    beta = 1  # adjust beta: weight of the random noise in the initial latents

    first_frame = np.array(first_frame_pil)  # use pillow image
    print(first_frame.shape)
    if first_frame.shape != (HEIGHT, WIDTH, 3):
        raise ValueError(
            f"Expected a first frame of shape {(HEIGHT, WIDTH, 3)}, got {first_frame.shape}"
        )

    # Initial window: the same noisy first frame repeated NUM_FRAMES times.
    noisy_first = _add_pixel_noise(first_frame)
    window = np.stack([noisy_first for _ in range(NUM_FRAMES)], axis=0)
    print(window.shape)

    latents = _window_to_init_latents(window, alpha, beta)
    print(latents.shape)
    video = _run_pipeline(first_frame_pil, prompt, latents)

    # splitext only strips the final extension, so dots elsewhere in the path are safe.
    init_path = os.path.splitext(output_video_path)[0] + '_init.mp4'
    export_to_video(video, init_path, fps=8)

    for _ in range(NUM_FRAMES - 1):
        last_frame = np.array(video[-1]).reshape(1, HEIGHT, WIDTH, 3)

        # Drop the previous result before the next (memory-heavy) pipeline run.
        del video

        # Slide the window: drop the oldest frame, append the noisy last generated frame.
        window = np.concatenate([window[1:], _add_pixel_noise(last_frame)], axis=0)

        latents = _window_to_init_latents(window, alpha, beta)
        video = _run_pipeline(first_frame_pil, prompt, latents)

    export_to_video(video, output_video_path, fps=8)


## UCF-101

In [None]:
# Copyright (c) 2024 Mitsubishi Electric Research Laboratories (MERL)
# Copyright (c) 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-License-Identifier: Apache-2.0
#
# Code adapted from https://github.com/modelscope/modelscope/blob/57791a8cc59ccf9eda8b94a9a9512d9e3029c00b/modelscope/models/cv/anydoor/ldm/util.py -- Apache-2.0 license

import importlib
import random
from copy import deepcopy
from inspect import isfunction
import cv2
import imageio
import numpy as np
import torch
from PIL import Image, ImageDraw, ImageFont



def log_txt_as_img(wh, xc, size=10):
    """Render each caption as black text on a white RGB image.

    Args:
        wh: (width, height) of every rendered image.
        xc: list of caption strings, one image per caption.
        size: font size used with data/DejaVuSans.ttf.

    Returns:
        A torch tensor stacking one channel-first image per caption, with pixel values
        mapped from 0..255 to -1..1.
    """
    rendered = []
    for caption in xc:
        canvas = Image.new("RGB", wh, color="white")
        pen = ImageDraw.Draw(canvas)
        font = ImageFont.truetype("data/DejaVuSans.ttf", size=size)

        # Hard-wrap the caption: 40 characters per line at width 256, scaled with the width.
        chars_per_line = int(40 * (wh[0] / 256))
        chunks = [caption[pos : pos + chars_per_line] for pos in range(0, len(caption), chars_per_line)]
        wrapped = "\n".join(chunks)

        try:
            pen.text((0, 0), wrapped, fill="black", font=font)
        except UnicodeEncodeError:
            print("Cant encode string for logging. Skipping.")

        # HWC -> CHW, then rescale to [-1, 1].
        rendered.append(np.array(canvas).transpose(2, 0, 1) / 127.5 - 1.0)

    return torch.tensor(np.stack(rendered))


def ismap(x):
    """Return True when `x` is a 4-D torch tensor whose second dimension is larger than 3."""
    return isinstance(x, torch.Tensor) and x.dim() == 4 and x.shape[1] > 3


def isimage(x):
    """Return True when `x` is a 4-D torch tensor whose second dimension is 1 or 3."""
    if isinstance(x, torch.Tensor) and x.dim() == 4:
        return x.shape[1] in (3, 1)
    return False


def exists(x):
    """Return True for anything except None (falsy values such as 0 or '' still count)."""
    return not (x is None)


def default(val, d):
    """Return `val` unless it is None; otherwise fall back to `d`.

    If `d` is a plain Python function (per `inspect.isfunction`) it is called and its
    result is used; any other object is returned as-is.
    """
    if val is None:
        return d() if isfunction(d) else d
    return val


def mean_flat(tensor):
    """
    Average a tensor over every dimension except the first (batch) one.

    https://github.com/openai/guided-diffusion/blob/27c20a8fab9cb472df5d6bdd6c8d11c8f430b924/guided_diffusion/nn.py#L86
    """
    non_batch_dims = list(range(1, tensor.dim()))
    return tensor.mean(dim=non_batch_dims)


def count_params(model, verbose=False):
    """Return the total number of elements across `model.parameters()`.

    When `verbose` is true the count is also printed in millions.
    """
    total_params = 0
    for param in model.parameters():
        total_params += param.numel()

    if verbose:
        print(f"{model.__class__.__name__} has {total_params*1.e-6:.2f} M params.")
    return total_params


def instantiate_from_config(config):
    """Build the object described by `config`.

    `config["target"]` is a dotted import path resolved with `get_obj_from_str`; the
    resolved callable is invoked with `config["params"]` as keyword arguments (none if
    the key is absent). The two sentinel strings `"__is_first_stage__"` and
    `"__is_unconditional__"` yield None.

    Raises:
        KeyError: if `config` has no `target` key and is not one of the sentinels.
    """
    if "target" in config:
        factory = get_obj_from_str(config["target"])
        return factory(**config.get("params", dict()))

    if config == "__is_first_stage__" or config == "__is_unconditional__":
        return None

    raise KeyError("Expected key `target` to instantiate.")


def get_obj_from_str(string, reload=False):
    """Resolve a dotted path such as ``"package.module.Name"`` to the named attribute.

    The text after the last dot is the attribute name and everything before it is the
    module to import. With `reload=True` the module is reloaded before the lookup.
    """
    module_path, attr_name = string.rsplit(".", 1)
    if reload:
        importlib.reload(importlib.import_module(module_path))
    module_obj = importlib.import_module(module_path, package=None)
    return getattr(module_obj, attr_name)


def center_crop(img, new_width=None, new_height=None):
    """Crop the central `new_height` x `new_width` region out of an image array.

    Args:
        img: array whose first two axes are (height, width); any trailing axes
            (e.g. channels) are kept untouched.
        new_width: width of the crop; defaults to the shorter image side.
        new_height: height of the crop; defaults to the shorter image side.

    Returns:
        `img` itself when the requested size equals the current size, otherwise a
        slice of `img`.
    """
    height, width = img.shape[0], img.shape[1]
    shortest_side = min(width, height)

    if new_width is None:
        new_width = shortest_side

    if new_height is None:
        new_height = shortest_side

    # Nothing to cut. Only skip when the *requested* size matches: a square input with
    # an explicit smaller target must still be cropped.
    if new_width == width and new_height == height:
        return img

    # When the margin is odd, the extra pixel is removed from the left/top side.
    left = int(np.ceil((width - new_width) / 2))
    right = width - int(np.floor((width - new_width) / 2))

    top = int(np.ceil((height - new_height) / 2))
    bottom = height - int(np.floor((height - new_height) / 2))

    # Slicing the first two axes works for both 2-D and multi-channel arrays.
    return img[top:bottom, left:right]


def setup_seed(seed):
    """Seed torch (CPU and CUDA), NumPy and `random`, and set cuDNN's deterministic flag."""
    seeders = (torch.manual_seed, torch.cuda.manual_seed, np.random.seed, random.seed)
    for seed_fn in seeders:
        seed_fn(seed)
    torch.backends.cudnn.deterministic = True


def resize(img_npy, IMG_H, IMG_W):
    """Resize an image array to `IMG_H` rows by `IMG_W` columns using PIL.

    Args:
        img_npy: image array accepted by `PIL.Image.fromarray`.
        IMG_H: target height in pixels.
        IMG_W: target width in pixels.

    Returns:
        The resized image as a NumPy array.
    """
    # PIL's resize takes (width, height); passing (IMG_H, IMG_W) would swap the sides
    # for any non-square target.
    return np.asarray(Image.fromarray(img_npy).resize((IMG_W, IMG_H)))


def preprocess_image(img):
    """Convert an HWC image array on the 0-255 scale to a normalized NCHW float32 tensor.

    The array is divided by 255, given a leading batch axis, moved to channel-first
    layout, and normalized per channel with mean 0.5 and std 0.5.
    """
    scaled = torch.from_numpy(img / 255.0).type(torch.float32)
    batched = scaled.unsqueeze(dim=0).permute(0, 3, 1, 2)  # nhwc -> nchw

    # normalization constants, broadcast over (n, c, h, w)
    channel_mean = torch.tensor([0.5, 0.5, 0.5], device=batched.device).reshape(1, -1, 1, 1)
    channel_std = torch.tensor([0.5, 0.5, 0.5], device=batched.device).reshape(1, -1, 1, 1)

    return (batched - channel_mean) / channel_std


def postprocess_image(img_tensor, batch_idx=0):
    """Turn one item of a normalized NCHW batch back into an HWC uint8 image array.

    Works on a detached CPU copy, so the input tensor is left unchanged. The mean-0.5 /
    std-0.5 normalization is undone, the item at `batch_idx` is moved to channel-last
    layout, values are limited to [0, 1] and scaled to 0-255.
    """
    restored = img_tensor.clone().detach().cpu()

    channel_mean = torch.tensor([0.5, 0.5, 0.5], device=restored.device).reshape(1, -1, 1, 1)  # nchw
    channel_std = torch.tensor([0.5, 0.5, 0.5], device=restored.device).reshape(1, -1, 1, 1)  # nchw
    restored = restored * channel_std + channel_mean

    sample = restored[batch_idx].permute(1, 2, 0).clamp(0, 1)  # chw -> hwc
    return np.array(sample * 255, dtype=np.uint8)


def resize_with_border(im, desired_size, interpolation):
    """Scale an image so its longer side equals `desired_size`, then pad it to a square.

    The aspect ratio is kept; the remaining space is filled with a black constant
    border split between the two sides (the bottom/right side gets the extra pixel when
    the padding is odd).
    """
    src_hw = im.shape[:2]
    scale = float(desired_size) / max(src_hw)
    scaled_h, scaled_w = (int(side * scale) for side in src_hw)

    # cv2.resize takes the target size as (width, height)
    im = cv2.resize(im, (scaled_w, scaled_h), interpolation=interpolation)

    pad_w = desired_size - scaled_w
    pad_h = desired_size - scaled_h
    pad_top = pad_h // 2
    pad_bottom = pad_h - pad_top
    pad_left = pad_w // 2
    pad_right = pad_w - pad_left

    return cv2.copyMakeBorder(
        im, pad_top, pad_bottom, pad_left, pad_right, cv2.BORDER_CONSTANT, value=[0, 0, 0]
    )


def nearest_true_index(mask, index):
    """Return the position of the truthy entry of `mask` closest to `index`.

    `index` itself is returned when `mask[index]` is truthy. On a tie between an equally
    distant left and right neighbour the left one wins. Returns None when no entry of
    `mask` is truthy.
    """
    if mask[index]:
        return index

    size = len(mask)
    # Widen the search one step at a time until both sides have run off the mask.
    for offset in range(1, max(index, size - 1 - index) + 1):
        left = index - offset
        if left >= 0 and mask[left]:
            return left
        right = index + offset
        if right < size and mask[right]:
            return right

    return None


def binary_to_hex(binary_list):
    """Interpret a sequence of bits (most significant first) as a lowercase hex string.

    Args:
        binary_list: non-empty sequence of values convertible with `int()` to 0 or 1.

    Returns:
        The hexadecimal representation without a `0x` prefix; an all-zero input
        yields "0".

    Raises:
        ValueError: if `binary_list` is empty or contains values other than 0/1.
    """
    binary_string = "".join(str(int(bit)) for bit in binary_list)
    decimal_number = int(binary_string, 2)
    # format(..., "x") emits no prefix. The previous hex(...).lstrip("0x") stripped
    # *characters*, which turned the value zero into an empty string.
    return format(decimal_number, "x")


def list2gif(img_path_list, gif_path, save_img_dir):
    """Read the given images, write them as a GIF, and re-save each one as a numbered PNG.

    Args:
        img_path_list: image file paths, in playback order.
        gif_path: destination of the GIF (written with `duration=1000 / 8`).
        save_img_dir: directory that receives the copies `0000.png`, `0001.png`, ...
    """
    frames = []
    for img_path in img_path_list:
        frames.append(imageio.v2.imread(img_path))

    imageio.mimwrite(gif_path, frames, duration=1000 / 8)

    for idx, frame in enumerate(frames):
        png_path = os.path.join(save_img_dir, "%04d.png" % idx)
        imageio.v2.imsave(png_path, frame)


class AverageMeter(object):
    """Tracks the latest value and the running weighted average of a quantity."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear the latest value, the running sum, the count and the average."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record `val` as observed `n` times and refresh the running average."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count



# Center-crop a BGR frame to a square and resize it
def process_frame(frame, size=256):
    """Return a `size` x `size` BGR version of `frame`.

    The frame is converted to RGB, cropped with `center_crop`, resized with `resize`,
    and converted back to BGR.
    """
    rgb_square = center_crop(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    rgb_resized = resize(rgb_square, size, size)
    return cv2.cvtColor(rgb_resized, cv2.COLOR_RGB2BGR)

# Function to crop and resize every frame of one video
def process_video(input_path, output_path, output_size=256):
    """Write a center-cropped, `output_size` x `output_size` copy of a video.

    Every frame of `input_path` goes through `process_frame` and is written to
    `output_path` with the `mp4v` codec at the source frame rate. If the input cannot
    be opened, an error is printed and nothing is written.

    NOTE: a later cell of this notebook defines another function with the same name but
    a different signature; this one must be used before that cell runs.

    Args:
        input_path: path of the source video.
        output_path: path of the .mp4 file to create.
        output_size: side length, in pixels, of the square output frames.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {input_path}")
        return

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Processing video: {input_path}")
        print(f"FPS: {fps}, Total Frames: {total_frames}")

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Codec for .mp4 format
        out = cv2.VideoWriter(output_path, fourcc, fps, (output_size, output_size))

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # End of video

            processed_frame = process_frame(frame, size=output_size)
            out.write(processed_frame)

            frame_count += 1

            # Optional: Progress tracking
            if frame_count % 50 == 0:
                print(f"Processed {frame_count}/{total_frames} frames.")
    finally:
        # Release both handles even if a frame fails to process, so the output file is
        # closed and the capture is not leaked.
        cap.release()
        if out is not None:
            out.release()

    print(f"Video saved to: {output_path}")

# Directory containing the videos
input_dir = "videos"  # Replace with your video directory

# Suffix appended to the base name of every processed video.
PROCESSED_SUFFIX = "proc.mp4"

# Crop the videos to the desired shape (output_size).
# Loop over all files in the directory (sorted for a reproducible order).
for filename in sorted(os.listdir(input_dir)):
    # Outputs live in the same directory as the inputs; skip them so that re-running
    # this cell does not produce "...procproc.mp4" copies of earlier results.
    if filename.endswith(PROCESSED_SUFFIX):
        continue

    if filename.endswith((".avi", ".mp4")):  # Adjust for your video file extensions
        input_video_path = os.path.join(input_dir, filename)
        output_video_path = os.path.splitext(input_video_path)[0] + PROCESSED_SUFFIX

        process_video(input_video_path, output_video_path, output_size=256)

In [None]:
import cv2
import os

# get the num_frames first frames of the videos
def extract_frames(video_path, output_dir, num_frames=32):
    """
    Extracts the first 'num_frames' from a video and saves them in a folder.

    Frames are written as `frame_000.jpg`, `frame_001.jpg`, ... Any error is reported
    with a printed message instead of being raised.

    Args:
        video_path: Path to the input video file.
        output_dir: Path to the output directory (created if missing).
        num_frames: Number of frames to extract.
    """
    cap = None
    frame_count = 0

    try:
        os.makedirs(output_dir, exist_ok=True)  # Create output directory if it doesn't exist

        cap = cv2.VideoCapture(video_path)

        while frame_count < num_frames and cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_name = f"frame_{frame_count:03d}.jpg"  # Use zero-padding for consistent filenames
            output_path = os.path.join(output_dir, frame_name)
            # imwrite signals failure through its return value rather than an exception;
            # surface it so a frame that was not saved is not counted as extracted.
            if not cv2.imwrite(output_path, frame):
                raise IOError(f"could not write {output_path}")

            frame_count += 1

        print(f"Extracted {frame_count} frames from {video_path}")

    except Exception as e:
        print(f"Error processing {video_path}: {e}")

    finally:
        # Always free the capture, including when an error interrupted the loop.
        if cap is not None:
            cap.release()

# Example usage: dump the first frames of every .mp4 into a folder named after the video
input_dir = "videos"  # Replace with the actual directory
output_dir_template = "{video_name}"

for video_file in os.listdir(input_dir):
    if not video_file.endswith(".mp4"):
        continue
    video_name = os.path.splitext(video_file)[0]  # Get video name without extension
    video_path = os.path.join(input_dir, video_file)
    output_dir = output_dir_template.format(video_name=video_name)
    extract_frames(video_path, output_dir)

In [None]:
def process_video_directory(video_dir):
    """Generate a video for one directory of extracted frames.

    The directory must contain `frame_000.jpg`, and its name must look like
    `<prefix>_<ClassName>_...`, where `<ClassName>` is one of the classes listed below.
    The result is written to `generated_video.mp4` inside `video_dir` via
    `process_frame_and_generate_video`.

    Args:
        video_dir: path of the frame directory.

    Raises:
        KeyError: if the class taken from the directory name has no prompt.
        IndexError: if the directory name contains no `_`.
    """
    # Find the first frame image
    frame_files = [f for f in os.listdir(video_dir) if f.endswith('.jpg')]
    if not frame_files:
        print(f"No frames found in directory {video_dir}")
        return

    first_frame_path = os.path.join(video_dir, "frame_000.jpg")
    # Having some .jpg files does not guarantee that this particular one exists.
    if not os.path.isfile(first_frame_path):
        print(f"First frame {first_frame_path} not found in directory {video_dir}")
        return

    print("first file path", first_frame_path)
    first_frame = Image.open(first_frame_path)

    width, height = first_frame.size
    mode = first_frame.mode

    print(f"Width: {width} pixels")
    print(f"Height: {height} pixels")
    print(f"Mode: {mode}")

    if mode in ("RGB", "RGBA", "CMYK"):
        num_channels = len(mode)
        print(f"Number of channels: {num_channels}")
    elif mode in ("1", "L", "P"):
        num_channels = 1
        print(f"Number of channels: {num_channels}")
    else:
        print("Number of channels could not be determined from the mode.")

    # Process the first frame and generate a new video
    selected_class_dict = {
        "ApplyEyeMakeup": "A person is applying eye makeup.",
        "BabyCrawling": "A Baby is crawling.",
        "BreastStroke": "A person is performing breaststroke.",
        "Drumming": "A person is drumming.",
        "HorseRiding": "A person is riding horse.",
        "Kayaking": "A person is kayaking.",
        "PlayingGuitar": "A person is playing Guitar.",
        "Surfing": "A person is surfing.",
        "ShavingBeard": "A person is shaving beard.",
    }
    # Read the class from the directory *name* only, so underscores in parent
    # directories cannot shift the token that is picked.
    dir_name = os.path.basename(os.path.normpath(video_dir))
    prompt = selected_class_dict[dir_name.split('_')[1]]
    print(prompt)
    output_video_path = os.path.join(video_dir, "generated_video.mp4")
    process_frame_and_generate_video(first_frame, output_video_path, prompt)

def process_all_videos(root_dir):
    """Run `process_video_directory` on every frame directory directly under `root_dir`.

    Hidden directories (names starting with '.') are ignored, and directories whose
    name has no `_`-separated class token are skipped with a message.

    Args:
        root_dir: directory that contains one sub-directory per video.
    """
    # List all subdirectories that represent the processed video folders
    video_dirs = [f for f in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, f))]

    if not video_dirs:
        print("No video directories found.")
        return

    print(f"Found {len(video_dirs)} video directories. Starting processing...")

    for video_dir in video_dirs:
        if video_dir.startswith('.'):
            continue

        video_dir_path = os.path.join(root_dir, video_dir)
        print("video dir path", video_dir_path)

        # The class token is the second '_'-separated part of the directory name. Split
        # the name, not the full path, so underscores in `root_dir` do not interfere.
        name_parts = video_dir.split('_')
        if len(name_parts) < 2:
            print(f"Skipping {video_dir_path}: directory name contains no class token.")
            continue
        print(name_parts[1])

        process_video_directory(video_dir_path)

    print("All videos processed successfully.")


# Generate the videos: run the generation on every frame directory found under root_dir.
root_dir = "frms/"  # Replace with the path where your video directories are stored
process_all_videos(root_dir)

### Metrics

In [None]:
# Code adapted from https://github.com/JunyaoHu/common_metrics_on_video_quality
# Download the files of that repository and place them in the same directory as this notebook.
import torch
from calculate_fvd import calculate_fvd
from calculate_psnr import calculate_psnr
from calculate_ssim import calculate_ssim
from calculate_lpips import calculate_lpips

# ps: pixel value should be in [0, 1]!

import os
import cv2
import torch
import numpy as np
from glob import glob

def process_video(video_path, size=(256, 256)):
    """
    Process a video file into a tensor of shape (VIDEO_LENGTH, CHANNEL, SIZE, SIZE).

    Frames are resized to `size`, converted from BGR to RGB and returned as float32
    with the original 0-255 value scale.

    Raises:
        ValueError: if no frame could be read from `video_path`.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Resize frame
            frame = cv2.resize(frame, size)
            # Convert BGR to RGB and append
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
    finally:
        # Free the capture even when resizing or conversion fails.
        cap.release()

    if not frames:
        # Without this check the transpose below fails with an unrelated-looking error.
        raise ValueError(f"No frames could be read from {video_path}")

    # Convert to tensor (VIDEO_LENGTH, CHANNEL, SIZE, SIZE)
    frames = np.array(frames)  # (VIDEO_LENGTH, SIZE, SIZE, CHANNEL)
    frames = np.transpose(frames, (0, 3, 1, 2))  # (VIDEO_LENGTH, CHANNEL, SIZE, SIZE)
    return torch.tensor(frames, dtype=torch.float32)

def process_images(image_paths, size=(256, 256)):
    """
    Process a list of image files into a tensor of shape (NUM_IMAGES, CHANNEL, SIZE, SIZE).

    Paths are processed in sorted order. Images are resized to `size`, converted from
    BGR to RGB and returned as float32 with the original 0-255 value scale.

    Raises:
        ValueError: if `image_paths` is empty or an image cannot be read.
    """
    images = []
    for img_path in sorted(image_paths):
        img = cv2.imread(img_path)
        if img is None:
            # imread returns None instead of raising when a file is missing or corrupt.
            raise ValueError(f"Could not read image {img_path}")
        img = cv2.resize(img, size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        images.append(img)

    if not images:
        raise ValueError("No image paths were given")

    # Convert to tensor (NUM_IMAGES, CHANNEL, SIZE, SIZE)
    images = np.array(images)  # (NUM_IMAGES, SIZE, SIZE, CHANNEL)
    images = np.transpose(images, (0, 3, 1, 2))  # (NUM_IMAGES, CHANNEL, SIZE, SIZE)
    return torch.tensor(images, dtype=torch.float32)

def create_tensors(data_dir, size=(256, 256)):
    """
    Create tensors for all videos and images in the given directory.

    Every sub-directory of `data_dir` is expected to hold one `.gif` video and a set of
    `.jpg` frames. Sub-directories without a `.gif` are skipped with a message. If
    several `.gif` files exist, the first one in sorted order is used.

    Returns:
        (video_tensors, image_tensors), each of shape
        (NUMBER_OF_VIDEOS, VIDEO_LENGTH, CHANNEL, SIZE, SIZE) with values in [0, 1].

    Raises:
        ValueError: if no usable sub-directory is found.
    """
    video_tensors = []
    image_tensors = []

    for folder in sorted(os.listdir(data_dir)):
        folder_path = os.path.join(data_dir, folder)
        if not os.path.isdir(folder_path):
            continue

        # Process video (sorted so the choice does not depend on file-system order)
        gif_paths = sorted(glob(os.path.join(folder_path, "*.gif")))
        if not gif_paths:
            print(f"Skipping {folder_path}: no .gif video found.")
            continue
        video_tensor = process_video(gif_paths[0], size=size)

        # Process images
        image_paths = glob(os.path.join(folder_path, "*.jpg"))
        image_tensor = process_images(image_paths, size=size)

        video_tensors.append(video_tensor)
        image_tensors.append(image_tensor)

    if not video_tensors:
        raise ValueError(f"No video folders with a .gif file found in {data_dir}")

    # Stack tensors into final shapes and rescale 0-255 pixel values to [0, 1], the
    # range the metric functions require.
    video_tensors = torch.stack(video_tensors) / 255.0  # (NUMBER_OF_VIDEOS, VIDEO_LENGTH, CHANNEL, SIZE, SIZE)
    image_tensors = torch.stack(image_tensors) / 255.0  # (NUMBER_OF_VIDEOS, VIDEO_LENGTH, CHANNEL, SIZE, SIZE)

    return video_tensors, image_tensors

# Example usage: build one tensor from the .gif videos and one from the .jpg frames
data_dir = "frms"  # Replace with the path to your data directory
videos1, videos2 = create_tensors(data_dir)
print("Videos Tensor Shape:", videos1.shape)
print("Images Tensor Shape:", videos2.shape)


# Adjust these parameters.
# NOTE(review): these four constants are not referenced by any code visible in this
# notebook; presumably kept from the upstream example -- confirm before relying on them.
NUMBER_OF_VIDEOS = 6
VIDEO_LENGTH = 32
CHANNEL = 3
SIZE = 256

device = torch.device("cpu") # or "cuda"


# Uncomment the metrics you want to compute; each result is stored under its own key.
import json
result = {}
only_final = False
# only_final = True
# result['fvd'] = calculate_fvd(videos1, videos2, device, method='styleganv', only_final=only_final)
result['fvd'] = calculate_fvd(videos1, videos2, device, method='videogpt', only_final=only_final)
# result['ssim'] = calculate_ssim(videos1, videos2, only_final=only_final)
# result['psnr'] = calculate_psnr(videos1, videos2, only_final=only_final)
# result['lpips'] = calculate_lpips(videos1, videos2, device, only_final=only_final)
print(json.dumps(result, indent=4))