In [None]:
# Install all required packages
!pip install yt-dlp openai ipywidgets moviepy youtube-transcript-api python-dotenv



# Podcast Summary Video Generator

This notebook demonstrates three different approaches for generating summary videos from YouTube podcasts:

1. **AssemblyAI Approach**: Downloads audio from YouTube and transcribes using AssemblyAI
2. **YouTube Transcript API Approach**: Uses native YouTube captions/transcripts
3. **Workflow Integration**: Uses the refactored PodcastSummaryWorkflow for streamlined processing

Choose the approach that best fits your needs and API availability.

In [3]:
# Load API keys (e.g. ASSEMBLYAI_API_KEY, OPENAI_API_KEY) from a local .env file
import os
from pathlib import Path

from dotenv import load_dotenv
# Read the key-value pairs from .env and add them to os.environ
load_dotenv()      # after this call the values can be read with os.getenv / os.environ

# Do not print the key itself here: cell output is saved with the notebook.

True

## Common Setup

Environment variables and configuration shared across all approaches.

In [None]:
# ─── Imports and Configuration ────────────────────────────────────────────────

import os
import tempfile
import time
from typing import Dict, Optional, Tuple
from urllib.parse import parse_qs, urlparse

import ipywidgets as widgets
import requests
import yt_dlp
from dotenv import load_dotenv
from IPython.display import display, Video
from moviepy.editor import (
    AudioFileClip, ColorClip, CompositeAudioClip, CompositeVideoClip, TextClip
)
from moviepy.video.fx.speedx import speedx as video_speedx
from openai import OpenAI
from youtube_transcript_api import YouTubeTranscriptApi

# Pull any variables defined in a local .env file into the process environment
load_dotenv()

# Video that the cells below operate on
youtube_url = "https://www.youtube.com/watch?v=WjKQQAFwrR4"

# Shared OpenAI client; the key is looked up in the OPENAI_API_KEY environment variable
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

print("Imports and configuration loaded.")


## Approach 1: AssemblyAI Transcription

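This approach downloads the audio from YouTube and uses AssemblyAI for transcription.
Requires: the `ASSEMBLYAI_API_KEY` environment variable (plus `OPENAI_API_KEY` for the summary and narration steps).

The next cell defines the helper functions used for summarizing, narrating and rendering; the AssemblyAI workflow itself is in the "Workflow 1" cell further down.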

In [None]:
# ─── Helper Functions ───────────────────────────────────────────────────────────

def get_youtube_video_info(youtube_url: str) -> dict:
    """
    Look up a YouTube video's metadata without downloading the media.

    Returns a dict with the keys 'title', 'description', 'uploader' and
    'tags'; a key maps to None when the extracted info has no such entry.
    """
    wanted_fields = ('title', 'description', 'uploader', 'tags')
    with yt_dlp.YoutubeDL({'quiet': True}) as downloader:
        metadata = downloader.extract_info(youtube_url, download=False)
    return {field: metadata.get(field) for field in wanted_fields}

def summarize_text(text: str, client: OpenAI, model: str = 'gpt-4', temperature: float = 0.5) -> str:
    """
    Summarize a podcast transcript into bullet points with the OpenAI chat API.

    Args:
        text: Transcript to summarize.
        client: OpenAI client used to issue the chat-completion request.
        model: Chat model name forwarded to the API (defaults to 'gpt-4').
            Pass a different model when the transcript does not fit this one.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The summary text, or an empty string when the response has no content.

    Raises:
        ValueError: If ``text`` is empty or only whitespace.
    """
    # Fail before making a request: there is nothing to summarize
    if not text or not text.strip():
        raise ValueError("Cannot summarize an empty transcript.")

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that summarizes text into concise bullet points without omitting important details."},
            {"role": "user", "content": (
                "Here is the transcript from the podcast. "
                "Please provide a concise summary in bullet-point format, covering all key points." +
                f"\n\n{text}"
            )}
        ],
        temperature=temperature
    )
    # `content` may be None; keep the declared `str` return type
    return response.choices[0].message.content or ""

def synthesize_speech(text: str, client: OpenAI, output_path: str,
                      model: str = "tts-1", voice: str = "alloy") -> None:
    """
    Generate speech from text with the OpenAI API and save it to a file.

    Args:
        text: Text to be spoken.
        client: OpenAI client used to issue the speech request.
        output_path: Destination audio file; an existing file is replaced.
        model: Text-to-speech model name forwarded to the API.
        voice: Voice name forwarded to the API.
    """
    # Remove a stale file from a previous run before writing the new audio
    if os.path.exists(output_path):
        os.remove(output_path)

    # Use the streaming-response variant: calling stream_to_file() on the
    # plain response is deprecated in the OpenAI SDK.
    with client.audio.speech.with_streaming_response.create(
        model=model,
        voice=voice,
        input=text,
    ) as response:
        response.stream_to_file(output_path)

def create_scrolling_video(summary: str, audio_path: str, output_path: str,
                           delay: float = 8.0, tail: float = 8.0):
    """
    Render a 1280x720 video of the summary scrolling upward over a black
    background while the narration audio plays.

    The video lasts ``delay + narration length + tail`` seconds, so the
    narration always fits inside it. The text starts just below the frame at
    t=0 and has fully left the top of the frame at the end.

    Args:
        summary: Text to display.
        audio_path: Narration audio file.
        output_path: Destination video file; an existing file is replaced.
        delay: Seconds of silent scrolling before the narration starts.
        tail: Seconds the video continues after the narration ends.
    """
    frame_w, frame_h = 1280, 720

    audio_clip = AudioFileClip(audio_path)
    try:
        video_duration = delay + audio_clip.duration + tail

        bg = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0), duration=video_duration)
        txt_clip = TextClip(
            summary,
            fontsize=30,
            color='white',
            size=(1000, None),
            method='caption'
        )

        txt_w, txt_h = txt_clip.w, txt_clip.h
        x_center = (frame_w - txt_w) // 2

        def scroll_pos(t):
            # Linear scroll: top edge at the frame bottom at t=0,
            # bottom edge at the frame top at t=video_duration.
            progress = t / video_duration
            y = frame_h - (frame_h + txt_h) * progress
            return (x_center, y)

        moving_txt = txt_clip.set_position(scroll_pos).set_duration(video_duration)
        visuals = CompositeVideoClip([bg, moving_txt], size=(frame_w, frame_h))

        # A clip's start time only takes effect inside a composite, so the
        # narration is wrapped in CompositeAudioClip to actually delay it.
        narration = CompositeAudioClip([audio_clip.set_start(delay)])
        video = visuals.set_audio(narration)

        if os.path.exists(output_path):
            os.remove(output_path)
        video.write_videofile(
            output_path,
            fps=24,
            codec='libx264',
            audio_codec='aac',
            temp_audiofile='temp-audio.m4a',
            remove_temp=True
        )
    finally:
        # Release the audio reader even when rendering fails
        audio_clip.close()

print("Helper functions defined.")


## Approach 2: YouTube Transcript API

This approach uses native YouTube captions/transcripts directly without downloading audio.
Faster and doesn't require AssemblyAI, but only works if the video has captions.

In [None]:
# YouTube Transcript API Implementation
from youtube_transcript_api import YouTubeTranscriptApi

def _extract_video_id(youtube_url: str) -> str:
    """Return the video ID from a YouTube URL, or the input itself when it is a bare ID."""
    parsed = urlparse(youtube_url.strip())
    host = (parsed.hostname or '').lower()
    segments = [part for part in parsed.path.split('/') if part]

    if host == 'youtu.be' or host.endswith('.youtu.be'):
        # Short links carry the ID as the first path segment
        video_id = segments[0] if segments else ''
    elif len(segments) >= 2 and segments[0] in ('embed', 'shorts', 'live', 'v'):
        # Path-style links: /embed/<id>, /shorts/<id>, /live/<id>, /v/<id>
        video_id = segments[1]
    else:
        # Regular watch links: ?v=<id>, wherever `v` sits in the query string
        video_id = parse_qs(parsed.query).get('v', [''])[0]

    # Fall back to plain string splitting so bare IDs keep working
    return video_id or youtube_url.split('v=')[-1].split('&')[0]


def transcribe_youtube_native(youtube_url: str, languages=['en']) -> str:
    """
    Fetch transcript directly from YouTube using youtube-transcript-api.

    Args:
        youtube_url: URL of the YouTube video. Watch, youtu.be, /embed/,
            /shorts/, /live/ and /v/ links are recognised, as is a bare
            video ID.
        languages: Language codes passed to the transcript API.

    Returns:
        The full transcript text as a single string, one caption entry per line.

    Raises:
        ValueError: If no video ID can be determined from ``youtube_url``.
    """
    video_id = _extract_video_id(youtube_url)
    if not video_id:
        raise ValueError(f"Could not determine a video ID from: {youtube_url!r}")

    # Pass a copy so the shared default list can never be mutated by the library
    wanted_languages = list(languages)

    if hasattr(YouTubeTranscriptApi, 'get_transcript'):
        # Older youtube-transcript-api releases: class-level call returning dict entries
        entries = YouTubeTranscriptApi.get_transcript(video_id, languages=wanted_languages)
        lines = [entry['text'] for entry in entries]
    else:
        # Newer releases: instance-level fetch() yielding snippet objects
        fetched = YouTubeTranscriptApi().fetch(video_id, languages=wanted_languages)
        lines = [snippet.text for snippet in fetched]

    return '\n'.join(lines)

# Demo of the YouTube Transcript API path: fetch transcript and metadata,
# then show at most the first 500 characters of the transcript.
try:
    transcript_native = transcribe_youtube_native(youtube_url)
    video_info_native = get_youtube_video_info(youtube_url)

    print(
        "=" * 50,
        "NATIVE YOUTUBE TRANSCRIPT:",
        "=" * 50,
        transcript_native[:500] + "..." if len(transcript_native) > 500 else transcript_native,
        sep="\n",
    )

except Exception as e:
    print(f"YouTube Transcript API failed: {e}")
    print("This video may not have captions available.")

In [None]:
# ─── Workflow 1: Transcription with AssemblyAI ──────────────────────────────────

# AssemblyAI credentials: read from the environment and stop early when absent
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
if not ASSEMBLYAI_API_KEY:
    raise RuntimeError("Please set your ASSEMBLYAI_API_KEY environment variable")

# REST endpoints used by the helpers below
UPLOAD_URL = "https://api.assemblyai.com/v2/upload"
TRANSCRIBE_URL = "https://api.assemblyai.com/v2/transcript"

# Headers attached to AssemblyAI requests
ASSEMBLYAI_HEADERS = dict([
    ("Authorization", ASSEMBLYAI_API_KEY),
    ("Content-Type", "application/json"),
])

def upload_file_to_assemblyai(file_path: str, timeout: float = 300) -> str:
    """
    Uploads a local audio file to AssemblyAI.

    Args:
        file_path: Path of the audio file to upload.
        timeout: Timeout in seconds passed to the HTTP request.

    Returns:
        The ``upload_url`` reported by AssemblyAI for the uploaded file.

    Raises:
        ValueError: If the file does not exist or is empty.
        RuntimeError: If the response contains no ``upload_url``.
        requests.HTTPError: If the upload request returns an error status.
    """
    if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
        raise ValueError(f"File is empty or does not exist: {file_path}")

    # The body is raw audio bytes, so the JSON content type from
    # ASSEMBLYAI_HEADERS must not be reused for this request.
    upload_headers = {
        "Authorization": ASSEMBLYAI_HEADERS["Authorization"],
        "Content-Type": "application/octet-stream",
    }

    # Passing the open file object lets requests stream it instead of loading it whole
    with open(file_path, 'rb') as f:
        response = requests.post(UPLOAD_URL, headers=upload_headers, data=f, timeout=timeout)
    response.raise_for_status()

    upload_url = response.json().get('upload_url')
    if not upload_url:
        # Fail here rather than handing None to the transcription request
        raise RuntimeError("AssemblyAI upload response did not contain an 'upload_url'.")
    return upload_url

def transcribe_audio_url(audio_url: str, poll_interval: int = 5, timeout: int = 600) -> str:
    """
    Transcribes an audio file from a URL using AssemblyAI.

    Submits a transcription job, then polls its status until it completes,
    fails, or the overall timeout elapses.

    Args:
        audio_url: URL of the audio to transcribe.
        poll_interval: Seconds to wait between status polls.
        timeout: Maximum number of seconds to wait for the job to finish.

    Returns:
        The transcript text ('' when the completed job reports no text).

    Raises:
        RuntimeError: If AssemblyAI reports the job status as 'error'.
        TimeoutError: If the job has not finished within ``timeout`` seconds.
        requests.HTTPError: If any request returns an error status.
    """
    # Per-request timeout so a stalled connection cannot block the cell indefinitely
    request_timeout = 30

    response = requests.post(
        TRANSCRIBE_URL,
        json={"audio_url": audio_url},
        headers=ASSEMBLYAI_HEADERS,
        timeout=request_timeout,
    )
    response.raise_for_status()
    transcript_id = response.json()["id"]
    status_url = f"{TRANSCRIBE_URL}/{transcript_id}"

    # Monotonic clock: unaffected by system clock adjustments while polling
    deadline = time.monotonic() + timeout
    while True:
        poll_response = requests.get(status_url, headers=ASSEMBLYAI_HEADERS, timeout=request_timeout)
        poll_response.raise_for_status()
        data = poll_response.json()
        if data['status'] == 'completed':
            # `text` can be present but null; keep the declared `str` return type
            return data.get('text') or ''
        elif data['status'] == 'error':
            raise RuntimeError(f"Transcription failed: {data.get('error')}")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Transcription timed out.")
        # Never sleep past the deadline
        time.sleep(min(poll_interval, remaining))

# Download audio from YouTube
# The temporary directory (and the downloaded audio inside it) is removed when
# this `with` block exits, so upload and transcription are done inside it.
with tempfile.TemporaryDirectory() as tmpdir:
    download_opts = {
        'format': 'bestaudio/best',
        # Output path has no extension; the file is located by prefix below
        'outtmpl': os.path.join(tmpdir, 'audio'),
        'noplaylist': True,
    }
    with yt_dlp.YoutubeDL(download_opts) as ydl:
        ydl.download([youtube_url])
        # First entry in the temp dir whose name starts with 'audio', or None
        audio_file = next((os.path.join(tmpdir, f) for f in os.listdir(tmpdir) if f.startswith('audio')), None)
        if not audio_file:
            raise FileNotFoundError("Failed to download audio from YouTube.")

        # Transcribe with AssemblyAI
        upload_url = upload_file_to_assemblyai(audio_file)
        transcript = transcribe_audio_url(upload_url)

# Generate summary and video
# Transcript -> bullet-point summary -> narration audio -> scrolling-text video
summary = summarize_text(transcript, client)
audio_path = 'summary_assemblyai.mp3'
video_path = 'summary_assemblyai.mp4'
synthesize_speech(summary, client, audio_path)
create_scrolling_video(summary, audio_path, video_path)

# Display the video
# Last expression of the cell, so its value is what the notebook renders
Video(video_path, embed=True)


## Approach 3: Streamlined Workflow

This approach uses the refactored PodcastSummaryWorkflow class which internally uses YouTube Transcript API for a complete end-to-end solution.

### Choose Your Approach

- **AssemblyAI**: More accurate transcription, supports speaker detection, works with any audio
- **YouTube Transcript API**: Faster, free, but only works if video has captions
- **Workflow**: Clean, production-ready, uses YouTube Transcript API internally

Run the cells for the approach you prefer!