In [None]:
# Environment setup: system packages first, then Python packages.
# aria2 is the external downloader and ffmpeg backs the FFmpegConcat
# post-processor configured in download_video.
!sudo apt-get update
!sudo apt-get install aria2 ffmpeg yt-dlp -y
# Pin the Python packages imported by later cells (yt_dlp, aiohttp).
%pip install yt-dlp==2023.9.24
%pip install aiohttp==3.8.5
# whisperx is installed from the repository's default branch (not pinned to a commit).
# NOTE(review): `dotenv` is imported in a later cell but python-dotenv is not
# installed here -- presumably provided by the runtime; confirm.
%pip install git+https://github.com/m-bain/whisperx.git


In [None]:
# Append the three settings read by upload_data (via os.environ) to ./.env.
# Replace each CHANGE_ME with a real value before running.
# `>>` appends, so re-running this cell adds duplicate lines to .env.
!echo "DETA_DRIVE_KEY=CHANGE_ME">> .env
!echo "project_id=CHANGE_ME" >> .env
!echo "drive_name=CHANGE_ME"  >> .env


In [2]:
from yt_dlp import YoutubeDL as yt_dlp


def download_video(url: str):
    """Fetch ``url`` with yt-dlp, saving it under ``./download/``.

    The requested format is ``Audio_Only``, ``aria2c`` is configured as the
    external downloader, and the output file is named ``<id>.<ext>`` from the
    metadata yt-dlp reports. Progress markers are printed before and after.

    Args:
        url: Address of the video to download.

    Returns:
        None.
    """
    print(f"Starting download of {url}")

    # Post-processor entry: concatenate multi-video results at playlist level.
    concat_step = {"key": "FFmpegConcat", "only_multi_video": True, "when": "playlist"}

    options = dict(
        external_downloader={"default": "aria2c"},
        extract_flat="discard_in_playlist",
        format="Audio_Only",
        fragment_retries=10,
        ignoreerrors="only_download",
        outtmpl={"default": "./download/%(id)s.%(ext)s"},
        postprocessors=[concat_step],
        retries=10,
    )

    with yt_dlp(options) as downloader:
        downloader.download([url])

    print(f"Finished download of {url}")


In [3]:
from dotenv import load_dotenv
import aiohttp
import os

# Load settings from the .env file so the os.environ lookups in upload_data
# (DETA_DRIVE_KEY, project_id, drive_name) can find them.
load_dotenv()


async def upload_data(file: str | bytes, filename: str) -> None:
    data: bytes | None = None
    if isinstance(file, str):
        with open(file, "rb") as f:
            data = f.read()

    BASE_URL = f"https://drive.deta.sh/v1/{os.environ['project_id']}/{os.environ['drive_name']}"
    headers = {
        "X-API-Key": os.environ["DETA_DRIVE_KEY"],
    }

    params = {
        "name": filename,
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{BASE_URL}/files",
            params=params,
            headers=headers,
            data=data,
        ) as resp:
            print(f"{filename}", resp.status)



In [4]:
import asyncio
import subprocess
import torch

# Settings forwarded to the whisperx command line by transcribe_vod.
model = "large-v2"
batch_size = 16

# With CUDA available: run on the GPU with float16; otherwise CPU with int8.
device, compute_type = (
    ("cuda", "float16") if torch.cuda.is_available() else ("cpu", "int8")
)


def _run_coroutine_blocking(coro):
    """Run ``coro`` to completion and return its result, even inside a live event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)

    # A loop is already running in this thread (e.g. a notebook kernel that
    # executes cells on an asyncio loop); asyncio.run() would raise
    # RuntimeError here, so give the coroutine its own loop in a worker thread.
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


def transcribe_vod(vod_id: str):
    """Download a Twitch VOD, transcribe it with the whisperx CLI, and upload the results.

    The VOD is fetched via ``download_video``, ``whisperx`` is run on
    ``./download/v<vod_id>.mp4`` with the module-level ``model``, ``device``,
    ``batch_size`` and ``compute_type`` settings, its output is echoed line by
    line, and on success ``upload_results`` is awaited for the same id.

    Args:
        vod_id: Numeric id of the Twitch VOD (the part after ``/videos/``).

    Raises:
        ValueError: If ``vod_id`` is empty.
        Exception: If the whisperx subprocess exits with a non-zero code.
    """
    if not vod_id:
        # An empty id would build the URL "https://www.twitch.tv/videos/" and
        # only fail later, after a pointless download attempt.
        raise ValueError("vod_id must be a non-empty Twitch VOD id")

    download_video(f"https://www.twitch.tv/videos/{vod_id}")

    file = f"./download/v{vod_id}.mp4"
    command = [
        "whisperx",
        file,
        "--model",
        str(model),
        "--device",
        str(device),
        "--print_progress",
        "True",
        "--task",
        "transcribe",
        "--language",
        "pt",
        "--batch_size",
        str(batch_size),
        "--compute_type",
        str(compute_type),
        "--output_format",
        "all",
        "--output_dir",
        "./outputs/",
    ]

    print(f"Starting whisper subprocess command for id {vod_id}")
    print(" ".join(command))

    # stderr is merged into stdout: a separate stderr pipe that nobody reads
    # fills up and blocks the child, which in turn hangs this read loop.
    with subprocess.Popen(
        args=command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    ) as process:
        for line in process.stdout:  # type: ignore[union-attr]
            print(line.strip())

    # Leaving the `with` block waits for the child, so returncode is set.
    return_code = process.returncode

    print("Whisper subprocess command finished with return code: ", return_code)
    if return_code != 0:
        print("Error")
        raise Exception("Whisper subprocess command failed")

    _run_coroutine_blocking(upload_results(vod_id))


async def upload_results(vod_id: str):
    """Upload the five transcript files for ``vod_id`` concurrently.

    For each of the json, srt, tsv, txt and vtt outputs, the file
    ``outputs/v<vod_id>.<ext>`` is sent via ``upload_data`` under the remote
    name ``<vod_id>/v<vod_id>.<ext>``.

    Args:
        vod_id: Id whose transcript files should be uploaded.
    """
    stem = f"v{vod_id}"
    pending = (
        upload_data(f"outputs/{stem}.{ext}", f"{vod_id}/{stem}.{ext}")
        for ext in ("json", "srt", "tsv", "txt", "vtt")
    )
    await asyncio.gather(*pending)


In [None]:
# Twitch VOD ids to process. The empty strings are placeholders: replace them
# with real ids (the part after /videos/ in the VOD URL) before running.
list_of_vods = ["",""]

# Process the VODs one after another; an exception raised by transcribe_vod
# stops the loop, so later ids are not processed.
for i in list_of_vods:
    transcribe_vod(i)
