Concurrent data pipelines made easy
Clone or download
Latest commit 29947c4 Oct 15, 2018
Permalink
Failed to load latest commit information.
benchmarks/100_million_downloads update benchmark using on_start + on_done Sep 28, 2018
docker remove wrapt dependency Sep 24, 2018
docs 0.1.4 Oct 15, 2018
examples refactor: long module names Sep 24, 2018
experiments 0.1.4 Oct 15, 2018
pypeln 0.1.4 Oct 15, 2018
scripts remove wrapt dependency Sep 24, 2018
tests 0.1.4 Oct 15, 2018
.gitignore init readme Sep 18, 2018
LICENSE Initial commit Sep 1, 2018
README.md Mixed Pipelines in Readme Sep 25, 2018
book.json update gitbook Sep 24, 2018
docker-compose.yml tests Sep 22, 2018
requirements.txt finish lazy io Sep 18, 2018
setup.py 0.1.4 Oct 15, 2018

README.md

Pypeline

Pypeline is a simple yet powerful python library for creating concurrent data pipelines.

  • Pypeline was designed to solve simple medium data tasks that require concurrency and parallelism but where using frameworks like Spark or Dask feel exaggerated or unnatural.
  • Pypeline exposes an easy to use, familiar, functional API.
  • Pypeline enables you to build pipelines using Processes, Threads and asyncio.Tasks via the exact same API.
  • Pypeline allows you to have control over the memory and cpu resources used at each stage of your pipeline.

Installation

Install Pypeline using pip:

pip install pypeln

Basic Usage

With Pypeline you can easily create multi-stage data pipelines using 3 type of workers:

Processes

You can create a pipeline based on multiprocessing.Process workers by using the process module:

from pypeln import process as pr
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pr.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pr.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

At each stage the you can specify the numbers of workers. The maxsize parameter limits the maximum amount of elements that the stage can hold simultaneously.

Threads

You can create a pipeline based on threading.Thread workers by using the thread module:

from pypeln import thread as th
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = th.map(slow_add1, data, workers = 3, maxsize = 4)
stage = th.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Here we have the exact same situation as in the previous case except that the worker are Threads.

Tasks

You can create a pipeline based on asyncio.Task workers by using the asyncio_task module:

from pypeln import asyncio_task as aio
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = aio.map(slow_add1, data, workers = 3, maxsize = 4)
stage = aio.filter(slow_gt3, stage, workers = 2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Conceptually similar but everything is running in a single thread and Task workers are created dynamically.

Mixed Pipelines

You can create pipelines using different worker types such that each type is the best for its given task so you can get the maximum performance out of your code:

data = get_iterable()
data = aio.map(f1, data, workers = 100)
data = th.flat_map(f2, data, workers = 10)
data = filter(f3, data)
data = pr.map(f4, data, workers = 5, maxsize = 200)

Notice that here we even used a regular python filter, since stages are iterables Pypeline integrates smoothly with any python code, just be aware of how each stage behaves.

For more information see the Pypeline Guide.

Pipe Operator

In the spirit of being a true pipeline library, Pypeline also lets you create your pipelines using the pipe | operator:

data = (
    range(10)
    | pr.map(slow_add1, workers = 3, maxsize = 4)
    | pr.filter(slow_gt3, workers = 2)
    | list
)

Benchmarks

Resources

Related Stuff

Contributors