# Local Manga AI

This notebook launches a fully-local pipeline: **Qwen2.5 → SDXL → Page Composer** and can optionally expose a public HTTPS URL via **Cloudflare Tunnel**.

Model folders:
- `models/qwen2.5/`
- `models/sdxl/`

In [None]:
import os

# Runtime settings: this must be in the environment BEFORE any torch/diffusers
# model is loaded. A value already present in the environment is left as-is.
if "PYTORCH_CUDA_ALLOC_CONF" not in os.environ:
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Echo the effective value so it is visible in the cell output.
print("PYTORCH_CUDA_ALLOC_CONF=", os.environ.get("PYTORCH_CUDA_ALLOC_CONF"))

In [None]:
import sys
from pathlib import Path

# Detect paths. Everything below is derived from the resolved working directory.
CWD = Path.cwd().resolve()


def _has_package_layout(folder):
    """Return True when *folder* contains both ``models`` and ``scripts``."""
    return (folder / "models").exists() and (folder / "scripts").exists()


# Two launch locations are recognised:
#   * inside ./manga_ai  -> CWD already is the package folder
#   * the repo root      -> the package folder is ./manga_ai
# In any other layout ROOT simply stays equal to CWD.
ROOT = CWD
_nested_package = CWD / "manga_ai"
if not _has_package_layout(CWD) and _has_package_layout(_nested_package):
    ROOT = _nested_package.resolve()

REPO_ROOT = ROOT.parent

# Put the repo root first on sys.path so `import manga_ai` works regardless of
# where Jupyter was launched from.
_repo_root_entry = str(REPO_ROOT)
if _repo_root_entry not in sys.path:
    sys.path.insert(0, _repo_root_entry)

for _label, _value in (
    ("CWD:", CWD),
    ("ROOT (manga_ai folder):", ROOT),
    ("REPO_ROOT:", REPO_ROOT),
    ("Python:", sys.version),
):
    print(_label, _value)


In [None]:
import os

from manga_ai.scripts.model_downloader import ensure_models_downloaded
from manga_ai.scripts.pipeline import default_model_paths

# Model folder locations, as reported by the project's default_model_paths(ROOT).
paths = default_model_paths(ROOT)
print("Qwen dir:", paths.qwen_dir)
print("SDXL dir:", paths.sdxl_dir)

# This will download models into the folders above if they are missing.
# For gated HuggingFace models, set: MANGA_AI_HF_TOKEN
try:
    ensure_models_downloaded(
        qwen_dir=paths.qwen_dir,
        sdxl_dir=paths.sdxl_dir,
        hf_token=os.environ.get("MANGA_AI_HF_TOKEN"),
    )
except Exception as e:
    # Chain the original exception explicitly so its traceback is reported as
    # the direct cause of this more actionable error.
    raise RuntimeError(
        "Model download/setup failed. If SDXL/Qwen are gated, accept the license on HuggingFace and set MANGA_AI_HF_TOKEN. "
        f"Original error: {e}"
    ) from e
else:
    print("Models are present (downloaded if needed).")

In [None]:
from pathlib import Path

# Quick sanity checks (doesn't load the full ML pipelines).
# Explicit exceptions are raised instead of using `assert`, so the checks still
# run when Python is started with -O and the error names the missing path.
_required_model_files = (
    (paths.qwen_dir / "config.json", "Qwen config.json not found"),
    (paths.sdxl_dir / "model_index.json", "SDXL model_index.json not found"),
)
for _required_file, _problem in _required_model_files:
    if not _required_file.exists():
        raise FileNotFoundError(f"{_problem}: {_required_file}")

print("Sanity checks passed.")

## Launch Web UI + Cloudflare Public URL

Running the next cell will:
- Start Gradio locally on a free port chosen at runtime (the local URL is printed in the cell output)
- Start a Cloudflare tunnel and print a public `https://...trycloudflare.com` URL


In [None]:
import os
import socket
import tempfile
import traceback
from pathlib import Path

import gradio as gr
import torch
from diffusers import (DDIMScheduler, DPMSolverMultistepScheduler, EulerAncestralDiscreteScheduler,
                       EulerDiscreteScheduler, LMSDiscreteScheduler, PNDMScheduler,
                       StableDiffusionXLImg2ImgPipeline, StableDiffusionXLPipeline, UniPCMultistepScheduler)
from PIL import Image

from manga_ai.scripts.cloudflare_tunnel import start_tunnel

HOST = "127.0.0.1"

# Pick a free port: binding to port 0 lets the OS choose one, which is read back
# before the socket is closed. NOTE: the port is only known to be free at this
# instant; another process could take it before Gradio binds to it.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, 0))
    PORT = int(s.getsockname()[1])

MODEL_DIR = Path(ROOT) / "models" / "sdxl"  # Animagine XL 4.0 is stored here

print("=== MODEL DIRECTORY ===")
print(f"Looking for Animagine XL 4.0 at: {MODEL_DIR}")
print(f"Directory exists: {MODEL_DIR.exists()}")
if MODEL_DIR.exists():
    files = list(MODEL_DIR.glob("*"))[:10]  # Show first 10 files
    print(f"Files in model dir: {[f.name for f in files]}")
else:
    print("ERROR: Model directory not found!")
print("=" * 50)

_device = "cuda" if torch.cuda.is_available() else "cpu"
# Half precision is only selected when a GPU is used; on CPU the pipelines are
# loaded in float32 instead of being forced to float16.
dtype = torch.float16 if _device == "cuda" else torch.float32

print("=== DEVICE INFO ===")
print(f"Using device: {_device}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name()}")
    print(f"CUDA memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
print("=" * 50)

# Lazily-built pipeline cache, filled by _get_pipes() on first use.
_pipe = None
_img2img = None

def _create_dpmpp_2m_sde_config():
    """Print and return the scheduler settings used by this cell.

    Returns:
        dict: setting name -> value, in a fixed order. The dict is consumed by
        ``_make_dpmpp_2m_sde_scheduler`` when it builds a
        ``DPMSolverMultistepScheduler``.

    NOTE(review): the banner text calls this "DPM++ 2M SDE", but
    ``algorithm_type`` is "dpmsolver++". The diffusers scheduler docs list
    "sde-dpmsolver++" for the SDE variant -- confirm which sampler is intended
    before changing either the value or the label.
    """
    print("\n=== DPM++ 2M SDE CONFIGURATION ===")

    # Solver selection.
    solver = {
        "algorithm_type": "dpmsolver++",
        "solver_type": "midpoint",
    }
    # Variance / prediction settings.
    # NOTE(review): "fixed_small" and the noise_offset / rho keys further down
    # may not be options DPMSolverMultistepScheduler acts on -- verify against
    # its API reference.
    prediction = {
        "variance_type": "fixed_small",
        "prediction_type": "epsilon",
    }
    # Karras sigma spacing.
    sigmas = {"use_karras_sigmas": True}
    # Thresholding and final-step behaviour.
    stepping = {
        "thresholding": False,
        "dynamic_thresholding_ratio": 0.995,
        "sample_max_value": 1.0,
        "lower_order_final": True,
        "lambda_min_clipped": float("-inf"),
    }
    # Remaining noise-related values.
    noise = {
        "noise_offset": 0,
        "rho": 7.0,
    }

    # Merge in this order: callers and the printout below rely on dict order.
    settings = {**solver, **prediction, **sigmas, **stepping, **noise}

    print("DPM++ 2M SDE Configuration:")
    print(*(f"  {name}: {value}" for name, value in settings.items()), sep="\n")
    print("=" * 50)

    return settings

def _make_dpmpp_2m_sde_scheduler():
    """Build the ``DPMSolverMultistepScheduler`` used by the pipelines.

    Two strategies are tried in order:

    1. Load the scheduler config stored under ``MODEL_DIR/scheduler`` and pass
       the settings from ``_create_dpmpp_2m_sde_config()`` as construction-time
       overrides. Everything the settings dict does not mention is then taken
       from the model's own scheduler config rather than from library defaults.
    2. If that config cannot be loaded, build the scheduler from the settings
       dict alone.

    Overrides are always applied when the scheduler is constructed; the
    scheduler's config is never patched after the fact.

    Returns:
        The scheduler instance, or ``None`` when both strategies fail (the
        caller is expected to pick a fallback scheduler in that case).
    """
    print("\n=== CREATING DPM++ 2M SDE SCHEDULER ===")
    print("Attempting to create DPM++ 2M SDE (Euler a style)...")

    # Get the exact configuration
    config = _create_dpmpp_2m_sde_config()

    # Method 1: model scheduler config + overrides.
    try:
        print("Method 1: Loading from model directory...")
        scheduler = DPMSolverMultistepScheduler.from_pretrained(
            MODEL_DIR,
            subfolder="scheduler",
            **config,
        )
    except Exception as e:
        print(f"✗ Method 1 failed: {str(e)[:200]}")
    else:
        print("✓ SUCCESS: DPM++ 2M SDE created from model + override")
        return scheduler

    # Method 2: settings dict only (no model scheduler config available).
    try:
        print("Method 2: Creating from config...")
        scheduler = DPMSolverMultistepScheduler.from_config(config)
    except Exception as e:
        print(f"✗ Method 2 failed: {str(e)[:200]}")
        print("All DPM++ 2M SDE attempts failed, using fallback...")
        return None

    print("✓ SUCCESS: DPM++ 2M SDE created from config")
    print("⚠ Model scheduler config was not used; unspecified settings are library defaults")
    return scheduler


def _verify_dpmpp_2m_sde_configuration(scheduler):
    """Print a report comparing ``scheduler.config`` with the expected settings.

    Args:
        scheduler: any object; it is expected to expose a ``config`` attribute
            whose settings are readable as attributes.

    Returns:
        bool: ``True`` only when every required setting (``algorithm_type``,
        ``solver_type``, ``variance_type``, ``use_karras_sigmas``) has its
        expected value. Optional settings are reported but never affect the
        result. ``False`` is also returned when there is no ``config``.
    """
    print("\n=== VERIFYING DPM++ 2M SDE CONFIGURATION ===")

    if not hasattr(scheduler, "config"):
        print("✗ ERROR: Scheduler has no config attribute")
        return False

    config = scheduler.config
    scheduler_name = scheduler.__class__.__name__

    print(f"Scheduler class: {scheduler_name}")
    print("\nConfiguration verification:")

    # Settings that must match for the verification to pass.
    required_settings = {
        "algorithm_type": "dpmsolver++",
        "solver_type": "midpoint",
        "variance_type": "fixed_small",
        "use_karras_sigmas": True,
    }

    all_correct = True
    for setting, expected_value in required_settings.items():
        actual_value = getattr(config, setting, None)
        matches = actual_value == expected_value
        status = "✓" if matches else "✗"
        print(f"  {status} {setting}: {actual_value} (expected: {expected_value})")
        if not matches:
            all_correct = False

    # Settings that are only reported; a mismatch is a warning, not a failure.
    optional_settings = {
        "prediction_type": "epsilon",
        "thresholding": False,
        "lower_order_final": True,
    }

    print("\nOptional settings:")
    for setting, expected_value in optional_settings.items():
        actual_value = getattr(config, setting, None)
        status = "✓" if actual_value == expected_value else "⚠"
        print(f"  {status} {setting}: {actual_value} (expected: {expected_value})")

    # Final verification result
    if all_correct:
        print("\n✓ VERIFICATION PASSED: Using DPM++ 2M SDE (Euler a style)")
        return True

    if "dpmsolver++" in str(getattr(config, "algorithm_type", "")):
        print("\n⚠ PARTIAL VERIFICATION: DPM++ detected but not full 2M SDE")
    else:
        print(f"\n✗ VERIFICATION FAILED: Not DPM++ 2M SDE (using {scheduler_name})")
    return False

def _install_scheduler(pipe):
    """Attach the DPM++ scheduler to *pipe*, or a fallback if it cannot be built."""
    print("Creating DPM++ 2M SDE scheduler...")
    scheduler = _make_dpmpp_2m_sde_scheduler()

    if scheduler is not None:
        pipe.scheduler = scheduler
        if _verify_dpmpp_2m_sde_configuration(scheduler):
            print("✓ DPM++ 2M SDE scheduler verified and active")
        else:
            print("⚠ DPM++ 2M SDE scheduler created but verification failed")
        return

    print("✗ DPM++ 2M SDE failed, using fallback scheduler")
    # Fallbacks are derived from the scheduler config the pipeline was loaded
    # with, so settings not overridden here stay those of the model.
    base_config = pipe.scheduler.config
    fallback_schedulers = (
        ("Euler Ancestral", EulerAncestralDiscreteScheduler, {
            "use_karras_sigmas": True,
            "prediction_type": "epsilon",
            "steps_offset": 0,
        }),
        ("Euler Discrete", EulerDiscreteScheduler, {
            "use_karras_sigmas": True,
            "prediction_type": "epsilon",
            "steps_offset": 1,
        }),
        ("PNDMScheduler", PNDMScheduler, {}),
    )
    for name, scheduler_cls, overrides in fallback_schedulers:
        try:
            pipe.scheduler = scheduler_cls.from_config(base_config, **overrides)
        except Exception as e:
            print(f"✗ Fallback {name} failed: {str(e)[:100]}")
        else:
            print(f"✓ Fallback: Using {name}")
            break
    else:
        print("⚠ All fallbacks failed; keeping the scheduler loaded with the model")


def _enable_memory_optimizations(pipe):
    """Best-effort xFormers attention and VAE tiling; failures are reported, not raised."""
    try:
        pipe.enable_xformers_memory_efficient_attention()
        print("✓ xFormers enabled")
    except Exception as e:
        print(f"⚠ xFormers failed: {str(e)[:100]}")
    try:
        if hasattr(pipe, "vae") and hasattr(pipe.vae, "enable_tiling"):
            pipe.vae.enable_tiling()
        else:
            pipe.enable_vae_tiling()
        print("✓ VAE tiling enabled")
    except Exception as e:
        print(f"⚠ VAE tiling failed: {str(e)[:100]}")


def _get_pipes():
    """Return the cached ``(txt2img, img2img)`` pipelines, building them on first use.

    The txt2img pipeline is loaded from ``MODEL_DIR`` with the module-level
    ``dtype``, given its scheduler, and (on CUDA) moved to the device with
    memory optimizations enabled. The img2img pipeline is then constructed from
    the txt2img pipeline's components, so both share one set of weights, the
    same scheduler object, and the same device placement instead of loading
    the checkpoint a second time.

    The module globals ``_pipe`` / ``_img2img`` are only assigned once the
    corresponding pipeline is fully set up, so a failure part-way through does
    not leave a half-configured pipeline in the cache.

    Returns:
        tuple: ``(_pipe, _img2img)``.

    Raises:
        Exception: whatever pipeline loading/construction raised, after the
        traceback has been printed.
    """
    global _pipe, _img2img
    print("\n=== LOADING PIPELINES ===")

    if _pipe is None:
        print("Loading txt2img pipeline...")
        try:
            pipe = StableDiffusionXLPipeline.from_pretrained(
                MODEL_DIR,
                torch_dtype=dtype,
                use_safetensors=True,
                variant=None,
            )
            print("✓ Pipeline loaded successfully")
        except Exception as e:
            print(f"✗ FAILED to load pipeline: {str(e)}")
            print("\nFull error:")
            traceback.print_exc()
            raise

        _install_scheduler(pipe)
        pipe.set_progress_bar_config(disable=True)

        print("Moving to device and optimizing...")
        if _device == "cuda":
            pipe.to(_device)
            _enable_memory_optimizations(pipe)

        # Publish only after every setup step above has completed.
        _pipe = pipe
        print("✓ txt2img pipeline ready")

    if _img2img is None:
        print("Loading img2img pipeline...")
        try:
            # Reuse the already-loaded modules rather than reading the
            # checkpoint again; carry over the one pipeline-level flag that
            # from_pretrained would otherwise have read from the model index.
            img2img = StableDiffusionXLImg2ImgPipeline(
                **_pipe.components,
                force_zeros_for_empty_prompt=_pipe.config.get(
                    "force_zeros_for_empty_prompt", True
                ),
            )
            print("✓ img2img pipeline loaded successfully")
        except Exception as e:
            print(f"✗ FAILED to load img2img pipeline: {str(e)}")
            print("\nFull error:")
            traceback.print_exc()
            raise

        img2img.set_progress_bar_config(disable=True)
        _img2img = img2img
        print("✓ img2img pipeline ready")

    print("=" * 50)
    return _pipe, _img2img


def _hires_base_size(width, height, upscale, min_side=1024):
    """Return the pass-1 ``(width, height)`` for a 2-pass hires render.

    The target size is divided by *upscale*. If the shorter side would then
    fall below *min_side*, the size is scaled up uniformly so the shorter side
    equals *min_side*. The aspect ratio of the target is preserved either way
    (up to rounding), and both sides are rounded down to a multiple of 8.
    """
    base_w = width / float(upscale)
    base_h = height / float(upscale)
    if min(base_w, base_h) < min_side:
        # Derive the longer side from the target ratio directly to avoid
        # compounding floating-point error from two successive scalings.
        if width <= height:
            base_w, base_h = min_side, min_side * height / width
        else:
            base_w, base_h = min_side * width / height, min_side
    return int(base_w) // 8 * 8, int(base_h) // 8 * 8


def generate(
    prompt: str,
    negative: str,
    width: int,
    height: int,
    steps: int,
    cfg: float,
    seed: int,
    hires: bool,
    hires_upscale: float,
    hires_strength: float,
    hires_steps: int,
):
    """Render one image from the UI inputs.

    Args:
        prompt: positive prompt text.
        negative: negative prompt text.
        width, height: requested output size; rounded down to a multiple of 8.
        steps: inference steps for the txt2img pass.
        cfg: guidance scale, used for both passes.
        seed: seed for the torch generator shared by both passes.
        hires: when false, a single txt2img pass at the requested size is run.
            When true, a base image is rendered at a reduced size with the same
            aspect ratio, resized to the requested size with Lanczos
            resampling, and refined by an img2img pass.
        hires_upscale: factor between base size and requested size.
        hires_strength: img2img strength for the refine pass.
        hires_steps: inference steps for the refine pass.

    Returns:
        The first image of the final pipeline call.

    Raises:
        gr.Error: when pipeline loading or generation fails; the original
            exception is attached as the cause and its traceback is printed.
    """
    print("\n=== GENERATING IMAGE ===")
    print(f"Prompt: {prompt[:100]}...")
    print(f"Size: {width}x{height}, Steps: {steps}, CFG: {cfg}, Seed: {seed}")
    print(f"Hires: {hires}, Upscale: {hires_upscale}, Strength: {hires_strength}")

    try:
        pipe, img2img = _get_pipes()
    except Exception as e:
        print(f"✗ PIPELINE ERROR: {str(e)}")
        print("\nFull error:")
        traceback.print_exc()
        raise gr.Error(f"Pipeline loading failed: {str(e)}") from e

    width = int(width) // 8 * 8
    height = int(height) // 8 * 8

    g = torch.Generator(device=_device).manual_seed(int(seed))

    if not hires:
        print("Generating single pass...")
        try:
            img = pipe(
                prompt=prompt,
                negative_prompt=negative,
                width=width,
                height=height,
                num_inference_steps=int(steps),
                guidance_scale=float(cfg),
                generator=g,
            ).images[0]
            print("✓ Generation complete")
            return img
        except Exception as e:
            print(f"✗ GENERATION ERROR: {str(e)}")
            print("\nFull error:")
            traceback.print_exc()
            raise gr.Error(f"Generation failed: {str(e)}") from e

    print("Generating 2-pass hires...")
    try:
        # Pass 1: Base generation at a reduced size that keeps the target's
        # aspect ratio, so the resize below does not stretch the image.
        base_w, base_h = _hires_base_size(width, height, hires_upscale)

        print(f"Pass 1: {base_w}x{base_h}")
        img = pipe(
            prompt=prompt,
            negative_prompt=negative,
            width=base_w,
            height=base_h,
            num_inference_steps=int(steps),
            guidance_scale=float(cfg),
            generator=g,
        ).images[0]

        if not isinstance(img, Image.Image):
            img = Image.fromarray(img)

        # Upscale
        img = img.resize((width, height), resample=Image.LANCZOS)

        # Pass 2: Refine
        print(f"Pass 2: {width}x{height}")
        img = img2img(
            prompt=prompt,
            negative_prompt=negative,
            image=img,
            strength=float(hires_strength),
            num_inference_steps=int(hires_steps),
            guidance_scale=float(cfg),
            generator=g,
        ).images[0]

        print("✓ 2-pass generation complete")
        return img
    except Exception as e:
        print(f"✗ HIRES GENERATION ERROR: {str(e)}")
        print("\nFull error:")
        traceback.print_exc()
        raise gr.Error(f"Hires generation failed: {str(e)}") from e


# Default texts pre-filled in the two prompt boxes.
_default_prompt = "masterpiece, best quality, very aesthetic, absurdres, clean manga lineart, crisp ink, screentone shading, sharp focus, high contrast"
_default_negative = "worst quality, low quality, lowres, blurry, jpeg artifacts, watermark, logo, signature, bad anatomy, bad hands, extra fingers, missing fingers, extra limbs, deformed"

# Input widgets, listed in the positional order of generate()'s parameters.
_generate_inputs = [
    gr.Textbox(label="Prompt", lines=6, value=_default_prompt),
    gr.Textbox(label="Negative", lines=3, value=_default_negative),
    gr.Slider(label="Width", minimum=768, maximum=2048, step=64, value=1024),
    gr.Slider(label="Height", minimum=768, maximum=3072, step=64, value=1536),
    gr.Slider(label="Steps", minimum=10, maximum=120, step=1, value=60),
    gr.Slider(label="CFG", minimum=1.0, maximum=12.0, step=0.1, value=6.5),
    gr.Number(label="Seed", value=1234, precision=0),
    gr.Checkbox(label="Hires refine (2-pass)", value=True),
    gr.Slider(label="Hires upscale", minimum=1.5, maximum=3.0, step=0.1, value=2.0),
    gr.Slider(label="Hires strength", minimum=0.05, maximum=0.6, step=0.01, value=0.26),
    gr.Slider(label="Hires steps", minimum=5, maximum=60, step=1, value=32),
]

# The web UI: one form that calls generate() and shows the resulting image.
demo = gr.Interface(
    fn=generate,
    inputs=_generate_inputs,
    outputs=gr.Image(type="pil", label="Result"),
    title="Animagine XL 4.0 (Direct) — DPM++ 2M SDE (Euler a)",
    description="Using DPM++ 2M SDE sampler (Euler a style) with comprehensive configuration verification",
)

# The local address is printed in several places; format it once.
_local_url = f"http://{HOST}:{PORT}"
_rule = "=" * 50

print("\n=== STARTING GRADIO ===")
print(f"Local URL will be: {_local_url}")

# Launch local UI
# NOTE: prevent_thread_lock=True keeps this cell from blocking indefinitely.
demo.launch(
    server_name=HOST,
    server_port=PORT,
    share=False,
    inbrowser=False,
    prevent_thread_lock=True,
)

print("\n=== STARTING CLOUDFLARE TUNNEL ===")
# Start Cloudflare tunnel (use an executable cache dir; teamspace outputs may be mounted noexec)
cloudflared_cache = Path.home().joinpath(".cache", "manga_ai", "cloudflared")
# If home cache isn't usable, fallback to /tmp:
# cloudflared_cache = Path(tempfile.gettempdir()) / "manga_ai" / "cloudflared"

# A tunnel failure is reported but does not stop the cell: the local UI
# launched above keeps running either way.
try:
    tunnel = start_tunnel(local_port=PORT, cache_dir=cloudflared_cache)
    print("✓ Cloudflare tunnel started")
    print(f"Public URL: {tunnel.public_url}")
    print(f"Local URL: {_local_url}")
except Exception as e:
    print(f"✗ TUNNEL ERROR: {e}")
    print("\nFull error:")
    traceback.print_exc()
    print(f"Local URL still available: {_local_url}")

print(_rule)
print("READY! You can now generate images.")
print(_rule)