Kwiver Processes in Python
==========================

Note: this file pretains only to the implementation of a Sporkit process in python, and does not directly cover pipelines or any related topics. That is the subject of another doc.

Algos and Arrows can be utilzied programmatically directly from a python script, or composed as components of a pipeline integrating any combination of C++ and Python based processes, utilizing the vital types to ensure universal data compatibility. To this end, if implementing a mixed pipeline with C++ and Python processes that leverages a custom, non vital type, either in C++ or Python, a Pybind11 binding must be created for the type and exposed to the pipeline so that python/C++ can interpret and properly use the type #TODO

Python Processes can inherit from one of two base classes that support fundamental process methods, KwiverProcess and PythonProcess. Below are examples of each approach.

A further note that if a process exists in C++ it does *not* need to be bound in Python as C++ processes can be incorporated into mixed pipelines alongside python processes out of the box. This includes pipelines executed from the python interpreter.

In [None]:
from kwiver.sprokit.pipeline import process, datum
from kwiver.kwiver_process import KwiverProcess


class HelloWorldPython(process.PythonProcess):
    # process instances need to expose the config obj as a constructor argument
    # if a process for whatever reason does not require/need a conf object
    # A default arg can be used. This will be rare.
    def __init__(self, conf = None):
        super().__init__(self,conf)

class HelloWorldPythonKw(process.KwiverProcess):
    def __init__(self, conf = None):
        super().__init__(self,conf)      



Much like algos, we also need to expose this process to the pipeline runner via a dunder registration method

In [None]:
def __sprokit_register__():
    from kwiver.sprokit.pipeline import process_factory

    module_name = 'python:kwiver.hello_world'

    if process_factory.is_process_module_loaded(module_name):
        return

    process_factory.add_process('hello_world_process',
                                'A Simple Kwiver Test Process',
                                HelloWorldPython)

    process_factory.mark_process_module_as_loaded(module_name)

Of course, in its current state, this process does absolutely nothing, and is a fairly trivial example. The following examples demonstrate a little more diverse functionality.

To properly implement a process we need to implement a couple of core methods, without which, the process will not function. These functions are the ```_configure``` step and the ```_step``` step. The configure step serves to construct the process during exeuction of a pipeline and the step is the programatic exeuction of the process.

This process serves as the only node in the pipeline, accepting and producing no output ports. A more substantive example will follow.

In [None]:
import sys

class HelloWorldPython(process.KwiverProcess):
    def __init__(self, conf):
        super().__init__(self,conf)
        flags = kwiver.sprokit.pipeline.process.PortFlags()
        flags.add(self.flag_required)
    def _configure(self):
        pass
    def _step(self):
        sys.stdout.write("hello world")
        self._base_step()

\# Pipefile for this process

#==================================================================

process helloworld

  :: hello_world_process

#==================================================================

config _scheduler

  type = pythread_per_process

\# -- end of file --

Next we will detail the differentiation between inheriting from `process.KwiverProcess` and `process.PythonProcess`

Pure python process implementing functionality from python

For this example we will be implementing a *very* simple 3d point class

In [None]:
import numpy as np

class pt3d(object):
    def __init__(self, x = 0, y = 0, z = 0, dtype = np.float32):
        self.x = x
        self.y = y
        self.z = z
        self.dtype = dtype
    def as_array(self):
        return np.ndarray((1,3), dtype=self.dtype, buffer=[self.x,self.y,self.z])
    def as_mat(self):
        return np.matrix(self.as_array())
    def __str__(self):
        return "(%s)" % ",".join(str(self.x), str(self.y), str(self.z))

Now that we have a basic data type, we need to create processes that can handle this datatype. Preexisting Kwiver processes will not support it as this data type is outside of vital. For demonstration purposes, the KwiverProcess utility mixin will leveraged to improve clarity and readability.

In [None]:
from kwiver.sprokit.processes import KwiverProcess


class NodeSource(KwiverProcess):
    # Note we need to have an option for the config in our constructor
    def __init__(self, conf):
        super().__init__(self, conf)
        required = process.PortFlags()
        required.add(self.flag_required)
        self.add_type_trait(
            "pt3d",
            "pt3d",
            datum.Datum.get_datum,
            datum.new
        )
        self.add_port_trait("pt3d","pt3d","Custom numpy 3d pt class")
        self.declare_output_port_using_trait("3dpt", required)
    def _configure(self):
        self._base_configure()
        self.data = pt3d(2,3,4,np.int64)
    def _step(self):
        self.push_to_port_using_trait("pt3d", self.data)
        self.push_to_port_using_trait("pt3d", datum.complete())
        self.mark_process_as_complete()

def __sprokit_register__():
    from sprokit.pipeline import process_factory
    module_name = "python:source"
    if process_factory.is_process_module_loaded(module_name):
        return
    process_factory.add_process("SourceNode","3dpt producer", NodeSource)
    process_factory.mark_process_module_as_loaded(module_name)


In [None]:
from kwiver.sprokit.processes import KwiverProcess
from kwiver.vital.types import Point3d
class Node(KwiverProcess):
    def __init__(self, conf):
        super().__init__(self, conf)
        required = process.PortFlags()
        required.add(self.flag_required)
        self.add_type_trait(
            "pt3d",
            "pt3d",
            datum.Datum.get_datum,
            datum.new
        )
        self.add_type_trait(
            "point",
            "kwiver:point",
            datum.Datum.get_datum,
            datum.new
        )
        self.add_port_trait("pt3d","pt3d","Custom numpy 3d pt class")
        self.add_port_trait("point", "point", "3d point in world frame")
        self.declare_input_port_using_trait("3dpt",required)
        self.declare_output_port_using_trait("point",required)
    def _configure(self):
        self._base_configure()
    def _step(self):
        pt3d = self.grab_input_using_trait("pt3d")
        pt_mat = pt3d.as_mat()
        vital_pt = Point3d(pt_mat)
        self.push_to_port_using_trait("point")

def __sprokit_register__():
    from sprokit.pipeline import process_factory
    module_name = "python:source"
    if process_factory.is_process_module_loaded(module_name):
        return
    process_factory.add_process("SourceNode","3dpt producer", NodeSource)
    process_factory.mark_process_module_as_loaded(module_name)

In [None]:
from kwiver.sprokit.process import KwiverProcess
from kwiver.vital.types import Point3d

class NodeSink(KwiverProcess):
    def __init__(self, conf):
        super().__init__(self, conf)
        required = process.PortFlags()
        required.add(self.flag_required)
        self.add_type_trait(
            "point",
            "kwiver:point",
            datum.Datum.get_datum,
            datum.new
        )
        self.add_port_trait("point","point","3d point in world plane")
        self.declare_input_port_using_trait("point",required)
    def _configure(self):
        self._base_configure()
    def _step(self):
        pt = self.grab_input_using_trait('point')
        print(pt)
def __sprokit_register__():
    from sprokit.pipeline import process_factory
    module_name = "python:sink"
    if process_factory.is_process_module_loaded(module_name):
        return
    process_factory.add_process("SinkNode","3d pt producer", NodeSink)
    process_factory.mark_process_module_as_loaded(module_name)