<a href="https://colab.research.google.com/github/daviddaven-port/demon2/blob/master/Untitled13.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 🔹 STEP 1: Safe installs (with retries and verbose)
import sys, subprocess, os, time
def safe_install(packages):
    for package in packages:
        for attempt in range(2):
            try:
                print(f"⏳ Installing {package} (attempt {attempt+1})...")
                subprocess.check_call(
                    [sys.executable, "-m", "pip", "install", package, "-q", "--no-warn-script-location"]
                )
                print(f"✅ {package} installed successfully")
                break
            except Exception as e:
                print(f"⚠️ Failed to install {package}: {str(e)[:100]}")
                if attempt == 1:
                    print(f"❌ Critical: {package} failed permanently")

safe_install(["gradio", "pandas", "tqdm", "requests", "huggingface_hub", "git"])

# 🔹 STEP 2: Drive mount
from google.colab import drive
drive.mount("/content/drive")
BASE_DIR = "/content/drive/MyDrive/ModelManager"
os.makedirs(BASE_DIR, exist_ok=True)

# 🔹 STEP 3: Utilities
import requests, zipfile, tarfile, pandas as pd
from tqdm import tqdm
from huggingface_hub import snapshot_download
import re

def is_huggingface_url(url):
    """Detect if URL is a Hugging Face repository"""
    return bool(re.search(r'huggingface\.co/[^/]+/[^/]+', url))

def download_hf_repo(repo_id, dest):
    """Download Hugging Face repo using snapshot_download"""
    try:
        print(f"⏳ Downloading Hugging Face repository: {repo_id}")
        snapshot_download(
            repo_id=repo_id,
            local_dir=dest,
            local_dir_use_symlinks=False,
            resume_download=True
        )
        print(f"✅ Hugging Face repository downloaded to: {dest}")
        return True
    except Exception as e:
        print(f"❌ Hugging Face download failed: {str(e)[:200]}")
        return False

def download_file(url, dest):
    """Download with progress bar + retries"""
    for attempt in range(2):
        try:
            print(f"⏳ Downloading: {url}")
            r = requests.get(url, stream=True, timeout=60)
            total = int(r.headers.get("content-length", 0))
            with open(dest, "wb") as f, tqdm(
                desc="📥 Downloading",
                total=total,
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
                    bar.update(len(chunk))
            print(f"✅ Download complete: {dest}")
            return True
        except Exception as e:
            print(f"⚠️ Error downloading: {e}")
            if attempt == 1:
                return False

def extract_file(path, target):
    """Extract archives with error handling"""
    try:
        if path.endswith(".zip"):
            with zipfile.ZipFile(path, "r") as z:
                z.extractall(target)
        elif path.endswith(".tar") or path.endswith(".tar.gz"):
            with tarfile.open(path, "r:*") as t:
                t.extractall(target)
        else:
            print("⚠️ Unknown format, skipping extraction")
            return False
        print(f"✅ Extracted: {target}")
        return True
    except Exception as e:
        print(f"❌ Extraction failed: {e}")
        return False

def build_index(target_dir):
    """Index all files in a directory"""
    index = []
    for root, _, files in os.walk(target_dir):
        for f in files:
            path = os.path.join(root, f)
            try:
                size = os.path.getsize(path)
                index.append({"file": f, "path": path, "size_MB": round(size/1e6,2)})
            except Exception as e:
                print(f"⚠️ Skipped {path}: {e}")
    df = pd.DataFrame(index)
    csv_path = os.path.join(BASE_DIR, "model_index.csv")
    df.to_csv(csv_path, index=False)
    print(f"📑 Index saved: {csv_path}")
    return df

# 🔹 STEP 4: GUI with Gradio
import gradio as gr
import logging
logging.basicConfig(level=logging.INFO)

def process_url(url):
    """Process any URL - detects Hugging Face repos and handles appropriately"""
    url = url.strip()

    # Extract repo ID from Hugging Face URL if present
    if "huggingface.co" in url:
        # Extract repo_id like "Wan-AI/Wan2.2-T2V-A14B" from the URL
        match = re.search(r'huggingface\.co/([^/]+/[^/]+)', url)
        if match:
            repo_id = match.group(1)
            dest = os.path.join(BASE_DIR, repo_id.replace("/", "_"))
            os.makedirs(dest, exist_ok=True)
            if download_hf_repo(repo_id, dest):
                df = build_index(BASE_DIR)
                return f"✅ Success! Hugging Face repository '{repo_id}' downloaded to:\n{dest}\n\nIndexed {len(df)} files total."
            else:
                return "❌ Failed to download Hugging Face repository"
        else:
            return "❌ Could not parse Hugging Face repository ID from URL"

    # Handle GitHub repos
    elif "github.com" in url:
        repo_name = url.split("/")[-1].replace(".git", "")
        dest = os.path.join(BASE_DIR, repo_name)
        os.makedirs(dest, exist_ok=True)

        try:
            print(f"⏳ Cloning GitHub repository: {url}")
            subprocess.run(["git", "clone", url, dest], check=True)
            print(f"✅ GitHub repository cloned to: {dest}")
            df = build_index(BASE_DIR)
            return f"✅ Success! GitHub repository cloned to:\n{dest}\n\nIndexed {len(df)} files total."
        except Exception as e:
            return f"❌ GitHub clone failed: {str(e)[:200]}"

    # Handle direct file downloads
    else:
        filename = url.split("/")[-1]
        dest = os.path.join(BASE_DIR, filename)
        if not download_file(url, dest):
            return "❌ Download failed"
        if filename.endswith((".zip", ".tar", ".tar.gz")):
            if extract_file(dest, BASE_DIR):
                os.remove(dest)  # cleanup archive
        df = build_index(BASE_DIR)
        return f"✅ Done! Indexed {len(df)} files total."

with gr.Blocks() as demo:
    gr.Markdown("# 🗂️ Model Manager (Colab High-RAM Edition)")
    gr.Markdown("### Enter any model URL:\n- Hugging Face repo (e.g., https://huggingface.co/Wan-AI/Wan2.2-T2V-A14B)\n- GitHub repo\n- Direct file download")
    url = gr.Textbox(label="Enter URL", placeholder="https://huggingface.co/Wan-AI/Wan2.2-T2V-A14B")
    out = gr.Textbox(label="Status", lines=5)
    btn = gr.Button("Download + Index")
    btn.click(process_url, inputs=url, outputs=out)

    # Add example buttons for quick testing
    with gr.Row():
        hf_btn = gr.Button("Try Wan2.2-T2V-A14B (Hugging Face)")
        gh_btn = gr.Button("Try Wan2.1 (GitHub)")

    def set_hf_example():
        return "https://huggingface.co/Wan-AI/Wan2.2-T2V-A14B"

    def set_gh_example():
        return "https://github.com/Wan-Video/Wan2.1.git"

    hf_btn.click(set_hf_example, outputs=url)
    gh_btn.click(set_gh_example, outputs=url)

demo.launch()

⏳ Installing gradio (attempt 1)...
✅ gradio installed successfully
⏳ Installing pandas (attempt 1)...
✅ pandas installed successfully
⏳ Installing tqdm (attempt 1)...
✅ tqdm installed successfully
⏳ Installing requests (attempt 1)...
✅ requests installed successfully
⏳ Installing huggingface_hub (attempt 1)...
✅ huggingface_hub installed successfully
⏳ Installing git (attempt 1)...
⚠️ Failed to install git: Command '['/usr/bin/python3', '-m', 'pip', 'install', 'git', '-q', '--no-warn-script-location']' ret
⏳ Installing git (attempt 2)...
⚠️ Failed to install git: Command '['/usr/bin/python3', '-m', 'pip', 'install', 'git', '-q', '--no-warn-script-location']' ret
❌ Critical: git failed permanently
Mounted at /content/drive
It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, se

