Stephan Kuschel, 2020

In [1]:
import generatorpipeline as gp
import time
print(gp.__version__)

v0.1+16.g9afd1ee


# The principle of generatorpipeline

To process many elements memory-efficiently in Python, generators can be used. One generator, `start_generator`, generates the data (here a sequence of numbers) and another function, `timestwo_g`, processes every individual element. Let's look at the principle in pure Python first:

In [2]:
def start_generator():
    # create the "data"
    for i in range(10):
        yield i

def timestwo_g(gen):
    # do something with each element
    for el in gen:
        yield el*2

gen = start_generator()
gen = timestwo_g(gen)
list(gen)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Instead of numbers, the elements could also be large objects, such as images, because the generator creates and processes one element after the other instead of holding all of them in memory. It is possible to define multiple processing steps and chain them together to build a data-processing pipeline.
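A minimal pure-Python sketch of such a chain with two processing steps. `plusone_g` is a hypothetical second step added only for illustration; nothing is computed until `list()` consumes the last generator.

```python
def start_generator():
    # create the "data": the numbers 0..9, one at a time
    for i in range(10):
        yield i

def timestwo_g(gen):
    # first processing step: double each element
    for el in gen:
        yield el * 2

def plusone_g(gen):
    # hypothetical second processing step: add one to each element
    for el in gen:
        yield el + 1

# chain the steps; each element flows through all steps before the next one is created
gen = start_generator()
gen = timestwo_g(gen)
gen = plusone_g(gen)
result = list(gen)
print(result)  # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
```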

This example has a severe shortcoming: the `timestwo_g` function cannot be used on a single element (useful for testing, development, or within another function). The call `timestwo_g(7)` does not work.
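The failure is easy to miss, because calling a generator function never raises by itself; the error only appears once the generator is consumed. A minimal pure-Python demonstration:

```python
def timestwo_g(gen):
    # same generator function as above
    for el in gen:
        yield el * 2

# calling a generator function does not run its body yet, so this line succeeds
gen = timestwo_g(7)

message = None
try:
    next(gen)  # now the body runs and `for el in 7` fails
except TypeError as err:
    message = str(err)

print(message)  # 'int' object is not iterable
```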

With `generatorpipeline`, the function can still be used without a generator, and the example becomes much simpler:

In [3]:
@gp.pipeline()
def timestwo(x):
    return x*2

The new `timestwo` function automatically returns a generator when an Iterator or Generator is given as the argument:

In [59]:
gen = iter(range(10))
gen = timestwo(gen)
list(gen)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

If the argument is not an Iterator, the function will act as the undecorated function:

In [60]:
timestwo(7)

14
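The dispatch behavior shown in the last two cells can be sketched in a few lines of plain Python. `pipeline_sketch` is a hypothetical name, and this is NOT generatorpipeline's implementation: it assumes the check is equivalent to `isinstance(arg, collections.abc.Iterator)`, supports only serial execution and a single argument, and does not skip `None` results.

```python
import functools
from collections.abc import Iterator

def pipeline_sketch(func):
    """Hypothetical stand-in illustrating the dispatch idea behind `gp.pipeline()`."""
    @functools.wraps(func)
    def wrapper(arg):
        if isinstance(arg, Iterator):
            # an Iterator (e.g. a generator) was given: return a generator
            # that applies func lazily to every element
            return (func(el) for el in arg)
        # any other argument: behave like the undecorated function
        return func(arg)
    return wrapper

@pipeline_sketch
def timestwo(x):
    return x * 2

print(list(timestwo(iter(range(10)))))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(timestwo(7))                      # 14
```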

Important: the decorated function must take only one positional argument. Keyword arguments (`kwargs`) are forwarded to every call:

In [6]:
@gp.pipeline()
def multiply(x, y=1):
    return x * y

gen = iter(range(10))
gen = multiply(gen, y=3)
list(gen)

[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
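In plain Python, forwarding `y=3` to every call behaves like binding it once with `functools.partial` and mapping the resulting one-argument function lazily over the iterator. This is an analogy for the behavior shown above, not a description of how generatorpipeline does it internally.

```python
import functools

def multiply(x, y=1):
    return x * y

# bind the keyword argument once, leaving a function of a single argument
multiply_by_3 = functools.partial(multiply, y=3)

# map() is lazy, like a generator: elements are processed only when consumed
gen = map(multiply_by_3, iter(range(10)))
result = list(gen)
print(result)  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
```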

# Multiprocessing

Because every element is processed independently, the elements can be distributed over multiple cores. Internally, the `multiprocessing` package is used to distribute the work:

In [13]:
%%time

@gp.pipeline()  # serial execution in the same python process
def multiply(x, y=1):
    time.sleep(0.1)
    return x * y

gen = iter(range(20))
gen = multiply(gen, y=3)
list(gen)

CPU times: user 2.37 ms, sys: 526 µs, total: 2.9 ms
Wall time: 2 s


[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57]

In [8]:
%%time

@gp.pipeline(5)  # parallel execution in 5 new processes
def multiply(x, y=1):
    time.sleep(0.1)
    return x * y

gen = iter(range(20))
gen = multiply(gen, y=3)
list(gen)

CPU times: user 25.1 ms, sys: 11.9 ms, total: 37.1 ms
Wall time: 439 ms


[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57]
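The two wall times can be checked against an idealized estimate that ignores start-up and communication costs: serially, the 20 elements each sleep 0.1 s one after another; with 5 processes, they are handled in rounds of 5 at a time.

```python
import math

n_elements = 20
seconds_per_element = 0.1  # the time.sleep(0.1) inside multiply
n_processes = 5

# serial: every element waits for the previous one
serial_estimate = n_elements * seconds_per_element
# parallel: elements are handled in rounds of n_processes at a time
rounds = math.ceil(n_elements / n_processes)
parallel_estimate = rounds * seconds_per_element

print(f'serial:   {serial_estimate:.1f} s')    # serial:   2.0 s
print(f'parallel: {parallel_estimate:.1f} s')  # parallel: 0.4 s
```

The measured 2 s and 439 ms agree well with these estimates; the extra ~39 ms in the parallel run is presumably process start-up and inter-process communication overhead.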

Note that adding more processes is not always faster: every element and its result must be transferred to a worker process and back to the main Python process. It is often better to group the work into larger tasks before distributing them, to minimize inter-process communication.
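One way to group work is to batch consecutive elements into lists, so that each call (and each transfer to a worker) handles a whole batch. `batched_sketch` and `multiply_batch` are hypothetical helpers; on Python 3.12+ the standard library's `itertools.batched` does the grouping (yielding tuples). The sketch below runs serially in plain Python.

```python
from itertools import islice

def batched_sketch(iterable, size):
    """Hypothetical helper: yield lists of up to `size` consecutive elements."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

def multiply_batch(batch, y=1):
    # one call handles a whole batch, so only one round trip per batch would be needed
    return [x * y for x in batch]

batches = batched_sketch(range(20), 5)  # a generator of 4 lists with 5 elements each
results = [multiply_batch(b, y=3) for b in batches]
flat = [x for batch in results for x in batch]  # undo the grouping
```

Since a generator of lists is itself an Iterator, a function like `multiply_batch` could, following the behavior described above, be decorated with `@gp.pipeline(5)` so that each worker receives a whole batch; this combination is not tested here.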

# Discarding elements

Elements are removed from the data stream if the function returns `None`:

In [9]:
@gp.pipeline()
def filter_odd(el):
    if el % 2:
        return 
    return el

gen = iter(range(12))
gen = filter_odd(gen)
list(gen)

[0, 2, 4, 6, 8, 10]

`None` elements can be preserved by setting the `skipNone` argument to `False`:

In [61]:
@gp.pipeline(skipNone=False)
def filter_odd2(el):
    if el % 2:
        return 
    return el

gen = iter(range(12))
gen = filter_odd2(gen)
list(gen)

[0, None, 2, None, 4, None, 6, None, 8, None, 10, None]
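Both behaviors can be mimicked with a plain generator. `apply_sketch` and its `skip_none` flag are hypothetical names, not the library's code. Note that the outputs above keep `0`, so the test must be `is not None` rather than a truthiness check.

```python
def apply_sketch(func, iterator, skip_none=True):
    """Hypothetical illustration of the skipNone behavior."""
    for el in iterator:
        result = func(el)
        # compare with `is None`: falsy results such as 0 must be kept
        if skip_none and result is None:
            continue
        yield result

def filter_odd(el):
    if el % 2:
        return None
    return el

print(list(apply_sketch(filter_odd, iter(range(12)))))
# [0, 2, 4, 6, 8, 10]
print(list(apply_sketch(filter_odd, iter(range(12)), skip_none=False)))
# [0, None, 2, None, 4, None, 6, None, 8, None, 10, None]
```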

# Iterables and Iterators

The decorated function is called for every element of the given `Iterator`. A normal iterable (like a string) is NOT iterated over; it is passed to the function as a single argument. Note the difference between the following statements.

In [18]:
timestwo('test')  # same as `'test'*2`

'testtest'

In [23]:
gen = timestwo(iter('test'))  # `timestwo` for every character
list(gen)

['tt', 'ee', 'ss', 'tt']
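Assuming the decorator distinguishes the two cases with a check equivalent to `isinstance(arg, collections.abc.Iterator)` (an assumption; the text above only says "Iterator"), the following shows which common objects qualify. It also explains why the examples wrap `range(10)` in `iter(...)`: a `range` is iterable but not an Iterator.

```python
from collections.abc import Iterator

def numbers():
    yield 1

# Which objects count as an Iterator?
is_str_iterator = isinstance('test', Iterator)                 # False: str is only iterable
is_list_iterator = isinstance([1, 2], Iterator)                # False
is_range_iterator = isinstance(range(10), Iterator)            # False
is_iter_str_iterator = isinstance(iter('test'), Iterator)      # True
is_iter_range_iterator = isinstance(iter(range(10)), Iterator) # True
is_generator_iterator = isinstance(numbers(), Iterator)        # True: generators are iterators
is_map_iterator = isinstance(map(str, [1]), Iterator)          # True
```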

# Benchmark

The following benchmark only measures the overhead of the generator machinery per function call. Using multiple processes in particular creates a lot of overhead, and it grows with the amount of data that has to be transferred to and from the other process.

In [62]:
def multiply(x, y=1):
    return x * y

n = 1e6
t0 = time.time()
for i in range(int(n)):
    _ = multiply(i, y=3)
t1 = time.time()
print(f'A normal function call. time per element: {(t1-t0)/n * 1e6:.2f}us')

A normal function call. time per element: 0.21us
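An alternative to hand-rolled `time.time()` measurements is the standard-library `timeit` module, which uses `time.perf_counter` and disables garbage collection during the run by default. The `lambda` wrapper adds one extra call per iteration, so the result is an upper bound for `multiply` itself; timings are machine-dependent, so no output is shown.

```python
import timeit

def multiply(x, y=1):
    return x * y

n = 1_000_000
# total seconds for n calls of the lambda
total = timeit.timeit(lambda: multiply(3, y=3), number=n)
per_call_us = total / n * 1e6
print(f'time per call: {per_call_us:.2f}us')
```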


In [55]:
@gp.pipeline() 
def multiply(x, y=1):
    return x * y

n = 1e6
t0 = time.time()
gen = iter(range(int(n)))
gen = multiply(gen, y=3)
_ = list(gen)
t1 = time.time()
print(f'execution in CURRENT process. time per element: {(t1-t0)/n * 1e6:.2f}us')

execution in CURRENT process. time per element: 1.02us


In [56]:
@gp.pipeline(1)
def multiply(x, y=1):
    return x * y

n = 3e3
t0 = time.time()
gen = iter(range(int(n)))
gen = multiply(gen, y=3)
_ = list(gen)
t1 = time.time()
print(f'execution in 1 OTHER process. time per element: {(t1-t0)/n * 1e6:.0f}us')

execution in 1 OTHER process. time per element: 491us


In [57]:
@gp.pipeline(4)
def multiply(x, y=1):
    return x * y

n = 3e3
t0 = time.time()
gen = iter(range(int(n)))
gen = multiply(gen, y=3)
_ = list(gen)
t1 = time.time()
print(f'execution in 4 OTHER processes. time per element: {(t1-t0)/n * 1e6:.0f}us')

execution in 4 OTHER processes. time per element: 155us
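The relative costs follow directly from the per-element times reported in this notebook's run (values copied from the outputs above, in microseconds):

```python
# Per-element times reported above, in microseconds
plain = 0.21           # normal function call
current_process = 1.02 # @gp.pipeline(), same process
one_process = 491      # @gp.pipeline(1)
four_processes = 155   # @gp.pipeline(4)

overhead_factor = current_process / plain      # cost of the generator machinery
ipc_factor = one_process / current_process     # cost of moving work to another process
speedup_1_to_4 = one_process / four_processes  # gain from using 4 workers instead of 1

print(f'pipeline vs. plain call:         {overhead_factor:.1f}x')  # 4.9x
print(f'1 other process vs. current one: {ipc_factor:.0f}x')       # 481x
print(f'speed-up from 1 to 4 processes:  {speedup_1_to_4:.1f}x')   # 3.2x
```

With roughly 0.5 ms of overhead per element in a single worker, distributing work presumably only pays off when each call does substantially more work than that, which is another argument for grouping elements into larger tasks.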
