Reactive Operators

If we look at the previous example, 01_CustomData, we might see one potential issue: We always process the same number of objects in the sink that were generated by the source. What if we need pipelines that can filter out messages in the middle or create new ones as needed?

This is where the Reactive-style operators come into play. SRF's Reactive-style operators make it easy to perform common tasks like:

  • filtering: srf.core.operators.filter
  • mapping: srf.core.operators.map
  • combining: srf.core.operators.to_list & srf.core.operators.pairwise
  • flattening: srf.core.operators.flatten
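As a rough intuition for the four categories above, here is a plain-Python analogy that operates on an ordinary list rather than on an SRF stream. It does not use the SRF API at all, and it assumes the operators follow their usual ReactiveX meanings (in particular, that to_list gathers all items into a single list and pairwise pairs each item with its successor).

```python
from itertools import chain

values = [0, 1, 2, 3, 4]

# filtering: keep only the items for which the predicate is true
evens = [x for x in values if x % 2 == 0]

# mapping: transform every item
doubled = [x * 2 for x in values]

# combining: to_list emits one list holding every item (assumed semantics),
# pairwise emits each item together with its successor (assumed semantics)
as_list = [list(values)]
pairs = list(zip(values, values[1:]))

# flattening: expand lists back into individual items
flat = list(chain.from_iterable([[0, 1], [2], [3, 4]]))

print(evens)    # [0, 2, 4]
print(doubled)  # [0, 2, 4, 6, 8]
print(as_list)  # [[0, 1, 2, 3, 4]]
print(pairs)    # [(0, 1), (1, 2), (2, 3), (3, 4)]
print(flat)     # [0, 1, 2, 3, 4]
```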

To use these operators, we need to use a different function instead of make_node. We will be using the more verbose make_node_full function, which takes a function with the signature: def lambda_fn(src: srf.Observable, dst: srf.Subscriber).

Inside this function, you are required to do one thing: connect src to dst using the subscribe method. For example, the Segment.make_node(map_fn) function is shorthand for the following:

def node_fn(src: srf.Observable, dst: srf.Subscriber):
    src.pipe(
        srf.core.operators.map(map_fn)
    ).subscribe(dst)

The initial call to srf.Observable.pipe() indicates that you would like to pipe every message from src through some reactive operators. Inside pipe() you can supply one or more operators, which process each message in the order they are listed. Finally, the call to subscribe() passes the piped messages on to its argument.
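To make the pipe-then-subscribe flow concrete, here is a toy model written in plain Python. ToyObservable, toy_filter, and toy_map are hypothetical stand-ins invented for this sketch; they are not SRF classes and say nothing about how SRF is implemented. The only point is to show operators being applied in the order given, with the result handed to the subscriber.

```python
from typing import Callable, Iterable, Iterator, List

# Hypothetical stand-ins for illustration; none of these names come from SRF.
Operator = Callable[[Iterator], Iterator]


def toy_filter(pred: Callable) -> Operator:
    # Pass through only the items for which pred(item) is true
    return lambda items: (x for x in items if pred(x))


def toy_map(fn: Callable) -> Operator:
    # Replace each item with fn(item)
    return lambda items: (fn(x) for x in items)


class ToyObservable:
    def __init__(self, items: Iterable):
        self._items = items

    def pipe(self, *operators: Operator) -> "ToyObservable":
        stream = iter(self._items)
        for op in operators:  # operators are chained in the order supplied
            stream = op(stream)
        return ToyObservable(stream)

    def subscribe(self, on_next: Callable) -> None:
        # Deliver every item that survives the pipe to the subscriber
        for item in self._items:
            on_next(item)


received: List[int] = []
ToyObservable(range(5)).pipe(
    toy_filter(lambda x: x % 2 == 0),
    toy_map(lambda x: x * 10),
).subscribe(received.append)

print(received)  # [0, 20, 40]
```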

Let's look at a more complex example:

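from srf.core import operators as ops

# This snippet is assumed to sit inside the function that builds the segment
# (where seg is available), which is why the nested functions use nonlocal
value_count = 0
value_sum = 0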

def node_fn(src: srf.Observable, dst: srf.Subscriber):
    def update_obj(x: MyCustomClass):
        nonlocal value_count
        nonlocal value_sum

        # Alter the value property of the class
        x.value = x.value * 2

        # Update the sum values
        value_count += 1
        value_sum += x.value

        return x

    def on_completed():

        # Prevent divide by 0. Just in case
        if (value_count <= 0):
            return

        return MyCustomClass(value_sum / value_count, "Mean")

    src.pipe(
       ops.filter(lambda x: x.value % 2 == 0),
       ops.map(update_obj),
       ops.on_completed(on_completed)
    ).subscribe(dst)

# Make an intermediate node
node = seg.make_node_full("node", node_fn)

In this example, we are using 3 different operators: filter, map, and on_completed:

  • The filter operator only passes through messages for which the lambda evaluates to True
    • In our example, we are filtering out all messages with an odd value.
  • The map operator can transform the incoming value and return a new value
    • In our example, we are doubling the value property and recording the total count and total sum of this property
  • The on_completed function is only called once when there are no more messages to process. You can optionally return a value which will be passed on to the rest of the pipeline.
    • In our example, we are calculating the average from the sum and count values and emitting a new object with the value set to the mean

In combination, these operators modify the stream, record some information about it, and finally emit a summary of all the values that passed through. Let's see it in practice.

Running the Example

If we run the example, we will see the following output:

$ python ./docs/quickstart/python/srf_qs_python/ex02_reactive_operators/run.py
srf pipeline starting...
Sink: Got Obj Name: Instance-0, Value: 0
Sink: Got Obj Name: Instance-2, Value: 4
Sink: Got Obj Name: Instance-4, Value: 8
Sink: Got Obj Name: Mean, Value: 4.0
srf pipeline completed.

In this example, the source emitted 5 values (0, 1, 2, 3, and 4), but the sink received only 4 messages. This is expected: the filter drops the odd values, the three remaining values are doubled to 0, 4, and 8, and a single extra message is emitted at the end carrying their mean, (0 + 4 + 8) / 3 = 4.0.
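The arithmetic behind this output can be checked without SRF using the plain-Python sketch below. Obj and simulate are hypothetical names introduced only for this illustration, and the sketch assumes the source emits objects named Instance-i with value i, as the output above suggests. Changing the count argument is a quick way to test your answer to the first question below.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Obj:
    # Hypothetical stand-in for MyCustomClass
    value: float
    name: str


def simulate(count: int) -> List[str]:
    """Mimic filter -> map -> on_completed and return the lines the sink prints."""
    value_count = 0
    value_sum = 0
    lines = []

    for i in range(count):
        obj = Obj(i, f"Instance-{i}")  # assumed shape of the source's messages

        # filter: drop messages with an odd value
        if obj.value % 2 != 0:
            continue

        # map: double the value and update the running totals
        obj.value = obj.value * 2
        value_count += 1
        value_sum += obj.value
        lines.append(f"Sink: Got Obj Name: {obj.name}, Value: {obj.value}")

    # on_completed: emit one final message holding the mean, if any values passed
    if value_count > 0:
        mean = Obj(value_sum / value_count, "Mean")
        lines.append(f"Sink: Got Obj Name: {mean.name}, Value: {mean.value}")

    return lines


for line in simulate(5):
    print(line)
```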

What would be the output if we increased the number of messages to 6? What would happen if we changed the order of the filter and map operators?