<a href="https://colab.research.google.com/github/MrClown321/00_Z/blob/main/universal_aria2_downloader.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Mount Your Google Drive

# Mount Google Drive at /content/drive; the default DOWNLOAD_DIR used by the
# downloader cell lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#@title Universal Aria2 Downloader (Magnet, URL, or .torrent)

import os, shutil, zipfile

# === CONFIGURATION ===
# Final destination directory; it is created if missing and the finished
# download is moved into it.
DOWNLOAD_DIR = "/content/drive/MyDrive/0DL" #@param {type:"string"}
# What to download. Strings starting with "magnet:?" are treated as magnet
# links, existing paths ending in ".torrent" as torrent files, and anything
# else as a direct URL.
SOURCE = "" #@param {type:"string"}
# ^ Input a Magnet Link, Direct URL, or File Path to a .torrent file
# When True, aria2c downloads into a timestamped folder under /content and the
# result is moved to DOWNLOAD_DIR afterwards; when False, aria2c downloads
# straight into DOWNLOAD_DIR.
USE_TEMP = True #@param {type: 'boolean'}
# When True, the finished download is packed into an uncompressed
# (ZIP_STORED) archive before being moved.
ZIP_FOLDER = False #@param {type: 'boolean'}

# === HELPER FUNCTIONS ===
def format_bytes_2dp(num):
    """Format a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit is reached. Values of 1024 GB and above are reported in TB/PB
    instead of falling off the end of the loop and yielding ``None``.

    Args:
        num: Size in bytes (int or float).

    Returns:
        A string such as ``"1.50 MB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    for unit in units[:-1]:
        if num < 1024:
            return f"{num:.2f} {unit}"
        num /= 1024
    # Larger than every smaller unit: report in the biggest unit we know.
    return f"{num:.2f} {units[-1]}"

def copy_with_progress(src, dst, chunk_size=1024*1024*8):
    """Copy the file ``src`` to ``dst`` chunk by chunk, showing a progress bar.

    Args:
        src: Path of the file to read.
        dst: Path of the file to write; it is opened in ``'wb'`` mode, so any
            existing content is replaced.
        chunk_size: Number of bytes requested per read (8 MiB by default).
    """
    size_in_bytes = os.path.getsize(src)
    with open(src, 'rb') as reader:
        with open(dst, 'wb') as writer:
            with tqdm(
                total=size_in_bytes,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                desc="Moving",
            ) as bar:
                # read() returns b'' at end of file, which ends the iteration.
                for block in iter(lambda: reader.read(chunk_size), b''):
                    writer.write(block)
                    bar.update(len(block))

def move_with_progress(src, dst_dir):
    """Move a file or directory tree into ``dst_dir``, showing copy progress.

    The move is implemented as copy-then-delete: every file is copied with
    ``copy_with_progress`` and the source is removed afterwards.

    Args:
        src: Path of the file or directory to move.
        dst_dir: Directory that will receive ``src``; created if missing.

    Returns:
        The destination path, i.e. ``dst_dir`` joined with the base name of
        ``src``.
    """
    os.makedirs(dst_dir, exist_ok=True)
    dst_path = os.path.join(dst_dir, os.path.basename(src))

    # If the source already sits at the destination there is nothing to move.
    # Without this guard the copy would open the same path for writing
    # (truncating it) and the cleanup below would then delete it.
    if os.path.realpath(src) == os.path.realpath(dst_path):
        return dst_path

    if os.path.isdir(src):
        # Recreate the directory layout under dst_path and copy file by file.
        for root, _, files in os.walk(src):
            rel_path = os.path.relpath(root, src)
            target_root = os.path.join(dst_path, rel_path)
            os.makedirs(target_root, exist_ok=True)
            for f in files:
                copy_with_progress(os.path.join(root, f),
                                   os.path.join(target_root, f))
        shutil.rmtree(src)
    else:
        copy_with_progress(src, dst_path)
        os.remove(src)

    return dst_path

def zip_with_progress(temp_dir, top_level_path):
    """Pack a downloaded file or folder into an uncompressed zip archive.

    Args:
        temp_dir: Directory that contains the item to archive; the archive is
            written into this directory as well.
        top_level_path: Name of the file or folder inside ``temp_dir``.

    Returns:
        Path of the created archive, ``<temp_dir>/<top_level_path>.zip``.
    """
    source = os.path.join(temp_dir, top_level_path)
    archive = os.path.join(temp_dir, f"{top_level_path}.zip")

    # Collect every regular file up front so the progress bar knows the total.
    if os.path.isdir(source):
        members = [
            os.path.join(folder, name)
            for folder, _, names in os.walk(source)
            for name in names
        ]
    else:
        members = [source]

    # Archive names are relative to the parent of the source, so the top-level
    # name is kept as the first path component inside the zip.
    parent = os.path.dirname(source)
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_STORED) as bundle:
        for member in tqdm(members, desc=f"Zipping {top_level_path}", unit="file"):
            bundle.write(member, os.path.relpath(member, parent))

    return archive

# === SETUP ===
# Mount Google Drive only when the mount point does not exist yet.
if not os.path.exists('/content/drive'):
  from google.colab import drive
  drive.mount('/content/drive')

# Pick the directory aria2c downloads into: a fresh timestamped folder under
# /content when USE_TEMP is set, otherwise DOWNLOAD_DIR itself.
if USE_TEMP:
  from datetime import datetime
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  folder_name = f"temp_{timestamp}"
  TEMP_DIR = f"/content/{folder_name}"
  os.makedirs(TEMP_DIR, exist_ok=True)
else:
  TEMP_DIR = DOWNLOAD_DIR

os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# The presence of the aria2c binary is the marker for the whole install step:
# when it is missing, the apt package and the aria2p/tqdm Python packages are
# installed and the installer output is cleared from the cell.
if not os.path.isfile("/usr/bin/aria2c"):
  from IPython.display import clear_output
  !apt-get install -y aria2
  !pip install aria2p tqdm
  clear_output()

import subprocess
import time
import aria2p
from tqdm.notebook import tqdm

# === KILL PREVIOUS PROCESSES ===
# Stop any aria2c that is still running; `|| true` keeps the shell command
# from failing when there is nothing to kill.
!killall -q aria2c || true

# Start aria2c
# Launched as a background process with its RPC interface enabled on port 6800
# and TEMP_DIR as the download directory; the handle is kept so the process
# can be terminated later.
aria2_process = subprocess.Popen([
    "aria2c",
    "--enable-rpc",
    "--rpc-listen-all=false",
    "--rpc-allow-origin-all",
    "--rpc-listen-port=6800",
    f"--dir={TEMP_DIR}",
    "--max-connection-per-server=10",
    "--split=10",
    "--min-split-size=1M",
    "--continue=true",
    "--seed-time=0",
    "--bt-seed-unverified=false",
    "--file-allocation=prealloc",
    "--follow-torrent=mem",
    "--enable-dht=true",
    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
])

# Fixed pause before connecting — presumably to give the RPC server time to
# come up; there is no readiness check. NOTE(review): confirm 2 s is enough.
time.sleep(2)
# RPC client for the aria2c process started above (same port, empty secret).
aria2 = aria2p.API(aria2p.Client(host="http://localhost", port=6800, secret=""))

# === SOURCE DETECTION & ADDING ===
# SOURCE is classified as a magnet link, a local .torrent file, or (by
# default) a direct URL, and queued on the aria2 RPC daemon accordingly.
# On exit `download` is the aria2p download object the rest of the cell tracks.
download = None

if SOURCE.startswith("magnet:?"):
    print("üß≤ Detected Magnet Link.")
    download = aria2.add_magnet(SOURCE)
    print("‚è≥ Waiting for metadata...")
    # The magnet entry itself only fetches metadata; the real transfer is the
    # download it is followed by. Stop waiting when the metadata fetch reports
    # an error, otherwise this loop would never terminate.
    while not download.followed_by_ids:
        download.update()
        if download.status == "error":
            break
        time.sleep(1)
    # Only switch to the follow-up download when one exists; on error the
    # original entry is kept so the status check further down reports it.
    if download.followed_by_ids:
        download = aria2.get_download(download.followed_by_ids[0])

elif os.path.isfile(SOURCE) and SOURCE.endswith(".torrent"):
    print("üìÑ Detected Torrent File.")
    download = aria2.add_torrent(SOURCE)

else:
    print("üîó Detected Direct URL.")
    download = aria2.add_uris([SOURCE])
    # Wait loop for filename resolution: poll once per second until the first
    # file has a usable name, the download errors, or about 30 s have passed.
    print("‚è≥ Waiting for filename resolution...")
    timeout_counter = 0
    while True:
        download.update()
        if download.status == "error":
            break
        if download.files and len(download.files) > 0:
            current_path = download.files[0].path
            # Ensure we have a valid filename (not just '.')
            if current_path and os.path.basename(current_path) not in [".", ""]:
                break
        time.sleep(1)
        timeout_counter += 1
        if timeout_counter > 30:
            print("‚ö†Ô∏è Filename resolution timed out (proceeding anyway).")
            break

# === DOWNLOAD INFO ===
# If the download could not even be initialised, report it and stop aria2c;
# otherwise list the files, track progress, then zip/move the result.
if download.status == "error":
    print(f"‚ùå Error initializing download: {download.error_message}")
    aria2_process.terminate()
else:
    print("\n‚úÖ Download Initialized. Files:")
    for f in download.files:
        size_str = format_bytes_2dp(f.length)
        print(f"\t- {os.path.basename(f.path)} [{size_str}]")

    # Resolve Top Level Path
    # Use the first path component of the first file relative to TEMP_DIR
    # (a folder name for multi-file downloads); if that fails for any reason,
    # fall back to the file's base name.
    try:
        relative_path = download.files[0].path.relative_to(TEMP_DIR)
        top_level_path = relative_path.parts[0]
    except Exception:
        top_level_path = os.path.basename(download.files[0].path)

    print(f"\nTarget Name: {top_level_path}\n")

    # === DOWNLOAD LOOP ===
    # Poll aria2 once per second and mirror its counters onto the progress bar
    # until the download is complete or reports an error.
    with tqdm(total=download.total_length, unit="B", unit_scale=True, unit_divisor=1024, desc="Downloading") as pbar:
        while not download.is_complete:
            download.update()
            if download.status == "error":
                print(f"\n‚ùå Download failed: {download.error_message}")
                break
            # Keep the previous total when aria2 reports a falsy (e.g. 0) length.
            pbar.total = download.total_length or pbar.total
            pbar.n = download.completed_length
            pbar.refresh()
            time.sleep(1)

    # === COMPLETION & MOVING ===
    if download.status == "complete":
        aria2_process.terminate() # Release locks

        to_move = f"{TEMP_DIR}/{top_level_path}"

        # Check if the file actually exists (edge case: aria2 marks complete but file missing)
        if not os.path.exists(to_move):
             # Try to find it if name resolution was slightly off
             # Falls back to the first entry os.listdir() returns for TEMP_DIR.
             possible_files = os.listdir(TEMP_DIR)
             if possible_files:
                 to_move = os.path.join(TEMP_DIR, possible_files[0])
                 top_level_path = possible_files[0]

        if ZIP_FOLDER:
            print(f"üì¶ Zipping: {top_level_path}...")
            zip_path = zip_with_progress(TEMP_DIR, top_level_path)
            print("‚úÖ Zipping Completed.")
            # From here on the archive, not the raw download, is what gets moved.
            to_move = zip_path

            # Clean up the original unzipped folder/file inside Temp before moving zip
            original = f"{TEMP_DIR}/{top_level_path}"
            if os.path.isdir(original):
                 shutil.rmtree(original)
            elif os.path.isfile(original) and original != zip_path:
                 os.remove(original)

        # NOTE(review): when USE_TEMP is False, TEMP_DIR and DOWNLOAD_DIR are the
        # same directory, so the source and destination of this move coincide.
        print("üöö Moving files to Drive...")
        move_with_progress(to_move, DOWNLOAD_DIR)

        # The timestamped temp folder is only created (and so only removed)
        # when USE_TEMP is set.
        if USE_TEMP:
            shutil.rmtree(TEMP_DIR)

        print(f"\n‚úÖ SUCCESS! Saved to: {DOWNLOAD_DIR}")
    else:
        aria2_process.terminate()
        print("\n‚ùå Download failed or was interrupted.")