In [1]:
import os

import numpy as np
import pandas as pd
import torch
import whisper
import yt_dlp

# Limit DataFrame previews to 20 rows so rendered frames stay short.
pd.set_option("display.max_rows", 20)

In [2]:
# Optional diagnostic: only informative if you have an NVIDIA GPU with CUDA set up.
import torch
print("CUDA Availability: ")
print(torch.cuda.is_available())  # Returns True if CUDA is available: GPU is available && CUDA is installed and configured
# get_device_name(0) raises when no CUDA device is usable, so only query the
# name after the availability check has succeeded.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))  # Name of the first CUDA device
else:
    print("No CUDA device detected; transcription will fall back to the CPU.")

CUDA Availability: 
True
NVIDIA GeForce RTX 2060


In [5]:
# Output folders, relative to the notebook's working directory:
# downloaded audio is written to AUDIO_DIR, transcripts to TRANSCRIPT_DIR.
AUDIO_DIR = "audio_files"
TRANSCRIPT_DIR = "transcripts"

# Create both folders up front; exist_ok=True makes re-running this cell harmless.
for _output_dir in (AUDIO_DIR, TRANSCRIPT_DIR):
    os.makedirs(_output_dir, exist_ok=True)

In [12]:
def is_livestream_or_long(video_id, max_duration=3600): # 1 hour
    """Decide from yt-dlp metadata whether a video should be skipped.

    Only metadata is fetched; nothing is downloaded. A video is flagged when
    its metadata marks it as live or when its duration exceeds
    ``max_duration``.

    Args:
        video_id: YouTube video ID, inserted into a ``watch?v=`` URL.
        max_duration: Longest acceptable duration in seconds (default 3600).

    Returns:
        True if the video is a livestream or longer than ``max_duration``,
        False if it is acceptable, and None if the metadata lookup failed.
        None is falsy, so callers that want to skip failed lookups must test
        for it explicitly rather than relying on truthiness.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"

    ydl_opts = {
        'quiet': True,
        'simulate': True,
        'force_generic_extractor': False,
        'no_warnings': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)  # Get metadata without downloading

        # Treat an empty result as a failed lookup with a readable reason
        # instead of tripping over attribute access on None below.
        if not info:
            raise ValueError("no metadata returned")

        # Check if the video is a livestream
        if info.get("is_live", False):
            print(f"⚠️ Skipping {video_id}: It is a livestream.")
            return True  # Indicate the video should be skipped

        # Check if the video duration is too long (a missing/None duration counts as 0)
        duration = info.get("duration") or 0  # Duration in seconds
        if duration > max_duration:
            # Truncate to whole seconds so a float duration does not print as "2.0h 2.0m".
            total_seconds = int(duration)
            print(f"⚠️ Skipping {video_id}: It is too long ({total_seconds // 3600}h {total_seconds % 3600 // 60}m).")
            return True  # Indicate the video should be skipped

        return False  # Video is valid
    except Exception as e:
        # Include the underlying error: the lookup can fail for reasons other
        # than removal, and the bare message hid which one it was.
        print(f"⚠️ Video {video_id} is unavailable or removed. Skipping. Reason: {e}")
        return None  # Returning None to indicate failure

def download_audio(video_id, output_format="mp3", failed_videos=None):
    """Download a YouTube video's audio track into AUDIO_DIR.

    The video is first vetted with is_livestream_or_long(). Livestreams,
    over-long videos, and videos whose metadata could not be fetched are
    skipped without attempting a download.

    Args:
        video_id: YouTube video ID.
        output_format: Codec/extension handed to the FFmpegExtractAudio
            postprocessor (default "mp3").
        failed_videos: Optional list that receives ``video_id`` when the
            video is skipped by the pre-download check. Download errors are
            not appended here; they are signalled only by the None return.

    Returns:
        ``AUDIO_DIR/<video_id>.<output_format>`` once the download call has
        completed (the path is not verified to exist), or None when the video
        was skipped or the download raised.
    """

    if failed_videos is None:
        failed_videos = []  # Prevents NoneType issues

    # is_livestream_or_long() returns True (skip), False (ok) or None (metadata
    # lookup failed). None is falsy, so a plain truthiness test would let
    # unavailable videos through to a download attempt.
    skip_verdict = is_livestream_or_long(video_id)
    if skip_verdict or skip_verdict is None:
        print(f"⚠️ Skipping {video_id}: Not suitable for transcription.")
        if video_id not in failed_videos:  # Avoid listing the same ID twice
            failed_videos.append(video_id)
        return None  # Prevent downloading

    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        output_filename = os.path.join(AUDIO_DIR, f"{video_id}.%(ext)s")  # Save in AUDIO_DIR

        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': output_filename,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': output_format,
                'preferredquality': '192',
            }],
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Check for incorrect double extension
        expected_filename = os.path.join(AUDIO_DIR, f"{video_id}.{output_format}")
        double_extension = os.path.join(AUDIO_DIR, f"{video_id}.{output_format}.{output_format}")

        if os.path.exists(double_extension):
            # os.replace overwrites an existing target on every platform;
            # os.rename raises on Windows when the target already exists.
            os.replace(double_extension, expected_filename)

        return expected_filename
    except Exception as e:
        print(f"Error downloading audio for video {video_id}: {e}")
        return None


# Loaded Whisper models keyed by (model_name, device), so a batch run loads
# each model once instead of once per audio file.
_WHISPER_MODEL_CACHE = {}


def transcribe_audio(audio_file, model_name="base"):
    """Transcribe an audio file with Whisper and return the text.

    The model is loaded on the GPU when CUDA is available, otherwise on the
    CPU, and is kept in a module-level cache for subsequent calls. The cached
    model stays in memory until the cache is cleared or this cell is re-run.

    Args:
        audio_file: Path to the audio file to transcribe.
        model_name: Whisper model name passed to ``whisper.load_model``
            (default "base").

    Returns:
        The ``"text"`` entry of the Whisper transcription result.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"  # Use GPU if available
    cache_key = (model_name, device)
    model = _WHISPER_MODEL_CACHE.get(cache_key)
    if model is None:
        model = whisper.load_model(model_name, device=device)
        _WHISPER_MODEL_CACHE[cache_key] = model
    result = model.transcribe(audio_file)
    return result["text"]


def save_transcript(video_id, transcript_text):
    """Write a transcript to ``TRANSCRIPT_DIR/<video_id>.txt`` and report the path.

    The file is opened in write mode with UTF-8 encoding, so an existing
    transcript for the same video ID is overwritten.
    """
    transcript_path = os.path.join(TRANSCRIPT_DIR, f"{video_id}.txt")

    with open(transcript_path, mode="w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcript_text)

    print(f"Transcript saved at: {transcript_path}")

In [None]:
## Transcription Cell
# For every video ID in the CSV: download the audio, transcribe it, and save
# the transcript. Videos that could not be processed are collected in
# `failed_videoes` and listed at the end.

df = pd.read_csv("transcriptless_videos_1.csv") # Replace with your CSV file

# NOTE: the misspelled name is kept because the resume cell below refers to it.
failed_videoes = []


def _record_failure(video_id):
    """Add video_id to failed_videoes at most once and print the running total."""
    # download_audio() already appends the videos it skips to the list it is
    # given, so an unconditional append here would list those videos twice.
    if video_id not in failed_videoes:
        failed_videoes.append(video_id)
    print("Total failed videos count:", len(failed_videoes))


# Only the ID column is needed, so iterate it directly instead of iterrows().
for video_id in df["Video Id"]:
    print(f"\nProcessing Video ID: {video_id}")

    audio_file = download_audio(video_id, failed_videos=failed_videoes)

    if audio_file is None:
        print(f"Skipping {video_id}: Download failed.")
        _record_failure(video_id)
        continue  # Skip to the next video

    if os.path.exists(audio_file):
        print(f"File exists: {audio_file}")
        try:
            transcript = transcribe_audio(audio_file)
            save_transcript(video_id, transcript)
        except Exception as e:
            # Keep the batch going: one bad file should not abort the run.
            print(f"Error during transcription for {video_id}: {e}")
            _record_failure(video_id)
    else:
        # download_audio() returns the expected path without checking that the
        # file was actually produced, so verify it here.
        print(f"Error: Audio file {audio_file} not found! Skipping.")
        _record_failure(video_id)


if failed_videoes:
    print("\n⚠️ The following videos failed to generate transcripts:")
    for vid in failed_videoes:
        print(f"- {vid}")
else:
    print("\n✅ All videos were successfully processed!")

In [None]:
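## Resume Transcription Cell
# Uncomment this cell to continue an interrupted run. It relies on `df` and
# `failed_videoes` already being defined by the Transcription Cell.
# NOTE: `start_index` is an index label used as a position in `df.iloc`, so it
# is only correct while `df` keeps its default 0..n-1 index.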

# # 🔹 Replace with the actual video ID where the script stopped
# last_processed_video_id = "84FXbZxSOBI"  # Replace with the correct ID

# # 🔹 Find the index where this video ID is located
# start_index = df[df["Video Id"] == last_processed_video_id].index[0] + 1  # Move to the next row

# # 🔹 Resume from the next row
# for index, row in df.iloc[start_index:].iterrows():  # Start from the saved index
#     video_id = row["Video Id"]
#     print(f"\nProcessing Video ID: {video_id}")

#     audio_file = download_audio(video_id, failed_videos=failed_videoes)

#     if audio_file is None:
#         print(f"Skipping {video_id}: Download failed.")
#         failed_videoes.append(video_id)
#         print("Total failed videos count:", len(failed_videoes))
#         continue  # Skip to the next video

#     if os.path.exists(audio_file):
#         print(f"File exists: {audio_file}")
#         try:
#             transcript = transcribe_audio(audio_file)
#             save_transcript(video_id, transcript)
#         except Exception as e:
#             print(f"Error during transcription for {video_id}: {e}")
#             failed_videoes.append(video_id)
#             print("Total failed videos count:", len(failed_videoes))
#     else:
#         print(f"Error: Audio file {audio_file} not found! Skipping.")
#         failed_videoes.append(video_id)
#         print("Total failed videos count:", len(failed_videoes))


# if failed_videoes:
#     print("\n⚠️ The following videos failed to generate transcripts:")
#     for vid in failed_videoes:
#         print(f"- {vid}")
# else:
#     print("\n✅ All videos were successfully processed!")