![Xilinx Logo](images/xilinx_logo.png "Xilinx Logo")

# 1. Introduction

This notebook demonstrates how to capture video from a V4L2 device, or receive a raw video file from the host machine over PCIe, optionally process the media content through a 2D filter, and then either transfer the processed content to the host over PCIe or display it on an HDMI monitor.

Depending on the use case, the following GStreamer elements are used:
* The `appsrc` element defined in the xlnx_pcie_abstract Python module is used to receive video frames from the host machine.
* The `vvas_xfilter` element is used to implement a 2D convolution filter.
* The `appsink` element defined in the xlnx_pcie_abstract Python module is used to send video frames to the host machine in zero-copy fashion.
* The `kmssink` element is used to display video frames on the HDMI monitor.

Three different use cases are demonstrated using the GStreamer-based PCIe endpoint application.
* Capture from a V4L2 device (MIPI), process the video frames using the vvas_xfilter plugin (or bypass the plugin), and display on the host machine using appsink.

* Receive a video file from the host machine using appsrc, process the received frames using the vvas_xfilter plugin (or bypass the plugin), and send the processed video frames to the host machine through appsink.

* Receive a video file from the host machine using appsrc, process the received frames using the vvas_xfilter plugin (or bypass the plugin), and display the processed video frames on kmssink (HDMI).

# 2. Imports and Initialization

Import all python modules required for this notebook. 

In [None]:
from IPython.display import Image, display, clear_output
from threading import Thread

import sys, time, gi
gi.require_version('Gst', '1.0')
gi.require_version("GstApp", "1.0")
from gi.repository import GObject, GLib, Gst, GstApp

# Importing enum for enumerations of usecases 
import enum


This is the VCK190-PCIe TRD notebook 3 (nb3).

In [None]:
nb = "nb3"

# Importing xlnx_pcie_abstract python module 

* The xlnx_pcie_abstract Python module is an abstraction layer that creates the required `appsrc` and `appsink` elements and helps move media content along the path `src` --> `vvas_xfilter` (or bypass) --> `sink`.

* To move media content between elements, the following wrapper functions are implemented and bound through the GLib `connect` call:
    - The `need_data` callback pushes data read over PCIe to the next element, i.e. `vvas_xfilter`, `appsink`, or `kmssink`, depending on the control parameters received from the host machine.
    - The `new_sample_cb` callback pulls data from `appsrc` or `vvas_xfilter`, depending on the control parameters received from the host machine, and passes the media content on through `appsink` over PCIe.

In [None]:
# Importing xlnx_pcie_abstract python module
import xlnx_pcie_abstract
import ctypes
from ctypes import *

The different use cases are defined through an enumeration.

In [None]:
class Usecase(enum.IntEnum):
    VGST_USECASE_TYPE_NONE = 0
    VGST_USECASE_TYPE_MIPISRC_FLTR_HOST = 1
    VGST_USECASE_TYPE_MIPISRC_TO_HOST = 3
    VGST_USECASE_TYPE_APPSRC_FLTR_HOST = 4
    VGST_USECASE_TYPE_APPSRC_TO_HOST = 5
    VGST_USECASE_TYPE_APPSRC_FLTR_KMSSINK = 6
    VGST_USECASE_TYPE_APPSRC_TO_KMSSINK = 8
    VGST_USECASE_TYPE_MAX = 9

pcie_fd = xlnx_pcie_abstract.PCIe_GetDevice()

usecase = xlnx_pcie_abstract.PCIe_Getusecase(pcie_fd)
# Reject values outside the open range (NONE, MAX); the original check only caught NONE
if (usecase.value <= Usecase.VGST_USECASE_TYPE_NONE) or (usecase.value >= Usecase.VGST_USECASE_TYPE_MAX):
    raise Exception("Unsupported use case")
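
The later cells pick the source, filter, and sink by comparing `usecase.value` against the enumeration. As a rough illustration of that mapping, the sketch below decodes a raw value into a `(source, uses_filter, sink)` triple. The helper name `describe_usecase` is hypothetical and not part of the notebook or of xlnx_pcie_abstract. Unlike the range comparisons used in the notebook, it also rejects the undefined values 2 and 7, which is an assumption about the intended behavior.

```python
import enum


class Usecase(enum.IntEnum):
    VGST_USECASE_TYPE_NONE = 0
    VGST_USECASE_TYPE_MIPISRC_FLTR_HOST = 1
    VGST_USECASE_TYPE_MIPISRC_TO_HOST = 3
    VGST_USECASE_TYPE_APPSRC_FLTR_HOST = 4
    VGST_USECASE_TYPE_APPSRC_TO_HOST = 5
    VGST_USECASE_TYPE_APPSRC_FLTR_KMSSINK = 6
    VGST_USECASE_TYPE_APPSRC_TO_KMSSINK = 8
    VGST_USECASE_TYPE_MAX = 9


def describe_usecase(value):
    """Hypothetical helper: return (source, uses_filter, sink) for a raw value."""
    try:
        uc = Usecase(value)
    except ValueError:
        # Values such as 2 and 7 have no enum member
        raise ValueError("Unsupported use case: %r" % (value,)) from None
    if uc in (Usecase.VGST_USECASE_TYPE_NONE, Usecase.VGST_USECASE_TYPE_MAX):
        raise ValueError("Unsupported use case: %s" % uc.name)

    # Same thresholds as the notebook cells use
    source = "mediasrcbin" if uc <= Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST else "appsrc"
    sink = "kmssink" if uc >= Usecase.VGST_USECASE_TYPE_APPSRC_FLTR_KMSSINK else "appsink"
    # Filtered use cases carry FLTR in their name
    uses_filter = "_FLTR_" in uc.name
    return source, uses_filter, sink
```

For example, `describe_usecase(usecase.value)` could be printed once at startup to confirm which pipeline the host requested.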
    


# 3. Create Src , Filter, and Sink elements

Create the ``mediasrcbin`` element, a bin element that uses the standard ``v4l2src`` element internally. Configure it as follows:
* Set the ``media-device`` property to the desired media device node.
* Register the ``pad_added`` callback function defined below with the ``pad-added`` signal of the ``mediasrcbin`` element.

In [None]:
def pad_added(element, pad):
    sink_pad = caps.get_static_pad("sink")
    if not sink_pad.is_linked():
        pad.link(sink_pad)
        if src_type == "mipi" :
            pad.set_property("io-mode", "dmabuf")

Create the `mediasrcbin` or `appsrc` element based on the control information received from the host machine.

In [None]:
Gst.init(None)
if ( usecase.value <= Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST) :
        src_type = "mipi"
        device = xlnx_pcie_abstract.get_device_by_name(src_type)
        if device is None:
            raise Exception('Unable to find video source ' + src_type + '. Make sure the device is plugged in, powered, and the correct platform is used.')
        src = Gst.ElementFactory.make("mediasrcbin")
        src.set_property("media-device", device)
        src.connect("pad-added", pad_added)
        
        res_dict = {
            "1080p" : ("1920", "1080"),
            "2160p" : ("3840", "2160")
            }

        resp = xlnx_pcie_abstract.PCIe_GetResolution(pcie_fd)        
        if(resp == 2160) :
            res = "2160p" # res must be a key of res_dict ("1080p" or "2160p")
            !xmediactl.sh
        else :
            res = "1080p"
            !xmediactl_1080.sh
        width = res_dict[res][0]
        height = res_dict[res][1]
        #print("Selected resolution: " + width + "x" + height)
        fmt = "YUY2"
        caps = Gst.ElementFactory.make("capsfilter")
        cap = Gst.Caps.from_string("video/x-raw, width=" + str(width) + ", height=" + str(height) + ", format=" + fmt)
        caps.set_property("caps", cap)

elif (usecase.value >= Usecase.VGST_USECASE_TYPE_APPSRC_FLTR_HOST)  :
        src = Gst.ElementFactory.make("appsrc")
        caps = Gst.ElementFactory.make("capsfilter")
        xlnx_pcie_abstract.xlnx_pcieappsrc(src,caps)
        xlnx_pcie_abstract.export_pciedmabuff(pcie_fd)
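
The MIPI branch above turns the resolution reported by the host into a caps string for the `capsfilter`. The sketch below isolates that step as two small functions so the string can be checked without any hardware. The names `resolution_name` and `build_caps_string` are hypothetical; the mapping (2160 selects 4K, anything else falls back to 1080p) and the `YUY2` default follow the cell above.

```python
# Same table as res_dict in the notebook, with integer values
RESOLUTIONS = {
    "1080p": (1920, 1080),
    "2160p": (3840, 2160),
}


def resolution_name(resp):
    """Map the height reported by the host to a key of RESOLUTIONS."""
    return "2160p" if resp == 2160 else "1080p"


def build_caps_string(res, fmt="YUY2"):
    """Build the video/x-raw caps string for a named resolution."""
    width, height = RESOLUTIONS[res]
    return "video/x-raw, width={}, height={}, format={}".format(width, height, fmt)
```

The returned string has the same layout as the one passed to `Gst.Caps.from_string` in the cell above.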


Create the `appsink` or `kmssink` element based on the control information received from the host machine.

In [None]:
if (usecase.value < Usecase.VGST_USECASE_TYPE_APPSRC_FLTR_KMSSINK) :
        sink = Gst.ElementFactory.make("appsink")
        xlnx_pcie_abstract.xlnx_pcieappsink(sink)

elif (usecase.value >= Usecase.VGST_USECASE_TYPE_APPSRC_FLTR_KMSSINK )   :
        #print("kmssink use case")
        driver_name = "xlnx"
        plane_id = 38
        xoff = 0 # Change this value to move the plane position in the x-direction
        yoff = 0 # Change this value to move the plane position in the y-direction
        width = int('3840', 10)
        height = int('2160', 10)
        render_rectangle = Gst.ValueArray((xoff, yoff, width, height))
        hdmisink = Gst.ElementFactory.make("kmssink")
        hdmisink.set_property("driver-name", driver_name)
        hdmisink.set_property("plane-id", plane_id)
        hdmisink.set_property("render-rectangle", render_rectangle)
        hdmisink.set_property("sync", False)
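
The `xoff` and `yoff` values above position the plane, and the render rectangle is given as `(x, y, width, height)`. As an illustration, the sketch below computes offsets that would center a smaller video inside the display. The function `centered_rectangle` is a hypothetical helper, and the 3840x2160 display size in the usage note is taken from the cell above rather than queried from the hardware.

```python
def centered_rectangle(display_w, display_h, video_w, video_h):
    """Return (xoff, yoff, width, height) centering the video on the display."""
    if video_w > display_w or video_h > display_h:
        raise ValueError("video is larger than the display")
    xoff = (display_w - video_w) // 2
    yoff = (display_h - video_h) // 2
    return (xoff, yoff, video_w, video_h)
```

A 1920x1080 stream on the 3840x2160 plane would get `centered_rectangle(3840, 2160, 1920, 1080)`, and the resulting tuple could be wrapped in `Gst.ValueArray` in the same way as in the cell above.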

Create the `vvas_xfilter` element based on the control information received from the host machine.

In [None]:
add_filter = '0'
if ( (usecase.value != Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST ) and
        (usecase.value != Usecase.VGST_USECASE_TYPE_APPSRC_TO_HOST ) and 
            (usecase.value != Usecase.VGST_USECASE_TYPE_APPSRC_TO_KMSSINK ) ) :
    add_filter = '1'
    
    jsondir = "/usr/share/vvas/vck190-pcie-trd/"
    jfile = jsondir + "kernel_xfilter2d_pl.json"

    filter2d = Gst.ElementFactory.make("vvas_xfilter")
    filter2d.set_property("kernels-config", jfile)
    plist = [
        "blur",
        "edge",
        "horizontal edge",
        "vertical edge",
        "emboss",
        "horizontal gradient",
        "vertical gradient",
        "identity",
        "sharpen",
        "horizontal sobel",
        "vertical sobel",
        "custom"
        ]

    def print_presets():
        print("Supported filter presets:\n")
        print('\n'.join(plist) + '\n')
    
    #print_presets()
    def set_preset(val) :
        if val in plist :
            jstring = '{ "filter_preset" : "' +  val + '" }'
            filter2d.set_property("dynamic-config", jstring)
        else :
            raise Exception("Unsupported filter preset \'" + val + "\'")

    filter_preset = xlnx_pcie_abstract.PCIe_GetFilterPreset(pcie_fd)        
    set_preset(plist[filter_preset.value])

    def set_coeff(val):
        jstring = '{ "filter_coefficients" : ' + val + ' }'
        filter2d.set_property("dynamic-config", jstring)
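
The `dynamic-config` strings above are assembled by string concatenation. A possible alternative, sketched below, builds them with the standard `json` module so that quoting and escaping are handled automatically. The helper names `preset_config` and `coeff_config` are hypothetical, and the 3x3 coefficient matrix in the usage note is only an assumed example shape, not a documented requirement of the kernel.

```python
import json


def preset_config(name):
    """Return the JSON string that selects a filter preset by name."""
    return json.dumps({"filter_preset": name})


def coeff_config(matrix):
    """Return the JSON string for custom filter coefficients.

    matrix is a nested list of numbers; its required size depends on the
    kernel configuration and is not checked here.
    """
    return json.dumps({"filter_coefficients": matrix})
```

With these helpers, a call such as `filter2d.set_property("dynamic-config", preset_config("edge"))` would replace the manual concatenation, and `coeff_config([[0, 1, 0], [1, -4, 1], [0, 1, 0]])` would produce the string for a custom kernel.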

Create the ``perf`` element which is used to measure and print the frame rate while the video pipeline is running.

In [None]:
perf = Gst.ElementFactory.make("perf")

# 4. Create and Run the GStreamer Pipeline

Create the pipeline, add all elements, and link them together.

In [None]:
pipeline = Gst.Pipeline.new(nb)
pipeline.add(src)
if (usecase.value <= Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST ) :
    pipeline.add(caps)
    if(add_filter == '1') :
         pipeline.add(filter2d)
    pipeline.add(perf)
    pipeline.add(sink)
    
    src.link(caps)
    if(add_filter == '1') :
        caps.link(filter2d)
        filter2d.link(perf)
        perf.link(sink)
    else :
        caps.link(perf)
        perf.link(sink)
        

if ( (usecase.value > Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST) and
         (usecase.value <= Usecase.VGST_USECASE_TYPE_APPSRC_TO_HOST) )   :
    if(add_filter == '1') :
         pipeline.add(filter2d)
    pipeline.add(perf)
    pipeline.add(sink)
    
    if(add_filter == '1') :
        src.link(filter2d)
        filter2d.link(perf)
        perf.link(sink)
    else :
        src.link(perf)
        perf.link(sink)

if (usecase.value >= Usecase.VGST_USECASE_TYPE_APPSRC_FLTR_KMSSINK) :
    if(add_filter == '1') :
         pipeline.add(filter2d)
    pipeline.add(perf)
    pipeline.add(hdmisink)
    
    if(add_filter == '1') :
        src.link(filter2d)
        filter2d.link(perf)
        perf.link(hdmisink)
    else :
        src.link(perf)
        perf.link(hdmisink)
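
The three branches above add and link the same kind of chain, with `caps` and `filter2d` present only in some use cases. The sketch below shows one way to express that as an ordered list that is linked pairwise, checking the boolean that `link()` returns. The helpers `build_chain` and `link_chain` are hypothetical, and `FakeElement` is only a stand-in so the sketch runs without GStreamer; real code would pass the `Gst.Element` objects created earlier.

```python
def build_chain(src, perf, sink, caps=None, filter2d=None):
    """Return the elements in link order, skipping the optional ones."""
    chain = [src, caps, filter2d, perf, sink]
    return [element for element in chain if element is not None]


def link_chain(elements):
    """Link each element to its successor and fail loudly on error."""
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            raise RuntimeError("failed to link %r to %r" % (upstream, downstream))


class FakeElement:
    """Stand-in for a Gst.Element; records what it was linked to."""

    def __init__(self, name):
        self.name = name
        self.linked_to = None

    def link(self, other):
        self.linked_to = other
        return True


# Filtered appsrc-to-sink chain, built from stand-in elements
demo = [FakeElement(n) for n in ("src", "filter2d", "perf", "sink")]
demo_chain = build_chain(demo[0], demo[2], demo[3], filter2d=demo[1])
link_chain(demo_chain)
```

Checking the return value of `link()` surfaces caps or pad mismatches at setup time instead of as a stalled pipeline later.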


Start the pipeline (set it to the `PLAYING` state), create the main loop, and listen to messages on the bus. Register the `bus_call` callback function with the `message` signal of the bus, then start the main loop.

In the `kmssink` use cases the video is displayed on the HDMI monitor; in the `appsink` use cases it is sent to the host machine. The frame rate is printed and updated below the code cell.

In [None]:
def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        xlnx_pcie_abstract.xlnx_pciecleanup()
        pipeline.set_state(Gst.State.NULL)
        Gst.deinit()
        loop.quit()
    elif t == Gst.MessageType.INFO:
        err, info = message.parse_info()
        sys.stderr.write("Info: %s\n" % info)
        clear_output(wait=True)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        pipeline.set_state(Gst.State.NULL)
        loop.quit()
    return True

In [None]:
exit_pipeline = 0
if(usecase.value <= Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST) :
    def thread_eos_status(threadname) :
        global src
        global exit_pipeline
        # Poll until the host asks to stop the MIPI feed, then send EOS
        while(exit_pipeline != 1) :
            stop_feed = xlnx_pcie_abstract.stop_mipi_feed() 
            if (stop_feed) :
                src.send_event(Gst.Event.new_eos())
                xlnx_pcie_abstract.xlnx_pciecleanup()
                exit_pipeline = 1  # leave the loop so EOS is sent only once
    
    check_eos_thread = Thread(target=thread_eos_status, args=("thread_eos_status",))

pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message",bus_call, loop)
if(usecase.value <= Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST) :
    check_eos_thread.start()
    
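try:
    loop.run()
except BaseException:  # includes KeyboardInterrupt from interrupting the kernel
    exit_pipeline = 1  # ask the EOS polling thread (if any) to stop
    if (usecase.value <= Usecase.VGST_USECASE_TYPE_MIPISRC_TO_HOST) :
        xlnx_pcie_abstract.xlnx_pciecleanup()
    sys.stdout.write("Interrupt caught\n")
    pipeline.set_state(Gst.State.NULL)
    loop.quit()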
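
The MIPI use cases rely on a background thread that polls for a stop request and then sends EOS. The sketch below shows a generic version of that pattern using `threading.Event`, which lets the main thread stop the poller and avoids a busy loop. The function `start_poller` and its parameters are hypothetical. In the notebook, `poll` would correspond to `xlnx_pcie_abstract.stop_mipi_feed` and `on_stop` to sending the EOS event, assuming `stop_mipi_feed` returns promptly.

```python
import threading


def start_poller(poll, on_stop, stop_event, interval=0.05):
    """Run poll() repeatedly in a daemon thread until it returns True.

    on_stop() is called once when poll() returns True. Setting stop_event
    from another thread ends the loop without calling on_stop().
    """
    def run():
        while not stop_event.is_set():
            if poll():
                on_stop()
                return
            # Sleep between polls, but wake up early if stop_event is set
            stop_event.wait(interval)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
```

In an interrupt handler, calling `stop_event.set()` followed by `thread.join()` would shut the poller down before the pipeline is set to `NULL`.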

<center>Copyright© 2023 AMD</center>