<a href="https://colab.research.google.com/github/NotStark/dlx/blob/main/dlx.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **DLX** - A Project for downloading and uploading files to fast cloud storage providers

In [None]:
# @title 🛠️ Setup Environment


# Update Silently
!apt update -qq

# Install required packages
!apt install -y -qq aria2 ffmpeg mediainfo

# Install ytdl from github
!curl -L https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -o /usr/local/bin/yt-dlp
!chmod +x /usr/local/bin/yt-dlp
!yt-dlp --version

# Install Python libraries
!pip install -q --upgrade \
  tqdm \
  httpx \
  psutil \
  Kurigram \
  tgcrypto \
  natsort \
  google-api-python-client


import os
import re
from google.colab import userdata
from pathlib import Path
from typing import Literal

def get_secret(key):
    try:
        return userdata.get(key)
    except Exception:
        print(f"{key} Not Found")
        return None

# Load secrets
TELEGRAM_BOT_TOKEN = get_secret('TELEGRAM_BOT_TOKEN')
GOFILE_API_TOKEN = get_secret("GOFILE_API_TOKEN")
VIKINGFILE_USER_HASH = get_secret("VIKINGFILE_USER_HASH")
BUZZHEAVIER_USER_TOKEN = get_secret("BUZZHEAVIER_USER_TOKEN")


# Download directory
DOWNLOADS_DIR = Path(os.path.abspath("downloads"))
DOWNLOADS_DIR.mkdir(exist_ok=True)


# Utilities
def format_size(bytes: float) -> str:
    units = ["B", "KB", "MB", "GB", "TB"]
    i = 0
    while bytes >= 1024 and i < len(units) - 1:
        bytes /= 1024
        i += 1
    return f"{bytes:.2f} {units[i]}"

def sanitize_filename(name: str) -> str:
    return re.sub(r'[^a-zA-Z0-9_\-. ]', '_', name)


def find_full_path(base_path_no_ext: str) -> str | None:
    base = Path(base_path_no_ext)
    parent_dir = base.parent or Path.cwd()
    for file in parent_dir.glob(base.stem + ".*"):
        if file.is_file():
            return str(file.resolve())
    return None

def detect_url_type(url: str) -> Literal["magnet" , "torrent" , "m3u8" , "direct" , "unknown"]:
    url = url.lower()
    if url.startswith("magnet:"):
        return "magnet"
    elif url.endswith(".torrent"):
        return "torrent"
    elif url.endswith(".m3u8"):
        return "m3u8"
    elif url.startswith(("http://", "https://", "ftp://")):
        return "direct"
    return "unknown"


def is_downloaded(file_path: str) -> bool:
  return file_path.exists() and file_path.stat().st_size > 0


print(f"✅ Environment ready. Download path: {DOWNLOADS_DIR}")


In [None]:
# @title ⏬ Universal Media Downloader

import os
import subprocess
import threading
import time
from pathlib import Path
from urllib.parse import urlparse
import re
from tqdm.notebook import tqdm

DEFAULT_M3U8_FILE_HEIGHT = 1080

def download_m3u8(
    url: str,
    output_file=None,
    yt_dlp_args: str = "",
    aria2c_args: str = "",
    file_height: int = DEFAULT_M3U8_FILE_HEIGHT,
    aria2c_connections: int = 16,
    aria2c_chunk_size: str = "2M"
):
    fmt = f"bv[height<={file_height}]+ba/best[height<={file_height}]"

    if not output_file:
        output_file = str(DOWNLOADS_DIR / f"file_{int(time.time())}")

    cmd = [
        "yt-dlp",
        "-f", fmt,
        "--downloader", "aria2c",
        "--downloader-args", f"aria2c:-x {aria2c_connections} -k {aria2c_chunk_size} --summary-interval=1 --console-log-level=warn {aria2c_args}",
        "-o", f"{output_file}.%(ext)s",
        url
    ]

    if yt_dlp_args:
        cmd[1:1] = yt_dlp_args.split()

    print(f"\n⬇️ Starting Download [{file_height}p max]:\n    {' '.join(cmd)}\n")

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)

    output_file_basename = os.path.basename(output_file)
    progress_bar = tqdm(total=100, desc=f"⚙️ Downloading: {output_file_basename}", unit="%",)
    progress_regex = re.compile(r"\((\d+)%\)")
    last_value = 0

    for line in proc.stdout:
        line = line.strip()
        match = progress_regex.search(line)
        if match:
            perc = int(match.group(1))
            if perc >= last_value:
                progress_bar.n = perc
                progress_bar.refresh()
                last_value = perc

        if "[FixupM3u8]" in line:
            progress_bar.set_description("🛠️ Finalizing Video...")
            progress_bar.n = 95
            progress_bar.refresh()

    proc.wait()
    progress_bar.n = 100
    progress_bar.set_description(f"✅ Completed: {output_file_basename}")
    progress_bar.refresh()
    progress_bar.close()

    if proc.returncode == 0:
        print(f"\n✅ Download complete: {output_file_basename}")
        return find_full_path(output_file), True
    else:
        print(f"\n❌ Download failed (exit code {proc.returncode})")
        return None, False


def download_media(
    urls: list[str] = None,
    folder: str = None,
    batch_size: int = 5,
    delay: int = 10,
    m3u8_file_height=DEFAULT_M3U8_FILE_HEIGHT
):

    def download_single(url, out_dir, filename=None, m3u8_file_height=DEFAULT_M3U8_FILE_HEIGHT):
        url_type = detect_url_type(url)
        sanitized_filename = sanitize_filename(filename or f"file_{int(time.time())}")
        output_path = out_dir / sanitized_filename

        if is_downloaded(output_path):
            print(f"⏭️ Already exists: {filename}")
            return

        if url_type in ["magnet", "torrent", "direct"]:
            cmd = [
                "aria2c",
                f"--dir={out_dir}",
                "--enable-dht=true",
                "--enable-peer-exchange=true",
                "--bt-save-metadata=true",
                "--console-log-level=notice",
                "--summary-interval=1",
                "--file-allocation=none",
                "--seed-time=0",
                "--bt-seed-unverified=true",
                url
            ]

            output_file_regex = re.compile(r"(?:Download complete:|FILE:)\s*(.+)")
            progress_regex = re.compile(r"(\d+)%")

            if filename:
                cmd.insert(-1, f"--out={filename}")
                final_output_name = filename
                desc = f"Downloading: {filename}"
            else:
                final_output_name = None
                desc = "Downloading: [detecting...]"

            print(f"⬇️ {desc}")
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

            pbar = tqdm(total=100, unit='%', desc=desc, leave=True)
            for line in proc.stdout:
                line = line.strip()
                # print(line , end = "\n")
                _match = progress_regex.search(line)
                if _match:
                    perc = int(_match.group(1))
                    pbar.n = perc
                    pbar.refresh()

                if not filename:
                    fmatch = output_file_regex.search(line)
                    if fmatch:
                        final_output_name = Path(fmatch.group(1)).name
                        pbar.set_description(f"Downloading: {final_output_name}")

            pbar.n = 100
            pbar.refresh()
            pbar.close()
            proc.wait()


            if proc.returncode == 0:
                print(f"\n✅ Download complete: {final_output_name}")
                return str(out_dir / final_output_name), True
            else:
                print(f"\n❌ Download failed (exit code {proc.returncode})")
                return None , False

        elif url_type == "m3u8":
            return download_m3u8(url, output_file=str(output_path), file_height=m3u8_file_height)
        else:
            print("❌ Unsupported URL type.")
            return None, False


    out_dir = Path(folder) if folder else DOWNLOADS_DIR
    out_dir.mkdir(parents=True, exist_ok=True)

    if urls is None:
        url = input("🔗 Enter the media URL: ").strip()
        filename = input("📁 Enter output filename (optional): ").strip() or None
        if detect_url_type(url) == "m3u8":
            res = input("🧐 Enter resolution (360, 480, 720, 1080): ").strip()
            res = int(res) if res.isdigit() else DEFAULT_M3U8_FILE_HEIGHT
            m3u8_file_height = res

        return download_single(url, out_dir, filename, m3u8_file_height)
    else:
        results = []
        total = len(urls)
        for i in range(0, total, batch_size):
            batch = urls[i:i + batch_size]
            print(f"🚀 Batch {i // batch_size + 1} of {((total - 1) // batch_size) + 1}")

            threads = []
            batch_results = [None] * len(batch)

            def thread_wrapper(idx, url):
                path, success = download_single(url, out_dir, None, m3u8_file_height)
                batch_results[idx] = {
                    "url": url,
                    "path": path,
                    "success": success
                }

            for idx, url in enumerate(batch):
                t = threading.Thread(target=thread_wrapper, args=(idx, url))
                t.start()
                threads.append(t)

            for t in threads:
                t.join()

            results.extend(batch_results)

            if i + batch_size < total:
                print(f"⏳ Sleeping {delay}s before next batch...")
                time.sleep(delay)

        print("\n🎉 All downloads processed.")
        return results



if __name__ == "__main__":
  print(download_media())

In [None]:
# @title 📺 yt-dlp Downloader {"form-width":"35%"}
# @markdown See [Supported sites](https://github.com/yt-dlp/yt-dlp/blob/master/supportedsites.md)


import os
import subprocess
import sys
from pathlib import Path

VIDEO_QUALITIES = [2160, 1440, 1080, 720, 480, 360, 240, 144]
AUDIO_FORMATS = ["mp3", "m4a", "opus", "wav", "flac"]

def ytdlp_downloader(
    url: str,
    fmt: str,
    container: str | None,
    is_playlist: bool,
):
    if is_playlist:
        out_tpl = str(DOWNLOADS_DIR / "%(playlist_title)s/%(title)s.%(ext)s")
    else:
        out_tpl = str(DOWNLOADS_DIR / "%(title)s.%(ext)s")

    cmd = [
        "yt-dlp",
        "-f", fmt,
        "-o", out_tpl,
        "--no-part",
        "--progress",
        "--progress-template", "%(info.playlist_index)s %(progress._default_template)s",
        "--no-continue",
        "--yes-playlist" if is_playlist else "--no-playlist",
        "--newline",
        url
    ]

    if fmt == "bestaudio" and container:
        cmd += [
            "--extract-audio",
            "--audio-format", container,
            "--audio-quality", "0"
        ]
    elif container and fmt != "bestaudio":
        cmd.insert(3, "--merge-output-format")
        cmd.insert(4, container)

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    for line in proc.stdout:
        print(line, end="")

    proc.wait()

    if proc.returncode == 0:
        print("✅ Download complete! 🎉")
    else:
        print(f"⚠️ yt-dlp exited with code {proc.returncode}")


def prompt_for_format() -> tuple[str, str]:
    choice = input("🎧 Audio or 📺 Video? (audio/video): ").strip().lower()
    if choice.startswith("a"):
        print("🎼 Supported audio formats: " + ", ".join(AUDIO_FORMATS))
        ext = input("🎚️ Desired audio format (default=mp3): ").strip().lower() or "mp3"
        if ext not in AUDIO_FORMATS:
            print("❌ Invalid format. Using default mp3.")
            ext = "mp3"
        return "bestaudio", ext
    elif choice.startswith("v"):
        ext = input("🎚️ Desired resolution container (mp4/mpv/webm, default=mp4): ").strip().lower() or "mp4"
        if ext not in ["mp4", "webm", "mkv"]:
            print("❌ Unsupported container. Using default mp4.")
            ext = "mp4"
        print("🔢 Resolutions: " + ", ".join(str(r) for r in VIDEO_QUALITIES))
        while True:
            r = input("🎚️ Desired max height (e.g. 1080, or blank for best): ").strip()
            if r == "":
                return "bv+ba/best", ext
            if r.isdigit() and int(r) in VIDEO_QUALITIES:
                res = int(r)
                fmt = f"bv[height<={res}]+ba/best[height<={res}]"
                return fmt, ext
            print("❌ Invalid resolution.")

    else:
        print("❌ Invalid choice.")
        return prompt_for_format()

if __name__ == "__main__":
    mode = input("➡️ Mode? (single/playlist): ").strip().lower()
    url = input("🔗 URL: ").strip()

    if mode.startswith("s") or mode.startswith("p"):
        fmt, ext = prompt_for_format()
        ytdlp_downloader(
            url,
            fmt,
            ext,
            is_playlist=mode.startswith("p"),
        )
    else:
        print("❌ Unknown mode.")
        sys.exit(1)

    print(f"\n✅ All done! Files saved in {DOWNLOADS_DIR}")


In [None]:
# @title ⬆️ Cloud Uploader (Drive, Gofile, VikingFile, BuzzHeavier) {"form-width":"35%"}


import os
import time
import gc
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx
import json
import math
from google.colab import drive, auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from tqdm.notebook import tqdm
from tqdm.utils import CallbackIOWrapper
from io import BytesIO


if not GOFILE_API_TOKEN:
    GOFILE_API_TOKEN = ""  # @param {type:"string", placeholder:"(Optional) Enter your optional Gofile API Token"}

if not VIKINGFILE_USER_HASH:
    VIKINGFILE_USER_HASH = ""  # @param {type:"string", placeholder:"(Optional) Enter your optional VikingFile User Hash"}

if not BUZZHEAVIER_USER_TOKEN:
    BUZZHEAVIER_USER_TOKEN = ""  # @param {type:"string", placeholder:"(Optional) Enter your optional BuzzHeavier User Token"}



class BaseUploader(ABC):
    @abstractmethod
    def upload(self, remote_path: str) -> None:
        """Uploads file or folder"""

# Gdrive Uploader
class GoogleDriveUploader(BaseUploader):
    DRIVE_MOUNT = '/content/drive'
    DRIVE_SERVICE = ('drive', 'v3')

    def __init__(self, base_folder: str = ''):
        self.base_folder = base_folder
        self.service = None
        self.folder_map = {'': None}

    def _authenticate(self):
        auth.authenticate_user()
        drive.mount(self.DRIVE_MOUNT, force_remount=True)
        self.service = build(*self.DRIVE_SERVICE, cache_discovery=False)

    def _ensure_folder(self, parts: list[str]) -> str | None:
        parent = None
        for i, part in enumerate(parts):
            key = '/'.join(parts[:i+1])
            if key in self.folder_map:
                parent = self.folder_map[key]
                continue
            q = f"name='{part}' and mimeType='application/vnd.google-apps.folder' and trashed=false"
            if parent:
                q += f" and '{parent}' in parents"
            resp = self.service.files().list(q=q, fields='files(id)').execute()
            if resp['files']:
                fid = resp['files'][0]['id']
            else:
                body = {'name': part, 'mimeType': 'application/vnd.google-apps.folder'}
                if parent: body['parents'] = [parent]
                fid = self.service.files().create(body=body, fields='id').execute()['id']
            self.folder_map[key] = fid
            parent = fid
        return parent

    def _upload_file(self, path: str, parent_id: str | None) -> None:
        fname = os.path.basename(path)
        file_size = os.path.getsize(path)
        media = MediaFileUpload(path, resumable=True)
        meta = {'name': fname, **({'parents':[parent_id]} if parent_id else {})}

        request = self.service.files().create(body=meta, media_body=media, fields='id')

        pbar = tqdm(total=file_size, unit='B', unit_scale=True, desc=f"Uploading {fname}")
        response = None
        while response is None:
            status, response = request.next_chunk()
            if status:
                pbar.update(status.resumable_progress - pbar.n)


        if pbar.n < file_size:
            pbar.update(file_size - pbar.n)

        pbar.close()


        file_id = response.get('id')
        print(f"✅ {fname} → https://drive.google.com/file/d/{file_id}/view")
        return file_id

    def upload(self, remote_path: str) -> None:
        self._authenticate()
        target = os.path.join(DOWNLOADS_DIR, remote_path)
        if not os.path.exists(target):
            print(f"❌ Not found: {target}")
            return
        if os.path.isfile(target):
            self._upload_file(target, None)
            return
        for root, _, files in os.walk(target):
            rel = os.path.relpath(root, DOWNLOADS_DIR)
            parts = [] if rel == '.' else rel.split(os.sep)
            parts = ([self.base_folder] if self.base_folder else []) + parts
            parent = self._ensure_folder(parts)
            print(f"📂 {'/'.join(parts) or '<root>'}")
            for f in files:
                self._upload_file(os.path.join(root, f), parent)
            print("⏸️ Sleeping 10 s...")
            time.sleep(10)
        print("✅ Google Drive upload done")


# GoFile uploader
class GoFileUploader(BaseUploader):
    API_CREATE_FOLDER = 'https://api.gofile.io/contents/createFolder'
    API_UPLOAD       = 'https://upload.gofile.io/uploadFile'

    def __init__(self, token: str | None = None , root_id: str | None = None):
        self.headers = {'Authorization': f'Bearer {token}'}
        self.root_id = root_id
        self.httpxClient = httpx.Client(headers=self.headers)

    def _new_folder(self, parent: str, name: str) -> str:
        resp = self.httpxClient.post(self.API_CREATE_FOLDER,
                          json={'parentFolderId': parent, 'folderName': name})
        data = resp.json()
        if data['status']=='ok':
            fid = data['data']['id']
            print(f"📂 Created {name} (ID:{fid})")
            return fid
        raise RuntimeError(data.get('message'))

    def _get_root_folder_id(self) -> str:
        resp = self.httpxClient.get("https://api.gofile.io/accounts/getid")
        data = resp.json()
        if data["status"] == "ok":
            account_id =  data["data"]["id"]
            resp = self.httpxClient.get(f"https://api.gofile.io/accounts/{account_id}")
            data = resp.json()
            if data["status"] == "ok":
              return data["data"]["rootFolder"]
            else:
              raise RuntimeError("Failed to get root folder ID")
        else:
            raise RuntimeError("Failed to get account ID")

    def _upload_file(self, path: str, folder: str | None) -> None:
        fname = os.path.basename(path)
        size = os.path.getsize(path)

        with open(path, "rb") as f, \
             tqdm(total=size, unit="B", unit_scale=True, desc=f"⬆️ {fname}") as bar:

            wrapped_file = CallbackIOWrapper(bar.update, f, "read")
            files = {"file": (fname, wrapped_file)}
            data = {'folderId': folder} if folder else {}
            result = None

            with self.httpxClient as client:
                with client.stream(
                    "POST",
                    self.API_UPLOAD,
                    files=files,
                    data=data,
                    timeout=None
                ) as response:
                    response.raise_for_status()
                    text = response.read()
                    result = json.loads(text)

        if result.get("status") == "ok":
            print(f"✅ {fname} → {result['data']['downloadPage']}")
        else:
            raise RuntimeError(result.get("message"))



    def upload(self, remote_path: str) -> None:

        tgt = os.path.join(DOWNLOADS_DIR, remote_path)

        if not os.path.exists(tgt): print(f"❌ Not found: {tgt}"); return

        if not self.token:
            raise RuntimeError("GoFile token is missing")


        if not self.root_id:
            self.root_id = self._get_root_folder_id()
            print(f"📁 Root folder ID: {self.root_id}")

        folder_map = {'': self.root_id}

        if os.path.isfile(tgt): return self._upload_file(tgt, None)
        for root, _, files in os.walk(tgt):
            rel = os.path.relpath(root, DOWNLOADS_DIR)
            parent_key = os.path.dirname(rel)
            parent = self.folder_map.get(parent_key)
            if rel not in self.folder_map:
                self.folder_map[rel] = self._new_folder(parent, os.path.basename(root))
            print(f"📂 Folder: {root}")
            for f in files:
                self._upload_file(os.path.join(root, f), self.folder_map[rel])
            print("⏸️ Sleeping 10 s...")
            time.sleep(10)
        print("✅ GoFile upload done")



# VikingFile Uploader
class VikingFileUploader:
    API_BASE = 'https://vikingfile.com/api'

    def __init__(self, user_hash: str | None = None):
        self.user = user_hash
        self.client = httpx.Client(
            timeout=httpx.Timeout(connect=10.0, read=60.0, write=300.0, pool=10.0)
        )

    def _init_multipart(self, size: int):
        resp = self.client.post(f"{self.API_BASE}/get-upload-url", data={'size': size})
        resp.raise_for_status()
        info = resp.json()
        return info['uploadId'], info['key'], info['partSize'], info['urls']

    def _complete(self, key: str, uid: str, name: str, etags: list):
        payload = {
            'key': key,
            'uploadId': uid,
            'name': name,
            'user': self.user
        }
        for i, (num, tag) in enumerate(etags):
            payload[f"parts[{i}][PartNumber]"] = num
            payload[f"parts[{i}][ETag]"] = tag

        resp = self.client.post(f"{self.API_BASE}/complete-upload", data=payload)
        resp.raise_for_status()
        return resp.json().get('url')


    def _upload_part(self, url, file_path, offset, size, desc=None, thread_id=None):
        with open(file_path, 'rb') as f:
            f.seek(offset)
            chunk_data = f.read(size)

        pbar = tqdm(total=size, unit='B', unit_scale=True, desc=desc, position=thread_id, leave=True)
        buffer = BytesIO(chunk_data)
        wrapped = CallbackIOWrapper(pbar.update, buffer, "read")

        def stream_chunk(chunk_size=100 * 1024 * 1024):
            while True:
                data = wrapped.read(chunk_size)
                if not data:
                    break
                yield data



        headers = {'Content-Length': str(size)}
        resp = self.client.put(url, content=stream_chunk(), headers=headers)
        resp.raise_for_status()
        pbar.close()
        del chunk_data
        del buffer
        gc.collect()
        return resp.headers.get('ETag', '')



    def _upload_file(self, full_path: str):
        size = os.path.getsize(full_path)
        filename = os.path.basename(full_path)
        print(f"\n🔄 Init: {filename} ({size} bytes)")

        uid, key, part_size, urls = self._init_multipart(size)
        etags = []

        usable_ram = 10 * 1024**3  # 10 GiB
        memory_per_part = part_size * 1.2
        max_workers = max(1, math.floor(usable_ram / memory_per_part))
        print(f"🧠 Auto workers: {max_workers} (based on {format_size(part_size)} part size)")

        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = []
            offset = 0

            for i, url in enumerate(urls):
                current_part_size = min(part_size, size - offset)
                if current_part_size <= 0:
                    break
                fut = ex.submit(self._upload_part, url, full_path, offset, current_part_size, f"Part {i+1}", i)
                futures.append((i + 1, fut))
                offset += current_part_size

            for part_number, fut in tqdm(futures, desc=f"⬆️ ETags {filename}", unit="etag",position=99):
                etag = fut.result()
                etags.append((part_number, etag))

        etags.sort(key=lambda x: x[0])
        result = self._complete(key, uid, filename, etags)
        print(f"✅ Done: {result}\n")

    def upload(self, remote_path: str):
        path = os.path.join(DOWNLOADS_DIR, remote_path)
        if not os.path.exists(path):
            print(f"❌ Not found: {path}")
            return

        if not self.user_hash:
            raise RuntimeError("VikingFile user hash is missing")

        if os.path.isfile(path): return self._upload_file(path)
        print(f"📁 Uploading folder: {path}")
        for rd, _, files in os.walk(path):
            for f in files:
                self._upload_file(os.path.join(rd, f))
            print("⏸️ Sleep 10 s...")
            time.sleep(10)
        print("✅ VikingFile Upload done")


# BuzzHeavier uploader
class BuzzHeavierUploader(BaseUploader):

    BUZZ_API_ROOT = 'https://buzzheavier.com/api'
    BUZZ_UPLOAD_ROOT = 'https://w.buzzheavier.com'

    def __init__(self, token: str | None = None , root_id: str | None = None):
        self.headers = {'Authorization': f'Bearer {token}'}
        self.root_id = root_id
        self.httpxClient = httpx.Client(headers=self.headers)

    def _create_folder(self, name: str, parent: str | None) -> str:
        data = {'name': name, 'parentId': parent or self.root}
        resp = httpx.post(f"{self.BUZZ_API_ROOT}/fs", headers=self.headers, json=data)
        info = resp.json()
        if info.get('status')!='ok': raise RuntimeError(info.get('message'))
        fid = info['data']['id']; print(f"📂 New: {name} (ID:{fid})"); return fid


    def _get_root_folder_id(self) -> str:
        resp = httpx.get(f"{self.BUZZ_API_ROOT}/fs", headers=self.headers)
        info = resp.json()
        if info.get('status')!='ok': raise RuntimeError(info.get('message'))
        return info['data']['id']

    def _upload_file(self, path: str, parent: str | None) -> None:
        fname = os.path.basename(path)
        size = os.path.getsize(path)
        url = f"{self.BUZZ_UPLOAD_ROOT}/{(parent + '/') if parent else ''}{fname}"
        params = {'folderId': parent} if parent else {}

        with open(path, "rb") as f, \
             tqdm(total=size, unit="B", unit_scale=True, desc=f"⬆️ {fname}") as bar:

            wrapped_file = CallbackIOWrapper(bar.update, f, "read")

            with httpx.Client(headers=self.headers, timeout=None) as client:
                with client.stream("PUT", url, params=params, content=wrapped_file) as response:
                    response.raise_for_status()
                    text = response.read()
                    result = json.loads(text)
                    print(result)


    def upload(self, remote_path: str) -> None:

        pth = os.path.join(DOWNLOADS_DIR, remote_path)

        if not os.path.exists(pth): print(f"❌ Not found: {pth}"); return


        if not self.token:
            raise RuntimeError("Buzzheavie token is missing")

        if not self.root_id:
            self.root_id = self._get_root_folder_id()
            print(f"📁 Root folder ID: {self.root_id}")


        if os.path.isfile(pth): return self._upload_file(pth, None)
        top = self._create_folder(os.path.basename(pth), None)
        for rd, _, files in os.walk(pth):
            rel = os.path.relpath(rd, pth)
            parent = top if rel==' .' else self._create_folder(rel, top)
            for f in files:
                self._upload_file(os.path.join(rd,f), parent)

            print("⏸️ Sleep 10 s...")
            time.sleep(10)
        print("✅ BuzzHeavier done")



if __name__=='__main__':
    services = [
    ('🗂️ GoFile',        GoFileUploader(GOFILE_API_TOKEN)),
    ('☁️ Google Drive',  GoogleDriveUploader()),
    ('⚔️ VikingFile',    VikingFileUploader(VIKINGFILE_USER_HASH)),
    ('📡 BuzzHeavier',   BuzzHeavierUploader(BUZZHEAVIER_USER_TOKEN)),
    ]
    print("–––––––––––––––––––––––––––––––")
    for i,(n,_) in enumerate(services): print(f"[{i}] {n}")
    print("–––––––––––––––––––––––––––––––")
    idx = int(input("➡️ Select service index: ").strip())
    path = input("📂 File/folder path: ").strip()
    services[idx][1].upload(path)


In [None]:
#@title 📩 Telegram Media Downloader

import re
import os
import asyncio
import requests
from tqdm.notebook import tqdm
from pyrogram import Client


if not TELEGRAM_BOT_TOKEN:
    TELEGRAM_BOT_TOKEN = ""  # @param {type:"string", placeholder:"(Optional) Enter your Telegram bot token"}


def parse_link(link: str) -> tuple[int | str , int]:
    m = re.match(
        r"https?://(?:t\.me|telegram\.me|telegram\.dog|telegram\.org)/(?:c/)?(?P<chat>[^/]+)/(?P<msg_id>\d+)",
        link.strip().lower()
    )
    if not m:
        return None
    chat = m.group("chat")
    msg_id = int(m.group("msg_id"))
    if chat.isdigit() and int(chat) > 0:
        chat = int(f"-100{chat}")
    return (chat, msg_id)


async def download_one(app: Client, chat: int | str, msg_id: int) -> None:
    try:
        message = await app.get_messages(chat, msg_id)

        if not message:
            print(f"❌ Message not found: {chat}/{msg_id}")
            return

        file_name, file_size = (
              (message.video.file_name, message.video.file_size) if message.video else
              (message.audio.file_name, message.audio.file_size) if message.audio else
              (message.document.file_name, message.document.file_size) if message.document else
              ("Photo", message.photo.file_size) if message.photo else
              (None, None)
        )

        if not file_size:
            print(f"❌ Unknown Media type: {chat}/{msg_id}")
            return

        print(f"\n📥 Downloading: {file_name}")

        with tqdm(total=file_size, unit="B", unit_scale=True, desc=file_name, position=0, leave=True) as pbar:
            def progress(current, total):
                pbar.total = total
                pbar.update(current - pbar.n)

            file_path = await app.download_media(
                message,
                file_name=os.path.join(DOWNLOADS_DIR, file_name),
                progress=progress
            )

        print(f"✅ Done: {file_name} → {file_path}")

    except Exception as e:
        print(f"⭕ Error for {chat}/{msg_id}: {e}")

async def telegram_media_downloader():

    links = input("🔗 Enter Telegram message links (comma-separated):\n").split(",")

    parsed_links = [parse_link(link) for link in links]
    valid_links = [p for p in parsed_links if p]

    if not valid_links:
        print("❌ No valid links found.")
        return


    if not TELEGRAM_BOT_TOKEN:
        raise RuntimeError("Telegram bot token is missing")

    app = Client(
        "ddl_bot",
        bot_token=TELEGRAM_BOT_TOKEN,
        api_id=26684954,
        api_hash="a709a6225180f08416b5d9effd1c9fd1",
        in_memory=True
    )

    await app.start()

    try:
        # Bots can't download in parallel
        for chat, msg_id in valid_links:
            await download_one(app, chat, msg_id)
    finally:
        await app.stop()


if __name__ == "__main__":
    await telegram_media_downloader()


In [None]:
# @title 🎬 AnimePahe Downloader

import httpx
import time
import re
from urllib.parse import quote, urlparse, parse_qs
from datetime import datetime
from tqdm.notebook import tqdm



BASE_URL = "https://anime.apex-cloud.workers.dev"
KWIK_URL = "https://access-kwik.apex-cloud.workers.dev/"
AUTH_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.O0FKaqhJjEZgCAVfZoLz6Pjd7Gs9Kv6qi0P8RyATjaE"
RETRY_COUNT = 2
RETRY_DELAY = 1.5
RATE_LIMIT_BATCH = 5
RATE_LIMIT_DELAY = 5


def make_request(url, method="GET", params=None, json_data=None, retries=RETRY_COUNT):

    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}

    for attempt in range(1, retries + 1):
        try:
            if method.upper() == "GET":
                response = httpx.get(url, params=params, headers=headers, timeout=15)
            else:
                response = httpx.post(url, json=json_data, headers=headers, timeout=15)

            response.raise_for_status()
            return response.json()
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            if attempt == retries:
                print(f"❌ Error after {retries} attempts: {e}")
                raise
            print(f"⚠️ Attempt {attempt} failed: {e}. Retrying in {RETRY_DELAY}s...")
            time.sleep(RETRY_DELAY)

def search_anime(query):
    params = {"method": "search", "query": query}
    return make_request(BASE_URL, params=params)

def get_series_info(session_id, page=1):
    params = {"method": "series", "session": session_id, "page": page}
    return make_request(BASE_URL, params=params)

def get_episode_links(series_session, episode_session):
    params = {"method": "episode", "session": series_session, "ep": episode_session}
    return make_request(BASE_URL, params=params)

def resolve_kwik_link(kwik_url):
    payload = {
        "service": "kwik",
        "action": "fetch",
        "content": {"kwik": kwik_url},
        "auth": AUTH_TOKEN
    }
    return make_request(KWIK_URL, method="POST", json_data=payload)


def select_best_quality(links):
    def get_resolution(link):
        match = re.search(r'(\d+)p', link.get("name", "0p"), re.IGNORECASE)
        return int(match.group(1)) if match else 0

    if not links:
        return None

    return max(links, key=get_resolution)

def parse_expiry_time(url):
    query_params = parse_qs(urlparse(url).query)
    expires_list = query_params.get("expires")

    if not expires_list or not expires_list[0].isdigit():
        return "Unknown"

    try:
        expiry_time = datetime.fromtimestamp(int(expires_list[0]))
        return expiry_time.strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        return "Unknown"


def validate_episode_range(input_str, max_episodes):

    input_str = input_str.strip().lower()


    if input_str in ["all", "a"]:
        return list(range(1, max_episodes + 1))


    if input_str.isdigit():
        ep_num = int(input_str)
        if 1 <= ep_num <= max_episodes:
            return [ep_num]
        else:
            print(f"❌ Episode {ep_num} is out of range (1-{max_episodes})")
            return None


    if "-" in input_str:
        try:
            start, end = map(int, input_str.split("-"))
            if start <= end and 1 <= start <= max_episodes and 1 <= end <= max_episodes:
                return list(range(start, end + 1))
            else:
                print(f"❌ Episode range {start}-{end} is invalid (must be between 1-{max_episodes})")
                return None
        except ValueError:
            pass

    print("❌ Invalid input format. Use 'all', a single number, or a range (e.g., '1-5')")
    return None


if __name__ == "__main__":

    # Step 1: Search for anime
    print("🔍 Anime Downloader Tool")
    print("=" * 50)

    anime_name = input("Enter anime name to search: ").strip()
    if not anime_name:
        print("❌ Search query cannot be empty")
        raise SystemExit

    print(f"\nSearching for '{anime_name}'...")
    search_results = search_anime(anime_name)

    # print(search_results)

    if search_results.get("total", 0) == 0:
        print("❌ No results found. Try a different search term.")
        raise SystemExit


    print(f"\n📋 Found {search_results['total']} results:")
    print("-" * 50)
    for idx, anime in enumerate(search_results["data"]):
        print(f"[{idx}] {anime['title']} ({anime.get('year', 'N/A')}) • {anime.get  ('episodes', '?')} episodes • {anime.get('status', 'Unknown')}")

    # Step 2: Select anime from results
    while True:
        try:
            selection = int(input(f"\nSelect anime by number [0-{len(search_results ['data'])-1}]: "))
            if 0 <= selection < len(search_results["data"]):
                selected_anime = search_results["data"][selection]
                break
            else:
                print(f"❌ Please enter a number between 0 and {len(search_results  ['data'])-1}")
        except ValueError:
            print("❌ Please enter a valid number")

    # Step 3: Fetch series information and episodes
    anime_session = selected_anime["session"]
    anime_title = selected_anime["title"]
    page1_data = get_series_info(anime_session, page=1)
    total_episodes = page1_data.get("total", 0)
    total_pages = page1_data.get("total_pages", 1)
    episodes_per_page = len(page1_data.get("episodes", []))

    print(f"\n🎬 {page1_data.get('title', selected_anime['title'])}")
    print(f"📊 Total Episodes: {total_episodes} (across {total_pages} pages)")

    # Step 4: Select episodes to download
    while True:
        episode_input = input("\nWhich episodes to download? ('all', single number,     or range like '1-5'): ")
        episodes_to_download = validate_episode_range(episode_input, total_episodes)
        if episodes_to_download:
            break

    print(f"\n🔄 Will download {len(episodes_to_download)} episodes:    {episodes_to_download[0]}-{episodes_to_download[-1]}")

    # Step 5: Gather episode session IDs (may need to paginate)
    print("\nGathering episode information...")
    episode_sessions = {}
    pages_needed = set()

    # Determine which pages we need to fetch
    for ep_num in episodes_to_download:
        page_num = ((ep_num - 1) // episodes_per_page) + 1
        pages_needed.add(page_num)

    # Fetch each required page and extract session IDs
    for page_num in sorted(pages_needed):
        print(f"Fetching page {page_num}/{total_pages}...")
        page_data = get_series_info(anime_session, page=page_num)

        for episode in page_data.get("episodes", []):
            ep_num = int(episode.get("episode", "0"))
            if ep_num in episodes_to_download:
                episode_sessions[ep_num] = episode.get("session")

    # Step 6: Process each episode and get download links
    print("\n🔍 Processing episodes and fetching download links...")
    download_links = {}

    for i, (ep_num, session) in enumerate(tqdm(sorted(episode_sessions.items()),    desc="Processing Episodes")):
        try:

            episode_data = get_episode_links(anime_session, session)
            if not isinstance(episode_data, list) or len(episode_data) == 0:
                print(f"⚠️ No quality links found for Episode {ep_num}")
                continue

            best_quality = select_best_quality(episode_data)
            if not best_quality:
                print(f"⚠️ No download links found for Episode {ep_num}")
                continue


            kwik_url = best_quality.get("link")
            ddl_data = resolve_kwik_link(kwik_url)


            direct_url = ddl_data.get("content", {}).get("url", "")
            if not direct_url:
                print(f"⚠️ Empty direct URL for Episode {ep_num}")
                continue


            expiry_time = parse_expiry_time(direct_url)
            quality = best_quality.get("name", "Unknown")
            download_links[ep_num] = {
                "url": direct_url,
                "quality": quality,
                "expires": expiry_time
            }


            if (i + 1) % RATE_LIMIT_BATCH == 0 and i + 1 < len(episode_sessions):
                print(f"💤 Sleeping for {RATE_LIMIT_DELAY}")
                time.sleep(RATE_LIMIT_DELAY)

        except Exception as e:
            print(f"❌ Error processing Episode {ep_num}: {str(e)}")

    # Step 7: Display results
    print("\n✅ Download Links Ready:")
    print("=" * 80)

    if not download_links:
        print("❌ No download links were successfully generated.")
    else:
        for ep_num in sorted(download_links.keys()):
            info = download_links[ep_num]
            print(f"Episode {ep_num:02d} [{info['quality']}]")
            print(f"🔗 {info['url']}")
            print(f"⏰ Expires: {info['expires']}")
            print("-" * 80)

        proceed = input("\nDo you want to start downloading these files now? (y/n):     ").strip().lower()
        if proceed == 'y':
            # Step 8: Download in memory
            urls_to_download = [info["url"] for info in download_links.values()]
            download_media(urls=urls_to_download , folder = f"{DOWNLOADS_DIR}/  {anime_title}")
        else:
            print("Download skipped by user.")


    print(f"\n🎉 Successfully generated {len(download_links)} out of {len   (episodes_to_download)} requested episode links.")

In [None]:
# @title  AnimeKai Downloader (Soon)


In [None]:
#@title 📂🔄 Manage Downloaded Files & Folders

import os
import re
import shutil
import gc
from pathlib import Path
from datetime import datetime
from tqdm.notebook import tqdm
from natsort import natsorted


def get_folder_size(folder_path: Path) -> int:
    return sum(f.stat().st_size for f in folder_path.rglob('*') if f.is_file())

def natural_sort_key(p: Path):
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', p.name)]


def list_files() -> dict[str,Path]:
    base = Path(DOWNLOADS_DIR)
    entries = sorted(base.iterdir(), key=natural_sort_key) if base.exists() else []
    if not entries:
        print("📂 No files or folders found.")
        return {}
    print(f"{'Index':<6} {'Name':<50} {'Type':<8} {'Size':<10} {'Modified'}")
    idx_map = {}
    def _print(entry: Path, idx: str, level: int):
        indent = "  "*level
        mtime = datetime.fromtimestamp(entry.stat().st_mtime).strftime("%Y-%m-%d %H:%M")
        typ = "File" if entry.is_file() else "Folder"
        size = format_size(entry.stat().st_size) if entry.is_file() else format_size(get_folder_size(entry))
        name = entry.name + ("/" if entry.is_dir() else "")
        print(f"{idx:<6} {indent+name:<50} {typ:<8} {size:<10} {mtime}")
        idx_map[idx] = entry
        if entry.is_dir():
            subs = sorted(entry.iterdir(), key=natural_sort_key)
            for i, sub in enumerate(subs):
                _print(sub, f"{idx}.{i}", level+1)
    for i, e in enumerate(entries):
        _print(e, str(i), 0)
    return idx_map


def parse_indices(s: str, keys: set[str]) -> list[str]:
    out = set()
    for tok in [t.strip() for t in s.split(',') if t.strip()]:
        if ':' in tok:
            a, b = tok.split(':')
            depth = a.count('.')
            prefix = a.rsplit('.', 1)[0] + '.' if '.' in a else ''
            sibs = natsorted(k for k in keys if k.startswith(prefix) and k.count('.') == depth)
            if a in sibs and b in sibs:
                ia, ib = sibs.index(a), sibs.index(b)
                for k in sibs[min(ia, ib): max(ia, ib)+1]:
                    out.add(k)
        else:
            out.add(tok)
    return sorted(out, key=lambda x: [int(t) if t.isdigit() else t for t in x.split('.')])

def delete_selected(idx_map: dict[str,Path]):
    inp = input("\n🗑️ Enter index(es) to delete (e.g. 1,2.0:2.3,3.1): ").strip()
    if not inp:
        print("🚫 No input—aborting.")
        return
    chosen = parse_indices(inp, set(idx_map))
    to_del = []
    for k in chosen:
        p = idx_map.get(k)
        if p and p.exists():
            to_del.append(p)
        else:
            print(f"⚠️ Invalid index: {k}")
    if not to_del:
        print("🚫 No valid targets.")
        return
    print("\n⚠️ Will delete:")
    for p in to_del:
        print(" •", p)
    if input("Type 'yes' to confirm: ").strip().lower() != 'yes':
        print("🚫 Cancelled.")
        return
    for p in to_del:
        try:
            if p.is_dir(): shutil.rmtree(p)
            else: p.unlink()
            print("✅ Deleted:", p)
        except Exception as e:
            print("❌ Error deleting", p, ":", e)
    gc.collect()


if __name__ == "__main__":
    index_map = list_files()
    if index_map:
        if input("\n❓ Delete any? (y/N): ").strip().lower() == 'y':
            delete_selected(index_map)


In [None]:
#@title 🔍 Show Media Info (mediainfo)

import subprocess
from pathlib import Path


def show_media_info():
    path = input("🎞️ Enter full path to media file: ").strip()
    file = Path(path)

    if not file.exists():
        print("❌ File not found.")
        return

    try:
        result = subprocess.run(
            ["mediainfo", "--Output=Tree", str(file)],
            check=True,
            stdout=subprocess.PIPE,
            text=True
        )
        print(f"\n📄 Media Info for: {file.name}\n")
        print(result.stdout)
    except Exception as e:
        print(f"❌ Error running mediainfo: {e}")


if __name__ == "__main__":
    show_media_info()


In [None]:
#@title 📦 Zipper (Zip and Split File or Folder)

import os


target_name = input("📂 Enter file/folder path: ").strip()
target_path = os.path.join(DOWNLOADS_DIR, target_name)

zip_base_name = input("📦 Enter output base name: ").strip()
if not zip_base_name:
    zip_base_name = os.path.basename(target_name)

if zip_base_name.lower().endswith('.zip'):
    zip_base_name = zip_base_name[:-4]


full_zip_path = os.path.join(DOWNLOADS_DIR, f"{zip_base_name}.zip")
split_prefix = os.path.join(DOWNLOADS_DIR, zip_base_name)


if not os.path.exists(target_path):
    print("❌ Not found:", target_path)
else:
    is_file = os.path.isfile(target_path)
    print("📌 Detected:", "File" if is_file else "Folder")
    print(f"\n📦 Zipping {'file' if is_file else 'folder'} → {full_zip_path}")
    if is_file:
        !zip -j "{full_zip_path}" "{target_path}"
    else:
        !zip -r "{full_zip_path}" "{target_path}"


    do_split = input("\n❓ Do you want to split the ZIP? (y/N): ").strip().lower()
    if do_split == 'y':
        split_size = input("📐 Enter max part size (e.g., 4G, 700M) [default: 4G]: ").strip()
        if not split_size:
            split_size = "4G"

        print(f"\n✂️ Splitting ZIP into chunks of {split_size} as .partN files...")
        split_cmd = f'split -b {split_size} "{full_zip_path}" --numeric-suffixes=1 --suffix-length=1 "{full_zip_path}.part"'
        os.system(split_cmd)

        print("\n✅ Split complete. Output parts:")
        !ls -lh "{full_zip_path}.part"*
    else:
        print("\n📦 Skipped splitting. Single ZIP is ready.")
