### Initializing GStreamer and Pipeline ###
We initialize GStreamer with `Gst.init(None)` and instantiate a `Gst.Pipeline` object named `pipeline` to contain all the elements shown in the pipeline diagram.

In [1]:
import sys
import os
sys.path.append('../')
# sys.path.append('../apps/')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from common.platform_info import PlatformInfo
from common.bus_call import bus_call

import pyds

In [2]:
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
def bus_call(bus, message, loop):
    """Handle a message posted on the pipeline bus.

    NOTE(review): this definition rebinds the name ``bus_call`` that the
    first cell imports from ``common.bus_call``; confirm which one is meant
    to be used.

    Args:
        bus: The bus that delivered the message (unused here).
        message: The message to inspect; its ``type`` selects the branch.
        loop: Main loop object; ``loop.quit()`` is called on end-of-stream
            and on errors.

    Returns:
        Always ``True``.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        # Terminate the line so later output does not run into this message,
        # matching the warning and error branches below.
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        # Warnings are reported but do not stop the loop
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True

#### Model Inference configuration

In [3]:
# Input stream: an H.264 file under the DeepStream samples/streams directory
input_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264" # path to input stream
# Config file handed to the nvinfer element. The previous value
# "./configs/.txt" had an empty file stem and named no config file.
pgie_config_file = "./configs/custom_pgie_yoloe.txt"
# Config file parsed to set the nvtracker element's properties
tracker_config_file = "./configs/tracker.txt"

In [4]:
# Configuration files: tracker properties and primary inference (PGIE) settings
tracker_config_file = "./configs/tracker.txt"
pgie_config_file = "./configs/custom_pgie_yoloe.txt"

# H.264 stream that the pipeline reads as its input
input_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264"

#### Create pipeline

In [5]:
# Platform helper imported from common.platform_info
platform_info = PlatformInfo()

# Standard initialization procedure
Gst.init(None)

# Create the Pipeline element that will contain and connect all other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()

if not pipeline:
    sys.stderr.write(" Unable to create Pipeline \n")
    # Later cells call methods on `pipeline`; stop here with a clear error
    # instead of failing further down on a missing object.
    raise RuntimeError("Unable to create Pipeline")

Creating Pipeline 
 


### Creating Pipeline Elements

In [6]:
def _make_element(factory_name, element_name):
    """Create a GStreamer element and fail loudly if the factory returns nothing.

    Args:
        factory_name: Name of the element factory (e.g. "filesrc").
        element_name: Name given to the created element.

    Returns:
        The created element.

    Raises:
        RuntimeError: If ``Gst.ElementFactory.make`` returns a falsy value.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if not element:
        raise RuntimeError(
            "Unable to create element '%s' from factory '%s'" % (element_name, factory_name)
        )
    return element


# Create Source element for reading from a file and set the location property
source = _make_element("filesrc", "file-source")
source.set_property('location', input_file)

# Create H264 Parser with h264parse as the input file is an elementary h264 stream
h264parser = _make_element("h264parse", "h264-parser")

# Create Decoder with nvv4l2decoder for accelerated decoding on GPU
decoder = _make_element("nvv4l2decoder", "nvv4l2-decoder")

# Create Streammux with nvstreammux to form batches for one or more sources and set properties
streammux = _make_element("nvstreammux", "stream-muxer")
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batched-push-timeout', 4000000)
streammux.set_property('batch-size', 1)

# Create Convertor to convert from YUV to RGBA as required by nvdsosd
nvvidconv1 = _make_element("nvvideoconvert", "convertor1")

# Create OSD with nvdsosd to draw on the converted RGBA buffer
nvosd = _make_element("nvdsosd", "onscreendisplay")

# Create Convertor to convert from RGBA to I420 as required by encoder
nvvidconv2 = _make_element("nvvideoconvert", "convertor2")

# Create Capsfilter to enforce frame image format
capsfilter = _make_element("capsfilter", "capsfilter")
caps = Gst.Caps.from_string("video/x-raw, format=I420")
capsfilter.set_property("caps", caps)

# Create Encoder to encode I420 formatted frames using the MPEG4 codec
encoder = _make_element("avenc_mpeg4", "encoder")
encoder.set_property("bitrate", 2000000)

# Create Sink and set the location for the output file
filesink = _make_element('filesink', 'filesink')
filesink.set_property('location', 'output_03_encoded.mpeg4')
# Boolean spelling of the truthy value 1 that was passed for "sync" before
filesink.set_property("sync", True)
print('Created elements')

Created elements


#### Object Detection - PGIE

In [7]:
# Create Primary GStreamer Inference Element with nvinfer to run inference on the decoder's output after batching
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
    # Fail here with a clear message rather than on the set_property call below
    raise RuntimeError("Unable to create nvinfer element 'primary-inference'")
pgie.set_property('config-file-path', pgie_config_file)

#### Object Tracking - nvtracker

In [8]:
import configparser

tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
    raise RuntimeError("Unable to create nvtracker element 'tracker'")

# Parse tracker config file and set properties
config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty result means the config was not loaded.
if not config.read(tracker_config_file):
    raise FileNotFoundError("Unable to read tracker config file: %s" % tracker_config_file)
if not config.has_section('tracker'):
    raise KeyError("No [tracker] section in %s" % tracker_config_file)

# Copy each recognised key of the [tracker] section onto the element;
# keys not listed here are ignored.
for key in config['tracker']:
    if key == 'tracker-width':
        tracker_width = config.getint('tracker', key)
        tracker.set_property('tracker-width', tracker_width)
    elif key == 'tracker-height':
        tracker_height = config.getint('tracker', key)
        tracker.set_property('tracker-height', tracker_height)
    elif key == 'gpu-id':
        tracker_gpu_id = config.getint('tracker', key)
        tracker.set_property('gpu_id', tracker_gpu_id)
    elif key == 'll-lib-file':
        tracker_ll_lib_file = config.get('tracker', key)
        tracker.set_property('ll-lib-file', tracker_ll_lib_file)
    elif key == 'll-config-file':
        tracker_ll_config_file = config.get('tracker', key)
        tracker.set_property('ll-config-file', tracker_ll_config_file)

In [9]:
# Register every element with the pipeline, in the same order as before:
# source/decode stages, then detection and tracking, then the
# convert / overlay / encode / sink stages.
for element in (
    source,
    h264parser,
    decoder,
    streammux,
    pgie,
    tracker,
    nvvidconv1,
    nvosd,
    nvvidconv2,
    capsfilter,
    encoder,
    filesink,
):
    pipeline.add(element)
print('Added elements to pipeline')

Added elements to pipeline


In [10]:
def _link(upstream, downstream):
    """Link two elements and raise RuntimeError if ``link`` reports failure."""
    if not upstream.link(downstream):
        raise RuntimeError(
            "Failed to link %s -> %s" % (upstream.get_name(), downstream.get_name())
        )


# Link elements in the pipeline
_link(source, h264parser)
_link(h264parser, decoder)

# Link decoder source pad to streammux sink pad
decoder_srcpad = decoder.get_static_pad("src")
# get_request_pad() is deprecated in recent GStreamer releases; use its
# replacement request_pad_simple() when the binding exposes it.
if hasattr(streammux, "request_pad_simple"):
    streammux_sinkpad = streammux.request_pad_simple("sink_0")
else:
    streammux_sinkpad = streammux.get_request_pad("sink_0")
if not decoder_srcpad:
    raise RuntimeError("Unable to get src pad of decoder")
if not streammux_sinkpad:
    raise RuntimeError("Unable to get sink_0 pad of streammux")
if decoder_srcpad.link(streammux_sinkpad) != Gst.PadLinkReturn.OK:
    raise RuntimeError("Failed to link decoder src pad to streammux sink_0")

# Link the rest of the elements in the pipeline
_link(streammux, pgie)

_link(pgie, tracker)
_link(tracker, nvvidconv1)

_link(nvvidconv1, nvosd)
_link(nvosd, nvvidconv2)
_link(nvvidconv2, capsfilter)
_link(capsfilter, encoder)
_link(encoder, filesink)
print('Linked elements in pipeline')

Linked elements in pipeline


  streammux_sinkpad=streammux.get_request_pad("sink_0")


#### PGIE probe function to print per-frame object counts and bounding boxes

In [11]:
# Declare list to hold count data
obj_counts=[]

# Define the Probe Function
def pgie_src_pad_buffer_probe(pad, info):
    """Print per-frame detection results and record the object count.

    For every frame in the buffer's batch metadata this prints the frame
    number and object count, appends the count to the module-level
    ``obj_counts`` list, and prints the label and rounded bounding box of
    each object.

    Args:
        pad: The pad the probe is attached to (unused here).
        info: Probe info object; ``info.get_buffer()`` supplies the buffer.

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    gst_buffer=info.get_buffer()
    if not gst_buffer:
        # Nothing to inspect; same guard as osd_sink_pad_buffer_probe uses
        return Gst.PadProbeReturn.OK

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta=pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame=batch_meta.frame_meta_list

    # Iterate through each frame in the batch metadata until the end
    while l_frame is not None:
        try:
            frame_meta=pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_num=frame_meta.frame_num
        num_obj=frame_meta.num_obj_meta
        l_obj=frame_meta.obj_meta_list

        print("Frame Number={} Number of Objects={}".format(frame_num, num_obj))

        # Append the number of objects to the list
        obj_counts.append(num_obj)

        # Iterate through each object in the frame metadata until the end
        while l_obj is not None:
            try:
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            rect=obj_meta.rect_params
            print('\t Object: {} - Top: {}, Left: {}, Width: {}, Height: {}'.format(
                obj_meta.obj_label,
                round(rect.top),
                round(rect.left),
                round(rect.width),
                round(rect.height)))

            try:
                l_obj=l_obj.next
            except StopIteration:
                break

        try:
            l_frame=l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK

In [12]:
# Add probe to inference plugin's source
pgie_src_pad = pgie.get_static_pad('src')
if not pgie_src_pad:
    # Without the pad there is nothing to attach the probe to
    raise RuntimeError("Unable to get src pad of pgie")
# Keep the returned probe id in probe_id, as before
probe_id = pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe)
print('Attached probe')

Attached probe


In [13]:
import pyds
import random

def get_color_from_id(object_id):
    """Return a repeatable ``(r, g, b)`` color for an object id.

    A private ``random.Random`` seeded with ``object_id`` is used, so the same
    id always yields the same three floats in ``[0.0, 1.0)`` and the
    module-level random generator is left untouched (calling
    ``random.seed`` here would reseed it on every call).

    Args:
        object_id: Value used as the seed.

    Returns:
        Tuple ``(r, g, b)`` of floats.
    """
    rng = random.Random(object_id)
    r = rng.random()
    g = rng.random()
    b = rng.random()
    return r, g, b

def osd_sink_pad_buffer_probe(pad, info, u_data):
    """Restyle the box and label of every object in the buffer's metadata.

    Each object's border color is derived from its ``object_id`` via
    ``get_color_from_id``; the border width, label font size, label font
    color and label background are set to fixed values.

    Args:
        pad: The pad the probe is attached to (unused here).
        info: Probe info object; ``info.get_buffer()`` supplies the buffer.
        u_data: User data passed at registration (unused here).

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            # Unique color per object using object_id (0 if the attribute is absent)
            object_id = getattr(obj_meta, "object_id", 0)
            r, g, b = get_color_from_id(object_id)
            obj_meta.rect_params.border_color.set(r, g, b, 1.0)
            obj_meta.rect_params.border_width = 5

            # Set label font size and color
            obj_meta.text_params.font_params.font_size = 18  # Larger font
            obj_meta.text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)  # White text

            # Set label background to black
            obj_meta.text_params.set_bg_clr = 1  # Enable background color
            obj_meta.text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)  # Black background

            # Guard the list advance the same way pgie_src_pad_buffer_probe does
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

In [14]:
# Attach the restyling probe to the sink pad of the on-screen-display element
osd_sink_pad = nvosd.get_static_pad("sink")
if not osd_sink_pad:
    # Without the pad there is nothing to attach the probe to
    raise RuntimeError("Unable to get sink pad of nvosd")
# The trailing 0 is handed to the probe as its u_data argument
osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
print('Attached probe')

Attached probe


### Starting the Pipeline ###

In [15]:
# Create an event loop; bus_call receives it and calls loop.quit() on it
loop=GLib.MainLoop()

# Feed GStreamer bus messages to loop
bus=pipeline.get_bus()
# Enable "message" signal emission on the bus, then route each message to
# bus_call with `loop` passed as the extra argument
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
print('Added bus message handler')

Added bus message handler


In [None]:
# Start playback and listen to events - the filesink writes output_03_encoded.mpeg4
print("Starting pipeline")
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except KeyboardInterrupt:
    # Interrupting the kernel stops the run early; other exceptions are no
    # longer swallowed and will surface after the cleanup below.
    pass
finally:
    # Cleaning up as the pipeline comes to an end, even if the loop raised
    pipeline.set_state(Gst.State.NULL)

Starting pipeline
Opening in BLOCKING MODE 
gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
[NvMultiObjectTracker] Initialized


0:00:00.387567103 [33m  404[00m      0x77e06a0 [36mINFO   [00m [00m             nvinfer gstnvinfer.cpp:685:gst_nvinfer_logger:<primary-inference>[00m NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:2123> [UID = 1]: Trying to create engine from model files
0:04:37.019670602 [33m  404[00m      0x77e06a0 [36mINFO   [00m [00m             nvinfer gstnvinfer.cpp:685:gst_nvinfer_logger:<primary-inference>[00m NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:2155> [UID = 1]: serialize cuda engine to file: /opt/nvidia/deepstream/deepstream-8.0/sources/play/notebooks/yoloe-11l-det.onnx_b1_gpu0_fp16.engine successfully
0:04:37.467108467 [33m  404[00m      0x77e06a0 [36mINFO   [00m [00m             nvinfer gstnvinfer_impl.cpp:343:notifyLoadModelStatus:<primary-inference>[00m [UID 1]: Load new model:./configs/custom_pgie_yoloe.txt sucessfully


In [5]:
# Inspect the output tensors of the exported ONNX model directly in the kernel.
# The previous first line (`!python3 - <<'PY'`) opened a shell heredoc that was
# never terminated, so it is dropped and the code runs as ordinary Python.
import onnx

m = onnx.load("yoloe-11l-det.onnx")
# One (name, [dim sizes]) pair per graph output
print("Outputs:", [(o.name, [d.dim_value for d in o.type.tensor_type.shape.dim]) for o in m.graph.output])

Outputs: [('output0', [1, 75, 8400])]


In [3]:
!pip install onnx

Collecting onnx
  Downloading onnx-1.20.1-cp312-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (8.4 kB)
Collecting protobuf>=4.25.1 (from onnx)
  Downloading protobuf-6.33.3-cp39-abi3-manylinux2014_x86_64.whl.metadata (593 bytes)
Collecting ml_dtypes>=0.5.0 (from onnx)
  Downloading ml_dtypes-0.5.4-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (8.9 kB)
Downloading onnx-1.20.1-cp312-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (17.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.5/17.5 MB[0m [31m69.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading ml_dtypes-0.5.4-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (5.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.0/5.0 MB[0m [31m46.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading protobuf-6.33.3-cp39-abi3-manylinux2014_x86_64.whl (323 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32

### Converting and viewing the output

In [None]:
# Convert MPEG4 video file to MP4 container file
!ffmpeg -i output_03_encoded.mpeg4 output.mp4 -y -loglevel quiet

In [None]:
from IPython.display import Video
# Display output.mp4 inline; being the cell's last expression, the Video object
# is rendered as the cell output, at width 1280
Video("output.mp4", width=1280)