# Cosmos Predict2 Full Pipeline on A100 (with uv)

This notebook runs both T5 text encoding and Cosmos Predict2 inference on a single A100 GPU.
It uses the `uv` package manager, which NVIDIA recommends, for faster and more reliable installation.

**Requirements:**
- Google Colab with A100 runtime
- 40GB GPU memory (or T4 with reduced settings)

**Note:** Make sure to select `Runtime > Change runtime type > A100 GPU` before running.

## 1. Install uv Package Manager

NVIDIA highly recommends using `uv` for Cosmos-Predict2 installation. It's much faster than pip and handles dependencies better.

In [None]:
# Install uv package manager (NVIDIA recommended)
!curl -LsSf https://astral.sh/uv/install.sh | sh

# Add uv to PATH for this session (the installer places the binary in ~/.local/bin)
# Only PATH matters here: sys.path is for Python modules, not executables
import os
import sys
os.environ['PATH'] = f"{os.path.expanduser('~/.local/bin')}:{os.environ['PATH']}"

# Verify uv installation
!uv --version
print("✅ uv package manager installed")
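
If the `!uv --version` line fails, the usual cause is that `~/.local/bin` is missing from `PATH`. The following is a minimal sketch of that check using only the standard library. The helper names `ensure_on_path` and `find_uv` are hypothetical, and `~/.local/bin` is assumed to be the install location, as in the cell above.

```python
import os
import shutil

def ensure_on_path(directory):
    """Prepend `directory` to PATH for this process unless it is already listed."""
    directory = os.path.expanduser(directory)
    parts = [p for p in os.environ.get("PATH", "").split(os.pathsep) if p]
    if directory not in parts:
        os.environ["PATH"] = os.pathsep.join([directory] + parts)
    return os.environ["PATH"]

def find_uv():
    """Return the path of the `uv` executable, or None if it is not on PATH."""
    return shutil.which("uv")

ensure_on_path("~/.local/bin")  # assumed install location
uv_path = find_uv()
print(f"uv found at: {uv_path}" if uv_path else "uv not found; re-run the install cell")
```

Calling `ensure_on_path` repeatedly is harmless because the directory is only added once.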

## 2. Create Python Environment and Install Cosmos-Predict2

In [None]:
# Set installation method
USE_GITHUB = False  # Set to True for GitHub source, False for PyPI (recommended)

# Create virtual environment with Python 3.10 (required by Cosmos)
# Note: uv downloads a matching Python 3.10 build if the runtime does not provide one
!uv venv --python 3.10 /content/cosmos_env
print("✅ Created Python 3.10 virtual environment")

# Point subsequent `uv pip` commands at the environment
venv_python = "/content/cosmos_env/bin/python"
venv_pip = "/content/cosmos_env/bin/pip"  # present only if the venv was created with --seed; use `uv pip` instead
os.environ['VIRTUAL_ENV'] = '/content/cosmos_env'
os.environ['PATH'] = f"/content/cosmos_env/bin:{os.environ['PATH']}"

# Verify Python version
!$venv_python --version

print("📦 Installing Cosmos-Predict2 with uv...")

In [None]:
%%capture
if USE_GITHUB:
    # Clone and install from GitHub source
    !git clone https://github.com/nvidia-cosmos/cosmos-predict2.git /content/cosmos-predict2
    os.chdir('/content/cosmos-predict2')
    
    # Install with uv from source
    !uv pip install -e ".[cu126]" --extra-index-url https://nvidia-cosmos.github.io/cosmos-dependencies/cu126_torch260/simple
    
    COSMOS_PATH = '/content/cosmos-predict2'
    sys.path.insert(0, COSMOS_PATH)
    print("✅ Installed Cosmos-Predict2 from GitHub source")
else:
    # Install from PyPI with uv (recommended)
    !uv pip install -U "cosmos-predict2[cu126]" --extra-index-url https://nvidia-cosmos.github.io/cosmos-dependencies/cu126_torch260/simple
    
    COSMOS_PATH = None
    print("✅ Installed Cosmos-Predict2 from PyPI")

### Install Additional Dependencies

In [None]:
%%capture
# Install additional dependencies with uv (much faster than pip)
!uv pip install transformers accelerate bitsandbytes
!uv pip install decord einops "imageio[ffmpeg]"
!uv pip install opencv-python-headless pillow

print("✅ Additional dependencies installed with uv")
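
Because the install cells run under `%%capture`, a failed install produces no visible output. A quick check, sketched here with `importlib.metadata`, is to list which distributions are actually present. It inspects the interpreter that runs it, so to check the virtual environment it would need to be executed with `/content/cosmos_env/bin/python`. The helper name `installed_versions` is hypothetical.

```python
from importlib import metadata

def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions

# Distribution names taken from the install cells above.
wanted = ["cosmos-predict2", "transformers", "accelerate", "decord", "einops"]
for name, version in installed_versions(wanted).items():
    print(f"{name:20s} {version or 'NOT INSTALLED'}")
```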

## 3. Verify Installation and GPU

In [None]:
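# Make the virtual environment's packages importable from the notebook kernel
# Note: this only works if the kernel itself runs Python 3.10; compiled packages
# built for 3.10 will not import under a different interpreter version
import sys
if '/content/cosmos_env/lib/python3.10/site-packages' not in sys.path:
    sys.path.insert(0, '/content/cosmos_env/lib/python3.10/site-packages')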

import torch
from datetime import datetime

# Check GPU
if torch.cuda.is_available():
    gpu_name = torch.cuda.get_device_name(0)
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
    print(f"🖥️ GPU: {gpu_name} ({gpu_memory:.1f} GB)")
    
    # Check CUDA version
    print(f"🔧 CUDA Version: {torch.version.cuda}")
    print(f"🔧 PyTorch Version: {torch.__version__}")
else:
    print("❌ No GPU detected!")
    gpu_memory = 0
    gpu_name = "CPU"

# Verify we're using the right Python
import subprocess
result = subprocess.run(['which', 'python'], capture_output=True, text=True)
print(f"📍 Using Python from: {result.stdout.strip()}")

# Test Cosmos import
try:
    from cosmos_predict2.inference import Video2WorldPipeline
    print("✅ Cosmos Predict2 imports working correctly")
except ImportError as e:
    print(f"❌ Import error: {e}")
    print("Trying to fix path...")
    if USE_GITHUB and COSMOS_PATH:
        sys.path.insert(0, os.path.join(COSMOS_PATH, 'imaginaire'))

## 4. Mount Google Drive (Recommended)

Auto-save outputs to Drive to prevent data loss if the session disconnects.

In [None]:
# Mount Google Drive for automatic saving
mount_drive = True  # Set to True to auto-save outputs

if mount_drive:
    from google.colab import drive
    drive.mount('/content/drive')
    print("✅ Google Drive mounted")
    
    # Create timestamped output directory
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    drive_output_dir = f"/content/drive/MyDrive/cosmos_outputs_{timestamp}"
    os.makedirs(drive_output_dir, exist_ok=True)
    
    print(f"📁 Output directory: {drive_output_dir}")
    print("💾 All outputs will be automatically saved to Drive")
    
    # Save session info
    with open(f"{drive_output_dir}/session_info.txt", "w") as f:
        f.write(f"Session started: {datetime.now()}\n")
        f.write(f"GPU: {gpu_name} ({gpu_memory:.1f} GB)\n")
        f.write(f"Installation method: {'GitHub' if USE_GITHUB else 'PyPI'}\n")
        f.write("Using uv package manager\n")
else:
    print("⚠️ WARNING: Drive not mounted - outputs may be lost!")
    drive_output_dir = None
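
The timestamped-directory step can be factored into a small helper so that it is easy to reuse outside Colab. This is a sketch under the assumption that any writable base directory is acceptable. `make_run_dir` is a hypothetical name, and the `cosmos_outputs` prefix mirrors the cell above.

```python
import tempfile
from datetime import datetime
from pathlib import Path

def make_run_dir(base_dir, prefix="cosmos_outputs", now=None):
    """Create <base_dir>/<prefix>_<YYYYmmdd_HHMMSS> and return it as a Path."""
    now = now or datetime.now()
    run_dir = Path(base_dir) / f"{prefix}_{now.strftime('%Y%m%d_%H%M%S')}"
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir

# Demonstration in a throwaway directory; in Colab the base would be the Drive mount.
with tempfile.TemporaryDirectory() as tmp:
    run_dir = make_run_dir(tmp)
    print(f"Created: {run_dir.name}")
```

Passing `now` explicitly makes the directory name reproducible, which helps when several cells need to agree on the same run directory.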

## 5. Download Model Checkpoints

In [None]:
from huggingface_hub import snapshot_download

# Auto-select model size based on GPU
# Thresholds sit slightly below the nominal sizes because total_memory / 1024**3
# reports less than the advertised figure (a 40GB A100 shows about 39.6 GB)
if gpu_memory >= 38:  # A100
    MODEL_SIZE = "14B"
    print("🚀 A100 detected - using largest model (14B)")
elif gpu_memory >= 22:  # A10G, 3090, 4090
    MODEL_SIZE = "5B"
    print("Using medium model (5B) for 24GB GPU")
elif gpu_memory >= 14:  # T4, V100
    MODEL_SIZE = "2B"
    print("Using small model (2B) for 16GB GPU")
else:
    MODEL_SIZE = "2B"
    print("⚠️ Limited GPU memory - using smallest model (2B)")

print(f"\n📥 Downloading Cosmos-Predict2-{MODEL_SIZE} checkpoint...")
print("(This may take 2-5 minutes)")

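# Download with progress (recent huggingface_hub versions resume interrupted
# downloads automatically, so the deprecated resume_download flag is omitted)
checkpoint_dir = snapshot_download(
    repo_id=f"nvidia/Cosmos-Predict2-{MODEL_SIZE}-Video2World",
    cache_dir="/content/cosmos_checkpoints",
)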

print(f"✅ Checkpoint downloaded: {checkpoint_dir}")

# Verify checkpoint files
import glob
model_files = glob.glob(f"{checkpoint_dir}/*.pt")
if model_files:
    print(f"Found model files: {[os.path.basename(f) for f in model_files]}")
else:
    print("⚠️ No .pt files found, checking subdirectories...")
    model_files = glob.glob(f"{checkpoint_dir}/**/*.pt", recursive=True)
    if model_files:
        print(f"Found: {model_files[0]}")
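
The size selection above is a threshold lookup, which can be written as a pure function and tested without a GPU. This is a sketch only. `pick_model_size` is a hypothetical helper, the size labels are the ones used in the cell above, and the thresholds are assumptions set a little below the nominal card sizes, since `total_memory / 1024**3` typically reports less than the advertised capacity.

```python
def pick_model_size(total_memory_gib):
    """Return a model-size label for the GPU memory reported in GiB."""
    # (minimum reported GiB, label), checked from largest to smallest.
    # Thresholds are assumptions; tune them for your hardware.
    tiers = [(38.0, "14B"), (22.0, "5B")]
    for min_gib, label in tiers:
        if total_memory_gib >= min_gib:
            return label
    return "2B"  # fallback for 16GB-class cards and below

for reported in (39.6, 22.5, 14.7):
    print(f"{reported:5.1f} GiB -> {pick_model_size(reported)}")
```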

## 6. Initialize T5 Text Encoder

In [None]:
from transformers import T5EncoderModel, T5Tokenizer

class OptimizedT5Encoder:
    """Memory-efficient T5 encoder with FP16 and 8-bit options."""
    
    def __init__(self, model_name="google-t5/t5-11b"):
        self.model_name = model_name
        self.model = None
        self.tokenizer = None
        
    def load(self, use_fp16=True, use_8bit=False):
        print(f"📚 Loading T5 encoder: {self.model_name}")
        
        # Load tokenizer
        self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
        
        # Load model with optimizations
        if use_8bit:
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(load_in_8bit=True)
            self.model = T5EncoderModel.from_pretrained(
                self.model_name,
                quantization_config=quantization_config,
                device_map="auto"
            )
            print("✅ Loaded in 8-bit quantized mode")
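        else:
            # Load weights directly in the target dtype to avoid holding a full
            # FP32 copy in CPU RAM before converting
            dtype = torch.float16 if use_fp16 else torch.float32
            self.model = T5EncoderModel.from_pretrained(
                self.model_name, torch_dtype=dtype
            )
            if use_fp16:
                print("✅ Using FP16 precision")
            self.model = self.model.to("cuda")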
        
        self.model.eval()
        print(f"✅ T5 encoder ready")
        
    def encode(self, text, max_length=77):
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=max_length,
            padding="max_length",
            truncation=True
        ).to("cuda")
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            
        return {
            "encoder_hidden_states": outputs.last_hidden_state,
            "attention_mask": inputs.attention_mask
        }
    
    def unload(self):
        # Drop the references, then collect garbage so the CUDA allocator
        # can actually release the memory
        import gc
        self.model = None
        self.tokenizer = None
        gc.collect()
        torch.cuda.empty_cache()
        print("✅ T5 encoder unloaded")

In [None]:
# Auto-select T5 model based on GPU memory
# (thresholds sit below nominal sizes; a 40GB A100 reports about 39.6 GB)
if gpu_memory >= 38:  # A100
    t5_model = "google-t5/t5-11b"  # Best quality, ~22GB in FP16
    use_8bit = False
    print("Using T5-11B (best quality)")
elif gpu_memory >= 22:
    t5_model = "google-t5/t5-3b"  # Good balance
    use_8bit = False
    print("Using T5-3B (balanced)")
elif gpu_memory >= 14:
    t5_model = "google/flan-t5-xl"  # Efficient, ~3GB
    use_8bit = False
    print("Using Flan-T5-XL (efficient)")
else:
    t5_model = "google/flan-t5-base"  # Minimal, <1GB
    use_8bit = False
    print("Using Flan-T5-Base (minimal)")

# Initialize and load
t5_encoder = OptimizedT5Encoder(model_name=t5_model)
t5_encoder.load(use_fp16=True, use_8bit=use_8bit)

print(f"\n💾 GPU memory used: {torch.cuda.memory_allocated()/1024**3:.2f} GB")
print(f"💾 GPU memory free: {(gpu_memory - torch.cuda.memory_allocated()/1024**3):.2f} GB")
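
The `~22GB in FP16` figure quoted for T5-11B follows from simple arithmetic: parameter count times bytes per parameter. The sketch below reproduces it. `weight_memory_gb` is a hypothetical helper, and it counts weights only, so activations and CUDA overhead come on top.

```python
# Bytes used per parameter for common storage formats.
BYTES_PER_PARAM = {"fp32": 4, "fp16": 2, "bf16": 2, "int8": 1}

def weight_memory_gb(num_params, dtype="fp16"):
    """Approximate weight memory in decimal GB (1 GB = 1e9 bytes)."""
    return num_params * BYTES_PER_PARAM[dtype] / 1e9

# 11 billion parameters at 2 bytes each.
print(f"T5-11B in FP16: ~{weight_memory_gb(11e9, 'fp16'):.0f} GB")
print(f"T5-11B in INT8: ~{weight_memory_gb(11e9, 'int8'):.0f} GB")
```

Halving the bytes per parameter halves the footprint, which is why the 8-bit path in `OptimizedT5Encoder.load` is worth trying when memory is tight.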

## 7. Encode Text Prompts

In [None]:
# Define prompts for robot manipulation
prompts = [
    "A robotic arm picks up white paper and places it into a red square target area on the table.",
    "High-definition video of SO-100 robot manipulating paper with precise movements.",
    "Robot gripper grasps paper and moves it to designated red square zone.",
    "Automated paper handling: robot transfers white sheet to red target area.",
    "The robot arm carefully picks up a sheet of paper from the table.",
]

print("📝 Encoding prompts with T5...")
encoded_prompts = {}

for i, prompt in enumerate(prompts, 1):
    print(f"  [{i}/{len(prompts)}] {prompt[:50]}...")
    encoded = t5_encoder.encode(prompt)
    encoded_prompts[prompt] = encoded["encoder_hidden_states"]

print(f"\n✅ Encoded {len(prompts)} prompts")
print(f"💾 Memory after encoding: {torch.cuda.memory_allocated()/1024**3:.2f} GB")

## 8. Load Cosmos Predict2 Pipeline

In [None]:
from cosmos_predict2.inference import (
    Video2WorldPipeline,
    get_cosmos_predict2_video2world_pipeline,
)

print(f"🚀 Loading Cosmos-Predict2-{MODEL_SIZE} pipeline...")

# Get pipeline config
config = get_cosmos_predict2_video2world_pipeline(model_size=MODEL_SIZE)

# Find the correct model file
model_path = None
for fps in ['16fps', '10fps']:
    potential_path = os.path.join(checkpoint_dir, f"model-720p-{fps}.pt")
    if os.path.exists(potential_path):
        model_path = potential_path
        print(f"Found model: {os.path.basename(model_path)}")
        break

if not model_path:
    # Search in subdirectories
    import glob
    pt_files = glob.glob(f"{checkpoint_dir}/**/*.pt", recursive=True)
    if pt_files:
        model_path = pt_files[0]
        print(f"Found model in subdirectory: {model_path}")

if model_path:
    config['dit_checkpoint_path'] = model_path
else:
    print("⚠️ Model file not found, using default path")

# Initialize pipeline
try:
    cosmos_pipe = Video2WorldPipeline.from_config(config)
    cosmos_pipe = cosmos_pipe.to("cuda")
    cosmos_pipe.eval()
    
    print(f"✅ Cosmos pipeline loaded successfully")
    print(f"💾 Total GPU memory used: {torch.cuda.memory_allocated()/1024**3:.2f} GB")
    
except Exception as e:
    print(f"❌ Error loading pipeline: {e}")
    print("\nTroubleshooting:")
    print("1. Try restarting runtime")
    print("2. Check GPU memory")
    print("3. Try smaller model size")
    raise
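
The checkpoint lookup above (preferred file names first, then any `.pt` file in a subdirectory) can be isolated into one function. This is a sketch that assumes the file naming used in the cell above. `find_checkpoint` is a hypothetical name, and the fallback is sorted so that the choice is deterministic rather than dependent on directory listing order.

```python
from pathlib import Path

def find_checkpoint(checkpoint_dir,
                    preferred=("model-720p-16fps.pt", "model-720p-10fps.pt")):
    """Return the first preferred checkpoint, else the first *.pt found recursively."""
    root = Path(checkpoint_dir)
    for name in preferred:
        candidate = root / name
        if candidate.is_file():
            return candidate
    # Fall back to any .pt file below the root, in sorted order.
    matches = sorted(root.rglob("*.pt"))
    return matches[0] if matches else None
```

A caller would treat a `None` result as "use the default path", matching the warning branch in the cell above.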

## 9. Create Input Image

In [None]:
import numpy as np
import cv2
from IPython.display import HTML, display, Image
from PIL import Image as PILImage
import base64

def create_test_image(output_path="test_input.jpg", width=1280, height=720):
    """Create a test image simulating a robot workspace."""
    img = np.zeros((height, width, 3), dtype=np.uint8)
    
    # Gradient background
    for y in range(height):
        img[y, :] = [100 + int(50 * y / height), 80, 60]
    
    # White paper
    paper_x, paper_y = width // 3, height // 2
    cv2.rectangle(img, (paper_x, paper_y), (paper_x + 200, paper_y + 150), 
                  (255, 255, 255), -1)
    cv2.rectangle(img, (paper_x, paper_y), (paper_x + 200, paper_y + 150), 
                  (200, 200, 200), 2)
    
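    # Red target (the image is built in RGB and converted to BGR on write,
    # so red must be given as (R, G, B) = (200, 50, 50))
    target_x = 2 * width // 3
    cv2.rectangle(img, (target_x, paper_y), (target_x + 150, paper_y + 150),
                  (200, 50, 50), -1)
    cv2.rectangle(img, (target_x, paper_y), (target_x + 150, paper_y + 150),
                  (150, 30, 30), 3)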
    
    # Labels
    cv2.putText(img, "Paper", (paper_x + 70, paper_y - 10), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(img, "Target", (target_x + 40, paper_y - 10), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    
    cv2.imwrite(output_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
    return output_path

# Create or upload
use_test = True  # Set False to upload your own

if use_test:
    input_path = create_test_image()
    print("✅ Created test input image")
    img = PILImage.open(input_path)
    display(img)
else:
    from google.colab import files
    print("Upload an image:")
    uploaded = files.upload()
    input_path = list(uploaded.keys())[0]
    print(f"✅ Using: {input_path}")

## 10. Generate Video

In [None]:
import decord
from einops import rearrange
import time

def generate_video(input_path, prompt_embedding, num_frames=16, fps=16):
    """Generate video using Cosmos Predict2."""
    
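    # Load input (extension check is case-insensitive; force 3-channel RGB so
    # RGBA or grayscale uploads match the expected "t h w c" layout)
    if input_path.lower().endswith(('.jpg', '.jpeg', '.png')):
        img = PILImage.open(input_path).convert("RGB")
        frames = np.array(img)[np.newaxis, ...]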
    else:
        vr = decord.VideoReader(input_path)
        frames = vr[:1].asnumpy()
    
    # Prepare tensor
    frames_tensor = torch.from_numpy(frames).float() / 255.0
    frames_tensor = rearrange(frames_tensor, "t h w c -> 1 c t h w").to("cuda")
    
    print(f"🎬 Generating {num_frames} frames at {fps} FPS...")
    start = time.time()
    
    with torch.no_grad():
        with torch.autocast(device_type="cuda"):  # replaces deprecated torch.cuda.amp.autocast()
            output = cosmos_pipe(
                frames_tensor,
                prompt_embedding,
                num_frames=num_frames,
                fps=fps,
                seed=42
            )
    
    elapsed = time.time() - start
    print(f"✅ Generated in {elapsed:.1f}s ({num_frames/elapsed:.1f} fps)")
    return output

# Configure based on GPU (thresholds sit below nominal sizes because the
# reported memory is lower; a 40GB A100 shows about 39.6 GB)
if gpu_memory >= 38:
    gen_params = {"num_frames": 121, "fps": 16}  # 7.5s
elif gpu_memory >= 22:
    gen_params = {"num_frames": 61, "fps": 16}  # 3.8s
else:
    gen_params = {"num_frames": 16, "fps": 16}  # 1s

print(f"Settings: {gen_params}")

# Generate
selected_prompt = prompts[0]
print(f"\nPrompt: {selected_prompt[:60]}...")

output_video = generate_video(
    input_path,
    encoded_prompts[selected_prompt],
    **gen_params
)
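
The duration comments in the settings above come from dividing the frame count by the frame rate. A small sketch of that arithmetic follows. `clip_duration_seconds` is a hypothetical helper.

```python
def clip_duration_seconds(num_frames, fps):
    """Playback length of a clip with `num_frames` frames shown at `fps`."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return num_frames / fps

# Frame counts used in the GPU-based settings above, all at 16 fps.
for frames in (121, 61, 16):
    print(f"{frames:3d} frames @ 16 fps = {clip_duration_seconds(frames, 16):.4f} s")
```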

## 11. Save and Display Results

In [None]:
import imageio
import shutil

def save_video(tensor, path="output.mp4", fps=16, backup=True):
    """Save video with optional Drive backup."""
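    # Convert to a float32 numpy array (bfloat16 tensors cannot be converted directly)
    if isinstance(tensor, torch.Tensor):
        video = tensor.detach().float().cpu().numpy()
    else:
        video = np.asarray(tensor)
    
    # Fix dimensions: drop the batch axis, then move channels last
    # (C, T, H, W -> T, H, W, C)
    if video.ndim == 5:
        video = video[0]
    if video.shape[0] == 3:
        video = np.transpose(video, (1, 2, 3, 0))
    
    # Normalize to uint8; values at or below 1.0 are assumed to be in [0, 1]
    if video.max() <= 1.0:
        video = video * 255
    video = np.clip(video, 0, 255).astype(np.uint8)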
    
    # Save
    writer = imageio.get_writer(path, fps=fps)
    for frame in video:
        writer.append_data(frame)
    writer.close()
    print(f"✅ Saved: {path}")
    
    # Backup to Drive
    if backup and drive_output_dir:
        drive_path = os.path.join(drive_output_dir, os.path.basename(path))
        shutil.copy2(path, drive_path)
        print(f"☁️ Backed up to Drive")
        
        # Save metadata
        meta_path = drive_path.replace('.mp4', '_info.txt')
        with open(meta_path, 'w') as f:
            f.write(f"Prompt: {selected_prompt}\n")
            f.write(f"Model: Cosmos-{MODEL_SIZE}\n")
            f.write(f"Frames: {gen_params['num_frames']}\n")
            f.write(f"FPS: {gen_params['fps']}\n")
            f.write(f"Time: {datetime.now()}\n")
    
    return path

def display_video(path):
    """Display video in notebook."""
    with open(path, 'rb') as f:
        video = f.read()
    encoded = base64.b64encode(video).decode('ascii')
    display(HTML(f'''
    <video width="640" height="360" controls autoplay loop>
        <source src="data:video/mp4;base64,{encoded}" type="video/mp4">
    </video>
    '''))

# Save and display (use the same fps the video was generated with)
output_file = f"cosmos_{datetime.now().strftime('%H%M%S')}.mp4"
output_path = save_video(output_video, output_file, fps=gen_params['fps'])

print("\n🎥 Generated video:")
display_video(output_path)

# Optional download
from google.colab import files
if input("\nDownload? (y/n): ").lower() == 'y':
    files.download(output_path)
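
`display_video` embeds the whole file in the page as a base64 data URI, which grows the payload by about a third. The sketch below shows the same embedding as a pure function together with the size calculation, which may help in deciding when to link to a file instead. Both helper names are hypothetical.

```python
import base64
import math

def base64_size(num_bytes):
    """Length in characters of the padded base64 encoding of `num_bytes` bytes."""
    return 4 * math.ceil(num_bytes / 3)

def video_data_uri_html(video_bytes, width=640, height=360):
    """Build a <video> tag that carries the MP4 inline as a base64 data URI."""
    encoded = base64.b64encode(video_bytes).decode("ascii")
    return (
        f'<video width="{width}" height="{height}" controls>'
        f'<source src="data:video/mp4;base64,{encoded}" type="video/mp4">'
        "</video>"
    )

# A 30 MB video becomes roughly 40 MB of text inside the notebook output.
print(f"{base64_size(30_000_000) / 1e6:.0f} MB")
```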

## 12. Batch Processing (Optional)

In [None]:
# Process all prompts
batch_process = True

if batch_process:
    results = {}
    print(f"🎬 Processing {len(prompts)} prompts...\n")
    
    for i, prompt in enumerate(prompts):
        print(f"[{i+1}/{len(prompts)}] {prompt[:50]}...")
        
        try:
            # Generate
            output = generate_video(
                input_path,
                encoded_prompts[prompt],
                **gen_params
            )
            
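            # Save (save_video reads the global selected_prompt for its metadata
            # file, so point it at the prompt being processed)
            selected_prompt = prompt
            filename = f"batch_{i:02d}_{datetime.now().strftime('%H%M%S')}.mp4"
            save_video(output, filename, fps=gen_params['fps'])
            results[prompt] = filename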
            
            # Clear memory
            torch.cuda.empty_cache()
            
        except Exception as e:
            print(f"  ❌ Failed: {e}")
    
    print(f"\n✅ Complete: {len(results)}/{len(prompts)} succeeded")
    
    # Save summary
    if drive_output_dir:
        summary_path = f"{drive_output_dir}/batch_summary.txt"
        with open(summary_path, 'w') as f:
            f.write("Batch Processing Summary\n")
            f.write("========================\n")
            f.write(f"Total: {len(prompts)}\n")
            f.write(f"Success: {len(results)}\n\n")
            # Use a distinct loop name so the file handle `f` is not shadowed
            for p, fname in results.items():
                f.write(f"{p}\n  -> {fname}\n\n")
        print(f"📝 Summary saved to Drive")
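
The summary step can also be written as a standalone function that builds the text first and writes it in a single call, which avoids holding a file handle open across the loop. This is a sketch. `write_batch_summary` is a hypothetical helper, and the layout mirrors the cell above.

```python
from pathlib import Path

def write_batch_summary(path, prompts, results):
    """Write a plain-text summary; `results` maps each prompt to its output file."""
    lines = [
        "Batch Processing Summary",
        "========================",
        f"Total: {len(prompts)}",
        f"Success: {len(results)}",
        "",
    ]
    for prompt, filename in results.items():
        lines += [prompt, f"  -> {filename}", ""]
    Path(path).write_text("\n".join(lines) + "\n", encoding="utf-8")
    return Path(path)
```

In the notebook this would be called with `f"{drive_output_dir}/batch_summary.txt"`, `prompts`, and `results`.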

## 13. Memory Management

In [None]:
# Session status
print("📊 Session Status")
print("="*40)
print(f"GPU: {gpu_name}")
print(f"Total Memory: {gpu_memory:.1f} GB")
print(f"Allocated: {torch.cuda.memory_allocated()/1024**3:.2f} GB")
print(f"Reserved: {torch.cuda.memory_reserved()/1024**3:.2f} GB")
print(f"Free: {(gpu_memory - torch.cuda.memory_allocated()/1024**3):.2f} GB")

if drive_output_dir:
    print(f"\n✅ Outputs saved to Drive:")
    print(f"   {drive_output_dir}")
    
    # List saved files
    import glob
    saved_videos = glob.glob(f"{drive_output_dir}/*.mp4")
    if saved_videos:
        print(f"\n📹 Saved {len(saved_videos)} videos:")
        for v in saved_videos[:5]:  # Show first 5
            print(f"   - {os.path.basename(v)}")
        if len(saved_videos) > 5:
            print(f"   ... and {len(saved_videos)-5} more")

# Cleanup option
cleanup = False  # Set True to free memory

if cleanup:
    print("\n🧹 Cleaning up...")
    
    if 't5_encoder' in locals():
        t5_encoder.unload()
        del t5_encoder
    
    if 'cosmos_pipe' in locals():
        del cosmos_pipe
    
    import gc
    gc.collect()
    torch.cuda.empty_cache()
    
    print("✅ Memory freed")
    print(f"Allocated: {torch.cuda.memory_allocated()/1024**3:.2f} GB")

## Tips & Troubleshooting

### Why use `uv`?
- **Much faster** than pip for installing packages (the uv project reports 10-100x)
- **Better dependency resolution** - avoids conflicts
- **NVIDIA recommended** for Cosmos-Predict2
- **Consistent environments** - `uv venv --python 3.10` provides the Python 3.10 interpreter Cosmos requires

### GPU Memory Guide:
| GPU | Memory | T5 Model | Cosmos Model | Max Frames |
|-----|--------|----------|--------------|------------|
| A100 | 40GB | T5-11B | Cosmos-14B | 121 |
| A10G | 24GB | T5-3B | Cosmos-5B | 61 |
| T4 | 16GB | Flan-T5-XL | Cosmos-2B | 16-30 |

### Common Issues:

1. **Import Error**: Restart runtime after installation
2. **OOM Error**: Reduce `num_frames` or use smaller models
3. **Slow Generation**: Normal speeds are 1-5 fps for generation
4. **uv not found**: Re-run the uv installation cell
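
For the OOM case, reducing `num_frames` can be automated. The sketch below retries a generation callable with half as many frames each time an out-of-memory error is raised. It is a generic pattern and not part of Cosmos-Predict2. `generate_with_backoff` is a hypothetical helper, and detecting OOM through the exception message is an assumption based on PyTorch's "CUDA out of memory" wording.

```python
def _looks_like_oom(exc):
    """Heuristic: MemoryError, or a RuntimeError whose message mentions OOM."""
    return isinstance(exc, MemoryError) or "out of memory" in str(exc).lower()

def generate_with_backoff(generate, num_frames, min_frames=16):
    """Call generate(num_frames); on OOM, halve num_frames and retry.

    Returns (output, frames_used). Non-OOM errors and OOM at min_frames propagate.
    """
    while True:
        try:
            return generate(num_frames), num_frames
        except (RuntimeError, MemoryError) as exc:
            if not _looks_like_oom(exc) or num_frames <= min_frames:
                raise
            num_frames = max(min_frames, num_frames // 2)
            print(f"Out of memory; retrying with num_frames={num_frames}")

# Stand-in for the real generation call: fails above 30 frames.
def fake_generate(n):
    if n > 30:
        raise RuntimeError("CUDA out of memory (simulated)")
    return f"video with {n} frames"

print(generate_with_backoff(fake_generate, 121))
```

In the notebook, the callable would wrap `generate_video`, and calling `torch.cuda.empty_cache()` before each retry is advisable.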

### Performance Tips:
- Use FP16 precision (default)
- Clear cache between batches
- Mount Drive to prevent data loss
- Use A100 for best performance

### Recovery:
If the session disconnects but Drive was mounted, your videos are safe in:
`/content/drive/MyDrive/cosmos_outputs_[timestamp]/`