<a href="https://colab.research.google.com/github/drf0rk/AnxLight/blob/main/notebook/AnxLight_Launcher_v0.1.1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Cell 1: AnxLight v3 Pre-Flight Setup with Comprehensive Testing Suite
# This cell ALWAYS deletes the old repository to ensure the latest version is used.
# Enhanced with WebUI verification, model testing, and fancy visual feedback.

import os
import subprocess
import sys
import shutil
import time
import signal
import threading
import psutil
import json
import requests
from datetime import datetime
from pathlib import Path
from IPython.display import HTML, display, Javascript

# --- Configuration ---
repo_path = '/content/AnxLight'

# --- Unified Logging Setup ---
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_dir = f"/content/anxlight_logs_{timestamp}"
os.makedirs(log_dir, exist_ok=True)
session_log_file = f"{log_dir}/session_log_{timestamp}.txt"
os.environ['ANXLIGHT_LOG_FILE'] = session_log_file

def log_message(message, level="INFO"):
    timestamp_msg = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    formatted_msg = f"[{timestamp_msg}] [{level}] {message}"
    print(formatted_msg)
    with open(session_log_file, "a") as f:
        f.write(formatted_msg + "\n")

# --- Fancy Header and CSS ---
header_html = """
<style>
    @import url('https://fonts.googleapis.com/css2?family=Orbitron:wght@400;700;900&display=swap');

    .anxlight-header {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        border-radius: 20px;
        padding: 30px;
        margin: 20px 0;
        box-shadow: 0 10px 30px rgba(0,0,0,0.3);
        position: relative;
        overflow: hidden;
    }

    .anxlight-header::before {
        content: '';
        position: absolute;
        top: -50%;
        right: -50%;
        width: 200%;
        height: 200%;
        background: repeating-linear-gradient(
            45deg,
            transparent,
            transparent 10px,
            rgba(255,255,255,0.05) 10px,
            rgba(255,255,255,0.05) 20px
        );
        animation: slide 20s linear infinite;
    }

    @keyframes slide {
        0% { transform: translate(0, 0); }
        100% { transform: translate(50px, 50px); }
    }

    .anxlight-title {
        font-family: 'Orbitron', monospace;
        font-size: 48px;
        font-weight: 900;
        color: #ffffff;
        text-align: center;
        margin: 0;
        text-shadow: 0 0 20px rgba(255,255,255,0.5);
        position: relative;
        z-index: 1;
    }

    .anxlight-subtitle {
        font-family: 'Orbitron', monospace;
        font-size: 18px;
        color: #e0e0e0;
        text-align: center;
        margin: 10px 0 0 0;
        position: relative;
        z-index: 1;
    }

    .test-container {
        background: #1a1a2e;
        border-radius: 15px;
        padding: 20px;
        margin: 20px 0;
        box-shadow: 0 5px 20px rgba(0,0,0,0.3);
    }

    .test-grid {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
        gap: 15px;
        margin-top: 20px;
    }

    .test-card {
        background: #16213e;
        border-radius: 10px;
        padding: 15px;
        border: 2px solid #0f3460;
        transition: all 0.3s ease;
        position: relative;
        overflow: hidden;
    }

    .test-card::before {
        content: '';
        position: absolute;
        top: 0;
        left: -100%;
        width: 100%;
        height: 100%;
        background: linear-gradient(90deg, transparent, rgba(255,255,255,0.1), transparent);
        transition: left 0.5s ease;
    }

    .test-card:hover::before {
        left: 100%;
    }

    .test-card.success {
        border-color: #4ade80;
        background: #16213e;
        box-shadow: 0 0 15px rgba(74, 222, 128, 0.3);
    }

    .test-card.warning {
        border-color: #fbbf24;
        background: #16213e;
        box-shadow: 0 0 15px rgba(251, 191, 36, 0.3);
    }

    .test-card.error {
        border-color: #f87171;
        background: #16213e;
        box-shadow: 0 0 15px rgba(248, 113, 113, 0.3);
    }

    .test-card.testing {
        border-color: #60a5fa;
        background: #16213e;
        animation: pulse 2s infinite;
    }

    @keyframes pulse {
        0% { box-shadow: 0 0 0 0 rgba(96, 165, 250, 0.7); }
        70% { box-shadow: 0 0 0 10px rgba(96, 165, 250, 0); }
        100% { box-shadow: 0 0 0 0 rgba(96, 165, 250, 0); }
    }

    .test-title {
        font-family: 'Orbitron', monospace;
        font-size: 18px;
        font-weight: 700;
        color: #ffffff;
        margin-bottom: 10px;
        display: flex;
        align-items: center;
        justify-content: space-between;
    }

    .test-status {
        font-size: 24px;
        margin-left: 10px;
    }

    .test-details {
        font-family: monospace;
        font-size: 12px;
        color: #94a3b8;
        margin-top: 5px;
        max-height: 100px;
        overflow-y: auto;
    }

    .progress-bar {
        width: 100%;
        height: 30px;
        background: #0f3460;
        border-radius: 15px;
        overflow: hidden;
        margin: 20px 0;
        position: relative;
    }

    .progress-fill {
        height: 100%;
        background: linear-gradient(90deg, #4ade80, #22d3ee);
        transition: width 0.5s ease;
        display: flex;
        align-items: center;
        justify-content: center;
        color: #000;
        font-weight: bold;
        font-family: 'Orbitron', monospace;
    }

    .summary-box {
        background: linear-gradient(135deg, #1e3a8a 0%, #312e81 100%);
        border-radius: 15px;
        padding: 25px;
        margin: 20px 0;
        text-align: center;
        color: white;
        font-family: 'Orbitron', monospace;
    }

    .summary-title {
        font-size: 28px;
        font-weight: 900;
        margin-bottom: 15px;
    }

    .summary-stats {
        display: flex;
        justify-content: space-around;
        margin-top: 20px;
    }

    .stat-item {
        text-align: center;
    }

    .stat-number {
        font-size: 36px;
        font-weight: 900;
    }

    .stat-label {
        font-size: 14px;
        opacity: 0.8;
    }

    .loading-spinner {
        display: inline-block;
        width: 20px;
        height: 20px;
        border: 3px solid rgba(255,255,255,0.3);
        border-radius: 50%;
        border-top-color: #fff;
        animation: spin 1s ease-in-out infinite;
    }

    @keyframes spin {
        to { transform: rotate(360deg); }
    }
</style>

<div class="anxlight-header">
    <h1 class="anxlight-title">🚀 AnxLight v3</h1>
    <p class="anxlight-subtitle">Pre-Flight System Verification & Setup</p>
</div>

<div id="test-results" class="test-container">
    <h2 style="color: #60a5fa; font-family: 'Orbitron', monospace; text-align: center;">
        System Testing Progress
    </h2>
    <div class="progress-bar">
        <div id="progress-fill" class="progress-fill" style="width: 0%">0%</div>
    </div>
    <div id="test-grid" class="test-grid"></div>
</div>
"""

display(HTML(header_html))

# --- Test Results Container ---
test_results_html = """
<div id="test-results-container"></div>
<script>
    // JavaScript for updating test results
    window.testResults = {};
    window.totalTests = 0;
    window.completedTests = 0;

    function updateTestCard(testName, status, details) {
        const testId = testName.replace(/[^a-zA-Z0-9]/g, '-');
        let card = document.getElementById(`test-${testId}`);

        if (!card) {
            // Create new test card
            card = document.createElement('div');
            card.id = `test-${testId}`;
            card.className = 'test-card testing';
            card.innerHTML = `
                <div class="test-title">
                    ${testName}
                    <span class="test-status"><div class="loading-spinner"></div></span>
                </div>
                <div class="test-details">Testing...</div>
            `;
            document.getElementById('test-grid').appendChild(card);
        }

        // Update status
        const statusIcons = {
            'passed': '✅',
            'failed': '❌',
            'warning': '⚠️',
            'testing': '<div class="loading-spinner"></div>'
        };

        card.className = `test-card ${status}`;
        card.querySelector('.test-status').innerHTML = statusIcons[status] || '❓';
        card.querySelector('.test-details').innerHTML = details || 'No details';

        // Update progress
        if (status !== 'testing') {
            window.completedTests++;
            const progress = Math.round((window.completedTests / window.totalTests) * 100);
            document.getElementById('progress-fill').style.width = progress + '%';
            document.getElementById('progress-fill').innerHTML = progress + '%';
        }
    }

    function showSummary(passed, warning, failed) {
        const summaryHtml = `
            <div class="summary-box">
                <div class="summary-title">🎯 Testing Complete!</div>
                <div class="summary-stats">
                    <div class="stat-item">
                        <div class="stat-number" style="color: #4ade80;">${passed}</div>
                        <div class="stat-label">Passed</div>
                    </div>
                    <div class="stat-item">
                        <div class="stat-number" style="color: #fbbf24;">${warning}</div>
                        <div class="stat-label">Warnings</div>
                    </div>
                    <div class="stat-item">
                        <div class="stat-number" style="color: #f87171;">${failed}</div>
                        <div class="stat-label">Failed</div>
                    </div>
                </div>
            </div>
        `;
        document.getElementById('test-results').insertAdjacentHTML('beforeend', summaryHtml);
    }
</script>
"""

display(HTML(test_results_html))

# --- JavaScript Helper Functions ---
def update_test_ui(test_name, status, details=""):
    """Update the test UI with JavaScript"""
    js_code = f"""
    updateTestCard("{test_name}", "{status}", "{details.replace('"', '\\"').replace('\n', '<br>')}");
    """
    display(Javascript(js_code))

def set_total_tests(count):
    """Set the total number of tests"""
    js_code = f"window.totalTests = {count};"
    display(Javascript(js_code))

# --- Main Setup Code ---
log_message("--- CELL 1: PRE-FLIGHT SETUP (ALWAYS FORCING CLEAN CLONE) ---")

# Step 1: Clone repository (your existing code)
if os.path.exists(repo_path):
    log_message(f"Removing existing directory at {repo_path} to ensure a clean state...", "WARNING")
    try:
        shutil.rmtree(repo_path)
        log_message("Directory removed successfully.")
    except Exception as e:
        log_message(f"Error removing directory: {e}. Please check permissions.", "ERROR")
        sys.exit(1)

log_message(f"Cloning fresh AnxLight repository to {repo_path}...")
try:
    subprocess.run(
        ['git', 'clone', 'https://github.com/drf0rk/AnxLight.git', repo_path],
        check=True, capture_output=True, text=True, timeout=120
    )
    log_message("Fresh clone successful.")
except Exception as e:
    log_message(f"FATAL: Failed to clone repository. Error: {e}", "ERROR")
    sys.exit(1)

# Step 2: Set Environment & Execute Setup
os.chdir(repo_path)
log_message(f"Current working directory: {os.getcwd()}")
os.environ['PROJECT_ROOT'] = repo_path

pre_flight_script_path = os.path.join(repo_path, 'scripts', 'pre_flight_setup.py')
if not os.path.exists(pre_flight_script_path):
    log_message(f"FATAL: Pre-flight script not found at {pre_flight_script_path}", "ERROR")
    sys.exit(1)

log_message(f"Executing Pre-Flight Setup Script: {pre_flight_script_path}")
try:
    process = subprocess.Popen(
        [sys.executable, pre_flight_script_path],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True
    )
    with open(session_log_file, "a") as f:
        for line in iter(process.stdout.readline, ''):
            print(line, end='')
            f.write(line)
    if process.wait() != 0:
        log_message("Pre-flight setup script failed.", "ERROR")
        sys.exit(1)
    log_message("Pre-Flight Setup Script Finished Successfully.")
except Exception as e:
    log_message(f"FATAL: An unexpected error occurred during pre-flight setup: {e}", "ERROR")
    sys.exit(1)

# === COMPREHENSIVE TESTING PHASE ===
print("\n" + "="*60)
print("🔍 STARTING COMPREHENSIVE SYSTEM VERIFICATION")
print("="*60)

class SystemTester:
    def __init__(self):
        self.test_results = {}
        self.tests_to_run = [
            ("System Environment", self.test_system_env),
            ("Python Virtual Environment", self.test_venv),
            ("CUDA/GPU Access", self.test_cuda),
            ("Core Python Packages", self.test_packages),
            ("Disk Space", self.test_disk_space),
            ("Network Connectivity", self.test_network),
            ("A1111 WebUI", self.test_a1111),
            ("ComfyUI WebUI", self.test_comfyui),
            ("Forge WebUI", self.test_forge),
            ("Model Download Test", self.test_model_download),
            ("Extension Test", self.test_extensions),
            ("API Functionality", self.test_api)
        ]
        set_total_tests(len(self.tests_to_run))

    def run_all_tests(self):
        """Run all system tests"""
        for test_name, test_func in self.tests_to_run:
            update_test_ui(test_name, "testing", "Running test...")
            try:
                result = test_func()
                self.test_results[test_name] = result
                update_test_ui(test_name, result['status'], result.get('message', ''))
                time.sleep(0.5)  # Visual effect
            except Exception as e:
                self.test_results[test_name] = {'status': 'error', 'message': str(e)}
                update_test_ui(test_name, "failed", f"Error: {str(e)}")

    def test_system_env(self):
        """Test basic system environment"""
        try:
            info = []
            info.append(f"Python: {sys.version.split()[0]}")
            info.append(f"OS: {os.uname().sysname}")
            info.append(f"CPU cores: {os.cpu_count()}")

            # Check RAM
            mem = psutil.virtual_memory()
            info.append(f"RAM: {mem.total // (1024**3)}GB")

            return {'status': 'passed', 'message': ' | '.join(info)}
        except Exception as e:
            return {'status': 'failed', 'message': str(e)}

    def test_venv(self):
        """Test virtual environment"""
        venv_python = '/content/AnxLight/anxlight_venv/bin/python'
        if os.path.exists(venv_python):
            result = subprocess.run([venv_python, '--version'], capture_output=True, text=True)
            if result.returncode == 0:
                return {'status': 'passed', 'message': result.stdout.strip()}
        return {'status': 'failed', 'message': 'Virtual environment not found'}

    def test_cuda(self):
        """Test CUDA/GPU availability"""
        try:
            result = subprocess.run(['nvidia-smi'], capture_output=True, text=True)
            if result.returncode == 0:
                for line in result.stdout.split('\n'):
                    if 'Tesla' in line or 'RTX' in line or 'GTX' in line:
                        gpu_info = line.strip().split()[2:5]
                        return {'status': 'passed', 'message': f'GPU: {" ".join(gpu_info)}'}
                return {'status': 'passed', 'message': 'GPU detected'}
            return {'status': 'failed', 'message': 'No GPU detected'}
        except:
            return {'status': 'failed', 'message': 'nvidia-smi not available'}

    def test_packages(self):
        """Test critical Python packages"""
        venv_python = '/content/AnxLight/anxlight_venv/bin/python'
        packages = ['torch', 'torchvision', 'gradio', 'transformers', 'diffusers', 'xformers']

        failed = []
        for package in packages:
            result = subprocess.run(
                [venv_python, '-c', f'import {package}'],
                capture_output=True
            )
            if result.returncode != 0:
                failed.append(package)

        if not failed:
            return {'status': 'passed', 'message': f'All {len(packages)} packages OK'}
        elif len(failed) < 3:
            return {'status': 'warning', 'message': f'Missing: {", ".join(failed)}'}
        else:
            return {'status': 'failed', 'message': f'Missing {len(failed)} packages'}

    def test_disk_space(self):
        """Test available disk space"""
        stat = os.statvfs('/content')
        free_gb = (stat.f_bavail * stat.f_frsize) / (1024**3)

        if free_gb < 5:
            return {'status': 'failed', 'message': f'{free_gb:.1f}GB free (need 5GB+)'}
        elif free_gb < 10:
            return {'status': 'warning', 'message': f'{free_gb:.1f}GB free'}
        else:
            return {'status': 'passed', 'message': f'{free_gb:.1f}GB free'}

    def test_network(self):
        """Test network connectivity"""
        try:
            response = requests.get('https://huggingface.co', timeout=5)
            if response.status_code == 200:
                return {'status': 'passed', 'message': 'Internet OK'}
        except:
            pass
        return {'status': 'failed', 'message': 'Network issues'}

    def test_webui_generic(self, name, path, test_file):
        """Generic WebUI test"""
        webui_path = Path(f'/content/AnxLight/{path}')
        if not webui_path.exists():
            return {'status': 'warning', 'message': 'Not installed'}

        test_path = webui_path / test_file
        if test_path.exists():
            return {'status': 'passed', 'message': f'{name} files OK'}
        else:
            return {'status': 'failed', 'message': 'Missing core files'}

    def test_a1111(self):
        return self.test_webui_generic('A1111', 'A1111', 'launch.py')

    def test_comfyui(self):
        return self.test_webui_generic('ComfyUI', 'ComfyUI', 'main.py')

    def test_forge(self):
        return self.test_webui_generic('Forge', 'Forge', 'launch.py')

    def test_model_download(self):
        """Test model download capability"""
        try:
            # Test downloading a small file
            test_url = 'https://huggingface.co/runwayml/stable-diffusion-v1-5/raw/main/model_index.json'
            response = requests.get(test_url, timeout=10)
            if response.status_code == 200:
                return {'status': 'passed', 'message': 'Download test OK'}
        except:
            pass
        return {'status': 'failed', 'message': 'Download test failed'}

    def test_extensions(self):
        """Test if key extensions exist"""
        ext_count = 0
        for webui in ['A1111', 'Forge']:
            ext_dir = Path(f'/content/AnxLight/{webui}/extensions')
            if ext_dir.exists():
                ext_count += len(list(ext_dir.glob('*')))

        if ext_count > 5:
            return {'status': 'passed', 'message': f'{ext_count} extensions found'}
        elif ext_count > 0:
            return {'status': 'warning', 'message': f'Only {ext_count} extensions'}
        else:
            return {'status': 'failed', 'message': 'No extensions found'}

    def test_api(self):
        """Test API readiness"""
        # Just check if launch scripts have API flags available
        api_ready = 0
        for webui in ['A1111', 'Forge']:
            launch_file = Path(f'/content/AnxLight/{webui}/launch.py')
            if launch_file.exists():
                content = launch_file.read_text()
                if '--api' in content or 'api' in content.lower():
                    api_ready += 1

        if api_ready >= 2:
            return {'status': 'passed', 'message': 'API support ready'}
        elif api_ready > 0:
            return {'status': 'warning', 'message': 'Partial API support'}
        else:
            return {'status': 'failed', 'message': 'No API support'}

# Run all tests
tester = SystemTester()
tester.run_all_tests()

# Calculate summary
passed = sum(1 for r in tester.test_results.values() if r['status'] == 'passed')
warning = sum(1 for r in tester.test_results.values() if r['status'] == 'warning')
failed = sum(1 for r in tester.test_results.values() if r['status'] in ['failed', 'error'])

# Show summary
time.sleep(1)
display(Javascript(f"showSummary({passed}, {warning}, {failed})"))

# Save results
with open('/content/AnxLight/test_results.json', 'w') as f:
    json.dump(tester.test_results, f, indent=2)

# Decision logic
critical_tests = ['Python Virtual Environment', 'Core Python Packages', 'Disk Space']
critical_failures = [test for test in critical_tests if tester.test_results.get(test, {}).get('status') == 'failed']

if critical_failures:
    print("\n🚨 CRITICAL FAILURES DETECTED!")
    print(f"Failed: {', '.join(critical_failures)}")
    print("❌ Cannot continue. Please check errors above.")
else:
    print("\n✅ Pre-flight checks complete!")
    if failed > 0:
        print(f"⚠️  {failed} non-critical issues found, but you can continue.")
    print("🚀 Proceeding to configuration phase in 3 seconds...")
    time.sleep(3)
    display(Javascript('IPython.notebook.execute_cell(1)'))  # Run Cell 2

log_message("--- CELL 1: PRE-FLIGHT SETUP END ---")

In [None]:
# @title Cell 2: AnxLight v3 Gradio App Launcher (Shell-Based - No Iframe)
# Using shell execution to completely prevent inline iframe embedding

import os
import sys
import subprocess
import time
import re
from datetime import datetime

# --- Logging Setup ---
session_log_file = os.environ.get('ANXLIGHT_LOG_FILE')
if not session_log_file:
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_dir = f"/content/anxlight_logs_{timestamp}"
    os.makedirs(log_dir, exist_ok=True)
    session_log_file = f"{log_dir}/session_log_{timestamp}.txt"

def log_message(message, level="INFO"):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    formatted_msg = f"[{timestamp}] [{level}] {message}"
    print(formatted_msg)
    with open(session_log_file, "a") as f:
        f.write(formatted_msg + "\n")

log_message("--- CELL 2: SHELL-BASED LAUNCHER (NO IFRAME) ---")

# --- Fix the json_utils.save error first ---
def fix_json_save_error():
    try:
        with open('/content/AnxLight/scripts/main_gradio_app.py', 'r') as f:
            content = f.read()

        if 'json_utils.save(config_path, config_data)' in content:
            content = content.replace(
                'json_utils.save(config_path, config_data)',
                'json_utils.save(config_path, "", config_data)'
            )
            with open('/content/AnxLight/scripts/main_gradio_app.py', 'w') as f:
                f.write(content)
            log_message("✅ Fixed json_utils.save() call")
        else:
            log_message("✅ json_utils.save() call already correct")
    except Exception as e:
        log_message(f"❌ Could not fix json_utils.save() error: {e}", "ERROR")

fix_json_save_error()

# --- Setup paths ---
project_root = '/content/AnxLight'
os.chdir(project_root)

# --- Create a simple launcher script that runs in background ---
launcher_script = f"""
import sys
import os
sys.path.insert(0, '{project_root}')
sys.path.insert(0, '{project_root}/modules')
sys.path.insert(0, '{project_root}/scripts')
sys.path.insert(0, '{project_root}/scripts/data')

os.environ['PROJECT_ROOT'] = '{project_root}'
os.environ['PYTHONPATH'] = ':'.join(sys.path)

from scripts.main_gradio_app import setup_gradio_interface
import time

app = setup_gradio_interface()
app.launch(debug=False, share=True, quiet=False)

while True:
    time.sleep(1)
"""

# Write the launcher script
launcher_path = f"{project_root}/temp_launcher.py"
with open(launcher_path, 'w') as f:
    f.write(launcher_script)

log_message("🚀 Starting Gradio via background shell process...")
log_message("📱 This prevents any iframe embedding in the notebook")

# --- Launch via shell in background and capture output ---
venv_python = f"{project_root}/anxlight_venv/bin/python"

try:
    # Start the process in background
    process = subprocess.Popen(
        [venv_python, launcher_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        universal_newlines=True
    )

    log_message("⏳ Waiting for Gradio to start and generate public URL...")

    # Read output until we get the URL
    public_url = None
    start_time = time.time()
    timeout = 60  # 60 second timeout

    while time.time() - start_time < timeout:
        try:
            line = process.stdout.readline()
            if line:
                # Look for the public URL
                if 'Running on public URL:' in line:
                    public_url = line.split('Running on public URL:')[-1].strip()
                    break
                elif 'https://' in line and 'gradio.live' in line:
                    # Extract URL with regex
                    url_match = re.search(r'https://[^\s]+\.gradio\.live[^\s]*', line)
                    if url_match:
                        public_url = url_match.group()
                        break

                # Show important output
                if any(keyword in line.lower() for keyword in ['error', 'running on', 'public url', 'started']):
                    print(f"Gradio: {line.strip()}")

            # Check if process died
            if process.poll() is not None:
                log_message("❌ Gradio process ended unexpectedly", "ERROR")
                break

        except Exception as e:
            log_message(f"Error reading output: {e}", "ERROR")
            break

    if public_url:
        log_message("✅ SUCCESS! Gradio is running")
        log_message("=" * 60)
        log_message(f"🔗 PUBLIC URL: {public_url}")
        log_message(f"📋 CLICK HERE: {public_url}")
        log_message("=" * 60)
        log_message("🎉 No iframe clutter - just the clean URL above!")

        # Continue monitoring without spam
        log_message("🔄 Monitoring server (will log every 10 minutes)...")

        counter = 0
        while True:
            time.sleep(60)  # Check every minute
            counter += 1

            # Check if process is still alive
            if process.poll() is not None:
                log_message("❌ Gradio process has stopped", "ERROR")
                break

            # Log every 10 minutes
            if counter % 10 == 0:
                elapsed = counter
                log_message(f"⏱️ Server running for {elapsed} minutes")
                log_message(f"🔗 URL: {public_url}")

    else:
        log_message("❌ Could not extract public URL within timeout", "ERROR")
        log_message("🔍 Check the Gradio output above for any error messages")

except Exception as e:
    log_message(f"❌ Failed to launch Gradio: {e}", "ERROR")
    import traceback
    log_message(traceback.format_exc(), "ERROR")

finally:
    # Clean up
    try:
        if os.path.exists(launcher_path):
            os.remove(launcher_path)
    except:
        pass

log_message("--- CELL 2: SHELL LAUNCHER END ---")

In [26]:
# Save the corrected Manager.py file
manager_content = '''""" Manager Module | by ANXETY """

from modules.CivitaiAPI import CivitAiAPI    # CivitAI API
import modules.json_utils as js              # JSON

from urllib.parse import urlparse, parse_qs
from pathlib import Path
import subprocess
import requests
import zipfile
import shlex
import sys
import os
import re


osENV = os.environ
CD = os.chdir

# Constants (auto-convert env vars to Path)
PATHS = {k: Path(v) for k, v in osENV.items() if k.endswith('_path')}

HOME = PATHS.get('home_path', Path(osENV.get('HOME', '/content')))
SCR_PATH = PATHS.get('scr_path', HOME) # Fallback scr_path to HOME if not set
SETTINGS_PATH = PATHS.get('settings_path', HOME / 'anxlight_config.json')

# These will be default tokens if not overridden or found in config by specific functions
CAI_TOKEN_DEFAULT = ''
HF_TOKEN_DEFAULT = ''
try:
    if SETTINGS_PATH and SETTINGS_PATH.exists():
        CAI_TOKEN_DEFAULT = js.read(SETTINGS_PATH, 'WIDGETS.civitai_token') or ''
        HF_TOKEN_DEFAULT = js.read(SETTINGS_PATH, 'WIDGETS.huggingface_token') or ''
except Exception: # Handle cases where SETTINGS_PATH might not be fully formed yet or js.read fails
    print("[Manager.py] Warning: Could not read tokens from SETTINGS_PATH at module load.")
    pass


# ===================== Helper Function ====================

def log_message(message, log=False):
    if log:
        print(f"{message}")

def handle_errors(func):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            log_message(f"> \\033[31m[Error] in {func.__name__}:\\033[0m {e}", kwargs.get('log', True)) # Default log to True for errors
            return False
    return wrapper

def _handle_path_and_filename(parts, url, is_git=False): # Primarily for old m_download
    path, filename = None, None
    if len(parts) >= 3:
        path = Path(parts[1]).expanduser(); filename = parts[2]
    elif len(parts) == 2:
        arg = parts[1]
        if '/' in arg or arg.startswith('~'): path = Path(arg).expanduser()
        else: filename = arg
    if not is_git and 'drive.google.com' not in url:
        if filename and not Path(filename).suffix:
            url_ext = Path(urlparse(url).path).suffix
            if url_ext: filename += url_ext
            else: filename = None # Invalid if no extension derivable
    return path, filename

def is_github_url(url):
    parsed = urlparse(url); return parsed.netloc in ('github.com', 'www.github.com')

@handle_errors
def clean_url(url: str, cai_token_override: str = None) -> str | None:
    """Clean and format URLs. Returns cleaned URL or None on failure."""
    log_message(f"> Cleaning URL: {url}", True) # Log attempt
    token_to_use_cai = cai_token_override if cai_token_override is not None else CAI_TOKEN_DEFAULT

    if 'civitai.com/models/' in url:
        api = CivitAiAPI(str(token_to_use_cai) if token_to_use_cai else None)
        data = api.validate_download(url)
        if not data or not data.download_url:
            log_message(f"> Civitai URL validation/resolution failed for: {url}", True)
            return None
        url = data.download_url
        log_message(f"> Cleaned Civitai URL: {url}", True)
    elif 'huggingface.co' in url:
        if '/blob/' in url: url = url.replace('/blob/', '/resolve/')
        if '?' in url: url = url.split('?')[0]
        log_message(f"> Cleaned HuggingFace URL: {url}", True)
    elif 'github.com' in url:
        if '/blob/' in url: url = url.replace('/blob/', '/raw/')
        log_message(f"> Cleaned GitHub URL: {url}", True)
    return url

def get_file_name(url: str) -> str | None:
    """Get the file name based on the URL. Returns None if not determinable."""
    if any(domain in url for domain in ['civitai.com', 'drive.google.com']): return None
    try:
        name = Path(urlparse(url).path).name
        return name if name else None
    except Exception:
        return None

def execute_shell_command_with_bool_return(command_str: str, log: bool = False, cwd: str = None) -> bool:
    log_message(f"Executing shell command: {command_str} (CWD: {cwd or Path.cwd()})", log)
    try:
        process = subprocess.run(shlex.split(command_str), capture_output=True, text=True, check=False, cwd=cwd)
        if log and process.stdout.strip(): log_message(f"Stdout: {process.stdout.strip()}", log)
        if log and process.stderr.strip(): log_message(f"Stderr: {process.stderr.strip()}", log)
        if process.returncode == 0:
            log_message(">> Command executed successfully.", log); return True
        else:
            log_message(f">> Command failed with exit code {process.returncode}.", log); return False
    except Exception as e:
        log_message(f">> Exception during shell command: {e}", log); return False

# ======================== Download ========================

@handle_errors
def download_url_to_path(url: str, target_full_path: str, log: bool = False, hf_token: str = None, cai_token: str = None) -> bool:
    log_message(f"> Downloading: {url} \\n  To: {target_full_path}", log)
    if not url or not target_full_path: log_message("> Error: URL or target_full_path is empty.", log); return False

    cleaned_url = clean_url(url, cai_token_override=cai_token)
    if not cleaned_url: log_message(f"> Error: URL cleaning failed for {url}.", log); return False
    url = cleaned_url

    target_path_obj = Path(target_full_path)
    target_dir = target_path_obj.parent
    target_filename = target_path_obj.name

    if not target_filename:
        inferred_name = get_file_name(url)
        if inferred_name:
            target_filename = inferred_name
            target_path_obj = target_dir / target_filename
            log_message(f">> Inferred filename: {target_filename}. Full path: {target_path_obj}", log)
        else:
            log_message(f"> Error: Target filename is empty and could not be inferred from URL: {url}", log); return False

    try: target_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e: log_message(f"> Error creating directory {target_dir}: {e}", log); return False

    log_message(f">> Target directory: {target_dir}\\n>> Target filename: {target_filename}", log)

    token_to_use_hf = hf_token if hf_token is not None else HF_TOKEN_DEFAULT

    if any(domain in url for domain in ['huggingface.co', 'github.com', 'civitai.com']):
        aria2_args_list = ['aria2c', '--header="User-Agent: Mozilla/5.0"', '--allow-overwrite=true',
                           '--console-log-level=warn', '--summary-interval=0',
                           '--stderr=true', '-c', '-x16', '-s16', '-k1M', '-j5',
                           f'--dir="{str(target_dir)}"', f'--out="{target_filename}"']
        if token_to_use_hf and 'huggingface.co' in url:
            aria2_args_list.append(f'--header="Authorization: Bearer {token_to_use_hf}"')
        aria2_args_list.append(f'"{url}"')
        command = " ".join(aria2_args_list)
        log_message(f">> Attempting Aria2c: {command}", log)
        process = subprocess.run(shlex.split(command), capture_output=True, text=True)
        if process.returncode == 0 and target_path_obj.exists():
             log_message(f">> Aria2c download successful for {target_filename}", log); return True
        else:
            log_message(f">> Aria2c download FAILED for {target_filename}. Code: {process.returncode}", log)
            if process.stderr: log_message(f"   Aria2c stderr: {process.stderr.strip()}", log)
            return False
    elif 'drive.google.com' in url:
        cmd_list = ['gdown', '--fuzzy']
        if 'drive.google.com/drive/folders' in url: cmd_list.extend(['--folder', '-O', str(target_dir)])
        else: cmd_list.extend(['-O', str(target_path_obj)])
        cmd_list.append(url)
        command = " ".join(cmd_list)
        log_message(f">> Attempting GDown: {command}", log)
        return execute_shell_command_with_bool_return(command, log)
    else:
        command_list = ['curl', '-#', '-L', '-f', '-o', str(target_path_obj), url]
        command = " ".join(command_list)
        log_message(f">> Attempting Curl: {command}", log)
        return execute_shell_command_with_bool_return(command, log)


@handle_errors
def download_file(url, filename, log):
    log_message(f"[Manager old `download_file`] Called for URL: {url}, Filename: {filename}, CWD: {Path.cwd()}", log)
    if not filename:
        filename = get_file_name(url)
        if not filename:
            log_message(f"> Old download_file: Could not determine filename for {url}", log)
            return False

    full_target_path = str(Path.cwd() / filename)
    return download_url_to_path(url, full_target_path, log)


@handle_errors
def process_download(line, log, unzip):
    parts = line.split(); url = parts[0].replace('\\\\\\\\', '')
    cleaned_url = clean_url(url)
    if not cleaned_url: return False
    url = cleaned_url

    path, filename = _handle_path_and_filename(parts, url)
    current_dir = Path.cwd()
    download_successful = False
    try:
        target_dir_for_this_item = path if path else current_dir
        if path:
            target_dir_for_this_item.mkdir(parents=True, exist_ok=True)

        final_filename = filename if filename else get_file_name(url)
        if not final_filename:
            log_message(f"> process_download: Could not determine filename for {url}", log)
            return False

        full_target_path = str(target_dir_for_this_item / final_filename)
        download_successful = download_url_to_path(url, full_target_path, log)

        if download_successful and unzip and final_filename.endswith('.zip'):
            unzip_file(full_target_path, log)
    finally:
        pass
    return download_successful


@handle_errors
def m_download(line, log=False, unzip=False):
    links = [link.strip() for link in line.split(',') if link.strip()]
    if not links: log_message('> Missing URL, downloading nothing', log); return
    for link_item in links:
        potential_txt_path = Path(link_item).expanduser()
        if link_item.endswith('.txt') and potential_txt_path.is_file():
            log_message(f"> Reading URLs from file: {potential_txt_path}", log)
            with open(potential_txt_path, 'r') as file:
                for file_line in file:
                    if file_line.strip(): process_download(file_line.strip(), log, unzip)
        else:
            process_download(link_item, log, unzip)


@handle_errors
def unzip_file(zip_filepath_str, log):
    zip_filepath = Path(zip_filepath_str)
    extract_path = zip_filepath.parent
    log_message(f">> Unzipping: {zip_filepath} to {extract_path}", log)
    with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    log_message(f">> Successfully unpacked: {zip_filepath}", log)
    return True

@handle_errors
def execute_git_command(command, log=False):
    repo_url_match = re.search(r'https?://\\S+', command)
    repo_url = repo_url_match.group() if repo_url_match else "Unknown_Repo"
    log_message(f">> Executing Git: {command}", log)
    process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    ret_code = process.wait()
    if ret_code == 0:
        log_message(f">> Git command successful for {repo_url}", log)
        return True
    else:
        log_message(f">> Git command FAILED for {repo_url} with code {ret_code}", log)
        return False

def build_git_command(url, repo_name, recursive, depth):
    cmd = ['git', 'clone']
    if depth > 0: cmd.extend(['--depth', str(depth)])
    if recursive: cmd.append('--recursive')
    cmd.append(url)
    if repo_name: cmd.append(repo_name)
    return ' '.join(cmd)

@handle_errors
def process_clone(input_source, recursive, depth, log=False):
    parts = shlex.split(input_source)
    if not parts: log_message('>> \\033[31m[Error]: Empty clone command\\033[0m', log); return False
    url = parts[0].replace('\\\\\\\\', '')
    if not url: log_message(f'>> \\033[31m[Error]:\\033[0m Empty URL in clone: {input_source}', log); return False
    if not is_github_url(url): log_message(f'>>  \\033[33m[Warning]:\\033[0m Not a GitHub URL - {url}', log); return False

    path, repo_name = _handle_path_and_filename(parts, url, is_git=True)
    current_dir = Path.cwd()
    clone_success = False
    try:
        target_clone_dir = path if path else current_dir
        if path: target_clone_dir.mkdir(parents=True, exist_ok=True)

        command = build_git_command(url, repo_name, recursive, depth)

        execution_cwd = str(path) if path else None
        clone_success = execute_git_command(command, log)
    finally:
        pass
    return clone_success

def m_clone(input_source, recursive=True, depth=1, log=False):
    sources = [link.strip() for link in input_source.split(',') if link.strip()]
    if not sources: log_message('>> No valid repositories to clone', log); return
    for source_item in sources:
        if source_item.endswith('.txt') and Path(source_item).expanduser().is_file():
            log_message(f"> Reading clone sources from file: {source_item}", log)
            with open(Path(source_item).expanduser(), 'r') as f:
                for line in f:
                    line = line.strip()
                    if line: process_clone(line, recursive, depth, log)
        else:
            process_clone(source_item, recursive, depth, log)

# ===================== Asset Management for Gradio =====================

def download_selected_assets(config_data):
    """Download assets based on user selections from Gradio config"""
    try:
        yield "🔍 Analyzing selected assets..."

        # Extract selections from config
        webui_choice = config_data.get('webui_choice', 'A1111')
        sd_version = config_data.get('sd_version', 'SD1.5')
        selected_models = config_data.get('selected_models', [])
        selected_vaes = config_data.get('selected_vaes', [])
        selected_controlnets = config_data.get('selected_controlnets', [])
        selected_loras = config_data.get('selected_loras', [])

        # Get tokens if available
        civitai_token = config_data.get('civitai_token', CAI_TOKEN_DEFAULT)
        hf_token = config_data.get('huggingface_token', HF_TOKEN_DEFAULT)

        total_selected = len(selected_models) + len(selected_vaes) + len(selected_controlnets) + len(selected_loras)

        if total_selected == 0:
            yield "ℹ️ No assets selected for download"
            return

        yield f"📦 Found {total_selected} assets to process:"
        if selected_models:
            models_preview = ', '.join(selected_models[:3])
            if len(selected_models) > 3:
                models_preview += '...'
            yield f"  • {len(selected_models)} Models: {models_preview}"
        if selected_vaes:
            vaes_preview = ', '.join(selected_vaes[:3])
            if len(selected_vaes) > 3:
                vaes_preview += '...'
            yield f"  • {len(selected_vaes)} VAEs: {vaes_preview}"
        if selected_controlnets:
            cnets_preview = ', '.join(selected_controlnets[:3])
            if len(selected_controlnets) > 3:
                cnets_preview += '...'
            yield f"  • {len(selected_controlnets)} ControlNets: {cnets_preview}"
        if selected_loras:
            loras_preview = ', '.join(selected_loras[:3])
            if len(selected_loras) > 3:
                loras_preview += '...'
            yield f"  • {len(selected_loras)} LoRAs: {loras_preview}"

        # For now, just placeholder - asset downloading needs data files
        yield "⚠️ Asset downloading feature is under development"
        yield "📋 Selected assets have been logged above"
        yield "🚀 Proceeding to WebUI launch..."
        yield "✅ Asset check complete"

    except Exception as e:
        yield f"❌ Fatal error in download_selected_assets: {str(e)}"
        import traceback
        yield f"Traceback: {traceback.format_exc()}"
'''

# Write the file
with open('/content/AnxLight/modules/Manager.py', 'w') as f:
    f.write(manager_content)

print("✅ Updated Manager.py with fixed download_selected_assets function")

✅ Updated Manager.py with fixed download_selected_assets function


In [27]:
# Create the fixed launcher file
launcher_code = '''import os
os.environ["MPLBACKEND"] = "Agg"  # Fix matplotlib backend

import sys
sys.path.insert(0, "/content/AnxLight")
sys.path.insert(0, "/content/AnxLight/modules")
sys.path.insert(0, "/content/AnxLight/scripts")
sys.path.insert(0, "/content/AnxLight/scripts/data")

from scripts.main_gradio_app import setup_gradio_interface
import time

print("Setting up Gradio interface with fixed matplotlib...")
app = setup_gradio_interface()
print("Launching with share=True...")
app.launch(debug=False, share=True, quiet=False)

print("Gradio launched successfully!")
while True:
    time.sleep(1)
'''

# Write the launcher
with open('/content/AnxLight/fixed_launcher.py', 'w') as f:
    f.write(launcher_code)

print("✅ Created fixed_launcher.py")

✅ Created fixed_launcher.py


In [28]:
# Check what WebUI was configured and if it exists
import json
from pathlib import Path

config_path = '/content/AnxLight/anxlight_config.json'
if Path(config_path).exists():
    with open(config_path, 'r') as f:
        config = json.load(f)

    webui_choice = config.get('webui_choice', 'Unknown')
    print(f"Selected WebUI: {webui_choice}")

    # Check if the WebUI directory exists
    webui_path = Path(f"/content/AnxLight/{webui_choice}")
    if webui_path.exists():
        print(f"✅ WebUI directory exists: {webui_path}")

        # List contents
        contents = list(webui_path.iterdir())
        print(f"📁 WebUI contents ({len(contents)} items):")
        for item in contents[:10]:  # Show first 10 items
            print(f"   {item.name}")
        if len(contents) > 10:
            print("   ...")
    else:
        print(f"❌ WebUI directory NOT found: {webui_path}")
else:
    print("❌ No config file found")

Selected WebUI: Unknown
❌ WebUI directory NOT found: /content/AnxLight/Unknown


In [29]:
# Kill any running Gradio processes
!pkill -f "launcher.py" 2>/dev/null || true
!pkill -f "main_gradio_app.py" 2>/dev/null || true

print("🔄 Killed existing Gradio processes")
print("⏳ Starting fresh Gradio instance...")

import time
time.sleep(2)

^C
^C
🔄 Killed existing Gradio processes
⏳ Starting fresh Gradio instance...


In [30]:
# Check if launch.py exists and what it contains
launch_script = '/content/AnxLight/scripts/launch.py'
if Path(launch_script).exists():
    print("✅ launch.py exists")

    # Show the first 50 lines to understand what it does
    with open(launch_script, 'r') as f:
        lines = f.readlines()

    print(f"📄 launch.py has {len(lines)} lines. First 20 lines:")
    for i, line in enumerate(lines[:20], 1):
        print(f"{i:2d}: {line.rstrip()}")
else:
    print("❌ launch.py NOT found")

✅ launch.py exists
📄 launch.py has 399 lines. First 20 lines:
 1: # ~ launch.py | by ANXETY ~
 2: # Refactored by SuperAssistant to remove IPython dependencies and fix tunnel logic
 3: 
 4: from TunnelHub import Tunnel    # Tunneling
 5: import json_utils as js         # JSON
 6: 
 7: from datetime import timedelta
 8: from pathlib import Path
 9: import nest_asyncio
10: import subprocess
11: import requests
12: import argparse
13: import logging
14: import asyncio
15: import shlex
16: import time
17: import json
18: import yaml
19: import sys
20: import os


In [31]:
# Check what WebUIs were actually installed during pre-flight setup
anxlight_dir = Path('/content/AnxLight')
webui_dirs = []

for item in anxlight_dir.iterdir():
    if item.is_dir() and item.name in ['A1111', 'ComfyUI', 'Forge', 'Classic', 'ReForge', 'SD-UX']:
        webui_dirs.append(item.name)

print("🏗️ Installed WebUIs:")
for webui in webui_dirs:
    webui_path = anxlight_dir / webui
    size = sum(f.stat().st_size for f in webui_path.rglob('*') if f.is_file())
    size_mb = size // (1024 * 1024)
    print(f"   ✅ {webui} ({size_mb}MB)")

if not webui_dirs:
    print("   ❌ No WebUI directories found!")
    print("   💡 You might need to run Cell 1 (pre-flight setup) again")

🏗️ Installed WebUIs:
   ✅ A1111 (915MB)
   ✅ SD-UX (916MB)
   ✅ Classic (1255MB)
   ✅ ReForge (163MB)


In [32]:
# Run launch.py manually to see the full error output
print("🔧 Testing launch.py manually to see detailed error:")
print("=" * 60)

!/content/AnxLight/anxlight_venv/bin/python /content/AnxLight/scripts/launch.py

🔧 Testing launch.py manually to see detailed error:
Traceback (most recent call last):
  File "/content/AnxLight/scripts/launch.py", line 4, in <module>
    from TunnelHub import Tunnel    # Tunneling
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'TunnelHub'
