In [None]:
!pip install -q flask pyngrok psutil GPUtil requests

  Preparing metadata (setup.py) ... done
  Building wheel for GPUtil (setup.py) ... done


In [None]:
!ngrok authtoken **********************************************

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
from flask import Flask, request, jsonify, send_file
from pyngrok import ngrok
import sys
import os
import psutil
import traceback
import time
import threading
import gc
from io import StringIO
from datetime import datetime

# Flask application object; every endpoint below is registered on it via @app.route.
app = Flask(__name__)

# Persistent namespace - variables survive between calls.
# /execute passes this dict to exec() as the globals of the submitted code, and
# /probe, /variables and /cleanup inspect or clear the names stored in it.
namespace = {"__builtins__": __builtins__}

# Session tracking: timestamp taken once when this cell runs. Endpoints subtract
# it from datetime.now() to report uptime.
session_start = datetime.now()

# =============================================================================
# ENDPOINT: /probe - Understand Environment Capabilities
# =============================================================================
@app.route("/probe", methods=["GET"])
def probe():
    '''Return full environment capabilities and current state.

    Gathers CPU/RAM/disk figures from psutil, details of the first GPU reported
    by GPUtil (if any), versions of a fixed list of common packages, heuristic
    resource limits, a best-effort platform guess and the user-visible names
    currently stored in the persistent namespace.

    Returns:
        A JSON response with the keys "compute", "gpu", "storage", "packages",
        "limits", "platform", "session_uptime_minutes", "loaded_variables"
        (at most 30 names) and "loaded_variables_count".
    '''

    # CPU & RAM. interval=0.5 makes psutil sample CPU usage over half a second,
    # so this call blocks for that long.
    ram = psutil.virtual_memory()
    cpu_percent = psutil.cpu_percent(interval=0.5)

    # GPU (if available). Any failure (GPUtil missing, driver error, ...) is
    # reported in the payload instead of failing the whole probe.
    gpu_info = {"available": False}
    try:
        import GPUtil
        gpus = GPUtil.getGPUs()
        if gpus:
            gpu = gpus[0]
            gpu_info = {
                "available": True,
                "name": gpu.name,
                "memory_total_gb": round(gpu.memoryTotal / 1024, 1),
                "memory_free_gb": round(gpu.memoryFree / 1024, 1),
                "memory_used_gb": round(gpu.memoryUsed / 1024, 1),
                "utilization_pct": round(gpu.load * 100, 1)
            }
    except Exception as e:
        gpu_info["error"] = str(e)

    # Disk
    disk = psutil.disk_usage('/')

    # Installed packages: version string, 'installed' when the module exposes
    # no __version__, or None when the import fails.
    packages = {}
    pkg_list = [
        'torch', 'tensorflow', 'sklearn', 'pandas', 'numpy',
        'matplotlib', 'seaborn', 'scipy', 'PIL', 'cv2',
        'transformers', 'datasets', 'joblib', 'pickle'
    ]
    for pkg in pkg_list:
        try:
            mod = __import__(pkg)
            packages[pkg] = getattr(mod, '__version__', 'installed')
        except Exception:
            # Broken installs can raise more than ImportError; treat them all
            # as "not usable".
            packages[pkg] = None

    # Uptime in whole minutes. total_seconds() is required here: the .seconds
    # attribute of a timedelta excludes whole days and would wrap after 24h.
    uptime_minutes = int((datetime.now() - session_start).total_seconds() // 60)

    # Estimate safe limits
    ram_gb = ram.available / (1024**3)
    has_gpu = gpu_info.get("available", False)

    limits = {
        "max_dataset_gb": round(ram_gb * 0.5, 1),
        "max_model_params_millions": 500 if has_gpu else 10,
        "recommended_batch_size": 128 if has_gpu else 32,
        "estimated_session_minutes_remaining": max(0, 90 - uptime_minutes),
        "checkpoint_interval_minutes": 15,
        "safe_to_train_cnn": has_gpu,
        "safe_to_load_large_dataset": ram_gb > 4
    }

    # Platform detection
    platform = "unknown"
    if os.path.exists("/content"):
        platform = "colab"
    elif os.environ.get("CODESPACES"):
        platform = "codespaces"
    elif os.environ.get("KAGGLE_KERNEL_RUN_TYPE"):
        platform = "kaggle"

    # What's currently loaded (names starting with '_' are hidden)
    user_vars = [k for k in namespace if not k.startswith('_')]

    return jsonify({
        "compute": {
            "cpu_cores": psutil.cpu_count(),
            "cpu_usage_pct": cpu_percent,
            "ram_total_gb": round(ram.total / (1024**3), 1),
            "ram_available_gb": round(ram_gb, 1),
            "ram_used_pct": ram.percent
        },
        "gpu": gpu_info,
        "storage": {
            "disk_total_gb": round(disk.total / (1024**3), 1),
            "disk_free_gb": round(disk.free / (1024**3), 1)
        },
        "packages": packages,
        "limits": limits,
        "platform": platform,
        "session_uptime_minutes": uptime_minutes,
        "loaded_variables": user_vars[:30],  # First 30
        "loaded_variables_count": len(user_vars)
    })


# =============================================================================
# ENDPOINT: /execute - Run Code with Resource Awareness
# =============================================================================
def _truncate_middle(text, limit):
    '''Return text unchanged if it fits in limit, else its head and tail around a marker.'''
    if len(text) <= limit:
        return text
    half = limit // 2
    return (
        text[:half] +
        f"\n\n... OUTPUT TRUNCATED ({len(text)} chars total) ...\n\n" +
        text[-half:]
    )


@app.route("/execute", methods=["POST"])
def execute():
    '''Execute Python code in the persistent namespace and report resource usage.

    Expects a JSON object whose "code" field holds the source to run. A
    "timeout" field may be sent but is not enforced by this handler.

    Returns:
        A JSON response with "success", the captured "stdout" and "stderr",
        "error" (formatted traceback or None), "execution_time_sec", available
        memory before/after in GB, "memory_delta_mb" and an optional "warning".
        The code is refused (success False) when less than 0.5 GB of RAM is
        available, and a 400 is returned when the body is not a JSON object.
    '''

    data = request.json
    if not isinstance(data, dict):
        # e.g. a JSON null or list: there is no "code" field to read.
        return jsonify({
            "success": False,
            "error": "Request body must be a JSON object with a 'code' field.",
            "stdout": "",
            "stderr": ""
        }), 400
    code = data.get("code", "")

    # Pre-execution checks
    ram_before = psutil.virtual_memory()
    ram_available_gb = ram_before.available / (1024**3)

    # Refuse to run if memory is already low
    if ram_available_gb < 0.5:
        return jsonify({
            "success": False,
            "error": f"LOW MEMORY: Only {ram_available_gb:.1f}GB available. Call cleanup() first.",
            "stdout": "",
            "stderr": "",
            "suggestion": "Run cleanup() or restart runtime to free memory"
        })

    result = {
        "success": True,
        "stdout": "",
        "stderr": "",
        "error": None,
        "execution_time_sec": 0,
        "memory_before_gb": round(ram_available_gb, 2),
        "memory_after_gb": 0,
        "memory_delta_mb": 0,
        "warning": None
    }

    # Capture stdout/stderr. NOTE: sys.stdout/sys.stderr are process-wide, so
    # output from other threads during this window is captured as well.
    old_stdout, old_stderr = sys.stdout, sys.stderr
    sys.stdout = stdout_capture = StringIO()
    sys.stderr = stderr_capture = StringIO()

    start_time = time.time()

    try:
        # SECURITY: this runs arbitrary code sent by the client with the full
        # privileges of the notebook process. Only expose this endpoint to
        # callers you trust.
        exec(code, namespace)
    except (Exception, SystemExit):
        # SystemExit is included so that submitted code calling exit() is
        # reported as a failed run instead of escaping the request handler.
        result["success"] = False
        result["error"] = traceback.format_exc()
    finally:
        sys.stdout, sys.stderr = old_stdout, old_stderr
        result["execution_time_sec"] = round(time.time() - start_time, 2)
        # Collected on both paths so output printed before an exception is kept.
        result["stdout"] = stdout_capture.getvalue()
        result["stderr"] = stderr_capture.getvalue()

    # Post-execution memory check
    ram_after = psutil.virtual_memory()
    ram_after_gb = ram_after.available / (1024**3)
    result["memory_after_gb"] = round(ram_after_gb, 2)
    result["memory_delta_mb"] = round((ram_available_gb - ram_after_gb) * 1024, 1)

    # Generate warnings
    warnings = []
    if ram_after.percent > 85:
        warnings.append(f"HIGH MEMORY ({ram_after.percent:.0f}%). Save results and consider cleanup().")
    if result["execution_time_sec"] > 60:
        warnings.append(f"Long execution ({result['execution_time_sec']:.0f}s). Consider checkpointing.")

    if warnings:
        result["warning"] = " | ".join(warnings)

    # Truncate very long output, keeping the beginning and the end
    max_output = 50000
    result["stdout"] = _truncate_middle(result["stdout"], max_output)
    result["stderr"] = _truncate_middle(result["stderr"], max_output)

    if result["error"] and len(result["error"]) > 10000:
        result["error"] = result["error"][:10000] + "\n... ERROR TRUNCATED ..."

    return jsonify(result)


# =============================================================================
# ENDPOINT: /files/list - List Files in Directory
# =============================================================================
@app.route("/files/list", methods=["GET"])
def list_files():
    '''List the entries of a directory with sizes and modification times.

    Query parameters:
        path: directory to list (defaults to "/content").

    Returns:
        A JSON response with "path", "file_count", "dir_count" and "files",
        where directories are listed first and each group is sorted by
        case-insensitive name. Entries that cannot be inspected carry an
        "error" field instead of size/time details. Any failure for the
        directory itself is reported as {"error": ...}.
    '''
    path = request.args.get("path", "/content")

    try:
        if not os.path.exists(path):
            return jsonify({"error": f"Path does not exist: {path}"})

        files = []
        for f in os.listdir(path):
            full_path = os.path.join(path, f)
            try:
                stat = os.stat(full_path)
                is_dir = os.path.isdir(full_path)
                files.append({
                    "name": f,
                    "path": full_path,
                    "is_dir": is_dir,
                    "size_mb": round(stat.st_size / (1024**2), 2) if not is_dir else None,
                    "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                })
            except (OSError, OverflowError, ValueError):
                # Broken symlink, permission problem, or a timestamp that
                # fromtimestamp() cannot represent: keep the entry, flag it.
                files.append({"name": f, "path": full_path, "error": "Could not stat"})

        # Sort: directories first, then by name
        files.sort(key=lambda x: (not x.get("is_dir", False), x["name"].lower()))

        dir_count = sum(1 for entry in files if entry.get("is_dir"))

        return jsonify({
            "path": path,
            "file_count": len(files) - dir_count,
            "dir_count": dir_count,
            "files": files
        })
    except Exception as e:
        return jsonify({"error": str(e)})


# =============================================================================
# ENDPOINT: /files/download - Download File from Colab
# =============================================================================
@app.route("/files/download", methods=["GET"])
def download_file():
    '''Send the file named by the "path" query parameter as an attachment.

    Responds with 400 when no path is given or the path is a directory, 404
    when the path does not exist, and 500 when sending the file fails.
    '''
    target = request.args.get("path")

    # Validate the request; the first matching problem wins.
    failure = None
    if not target:
        failure = ("No path provided", 400)
    elif not os.path.exists(target):
        failure = (f"File not found: {target}", 404)
    elif os.path.isdir(target):
        failure = (f"Path is a directory, not a file: {target}", 400)

    if failure is not None:
        message, status = failure
        return jsonify({"error": message}), status

    try:
        response = send_file(target, as_attachment=True)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    return response


# =============================================================================
# ENDPOINT: /files/read - Read text file content
# =============================================================================
@app.route("/files/read", methods=["GET"])
def read_file():
    '''Return the content of a text file as JSON.

    Query parameters:
        path: file to read.
        max_size: largest file size in bytes that will be returned
            (default 100000); a non-integer value falls back to the default.

    Returns:
        {"path", "size", "content"} on success; otherwise {"error": ...} with
        status 404 (missing path), 400 (directory, file too large, or binary
        content) or 500 (any other failure).
    '''
    path = request.args.get("path")
    # type=int makes the lookup return the default instead of raising when the
    # supplied value is not a valid integer.
    max_size = request.args.get("max_size", 100000, type=int)  # 100KB default

    if not path or not os.path.exists(path):
        return jsonify({"error": f"File not found: {path}"}), 404

    if os.path.isdir(path):
        return jsonify({"error": f"Path is a directory, not a file: {path}"}), 400

    try:
        size = os.path.getsize(path)
        if size > max_size:
            return jsonify({
                "error": f"File too large ({size} bytes). Max: {max_size}",
                "suggestion": "Use download endpoint instead"
            }), 400

        with open(path, 'r') as f:
            content = f.read()

        return jsonify({"path": path, "size": size, "content": content})
    except UnicodeDecodeError:
        return jsonify({"error": "Binary file - use download endpoint"}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# =============================================================================
# ENDPOINT: /cleanup - Free Memory and Resources
# =============================================================================
@app.route("/cleanup", methods=["POST"])
def cleanup():
    '''Free memory by clearing the namespace and running garbage collection.

    Removes every name in the persistent namespace that does not start with
    '_', runs the garbage collector, empties the PyTorch CUDA cache and closes
    matplotlib figures when those libraries are importable, and deletes files
    ending in .tmp/.temp/.log/.pyc directly inside /content (if it exists).

    Returns:
        A JSON response with "success", "memory_freed_mb",
        "memory_available_now_gb", "memory_used_pct", "variables_cleared",
        "cleared_variables" (at most 20 names), "temp_files_removed" and
        "gpu_cache_cleared".
    '''

    ram_before = psutil.virtual_memory()

    # Snapshot the user variable names first so the dict is not modified while
    # it is being iterated.
    removed_vars = [k for k in namespace if not k.startswith('_')]

    # Clear namespace
    for k in removed_vars:
        namespace.pop(k, None)

    # Force garbage collection
    gc.collect()

    # Clear GPU memory if available (best effort: torch may be missing or broken)
    gpu_cleared = False
    try:
        import torch
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            gpu_cleared = True
    except Exception:
        pass

    # Clear any matplotlib figures (best effort, same reasoning as above)
    try:
        import matplotlib.pyplot as plt
        plt.close('all')
    except Exception:
        pass

    # Remove temp files. /content is absent on non-Colab hosts, so skip the
    # sweep there instead of failing the whole request.
    temp_removed = 0
    temp_dir = "/content"
    temp_extensions = ('.tmp', '.temp', '.log', '.pyc')
    if os.path.isdir(temp_dir):
        for f in os.listdir(temp_dir):
            if f.endswith(temp_extensions):
                try:
                    os.remove(os.path.join(temp_dir, f))
                    temp_removed += 1
                except OSError:
                    # e.g. a directory with a matching name, or no permission
                    pass

    # Force another gc
    gc.collect()

    ram_after = psutil.virtual_memory()
    freed_mb = (ram_after.available - ram_before.available) / (1024**2)

    return jsonify({
        "success": True,
        "memory_freed_mb": round(freed_mb, 1),
        "memory_available_now_gb": round(ram_after.available / (1024**3), 2),
        "memory_used_pct": ram_after.percent,
        "variables_cleared": len(removed_vars),
        "cleared_variables": removed_vars[:20],  # First 20
        "temp_files_removed": temp_removed,
        "gpu_cache_cleared": gpu_cleared
    })


# =============================================================================
# ENDPOINT: /health - Quick Health Check
# =============================================================================
@app.route("/health", methods=["GET"])
def health():
    '''Quick health check.

    Returns:
        A JSON response with "status" ("ok"), "uptime_minutes" since
        session_start, "memory_available_gb" and "memory_used_pct".
    '''
    ram = psutil.virtual_memory()
    # total_seconds() rather than .seconds: the latter excludes whole days and
    # would make the uptime wrap back to 0 every 24 hours.
    uptime_minutes = int((datetime.now() - session_start).total_seconds() // 60)
    return jsonify({
        "status": "ok",
        "uptime_minutes": uptime_minutes,
        "memory_available_gb": round(ram.available / (1024**3), 1),
        "memory_used_pct": ram.percent
    })


# =============================================================================
# ENDPOINT: /variables - List Variables in Namespace
# =============================================================================
@app.route("/variables", methods=["GET"])
def list_variables():
    '''List the user variables currently held in the persistent namespace.

    Names starting with '_' are skipped. For each variable the type name is
    reported together with its shape (objects exposing .shape) or its length
    (objects exposing __len__); objects that fail inspection are listed with
    type "unknown".

    Returns:
        A JSON response with "count" and "variables", a list of
        {"name", "type", "size"} entries.
    '''

    vars_info = []
    # Iterate over a snapshot: the server handles requests in threads, so a
    # concurrent /execute may add or remove names while this loop runs.
    for name, value in list(namespace.items()):
        if name.startswith('_'):
            continue

        try:
            var_type = type(value).__name__

            # Get size/shape info
            size_info = None
            if hasattr(value, 'shape'):
                size_info = str(value.shape)
            elif hasattr(value, '__len__'):
                size_info = f"len={len(value)}"

            vars_info.append({
                "name": name,
                "type": var_type,
                "size": size_info
            })
        except Exception:
            # Inspecting arbitrary user objects can raise anything
            vars_info.append({"name": name, "type": "unknown", "size": None})

    return jsonify({
        "count": len(vars_info),
        "variables": vars_info
    })

In [None]:
import threading
import time

# Local port shared by the Flask server and the ngrok tunnel.
SERVER_PORT = 5000


def run_server():
    '''Run the Flask app (blocking) on the local port the tunnel forwards to.'''
    app.run(port=SERVER_PORT, threaded=True)


# Start Flask in a background thread, unless a server thread from an earlier
# run of this cell is still alive. Starting a second one would only fail with
# "Address already in use".
_previous_server = globals().get("server_thread")
if _previous_server is None or not _previous_server.is_alive():
    server_thread = threading.Thread(target=run_server)
    server_thread.daemon = True
    server_thread.start()

    # Wait for server to start
    time.sleep(2)

# Create ngrok tunnel
public_url = ngrok.connect(SERVER_PORT)

# Print connection info. The tunnel object's public_url attribute is printed so
# the banner shows a URL that can be copied as-is rather than the object's repr.
print("=" * 70)
print("🚀 SMART COLAB EXECUTOR IS READY!")
print("=" * 70)
print()
print(f"📡 PUBLIC URL: {public_url.public_url}")
print()
print("=" * 70)
print("ENDPOINTS:")
print("  GET  /probe      - Check environment (RAM, GPU, packages)")
print("  POST /execute    - Run Python code")
print("  GET  /files/list - List files in directory")
print("  GET  /files/download?path=X - Download file")
print("  GET  /files/read?path=X - Read text file")
print("  POST /cleanup    - Free memory")
print("  GET  /health     - Quick status check")
print("  GET  /variables  - List loaded variables")
print("=" * 70)
print()
print("📋 NEXT STEPS:")
print("  1. Copy the PUBLIC URL above")
print("  2. Update your local MCP server config")
print("  3. Restart Claude Desktop")
print("  4. Start chatting!")
print()
print("⚠️  Keep this notebook running! Closing it kills the server.")
print("=" * 70)

 * Serving Flask app '__main__'
 * Debug mode: off


Address already in use
Port 5000 is in use by another program. Either identify and stop that program, or start the server with a different port.


üöÄ SMART COLAB EXECUTOR IS READY!

üì° PUBLIC URL: NgrokTunnel: "https://dustin-bimolecular-abbey.ngrok-free.dev" -> "http://localhost:5000"

ENDPOINTS:
  GET  /probe      - Check environment (RAM, GPU, packages)
  POST /execute    - Run Python code
  GET  /files/list - List files in directory
  GET  /files/download?path=X - Download file
  GET  /files/read?path=X - Read text file
  POST /cleanup    - Free memory
  GET  /health     - Quick status check
  GET  /variables  - List loaded variables

üìã NEXT STEPS:
  1. Copy the PUBLIC URL above
  2. Update your local MCP server config
  3. Restart Claude Desktop
  4. Start chatting!

‚ö†Ô∏è  Keep this notebook running! Closing it kills the server.
