# PyStream Pipeline Demo

This notebook contains some examples of how to use PyStream to construct real-time data pipelines. For details, please also visit the package [documentation](https://pystream-pipeline.readthedocs.io/). First, let's import the required packages.

In [1]:
import time
from typing import Dict, List, Tuple

import cv2
import numpy as np
from tabulate import tabulate

from pystream import Pipeline, Stage

## Creating Stages

In PyStream, a pipeline is built from several components called stages. Data given to the pipeline is passed from one stage to the next until it reaches the final stage.

A stage can be defined in two ways:
- As an instance of a custom class. The class must inherit from the `pystream.Stage` abstract class and define both the `__call__` and `cleanup` methods.
- As a function. The function must take exactly one argument and return one result of the same type.
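Both forms work because, from the pipeline's point of view, a stage is simply a callable with one input and one output. The library-independent sketch below shows how a serial runner could treat the two forms uniformly. `ToyStage`, `AddOne`, `double` and `run_serial` are hypothetical names used only for illustration (`ToyStage` mimics the abstract-class requirement with Python's `abc` module); this is not PyStream's implementation.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, List


class ToyStage(ABC):
    """Hypothetical stand-in for an abstract stage base class."""

    @abstractmethod
    def __call__(self, data: Any) -> Any:
        """Process one input and return one output."""

    @abstractmethod
    def cleanup(self) -> None:
        """Reset internal state or release resources."""


class AddOne(ToyStage):
    """Class-based stage that also counts how many inputs it has seen."""

    def __init__(self) -> None:
        self.count = 0

    def __call__(self, data: int) -> int:
        self.count += 1
        return data + 1

    def cleanup(self) -> None:
        self.count = 0


def double(data: int) -> int:
    """Function-based stage: one argument in, one result of the same type out."""
    return data * 2


def run_serial(stages: List[Callable[[Any], Any]], data: Any) -> Any:
    # Each stage's output becomes the next stage's input.
    for stage in stages:
        data = stage(data)
    return data


add_one = AddOne()
print(run_serial([add_one, double, add_one], 3))  # (3 + 1) * 2 + 1 = 9
print(add_one.count)  # 2
add_one.cleanup()
print(add_one.count)  # 0
```

Class-based stages are useful when a stage needs state (such as the counter) that `cleanup` can reset; plain functions are enough for stateless steps.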

Now, let's create a sample stage. The stage below performs a series of convolutions on an image (a 3D NumPy array with shape HxWx3) stored in a dictionary under the `"data"` key. The result replaces the input image in the same dictionary. The class also counts how many times it has been executed and resets the counter to 0 during cleanup.

In [2]:
class DummyStage(Stage):
    """A dummy stage that performs some convolutions on the input image
    and counts how many inputs it has processed.

    For stages in class form, the __call__ and cleanup methods have to be
    defined, and it is recommended to set a 'name' attribute in the stage
    instance's __init__.
    """

    def __init__(self, name: str) -> None:
        self.count = 0
        self.name = name
        # OpenCV documents the filter2D kernel as a floating-point matrix
        self.kernel = np.random.randint(-10, 10, size=(5, 5)).astype(np.float32)

    def __call__(self, data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        img = data["data"]
        for _ in range(100):
            img = cv2.filter2D(src=img, ddepth=-1, kernel=self.kernel)
        data["data"] = img
        self.count += 1
        return data

    def cleanup(self):
        self.count = 0

We can also define the stage above as a function, without the counter.

In [3]:
def dummy_stage_func(data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """A function that performs some convolutions on the input image.

    This is a dummy stage defined as a function. A stage in function form
    takes exactly one argument and returns one result of the same type.
    """

    img = data["data"]
    # OpenCV documents the filter2D kernel as a floating-point matrix
    kernel = np.random.randint(-10, 10, size=(5, 5)).astype(np.float32)
    for _ in range(100):
        img = cv2.filter2D(src=img, ddepth=-1, kernel=kernel)
    data["data"] = img
    return data

## Automated Pipeline

You can also run a pipeline autonomously, i.e., input data is generated automatically at a fixed period. To do that, you need to define a data generator: a callable that takes no arguments and returns the data. In this example, we simply generate a random image.

In [4]:
def generate_data() -> Dict[str, np.ndarray]:
    img = np.random.randint(0, 255, size=(480, 720, 3), dtype=np.uint8)
    return {"data": img}
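Conceptually, feeding a pipeline from a generator at a fixed period amounts to a background loop that calls the generator, hands the result on, and waits until the next tick. The sketch below models that loop with the standard library only. `PeriodicFeeder` is a hypothetical helper; PyStream's own `start_loop`/`stop_loop` may be implemented differently (for example, this simple loop does not compensate for the time spent inside the generator, so the effective period drifts slightly).

```python
import itertools
import threading
import time
from typing import Any, Callable, List, Optional


class PeriodicFeeder:
    """Hypothetical helper: call `generator` every `period` seconds and pass
    each result to `sink` until `stop()` is called."""

    def __init__(self, generator: Callable[[], Any], sink: Callable[[Any], None]) -> None:
        self.generator = generator
        self.sink = sink
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self, period: float) -> None:
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, args=(period,), daemon=True)
        self._thread.start()

    def _run(self, period: float) -> None:
        while not self._stop_event.is_set():
            self.sink(self.generator())
            # wait() returns True as soon as stop() sets the event, so the
            # loop exits promptly instead of sleeping out a full period.
            if self._stop_event.wait(period):
                break

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()


received: List[int] = []
counter = itertools.count()
feeder = PeriodicFeeder(generator=lambda: next(counter), sink=received.append)
feeder.start(period=0.05)
time.sleep(0.3)  # let the feeder run for roughly 0.3 s
feeder.stop()
print(f"{len(received)} items generated: {received}")
```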

Next, let's define some parameters:

In [5]:
# Period (in seconds) between generated inputs
INPUT_PERIOD = 0.2
# How long (in seconds) to run the pipeline
ON_TIME = 5

Now create the pipeline. All you need is `pystream.Pipeline`. Here I put the pipeline creation into a `create_pipeline` function, with a `create_stages` helper that builds the stages. I also made a helper function, `print_profile`, to print the profiling results later. In this particular example, we use only the class-based stage.

In [6]:
def create_stages() -> List[DummyStage]:
    # Return 5 dummy stages
    return [DummyStage(f"Stage{i + 1}") for i in range(5)]

def create_pipeline() -> Tuple[Pipeline, List[DummyStage]]:
    # First, create the pipeline instance with the profiler enabled
    pipeline = Pipeline(input_generator=generate_data, use_profiler=True)
    # Create the stages
    stages = create_stages()
    # Now, add the stages to the pipeline.
    for stage in stages:
        pipeline.add(stage)
    return pipeline, stages

def print_profile(latency: Dict[str, float], throughput: Dict[str, float]) -> None:
    data = []
    for k in latency.keys():
        d = [k, latency[k], throughput[k]]
        data.append(d)
    table = tabulate(data, headers=["Stage", "Latency (s)", "Throughput (d/s)"])
    print(table)
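If `tabulate` is not installed, a similar table can be built with plain f-strings. The sketch below uses a hypothetical `format_profile_plain` name and, like `print_profile`, assumes that `latency` and `throughput` have the same keys.

```python
from typing import Dict


def format_profile_plain(latency: Dict[str, float], throughput: Dict[str, float]) -> str:
    """Build the same three-column table as print_profile, without tabulate."""
    headers = ("Stage", "Latency (s)", "Throughput (d/s)")
    # The first column is as wide as the longest stage name (or its header).
    width = max([len(headers[0])] + [len(name) for name in latency])
    lines = [
        f"{headers[0]:<{width}}  {headers[1]:>12}  {headers[2]:>17}",
        f"{'-' * width}  {'-' * 12}  {'-' * 17}",
    ]
    for name, lat in latency.items():
        lines.append(f"{name:<{width}}  {lat:>12.6f}  {throughput[name]:>17.5f}")
    return "\n".join(lines)


print(format_profile_plain(
    {"MainPipeline": 0.885433, "MainPipeline__Stage1": 0.168262},
    {"MainPipeline": 1.10035, "MainPipeline__Stage1": 1.09003},
))
```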

We are ready! Run the pipeline in serial mode, then use the `get_profiles` method of `pystream.Pipeline` to get the latency and throughput records.

In [7]:
pipeline, stages = create_pipeline()
print("Starting pipeline in serial...")
pipeline.serialize()
print(f"Streaming data each {INPUT_PERIOD} s...")
pipeline.start_loop(INPUT_PERIOD)
print(f"Waiting for {ON_TIME} s...")
time.sleep(ON_TIME)
print("Stopping pipeline...")
pipeline.stop_loop()

# Read the last result, then clean up
latest = pipeline.get_results()
print()
print("Last output shape:")
print(latest["data"].shape)
print("Number of processed data:")
print(stages[-1].count)
pipeline.cleanup()
print("Pipeline has been cleaned-up")
print(f"Data counter was reset to {stages[-1].count}.")

# Get the profile
latency, throughput = pipeline.get_profiles()
print()
print("Pipeline profile:")
print_profile(latency, throughput)

Starting pipeline in serial...
Streaming data each 0.2 s...
Waiting for 5 s...


Stopping pipeline...

Last output shape:
(480, 720, 3)
Number of processed data:
6
Pipeline has been cleaned-up
Data counter was reset to 0.

Pipeline profile:
Stage                   Latency (s)    Throughput (d/s)
--------------------  -------------  ------------------
MainPipeline               0.885433             1.10035
MainPipeline__Stage1       0.168262             1.09003
MainPipeline__Stage2       0.170458             1.09568
MainPipeline__Stage3       0.182034             1.09805
MainPipeline__Stage4       0.182642             1.09975
MainPipeline__Stage5       0.181923             1.10035
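A quick sanity check on the serial profile: when stages run one after another, each item occupies the whole pipeline until it leaves the last stage, so the overall latency should be close to the sum of the stage latencies and the throughput can be at most its reciprocal. The numbers below are copied from the table above; this is a back-of-the-envelope model, not how PyStream computes its profile.

```python
# Per-stage latencies (s) copied from the serial profile above.
stage_latency = {
    "Stage1": 0.168262,
    "Stage2": 0.170458,
    "Stage3": 0.182034,
    "Stage4": 0.182642,
    "Stage5": 0.181923,
}

total_latency = sum(stage_latency.values())
serial_throughput_bound = 1.0 / total_latency

print(f"Sum of stage latencies: {total_latency:.4f} s")  # 0.8853 s (reported: 0.885433 s)
print(f"Throughput upper bound: {serial_throughput_bound:.2f} d/s")  # 1.13 d/s (reported: 1.10 d/s)
```

Since inputs arrive at 1 / 0.2 s = 5 d/s, well above this bound, the serial pipeline is limited by processing time rather than by the input rate.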


Let's do the same thing, but this time with a parallel pipeline instead of a serial one.

In [8]:
pipeline, stages = create_pipeline()
print("Starting pipeline in parallel...")
pipeline.parallelize()
print(f"Streaming data each {INPUT_PERIOD} s...")
pipeline.start_loop(INPUT_PERIOD)
print(f"Waiting for {ON_TIME} s...")
time.sleep(ON_TIME)
print("Stopping pipeline...")
pipeline.stop_loop()

# Read the last result, then clean up
latest = pipeline.get_results()
print()
print("Last output shape:")
print(latest["data"].shape)
print("Number of processed data:")
print(stages[-1].count)
pipeline.cleanup()
print("Pipeline has been cleaned-up")
print(f"Data counter was reset to {stages[-1].count}.")

# Get the profile
latency, throughput = pipeline.get_profiles()
print()
print("Pipeline profile:")
print_profile(latency, throughput)

Starting pipeline in parallel...
Streaming data each 0.2 s...
Waiting for 5 s...
Stopping pipeline...

Last output shape:
(480, 720, 3)
Number of processed data:
19
Pipeline has been cleaned-up
Data counter was reset to 0.

Pipeline profile:
Stage                   Latency (s)    Throughput (d/s)
--------------------  -------------  ------------------
MainPipeline               1.15793              4.48034
MainPipeline__Stage1       0.215918             4.58622
MainPipeline__Stage2       0.218174             4.53008
MainPipeline__Stage3       0.204841             4.50127
MainPipeline__Stage4       0.20452              4.47497
MainPipeline__Stage5       0.210609             4.48047


Compare the above profile with the previous one. You might notice several things:

- The throughput has increased significantly (from about 1.1 d/s to about 4.5 d/s), thanks to the parallelization of the pipeline. It is now close to the input rate of 1 / 0.2 s = 5 d/s set by `INPUT_PERIOD`.
- As a result, more data items were processed in the same 5 s (19 instead of 6).
- The latency can be higher than in serial mode. This is due to contention for CPU resources: in this experiment, all of the stages run at the same time.
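One common way to parallelize a linear pipeline, and a useful mental model for the numbers above, is to give every stage its own worker and connect neighbouring stages with FIFO queues: while stage 2 works on item *n*, stage 1 can already start on item *n + 1*, so throughput is bounded by the slowest stage instead of the sum of all stages. The stdlib sketch below illustrates this with hypothetical `run_parallel` and `sleepy_stage` functions; PyStream's parallel mode may use a different mechanism. Threads suffice here because the toy stage sleeps; CPU-bound pure-Python stages would be serialized by the GIL and would need processes instead.

```python
import queue
import threading
import time
from typing import Any, Callable, List

_SENTINEL = object()  # marks the end of the input stream


def _worker(stage: Callable[[Any], Any], inbox: queue.Queue, outbox: queue.Queue) -> None:
    # Process items until the sentinel arrives, then pass the sentinel downstream.
    while True:
        item = inbox.get()
        if item is _SENTINEL:
            outbox.put(_SENTINEL)
            return
        outbox.put(stage(item))


def run_parallel(stages: List[Callable[[Any], Any]], inputs: List[Any]) -> List[Any]:
    """Run each stage in its own thread, connected by FIFO queues."""
    queues = [queue.Queue() for _ in range(len(stages) + 1)]
    threads = [
        threading.Thread(target=_worker, args=(stage, queues[i], queues[i + 1]))
        for i, stage in enumerate(stages)
    ]
    for t in threads:
        t.start()
    for item in inputs:
        queues[0].put(item)
    queues[0].put(_SENTINEL)

    results = []
    while (item := queues[-1].get()) is not _SENTINEL:
        results.append(item)
    for t in threads:
        t.join()
    return results


def sleepy_stage(x: int) -> int:
    time.sleep(0.02)  # stands in for ~20 ms of work that does not hold the GIL
    return x + 1


start = time.perf_counter()
out = run_parallel([sleepy_stage] * 5, list(range(10)))
elapsed = time.perf_counter() - start
print(out)  # [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
print(f"{elapsed:.2f} s (a serial run would need about 10 * 5 * 0.02 = 1.0 s)")
```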

## Manually Triggered Pipeline

Of course, you can also pass data to the pipeline manually with the `forward` method. Let's recreate the pipeline. This time, we use both class-based and function-based stages.

In [9]:
def create_pipeline() -> Pipeline:
    # First, create the pipeline instance with the profiler enabled
    pipeline = Pipeline(input_generator=generate_data, use_profiler=True)
    # Now, add the stages to the pipeline.
    pipeline.add(DummyStage("StageA"))
    pipeline.add(DummyStage("StageB"))
    pipeline.add(dummy_stage_func)
    pipeline.add(dummy_stage_func)
    return pipeline

Next, define some parameters:

In [10]:
# Number of data items to feed into the pipeline
ON_CYCLE = 10

Now pass the data manually with the `forward` method. Let's just use a parallel pipeline this time.

In [11]:
pipeline = create_pipeline()
print("Starting pipeline in parallel...")
# If the pipeline is not ready to take new data yet, block for up to
# 10 seconds (input_timeout) when pushing data to the pipeline
pipeline.parallelize(block_input=True, input_timeout=10)

# Generate and forward random images ON_CYCLE times
for _ in range(ON_CYCLE):
    data = {
        "data": np.random.randint(0, 255, size=(480, 720, 3), dtype=np.uint8)
    }
    pipeline.forward(data)
# Give the pipeline some time to finish processing before reading results
time.sleep(5)

# Read the last result, then clean up
latest = pipeline.get_results()
print()
print("Last output shape:")
print(latest["data"].shape)
pipeline.cleanup()
print("Pipeline has been cleaned-up")

# Get the profile
latency, throughput = pipeline.get_profiles()
print()
print("Pipeline profile:")
print_profile(latency, throughput)

Starting pipeline in parallel...

Last output shape:
(480, 720, 3)
Pipeline has been cleaned-up

Pipeline profile:
Stage                    Latency (s)    Throughput (d/s)
---------------------  -------------  ------------------
MainPipeline                1.15573              5.02434
MainPipeline__StageA        0.203491             4.92226
MainPipeline__StageB        0.188007             4.90051
MainPipeline__Stage_1       0.190019             4.91418
MainPipeline__Stage_2       0.188901             5.02444
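The `block_input=True, input_timeout=10` options used above make `forward` wait when the pipeline cannot accept new data yet. We assume this behaves much like `put(block=True, timeout=...)` on a bounded `queue.Queue` from the standard library, whose semantics the sketch below shows: a full queue blocks the producer and raises `queue.Full` once the timeout expires. This is an analogy only, not PyStream's code, and PyStream's behaviour on timeout may differ.

```python
import queue

# A queue holding at most one pending item, like a stage that is still busy.
inbox: queue.Queue = queue.Queue(maxsize=1)

inbox.put("frame 0", block=True, timeout=0.1)  # succeeds immediately: the slot is free

try:
    # The slot is occupied and nothing consumes it, so this waits 0.1 s and gives up.
    inbox.put("frame 1", block=True, timeout=0.1)
    accepted = True
except queue.Full:
    accepted = False

print(accepted)  # False
print(inbox.get_nowait())  # frame 0
```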


## Mixed Pipeline

It is also possible to combine serial and parallel pipelines by embedding one pipeline in another. To do that, convert the child pipeline into a compatible form by calling the `as_stage` method of `Pipeline`.
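Embedding works because a pipeline, seen from the outside, is just another one-input, one-output callable. The stdlib sketch below models that idea with a hypothetical `ToyPipeline` whose `as_stage` returns its own serial run function; the real `Pipeline.as_stage` also handles naming and profiling and may work differently.

```python
from typing import Any, Callable, List


class ToyPipeline:
    """Hypothetical serial pipeline used only to illustrate nesting."""

    def __init__(self) -> None:
        self.stages: List[Callable[[Any], Any]] = []

    def add(self, stage: Callable[[Any], Any]) -> "ToyPipeline":
        self.stages.append(stage)
        return self  # allow chaining

    def run(self, data: Any) -> Any:
        # Pass the data through every stage in order.
        for stage in self.stages:
            data = stage(data)
        return data

    def as_stage(self) -> Callable[[Any], Any]:
        # The whole pipeline becomes a single one-argument callable.
        return self.run


child = ToyPipeline().add(lambda x: x * 10).add(lambda x: x - 1)
parent = ToyPipeline().add(lambda x: x + 2).add(child.as_stage())

print(parent.run(1))  # (1 + 2) * 10 - 1 = 29
```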

In this experiment, we embed a serial pipeline into a parallel one. First, let's define the child pipeline:

In [12]:
def create_child_pipeline() -> Pipeline:
    # First, create the pipeline instance with the profiler enabled
    pipeline = Pipeline(input_generator=generate_data, use_profiler=True)
    # Now, add the stages to the pipeline.
    pipeline.add(DummyStage("StageA"))
    pipeline.add(DummyStage("StageB"))
    return pipeline

child_pipeline = create_child_pipeline()
child_pipeline.serialize()

<pystream.pipeline.pipeline.Pipeline at 0x216908f3700>

Then, create a parent pipeline that contains the child pipeline:

In [13]:
def create_parent_pipeline(child_pipeline: Pipeline) -> Pipeline:
    # First, create the pipeline instance with the profiler enabled
    pipeline = Pipeline(input_generator=generate_data, use_profiler=True)
    # Now, add the stages to the pipeline. The last stage is the child pipeline.
    pipeline.add(DummyStage("Stage1"))
    pipeline.add(DummyStage("Stage2"))
    pipeline.add(child_pipeline.as_stage(), "ChildPipeline")
    return pipeline

parent_pipeline = create_parent_pipeline(child_pipeline)
parent_pipeline.parallelize(block_input=True)

<pystream.pipeline.pipeline.Pipeline at 0x216908f3dc0>

Let's try it out!

In [14]:
# Generate and forward random images ON_CYCLE times
ON_CYCLE = 10
for _ in range(ON_CYCLE):
    data = {
        "data": np.random.randint(0, 255, size=(480, 720, 3), dtype=np.uint8)
    }
    parent_pipeline.forward(data)
# Give the pipeline some time to finish processing before reading results
time.sleep(5)

# Read the last result, then clean up
latest = parent_pipeline.get_results()
print()
print("Last output shape:")
print(latest["data"].shape)
parent_pipeline.cleanup()
print("Pipeline has been cleaned-up")

# Get the profile
latency, throughput = parent_pipeline.get_profiles()
print()
print("Pipeline profile:")
print_profile(latency, throughput)


Last output shape:
(480, 720, 3)
Pipeline has been cleaned-up

Pipeline profile:
Stage                                  Latency (s)    Throughput (d/s)
-----------------------------------  -------------  ------------------
MainPipeline                              1.97543              2.75196
MainPipeline__Stage1                      0.191864             4.40831
MainPipeline__Stage2                      0.191059             3.57146
MainPipeline__ChildPipeline               0.363558             2.75198
MainPipeline__ChildPipeline__StageA       0.187975             2.74605
MainPipeline__ChildPipeline__StageB       0.175553             2.75198


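You might have noticed that in the profiling reports, nested pipeline levels are separated by `__`. For example, `MainPipeline__ChildPipeline__StageA` is `StageA` inside `ChildPipeline` inside `MainPipeline`.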
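Because nested levels are joined with `__`, the flat profile can be regrouped by level with plain string splitting. The sketch below does this for the mixed-pipeline latencies copied from the table above, using a hypothetical `children_of` helper that is not part of PyStream. It also shows that the child pipeline's latency is roughly the sum of its two serial stages, which makes it the slowest stage of the parallel parent; consistently, 1 / 0.363558 s is about 2.75 d/s, matching the parent's reported throughput.

```python
from typing import Dict, List

# Latencies (s) copied from the mixed-pipeline profile above.
latency: Dict[str, float] = {
    "MainPipeline": 1.97543,
    "MainPipeline__Stage1": 0.191864,
    "MainPipeline__Stage2": 0.191059,
    "MainPipeline__ChildPipeline": 0.363558,
    "MainPipeline__ChildPipeline__StageA": 0.187975,
    "MainPipeline__ChildPipeline__StageB": 0.175553,
}


def children_of(parent: str, names: List[str]) -> List[str]:
    """Return the names exactly one `__` level below `parent`."""
    depth = parent.count("__") + 1
    return [
        n for n in names
        if n.startswith(parent + "__") and n.count("__") == depth
    ]


names = list(latency)
child = "MainPipeline__ChildPipeline"
print(children_of("MainPipeline", names))
# ['MainPipeline__Stage1', 'MainPipeline__Stage2', 'MainPipeline__ChildPipeline']

child_stage_sum = sum(latency[n] for n in children_of(child, names))
print(f"{child_stage_sum:.6f} vs {latency[child]:.6f}")  # 0.363528 vs 0.363558
```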