# [作者 TsukiSama9292 - Github 連結](https://github.com/tsukisama9292)
## [參考文獻 - 連結](https://ianwu.tw/press/topic/command_line_program/yt-dlp.html#%E4%BB%8B%E7%B4%B9)

# 依賴安裝

In [None]:
!pip install -q yt-dlp
!pip install -q -U gradio

In [1]:
import yt_dlp
import threading
import queue
import os
import zipfile
import shutil
from datetime import datetime
import gradio as gr


# 定義進度回調函數
def progress_hook(q):
    def hook(d):
        if d['status'] == 'downloading':
            speed = d.get('speed', 0)
            speed_mb = speed / 1024 / 1024 if speed is not None else 0  # 將速度轉換為千字節每秒 (KB/s)，如果為 None，則設為 0
            q.put({
                '狀態': '下載中影片中',
                '已下載大小(MB)': d['downloaded_bytes']/1024/1024,
                '當前影片總容量(MB)': d.get('total_bytes', 0)/1024/1024,
                '網路速度(MB/秒)': speed_mb,
                '預設剩餘時間(秒)': d.get('eta', 0),
            })
        elif d['status'] == 'finished':
            q.put({
                '狀態': '已完成',
                '檔名': d['filename']
            })
    return hook

# 定義下載函數
def download_Run(ydl_opts, url, q):
    ydl_opts['progress_hooks'] = [progress_hook(q)]
    
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

def download_Thread(ydl_opts,url):
    # 創建一個 Queue 來傳遞進度消息
    q = queue.Queue()

    # 在獨立的執行緒中執行下載函數
    thread = threading.Thread(target=download_Run, args=(ydl_opts, url, q))
    thread.start()

    # 即時顯示下載進度
    while thread.is_alive() or not q.empty():
        try:
            progress = q.get(timeout=1)
            yield progress
        except queue.Empty:
            pass

def download_video(YOUTUBE_URL, do_zip):
    if(do_zip):
        ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # 下載最佳畫質
        'outtmpl': 'Downloads_TMP/%(title)s.%(ext)s',  # 影片文件名格式
        }
        yield from download_Thread(ydl_opts,YOUTUBE_URL)
        if has_files('Downloads_TMP'):
            zip_files('Video')
            yield "影片下載完成"
        else:
            yield "輸入的URL錯誤，無法獲取影片"
    else:
        ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # 下載最佳畫質
        'outtmpl': 'Downloads/Video/%(title)s.%(ext)s',  # 影片文件名格式
        }
        yield from download_Thread(ydl_opts,YOUTUBE_URL)
        if has_files('Downloads/Video'):
            yield "影片下載完成"
        else:
            yield "輸入的URL錯誤，無法獲取影片"
        
    

def download_audio(YOUTUBE_URL, do_zip):
    if(do_zip):
        ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # 下載最佳畫質
        'outtmpl': 'Downloads_TMP/%(title)s.%(ext)s',  # 影片文件名格式
        }
        yield from download_Thread(ydl_opts,YOUTUBE_URL)
        if has_files('Downloads_TMP'):
            zip_files('Audio')
            yield "音檔下載完成"
        else:
            yield "輸入的URL錯誤，無法獲取影片"
    else:
        # 音檔下載
        ydl_opts = {
            'format': 'bestvideo+bestaudio/best',  # 下載最佳畫質和音質，並合併
            'outtmpl': 'Downloads/Audio/%(title)s.%(ext)s',  # 影片文件名格式，下載到 downloads 資料夾
            'postprocessors': [{  # 下載後處理選項
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
        }
        yield from download_Thread(ydl_opts,YOUTUBE_URL)
        if has_files('Downloads/Audio'):
            yield "音檔下載完成"
        else:
            yield "輸入的URL錯誤，無法獲取音檔"
            
    
def has_files(folder_path):
    for _, _, files in os.walk(folder_path):
        if files:
            return True
    return False

def zip_files(Type):
    now = datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    second = now.second
    folder_to_zip = 'Downloads_TMP'
    
    if has_files(folder_to_zip):
        zip_folder = 'Downloads_ZIP'
        if not os.path.exists(zip_folder):
            os.makedirs(zip_folder)
        
        zip_filename = f'{zip_folder}/{Type}_{year}_{month}_{day}_{hour}_{minute}_{second}.zip'
        with zipfile.ZipFile(zip_filename, 'w') as zipf:
            for root, dirs, files in os.walk(folder_to_zip):
                for file in files:
                    zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), folder_to_zip))
        # 檢查資料夾是否存在
        if os.path.exists(folder_to_zip):
            shutil.rmtree(folder_to_zip)
    else:
        print(f"資料夾 {folder_to_zip} 中不包含檔案")

def gradio_interface(function_choice, YOUTUBE_URL, do_zip):
    if function_choice == "下載影片":
        yield from download_video(YOUTUBE_URL, do_zip)
    elif function_choice == "下載音檔":
        yield from download_audio(YOUTUBE_URL, do_zip)

iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Radio(["下載影片", "下載音檔"], label="選擇功能"),
        gr.Textbox(label="輸入 YouTube 清單或影片的連結"),
        gr.Checkbox(label="壓縮成 ZIP 檔")
    ],
    outputs="text",
)
iface.queue().launch(share=True)
# 4K 60fps 影片連結(測試用) : https://youtu.be/3aRJtJRa434
# 播放清單 和 影片 不可以放分享連結(share結尾)，要放一般的網址，如 : https://www.youtube.com/playlist?list=PLm2oaJc6zPi1ZJyHaGid4P3sCke4b0qfL

  from .autonotebook import tqdm as notebook_tqdm


Running on local URL:  http://127.0.0.1:7860
Running on public URL: https://a251e7a93767f255e8.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


