## NEW — ingestion: download videos, build transcripts, caption frames


In [None]:
import os
import json
import cv2
import ffmpeg
import yt_dlp
import whisper
from pathlib import Path
from typing import Optional, List, Dict
from webvtt import WebVTT, Caption
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image

In [None]:
class VideoDownloader:
    """Fetch YouTube videos and their English subtitles with yt-dlp.

    Every video gets its own directory ``<base_dir>/<video_id>/``; files inside
    are named from the video title (``%(title)s.%(ext)s``).
    """

    def __init__(self, base_dir: str = "./shared_data/videos"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def download_video(self, url: str, video_id: str) -> Optional[Dict]:
        """Download the video at ``url`` into ``<base_dir>/<video_id>/``.

        Returns:
            ``{"video_path": Path, "title": str, "description": str, "url": str}``,
            or ``None`` if yt-dlp returned no info or the file was not written.
        """
        output_dir = self.base_dir / video_id
        output_dir.mkdir(exist_ok=True)
        ydl_opts = {
            'format': 'best',
            'outtmpl': str(output_dir / '%(title)s.%(ext)s'),
            'quiet': True,
            # Do not raise on failure; a falsy info result is handled below.
            'ignoreerrors': True,
            # Also writes the description to a sidecar file; the text we return
            # is read from the info dict regardless of this option.
            'writedescription': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            if not info:
                return None
            video_path = Path(ydl.prepare_filename(info))
        if not video_path.exists():
            return None
        return {
            "video_path": video_path,
            # `or ""` also covers keys that are present but set to None.
            "title": info.get("title") or "",
            "description": info.get("description") or "",
            "url": url,
        }

    def download_subtitles(self, url: str, video_id: str) -> Optional[Path]:
        """Download English subtitles (uploaded and/or auto-generated) as WebVTT.

        Returns:
            Path of the downloaded ``.vtt`` file, or ``None`` if none was produced.
        """
        output_dir = self.base_dir / video_id
        output_dir.mkdir(exist_ok=True)
        ydl_opts = {
            'skip_download': True,
            'writesubtitles': True,
            'writeautomaticsub': True,
            'subtitleslangs': ['en'],
            'subtitlesformat': 'vtt',
            'outtmpl': str(output_dir / '%(title)s.%(ext)s'),
            'quiet': True,
            'ignoreerrors': True
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
        if not info:
            return None
        return self._find_subtitle_file(info, output_dir)

    @staticmethod
    def _find_subtitle_file(info: Dict, output_dir: Path) -> Optional[Path]:
        """Locate the English ``.vtt`` file yt-dlp wrote for ``info``.

        Prefers the ``filepath`` reported under ``requested_subtitles['en']``;
        falls back to the first ``*.en.vtt`` in ``output_dir``.
        """
        # The key may be present with value None (no subtitles selected), so a
        # `.get(key, {})` default is not enough.
        requested = info.get('requested_subtitles') or {}
        en_subs = requested.get('en') or {}
        filepath = en_subs.get('filepath')
        # Guard on a non-empty string: Path('') is '.', which always "exists".
        if filepath:
            candidate = Path(filepath)
            if candidate.exists():
                return candidate
        return next(output_dir.glob("*.en.vtt"), None)


class TranscriptProcessor:
    """Provide a WebVTT transcript for a video, generating one with Whisper if needed."""

    def __init__(self):
        self.model = whisper.load_model("base")

    def process_subtitles(self, video_path: Path, sub_path: Optional[Path]) -> Path:
        """Return ``sub_path`` if it is an existing non-empty file.

        Otherwise transcribe ``video_path`` with Whisper, write
        ``generated_subtitles.vtt`` next to the video and return that path.
        """
        video_dir = video_path.parent
        if sub_path and sub_path.exists() and sub_path.stat().st_size > 0:
            return sub_path
        return self._generate_subtitles(video_path, video_dir)

    def _generate_subtitles(self, video_path: Path, output_dir: Path) -> Path:
        """Extract mono 16 kHz PCM audio, run Whisper on it and save the result as VTT."""
        audio_path = output_dir / "temp_audio.wav"
        vtt_path = output_dir / "generated_subtitles.vtt"
        try:
            (
                ffmpeg.input(str(video_path))
                .output(str(audio_path), acodec='pcm_s16le', ar=16000, ac=1)
                # overwrite_output: without it ffmpeg stops to ask about an
                # existing temp_audio.wav instead of replacing it.
                .run(quiet=True, overwrite_output=True)
            )
            # NOTE(review): Whisper's `language` names the *spoken* language;
            # forcing "en" together with task="translate" may degrade results
            # for non-English audio — consider omitting it to auto-detect.
            result = self.model.transcribe(str(audio_path), task="translate", language="en", fp16=False)
        finally:
            # Always remove the temporary audio, even if ffmpeg/Whisper failed.
            if audio_path.exists():
                audio_path.unlink()
        self._create_vtt(result["segments"], vtt_path)
        return vtt_path

    def _create_vtt(self, segments: List[Dict], output_path: Path):
        """Write Whisper segments (dicts with 'start', 'end', 'text') to a WebVTT file."""
        vtt = WebVTT()
        for seg in segments:
            caption = Caption(
                self._format_time(seg['start']),
                self._format_time(seg['end']),
                seg['text'].strip()
            )
            vtt.captions.append(caption)
        vtt.save(str(output_path))

    @staticmethod
    def _format_time(seconds: float) -> str:
        """Format ``seconds`` as a WebVTT timestamp ``HH:MM:SS.mmm``.

        Rounds to whole milliseconds *before* splitting into fields so that a
        value like 59.9996 carries into the minute ("00:01:00.000") instead of
        producing the invalid "00:00:60.000".
        """
        total_ms = max(0, int(round(seconds * 1000)))
        hours, rem_ms = divmod(total_ms, 3_600_000)
        mins, rem_ms = divmod(rem_ms, 60_000)
        secs, millis = divmod(rem_ms, 1000)
        return f"{hours:02}:{mins:02}:{secs:02}.{millis:03}"


class FrameProcessor:
    """Sample about one frame per second, caption it with BLIP and attach matching subtitles."""

    def __init__(self):
        self.processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        self.model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

    def process_video(self, video_path: Path, vtt_path: Path) -> List[Dict]:
        """Save sampled frames under ``<video dir>/frames`` and return per-frame metadata.

        Every ``round(fps)``-th frame is captioned and written as a JPEG. Each
        metadata entry holds the frame path (relative to the video directory),
        its timestamp in seconds, the subtitle texts active at that time and
        the BLIP caption.

        Raises:
            ValueError: if the video opens but reports no usable frame rate.
        """
        frames_dir = video_path.parent / "frames"
        frames_dir.mkdir(exist_ok=True)
        subtitles = self._parse_vtt(vtt_path)
        metadata: List[Dict] = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            if not cap.isOpened():
                # Unreadable file: nothing to sample.
                return metadata
            fps = cap.get(cv2.CAP_PROP_FPS)
            # `not fps > 0` also rejects NaN; otherwise the modulo/division
            # below would raise ZeroDivisionError.
            if not fps > 0:
                raise ValueError(f"Cannot determine frame rate of {video_path} (got {fps!r})")
            # max(1, ...) keeps sub-1 fps streams from rounding the step to 0.
            frame_interval = max(1, int(round(fps)))
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    timestamp = frame_count / fps
                    frame_description = self._generate_frame_description(frame)
                    self._process_frame(frame, frame_count, timestamp, frames_dir, subtitles, metadata, frame_description)
                frame_count += 1
        finally:
            cap.release()
        return metadata

    def _generate_frame_description(self, frame) -> str:
        """Caption a BGR OpenCV frame with BLIP."""
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        inputs = self.processor(pil_image, return_tensors="pt")
        out = self.model.generate(**inputs)
        return self.processor.decode(out[0], skip_special_tokens=True)

    def _process_frame(self, frame, count: int, timestamp: float, frames_dir: Path, subtitles: List[Dict], metadata: List, frame_description: str):
        """Write ``frame`` as JPEG (quality 85) and append its entry to ``metadata``."""
        frame_path = frames_dir / f"frame_{count}_time_{timestamp:.2f}.jpg"
        cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        metadata.append({
            "frame_path": str(frame_path.relative_to(frames_dir.parent)),
            "timestamp": round(timestamp, 2),
            "subtitles": self._get_matching_subtitles(subtitles, timestamp),
            "frame_description": frame_description
        })

    def _parse_vtt(self, path: Path) -> List[Dict]:
        """Read a VTT file into ``[{"start": s, "end": s, "text": str}, ...]`` (times in seconds)."""
        return [{
            "start": self._time_to_sec(caption.start),
            "end": self._time_to_sec(caption.end),
            "text": caption.text.strip()
        } for caption in WebVTT().read(str(path)).captions]

    def _get_matching_subtitles(self, subtitles: List[Dict], timestamp: float) -> List[str]:
        """Return texts of all subtitles whose [start, end] range (inclusive) contains ``timestamp``."""
        return [sub['text'] for sub in subtitles if sub['start'] <= timestamp <= sub['end']]

    @staticmethod
    def _time_to_sec(time_str: str) -> float:
        """Convert ``HH:MM:SS.mmm`` or ``MM:SS.mmm`` (comma decimal also accepted) to seconds.

        Any other number of colon-separated fields yields 0.0.
        """
        parts = list(map(float, time_str.replace(',', '.').split(':')))
        if len(parts) == 3:
            return parts[0] * 3600 + parts[1] * 60 + parts[2]
        elif len(parts) == 2:
            return parts[0] * 60 + parts[1]
        return 0.0


def process_video_pipeline(
    url: str,
    video_id: str,
    downloader: Optional["VideoDownloader"] = None,
    transcript_processor: Optional["TranscriptProcessor"] = None,
    frame_processor: Optional["FrameProcessor"] = None,
) -> Optional[Path]:
    """Download one video, build its transcript and frame captions, and save ``metadata.json``.

    Args:
        url: Video URL passed to yt-dlp.
        video_id: Name of the per-video sub-directory to store files in.
        downloader, transcript_processor, frame_processor: Optional pre-built
            components. Pass them in when processing several videos so the
            Whisper and BLIP models are loaded once instead of on every call.
            Components not supplied are created here; the model-backed ones are
            only created after the download has succeeded.

    Returns:
        Path of the written ``metadata.json``, or ``None`` if the download failed.
    """
    if downloader is None:
        downloader = VideoDownloader()

    # Step 1: Download video and extract metadata
    video_info = downloader.download_video(url, video_id)
    if not video_info or not video_info["video_path"].exists():
        print(f"Download failed for {url}; skipping.")
        return None
    video_path = video_info["video_path"]

    # Step 2: Download or generate subtitles
    if transcript_processor is None:
        transcript_processor = TranscriptProcessor()
    sub_path = downloader.download_subtitles(url, video_id)
    vtt_path = transcript_processor.process_subtitles(video_path, sub_path)

    # Step 3: Process frames and generate descriptions
    if frame_processor is None:
        frame_processor = FrameProcessor()
    frame_metadata = frame_processor.process_video(video_path, vtt_path)

    # Step 4: Prepare final metadata
    metadata = {
        "title": video_info["title"],
        "video_uri": video_info["url"],
        "description": video_info["description"],
        "transcript": [{
            "start_time": seg["start"],
            "end_time": seg["end"],
            "text": seg["text"]
        } for seg in frame_processor._parse_vtt(vtt_path)],
        "frames": frame_metadata,
    }

    # Step 5: Save metadata next to the video
    metadata_path = video_path.parent / "metadata.json"
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2)

    print(f"Metadata saved to {metadata_path}")
    return metadata_path


if __name__ == "__main__":
    # Sample batch: the n-th URL (1-based) is processed under the id "video_<n>".
    video_urls = [
        "https://www.youtube.com/watch?v=ftDsSB3F5kg",
        "https://www.youtube.com/watch?v=kKFrbhZGNNI",
        "https://www.youtube.com/watch?v=6qUxwZcTXHY",
        "https://www.youtube.com/watch?v=MspNdsh0QcM",
        "https://www.youtube.com/watch?v=Kf57KGwKa0w"
    ]
    video_ids = (f"video_{n}" for n in range(1, len(video_urls) + 1))
    for video_id, url in zip(video_ids, video_urls):
        process_video_pipeline(url, video_id)

## RETRIEVAL — index the saved metadata in ChromaDB and query it

In [None]:
import json
from pathlib import Path
from typing import List, Dict, Optional
import chromadb
from sentence_transformers import SentenceTransformer
from chromadb.config import Settings


class MetadataChunker:
    """Turn one video's metadata dict into one chunk per transcript segment."""

    def __init__(self, metadata: Dict):
        self.metadata = metadata

    def chunk_metadata(self) -> List[Dict]:
        """
        Build one chunk per entry of ``metadata["transcript"]``.

        Each chunk copies the video-level ``title``, ``description`` and
        ``video_uri``, the segment's ``start_time``/``end_time``/``text``, and
        ``frame_descriptions``: captions of every frame whose ``timestamp``
        falls within the segment (both ends inclusive), in frame-list order.
        """
        meta = self.metadata
        return [
            {
                "title": meta["title"],
                "description": meta["description"],
                "start_time": segment["start_time"],
                "end_time": segment["end_time"],
                "text": segment["text"],
                "video_uri": meta["video_uri"],
                "frame_descriptions": [
                    frame["frame_description"]
                    for frame in meta["frames"]
                    if segment["start_time"] <= frame["timestamp"] <= segment["end_time"]
                ],
            }
            for segment in meta["transcript"]
        ]


class EmbeddingGenerator:
    """Embed chunks with a SentenceTransformer model (default ``all-MiniLM-L6-v2``)."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def generate_embeddings(self, chunks: List[Dict]) -> List[List[float]]:
        """
        Return one embedding vector (as a plain list of floats) per chunk, in order.
        """
        batch = list(map(self._prepare_text, chunks))
        vectors = self.model.encode(batch)
        return vectors.tolist()

    @staticmethod
    def _prepare_text(chunk: Dict) -> str:
        """
        Render a chunk as newline-separated "Title/Description/Transcript" lines,
        plus a "Frame Descriptions" line when the chunk has any.
        """
        lines = [
            f"Title: {chunk['title']}",
            f"Description: {chunk['description']}",
            f"Transcript: {chunk['text']}",
        ]
        captions = chunk["frame_descriptions"]
        if captions:
            lines.append("Frame Descriptions: " + " ".join(captions))
        return "\n".join(lines)


class VectorDB:
    """Persistent Chroma collection ("video_metadata") holding transcript chunks."""

    def __init__(self, db_path: str = "./chroma_db"):
        self.client = chromadb.Client(Settings(persist_directory=db_path, is_persistent=True))
        self.collection = self.client.get_or_create_collection(name="video_metadata")

    def store_chunks(self, chunks: List[Dict], embeddings: List[List[float]]):
        """
        Insert or update ``chunks`` with their precomputed ``embeddings``.

        IDs are ``"<video_uri>#<index>"`` so chunks from different videos never
        collide (positional "0", "1", ... IDs restart for every video, and Chroma
        does not overwrite existing IDs on ``add``). ``upsert`` lets a re-run
        refresh previously stored chunks instead of silently keeping stale ones.
        An empty ``chunks`` list is a no-op.
        """
        if not chunks:
            # Chroma raises on an empty batch; nothing to store anyway.
            return
        self.collection.upsert(
            ids=self._make_ids(chunks),
            embeddings=embeddings,
            documents=[self._prepare_document(chunk) for chunk in chunks],
            metadatas=[self._prepare_metadata(chunk) for chunk in chunks],
        )

    @staticmethod
    def _make_ids(chunks: List[Dict]) -> List[str]:
        """Build one ID per chunk, unique across videos: ``"<video_uri>#<index>"``."""
        return [f"{chunk['video_uri']}#{i}" for i, chunk in enumerate(chunks)]

    @staticmethod
    def _prepare_document(chunk: Dict) -> str:
        """
        Build the stored document text (title, description and transcript; no frame captions).
        """
        return f"Title: {chunk['title']}\nDescription: {chunk['description']}\nTranscript: {chunk['text']}"

    @staticmethod
    def _prepare_metadata(chunk: Dict) -> Dict:
        """
        Select the fields stored as Chroma metadata for a chunk.
        """
        return {
            "start_time": chunk["start_time"],
            "end_time": chunk["end_time"],
            "video_uri": chunk["video_uri"],
        }


class Retriever:
    """Query the persisted "video_metadata" Chroma collection by semantic similarity."""

    def __init__(self, db_path: str = "./chroma_db"):
        self.client = chromadb.Client(Settings(persist_directory=db_path, is_persistent=True))
        self.collection = self.client.get_collection(name="video_metadata")
        self.embedding_generator = EmbeddingGenerator()

    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """
        Return up to ``top_k`` chunks closest to ``query``.

        The query is embedded through the same text template as stored chunks
        (with empty title/description) so both sides share one format.
        Each result is ``{"video_uri", "start_time", "text"}``.
        """
        query_chunk = {"title": "", "description": "", "text": query, "frame_descriptions": []}
        query_embedding = self.embedding_generator.generate_embeddings([query_chunk])[0]
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
        )
        return self._format_results(results)

    @staticmethod
    def _format_results(results: Dict) -> List[Dict]:
        """Flatten a single-query Chroma result (index 0 of each field) into dicts."""
        metadatas = results["metadatas"][0]
        documents = results["documents"][0]
        return [
            {
                "video_uri": meta["video_uri"],
                "start_time": meta["start_time"],
                "text": doc,
            }
            # `ids` drives the count, as the parallel lists are aligned by position.
            for _, meta, doc in zip(results["ids"][0], metadatas, documents)
        ]


def process_all_metadata_for_vectordb(metadata_dir: str):
    """
    Chunk, embed and store every ``metadata.json`` found (recursively) under ``metadata_dir``.

    Files are processed in sorted path order. Files whose transcript yields no
    chunks are skipped with a message instead of being sent to the database.
    """
    root = Path(metadata_dir)
    vectordb = VectorDB()
    embedding_generator = EmbeddingGenerator()

    # Sorted so ingestion order (and log output) is deterministic across runs.
    for metadata_file in sorted(root.glob("**/metadata.json")):
        print(f"Processing {metadata_file}...")

        with open(metadata_file, "r", encoding="utf-8") as f:
            metadata = json.load(f)

        chunks = MetadataChunker(metadata).chunk_metadata()
        if not chunks:
            # An empty batch has nothing to embed, and Chroma rejects empty adds.
            print(f"No transcript segments in {metadata_file}; skipping.")
            continue

        embeddings = embedding_generator.generate_embeddings(chunks)
        vectordb.store_chunks(chunks, embeddings)

    print("All metadata processed and stored in VectorDB.")


def query_vectordb(query: str):
    """
    Print the closest stored chunks for ``query`` (URI, start time and text of each).
    """
    hits = Retriever().retrieve(query)
    separator = "-" * 50

    print("Retrieved Results:")
    for hit in hits:
        print(f"Video URI: {hit['video_uri']}")
        print(f"Start Time: {hit['start_time']}")
        print(f"Text: {hit['text']}")
        print(separator)


if __name__ == "__main__":
    # Index every metadata.json written by the ingestion step, then run a sample query.
    metadata_dir = "./shared_data/videos"
    query = "After completing any story, what is the next crucial step"

    process_all_metadata_for_vectordb(metadata_dir)
    query_vectordb(query)