# YouTube Audio Processing Workflow

This notebook integrates the workflow for downloading audio from YouTube, processing it, transcribing the audio to text, cleaning the transcription, and summarizing the content.

## Step 1: Download YouTube Audio

Download audio from a specified YouTube URL and save it to a designated directory.

In [None]:
import os
import yt_dlp as youtube_dl

# Configuration: where the downloaded audio is stored and how it is named.
DOWNLOAD_DIR = "./SELF/data/audio"  # relative to the current working directory
BASE_FILENAME = "full_audio"
EXTENSION = ".m4a"
# Full path of the expected download: <DOWNLOAD_DIR>/<BASE_FILENAME><EXTENSION>.
OUTPUT_FILE = os.path.join(DOWNLOAD_DIR, f"{BASE_FILENAME}{EXTENSION}")

# Create the download directory (and missing parents); no error if it already exists.
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

def download_audio(url):
    """Download the audio track of ``url`` into ``DOWNLOAD_DIR`` using yt-dlp.

    The download is restricted to the ``bestaudio[ext=m4a]`` format selector
    and the output template names the file ``BASE_FILENAME`` followed by the
    extension yt-dlp fills in for ``%(ext)s``. The completion message reports
    ``OUTPUT_FILE`` as the saved location.

    Args:
        url: Address of the video to fetch; handed to yt-dlp unchanged.
    """
    print("🎵 正在下載音訊...")

    # yt-dlp substitutes %(ext)s itself, so it is kept literal in the template.
    output_template = os.path.join(DOWNLOAD_DIR, f'{BASE_FILENAME}.%(ext)s')
    options = dict(
        format='bestaudio[ext=m4a]',
        outtmpl=output_template,
        quiet=False,
        verbose=True,
    )

    with youtube_dl.YoutubeDL(options) as downloader:
        downloader.download([url])

    print(f"✅ 音訊下載完成，檔案已儲存至：{OUTPUT_FILE}")

# Example video URL; replace it with the video whose audio should be processed.
url = "https://www.youtube.com/watch?v=vzGxmtYXJkY"
# The download runs immediately when this cell is executed.
download_audio(url)

## Step 2: Trim Audio

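Trim the downloaded audio to a fixed segment (starting at 00:04:07, lasting 00:15:05) and save it as an MP3. This step is optional: Step 3 falls back to the full download when no trimmed file exists.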

In [None]:
import subprocess

# Configuration for trimming; both files live next to the downloaded audio.
# Intermediate WAV produced from the download before the segment is cut.
TEMP_WAV_FILE = os.path.join(DOWNLOAD_DIR, "temp_audio.wav")
# Final trimmed segment, encoded as MP3.
TRIMMED_FILE = os.path.join(DOWNLOAD_DIR, "trimmed_audio.mp3")

def trim_audio(start_time='00:04:07', duration='00:15:05'):
    """Cut one segment out of the downloaded audio and save it as an MP3.

    ``OUTPUT_FILE`` is first converted to the temporary ``TEMP_WAV_FILE``;
    that WAV is then trimmed and encoded with libmp3lame at 192k into
    ``TRIMMED_FILE``. The temporary WAV is removed afterwards, including when
    one of the ffmpeg calls fails.

    Args:
        start_time: Segment start, passed to ffmpeg's ``-ss`` option.
        duration: Segment length, passed to ffmpeg's ``-t`` option.

    Raises:
        FileNotFoundError: If ``OUTPUT_FILE`` does not exist.
        subprocess.CalledProcessError: If either ffmpeg call exits non-zero.
    """
    print("✂️ 正在裁切音訊片段...")
    if not os.path.isfile(OUTPUT_FILE):
        raise FileNotFoundError("❌ 找不到下載完成的音訊檔案")

    try:
        # Convert m4a to wav format first
        subprocess.run([
            'ffmpeg', '-y',
            '-i', OUTPUT_FILE,
            TEMP_WAV_FILE
        ], check=True)

        # Use ffmpeg to trim and convert to mp3 format
        subprocess.run([
            'ffmpeg', '-y',
            '-i', TEMP_WAV_FILE,
            '-ss', start_time,
            '-t', duration,
            '-acodec', 'libmp3lame',
            '-ab', '192k',
            TRIMMED_FILE
        ], check=True)
    finally:
        # Always clean up the intermediate WAV so a failed run does not leave
        # a large temporary file behind.
        if os.path.exists(TEMP_WAV_FILE):
            os.remove(TEMP_WAV_FILE)

    print(f"✅ 裁切完成，檔案已儲存為：{TRIMMED_FILE}")

# Trim the audio; raises FileNotFoundError if the Step 1 download is missing.
trim_audio()

## Step 3: Convert and Split Audio for Transcription

Convert the audio to WAV format and split it into smaller clips for transcription.

In [None]:
import os
import subprocess

# Configuration
# Prefer the trimmed segment; fall back to the full download when no trimmed
# file exists at the time this cell runs.
INPUT_FILE = TRIMMED_FILE if os.path.exists(TRIMMED_FILE) else OUTPUT_FILE
# Directory that receives the converted WAV, the clips and all text outputs.
OUTPUT_DIR = "./SELF/data/output_clips"
CLIP_DURATION_SEC = 600  # 10 minutes

def clear_output_folder():
    """Remove the entries found directly inside ``OUTPUT_DIR``.

    Nothing happens when the directory does not exist. Regular files are
    deleted with ``os.remove`` and sub-directories with ``os.rmdir``, which
    only succeeds for empty directories. A failure on one entry is printed
    and the sweep continues with the next entry.
    """
    if not os.path.exists(OUTPUT_DIR):
        return

    for entry_name in os.listdir(OUTPUT_DIR):
        entry_path = os.path.join(OUTPUT_DIR, entry_name)
        try:
            if os.path.isfile(entry_path):
                os.remove(entry_path)
                continue
            if os.path.isdir(entry_path):
                os.rmdir(entry_path)
        except Exception as err:
            # Best effort: report the problem and keep clearing other entries.
            print(f"⚠️ 無法刪除 {entry_path}: {err}")

    print("🗑️ 輸出資料夾已清除！")

def convert_to_wav(input_file):
    """Convert ``input_file`` to a mono 16 kHz WAV inside ``OUTPUT_DIR``.

    Args:
        input_file: Path of the audio file to convert.

    Returns:
        Path of the written file, ``<OUTPUT_DIR>/converted.wav``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_wav = os.path.join(OUTPUT_DIR, "converted.wav")
    cmd = [
        "ffmpeg",
        # Overwrite an existing converted.wav instead of prompting on stdin,
        # which would block a notebook cell; matches the calls in trim_audio.
        "-y",
        "-i", input_file,
        "-ac", "1",       # one audio channel
        "-ar", "16000",   # 16 kHz sample rate
        # NOTE(review): -q:a is a quality setting for lossy encoders and
        # presumably has no effect on WAV output; kept unchanged, confirm.
        "-q:a", "0",
        output_wav
    ]
    subprocess.run(cmd, check=True)
    return output_wav

def _seconds_to_hms(seconds):
    """Format a number of seconds as a zero-padded ``HH:MM:SS`` string."""
    whole = int(seconds)
    return f"{whole // 3600:02d}:{(whole % 3600) // 60:02d}:{whole % 60:02d}"


def split_audio(input_file, duration_sec):
    """Split ``input_file`` into consecutive clips of ``duration_sec`` seconds.

    The total length is read with ffprobe. Clips are written to ``OUTPUT_DIR``
    as ``clip_001.wav``, ``clip_002.wav``, ... using stream copy, and a
    ``timestamps.txt`` file listing each clip's time range is written next to
    them.

    Args:
        input_file: Path of the audio file to split.
        duration_sec: Clip length in whole seconds (used as the range step).

    Returns:
        A list of ``(clip_filename, start_time, end_time)`` tuples with the
        times in seconds; ``end_time`` of the last clip is capped at the total
        duration reported by ffprobe.

    Raises:
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits non-zero.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    probe_cmd = [
        "ffprobe",
        "-i", input_file,
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        "-v", "quiet"
    ]
    total_duration = float(subprocess.check_output(probe_cmd).decode().strip())

    clip_files = []
    timestamps = []
    # Start offsets step through the whole seconds of the recording, so a
    # fractional remainder beyond the last whole second never starts a clip.
    for i, start_time in enumerate(range(0, int(total_duration), duration_sec)):
        end_time = min(start_time + duration_sec, total_duration)
        clip_filename = os.path.join(OUTPUT_DIR, f"clip_{i+1:03d}.wav")
        cmd = [
            "ffmpeg",
            # Overwrite leftovers instead of prompting on stdin (which would
            # hang a notebook cell); consistent with trim_audio.
            "-y",
            "-i", input_file,
            "-ss", str(start_time),
            "-t", str(duration_sec),
            "-acodec", "copy",
            clip_filename
        ]
        subprocess.run(cmd, check=True)
        timestamps.append(
            f"Clip {i+1}: {_seconds_to_hms(start_time)} - {_seconds_to_hms(end_time)}\n"
        )
        clip_files.append((clip_filename, start_time, end_time))

    # Explicit encoding, like the other text files this notebook writes.
    with open(os.path.join(OUTPUT_DIR, "timestamps.txt"), "w", encoding="utf-8") as f:
        f.writelines(timestamps)
    print("✅ 音訊切割完成，時間戳已儲存至 timestamps.txt")
    return clip_files

# Clear old files and process audio
clear_output_folder()
# Convert the chosen input to WAV; the converted file is what gets split.
wav_file = convert_to_wav(INPUT_FILE)
# List of (clip_filename, start_time, end_time) tuples consumed by Step 4.
clip_files = split_audio(wav_file, CLIP_DURATION_SEC)

## Step 4: Transcribe Audio Clips

Use Whisper.cpp to transcribe the audio clips into text.

In [None]:
# Configuration for transcription (paths are relative to the working directory)
# Executable that is run once per clip.
WHISPER_EXEC = "./whisper.cpp/build/bin/whisper-cli"
# Model file passed to the executable with -m.
WHISPER_MODEL = "whisper.cpp/models/ggml-medium.bin"
# Language code passed with --language ("zh" is presumably Chinese; confirm).
LANGUAGE = "zh"

def transcribe_audio(clip_files):
    """Transcribe every clip with whisper.cpp into one combined text file.

    For each clip the ``WHISPER_EXEC`` program is run with ``WHISPER_MODEL``
    and ``LANGUAGE``; its standard output is written to
    ``<OUTPUT_DIR>/transcription.txt`` under a ``[HH:MM:SS - HH:MM:SS]``
    header built from the clip's start and end times. A clip whose run exits
    with a non-zero status is reported and skipped so the remaining clips are
    still transcribed.

    Args:
        clip_files: Iterable of ``(clip_filename, start_time, end_time)``
            tuples with the times in seconds.
    """
    def _hms(seconds):
        # Zero-padded HH:MM:SS for a number of seconds.
        whole = int(seconds)
        return f"{whole // 3600:02d}:{(whole % 3600) // 60:02d}:{whole % 60:02d}"

    transcript_file = os.path.join(OUTPUT_DIR, "transcription.txt")
    with open(transcript_file, "w", encoding="utf-8") as f:
        for clip_filename, start_time, end_time in clip_files:
            print(f"🎤 轉錄 {clip_filename} ...")
            cmd = [
                WHISPER_EXEC,
                "-m", WHISPER_MODEL,
                "-f", clip_filename,
                "--language", LANGUAGE
            ]
            # errors="ignore" drops undecodable bytes, so decoding the output
            # cannot raise UnicodeDecodeError and no handler is needed for it.
            result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="ignore")
            if result.returncode != 0:
                # A failed run has no usable transcript; report it and move on
                # instead of silently writing an empty segment.
                print(f"❌ Whisper.cpp 執行失敗 (exit code {result.returncode}): {result.stderr}")
                continue
            if result.stderr:
                print(f"⚠️ Whisper.cpp 錯誤: {result.stderr}")
            text = result.stdout.strip()
            timestamp = f"[{_hms(start_time)} - {_hms(end_time)}]"
            f.write(f"{timestamp}\n{text}\n\n")
    print("✅ 轉錄完成，結果已儲存至 transcription.txt")

# Transcribe the audio clips produced by split_audio in Step 3.
transcribe_audio(clip_files)

## Step 5: Clean Transcription

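Clean the transcription by removing the per-line Whisper timestamps (`[hh:mm:ss.mmm --> hh:mm:ss.mmm]`) and joining each clip's lines into a single paragraph; the clip-level time-range headers are kept.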

In [None]:
import re

# Configuration
# Raw transcript written by transcribe_audio.
INPUT_TRANSCRIPTION_FILE = os.path.join(OUTPUT_DIR, "transcription.txt")
# Destination for the cleaned transcript.
OUTPUT_CLEAN_FILE = os.path.join(OUTPUT_DIR, "clean_transcription.txt")

def read_transcription(file_path):
    """Return the full contents of the UTF-8 text file at ``file_path``.

    Args:
        file_path: Path of the transcript file to read.

    Returns:
        The file's text as a single string.
    """
    with open(file_path, mode="r", encoding="utf-8") as transcript:
        contents = transcript.read()
    return contents

def clean_transcription(text):
    """Strip per-line timestamps from a transcript, keeping one entry per clip.

    The text is split on clip headers of the form ``[HH:MM:SS - HH:MM:SS]``
    (the format written by ``transcribe_audio``; the shorter
    ``[MM:SS - MM:SS]`` form is accepted as well). Within each clip the
    ``[hh:mm:ss.mmm --> hh:mm:ss.mmm]`` markers are removed and line breaks
    are replaced by spaces.

    Args:
        text: Raw transcript text.

    Returns:
        A list of ``"<header>\\n<cleaned text>"`` strings, one per clip whose
        cleaned text is non-empty.
    """
    # The seconds field is optional so both header formats are recognised.
    # Only the outer group captures, which makes re.split return
    # [preamble, header1, body1, header2, body2, ...].
    header_pattern = r"(\[\d{2,3}:\d{2}(?::\d{2})? - \d{2,3}:\d{2}(?::\d{2})?\])\n"
    inline_pattern = r"\[\d{2,3}:\d{2}:\d{2}\.\d{3} --> \d{2,3}:\d{2}:\d{2}\.\d{3}\]"

    sections = re.split(header_pattern, text)
    cleaned_segments = []
    for header, content in zip(sections[1::2], sections[2::2]):
        cleaned_text = re.sub(inline_pattern, "", content)
        cleaned_text = cleaned_text.replace("\n", " ").strip()
        if cleaned_text:
            cleaned_segments.append(f"{header.strip()}\n{cleaned_text}")
    return cleaned_segments

def save_cleaned_transcription(cleaned_segments, output_file):
    """Write the cleaned segments to ``output_file`` separated by blank lines.

    Args:
        cleaned_segments: Iterable of segment strings to store.
        output_file: Path of the UTF-8 text file to create or overwrite.
    """
    segment_separator = "\n\n"
    with open(output_file, mode="w", encoding="utf-8") as destination:
        document = segment_separator.join(cleaned_segments)
        destination.write(document)

# Clean the transcription: read the raw transcript, strip the per-line
# timestamps, and store the result in OUTPUT_CLEAN_FILE.
text = read_transcription(INPUT_TRANSCRIPTION_FILE)
cleaned_segments = clean_transcription(text)
save_cleaned_transcription(cleaned_segments, OUTPUT_CLEAN_FILE)
print(f"✅ 逐字稿清理完成，結果已儲存至 {OUTPUT_CLEAN_FILE}")

## Step 6: Summarize Transcription

Summarize each segment of the cleaned transcription by calling a chat-completions API served locally (LM Studio at `http://127.0.0.1:1234`).

In [None]:
import re
import requests
import json
import time

# Configuration for summarization
# Chat-completions endpoint on the local machine (port 1234).
LMSTUDIO_API_URL = "http://127.0.0.1:1234/v1/chat/completions"
# Model identifier sent in the request payload.
MODEL_NAME = "meta-llama-3.1-8b-instruct"
# Sent as "max_tokens" in the request payload.
MAX_TOKENS = 200
# Sent as "temperature" in the request payload.
TEMPERATURE = 0.7
# Destination file for the per-segment summaries.
OUTPUT_SUMMARY_FILE = os.path.join(OUTPUT_DIR, "summary.txt")

def parse_transcription(file_path):
    """Parse a cleaned transcript file into a list of timed segments.

    Each segment is a ``[start - end]`` header line followed by its text. The
    timestamps may be ``HH:MM:SS`` (as written by the transcription step) or
    ``MM:SS``. A segment ends at the next blank line or at the end of the
    file, so the final segment is returned even without a trailing blank line.

    Args:
        file_path: Path of the cleaned transcript (read as UTF-8, undecodable
            bytes ignored).

    Returns:
        A list of dicts with the keys ``"start"``, ``"end"`` and ``"text"``.
    """
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        content = f.read()
    # Optional seconds field in both timestamps; (?:\n\n|\Z) lets the last
    # segment match because the writer joins segments without a final "\n\n".
    pattern = (
        r"\[(\d{2,3}:\d{2}(?::\d{2})?) - (\d{2,3}:\d{2}(?::\d{2})?)\]\n"
        r"(.+?)(?:\n\n|\Z)"
    )
    return [
        {"start": start, "end": end, "text": text.strip()}
        for start, end, text in re.findall(pattern, content, re.DOTALL)
    ]

def summarize_text(text):
    """Request a summary of ``text`` from the chat-completions endpoint.

    A single user message containing the summarization instruction and
    ``text`` is posted to ``LMSTUDIO_API_URL``. The summary is taken from the
    first choice's ``message.content`` or, when that choice has no
    ``message``, from its ``text`` field.

    Args:
        text: The transcript text to summarize.

    Returns:
        The stripped summary, or the string ``"摘要失敗"`` when the request
        fails, the response is not valid JSON, no choice is returned, or the
        summary is empty.
    """
    payload = {
        "model": MODEL_NAME,
        "messages": [
            {"role": "user", "content": f"請將以下內容總結成 200 字以內，不需要回應問題：\n{text}"}
        ],
        "max_tokens": MAX_TOKENS,
        "temperature": TEMPERATURE
    }
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(
            LMSTUDIO_API_URL,
            headers=headers,
            data=json.dumps(payload),
            # (connect, read) seconds: without a timeout a stalled server
            # would block the notebook forever; the read limit is generous
            # because generation can be slow.
            timeout=(10, 600),
        )
        response.raise_for_status()
        response_json = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # decode error is not a RequestException.
        print(f"❌ API 請求失敗: {e}")
        return "摘要失敗"

    print(f"📩 API 回應: {response_json}")
    choices = response_json.get("choices") or []
    if not choices:
        # Indexing choices[0] on an empty list would raise IndexError.
        return "摘要失敗"

    first_choice = choices[0]
    if "message" in first_choice:
        # "or" guards against an explicit null content value.
        summary = (first_choice["message"].get("content") or "").strip()
    else:
        summary = (first_choice.get("text") or "").strip()
    return summary if summary else "摘要失敗"

# Summarize the cleaned transcription, one request per parsed segment.
segments = parse_transcription(OUTPUT_CLEAN_FILE)
summary_results = []
for segment in segments:
    print(f"🔍 正在摘要: {segment['start']} - {segment['end']} ...")
    summary = summarize_text(segment["text"])
    # Each entry is the segment's time range followed by its summary.
    summary_results.append(f"[{segment['start']} - {segment['end']}]\n{summary}\n")
    time.sleep(1)  # Pause between requests to avoid overloading the API

# errors="ignore" drops characters that cannot be encoded instead of raising.
with open(OUTPUT_SUMMARY_FILE, "w", encoding="utf-8", errors="ignore") as f:
    f.writelines(summary_results)

print(f"✅ 摘要完成，結果已儲存於 {OUTPUT_SUMMARY_FILE}")