In [None]:
import threading
import requests
import os
import time
import mmap
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed

class BrutalProgressTracker:
    def __init__(self, total_size):
        self.total_size = total_size
        self.downloaded = 0
        self.lock = Lock()
        self.start_time = time.time()
        self.last_update = 0

    def update(self, bytes_downloaded):
        with self.lock:
            self.downloaded += bytes_downloaded
            # Update even less frequently - every 250ms
            now = time.time()
            if now - self.last_update > 0.25:
                self._print_progress()
                self.last_update = now

    def _print_progress(self):
        if self.total_size == 0:
            return

        progress = (self.downloaded / self.total_size) * 100
        elapsed = time.time() - self.start_time
        speed = self.downloaded / elapsed if elapsed > 0 else 0
        speed_mb = speed / (1024 * 1024)

        print(f"\r{progress:.1f}% | {speed_mb:.2f} MB/s | {self.downloaded//(1024*1024)}/{self.total_size//(1024*1024)} MB", end='', flush=True)

def download_chunk_massive(session, url, start_byte, end_byte, output_path, part_index, progress_tracker):
    """Downloads with massive chunks and minimal overhead."""
    headers = {'Range': f'bytes={start_byte}-{end_byte}'}

    for attempt in range(2):  # Only 2 attempts, fail fast
        try:
            response = session.get(url, headers=headers, stream=True, timeout=60)
            response.raise_for_status()

            with open(f"{output_path}.part{part_index}", 'wb') as f:
                # MASSIVE chunks - 1MB at a time
                for chunk in response.iter_content(chunk_size=1048576):  # 1MB chunks
                    if chunk:
                        f.write(chunk)
                        progress_tracker.update(len(chunk))
            return True

        except Exception as e:
            if attempt == 1:
                print(f"\nPart {part_index} FAILED: {e}")
                return False
            time.sleep(0.1)

def create_beast_session():
    """Creates the most aggressive session possible."""
    session = requests.Session()

    # Maximum aggression
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=200,
        pool_maxsize=200,
        max_retries=1,  # Fail fast
        pool_block=False
    )

    session.mount('http://', adapter)
    session.mount('https://', adapter)

    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': '*/*',
        'Connection': 'keep-alive',
        'Accept-Encoding': 'identity'  # No compression for max speed
    })

    return session

def lightning_merge(output_filename, num_parts):
    """Ultra-fast merge using memory mapping."""
    print("Lightning merge starting...")
    start_time = time.time()

    # Calculate total size first
    total_size = 0
    part_sizes = []
    for i in range(num_parts):
        part_file = f"{output_filename}.part{i}"
        if os.path.exists(part_file):
            size = os.path.getsize(part_file)
            part_sizes.append(size)
            total_size += size
        else:
            print(f"MISSING PART: {part_file}")
            return False

    # Pre-allocate the final file
    with open(output_filename, 'wb') as f:
        f.seek(total_size - 1)
        f.write(b'\0')

    # Memory map the output file
    with open(output_filename, 'r+b') as f:
        with mmap.mmap(f.fileno(), 0) as mmapped_file:
            offset = 0

            for i in range(num_parts):
                part_file = f"{output_filename}.part{i}"

                # Copy entire part file at once
                with open(part_file, 'rb') as part:
                    data = part.read()
                    mmapped_file[offset:offset + len(data)] = data
                    offset += len(data)

                # Delete immediately after copying
                os.remove(part_file)

    merge_time = time.time() - start_time
    print(f"Merge completed in {merge_time:.2f}s")
    return True

def beast_downloader(url, output_filename, num_threads=64, chunk_size_mb=10):
    """The most aggressive downloader possible."""

    session = create_beast_session()

    try:
        response = session.head(url, allow_redirects=True, timeout=15)
        response.raise_for_status()

        final_url = response.url
        total_size = int(response.headers.get('content-length', 0))

        if total_size == 0:
            print("No content-length. Using single thread.")
            return simple_beast_download(session, final_url, output_filename)

        if 'accept-ranges' not in response.headers:
            print("No range support. Single thread.")
            return simple_beast_download(session, final_url, output_filename)

        print(f"Target: {total_size / (1024*1024):.2f} MB")
        print(f"Unleashing {num_threads} threads with {chunk_size_mb}MB chunks...")

        progress_tracker = BrutalProgressTracker(total_size)

        # MASSIVE chunks - minimum 10MB per chunk
        min_chunk_size = chunk_size_mb * 1024 * 1024
        chunk_size = max(total_size // num_threads, min_chunk_size)
        actual_threads = min(num_threads, (total_size + chunk_size - 1) // chunk_size)

        print(f"Using {actual_threads} threads, {chunk_size//(1024*1024)}MB per chunk")

        futures = []

        with ThreadPoolExecutor(max_workers=actual_threads) as executor:
            for i in range(actual_threads):
                start_byte = i * chunk_size
                end_byte = min(start_byte + chunk_size - 1, total_size - 1)

                if start_byte >= total_size:
                    break

                thread_session = create_beast_session()
                future = executor.submit(
                    download_chunk_massive,
                    thread_session, final_url, start_byte, end_byte,
                    output_filename, i, progress_tracker
                )
                futures.append(future)

            # Wait and check
            failed = 0
            for future in as_completed(futures):
                if not future.result():
                    failed += 1

        if failed > 0:
            print(f"\n{failed} parts FAILED. Aborting.")
            return False

        print(f"\nDownload done. Starting lightning merge...")

        # Lightning-fast merge
        success = lightning_merge(output_filename, len(futures))

        if success:
            total_time = time.time() - progress_tracker.start_time
            avg_speed = (total_size / total_time) / (1024 * 1024)
            print(f"COMPLETE: {output_filename}")
            print(f"Total time: {total_time:.1f}s | Average: {avg_speed:.2f} MB/s")

        return success

    except Exception as e:
        print(f"FAILED: {e}")
        return False
    finally:
        session.close()