# Pipeline measurements

At some point you might want to emit measurements or events from your pipeline. While this would be possible via regular cell outputs, it would become messy quite rapidly:
* if you want to send multiple measurements you might end up sending nested lists and dicts
* some measurements like events should not be triggered on every call which means you often need to send a `None` value or an empty list
* the information you want to emit might not be relevant to other cells, so exposing it as a cell output is not very useful
* if you create a sink that consumes these measurements, you also need to track down their origin to know what these values mean and to what 'measurement' they correspond

All these extra outputs can potentially obscure the 'normal' dataflow from source to sink. Therefore the SDK provides a mechanism to log measurements.
* each cell defines the measurements it can emit
* measurements can be handled by a separate 'measurement' hook which gives you all the information about the origin
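
The idea behind these two points can be sketched in plain Python. This is only a toy model of the pattern, not the SDK's implementation: `ToyContext`, `toy_cell` and the handler signature are hypothetical names made up for this illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class ToyContext:
    """Hypothetical stand-in for a context object; not the SDK's Context."""
    cell_name: str
    handlers: List[Callable[[str, Any], None]] = field(default_factory=list)

    def log_measurement(self, measurement: Any) -> None:
        # Forward the measurement, together with its origin, to every handler.
        for handler in self.handlers:
            handler(self.cell_name, measurement)


received = []


def collect(origin: str, measurement: Any) -> None:
    # A handler knows which cell emitted the measurement.
    received.append((origin, measurement))


def toy_cell(context: ToyContext, value: int) -> int:
    # The measurement travels through the context, not through the output.
    context.log_measurement({"num_detections": 3})
    return value


context = ToyContext(cell_name="detector", handlers=[collect])
result = toy_cell(context, 42)
```

The regular output (`result`) stays free of measurement data, while the handler receives both the measurement and the name of the cell it came from.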

We will start with creating the basics for our cell.

In [None]:
!pip install -qqq rvai==0.6.0rc2 pygraphviz

In [None]:
from dataclasses import dataclass, field
from typing import Type
from rvai.base.cell import (
    Cell,
    Inputs,
    Outputs,
    Parameters,
    State,
    cell
)
from rvai.base.pipeline import Pipeline
from rvai.types import Integer

# Cell
@dataclass
class MyInputs(Inputs):
    dummy: Integer = field(metadata=dict(description="Some dummy input"))


@dataclass
class MyOutputs(Outputs):
    dummy: Integer = field(metadata=dict(description="Some dummy output"))


@dataclass
class MyParameters(Parameters):
    pass


Now the measurements can be defined. The helper function `Measurement.field` lets you give a field a name and a description, and also lets you mark it as 'indexable'. This is useful information for external tools. For example, if you want to record all the measurements in InfluxDB, you can use the 'indexable' fields as tags for your measurements.

In [None]:
from rvai.base.data import Measurement
from rvai.types import Float, Integer, String

@dataclass
class Detection(Measurement):
    """Information about a specific detection."""
    zone_id: String = Measurement.field(
        name="Zone",
        description="Zone where object was detected",
        options=list(map(String, ["zone1", "zone2"])),
        index=True,
    )
    vehicle_type: String = Measurement.field(
        name="Vehicle type",
        description="Type of the vehicle",
        options=list(map(String, ["car", "truck", "moto"])),
        index=True,
    )
    speed: Float = Measurement.field(
        name="Vehicle speed",
        description="Speed of the vehicle",
        default=Float(0.0)
    )

# Giving each field a proper name and description is recommended, but it is completely optional.
@dataclass
class Detections(Measurement):
    """Information about all detections in some zone."""
    zone_id: String = Measurement.field(index=True)
    num_detections: Integer = Measurement.field(default=Integer(0))
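
To make the InfluxDB remark concrete, here is a minimal sketch of how an external tool might turn indexed fields into tags and the remaining fields into values of an InfluxDB line-protocol record. It uses plain dataclasses instead of the SDK types: `ToyDetection`, the `index` metadata key, `format_value` and `to_line_protocol` are assumptions made for this illustration, and escaping of special characters is left out.

```python
from dataclasses import dataclass, field, fields


@dataclass
class ToyDetection:
    """Hypothetical plain-Python stand-in for the Detection measurement."""
    zone_id: str = field(metadata={"index": True})
    vehicle_type: str = field(metadata={"index": True})
    speed: float = 0.0


def format_value(value) -> str:
    # Line protocol: booleans as true/false, integers get an 'i' suffix,
    # floats are written as-is and strings are double-quoted.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    return f'"{value}"'


def to_line_protocol(measurement, timestamp_ns: int) -> str:
    tags, values = [], []
    for f in fields(measurement):
        value = getattr(measurement, f.name)
        if f.metadata.get("index", False):
            tags.append(f"{f.name}={value}")  # indexed field becomes a tag
        else:
            values.append(f"{f.name}={format_value(value)}")
    series = ",".join([type(measurement).__name__, *tags])
    return f"{series} {','.join(values)} {timestamp_ns}"


line = to_line_protocol(ToyDetection("zone1", "car", 42.5), 1_600_000_000_000_000_000)
print(line)
```

Because tags are indexed by InfluxDB, queries that filter or group by `zone_id` or `vehicle_type` stay cheap, while `speed` is stored as a regular value.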

To let third party tools know which measurement types they can expect, the measurements must be bundled in a `Measurements` dataclass and added as an annotation to the cell class.

Finally to emit a measurement, simply use the `Context.log_measurement` method.

In [None]:
import random
from collections import defaultdict

from rvai.base.data import Measurements
from rvai.base.context import Context

@dataclass
class MyMeasurements(Measurements):
    """Collection of my measurements."""
    detection: Type[Detection]
    detections: Type[Detections]

@cell
class MyCell(Cell):

    inputs: Type[MyInputs]
    outputs: Type[MyOutputs]
    parameters: Type[MyParameters]
    
    measurements: Type[MyMeasurements]
    
    @classmethod
    def call(
        cls, context: Context, parameters: MyParameters, inputs: MyInputs,
    ) -> MyOutputs:
        # run dummy detector
        detections = defaultdict(list)
        for _ in range(random.randint(2, 4)):
            detections[random.choice(["zone1", "zone2"])].append({
                "speed": random.random() * 100,
                "type": random.choice(["car", "truck", "moto"]),
            })
        # log measurements
        for zone, zone_detections in detections.items():
            # one measurement per detected vehicle
            for detection in zone_detections:
                context.log_measurement(Detection(
                    zone_id=String(zone),
                    vehicle_type=String(detection["type"]),
                    speed=Float(detection["speed"])))
            # one summary measurement per zone
            context.log_measurement(Detections(
                zone_id=String(zone),
                num_detections=Integer(len(zone_detections))))
        
        return MyOutputs(inputs.dummy)
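
Declaring the measurements as a class annotation means a tool can find out what a cell may emit without running it. The sketch below shows one way this could work, using only the standard `typing` module on plain stand-in classes. How the SDK itself reads the annotation is not shown in this notebook, so `declared_measurements` and all the `Toy*` classes are hypothetical.

```python
from dataclasses import dataclass
from typing import Type, get_args, get_type_hints


class ToyDetection:
    """Hypothetical stand-in for a measurement type."""


class ToyDetections:
    """Hypothetical stand-in for a second measurement type."""


@dataclass
class ToyMeasurements:
    detection: Type[ToyDetection]
    detections: Type[ToyDetections]


class ToyCell:
    measurements: Type[ToyMeasurements]


def declared_measurements(cell_cls) -> dict:
    # Unwrap Type[...] on the cell to get the bundle class ...
    bundle = get_args(get_type_hints(cell_cls)["measurements"])[0]
    # ... then unwrap Type[...] on every entry of the bundle.
    return {
        name: get_args(hint)[0]
        for name, hint in get_type_hints(bundle).items()
    }


for name, measurement_cls in declared_measurements(ToyCell).items():
    print(f"{name}: {measurement_cls.__name__}")
```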

Since there's only one cell, the easiest way to create a pipeline is to use the `Pipeline.from_cell` constructor, which automatically links the cell inputs and outputs to the pipeline's source and sink.

In [None]:
pipeline = Pipeline.from_cell(cell=MyCell())
pipeline.show()

Now we need to add a handler for the 'measurement' hook. This one simply prints the contents of each measurement:

In [None]:
from rvai.base.pipeline import CellRef
from rvai.base.runtime import ProcessID
from rvai.types import Timestamp

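def handle_measurements(
    pid: ProcessID,
    pipeline: Pipeline,
    cell_ref: CellRef,
    measurement: Measurement,
    timestamp: Timestamp,
):
    # print the measurement name, followed by one line per field
    print(f"=={measurement.name()}==")
    # 'measurement_field' avoids shadowing the 'field' helper imported from dataclasses
    for _key, measurement_field in measurement.items():
        index = "(index)" if measurement_field.index else ""
        print(f"--{index}{measurement_field.name}: {measurement_field.value}")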

Next, register the handler and run a few predictions to see the measurements appearing:

In [None]:
from rvai.base.hooks import Hooks
from rvai.base.runtime import Inference, init

hooks = Hooks()
hooks.on_measurement.append(handle_measurements)

rt = init("debug", hooks=hooks)
inference = Inference(pipeline=pipeline)

proc = rt.start_inference(inference=inference)

for x in range(2):
    print(f"FRAME {x}")
    proc.predict(inputs={"dummy": Integer(x)}).result()

Stop the inference process.

In [None]:
proc.stop()

And there you have it! The same hook could also record the measurements to a database or trigger an alarm.

In [None]:
rt.stop()
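
As a rough sketch of the database and alarm idea, the handler below stores every field of a measurement in an in-memory SQLite table and raises a simple alarm when a speed exceeds a threshold. To keep it runnable without the SDK, it takes plain values rather than the hook arguments used above: `record_measurement`, the table layout and `SPEED_LIMIT` are assumptions made for this illustration. In a real handler the values would be read from `measurement.items()`, as in the printing handler.

```python
import sqlite3

SPEED_LIMIT = 90.0  # hypothetical alarm threshold

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE measurements "
    "(timestamp REAL, cell TEXT, name TEXT, field TEXT, value TEXT)"
)

alarms = []


def record_measurement(cell_name, measurement_name, values, timestamp):
    """Store one row per field and raise an alarm for speeding vehicles."""
    rows = [
        (timestamp, cell_name, measurement_name, key, str(value))
        for key, value in values.items()
    ]
    with connection:  # commits on success, rolls back on error
        connection.executemany(
            "INSERT INTO measurements VALUES (?, ?, ?, ?, ?)", rows
        )
    if values.get("speed", 0.0) > SPEED_LIMIT:
        alarms.append(f"{cell_name}: speed {values['speed']} in {values['zone_id']}")


record_measurement(
    "MyCell",
    "Detection",
    {"zone_id": "zone1", "vehicle_type": "car", "speed": 95.2},
    timestamp=0.0,
)
row_count = connection.execute("SELECT COUNT(*) FROM measurements").fetchone()[0]
print(row_count, alarms)
```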