Skip to content

Latest commit

 

History

History
87 lines (60 loc) · 6.16 KB

File metadata and controls

87 lines (60 loc) · 6.16 KB

Simple Pipeline

This example illustrates how to create a simple pipeline with a single source, node, and sink connected together in C++. Each of the nodes in the segment ("node" here refers to either a source, sink or an object that is both a source and sink) is responsible for a simple task:

  • Source: Creates 3 int in the sequence 1, 2, 3 and completes
  • Node: Transforms the int to a float by multiplying it by 2.5, resulting in a float
  • Sink: Prints any received float value and counts the number of emitted items

Each of the objects in the Segment is created using the segment::Builder::make_XXX(NAME, ...) function where XXX is replace with either source, sink or node.

Once each object is created, they can be linked together using segment::Builder::make_edge(SOURCE, SINK). There are a few rules when making edges:

  • Objects deriving from srf::node::SourceProperties can only appear in the left-hand argument of make_edge()
  • Objects deriving from srf::node::SinkProperties can only appear in the right-hand argument of make_edge()
  • Node objects can appear in either side since they derive from both srf::node::SourceProperties and srf::node::SinkProperties
  • Sources can only be connected to one downstream sink
    • To use multiple downstream sinks with broadcast or round-robin functionality, see the guide on operators
  • Sinks can accept multiple upstream sources
    • Its possible to connect two sources to a single sink. The sink will process the messages in the order that they are received in

Creating the Source

The first "node" that we will be creating is a source. Source objects have no upstream dependencies and are responsible for producing data to be consume by downstream object.

To create a source, you call make_source<T> on the segment::Builder object passing a name and a lambda of type std::function<(rxcpp::subscriber<T>)> Please see https://reactivex.io/RxCpp/ and navigate to rxcpp::subscriber<T, Observer> for more detail on the lambda type. where T is the type of object that the source will be producing. In our example, we are creating integers so the source object looks like:

auto source = s.make_source<int>("int_source", [](rxcpp::subscriber<int> s) {
    s.on_next(1);
    s.on_next(2);
    s.on_next(3);
    s.on_completed();
});

The reason that we pass a lambda to make_source is because this function is only called once the source is started. Inside of our lambda, the source generates objects and then passes them to downstream objects by calling s.on_next(). Once completed, sources indicate that no more values will be generated by either calling s.on_completed() if the source completed successfully, or s.on_error() if the source completed with an error. After this point, it is undefined behavior to call s.on_next or any other function on s after on_completed or on_error is called.

Creating the Node

Node objects work as both a source and a sink and are responsible for connecting the upstream source to any downstream sink. Because the upstream source and downstream sink can be different types, make_node<T, R> accepts two template arguments for the source type and sink type respectively. To make building nodes easier, SRF supports Reactive operators in the make_node function. Please see https://reactivex.io/RxCpp/ and navigate to Operators for more details on Reactive operators. These Reactive operators can be used to chain multiple steps together and simplify node creation.

For our example, we will only be using a single Reactive operator: map. To build the node, we need to call make_node and provide the source type, sink type, node name and node operators:

auto node = s.make_node<int, float>("int_to_float", rxcpp::operators::map([](const int& data) {
                                        // Multiple the input value returning a float
                                        return float(2.5f * data);
                                    }));

In our example, the map operator takes a lambda that will be called for each value that passes through the node. The result returned by this lambda will be passed on to the next downstream node. Our lambda is very simple, only multipling the input value by 2.5 and returning a float.

Creating the Sink

The final "node" we will be making is a sink object. Sinks are the opposite of sources and are terminators in the pipeline graph. They only accept upstream connections and do not provide the ability to pass data on. To make a sink, we need to provide the sink type, sink name, and an rxcpp::observer shown here:

auto sink = s.make_sink<float>("float_sink", rxcpp::make_observer_dynamic<float>([&counter](float data) {
                                    counter++;
                                    std::cout << "sink: " << data << std::endl;
                                }));

We build up the rxcpp::observer using rxcpp::make_observer_dynamic which allows us to create the observer using only lambda functions. Each observer has 3 lambdas, an on_next, and on_error and an on_completed. These are the functions that get called by the respective calls in the source object.

In our sink, we are simply printing the values passed to the sink, and also incrementing a counter for each message received. Using lambda captures like this is the best way to get data out of the sink.

Connecting Everything Together

To connect sources and sinks together, you simply need to call s.make_edge(source, sink) passing the source and sink objects. During compilation, the source and sink types will be checked for compatibility and result in a compilation error if they are incompatible.

Running the Example

We can see this simple pipeline in action by running the example in the build folder:

$ ${QSG_BUILD_DIR}/docs/quickstart/cpp/ex00_simple_pipeline/ex00_simple_pipeline.x
srf pipeline starting...
sink: 2.5
sink: 5
sink: 7.5
srf pipeline complete: counter should be 3; counter=3

Where ${QSG_BUILD_DIR} is the output location of the CMake build.

We can see that the sink function was called 3 times, one for each value emitted by the source. What happens if you change the number of on_next statements in the source object?