In [None]:
# 🎙️ YouTube Audio Cleaner for Voice Cloning

# ✅ Step 1: Install dependencies
# Installs the command-line tools this notebook shells out to.
# NOTE: the leading "!" runs the line as a shell command (notebook syntax),
# so this cell is meant to be executed inside Colab/Jupyter, not as a plain script.
!pip install -q yt-dlp  # `yt-dlp` is used below to read video titles and download audio
!apt-get -qq install ffmpeg  # `ffmpeg` converts/cleans audio; `ffprobe` (used below) is expected to ship with it

# ✅ Step 2: Mount Google Drive for export
# We'll save all final audio files directly to a folder in your Google Drive
from google.colab import drive
from pathlib import Path
import shutil
import os
import urllib.parse
import subprocess
import time

from contextlib import contextmanager
import signal

# This function limits how long each YouTube job can take to avoid infinite hangs
@contextmanager
def time_limit(seconds):
    """Abort the enclosed block with ``TimeoutError`` after ``seconds`` seconds.

    Relies on ``SIGALRM``, so it must be used from the main thread on a
    Unix-like system. The handler that was installed before entering the block
    is put back on exit, so using this context manager does not permanently
    change how the process reacts to ``SIGALRM``.

    Args:
        seconds: Maximum wall-clock time allowed for the block. Fractions of a
            second are accepted. ``0`` arms no timer, i.e. no limit is applied.

    Raises:
        TimeoutError: Inside the ``with`` body, once the limit is exceeded.
    """
    def signal_handler(signum, frame):
        raise TimeoutError("⏱️ Process exceeded maximum allowed time.")

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    # setitimer delivers SIGALRM like alarm() does, but also accepts floats.
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always disarm the timer first so it cannot fire after we leave.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; None cannot be passed back, so leave ours in that case.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

# Attach Google Drive to the Colab filesystem, then make sure the export
# folder exists so later steps can copy finished files straight into it.
# ✅ All cleaned audio will appear in /MyDrive/YouTubeAudio
print("📂 Mounting Google Drive...")
drive.mount('/content/drive')
drive_export_path = Path("/content/drive/MyDrive/YouTubeAudio")
# Creates missing parent folders too; an already-existing folder is fine.
os.makedirs(drive_export_path, exist_ok=True)
print(f"📁 Files will be saved to: {drive_export_path}\n")

# 🔥🔥🔥  STEP 3: PASTE YOUR YOUTUBE LINKS HERE 🔥🔥🔥
# 🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽🔽
# Paste up to 10 YouTube URLs below (each in quotes, separated by commas).
# Only the first 10 entries are processed; anything after that is ignored.
# You can include full links with or without timestamps (&t=... is OK).
urls = [
    "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # <- Replace this line with your own
    # "https://www.youtube.com/watch?v=ANOTHER_ID",
    # "https://www.youtube.com/watch?v=SOME_OTHER_ID",
]
# 🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼🔼
# 💡 You can re-run the notebook with new URLs anytime

# ✅ Helper functions
# These support downloading, cleaning, and extracting metadata from YouTube audio

def sanitize_filename(title):
    """Turn a video title into a filename-friendly string.

    Runs of whitespace collapse into single underscores (leading/trailing
    whitespace is dropped) and both slash styles become hyphens.
    """
    words = title.strip().split()
    underscored = "_".join(words)
    slash_to_dash = str.maketrans({"/": "-", "\\": "-"})
    return underscored.translate(slash_to_dash)

def get_youtube_title(url):
    """Ask yt-dlp for the title of `url` and return it in filename-safe form.

    The title is whatever yt-dlp prints on stdout for `--get-title`, passed
    through `sanitize_filename`. If yt-dlp prints nothing, the result is "".
    """
    command = ["yt-dlp", "--get-title", url]
    completed = subprocess.run(command, capture_output=True, text=True)
    raw_title = completed.stdout.strip()
    return sanitize_filename(raw_title)

def extract_audio_info(filepath):
    """Probe the first audio stream of a file with ffprobe.

    Args:
        filepath: Path (str or path-like) of the media file to inspect.

    Returns:
        A dict with whichever of these keys ffprobe reported in a usable form:
          - 'sr': sample rate in Hz (int)
          - 'ch': "mono" for 1 channel, "stereo" for 2, otherwise "<n>ch"
          - 'codec': codec name (str)
        An empty dict is returned when the file cannot be probed or the
        ffprobe executable cannot be started, so callers should use `.get()`.
    """
    try:
        result = subprocess.run([
            "ffprobe", "-v", "error",
            "-select_streams", "a:0",
            "-show_entries", "stream=sample_rate,channels,codec_name",
            "-of", "default=noprint_wrappers=1:nokey=0",
            str(filepath)
        ], capture_output=True, text=True)
    except (FileNotFoundError, PermissionError) as e:
        # Deliberately narrow: a broad OSError catch would also swallow the
        # TimeoutError raised by a surrounding time limit.
        print(f"⚠️ Could not run ffprobe: {e}")
        return {}

    info = {}
    for line in result.stdout.splitlines():
        # Output lines look like "key=value"; partition tolerates '=' in values.
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        value = value.strip()
        if key == "sample_rate":
            try:
                info['sr'] = int(value)
            except ValueError:
                # Non-numeric placeholder (e.g. "N/A"): leave the key unset.
                continue
        elif key == "channels":
            try:
                channel_count = int(value)
            except ValueError:
                continue
            if channel_count == 1:
                info['ch'] = "mono"
            elif channel_count == 2:
                info['ch'] = "stereo"
            else:
                # Avoid mislabelling surround layouts as stereo.
                info['ch'] = f"{channel_count}ch"
        elif key == "codec_name":
            info['codec'] = value
    return info

def download_audio(youtube_url, output_name="audio"):
    """Download the best WebM audio stream of a YouTube video with yt-dlp.

    For "watch" links the URL is first reduced to its `v=` video id, which
    drops extras such as timestamps or playlist parameters. Other link shapes
    (or watch links without a `v` parameter) are handed to yt-dlp unchanged.

    Args:
        youtube_url: The link as pasted by the user.
        output_name: Basename of the output file; ".webm" is appended.

    Returns:
        (title, base_url) on success, where `title` is the filename-safe video
        title and `base_url` the URL that was actually downloaded;
        (None, None) when yt-dlp exits with a non-zero status.
    """
    base_url = youtube_url
    if "watch" in youtube_url:
        # parse_qs finds `v` wherever it sits in the query string, instead of
        # assuming it is the first parameter.
        query = urllib.parse.parse_qs(urllib.parse.urlparse(youtube_url).query)
        video_ids = query.get("v")
        if video_ids:
            base_url = f"https://www.youtube.com/watch?v={video_ids[0]}"

    title = get_youtube_title(base_url)
    print(f"Downloading best audio from: {base_url}")

    output_file = f"{output_name}.webm"
    # A leftover file from an earlier run would make yt-dlp skip the download
    # and the old audio would be processed under the new title.
    try:
        os.remove(output_file)
    except FileNotFoundError:
        pass

    # Argument list (no shell): the URL is never interpreted by a shell, and
    # "--" stops yt-dlp from reading a URL that starts with "-" as an option.
    result = subprocess.run([
        "yt-dlp", "-f", "bestaudio[ext=webm]", "-o", output_file, "--", base_url
    ])
    if result.returncode != 0:
        print(f"❌ Failed to download audio from: {base_url}")
        return None, None
    else:
        print(f"✅ Audio downloaded: {output_name}.webm")
        return title, base_url

# ✅ Step 4: Process each URL
# This is the full voice-prep pipeline for each YouTube video
# - Download best stream
# - Resample based on quality tier
# - Trim silence and normalize volume
# - Rename with metadata
# - Upload to Google Drive
# Each URL is handled independently: a failure or timeout on one URL is
# reported and the loop moves on to the next one.
urls_to_process = urls[:10]  # Hard cap of 10 URLs per run
for i, url in enumerate(urls_to_process):
    print(f"\n📥 Processing URL {i+1}/{len(urls_to_process)}")
    try:
        with time_limit(300):  # ⏱️ Timeout after 5 minutes
            title, base_url = download_audio(url, output_name=f"audio_{i}")

            input_file = f"audio_{i}.webm"
            if not title or not os.path.exists(input_file):
                print(f"⚠️ Skipping conversion, file not found or title unavailable: {input_file}")
                continue

            raw_info = extract_audio_info(input_file)
            sample_rate = raw_info.get('sr', 16000)

            # 🎚️ Smarter sample rate logic based on YouTube source quality
            if sample_rate >= 96000:
                recommended_sr = 96000
                quality_tag = "_studio"
            elif sample_rate >= 48000:
                recommended_sr = 48000
                quality_tag = "_hq"
            elif sample_rate >= 22050:
                recommended_sr = 22050
                quality_tag = ""
            else:
                recommended_sr = 16000
                quality_tag = ""

            temp_output = f"temp_cleaned_{i}.wav"
            enhanced_output = f"enhanced_{i}.wav"

            print(f"🎧 Converting {input_file} to {recommended_sr}Hz WAV")
            # subprocess.run (unlike os.system) lets the time limit interrupt
            # a stuck ffmpeg and takes arguments without shell parsing.
            convert = subprocess.run([
                "ffmpeg", "-y", "-i", input_file,
                "-ar", str(recommended_sr), "-ac", "1", temp_output,
            ])
            if convert.returncode != 0 or not os.path.exists(temp_output):
                print(f"❌ Failed to convert {input_file}")
                continue

            print(f"✨ Trimming silence and normalizing volume")
            # -ar is repeated here so the final file keeps the chosen rate.
            # NOTE(review): loudnorm is reported to resample internally, which
            # can otherwise change the output rate — confirm against ffmpeg docs.
            enhance = subprocess.run([
                "ffmpeg", "-y", "-i", temp_output,
                "-af", "silenceremove=1:0:-50dB,loudnorm",
                "-ar", str(recommended_sr), enhanced_output,
            ])
            if enhance.returncode != 0 or not os.path.exists(enhanced_output):
                print(f"❌ Failed to enhance {temp_output}")
                continue

            # Probing can come back incomplete; fall back to what we asked
            # ffmpeg to produce (chosen rate, one channel) rather than crash.
            info = extract_audio_info(enhanced_output)
            final_sr = info.get('sr', recommended_sr)
            final_ch = info.get('ch', "mono")
            final_codec = info.get('codec', "unknown")
            final_name = f"{title}{quality_tag}_{final_sr}Hz_{final_ch}_{final_codec}.wav"
            Path(enhanced_output).rename(final_name)
            print(f"✅ Enhancement complete: {final_name}")

            # ✅ Upload final .wav to Google Drive
            drive_dest = drive_export_path / final_name
            shutil.copy(final_name, drive_dest)
            print(f"📤 Uploaded to Google Drive: {drive_dest}")
    except TimeoutError as e:
        # Must come before OSError: TimeoutError is a subclass of it.
        print(str(e))
    except OSError as e:
        # e.g. missing executable, rename/copy failure; keep processing the rest.
        print(f"❌ Failed to process {url}: {e}")

# ✅ Step 5: Inspect format of the most recent cleaned file (if any)
# This gives you metadata feedback so you know what was just processed.
# The newest .wav in the working directory (by modification time) is inspected.
files = sorted(Path().glob("*.wav"), key=os.path.getmtime, reverse=True)
if files:
    print(f"Inspecting: {files[0].name}")
    # Run ffmpeg with an argument list instead of a "!" shell line so that
    # filenames containing quotes, "$" or braces are passed through untouched.
    # `ffmpeg -i <file>` with no output writes the stream details to stderr.
    probe = subprocess.run(
        ["ffmpeg", "-i", files[0].name],
        capture_output=True, text=True, errors="replace",
    )
    print(probe.stderr)
else:
    print("❌ No .wav file found to inspect.")