# DeepStream 8.0 - Object Detection Pipeline

**Uses nvv4l2h264enc hardware encoder (NVENC support)**

**Supports both FILE and RTSP sources!**

**FILE Pipeline:**
```
filesrc → h264parse → nvv4l2decoder → nvstreammux → nvinfer → 
nvvideoconvert → nvdsosd → nvvideoconvert → capsfilter(NVMM) → 
nvv4l2h264enc → h264parse → mp4mux → filesink
```

**RTSP Pipeline:**
```
rtspsrc → rtph264depay → h264parse → nvv4l2decoder → nvstreammux → nvinfer → 
nvvideoconvert → nvdsosd → nvvideoconvert → capsfilter(NVMM) → 
nvv4l2h264enc → h264parse → mp4mux → filesink
```


In [None]:
import sys
import time

sys.path.append('/opt/nvidia/deepstream/deepstream-8.0/sources/deepstream_python_apps/apps')

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib
from common.bus_call import bus_call
import pyds

# Initialise GStreamer (no argv forwarded) before any element is created.
Gst.init(None)

# Record the runtime versions in the notebook output.
print("GStreamer version: {}".format(Gst.version_string()))
print("pyds version: {}".format(pyds.__version__))


In [None]:
# Class IDs used as keys when the OSD probe tallies detected objects.
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

# Input selection: 'RTSP' reads from RTSP_URI, any other value reads
# INPUT_VIDEO_FILE.
SOURCE_TYPE = 'FILE'

INPUT_VIDEO_FILE = '/opt/nvidia/deepstream/deepstream-8.0/samples/streams/sample_720p.h264'

RTSP_URI = 'rtsp://localhost:8556/stream1'

# Destination of the encoded MP4 and the nvinfer configuration file.
OUTPUT_VIDEO_NAME = '/home/manasi/ds_out.mp4'
CONFIG_FILE_PATH = '/opt/nvidia/deepstream/deepstream-8.0/sources/deepstream_python_apps/apps/deepstream-test1/dstest1_pgie_config.txt'

# Echo the effective configuration so the notebook output documents the run.
print("=" * 60, f"SOURCE TYPE: {SOURCE_TYPE}", "=" * 60, sep="\n")
print(f"Input file: {INPUT_VIDEO_FILE}" if SOURCE_TYPE != 'RTSP' else f"RTSP URI: {RTSP_URI}")
for config_line in (
    f"Output video: {OUTPUT_VIDEO_NAME}",
    f"Config file: {CONFIG_FILE_PATH}",
):
    print(config_line)


In [None]:
def make_elm_or_print_err(factoryname, name, printedname, detail=""):
    """Create a GStreamer element, reporting a failure on stderr.

    Args:
        factoryname: Factory name passed to ``Gst.ElementFactory.make``.
        name: Instance name given to the new element.
        printedname: Human-readable label used in the progress and error
            messages.
        detail: Optional extra text written to stderr only when creation
            fails.

    Returns:
        Whatever ``Gst.ElementFactory.make`` returned; a falsy value means
        the element could not be created, so callers must check before use.
    """
    print(f"Creating {printedname}...")
    elm = Gst.ElementFactory.make(factoryname, name)
    if not elm:
        sys.stderr.write(f"Unable to create {printedname}\n")
        # The detail text explains a failure; writing it after a successful
        # creation would make stderr look like an error occurred.
        if detail:
            sys.stderr.write(detail)
    return elm


In [None]:
# Stage banner.
print("\n" + "=" * 60, "CREATING PIPELINE", "=" * 60, sep="\n")

pipeline = Gst.Pipeline()
if not pipeline:
    print("Unable to create Pipeline", file=sys.stderr)

# Source stage: a file source needs no depayloader, an RTSP source does.
if SOURCE_TYPE != 'RTSP':
    print("[FILE MODE] Creating file source elements...")
    source = make_elm_or_print_err("filesrc", "file-source", "File Source")
    depay = None
else:
    print("[RTSP MODE] Creating RTSP source elements...")
    source = make_elm_or_print_err("rtspsrc", "rtsp-source", "RTSP Source")
    depay = make_elm_or_print_err("rtph264depay", "h264-depay", "RTP H264 Depayloader")

# Decode / inference / overlay stages, created in stream order.
(h264parser, decoder, streammux, pgie,
 nvvidconv, nvosd, nvvidconv2, capsfilter) = (
    make_elm_or_print_err(*spec)
    for spec in (
        ("h264parse", "h264-parser", "H264 Parser"),
        ("nvv4l2decoder", "nvv4l2-decoder", "NV Decoder"),
        ("nvstreammux", "stream-muxer", "Stream Muxer"),
        ("nvinfer", "primary-inference", "Primary Inference"),
        ("nvvideoconvert", "convertor", "NV Video Converter 1"),
        ("nvdsosd", "onscreendisplay", "On-Screen Display"),
        ("nvvideoconvert", "convertor2", "NV Video Converter 2"),
        ("capsfilter", "caps", "Caps Filter"),
    )
)

# GB10: Hardware encoder - no software videoconvert needed
(encoder, h264parser2, mp4mux, sink) = (
    make_elm_or_print_err(*spec)
    for spec in (
        ("nvv4l2h264enc", "encoder", "H264 HW Encoder (nvv4l2h264enc)"),
        ("h264parse", "h264-parser2", "H264 Parser 2"),
        ("mp4mux", "mp4mux", "MP4 Muxer"),
        ("filesink", "filesink", "File Sink"),
    )
)


In [None]:
# Stage banner.
print("\n" + "=" * 60, "CONFIGURING ELEMENTS", "=" * 60, sep="\n")

# Source: both source kinds take a 'location'; rtspsrc also gets its
# latency settings.
if SOURCE_TYPE != 'RTSP':
    source.set_property('location', INPUT_VIDEO_FILE)
    print(f"File location: {INPUT_VIDEO_FILE}")
else:
    for rtsp_prop, rtsp_value in (
        ('location', RTSP_URI),
        ('latency', 100),
        ('drop-on-latency', True),
    ):
        source.set_property(rtsp_prop, rtsp_value)
    print(f"RTSP URI: {RTSP_URI}")
    print("RTSP latency: 100ms, drop-on-latency enabled")

# Stream muxer: fixed output resolution and a single-stream batch.
for mux_prop, mux_value in (
    ('width', 1920),
    ('height', 1080),
    ('batch-size', 1),
    ('batched-push-timeout', 4000000),
):
    streammux.set_property(mux_prop, mux_value)
if SOURCE_TYPE != 'RTSP':
    print("Stream muxer: 1920x1080, batch-size=1")
else:
    streammux.set_property('live-source', 1)
    print("Stream muxer: 1920x1080, batch-size=1, live-source=1")

# Primary inference engine reads its model settings from the config file.
pgie.set_property('config-file-path', CONFIG_FILE_PATH)
print(f"Inference config: {CONFIG_FILE_PATH}")

# GB10: NVMM caps for hardware encoder path
caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420")
capsfilter.set_property("caps", caps)
print("Caps filter: I420 format (NVMM memory for nvv4l2h264enc)")

# GB10: Hardware encoder settings
encoder.set_property('bitrate', 4000000)
print("Encoder: nvv4l2h264enc @ 4 Mbps (hardware)")

# File sink: write to the configured output path with 'sync' disabled.
for sink_prop, sink_value in (('location', OUTPUT_VIDEO_NAME), ('sync', False)):
    sink.set_property(sink_prop, sink_value)
print(f"Output file: {OUTPUT_VIDEO_NAME}")

print("\nAll elements configured!")


In [None]:
# Stage banner for the pipeline-assembly cell.
print("\n" + "=" * 60, "BUILDING PIPELINE", "=" * 60, sep="\n")

def on_rtspsrc_pad_added(rtspsrc, pad, depay):
    """Link a newly added rtspsrc pad to the depayloader.

    Intended as the ``pad-added`` handler of the RTSP source. A pad is linked
    only when its caps are ``application/x-rtp`` and the depayloader's sink
    pad is still unlinked. The outcome of the link attempt is reported.

    Args:
        rtspsrc: Element that emitted the signal (unused).
        pad: The newly added source pad.
        depay: Depayloader element whose ``sink`` pad receives the stream.
    """
    pad_name = pad.get_name()
    print(f"RTSP pad added: {pad_name}")

    # The pad may not have negotiated caps yet; fall back to the caps it can
    # produce instead of dereferencing None.
    caps = pad.get_current_caps()
    if caps is None:
        caps = pad.query_caps(None)
    # A size of zero covers both empty and ANY caps: no structure to inspect.
    if caps is None or caps.get_size() == 0:
        sys.stderr.write(f"RTSP pad {pad_name} has no usable caps; not linking\n")
        return

    media_type = caps.get_structure(0).get_name()
    if not media_type.startswith("application/x-rtp"):
        return

    sink_pad = depay.get_static_pad("sink")
    if sink_pad.is_linked():
        return

    # Only claim success when the link actually succeeded.
    result = pad.link(sink_pad)
    if result == Gst.PadLinkReturn.OK:
        print("RTSP source linked to depayloader")
    else:
        sys.stderr.write(
            f"Unable to link RTSP pad {pad_name} to depayloader: {result}\n"
        )

print("Adding elements to pipeline...")
# Elements are added in stream order; the depayloader exists only in RTSP mode.
_pipeline_elements = [source]
if SOURCE_TYPE == 'RTSP':
    _pipeline_elements.append(depay)
_pipeline_elements += [
    h264parser, decoder, streammux, pgie, nvvidconv, nvosd, nvvidconv2,
    capsfilter,
    # GB10: No sw_videoconvert needed - direct HW encoder
    encoder, h264parser2, mp4mux, sink,
]
for _element in _pipeline_elements:
    pipeline.add(_element)
print("All elements added (HW encoder path for GB10)")

print("\nLinking elements...")


def _link_chain(*chain):
    """Link consecutive elements of ``chain``; return True only if all linked.

    Every pair is attempted even after a failure, and each failing pair is
    reported on stderr.
    """
    all_linked = True
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            sys.stderr.write(
                f"Unable to link {upstream.get_name()} -> {downstream.get_name()}\n"
            )
            all_linked = False
    return all_linked


# Tracks whether every link succeeded so the final message is truthful.
_links_ok = True

if SOURCE_TYPE == 'RTSP':
    # rtspsrc exposes its pads at runtime, so it is linked from the callback.
    source.connect("pad-added", on_rtspsrc_pad_added, depay)
    print("RTSP pad-added callback connected")
    if _link_chain(depay, h264parser, decoder):
        print("Linked: depay -> h264parser -> decoder")
    else:
        _links_ok = False
else:
    if _link_chain(source, h264parser, decoder):
        print("Linked: source -> h264parser -> decoder")
    else:
        _links_ok = False

# The muxer's sink pads are request pads, so this link is made pad-to-pad.
sinkpad = streammux.request_pad_simple("sink_0")
if not sinkpad:
    sys.stderr.write("Unable to get sink pad of streammux\n")
srcpad = decoder.get_static_pad("src")
if not srcpad:
    sys.stderr.write("Unable to get source pad of decoder\n")
# Only attempt the link when both pads exist, and check its result.
if sinkpad and srcpad and srcpad.link(sinkpad) == Gst.PadLinkReturn.OK:
    print("Linked: decoder -> streammux")
else:
    sys.stderr.write("Unable to link decoder -> streammux\n")
    _links_ok = False

# GB10: Direct HW encoder link (no sw_videoconvert)
if _link_chain(streammux, pgie, nvvidconv, nvosd, nvvidconv2, capsfilter,
               encoder, h264parser2, mp4mux, sink):
    print("Linked: streammux -> pgie -> nvvidconv -> nvosd -> nvvidconv2 -> nvv4l2h264enc -> sink")
else:
    _links_ok = False

if _links_ok:
    print("\nPipeline built successfully!")
else:
    sys.stderr.write("\nPipeline build FAILED: see the link errors above\n")


In [None]:
def osd_sink_pad_buffer_probe(pad, info, u_data):
    """Buffer probe that tallies detected objects and overlays a summary label.

    For every frame in the batch metadata attached to the buffer, the probe
    counts object metadata per class ID. It then adds one display-meta text
    label holding the frame number, the frame's object count and the vehicle,
    person and road-sign counts, and prints the same text.

    Args:
        pad: Pad the probe is attached to (unused).
        info: Probe info from which the buffer is obtained.
        u_data: User data given when the probe was added (unused).

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if batch_meta is None:
        # Nothing to annotate when the buffer carries no batch metadata.
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list

    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Counters are rebuilt for each frame so that, when a batch holds
        # several frames, per-class counts match that frame's object count
        # instead of accumulating across the batch.
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0
        }

        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj = frame_meta.obj_meta_list

        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            # .get() tolerates a class ID outside the four known ones rather
            # than raising KeyError inside the streaming callback.
            class_id = obj_meta.class_id
            obj_counter[class_id] = obj_counter.get(class_id, 0) + 1

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # One text label per frame, taken from the batch's display-meta pool.
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]

        py_nvosd_text_params.display_text = "Frame={} Objects={} Vehicles={} Persons={} RoadSign={}".format(
            frame_number, num_rects,
            obj_counter[PGIE_CLASS_ID_VEHICLE],
            obj_counter[PGIE_CLASS_ID_PERSON],
            obj_counter[PGIE_CLASS_ID_ROADSIGN]
        )

        # Label placement and appearance: white text on a black background.
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
        py_nvosd_text_params.set_bg_clr = 1
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)

        print(pyds.get_string(py_nvosd_text_params.display_text))

        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

print("Metadata probe function defined")


In [None]:
# Attach the metadata probe to the OSD's sink pad (user data 0), so the probe
# runs on each buffer arriving at the on-screen-display element.
osdsinkpad = nvosd.get_static_pad("sink")
if osdsinkpad:
    osdsinkpad.add_probe(
        Gst.PadProbeType.BUFFER,
        osd_sink_pad_buffer_probe,
        0,
    )
    print("Metadata probe attached to OSD element")
else:
    print("Unable to get sink pad of nvosd", file=sys.stderr)


In [None]:
# Deliver pipeline bus messages as "message" signals to bus_call, which
# receives the main loop as its user data.
bus = pipeline.get_bus()
bus.add_signal_watch()
loop = GLib.MainLoop()
bus.connect("message", bus_call, loop)

print("Bus message handler configured")


In [None]:
print("\n" + "=" * 60)
print("STARTING PIPELINE")
print("=" * 60)
if SOURCE_TYPE == 'RTSP':
    print(f"Processing RTSP stream: {RTSP_URI}")
    print("(Make sure RTSP server is running!)")
else:
    print(f"Processing file: {INPUT_VIDEO_FILE}")
print("=" * 60 + "\n")

# Upper bound on how long to wait for the pipeline to drain after an
# interrupt, in nanoseconds.
EOS_DRAIN_TIMEOUT_NS = 5 * Gst.SECOND

start_time = time.time()

ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
    print("ERROR: Unable to set pipeline to PLAYING state")
    # Release anything that was partially started.
    pipeline.set_state(Gst.State.NULL)
else:
    try:
        loop.run()
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        # mp4mux writes the file index only on end-of-stream. Going straight
        # to NULL would leave an unplayable MP4, which matters most for a
        # live RTSP source that never ends by itself. Send EOS and wait
        # (bounded) until it reaches the sink or an error is posted.
        print("Sending EOS to finalize the output file...")
        pipeline.send_event(Gst.Event.new_eos())
        pipeline.get_bus().timed_pop_filtered(
            EOS_DRAIN_TIMEOUT_NS,
            Gst.MessageType.EOS | Gst.MessageType.ERROR,
        )
    except Exception as e:
        print(f"\nError: {e}")
    finally:
        print("\nCleaning up...")
        pipeline.set_state(Gst.State.NULL)

        elapsed_time = time.time() - start_time
        print("\n" + "=" * 60)
        print("PIPELINE COMPLETED")
        print(f"Time elapsed: {elapsed_time:.2f} seconds")
        print(f"Output saved to: {OUTPUT_VIDEO_NAME}")
        print("=" * 60)


In [None]:
import os

# Report whether the encoded output exists and, if so, its size in MiB.
if not os.path.exists(OUTPUT_VIDEO_NAME):
    print(f"Output file not found: {OUTPUT_VIDEO_NAME}")
else:
    file_size = os.path.getsize(OUTPUT_VIDEO_NAME)
    size_in_mb = file_size / (1024 * 1024)
    print("Output file exists")
    print(f"Location: {OUTPUT_VIDEO_NAME}")
    print(f"Size: {size_in_mb:.2f} MB")


In [None]:
from html import escape
from IPython.display import HTML, display
import os

# Embed the encoded video in the notebook output. The player source and the
# caption are derived from OUTPUT_VIDEO_NAME, so changing the output path
# does not leave the player pointing at a stale hard-coded file name.
if os.path.exists(OUTPUT_VIDEO_NAME):
    video_label = escape(os.path.basename(OUTPUT_VIDEO_NAME))
    # Path relative to the current working directory.
    # NOTE(review): assumes the notebook's directory is the working directory
    # and that the file is reachable from the notebook server - confirm for
    # your deployment.
    video_src = escape(os.path.relpath(OUTPUT_VIDEO_NAME), quote=True)
    video_size_mb = os.path.getsize(OUTPUT_VIDEO_NAME) / (1024 * 1024)
    html = f"""
    <div style="text-align: center; margin: 20px;">
        <h3>DeepStream Output Video</h3>
        <video width="800" controls>
            <source src="{video_src}" type="video/mp4">
            Your browser does not support the video tag.
        </video>
        <p style="margin-top: 10px;">
            <strong>File:</strong> {video_label} |
            <strong>Size:</strong> {video_size_mb:.2f} MB
        </p>
    </div>
    """
    display(HTML(html))
else:
    print(f"Video not found: {OUTPUT_VIDEO_NAME}")
