# YouTube Transcription Pipeline with Self-Hosted Whisper

This notebook implements a pipeline that:
1. Downloads audio from YouTube videos
2. Uses locally hosted Whisper model for transcription and translation (no API costs)
3. Converts the output to PDF

## Setup and Requirements

- Python 3.8+
- FFmpeg (for audio processing)
- Required packages: yt-dlp, openai-whisper, torch, fpdf
- GPU recommended but not required (CPU will be slower)

## Paste the Youtube video link in the last cell


# Download the dependencies

In [1]:
# # Install required packages in Jupyter Notebook
# !pip install yt-dlp
# !pip install -U openai-whisper
# !pip install fpdf
# !pip install torch  # Skip this if torch is already installed or if you're using GPU and need a specific version


# Import required dependencies
!pip install yt_dlp fpdf requests python-dotenv torch transformers openai-whisper tqdm


In [2]:
# Import required libraries
import os
import yt_dlp
import whisper
from fpdf import FPDF
import argparse
from pathlib import Path
import torch
from IPython.display import FileLink, display, Markdown
import subprocess 

In [3]:
# Create output directories if they don't exist
os.makedirs("downloads", exist_ok=True)
os.makedirs("transcripts", exist_ok=True)


In [4]:
# Language options dictionary (language code: language name)
LANGUAGE_OPTIONS = {
    "en": "English",
    "es": "Spanish",
    "fr": "French",
    "de": "German",
    "it": "Italian",
    "pt": "Portuguese",
    "nl": "Dutch",
    "ru": "Russian",
    "zh": "Chinese",
    "ja": "Japanese",
    "ko": "Korean",
    "ar": "Arabic",
    "hi": "Hindi",
    "bn": "Bengali",
    "tr": "Turkish",
    "vi": "Vietnamese",
    "th": "Thai",
    "id": "Indonesian",
    "ms": "Malay",
    "fa": "Persian",
    "he": "Hebrew",
    "pl": "Polish",
    "cs": "Czech",
    "sv": "Swedish",
    "da": "Danish",
    "no": "Norwegian",
    "fi": "Finnish",
    "hu": "Hungarian",
    "el": "Greek",
    "ro": "Romanian",
    "uk": "Ukrainian"
}

print("Language dictionary loaded with", len(LANGUAGE_OPTIONS), "languages")


Language dictionary loaded with 31 languages


In [5]:
import re

def clean_repeated_phrases(text):
    """Clean up repeated phrases in the transcript text
    
    This function removes repetitive phrases like "Why not? Why not? Why not?"
    that might appear in the transcript, making it more readable.
    """
    # Common repeated phrases to look for
    common_phrases = [
        "Why not?", 
        "I don't know.", 
        "I love you.", 
        "I'm not sure.",
        "What is that?", 
        "I like you.", 
        "I'm a big fan of this movie.",
        "You are so smart."
    ]
    
    cleaned_text = text
    
    # Clean up common repeated phrases
    for phrase in common_phrases:
        # Replace 3+ consecutive occurrences with just one
        pattern = f"({re.escape(phrase)}\\s*){{3,}}"
        cleaned_text = re.sub(pattern, phrase + " ", cleaned_text)
        
        # Replace 2 consecutive occurrences with just one
        pattern = f"({re.escape(phrase)}\\s*){{2}}"
        cleaned_text = re.sub(pattern, phrase + " ", cleaned_text)
    
    return cleaned_text

# Test the function with a sample text
sample_text = "Why not? Why not? Why not? Why not? I don't know. I don't know. Something else. Why not?"
print("Original text:")
print(sample_text)
print("\nCleaned text:")
print(clean_repeated_phrases(sample_text))


Original text:
Why not? Why not? Why not? Why not? I don't know. I don't know. Something else. Why not?

Cleaned text:
Why not? I don't know. Something else. Why not?


## Step 1: Define TranscriptionPipeline Class

Now we'll implement the main pipeline class with methods for downloading, transcribing, and creating PDFs:


In [6]:
# Modified TranscriptionPipeline Class
class TranscriptionPipeline:
    def __init__(self, whisper_model=None, model_size="small"):  # Added model_size parameter
        """Initialize the transcription pipeline with local Whisper model"""
        self.whisper_model = whisper_model or model  # Use the globally loaded model
        self.model_size = model_size # Store the model size

        # Create output directories if they don't exist
        os.makedirs("downloads", exist_ok=True)
        os.makedirs("transcripts", exist_ok=True)

    def remove_consecutive_duplicates(self, text):
        """Remove consecutive duplicate lines from transcription text"""
        if not text:
            return text
        lines = text.split('\\n')
        if len(lines) <= 1:
            return text

        def extract_content(line):
            parts = line.split(']  ')
            return parts[1].strip() if len(parts) > 1 else line.strip()

        result = [lines[0]]
        for i in range(1, len(lines)):
            current_content = extract_content(lines[i])
            prev_content = extract_content(result[-1])
            if current_content != prev_content or not current_content:
                result.append(lines[i])
        return '\\n'.join(result)

    def download_audio(self, youtube_url):
        """Download audio from a YouTube video"""
        print(f"Downloading audio from: {youtube_url}")
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': 'downloads/%(title)s.%(ext)s',
            'restrictfilenames': True,
            'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=True)
            # yt-dlp sanitizes the title, we get the final path from the info dict
            audio_file = ydl.prepare_filename(info).replace('.webm', '.mp3').replace('.m4a', '.mp3')
            print(f"Audio downloaded: {audio_file}")
            
            # Get and print video length
            duration = info.get('duration')
            if duration:
                minutes = int(duration / 60)
                seconds = int(duration % 60)
                print(f"Video length: {minutes} minutes {seconds} seconds")
            
            return audio_file, info['title'], duration # Return duration as well

    def extract_audio_from_video(self, video_path):
        """Extracts audio from a local video file using ffmpeg."""
        print(f"Extracting audio from: {video_path}")
        if not Path(video_path).exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        video_title = Path(video_path).stem
        audio_file_path = f"downloads/{video_title}.mp3"

        # Command to extract audio using ffmpeg, overwriting if exists (-y)
        command = f'ffmpeg -i \"{video_path}\" -y -vn -acodec mp3 -ab 192k \"{audio_file_path}\"'
        
        print(f"Running FFmpeg to extract audio...")
        try:
            result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
            print(f"Audio extracted successfully: {audio_file_path}")

            # Get video length using ffprobe
            duration_command = f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 \"{video_path}\"'
            duration_output = subprocess.check_output(duration_command, shell=True, text=True)
            duration = float(duration_output.strip())
            
            if duration:
                minutes = int(duration / 60)
                seconds = int(duration % 60)
                print(f"Video length: {minutes} minutes {seconds} seconds")

            return audio_file_path, video_title, duration # Return duration
        except subprocess.CalledProcessError as e:
            print(f"Error during FFmpeg execution: {e.stderr}")
            raise RuntimeError(f"Failed to extract audio from {video_path}")

    def get_audio_length(self, audio_file_path):
        """Gets the length of an audio file using ffprobe."""
        try:
            command = f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 \"{audio_file_path}\"'
            duration_output = subprocess.check_output(command, shell=True, text=True)
            duration = float(duration_output.strip())
            
            if duration:
                minutes = int(duration / 60)
                seconds = int(duration % 60)
                print(f"Audio length: {minutes} minutes {seconds} seconds")
            return duration
        except subprocess.CalledProcessError as e:
            print(f"Error getting audio length with ffprobe: {e.stderr}")
            return None


    def detect_language(self, audio_file):
        """Detect the language of the audio file"""
        print("Detecting original language...")
        if not os.path.exists(audio_file):
            raise FileNotFoundError(f"Audio file not found: {audio_file}")
        
        audio = whisper.load_audio(audio_file)
        audio = whisper.pad_or_trim(audio)
        mel = whisper.log_mel_spectrogram(audio).to(self.whisper_model.device)
        _, probs = self.whisper_model.detect_language(mel)
        detected_lang = max(probs, key=probs.get)
        lang_name = LANGUAGE_OPTIONS.get(detected_lang, "Unknown")
        print(f"Detected language: {detected_lang} ({lang_name}) - confidence: {probs[detected_lang]:.2f}")
        return detected_lang, probs[detected_lang]

    def transcribe_audio(self, audio_file, target_language):
        """Transcribe and/or translate audio using local Whisper model"""
        print(f"Transcribing audio to {target_language}...")
        if not os.path.exists(audio_file):
            raise FileNotFoundError(f"Audio file not found: {audio_file}")

        detected_lang, confidence = self.detect_language(audio_file)
        
        task = "translate" if target_language != detected_lang and target_language == 'en' else "transcribe"
        print(f"Performing task: {task} to language: {target_language}")

        result = self.whisper_model.transcribe(
            audio_file,
            task=task,
            language=target_language if task == "transcribe" else None, # Specify language for transcription, not for translation
            verbose=True
        )
        
        cleaned_text = self.remove_consecutive_duplicates(result["text"])
        result_with_lang = {
            "text": cleaned_text,
            "detected_language": detected_lang,
            "confidence": confidence
        }
        return result_with_lang

    def create_pdf(self, transcript_result, title, target_language, output_file=None):
        """Generate PDF from transcription text"""
        text = clean_repeated_phrases(transcript_result["text"])
        detected_lang = transcript_result["detected_language"]
        
        # Incorporate model size into the safe title for the filename
        safe_title = "".join([c if c.isalnum() else "_" for c in title])
        
        if output_file is None:
            output_file = f"transcripts/{safe_title}_{self.model_size}_{target_language}.pdf" # Modified filename
        
        print(f"Creating PDF: {output_file}")
        source_lang_name = LANGUAGE_OPTIONS.get(detected_lang, "Unknown")
        target_lang_name = LANGUAGE_OPTIONS.get(target_language, "Unknown")

        try:
            clean_text = text.encode('latin-1', 'replace').decode('latin-1')
            pdf = FPDF()
            pdf.add_page()
            pdf.set_auto_page_break(auto=True, margin=15)
            
            # Add Title
            pdf.set_font("Arial", "B", 16)
            pdf.multi_cell(0, 10, f"Transcript: {title}", align="C")
            pdf.ln(10)
            
            # Add language info
            pdf.set_font("Arial", "I", 12)
            pdf.cell(0, 10, f"Original language: {detected_lang} ({source_lang_name})", ln=True)
            pdf.cell(0, 10, f"Output language: {target_language} ({target_lang_name})", ln=True)
            pdf.cell(0, 10, f"Whisper Model: {self.model_size}", ln=True) # Add model info
            pdf.ln(5)
            
            # Add transcript text
            pdf.set_font("Arial", "", 12)
            pdf.multi_cell(0, 10, clean_text)
            
            pdf.output(output_file)
            return output_file
        except Exception as e:
            print(f"Error creating PDF: {e}")
            txt_file = output_file.replace(".pdf", ".txt")
            print(f"Saving as text file instead: {txt_file}")
            with open(txt_file, "w", encoding="utf-8") as f:
                f.write(f"Transcript: {title}\\n\\n")
                f.write(f"Original language: {detected_lang} ({source_lang_name})\\n")
                f.write(f"Output language: {target_language} ({target_lang_name})\\n\\n")
                f.write(f"Whisper Model: {self.model_size}\\n\\n") # Add model info to TXT
                f.write(text)
            return txt_file

    def process_media(self, input_path, target_language):
        """
        Process a media input: download or extract audio, transcribe, and create a PDF.
        The input can be a YouTube URL, a local video file, or a local audio file.
        """
        try:
            title = ""
            audio_file = ""
            duration = None # Initialize duration

            # Step 1: Get audio file and title based on input type
            print("Step 1/3: Processing media input...")
            is_url = input_path.startswith(('http:', 'https:'))
            is_video = any(input_path.lower().endswith(ext) for ext in ['.mp4', '.mkv', '.avi', '.mov', '.webm'])
            is_audio = any(input_path.lower().endswith(ext) for ext in ['.mp3', '.wav', '.m4a', '.flac'])

            if is_url:
                audio_file, title, duration = self.download_audio(input_path) # Get duration
            elif is_video:
                audio_file, title, duration = self.extract_audio_from_video(input_path) # Get duration
            elif is_audio:
                audio_file = input_path
                title = Path(input_path).stem
                print(f"Using local audio file: {audio_file}")
                duration = self.get_audio_length(audio_file) # Get duration for audio file
            else:
                raise ValueError("Unsupported input type. Please provide a YouTube URL, video file, or audio file.")

            # Step 2: Transcribe and translate audio
            print("\nStep 2/3: Transcribing audio...")
            transcript_result = self.transcribe_audio(audio_file, target_language)
            
            # Step 3: Create PDF
            print("\nStep 3/3: Creating PDF...")
            pdf_file = self.create_pdf(transcript_result, title, target_language)
            
            # Final summary
            detected_lang = transcript_result["detected_language"]
            source_lang_name = LANGUAGE_OPTIONS.get(detected_lang, "Unknown")
            target_lang_name = LANGUAGE_OPTIONS.get(target_language, "Unknown")
            
            print(f"\nVideo language: {detected_lang} ({source_lang_name})")
            print(f"Output language: {target_language} ({target_lang_name})")
            print(f"Process completed successfully! PDF saved at: {pdf_file}")
            
            return pdf_file
        except Exception as e:
            print(f"Error in pipeline: {str(e)}")
            return None

## Select Whisper Model Size

Choose the Whisper model size to use for transcription. Larger models are more accurate but require more memory and computational resources:


In [7]:
# Select Whisper model size
# Available models: "tiny", "base", "small", "medium", "large"
# Larger models are more accurate but require more memory and compute
# Recommendation: Start with "base" or "small" for a balance of speed and accuracy
WHISPER_MODEL_SIZE = "small"  # Change this to your desired model size

# Print available devices for running the model
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

if device == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# Display model size info
model_sizes = {
    "tiny": {"parameters": "39M", "required_vram": "~1 GB", "english_only": False},
    "base": {"parameters": "74M", "required_vram": "~1 GB", "english_only": False},
    "small": {"parameters": "244M", "required_vram": "~2 GB", "english_only": False},
    "medium": {"parameters": "769M", "required_vram": "~5 GB", "english_only": False},
    "large": {"parameters": "1550M", "required_vram": "~10 GB", "english_only": False},
}

print(f"\nSelected model: {WHISPER_MODEL_SIZE}")
print(f"Model parameters: {model_sizes[WHISPER_MODEL_SIZE]['parameters']}")
print(f"Approx. required VRAM: {model_sizes[WHISPER_MODEL_SIZE]['required_vram']}")

# Check if selected model is reasonable for your device
if device == "cpu" and WHISPER_MODEL_SIZE in ["medium", "large"]:
    print("\nWARNING: You've selected a large model to run on CPU. This may be very slow.")
    print("Consider using a smaller model like 'base' or 'small' for better performance on CPU.")

# Load the model (this will download the model first time)
print(f"\nLoading Whisper model '{WHISPER_MODEL_SIZE}'...")
model = whisper.load_model(WHISPER_MODEL_SIZE, device=device)
print("Model loaded successfully!")


Using device: cpu

Selected model: small
Model parameters: 244M
Approx. required VRAM: ~2 GB

Loading Whisper model 'small'...
Model loaded successfully!


## Step 2: Run the Pipeline

Now let's use our pipeline class to process a YouTube video:


In [10]:
# Initialize the transcription pipeline with the local Whisper model
pipeline = TranscriptionPipeline(whisper_model=model, model_size=WHISPER_MODEL_SIZE)

# --- CHOOSE YOUR INPUT METHOD ---

# Option 1: YouTube URL
#input_path = "Youtube Video url"

# Option 2: Local Video File Path
# Make sure to upload the video file to your notebook's environment or provide the full path.
# input_path = "path/to/your/video.mp4" 

# Option 3: Local Audio File Path
# Make sure the audio file is in a common format like MP3, WAV, M4A.
# input_path = "rihanna-diamonds.mp3" # <-- CHANGE THIS to your file path or URL

# Set the target language for the output
target_language = "en"  # e.g., 'en', 'es', 'fr', 'de', 'ja'

# --- Run the pipeline ---

print(f"Starting transcription pipeline for: {input_path}")
print(f"Target language: {target_language} ({LANGUAGE_OPTIONS.get(target_language, 'Unknown')})")

pdf_path = pipeline.process_media(input_path, target_language)

# Display a link to the generated PDF if the process was successful
if pdf_path:
    display(Markdown(f"**PDF generated successfully!** [Open PDF]({pdf_path})"))
else:
    display(Markdown("**Pipeline failed. Please check the error messages above.**"))

Deprecated Feature: Support for Python version 3.8 has been deprecated. Please update to Python 3.9 or above


Starting transcription pipeline for: https://www.youtube.com/watch?v=3dKSBfRMmdU&t=39s
Target language: en (English)
Step 1/3: Processing media input...
Downloading audio from: https://www.youtube.com/watch?v=3dKSBfRMmdU&t=39s
[youtube] Extracting URL: https://www.youtube.com/watch?v=3dKSBfRMmdU&t=39s
[youtube] 3dKSBfRMmdU: Downloading webpage
[youtube] 3dKSBfRMmdU: Downloading ios player API JSON
[youtube] 3dKSBfRMmdU: Downloading mweb player API JSON
[youtube] 3dKSBfRMmdU: Downloading player 5dcb2c1f


         player = https://www.youtube.com/s/player/5dcb2c1f/player_ias.vflset/en_US/base.js
         n = RA1tnfURajH6JVhvFhTlj ; player = https://www.youtube.com/s/player/5dcb2c1f/player_ias.vflset/en_US/base.js
         player = https://www.youtube.com/s/player/5dcb2c1f/player_ias.vflset/en_US/base.js
         n = HYOWVmMB2L_VwmTkp7R3X ; player = https://www.youtube.com/s/player/5dcb2c1f/player_ias.vflset/en_US/base.js


[youtube] 3dKSBfRMmdU: Downloading m3u8 information
[info] 3dKSBfRMmdU: Downloading 1 format(s): 251
[download] Destination: downloads\MOST_CRINGE_FEED_EVARIDI_FIRST_VIDEO_II_KAKARAKAYTALKS.webm
[download] 100% of   12.63MiB in 00:00:05 at 2.44MiB/s   
[ExtractAudio] Destination: downloads\MOST_CRINGE_FEED_EVARIDI_FIRST_VIDEO_II_KAKARAKAYTALKS.mp3
Deleting original file downloads\MOST_CRINGE_FEED_EVARIDI_FIRST_VIDEO_II_KAKARAKAYTALKS.webm (pass -k to keep)
Audio downloaded: downloads\MOST_CRINGE_FEED_EVARIDI_FIRST_VIDEO_II_KAKARAKAYTALKS.mp3
Video length: 14 minutes 8 seconds

Step 2/3: Transcribing audio...
Transcribing audio to en...
Detecting original language...
Detected language: te (Unknown) - confidence: 0.97
Performing task: translate to language: en




Detecting language using up to the first 30 seconds. Use `--language` to specify the language
Detected language: Telugu
[00:00.000 --> 00:01.240]  who are you?
[00:06.040 --> 00:13.200]  Hey kid do not listen to him I will later tell you
[00:15.800 --> 00:17.320]  Am I looking so ugly?
[00:19.140 --> 00:20.020]  Do not even look like a lady
[00:20.020 --> 00:23.620]  Why are you explaining all these strange things in the first video?
[00:23.620 --> 00:24.620]  Why not?
[00:24.620 --> 00:26.120]  Why not strangers?
[00:26.120 --> 00:30.620]  How much I can see, how much I can hold on to, dreams.
[00:30.620 --> 00:32.120]  Yeah, dreams.
[00:32.120 --> 00:32.620]  Yeah.
[00:32.620 --> 00:33.620]  What did you say Kalam?
[00:33.620 --> 00:34.420]  Kalam?
[00:34.420 --> 00:35.420]  Kalam said, dreams.
[00:35.420 --> 00:36.120]  Stories.
[00:36.120 --> 00:36.620]  Hmm?
[00:36.620 --> 00:37.120]  Hmm.
[00:37.120 --> 00:38.420]  When Kalam said dreams...
[00:38.420 --> 00:40.220]  I should hav

**PDF generated successfully!** [Open PDF](transcripts/MOST_CRINGE_FEED_EVARIDI_______FIRST_VIDEO_II_KAKARAKAYTALKS_small_en.pdf)

## Optional: Batch Processing Multiple Videos

If you want to process multiple videos at once:


In [9]:
# # List of YouTube URLs and their target languages
# # Use our language options dictionary to select desired languages
# videos_to_process = [
#     {"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "language": "en"},  # English
#     {"url": "https://www.youtube.com/watch?v=VIDEO_ID_2", "language": "es"},   # Spanish
#     # Add more videos as needed
#     # You can use any language code from LANGUAGE_OPTIONS dictionary
# ]

# # Print available languages as a reminder
# print("Available languages for batch processing:")
# print(", ".join([f"{code}: {name}" for code, name in list(LANGUAGE_OPTIONS.items())[:10]]))
# print(f"... and {len(LANGUAGE_OPTIONS) - 10} more languages (see language options cell above)")

# # Process each video
# results = []
# for video in videos_to_process:
#     print(f"\nProcessing video: {video['url']} in {video['language']}")
#     pdf_path = pipeline.process_video(video["url"], video["language"])
#     results.append({
#         "url": video["url"],
#         "language": video["language"],
#         "success": pdf_path is not None,
#         "pdf_path": pdf_path
#     })

# # Display results
# print("\nProcessing Results:")
# for i, result in enumerate(results, 1):
#     status = "✅ Success" if result["success"] else "❌ Failed"
#     print(f"{i}. {status} - {result['url']} ({result['language']})")
#     if result["success"]:
#         print(f"   PDF: {result['pdf_path']}")
