# A1111 Model Installer — SD 1.5 (UI)

Installs **SD 1.5** assets for Automatic1111 using a simple UI:

- **Checkpoints** (paste URLs) → `/workspace/a1111/models/Stable-diffusion/`
- **VAE (optional)** → `/workspace/a1111/models/VAE/`
- **ControlNet (SD15)** → `/workspace/a1111/models/ControlNet/`

**Tokens (optional, not required):**
- `HF_TOKEN` — for gated Hugging Face models.
- `CIVITAI_TOKEN` — for private/NSFW or rate-limited Civitai links.

Re-running is safe: existing files are skipped; downloads resume where possible.

In [None]:
from pathlib import Path
import os

# Matches your A1111 data root (WEBUI_ROOT)
DATA_ROOT = Path(os.environ.get("WEBUI_ROOT", "/workspace/a1111"))
CKPT_DIR = DATA_ROOT / "models/Stable-diffusion"
VAE_DIR  = DATA_ROOT / "models/VAE"
CN_DIR   = DATA_ROOT / "models/ControlNet"

for d in (CKPT_DIR, VAE_DIR, CN_DIR):
    d.mkdir(parents=True, exist_ok=True)

print("Data root:", DATA_ROOT)
print("↳ Checkpoints:", CKPT_DIR)
print("↳ VAE:", VAE_DIR)
print("↳ ControlNet (SD15):", CN_DIR)

## Helpers (progress + optional tokens)
Supports **Hugging Face** (repo + filename) and **direct URLs** (e.g., Civitai). Uses tokens if present; otherwise proceeds without them.

In [None]:
import os, re, shutil, requests
from tqdm import tqdm
from huggingface_hub import hf_hub_download
from pathlib import Path

# Optional tokens — safe to be missing
HF_TOKEN = os.environ.get("HF_TOKEN")
CIVITAI_TOKEN = os.environ.get("CIVITAI_TOKEN")

def human(n: int) -> str:
    for u in ("B","KB","MB","GB"):
        if n < 1024: return f"{n:.1f} {u}"
        n /= 1024
    return f"{n:.1f} TB"

def _save_stream_to(dest: Path, r: requests.Response) -> Path:
    dest.parent.mkdir(parents=True, exist_ok=True)
    if dest.exists():
        print("✔ Exists:", dest); return dest
    total = int(r.headers.get('Content-Length', 0))
    tmp = dest.with_suffix(dest.suffix + ".part")
    with open(tmp, 'wb') as f, tqdm(total=total, unit='B', unit_scale=True, desc=dest.name, ncols=80) as bar:
        for chunk in r.iter_content(chunk_size=1024*1024):
            if chunk:
                f.write(chunk); bar.update(len(chunk))
    tmp.replace(dest)
    try:
        print(f"✔ Saved: {dest} ({human(dest.stat().st_size)})")
    except Exception:
        print("✔ Saved:", dest)
    return dest

def download_direct(url: str, dest_dir: Path, dest_name: str|None=None) -> Path:
    """Direct URL downloader (Civitai/HF file links/etc.) with token + filename handling."""
    headers = {"Authorization": f"Bearer {CIVITAI_TOKEN}"} if CIVITAI_TOKEN else {}
    r = requests.get(url, headers=headers, stream=True, allow_redirects=True, timeout=60)
    try:
        r.raise_for_status()
    except requests.HTTPError as e:
        print(f"⚠️  Direct download failed: {e}")
        if 'civitai' in url.lower():
            print("   If this is a private/NSFW Civitai model, set CIVITAI_TOKEN in RunPod env.")
        # best-effort return so caller prints something
        return dest_dir / (dest_name or url.split('/')[-1].split('?',1)[0] or 'model.safetensors')
    # filename preference: explicit > Content-Disposition > URL tail > .safetensors fallback
    if dest_name:
        filename = dest_name
    else:
        cd = r.headers.get('Content-Disposition', '')
        m = re.search(r'filename="?([^";]+)"?', cd)
        if m:
            filename = m.group(1)
        else:
            filename = url.split('/')[-1].split('?', 1)[0] or 'download.bin'
            if '.' not in filename:
                filename += '.safetensors'
    return _save_stream_to(dest_dir / filename, r)

def download_hf(repo_id: str, filename: str, dest_dir: Path, dest_name: str|None=None) -> Path:
    """Hugging Face downloader with optional HF token + resume."""
    dest = dest_dir / (dest_name or filename.split('/')[-1])
    if dest.exists():
        print('✔ Exists:', dest); return dest
    print(f'→ HF: {repo_id}/{filename}')
    try:
        path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            token=HF_TOKEN,
            resume_download=True,
            local_dir=dest_dir,
            local_dir_use_symlinks=False,
        )
        if Path(path) != dest:
            shutil.copy2(path, dest)
        print(f'✔ HF saved: {dest} ({human(dest.stat().st_size)})')
    except Exception as e:
        print(f'⚠️  Hugging Face download failed: {e}')
        print('   If this repo is gated, set HF_TOKEN in RunPod env and accept the license on HF.')
    return dest

def fetch(item: dict, dest_dir: Path) -> None:
    """Item can be {name, hf:{repo,filename}} OR {name, url, dest_name?}."""
    if "hf" in item:
        repo, fname = item["hf"]["repo"], item["hf"]["filename"]
        download_hf(repo, fname, dest_dir, item.get("dest_name"))
    elif "url" in item:
        download_direct(item["url"], dest_dir, item.get("dest_name"))
    else:
        raise ValueError("Item must contain 'hf' or 'url'.")

def download_extras(lines: str, dest_dir: Path) -> None:
    for url in [u.strip() for u in lines.splitlines() if u.strip()]:
        try:
            download_direct(url, dest_dir)
        except Exception as e:
            print("✖", url, "→", e)

print("Helpers loaded. HF_TOKEN=", bool(HF_TOKEN), "CIVITAI_TOKEN=", bool(CIVITAI_TOKEN))

## Select and download assets
- Paste **SD 1.5 checkpoint** URLs (one per line). 
- Optional VAE and ControlNet choices below.

In [None]:
import ipywidgets as W
from IPython.display import display, Markdown

# VAE optional (unchecked by default)
vae_checkbox = W.Checkbox(
    description='Install SD 1.5 MSE VAE (stabilityai/sd-vae-ft-mse)',
    value=False
)

# ControlNet SD15 options
cn_options = [
    ("Canny",     "lllyasviel/control_v11p_sd15_canny"),
    ("Depth",     "lllyasviel/control_v11p_sd15_depth"),
    ("SoftEdge",  "lllyasviel/control_v11p_sd15_softedge"),
    ("LineArt",   "lllyasviel/control_v11p_sd15_lineart"),
    ("NormalBae", "lllyasviel/control_v11p_sd15_normalbae"),
    ("OpenPose",  "lllyasviel/control_v11p_sd15_openpose"),
]
prechecked = {"Canny","SoftEdge","OpenPose"}
cn_checkboxes = [W.Checkbox(description=label, value=(label in prechecked)) for label, _ in cn_options]

# User-provided checkpoint URLs
ckpt_text = W.Textarea(
    value='',
    placeholder='Paste one or more SD1.5 .safetensors URLs here (one per line).',
    layout=W.Layout(width='100%', height='140px')
)

btn = W.Button(description='Download Selected', button_style='success', icon='download')
out = W.Output()

def on_click(_):
    out.clear_output()
    with out:
        errors = []
        # 1) VAE (optional)
        if vae_checkbox.value:
            try:
                download_hf(
                    repo_id='stabilityai/sd-vae-ft-mse',
                    filename='vae-ft-mse-840000-ema-pruned.safetensors',
                    dest_dir=VAE_DIR,
                    dest_name='vae-ft-mse-840000-ema-pruned.safetensors',
                )
            except Exception as e:
                errors.append(('VAE', str(e)))

        # 2) ControlNet SD15
        for cb, (label, repo) in zip(cn_checkboxes, cn_options):
            if not cb.value: continue
            try:
                download_hf(
                    repo_id=repo,
                    filename='diffusion_pytorch_model.safetensors',
                    dest_dir=CN_DIR,
                    dest_name=f'control_v11p_sd15_{label.lower()}.safetensors',
                )
            except Exception as e:
                errors.append((f'ControlNet {label}', str(e)))

        # 3) Checkpoints (direct URLs)
        urls = [u.strip() for u in ckpt_text.value.splitlines() if u.strip()]
        for url in urls:
            try:
                download_direct(url, CKPT_DIR)
            except Exception as e:
                errors.append((url, str(e)))

        # Summary
        print('\n=== Summary ===')
        any_files = False
        for p in sorted(list(CKPT_DIR.glob('*')) + list(VAE_DIR.glob('*')) + list(CN_DIR.glob('*'))):
            try:
                print(f"✔ {p.name:45s} {os.path.getsize(p):,} B"); any_files = True
            except Exception:
                print('✔', p.name); any_files = True
        if not any_files:
            print('(No new files)')
        if errors:
            print('\nErrors:')
            for name, msg in errors:
                print('✖', name, '→', msg)

btn.on_click(on_click)

display(Markdown('### SD 1.5 Downloads')))
display(Markdown('**Checkpoints (paste URLs):**'))
display(ckpt_text)
display(Markdown('**VAE (optional):**'))
display(vae_checkbox)
display(Markdown('**ControlNet (SD15):**'))
for cb in cn_checkboxes:
    display(cb)
display(btn, out)

## (Optional) Restart the WebUI
Use this after new models are added so A1111 refreshes file lists.

In [None]:
import subprocess, shlex, time
print('Attempting to restart the A1111 WebUI service via supervisord …')
try:
    cmd = 'supervisorctl -c /etc/supervisord.conf restart a1111'
    out = subprocess.run(shlex.split(cmd), capture_output=True, text=True, timeout=30)
    if out.returncode == 0 and 'a1111:' in (out.stdout or ''):
        print((out.stdout or out.stderr).strip() or '✔ Restarted via supervisorctl')
    else:
        print('Supervisorctl unavailable; forcing process restart…')
        subprocess.run(['pkill', '-f', 'launch.py'], check=False)
        time.sleep(5)
        print('✔ WebUI killed; supervisord will auto-restart. Refresh in ~10–15 s.')
except Exception as e:
    print('⚠️ Restart error:', e)
    print('Manual command:')
    print('supervisorctl -c /etc/supervisord.conf restart a1111')