Stephan Kuschel, 2020

In [1]:
import generatorpipeline as gp
import time
import random
print(gp.__version__)

v1.0+10.g13bb506.dirty


# The principle of generatorpipeline

Python has a feature called "generators" for memory-efficient processing. The function `start_generator` returns a generator yielding data (here a sequence of numbers), and another function, `timestwo_g`, processes the elements one by one. Let's look at the principle in pure Python:

In [2]:
def start_generator():
    # create the data-stream
    for i in range(10):
        yield i

def timestwo_g(gen):
    # do something with each element
    for el in gen:
        yield el*2

gen = start_generator()
gen = timestwo_g(gen)
list(gen)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

This is particularly useful when each element is large (e.g. an image) and the generator produces more elements than fit into memory at once: only the elements currently being processed have to be held in memory.
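To make the memory argument concrete, the following sketch compares the size of a generator object with the size of a list holding the same one million integers. `sys.getsizeof` only counts the container itself (not the integers it references), so the list figure even understates the difference. Exact byte counts vary between Python versions.

```python
import sys

N = 1_000_000
gen = (i for i in range(N))  # produces elements on demand
lst = [i for i in range(N)]  # materialises all elements at once

gen_bytes = sys.getsizeof(gen)
lst_bytes = sys.getsizeof(lst)
print(f"generator: {gen_bytes} bytes, list: {lst_bytes} bytes")
```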

Of course, multiple processing steps can be defined and chained together to build a data-processing pipeline.
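A chain of such steps stays lazy: nothing is computed until elements are requested, and the source only produces as many elements as are actually consumed. In this sketch, `plusone_g` and the `produced` list are hypothetical additions used only to make that visible.

```python
import itertools

produced = []  # records which elements the source has generated so far

def start_generator():
    # create the data-stream lazily
    for i in range(10):
        produced.append(i)
        yield i

def timestwo_g(gen):
    for el in gen:
        yield el * 2

def plusone_g(gen):  # hypothetical second processing step
    for el in gen:
        yield el + 1

# chain the steps; no element has been computed at this point
gen = plusone_g(timestwo_g(start_generator()))
first_three = list(itertools.islice(gen, 3))
print(first_three)  # [1, 3, 5]
print(produced)     # [0, 1, 2]: the remaining 7 elements were never created
```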

However, this approach has a severe shortcoming: `timestwo_g` can no longer be used on a single element. Calling `timestwo_g(7)` returns a generator that raises a `TypeError` as soon as it is iterated, because `7` is not iterable. Working on single elements would be useful for testing, for development, or when calling the function from within another function. Furthermore, every function used in the pipeline must carry its own `for ... in ...` loop.
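The failure is deferred: creating the generator succeeds, and the error only appears once the first element is requested.

```python
def timestwo_g(gen):
    # generator-based step from above: expects an iterable
    for el in gen:
        yield el * 2

g = timestwo_g(7)  # no error yet: the generator body has not started running
try:
    next(g)        # the `for` loop now tries to iterate over 7
except TypeError as err:
    message = str(err)
print(message)     # 'int' object is not iterable
```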

Using the `generatorpipeline.pipeline` decorator, a function written for a single element can also accept a Generator or Iterator, in which case it returns a Generator.

In [3]:
@gp.pipeline()
def timestwo(x):
    return x*2

The new `timestwo` function automatically returns a generator when an Iterator or Generator is given as the argument:

In [4]:
gen = iter(range(10))
gen = timestwo(gen)
list(gen)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

If the argument is not a Generator or Iterator, the function will act like the undecorated function:

In [5]:
timestwo(7)

14
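The dispatch idea can be sketched in a few lines of plain Python. `simple_pipeline` below is a hypothetical, heavily simplified stand-in, not the library's implementation: it runs serially and leaves out multiprocessing, caching, `None` handling and flattening. It checks whether the argument is an iterator and either maps the function lazily over it or calls the function directly.

```python
import collections.abc
import functools

def simple_pipeline(func):
    """Hypothetical simplified decorator illustrating the dispatch idea."""
    @functools.wraps(func)
    def wrapper(arg, **kwargs):
        if isinstance(arg, collections.abc.Iterator):
            # lazily apply func to every element of the stream
            return (func(el, **kwargs) for el in arg)
        # plain value: behave like the undecorated function
        return func(arg, **kwargs)
    return wrapper

@simple_pipeline
def timestwo(x):
    return x * 2

print(list(timestwo(iter(range(5)))))  # [0, 2, 4, 6, 8]
print(timestwo(7))                     # 14
```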

Important: the decorated function receives exactly one positional argument (the element). Keyword arguments (`kwargs`) are forwarded unchanged to every call:

In [6]:
@gp.pipeline()
def multiply(x, y=1):
    return x * y

gen = iter(range(10))
gen = multiply(gen, y=3)
list(gen)

[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
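As a plain-Python analogy (not the library's implementation), forwarding fixed keyword arguments corresponds to `map` with `functools.partial`. If a second value should change from element to element, one option, assuming nothing beyond the single-argument rule stated above, is to pack both values into a tuple and pass that tuple as the element. `multiply_pair` is a hypothetical helper for this.

```python
import functools

def multiply(x, y=1):
    return x * y

# Same per-element behaviour as `multiply(gen, y=3)` above:
# every element is passed as `x`, while `y=3` is fixed for all calls.
scaled = list(map(functools.partial(multiply, y=3), iter(range(10))))
print(scaled)  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

# To vary two values per element, pack them into one tuple element.
def multiply_pair(pair):
    x, y = pair
    return x * y

products = list(map(multiply_pair, zip(range(5), range(5, 10))))
print(products)  # [0, 6, 14, 24, 36]
```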

# Multiprocessing

Since each element is processed independently, multiple elements can be processed on multiple cores simultaneously. Internally, the `multiprocessing` package is used to distribute the work:

In [7]:
%%time

@gp.pipeline()  # serial execution in the same python process
def multiply(x, y=1):
    time.sleep(0.1)
    return x * y

gen = iter(range(20))
gen = multiply(gen, y=3)
list(gen)

CPU times: user 2.83 ms, sys: 484 µs, total: 3.31 ms
Wall time: 2.01 s


[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57]

In [8]:
%%time

@gp.pipeline(5)  # parallel execution in 5 new processes
def multiply(x, y=1):
    time.sleep(0.1)
    return x * y

gen = iter(range(20))
gen = multiply(gen, y=3)
list(gen)

CPU times: user 24.2 ms, sys: 30 ms, total: 54.2 ms
Wall time: 449 ms


[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57]

Note that adding more processes is not always faster! Each element and its result have to be transferred to the worker process and back to the main Python process. It is often better to group work into larger tasks before distributing them, to minimize inter-process communication.
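One way to group work is to batch the stream, so that each call (and, in a worker process, each transfer) handles several elements at once. `batched` below is a hypothetical helper that mirrors `itertools.batched` from Python 3.12+. Assuming the decorated function simply receives each batch as its single element, a step like `multiply_batch` could be decorated and fed the batched stream; here it is applied serially so the sketch stays self-contained.

```python
import itertools

def batched(iterable, n):
    """Group a stream into tuples of up to n elements (lazy)."""
    it = iter(iterable)
    while True:
        batch = tuple(itertools.islice(it, n))
        if not batch:
            return
        yield batch

def multiply_batch(batch, y=3):
    # one call processes a whole batch instead of a single element
    return [x * y for x in batch]

batches = batched(range(10), 4)  # (0, 1, 2, 3), (4, 5, 6, 7), (8, 9)
# flatten the per-batch result lists back into one stream of elements
results = list(itertools.chain.from_iterable(multiply_batch(b) for b in batches))
print(results)  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
```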

The pipeline decorator ensures that the order of elements does not change, regardless of how many processes are used! If some elements finish quickly while others don't, the `extracache` keyword can be used to increase the number of cached elements. Note that this also requires more elements to be held in memory. By default (`extracache=0`), the cache holds as many elements as there are worker processes.

In [9]:
%%time
random.seed(1)
@gp.pipeline(5)  # parallel execution in 5 new processes
def multiply(x, y=1):
    time.sleep(random.random())
    return x * y

gen = iter(range(20))
gen = multiply(gen, y=3)
list(gen)

CPU times: user 23.9 ms, sys: 18.8 ms, total: 42.7 ms
Wall time: 3.52 s


[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57]

In [10]:
%%time
random.seed(1)
@gp.pipeline(5, extracache=5)  # parallel execution in 5 new processes and 5 additional cache elements (10 elements cached total).
def multiply(x, y=1):
    time.sleep(random.random())
    return x * y

gen = iter(range(20))
gen = multiply(gen, y=3)
list(gen)

CPU times: user 25.3 ms, sys: 19.5 ms, total: 44.8 ms
Wall time: 2.31 s


[0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57]
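Order preservation with a bounded look-ahead can be sketched with the standard library: submit up to `workers + extracache` tasks ahead and always wait for the oldest one. The `ordered_map` helper below is hypothetical and uses a thread pool for brevity; it is not the library's implementation, which runs workers in separate processes. It illustrates why a larger cache helps when run times vary: while the oldest element is still running, more elements behind it can already be processed.

```python
import collections
import concurrent.futures
import time

def ordered_map(func, iterator, workers=2, extracache=0):
    """Apply func in a thread pool, keeping at most workers + extracache
    pending results and yielding them in input order."""
    maxpending = workers + extracache
    pending = collections.deque()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        for el in iterator:
            pending.append(pool.submit(func, el))
            if len(pending) >= maxpending:
                # block on the OLDEST element so the output order is preserved
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()

def slow_square(x):
    # later elements finish sooner, so completion order differs from input order
    time.sleep(0.05 * (5 - x))
    return x * x

print(list(ordered_map(slow_square, iter(range(5)), workers=3, extracache=2)))
# [0, 1, 4, 9, 16]
```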

# Discarding elements

Elements are removed from the data stream if the function returns `None`:

In [11]:
@gp.pipeline()
def filter_odd(el):
    if el % 2:
        return 
    return el

gen = iter(range(12))
gen = filter_odd(gen)
list(gen)

[0, 2, 4, 6, 8, 10]

`None` elements can be preserved by setting the `skipNone` argument to `False`

In [12]:
@gp.pipeline(skipNone=False)
def filter_odd2(el):
    if el % 2:
        return 
    return el

gen = iter(range(12))
gen = filter_odd2(gen)
list(gen)

[0, None, 2, None, 4, None, 6, None, 8, None, 10, None]
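The described behaviour can be mimicked with a small generator. `apply_step` is a hypothetical helper, not part of the library. Note the `is None` test: a falsy result such as `0` is kept, which matches the outputs above where `0` survives the filter.

```python
def apply_step(func, iterator, skipNone=True):
    """Apply func to every element; drop None results if skipNone is True."""
    for el in iterator:
        result = func(el)
        if skipNone and result is None:
            continue
        yield result

def filter_odd(el):
    if el % 2:
        return None
    return el

print(list(apply_step(filter_odd, iter(range(6)))))                  # [0, 2, 4]
print(list(apply_step(filter_odd, iter(range(6)), skipNone=False)))  # [0, None, 2, None, 4, None]
```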

# Inserting Elements
When a decorated function is a generator (i.e. it yields elements), the pipeline first yields from that generator until it is exhausted before continuing with the main generator. In other words, nested generators are automatically flattened. `None` elements are still discarded.

Warning: Elements can only be inserted inside the current process. Trying to use multiprocessing here will fail!

In [13]:
@gp.pipeline()  # MUST run in the current process!
def count(nmax):
    # yield the value `nmax` exactly `nmax` times
    for _ in range(nmax):
        yield nmax
        
gen = iter(range(7))
gen = count(gen)
list(gen)

[1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6]

In [14]:
count.pipe_info()

Pipe_info(processed=7, yielded=21)[300.00%]
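Flattening and the two counters can be sketched with a small callable class. `FlatteningStep` and its `processed`/`yielded` attributes are hypothetical names, not the library's internals; the ratio 21/7 reproduces the 300% shown above. The last lines show that generator objects cannot be pickled, which is plausibly one reason inserting elements cannot be combined with multiprocessing: a generator returned by a worker could not be sent back to the main process.

```python
import pickle
import types

class FlatteningStep:
    """Apply func per element, flatten generator results, drop None,
    and count processed input elements vs yielded output elements."""
    def __init__(self, func):
        self.func = func
        self.processed = 0
        self.yielded = 0

    def __call__(self, iterator):
        for el in iterator:
            self.processed += 1
            result = self.func(el)
            # a generator result is expanded in place; anything else is one item
            items = result if isinstance(result, types.GeneratorType) else (result,)
            for item in items:
                if item is not None:
                    self.yielded += 1
                    yield item

def count(nmax):
    for _ in range(nmax):
        yield nmax

step = FlatteningStep(count)
out = list(step(iter(range(7))))
print(out)  # same list as the output of In [13]
print(f"processed={step.processed}, yielded={step.yielded} [{step.yielded / step.processed:.2%}]")
# processed=7, yielded=21 [300.00%]

try:
    pickle.dumps(count(3))
    picklable = True
except TypeError:
    picklable = False
print(picklable)  # False
```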

# Iterables and Iterators

The decorated function is called for every element of a given `Iterator`. A plain iterable that is not an iterator (like a string) is NOT iterated over; it is passed to the function as a single element. Note the difference between the following statements:

In [15]:
timestwo('test')  # same as `'test'*2`

'testtest'

In [16]:
gen = timestwo(iter('test'))  # `timestwo` for every character
list(gen)

['tt', 'ee', 'ss', 'tt']
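The distinction comes from the standard library's abstract base classes: every iterator is iterable, but not every iterable (a string, list or `range`) is an iterator. The sketch below only prints both checks for a few objects; it does not show the exact test the decorator performs internally. To process the elements of a list through a decorated function, wrapping it in `iter()` as in `In [16]` is enough.

```python
from collections.abc import Iterable, Iterator

samples = {
    "str": "test",
    "iter(str)": iter("test"),
    "list": [1, 2],
    "range": range(3),
    "generator": (c for c in "ab"),
}
for name, obj in samples.items():
    # isinstance checks do not consume any elements
    print(f"{name:10} iterable={isinstance(obj, Iterable)} iterator={isinstance(obj, Iterator)}")
```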

# Benchmark

The following benchmark measures the overhead per function call introduced by the generators. Distributing work to other processes creates a lot of overhead, and it grows with the amount of data that has to be transferred to and from the other process.

In [17]:
def multiply(x, y=1):
    return x * y

n = 1e6
t0 = time.time()
for i in range(int(n)):
    _ = multiply(i, y=3)
t1 = time.time()
print(f'A normal function call. time per element: {(t1-t0)/n * 1e6:.2f}us')

A normal function call. time per element: 0.14us


In [18]:
@gp.pipeline() 
def multiply(x, y=1):
    return x * y

n = 1e6
t0 = time.time()
gen = iter(range(int(n)))
gen = multiply(gen, y=3)
_ = list(gen)
t1 = time.time()
print(f'execution in CURRENT process. time per element: {(t1-t0)/n * 1e6:.2f}us')

execution in CURRENT process. time per element: 1.16us


In [19]:
@gp.pipeline(1)
def multiply(x, y=1):
    return x * y

n = 3e3
t0 = time.time()
gen = iter(range(int(n)))
gen = multiply(gen, y=3)
_ = list(gen)
t1 = time.time()
print(f'execution in 1 OTHER process. time per element: {(t1-t0)/n * 1e6:.0f}us')

execution in 1 OTHER process. time per element: 6788us


In [20]:
@gp.pipeline(4)
def multiply(x, y=1):
    return x * y

n = 3e3
t0 = time.time()
gen = iter(range(int(n)))
gen = multiply(gen, y=3)
_ = list(gen)
t1 = time.time()
print(f'execution in 4 OTHER processes. time per element: {(t1-t0)/n * 1e6:.0f}us')

execution in 4 OTHER processes. time per element: 125us
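The `multiprocessing` module sends arguments and results between processes by pickling them, so part of the per-element cost above grows with the size of each element. The sketch below times only a pickle round trip inside the current process (no actual inter-process transfer) for a small integer versus a 1 MB `bytes` object; `roundtrip_us` is a hypothetical helper and absolute numbers depend on the machine.

```python
import pickle
import time

def roundtrip_us(obj, repeats=200):
    """Average time in microseconds to pickle and unpickle obj once."""
    t0 = time.perf_counter()
    for _ in range(repeats):
        restored = pickle.loads(pickle.dumps(obj))
    elapsed = time.perf_counter() - t0
    assert restored == obj  # the round trip must reproduce the object
    return elapsed / repeats * 1e6

small = 3
large = bytes(1_000_000)  # 1 MB of zero bytes
print(f"int:        {roundtrip_us(small):.2f}us per round trip")
print(f"1 MB bytes: {roundtrip_us(large):.2f}us per round trip")
```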
