<a href="https://colab.research.google.com/github/garder500/GestionStock/blob/master/ggl_colab_download_cpu.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title <font size="5">← ឵឵<i>Install FFMPEG and requirements</font>
from IPython.display import clear_output

!pip install httpx nest_asyncio aiofiles
!apt install ffmpeg
clear_output()
!ffmpeg -version

In [None]:
#@title <font size="5">← ឵឵<i>Mount Google Drive</font>

# Mount Google Drive at /content/drive. main() later writes the downloaded
# episodes under ./drive/MyDrive/derildownload (relative to the working
# directory — presumably /content on Colab; confirm if run elsewhere).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#@title <font size="5">← ឵឵<i>core</font>

import base64
import json
import os
import re
import subprocess
import unicodedata
from typing import Any, NamedTuple
from urllib.parse import urlencode, urlparse, urlunparse

import httpx
from bs4 import BeautifulSoup

# Scheme and host that set_worker() uses to build the wrapped URL.
PROTOCOL = "https"
WORKER = "proxy.ketsuna.com"
# NOTE(review): BASE_URL is not referenced in the visible part of this notebook.
BASE_URL = "neko.ketsuna.com"

# NOTE(review): VIDEO_REGEX and M3U8_REGEX are not referenced in the visible
# part of this notebook; they look like leftovers from a page-scraping
# approach — confirm before removing.
VIDEO_REGEX = re.compile(r"video\[0\] = '(.+)';", re.MULTILINE)
M3U8_REGEX = [
    re.compile(r'e\.parseJSON\(atob\(t\).slice\(2\)\)\}\(\"([^;]*)"\),'),
    re.compile(r'e\.parseJSON\(n\)}\(\"([^;]*)"\),'),
    re.compile(r'n=atob\("([^"]+)"'),
]
# Captures the "<width>x<height>" value of RESOLUTION= on an
# #EXT-X-STREAM-INF playlist line.
M3U8_RES = re.compile(r"#EXT-X-STREAM-INF:.+RESOLUTION=(\d+x\d+).+")

# Episode page URL. Named groups: lang ("vostfr" or "vf"), id (numeric anime
# id) and nb (episode number).
URL_PARTS = re.compile(r"https://[\w\-.]+/anime/(?P<lang>vostfr|vf)/(?P<id>\d+)/episode/(?P<nb>\d+)")

# Single async HTTP client shared by the coroutines below.
client = httpx.AsyncClient()

def set_worker(url: str) -> str:
    """Return *url* wrapped in a request to the worker host.

    The result has the form ``PROTOCOL://WORKER/?url=<url-encoded url>``.
    """
    query = urlencode({"url": url})
    return urlunparse((PROTOCOL, WORKER, "/", "", query, ""))


def slugify(value: str, allow_unicode=False):
    """
    Turn *value* into a lowercase, dash-separated slug.

    Adapted from https://github.com/django/django/blob/master/django/utils/text.py

    The value is Unicode-normalized first: NFKC when 'allow_unicode' is True,
    otherwise NFKD followed by dropping every non-ASCII character. Then
    anything that is not a word character, whitespace or a hyphen is removed,
    runs of whitespace/hyphens collapse into a single dash, and leading or
    trailing dashes and underscores are stripped.
    """
    text = str(value)
    text = unicodedata.normalize('NFKC' if allow_unicode else 'NFKD', text)
    if not allow_unicode:
        # Decomposed accents are separate code points and are dropped here.
        text = text.encode('ascii', 'ignore').decode('ascii')
    cleaned = re.sub(r'[^\w\s-]', '', text.lower())
    dashed = re.sub(r'[-\s]+', '-', cleaned)
    return dashed.strip('-_')


class Context(NamedTuple):
    """Stream information for one episode, as built by get_m3u8()."""
    # Playlist URL, already wrapped by set_worker().
    url: str
    # Value of the API's "videoVtt" field; the annotation allows None.
    subtitles: str | None
    # Display name in the form "<title> - ep <num>".
    episode_name: str


async def get_m3u8(url: str) -> Context:
    """Resolve an episode page URL into its stream Context.

    The anime id and episode number are extracted from *url* with URL_PARTS
    and looked up on the API; the entry for the URL's language ("vostfr" or
    "vf") supplies the playlist URI, the subtitle value and the title.

    Args:
        url: Episode page URL matching URL_PARTS.

    Returns:
        A Context whose ``url`` is already wrapped by set_worker().

    Raises:
        ValueError: if *url* does not match URL_PARTS.
        httpx.HTTPStatusError: if the API answers with a non-success status.
        KeyError: if the response lacks the language entry or one of its
            expected fields.
    """
    match = URL_PARTS.match(url)
    if not match:
        raise ValueError("url is not valid")

    res = await client.get(f"https://api.ketsuna.com/animes/{match['id']}/{match['nb']}")
    # Fail on the HTTP status itself instead of on a confusing JSON decode
    # error or KeyError caused by parsing an error body.
    res.raise_for_status()
    entry = res.json()[match["lang"]]

    return Context(
        url=set_worker(entry["videoUri"]),
        subtitles=entry["videoVtt"],
        episode_name=f"{entry['title']} - ep {entry['num']}",
    )


async def get_available_qualities(ctx: Context) -> dict[str, str]:
    """Fetch the playlist at ``ctx.url`` and list its variant streams.

    Args:
        ctx: Context whose ``url`` points at an m3u8 playlist.

    Returns:
        Mapping of resolution ("<width>x<height>", as captured by M3U8_RES)
        to the line that follows the matching #EXT-X-STREAM-INF tag. Empty
        when the playlist declares no resolution.

    Raises:
        ValueError: if the response body does not start with "#EXTM3U".
    """
    response = await client.get(ctx.url)
    text = response.text

    if not text.startswith("#EXTM3U"):
        raise ValueError("Not a m3u8 file")

    # Skip the "#EXTM3U" header line; a shared iterator lets the loop consume
    # the URI line that follows each tag.
    lines = iter(text.splitlines()[1:])
    qualities: dict[str, str] = {}
    for line in lines:
        if not line.startswith("#EXT"):
            continue
        match = M3U8_RES.search(line)
        if match is None:
            continue
        # A truncated playlist can end on the tag itself. A bare next() would
        # raise StopIteration, which a coroutine turns into RuntimeError.
        uri = next(lines, None)
        if uri is None:
            break
        qualities[match.group(1)] = uri
    return qualities


async def download_form_m3u8(
    url: str, output: str
) -> tuple[subprocess.Popen[bytes], float]:
    """Save the playlist at *url* locally and start ffmpeg on it.

    The playlist is written to ``./tmp/<slug>.m3u8`` and ffmpeg reports its
    progress to ``./tmp/<slug>-progression.txt``, where ``<slug>`` is the
    slugified base name of *output* without its extension.

    Args:
        url: Playlist URL (expected to be wrapped by the worker already).
        output: Path of the file ffmpeg writes; overwritten if it exists.

    Returns:
        The running ffmpeg process and the total duration in seconds, i.e.
        the sum of the playlist's #EXTINF values (0.0 when there are none).

    Raises:
        httpx.HTTPStatusError: if the playlist request does not succeed.
    """
    filename = slugify(os.path.splitext(os.path.basename(output))[0])
    os.makedirs("./tmp", exist_ok=True)

    # Fetch before touching the disk so a failed request neither leaves an
    # empty playlist behind nor hands ffmpeg an error page.
    response = await client.get(url)  # worker is already set
    response.raise_for_status()
    with open(f"./tmp/{filename}.m3u8", "wb") as f:
        f.write(response.content)

    # The 0.0 start value keeps the result a float even without #EXTINF tags.
    total_duration = sum(
        map(float, re.findall(r"#EXTINF:([\d.]+)", response.text)), 0.0
    )

    args = [
        "ffmpeg",
        "-progress",
        f"./tmp/{filename}-progression.txt",
        "-y",  # overwrite output file
        "-protocol_whitelist",
        "file,http,https,tcp,tls,crypto",
        "-i",
        f"./tmp/{filename}.m3u8",
        "-bsf:a",
        "aac_adtstoasc",
        # "-c copy" applies to every stream; the more specific "-c:v" that
        # follows overrides it for video only.
        "-c",
        "copy",
        "-c:v",
        "libx265",  # specify H.265 encoding
        output,
    ]

    # ffmpeg's own output is discarded; progress is read from the file above.
    process = subprocess.Popen(
        args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )

    return process, total_duration

In [None]:
#@title <font size="5">← ឵឵<i>Download </font>

from pathlib import Path
import asyncio
import ipywidgets as widgets
from typing import Literal
from sys import stdout
from functools import partial

import aiofiles
import nest_asyncio

# Patch asyncio so the asyncio.run(main()) call at the end of this cell works
# inside the notebook (presumably because the kernel already runs an event
# loop — confirm against the nest_asyncio documentation).
nest_asyncio.apply()

async def check_progression(file: str) -> float | Literal["end"] | None:
    if not os.path.exists(file):
        return None
    async with aiofiles.open(file, "r") as f:
        # Seek to the end of the file
        await f.seek(0, 2)
        end_pos = await f.tell()

        def analyze_line(line: str) -> float | Literal["end"] | None:
            if line.startswith("progress=") and line.endswith("end"):
                return "end"
            if line.startswith("out_time_ms="):
                return float(line.split("=")[1]) / 1_000_000
            return None

        line: list[str] = []
        for pos in range(end_pos - 1, -1, -1):
            await f.seek(pos, 0)
            char = await f.read(1)
            if char == "\n":
                result = analyze_line("".join(reversed(line)))
                if result is not None:
                    return result
                line = []
            else:
                line.append(char)

#@markdown URL can be comma separated.

#@markdown https://deril-fr.github.io/anime/vostfr/17733/episode/6, https://deril-fr.github.io/anime/vostfr/17733/episode/7
# Episode page URL(s); main() splits this value on commas and strips each part.
URL = "https://deril-fr.github.io/anime/vostfr/17733/episode/6, https://deril-fr.github.io/anime/vostfr/17733/episode/7" #@param {type:"string"}
# Selects among the available resolutions sorted by width: "high" takes the
# widest, "low" the narrowest, "medium" the middle entry.
QUALITY = "high" #@param ["high", "medium", "low"]

class DownloadProcess(NamedTuple):
    """Bookkeeping for one ffmpeg download started by main()."""
    # Handle of the ffmpeg child process.
    process: subprocess.Popen[bytes]
    # Total duration in seconds as returned by download_form_m3u8(), which
    # sums float #EXTINF values — hence float, not int.
    duration: float
    # Progress bar widget displayed for this download.
    progress: widgets.IntProgress
    # Episode information the download was started from.
    ctx: Context

async def main():
    """Download every episode listed in URL and show live progress bars.

    For each comma-separated URL the episode is resolved, the resolution
    selected by QUALITY is picked and an ffmpeg process is started that
    writes ``<episode name>.mp4`` into ./drive/MyDrive/derildownload. All
    processes run concurrently while their progress files are polled every
    two seconds.

    Raises:
        ValueError: if a URL is invalid, a playlist offers no resolution, or
            QUALITY is not "high", "medium" or "low".
    """
    # Skip empty entries so a trailing comma does not abort the whole run.
    urls = [part.strip() for part in URL.split(",") if part.strip()]
    # Each download is paired with the progress file ffmpeg writes for it.
    downloads: list[tuple[DownloadProcess, str]] = []

    path = Path("./drive/MyDrive/derildownload")
    path.mkdir(parents=True, exist_ok=True)

    for url in urls:
        ctx = await get_m3u8(url)
        qualities = await get_available_qualities(ctx)
        if not qualities:
            raise ValueError(f"No quality available for {url}")

        # Widest resolution first.
        options = sorted(qualities, key=lambda x: int(x.split("x")[0]), reverse=True)
        if QUALITY == "high":
            quality = options[0]
        elif QUALITY == "medium":
            quality = options[len(options) // 2]
        elif QUALITY == "low":
            quality = options[-1]
        else:
            raise ValueError(f"Unknown QUALITY: {QUALITY!r}")

        print(f"Quality : {quality}")
        print(f"Episode : {ctx.episode_name}")
        progress = widgets.IntProgress(
            value=0,
            min=0,
            max=100,
            description='???%',
            bar_style='info', # 'success', 'info', 'warning', 'danger' or ''
            style={'bar_color': 'blue'},
            orientation='horizontal'
        )
        display(progress)
        print()

        episode_path = path / f"{ctx.episode_name}.mp4"
        process, duration = await download_form_m3u8(qualities[quality], str(episode_path))

        # Must mirror the file naming used by download_form_m3u8().
        filename = slugify(os.path.splitext(os.path.basename(episode_path))[0])
        downloads.append((
            DownloadProcess(process=process, duration=duration, progress=progress, ctx=ctx),
            f"./tmp/{filename}-progression.txt",
        ))

    while True:
        # Sample liveness before refreshing so that one last refresh always
        # happens after every process has exited.
        running = any(download.process.poll() is None for download, _ in downloads)

        for download, progress_file in downloads:
            progression = await check_progression(progress_file)

            if progression == "end":
                download.progress.value = 100
                download.progress.description = "100.00%"
            elif progression is None or not download.duration:
                # Nothing reported yet, or no usable duration: a zero
                # duration would otherwise divide by zero below.
                download.progress.description = "???%"
            else:
                value = min(progression / download.duration * 100, 100.0)
                download.progress.value = int(value)
                download.progress.description = f"{value:.2f}%"

        if not running:
            break
        await asyncio.sleep(2)

    # ffmpeg's stderr is discarded, so report failures explicitly.
    for download, _ in downloads:
        if download.process.returncode != 0:
            print(
                f"ffmpeg failed for {download.ctx.episode_name} "
                f"(exit code {download.process.returncode})"
            )


# Run the download coroutine to completion when the cell is executed.
asyncio.run(main())