# Person Re-ID Gallery Pipeline (H100)

This notebook demonstrates a DeepStream pipeline that:
1. **Detects persons** using a primary detector (PGIE)
2. **Tracks persons** across frames using nvtracker
3. **Extracts Re-ID embeddings** using OSNet as an SGIE
4. **Stores embeddings** in ChromaDB when tracks end

## Pipeline Architecture

```
filesrc → h264parse → nvv4l2decoder → nvstreammux →
pgie (person detector) → nvtracker →
sgie (OSNet Re-ID) →
nvvideoconvert → nvdsosd → nvvideoconvert → capsfilter (I420) →
videoconvert → x264enc → h264parse → mp4mux → filesink
```

## Embedding Strategy: Store on Track End

- Accumulate embeddings in memory while person is tracked
- Average the embeddings once a track ends (a track counts as ended after it has gone unseen for more than 30 frames — the grace period)
- Store single averaged embedding per person in ChromaDB
- Finalize all remaining tracks at end of stream (EOS)


## Prerequisites

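Before running this notebook:

1. **Install ChromaDB** (inside the container):
   ```bash
   pip install chromadb
   ```
2. **Provide the OSNet Re-ID model**: run `export_osnet.py` outside the container and copy the resulting `osnet_x1_0.onnx` to `models/osnet/` in the pipeline directory.
3. **Provide the Re-ID SGIE config** (`reid_sgie_config.txt` in the pipeline directory). It must set `output-tensor-meta=1` so nvinfer attaches the embeddings as tensor metadata.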


In [None]:
# Cell 1: Install dependencies (run once)
!pip install chromadb -q


In [None]:
# Cell 2: Import Required Libraries
import sys
import os
import time
import ctypes  # used to view nvinfer output-layer buffers as float arrays
from collections import defaultdict  # NOTE(review): appears unused in this notebook

import numpy as np
import chromadb

# Make the DeepStream Python sample helpers (the `common` package) importable.
# Must run before `from common.bus_call import bus_call` below.
# NOTE(review): re-running this cell appends a duplicate sys.path entry (harmless).
sys.path.append('/opt/nvidia/deepstream/deepstream-8.0/sources/deepstream_python_apps/apps')

import gi
# Select the GStreamer 1.0 typelib before anything is imported from gi.repository.
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib  # NOTE(review): GObject appears unused here
from common.bus_call import bus_call  # bus handler from the DeepStream samples
import pyds  # DeepStream metadata bindings (NvDsBatchMeta, tensor meta, ...)

# Initialize GStreamer before any element is created.
Gst.init(None)

# Record library versions in the notebook output for provenance.
print(f"GStreamer version: {Gst.version_string()}")
print(f"ChromaDB version: {chromadb.__version__}")


In [None]:
# Cell 3: Configuration
#
# Every tunable path and constant for the notebook lives here. All DeepStream
# paths derive from DS_ROOT so a DeepStream version bump touches one line.

# Class IDs emitted by the PGIE (see PGIE_CONFIG below)
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

# DeepStream installation root inside the container
DS_ROOT = '/opt/nvidia/deepstream/deepstream-8.0'

# Working directory for this notebook (models, SGIE config, outputs, gallery).
# Override with the REID_PIPELINE_DIR environment variable when the notebook
# is not mounted at the default container location.
PIPELINE_DIR = os.environ.get('REID_PIPELINE_DIR', '/app/notebooks/reid_pipeline')
INPUT_VIDEO = f'{DS_ROOT}/samples/streams/sample_720p.h264'
OUTPUT_VIDEO = f'{PIPELINE_DIR}/output_reid.mp4'

# Model configs
PGIE_CONFIG = f'{DS_ROOT}/sources/deepstream_python_apps/apps/deepstream-test1/dstest1_pgie_config.txt'
REID_SGIE_CONFIG = f'{PIPELINE_DIR}/reid_sgie_config.txt'

# Tracker config
TRACKER_LIB = f'{DS_ROOT}/lib/libnvds_nvmultiobjecttracker.so'
TRACKER_CONFIG = f'{DS_ROOT}/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml'

# Re-ID settings
EMBEDDING_DIM = 512  # OSNet output dimension
GRACE_FRAMES = 30    # A track is finalized after being unseen for more than this many frames

# ChromaDB gallery path
GALLERY_PATH = f'{PIPELINE_DIR}/gallery'

print("Configuration:")
print(f"  Input video: {INPUT_VIDEO}")
print(f"  Output video: {OUTPUT_VIDEO}")
print(f"  PGIE config: {PGIE_CONFIG}")
print(f"  Re-ID SGIE config: {REID_SGIE_CONFIG}")
print(f"  Embedding dim: {EMBEDDING_DIM}")
print(f"  Grace period: {GRACE_FRAMES} frames")
print(f"  Gallery path: {GALLERY_PATH}")


In [None]:
# Cell 4: Check if OSNet model exists
#
# The Re-ID SGIE needs the exported OSNet ONNX file; report its size when it is
# present, otherwise print the steps needed to produce it.

osnet_path = f'{PIPELINE_DIR}/models/osnet/osnet_x1_0.onnx'

if not os.path.exists(osnet_path):
    print(
        f"WARNING: OSNet model NOT found at {osnet_path}",
        "\nPlease export and copy the model:",
        "1. Run export_osnet.py outside the container",
        "2. Copy osnet_x1_0.onnx to models/osnet/",
        "\nThe pipeline will fail without the model.",
        sep="\n",
    )
else:
    print(
        f"OSNet model found: {osnet_path}",
        f"  Size: {os.path.getsize(osnet_path) / 1024 ** 2:.2f} MB",
        sep="\n",
    )


In [None]:
# Cell 5: Initialize ChromaDB
#
# An on-disk client at GALLERY_PATH: because the collection is fetched with
# get_or_create, entries from earlier runs are kept and new ones are appended.

chroma_client = chromadb.PersistentClient(path=GALLERY_PATH)

# Cosine distance for Re-ID matching; the pipeline L2-normalizes every vector
# before it is stored.
collection = chroma_client.get_or_create_collection(
    metadata={"hnsw:space": "cosine"},
    name="person_embeddings",
)

print(
    f"ChromaDB initialized at: {GALLERY_PATH}",
    "Collection: person_embeddings",
    f"Existing entries: {collection.count()}",
    sep="\n",
)


In [None]:
# Cell 6: Track Management - Global State
#
# Module-level state shared by the buffer probe and the finalization helpers.
# All dicts are keyed by the tracker's object ID (track_id):
#   active_tracks   -> list of per-frame embeddings accumulated for the track
#   track_last_seen -> frame_num at which the track was last recorded
#   track_metadata  -> {'first_frame', 'last_frame', 'source_id'} for the track
# finalized_count counts averaged embeddings written to ChromaDB.
active_tracks, track_last_seen, track_metadata, finalized_count = {}, {}, {}, 0

def reset_tracking_state():
    """Discard all per-track state and zero the stored-embedding counter.

    Rebinds the module globals to fresh empty containers; call this before
    (re)running the pipeline so tracks from a previous run do not leak in.
    """
    global active_tracks, track_last_seen, track_metadata, finalized_count
    active_tracks, track_last_seen, track_metadata = {}, {}, {}
    finalized_count = 0
    print("Tracking state reset")

print("Track management state initialized")


In [None]:
# Cell 7: Embedding Extraction Function

def _layer_num_elements(layer):
    """Return the element count recorded for an nvinfer output layer, or None.

    pyds exposes a layer's dimensions as `inferDims` (some bindings also as the
    deprecated `dims`). If neither is available, None is returned and the
    caller trusts EMBEDDING_DIM as before.
    """
    dims = getattr(layer, 'inferDims', None) or getattr(layer, 'dims', None)
    return getattr(dims, 'numElements', None)


def _embedding_from_user_meta(user_meta, gie_unique_id):
    """Decode one object user-meta entry into an L2-normalized embedding.

    Returns None if the entry is not nvinfer tensor output. It also returns None if:
    - `gie_unique_id` is given and the entry came from a different nvinfer
      instance;
    - the entry has no output layers;
    - its first layer holds fewer than EMBEDDING_DIM elements;
    - the decoded values are not all finite.
    """
    if user_meta.base_meta.meta_type != pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META:
        return None

    tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)

    # Other nvinfer instances may attach tensor meta to the same object; when
    # an ID is supplied, only the Re-ID model's output is accepted.
    if gie_unique_id is not None and tensor_meta.unique_id != gie_unique_id:
        return None
    if tensor_meta.num_output_layers < 1:
        return None

    # The first output layer holds the embedding.
    layer = pyds.get_nvds_LayerInfo(tensor_meta, 0)

    # Never read past the end of the layer buffer when the model's output is
    # smaller than EMBEDDING_DIM (e.g. a different OSNet variant was exported).
    num_elements = _layer_num_elements(layer)
    if num_elements is not None and num_elements < EMBEDDING_DIM:
        return None

    # View layer.buffer as float32 and copy it into an array we own, so the
    # result does not alias memory managed by DeepStream.
    # NOTE(review): assumes the output layer's dataType is FLOAT — confirm for
    # FP16 engines (layer.dataType).
    ptr = ctypes.cast(pyds.get_ptr(layer.buffer), ctypes.POINTER(ctypes.c_float))
    embedding = np.ctypeslib.as_array(ptr, shape=(EMBEDDING_DIM,)).copy()

    # A NaN/Inf vector would poison the per-track average downstream.
    if not np.all(np.isfinite(embedding)):
        return None

    norm = np.linalg.norm(embedding)
    if norm > 0:
        embedding = embedding / norm
    return embedding


def extract_reid_embedding(obj_meta, gie_unique_id=None):
    """
    Extract the Re-ID embedding from an object's tensor metadata.

    The Re-ID SGIE outputs raw tensors (not class labels) because its config
    sets output-tensor-meta=1; those tensors are found in the object's user
    meta list as NVDSINFER_TENSOR_OUTPUT_META entries.

    Args:
        obj_meta: NvDsObjectMeta for a detected person.
        gie_unique_id: Optional gie-unique-id of the Re-ID nvinfer instance.
            When given, tensor meta attached by any other nvinfer is skipped.
            When None (default), the first usable tensor meta is used.

    Returns:
        L2-normalized numpy array of shape (EMBEDDING_DIM,), or None if the
        object carries no usable Re-ID tensor output.
    """
    l_user = obj_meta.obj_user_meta_list

    while l_user is not None:
        try:
            user_meta = pyds.NvDsUserMeta.cast(l_user.data)
        except StopIteration:
            break

        embedding = _embedding_from_user_meta(user_meta, gie_unique_id)
        if embedding is not None:
            return embedding

        try:
            l_user = l_user.next
        except StopIteration:
            break

    return None

print("Embedding extraction function defined")


In [None]:
# Cell 8: Track Finalization Function

def finalize_track(track_id, source_id=0):
    """
    Finalize a track: average its embeddings and store the result in ChromaDB.

    Called from the buffer probe once a track has gone unseen for more than
    GRACE_FRAMES frames, and from finalize_all_tracks() at end of stream.

    Args:
        track_id: The tracker's object ID (key into active_tracks).
        source_id: Fallback camera/source identifier. If the track's metadata
            recorded the source it was observed on, that value takes precedence
            so the gallery entry is attributed to the right camera.

    The track's entries in active_tracks, track_last_seen and track_metadata
    are removed on every path: stored, nothing to store, or store failed. This
    stops the probe from re-finalizing the same dead track on every frame.
    A failed ChromaDB write is reported on stderr and not retried.
    """
    global finalized_count

    # Detach all state for this track up front so every exit below leaves the
    # tracking dicts clean.
    embeddings = active_tracks.pop(track_id, None)
    metadata = track_metadata.pop(track_id, None) or {}
    track_last_seen.pop(track_id, None)

    if embeddings is None:
        return

    if len(embeddings) == 0:
        print(f"  Track {track_id}: No embeddings to store")
        return

    # Prefer the source the track was actually seen on over the caller's hint.
    source_id = metadata.get('source_id', source_id)

    # Average all embeddings, then re-normalize: a mean of unit vectors is
    # generally shorter than unit length.
    avg_embedding = np.mean(embeddings, axis=0)
    norm = np.linalg.norm(avg_embedding)
    if norm > 0:
        avg_embedding = avg_embedding / norm

    # Unique ID for ChromaDB
    entry_id = f"src{source_id}_track{track_id}_{int(time.time() * 1000)}"

    try:
        collection.add(
            embeddings=[avg_embedding.tolist()],
            ids=[entry_id],
            metadatas=[{
                "track_id": int(track_id),
                "source_id": int(source_id),
                "num_frames": len(embeddings),
                # Plain ints: ChromaDB metadata values must be simple scalars.
                "first_frame": int(metadata.get('first_frame', 0)),
                "last_frame": int(metadata.get('last_frame', 0)),
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            }],
        )
    except Exception as exc:
        sys.stderr.write(f"  Failed to store track {track_id}: {exc}\n")
        return

    finalized_count += 1
    print(f"  Stored track {track_id}: {len(embeddings)} frames averaged -> {entry_id}")


def finalize_all_tracks(source_id=0):
    """
    Finalize every remaining active track.

    Called at end of stream (EOS) so no accumulated embeddings are lost.
    `source_id` is only a fallback for tracks without a recorded source.
    """
    print(f"\nFinalizing {len(active_tracks)} remaining tracks...")
    # Snapshot the keys: finalize_track removes entries while we iterate.
    for track_id in list(active_tracks):
        finalize_track(track_id, source_id)
    print(f"Total embeddings stored: {finalized_count}")

print("Track finalization functions defined")


In [None]:
# Cell 9: Buffer Probe Function

# object_id carried by objects that have no tracker ID
# (UNTRACKED_OBJECT_ID in DeepStream's nvdsmeta.h).
UNTRACKED_OBJECT_ID = 0xFFFFFFFFFFFFFFFF

# Print a progress line whenever frame_num is a multiple of this.
PROGRESS_EVERY_N_FRAMES = 100


def _accumulate_frame_persons(frame_meta):
    """Record Re-ID embeddings and sightings for every tracked person in a frame.

    Persons without a tracker ID are counted but otherwise ignored, so that
    unrelated people are never averaged together under one bogus ID.

    Returns:
        (person_count, embedding_count) for progress reporting.
    """
    frame_num = frame_meta.frame_num
    source_id = frame_meta.source_id
    person_count = 0
    embedding_count = 0

    l_obj = frame_meta.obj_meta_list
    while l_obj is not None:
        try:
            obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
        except StopIteration:
            break

        if obj_meta.class_id == PGIE_CLASS_ID_PERSON:
            person_count += 1
            track_id = obj_meta.object_id

            if track_id != UNTRACKED_OBJECT_ID:
                embedding = extract_reid_embedding(obj_meta)
                if embedding is not None:
                    embedding_count += 1
                    if track_id not in active_tracks:
                        active_tracks[track_id] = []
                        track_metadata[track_id] = {
                            'first_frame': frame_num,
                            'source_id': source_id,
                        }
                    active_tracks[track_id].append(embedding)

                # Count the person as seen even on frames where no embedding
                # was extracted. Otherwise a person who is still on screen would
                # be finalized early and then re-created as a duplicate
                # gallery entry under the same track ID.
                if track_id in active_tracks:
                    track_last_seen[track_id] = frame_num
                    track_metadata.setdefault(track_id, {})['last_frame'] = frame_num

        try:
            l_obj = l_obj.next
        except StopIteration:
            break

    return person_count, embedding_count


def _finalize_stale_tracks(frame_num, source_id):
    """Finalize tracks from `source_id` unseen for more than GRACE_FRAMES frames.

    frame_num is per source, so only tracks recorded for the same source are
    compared against it.
    """
    # Build the list first: finalize_track mutates track_last_seen.
    stale_track_ids = [
        track_id
        for track_id, last_frame in track_last_seen.items()
        if track_metadata.get(track_id, {}).get('source_id', source_id) == source_id
        and frame_num - last_frame > GRACE_FRAMES
    ]
    for track_id in stale_track_ids:
        finalize_track(track_id, source_id)


def reid_sink_pad_buffer_probe(pad, info, u_data):
    """
    Buffer probe that extracts Re-ID embeddings and manages track lifetimes.

    Called for every batch passing through the pad it is attached to. For each
    frame in the batch it:
    1. Extracts embeddings for each tracked person.
    2. Accumulates them in active_tracks and refreshes track_last_seen.
    3. Finalizes (averages and stores to ChromaDB) tracks from the same source
       whose grace period has expired.

    Always returns Gst.PadProbeReturn.OK, so buffers pass through unchanged.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if batch_meta is None:
        return Gst.PadProbeReturn.OK

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_num = frame_meta.frame_num
        source_id = frame_meta.source_id

        person_count, embedding_count = _accumulate_frame_persons(frame_meta)
        _finalize_stale_tracks(frame_num, source_id)

        if frame_num % PROGRESS_EVERY_N_FRAMES == 0:
            print(f"Frame {frame_num}: {person_count} persons, {embedding_count} embeddings, "
                  f"{len(active_tracks)} active tracks, {finalized_count} stored")

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

print("Buffer probe function defined")


In [None]:
# Cell 10: Helper function to create GStreamer elements

def make_elm_or_print_err(factoryname, name, printedname, detail=""):
    """Create a GStreamer element, reporting failures on stderr.

    Args:
        factoryname: GStreamer factory to instantiate (e.g. "nvinfer").
        name: Instance name given to the new element.
        printedname: Human-readable label used in progress/error messages.
        detail: Optional extra text written to stderr after a failure.

    Returns:
        The new element, or None if the factory could not create it.
    """
    print(f"Creating {printedname}...")
    element = Gst.ElementFactory.make(factoryname, name)
    if element:
        return element

    sys.stderr.write(f"Unable to create {printedname}\n")
    if detail:
        sys.stderr.write(detail)
    return element

print("Helper function defined")


In [None]:
# Cell 11: Create Pipeline Elements
#
# Instantiates every element of the Re-ID pipeline (linking happens in Cell 13)
# and stops immediately if any element could not be created.

print("\n" + "="*60)
print("CREATING PIPELINE")
print("="*60)

# Reset tracking state
reset_tracking_state()

# Create Pipeline
pipeline = Gst.Pipeline()
if not pipeline:
    sys.stderr.write("Unable to create Pipeline\n")

# Source elements
source = make_elm_or_print_err("filesrc", "file-source", "File Source")
h264parser = make_elm_or_print_err("h264parse", "h264-parser", "H264 Parser")
decoder = make_elm_or_print_err("nvv4l2decoder", "nvv4l2-decoder", "NV Decoder")

# Stream muxer
streammux = make_elm_or_print_err("nvstreammux", "stream-muxer", "Stream Muxer")

# Primary inference (person detection)
pgie = make_elm_or_print_err("nvinfer", "primary-inference", "Primary Inference (Person Detector)")

# Tracker
tracker = make_elm_or_print_err("nvtracker", "tracker", "NV Tracker")

# Secondary inference (Re-ID)
reid_sgie = make_elm_or_print_err("nvinfer", "reid-inference", "Re-ID SGIE (OSNet)")

# Video processing
nvvidconv = make_elm_or_print_err("nvvideoconvert", "convertor", "NV Video Converter 1")
nvosd = make_elm_or_print_err("nvdsosd", "onscreendisplay", "On-Screen Display")
nvvidconv2 = make_elm_or_print_err("nvvideoconvert", "convertor2", "NV Video Converter 2")
capsfilter = make_elm_or_print_err("capsfilter", "caps", "Caps Filter")

# H100: Software encoder path
sw_videoconvert = make_elm_or_print_err("videoconvert", "sw-videoconvert", "Software Video Converter")
encoder = make_elm_or_print_err("x264enc", "encoder", "H264 Software Encoder")
h264parser2 = make_elm_or_print_err("h264parse", "h264-parser2", "H264 Parser 2")
mp4mux = make_elm_or_print_err("mp4mux", "mp4mux", "MP4 Muxer")
sink = make_elm_or_print_err("filesink", "filesink", "File Sink")

# make_elm_or_print_err() only logs a failure and returns None. Stop here with
# one clear error instead of an AttributeError on None in the next cell.
_created_elements = {
    "pipeline": pipeline,
    "file-source": source,
    "h264-parser": h264parser,
    "nvv4l2-decoder": decoder,
    "stream-muxer": streammux,
    "primary-inference": pgie,
    "tracker": tracker,
    "reid-inference": reid_sgie,
    "convertor": nvvidconv,
    "onscreendisplay": nvosd,
    "convertor2": nvvidconv2,
    "caps": capsfilter,
    "sw-videoconvert": sw_videoconvert,
    "encoder": encoder,
    "h264-parser2": h264parser2,
    "mp4mux": mp4mux,
    "filesink": sink,
}
_missing_elements = [name for name, element in _created_elements.items() if element is None]
if _missing_elements:
    raise RuntimeError(
        "Unable to create GStreamer element(s): " + ", ".join(_missing_elements)
    )

print("\nAll elements created!")


In [None]:
# Cell 12: Configure Pipeline Elements

def _apply_properties(element, properties):
    """Set each (property_name, value) pair on `element`, in the given order."""
    for prop_name, prop_value in properties:
        element.set_property(prop_name, prop_value)


print("\n" + "=" * 60)
print("CONFIGURING ELEMENTS")
print("=" * 60)

# File source: the H.264 elementary stream to decode
_apply_properties(source, [('location', INPUT_VIDEO)])
print(f"Source: {INPUT_VIDEO}")

# Stream muxer: one input stream, output frames scaled to 1920x1080
_apply_properties(streammux, [
    ('width', 1920),
    ('height', 1080),
    ('batch-size', 1),
    ('batched-push-timeout', 4000000),
])
print("Stream muxer: 1920x1080, batch-size=1")

# Primary detector
_apply_properties(pgie, [('config-file-path', PGIE_CONFIG)])
print(f"PGIE config: {PGIE_CONFIG}")

# NvDCF tracker via the NvMultiObjectTracker low-level library, on GPU 0
_apply_properties(tracker, [
    ('tracker-width', 640),
    ('tracker-height', 384),
    ('ll-lib-file', TRACKER_LIB),
    ('ll-config-file', TRACKER_CONFIG),
    ('gpu-id', 0),
])
print("Tracker: NvDCF")

# Re-ID secondary inference
_apply_properties(reid_sgie, [('config-file-path', REID_SGIE_CONFIG)])
print(f"Re-ID SGIE config: {REID_SGIE_CONFIG}")

# H100: x264enc consumes system-memory frames, so request plain I420 here
caps = Gst.Caps.from_string("video/x-raw, format=I420")
_apply_properties(capsfilter, [("caps", caps)])
print("Caps: I420 (CPU memory)")

# Software H.264 encoder, tuned for throughput (bitrate 4000 => 4 Mbps)
_apply_properties(encoder, [
    ('bitrate', 4000),
    ('speed-preset', 'ultrafast'),
    ('tune', 'zerolatency'),
])
print("Encoder: x264enc @ 4 Mbps")

# File sink: write as fast as possible instead of pacing to the clock
_apply_properties(sink, [
    ('location', OUTPUT_VIDEO),
    ('sync', False),
])
print(f"Output: {OUTPUT_VIDEO}")

print("\nAll elements configured!")


In [None]:
# Cell 13: Build Pipeline

def _link_chain(*elements):
    """Link each element to the next, raising RuntimeError on the first failure.

    Gst.Element.link() reports failure by returning False rather than raising,
    so unchecked links leave a half-built pipeline that only fails at runtime.
    """
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            raise RuntimeError(
                f"Failed to link {upstream.get_name()} -> {downstream.get_name()}"
            )


print("\n" + "="*60)
print("BUILDING PIPELINE")
print("="*60)

# Add elements to pipeline
print("Adding elements to pipeline...")
for _element in (source, h264parser, decoder, streammux, pgie, tracker,
                 reid_sgie, nvvidconv, nvosd, nvvidconv2, capsfilter,
                 sw_videoconvert, encoder, h264parser2, mp4mux, sink):
    pipeline.add(_element)
print("All elements added")

# Link elements
print("\nLinking elements...")

# Source -> Parser -> Decoder
_link_chain(source, h264parser, decoder)
print("Linked: source -> h264parser -> decoder")

# Decoder -> Streammux: streammux sink pads are requested by name (sink_0 for
# the first stream) instead of being linked with Element.link().
sinkpad = streammux.request_pad_simple("sink_0")
srcpad = decoder.get_static_pad("src")
if not sinkpad or not srcpad:
    raise RuntimeError("Unable to get decoder src pad or streammux sink_0 pad")
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
    raise RuntimeError("Failed to link decoder -> streammux")
print("Linked: decoder -> streammux")

# Streammux -> PGIE -> Tracker -> Re-ID SGIE
_link_chain(streammux, pgie, tracker, reid_sgie)
print("Linked: streammux -> pgie -> tracker -> reid_sgie")

# Re-ID SGIE -> Video processing -> Encoder -> Sink
_link_chain(reid_sgie, nvvidconv, nvosd, nvvidconv2, capsfilter,
            sw_videoconvert, encoder, h264parser2, mp4mux, sink)
print("Linked: reid_sgie -> nvvidconv -> nvosd -> nvvidconv2 -> capsfilter -> "
      "videoconvert -> encoder -> h264parse -> mp4mux -> sink")

print("\nPipeline built successfully!")


In [None]:
# Cell 14: Attach Buffer Probe
#
# The probe sits on the Re-ID SGIE's output (src) pad, i.e. after Re-ID
# inference has run, which is where embeddings are read and accumulated.
reid_sgie_srcpad = reid_sgie.get_static_pad("src")
if reid_sgie_srcpad:
    reid_sgie_srcpad.add_probe(Gst.PadProbeType.BUFFER, reid_sink_pad_buffer_probe, 0)
    print("Buffer probe attached to Re-ID SGIE src pad")
    print("Embeddings will be extracted and accumulated here")
else:
    sys.stderr.write("Unable to get src pad of reid_sgie\n")


In [None]:
# Cell 15: Setup Bus Message Handler
#
# Route pipeline bus messages to the DeepStream sample handler `bus_call`,
# which receives the main loop as user data.
# NOTE(review): re-running this cell connects an additional handler to the bus.
bus = pipeline.get_bus()
bus.add_signal_watch()
loop = GLib.MainLoop()
bus.connect("message", bus_call, loop)

print("Bus message handler configured")


In [None]:
# Cell 16: Run the Pipeline
#
# Runs the main loop until EOS, an error, or Ctrl+C. On every exit path the
# pipeline is stopped BEFORE remaining tracks are finalized, because the
# buffer probe mutates the tracking dicts from a streaming thread while the
# pipeline is running.

# Seconds to wait for EOS to reach the sink after a user interrupt.
EOS_DRAIN_TIMEOUT_S = 10

print("\n" + "="*60)
print("STARTING RE-ID PIPELINE")
print("="*60)
print(f"Input: {INPUT_VIDEO}")
print(f"Output video: {OUTPUT_VIDEO}")
print(f"Gallery: {GALLERY_PATH}")
print("="*60 + "\n")

start_time = time.time()

# Start pipeline
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
    print("ERROR: Unable to set pipeline to PLAYING state")
    # Release whatever the partial state change acquired.
    pipeline.set_state(Gst.State.NULL)
else:
    interrupted = False
    try:
        loop.run()
    except KeyboardInterrupt:
        interrupted = True
        print("\nInterrupted by user")
    except Exception as e:
        print(f"\nError: {e}")
    finally:
        if interrupted:
            # mp4mux only finalizes the MP4 (writes its moov header) on EOS;
            # without it the partial output video is unplayable. Push EOS and
            # wait, with a bound, for it to arrive at the sink.
            print("Sending EOS to finalize the output video...")
            pipeline.send_event(Gst.Event.new_eos())
            pipeline.get_bus().timed_pop_filtered(
                EOS_DRAIN_TIMEOUT_S * Gst.SECOND,
                Gst.MessageType.EOS | Gst.MessageType.ERROR,
            )

        # Stop streaming threads first so the probe cannot race the final flush.
        print("\nCleaning up...")
        pipeline.set_state(Gst.State.NULL)

        # Finalize all remaining tracks
        finalize_all_tracks(source_id=0)

        elapsed_time = time.time() - start_time
        print("\n" + "="*60)
        print("PIPELINE COMPLETED")
        print(f"  Time elapsed: {elapsed_time:.2f} seconds")
        print(f"  Total embeddings stored: {finalized_count}")
        print(f"  Gallery entries: {collection.count()}")
        print(f"  Output video: {OUTPUT_VIDEO}")
        print("="*60)


In [None]:
# Cell 17: View Gallery Statistics
#
# Summary of what the gallery holds: entry count, mean number of frames
# averaged per stored person, and the first few entries.

print("\n" + "=" * 60)
print("GALLERY STATISTICS")
print("=" * 60)

total_entries = collection.count()
print(f"Total entries in gallery: {total_entries}")

if total_entries:
    all_data = collection.get(include=['metadatas'])
    entry_metas = all_data['metadatas']
    total_frames = sum(meta.get('num_frames', 0) for meta in entry_metas)
    # total_entries is non-zero inside this branch, so the division is safe.
    avg_frames = total_frames / total_entries

    print("\nPer-entry statistics:")
    print(f"  Average frames per person: {avg_frames:.1f}")

    print("\nSample entries (first 5):")
    sample = zip(all_data['ids'][:5], entry_metas[:5])
    for rank, (entry_id, metadata) in enumerate(sample, start=1):
        print(f"  {rank}. {entry_id}: Track {metadata.get('track_id')}, {metadata.get('num_frames')} frames")


In [None]:
# Cell 18: Query the Gallery (Similarity Search Demo)
#
# Uses one stored embedding as a probe and lists its nearest neighbours,
# excluding the probe entry itself.

TOP_K = 5  # number of neighbours to display

print("\n" + "="*60)
print("GALLERY SEARCH DEMO")
print("="*60)

gallery_size = collection.count()
if gallery_size > 1:
    # Use the first stored entry as the query (limit=1 returns the first
    # entry, not a random one).
    all_data = collection.get(include=['embeddings', 'metadatas'], limit=1)
    query_embedding = all_data['embeddings'][0]
    query_id = all_data['ids'][0]

    print(f"Query: {query_id}")

    # The query entry normally comes back as its own nearest neighbour, so ask
    # for one extra hit and drop it; never request more hits than exist.
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=min(TOP_K + 1, gallery_size),
        include=['metadatas', 'distances']
    )

    matches = [
        (result_id, distance, metadata)
        for result_id, distance, metadata in zip(
            results['ids'][0], results['distances'][0], results['metadatas'][0]
        )
        if result_id != query_id
    ][:TOP_K]

    print(f"\nTop {len(matches)} similar entries (excluding the query):")
    for rank, (result_id, distance, metadata) in enumerate(matches, start=1):
        similarity = 1 - distance  # Cosine distance to similarity
        track = (metadata or {}).get('track_id')
        print(f"  {rank}. {result_id} (track {track}) - Similarity: {similarity:.4f}")
else:
    print("Gallery has < 2 entries. Run the pipeline first.")


In [None]:
# Cell 19: Clear Gallery (Optional)
#
# WARNING: setting CLEAR_GALLERY = True deletes ALL stored embeddings and
# recreates an empty "person_embeddings" collection (cosine space).
# The default of False keeps Restart & Run All safe.
CLEAR_GALLERY = False

if CLEAR_GALLERY:
    print("Clearing gallery...")
    chroma_client.delete_collection("person_embeddings")
    collection = chroma_client.create_collection(
        name="person_embeddings",
        metadata={"hnsw:space": "cosine"}
    )
    print(f"Gallery cleared. New count: {collection.count()}")


In [None]:
# Cell 20: Detailed Gallery Entries
#
# Longer view than Cell 17: also reports the total number of frames averaged
# across all entries and, for the first five entries, their frame range and
# storage timestamp.

print("\n" + "="*60)
print("GALLERY DETAILS")
print("="*60)

total_entries = collection.count()
print(f"Total entries in gallery: {total_entries}")

if total_entries > 0:
    all_data = collection.get(include=['metadatas'])
    # Entries written by other tools may have no metadata; treat that as empty.
    entry_metas = [meta or {} for meta in all_data['metadatas']]

    total_frames = sum(meta.get('num_frames', 0) for meta in entry_metas)
    avg_frames = total_frames / total_entries

    print("\nPer-entry statistics:")
    print(f"  Average frames per person: {avg_frames:.1f}")
    print(f"  Total frames processed: {total_frames}")

    print("\nSample entries:")
    for entry_id, metadata in zip(all_data['ids'][:5], entry_metas[:5]):
        print(f"  {entry_id}:")
        print(f"    Track ID: {metadata.get('track_id')}")
        print(f"    Frames: {metadata.get('first_frame')} - {metadata.get('last_frame')} ({metadata.get('num_frames')} total)")
        print(f"    Timestamp: {metadata.get('timestamp')}")
