<a href="https://colab.research.google.com/github/fulltrick/-/blob/main/waifast4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==========================================================
#  Stable Diffusion WebUI (A1111) を Python 3.10 で起動する版
#  - Colab カーネルは 3.11/3.12 でもOK（実行系だけ 3.10）
#  - xFormers 未使用（--opt-sdp-attention でSDPA）
#  - ControlNet (SDXL) / Civitai 取得ロジックを踏襲
# ==========================================================

import os
import sys
import json
import shutil
import subprocess
import threading
import time
import urllib.error
import urllib.request
import importlib.util
import math
from pathlib import Path

IPY_SPEC = importlib.util.find_spec("IPython")
DISPLAY_SPEC = importlib.util.find_spec("IPython.display") if IPY_SPEC else None
if DISPLAY_SPEC is not None:
    from IPython.display import Audio, display  # type: ignore
else:
    class _DummyAudio:
        def __init__(self, *, data, rate, autoplay):
            self.data = data
            self.rate = rate
            self.autoplay = autoplay

        def __repr__(self):
            duration = len(self.data) / self.rate if self.rate else 0
            return f"Audio(dry-run, {duration:.2f}s, autoplay={self.autoplay})"

    def display(obj):  # noqa: D401 - simple console stub
        print(f"🔈 {obj}")

    def Audio(*, data, rate, autoplay):  # noqa: N802 - mimic IPython API
        return _DummyAudio(data=data, rate=rate, autoplay=autoplay)

NP_SPEC = importlib.util.find_spec("numpy")
if NP_SPEC is not None:
    import numpy as np  # noqa: WPS433 - runtime optional import
else:
    np = None


def _is_truthy(value: str | None) -> bool:
    if value is None:
        return False
    return value.strip().lower() in {"1", "true", "yes", "on"}


DRY_RUN = _is_truthy(os.environ.get("WAIFAST4_DRY_RUN"))

if DRY_RUN:
    Path("/content/drive/MyDrive").mkdir(parents=True, exist_ok=True)
    Path("/content/stable-diffusion-webui").parent.mkdir(parents=True, exist_ok=True)


MICROMAMBA = (
    Path("/usr/local/bin/micromamba")
    if not DRY_RUN
    else Path("/tmp/waifast4/micromamba")
)

if DRY_RUN:
    MICROMAMBA.parent.mkdir(parents=True, exist_ok=True)


GOOGLE_SPEC = importlib.util.find_spec("google")
colab_spec = importlib.util.find_spec("google.colab") if GOOGLE_SPEC else None

if colab_spec is None:
    class _DummyDrive:
        def mount(self, *_args, **_kwargs):
            if DRY_RUN:
                print("⚠️ (dry-run) Google Drive マウントをスキップします")
                return
            raise ModuleNotFoundError(
                "google.colab.drive は Colab 以外では使用できません"
            )


    class _DummyUserData(dict):
        def get(self, key, default=None):  # noqa: D401
            if DRY_RUN:
                return os.environ.get(key, default)
            raise ModuleNotFoundError(
                "google.colab.userdata は Colab 以外では使用できません"
            )


    drive = _DummyDrive()
    userdata = _DummyUserData()
else:
    from google.colab import drive, userdata

def run(cmd, *, cwd=None, check=True, **kwargs):
    if isinstance(cmd, (list, tuple)):
        printable = ' '.join(str(c) for c in cmd)
    else:
        printable = cmd
    print(f"$ {printable}")
    if DRY_RUN:
        print("   ↪︎ dry-run: 実行をスキップしました")
        stdout = '' if kwargs.get('capture_output') else None
        stderr = '' if kwargs.get('capture_output') else None
        return subprocess.CompletedProcess(cmd, 0, stdout=stdout, stderr=stderr)
    return subprocess.run(cmd, cwd=cwd, check=check, **kwargs)


def ensure_micromamba():
    if MICROMAMBA.exists():
        return
    if DRY_RUN:
        MICROMAMBA.write_text("#!/bin/sh\nexit 0\n")
        MICROMAMBA.chmod(0o755)
        print("⚠️ (dry-run) micromamba をダミー作成しました")
        return
    archive = Path('/tmp/micromamba.tar.bz2')
    run(['wget', '-qO', str(archive), 'https://micromamba.snakepit.net/api/micromamba/linux-64/latest'])
    run(['tar', '-xjf', str(archive), '-C', '/usr/local/bin', '--strip-components=1', 'bin/micromamba'])


def ensure_directories():
    drive_base_data_path = Path('/content/drive/MyDrive/sd_colab_data')
    drive_models_path = drive_base_data_path / 'models/Stable-diffusion'
    drive_controlnet_path = drive_base_data_path / 'models/ControlNet'
    drive_vae_path = drive_base_data_path / 'models/VAE-approx'
    for p in (drive_models_path, drive_controlnet_path, drive_vae_path):
        p.mkdir(parents=True, exist_ok=True)
    return drive_models_path, drive_controlnet_path, drive_vae_path


def download_civitai_model(token: str, drive_models_path: Path):
    print("\n🔑 Civitai API トークンを取得（Colab 左の🔑から事前登録）...")
    if DRY_RUN:
        dummy_name = 'dryrun-model.safetensors'
        output_drive = drive_models_path / dummy_name
        output_local_dir = Path('/content/stable-diffusion-webui/models/Stable-diffusion')
        output_local_dir.mkdir(parents=True, exist_ok=True)
        output_drive.parent.mkdir(parents=True, exist_ok=True)
        output_drive.write_bytes(b'')
        shutil.copy(output_drive, output_local_dir / dummy_name)
        print('✅ (dry-run) Civitai モデル準備をスキップしました')
        return

    def get_latest_model_version_id(token: str):
        print("🔍 WAI-NSFW-illustrious-SDXL の最新バージョンIDを取得中...")
        model_id = '827184'
        api_url = f"https://civitai.com/api/v1/models/{model_id}?token={token}"
        r = run(['curl', '-s', api_url], capture_output=True, text=True)
        if r.returncode != 0 or not r.stdout:
            print("⚠️ API呼び出しエラー、既知IDへフォールバックします")
            return None
        try:
            data = json.loads(r.stdout)
            mv = data.get('modelVersions') or []
            if mv:
                vid = str(mv[0]['id'])
                print(f"✅ 最新バージョンIDを取得: {vid}")
                return vid
        except json.JSONDecodeError:
            print("⚠️ APIレスポンスを解析できませんでした")
        return None

    def download_model(token: str, version_id: str):
        api = f"https://civitai.com/api/v1/model-versions/{version_id}?token={token}"
        r = run(['curl', '-s', api], capture_output=True, text=True)
        if r.returncode != 0 or not r.stdout:
            raise SystemExit('❌ モデル情報の取得に失敗しました')
        data = json.loads(r.stdout)
        files = data.get('files') or []
        chosen = next((f for f in files if f.get('type') == 'Model'), None)
        if not chosen:
            raise SystemExit('❌ モデルファイルが見つかりません')
        download_url = chosen['downloadUrl'] + f"?token={token}"
        output_filename = chosen['name']
        output_drive = drive_models_path / output_filename
        output_local = Path('/content/stable-diffusion-webui/models/Stable-diffusion') / output_filename
        print(f"⬇️ {output_filename} をダウンロードします...")
        if DRY_RUN:
            output_drive.parent.mkdir(parents=True, exist_ok=True)
            output_local.parent.mkdir(parents=True, exist_ok=True)
            output_drive.write_bytes(b'')
            shutil.copy(output_drive, output_local)
            print('✅ (dry-run) モデルのダウンロードをスキップしました')
            return
        aria_args = ['aria2c', '-c', '-x16', '-s16', '--check-certificate=false',
                     download_url, '-d', str(drive_models_path), '-o', output_filename]
        result = run(aria_args)
        if result.returncode == 0 and output_drive.exists():
            shutil.copy(output_drive, output_local)
            print('✅ Civitai からモデルをダウンロードしました')
        else:
            raise SystemExit('❌ モデルのダウンロードに失敗しました')

    vid = get_latest_model_version_id(token)
    if vid:
        download_model(token, vid)
        return
    for fallback in ['1410435', '1396035', '1378467', '1360950']:
        try:
            download_model(token, fallback)
            return
        except SystemExit:
            continue
    raise SystemExit('❌ すべてのモデルダウンロードに失敗しました')


def prepare_controlnet(drive_controlnet_path: Path):
    controlnet_dir_local = Path('/content/stable-diffusion-webui/extensions/sd-webui-controlnet/models')
    controlnet_dir_local.mkdir(parents=True, exist_ok=True)
    if DRY_RUN:
        for name in ['OpenPoseXL2.safetensors', 'diffusers_xl_canny_full.safetensors']:
            drive_file = drive_controlnet_path / name
            drive_file.parent.mkdir(parents=True, exist_ok=True)
            drive_file.write_bytes(b'')
            (controlnet_dir_local / name).write_bytes(b'')
        print('✅ (dry-run) ControlNet モデル準備をスキップしました')
        return
    models = [
        ('OpenPoseXL2.safetensors',
         'https://huggingface.co/thibaud/controlnet-openpose-sdxl-1.0/resolve/main/OpenPoseXL2.safetensors'),
        ('diffusers_xl_canny_full.safetensors',
         'https://huggingface.co/lllyasviel/sd_control_collection/resolve/main/diffusers_xl_canny_full.safetensors')
    ]
    for name, url in models:
        p_drive = drive_controlnet_path / name
        p_local = controlnet_dir_local / name
        if p_drive.exists() and p_drive.stat().st_size > 1_000_000:
            shutil.copy(p_drive, p_local)
            continue
        run(['aria2c', '-c', '-x8', '-s8', url, '-d', str(drive_controlnet_path), '-o', name])
        if p_drive.exists():
            shutil.copy(p_drive, p_local)
    print('✅ ControlNet モデル準備完了')


def write_config():
    config = """{
  "samples_save": true,
  "samples_format": "png",
  "grid_save": true,
  "grid_format": "png",
  "enable_pnginfo": true,
  "save_selected_only": true,
  "jpeg_quality": 80,
  "export_for_4chan": true,
  "img_downscale_threshold": 4.0,
  "target_side_length": 4000,
  "img_max_size_mp": 200
}"""
    cfg_path = Path('/content/stable-diffusion-webui/config.json')
    cfg_path.parent.mkdir(parents=True, exist_ok=True)
    cfg_path.write_text(config)


def play_notification():
    if np is None:
        print('🔔 (dry-run) numpy が無いためサウンド再生をスキップしました')
        return
    sr = 22050
    t = np.linspace(0, 0.75, int(sr * 0.75), endpoint=False)
    tone = 0.3 * np.sin(2 * np.pi * 880 * t) * np.exp(-3 * t)
    display(Audio(data=tone, rate=sr, autoplay=True))



def start_gradio_live_monitor(url: str, proc: subprocess.Popen) -> None:
    url = (url or '').strip()
    if not url:
        print('⚠️ Gradio Live のURLを特定できませんでしたが通知を再生します')
        play_notification()
        return
    if DRY_RUN:
        print(f'🔔 (dry-run) Gradio Live URL 検知: {url}')
        play_notification()
        return

    def _worker() -> None:
        print(f'⏳ Gradio Live URL の疎通確認を開始します: {url}')
        while True:
            if proc.poll() is not None:
                print('⚠️ WebUI プロセスが終了したため通知待機を中断します')
                return
            try:
                with urllib.request.urlopen(url, timeout=10) as response:
                    status = getattr(response, 'status', None)
                    if status is None:
                        status = response.getcode()
                    if status is None:
                        status = 200
                    if 200 <= status < 400:
                        print(f'✅ Gradio Live にアクセスできました (status={status})')
                        play_notification()
                        return
                    print(f'   ↪︎ 応答を受信しました (status={status}) - 再確認します')
            except urllib.error.HTTPError as exc:
                print(f'   ↪︎ アクセス待機中... (HTTP {exc.code})')
            except urllib.error.URLError as exc:
                print(f'   ↪︎ アクセス待機中... ({exc.reason})')
            except Exception as exc:
                print(f'   ↪︎ アクセス待機中... ({exc})')
            time.sleep(3)

    threading.Thread(target=_worker, name='gradio-live-monitor', daemon=True).start()


_active_processes = []


def launch_webui():
    env = os.environ.copy()
    env['TORCH_COMMAND'] = 'pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121'
    cmd = [
        str(MICROMAMBA), 'run', '-n', 'py310', 'python', 'launch.py',
        '--share', '--listen', '--disable-console-progressbars', '--opt-sdp-attention',
        '--enable-insecure-extension-access', '--lora-dir', '/content/stable-diffusion-webui/models/Lora',
        '--medvram-sdxl', '--no-half-vae', '--api', '--cors-allow-origins=*'
    ]
    print('\n🚀 WebUIを起動します (Python 3.10環境)')
    if DRY_RUN:
        print('   ↪︎ (dry-run) WebUI 起動をスキップしました')
        return
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env)
    _active_processes.append(proc)

    stop_event = threading.Event()

    def monitor():
        notified = False
        stream = proc.stdout
        if stream is None:
            proc.wait()
            if proc.returncode not in (0, None) and not stop_event.is_set():
                print(f"\n❌ WebUI が終了しました (returncode={proc.returncode})")
            return
        try:
            while not stop_event.is_set():
                line = stream.readline()
                if not line:
                    break
                sys.stdout.write(line)
                if not notified and 'Running on public URL:' in line:
                    public_url = line.split('Running on public URL:', 1)[1].strip()
                    start_gradio_live_monitor(public_url, proc)
                    notified = True
        finally:
            proc.wait()
            if proc.returncode not in (0, None) and not stop_event.is_set():
                print(f"\n❌ WebUI が終了しました (returncode={proc.returncode})")

    monitor_thread = threading.Thread(target=monitor, name='waifast4-webui-monitor', daemon=True)
    monitor_thread.start()

    try:
        while monitor_thread.is_alive():
            monitor_thread.join(timeout=0.5)
    except KeyboardInterrupt:
        print("\n⏹️ WebUI 停止要求を受信しました。プロセスを終了します...")
        stop_event.set()
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        else:
            proc.wait()
        return
    finally:
        stop_event.set()
        monitor_thread.join(timeout=5)
        if proc.stdout is not None:
            proc.stdout.close()
        if proc in _active_processes:
            _active_processes.remove(proc)


print('🧪 Notebook kernel Python (参考):')
run(['python', '-V'])

print('\n📦 micromamba をセットアップして py310 環境を作成...')
ensure_micromamba()
run([str(MICROMAMBA), 'create', '-y', '-n', 'py310', '-c', 'conda-forge', 'python=3.10.13', 'pip', 'git', 'aria2', 'curl'])

print('\n🧪 Runtime (py310) バージョン確認：')
run([str(MICROMAMBA), 'run', '-n', 'py310', 'python', '-V'])

print('\n🔧 apt / Google Drive 準備...')
run(['apt-get', 'update', '-qq'])
run(['apt-get', 'install', '-y', '-qq', 'aria2', 'wget', 'curl'])
try:
    drive.mount('/content/drive')
    print('✅ Google Drive をマウントしました')
except Exception as e:
    print(f'⚠️ Drive マウント失敗または既にマウント済み: {e}')

drive_models_path, drive_controlnet_path, drive_vae_path = ensure_directories()

webui_path = Path('/content/stable-diffusion-webui')
if webui_path.exists():
    shutil.rmtree(webui_path)
    print(f'🧹 既存のWebUIを削除: {webui_path}')

print('\n⬇️ Stable Diffusion WebUI をクローン...')
os.chdir('/content')
run(['git', 'clone', 'https://github.com/AUTOMATIC1111/stable-diffusion-webui.git'])
if DRY_RUN:
    webui_path.mkdir(parents=True, exist_ok=True)
    (webui_path / 'extensions').mkdir(parents=True, exist_ok=True)
    (webui_path / 'models/Stable-diffusion').mkdir(parents=True, exist_ok=True)
    (webui_path / 'models/Lora').mkdir(parents=True, exist_ok=True)

print('\n🔥 PyTorch (cu121) を py310 環境へインストール...')
run([str(MICROMAMBA), 'run', '-n', 'py310', 'python', '-m', 'pip', 'install', '--upgrade', 'pip'])
run([str(MICROMAMBA), 'run', '-n', 'py310', 'pip', 'install', '--index-url', 'https://download.pytorch.org/whl/cu121', 'torch', 'torchvision', 'torchaudio'])

print('\n📚 追加ライブラリ（py310 環境）をインストール...')
run([str(MICROMAMBA), 'run', '-n', 'py310', 'pip', 'install', 'open_clip_torch', 'transformers', 'timm', 'requests'])

print('\n🔌 拡張機能インストール...')
os.chdir('/content/stable-diffusion-webui/extensions')
run(['git', 'clone', 'https://github.com/Mikubill/sd-webui-controlnet.git'])
run(['git', 'clone', 'https://github.com/butaixianran/Stable-Diffusion-Webui-Civitai-Helper.git'])
if DRY_RUN:
    Path('sd-webui-controlnet').mkdir(parents=True, exist_ok=True)
    Path('Stable-Diffusion-Webui-Civitai-Helper').mkdir(parents=True, exist_ok=True)
print('✅ 拡張インストール完了')

print('\n🔑 CIVITAI_API_TOKEN を検出中...')
try:
    civitai_token = (userdata.get('CIVITAI_API_TOKEN') or '').strip()
    if not civitai_token and DRY_RUN:
        civitai_token = 'dry-run-token'
    if not civitai_token:
        raise ValueError('CIVITAI_API_TOKEN が未設定')
    os.environ['CIVITAI_API_TOKEN'] = civitai_token
    print('✅ CIVITAI_API_TOKEN を検出')
except Exception as e:
    print('❌ Civitai APIトークンが見つかりません')
    print('   → サイドバーの 🔑 Secrets で `CIVITAI_API_TOKEN` を設定して再実行してください。')
    raise

os.chdir('/content/stable-diffusion-webui')
download_civitai_model(civitai_token, drive_models_path)
prepare_controlnet(drive_controlnet_path)
write_config()

os.environ.pop('MPLBACKEND', None)

print('\n🔎 insightface (追加) をインストール...')
run([str(MICROMAMBA), 'run', '-n', 'py310', 'pip', 'install', 'insightface'])

launch_webui()

