In [None]:
# @title ComfyUIインストール
import os, sys
os.environ['TZ'] = 'Asia/Tokyo'
!wget https://mega.nz/linux/repo/xUbuntu_22.04/amd64/megacmd-xUbuntu_22.04_amd64.deb
!apt update && apt install -y ./megacmd-xUbuntu_22.04_amd64.deb aria2

!git clone --depth=1 https://github.com/forester3/ImageGenSupporter.git
REPO = "comfy_api_utils"
#!rm -rf /content/{REPO}   # debug後削除
!git clone --depth=1 https://github.com/forester3/{REPO}.git
sys.path.append(f"/content/{REPO}")

!git clone --depth=1 https://github.com/comfyanonymous/ComfyUI.git
!git clone --depth=1 https://github.com/ltdrdata/ComfyUI-Impact-Pack /content/ComfyUI/custom_nodes/ComfyUI-Impact-Pack
%cd /content/ComfyUI
!pip install -r requirements.txt

# ffxvsのネガティブプロンプトパック
%cd /content
!git clone https://huggingface.co/ffxvs/negative-prompts-pack
%cd negative-prompts-pack
!git lfs pull
!mkdir -p /content/ComfyUI/models/embeddings/SD1.5
!mv *.pt *.safetensors /content/ComfyUI/models/embeddings/SD1.5/
%cd /content
!rm -rf negative-prompts-pack

from google.colab import drive      # Google Driveをマウント
drive.mount('/content/drive')
## user/default, output, models/checkpointsをDriveにマウント
!rm -rf /content/ComfyUI/user/default
!mkdir -p /content/ComfyUI/user
!ln -s /content/drive/MyDrive/ComfyUI/user/default /content/ComfyUI/user/default
!rm -rf /content/ComfyUI/output
!ln -s /content/drive/MyDrive/ComfyUI/output /content/ComfyUI/output
!rm -rf /content/ComfyUI/models/checkpoints
!ln -s /content/drive/MyDrive/ComfyUI/models/checkpoints /content/ComfyUI/models/checkpoints
print("✅ComfyUIインストール完了")

In [None]:
# @title ComfyUI起動(バックエンド,Logfile)
import subprocess
import socket
import time
import threading

COMFY_HOST = "127.0.0.1"
COMFY_PORT = "8188"
LOG_FILE = "/content/comfyui_debug_log.txt"

import comfy_log as clg
import importlib
importlib.reload(clg)

# ComfyUI 起動（バックグラウンドでポート8188、ログもキャプチャ）stdoutとstderrをPIPEに設定

comfyui_proc = subprocess.Popen(
        ["python3", "/content/ComfyUI/main.py", "--port", COMFY_PORT, "--verbose"],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1,
        start_new_session=True
)
print("🟡 ComfyUI 起動中...")

# ログ読み取りスレッドを開始
#stdout_thread = threading.Thread(target=read_logs_and_write_file, args=(comfyui_proc.stdout, LOG_FILE, "StdOut", ))
#stdout_thread.daemon = True
#stdout_thread.start()            #stdoutからerror出力がないようなので、常時コメント化
stderr_thread = threading.Thread(target=clg.read_logs_and_write_file, args=(comfyui_proc.stderr, LOG_FILE, "", ))
stderr_thread.daemon = True
stderr_thread.start()

def wait_for_port(port, host=COMFY_HOST, timeout=30):
    start = time.time()
    while time.time() - start < timeout:
        try:
            with socket.create_connection((host, port), timeout=1):
                return True
        except OSError:
            time.sleep(1)
    return False

if wait_for_port(int(COMFY_PORT)):
    print(f"🟢 ComfyUI がポート{COMFY_PORT}で起動しました。")
else:
    print("🔴 ComfyUI のポート応答がありません。ログを確認してください。")
    print(f"\n--- ComfyUI 起動中のログ（{LOG_FILE} も確認） ---")

In [None]:
# @title GradioUI_for_API
import json
import threading
from IPython.display import Image as IPyImage, display
import ipywidgets as widgets
from PIL import Image

import workflow_utils as wfu
import comfy_api as capi
import comfy_task_manager as ctm
import gradio_ui as grui
import importlib
importlib.reload(wfu)
importlib.reload(capi)
importlib.reload(ctm)
importlib.reload(grui)

WORKFLOW_JSON_PATH = f"/content/{REPO}/workflow_api.json"
SAMPLING_METHOD_PATH = f"/content/{REPO}/samplerscheduler.json"
OUTPUT_DIR = "/content/ComfyUI/output"

def viewer():
    while True:
        ctm.sem_view.acquire()
        path = ctm.get_latest_path()
        with output_widget:
            display(Image.open(path))

# Main routine starts here
if __name__ == "__main__":
# workflow読込
    with open(WORKFLOW_JSON_PATH, "r", encoding="utf-8") as f:
        ORIGINAL_WORKFLOW = json.load(f)
# node名抽出
    NODE_IDS = wfu.find_node_ids_from_connections(ORIGINAL_WORKFLOW)
# prest読込
    with open(SAMPLING_METHOD_PATH, "r") as f:
        PRESETS = json.load(f)
# viewer
    output_widget = widgets.Output()
    display(output_widget)  # ここで最初に表示 → UI より上に固定
    thread_viewer = threading.Thread(target=viewer, daemon=True)
    thread_viewer.start()
# Gradio UI
    demo = grui.build_gradio_ui(ORIGINAL_WORKFLOW, NODE_IDS, PRESETS)
    demo.queue()
    demo.launch(debug=True, share=False, prevent_thread_lock=True)


#ダウンロードツール

In [None]:
# @title CivitAI-DL
#!apt install aria2
#!git clone --depth 1 http://github.com/forester3/ImageGenSupporter.git

import sys, os, requests
import ipywidgets as widgets
from IPython.display import display, HTML

sys.path.append('/content/ImageGenSupporter')
import manual_url_download as mud
import model_download_list_ids as mid

# CivitAIモデル辞書（名前: モデルID）
MODEL_DICT = {
    "CemreMix": 137199,
    "BRAV7": 177164,
}

def get_model_page_url_from_version(version_id):
    """モデルバージョンIDから正しいページURLを生成"""
    api_url = f"https://civitai.com/api/v1/model-versions/{version_id}"
    try:
        res = requests.get(api_url).json()
        parent_model_id = res["modelId"]
        page_url = f"https://civitai.com/models/{parent_model_id}?modelVersionId={version_id}"
        return page_url
    except Exception as e:
        print(f"モデルページ取得に失敗: {e}")
        return None

def make_downloader_ui(model_dict, save_dir="./ComfyUI/models/checkpoints"):
    dropdown = widgets.Dropdown(
        options=list(model_dict.keys()),
        description="Model:"
    )
    url_input = widgets.Text(
        placeholder="ここにURLを貼って下さい"
    )
    btn_download = widgets.Button(description="Download", button_style="success")
    out = widgets.Output()

    # モデル切替時にURL欄をクリア
    def on_model_changed(change):
        if change["type"] == "change" and change["name"] == "value":
            url_input.value = ""
    dropdown.observe(on_model_changed)

    def on_download_clicked(b):
        out.clear_output()
        with out:
            # URL入力があればURL優先
            if url_input.value.strip():
                print("🟢 入力URLからダウンロード中…")
                result = mud.download_with_aria2(url_input.value.strip(), save_dir)
                if result != 0:
                    print(f"⚠️ URLダウンロードに失敗しました: {result}")
                return

            # URLが空ならIDダウンロード
            model_name = dropdown.value
            model_id = model_dict[model_name]

            info = mid.fetch_model_info(model_id)
            if info and "download_url" in info:
                print(f"🟢 {model_name} (ID:{model_id}) をダウンロード開始")
                # 既存の download_model(info, output_dir) を使用
                result = mid.download_model(info, save_dir)
                if result == 24:
                    print(f"⚠️ {model_name} の取得には認証が必要です。ブラウザでURLを取得して下さい。")
                    page_url = get_model_page_url_from_version(model_id)
                    if page_url:
                        display(HTML(f'<a href="{page_url}" target="_blank">{model_name} モデルページを開く</a>'))
                    else:
                        print("モデルページが見つかりません。")
            else:
                print(f"⚠️ {model_name} のダウンロードURLを取得できません。")

    btn_download.on_click(on_download_clicked)
    display(widgets.VBox([dropdown, url_input, btn_download, out]))

make_downloader_ui(MODEL_DICT, "/content/ComfyUI/models/checkpoints")


### ダウンロードＵＩ@Huggingface

In [None]:
%%writefile repo_list_hf.mlst
{
  "repositories": [
    {
      "repo_id": "XpucT/Deliberate",
      "desc": "Deliberate_v6",
      "files": ["Deliberate_v6.safetensors"],
      "save_path": "/content/ComfyUI/models/checkpoints"
    }
  ]
}

In [None]:
# @title ダウンロードＵＩ@HuggingFace
!pip install huggingface_hub[hf_xet]

import os, json, glob
from huggingface_hub import hf_hub_download
from dotenv import load_dotenv
import ipywidgets as widgets
from IPython.display import display

# HuggingFace Token
load_dotenv()
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN", None)

# *hf.mlst を検索
mlst_files = glob.glob("**/*hf.mlst", recursive=True)
mlst_files = [os.path.relpath(p, ".") for p in mlst_files]

# 出力
out = widgets.Output()

# ドロップダウン
dropdown = widgets.Dropdown(
    options=mlst_files if mlst_files else ["--- 見つからない ---"],
    description="JSON:",
    layout=widgets.Layout(width="50%"),
    style={'description_width': 'initial'}
)

# ボタン
btn_download = widgets.Button(description="ダウンロード", button_style="success")
btn_delete   = widgets.Button(description="削除", button_style="danger")

# ダウンロード処理
def on_download(_):
    with out:
        out.clear_output()
        json_file = dropdown.value
        print(f"▶ {json_file} を読み込み中...")

        try:
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            repos = data.get("repositories", [])
        except Exception as e:
            print(f"❌ JSON読込失敗: {e}")
            return

        for repo in repos:
            repo_id = repo["repo_id"]
            files = repo.get("files", [])
            save_dir = repo.get("save_path", "./downloads")
            os.makedirs(save_dir, exist_ok=True)

            for filename in files:
                filename_only = os.path.basename(filename)
                target_path = os.path.join(save_dir, filename_only)
                if os.path.exists(target_path):
                    print(f"⏩ {target_path} は既に存在")
                    continue
                try:
                    print(f"⬇ {filename} を {repo_id} からDL中...")
                    downloaded = hf_hub_download(
                        repo_id=repo_id,
                        filename=filename,
                        token=HF_TOKEN,
                        force_download=True
                    )
                    import shutil
                    shutil.copy(downloaded, target_path)
                    print(f"✅ {filename} ダウンロード完了")
                except Exception as e:
                    print(f"❌ {filename} 失敗: {e}")
        print("✅ダウンロード処理を終了しました。")

# 削除処理
def on_delete(_):
    with out:
        out.clear_output()
        json_file = dropdown.value
        print(f"▶ {json_file} に記載されたファイルを削除します")

        try:
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            repos = data.get("repositories", [])
        except Exception as e:
            print(f"❌ JSON読込失敗: {e}")
            return

        for repo in repos:
            save_dir = repo.get("save_path", "./downloads")
            files = repo.get("files", [])
            for filename in files:
                filename_only = os.path.basename(filename)
                target_path = os.path.join(save_dir, filename_only)

                if os.path.exists(target_path):
                    os.remove(target_path)
                    print(f"🗑 {target_path} 削除")
                else:
                    print(f"⚠ {target_path} は存在しません")

# ボタンに処理を登録
btn_download.on_click(on_download)
btn_delete.on_click(on_delete)

# 表示
display(dropdown, widgets.HBox([btn_download, btn_delete]), out)
!rm -rf ~/.cache/huggingface/

In [None]:
# @title MEGAからファイルを同期(Colab連続時は不要)
import os
import subprocess
from pathlib import Path
from google.colab import userdata

def run_cmd(cmd):
    print(f"$ {cmd}")
    result = subprocess.run(cmd, shell=True, text=True, capture_output=True)
    if result.stdout:
        print(result.stdout)
    if result.stderr:
        print("[stderr]", result.stderr)
    result.check_returncode()

userdata.get('MEGA_EMAIL')
userdata.get('MEGA_PASSWORD')
if not MEGA_EMAIL or not MEGA_PASSWORD:
    raise RuntimeError("環境変数 MEGA_EMAIL または MEGA_PASSWORD が設定されていません")

# 2. MEGA にログイン（subprocess 経由）
print("🟡 MEGA にログイン中...")
process = subprocess.run(
    ["mega-login", MEGA_EMAIL, MEGA_PASSWORD],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)
if process.returncode == 0:
    print("🟢 MEGA ログイン成功")
else:
    print("❌ MEGA ログイン失敗")
    print(process.stderr)

# 3. ComfyUI/output から最新の .png ファイルを取得
output_path = Path("ComfyUI/output")
output_path.mkdir(parents=True, exist_ok=True)

print("ファイル一覧取得中（ComfyUI/output）")
result = subprocess.run("mega-ls /ComfyUI/output", shell=True, text=True, capture_output=True)
files = result.stdout.strip().split("\n") if result.stdout else []

png_files = [f for f in files if f.endswith(".png")]
if png_files:
    latest_png = sorted(png_files)[-1]  # 名前順で最新（タイムスタンプ取得は MEGAcmd では困難）
    print(f"最新ファイル: {latest_png}")
    run_cmd(f'mega-get "/ComfyUI/output/{latest_png}" "{output_path}"')
else:
    print("ComfyUI/output に .png ファイルが見つかりませんでした")

# 4. ComfyUI/user/default/workflows から .json をすべて取得
workflow_path = Path("ComfyUI/user/default/workflows")
workflow_path.mkdir(parents=True, exist_ok=True)

print("ファイル一覧取得中（ComfyUI/user/default/workflows）")
result = subprocess.run("mega-ls /ComfyUI/user/default/workflows", shell=True, text=True, capture_output=True)
files = result.stdout.strip().split("\n") if result.stdout else []

json_files = [f for f in files if f.endswith(".json")]
if json_files:
    for jf in json_files:
        print(f"ダウンロード中: {jf}")
        run_cmd(f'mega-get "/ComfyUI/user/default/workflows/{jf}" "{workflow_path}"')
else:
    print("ComfyUI/user/default/workflows に .json ファイルが見つかりませんでした")

# 5. ログアウト処理（安全のため明示）
run_cmd("mega-logout")
print("✅MEGAからの同期終了")

# 保存ツール他

In [None]:
# @title ワークフロー接続チェック
import json

# JSONファイルの読み込み
with open("workflow_api.json", "r", encoding="utf-8") as f:
    data = json.load(f)

nodes = data.get("nodes", [])
node_ids = {node["id"] for node in nodes}

# 全ノードの接続状態チェック
unconnected_inputs = []
invalid_references = []
connected_nodes = set()

for node in nodes:
    node_id = node["id"]
    inputs = node.get("inputs", {})

    for input_name, input_value in inputs.items():
        if isinstance(input_value, list) and len(input_value) == 2:
            source_node_id = input_value[0]
            if source_node_id not in node_ids:
                invalid_references.append({
                    "node_id": node_id,
                    "input": input_name,
                    "invalid_source": source_node_id
                })
            else:
                connected_nodes.add(node_id)
                connected_nodes.add(source_node_id)
        else:
            unconnected_inputs.append({
                "node_id": node_id,
                "input": input_name,
                "value": input_value
            })

# 孤立ノード（接続が一切ないノード）
isolated_nodes = [node for node in nodes if node["id"] not in connected_nodes]

# 結果表示
print("=== 未接続の入力 ===")
if unconnected_inputs:
    for item in unconnected_inputs:
        print(item)
else:
    print("ありません。")

print("\n=== 不正な接続先 ===")
if invalid_references:
    for item in invalid_references:
        print(item)
else:
    print("ありません。")

print("\n=== 孤立ノード ===")
if isolated_nodes:
    for node in isolated_nodes:
        print(f"Node ID {node['id']} ({node['type']})")
else:
    print("ありません。")


In [None]:
# @title BackUpツールインストール(GPU不要)
!wget https://mega.nz/linux/repo/xUbuntu_22.04/amd64/megacmd-xUbuntu_22.04_amd64.deb
!apt update && apt install -y ./megacmd-xUbuntu_22.04_amd64.deb

!git clone --depth=1 https://github.com/forester3/ImageGenSupporter.git

# Google Driveをマウント
from google.colab import drive
drive.mount('/content/drive')

## user/default
!mkdir -p /content/ComfyUI/user
!ln -s /content/drive/MyDrive/ComfyUI/user/default /content/ComfyUI/user/default

## output
!ln -s /content/drive/MyDrive/ComfyUI/output /content/ComfyUI/output

print("✅ Backup Tool インストール完了")

In [None]:
# @title MEGAに画像とワークフローを保存
import os
import subprocess
from pathlib import Path
from google.colab import userdata

def mega_login(email, password):
    result = subprocess.run(['mega-login', email, password], capture_output=True, text=True)
    print(result.stdout)
    if result.returncode != 0:
        raise RuntimeError("❌ MEGAログイン失敗: " + result.stderr)

def mega_logout():
    result = subprocess.run(['mega-logout'], capture_output=True, text=True)
    print(result.stdout)

def mega_upload(local_path, remote_path):
    local_path = str(local_path)
    result = subprocess.run(['mega-put', local_path, remote_path], capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"アップロード失敗: {local_path} -> {remote_path}\n{result.stderr}")
    print(f"アップロード成功: {local_path} -> {remote_path}")

MEGA_EMAIL = userdata.get('MEGA_EMAIL')
MEGA_PASSWORD = userdata.get('MEGA_PASSWORD')
if not MEGA_EMAIL or not MEGA_PASSWORD:
    raise RuntimeError("❌ 環境変数 MEGA_EMAIL または MEGA_PASSWORD が設定されていません")

# ログイン実行
mega_login(MEGA_EMAIL, MEGA_PASSWORD)

# ComfyUI/output フォルダの全pngをアップロード
output_dir = Path("ComfyUI/output")
png_files = sorted(output_dir.glob("*.png"), key=lambda p: p.stat().st_mtime, reverse=True)
if png_files:
    print(f"アップロード対象のPNGファイル数: {len(png_files)}")
    for png_file in png_files:
        try:
            print(f"アップロード中: {png_file}")
            mega_upload(png_file, "/ComfyUI/output/")
        except Exception as e:
            print(f"エラー: {e}")
else:
    print("アップロード対象のpngファイルが見つかりません")

# ComfyUI/user/default/workflows のjsonファイルをすべてアップロード
workflow_dir = Path("ComfyUI/user/default/workflows")
workflow_files = list(workflow_dir.glob("*.json"))
if workflow_files:
    for wf_file in workflow_files:
        try:
            print(f"アップロード中: {wf_file}")
            mega_upload(wf_file, "/ComfyUI/user/default/workflows/")
        except Exception as e:
            print(f"エラー: {e}")
else:
    print("アップロード対象のワークフローファイルが見つかりません")

mega_logout()
print("✅ MEGAアップロード終了")