In [9]:
!pip install chromadb

Collecting chromadb
  Downloading chromadb-0.6.3-py3-none-any.whl.metadata (6.8 kB)
Collecting build>=1.0.3 (from chromadb)
  Downloading build-1.2.2.post1-py3-none-any.whl.metadata (6.5 kB)
Collecting chroma-hnswlib==0.7.6 (from chromadb)
  Downloading chroma_hnswlib-0.7.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (252 bytes)
Collecting fastapi>=0.95.2 (from chromadb)
  Downloading fastapi-0.115.11-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn>=0.18.3 (from uvicorn[standard]>=0.18.3->chromadb)
  Downloading uvicorn-0.34.0-py3-none-any.whl.metadata (6.5 kB)
Collecting posthog>=2.4.0 (from chromadb)
  Downloading posthog-3.19.1-py2.py3-none-any.whl.metadata (2.9 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.21.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.30.0-py

In [1]:
import os
import yt_dlp
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
import whisper
import cv2
from transformers import BlipProcessor, BlipForConditionalGeneration
from sentence_transformers import SentenceTransformer
import chromadb
####

In [55]:
from datetime import timedelta
import webvtt
from datetime import timedelta
from googletrans import Translator, LANGUAGES
####

In [80]:
from deep_translator import GoogleTranslator
from transformers import pipeline
import datetime

In [81]:
class VideoDownloader:
    """Download a video (plus subtitles when offered) with yt-dlp.

    Files are written into ``output_dir`` and named after the video id.
    """

    def __init__(self, output_dir="videos"):
        """Store the target directory and create it if it does not exist.

        Args:
            output_dir: Directory that receives the downloaded files.
        """
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

    def download(self, url):
        """Download ``url`` and describe the resulting file.

        Args:
            url: Address of the video page handed to yt-dlp.

        Returns:
            dict with ``'uri'`` (path of the downloaded file), ``'title'`` and
            ``'description'`` (empty strings when yt-dlp reports none).
        """
        ydl_opts = {
            'format': 'best',
            'outtmpl': os.path.join(self.output_dir, '%(id)s.%(ext)s'),
            'writesubtitles': True,
            # yt-dlp's option key is 'subtitlesformat' and the WebVTT format
            # code is 'vtt'; '/best' lets yt-dlp fall back when no VTT track
            # is offered.
            'subtitlesformat': 'vtt/best',
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            # Ask yt-dlp for the path it derived from 'outtmpl' instead of
            # re-assembling it by hand from the info dict.
            video_path = ydl.prepare_filename(info)

        return {
            'uri': video_path,
            'title': info.get('title', ''),
            'description': info.get('description', '')
        }

####

In [82]:
# Suppress specific warnings for cleaner output
warnings.filterwarnings("ignore", category=UserWarning, module="whisper.transcribe")  # FP16 warning
warnings.filterwarnings("ignore", category=UserWarning, module="huggingface_hub.utils._auth")  # HF_TOKEN warning

In [89]:
class TranscriptExtractor:
    """Produce an English, time-stamped transcript for a YouTube video.

    Published captions are tried first (English, then Hindi translated to
    English). When neither can be fetched, the downloaded media file is run
    through a Whisper speech-recognition pipeline instead.
    """

    def __init__(self):
        """Load the Whisper transcription and Hindi-to-English translation pipelines."""
        # Imported locally under a private alias: the notebook's example cell
        # rebinds the global name `pipeline`, so a global lookup here would no
        # longer find the transformers factory once that cell has run.
        from transformers import pipeline as hf_pipeline

        print("Initializing pipelines...")  # Debugging print
        self.transcriber = hf_pipeline("automatic-speech-recognition", model="openai/whisper-large-v2")
        self.translator = hf_pipeline("translation", model="Helsinki-NLP/opus-mt-hi-en")

    def extract_or_generate(self, video_url, video_path):
        """Return a transcript for the video, fetching captions or running Whisper.

        Args:
            video_url: YouTube URL (or bare video id) used to look up captions.
            video_path: Path of the downloaded media file, used for the
                Whisper fallback.

        Returns:
            tuple ``(segments, vtt_text)``: ``segments`` is a list of dicts
            with ``'start_time'``, ``'end_time'`` and ``'text'``; ``vtt_text``
            holds one rendered caption per line.
        """
        video_id = self._video_id_from_url(video_url)
        transcript = None

        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
        except (NoTranscriptFound, TranscriptsDisabled):
            try:
                transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['hi'])
                for entry in transcript:
                    entry['text'] = self.translate_to_english(entry['text'])
            except Exception:
                # Deliberate best effort: any failure in the Hindi path simply
                # falls through to the Whisper transcription below.
                transcript = None

        if transcript:
            return self.format_transcript(transcript)

        # Generate transcript using Whisper
        print("Running Whisper model...")
        # The translate task is a generation option, so it is forwarded through
        # generate_kwargs rather than passed as a direct pipeline argument.
        result = self.transcriber(
            video_path,
            return_timestamps=True,
            generate_kwargs={"task": "translate"},
        )
        print("Whisper Output:", result)  # Debugging print
        return self.format_whisper_output(result)

    def translate_to_english(self, text):
        """Translate ``text`` with the Hindi-to-English pipeline and return the result string."""
        return self.translator(text)[0]['translation_text']

    @staticmethod
    def _video_id_from_url(video_url):
        """Extract the video id from a watch URL, a youtu.be link, or a bare id."""
        from urllib.parse import parse_qs, urlparse

        parsed = urlparse(video_url)
        # Reading the 'v' query parameter drops trailing parameters such as
        # '&t=10s' that a plain split on 'v=' would leave attached to the id.
        query_ids = parse_qs(parsed.query).get('v')
        if query_ids:
            return query_ids[0]

        host = (parsed.hostname or '').lower()
        if host == 'youtu.be' or host.endswith('.youtu.be'):
            short_id = parsed.path.strip('/').split('/')[0]
            if short_id:
                return short_id

        # Unrecognised shape (for example a bare id): keep the previous behaviour.
        return video_url.split('v=')[-1]

    @staticmethod
    def _vtt_timestamp(seconds):
        """Render a number of seconds as a WebVTT ``HH:MM:SS.mmm`` timestamp string."""
        total_ms = max(0, int(round(float(seconds) * 1000)))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

    def format_transcript(self, transcript):
        """Convert caption entries into segment dicts plus caption text.

        Args:
            transcript: Iterable of dicts with ``'start'``, ``'duration'``
                (both in seconds) and ``'text'``.

        Returns:
            tuple ``(segments, vtt_text)`` as described in
            ``extract_or_generate``.
        """
        vtt = webvtt.WebVTT()
        formatted_transcript = []

        for entry in transcript:
            start_seconds = entry['start']
            end_seconds = entry['start'] + entry['duration']
            text = entry['text']

            # Build the timestamps arithmetically: str(timedelta) yields
            # '0:00:01.500000' for fractional seconds, which is not a valid
            # HH:MM:SS.mmm value once a '.000' suffix is appended.
            caption = webvtt.Caption(
                self._vtt_timestamp(start_seconds),
                self._vtt_timestamp(end_seconds),
                text,
            )
            vtt.captions.append(caption)

            formatted_transcript.append({'start_time': start_seconds, 'end_time': end_seconds, 'text': text})

        return formatted_transcript, "\n".join(str(c) for c in vtt.captions)

    def format_whisper_output(self, result):
        """Convert a Whisper pipeline result into segment dicts plus caption text.

        Args:
            result: Mapping returned by the speech-recognition pipeline; its
                ``'chunks'`` entries are read for ``'timestamp'`` (a start/end
                pair) and ``'text'``.

        Returns:
            tuple ``(segments, vtt_text)``; ``([], "")`` when ``result`` has
            no ``'chunks'`` key.
        """
        vtt = webvtt.WebVTT()
        formatted_transcript = []
        if "chunks" not in result:
            print("Unexpected Whisper Output:", result)  # Debugging print
            return [], ""

        previous_end = 0.0
        for segment in result['chunks']:
            start_time = segment['timestamp'][0]
            end_time = segment['timestamp'][1]
            # NOTE(review): the pipeline appears able to leave a chunk's end
            # timestamp as None (e.g. for the final chunk) - confirm. Missing
            # values are filled in so the segment stays usable.
            if start_time is None:
                start_time = previous_end
            if end_time is None:
                end_time = start_time
            previous_end = end_time

            text = segment['text']
            # Caption is given timestamp strings, not timedelta objects.
            caption = webvtt.Caption(
                self._vtt_timestamp(start_time),
                self._vtt_timestamp(end_time),
                text,
            )
            vtt.captions.append(caption)
            formatted_transcript.append({'start_time': start_time, 'end_time': end_time, 'text': text})

        return formatted_transcript, "\n".join(str(c) for c in vtt.captions)


In [90]:
class FrameExtractor:
    """Sample frames from a video at a fixed cadence and caption them with BLIP."""

    def __init__(self):
        """Load the BLIP image-captioning processor and model."""
        self.processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        self.model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

    def extract_keyframes(self, video_path, interval_seconds=10, fallback_fps=30.0):
        """Caption one frame every ``interval_seconds`` of video.

        Args:
            video_path: Path of the video file to open with OpenCV.
            interval_seconds: Spacing between captioned frames, in seconds.
            fallback_fps: Frame rate assumed when the container reports none;
                timestamps are then only approximate.

        Returns:
            list of dicts with ``'timestamp'`` (seconds, derived from the
            frame index and frame rate) and ``'description'`` (the caption).
            Empty when the video cannot be opened or read.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # A missing frame rate (0, negative or NaN) would otherwise cause a
            # division/modulo by zero below.
            if not (fps > 0):
                fps = fallback_fps
            # Never let the sampling step collapse to 0 for very low rates.
            frame_interval = max(1, int(fps * interval_seconds))
            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    timestamp = frame_count / fps
                    # OpenCV decodes frames in BGR channel order; convert to
                    # RGB before handing the image to the captioning model.
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    description = self.describe_frame(rgb_frame)
                    frames.append({'timestamp': timestamp, 'description': description})
                frame_count += 1
        finally:
            # Release the capture even when captioning raises.
            cap.release()
        return frames

    def describe_frame(self, frame):
        """Generate a caption for a single image.

        Args:
            frame: Image passed to the BLIP processor; ``extract_keyframes``
                supplies it in RGB channel order.

        Returns:
            The decoded caption string.
        """
        inputs = self.processor(images=frame, return_tensors="pt")
        out = self.model.generate(**inputs)
        return self.processor.decode(out[0], skip_special_tokens=True)


####

In [91]:
class MetadataProcessor:
    """Embed transcript and frame chunks and store them in a Chroma collection."""

    def __init__(self, db_path="vector_db"):
        """Load the sentence-embedding model and open (or create) the collection.

        Args:
            db_path: Directory of the persistent Chroma database.
        """
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.client = chromadb.PersistentClient(path=db_path)
        self.collection = self.client.get_or_create_collection("video_metadata")

    def chunk_and_store(self, metadata, batch_size=500):
        """Embed every transcript segment and frame caption and add them to the collection.

        Args:
            metadata: dict with ``'uri'``, ``'transcript'`` (dicts carrying
                ``'start_time'``, ``'end_time'``, ``'text'``) and ``'frames'``
                (dicts carrying ``'timestamp'``, ``'description'``).
            batch_size: Maximum number of records sent per ``add`` call.

        Returns:
            None. Nothing is written when there are no chunks.
        """
        video_uri = metadata['uri']

        transcript_chunks = [
            {
                'video_uri': video_uri,
                'start_time': entry['start_time'],
                'end_time': entry['end_time'],
                'text': entry['text'],
                'type': 'transcript'
            }
            for entry in metadata['transcript']
        ]
        frame_chunks = [
            {'video_uri': video_uri, 'timestamp': f['timestamp'], 'text': f['description'], 'type': 'frame'}
            for f in metadata['frames']
        ]

        all_chunks = transcript_chunks + frame_chunks
        if not all_chunks:
            return

        texts = [c['text'] for c in all_chunks]
        embeddings = self.embedding_model.encode(texts)
        ids = [f"{video_uri}_{i}" for i in range(len(all_chunks))]
        # NOTE(review): Chroma is understood to reject None as a metadata
        # value - confirm. Such keys are dropped so one incomplete segment
        # (e.g. a missing end time) does not abort the whole insert.
        metadatas = [
            {key: value for key, value in chunk.items() if value is not None}
            for chunk in all_chunks
        ]

        # Send records in batches rather than one add() call per chunk.
        step = max(1, int(batch_size))
        for lo in range(0, len(all_chunks), step):
            hi = lo + step
            self.collection.add(
                embeddings=[vector.tolist() for vector in embeddings[lo:hi]],
                documents=texts[lo:hi],
                metadatas=metadatas[lo:hi],
                ids=ids[lo:hi]
            )


####

In [92]:
class Retriever:
    """Look up stored chunks that are semantically closest to a text query."""

    def __init__(self, db_path="vector_db"):
        """Load the sentence-embedding model and open the existing collection.

        Args:
            db_path: Directory of the persistent Chroma database.
        """
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.client = chromadb.PersistentClient(path=db_path)
        self.collection = self.client.get_collection("video_metadata")

    def retrieve(self, query, n_results=3):
        """Return the metadata of the chunks nearest to ``query``.

        Args:
            query: Free-text question to embed and search with.
            n_results: Number of matches to request (defaults to 3).

        Returns:
            list of metadata dicts for the single query, as returned by the
            collection under ``'metadatas'``.

        Raises:
            ValueError: If ``n_results`` is smaller than 1.
        """
        if n_results < 1:
            raise ValueError("n_results must be at least 1")

        query_embedding = self.embedding_model.encode([query])[0].tolist()
        results = self.collection.query(query_embeddings=[query_embedding], n_results=n_results)
        # One query was submitted, so only the first result list is relevant.
        return results['metadatas'][0]

####

In [93]:
# Main pipeline
class VideoRAGPipeline:
    """Glue class: download a video, index its transcript and frames, answer queries."""

    def __init__(self):
        """Build each stage; the storing stage is created before the retriever."""
        self.downloader = VideoDownloader()
        self.transcriber = TranscriptExtractor()
        self.frame_extractor = FrameExtractor()
        self.processor = MetadataProcessor()
        self.retriever = Retriever()

    def process_video(self, url):
        """Run the full ingest flow for one video URL.

        The video is downloaded, a transcript is obtained, key frames are
        captioned, and everything is handed to the metadata processor.

        Returns:
            dict with ``'uri'``, ``'title'``, ``'description'``,
            ``'transcript'`` and ``'frames'`` - the same object that was
            stored.
        """
        info = self.downloader.download(url)
        local_path = info['uri']

        # Only the segment list is kept; the rendered caption text is unused.
        segments, _vtt_text = self.transcriber.extract_or_generate(url, local_path)
        keyframes = self.frame_extractor.extract_keyframes(local_path)

        record = dict(
            uri=local_path,
            title=info['title'],
            description=info['description'],
            transcript=segments,
            frames=keyframes,
        )
        self.processor.chunk_and_store(record)
        return record

    def query(self, question):
        """Return whatever the retriever finds for ``question``."""
        hits = self.retriever.retrieve(question)
        return hits

####

In [95]:
####////
# Example YouTube video URL (Replace with an actual video link)
# NOTE: everything below is wrapped in a triple-quoted string, so none of it
# is executed; the cell only evaluates the string and echoes it as its output.
"""video_url = "https://www.youtube.com/watch?v=ftDsSB3F5kg"

# Example path to a downloaded video file (Replace with an actual file path)
video_path = "videos/ftDsSB3F5kg.mp4"

# Create an instance of the transcript extractor
extractor = TranscriptExtractor()

# Run the extraction process
transcript, vtt_content = extractor.extract_or_generate(video_url, video_path)

# Print results
print("Extracted Transcript:")
for entry in transcript:
    print(f"[{entry['start_time']} - {entry['end_time']}] {entry['text']}")

print("\nVTT Content:")
print(vtt_content)
"""

'video_url = "https://www.youtube.com/watch?v=ftDsSB3F5kg"\n\n# Example path to a downloaded video file (Replace with an actual file path)\nvideo_path = "videos/ftDsSB3F5kg.mp4"\n\n# Create an instance of the transcript extractor\nextractor = TranscriptExtractor()\n\n# Run the extraction process\ntranscript, vtt_content = extractor.extract_or_generate(video_url, video_path)\n\n# Print results\nprint("Extracted Transcript:")\nfor entry in transcript:\n    print(f"[{entry[\'start_time\']} - {entry[\'end_time\']}] {entry[\'text\']}")\n\nprint("\nVTT Content:")\nprint(vtt_content)\n'

In [96]:
# Example usage
# Builds the pipeline, ingests two videos, then prints the retrieved metadata
# for a handful of sample questions.
if __name__ == "__main__":
    # NOTE(review): this rebinds the notebook-global name `pipeline`, which the
    # In [80] cell imported from transformers. Code that looks up the global
    # `pipeline` after this cell has run gets this object instead of the
    # transformers factory - consider a distinct name such as `rag_pipeline`.
    pipeline = VideoRAGPipeline()
    video_urls = [
        "https://www.youtube.com/watch?v=ftDsSB3F5kg",
        "https://www.youtube.com/watch?v=kKFrbhZGNNI"
    ]

    # Process videos
    for url in video_urls:
        print(f"Processing {url}")
        pipeline.process_video(url)

    # Test questions
    questions = [
        "What is the main topic of the video?",
        "Explain the concept discussed at 2 minutes.",
        "What does the scene at 5 minutes show?",
        "Summarize the video content."
    ]

    # Retrieve answers
    # Each query prints the metadata dicts returned by the retriever, one per line.
    for q in questions:
        print(f"\nQuery: {q}")
        results = pipeline.query(q)
        for r in results:
            print(f"Result: {r}")

Initializing pipelines...


Device set to use cpu
Device set to use cpu


Processing https://www.youtube.com/watch?v=ftDsSB3F5kg
[youtube] Extracting URL: https://www.youtube.com/watch?v=ftDsSB3F5kg
[youtube] ftDsSB3F5kg: Downloading webpage
[youtube] ftDsSB3F5kg: Downloading tv client config
[youtube] ftDsSB3F5kg: Downloading player 91201489
[youtube] ftDsSB3F5kg: Downloading tv player API JSON
[youtube] ftDsSB3F5kg: Downloading ios player API JSON
[youtube] ftDsSB3F5kg: Downloading m3u8 information
[info] ftDsSB3F5kg: Downloading 1 format(s): 18
[info] There are no subtitles for the requested languages
[download] videos/ftDsSB3F5kg.mp4 has already been downloaded
[download] 100% of   11.85MiB
Processing https://www.youtube.com/watch?v=kKFrbhZGNNI
[youtube] Extracting URL: https://www.youtube.com/watch?v=kKFrbhZGNNI
[youtube] kKFrbhZGNNI: Downloading webpage
[youtube] kKFrbhZGNNI: Downloading tv client config
[youtube] kKFrbhZGNNI: Downloading player b21600d5
[youtube] kKFrbhZGNNI: Downloading tv player API JSON
[youtube] kKFrbhZGNNI: Downloading ios player