# Twelve Labs Video RAG with Weaviate

## Set Up Our Environment

### Install Dependencies

In [None]:
# Twelve Labs SDK (Pegasus indexing/generation, Marengo embeddings) and the Weaviate Python client.
!python -m pip install -U -q twelvelabs
!python -m pip install -U -q weaviate-client

In [None]:
# Local LLaVA-NeXT-Video stack: PyAV decodes video frames, accelerate + bitsandbytes
# back the 4-bit quantized load with device_map='auto', and transformers is installed
# from the main branch (presumably for LLaVA-NeXT-Video support — confirm whether a
# released version is now sufficient).
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate
!python -m pip install -U bitsandbytes
!python -m pip install git+https://github.com/huggingface/transformers.git

In [None]:
# Supporting packages; matplotlib renders the sampled frames as an animation later.
# NOTE(review): pillow and sentencepiece are not imported directly in this notebook;
# presumably needed by the LLaVA processor/tokenizer — confirm.
!python -m pip install pillow
!python -m pip install sentencepiece
!python -m pip install matplotlib

### Set Up Twelve Labs and Weaviate SDKs

In [None]:
import os
from google.colab import userdata

# Credentials are read from Colab's Secrets store (userdata) rather than being
# hard-coded in the notebook.
TL_API_KEY=userdata.get('TL_API_KEY')

# Weaviate Cloud cluster URL and API key, passed to connect_to_weaviate_cloud below.
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")


In [None]:
from twelvelabs import TwelveLabs

# One Twelve Labs client shared by every later cell (Pegasus indexing and text
# generation, Marengo embedding tasks).
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)


In [None]:
import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create the collection. In the v4 client `collections.get()` only builds
# a local handle and does not check the server, so it never raises for a missing
# collection; test for existence explicitly instead of relying on an exception.
# Vectors are supplied explicitly at insert time (Marengo embeddings).
if weaviate_client.collections.exists("Video_Embeddings"):
    collection = weaviate_client.collections.get("Video_Embeddings")
else:
    collection = weaviate_client.collections.create(name="Video_Embeddings")

### Setting Up Our Video Data

In [None]:
# Mount Google Drive; all input and output videos live under base_folder_path.
from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
# Original (not yet upscaled) .mp4 files.
raw_video_dir = base_folder_path + "/sports_videos"

# NOTE: these two keep a trailing "/" because later cells build file paths by
# plain string concatenation (e.g. upscaled_video_dir + "football_480.mp4").
upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"

### Preprocessing Our Video Data
Some of our videos are too low-resolution to use with the embedding engine, so we upscale them to 1280×720 with `upscale_video`.

`read_video_pyav` comes directly from the [LLaVA-NeXT-Video Colab notebook](https://colab.research.google.com/drive/1CZggLHrjxMReG-FNOmqSOdi4z7NPq6SO?usp=sharing#scrollTo=hqpPqDKuQUTq). It decodes the requested frames into the NumPy array format the model expects for inference.



In [None]:
import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    """Re-encode a video's first video stream at a fixed resolution.

    Every decoded frame is resized to ``target_width`` x ``target_height``
    (the aspect ratio is not preserved) and encoded as H.264 (``libx264``,
    ``yuv420p``) at the input stream's average frame rate. Only the first
    video stream is written; audio and any other streams are dropped.

    Args:
        input_file: Path to the source video.
        output_file: Path of the video file to write.
        target_width: Output frame width in pixels (default 1280).
        target_height: Output frame height in pixels (default 720).
    """
    # Context managers close both containers even if decoding/encoding fails
    # part-way, so one bad file does not leak handles in the batch loop.
    with av.open(input_file) as input_container, av.open(output_file, mode='w') as output_container:
        input_stream = input_container.streams.video[0]
        output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
        output_stream.width = target_width
        output_stream.height = target_height
        output_stream.pix_fmt = 'yuv420p'

        for frame in input_container.decode(input_stream):
            # Convert to the encoder's declared pixel format as well as size, so
            # inputs in other pixel formats are accepted by libx264.
            frame = frame.reformat(width=target_width, height=target_height, format='yuv420p')
            output_container.mux(output_stream.encode(frame))

        # Flush frames still buffered in the encoder.
        output_container.mux(output_stream.encode(None))

def read_video_pyav(container, indices):
    '''
    Decode selected frames of a video with the PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): Frame indices to decode. Order does not matter and
            duplicate indices are collapsed (each frame is returned once, in
            decode order).

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).

    Raises:
        ValueError: if ``indices`` is empty or none of the indices exist in the video.
    '''
    # len() rather than truthiness so numpy arrays are accepted.
    if len(indices) == 0:
        raise ValueError("indices must contain at least one frame index")

    # A set gives O(1) membership per decoded frame instead of scanning indices.
    wanted = {int(i) for i in indices}
    last_index = max(wanted)

    frames = []
    container.seek(0)
    for i, frame in enumerate(container.decode(video=0)):
        if i > last_index:
            # Everything requested has been seen; stop decoding early.
            break
        if i in wanted:
            frames.append(frame)

    if not frames:
        raise ValueError("none of the requested frame indices exist in the video")
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])


Here we upscale all of our videos. Videos that already have an upscaled copy are skipped.

In [None]:
# Create output directory if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Outputs already written by a previous run. Listed once up front instead of
# re-listing the directory for every input file.
already_upscaled = set(os.listdir(upscaled_video_dir))

# Upscale every .mp4 in the raw video directory. The "_480" suffix is kept
# because later cells refer to the outputs by that name (e.g. football_480.mp4),
# even though upscale_video writes 1280x720 by default.
for filename in os.listdir(raw_video_dir):
    if not filename.endswith(".mp4"):
        continue
    print(filename)

    output_file = f"{os.path.splitext(filename)[0]}_480.mp4"
    if output_file in already_upscaled:
        continue

    input_file_path = os.path.join(raw_video_dir, filename)
    output_file_path = os.path.join(upscaled_video_dir, output_file)
    upscale_video(input_file_path, output_file_path)
    already_upscaled.add(output_file)


## Compare Pegasus and LLaVA-NeXT-Video on a Single Video

We will start by comparing Pegasus and LLaVA-NeXT-Video on generating insights from a single video.

### Using Pegasus to Chat with our Video

To chat with our video, we first need to have Pegasus index it.

We will create an index named `sports_videos` and then upload our video to this index to be indexed before chatting with it. We only need to do this once per video.

In more complex workflows with multiple videos, all of the videos can be uploaded ahead of time to reduce overhead and speed up the end-to-end workflow.


First we create the index.

In [None]:
# Pegasus 1.2 with the "visual" option; this index holds every video and segment
# that is later queried with generate.text.
models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
# Reuse an existing index with this name, so re-running the cell does not create another one.
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    # NOTE(review): takes the first match if several indexes share this name.
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

Then we create a function to upload our video to be indexed.

In [None]:
def on_task_update(task):
    """Progress callback for ``wait_for_done``: print the task's current status."""
    current_status = task.status
    print(f"  Status={current_status}")

In [None]:
def upload_video_to_twelve_labs_pegasus(video_path):
    """Index a local video file in the Pegasus index and return its video ID.

    Uses the module-level ``twelve_labs_client`` and ``index``, and blocks until
    the indexing task finishes (``wait_for_done`` with ``sleep_interval=5``,
    reporting progress through ``on_task_update``).

    Raises:
        RuntimeError: if the task ends in any status other than ``"ready"``.
    """
    print(video_path)
    indexing_task = twelve_labs_client.task.create(index_id=index.id, file=video_path)
    print(f"Task created: id={indexing_task.id} status={indexing_task.status}")

    indexing_task.wait_for_done(sleep_interval=5, callback=on_task_update)

    final_status = indexing_task.status
    if final_status != "ready":
        raise RuntimeError(f"Indexing failed with status {final_status}")

    new_video_id = indexing_task.video_id
    print(f"The unique identifer of your video is {new_video_id}.")
    return new_video_id

Next, we'll upload our video.

In [None]:
# Upscaled football video used for the single-video Pegasus vs LLaVA comparison.
single_video_file = upscaled_video_dir + "football_480.mp4"

In [None]:
# Index the full video with Pegasus; blocks until indexing finishes and returns its video ID.
single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Finally we'll query it.

In [None]:
# Question reused later for LLaVA-NeXT-Video, for a side-by-side comparison.
single_video_query = "What is going on in this video? Please be concise."

# Ask Pegasus about the whole indexed video; res.data holds the generated answer.
res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Here is Pegasus' response:
```
The video showcases a pivotal moment in a football game between the New York Giants and the New England Patriots. Eli Manning, the Giants' quarterback, throws a pass that David Tyree catches spectacularly by pinning the ball against his helmet as he falls out of bounds. Multiple angles replay the catch, emphasizing its difficulty and precision. Tyree briefly celebrates after the play, and the video ends with him and other players walking off the field.
```

From the above response, we can see that Pegasus 1.2 responds coherently to the question. Now, let's see whether we can get a similar response from the open-source model.

In [None]:

# Follow-up question about the same full video.
res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

### Using LLaVa-NeXT-Video to Chat with our Video
For the open-source model, we need to set up video sampling, load the model from the Hugging Face Hub, format the input for inference, and then run the model on our inputs. We adapt the [LLaVA-NeXT-Video sampling code](https://colab.research.google.com/drive/1CZggLHrjxMReG-FNOmqSOdi4z7NPq6SO?usp=sharing#scrollTo=hqpPqDKuQUTq) to take a uniform sample of 40 frames from each video.



In [None]:
def sample_video(video_path, num_samples=8):
    """Uniformly sample frames from a video file.

    Args:
        video_path: Path to the video file.
        num_samples: Number of frames to sample (default 8; the notebook uses 40).
            If the video has fewer frames than this, duplicate indices collapse
            in ``read_video_pyav`` and fewer frames are returned.

    Returns:
        np.ndarray of RGB frames, shape (num_frames, height, width, 3).

    Raises:
        ValueError: if ``num_samples`` is not positive or the video has no
            decodable frames.
    """
    if num_samples <= 0:
        raise ValueError("num_samples must be positive")

    # The container is closed even if decoding fails; this runs in loops over
    # many files, so leaked handles would accumulate.
    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        if total_frames <= 0:
            # PyAV reports 0 when the container does not record a frame count;
            # count by decoding instead (read_video_pyav seeks back to 0).
            total_frames = sum(1 for _ in container.decode(video=0))
        if total_frames <= 0:
            raise ValueError(f"No decodable video frames in {video_path}")

        # sample uniformly num_samples frames from the video
        indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
        return read_video_pyav(container, indices)

In [None]:
# 40 uniformly spaced frames of the full video, as input for LLaVA-NeXT-Video.
sampled_video = sample_video(single_video_file, num_samples=40)

Here we'll set up our LLaVa-NeXT-Video model.

In [None]:
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

# Load the 7B model with 4-bit weights (float16 compute) to cut GPU memory use;
# device_map='auto' lets accelerate place the weights on the available devices.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

Next, we'll create a function to query our model.

In [None]:
def query_llava_next(query,model,processor,sampled_video):
    """Ask LLaVA-NeXT-Video one question about a sampled video.

    Args:
        query: The user question.
        model: A loaded ``LlavaNextVideoForConditionalGeneration`` (or compatible) model.
        processor: The matching ``LlavaNextVideoProcessor``.
        sampled_video: Frames of one video, e.g. from ``sample_video``.

    Returns:
        Only the model's answer text. ``generate`` returns the prompt tokens
        followed by the continuation; the prompt part is dropped before
        decoding, so the chat-template text ("USER: ... ASSISTANT:") is not
        echoed back.
    """
    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    # Sampling (do_sample/top_p) means answers can differ between runs.
    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)

    # Keep only the newly generated tokens.
    prompt_len = inputs["input_ids"].shape[-1]
    generated_text = processor.batch_decode(output[:, prompt_len:], skip_special_tokens=True)

    return generated_text[0].strip()

In [None]:
# Same question that was sent to Pegasus, for a side-by-side comparison.
llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

### Output:

Here is LLaVA-NeXT-Video's output:

```
What is happening in this video? Be concise ASSISTANT: The video shows a football game in progress, with various players on the field. It appears to be the Super Bowl III between the New York Giants and the New England Patriots, judging by the jersey numbers and the old-fashioned helmets worn by some players. One player is in mid-action, grabbing the ball and getting tackled by another player, while a referee is signaling a first down. There are also coaches and other game
```

While this model does recognize that there is a football game happening between the Giants and the Patriots, it tends to hallucinate other facts.

In [None]:
# Same follow-up question that was asked of Pegasus.
llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

# RAG for Segment-Level Queries on a Single Video

We see that Pegasus is the clear winner on time and accuracy for this query when querying the entire video.

The open-source model would likely perform better if we could restrict the video in question to a smaller segment. We can do this by asking questions that only need a subset of the video, and using RAG to retrieve the relevant subset.

This is where the Marengo model will come in. We can use it to create embeddings for each segment of the video, and then use RAG to get the most relevant segment based on our queries.

We will start by creating embeddings for each segment of the video.

In [None]:
# Clip length in seconds. Used both as Marengo's video_clip_length and as
# split_video's segment duration, so clip embedding i lines up with segment file i.
segment_length = 10

### Using Marengo to Create Full Video and Video Clip Embeddings

Marengo allows us to retrieve embeddings for the entire video and for clips at a set clip length all in one call.

In [None]:
# One Marengo task returns both per-clip embeddings (segment_length-second clips)
# and a single embedding for the whole video.
task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Block until the embedding task finishes, printing status updates via on_task_update.
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

We'll save the task ID for use later when uploading our embeddings to Weaviate.

In [None]:
# Marengo task ID of the embedding job created above; its embeddings are fetched
# by ID when preparing the Weaviate upload. Task IDs belong to the account that
# created them, so don't hard-code one from someone else's run. To reuse your own
# earlier task after a runtime restart, assign its ID string here instead.
single_video_task_id = task.id

In [None]:
# Maps video file name -> Marengo embedding task ID (the multi-video section adds more entries).
marengo_task_ids = {}
single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id

### Prepare Video Segments for RAG

Next, we will split this video into segments that mirror the timestamps of each clip embedding. This lets us later submit _only_ the relevant video chunk to our model for a RAG use case.

In [None]:
import os
import subprocess
import json

def _plan_segments(duration, segment_duration, min_last_segment_len):
    """Return ``(start_time, length, adjusted)`` tuples covering ``duration`` seconds.

    Full segments start at multiples of ``segment_duration``. Any remainder
    becomes one extra final segment. If that remainder is shorter than
    ``min_last_segment_len``, the final segment is moved earlier so it spans the
    last ``min_last_segment_len`` seconds (overlapping the previous segment), and
    ``adjusted`` is True. The start is clamped at 0, so a video shorter than
    ``min_last_segment_len`` yields a single segment from 0 to ``duration``.
    """
    num_full_segments = int(duration / segment_duration)
    plan = [(i * segment_duration, segment_duration, False) for i in range(num_full_segments)]

    natural_start = num_full_segments * segment_duration
    remaining_duration = duration - natural_start
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Never start before 0, even if the whole video is shorter than the minimum.
            length = min(min_last_segment_len, duration)
            start_time = duration - length
            plan.append((start_time, length, start_time < natural_start))
        else:
            plan.append((natural_start, remaining_duration, False))
    return plan


def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration with ffmpeg.

    Regular segments are segment_duration seconds long. The last segment will
    be at least 5 seconds long (or the whole video, if it is shorter),
    potentially overlapping the previous segment. Files are named
    "<input stem>_segment_<NNN>.mp4" with a 0-based NNN.

    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments (created if missing)
        segment_duration: Duration of each segment in seconds (default: 10)

    Returns:
        The number of planned segments, or 0 if the duration could not be read
        with ffprobe. Segments that ffmpeg fails to write are reported but
        still counted.

    Raises:
        ValueError: if segment_duration is not positive.
    """
    if segment_duration <= 0:
        raise ValueError("segment_duration must be positive")

    # Minimum length for the last segment
    min_last_segment_len = 5

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]

    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]

    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        # Best effort: a missing ffprobe, unreadable file or bad metadata just skips this video.
        print(f"Error getting video duration: {e}")
        return 0

    segments = _plan_segments(duration, segment_duration, min_last_segment_len)
    num_segments = len(segments)

    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")

    failed = 0
    for i, (start_time, actual_duration, adjusted) in enumerate(segments):
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")

        # Stream copy is fast, but the cut may land on the nearest keyframe rather
        # than exactly at start_time.
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]

        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode != 0:
            failed += 1
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
            continue

        end_time = start_time + actual_duration
        note = f" (adjusted to ensure at least {min_last_segment_len}s)" if adjusted else ""
        print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s{note}")

    if failed:
        print(f"Split {base_name} into {num_segments} segments ({failed} failed)")
    else:
        print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

In [None]:
# Cut the single video into segment_length-second files in video_segments_dir.
split_video(single_video_file, video_segments_dir,segment_length)

Next, we'll upload the video segments to Pegasus to get their video IDs. We will store these IDs in Weaviate along with the embeddings, so we can easily chat with whichever video is returned. This is a great way to speed up results when you have videos that users will chat with.

Here we'll create and populate a dictionary mapping file names to Pegasus video IDs.

In [None]:
# Maps file name (full video or segment) -> Pegasus video ID.
pegasus_video_ids = {}

In [None]:
# Upload every segment file to Pegasus, skipping any that already have an ID
# (so the cell can be re-run after a partial failure).
segment_video_files = [name for name in os.listdir(video_segments_dir) if name.endswith('.mp4')]

for segment_video_file in segment_video_files:
    already_uploaded = segment_video_file in pegasus_video_ids
    if already_uploaded:
        print("skip file",segment_video_file)
        continue

    print("processing file",segment_video_file)
    segment_path = video_segments_dir + segment_video_file
    try:
        video_id = upload_video_to_twelve_labs_pegasus(segment_path)
    except Exception as e:
        # Keep going; a failed segment simply gets no Pegasus ID.
        print(f"Error processing {segment_video_file}: {str(e)}")
        continue
    pegasus_video_ids[segment_video_file] = video_id

We'll also add the video ID for the full video that we retrieved earlier.

In [None]:
# Register the full video's Pegasus ID (indexed earlier) under its file name.
fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

In [None]:
# Inspect the file name -> Pegasus video ID mapping.
print(pegasus_video_ids)

We'll also sample all of our videos for use with the LLaVa-NeXT-Video model

In [None]:
# File name -> 40 sampled RGB frames, for every segment file and every upscaled video.
# NOTE(review): each entry holds 40 full-resolution frames in memory, so this dict
# can grow large as more videos are added.
sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

### Uploading Embeddings to Weaviate

Now we'll create a function to prepare our data to be uploaded to Weaviate

In [None]:
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):
    """Build Weaviate properties and vectors from finished Marengo embedding tasks.

    For every ``video_file_name -> marengo_task_id`` entry, the task's
    embeddings are fetched with the module-level ``twelve_labs_client``.
    A ``"video"``-scope embedding is stored under the full video's file name.
    Each ``"clip"``-scope embedding is stored under
    ``<stem>_segment_<NNN>.mp4`` (the naming used by ``split_video``), where NNN
    counts clips in the order Marengo returns them. This pairing assumes Marengo
    returns clips in time order and yields the same number of clips as
    ``split_video`` with the same segment length — TODO confirm for the last,
    shorter clip.

    Args:
        marengo_task_ids: Mapping of video file name to Marengo embedding task ID.
        pegasus_video_ids: Mapping of file name (full video or segment) to
            Pegasus video ID. Files without an entry get ``pegasus_video_id=None``.

    Returns:
        ``(records, vectors)``: parallel lists where ``records[i]`` is the
        property dict for embedding ``vectors[i]``. ``segment_number`` is 0 for
        the full-video record and equals the 0-based NNN file index for clips.
    """
    records = []
    vectors = []

    for video_file_name, marengo_task_id in marengo_task_ids.items():
        # Retrieve Marengo full-video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)

        video_name = os.path.splitext(video_file_name)[0]
        # Index of the next clip; matches split_video's 0-based file suffix.
        clip_index = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            is_video = segment.embedding_scope == "video"

            if is_video:
                segment_file_name = video_file_name
                segment_number = 0
            else:
                segment_file_name = f"{video_name}_segment_{clip_index:03d}.mp4"
                segment_number = clip_index
                clip_index += 1

            records.append({
                'video_name': video_name,
                'segment_number': segment_number,
                'video_file': segment_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_ids.get(segment_file_name),
            })
            vectors.append([float(x) for x in segment.embeddings_float])

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

In [None]:
# Build Weaviate properties + vectors for the single video's full-video and clip embeddings.
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

Now, we'll upload the data to our collection

In [None]:
from weaviate.util import generate_uuid5

# Deterministic UUIDs (from the record's type and file name) make re-running the
# upload idempotent: an object with the same UUID is overwritten instead of being
# inserted again as a duplicate.
with collection.batch.dynamic() as batch:
    for record, vector in zip(records, vectors):
        batch.add_object(
            properties=record,
            vector=vector,
            uuid=generate_uuid5(f"{record['type']}:{record['video_file']}"),
        )

# Batch errors are collected rather than raised, so check them explicitly.
failed_objects = collection.batch.failed_objects
print(f"Added {len(records) - len(failed_objects)} embeddings to Weaviate")
if failed_objects:
    print(f"{len(failed_objects)} objects failed to import, e.g.: {failed_objects[0].message}")

### Testing the Vector Search
Now that everything is in the collection, we can sanity-check the search: querying with one of the stored vectors (here `vectors[5]`) should return that same object as the nearest match.

In [None]:
from weaviate.classes.query import MetadataQuery, Filter

# Sanity check: querying with one of the vectors just uploaded should return
# that same object as the nearest match.
query_index = min(5, len(vectors) - 1)
query_vector = vectors[query_index]
expected_record = records[query_index]
print(f"Expected: {expected_record['video_file']}, Type: {expected_record['type']}")

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,  # only the nearest match is needed for this check
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    props = obj.properties
    print(f"Video: {props['video_file']}, Type: {props['type']}")
    print(f"Segment: {props.get('segment_number')} ({props.get('start_time')}s - {props.get('end_time')}s)")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

### Querying our Vector Database with Text Embeddings

To query the database, we first embed our text query with Marengo's text embedding feature. Then we query the Weaviate database for the clip embedding that best matches the question embedding, and use that clip's Pegasus video ID to ask our question about just that clip.

In [None]:
# Clip-level question used to test text-to-clip retrieval on the football video.
sample_question = "What technique did David Tyree use to catch the ball?"

In [None]:
# Embed the question with Marengo's text model so it lives in the same vector
# space as the stored video/clip embeddings.
embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

# Nearest clip-level embedding (full-video embeddings are filtered out).
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

if not response.objects:
    raise RuntimeError("No clip embeddings found in the collection; upload them first.")

best_clip = response.objects[0]
video_file = best_clip.properties.get("video_file")
pegasus_video_id = best_clip.properties.get("pegasus_video_id")
print(video_file)
if pegasus_video_id is None:
    print("Warning: this clip has no Pegasus video ID, so it cannot be queried with Pegasus.")


In [None]:
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# Frames sampled earlier for the clip returned by the vector search.
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

# Animation callbacks: init shows the first frame, animate shows frame i.
def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

# One animation frame per sampled frame, 100 ms apart, rendered as an HTML5 video
# (displayed because it is the cell's last expression).
anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

### Chatting with our Video Segment: Pegasus vs LLaVA-NeXT-Video

Pegasus:

In [None]:
print(sample_question)

# Ask Pegasus the question about only the retrieved clip.
res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

LLaVa-NeXT-Video

In [None]:
# Ask LLaVA-NeXT-Video the same question using the retrieved clip's sampled frames.
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

In [None]:
# LLaVA-NeXT-Video's answer for the retrieved clip.
print(generated_text)

## Multi Video RAG with Marengo, Weaviate, and Pegasus

Now that we know how Marengo embeddings perform on individual clips from a single video, we will show how to use embeddings across multiple videos for a more realistic RAG use case.

### Get Marengo Embeddings for All Videos

In [None]:
# Create Marengo clip + full-video embeddings for every upscaled .mp4 that does
# not have an embedding task yet.
for video_file_name in os.listdir(upscaled_video_dir):

    # Skip non-video files that may sit in the Drive folder.
    if not video_file_name.endswith(".mp4"):
        continue

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Block until the embedding task finishes, printing status updates.
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    # Only record successful tasks; a failed task has no embeddings to retrieve
    # when the Weaviate records are prepared, and re-running this cell retries it.
    if status != "ready":
        print(f"not recording {video_file_name}: embedding task {task.id} ended with status {status}")
        continue

    marengo_task_ids[video_file_name] = task.id


In [None]:
# Inspect the file name -> Marengo embedding task ID mapping.
print(marengo_task_ids)

### Split our Remaining Videos into Segments

In [None]:
# Create the segment output folder if it doesn't exist
os.makedirs(video_segments_dir, exist_ok=True)

# Only .mp4 files: later steps (Pegasus upload, segment-name matching in
# prepare_marengo_embeddings_for_weaviate) assume the .mp4 extension, so segments
# of other formats would never be paired with an embedding or a video ID.
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]

# Split each video into segment_length-second files (the same clip length used
# for the Marengo clip embeddings).
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)

### Get Pegasus Video IDs for All Videos and their Segments

Finally, we will upload the full videos and their segments to Pegasus so we can chat with them. We parallelize this task to speed it up.

In [None]:
import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    """Upload one video to Pegasus and return ``(file_name, video_id)``.

    Any upload error is printed and reported as ``video_id=None``, so one
    failure does not abort the whole batch.
    """
    video_file_name = os.path.basename(video_path)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Every segment and full video, minus those that already have a Pegasus ID
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if os.path.basename(f) not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

failed_videos = []
# Threads suit this I/O-bound work (uploads + status polling). pegasus_video_ids is
# only written here, in the main thread, as results arrive.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}

    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id
        else:
            failed_videos.append(video_file_name)

print("All videos processed!")
# Count this run's uploads only; pegasus_video_ids also holds entries from earlier cells.
print(f"Successfully processed {len(videos_to_process) - len(failed_videos)} of {len(videos_to_process)} videos")
if failed_videos:
    print(f"Failed uploads (re-run this cell to retry them): {sorted(failed_videos)}")

In [None]:
# Inspect the full file name -> Pegasus video ID mapping after the parallel upload.
print(pegasus_video_ids)

### Upload Data to Weaviate
First we'll prepare our data to be uploaded

In [None]:
# Rebuild records/vectors for every video in marengo_task_ids (including the first one).
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

Then, we will upload it to our collection.

In [None]:
from weaviate.util import generate_uuid5

# Only records with a Pegasus video ID are uploaded (they are the ones that can
# be chatted with). Deterministic UUIDs (type + file name) make re-running this
# cell idempotent: an object with the same UUID is overwritten, not duplicated.
uploadable = [
    (record, vector)
    for record, vector in zip(records, vectors)
    if record["pegasus_video_id"] is not None
]

with collection.batch.dynamic() as batch:
    for record, vector in uploadable:
        batch.add_object(
            properties=record,
            vector=vector,
            uuid=generate_uuid5(f"{record['type']}:{record['video_file']}"),
        )

failed_objects = collection.batch.failed_objects
skipped = len(records) - len(uploadable)
print(f"Added {len(uploadable) - len(failed_objects)} embeddings to Weaviate")
if skipped:
    print(f"Skipped {skipped} records without a Pegasus video ID")
if failed_objects:
    print(f"{len(failed_objects)} objects failed to import, e.g.: {failed_objects[0].message}")

### RAG Questions


We now have Marengo embeddings and Pegasus video IDs uploaded to Weaviate.

We can assess the performance of running queries on the clips and the full video in terms of answer accuracy and speed.

In [None]:
# Evaluation questions spanning the different sports videos; each is answered
# against the best-matching full video and the best-matching clip.
video_questions = [
    "In the American Football Video, what are the teams playing?",
    "Which arm does Eli Manning throw the ball with?",
    "In the tennis match video, who is playing?",
    "What foot does Messi shoot at the goal with?",
    "When Does Keri Strug hurt her foot?"
]

### Multi Video RAG with Pegasus

In [None]:
from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    # Embed the question in Marengo's space so it can be compared to video embeddings.
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # Fetch a few nearest full-video embeddings: objects can be stored with
    # pegasus_video_id=None, and those cannot be sent to Pegasus.
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=5,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    best_match = next(
        (obj for obj in response.objects if obj.properties.get("pegasus_video_id")),
        None,
    )
    if best_match is None:
        print(f"No full video with a Pegasus ID found for: {question}")
        # Keep one row per question so answer lists stay aligned for comparison.
        pegasus_full_video_answers.append([question, None, None])
        continue

    selected_video_name = best_match.properties["video_file"]
    selected_video_id = best_match.properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

In [None]:
pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    # Embed the question in Marengo's space so it can be compared to clip embeddings.
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    # Fetch a few nearest clip embeddings: objects can be stored with
    # pegasus_video_id=None, and those cannot be sent to Pegasus.
    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=5,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    best_match = next(
        (obj for obj in response.objects if obj.properties.get("pegasus_video_id")),
        None,
    )
    if best_match is None:
        print(f"No clip with a Pegasus ID found for: {question}")
        # Keep one row per question so answer lists stay aligned for comparison.
        pegasus_clip_video_answers.append([question, None, None])
        continue

    selected_video_name = best_match.properties["video_file"]
    selected_video_id = best_match.properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

In [None]:
# Print Pegasus' clip-level and full-video answers side by side for each question.
for clip_row, full_row in zip(pegasus_clip_video_answers, pegasus_full_video_answers):
    question_text, clip_text = clip_row[0], clip_row[2]
    full_text = full_row[2]

    print("question",question_text)
    print("clip:  ",clip_text)
    print("full:  ",full_text)
    print("\n")

### Multi Video RAG with LLaVa-NeXT-Video
Now we can run our model on the full video, which outputs some more interesting answers

First we'll sample the rest of our video segments

In [None]:
# Sample only the segment files that are not in sampled_video_files yet;
# re-sampling is slow and every entry is a large in-memory array.
for video_file in os.listdir(video_segments_dir):
    if not video_file.endswith(".mp4") or video_file in sampled_video_files:
        continue
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

In [None]:
llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    # Embed the question in Marengo's space so it can be compared to video embeddings.
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    if not response.objects:
        print(f"No full video found for: {question}")
        # Keep one row per question so answer lists stay aligned for comparison.
        llava_full_video_answers.append([question, None, None])
        continue

    # File name of the retrieved video; its frames were sampled in earlier cells.
    selected_video_file = response.objects[0].properties["video_file"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    # Record the video actually retrieved for this question.
    llava_full_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

In [None]:
from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    # Embed the question in Marengo's space so it can be compared to clip embeddings.
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    if not response.objects:
        print(f"No clip found for: {question}")
        # Keep one row per question so answer lists stay aligned for comparison.
        llava_clip_video_answers.append([question, None, None])
        continue

    # File name of the retrieved segment; its frames were sampled in earlier cells.
    selected_video_file = response.objects[0].properties["video_file"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    # Record the clip actually retrieved for this question.
    llava_clip_video_answers.append([question,selected_video_file,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

In [None]:
# Show LLaVA-NeXT-Video's clip-level answer next to its full-video answer.
paired_answers = zip(llava_clip_video_answers, llava_full_video_answers)
for clip_row, full_row in paired_answers:
    print("question",clip_row[0])
    print("clip:  ",clip_row[2])
    print("full:  ",full_row[2])
    print("\n")