<a href="https://colab.research.google.com/github/Ragavi203/AI-Powered-YouTube-Video-Summarizer/blob/main/AI_Powered_YouTube_Video_Summarizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install google-api-python-client yt-dlp transformers torch moviepy opencv-python numpy pydub



In [None]:
import json
import math
import os
import shutil
import tempfile
from urllib.parse import parse_qs, urlparse

import cv2
import numpy as np
import torch
import yt_dlp
from googleapiclient.discovery import build
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
from transformers import pipeline

class YouTubeVideoSummarizer:
    """Summarize a YouTube video from its audio track.

    The pipeline is: look up metadata through the YouTube Data API, download
    the video with yt-dlp, cut the audio into fixed-length segments,
    transcribe each segment with Whisper, summarize each transcription with
    BART, and finally condense the per-segment summaries into one overview.
    """

    # On youtube.com hosts, these leading path components are followed by the
    # video ID (e.g. ``/shorts/<id>``, ``/embed/<id>``).
    _ID_PATH_PREFIXES = ('shorts', 'embed', 'live', 'v')

    def __init__(self, api_key):
        """Build the YouTube API client and load both models.

        Args:
            api_key: YouTube Data API v3 developer key.
        """
        # Use the same device for both models: first GPU if present, else CPU.
        device = 0 if torch.cuda.is_available() else -1

        self.youtube = build('youtube', 'v3', developerKey=api_key)
        # Initialize Whisper model with specific parameters
        self.audio_transcriber = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-base",
            chunk_length_s=30,  # Process 30-second chunks
            return_timestamps=True,
            device=device,
        )
        self.text_summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=device,
        )

    @staticmethod
    def _extract_video_id(video_url):
        """Return the video ID contained in a YouTube URL (or a bare ID).

        Handles ``watch?v=<id>``, ``youtu.be/<id>``, ``/shorts/<id>``,
        ``/embed/<id>``, ``/live/<id>`` and ``/v/<id>`` forms; query strings
        and fragments never leak into the returned ID. For any other host the
        last path component is used.

        Raises:
            ValueError: if no video ID can be found in ``video_url``.
        """
        url = video_url.strip()

        if '/' not in url:
            # No path separator at all: treat the string as a bare video ID.
            candidate = url
        else:
            # urlparse only recognises the host when a scheme is present.
            parsed = urlparse(url if '://' in url else f'https://{url}')
            host = (parsed.hostname or '').lower()
            path_parts = [part for part in parsed.path.split('/') if part]

            if host == 'youtu.be' or host.endswith('.youtu.be'):
                candidate = path_parts[0] if path_parts else ''
            elif 'youtube' in host:
                query_ids = parse_qs(parsed.query).get('v')
                if query_ids:
                    candidate = query_ids[0]
                elif (len(path_parts) >= 2
                      and path_parts[0] in YouTubeVideoSummarizer._ID_PATH_PREFIXES):
                    candidate = path_parts[1]
                else:
                    candidate = ''
            else:
                # Unknown host: fall back to the last path component.
                candidate = path_parts[-1] if path_parts else ''

        if not candidate:
            raise ValueError(f"Could not extract a video ID from: {video_url!r}")
        return candidate

    def get_video_info(self, video_url):
        """Get video details from YouTube.

        Args:
            video_url: YouTube URL (or bare video ID).

        Returns:
            The first item of the API response, requested with the
            ``snippet``, ``contentDetails`` and ``statistics`` parts.

        Raises:
            ValueError: if the URL contains no video ID or the API returns
                no matching video.
        """
        video_id = self._extract_video_id(video_url)

        request = self.youtube.videos().list(
            part="snippet,contentDetails,statistics",
            id=video_id
        )
        response = request.execute()

        items = response.get('items')
        if not items:
            raise ValueError("Video not found")
        return items[0]

    def download_video(self, video_url, output_dir=None):
        """Download YouTube video.

        Args:
            video_url: URL passed to yt-dlp.
            output_dir: Directory to download into. When omitted, a fresh
                temporary directory is created and the caller is responsible
                for removing it.

        Returns:
            Path of the downloaded ``video.mp4`` file.
        """
        target_dir = output_dir if output_dir is not None else tempfile.mkdtemp()
        output_path = os.path.join(target_dir, 'video.mp4')

        ydl_opts = {
            'format': 'best[ext=mp4]',
            'outtmpl': output_path,
            'quiet': True
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
        return output_path

    def extract_audio_segment(self, video_clip, start_time, end_time, temp_dir):
        """Extract audio segment and save as WAV.

        Args:
            video_clip: Loaded video clip to cut from.
            start_time: Segment start, in seconds.
            end_time: Segment end, in seconds.
            temp_dir: Directory the WAV file is written to.

        Returns:
            Path of the written WAV file.

        Raises:
            ValueError: if the segment has no audio track.
        """
        segment = video_clip.subclip(start_time, end_time)
        if segment.audio is None:
            raise ValueError("Video has no audio track")

        temp_audio_path = os.path.join(temp_dir, f'segment_{start_time}_{end_time}.wav')
        segment.audio.write_audiofile(
            temp_audio_path,
            codec='pcm_s16le',
            fps=16000,  # Whisper preferred sample rate
            verbose=False,
            logger=None
        )
        return temp_audio_path

    def transcribe_long_audio(self, audio_path):
        """Transcribe audio with proper chunk handling.

        Best-effort: any transcription failure is reported on stdout and an
        empty string is returned so one bad segment does not abort the run.
        """
        try:
            # Transcribe with timestamps
            result = self.audio_transcriber(
                audio_path,
                return_timestamps=True
            )
            return result['text']
        except Exception as e:
            print(f"Transcription error: {str(e)}")
            return ""

    def _summarize(self, text, max_length, min_length):
        """Summarize ``text`` with length limits adapted to the input size.

        ``max_length`` is capped at the input's word count so the requested
        summary is never longer than the text being summarized (word count is
        used as a cheap stand-in for the model's token count), and
        ``min_length`` is kept at or below half of that cap. ``truncation`` is
        enabled so inputs longer than the model's limit are cut instead of
        raising.
        """
        word_count = len(text.split())
        capped_max = max(2, min(max_length, word_count))
        capped_min = min(min_length, capped_max // 2)

        result = self.text_summarizer(
            text,
            max_length=capped_max,
            min_length=capped_min,
            do_sample=False,
            truncation=True,
        )
        return result[0]['summary_text']

    def process_video_segment(self, video_clip, start_time, end_time, temp_dir):
        """Process a single video segment.

        Returns:
            A dict with ``start_time``, ``end_time``, ``transcription`` and
            ``summary`` keys, or ``None`` if the segment could not be
            processed (the error is printed).
        """
        audio_path = None
        try:
            # Extract and transcribe audio
            audio_path = self.extract_audio_segment(video_clip, start_time, end_time, temp_dir)
            transcription = self.transcribe_long_audio(audio_path)

            # Summarize transcription if long enough
            if len(transcription.split()) > 50:
                summary = self._summarize(transcription, max_length=130, min_length=30)
            else:
                summary = transcription

            return {
                'start_time': start_time,
                'end_time': end_time,
                'transcription': transcription,
                'summary': summary
            }
        except Exception as e:
            print(f"Error processing segment {start_time}-{end_time}: {str(e)}")
            return None
        finally:
            # Remove the audio file even when summarization fails.
            if audio_path is not None and os.path.exists(audio_path):
                os.remove(audio_path)

    def process_video(self, video_url, segment_duration=60):
        """Process YouTube video and generate summary.

        Args:
            video_url: YouTube URL (or bare video ID).
            segment_duration: Length of each processed segment, in seconds.

        Returns:
            A dict with ``video_id``, ``title``, ``channel``, ``duration``,
            ``view_count`` (``None`` when the API omits it), ``segments`` and
            ``full_summary``.

        Raises:
            ValueError: if ``segment_duration`` is not positive.
            RuntimeError: if any processing step fails; the original
                exception is chained as ``__cause__``.
        """
        if segment_duration <= 0:
            raise ValueError("segment_duration must be a positive number of seconds")

        # One working directory for the download and all audio segments, so a
        # single rmtree in ``finally`` cleans up everything.
        temp_dir = tempfile.mkdtemp()
        video = None
        try:
            # Get video information
            print("Fetching video information...")
            video_info = self.get_video_info(video_url)

            # Download video
            print("Downloading video...")
            video_path = self.download_video(video_url, output_dir=temp_dir)

            # Load video
            video = VideoFileClip(video_path)
            duration = video.duration

            # Process video in segments
            print("Processing video segments...")
            segments = []

            for start_time in range(0, int(duration), segment_duration):
                end_time = min(start_time + segment_duration, duration)
                print(f"Processing segment: {start_time}-{end_time} seconds")

                segment_result = self.process_video_segment(
                    video, start_time, end_time, temp_dir
                )

                if segment_result:
                    segments.append(segment_result)

            # Create final summary
            return {
                'video_id': video_info['id'],
                'title': video_info['snippet']['title'],
                'channel': video_info['snippet']['channelTitle'],
                'duration': duration,
                # The lookup is tolerant so a missing count cannot discard
                # the finished work.
                'view_count': video_info.get('statistics', {}).get('viewCount'),
                'segments': segments,
                'full_summary': self.create_full_summary(segments)
            }

        except Exception as e:
            raise RuntimeError(f"Error processing video: {str(e)}") from e
        finally:
            # Release the clip's file handles before deleting its file.
            if video is not None:
                video.close()
            shutil.rmtree(temp_dir, ignore_errors=True)

    def create_full_summary(self, segments):
        """Create a complete summary from all segments.

        The per-segment summaries are joined with spaces; when the result is
        longer than 100 words it is condensed again with the summarizer,
        otherwise it is returned as is (an empty string for no segments).
        """
        all_summaries = " ".join(segment['summary'] for segment in segments)
        if len(all_summaries.split()) > 100:
            return self._summarize(all_summaries, max_length=250, min_length=100)
        return all_summaries

    def export_summary(self, summary, output_path):
        """Export summary to JSON file (UTF-8, non-ASCII characters kept)."""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=4, ensure_ascii=False)

def main():
    """Run the summarizer end to end as an example.

    Configuration is read from the environment so no credentials live in the
    source: ``YOUTUBE_API_KEY`` holds the YouTube Data API key and
    ``YOUTUBE_VIDEO_URL`` the video to summarize. The result is written to
    ``video_summary.json`` and a brief overview is printed. Errors are
    reported on stdout rather than raised.
    """
    API_KEY = os.environ.get("YOUTUBE_API_KEY", "")
    video_url = os.environ.get("YOUTUBE_VIDEO_URL", "")

    # Fail fast with a clear message instead of an opaque API error later.
    if not API_KEY or not video_url:
        print(
            "Error: set the YOUTUBE_API_KEY and YOUTUBE_VIDEO_URL "
            "environment variables before running."
        )
        return

    try:
        # Initialize summarizer
        summarizer = YouTubeVideoSummarizer(API_KEY)

        # Process video
        print("Starting video processing...")
        summary = summarizer.process_video(video_url)

        # Export summary
        summarizer.export_summary(summary, "video_summary.json")
        print("Summary saved to video_summary.json")

        # Print brief overview
        print("\nVideo Summary:")
        print(f"Title: {summary['title']}")
        print(f"Channel: {summary['channel']}")
        print(f"Duration: {summary['duration']} seconds")
        print("\nFull Summary:")
        print(summary['full_summary'])

    except Exception as e:
        # Top-level boundary of the example script: report and exit cleanly.
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.

Device set to use cpu
Device set to use cpu


Starting video processing...
Fetching video information...
Downloading video...
Processing video segments...
Processing segment: 0-60 seconds



Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.
Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.43.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Processing segment: 60-120 seconds





Processing segment: 120-180 seconds





Processing segment: 180-240 seconds





Processing segment: 240-300 seconds





Processing segment: 300-360 seconds





Processing segment: 360-420 seconds





Processing segment: 420-480 seconds





Processing segment: 480-540 seconds





Processing segment: 540-594.45 seconds



Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Your max_length is set to 130, but your input_length is only 115. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=57)


Summary saved to video_summary.json

Video Summary:
Title: LangChain vs LangGraph: A Tale of Two Frameworks
Channel: IBM Technology
Duration: 594.45 seconds

Full Summary:
Langchain and LangGraph are both open source frameworks designed to help developers build applications with large language models. At the core, Langchain is a wafer building LLM powered applications by executing a sequence of functions in a chain. Lang graph helps us create this as a graph structure where each one of these actions is considered as a node. And then the transitions between these things, that's known as edges. Now the central node is the process input node. So that's where the user input comes in.
