In [None]:
# Install the ffmpeg binary; the server cell drives it through ffmpeg-python.
!apt install ffmpeg

from google.colab import drive

# Mount Google Drive; the MediaKnife working directory lives under the mount point.
drive.mount("/content/drive")

# Create the directories used for saving output.
import os

path = "/content/drive/MyDrive/MediaKnife"
os.makedirs(path, exist_ok=True)

save_dir = f"{path}/results"
os.makedirs(save_dir, exist_ok=True)

# Python packages imported by the server cell (versions are not pinned).
!pip install fastapi uvicorn nest-asyncio websockets yt-dlp ffmpeg-python kora

# BasicSR is installed from the master branch of its GitHub repository.
!pip install git+https://github.com/xinntao/BasicSR.git@master

# Real-ESRGAN is cloned into the current directory; the server cell runs its
# inference script through the relative path "Real-ESRGAN/...".
# https://github.com/xinntao/Real-ESRGAN?tab=readme-ov-file#installation
!git clone https://github.com/xinntao/Real-ESRGAN
%cd Real-ESRGAN
!python setup.py develop
%cd ..

!pip install demucs

In [None]:
import os
import shutil
import subprocess
import time

import demucs.separate
import ffmpeg
import nest_asyncio
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.staticfiles import StaticFiles
from google.colab import output
from kora.xattr import get_id
from pydantic import BaseModel
from yt_dlp import YoutubeDL

# Working directory on the mounted Google Drive; the endpoints below read and
# write its "uploads" and "results" subdirectories.
path = "/content/drive/MyDrive/MediaKnife"

# Application object on which the endpoints below are registered via decorators.
app = FastAPI()


# Resolve a file path on the mounted Drive to its Google Drive URL.
def get_drive_url(file_path, timeout=None, poll_interval=0.5):
    """Return the Google Drive "view" URL for ``file_path``.

    ``get_id`` is polled until the ID it returns no longer contains the
    substring "local" (presumably a placeholder ID used while the file has
    not been synced to Drive yet -- TODO confirm against kora).

    Args:
        file_path: Path of the file to look up.
        timeout: Maximum number of seconds to keep polling. ``None`` (the
            default) polls indefinitely, which is the original behaviour.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        str: ``https://drive.google.com/file/d/<id>/view``.

    Raises:
        TimeoutError: If ``timeout`` is given and the ID still contains
            "local" once that many seconds have elapsed.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        file_id = get_id(file_path)
        if "local" not in file_id:
            return f"https://drive.google.com/file/d/{file_id}/view"
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Drive ID for {file_path!r} still contains 'local' after {timeout} seconds"
            )
        time.sleep(poll_interval)


# List the uploaded files.
@app.get("/upload")
async def read_upload_file():
    """Return the names of the entries in the uploads directory.

    The directory is created on demand, as the other endpoints do, so that a
    request made before anything has been uploaded yields an empty list
    instead of failing with ``FileNotFoundError``.

    Returns:
        dict: ``{"files": [...]}`` with the entry names reported by
        ``os.listdir``.
    """
    upload_dir = f"{path}/uploads"
    os.makedirs(upload_dir, exist_ok=True)
    return {"files": os.listdir(upload_dir)}


class ytDto(BaseModel):
    """Request body of the ``/yt-dlp`` endpoint."""

    # URL of the video to download; it is handed to yt-dlp unchanged.
    url: str


# Download a video with yt-dlp.
@app.post("/yt-dlp")
async def yt_dlp(urlDto: ytDto):
    """Download the video at ``urlDto.url`` into the uploads directory.

    The file is saved as ``<title>.mp4``, where the title comes from the
    metadata yt-dlp extracts for the URL.

    Args:
        urlDto: Request body carrying the video URL.

    Returns:
        dict: ``{"filename": "<title>.mp4"}``.

    Raises:
        HTTPException: 502 if yt-dlp reports a non-zero return code.
    """
    url = urlDto.url

    # Create the uploads directory if it does not exist yet.
    save_dir = f"{path}/uploads"
    os.makedirs(save_dir, exist_ok=True)

    # Extract the video metadata without downloading anything.
    with YoutubeDL() as probe:
        info = probe.extract_info(url, download=False)

    # Provisionally replace slashes with underscores so the title cannot be
    # read as a nested path. Fall back to the video id (or a fixed name) when
    # the metadata carries no title.
    title = str(info.get("title") or info.get("id") or "video").replace("/", "_")

    save_path = f"{save_dir}/{title}.mp4"

    # Separate mp4 video + m4a audio streams are requested only for these two
    # sites; every other site uses the single best format.
    if info.get("webpage_url_domain") in ("youtube.com", "nicovideo.jp"):
        format_selector = "bestvideo[ext=mp4]+bestaudio[ext=m4a]"
    else:
        format_selector = "best"

    # Download with yt-dlp.
    ydl_opts = {
        "format": format_selector,
        "merge_output_format": "mp4",
        # "no_mtime" is the command-line flag name; the YoutubeDL option that
        # stops the file mtime being taken from the server is "updatetime".
        "updatetime": False,
        "outtmpl": save_path,
    }
    with YoutubeDL(ydl_opts) as ydl:
        result = ydl.download([url])

    if result != 0:
        raise HTTPException(
            status_code=502, detail=f"yt-dlp exited with code {result}"
        )
    return {"filename": f"{title}.mp4"}


@app.post("/upload")
async def create_upload_file(file: UploadFile = File(...)):
    """Save an uploaded file into the uploads directory.

    Args:
        file: The uploaded file.

    Returns:
        dict: ``{"filename": <name the file was stored under>}``.

    Raises:
        HTTPException: 400 if the upload carries no usable file name.
    """
    # The file name is supplied by the client: keep only its last path
    # component so it cannot point outside the uploads directory.
    filename = os.path.basename(file.filename or "")
    if filename in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    # Create the uploads directory if it does not exist yet.
    save_dir = f"{path}/uploads"
    os.makedirs(save_dir, exist_ok=True)

    # Stream the upload to disk instead of reading it into memory in one go.
    save_path = f"{save_dir}/{filename}"
    with open(save_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    return {"filename": filename}


class processDto(BaseModel):
    """Request body of the ``/process`` endpoint."""

    # Name of the file to process; the handler looks it up in the uploads directory.
    filename: str
    # ffmpeg_mp3 | ffmpeg_wav | demucs | esrgan | segmentation | whisper
    # (the handler implements the first four; the segmentation and whisper
    # branches are commented out there)
    process: str


# Process name -> (output extension, ffmpeg audio codec) for plain conversions.
_FFMPEG_TARGETS = {
    "ffmpeg_mp3": ("mp3", "libmp3lame"),
    "ffmpeg_wav": ("wav", "pcm_s24le"),
}


def _result_payload(result_path):
    """Build the response for a result file: its name and its Drive URL."""
    return {
        "filename": os.path.basename(result_path),
        "url": get_drive_url(result_path),
    }


def _convert_audio(source_path, target_path, acodec):
    """Transcode ``source_path`` into ``target_path`` with the given audio codec."""
    ffmpeg.input(source_path).output(target_path, acodec=acodec).run()


def _separate_to_zip(mp3_path, work_dir, track_name):
    """Run demucs (htdemucs_ft) on ``mp3_path`` and zip the stems as ``<work_dir>.zip``."""
    os.makedirs(work_dir, exist_ok=True)
    try:
        demucs.separate.main(["-n", "htdemucs_ft", mp3_path, "-o", work_dir])
        # Compress the separated stems into a zip archive.
        shutil.make_archive(
            work_dir,
            format="zip",
            root_dir=f"{work_dir}/htdemucs_ft/{track_name}",
        )
    finally:
        # Remove the working directory, also when demucs or zipping failed.
        shutil.rmtree(work_dir, ignore_errors=True)


def _upscale_video(source_path, output_dir):
    """Run the Real-ESRGAN video script on ``source_path`` (2x, suffix ``outx2``)."""
    # An argument list is used instead of a shell line so that quotes, "$" or
    # backticks in the file name are never interpreted by a shell.
    # check=True turns a failed run into CalledProcessError instead of
    # carrying on with a missing result file.
    subprocess.run(
        [
            "python",
            "Real-ESRGAN/inference_realesrgan_video.py",
            "-i",
            source_path,
            # The original command passed "-n" twice; this is the second
            # value, the one that took effect.
            "-n",
            "RealESRGAN_x4plus_anime_6B",
            "-s",
            "2",
            "--suffix",
            "outx2",
            "-o",
            output_dir,
        ],
        check=True,
    )


# Run a processing job.
@app.post("/process")
def process_file(processDto: processDto):
    """Run the requested process on an uploaded file and return the result.

    The result is written to the results directory. If the result file
    already exists it is returned as-is without re-running the process.

    Args:
        processDto: Request body with the uploaded file's name and the
            process to run: ``ffmpeg_mp3``, ``ffmpeg_wav``, ``demucs`` or
            ``esrgan``.

    Returns:
        dict: ``{"filename": <result file name>, "url": <Drive URL>}``.

    Raises:
        ValueError: If the file name is unusable or the process is unknown.
        subprocess.CalledProcessError: If the Real-ESRGAN script fails.
    """
    # The file name comes from the request: keep only its last path component
    # so it cannot point outside the uploads/results directories.
    filename = os.path.basename(processDto.filename)
    # File name without its last extension.
    stem = filename.rsplit(".", 1)[0]
    # An empty stem would make the demucs working directory the results
    # directory itself, which is deleted after zipping.
    if not stem or filename in (".", ".."):
        raise ValueError(f"Invalid filename: {processDto.filename}")
    process = processDto.process

    upload_dir = f"{path}/uploads"
    # Create the results directory if it does not exist yet.
    save_dir = f"{path}/results"
    os.makedirs(save_dir, exist_ok=True)

    source_path = f"{upload_dir}/{filename}"

    if process in _FFMPEG_TARGETS:
        extension, acodec = _FFMPEG_TARGETS[process]
        result_path = f"{save_dir}/{stem}.{extension}"
        if not os.path.exists(result_path):
            _convert_audio(source_path, result_path, acodec)
        return _result_payload(result_path)

    if process == "demucs":
        # The zip file is what gets returned as the result.
        result_path = f"{save_dir}/{stem}.zip"
        if not os.path.exists(result_path):
            # demucs is fed the mp3 version; create it first if it is missing.
            mp3_path = f"{save_dir}/{stem}.mp3"
            if not os.path.exists(mp3_path):
                _convert_audio(source_path, mp3_path, "libmp3lame")
            _separate_to_zip(mp3_path, f"{save_dir}/{stem}", stem)
        return _result_payload(result_path)

    if process == "esrgan":
        result_path = f"{save_dir}/{stem}_outx2.mp4"
        if not os.path.exists(result_path):
            _upscale_video(source_path, save_dir)
        return _result_payload(result_path)

    # TODO: "segmentation" and "whisper" are listed on processDto but are not
    # implemented yet.
    raise ValueError(f"Invalid process: {process}")


# TODO: add the step that fetches the dist contents.
# NOTE(review): nothing in the cells shown here creates the "dist" directory
# mounted at "/" below -- confirm it exists before this cell runs.
app.mount("/results", StaticFiles(directory=f"{path}/results"), name="results")
app.mount("/", StaticFiles(directory="dist", html=True), name="static")


# Presumably needed so uvicorn.run() can start inside the notebook's
# already-running event loop -- TODO confirm.
nest_asyncio.apply()


# Show kernel port 8000 (the port uvicorn listens on below) as an iframe in the cell output.
output.serve_kernel_port_as_iframe(8000, height=900)


# Serve the app on port 8000, bound to all interfaces.
uvicorn.run(app, port=8000, host="0.0.0.0")