<a href="https://colab.research.google.com/github/Deril-fr/ggl-colab-download/blob/master/ggl-colal-dowload.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title <font size="5">← ឵឵<i>Install FFMPEG</i></font> { vertical-output: true }
from IPython.display import clear_output

# Install ffmpeg with apt (IPython shell escape); ffmpeg is launched later to
# perform the actual download.
!apt install ffmpeg
# Wipe the installer's output from the cell.
clear_output()
# Print the ffmpeg version so the installation can be checked at a glance.
!ffmpeg -version

In [None]:
#@title <font size="5">← ឵឵<i>Install libraries...</i></font> { vertical-output: true }
# Install the third-party packages needed by the cells below.
# NOTE(review): `bs4` is imported in a later cell but is not installed here,
# and `simple_term_menu` is installed but never imported in the visible
# cells — confirm whether this list is still accurate.
!pip install httpx simple_term_menu nest_asyncio


In [None]:
#@title <font size="5">← ឵឵<i>Set constants</i></font> { vertical-output: true }
import re

# Scheme and host used to build proxied URLs (the target URL is passed to this
# host as a query parameter).
PROTOCOL = "https"
WORKER = "proxy.ketsuna.com"
# Host name of the site itself; not referenced by the visible cells.
BASE_URL = "neko.ketsuna.com"

# Captures the single-quoted value assigned to `video[0]`.
VIDEO_REGEX = re.compile(r"video\[0\] = '(.+)';", flags=re.MULTILINE)
# Alternative patterns, each capturing one double-quoted payload.
# NOTE(review): neither VIDEO_REGEX nor M3U8_REGEX is used in the visible
# cells — presumably leftovers from an earlier scraping approach; confirm.
M3U8_REGEX = [
    re.compile(pattern)
    for pattern in (
        r'e\.parseJSON\(atob\(t\).slice\(2\)\)\}\(\"([^;]*)"\),',
        r'e\.parseJSON\(n\)}\(\"([^;]*)"\),',
        r'n=atob\("([^"]+)"',
    )
]
# Captures the WIDTHxHEIGHT value of a playlist's #EXT-X-STREAM-INF tag. At
# least one character must follow the resolution for the pattern to match.
M3U8_RES = re.compile(r"#EXT-X-STREAM-INF:.+RESOLUTION=(\d+x\d+).+")

# Episode page URL: named groups `lang` (vostfr or vf), `id` and `nb`.
URL_PARTS = re.compile(r"https://[\w\-.]+/anime/(?P<lang>vostfr|vf)/(?P<id>\d+)/episode/(?P<nb>\d+)")

In [None]:
#@title <font size="5">← ឵឵<i>Do the magic !</i></font> { vertical-output: true }

import base64
import json
import os
import subprocess
from typing import Any, NamedTuple
from urllib.parse import urlencode, urlparse, urlunparse

import httpx
from bs4 import BeautifulSoup

# Single async HTTP client shared by the request helpers defined in this cell.
# NOTE(review): it is never closed in the visible code.
client = httpx.AsyncClient()

def set_worker(url: str) -> str:
    """Return the proxied form of *url*.

    The result is ``PROTOCOL://WORKER/?url=<url>``, with *url* percent-encoded
    as the ``url`` query parameter.
    """
    query = urlencode({"url": url})
    # Components are (scheme, netloc, path, params, query, fragment).
    return urlunparse((PROTOCOL, WORKER, "/", "", query, ""))


class Context(NamedTuple):
    """Streaming information resolved for one episode."""

    # Playlist URL of the episode.
    url: str
    # Subtitles reference for the episode, or None when there is none.
    subtitles: str | None


async def get_m3u8(url: str) -> Context:
    """Resolve an episode page URL into its playlist URL and subtitles.

    The language, anime id and episode number are extracted from *url* with
    ``URL_PARTS`` and used to query the ketsuna API.

    Args:
        url: Episode page URL; it must match ``URL_PARTS``.

    Returns:
        A ``Context`` whose ``url`` is the API's ``videoUri`` passed through
        ``set_worker`` and whose ``subtitles`` is the API's ``videoVtt`` value
        (``None`` when the API does not provide that field).

    Raises:
        ValueError: If *url* does not match ``URL_PARTS``.
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    match = URL_PARTS.match(url)
    if not match:
        raise ValueError("url is not valid")
    res = await client.get(f"https://api.ketsuna.com/animes/{match['lang']}/{match['id']}/{match['nb']}")
    # Fail with the real HTTP error rather than an obscure JSON/KeyError later.
    res.raise_for_status()
    raw = res.json()

    m3u8 = set_worker(raw["videoUri"])
    # Subtitles are optional (see the Context annotation), so a missing key
    # must not abort the download.
    subtitles = raw.get("videoVtt")
    return Context(url=m3u8, subtitles=subtitles)


async def get_available_qualities(ctx: Context) -> dict[str, str]:
    """List the resolutions offered by the playlist at ``ctx.url``.

    Every ``#EXT`` line matching ``M3U8_RES`` contributes one entry: the
    captured resolution mapped to the line that immediately follows the tag.

    Args:
        ctx: Context whose ``url`` points at the playlist to inspect.

    Returns:
        A mapping from resolution (e.g. ``"1920x1080"``) to the playlist line
        following the corresponding tag, in playlist order.

    Raises:
        ValueError: If the response body does not start with ``#EXTM3U``.
    """
    response = await client.get(ctx.url)
    playlist = response.text

    if not playlist.startswith("#EXTM3U"):
        raise ValueError("Not a m3u8 file")

    lines = iter(playlist.splitlines())
    next(lines)  # skip the "#EXTM3U" header checked above
    qualities: dict[str, str] = {}
    for line in lines:
        if not line.startswith("#EXT"):
            continue
        match = M3U8_RES.search(line)
        if match is None:
            continue
        # A bare next() would raise StopIteration when the tag is the last
        # line, which surfaces as a RuntimeError inside a coroutine.
        target = next(lines, None)
        if target is None:
            break
        qualities[match.group(1)] = target
    return qualities


async def download_form_m3u8(
    url: str, output: str
) -> tuple[subprocess.Popen[bytes], float]:
    """Start an ffmpeg process that downloads the playlist at *url* to *output*.

    The playlist is fetched and saved as ``./tmp/<output stem>.m3u8``; ffmpeg
    then reads that local copy and writes its progress to
    ``./tmp/progression.txt``. The function returns as soon as ffmpeg has been
    launched; it does not wait for it to finish.

    Args:
        url: URL of the media playlist (fetched as-is).
        output: Path of the file ffmpeg must produce.

    Returns:
        The running ffmpeg process and the sum of the playlist's ``#EXTINF``
        durations (0 when the playlist has none).

    Raises:
        httpx.HTTPStatusError: If the playlist request returns an error status.
    """
    filename = os.path.splitext(os.path.basename(output))[0]
    playlist_path = f"./tmp/{filename}.m3u8"

    # Fetch before touching the disk so that a failed request neither leaves
    # an empty playlist behind nor hands an error page to ffmpeg.
    response = await client.get(url)  # worker is already set
    response.raise_for_status()
    total_duration = sum(map(float, re.findall(r"#EXTINF:([\d.]+)", response.text)))

    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs("./tmp", exist_ok=True)
    with open(playlist_path, "wb") as f:
        f.write(response.content)

    args = [
        "ffmpeg",
        "-progress",
        "./tmp/progression.txt",
        "-y",  # overwrite output file
        "-protocol_whitelist",
        "file,http,https,tcp,tls,crypto",
        "-i",
        playlist_path,
        "-bsf:a",
        "aac_adtstoasc",
        "-c",
        "copy",
        "-vcodec",
        "copy",
        # NOTE(review): -crf is an encoder option and presumably has no
        # effect while streams are copied — confirm before removing.
        "-crf",
        "1",
        output,
    ]

    # ffmpeg's own output is discarded; progress is read from the -progress file.
    process = subprocess.Popen(
        args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )

    return process, total_duration

In [None]:
import asyncio
import ipywidgets as widgets
from typing import Literal
from sys import stdout

import nest_asyncio

# Patch asyncio to allow nested event loops.
# NOTE(review): presumably required because the notebook kernel already runs
# an event loop, which would otherwise make asyncio.run() fail — confirm.
nest_asyncio.apply()

def check_progression(file: str) -> float | Literal["end"] | None:
    if not os.path.exists(file):
        return None
    with open(file, "r") as f:
        # Seek to the end of the file
        f.seek(0, 2)
        end_pos = f.tell()

        def analyze_line(line: str) -> float | Literal["end"] | None:
            if line.startswith("progress=") and line.endswith("end"):
                return "end"
            if line.startswith("out_time_ms="):
                return float(line.split("=")[1]) / 1_000_000
            return None

        line: list[str] = []
        for pos in range(end_pos - 1, -1, -1):
            f.seek(pos, 0)
            char = f.read(1)
            if char == "\n":
                result = analyze_line("".join(reversed(line)))
                if result is not None:
                    return result
                line = []
            else:
                line.append(char)


async def main():
    """Interactively download one episode to ``episode.mp4``.

    Asks for an episode URL, lists the resolutions found in its playlist, lets
    the user pick one by index, then starts ffmpeg and prints its progress
    every two seconds until the process exits. The temporary playlist and
    progress files are removed afterwards.

    Raises:
        ValueError: If the URL is rejected by ``get_m3u8`` or if the playlist
            exposes no quality to choose from.
    """
    raw_url = input("Episode URL (from deril): ")
    ctx = await get_m3u8(raw_url)
    qualities = await get_available_qualities(ctx)
    options = list(qualities)
    if not options:
        # With nothing to choose from, the prompt below could never succeed.
        raise ValueError("No quality found in the playlist")

    while True:
        print("Please chose a quality:")
        for i, quality in enumerate(options):
            print(i, quality)
        selection = input()
        try:
            index = int(selection)
        except ValueError:
            # Not a number: ask again.
            continue
        # Only accept the indexes that were actually displayed; a negative
        # value would otherwise silently select from the end of the list.
        if 0 <= index < len(options):
            break

    clear_output()
    print("Your download is about to start...")
    process, duration = await download_form_m3u8(qualities[options[index]], "episode.mp4")

    while process.poll() is None:
        await asyncio.sleep(2)
        progression = check_progression("./tmp/progression.txt")
        if progression == "end":
            stdout.write("\rDone.")
        elif progression is None or not duration:
            # `not duration` also covers a total of 0, which would otherwise
            # cause a division by zero below.
            stdout.write("\rIn progress... (no eta)")
        else:
            stdout.write(f"\rIn progress... ({progression / duration * 100:.2f}%)")
        stdout.flush()

    # ffmpeg's own output is discarded, so its exit code is the only failure signal.
    if process.returncode != 0:
        stdout.write(f"\rffmpeg failed (exit code {process.returncode}).\n")
        stdout.flush()

    for leftover in ("./tmp/episode.m3u8", "./tmp/progression.txt"):
        try:
            os.remove(leftover)
        except FileNotFoundError:
            # ffmpeg may have exited before creating its progress file.
            pass


# Cell entry point: run the interactive download until it completes.
asyncio.run(main())