[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Vampire-Chan/SDUI/blob/main/ComfyUI.ipynb)


In [None]:
#@title **ComfyUI Environment Auto-Setup with Update Support**
import os
from IPython.display import clear_output

# ✅ User-configurable options
USE_GOOGLE_DRIVE = True  #@param {type:"boolean"}
UPDATE_COMFY_UI = True  #@param {type:"boolean"}

# ✅ Define workspace path
if USE_GOOGLE_DRIVE:
    print("🔗 Mounting Google Drive...")
    from google.colab import drive
    drive.mount('/content/drive')
    WORKSPACE = "/content/drive/MyDrive/ComfyUI"
else:
    WORKSPACE = "./ComfyUI"

COMFY_REPO = "https://github.com/comfyanonymous/ComfyUI"
MANAGER_REPO = "https://github.com/ltdrdata/ComfyUI-Manager.git"
CUSTOM_NODES_DIR = os.path.join(WORKSPACE, "custom_nodes", "ComfyUI-Manager")

# ✅ Clone or update ComfyUI
if not os.path.exists(WORKSPACE):
    print("📂 Cloning ComfyUI for the first time...")
    os.system(f"git clone {COMFY_REPO} {WORKSPACE}")
else:
    if UPDATE_COMFY_UI:
        print("🔄 Updating ComfyUI...")
        os.chdir(WORKSPACE)
        os.system("git reset --hard")
        os.system("git pull origin master")
    else:
        print("⏭️ Skipping ComfyUI update.")

# ✅ Clone or update ComfyUI-Manager
if not os.path.exists(CUSTOM_NODES_DIR):
    print("📂 Cloning ComfyUI-Manager for the first time...")
    os.system(f"git clone {MANAGER_REPO} {CUSTOM_NODES_DIR}")
else:
    if UPDATE_COMFY_UI:
        print("🔄 Updating ComfyUI-Manager...")
        os.chdir(CUSTOM_NODES_DIR)
        os.system("git reset --hard")
        os.system("git pull origin main")
    else:
        print("⏭️ Skipping ComfyUI-Manager update.")

# ✅ Install dependencies
os.chdir(WORKSPACE)
print("📦 Installing dependencies...")
os.system(
    "pip install xformers!=0.0.18 -r requirements.txt "
    "--extra-index-url https://download.pytorch.org/whl/cu121 "
    "--extra-index-url https://download.pytorch.org/whl/cu118 "
    "--extra-index-url https://download.pytorch.org/whl/cu117"
)

print("✅ Environment setup and update complete.")
clear_output()


In [3]:
import os
import requests
import shutil
import urllib.parse
import time
import sys

# 🔥 Custom Colab Form Inputs
MODEL_URL = "https://civitai.com/api/download/models/1671255?type=Model&format=SafeTensor"  #@param {type:"string"}
API_KEY = ""  #@param {type:"string"}
MODEL_TYPE = "loras"  #@param ["checkpoints", "clip_vision", "vae", "loras", "controlnet", "style_models", "upscale_models", "diffusion_models", "hypernetworks", "gligen", "custom_nodes"]
SAVE_TO_DRIVE = True  #@param {type:"boolean"}
USE_COMFYUI_FOLDER = False  #@param {type:"boolean"}


def setup_environment():
    """Setup the base directory dynamically based on user options."""
    if SAVE_TO_DRIVE:
        print("🔗 Mounting Google Drive...")
        from google.colab import drive
        drive.mount('/content/drive')
        base_dir = '/content/drive/MyDrive/ComfyUI'
    else:
        base_dir = './ComfyUI' if USE_COMFYUI_FOLDER else './content/sdui'
    return base_dir


# Human-readable time formatter for ETA
def human_readable_time(seconds):
    if seconds >= 3600:
        return f"{int(seconds // 3600)}h {int((seconds % 3600) // 60)}m {int(seconds % 60)}s"
    elif seconds >= 60:
        return f"{int(seconds // 60)}m {int(seconds % 60)}s"
    else:
        return f"{int(seconds)}s"


# Human-readable size formatter for bytes to KB, MB, GB
def human_readable_size(byte_count):
    for unit in ['', 'K', 'M', 'G', 'T']:
        if abs(byte_count) < 1024.0:
            return f"{byte_count:3.1f}{unit}B"
        byte_count /= 1024.0
    return f"{byte_count:.1f}PB"


def format_progress_bar(percent, bar_length=30):
    """Create a text-based progress bar similar to pip."""
    filled_length = int(bar_length * percent // 100)
    bar = '█' * filled_length + '-' * (bar_length - filled_length)
    return f"[{bar}]"


def download_file(url, save_path, api_key=None):
    """Download a file from a given URL and save it to the specified path with a progress bar, percentage, time left, and download speed."""
    try:
        headers = {}
        if api_key:
            headers['Authorization'] = f'Bearer {api_key}'

        response = requests.get(url, stream=True, allow_redirects=True, headers=headers)

        # Retry with API key if access denied and no API key used yet
        if response.status_code == 403 and not api_key:
            print("⚠️ Access denied. Retrying with API key...")
            return download_file(url, save_path, api_key=API_KEY)

        response.raise_for_status()

        # Determine filename
        filename = os.path.basename(urllib.parse.urlparse(url).path)
        if 'content-disposition' in response.headers:
            import cgi
            _, params = cgi.parse_header(response.headers['content-disposition'])
            filename = params.get('filename', filename)

        save_path = os.path.join(save_path, filename)

        total_size = int(response.headers.get('Content-Length', 0))
        downloaded = 0
        start_time = time.time()

        with open(save_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
                    downloaded += len(chunk)

                    percent = (downloaded / total_size) * 100 if total_size else 0
                    elapsed_time = time.time() - start_time
                    speed = (downloaded / (1024 * 1024)) / elapsed_time if elapsed_time > 0 else 0  # MB/s
                    remaining = total_size - downloaded
                    time_left = (remaining / (speed * 1024 * 1024)) if speed > 0 else 0

                    progress_bar = format_progress_bar(percent)
                    speedstr = f"{speed:.2f} MB/s"
                    timestr = human_readable_time(time_left)

                    sys.stdout.write(
                        f"\rDownloading {filename} {progress_bar} {percent:6.2f}% | "
                        f"{human_readable_size(downloaded)} / {human_readable_size(total_size)} | "
                        f"{speedstr} | ETA: {timestr}  "
                    )
                    sys.stdout.flush()

        print(f"\n✅ Downloaded: {filename} to {save_path} - Total Size: {human_readable_size(total_size)}")
    except requests.exceptions.RequestException as e:
        print(f"❌ Error downloading {url}: {e}")


# 🌍 Start setup environment
BASE_DIR = setup_environment()

# Define all possible model directories from extra_model_paths.yaml
MODEL_DIRS = {
    'checkpoints': 'models/checkpoints',
    'clip_vision': 'models/clip_vision',
    'vae': 'models/vae',
    'loras': 'models/loras',
    'controlnet': 'models/controlnet',
    'style_models': 'models/style_models',
    'upscale_models': 'models/upscale_models',
    'diffusion_models': 'models/diffusion_models',
    'hypernetworks': 'models/hypernetworks',
    'gligen': 'models/gligen',
    'custom_nodes': 'path/custom_nodes',
    'clip': 'models/clip',
    'configs': 'models/configs',
}

# Update model paths to use Google Drive, ComfyUI, or content/sdui
MODEL_DIRS = {key: os.path.join(BASE_DIR, path) for key, path in MODEL_DIRS.items()}

# Create directories if they do not exist
for path in MODEL_DIRS.values():
    os.makedirs(path, exist_ok=True)


# 🌍 Start model download
if MODEL_URL:
    print(f"⏳ Starting download for {MODEL_URL} to the '{MODEL_TYPE}' directory...")
    save_path = MODEL_DIRS.get(MODEL_TYPE, os.path.join(BASE_DIR, 'models'))
    download_file(MODEL_URL, save_path, api_key=API_KEY)
else:
    print("❌ No URL provided for download.")


🔗 Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
⏳ Starting download for https://civitai.com/api/download/models/1671255?type=Model&format=SafeTensor to the 'loras' directory...
❌ Error downloading https://civitai.com/api/download/models/1671255?type=Model&format=SafeTensor: 401 Client Error: Unauthorized for url: https://civitai.com/api/download/models/1671255?type=Model&format=SafeTensor


In [None]:
#@title **ComfyUI Launcher**
#@markdown Launch ComfyUI for Stable Diffusion on Colab.
#@markdown Uncheck **Google Drive** if running locally.
#@markdown If ComfyUI fails with "module not found," add the missing module to **required_modules**.

import os
import subprocess
import threading
import time
import socket
from google.colab import drive

# ✅ User options
mount_gdrive = True  #@param {type:"boolean"}
required_modules = ['torch', 'flask', 'flask_cors', 'requests', 'torchsde', 'spandrel', 'kornia', 'piexif', 'segment_anything']  #@param

# ✅ Step 1: Mount Google Drive if selected
folder_path = "/content/ComfyUI"
if mount_gdrive:
    print("🔗 Mounting Google Drive...")
    try:
        drive.mount('/content/drive')
        folder_path = "/content/drive/MyDrive/ComfyUI"
        print("✅ Google Drive mounted successfully!")
    except Exception as e:
        print(f"❌ Error mounting Google Drive: {e}")
        exit(1)
else:
    print("🖥️ Running without Google Drive.")

# ✅ Step 2: Check CUDA availability
def check_cuda():
    import torch
    print("🚀 CUDA Available!" if torch.cuda.is_available() else "⚠️ No GPU found, running on CPU.")
check_cuda()

# ✅ Step 3: Install missing modules
def install_module(module):
    try:
        __import__(module)
    except ModuleNotFoundError:
        print(f"❌ Module '{module}' not found, installing...")
        os.system(f"pip install {module}")
        print(f"✅ Module '{module}' installed!")

for mod in required_modules:
    install_module(mod)

# ✅ Step 4: Download & Install Cloudflared
def download_and_install_cloudflared():
    url = "https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb"
    deb_path = os.path.join(folder_path, "cloudflared.deb")

    print("📥 Downloading Cloudflared...")
    if os.system(f"wget --progress=bar:force {url} -O {deb_path}") != 0:
        print("❌ Failed to download Cloudflared.")
        exit(1)

    print("🛠️ Installing Cloudflared...")
    if os.system(f"dpkg -i {deb_path}") != 0:
        print("❌ Cloudflared installation failed.")
        exit(1)

    print("✅ Cloudflared is installed!")
download_and_install_cloudflared()

# ✅ Step 5: Launch ComfyUI & Monitor Cloudflared
def launch_cloudflared(port):
    while True:
        time.sleep(0.5)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            if sock.connect_ex(('127.0.0.1', port)) == 0:
                break

    print("\n🎉 ComfyUI is ready, launching Cloudflared...\n")
    process = subprocess.Popen(["cloudflared", "tunnel", "--url", f"http://127.0.0.1:{port}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    for line in process.stderr:
        l = line.decode()
        if "trycloudflare.com" in l:
            print("🔗 Access ComfyUI at:", l[l.find("http"):], end='')

threading.Thread(target=launch_cloudflared, daemon=True, args=(8188,)).start()

# ✅ Step 6: Start ComfyUI
print("🚀 Starting ComfyUI...")
os.system(f"python {os.path.join(folder_path, 'main.py')} --dont-print-server")

🔗 Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
📁 Using storage at: /content/drive/MyDrive/ComfyUI
🚀 CUDA Available!


ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
#@title **📦 Compress & Clean Output Folder**
#@markdown Use this tool to compress your output folder and clean it after.
#@markdown Choose file format, compression level, and destination.
!pip install py7zr
!pip install rarfile
import os
import shutil
import zipfile
import sys
import subprocess
import py7zr
import rarfile
from tqdm import tqdm
from google.colab import drive

# ✅ **Mount Google Drive**
mount_gdrive = True  #@param {type:"boolean"}
if mount_gdrive:
    drive.mount('/content/drive')

# ✅ **User Options**
output_folder = "/content/drive/MyDrive/ComfyUI/output"  #@param {type: "string"}
archive_name = "output"  #@param {type: "string"}
archive_destination = "/content/drive/MyDrive/"  #@param {type: "string"}

file_type = "zip"  #@param ["zip", "7z", "rar"]
compression_level = "normal"  #@param ["store", "fast", "normal", "high", "maximum"]

# 🔥 **Compression Level Mapping**
compression_map = {
    'store': 0, 'fast': 1, 'normal': 5, 'high': 7, 'maximum': 9
}

def compress_and_clean_output(output_folder, archive_name, archive_destination, file_type, compression_level):
    """Compresses and cleans the output folder."""

    if not os.path.exists(output_folder):
        print(f"❌ Error: The folder '{output_folder}' does not exist.")
        return

    if not os.path.exists(archive_destination):
        print(f"❌ Error: The destination folder '{archive_destination}' does not exist.")
        return

    total_original_size = sum(
        os.path.getsize(os.path.join(root, file))
        for root, _, files in os.walk(output_folder)
        for file in files
    )

    extension = {'zip': '.zip', '7z': '.7z', 'rar': '.rar'}[file_type]
    archive_path = os.path.join(archive_destination, f"{archive_name}{extension}")
    compression_value = compression_map[compression_level]

    # ✅ **ZIP Compression**
    if file_type == 'zip':
        compression = zipfile.ZIP_STORED if compression_level == 'store' else zipfile.ZIP_DEFLATED
        with zipfile.ZipFile(archive_path, 'w', compression) as zipf:
            for root, _, files in os.walk(output_folder):
                for file in tqdm(files, desc="📦 Compressing as .zip", unit="file"):
                    zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), output_folder))

    # ✅ **7Z Compression**
    elif file_type == '7z':
        with py7zr.SevenZipFile(archive_path, 'w', filters=[{'id': py7zr.FILTER_LZMA2, 'preset': compression_value}]) as archive:
            for root, _, files in os.walk(output_folder):
                for file in tqdm(files, desc="📦 Compressing as .7z", unit="file"):
                    archive.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), output_folder))

    # ✅ **RAR Compression**
    elif file_type == 'rar':
        try:
            print("📦 Compressing as .rar (This may take some time, progress bar not supported)")
            subprocess.run(['rar', 'a', '-m' + str(compression_value), archive_path, output_folder], check=True)
        except subprocess.CalledProcessError:
            print("❌ Failed to create .rar archive. Install 'rar' using 'apt-get install rar'.")

    total_compressed_size = os.path.getsize(archive_path)
    compression_percentage = (1 - (total_compressed_size / total_original_size)) * 100 if total_original_size > 0 else 0

    print(f"📦 Original size: {total_original_size / 1024**2:.2f} MB")
    print(f"📦 Compressed size: {total_compressed_size / 1024**2:.2f} MB")
    print(f"📦 Compression percentage: {compression_percentage:.2f}%")

    # ✅ **Delete original files**
    for filename in os.listdir(output_folder):
        file_path = os.path.join(output_folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print(f'❌ Failed to delete {file_path}. Reason: {e}')

    print(f"✅ Successfully compressed and cleaned {output_folder}")

# 🔥 **Run the function**
compress_and_clean_output(output_folder, archive_name, archive_destination, file_type, compression_level)


Collecting rarfile
  Downloading rarfile-4.2-py3-none-any.whl.metadata (4.4 kB)
Downloading rarfile-4.2-py3-none-any.whl (29 kB)
Installing collected packages: rarfile
Successfully installed rarfile-4.2
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


📦 Compressing as .zip: 100%|██████████| 70/70 [00:18<00:00,  3.85file/s]


📦 Original size: 336.22 MB
📦 Compressed size: 335.62 MB
📦 Compression percentage: 0.18%
✅ Successfully compressed and cleaned /content/drive/MyDrive/ComfyUI/output
