# Import Libraries

In [None]:
import os
import json
import shutil
from glob import glob
from uuid import uuid4
import yaml

import torch
torch.cuda.empty_cache()
torch.cuda.synchronize()

from transformers import (
    AutoModel,
    AutoTokenizer,
    AutoProcessor,
    AutoModelForSpeechSeq2Seq,
    pipeline,
    Qwen2_5_VLForConditionalGeneration,
)

import librosa
from moviepy import VideoFileClip
import cv2

from langchain_core.output_parsers import PydanticOutputParser
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    SparseVectorParams,
    SparseIndexParams,
    PointStruct,
    models,
)
from qdrant_client.http.models import PointStruct
from pydantic import BaseModel, Field
from typing import List

In [1]:
# retrieve models from cache
os.environ["HF_HOME"] = "/workspace/hf_cache"
os.environ["HF_HUB_CACHE"] = "/workspace/hf_cache"

# Initialize Qwen Vision Model

In [35]:
MODEL_NAME = "yangjie-cv/WeThink-Qwen2.5VL-7B"
USE_4BIT = True
MAX_NEW_TOKENS = 500
TEMPERATURE = 0
TOP_P = 0.9

def load_qwen_vl_model(model_name: str = MODEL_NAME, use_4bit: bool = USE_4BIT):
    processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)

    if use_4bit:
        quant_kwargs = dict(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,
            device_map="auto",
        )
    else:
        quant_kwargs = dict(
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto",
        )

    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_name,
        trust_remote_code=True,
        low_cpu_mem_usage=True,
        **quant_kwargs,
    )
    return processor, model

# Load VL model + processor
processor, model = load_qwen_vl_model(MODEL_NAME, USE_4BIT)

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [214]:
@torch.inference_mode()
def generate_qwen_response(processor, model, messages: list, images=None) -> str:
    # Build chat input
    chat_text = processor.apply_chat_template(
        messages, add_generation_prompt=True, tokenize=False
    )

    # IMPORTANT: pass images through here
    inputs = processor(text=[chat_text], images=images, return_tensors="pt").to(model.device)

    generated_ids = model.generate(
        **inputs,
        max_new_tokens=MAX_NEW_TOKENS,
        temperature=TEMPERATURE,
        top_p=TOP_P,
        do_sample=TEMPERATURE > 0,
        pad_token_id=getattr(processor.tokenizer, "pad_token_id", None),
        eos_token_id=getattr(processor.tokenizer, "eos_token_id", None),
    )

    prompt_len = inputs["input_ids"].shape[-1]
    new_tokens = generated_ids[:, prompt_len:]
    out = processor.batch_decode(new_tokens, skip_special_tokens=True)[0].strip()
    return out

# Initialize `bge` Embedding Model
- here we just use text embedding model: https://huggingface.co/BAAI/bge-small-en-v1.5

In [151]:
dense_model_name = "BAAI/bge-small-en-v1.5"
dense_tokenizer = AutoTokenizer.from_pretrained(dense_model_name)
dense_embedding_model = AutoModel.from_pretrained(dense_model_name)

sparse_embedding_model = FastEmbedSparse(model_name="Qdrant/bm25")      

# Initialize Qdrant Vector Database
- https://qdrant.tech/documentation/advanced-tutorials/reranking-hybrid-search/

In [156]:
def get_qdrant_client(url: str = "http://localhost:6333", prefer_grpc: bool = True):
    """
    Initialize and return a Qdrant client.

    Args:
        url (str): URL of the Qdrant service. Default is "http://localhost:6333".
        prefer_grpc (bool): Whether to use gRPC for faster communication. Default is True.

    Returns:
        QdrantClient: An initialized Qdrant client instance.
    """
    try:
        qdrant_client = QdrantClient(url=url, prefer_grpc=prefer_grpc)
        print("Connected to Qdrant successfully.")
        return qdrant_client
    except Exception as e:
        print(f"Failed to connect to Qdrant: {e}")
        return None


def get_or_create_collection(qdrant_client, collection_name: str) -> bool:
    """
    Ensure that a specific Qdrant collection exists; create it if not found.

    This collection supports both dense and sparse embeddings for hybrid retrieval.

    Args:
        qdrant_client (QdrantClient): Active Qdrant client instance.
        collection_name (str): Name of the collection to check or create.

    Returns:
        bool: True if the collection exists or was created successfully, False otherwise.
    """
    try:
        # Retrieve all existing collections
        existing = [c.name for c in qdrant_client.get_collections().collections]

        # Create collection if it doesn't exist
        if collection_name not in existing:
            qdrant_client.create_collection(
                collection_name=collection_name,
                vectors_config={
                    "dense_embedding": VectorParams(size=384, distance=Distance.COSINE)
                },
                sparse_vectors_config={
                    "sparse_embedding": SparseVectorParams(
                        index=SparseIndexParams(on_disk=False)
                    )
                },
            )
            print(f"Collection '{collection_name}' created successfully.")
        else:
            print(f"Collection '{collection_name}' already exists.")

        return True
    except Exception as e:
        print(f"Error ensuring collection '{collection_name}': {e}")
        return False

def build_dense_embedding(model_name: str, tokenizer, dense_embedding_model, text: str):
    """
    Generate a dense vector embedding for a given text using a Hugging Face model.
    
    Args:
        model_name (str): Name of the Hugging Face model used (for reference/logging).
        tokenizer: The tokenizer corresponding to the model.
        model: The transformer model instance loaded via `AutoModel.from_pretrained`.
        text (str): Input text to encode.

    Returns:
        list[float]: A list of floats representing the dense embedding vector.
    """
    # Tokenize input text and convert to PyTorch tensors
    inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")

    # Disable gradient computation for inference
    with torch.no_grad():
        outputs = dense_embedding_model(**inputs)

    # Mean pooling
    embeddings = outputs.last_hidden_state.mean(dim=1)

    # Normalize the vector
    embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)

    return embeddings[0].tolist()

def build_sparse_embedding(text: str):
    """
    Generate a sparse embedding (term-based vector) for a given text using Qdrant's BM25 model.

    Args:
        text (str): Input text to embed.

    Returns:
        dict: A sparse embedding represented as a dictionary containing 'indices' (token IDs) and 'values' (BM25 weights).
    """
    sparse_embedding_model = FastEmbedSparse(model_name="Qdrant/bm25")
    embeddings = sparse_embedding_model.embed_query(text)
    return embeddings

def build_late_embedding(late_embedding_model, text:str):
    return late_embedding_model.embed(text)

def build_qdrant_point(dense_embeddings, sparse_embeddings, payload: dict):
    """
    Build a Qdrant PointStruct containing both dense and sparse embeddings.

    Args:
        dense_embeddings (list or np.ndarray): Dense embedding vector (e.g., from BGE model).
        sparse_embeddings: Sparse embedding object with `indices` and `values` attributes.
        payload (dict): Metadata or additional fields associated with this vector.

    Returns:
        PointStruct or None: Qdrant-compatible point structure, or None if creation fails.
    """
    try:
        qdrant_point = PointStruct(
            id=str(uuid4()),  # unique ID for the point
            vector={
                "dense_embedding": dense_embeddings,
                "sparse_embedding": {
                    "indices": sparse_embeddings.indices,
                    "values": sparse_embeddings.values,
                },
            },
            payload=payload,
        )
        return qdrant_point
    except Exception as e:
        print(f"Error building Qdrant point: {e}")
        return None

def upsert_qdrant_points(qdrant_client, collection_name: str, points: list) -> bool:
    """
    Upsert (insert or update) a batch of points into a Qdrant collection.

    Args:
        qdrant_client (QdrantClient): Active Qdrant client instance.
        collection_name (str): Name of the target Qdrant collection.
        points (list[PointStruct]): List of PointStruct objects to insert or update.

    Returns:
        bool: True if the upsert succeeded, False otherwise.
    """
    try:
        qdrant_client.upsert(
            collection_name=collection_name,
            points=points,
        )
        print(f"Successfully upserted {len(points)} point(s) into '{collection_name}'.")
        return True
    except Exception as e:
        print(f"Error upserting points into Qdrant collection '{collection_name}': {e}")
        return False
        
qdrant_client = get_qdrant_client()

Connected to Qdrant successfully.


# Initialize Embedding & Indexing func

In [153]:
def index_chunks_to_qdrant(qdrant_client, collection_name: str, summary_chunks: list, dense_model_name: str, dense_tokenizer, dense_embedding_model):
    """
    Build and upsert Qdrant points from summarized transcript/image chunks.

    Args:
        qdrant_client: Initialized Qdrant client.
        collection_name (str): Name of the Qdrant collection.
        summary_chunks (list[dict]): Each chunk should contain 'text', 'summary', 'topics'.
        dense_model_name (str): Name of dense embedding model.
        dense_tokenizer: Tokenizer for the dense model.
        dense_model: Loaded dense embedding model instance.

    Returns:
        int: Number of Qdrant points successfully indexed.
    """
    qdrant_points = []

    # ensure collection is exists in qdrant before processing
    get_or_create_collection(qdrant_client, collection_name)

    print(f"== Building Qdrant points for collection: '{collection_name}' ==")

    for i, chunk in enumerate(summary_chunks, 1):
        text = chunk.get("text", "")
        summary = chunk.get("summary", "")
        topics = chunk.get("topics", [])

        # Combine structured fields into rich embedding text
        embed_text = f"Summary: {summary}\nTopics: {', '.join(topics)}\n---\n{text}"

        # Compute embeddings
        dense_vector = build_dense_embedding(dense_model_name, dense_tokenizer, dense_embedding_model, embed_text)
        sparse_vector = build_sparse_embedding(embed_text)

        # Build Qdrant payload and point
        qdrant_payload = {
            "text": embed_text,
            "summary": summary,
            "topics": topics,
        }
        qdrant_point = build_qdrant_point(dense_vector, sparse_vector, qdrant_payload)
        qdrant_points.append(qdrant_point)

        print(f"   [{i}/{len(summary_chunks)}] Point built for topics: {topics}")

    # Upsert to Qdrant
    print(f" Upserting {len(qdrant_points)} points into collection '{collection_name}' ...")
    upsert_qdrant_points(qdrant_client, collection_name, qdrant_points)

    print(f" Successfully indexed {len(qdrant_points)} chunks into Qdrant.\n")
    return len(qdrant_points)

# Initialize Prompts and Pydantic Format 

In [154]:
class TranscriptInfo(BaseModel):
    summary: str = Field(...,description="A concise yet informative summary describing what happens or is discussed during the given timeframe, including the timeframe explicitly in the text.")
    topics: List[str] = Field(...,description="A list of key topics, themes, or entities mentioned within the transcript segment.")

transcript_summary_parser = PydanticOutputParser(pydantic_object=TranscriptInfo)

transcript_img_summarizer_prompt = f"""
You are an expert assistant skilled in summarizing image descriptions within their respective timeframes.

Task:
- Input: a timeframe and its corresponding image-based description.
- Output:
  1. A concise yet informative summary that accurately captures what happens or is discussed during that timeframe, and naturally includes the timeframe (e.g., “From 00:10s to 00:25s, ...”).
  2. A list of key topics, themes, or entities mentioned within the segment.

Guidelines:
- Be factual, objective, and clear.
- Avoid redundancy or filler text.
- Ensure both summary and topics align precisely with the image description.

Output format:
{transcript_summary_parser.get_format_instructions()}
"""

transcript_text_summarizer_prompt = f"""
You are an expert summarization assistant specialized in understanding and summarizing transcript segments.

Your task:
- Input: a transcript segment along with its associated timeframe (start and end time).
- Output:
  1. A concise yet informative summary that accurately captures what happens or is discussed during that timeframe, and naturally includes the timeframe (e.g., “From 00:10s to 00:25s, ...”).
  2. A list of key topics, themes, or entities mentioned within the segment.

Guidelines:
- Be clear, factual and, objective.
- Avoid redundancy, filler words, or speculation.

Output format (strictly follow this schema):
{transcript_summary_parser.get_format_instructions()}
"""

# Video RAG Pipeline Start Here

## Load Video File

In [30]:
video_file = "/workspace/notebooks/meeting_updates.mp4"

In [187]:
# define collection name for qdrant storage
collection_name = os.path.splitext(os.path.basename(video_file))[0]

## Convert Video to Images 
- by using openCV library
- ref: https://medium.com/@wayandadangunsri/converting-video-to-images-using-python-and-opencv-72b2ea66a692
- logic:
```
fps = 30 # video has 30 frames each second
frame_rate = 2 # save 2 frames per second

save_interval = int(30 / 2)
save_interval = 15
- save one frame every 15 frames
```

In [5]:
def extract_video_frames(
    video_file: str,
    frame_rate: float = 2.0,
    output_root: str = "data/frames",
    group_seconds: int = 5
):
    """
    Extract frames from a video at the specified frame rate.
    Frames are grouped into folders based on a configurable number of seconds (group_seconds).
    If the output folder already exists, it will be cleared first.

    Args:
        video_file (str): Path to the video file.
        frame_rate (float): Desired number of frames to save per second (e.g. 2.0 = one every 0.5s).
        output_root (str): Root directory to store extracted frames.
        group_seconds (int): Duration of each group in seconds (e.g. 5 → group frames every 5s).

    Returns:
        str: Path to the folder where frames were saved.
    """

    try:
        # Check if video file exists
        if not os.path.exists(video_file):
            raise FileNotFoundError(f"Video file not found: {video_file}")

        # Try to open the video
        cap = cv2.VideoCapture(video_file)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_file}")

        # Get video info
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video opened: {video_file}")
        print(f"FPS: {fps:.2f}, Total Frames: {total_frames}\n")

        # Create output folder
        video_name = os.path.splitext(os.path.basename(video_file))[0]
        output_directory = os.path.join(output_root, f"{video_name}_frames")

        # Clear existing folder
        if os.path.exists(output_directory):
            print(f"Cleaning existing folder: {output_directory}\n")
            shutil.rmtree(output_directory)
        os.makedirs(output_directory, exist_ok=True)
        print(f"Output folder: {output_directory}\n")

        # Frame extraction setup
        frame_count = 0
        saved_count = 0
        save_interval = int(fps / frame_rate)
        print(f"Extracting {frame_rate} frames per second → every {save_interval}th frame")
        print(f"Grouping every {group_seconds} seconds\n")

        # Read frames in a loop
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Finished reading all frames.")
                break

            frame_count += 1

            # Save at chosen interval
            if frame_count % save_interval == 0:
                time_sec = frame_count / fps
                start_sec = time_sec - (1 / frame_rate)
                end_sec = time_sec

                # Determine which group this frame belongs to
                group_index = int(time_sec // group_seconds)
                start_group = group_index * group_seconds
                end_group = start_group + group_seconds

                # Create subfolder for the group
                group_folder = os.path.join(output_directory, f"group_{start_group:03d}s_{end_group:03d}s")
                os.makedirs(group_folder, exist_ok=True)

                # Save frame inside group folder
                output_file = os.path.join(group_folder, f"frame_{frame_count}.jpg")
                cv2.imwrite(output_file, frame)

                saved_count += 1
                print(f"Saved frame {frame_count:>5} ({time_sec:>6.2f}s) "
                      f"covers {start_sec:.2f}s–{end_sec:.2f}s → {output_file}")

        # Release resources
        cap.release()
        cv2.destroyAllWindows()

        print(f"\nDone! Extracted {saved_count} frames from {video_name}.")
        print(f"Grouped every {group_seconds} seconds inside: {output_directory}")
        return output_directory

    except FileNotFoundError as e:
        print(e)
    except ValueError as e:
        print(e)
    except Exception as e:
        print(f"Unexpected error: {e}")

extract_video_frames(video_file)

Video opened: /workspace/notebooks/meeting_updates.mp4
FPS: 25.00, Total Frames: 6700

Output folder: data/frames/meeting_updates_frames

Extracting 2.0 frames per second → every 12th frame
Grouping every 5 seconds

Saved frame    12 (  0.48s) covers -0.02s–0.48s → data/frames/meeting_updates_frames/group_000s_005s/frame_12.jpg
Saved frame    24 (  0.96s) covers 0.46s–0.96s → data/frames/meeting_updates_frames/group_000s_005s/frame_24.jpg
Saved frame    36 (  1.44s) covers 0.94s–1.44s → data/frames/meeting_updates_frames/group_000s_005s/frame_36.jpg
Saved frame    48 (  1.92s) covers 1.42s–1.92s → data/frames/meeting_updates_frames/group_000s_005s/frame_48.jpg
Saved frame    60 (  2.40s) covers 1.90s–2.40s → data/frames/meeting_updates_frames/group_000s_005s/frame_60.jpg
Saved frame    72 (  2.88s) covers 2.38s–2.88s → data/frames/meeting_updates_frames/group_000s_005s/frame_72.jpg
Saved frame    84 (  3.36s) covers 2.86s–3.36s → data/frames/meeting_updates_frames/group_000s_005s/frame

'data/frames/meeting_updates_frames'

In [36]:
def build_user_prompt_for_img_chunk(start_s: float, end_s: float, text: str) -> str:
    return f"Timeframe: {start_s:.2f}s to {end_s:.2f}s\n\nImage Summaries: {text}"

def extract_time_from_group_path(folder_path: str) :
    """
    Expect folder name like 'group_005s_010s' and return (5.0, 10.0).
    Fallback to (0.0, 0.0) if pattern not found.
    """
    name = os.path.basename(os.path.normpath(folder_path))
    m = re.search(r"(\d+)s_(\d+)s", name)
    if not m:
        return 0.0, 0.0
    return float(m.group(1)), float(m.group(2))

def list_images(folder: str):
    exts = ("*.jpg", "*.jpeg", "*.png", "*.webp", "*.bmp")
    files = []
    for e in exts:
        files.extend(glob(os.path.join(folder, e)))
    return sorted(files)

def load_and_resize(image_path: str, max_edge: int = 768) -> Image.Image:
    """
    Load an image as RGB and downscale so the longest edge <= max_edge.
    """
    img = Image.open(image_path).convert("RGB")
    w, h = img.size
    scale = min(max_edge / max(w, h), 1.0)  # only downscale
    if scale < 1.0:
        img = img.resize((int(w * scale), int(h * scale)), Image.LANCZOS)
    return img

def summarize_frames(processor, model, folder: str, max_edge: int = 768) -> List[Dict]:
    start_s, end_s = extract_time_from_group_path(folder)
    paths = list_images(folder)
    if not paths:
        print(f"No images found in: {folder}")
        return {}

    print("\n" + "=" * 80)
    print(f"📁 Processing group folder: {os.path.basename(folder)}")
    print(f"⏱️  Timeframe: {start_s:.2f}s → {end_s:.2f}s")
    print(f"🖼️  Found {len(paths)} image(s) in folder: {folder}")
    print("=" * 80)


    img_descriptions = []
    for i, p in enumerate(paths, 1):
        img = load_and_resize(p, max_edge=max_edge)

        messages = [
            {"role": "system", "content": [{"type": "text", "text": "Summarize this image clearly and concisely."}]},
            {"role": "user", "content": [{"type": "image", "image": img},]},
        ]
        
        img_description = generate_qwen_response(processor, model, messages, images=[img])
        img_descriptions.append(img_description)
        print(f"[{i}/{len(paths)}] Frame: {os.path.basename(p)}")
        print(f"    ➜ Summary: {img_description}\n")

    # generate image summary
    text = '\n\n'.join(img_descriptions)
    img_user_prompt = build_user_prompt_for_img_chunk(start_s, end_s, text)
    summary_messages = [
            {"role": "system", "content": [{"type": "text", "text": transcript_img_summarizer_prompt}]},
            {"role": "user", "content": [{"type": "text", "text": img_user_prompt},]},
    ]
    summary_output = generate_qwen_response(processor, model, summary_messages)
    summary_info = transcript_summary_parser.parse(summary_output)

    # Extract summary and topics safely
    summary = ""
    topics = []
    try:
        summary = summary_info.summary
        topics = summary_info.topics
    except Exception as e:
        print(f"[ERROR] Failed to extract summary or topics: {e}")
        print("Raw model output:", summary_output)

    # log for summary
    print("-" * 80)
    print(f"Group Summary for {os.path.basename(folder)} ({start_s:.2f}s → {end_s:.2f}s):")
    print(summary)
    print(f"\nTopics: {topics}")
    print("-" * 80 + "\n")

    # return structured result
    summary_chunk = ({
        "text": text,
        "summary": summary,
        "topics": topics,
        "type": "img"
    })

    return summary_chunk

def get_frame_groups(parent_folder: str) -> List[Dict]:
    """
    Get all subfolders under parent_folder that look like 'group_*',
    """
    # find subfolders named group_*
    groups = [p for p in glob(os.path.join(parent_folder, "group_*")) if os.path.isdir(p)]
    groups = sorted(groups)

    if not groups:
        print(f"[WARN] No 'group_*' subfolders under: {parent_folder}")
        return []
    return groups
    
# ==== Run all frame groups ====
frame_groups = get_frame_groups(r"data/frames/meeting_updates_frames")
img_summary_chunks = []

for i, group in enumerate(frame_groups, start=1):
    print(f"\n\n========== Group {i}/{len(frame_groups)} ==========")
    group_summary = summarize_frames(processor, model, group)
    img_summary_chunks.append(group_summary)
    print(f"Finished processing {os.path.basename(group)}\n")




📁 Processing group folder: group_000s_005s
⏱️  Timeframe: 0.00s → 5.00s
🖼️  Found 10 image(s) in folder: data/frames/meeting_updates_frames/group_000s_005s
[1/10] Frame: frame_12.jpg
    ➜ Summary: The image is completely black with no visible content or details.

[2/10] Frame: frame_24.jpg
    ➜ Summary: The image is completely black with no visible content or details.

[3/10] Frame: frame_36.jpg
    ➜ Summary: The image displays a black background with the text "Business Result" in a light gray font, positioned in the upper left corner.

[4/10] Frame: frame_48.jpg
    ➜ Summary: The image displays a black background with the text "Business Result" in white, with "Business" on the first line and "Result" on the second line, both in a clean, sans-serif font.

[5/10] Frame: frame_60.jpg
    ➜ Summary: The image features a black background with the text "Business Result" in white, with "Business" on the first line and "Result" on the second line, both in a clean, sans-serif font.

[6/

In [157]:
index_chunks_to_qdrant(qdrant_client=qdrant_client,collection_name=collection_name,summary_chunks=img_summary_chunks,dense_model_name=dense_model_name,dense_tokenizer=dense_tokenizer,dense_embedding_model=dense_embedding_model)

Collection 'test' already exists.
== Building Qdrant points for collection: 'test' ==
   [1/54] Point built for topics: ['Business Result', 'Intermediate', 'black background', 'white text', 'yellow horizontal stripe']
   [2/54] Point built for topics: ['Business Result', 'Unit 2', 'A team meeting', 'Intermediate-level learners']
   [3/54] Point built for topics: ['Business English course', 'Business Result', 'Unit 2', 'A team meeting', 'Intermediate level learners']
   [4/54] Point built for topics: ['QPG', 'logo', 'star', 'triangle', 'glass doors', 'windows', 'commercial area', 'industrial area', 'meeting room', 'professional setting']
   [5/54] Point built for topics: ['meeting', 'discussion', 'professional environment', 'meeting room', 'whiteboard', 'door']
   [6/54] Point built for topics: ['professional meeting', 'conference room', 'woman leading', 'four seated individuals', 'table with documents, water bottle, snacks, coffee pot', 'whiteboard', 'collaborative environment']
   [7/

54

## Extract Audio from Video

In [8]:
def extract_audio_from_video(video_file: str, output_folder: str = "data/audio") -> str:
    """
    Extracts audio from a given video file and saves it as an MP3 file.

    This function reads the input video file, extracts its audio track,
    and saves it in the specified output folder as an MP3 file.
    The output filename will follow the pattern: <video_name>_audio.mp3.

    Args:
        video_file (str): Path to the input video file.
        output_folder (str): Directory to save the extracted audio file.
                             Defaults to "data/audio".

    Returns:
        str: The path to the generated audio file.
    """
    try:
        # Check if video file exists
        if not os.path.exists(video_file):
            raise FileNotFoundError(f"Video file not found: {video_file}")

        # Ensure output folder exists
        os.makedirs(output_folder, exist_ok=True)
        print(f"Output directory: {output_folder}")

        # Extract video name (without extension)
        video_name = os.path.splitext(os.path.basename(video_file))[0]
        output_path = os.path.join(output_folder, f"{video_name}_audio.mp3")

        print(f"Starting audio extraction from: {video_file} and Output audio will be saved to: {output_path}")

        # Convert video to audio
        video = VideoFileClip(video_file)
        video.audio.write_audiofile(output_path, codec='mp3')
        video.close()

        print(f"Audio extraction completed: {output_path}")
        return output_path

    except FileNotFoundError as e:
        print(f"{e}")
        raise

    except Exception as e:
        print(f"Failed to extract audio: {e}")
        raise

extract_audio_from_video(video_file)

Output directory: data/audio
Starting audio extraction from: /workspace/notebooks/meeting_updates.mp4 and Output audio will be saved to: data/audio/meeting_updates_audio.mp3
MoviePy - Writing audio in data/audio/meeting_updates_audio.mp3


                                                                      

MoviePy - Done.
Audio extraction completed: data/audio/meeting_updates_audio.mp3


'data/audio/meeting_updates_audio.mp3'

## Convert Speech to Text
- transcribes an audio file (meeting recording) into text using a (whisper) speech recognition model.

In [20]:
def transcribe_audio_whisper(audio_path: str, output_dir: str = "data/transcript", chunk_length_s: int = 5, batch_size: int = 32):
    """
    Transcribes speech from an audio file using the Distil-Whisper small English model.

    Groups text by chunk_length_s seconds and saves output as a dictionary in a text file.
    Example output:
    {
        "0–5s": "text here",
        "5–10s": "text here"
    }

    Args:
        audio_path (str): 
            Path to the input audio file (e.g., MP3 or WAV).
        output_dir (str, optional): 
            Directory where the transcribed output file will be saved.
            Defaults to "data/transcript".
        chunk_length_s (int, optional): 
            Length (in seconds) of each audio chunk to process for grouping.
            Determines how frequently timestamps are split in the final dictionary (how model splits audio internally)
            Defaults to 5.
        batch_size (int, optional): 
            Number of audio chunks to process in one batch during inference.
            Higher values may improve performance on GPU.
            Defaults to 32.

    Returns:
        str: 
            Path to the saved transcription yaml file (dictionary-style output).
    """

    try:
        # Check if audio file exists
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        print("Initializing Whisper model and processor...")

        model_id = "distil-whisper/distil-small.en"
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

        # Load model and processor
        transcribe_model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True
        )
        processor = AutoProcessor.from_pretrained(model_id)

        # Create pipeline
        transcribe_pipeline = pipeline(
            "automatic-speech-recognition",
            model=transcribe_model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            device="cuda:0" if torch.cuda.is_available() else "cpu"
        )

        print(f"Loading audio from: {audio_path}")
        audio, sr = librosa.load(audio_path, sr=16000)

        # Prepare output path
        audio_name = os.path.splitext(os.path.basename(audio_path))[0]
        os.makedirs(output_dir, exist_ok=True)
        transcription_path = os.path.join(output_dir, f"{audio_name}_transcript.yaml")

        print("Starting transcription process...")
        transcriptions = transcribe_pipeline(
            audio,
            chunk_length_s=chunk_length_s,
            batch_size=batch_size,
            return_timestamps=True
        )

        print(transcriptions)

        # Build dictionary based on chunks
        transcript_dict = {}
        for chunk in transcriptions.get("chunks", []):
            start, end = chunk.get("timestamp", (0, 0))
            text = chunk.get("text", "").strip()
            if start is not None and end is not None:
                key = f"{int(start)}–{int(end)}s"
                transcript_dict[key] = text

        # Save dictionary as formatted JSON text
        with open(transcription_path, "w", encoding="utf-8") as f:
            yaml.dump(transcript_dict, f, allow_unicode=True, sort_keys=False)

        print("\n--- Transcription Completed ---\n")
        print("Grouped Transcript:\n")
        for k, v in transcript_dict.items():
            print(f"{k}: {v}")

        print(f"\nTranscription saved as dictionary at: {transcription_path}")
        return transcription_path

    except FileNotFoundError as fnf_err:
        print(f"Error: {fnf_err}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

transcribe_audio_whisper("data/audio/meeting_updates_audio.mp3", chunk_length_s=6)

Initializing Whisper model and processor...


Device set to use cuda:0


Loading audio from: data/audio/meeting_updates_audio.mp3




Starting transcription process...
{'text': " Thank you. Thanks. I don't know. I don't know. Quartz Power Group Quartz's power group is an energy company. It supplies power to homes across the UK. The Human Resources Department The human resources departmental managers have just started their weekly team meeting led by Paul the head of HR. we were meeting here? Yes, sorry IT needed to do something to my computer so my office wasn't free. Okay well this is fine. Anyway shall we start with an update from everyone? Can we try and keep this to 20 minutes, so we have time for the main item afterwards. Uh, so... So, how are things going? Maria, what's happening in the training department this week? So far so good we have two external trainers in this week one of them is working with called center staff at the other side. So I hope they all turn up. What do you mean? Well, there were lots of absences for the last training we had at the called centre. I'm surprised really. It's in paid time and

'data/transcript/meeting_updates_audio_transcript.yaml'

## Transcript Chunking

- yaml store format

In [193]:
# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True)

def count_tokens(text: str) -> int:
    """
    Count tokens in a text using Qwen2.5-VL-7B-Instruct tokenizer.
    Works for text-only input.
    
    Args:
        text (str): The input text.
        model_name (str): Hugging Face model name (default: Qwen/Qwen2.5-VL-7B-Instruct).

    Returns:
        int: Number of tokens.
    """
    try:
        # Tokenize input
        tokens = tokenizer.encode(text)
        token_count = len(tokens)

        return token_count
    except Exception as e:
        print(f"Error: {e}")
        return 0

In [194]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
    tokenizer=tokenizer,
    chunk_size=300,      # token-based
    chunk_overlap=100,   # overlap in tokens
)

In [195]:
def parse_timeframe(timeframe: str):
    """
    Convert a timeframe string into numeric start/end seconds.

    Args:
        timeframe (str): A timeframe range like '0–5s' or '0-5s'.

    Returns:
        Tuple[float, float]: (start_seconds, end_seconds)
    
    """
    timeframe = timeframe.replace("–", "-").replace("s", "")
    start, end = timeframe.split("-")
    return float(start), float(end)

def read_transcript_path(transcript_path):
    """
    Read a YAML transcript file into a dict mapping timeframe -> text.

    Args:
        transcript_path (str): Path to a YAML file where keys are timeframes
            (e.g., '0–5s', '5–10s') and values are transcript strings.

    Returns:
        Dict[str, str]: Mapping of timeframe key to transcript text.
    """
    with open(transcript_path, "r", encoding="utf-8") as f:
        transcript_data = yaml.safe_load(f)

    return transcript_data

def chunk_transcript_text(transcript_path: str, max_chunk_token_size: int = 300):
    """
    Read YAML transcript (time-range keys → text) and group into chunks <= chunk_size characters.

    - Combines consecutive groups until adding another would exceed chunk_size.
    - If a single group is longer than chunk_size, split it using RecursiveCharacterTextSplitter.
    - Each chunk has start/end seconds, list of group keys, and text.
    """
    transcript_data = read_transcript_path(transcript_path)
    transcript_text = []
    for timeframe, text in transcript_data.items():
        start, end = parse_timeframe(timeframe)
        if text.strip():
            transcript_text.append((timeframe, start, end, text.strip()))
            
    transcript_text.sort(key=lambda x:x[1])

    chunks = []
    current_chunk = {
        "text": "",
        "groups": [],
        "start": None,
        "end": None
    }
    
    for key, start, end, text in transcript_text:
        text = text.strip()
        text_token_len = count_tokens(text)
    
        # Case 1: if this single group > chunk_size, split it into smaller pieces
        if text_token_len > max_chunk_token_size:
            # Save current chunk before handling this long one
            if current_chunk["text"]:
                chunks.append({
                    "start": current_chunk["start"],
                    "end": current_chunk["end"],
                    "groups": current_chunk["groups"],
                    "text": current_chunk["text"].strip()
                })
                current_chunk = {"text": "", "groups": [], "start": None, "end": None}
    
            sub_texts = splitter.split_text(text)
            for i, sub in enumerate(sub_texts):
                chunks.append({
                    "start": start,
                    "end": end,
                    "groups": [f"{key}#part{i+1}"],
                    "text": sub.strip()
                })
            continue
    
        # Case 2: if adding this text exceeds chunk_size, save current and start new
        if len(current_chunk["text"]) + text_token_len + 1 > max_chunk_token_size:
            chunks.append({
                "start": current_chunk["start"],
                "end": current_chunk["end"],
                "groups": current_chunk["groups"],
                "text": current_chunk["text"].strip()
            })
            current_chunk = {"text": "", "groups": [], "start": None, "end": None}
    
        # Add current text to the ongoing chunk
        if not current_chunk["text"]:
            current_chunk["start"] = start
        current_chunk["end"] = end
        current_chunk["groups"].append(key)
        current_chunk["text"] += (" " if current_chunk["text"] else "") + text
    
    # Save the last chunk
    if current_chunk["text"]:
        chunks.append({
            "start": current_chunk["start"],
            "end": current_chunk["end"],
            "groups": current_chunk["groups"],
            "text": current_chunk["text"].strip()
        })

    return chunks

In [196]:
transcript_path = "data/transcript/meeting_updates_audio_transcript.yaml"
chunks = chunk_transcript_text(transcript_path)
chunks

[{'start': 0.0,
  'end': 48.0,
  'groups': ['0–5s', '5–6s', '6–16s', '16–18s', '20–48s'],
  'text': "Thank you. Thanks. I don't know. I don't know. Quartz Power Group Quartz's power group is an energy company. It supplies power to homes across the UK. The Human Resources Department The human resources departmental managers have just started their weekly team meeting led by Paul the head of HR. we were meeting here? Yes, sorry IT needed to do something to my computer so my office wasn't free. Okay well this is fine. Anyway shall we start with an update from everyone? Can we try and keep this to 20 minutes, so we have time for the main item afterwards."},
 {'start': 48.0,
  'end': 72.0,
  'groups': ['48–52s', '52–56s', '56–66s', '66–68s', '68–72s'],
  'text': "Uh, so... So, how are things going? Maria, what's happening in the training department this week? So far so good we have two external trainers in this week one of them is working with called center staff at the other side. So I hop

## Summarize Transcript Chunk
Vision Model Leaderboard
- https://huggingface.co/spaces/TIGER-Lab/MMEB-Leaderboard
- https://huggingface.co/yangjie-cv/WeThink-Qwen2.5VL-7B
- https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct

Multimodal Embedding Leaderboard
- https://huggingface.co/spaces/TIGER-Lab/MMEB-Leaderboard
- we test with: https://huggingface.co/Alibaba-NLP/gme-Qwen2-VL-2B-Instruct

In [90]:
# pip install transformers accelerate bitsandbytes torch pyyaml langchain qdrant-client
# pip install fastembed-gpu or pip install fastembed

In [198]:
def build_user_prompt_for_text_chunk(text: str, start_s: float, end_s: float) -> str:
    return (
        f"Timeframe: {start_s:.2f}s to {end_s:.2f}s\n\n"
        f"Transcript:\n{text}\n\n"
    )
    
def summarize_transcript_chunks(chunks: list) -> list:
    """
    Summarize a list of transcript chunks and extract key topics for each segment.

    Args:
        chunks (list): List of dictionaries, each containing a transcript segment with 'start', 'end', and 'text' fields.

    Returns:
        list: List of structured dictionaries ready for indexing and storage in a vector database.
    """
    summary_chunks = []

    for i, data in enumerate(chunks, start=1):
        text = data["text"]
        start_s = data['start']
        end_s = data['end']

        # Run summarization model on this chunk
        messages = [
            {"role": "system", "content": [{"type": "text", "text": transcript_text_summarizer_prompt}]},
            {"role": "user",   "content": [{"type": "text", "text": build_user_prompt_for_text_chunk(text, start_s, end_s)}]},
        ]
        
        summary_output = generate_qwen_response(processor, model, messages)
        summary_info = transcript_summary_parser.parse(summary_output)

        summary = ""
        topics = []
        try:
            summary = summary_info.summary
            topics = summary_info.topics
        except Exception as e:
            print(f"Error extracting summary or topics in chunk {i}: {e}")

        # Debug output for verification
        print(f"\nChunk {i}:")
        print(text)
        print(f"\nSummary: {summary}")
        print(f"Topics: {topics}")

        # Store structured result
        summary_chunks.append({
            "text": text,
            "summary": summary,
            "topics": topics,
            "type": "txt"
        })

    return summary_chunks

txt_summary_chunks = summarize_transcript_chunks(chunks)


Chunk 1:
Thank you. Thanks. I don't know. I don't know. Quartz Power Group Quartz's power group is an energy company. It supplies power to homes across the UK. The Human Resources Department The human resources departmental managers have just started their weekly team meeting led by Paul the head of HR. we were meeting here? Yes, sorry IT needed to do something to my computer so my office wasn't free. Okay well this is fine. Anyway shall we start with an update from everyone? Can we try and keep this to 20 minutes, so we have time for the main item afterwards.

Summary: From 0.00s to 48.00s, the transcript discusses the introduction of a meeting led by Paul, the head of the Human Resources Department. The HR managers are starting their weekly team meeting, and the discussion includes a brief update from everyone, with a request to keep the meeting under 20 minutes to allow time for the main item.
Topics: ['Quartz Power Group', 'HR Department', 'Weekly Team Meeting', 'Paul (HR Head)', 

In [199]:
index_chunks_to_qdrant(qdrant_client=qdrant_client,collection_name=collection_name,summary_chunks=txt_summary_chunks,dense_model_name=dense_model_name,dense_tokenizer=dense_tokenizer,dense_embedding_model=dense_embedding_model)

Collection 'meeting_updates' already exists.
== Building Qdrant points for collection: 'meeting_updates' ==
   [1/10] Point built for topics: ['Quartz Power Group', 'HR Department', 'Weekly Team Meeting', 'Paul (HR Head)', 'Computer Issue', 'Meeting Duration']
   [2/10] Point built for topics: ['training department', 'external trainers', 'call center staff', 'training attendance']
   [3/10] Point built for topics: ['training sessions', 'customer services', 'Anna']
   [4/10] Point built for topics: ['internet training', 'IT readiness', 'update meeting']
   [5/10] Point built for topics: ['training department', 'Schools Day event', 'energy sources', 'marketing']
   [6/10] Point built for topics: ['marketing event', 'budget allocation', 'time commitment', 'David']
   [7/10] Point built for topics: ['call center training', 'internet training', "school's events", 'Maria', 'Anna', 'Matt', 'Lucy', 'David']
   [8/10] Point built for topics: ['recruitment', 'finance assistant', 'interviews', 'a

10

# RAG

In [None]:
def query_rag_points(user_query: str, dense_model_name, dense_tokenizer, dense_embedding_model, qdrant_client, collection_name: str, limit: int = 10,):
    """
    Run hybrid (dense + sparse) retrieval against Qdrant using RRF fusion.

    Args:
        user_query (str): The user question/query.
        dense_model_name: Name or id for the dense embedding model.
        dense_tokenizer: Tokenizer instance for the dense model.
        dense_embedding_model: Dense embedding model instance.
        qdrant_client: Initialized QdrantClient.
        collection_name (str): Qdrant collection to search.
        limit (int): Per-branch limit for Prefetch (dense and sparse).

    Returns:
        retrieved_points: Qdrant query result.
    """
    # Build embeddings (uses your existing helpers)
    dense_vector = build_dense_embedding(dense_model_name, dense_tokenizer, dense_embedding_model, user_query)
    sparse_vector = build_sparse_embedding(user_query)

    # Query Qdrant with RRF fusion
    retrieved_points = qdrant_client.query_points(
        collection_name=collection_name,
        prefetch=[
            models.Prefetch(
                query=models.SparseVector(indices=sparse_vector.indices, values=sparse_vector.values),
                using="sparse_embedding",
                limit=limit
            ),
            models.Prefetch(
                query=dense_vector,
                using="dense_embedding",
                limit=limit
            )
        ],
        query=models.FusionQuery(fusion=models.Fusion.RRF)
    )

    print(retrieved_points)
    return retrieved_points

def build_doc_context(retrieved_points, top_k: int = 5) -> str:
    """
    Build a formatted document context string from retrieved Qdrant points for LLM input.

    Args:
        retrieved_points: Result object from qdrant_client.query_points().
        top_k (int): Number of top retrieved points to include in the context.

    Returns:
        str: A structured string combining summaries, topics, and transcript text.
    """
    doc_context = ""

    for point in retrieved_points.points[:top_k]:
        payload = point.payload
        score = point.score
        summary = payload.get("summary", "")
        text = payload.get("transcript_text", "")
        topics = payload.get("topics", "")
        doc_type = payload.get("type", "")

        context_block = (
            f"### Document Type: {doc_type}\n"
            f"**Relevance Score:** {score:.4f}\n\n"
            f"**Summary:** {summary}\n\n"
            f"**Topics:** {', '.join(topics) if isinstance(topics, list) else topics}\n\n"
            f"**Transcript Text:**\n{text}\n"
            "------------------------------------------------------------\n"
        )

        doc_context += context_block

    return doc_context

retrieved_points = query_rag_points(user_query, dense_model_name, dense_tokenizer, dense_embedding_model, qdrant_client, collection_name, limit=10)
doc_context = build_doc_context(retrieved_points, top_k=5)
print(doc_context)

In [228]:
class RAGAnswerSchema(BaseModel):
    response_text: str = Field(..., description="Final concise answer to the user's question, derived only from the provided transcript context.")

rag_ans_format_instructions = rag_answer_parser.get_format_instructions()

rag_qa_prompt_base = """
You are a knowledgeable and reliable assistant tasked with answering questions **only** based on the provided transcript context.

The given context may include different types of data:
- **type:** Indicates the source of the content (e.g., `img` = frame from video, `txt` = transcript text).
- **transcript_text:** Raw speech-to-text transcript extracted from audio **or** descriptive text generated from a video frame or image.
- **summary:** Condensed description or explanation of what happens in the transcript segment.
- **topics:** List of key entities, events, or themes mentioned.
- **score:** Relevance score of the retrieved content (higher means more relevant).

### Response Guidelines
- Use **only** the information contained in the context to answer the question.
- Provide your answer in **clear point form** for readability.
- Highlight important keywords or entities in **bold**.
- Avoid assumptions or fabricated details.
- If the answer cannot be found in the context, respond exactly with:
  **"The answer is not available in the provided context."**

### Document Context
{doc_context}

### Output Format (Strictly follow this schema)
"""

# Helper to format user query
def format_user_query(user_query: str) -> str:
    return f"User Query: {user_query}"

def generate_rag_response(doc_context: str, user_query: str, rag_qa_prompt_base: str, rag_ans_format_instructions: str, processor, model):
    """
    Generate an LLM response using a RAG-style QA prompt and retrieved document context.

    Args:
        doc_context (str): Formatted text context built from retrieved Qdrant points.
        user_query (str): The user’s question or query.
        rag_qa_prompt_base (str): Base QA prompt template (without format instructions).
        rag_ans_format_instructions (str): Output format instructions (from parser).
        processor: The model’s processor/tokenizer for the Qwen model.
        model: The Qwen model instance.

    Returns:
        The generated model response object.
    """
    # Build full system prompt
    complete_prompt = rag_qa_prompt_base.format(doc_context=doc_context) + rag_ans_format_instructions

    # Construct messages
    rag_messages = [
        {
            "role": "system",
            "content": [{"type": "text", "text": complete_prompt}],
        },
        {
            "role": "user",
            "content": [{"type": "text", "text": format_user_query(user_query)}],
        },
    ]

    # Generate and return the model response
    response = generate_qwen_response(processor, model, rag_messages)
    parse_response = rag_answer_parser.parse(response).response_text
    return parse_response


In [None]:
response = generate_rag_response(doc_context=doc_context,user_query=user_query,rag_qa_prompt_base=rag_qa_prompt_template,rag_ans_format_instructions=rag_answer_parser.get_format_instructions(),processor=processor,model=model)