# Video Q/A with VideoPrism and Parakeet-v3

<a target="_blank" href="https://colab.research.google.com/github/everettVT/daft-video-embeddings/blob/main/friction/UNFILTERED_Pt2_videoprism_parakeet.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

VideoPrism is a general-purpose video encoder designed to tackle a wide spectrum of video understanding tasks, including classification, localization, retrieval, captioning, and question answering.

Parakeet is a 600-million-parameter multilingual automatic speech recognition (ASR) model designed for high-throughput speech-to-text transcription.

In this notebook, we will explore how to use these foundation models to generate video and text embeddings from YouTube or any video file.

Video processing requires us to extract both image and audio frames, which we can then use to generate embeddings. In this use case we will transcribe the audio into text segments so we can perform RAG Q/A against both the visual and spoken content.
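To make the retrieval half of RAG Q/A concrete, here is a minimal sketch of how a question embedding might be ranked against clip or transcript embeddings using cosine similarity. The helper name `top_k_by_cosine` and the toy 4-dimensional vectors are illustrative assumptions, not part of this notebook's pipeline; real VideoPrism embeddings have 768 or 1024 dimensions.

```python
import numpy as np

def top_k_by_cosine(query: np.ndarray, corpus: np.ndarray, k: int = 3) -> list[tuple[int, float]]:
    """Rank rows of `corpus` (N, D) by cosine similarity to `query` (D,)."""
    q = query / np.linalg.norm(query)
    c = corpus / np.linalg.norm(corpus, axis=1, keepdims=True)
    scores = c @ q                      # (N,) cosine similarities
    order = np.argsort(-scores)[:k]     # indices of the k highest scores
    return [(int(i), float(scores[i])) for i in order]

# Toy vectors standing in for clip or transcript-segment embeddings (hypothetical data).
corpus = np.array([
    [1.0, 0.0, 0.0, 0.0],
    [0.0, 1.0, 0.0, 0.0],
    [0.7, 0.7, 0.0, 0.0],
], dtype=np.float32)
query = np.array([0.9, 0.1, 0.0, 0.0], dtype=np.float32)

hits = top_k_by_cosine(query, corpus, k=2)
print(hits)
```

The same ranking can be applied to video embeddings and to transcript embeddings separately, then merged, which is one way to query both modalities.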


In [1]:
# @title Prepare environment

import os
import sys

# Fetch VideoPrism repository if Python does not know about it and install
# dependencies needed for this notebook.
if not os.path.exists("videoprism_repo"):
  !git clone --quiet --branch=main --depth=1 \
     https://github.com/everettVT/videoprism.git videoprism_repo
  os.chdir('./videoprism_repo')
  !pip install .
  os.chdir('..')

# Append VideoPrism code to Python import path.
if "videoprism_repo" not in sys.path:
  sys.path.append("videoprism_repo")

Processing /content/videoprism_repo
  Preparing metadata (setup.py) ... done
Collecting einshape (from videoprism==1.0.0)
  Downloading einshape-1.0-py3-none-any.whl.metadata (706 bytes)
Downloading einshape-1.0-py3-none-any.whl (21 kB)
Building wheels for collected packages: videoprism
  Building wheel for videoprism (setup.py) ... done
  Created wheel for videoprism: filename=videoprism-1.0.0-py3-none-any.whl size=40354 sha256=e2dc98c1b6036cd05205b68c747b78e1f3bf4c32a35163a4c952fb56d3fa3d91
  Stored in directory: /tmp/pip-ephem-wheel-cache-fd5cjzii/wheels/e3/73/3c/3dc3551ff92b46a1e55f9a893f2d5b8fdc55d670bd73d3b605
Successfully built videoprism
Installing collected packages: einshape, videoprism
Successfully installed einshape-1.0 videoprism-1.0.0


In [4]:
!pip install "daft>=0.6.1" av yt-dlp "jax[cuda12]" "nemo_toolkit[asr]"

Collecting daft>=0.6.1
  Using cached daft-0.6.1-cp39-abi3-manylinux_2_24_x86_64.whl.metadata (12 kB)
Collecting av
  Using cached av-15.1.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting yt-dlp
  Using cached yt_dlp-2025.9.5-py3-none-any.whl.metadata (177 kB)
Collecting nemo_toolkit[asr]
  Using cached nemo_toolkit-2.4.0-py3-none-any.whl.metadata (91 kB)
Collecting fsspec (from daft>=0.6.1)
  Using cached fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting onnx>=1.7.0 (from nemo_toolkit[asr])
  Using cached onnx-1.19.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (7.0 kB)
Collecting ruamel.yaml (from nemo_toolkit[asr])
  Using cached ruamel.yaml-0.18.15-py3-none-any.whl.metadata (25 kB)
Collecting wget (from nemo_toolkit[asr])
  Using cached wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... done
Collecting braceexpand (from nemo_toolkit[asr])
  Using cached braceexpand-0.1.7-py2.py3-none-any.whl.metadata (3.0 kB)


In [1]:
import daft
from daft import col, DataType as dt
import numpy as np
import jax
import jax.numpy as jnp
from jax.extend import backend
import tensorflow as tf
from videoprism import models as vp
print(jax.devices())    # should list a CUDA device

[CudaDevice(id=0)]


### Define Parameters

Tensor Dimensions:
- B: batch size (number of videos in a batch).
- T: number of frames per video clip (typically 16).
- N: tokens per frame (for 288×288 with 18×18 patches → 16×16 = 256).
- D: embedding dimension (Base: 768; Large: 1024).

VideoPrism supports video+text inputs and returns:
- video_embeddings: [B, D] (global video embeddings).
- text_embeddings: [B, D] (global text embeddings).
- Optional: frame_embeddings [B, T, D]; tokens [B, T×N, D]
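
As a quick check of the arithmetic above, the following sketch derives the tokens-per-frame count from the frame size and patch size, and lists the output shapes for a given batch. The helper names `tokens_per_frame` and `output_shapes` are illustrative, not part of the VideoPrism API.

```python
def tokens_per_frame(height: int, width: int, patch: int) -> int:
    """Number of non-overlapping patch tokens in one frame."""
    if height % patch or width % patch:
        raise ValueError("frame size must be divisible by the patch size")
    return (height // patch) * (width // patch)

def output_shapes(b: int, t: int, n: int, d: int) -> dict[str, tuple[int, ...]]:
    """Shapes listed in the section above, for batch b, frames t, tokens n, width d."""
    return {
        "video_embeddings": (b, d),
        "text_embeddings": (b, d),
        "frame_embeddings": (b, t, d),
        "tokens": (b, t * n, d),
    }

n = tokens_per_frame(288, 288, 18)          # 16 x 16 patches
shapes = output_shapes(b=24, t=16, n=n, d=768)
print(n, shapes["tokens"])
```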

In [2]:
B, T, H, W, C = 24, 16, 288, 288, 3
ROW_LIMIT = 2048

Download the first six videos (`-I 1:6`) from your favorite YouTube playlist

In [3]:
!yt-dlp -I 1:6 https://www.youtube.com/playlist?list=PL3Q1vFKgSohNO4mbMKo5xccOsYWISUlou

[youtube:tab] Extracting URL: https://www.youtube.com/playlist?list=PL3Q1vFKgSohNO4mbMKo5xccOsYWISUlou
[youtube:tab] PL3Q1vFKgSohNO4mbMKo5xccOsYWISUlou: Downloading webpage
[youtube:tab] PL3Q1vFKgSohNO4mbMKo5xccOsYWISUlou: Redownloading playlist API JSON with unavailable videos
[download] Downloading playlist: Data Topic Deep Dives
[youtube:tab] Playlist Data Topic Deep Dives: Downloading 6 items of 19
[download] Downloading item 1 of 6
[youtube] Extracting URL: https://www.youtube.com/watch?v=WAsmZJ2kff0
[youtube] WAsmZJ2kff0: Downloading webpage
[youtube] WAsmZJ2kff0: Downloading tv simply player API JSON
[youtube] WAsmZJ2kff0: Downloading tv client config
[youtube] WAsmZJ2kff0: Downloading player 4f8fa943-main
[youtube] WAsmZJ2kff0: Downloading tv player API JSON
[info] WAsmZJ2kff0: Downloading 1 format(s): 303+251
[download] Sleeping 2.00 seconds as required by the site...
[download] Destination: GPU Pipeline Optimization Explained ｜ Async UDFs, CUDA Streams &

Discover the file paths

In [9]:
from daft.functions import file
df_files = daft.from_glob_path("/content").with_column("file", file(col("path")))
df_files.show(5)

path Utf8,size Int64,num_rows Int64,file File
file:///content/Why Your Image Processing Pipeline Keeps Running Out of Memory [4CGQ-c7iivg].mkv,36050736,,"File(Reference(""file:///content/Why Your Image Processing Pipeline Keeps Running Out of Memory [4CGQ-c7iivg].mkv"", None))"
file:///content/Build Scalable Batch Inference Pipelines in 3 Lines ｜ Daft + GPT⧸vLLM [wKOC_w4oKO8].mkv,27825597,,"File(Reference(""file:///content/Build Scalable Batch Inference Pipelines in 3 Lines ｜ Daft + GPT⧸vLLM [wKOC_w4oKO8].mkv"", None))"
"file:///content/GPU Pipeline Optimization Explained ｜ Async UDFs, CUDA Streams & Pinned Memory [WAsmZJ2kff0].webm",134080404,,"File(Reference(""file:///content/GPU Pipeline Optimization Explained ｜ Async UDFs, CUDA Streams & Pinned Memory [WAsmZJ2kff0].webm"", None))"
file:///content/Near-100% GPU Utilization： Embedding Millions of Text Documents With Qwen3 [WI83lRzk7YE].mkv,37924770,,"File(Reference(""file:///content/Near-100% GPU Utilization： Embedding Millions of Text Documents With Qwen3 [WI83lRzk7YE].mkv"", None))"


 Get Video File Metadata

In [17]:
import av
with av.open("file:///content/Near-100% GPU Utilization： Embedding Millions of Text Documents With Qwen3 [WI83lRzk7YE].mkv") as container:
    # Container-level metadata is a plain dict of tags written by the muxer.
    print(container.metadata)

{'COMPATIBLE_BRANDS': 'iso6avc1mp41', 'MAJOR_BRAND': 'dash', 'MINOR_VERSION': '0', 'ENCODER': 'Lavf58.76.100'}


In [13]:
@daft.func(return_dtype=dt.struct({
    "width": dt.int32(),
    "height": dt.int32(),
    "frames": dt.int64(),
    "FPS": dt.float64(),
}))
def read_video_metadata(file: daft.File):
    import av
    # Close the container as soon as the stream properties have been read.
    with av.open(file) as container:
        stream = container.streams.video[0]
        # `VideoStream.framerate` raised AttributeError in the original run;
        # `average_rate` (a Fraction, or None if unknown) is used instead.
        rate = stream.average_rate
        return {
            "width": stream.width,
            "height": stream.height,
            # Key must match the declared struct field; 0 if the container does not record it.
            "frames": stream.frames,
            "FPS": float(rate) if rate is not None else None,
        }

In [14]:
df_meta = df_files.with_column("metadata", read_video_metadata(df_files["file"])).collect()
df_meta.show(5)

Extract Audio from Video, Transcribe and Embed

In [None]:
import av
from av.audio.resampler import AudioResampler

@daft.func()
def extract_audio_frames_into_numpy_arrays(file: daft.File) -> np.ndarray:

    container = av.open(file)
    resampler = AudioResampler(format='s16', layout='mono', rate=16000)

    chunks = []
    try:
        for frame in container.decode(audio=0):
            # Resample to desired SR/mono/PCM16; result can be a frame or list of frames
            res = resampler.resample(frame)
            frames = res if isinstance(res, (list, tuple)) else [res]

            for f in frames:
                arr = f.to_ndarray()  # typically (channels, samples) or (samples,)
                arr = np.asarray(arr)

                # Flatten to 1-D mono
                if arr.ndim == 2:
                    # (1, N) or (N, 1) → (N,)
                    if arr.shape[0] == 1:
                        arr = arr[0]
                    elif arr.shape[1] == 1:
                        arr = arr[:, 0]
                    else:
                        # Unexpected multi-channel after mono resample: average as fallback
                        arr = arr.mean(axis=0)
                elif arr.ndim > 2:
                    arr = arr.reshape(-1)

                # Convert PCM16 → float32 in [-1, 1]
                if arr.dtype != np.float32:
                    arr = (arr.astype(np.float32) / 32768.0).clip(-1.0, 1.0)

                chunks.append(arr)
    finally:
        container.close()

    if not chunks:
        return np.zeros((0,), dtype=np.float32)

    audio = np.concatenate(chunks, axis=0).astype(np.float32, copy=False)
    return audio
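
The PCM16 to float32 step in the function above can be checked in isolation. This is a small worked example on hand-picked sample values; the helper name `pcm16_to_float32` is illustrative, and the 16 kHz rate matches the resampler setting used above.

```python
import numpy as np

def pcm16_to_float32(samples: np.ndarray) -> np.ndarray:
    """Scale signed 16-bit PCM samples into float32 values in [-1, 1]."""
    return (samples.astype(np.float32) / 32768.0).clip(-1.0, 1.0)

# Extremes and midpoints of the int16 range.
pcm = np.array([-32768, -16384, 0, 16384, 32767], dtype=np.int16)
audio = pcm16_to_float32(pcm)

# At 16 kHz, the duration in seconds is simply samples / 16000.
duration_s = len(audio) / 16000
print(audio, duration_s)
```

Dividing by 32768 rather than 32767 keeps the most negative sample at exactly -1.0, at the cost of the largest positive sample landing just below 1.0.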



In [None]:
@daft.udf(return_dtype = dt.string())
class ParakeetTranscribeUDF:
    def __init__(self, context_size: int = 256):
        import nemo.collections.asr as nemo_asr
        self.asr_model = nemo_asr.models.ASRModel.from_pretrained(model_name="nvidia/parakeet-tdt-0.6b-v3")
        self.asr_model.change_attention_model(
            self_attention_model="rel_pos_local_attn",
            att_context_size=[context_size, context_size]
        )

    def __call__(self, audio: list[np.ndarray]):
        outputs = self.asr_model.transcribe(audio)
        texts = [o.text for o in outputs]
        return texts



In [None]:

# Parakeet Transcribe with Timestamps
@daft.udf(return_dtype = dt.struct({
    "segment": dt.list(dt.struct({
        "segment": dt.string(),  # transcribed text of the segment
        "start_offset": dt.int32(),
        "end_offset": dt.int32(),
        "start": dt.float32(),
        "end": dt.float32()
    })),
}))
class ParakeetTranscribeTimestampsUDF:
    def __init__(self, context_size: int = 256):
        import nemo.collections.asr as nemo_asr
        self.asr_model = nemo_asr.models.ASRModel.from_pretrained(model_name="nvidia/parakeet-tdt-0.6b-v3")
        self.asr_model.change_attention_model(
            self_attention_model="rel_pos_local_attn",
            att_context_size=[context_size, context_size]
        )

    def __call__(self, audio: list[np.ndarray]):
        outputs = self.asr_model.transcribe(audio, timestamps=True)   # No public flag to emit only segments
        # Wrap each list so every row matches the declared struct {"segment": [...]}.
        return [{"segment": o.timestamp["segment"]} for o in outputs]
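
To answer questions against both modalities, transcript segments need to be aligned with video clips. The following is a minimal sketch of one way to do that, assuming frames are indexed consecutively at a constant frame rate and grouped into clips of `frames_per_clip` frames; the function name `clips_for_segment` is hypothetical, and the assumption should be checked against how frames are actually sampled.

```python
import math

def clips_for_segment(start_s: float, end_s: float, fps: float, frames_per_clip: int) -> list[int]:
    """Return the clip indices whose frame ranges overlap [start_s, end_s)."""
    if end_s <= start_s:
        raise ValueError("segment must have positive duration")
    first_frame = int(start_s * fps)
    # Last frame that starts before the segment ends.
    last_frame = max(first_frame, math.ceil(end_s * fps) - 1)
    first_clip = first_frame // frames_per_clip
    last_clip = last_frame // frames_per_clip
    return list(range(first_clip, last_clip + 1))

# A 30 fps video grouped into 16-frame clips.
short = clips_for_segment(0.0, 0.5, fps=30.0, frames_per_clip=16)
spanning = clips_for_segment(2.5, 3.0, fps=30.0, frames_per_clip=16)
print(short, spanning)
```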


In [None]:
@daft.udf(
    return_dtype = dt.embedding(dt.float32(), 768),
    batch_size=B, # clips per batch (tune for throughput)
    num_gpus=1,
)
class VideoPrismTextUDF:
    def __init__(self, model_name: str = "videoprism_lvt_public_v1_base"):
        from videoprism import models as vp
        self.model = vp.get_model(model_name)
        self.params = vp.load_pretrained_weights(model_name)
        self.text_tokenizer = vp.load_text_tokenizer('c4_en')

        @jax.jit
        def vf_b(text_ids, text_paddings):  # tokenized text -> [B,D]
            _, t, _ = self.model.apply(self.params, None, text_ids, text_paddings, train=False)
            return t # text embeddings

        self.vf_b = vf_b

        # Warm up the jitted function once so compilation happens at init time
        text_ids, text_paddings = vp.tokenize_texts(self.text_tokenizer, ["Hello", "World"])
        _ = self.vf_b(text_ids, text_paddings).block_until_ready()

    def __call__(self,
        prompts: list[str], # batch of up to B prompt strings
    ):
        # Daft may pass a Series rather than a list; convert when needed
        if hasattr(prompts, "to_pylist"):
            prompts = prompts.to_pylist()

        # Batch Inference
        text_ids, text_paddings = vp.tokenize_texts(self.text_tokenizer, prompts)
        text_embeddings = np.asarray(self.vf_b(text_ids, text_paddings))  # Back to NumPy

        return [e.tolist() for e in text_embeddings]


# Using daft.File instead of read_video_frames()

Video data, as a critical type of multimodal data, uniquely integrates visual, audio, and temporal dimensions, inherently fusing spatial (image-based) and temporal information. It has been widely adopted across domains including short-video platforms, live streaming, public security, healthcare, and autonomous driving.

Given the large volume of video data, most processing paradigms typically involve streaming-based reading and processing to minimize memory footprint. This distinguishes it from image data, which generally requires full initial loading into memory prior to processing.
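
The streaming idea can be illustrated without any video library. This sketch uses a plain generator as a stand-in for a frame decoder and pulls fixed-size batches from it, so only one batch is held in memory at a time. `fake_frames` and `batched` are illustrative names, not Daft or PyAV functions.

```python
from itertools import islice
from typing import Iterable, Iterator

def batched(items: Iterable, size: int) -> Iterator[list]:
    """Yield lists of up to `size` items without materializing the whole input."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

def fake_frames(n: int) -> Iterator[int]:
    """Stand-in for a decoder that yields one frame at a time."""
    for i in range(n):
        yield i

# Ten frames consumed in batches of four: the tail batch is shorter.
sizes = [len(b) for b in batched(fake_frames(10), 4)]
print(sizes)
```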

Thus, a Video data type in Daft should avoid storing the entire dataset in memory. Drawing inspiration from the File data type, we can either store merely a URL reference to the video data or directly use the underlying data structure of the File data type as its internal representation.

Beyond the core content of video data, it is critical to extract key metadata to facilitate subsequent filtering of target videos prior to processing. Videos encompass extensive metadata, such as frame count, resolution (height/width), time base, duration, pixel format, bit rate, codec name, and profile, among others. However, incorporating all such metadata into the Video data type is impractical from a memory efficiency standpoint. Instead, we prioritize including only essential metadata fields—specifically frame count, height, width, and FPS. Additional metadata can be dynamically retrieved during video processing as needed.
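
A minimal sketch of the "essential metadata only" idea follows, assuming a hypothetical `VideoMeta` record that carries just frame count, height, width, and FPS. Neither `VideoMeta` nor `keep` is a Daft type or function; the thresholds mirror the filter used in the proposal further down.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class VideoMeta:
    """Hypothetical record holding only the essential metadata fields."""
    path: str
    width: int
    height: int
    frames: int
    fps: float

    @property
    def duration_s(self) -> float:
        # Derived on demand rather than stored.
        return self.frames / self.fps if self.fps else 0.0

def keep(meta: VideoMeta, min_width: int = 1024, min_height: int = 576, min_frames: int = 100) -> bool:
    """Cheap metadata-only predicate applied before any frames are decoded."""
    return meta.width > min_width and meta.height > min_height and meta.frames > min_frames

videos = [
    VideoMeta("a.mkv", 1920, 1080, 3000, 30.0),
    VideoMeta("b.mkv", 640, 360, 3000, 30.0),   # too small
    VideoMeta("c.mkv", 1920, 1080, 50, 25.0),   # too few frames
]
selected = [v.path for v in videos if keep(v)]
print(selected, videos[0].duration_s)
```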


- Support different algorithms for reading/extracting accurate key frames, e.g. frame difference or optical flow. PyAV's default key-frame extraction is based on I-frames and uses the native encoding metadata pict_type='I'.

- Besides extracting/reading key frames, there are other video use cases, e.g. splitting a video by key frame, extracting audio from a video, etc.

It is better to add these functions to the Video or File data type than to add a new API for each use case.
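
As an illustration of the frame-difference approach mentioned in the list above (as opposed to the I-frame default), here is a minimal NumPy sketch. The function name `key_frames_by_difference` and the threshold value are assumptions chosen for the toy data; this is not an existing Daft or PyAV function.

```python
import numpy as np

def key_frames_by_difference(frames: np.ndarray, threshold: float) -> list[int]:
    """Pick key frames from a (T,H,W,C) uint8 array.

    A frame becomes a key frame when its mean absolute pixel difference from
    the previous key frame exceeds `threshold`. The first frame is always kept.
    """
    if len(frames) == 0:
        return []
    keys = [0]
    ref = frames[0].astype(np.float32)
    for i in range(1, len(frames)):
        cur = frames[i].astype(np.float32)
        if np.abs(cur - ref).mean() > threshold:
            keys.append(i)
            ref = cur
    return keys

# Six black frames with a hard cut to gray at frame 3.
frames = np.zeros((6, 4, 4, 3), dtype=np.uint8)
frames[3:] = 200
keys = key_frames_by_difference(frames, threshold=30.0)
print(keys)
```

Comparing against the previous key frame rather than the previous frame avoids missing slow fades, at the cost of being more sensitive to gradual drift.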

From a performance perspective, it is better to handle as much of the video processing logic as possible in a Rust library such as ffmpeg-next, even though most tools are based on ffmpeg. Videos should also be filtered (pushed down) by their metadata before they are processed.



R Conner Howell (Aug 28th at 11:07 AM):
I agree. The idea behind the "File" type was to start with a wrapper of the appropriate Python file-like protocols, then to specialize further into VideoFile, AudioFile, PdfFile, etc., each with its own domain-specific methods such as read_frames, read_channels, and read_pages, respectively. As you have also pointed out, this enables Daft to implement this functionality in Rust as well.

In [None]:

# Sketch of the proposed API from the discussion above: `video` and `key_frames` are proposals, not existing Daft functions.
df = daft.from_glob_path("s3://bucket/videos/")

# Convert the path directly from the utf8 data type to the video type. Daft should support converting from both the utf8 and file data types.
df = df.with_column("video", video(col("path")))

# Filter videos by their metadata.
df = df.filter((df["width"] > 1024) & (df["height"] > 576) & (df["frames"] > 100))

# Extract the key frames. The `key_frames` function streams the video data
# and extracts multiple key frames; the data type of each frame is FixedShapeImage. `key_frames` might take more parameters to indicate the image mode of the key frames.
# TODO: consider whether to include some key-frame metadata to stay compatible with daft.read_video_frames
df = df.with_column("key_frames", key_frames(col("video"), method="I_frame")).explode("key_frames")

# Save the key frames as a dataset.
df.select("path", "key_frames").write_lance("key_frames_dataset")


### Read Video Frames

In [7]:
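# `read_video_frames` expects a path or a list of paths, not an expression.
# Passing df_files["path"] raised "ValueError: Expressions don't have a truth value"
# in the original run, so the paths are materialized first.
video_paths = df_files.select("path").to_pydict()["path"]
df_video_frames = daft.read_video_frames(
    video_paths,
    image_height=H,
    image_width=W,
).limit(ROW_LIMIT).collect()
df_video_frames.show(3)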

### Group Frames into Clips

In [None]:
df_grouped = (
    df_video_frames
    .with_column("group_index", col("frame_index") // T)
    .groupby("path", "group_index")
    .agg_list("data", "frame_index")
)
df_grouped.show(3)
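
The grouping rule is integer division of the frame index by the clip length. This plain-Python sketch shows the resulting groups for a 40-frame video with T = 16, including the short tail group. `group_frames` is an illustrative helper, not a Daft function.

```python
from collections import defaultdict

def group_frames(frame_indices, clip_size: int) -> dict[int, list[int]]:
    """Bucket frame indices into clips using frame_index // clip_size."""
    groups: dict[int, list[int]] = defaultdict(list)
    for idx in frame_indices:
        groups[idx // clip_size].append(idx)
    return dict(groups)

groups = group_frames(range(40), clip_size=16)
sizes = {g: len(v) for g, v in groups.items()}
print(sizes)
```

The last group holds only 8 frames, which is why the stacking step below pads tail clips.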

### Stack, Normalize, and Cast Frames into Clip Tensors

In [None]:
@daft.func(return_dtype=dt.tensor(dt.float32(), shape=(16,288, 288, 3)))
def stack_clip(frames: list[np.ndarray], indices: list[int], clip_size: int):
    """Stacks a list of frames into a single numpy array

    Args:
        frames: List[T] of (H,W,3) uint8 images
        indices: List[T] of int frame indices
        clip_size: number of frames per clip (T)

    Returns:
        (T,H,W,3) float32 in [0,1]

    In a parallel/distributed groupby, a pre-group sort isn’t guaranteed
    to survive aggregation order; partitions can concatenate in
    non-deterministic order. Additionally, the image dtype is natively a
    list[uint8], so we need to cast to float32 before normalizing from
    [0,255] to [0,1].

    Steps:
    1. Aggregate both image_tensor and frame_index.
    2. Sort by frame_index inside the group-level UDF, then stack.
    3. Pad short tail clips by repeating the last frame.
    4. Normalize and cast in one step.

    """

    # Don't assume frames are sorted already:
    order = np.argsort(np.asarray(indices))

    # Convert Daft Image to np.ndarray
    def to_np(x):
        if hasattr(x, "to_numpy"):
            return x.to_numpy()          # Daft Image -> np.ndarray (H,W,C) uint8
        return np.asarray(x)

    # Sort frames by frame_index
    frames_sorted = [to_np(frames[i]) for i in order]

    # Ensure Tails are padded with duplicates
    if len(order) < clip_size:
        frames_sorted.extend([frames_sorted[-1]] * (clip_size - len(order)))

    # Stack, Normalize, and Cast in one step
    x = np.stack(frames_sorted[:clip_size], axis=0).astype(np.float32) / 255.0 # (T,H,W,3) float32 in [0,1]

    return x # [T,H,W,C] where T=clip_size; the batch dimension is added at inference time

df_clips = df_grouped.with_column("clip", stack_clip(df_grouped["data"], df_grouped["frame_index"], clip_size=T))
df_clips.show(3)


### Define Inference Strategy

In [None]:
@daft.udf(
    return_dtype = dt.embedding(dt.float32(), 768),
    batch_size=B, # clips per batch (tune for throughput)
    num_gpus=1,
)
class VideoPrismVideoUDF:
    def __init__(self, model_name: str = "videoprism_lvt_public_v1_base"):
        "for 'videoprism_lvt_public_v1_large', set T = 8"

        from videoprism import models as vp
        self.model = vp.get_model(model_name)
        self.params = vp.load_pretrained_weights(model_name)

        @jax.jit
        def vf_b(clips):  # [B,T,288,288,3] -> [B,D]
            v, _, _ = self.model.apply(
                self.params,
                clips,
                None,
                None,
                train=False
            )
            return v

        self.vf_b = vf_b

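        # Warm up the jitted function once so compilation happens at init time
        _ = self.vf_b(jnp.zeros((B, T, H, W, C), jnp.float32)).block_until_ready()

    def __call__(self,
        clips: list[np.ndarray], # List[T,H,W,C] of len <= B
    ):
        # Daft may pass a Series rather than a list; convert when needed
        if hasattr(clips, "to_pylist"):
            clips = clips.to_pylist()

        # Batch Inference
        xb = jnp.stack(clips, axis=0) # [b,T,H,W,C] with b <= B
        video_embeddings = self.vf_b(xb) # [b,768]
        np_embeddings = np.asarray(video_embeddings)  # Back to NumPy
        # The last batch can hold fewer than B clips, so iterate over the actual length
        return [np_embeddings[i].tolist() for i in range(len(np_embeddings))]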
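
Because `jax.jit` recompiles for every new input shape, a final batch with fewer clips than the configured batch size triggers an extra compilation. One possible mitigation, sketched here with NumPy only, is to pad the batch with zero clips up to the fixed size and drop the padded rows afterwards. `pad_batch` is a hypothetical helper, and the mean over pixels is merely a stand-in for the model call.

```python
import numpy as np

def pad_batch(clips: list[np.ndarray], batch_size: int) -> tuple[np.ndarray, int]:
    """Stack clips and zero-pad along axis 0 up to `batch_size`.

    Returns the padded array and the number of real (unpadded) clips.
    """
    n = len(clips)
    if n == 0 or n > batch_size:
        raise ValueError("need between 1 and batch_size clips")
    xb = np.stack(clips, axis=0)
    if n < batch_size:
        pad = np.zeros((batch_size - n, *xb.shape[1:]), dtype=xb.dtype)
        xb = np.concatenate([xb, pad], axis=0)
    return xb, n

# Five small clips padded to a fixed batch of eight.
clips = [np.ones((2, 4, 4, 3), dtype=np.float32) for _ in range(5)]
xb, n_valid = pad_batch(clips, batch_size=8)

# Stand-in for the model: one value per clip. Keep only the real rows.
embeddings = xb.mean(axis=(1, 2, 3, 4))[:, None]
valid = embeddings[:n_valid]
print(xb.shape, valid.shape)
```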



In previous runs, 24 batches of 16-frame clips were processed in 128 seconds.

In [None]:
print(f"Video Embeddings will process {B} clips of {T} frames each at {W}x{H}x{C}")

df_clips_few = df_clips.sort("group_index").collect()
df_video_embs = df_clips_few.with_column("video_embeddings", VideoPrismVideoUDF(df_clips_few["clip"])).collect()


In [None]:
df_video_embs.select("group_index","video_embeddings", "clip").count_rows()

## Appendix

In [None]:
# Parakeet Transcribe with Timestamps
@daft.udf(return_dtype = dt.struct({
    "word": dt.list(dt.struct({
        "word": dt.string(),
        "start_offset": dt.int32(),
        "end_offset": dt.int32(),
        "start": dt.float32(),
        "end": dt.float32()
    })),
    "segment": dt.list(dt.struct({
        "segment": dt.string(),  # transcribed text of the segment
        "start_offset": dt.int32(),
        "end_offset": dt.int32(),
        "start": dt.float32(),
        "end": dt.float32()
    })),
    "char": dt.list(dt.struct({
        "char": dt.string(),
        "start_offset": dt.int32(),
        "end_offset": dt.int32(),
        "start": dt.float32(),
        "end": dt.float32()
    })),
}))
class ParakeetTranscribeTimestampsUDF:
    def __init__(self, context_size: int = 256):
        import nemo.collections.asr as nemo_asr
        self.asr_model = nemo_asr.models.ASRModel.from_pretrained(model_name="nvidia/parakeet-tdt-0.6b-v3")
        self.asr_model.change_attention_model(
            self_attention_model="rel_pos_local_attn",
            att_context_size=[context_size, context_size]
        )

    def __call__(self, audio: list[np.ndarray]):
        outputs = self.asr_model.transcribe(audio, timestamps=True)   # No public flag to emit only segments
        return [o.timestamp for o in outputs]

In [None]:
class DiarizationSortFormerUDF:
    def __init__(self, context_size: int = 256):
        from nemo.collections.asr.models import SortformerEncLabelModel
        self.diar_model = SortformerEncLabelModel.from_pretrained("nvidia/diar_streaming_sortformer_4spk-v2")
        self.diar_model.eval() # Switch to inference mode

    def __call__(self, audio: list[np.ndarray]):
        # Untested sketch: the original body called `self.asr_model`, which this class never defines.
        # `diarize` is the entry point shown on the model card; passing in-memory arrays
        # (rather than file paths) is an assumption to verify against your NeMo version.
        return self.diar_model.diarize(audio=audio, batch_size=1)

In [18]:
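import av

# PyAV has no `av.probe` (the original call raised AttributeError).
# Open the container and read its properties instead.
with av.open("/content/Near-100% GPU Utilization： Embedding Millions of Text Documents With Qwen3 [WI83lRzk7YE].mkv") as container:
    stream = container.streams.video[0]
    print(container.duration, stream.width, stream.height, stream.average_rate, stream.frames)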