In [None]:
# ControlNet basic implementation - Canny/Depth/OpenPose
# Stage 2 | 40_conditioning/nb-sd-controlnet-basics.ipynb

# %% [1] Shared Cache Bootstrap
import gc
import os
import pathlib
import sys
from datetime import datetime

import torch

# Shared cache configuration (copy this block into every notebook)
AI_CACHE_ROOT = os.getenv("AI_CACHE_ROOT", "../ai_warehouse/cache")

# Point the Hugging Face and torch cache environment variables at the shared
# root and create each directory up front.
for k, v in {
    "HF_HOME": f"{AI_CACHE_ROOT}/hf",
    "TRANSFORMERS_CACHE": f"{AI_CACHE_ROOT}/hf/transformers",
    "HF_DATASETS_CACHE": f"{AI_CACHE_ROOT}/hf/datasets",
    "HUGGINGFACE_HUB_CACHE": f"{AI_CACHE_ROOT}/hf/hub",
    "TORCH_HOME": f"{AI_CACHE_ROOT}/torch",
}.items():
    os.environ[k] = v
    pathlib.Path(v).mkdir(parents=True, exist_ok=True)

# Notebook-wide switch read by the generation, batch and cleanup cells below.
# It was referenced throughout but never defined, so a fresh-kernel run failed
# with NameError. Set the SMOKE_MODE environment variable to 1/true/yes/on to
# run the shortened smoke configuration; it is off by default.
SMOKE_MODE = os.getenv("SMOKE_MODE", "0").strip().lower() in {"1", "true", "yes", "on"}

print("[Cache]", AI_CACHE_ROOT, "| GPU:", torch.cuda.is_available())

In [None]:
# %% Cell 2: Dependencies Installation & Imports
# Install required packages (run once)
# !pip install diffusers[torch] transformers accelerate xformers controlnet-aux opencv-python pillow

import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
from typing import List, Tuple, Optional, Union
import warnings

# Suppress every Python warning from this point on.
# NOTE(review): this is a blanket filter, so it also hides warnings that may
# matter (deprecations, dtype fallbacks) — consider narrowing it by category.
warnings.filterwarnings("ignore")

# Core ML libraries
import torch
from diffusers import (
    StableDiffusionControlNetPipeline,
    ControlNetModel,
    UniPCMultistepScheduler,
)
from controlnet_aux import CannyDetector, MidasDetector, OpenposeDetector

# Banner: torch version plus the name of CUDA device 0, or 'CPU' when CUDA is
# not available.
print(
    f"üîß Torch: {torch.__version__} | Device: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'}"
)

In [None]:
# %% Cell 3: ControlNet Pipeline Setup (SD1.5 + 3 ControlNets)


def setup_controlnet_pipeline(
    base_model: str = "runwayml/stable-diffusion-v1-5",
    device: str = "cuda",
    enable_memory_efficient: bool = True,
) -> dict:
    """
    Build one Stable Diffusion + ControlNet pipeline per control type.

    Args:
        base_model: Model id or path passed to
            ``StableDiffusionControlNetPipeline.from_pretrained``.
        device: Device the pipelines should run on. When a CUDA device is
            requested but ``torch.cuda.is_available()`` is False, the
            pipelines are loaded on the CPU instead of failing later.
        enable_memory_efficient: When True, attention slicing is enabled and,
            on CUDA, model CPU offload plus (best effort) xformers attention.
            When False the pipeline is simply moved to the target device.

    Returns:
        dict mapping "canny", "depth" and "openpose" to their pipeline.
    """

    # ControlNet model IDs
    controlnet_models = {
        "canny": "lllyasviel/sd-controlnet-canny",
        "depth": "lllyasviel/sd-controlnet-depth",
        "openpose": "lllyasviel/sd-controlnet-openpose",
    }

    # Resolve the device once: honour the argument, but never try to use CUDA
    # helpers on a machine without CUDA.
    wants_cuda = str(device).startswith("cuda")
    on_cuda = wants_cuda and torch.cuda.is_available()
    if wants_cuda and not on_cuda:
        print("CUDA requested but not available; loading pipelines on CPU")
    target_device = "cpu" if (wants_cuda and not on_cuda) else device
    # Half precision is kept everywhere except on the CPU target.
    dtype = torch.float32 if str(target_device) == "cpu" else torch.float16

    pipelines = {}

    for control_type, model_id in controlnet_models.items():
        print(f"Loading {control_type} ControlNet...")

        # Load ControlNet model
        controlnet = ControlNetModel.from_pretrained(
            model_id, torch_dtype=dtype, use_safetensors=True
        )

        # Create pipeline
        pipe = StableDiffusionControlNetPipeline.from_pretrained(
            base_model,
            controlnet=controlnet,
            torch_dtype=dtype,
            safety_checker=None,
            requires_safety_checker=False,
        )

        if enable_memory_efficient and on_cuda:
            pipe.enable_model_cpu_offload()  # Offload to CPU when not in use
            pipe.enable_attention_slicing()  # Reduce attention memory
            # xformers is optional: the method always exists on the pipeline,
            # so a hasattr() check cannot detect a missing installation.
            try:
                pipe.enable_xformers_memory_efficient_attention()
            except Exception as exc:
                print(f"xformers attention not enabled for {control_type}: {exc}")
        else:
            if enable_memory_efficient:
                pipe.enable_attention_slicing()
            # Without offload hooks nothing else places the pipeline, so move
            # it to the resolved device explicitly.
            pipe.to(target_device)

        # Faster scheduler
        pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)

        pipelines[control_type] = pipe

        # Drop Python garbage first so the cached blocks it held can be freed.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    print(f"‚úÖ Loaded {len(pipelines)} ControlNet pipelines")
    return pipelines


# Setup preprocessors
def setup_preprocessors():
    """Create the three control-image detectors, keyed by control type.

    The Canny detector is constructed directly; the MiDaS and OpenPose
    detectors are loaded with ``from_pretrained`` from the same repository.
    """
    annotator_repo = "lllyasviel/Annotators"

    detectors = {}
    detectors["canny"] = CannyDetector()
    detectors["depth"] = MidasDetector.from_pretrained(annotator_repo)
    detectors["openpose"] = OpenposeDetector.from_pretrained(annotator_repo)
    return detectors


# Load pipelines and preprocessors
# Both results are kept as notebook-level globals: the preprocess_* helpers
# read `cn_preprocessors`, and later cells pass `cn_pipelines` to the
# generation functions.
print("üöÄ Setting up ControlNet pipelines...")
cn_pipelines = setup_controlnet_pipeline()
cn_preprocessors = setup_preprocessors()

In [None]:
# %% Cell 4: Image Preprocessing Functions


def preprocess_canny(
    image: Image.Image, low_threshold: int = 100, high_threshold: int = 200
) -> Image.Image:
    """Run the shared Canny detector on ``image`` and return its result.

    Both thresholds are forwarded positionally to the detector stored under
    ``cn_preprocessors["canny"]``.
    """
    detector = cn_preprocessors["canny"]
    return detector(image, low_threshold, high_threshold)


def preprocess_depth(image: Image.Image) -> Image.Image:
    """Return the output of the detector stored under ``cn_preprocessors["depth"]`` for ``image``."""
    estimator = cn_preprocessors["depth"]
    return estimator(image)


def preprocess_openpose(image: Image.Image) -> Image.Image:
    """Return the output of the detector stored under ``cn_preprocessors["openpose"]`` for ``image``."""
    pose_detector = cn_preprocessors["openpose"]
    return pose_detector(image)


def create_preprocessing_comparison(
    input_image: Image.Image,
) -> Tuple[Image.Image, Image.Image, Image.Image]:
    """Show the input next to its Canny, depth and OpenPose control images.

    Args:
        input_image: Image handed to each of the three preprocess_* helpers.

    Returns:
        ``(canny_img, depth_img, pose_img)`` — the three control images, in
        that order. (The previous annotation claimed a single image.)
    """

    # Process with all methods
    canny_img = preprocess_canny(input_image)
    depth_img = preprocess_depth(input_image)
    pose_img = preprocess_openpose(input_image)

    # One (title, image, imshow kwargs) entry per panel; only the Canny panel
    # is drawn with the gray colormap.
    panels = [
        ("Original", input_image, {}),
        ("Canny Edges", canny_img, {"cmap": "gray"}),
        ("Depth Map", depth_img, {}),
        ("OpenPose", pose_img, {}),
    ]

    fig, axes = plt.subplots(1, len(panels), figsize=(16, 4))
    for ax, (title, panel_image, imshow_kwargs) in zip(axes, panels):
        ax.imshow(panel_image, **imshow_kwargs)
        ax.set_title(title, fontsize=12)
        ax.axis("off")

    plt.tight_layout()
    plt.show()

    return canny_img, depth_img, pose_img

In [None]:
# %% Cell 5: MVP Example - Single ControlNet (Canny)


def generate_with_controlnet(
    pipeline_dict: dict,
    control_type: str,
    prompt: str,
    control_image: Image.Image,
    negative_prompt: str = "blurry, distorted, low quality",
    num_inference_steps: int = 20,
    guidance_scale: float = 7.5,
    controlnet_conditioning_scale: float = 1.0,
    seed: int = 42,
) -> Image.Image:
    """
    Generate one image with the pipeline registered under ``control_type``.

    Args:
        pipeline_dict: Mapping of control type to pipeline.
        control_type: Key into ``pipeline_dict``; an unknown key raises KeyError.
        prompt: Text prompt passed to the pipeline.
        control_image: Conditioning image passed as ``image``.
        negative_prompt: Negative prompt passed to the pipeline.
        num_inference_steps: Requested step count; capped at 4 when the
            notebook-level ``SMOKE_MODE`` flag is truthy.
        guidance_scale: Passed through unchanged.
        controlnet_conditioning_scale: Passed through unchanged.
        seed: Seed for the per-call random generator.

    Returns:
        The first image of the pipeline result.

    On ``torch.cuda.OutOfMemoryError`` the call is retried once with model CPU
    offload enabled and half the steps (never fewer than one).
    """

    pipe = pipeline_dict[control_type]

    # SMOKE_MODE lives in the notebook's config cell; treat it as off when that
    # cell has not been run instead of failing with NameError.
    if globals().get("SMOKE_MODE", False):
        num_inference_steps = min(num_inference_steps, 4)
        print(f"üî• SMOKE_MODE: Using {num_inference_steps} steps")

    def _run(steps: int):
        # A private generator seeded per attempt: the global RNG is left
        # untouched, and the retry starts from the same state as the first try.
        generator = torch.Generator(device="cpu").manual_seed(seed)
        result = pipe(
            prompt=prompt,
            image=control_image,
            negative_prompt=negative_prompt,
            num_inference_steps=steps,
            guidance_scale=guidance_scale,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            generator=generator,
        )
        return result.images[0]

    try:
        return _run(num_inference_steps)
    except torch.cuda.OutOfMemoryError:
        print("üö® CUDA OOM! Trying CPU offload...")

    # Recovery runs outside the except block on purpose: while the handler is
    # active the exception's traceback keeps the failed call's frames (and the
    # tensors they reference) alive, so memory could not actually be released.
    gc.collect()
    torch.cuda.empty_cache()
    pipe.enable_model_cpu_offload()

    # Halve the work for the retry, but never ask for zero steps.
    return _run(max(1, num_inference_steps // 2))


# MVP Example with sample image
def create_sample_image() -> Image.Image:
    """Return a 512x512 white RGB image with three outlined shapes drawn on it.

    The shapes are a black rectangle, a blue ellipse and a red diagonal line.
    """
    canvas = Image.new("RGB", (512, 512), "white")
    pen = ImageDraw.Draw(canvas)

    # Each shape is given as its [x0, y0, x1, y1] box / end points.
    rectangle_box = [100, 100, 400, 300]
    ellipse_box = [200, 150, 300, 250]
    diagonal = [50, 50, 450, 450]

    pen.rectangle(rectangle_box, outline="black", width=3)
    pen.ellipse(ellipse_box, outline="blue", width=2)
    pen.line(diagonal, fill="red", width=2)

    return canvas


# Run MVP example
# `sample_img` and `canny_control` are notebook-level globals that later cells
# (comparison, scale test, batch example) reuse.
print("üé® Running MVP example with Canny ControlNet...")
sample_img = create_sample_image()
canny_control = preprocess_canny(sample_img)

# Generate with the "canny" pipeline; parameters not listed here fall back to
# the defaults of generate_with_controlnet.
mvp_result = generate_with_controlnet(
    cn_pipelines,
    "canny",
    prompt="a beautiful landscape painting, oil on canvas, detailed",
    control_image=canny_control,
    seed=42,
)

# Display results
# Three panels side by side: input, Canny control image, generated image.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
axes[0].imshow(sample_img)
axes[0].set_title("Original")
axes[0].axis("off")

# The control image is drawn with the gray colormap.
axes[1].imshow(canny_control, cmap="gray")
axes[1].set_title("Canny Control")
axes[1].axis("off")

axes[2].imshow(mvp_result)
axes[2].set_title("Generated Result")
axes[2].axis("off")

plt.tight_layout()
plt.show()

In [None]:
# %% Cell 6: Multi-ControlNet Comparison Grid


def compare_all_controlnets(
    input_image: Image.Image, prompt: str, seed: int = 42
) -> dict:
    """
    Generate one image per ControlNet type from the same input and prompt.

    Args:
        input_image: Image preprocessed into each control image.
        prompt: Text prompt used for every generation.
        seed: Seed forwarded to ``generate_with_controlnet``.

    Returns:
        dict keyed by "canny", "depth" and "openpose". Each value is either
        ``{"control": <control image>, "generated": <result image>}`` or
        ``None`` when preprocessing or generation for that type failed.
    """

    # Preprocessing is resolved lazily inside the per-type try block so that a
    # failure in one preprocessor only marks that type as failed instead of
    # aborting the whole comparison.
    preprocessors = {
        "canny": preprocess_canny,
        "depth": preprocess_depth,
        "openpose": preprocess_openpose,
    }

    results = {}

    for control_type, preprocess in preprocessors.items():
        print(f"Generating with {control_type}...")

        try:
            control_image = preprocess(input_image)
            result = generate_with_controlnet(
                cn_pipelines,
                control_type,
                prompt=prompt,
                control_image=control_image,
                seed=seed,
                num_inference_steps=12 if not SMOKE_MODE else 4,
            )
            results[control_type] = {"control": control_image, "generated": result}

        except Exception as e:
            print(f"‚ùå Failed to generate with {control_type}: {e}")
            results[control_type] = None

        finally:
            # Release memory after every attempt, including failed ones, which
            # is when it matters most.
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    return results


# Run comparison
print("üîÑ Comparing all ControlNet types...")
comparison_prompt = "a cyberpunk street scene, neon lights, futuristic architecture"

# Reuses `sample_img` created in the MVP cell.
comparison_results = compare_all_controlnets(sample_img, comparison_prompt, seed=42)

# Visualize comparison
# 3x3 grid: one row per control type, columns = input / control / generated.
fig, axes = plt.subplots(3, 3, figsize=(15, 15))
control_types = ["canny", "depth", "openpose"]

for i, control_type in enumerate(control_types):
    # A falsy entry (None) means that control type failed.
    if comparison_results[control_type]:
        # Original input
        axes[i, 0].imshow(sample_img)
        axes[i, 0].set_title(f"Input Image")
        axes[i, 0].axis("off")

        # Control image
        control_img = comparison_results[control_type]["control"]
        # Only the Canny control image is drawn with the gray colormap.
        if control_type == "canny":
            axes[i, 1].imshow(control_img, cmap="gray")
        else:
            axes[i, 1].imshow(control_img)
        axes[i, 1].set_title(f"{control_type.title()} Control")
        axes[i, 1].axis("off")

        # Generated result
        axes[i, 2].imshow(comparison_results[control_type]["generated"])
        axes[i, 2].set_title(f"Generated ({control_type})")
        axes[i, 2].axis("off")
    else:
        # Fill the whole row with a centred "<type> failed" label.
        for j in range(3):
            axes[i, j].text(
                0.5,
                0.5,
                f"{control_type} failed",
                ha="center",
                va="center",
                transform=axes[i, j].transAxes,
            )
            axes[i, j].axis("off")

plt.tight_layout()
plt.show()

In [None]:
# %% Cell 7: Conditioning Scale Experiments


def test_conditioning_scales(
    pipeline_dict: dict,
    control_type: str,
    prompt: str,
    control_image: Image.Image,
    scales: Optional[List[float]] = None,
    seed: int = 42,
) -> dict:
    """
    Generate the same prompt at several ``controlnet_conditioning_scale`` values.

    Args:
        pipeline_dict: Mapping of control type to pipeline.
        control_type: Key into ``pipeline_dict``.
        prompt: Text prompt used for every run.
        control_image: Conditioning image used for every run.
        scales: Scale values to try; ``None`` means ``[0.5, 1.0, 1.5, 2.0]``.
            (``None`` replaces the former mutable list default.)
        seed: Seed forwarded to ``generate_with_controlnet``.

    Returns:
        dict mapping each scale to the generated image, or to ``None`` when
        generation at that scale raised.
    """

    if scales is None:
        scales = [0.5, 1.0, 1.5, 2.0]

    results = {}

    for scale in scales:
        print(f"Testing conditioning scale: {scale}")

        try:
            results[scale] = generate_with_controlnet(
                pipeline_dict,
                control_type,
                prompt=prompt,
                control_image=control_image,
                controlnet_conditioning_scale=scale,
                seed=seed,
                num_inference_steps=8 if not SMOKE_MODE else 3,
            )

        except Exception as e:
            print(f"‚ùå Failed at scale {scale}: {e}")
            results[scale] = None

        finally:
            # Clear memory after every attempt, same as the other generation
            # loops in this notebook.
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    return results


# Test conditioning scales with Canny
# Reuses `canny_control` created in the MVP cell.
print("üìä Testing conditioning scale effects...")
scale_results = test_conditioning_scales(
    cn_pipelines,
    "canny",
    "a watercolor painting of a mountain landscape",
    canny_control,
    scales=[0.5, 1.0, 1.5, 2.0],
)

# Visualize scale effects
# One panel per tested scale, in the order the results were produced.
fig, axes = plt.subplots(1, len(scale_results), figsize=(20, 5))
for i, (scale, result) in enumerate(scale_results.items()):
    # A falsy result (None) marks a scale whose generation failed.
    if result:
        axes[i].imshow(result)
        axes[i].set_title(f"Scale: {scale}")
        axes[i].axis("off")
    else:
        axes[i].text(
            0.5,
            0.5,
            f"Failed\nScale: {scale}",
            ha="center",
            va="center",
            transform=axes[i].transAxes,
        )
        axes[i].axis("off")

plt.tight_layout()
plt.show()

In [None]:
# %% Cell 8: Memory Optimization & Error Handling


def optimize_pipeline_memory(pipeline_dict: dict):
    """Apply aggressive memory optimizations to every pipeline in the mapping.

    For each pipeline this enables maximum attention slicing, then tries
    sequential CPU offload; if that raises, it falls back to model CPU
    offload. VAE slicing is enabled when the VAE exposes ``enable_slicing``.

    Args:
        pipeline_dict: Mapping of a display name to a pipeline object.
    """

    for name, pipe in pipeline_dict.items():
        print(f"Optimizing {name} pipeline...")

        pipe.enable_attention_slicing("max")

        # Sequential and model offload are alternative strategies, so only one
        # is installed per pipeline: prefer sequential (largest saving) and
        # use model offload only when sequential cannot be enabled.
        try:
            pipe.enable_sequential_cpu_offload()
            print(f"‚úÖ {name}: Sequential CPU offload enabled")
        except Exception as exc:  # not a bare except: Ctrl-C must still interrupt
            print(f"‚ö†Ô∏è {name}: Sequential CPU offload not available")
            print(f"   reason: {exc}")
            pipe.enable_model_cpu_offload()

        # VAE slicing for large images
        if hasattr(pipe.vae, "enable_slicing"):
            pipe.vae.enable_slicing()
            print(f"‚úÖ {name}: VAE slicing enabled")


def get_memory_usage():
    """Print and return GPU memory figures for the current CUDA device.

    Returns:
        ``(allocated, cached, total)`` in units of 1e9 bytes, or ``(0, 0, 0)``
        when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return 0, 0, 0

    # Query one device for all three numbers; previously the total was always
    # read from device 0 while the other two came from the current device.
    device = torch.cuda.current_device()
    allocated = torch.cuda.memory_allocated(device) / 1e9
    cached = torch.cuda.memory_reserved(device) / 1e9
    total = torch.cuda.get_device_properties(device).total_memory / 1e9
    print(
        f"GPU Memory: {allocated:.1f}GB allocated, {cached:.1f}GB cached, {total:.1f}GB total"
    )
    return allocated, cached, total


# Apply optimizations
# Reconfigures the pipelines already stored in `cn_pipelines` in place, then
# prints the current GPU memory figures (the returned tuple is discarded).
print("üîß Applying memory optimizations...")
optimize_pipeline_memory(cn_pipelines)
get_memory_usage()

In [None]:
# %% Cell 9: Batch Processing Function


def batch_controlnet_generation(
    pipeline_dict: dict,
    prompts: List[str],
    control_images: List[Image.Image],
    control_type: str,
    batch_size: int = 1,
    **generation_kwargs,
) -> List[Image.Image]:
    """
    Generate one image per (prompt, control image) pair, grouped in batches.

    Items are still generated one at a time; ``batch_size`` only controls how
    they are grouped for progress reporting.

    Args:
        pipeline_dict: Mapping of control type to pipeline.
        prompts: Text prompts, one per output image.
        control_images: Conditioning images, paired with ``prompts`` by index.
        control_type: Key into ``pipeline_dict``.
        batch_size: Number of items per reported batch; must be >= 1.
        **generation_kwargs: Forwarded to ``generate_with_controlnet``.

    Returns:
        List with one image per prompt. A failed generation contributes a
        512x512 red placeholder so indices stay aligned with ``prompts``.

    Raises:
        ValueError: ``batch_size`` < 1, or the two input lists differ in length.
        KeyError: ``control_type`` is not a key of ``pipeline_dict``.
    """

    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if len(prompts) != len(control_images):
        # Slicing + zip would otherwise silently drop the unmatched tail.
        raise ValueError(
            f"prompts ({len(prompts)}) and control_images ({len(control_images)}) "
            "must have the same length"
        )
    if control_type not in pipeline_dict:
        raise KeyError(control_type)

    results = []
    total_batches = (len(prompts) + batch_size - 1) // batch_size

    for batch_no, start in enumerate(range(0, len(prompts), batch_size), start=1):
        batch_prompts = prompts[start : start + batch_size]
        batch_images = control_images[start : start + batch_size]

        print(f"Processing batch {batch_no}/{total_batches}")

        for prompt, control_img in zip(batch_prompts, batch_images):
            try:
                result = generate_with_controlnet(
                    pipeline_dict,
                    control_type,
                    prompt=prompt,
                    control_image=control_img,
                    **generation_kwargs,
                )
                results.append(result)

            except Exception as e:
                print(f"‚ùå Batch processing error: {e}")
                # Create placeholder for failed generation
                placeholder = Image.new("RGB", (512, 512), "red")
                results.append(placeholder)

            # Clear memory after each generation; collect Python garbage first
            # so the cache release can actually free those blocks.
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    return results


# Example batch processing
batch_prompts = [
    "a serene lake at sunset",
    "a bustling city street",
    "a magical forest with glowing trees",
]
# Every prompt is paired with the same Canny control image from the MVP cell.
batch_controls = [canny_control] * len(batch_prompts)

if not SMOKE_MODE:  # Skip in smoke mode to save time
    print("üì¶ Running batch processing example...")
    batch_results = batch_controlnet_generation(
        cn_pipelines,
        batch_prompts,
        batch_controls,
        "canny",
        num_inference_steps=8,
        seed=42,
    )

    # Display batch results
    # One panel per result; titles are the first 20 characters of the prompt.
    fig, axes = plt.subplots(1, len(batch_results), figsize=(15, 5))
    for i, (result, prompt) in enumerate(zip(batch_results, batch_prompts)):
        axes[i].imshow(result)
        axes[i].set_title(prompt[:20] + "...")
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

In [None]:
# %% Cell 10: Smoke Test (SMOKE_MODE compatible)


def smoke_test_controlnet():
    """Run one minimal Canny generation and check the result's type and size.

    Returns True when both assertions hold; otherwise AssertionError (or any
    error raised by preprocessing/generation) propagates to the caller.
    """

    print("üî• Running ControlNet smoke test...")

    # Minimal input: a 256x256 white canvas with a single black square outline.
    canvas = Image.new("RGB", (256, 256), "white")
    ImageDraw.Draw(canvas).rectangle([50, 50, 200, 200], outline="black", width=2)

    # Only the Canny path is exercised, with the smallest useful step count.
    generation_args = dict(
        prompt="simple test image",
        control_image=preprocess_canny(canvas),
        num_inference_steps=2,
        seed=42,
    )
    smoke_result = generate_with_controlnet(cn_pipelines, "canny", **generation_args)

    # NOTE(review): the canvas is 256x256 but 512x512 is expected here —
    # presumably the Canny preprocessor resizes its output; confirm.
    assert isinstance(smoke_result, Image.Image), "Generation failed"
    assert smoke_result.size == (512, 512), "Wrong output size"

    print("‚úÖ Smoke test passed!")
    return True


# Run smoke test
# Any exception (including a failed assertion) is caught and only printed, so
# this cell itself never fails.
# NOTE(review): if this is meant to gate CI, the failure probably needs to be
# re-raised — confirm the intended behaviour.
try:
    smoke_test_controlnet()
    print("üéâ All systems operational!")
except Exception as e:
    print(f"üí• Smoke test failed: {e}")

In [None]:
# %% Cell 11: Results Analysis & Best Practices


def analyze_controlnet_results():
    """Print and return a fixed reference table for the three control types.

    The table is hard-coded in this function; nothing is measured. Each entry
    has the keys "strengths", "weaknesses", "best_for" and "optimal_scale".
    """

    def _profile(strengths, weaknesses, best_for, optimal_scale):
        # Keeps the four keys in one fixed order for every control type.
        return {
            "strengths": strengths,
            "weaknesses": weaknesses,
            "best_for": best_for,
            "optimal_scale": optimal_scale,
        }

    analysis = {}
    analysis["canny"] = _profile(
        ["Sharp edge preservation", "Architectural details", "Line art conversion"],
        ["May miss subtle textures", "Sensitive to noise"],
        ["Buildings", "Drawings", "Technical illustrations"],
        "1.0-1.5",
    )
    analysis["depth"] = _profile(
        ["3D structure preservation", "Spatial relationships", "Composition control"],
        ["May flatten textures", "Less detail preservation"],
        ["Landscapes", "Portraits", "3D scenes"],
        "0.8-1.2",
    )
    analysis["openpose"] = _profile(
        ["Human pose accuracy", "Animation consistency", "Character control"],
        ["Human-only", "Requires clear poses"],
        ["Human figures", "Animation", "Character art"],
        "1.0-2.0",
    )

    print("üìã ControlNet Analysis Summary:")
    print("=" * 50)

    for control_type in analysis:
        info = analysis[control_type]
        best_for = ", ".join(info["best_for"])
        strengths = ", ".join(info["strengths"])
        weaknesses = ", ".join(info["weaknesses"])

        print(f"\nüéØ {control_type.upper()}:")
        print(f"   Best for: {best_for}")
        print(f"   Optimal scale: {info['optimal_scale']}")
        print(f"   Strengths: {strengths}")
        print(f"   Considerations: {weaknesses}")

    return analysis


# Memory optimization tips
def print_optimization_tips():
    """Print a fixed checklist of eight memory/performance tips under a header."""

    tips = (
        "üîß Use torch.float16 for all models",
        "üîÑ Enable model_cpu_offload() for <8GB VRAM",
        "‚úÇÔ∏è Enable attention_slicing('max') always",
        "üß† Clear torch.cuda.empty_cache() between generations",
        "üì¶ Process in batches of 1 for low memory",
        "‚ö° Use UniPC scheduler for faster inference",
        "üéØ Tune conditioning_scale: 0.8-1.5 usually optimal",
        "üñºÔ∏è Resize images to 512x512 for best speed/quality balance",
    )

    print("\nüí° Memory Optimization Tips:")
    print("=" * 40)
    # One tip per line, each indented by two spaces.
    print(*(f"  {tip}" for tip in tips), sep="\n")


# Run analysis
# Prints the per-type reference table (kept in `analysis_results`) and then
# the optimization checklist.
analysis_results = analyze_controlnet_results()
print_optimization_tips()

In [None]:
# %% Cell 12: Stage Summary & Next Steps


def stage_summary():
    """Print the stage recap and write the reference configuration to disk.

    Four fixed bullet lists are printed under their headings, then a fixed
    configuration dict is saved as ``controlnet_config.json`` in the current
    working directory. All content is hard-coded; nothing is read from the
    pipelines.
    """
    import json

    # (heading, bullet items) in print order.
    sections = [
        (
            "\n‚úÖ Completed Objectives:",
            [
                "‚úÖ Successfully loaded 3 ControlNet pipelines (Canny/Depth/OpenPose)",
                "‚úÖ Implemented preprocessing functions for all control types",
                "‚úÖ Tested conditioning scale effects (0.5-2.0 range)",
                "‚úÖ Applied memory optimizations for 8GB VRAM compatibility",
                "‚úÖ Created batch processing pipeline",
                "‚úÖ Established error handling and fallback strategies",
            ],
        ),
        (
            "\nüß† Key Concepts Learned:",
            [
                "üß† ControlNet = Condition + Diffusion guidance",
                "üéõÔ∏è conditioning_scale controls strength (0.5-2.0 typical)",
                "üñºÔ∏è Different preprocessors for different control types",
                "üíæ Memory management crucial for consumer GPUs",
                "üîÑ Sequential generation better than batch for low VRAM",
            ],
        ),
        (
            "\n‚ö†Ô∏è Common Pitfalls to Avoid:",
            [
                "‚ö†Ô∏è High conditioning_scale (>2.0) may overpower prompt",
                "‚ö†Ô∏è Canny sensitive to noise - may need preprocessing",
                "‚ö†Ô∏è OpenPose only works with clear human poses",
                "‚ö†Ô∏è Always test memory optimizations on target hardware",
                "‚ö†Ô∏è Preprocessing quality directly affects generation quality",
            ],
        ),
        (
            "\nüîú Next Steps (Stage 2 Continuation):",
            [
                "üîú T2I-Adapter implementation (nb-cond-t2iadapter.ipynb)",
                "üîú IP-Adapter for style reference (nb-sd-ipadapter-style.ipynb)",
                "üîú Multi-ControlNet combinations and blending",
                "üîú Custom ControlNet training for specialized use cases",
                "üîú Integration with batch pipeline in Stage 4",
            ],
        ),
    ]

    print("üìä STAGE 2 - ControlNet Basics COMPLETED")
    print("=" * 50)

    for heading, items in sections:
        print(heading)
        for entry in items:
            print(f"  {entry}")

    # Save configuration for reproducibility
    controlnet_ids = {
        "canny": "lllyasviel/sd-controlnet-canny",
        "depth": "lllyasviel/sd-controlnet-depth",
        "openpose": "lllyasviel/sd-controlnet-openpose",
    }
    default_params = {
        "num_inference_steps": 20,
        "guidance_scale": 7.5,
        "controlnet_conditioning_scale": 1.0,
    }
    config = {
        "base_model": "runwayml/stable-diffusion-v1-5",
        "controlnet_models": controlnet_ids,
        "default_params": default_params,
        "memory_optimizations": [
            "model_cpu_offload",
            "attention_slicing",
            "torch.float16",
            "sequential_cpu_offload",
        ],
    }

    config_path = "controlnet_config.json"
    with open(config_path, "w") as handle:
        handle.write(json.dumps(config, indent=2))

    print(f"\nüíæ Configuration saved to: {config_path}")


# Run summary
# Besides printing, this call writes controlnet_config.json to the current
# working directory.
stage_summary()


# Final memory cleanup
def cleanup_resources():
    """Release the pipelines held in ``cn_pipelines`` and free cached GPU memory.

    Each pipeline is moved to the CPU on a best-effort basis (a failure is
    reported and skipped), then the ``cn_pipelines`` mapping is emptied so the
    pipelines actually become collectable. After this call the mapping is
    empty; reload the pipelines before generating again.
    """
    pipe = None
    for name, pipe in list(cn_pipelines.items()):
        try:
            pipe.to("cpu")
        except Exception as exc:
            # Best effort only: report why the move failed instead of hiding it.
            print(f"Could not move {name} pipeline to CPU: {exc}")

    # `del pipe` inside the loop only unbound the loop variable; the mapping
    # still referenced every pipeline, so nothing was ever freed. Drop both.
    pipe = None
    cn_pipelines.clear()

    # Collect Python garbage first so the cache release can return its blocks.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print("üßπ Resources cleaned up")


# Outside smoke mode the pipelines stay loaded and only a reminder is printed;
# in smoke mode cleanup_resources() is called right away.
if not SMOKE_MODE:  # Keep loaded for interactive use
    print("\nüîÑ Pipelines ready for continued experimentation")
    print("üí° Call cleanup_resources() when finished to free VRAM")
else:
    cleanup_resources()