In [None]:
!pip install -q gradio yt_dlp

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.1/172.1 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.7/56.7 MB[0m [31m11.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m319.8/319.8 kB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m40.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.7/94.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m447.5/447.5 kB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.0/11.0 MB[0m [31m30.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.3/73.3 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [37]:
import gradio as gr
import pandas as pd
from yt_dlp import YoutubeDL
import re
import os
from pathlib import Path
from scipy.io import wavfile

In [42]:
def handles_yt_link_id(link_id):
    """Return the YouTube video id contained in ``link_id``.

    Accepted inputs:
      * a ``youtu.be/<id>`` short link,
      * any URL carrying a ``v=<id>`` query parameter,
      * a bare video id.

    Args:
        link_id: Link or id typed by the user (surrounding whitespace is ignored).

    Returns:
        The video id as a string.

    Raises:
        ValueError: if no video id can be extracted from the input.
    """
    text = (link_id or "").strip()

    # Video ids may contain '-' as well as letters, digits and '_', so the
    # character class has to include it or such ids get cut short.
    url_match = re.search(r"(?:youtu\.be/|[?&]v=)([\w-]+)", text)
    if url_match:
        return url_match.group(1)

    # Not a recognised URL: only accept the text if it looks like a bare id.
    if re.fullmatch(r"[\w-]+", text):
        return text

    raise ValueError(f"Could not extract a YouTube video id from {link_id!r}")

In [None]:
# Raw downloads go to ./data/download; saved clips go to ./data/edited,
# with one sub-folder for originals and one for arrangements.
download_path = Path("./data/download")
edited_path = Path("./data/edited")
ori_path = edited_path / "original"
arr_path = edited_path / "arrangement"

# parents=True creates any missing ancestors; exist_ok=True makes re-running
# this cell harmless when the folders are already there.
arr_path.mkdir(parents=True, exist_ok=True)
ori_path.mkdir(parents=True, exist_ok=True)
edited_path.mkdir(parents=True, exist_ok=True)
download_path.mkdir(parents=True, exist_ok=True)


In [43]:
def check_downloaded(yt_id, csv="./data/downloaded.csv"):
    """Tell whether ``yt_id`` is already listed in the download registry.

    Args:
        yt_id: Video id to look up.
        csv: Path of the registry CSV; it must have an ``id`` column.

    Returns:
        True if the registry file exists and lists ``yt_id``, else False.
    """
    if not os.path.exists(csv):
        return False

    # Read every column as text: with dtype inference an id column made only of
    # digits would be parsed as integers and never compare equal to the string id.
    df = pd.read_csv(csv, dtype=str, keep_default_na=False)
    return str(yt_id) in set(df["id"])


In [44]:
def save_downloaded(yt_id, ext, csv="./data/downloaded.csv"):
    """Record ``yt_id`` and its audio extension in the download registry.

    The registry is created if it does not exist. An id that is already listed
    is left untouched (its stored extension is not updated).

    Args:
        yt_id: Video id that was downloaded.
        ext: Audio extension the download was stored with.
        csv: Path of the registry CSV (columns ``id`` and ``ext``).
    """
    yt_id = str(yt_id)
    new_row = pd.DataFrame({"id": [yt_id], "ext": [ext]})

    if os.path.exists(csv):
        # Keep ids as text so digit-only ids are matched correctly and written
        # back unchanged instead of being coerced to integers.
        df = pd.read_csv(csv, dtype=str, keep_default_na=False)
        if yt_id not in set(df["id"]):
            df = pd.concat([df, new_row], ignore_index=True)
    else:
        df = new_row

    # Make sure the target folder exists so the write cannot fail on a fresh checkout.
    os.makedirs(os.path.dirname(csv) or ".", exist_ok=True)
    df.to_csv(csv, index=False)

In [None]:
def download_audio(yt_id, ext="mp3"):
    """Download the audio of YouTube video ``yt_id`` into ``download_path``.

    yt_dlp is configured to pick the 'bestaudio/best' format and to run its
    'FFmpegExtractAudio' postprocessor with ``ext`` as the preferred codec.

    Args:
        yt_id: YouTube video id.
        ext: Preferred audio codec, also used as the extension of the returned path.

    Returns:
        The path ``<download_path>/<yt_id>.<ext>``. The path is built from the
        arguments; this function does not check that the file exists.
    """
    extract_audio = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': ext,
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_audio],
        # %(ext)s is a yt_dlp template field, filled in by yt_dlp itself.
        'outtmpl': os.path.join(download_path, f'{yt_id}.%(ext)s'),
    }
    video_url = f'https://www.youtube.com/watch?v={yt_id}'

    with YoutubeDL(options) as downloader:
        downloader.download([video_url])

    return os.path.join(download_path, f'{yt_id}.{ext}')

In [56]:
def _fetch_audio(yt_id, ext, label):
    """Return the local path of ``yt_id``'s audio, downloading it when needed.

    ``label`` ("original" / "arrangement") is only used in the progress messages.
    """
    cached_path = os.path.join(download_path, f'{yt_id}.{ext}')

    # Reuse the cached file only if it is both registered and present on disk
    # under the requested extension; otherwise the returned path would point
    # at a file that does not exist.
    if check_downloaded(yt_id) and os.path.exists(cached_path):
        print(f"{label.capitalize()} audio already downloaded ID:{yt_id}")
        return cached_path

    print(f"Downloading {label} audio ID:{yt_id}")
    audio_path = download_audio(yt_id, ext)
    save_downloaded(yt_id, ext)
    return audio_path


def download(original_link, arrangement_link, original_ext="mp3", arrangement_ext="mp3"):
    """Resolve both links to video ids and make sure both audio files are available locally.

    Args:
        original_link: Link or id of the original track.
        arrangement_link: Link or id of the arrangement.
        original_ext: Audio extension to use for the original.
        arrangement_ext: Audio extension to use for the arrangement.

    Returns:
        Tuple ``(original_id, arrangement_id, original_audio_path, arrangement_audio_path)``.
    """
    original_id = handles_yt_link_id(original_link)
    arrangement_id = handles_yt_link_id(arrangement_link)

    original_audio_path = _fetch_audio(original_id, original_ext, "original")
    arrangement_audio_path = _fetch_audio(arrangement_id, arrangement_ext, "arrangement")

    return original_id, arrangement_id, original_audio_path, arrangement_audio_path

In [53]:
def min_text_to_sec(min_text):
    """Convert a timestamp string to a number of seconds.

    Supported forms: ``"ss"``, ``"mm:ss"`` and ``"h:mm:ss"``. The last field may
    be fractional; the fields before it must be integers.

    Args:
        min_text: Timestamp text, e.g. ``"90"``, ``"1:30"``, ``"1:02:03.5"``.

    Returns:
        The timestamp in seconds, as a float.

    Raises:
        ValueError: if a field is not numeric or there are more than three fields.
    """
    *leading, seconds = str(min_text).strip().split(":")
    if len(leading) > 2:
        raise ValueError(f"Unsupported time format: {min_text!r}")

    # Fold hours/minutes left to right: each further field scales what came before by 60.
    whole = 0
    for field in leading:
        whole = whole * 60 + int(field)

    return whole * 60 + float(seconds)

In [57]:
def trim_audio(audio, start, end):
    """Cut ``audio`` down to the span between ``start`` and ``end``.

    Args:
        audio: ``(sample_rate, samples)`` pair, or None when no audio is loaded.
        start: Start timestamp text (see ``min_text_to_sec``); blank means the
            beginning of the audio.
        end: End timestamp text; blank means the end of the audio.

    Returns:
        ``(sample_rate, trimmed_samples)``, or None if ``audio`` is None.
    """
    print("Trimming audio")

    # Nothing loaded yet: leave the component empty instead of failing on unpacking.
    if audio is None:
        return None

    sr, arr = audio

    # An empty textbox means "no limit on this side" rather than an error.
    start_idx = int(min_text_to_sec(start) * sr) if start and start.strip() else 0
    end_idx = int(min_text_to_sec(end) * sr) if end and end.strip() else len(arr)

    return (sr, arr[start_idx:end_idx])

In [46]:
def save_pair(ori_name, arr_name, csv="./data/edited/pairs.csv"):
    """Record that arrangement ``arr_name`` belongs to original ``ori_name``.

    Each arrangement appears at most once: if ``arr_name`` is already listed,
    its original is overwritten with ``ori_name``; otherwise a new row is added.

    Args:
        ori_name: Name/id of the original track.
        arr_name: Name/id of the arrangement.
        csv: Path of the pairs CSV (columns ``ori_name`` and ``arr_name``).
    """
    ori_name, arr_name = str(ori_name), str(arr_name)
    new_row = pd.DataFrame({"ori_name": [ori_name], "arr_name": [arr_name]})

    if os.path.exists(csv):
        # Read names as text so digit-only names still match and are not
        # rewritten as integers.
        df = pd.read_csv(csv, dtype=str, keep_default_na=False)
        known = df["arr_name"] == arr_name
        if known.any():
            # Arrangement already paired: re-point it at the new original.
            df.loc[known, "ori_name"] = ori_name
        else:
            df = pd.concat([df, new_row], ignore_index=True)
    else:
        df = new_row

    # Make sure the target folder exists before writing.
    os.makedirs(os.path.dirname(csv) or ".", exist_ok=True)
    df.to_csv(csv, index=False)

In [58]:
def save(original_id, arrangement_id, original_audio, arrangement_audio, original_ext="mp3", arrangement_ext="mp3"):
    """Write both audio clips to disk and record them as a pair.

    Args:
        original_id: Id of the original track; used as its file name.
        arrangement_id: Id of the arrangement; used as its file name.
        original_audio: ``(sample_rate, samples)`` for the original.
        arrangement_audio: ``(sample_rate, samples)`` for the arrangement.
        original_ext: File extension for the original clip.
        arrangement_ext: File extension for the arrangement clip.

    Raises:
        ValueError: if an id or an audio clip is missing.
    """
    print("Saving")

    # Fail with a readable message instead of an unpacking TypeError.
    if not original_id or not arrangement_id:
        raise ValueError("Nothing to save: original and arrangement ids are required")
    if original_audio is None or arrangement_audio is None:
        raise ValueError("Nothing to save: original and arrangement audio are required")

    ori_sr, ori_arr = original_audio
    arr_sr, arr_arr = arrangement_audio

    # NOTE(review): wavfile.write produces WAV data whatever the file name says,
    # so with the default extensions the files are WAV content named *.mp3.
    # The names are kept as-is because other code may look the files up by them.
    wavfile.write(os.path.join(ori_path, f'{original_id}.{original_ext}'), ori_sr, ori_arr)
    wavfile.write(os.path.join(arr_path, f'{arrangement_id}.{arrangement_ext}'), arr_sr, arr_arr)

    # Register the pair only after both files were written, so a failed write
    # does not leave a pair entry pointing at missing audio.
    save_pair(original_id, arrangement_id)

In [59]:
# Gradio UI: enter two links, download both tracks, trim each one, then save the pair.
with gr.Blocks() as app:
    gr.Markdown("# Dataset downloader and preprocessor")

    # Link inputs, with read-only boxes showing the ids resolved on download.
    with gr.Row():
        with gr.Column():
            original_link = gr.Textbox(label="Original link/id")
            original_id = gr.Textbox(label="Original id", interactive=False)

        with gr.Column():
            arrangement_link = gr.Textbox(label="Arrangement link/id")
            arrangement_id = gr.Textbox(label="Arrangement id", interactive=False)

    download_button = gr.Button("Download")

    # One column per track: player, start/end boxes and a trim button.
    with gr.Row():
        with gr.Column():
            original_audio = gr.Audio(label="Original audio", interactive=False)

            with gr.Row():
                original_start = gr.Textbox(label="Original start")
                original_end = gr.Textbox(label="Original end")

            original_trim_button = gr.Button("Trim")

            # The trimmed clip is written back into the same player.
            original_trim_button.click(
                trim_audio,
                inputs=[original_audio, original_start, original_end],
                outputs=[original_audio],
            )

        with gr.Column():
            arrangement_audio = gr.Audio(label="Arrangement audio", interactive=False)

            with gr.Row():
                arrangement_start = gr.Textbox(label="Arrangement start")
                arrangement_end = gr.Textbox(label="Arrangement end")

            arrangement_trim_button = gr.Button("Trim")

            arrangement_trim_button.click(
                trim_audio,
                inputs=[arrangement_audio, arrangement_start, arrangement_end],
                outputs=[arrangement_audio],
            )

    # download() returns (original_id, arrangement_id, original_path, arrangement_path),
    # matching the four outputs below in order.
    download_button.click(
        download,
        inputs=[original_link, arrangement_link],
        outputs=[original_id, arrangement_id, original_audio, arrangement_audio],
    )

    save_button = gr.Button("Save")

    save_button.click(
        save,
        inputs=[original_id, arrangement_id, original_audio, arrangement_audio],
    )

In [49]:
# Sample YouTube links for manually testing the app:
# https://www.youtube.com/watch?v=L13gCEZJVRU
# https://www.youtube.com/watch?v=NW370z0Icls

In [60]:
 app.launch(inline=True, debug=True)  # inline: show the UI in the cell output; debug: keep the cell running so errors and logs are visible

Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://0085bb620bc075d7f1.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://0085bb620bc075d7f1.gradio.live




In [None]:
# Shut down the Gradio server that app.launch() started.
app.close()

Closing server running on port: 7860
