# Civitai Model Downloader for ComfyUI

This notebook helps you download models from Civitai directly into the matching ComfyUI model folder (checkpoints, LoRA, VAE, embeddings, etc.).

## Features:
- Download models by URL or model ID
- Automatic file naming with model info
- Progress bar for downloads
- Support for different model types (checkpoints, LoRA, VAE, etc.)
- Resume interrupted downloads

In [None]:
# Install required packages if not already installed
import subprocess
import sys

def install_package(package):
    """Install *package* with pip into the running interpreter's environment.

    Args:
        package: requirement string handed to ``pip install`` (e.g. "requests").

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    import importlib

    # Use this interpreter's own pip so the package lands in the active environment.
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    # Import finders cache directory listings; without this, a module installed
    # after the interpreter started may not be found by the next ``import``.
    importlib.invalidate_caches()

# Check and install required packages.
# Each dependency is imported first; only if that raises ImportError is it
# installed through install_package() and then imported again.
try:
    import requests
except ImportError:
    print("Installing requests...")
    install_package("requests")
    # Second import binds the freshly installed module in this namespace
    import requests

try:
    from tqdm import tqdm
except ImportError:
    print("Installing tqdm...")
    install_package("tqdm")
    # Second import binds the freshly installed tqdm class in this namespace
    from tqdm import tqdm

# Reached only when both imports succeeded (directly or after installation)
print("✅ All required packages are installed")

In [None]:
import os
import json
import re
from pathlib import Path
from urllib.parse import urlparse, parse_qs
import time

# Configuration - update these values for your setup
COMFYUI_BASE_PATH = "/workspace/ComfyUI"  # Root of your ComfyUI installation

# Maps each supported model type to its folder, relative to COMFYUI_BASE_PATH
MODEL_FOLDERS = {
    "checkpoint": "models/checkpoints",
    "lora": "models/loras",
    "vae": "models/vae",
    "upscaler": "models/upscale_models",
    "controlnet": "models/controlnet",
    "ipadapter": "models/ipadapter",
    "clip": "models/clip",
    "embedding": "models/embeddings",
}

# Civitai API configuration
# Optional API key. It is read from the CIVITAI_API_KEY environment variable so
# the secret does not have to be saved inside the notebook; replace the
# expression with a literal string if you prefer to hard-code it. When the
# variable is unset the key is an empty string, exactly as before.
CIVITAI_API_KEY = os.environ.get("CIVITAI_API_KEY", "")

# Show the configured base path, then one line per model type with a marker
# telling whether its folder already exists on disk.
print(f"📁 ComfyUI Base Path: {COMFYUI_BASE_PATH}")
print("\n📂 Model folders:")
for model_type, folder in MODEL_FOLDERS.items():
    full_path = os.path.join(COMFYUI_BASE_PATH, folder)
    if os.path.exists(full_path):
        exists = "✅"
    else:
        exists = "❌"
    print(f"  {exists} {model_type}: {folder}")

In [None]:
class CivitaiDownloader:
    """Download models from Civitai into the matching ComfyUI model folder.

    A Civitai page URL, direct-download URL or bare model ID is resolved to a
    model version through the Civitai REST API, the target folder is looked up
    in ``MODEL_FOLDERS`` and the file is streamed to disk with a progress bar
    and HTTP ``Range``-based resume.
    """

    API_BASE = "https://civitai.com/api/v1"
    DOWNLOAD_BASE = "https://civitai.com/api/download/models"
    # Seconds to wait for a connection and for each subsequent read, so a
    # stalled server cannot hang the notebook cell forever.
    REQUEST_TIMEOUT = 30
    # Bytes per streamed chunk; larger chunks mean fewer Python-level iterations.
    CHUNK_SIZE = 1024 * 1024

    def __init__(self, base_path=None, api_key=None):
        """Create a downloader.

        Args:
            base_path: ComfyUI installation root. ``None`` (the default) uses
                the current value of ``COMFYUI_BASE_PATH``; it is looked up
                here rather than in the signature so that re-running the
                configuration cell takes effect without redefining the class.
            api_key: optional Civitai API key, sent as a Bearer token.
        """
        self.base_path = COMFYUI_BASE_PATH if base_path is None else base_path
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    @staticmethod
    def _primary_file(model_info):
        """Return the file entry flagged ``primary``, else the first one, else None."""
        files = (model_info or {}).get('files') or []
        for entry in files:
            if entry.get('primary'):
                return entry
        return files[0] if files else None

    @staticmethod
    def _safe_filename(name):
        """Reduce *name* to a bare file name, or None if nothing usable remains.

        Names come from the API or from response headers, so any directory
        component is dropped to keep the download inside the target folder.
        """
        if not name:
            return None
        bare = os.path.basename(str(name).replace('\\', '/')).strip()
        return bare if bare not in ('', '.', '..') else None

    @staticmethod
    def _header_value(headers, name):
        """Look up header *name* case-insensitively in any mapping of headers."""
        wanted = name.lower()
        for key, value in headers.items():
            if str(key).lower() == wanted:
                return value
        return ''

    def extract_model_info(self, url):
        """Extract ``(model_id, version_id)`` from a Civitai URL or bare model ID.

        Recognized inputs:
            * ``https://civitai.com/models/<model_id>[...][?modelVersionId=<id>]``
            * ``https://civitai.com/api/download/models/<version_id>``
            * a string (or int) made only of digits, taken as a model ID

        Returns:
            A 2-tuple of strings; either element is None when it is unknown,
            and both are None when nothing could be recognized.
        """
        url = str(url).strip()
        if re.fullmatch(r'[0-9]+', url):
            return url, None

        match = re.search(r'/models/(\d+)', url)
        if not match:
            return None, None

        if "api/download/models" in url:
            # In download URLs the number after /models/ is a version ID.
            return None, match.group(1)

        version_match = re.search(r'modelVersionId=(\d+)', url)
        version_id = version_match.group(1) if version_match else None
        return match.group(1), version_id

    def get_model_details(self, model_id=None, version_id=None):
        """Fetch model-version metadata from the Civitai API.

        The version endpoint is tried first when *version_id* is given; if it
        does not answer with HTTP 200 and *model_id* is given, the first entry
        of the model's ``modelVersions`` list is used instead.

        Returns:
            The version dict, or None when nothing could be fetched. Failures
            are printed, never raised, because callers fall back to a direct
            download without metadata.
        """
        try:
            if version_id:
                response = self.session.get(
                    f"{self.API_BASE}/model-versions/{version_id}",
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 200:
                    return response.json()

            if model_id:
                response = self.session.get(
                    f"{self.API_BASE}/models/{model_id}",
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 200:
                    data = response.json()
                    versions = data.get('modelVersions')
                    if versions:
                        latest = versions[0]
                        # Versions listed under a model carry no "model"
                        # sub-dict; copy name/type from the parent so type
                        # detection and naming see the same shape as the
                        # model-versions endpoint returns.
                        parent = {
                            key: data[key]
                            for key in ('name', 'type')
                            if data.get(key) is not None
                        }
                        if not latest.get('model'):
                            latest['model'] = parent
                        return latest
        except Exception as e:
            # Deliberately broad: metadata lookup is best-effort.
            print(f"⚠️ Failed to fetch model details: {e}")

        return None

    def determine_model_type(self, model_info):
        """Map the API's model type to a ``MODEL_FOLDERS`` key.

        Returns "checkpoint" when *model_info* is empty or the type is
        missing or unknown.
        """
        if not model_info:
            return "checkpoint"

        raw_type = (model_info.get('model') or {}).get('type')
        model_type = str(raw_type or '').lower()

        type_mapping = {
            'checkpoint': 'checkpoint',
            'lora': 'lora',
            # LoCon/DoRA are stored alongside LoRA files.
            'locon': 'lora',
            'dora': 'lora',
            'vae': 'vae',
            'upscaler': 'upscaler',
            'controlnet': 'controlnet',
            'textualinversion': 'embedding',
            'embedding': 'embedding',
        }

        return type_mapping.get(model_type, 'checkpoint')

    def get_download_url(self, model_info, version_id=None):
        """Return the direct download URL for a model version, or None.

        Preference order: the primary file's ``downloadUrl``, then a URL built
        from the version ID in *model_info*, then one built from *version_id*.
        """
        if model_info:
            primary = self._primary_file(model_info)
            if primary and primary.get('downloadUrl'):
                return primary['downloadUrl']
            version_id = model_info.get('id') or version_id

        if version_id:
            return f"{self.DOWNLOAD_BASE}/{version_id}"
        return None

    def get_filename(self, model_info, response_headers=None):
        """Choose a file name for the download.

        Sources, in order: the primary file's name in *model_info*; a name
        built from the model and version names; the ``filename`` parameter of
        the ``Content-Disposition`` header in *response_headers*; finally a
        timestamped default. Names taken from the API or headers are reduced
        to their base name.
        """
        filename = None

        if model_info:
            primary = self._primary_file(model_info)
            if primary:
                filename = self._safe_filename(primary.get('name'))

            if not filename:
                model_meta = model_info.get('model') or {}
                model_name = str(model_meta.get('name') or 'model')
                version_name = str(model_info.get('name') or 'v1')
                model_name = re.sub(r'[^\w\s-]', '', model_name).strip()
                version_name = re.sub(r'[^\w\s-]', '', version_name).strip()
                filename = f"{model_name}_{version_name}.safetensors"

        if not filename and response_headers:
            content_disposition = self._header_value(response_headers, 'content-disposition')
            # Take only the filename parameter itself (quoted or bare), not
            # whatever parameters happen to follow it in the header.
            match = re.search(
                r'\bfilename\s*=\s*(?:"([^"]*)"|([^;]+))',
                content_disposition or '',
                flags=re.IGNORECASE,
            )
            if match:
                raw = match.group(1) if match.group(1) is not None else match.group(2)
                filename = self._safe_filename(raw)

        if not filename:
            filename = f"model_{int(time.time())}.safetensors"

        return filename

    def download_file(self, url, destination, filename=None, resume=True):
        """Stream *url* into *destination* with a progress bar and resume support.

        Args:
            url: URL to download.
            destination: target directory (created if missing).
            filename: target file name; when omitted it is derived from the
                response headers (no resume is attempted in that case).
            resume: continue an existing partial file via an HTTP Range request.

        Returns:
            Path of the downloaded file.

        Raises:
            requests.HTTPError: for error responses (via ``raise_for_status``).
        """
        headers = {}
        resume_pos = 0

        if resume and filename:
            existing = os.path.join(destination, filename)
            if os.path.exists(existing):
                resume_pos = os.path.getsize(existing)
        if resume_pos > 0:
            headers['Range'] = f'bytes={resume_pos}-'
            print(f"📂 Resuming download from {resume_pos:,} bytes")

        with self.session.get(
            url, headers=headers, stream=True, timeout=self.REQUEST_TIMEOUT
        ) as response:
            if resume_pos > 0 and response.status_code == 416:
                # Range starts at/after the end of the remote file. If the
                # server reports a total equal to what is on disk (or reports
                # none), the file is already complete.
                total_match = re.search(
                    r'/(\d+)\s*$', response.headers.get('content-range', '')
                )
                if total_match is None or int(total_match.group(1)) == resume_pos:
                    print("✅ File is already fully downloaded")
                    return existing

            response.raise_for_status()

            if resume_pos > 0 and response.status_code != 206:
                # The server ignored the Range header and is sending the whole
                # file; appending would corrupt it, so start over.
                print("⚠️ Server does not support resume; restarting download")
                resume_pos = 0
            mode = 'ab' if resume_pos > 0 else 'wb'

            if not filename:
                filename = self.get_filename(None, response.headers)

            # For a 206 response content-length covers only the remaining bytes.
            total_size = int(response.headers.get('content-length', 0)) + resume_pos

            os.makedirs(destination, exist_ok=True)
            filepath = os.path.join(destination, filename)

            # total=None makes tqdm show a plain counter when the size is unknown.
            with open(filepath, mode) as f, tqdm(
                total=total_size or None,
                initial=resume_pos,
                unit='B',
                unit_scale=True,
                desc=filename,
            ) as pbar:
                for chunk in response.iter_content(chunk_size=self.CHUNK_SIZE):
                    if chunk:
                        f.write(chunk)
                        pbar.update(len(chunk))

        return filepath

    def download_model(self, url, model_type=None, custom_name=None):
        """Download one model and save its metadata next to it.

        Args:
            url: Civitai page URL, direct-download URL or bare model ID.
            model_type: key of ``MODEL_FOLDERS``; auto-detected from the API
                metadata when omitted. Unknown types go to the checkpoint folder.
            custom_name: file name to use instead of the detected one.

        Returns:
            Path of the downloaded file, or None if the download failed
            (the error is printed).
        """
        print(f"\n🔍 Processing URL: {url}")

        model_id, version_id = self.extract_model_info(url)
        model_info = self.get_model_details(model_id, version_id)

        if model_info:
            model_name = (model_info.get('model') or {}).get('name') or 'Unknown'
            version_name = model_info.get('name', 'Unknown')
            print(f"📦 Model: {model_name} - {version_name}")

        if not model_type:
            model_type = self.determine_model_type(model_info)
        print(f"📁 Model Type: {model_type}")

        folder = MODEL_FOLDERS.get(model_type, MODEL_FOLDERS['checkpoint'])
        destination = os.path.join(self.base_path, folder)

        # Fall back to the URL as given when no download URL can be derived.
        download_url = self.get_download_url(model_info, version_id) or str(url)

        # Add API key to URL if available
        if self.api_key and 'civitai.com' in download_url:
            separator = '&' if '?' in download_url else '?'
            download_url = f"{download_url}{separator}token={self.api_key}"

        if custom_name:
            filename = custom_name
        elif model_info:
            filename = self.get_filename(model_info)
        else:
            # Without metadata, let download_file take the real name from the
            # response headers instead of inventing a timestamped one up front.
            filename = None

        print(f"💾 Downloading to: {destination}")
        print(f"📄 Filename: {filename or '(from server response)'}")

        try:
            filepath = self.download_file(download_url, destination, filename)
            print(f"\n✅ Successfully downloaded: {filepath}")

            if model_info:
                info_file = filepath + '.json'
                with open(info_file, 'w', encoding='utf-8') as f:
                    json.dump(model_info, f, indent=2)
                print(f"📋 Model info saved to: {info_file}")

            return filepath

        except Exception as e:
            # Top-level boundary for notebook use: report and signal failure with None.
            print(f"\n❌ Download failed: {e}")
            return None

# Build the shared downloader instance that the cells below rely on
downloader = CivitaiDownloader(
    base_path=COMFYUI_BASE_PATH,
    api_key=CIVITAI_API_KEY,
)
print("✅ Downloader initialized")

## Download Models

### Method 1: Download by URL
Simply paste the Civitai model URL to download it.

In [None]:
# Example: fetch a single model from its Civitai page.
# Swap in the URL of the model you want.
model_url = "https://civitai.com/models/4384/dreamshaper"

# Optional target type ("checkpoint", "lora", "vae", ...).
# Leave it as None to have the downloader detect the type itself.
model_type = None

# Run the download
result = downloader.download_model(url=model_url, model_type=model_type)

### Method 2: Batch Download
Download multiple models at once.

In [None]:
# Models to download. Each entry is either a plain URL string or a dict with a
# "url" key and an optional "type" key (checkpoint, lora, vae, ...).
models_to_download = [
    {
        "url": "https://civitai.com/models/4384/dreamshaper",
        "type": "checkpoint"  # optional
    },
    {
        "url": "https://civitai.com/models/7808/easynegative",
        "type": "embedding"
    },
    # Add more models here
]

# Download all models
results = []
for model in models_to_download:
    # Accept bare URL strings as well: the quick-download template lists in
    # this notebook are plain lists of URLs, and indexing a string with
    # ['url'] would raise TypeError.
    if isinstance(model, str):
        model = {"url": model}

    print(f"\n{'='*60}")
    result = downloader.download_model(
        model['url'],
        model_type=model.get('type')
    )
    results.append(result)
    print(f"{'='*60}")

# Summary: download_model returns None for a failed download
print("\n📊 Download Summary:")
success_count = sum(1 for r in results if r is not None)
print(f"✅ Successful: {success_count}/{len(results)}")
print(f"❌ Failed: {len(results) - success_count}/{len(results)}")

### Method 3: Download by Model/Version ID
If you know the specific model or version ID.

In [None]:
# A version ID selects one specific release, so this is the most precise way to pick a file
version_id = "128713"  # Example version ID
download_url = "https://civitai.com/api/download/models/" + version_id

result = downloader.download_model(url=download_url)

## Utility Functions

In [None]:
def list_downloaded_models():
    """Print, for each model type, the model files found in its ComfyUI folder.

    Only files with a known model extension are counted. At most five names
    (in sorted order) are printed per folder, each with its size in GB,
    followed by a count of the remaining ones.
    """
    model_suffixes = ('.safetensors', '.ckpt', '.pt', '.pth', '.bin')
    preview_limit = 5

    print("📦 Downloaded Models:\n")

    for model_type, folder in MODEL_FOLDERS.items():
        folder_path = os.path.join(COMFYUI_BASE_PATH, folder)
        if not os.path.exists(folder_path):
            continue

        model_files = sorted(
            name for name in os.listdir(folder_path) if name.endswith(model_suffixes)
        )
        if not model_files:
            continue

        print(f"📁 {model_type.upper()} ({len(model_files)} models):")
        for name in model_files[:preview_limit]:
            size_gb = os.path.getsize(os.path.join(folder_path, name)) / (1024**3)
            print(f"   • {name} ({size_gb:.2f} GB)")

        hidden_count = len(model_files) - preview_limit
        if hidden_count > 0:
            print(f"   ... and {hidden_count} more")
        print()

# List current models: prints a per-type overview of the files in the ComfyUI model folders
list_downloaded_models()

In [None]:
def clean_duplicate_models(dry_run=True):
    """Report, and optionally delete, model files that look like duplicates.

    Files in each model folder are grouped by base name: the extension and any
    ``_v<digits>...`` / ``-v<digits>...`` suffix are stripped. Within a group
    the largest file is kept and every other file is a removal candidate.

    Different versions of one model share a base name and are therefore
    reported as duplicates, so review the dry-run output before deleting.

    Args:
        dry_run: when True (the default) only print what would be removed;
            when False, delete the removal candidates from disk.
    """
    print("🧹 Checking for duplicate models...\n")

    for model_type, folder in MODEL_FOLDERS.items():
        path = os.path.join(COMFYUI_BASE_PATH, folder)
        if not os.path.exists(path):
            continue

        # Group files by base name
        file_groups = {}
        for file in os.listdir(path):
            if file.endswith(('.safetensors', '.ckpt', '.pt', '.pth')):
                base_name = re.sub(r'[_-]v\d+.*', '', file.rsplit('.', 1)[0])
                file_groups.setdefault(base_name, []).append(file)

        for base_name, files in file_groups.items():
            if len(files) < 2:
                continue

            print(f"📁 Found duplicates for {base_name}:")

            # Largest file first; it is the one that is kept
            files_with_size = []
            for file in files:
                filepath = os.path.join(path, file)
                files_with_size.append((file, os.path.getsize(filepath), filepath))
            files_with_size.sort(key=lambda x: x[1], reverse=True)

            kept_name, kept_size, _ = files_with_size[0]
            print(f"   ✅ Keeping: {kept_name} ({kept_size/(1024**3):.2f} GB)")
            for file, size, filepath in files_with_size[1:]:
                if dry_run:
                    print(f"   ❌ Would remove: {file} ({size/(1024**3):.2f} GB)")
                else:
                    os.remove(filepath)
                    print(f"   ❌ Removed: {file} ({size/(1024**3):.2f} GB)")
            print()

# Check for duplicates (dry run - only prints the candidates, nothing is deleted)
clean_duplicate_models()

In [None]:
def get_disk_usage():
    """Print a bar chart of disk space used by each ComfyUI model folder.

    Every existing model folder is walked recursively and the sizes of its
    files are summed. Folders are listed largest first; folders of 0.01 GB
    or less are left out of the chart but still count toward the total.
    """
    print("💾 Disk Usage Report:\n")

    bytes_per_gb = 1024**3
    folder_sizes = {}

    for model_type, folder in MODEL_FOLDERS.items():
        root = os.path.join(COMFYUI_BASE_PATH, folder)
        if not os.path.exists(root):
            continue

        file_paths = (
            os.path.join(dirpath, name)
            for dirpath, _dirnames, filenames in os.walk(root)
            for name in filenames
        )
        # Paths that no longer resolve to an existing file are skipped
        folder_sizes[model_type] = sum(
            os.path.getsize(file_path)
            for file_path in file_paths
            if os.path.exists(file_path)
        )

    total_size = sum(folder_sizes.values())
    largest = max(folder_sizes.values(), default=0)

    # Largest folder first; its bar is 40 characters wide
    ranked = sorted(folder_sizes.items(), key=lambda item: item[1], reverse=True)
    for model_type, size in ranked:
        size_gb = size / bytes_per_gb
        if size_gb <= 0.01:
            continue
        bar = '█' * int(size_gb / largest * bytes_per_gb * 40)
        print(f"{model_type:12} {bar} {size_gb:.2f} GB")

    print(f"\n📊 Total: {total_size / bytes_per_gb:.2f} GB")

# Check disk usage: prints per-folder sizes and the overall total for the model folders
get_disk_usage()

## Quick Download Templates

Common models you might want to download:

In [None]:
# Popular SD 1.5 checkpoints
sd15_models = [
    # DreamShaper
    "https://civitai.com/models/4384/dreamshaper",
    # ChilloutMix
    "https://civitai.com/models/6424/chilloutmix",
    # MajicMix
    "https://civitai.com/models/43331/majicmix-realistic",
]

# Popular SDXL checkpoints
sdxl_models = [
    # SDXL Base
    "https://civitai.com/models/101055/sd-xl",
    # DreamShaper XL
    "https://civitai.com/models/112902/dreamshaper-xl",
    # Juggernaut XL
    "https://civitai.com/models/133005/juggernaut-xl",
]

# Essential embeddings
embeddings = [
    # EasyNegative
    "https://civitai.com/models/7808/easynegative",
    # BadHands
    "https://civitai.com/models/16993/badhandsv4-animeillustdiffusion",
]

# Pick one set by uncommenting the matching line
# models_to_download = sd15_models  # Uncomment to download SD 1.5 models
# models_to_download = sdxl_models  # Uncomment to download SDXL models
# models_to_download = embeddings  # Uncomment to download embeddings

print("Ready to download. Uncomment the model set you want to download.")