# Pipelines

Data pipelines allow large amounts of data to be processed without loading all of it into memory at once.
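
As a rough illustration of the memory difference, the sketch below compares a list that holds every value with a generator that produces the same values on demand. The names `N`, `eager` and `lazy` are made up for this example, and the exact byte counts depend on the Python build.

```python
import sys

N = 1_000_000                          # illustrative size, not from the original notebook

eager = [x * x for x in range(N)]      # materializes every value up front
lazy = (x * x for x in range(N))       # produces values one at a time, on demand

print(sys.getsizeof(eager))            # size of the list object, grows with N
print(sys.getsizeof(lazy))             # size of the generator object, independent of N

total = sum(lazy)                      # values are consumed one by one, never stored together
```

Note that `sys.getsizeof` reports only the container itself, not the integers it references, so the real gap is larger than the printed numbers suggest.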

## Pull Pipelines

Generators can be used to build iteration-based data pipelines.

In [47]:
def pipeline_stage_one_a(limit):
    num = 0
    while num <= limit:
        print("pipeline_stage_one_a: stage 1 yield")
        yield num
        num += 1

def pipeline_stage_two(previous_stage_generator):
    for value in previous_stage_generator:
        print("pipeline_stage_two: stage 2 yield")
        yield value * value

def pipeline_stage_three(previous_stage_generator):
    for squared_value in previous_stage_generator:
        squared_value += 1
        print("pipeline_stage_three: stage 3 yield")
        yield squared_value

Create a data pipeline by passing each generator to the next stage. No stage runs until a value is requested.

In [48]:
stage1 = pipeline_stage_one_a(limit=2)          # create initial generator - returns generator object, no execution
stage2 = pipeline_stage_two(stage1)             # pass it to the next in the pipeline
stage3 = pipeline_stage_three(stage2)           # pass it to the next in the pipeline

Use iteration to pull data through the pipeline.

In [49]:
for i in stage3:
    print("x^2 + 1 =", i)

pipeline_stage_one_a: stage 1 yield
pipeline_stage_two: stage 2 yield
pipeline_stage_three: stage 3 yield
x^2 + 1 = 1
pipeline_stage_one_a: stage 1 yield
pipeline_stage_two: stage 2 yield
pipeline_stage_three: stage 3 yield
x^2 + 1 = 2
pipeline_stage_one_a: stage 1 yield
pipeline_stage_two: stage 2 yield
pipeline_stage_three: stage 3 yield
x^2 + 1 = 5
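
The manual wiring above can also be expressed generically. The following is a minimal sketch, not part of the original notebook: `numbers`, `square`, `add_one` and `build_pipeline` are hypothetical names, and the stages omit the `print` calls for brevity. It also shows that a generator pipeline can be consumed only once.

```python
from functools import reduce

def numbers(limit):
    # Source stage: yields 0, 1, ..., limit
    yield from range(limit + 1)

def square(values):
    for value in values:
        yield value * value

def add_one(values):
    for value in values:
        yield value + 1

def build_pipeline(source, *stages):
    """Feed `source` through each stage in order and return the final generator."""
    return reduce(lambda upstream, stage: stage(upstream), stages, source)

pipeline = build_pipeline(numbers(2), square, add_one)
results = list(pipeline)
print(results)          # [1, 2, 5]
print(list(pipeline))   # [] because the generators are now exhausted
```

To run the pipeline again, build a fresh one from a new source generator.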


The same can be done using generator expressions.

In [50]:
stage1 = (x for x in range(3))                 # create initial generator
stage2 = (x*x for x in stage1)                  # pass it to the next in the pipeline
stage3 = (x+1 for x in stage2)                  # pass it to the next in the pipeline

# here we use iteration to pull the data through the pipeline from the end
for i in stage3:
    print("x^2 + 1 =", i)

x^2 + 1 = 1
x^2 + 1 = 2
x^2 + 1 = 5
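
Because each stage produces a value only when the next stage asks for one, the source does not even have to be finite. The sketch below assumes an unbounded source from `itertools.count` and uses `itertools.islice` to pull just the first five results. The variable names are illustrative.

```python
import itertools

numbers = itertools.count()                  # 0, 1, 2, ... without end
squares = (x * x for x in numbers)           # nothing is computed yet
plus_one = (x + 1 for x in squares)          # still nothing computed

# islice pulls exactly five values through the pipeline and then stops
first_five = list(itertools.islice(plus_one, 5))
print(first_five)                            # [1, 2, 5, 10, 17]
```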


You can create many-to-one stages in the pipeline with `zip`, which pulls one value from each upstream generator per iteration.

In [51]:
def pipeline_stage_one_b(limit):
    num = 0
    while num <= limit:
        print("pipeline_stage_one_b: stage 1a yield")
        yield f"(x = {num})"
        num += 1

def pipeline_stage_multiplex(previous_stage_generator_a, previous_stage_generator_b):
    for squared_value, value_label in zip(previous_stage_generator_a, previous_stage_generator_b):
        squared_value += 1
        print("pipeline_stage_multiplex: stage 3 yield")
        yield f"{squared_value} {value_label}"

stage1a = pipeline_stage_one_a(limit=2)                # create first generator
stage1b = pipeline_stage_one_b(limit=2)                # create second generator
stage2 = pipeline_stage_two(stage1a)                    # pass first to the next in the pipeline
stage3 = pipeline_stage_multiplex(stage2, stage1b)      # pass two generators to the next in the pipeline

for i in stage3:
    print("x^2 + 1 =", i)

pipeline_stage_one_a: stage 1 yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one_b: stage 1a yield
pipeline_stage_multiplex: stage 3 yield
x^2 + 1 = 1 (x = 0)
pipeline_stage_one_a: stage 1 yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one_b: stage 1a yield
pipeline_stage_multiplex: stage 3 yield
x^2 + 1 = 2 (x = 1)
pipeline_stage_one_a: stage 1 yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one_b: stage 1a yield
pipeline_stage_multiplex: stage 3 yield
x^2 + 1 = 5 (x = 2)
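
One detail worth keeping in mind: `zip` stops as soon as its shortest input is exhausted, so upstream generators of different lengths are silently truncated. The sketch below, using the hypothetical generators `values` and `labels`, contrasts this with `itertools.zip_longest`, which pads the shorter input instead.

```python
from itertools import zip_longest

def values(limit):
    yield from range(limit + 1)

def labels(limit):
    for num in range(limit + 1):
        yield f"(x = {num})"

# zip stops when the shorter input (labels) runs out
truncated = list(zip(values(3), labels(1)))
print(truncated)    # [(0, '(x = 0)'), (1, '(x = 1)')]

# zip_longest continues and fills the gaps with fillvalue
padded = list(zip_longest(values(3), labels(1), fillvalue="(no label)"))
print(padded[-1])   # (3, '(no label)')
```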


## Push Pipelines

Coroutines can be used to create producer-consumer data pipelines, in which the producer pushes each value downstream with `send`.

In [52]:
def pipeline_producer(next_stage_coroutine, limit):
    num = 0
    _return_value = next_stage_coroutine.send(None)                 # prime the next stage in the pipeline
    while num <= limit:
        print("pipeline_producer: stage producer send")
        _return_value = next_stage_coroutine.send(num)
        print("pipeline_producer: return value from producer send =", _return_value)
        num += 1
    next_stage_coroutine.close()

def pipeline_stage_one(next_stage_coroutine):
    _return_value = next_stage_coroutine.send(None)
    while True:
        print("pipeline_stage_one: stage 1 yield")
        value = (yield _return_value)                               # yield back up the pipeline
        print("pipeline_stage_one: stage 1 send")
        _return_value = next_stage_coroutine.send(value * value)    # send down the pipeline

def pipeline_stage_two(next_stage_coroutine):
    _return_value = next_stage_coroutine.send(None)
    while True:
        print("pipeline_stage_two: stage 2 yield")
        value = (yield _return_value)
        print("pipeline_stage_two: stage 2 send")
        _return_value = next_stage_coroutine.send(value + 1)

Now create a consumer for the pipeline.

In [53]:
def pipeline_stage_consumer(prefix_string):
    while True:
        print("pipeline_stage_consumer: stage consumer yield")
        value = (yield 'Consumed')
        print(prefix_string, value)

Build the pipeline starting with the consumer, since each stage needs a reference to the stage after it.

In [54]:
stage_consumer = pipeline_stage_consumer("x^2 + 1 =")
stage2 = pipeline_stage_two(stage_consumer)
stage1 = pipeline_stage_one(stage2)

Finally, push data through the pipeline by calling the producer.

In [55]:
pipeline_producer(stage1, 2)

pipeline_stage_consumer: stage consumer yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one: stage 1 yield
pipeline_producer: stage producer send
pipeline_stage_one: stage 1 send
pipeline_stage_two: stage 2 send
x^2 + 1 = 1
pipeline_stage_consumer: stage consumer yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one: stage 1 yield
pipeline_producer: return value from producer send = Consumed
pipeline_producer: stage producer send
pipeline_stage_one: stage 1 send
pipeline_stage_two: stage 2 send
x^2 + 1 = 2
pipeline_stage_consumer: stage consumer yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one: stage 1 yield
pipeline_producer: return value from producer send = Consumed
pipeline_producer: stage producer send
pipeline_stage_one: stage 1 send
pipeline_stage_two: stage 2 send
x^2 + 1 = 5
pipeline_stage_consumer: stage consumer yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one: stage 1 yield
pipeline_producer: return value from producer send = Consumed
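
In the cells above, every stage primes its downstream coroutine by hand with `send(None)`, and the producer closes only the first stage. A common alternative is a small decorator that primes a coroutine when it is created, combined with a `finally` block that passes `close()` down the chain. The following is a sketch of that idea rather than a drop-in replacement: `primed`, `collector` and `square_stage` are hypothetical names, and the stages collect results in a list instead of printing.

```python
import functools

def primed(coroutine_function):
    """Create the coroutine and advance it to its first yield."""
    @functools.wraps(coroutine_function)
    def start(*args, **kwargs):
        coroutine = coroutine_function(*args, **kwargs)
        next(coroutine)                  # same effect as coroutine.send(None)
        return coroutine
    return start

@primed
def collector(results):
    try:
        while True:
            value = (yield)
            results.append(value)
    finally:
        results.append("closed")         # runs when close() is called on this stage

@primed
def square_stage(next_stage):
    try:
        while True:
            value = (yield)
            next_stage.send(value * value)
    finally:
        next_stage.close()               # propagate shutdown down the pipeline

results = []
pipeline = square_stage(collector(results))
for num in range(3):
    pipeline.send(num)
pipeline.close()
print(results)                           # [0, 1, 4, 'closed']
```

With this approach the producer no longer needs its own priming call, because every coroutine is already waiting at its first `yield` when it is handed over.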


Create a data pipeline with a broadcasting coroutine, which sends each value it receives to several consumers.

In [56]:
def pipeline_stage_broadcaster(next_stage_coroutines):
    for _sink in next_stage_coroutines:
        _sink.send(None)                                            # prime every downstream consumer
    while True:
        print("pipeline_stage_broadcaster: stage broadcast yield")
        value = (yield 'Broadcasting')
        print("pipeline_stage_broadcaster: stage broadcasting")
        for _sink in next_stage_coroutines:
            _sink.send(value)

stage_consumer1 = pipeline_stage_consumer("consumer1: x^2 + 1 =")
stage_consumer2 = pipeline_stage_consumer("consumer2: x^2 + 1 =")
stage_broadcaster = pipeline_stage_broadcaster([stage_consumer1, stage_consumer2])
stage2 = pipeline_stage_two(stage_broadcaster)
stage1 = pipeline_stage_one(stage2)

Again, push data through the pipeline by calling the producer.

In [57]:
pipeline_producer(stage1, 2)

pipeline_stage_consumer: stage consumer yield
pipeline_stage_consumer: stage consumer yield
pipeline_stage_broadcaster: stage broadcast yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one: stage 1 yield
pipeline_producer: stage producer send
pipeline_stage_one: stage 1 send
pipeline_stage_two: stage 2 send
pipeline_stage_broadcaster: stage broadcasting
consumer1: x^2 + 1 = 1
pipeline_stage_consumer: stage consumer yield
consumer2: x^2 + 1 = 1
pipeline_stage_consumer: stage consumer yield
pipeline_stage_broadcaster: stage broadcast yield
pipeline_stage_two: stage 2 yield
pipeline_stage_one: stage 1 yield
pipeline_producer: return value from producer send = Broadcasting
pipeline_producer: stage producer send
pipeline_stage_one: stage 1 send
pipeline_stage_two: stage 2 send
pipeline_stage_broadcaster: stage broadcasting
consumer1: x^2 + 1 = 2
pipeline_stage_consumer: stage consumer yield
consumer2: x^2 + 1 = 2
pipeline_stage_consumer: stage consumer yield
pipeline_stage_broadcaster