In [None]:
# Install dependencies
pip install diffusers transformers accelerate imageio imageio-ffmpeg

import torch
from diffusers import DiffusionPipeline
import imageio
import numpy as np
import os
from PIL import Image
from IPython.display import Video, display

# Pick the compute device: GPU when CUDA is available, otherwise CPU.
use_cuda = torch.cuda.is_available()
device = "cuda" if use_cuda else "cpu"
print("Using device:", device)

# Load the text-to-video model.
# On GPU the half-precision ("fp16") weights are requested; on CPU the
# default float32 weights are used and no variant is selected.
if device == "cuda":
    weights_dtype = torch.float16
    weights_variant = "fp16"
else:
    weights_dtype = torch.float32
    weights_variant = None

pipe = DiffusionPipeline.from_pretrained(
    "damo-vilab/text-to-video-ms-1.7b",
    torch_dtype=weights_dtype,
    variant=weights_variant,
)
pipe = pipe.to(device)

# Text prompt describing the clip to generate.
prompt = "a robot walking in a futuristic city"

# Run the pipeline with 25 denoising steps and keep only the generated frames.
pipeline_output = pipe(prompt, num_inference_steps=25)
video_frames = pipeline_output.frames

# Format frames correctly (handling the batch of frames)
def _iter_frames(frames):
    """Yield individual frames from the pipeline output.

    The layout of ``frames`` is not fixed by this script: it may be batched
    (each entry holds a whole clip, e.g. ``(num_frames, H, W, C)``) or already
    a flat sequence of single ``(H, W[, C])`` frames -- presumably depending
    on the installed diffusers version; verify against its docs. Both layouts
    are accepted so a flat sequence is not mistaken for a batch and iterated
    row by row.
    """
    for item in frames:
        arr = np.asarray(item)
        # A 3-D entry whose last axis is not a channel count (1, 3 or 4) is
        # taken to be a stack of 2-D grayscale frames rather than one image.
        holds_many_frames = arr.ndim >= 4 or (
            arr.ndim == 3 and arr.shape[-1] not in (1, 3, 4)
        )
        if holds_many_frames:
            yield from arr
        else:
            yield arr


def _as_rgb_uint8(frame):
    """Return ``frame`` as an ``(H, W, 3)`` uint8 array.

    Accepts grayscale ``(H, W)`` / ``(H, W, 1)``, RGB ``(H, W, 3)`` and RGBA
    ``(H, W, 4)`` input. Float data is treated as being in [0, 1]; integer
    data is treated as being in [0, 255].

    Raises:
        ValueError: if the frame cannot be interpreted as an image.
    """
    arr = np.asarray(frame)

    # Strip only *leading* singleton axes. A blanket np.squeeze would also
    # collapse a genuine height/width of 1 and would remove a trailing
    # single-channel axis before it can be expanded to RGB.
    while arr.ndim > 3 and arr.shape[0] == 1:
        arr = arr[0]

    if arr.ndim == 2:
        arr = arr[:, :, np.newaxis]  # grayscale -> explicit channel axis

    if arr.ndim != 3 or arr.shape[2] not in (1, 3, 4):
        raise ValueError(f"Unexpected frame shape: {arr.shape}")

    if arr.shape[2] == 1:
        arr = np.repeat(arr, 3, axis=2)  # replicate gray into R, G, B
    elif arr.shape[2] == 4:
        arr = arr[:, :, :3]  # drop the alpha channel

    # Convert to uint8 if needed
    if arr.dtype == np.uint8:
        return arr
    if np.issubdtype(arr.dtype, np.integer):
        # Integer pixels are already on a 0-255 scale; rescaling them as if
        # they were [0, 1] floats would saturate everything to 0 or 255.
        return np.clip(arr, 0, 255).astype(np.uint8)
    return (np.clip(arr, 0, 1) * 255).astype(np.uint8)


video_frames_uint8 = [_as_rgb_uint8(frame) for frame in _iter_frames(video_frames)]

# Optional: stretch the clip by emitting every frame three times in a row
# (raise the repeat count for a longer duration).
extended_frames = [
    still
    for still in video_frames_uint8
    for _ in range(3)
]

# Write the frames to an MP4 file through imageio's FFMPEG writer.
os.makedirs("outputs", exist_ok=True)
video_path = "outputs/generated_video.mp4"
mp4_writer = imageio.get_writer(video_path, fps=5, codec='libx264')
with mp4_writer:
    # Frames are appended one by one in playback order.
    for still in extended_frames:
        mp4_writer.append_data(still)

# Show the resulting video inline in the notebook (embedded in the output).
embedded_video = Video(video_path, embed=True)
display(embedded_video)
