# 🌩️ **CloudLeecher: Production Backend**

Welcome to the **CloudLeecher** backend. This notebook turns your Google Colab instance into a torrent downloader that saves files directly to your Google Drive.

### **Instructions**
1.  **Mount Drive**: Connect your Google Drive.
2.  **Install**: Set up the environment.
3.  **Start Services**: Launch the backend and get your public connection URL and API key.
4.  **Connect**: Paste the URL and API key into the CloudLeecher Frontend.

## 1. 📂 **Mount Google Drive**
We need access to your Drive to save the downloaded files.

In [1]:
from google.colab import drive
import os

# Mount Google Drive
drive.mount('/content/drive')

# Define and Create Download Directory
DOWNLOAD_DIR = "/content/drive/MyDrive/TorrentDownloads"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

print(f"✅ Download Directory Ready: {DOWNLOAD_DIR}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Download Directory Ready: /content/drive/MyDrive/TorrentDownloads
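
A successful mount does not by itself show that the download folder accepts writes. The following is a minimal sketch of a write test, not part of the original notebook; `is_writable_dir` is a hypothetical helper name chosen for illustration.

```python
import os
import tempfile

def is_writable_dir(path: str) -> bool:
    """Return True if `path` is an existing directory in which a file can be created."""
    if not os.path.isdir(path):
        return False
    try:
        # Creating (and automatically deleting) a temp file exercises real write access.
        with tempfile.NamedTemporaryFile(dir=path):
            pass
        return True
    except OSError:
        return False
```

In this notebook it could be called as `is_writable_dir(DOWNLOAD_DIR)` right after the directory is created.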


## 2. 🛠️ **Install Dependencies**
This cell installs `aria2` (the download engine), `flask` and `flask-cors` (the API server), and `pyngrok` (for public access).

In [2]:
%%capture
!apt-get update -qq
!apt-get install -y -qq aria2
!pip install -q flask flask-cors pyngrok

# Note: %%capture hides all output from this cell, including the message below.
print("✅ All dependencies installed successfully.")
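
Because the install cell suppresses its output, a failed install is easy to miss. One way to confirm the environment afterwards is sketched below, assuming the binary is named `aria2c` and the packages import as `flask`, `flask_cors`, and `pyngrok`; `missing_dependencies` is a hypothetical helper, not part of the notebook.

```python
import importlib.util
import shutil

def missing_dependencies(binaries, modules):
    """Return the names from both lists that cannot be found on this machine."""
    missing = [b for b in binaries if shutil.which(b) is None]
    missing += [m for m in modules if importlib.util.find_spec(m) is None]
    return missing

# Module names differ from pip package names: flask-cors imports as flask_cors.
print(missing_dependencies(["aria2c"], ["flask", "flask_cors", "pyngrok"]))
```

An empty list means everything was found; any names printed would need to be installed again.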

## 3. 🚀 **Start Downloader Service**
This cell starts the Aria2 RPC server as a background daemon.

In [3]:
import subprocess

# Start Aria2c as a daemon process
cmd = [
    "aria2c",
    "--enable-rpc",
    "--rpc-listen-all=true",
    "--rpc-allow-origin-all",
    f"--dir={DOWNLOAD_DIR}",
    "--file-allocation=none",
    "--max-connection-per-server=16",
    "--split=16",
    "--min-split-size=1M",
    "--seed-time=0",
    "--daemon=true"
]

subprocess.run(
    cmd,
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL
)

print("✅ Aria2 Background Service Started.")

✅ Aria2 Background Service Started.
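
The cell above prints its success message without checking whether Aria2 is actually answering, since both output streams are discarded. A hedged sketch of a readiness check follows: it polls the RPC endpoint with `aria2.getVersion` until it responds or a timeout passes. The helper names, the timeout values, and the default URL (taken from `ARIA2_RPC_URL` in `app.py`) are assumptions for illustration.

```python
import time
import xmlrpc.client

def wait_until_ready(probe, timeout=10.0, interval=0.5):
    """Call `probe` until it succeeds or `timeout` seconds pass; return True on success."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            probe()
            return True
        except Exception:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)

def aria2_probe(url="http://localhost:6800/rpc"):
    """Raise if the aria2 RPC endpoint does not answer aria2.getVersion."""
    xmlrpc.client.ServerProxy(url).aria2.getVersion()
```

Calling `wait_until_ready(aria2_probe)` after starting the daemon would return `False` if the RPC server never came up, which is a more reliable signal than the unconditional print.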


## 4. 📝 **Create API Backend**
This cell writes `app.py`, the Flask API that the frontend talks to and that controls Aria2 over RPC.

In [4]:
%%writefile app.py
import xmlrpc.client
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import os
import shutil
import base64
import json
from datetime import datetime
from collections import deque
import threading
import time
import logging

# Initialize Flask App
app = Flask(__name__)
CORS(app)

# Configuration
TEMP_DOWNLOAD_DIR = "/content/temp_downloads"
FINAL_DOWNLOAD_DIR = "/content/drive/MyDrive/TorrentDownloads"
ARIA2_RPC_URL = "http://localhost:6800/rpc"
LOG_FILE = "/content/backend_logs.json"
API_KEY_ENV = "CLOUDLEECHER_API_KEY"

# Ensure directories exist
os.makedirs(TEMP_DOWNLOAD_DIR, exist_ok=True)
os.makedirs(FINAL_DOWNLOAD_DIR, exist_ok=True)

# In-memory log storage (last 100 entries)
logs = deque(maxlen=100)

# Connect to Aria2 RPC
s = xmlrpc.client.ServerProxy(ARIA2_RPC_URL)

def log(level, operation, message, gid=None, extra=None):
    """Add entry to log with timestamp and details"""
    entry = {
        "timestamp": datetime.now().isoformat(),
        "level": level,  # info, warning, error
        "operation": operation,
        "message": message,
        "gid": gid,
        "extra": extra
    }
    logs.append(entry)
    
    # Also write to file for persistence
    try:
        with open(LOG_FILE, 'a') as f:
            f.write(json.dumps(entry) + '\n')
    except Exception:
        # File logging is best-effort; never let it break the caller
        pass
    
    print(f"[{level.upper()}] {operation}: {message}" + (f" (GID: {gid})" if gid else ""))

# --- Authentication Middleware ---
@app.before_request
def check_api_key():
    if request.method == 'OPTIONS':
        return
    
    # Skip auth for health check
    if request.path == '/health':
        return

    expected_key = os.environ.get(API_KEY_ENV)
    if not expected_key:
        # No key configured: requests are allowed through unauthenticated.
        # The launch cell normally sets CLOUDLEECHER_API_KEY, so this path
        # only occurs when app.py is started by hand. Log it so it is visible.
        log("warning", "auth", "No API Key configured in environment!")
        return

    provided_key = request.headers.get('x-api-key')
    if provided_key != expected_key:
        log("warning", "auth", "Invalid API Key attempt")
        abort(401, description="Invalid API Key")

# --- Background Monitor & File Mover ---
class BackgroundMonitor(threading.Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.running = True
        self.transfer_status = {} # {gid: {'status': 'moving'|'saved'|'error', 'msg': '...'}}
        self.lock = threading.Lock()

    def run(self):
        log("info", "monitor", "Background Monitor Started")
        while self.running:
            try:
                self.check_downloads()
            except Exception as e:
                log("error", "monitor", f"Monitor Loop Error: {e}")
            time.sleep(2)

    def check_downloads(self):
        try:
            # check stopped tasks (complete, error, removed)
            stopped = s.aria2.tellStopped(0, 100, ["gid", "status", "files", "errorCode", "errorMessage"])
            
            for task in stopped:
                gid = task['gid']
                status = task['status']
                
                with self.lock:
                    if gid in self.transfer_status and self.transfer_status[gid]['status'] in ['saved', 'error']:
                        continue # Already processed

                if status == 'complete':
                    self.handle_complete(task)
                elif status == 'error':
                    self.handle_error(task)
                elif status == 'removed':
                    # Just clean up tracking if exists
                    pass
        except Exception as e:
            # log("error", "monitor_check", f"Error querying Aria2: {e}")
            pass

    def handle_complete(self, task):
        gid = task['gid']
        
        # Mark as moving
        with self.lock:
            if gid in self.transfer_status: return
            self.transfer_status[gid] = {'status': 'moving', 'msg': 'Moving to Drive...'}
        
        log("info", "mover", "Download complete. Moving files...", gid=gid)
        
        try:
            files = task.get('files', [])
            if not files:
                raise Exception("No files in task")

            # Aria2 reports absolute paths. The first file is enough to locate
            # the download's top-level item, whether that is a single file or
            # the directory of a multi-file torrent.
            source_path = files[0]['path']
            
            # If path is empty (metadata only?), skip
            if not source_path:
                return

            # Determine what to move: the top-level item (file or directory)
            # that contains source_path inside TEMP_DOWNLOAD_DIR.
            # Single file: the top-level item is the file itself.
            # Multi-file torrent: it is the directory the files share.
            rel_path = os.path.relpath(source_path, TEMP_DOWNLOAD_DIR)
            top_level = rel_path.split(os.sep)[0]
            full_source_path = os.path.join(TEMP_DOWNLOAD_DIR, top_level)
            
            if not os.path.exists(full_source_path):
                raise Exception(f"Source not found: {full_source_path}")
            
            dest_path = os.path.join(FINAL_DOWNLOAD_DIR, top_level)
            
            # Handle collision
            if os.path.exists(dest_path):
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                name, ext = os.path.splitext(top_level)
                new_name = f"{name}_{timestamp}{ext}"
                dest_path = os.path.join(FINAL_DOWNLOAD_DIR, new_name)
                log("info", "mover", f"Destination exists. Renaming to {new_name}", gid=gid)

            # Perform Move
            shutil.move(full_source_path, dest_path)
            
            with self.lock:
                self.transfer_status[gid] = {'status': 'saved', 'msg': 'Saved to Drive'}
            
            log("info", "mover", "Move successful", gid=gid)
            
            # Remove from Aria2 to clean up
            try:
                s.aria2.removeDownloadResult(gid)
            except Exception:
                # The result may already be gone; the move itself succeeded
                pass

        except Exception as e:
            log("error", "mover", f"Move failed: {str(e)}", gid=gid)
            with self.lock:
                self.transfer_status[gid] = {'status': 'error', 'msg': f"Move failed: {str(e)}"}

    def handle_error(self, task):
        gid = task['gid']
        msg = task.get('errorMessage', 'Unknown error')
        # We leave it in Aria2 history so user can see it, but we can track it too
        pass

    def get_transfer_status(self, gid):
        with self.lock:
            return self.transfer_status.get(gid)

# Start Monitor
monitor = BackgroundMonitor()
monitor.start()

# --- API Endpoints ---

@app.route('/health', methods=['GET'])
def health():
    return jsonify({"status": "ok", "service": "CloudLeecher-Backend"})

@app.route('/api/logs', methods=['GET'])
def get_logs():
    return jsonify({"logs": list(logs)})

@app.route('/api/download/magnet', methods=['POST'])
def add_magnet():
    data = request.json
    magnet_link = data.get('magnet')
    if not magnet_link:
        return jsonify({"error": "Magnet link is required"}), 400
    
    # Check Active/Waiting
    active = s.aria2.tellActive(["gid"])
    waiting = s.aria2.tellWaiting(0, 100, ["gid"])
    
    if len(active) > 0 or len(waiting) > 0:
        return jsonify({"error": "Queue full. Wait for current download."}), 429
         
    try:
        gid = s.aria2.addUri([magnet_link], {"dir": TEMP_DOWNLOAD_DIR})
        log("info", "add_magnet", "Magnet added", gid=gid)
        return jsonify({"status": "success", "gid": gid})
    except Exception as e:
        log("error", "add_magnet", f"Failed: {e}")
        return jsonify({"error": str(e)}), 500

@app.route('/api/download/file', methods=['POST'])
def add_torrent_file():
    data = request.json
    b64_content = data.get('torrent')
    if not b64_content:
        return jsonify({"error": "Content required"}), 400

    active = s.aria2.tellActive(["gid"])
    waiting = s.aria2.tellWaiting(0, 100, ["gid"])
    
    if len(active) > 0 or len(waiting) > 0:
        return jsonify({"error": "Queue full."}), 429
         
    try:
        raw_bytes = base64.b64decode(b64_content)
        gid = s.aria2.addTorrent(xmlrpc.client.Binary(raw_bytes), {"dir": TEMP_DOWNLOAD_DIR})
        log("info", "add_torrent", "Torrent file added", gid=gid)
        return jsonify({"status": "success", "gid": gid})
    except Exception as e:
        log("error", "add_torrent", f"Failed: {e}")
        return jsonify({"error": str(e)}), 500

@app.route('/api/status', methods=['GET'])
def get_status():
    try:
        basic_keys = ["gid", "status", "totalLength", "completedLength", "downloadSpeed", "files", "errorMessage", "errorCode"]
        extended_keys = basic_keys + ["numSeeders", "connections", "infoHash"]
        
        active = s.aria2.tellActive(extended_keys)
        waiting = s.aria2.tellWaiting(0, 100, basic_keys)
        stopped = s.aria2.tellStopped(0, 100, basic_keys)
        
        # Merge with monitor status: override Aria2's "complete" with the
        # mover's "moving", "saved", or "error". Tasks that were removed from
        # Aria2 after a successful move no longer appear in 'stopped', so they
        # are injected back into the response further down.
        
        final_stopped = []
        
        # Process existing stopped tasks
        for task in stopped:
            gid = task['gid']
            transfer = monitor.get_transfer_status(gid)
            if transfer:
                task['status'] = transfer['status'] # moving, saved, error
                if transfer.get('msg'):
                    task['errorMessage'] = transfer['msg']  # errorMessage doubles as the status text
            final_stopped.append(task)
            
        # Inject tasks that monitor knows about but Aria2 forgot (because we purged them)
        with monitor.lock:
            known_gids = {t['gid'] for group in (active, waiting, final_stopped) for t in group}
            for gid, info in monitor.transfer_status.items():
                if gid in known_gids:
                    continue
                # Reconstruct a placeholder task object for the frontend.
                # We assume if it's in monitor, it was completed.
                final_stopped.append({
                    "gid": gid,
                    "status": info['status'],
                    "totalLength": "0",  # Unknown now
                    "completedLength": "0",
                    "downloadSpeed": "0",
                    "files": [],
                    "errorMessage": info.get('msg', '')
                })

        return jsonify({
            "active": active,
            "waiting": waiting,
            "stopped": final_stopped
        })
    except Exception as e:
        log("error", "get_status", f"Failed: {e}")
        return jsonify({"error": str(e)}), 500

@app.route('/api/control/pause', methods=['POST'])
def pause_download():
    gid = request.json.get('gid')
    try:
        s.aria2.pause(gid)
        return jsonify({"status": "paused", "gid": gid})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/control/resume', methods=['POST'])
def resume_download():
    gid = request.json.get('gid')
    try:
        s.aria2.unpause(gid)
        return jsonify({"status": "resumed", "gid": gid})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/control/remove', methods=['POST'])
def remove_download():
    gid = request.json.get('gid')
    try:
        # Try force remove first
        try:
            s.aria2.forceRemove(gid)
        except Exception:
            # If not active, try remove result
            s.aria2.removeDownloadResult(gid)
             
        # Also remove from monitor
        with monitor.lock:
            if gid in monitor.transfer_status:
                del monitor.transfer_status[gid]
                
        return jsonify({"status": "removed", "gid": gid})
    except Exception as e:
        # If not found, success
        if 'not found' in str(e).lower():
            return jsonify({"status": "removed", "gid": gid})
        return jsonify({"error": str(e)}), 500

@app.route('/api/drive/info', methods=['GET'])
def drive_info():
    try:
        total, used, free = shutil.disk_usage(FINAL_DOWNLOAD_DIR)
        return jsonify({"total": total, "used": used, "free": free})
    except Exception:
        return jsonify({"total": 0, "used": 0, "free": 0})

@app.route('/api/cleanup', methods=['POST'])
def cleanup_all():
    try:
        s.aria2.purgeDownloadResult()
        # Also clear monitor
        with monitor.lock:
            monitor.transfer_status.clear()
        return jsonify({"status": "success"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == "__main__":
    log("info", "startup", "Backend Starting...")
    # purge_stalled_downloads logic is now handled by monitor indirectly or manual cleanup
    app.run(port=5000)


Overwriting app.py
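
The mover in `handle_complete` picks the top-level item under `TEMP_DOWNLOAD_DIR` and renames it with a timestamp when the destination already exists. A standalone sketch of those two steps, written as pure functions so they can be tried without Aria2, might look like this. The function names and the `ValueError` guard for paths outside the base directory are assumptions added for illustration; `app.py` itself does not include that guard.

```python
import os
from datetime import datetime

def top_level_item(file_path, base_dir):
    """Return the first path component of `file_path` below `base_dir`."""
    rel = os.path.relpath(file_path, base_dir)
    # A relative path that climbs upward means file_path is outside base_dir.
    if rel == os.pardir or rel.startswith(os.pardir + os.sep):
        raise ValueError(f"{file_path} is not inside {base_dir}")
    return rel.split(os.sep)[0]

def unique_destination(dest_dir, name, now=None):
    """Return dest_dir/name, adding a timestamp suffix if that path already exists."""
    candidate = os.path.join(dest_dir, name)
    if not os.path.exists(candidate):
        return candidate
    stamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    stem, ext = os.path.splitext(name)
    return os.path.join(dest_dir, f"{stem}_{stamp}{ext}")
```

For a file at `TEMP_DOWNLOAD_DIR/Show/S01/e1.mkv`, `top_level_item` yields `Show`, so the whole directory is moved in one `shutil.move` call rather than file by file.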


## 5. 🌐 **Launch Public Server**
Starting the application and generating your public access URL.

> **⚠️ Important**: Ensure you have added your Ngrok Authtoken to Colab Secrets with the key `NGROK-AUTHTOKEN`.
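
The launch cell generates the API key with `secrets.token_hex(16)`, and `app.py` checks it with a plain `!=` comparison. As a hedged alternative, the sketch below swaps in `hmac.compare_digest`, which compares in constant time; this is a substitution for illustration, not what the notebook currently does, and both function names are hypothetical.

```python
import hmac
import secrets

def generate_api_key(nbytes=16):
    """Return a random hex key; 16 bytes gives 32 hex characters."""
    return secrets.token_hex(nbytes)

def key_matches(provided, expected):
    """Compare keys in constant time; a missing header (None) never matches."""
    if provided is None or not expected:
        return False
    return hmac.compare_digest(provided.encode(), expected.encode())
```

Inside `check_api_key`, the comparison would then read `if not key_matches(provided_key, expected_key):`, which also handles a request that omits the `x-api-key` header.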

In [None]:
from pyngrok import ngrok
from google.colab import userdata
import subprocess
import sys
import time
import os
import secrets

# 1. Authenticate Ngrok
try:
    AUTH_TOKEN = userdata.get("NGROK-AUTHTOKEN")
    ngrok.set_auth_token(AUTH_TOKEN)
except Exception as e:
    print("❌ Error: Ngrok Auth Token not found! Please add 'NGROK-AUTHTOKEN' to Colab Secrets.")
    raise e

# 2. Generate Secure API Key
api_key = secrets.token_hex(16)
print("\n" + "="*60)
print(f"🔑 API KEY: {api_key}")
print("⚠️ COPY THIS KEY! You will need it to connect the frontend.")
print("="*60 + "\n")

# 3. Cleanup Old Processes
ngrok.kill()
os.system("fuser -k 5000/tcp > /dev/null 2>&1")

# 4. Start Flask App in Background
log_file = open("flask.log", "w")
env = os.environ.copy()
env["CLOUDLEECHER_API_KEY"] = api_key
subprocess.Popen([sys.executable, "app.py"], stdout=log_file, stderr=log_file, env=env)
time.sleep(3)  # Allow Flask to initialize

# 5. Open Ngrok Tunnel
try:
    public_url = ngrok.connect(5000).public_url
    print(f"🔗 PUBLIC URL: {public_url}")
    print("✅ CloudLeecher Backend is Online!")
    print("🌍 Frontend App: https://cloudleecher.web.app")
    print("📋 1. Copy the PUBLIC URL")
    print("📋 2. Copy the API KEY")
    print("📋 3. Paste them into the CloudLeecher Frontend settings.")

    # Keep cell running
    while True:
        time.sleep(10)
except Exception as e:
    print(f"❌ Failed to start Ngrok: {e}")



🔗 PUBLIC URL: https://vitalistically-falsifiable-donnette.ngrok-free.dev

✅ CloudLeecher Backend is Online!
🌍 Frontend App: https://cloudleecher.web.app
📋 Copy the URL above (PUBLIC URL) and paste it into the CloudLeecher Frontend app.
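
Before wiring up the frontend, the backend can be exercised directly from Python. The sketch below uses only the standard library together with the endpoints and the `x-api-key` header defined in `app.py`; `build_request` and `call` are hypothetical helper names, and the URL and key in the usage comment are placeholders for the values printed by the launch cell.

```python
import json
import urllib.request

def build_request(base_url, path, api_key, payload=None):
    """Build a urllib Request for a CloudLeecher endpoint with the x-api-key header set."""
    url = base_url.rstrip("/") + path
    headers = {"x-api-key": api_key}
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    # urllib sends GET when data is None and POST otherwise.
    return urllib.request.Request(url, data=data, headers=headers)

def call(base_url, path, api_key, payload=None, timeout=10):
    """Send the request and decode the JSON response body."""
    req = build_request(base_url, path, api_key, payload)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

# Example (placeholders; substitute the values printed by the launch cell):
# status = call("https://<your-tunnel>.ngrok-free.dev", "/api/status", "<api key>")
```

A wrong or missing key should produce an HTTP 401 from `check_api_key`, which `urlopen` raises as `urllib.error.HTTPError`.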
