
Getting started with the Google App Engine Pipeline API (Python)

Introduction

The purpose of the Pipeline API is to connect together complex, time-consuming workflows (including human tasks). The goals are flexibility, workflow reuse, and testability. Importantly, no tasks or CPU are consumed while workflows block on external events, meaning many, many workflows can be in flight at the same time with minimal resource usage.

The Pipeline API is not a data or stream processing engine. The overall throughput of a single pipeline is limited. There is no fast-path for executing very short stages (though one may be added in the future). There is also no data translation layer, which means pipelines must operate on existing data sources (Blobstore, Datastore, memcache).

What is a Pipeline?

A Pipeline is a function-object that takes inputs, runs some logic using those parameters, and outputs one or more values. Basic Pipelines are little more than synchronous functions that have up to the App Engine deadline to complete (10 minutes for background tasks since 1.4.0). Generator pipelines spawn and connect child pipelines ("stages") by passing input and output parameters between them, enabling developers to express data dependencies while achieving parallelism. Advanced pipelines can be fully asynchronous and even call out to human operators to decide how the pipeline should proceed.

All pipelines must be idempotent. This means that running the same pipeline with the same inputs more than once will yield the same results and the same side-effects. The library does not enforce this idempotence requirement; it is up to developers to meet it themselves. However, the library provides a few pieces (like stable pipeline IDs) that make it easy to achieve idempotence for side-effects.
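
If a pipeline has side-effects, the stable pipeline ID is a convenient way to make them idempotent. The sketch below derives a Datastore key name from the pipeline ID so that retries overwrite the same entity instead of creating duplicates; ProcessedMarker is a hypothetical db.Model, not part of the library.

class MarkProcessed(pipeline.Pipeline):

  def run(self, item_key):
    # Sketch only: use the stable pipeline ID as the entity key name so that
    # re-running this pipeline with the same inputs writes the same record.
    marker = ProcessedMarker(
        key_name='processed-%s' % self.pipeline_id,
        item=item_key)
    marker.put()
    return item_key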

Pipelines also have a life cycle. They are initialized, executed, and then finalized. They may be retried and aborted. Each aspect of the life cycle of an advanced pipeline can be customized to accommodate special behaviors.

Add the pipeline library to your app.yaml

To use the library, add its handler to the top of your handlers list. Do not mark these handlers as admin- or login-restricted in app.yaml; the pipeline library enforces access control itself.

application: pipeline-test
version: 1
runtime: python
api_version: 1

handlers:
- url: /_ah/pipeline(/.*)?
  script: pipeline.handlers._APP
# Your handlers follow

Defining and running a synchronous pipeline

Here's a synchronous pipeline:

class AddOne(pipeline.Pipeline):

  def run(self, number):
    return number + 1

To run it, simply instantiate the class and start it (e.g., in a cron job):

stage = AddOne(15)
stage.start()
my_pipeline = stage.pipeline_id

# Later on, see if it's done.
stage = AddOne.from_id(my_pipeline)
if stage.has_finalized:
  print stage.outputs.default.value  # Prints 16

A core part of the pipeline library is a functional testing framework for writing end-to-end tests of workflows. Here's an example of verifying the synchronous pipeline above:

stage = AddOne(15)
stage.start_test()
assert stage.outputs.default.value == 16

Defining and running a generator pipeline

Here's a generator pipeline that connects together the sync one from before:

class AddTwo(pipeline.Pipeline):

  def run(self, number):
    result = yield AddOne(number)
    yield AddOne(result)

The yield statement here instructs the runtime environment that the object returned is a Pipeline instance. Child pipelines are spawned as soon as all of their input data dependencies are ready. The return value of yield statements will be PipelineFutures, which allow subsequent pipelines to refer to previous output values. The Pipeline instance yielded last will also provide the default return value of the entire generator pipeline to its caller.
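
For instance, a single PipelineFuture can feed more than one child; both children wait for the same stage to finish and can then run in parallel. This is only a sketch, and the Double pipeline is hypothetical:

class Double(pipeline.Pipeline):

  def run(self, number):
    return number * 2


class AddThenFanOut(pipeline.Pipeline):

  def run(self, number):
    base = yield AddOne(number)  # base is a PipelineFuture, not an integer
    yield Double(base)           # runs once AddOne has finished
    yield AddOne(base)           # also waits on base; the last yield provides the default output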

Running generator pipelines works the same way as sync pipelines:

stage = AddTwo(15)
stage.start()
my_pipeline = stage.pipeline_id

# Later on see if it's complete.
stage = AddTwo.from_id(my_pipeline)
if stage.has_finalized:
  print stage.outputs.default.value  # Prints 17

And it can be tested with the testing framework the same way:

stage = AddTwo(15)
stage.start_test()
assert stage.outputs.default.value == 17

Generator pipelines can do almost anything you'd expect to be able to do in the App Engine environment (e.g., URL fetch, Datastore lookups, memcache), but like synchronous pipelines, their execution time is limited to the task request deadline (10 minutes).
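
For example, a generator pipeline can read a value from memcache before handing it to a child stage. This is only a sketch; it assumes google.appengine.api.memcache has been imported and that the cache entry (if present) holds an integer:

class AddOneToCachedValue(pipeline.Pipeline):

  def run(self, cache_key):
    # Sketch: fall back to 0 on a cache miss, then reuse the AddOne stage
    # defined earlier as a child pipeline.
    number = memcache.get(cache_key) or 0
    yield AddOne(int(number))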

Generator pipelines and parameter resolution

One thing generator pipelines cannot do is directly access the outputs of the child Pipelines that they yield. That means this trivial example is not possible:

class AddTwoAndLog(pipeline.Pipeline):

  def run(self, number):
    result = yield AddOne(number)
    final_result = yield AddOne(result)
    logging.info('The value is: %d', final_result)  # Breaks

The reason this doesn't work is that previous outputs (such as 'result' and 'final_result') are not resolved to their actual values until the dependent child pipelines execute. So, if you want to do something in a generator pipeline with an output value from a child, you'll need to define a second Pipeline sub-class (a sibling) to do that work. For our example this would be:

class LogMessage(pipeline.Pipeline):

  def run(self, message, *args):
    logging.info(message, *args)


class AddTwoAndLog(pipeline.Pipeline):

  def run(self, number):
    result = yield AddOne(number)
    final_result = yield AddOne(result)
    yield LogMessage('The value is: %d', final_result)  # Works

Properties of the pipeline execution context

Within a pipeline's run() method you have access to the instance variables of the pipeline instance through 'self'. These instance variables can be used (for example, as task names or Datastore key names) to guarantee idempotence of pipeline operations.

class MyPipeline(pipeline.Pipeline):

  def run(self):
    self.pipeline_id       # My ID, a web-safe stringized hexadecimal number
    self.root_pipeline_id  # The pipeline_id of the root of the dependency tree
    self.args              # My input positional arguments as a list
    self.kwargs            # My input keyword arguments as a dictionary
    self.queue_name        # The queue this pipeline should run on
    self.base_path         # Relative URL of the pipeline library's handlers
    self.outputs           # PipelineFuture for this pipeline's output values
    self.test_mode         # Whether the pipeline is running in test mode
    self.current_attempt   # Current attempt number, starting at 1
    self.was_aborted       # Whether the pipeline was aborted (available during finalized())

Note that while the pipeline executes, the slots in 'self.outputs' will not have been resolved to their actual values.

Named outputs

Pipelines are not limited to a single functional return value; they may also declare (or implicitly produce) outputs that are named. This makes it easy to pass flags from one stage to the next. For example, to implement Euclid's greatest common divisor (GCD) algorithm:

class DivideWithRemainder(pipeline.Pipeline):

  output_names = ['remainder']

  def run(self, dividend, divisor):
    self.fill(self.outputs.remainder, dividend % divisor)
    return dividend // divisor


class EuclidGCD(pipeline.Pipeline):

  output_names = ['gcd']

  def run(self, a, b):
    a, b = max(a, b), min(a, b)
    if b == 0:
      self.fill(self.outputs.gcd, a)
      return
    result = yield DivideWithRemainder(a, b)
    recurse = yield EuclidGCD(b, result.remainder)

# To run using the test framework
stage = EuclidGCD(1071, 462)
stage.start_test()
assert stage.outputs.gcd.value == 21

You'll also notice that this solution uses pipelines in a recursive way. The pipeline library allows for tail-recursive-like execution with no limitation of the stack size (though in test mode you are still limited to Python's stack). The stage yielded last from a generator pipeline will inherit all outputs (default and named) that have not yet been filled by the parent.

Finalization of pipelines

After a pipeline has filled all of its outputs (default or named), it will have its finalized() method called. This gives pipelines the opportunity to do any cleanup or notification. Pipelines that are finalizing are guaranteed that all of their output values are present and available while the finalized() method runs (in contrast to inside run(), when no output values are available). For example:

class MultiplyAndEmail(pipeline.Pipeline):

  def run(self, a, b):
    return a * b

  def finalized(self):
    mail.send_mail_to_admins(
        sender='foo@example.com',
        subject='Result of pipeline ID %s' % self.pipeline_id,
        body='%d * %d = %d' % (
            self.args[0], self.args[1], self.outputs.default.value))

When the outputs of a generator pipeline have been passed on to a child pipeline (as is the case with the recursive example from before), the finalized() method will not run until that child has filled all of the inherited output slots. However, the order in which finalized() methods are called is not guaranteed between parent and child pipelines.

Asynchronous pipelines

Pipelines can run in a fully asynchronous manner, meaning that after the run() method returns the pipeline will stay in the "run" state. It is then up to the pipeline itself to ensure that the complete() method for the pipeline is called at a later time (e.g., using the task queue, external input). To mark a pipeline as asynchronous, set the 'async' class property to True.

To facilitate asynchronous execution, the pipeline library provides default URL routing for Pipeline sub-classes. For example, a pipeline with the module path 'foo.bar.Elephant' will have a callback URL available on the path /_ah/pipeline/callback/foo.bar.Elephant. The library also has convenience methods for invoking the callback() method with keyword parameters (get_callback_url and get_callback_task). This makes it easy to enqueue tasks that hit the callback URL, or to have humans click a link to move the asynchronous pipeline forward. Pipeline callbacks can be access-restricted with the 'public_callbacks' and 'admin_callbacks' class properties, both of which default to False.

Here's an example asynchronous pipeline that simply waits for N seconds:

class Delay(pipeline.Pipeline):
  async = True

  def run(self, seconds=None):
    task = self.get_callback_task(
        countdown=seconds,
        name='delay-' + self.pipeline_id)
    try:
      task.add(self.queue_name)
    except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
      pass

  def run_test(self, seconds=None):
    logging.debug('Delay pipeline pretending to sleep %0.2f seconds', seconds)
    self.complete(seconds)

  def callback(self):
    self.complete(self.kwargs['seconds'])

Note how this pipeline defines a run_test() method; this is used in place of run() when executing the asynchronous pipeline in functional testing mode, and is meant to simulate the functional behavior of the pipeline in production. Pipelines may also define finalized_test() methods for the same purpose.
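
For example, the MultiplyAndEmail pipeline above could add a finalized_test() so that start_test() runs log the result instead of emailing admins. This is a sketch, not part of the original example:

class MultiplyAndEmail(pipeline.Pipeline):
  # run() and finalized() as defined in the previous section.

  def finalized_test(self):
    # Called in place of finalized() under start_test(); outputs are available here.
    logging.info('%d * %d = %d',
                 self.args[0], self.args[1], self.outputs.default.value)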

Asynchronous pipelines need to be especially careful that their run() methods are idempotent, since they may be called multiple times with the same input parameters. The 'pipeline_id' property can help to achieve idempotence. In the example above, this is achieved by setting the task name of the delay callback task ('delay-' + self.pipeline_id) to a stable value and silently dropping any duplicate or tombstone task exceptions.

The callback() method, defined by developers, may return None if it has completed successfully, or return the tuple (status_code, content_type, content) to return an HTTP response to the user or taskqueue system.
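
Putting these pieces together, an asynchronous pipeline can wait for a human decision. The WaitForApproval pipeline below is a hypothetical sketch (not part of the library): it emails an approver a callback link, completes when the link is clicked, and returns a small HTTP response to the person who clicked it.

class WaitForApproval(pipeline.Pipeline):
  async = True
  public_callbacks = True  # let the approver hit the callback URL without logging in

  def run(self, approver_email):
    # get_callback_url() returns a relative path; a real app would prefix its hostname.
    approve_url = self.get_callback_url(choice='approve')
    mail.send_mail(
        sender='foo@example.com',
        to=approver_email,
        subject='Approval needed for pipeline %s' % self.pipeline_id,
        body='Approve: %s' % approve_url)

  def run_test(self, approver_email):
    # In functional tests, pretend the approver immediately said yes.
    self.complete('approve')

  def callback(self, choice=None):
    self.complete(choice)
    return (200, 'text/plain', 'Thanks, your response has been recorded.')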

TODO: Talk about try_cancel() here

Execution ordering

The pipeline library will try to exploit as much parallelism as possible and uses data dependencies to achieve this. However, sometimes you just want to run a certain set of stages in a particular order, or after a particular stage has completed. The pipeline library provides two operators, After and InOrder, that force execution order where data dependencies do not exist.

For example, you can use After with the Delay and LogMessage pipelines defined above:

class LogWaitLogAfter(pipeline.Pipeline):

  def run(self, message1, message2, delay):
    first = yield LogMessage(message1)
    with pipeline.After(first):
      delay_future = yield Delay(seconds=delay)
      with pipeline.After(delay_future):
        yield LogMessage(message2)

      yield LogMessage('This would happen after the first message')

    yield LogMessage('This would happen immediately on run')

Note how jobs that are not within 'with' blocks will run out-of-order even though they're declared in order. You can use the InOrder operator to achieve the same effect:

class LogWaitLogInOrder(pipeline.Pipeline):

  def run(self, message1, message2, delay):
    with pipeline.InOrder():
      yield LogMessage(message1)
      yield Delay(seconds=delay)
      yield LogMessage(message2)

    yield LogMessage('This would happen immediately on run')

An After operator may be nested with itself or InOrder operators, but InOrder operators cannot be nested within themselves. One or more PipelineFutures may be passed to the After operator.
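
For example, After can be given several futures at once so that the nested stages wait for all of them (a sketch built from the LogMessage pipeline defined earlier):

class LogAfterBoth(pipeline.Pipeline):

  def run(self, message1, message2):
    first = yield LogMessage(message1)
    second = yield LogMessage(message2)
    with pipeline.After(first, second):
      # Runs only once both LogMessage stages above have completed.
      yield LogMessage('Both messages have been logged.')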

Unfortunately, the Python 'with' statement is not enabled by default in Python 2.5, so you will need to activate it by adding this magical line to the top of your Python file (after any #!/usr/bin/python2.5 declarations):

from __future__ import with_statement

TODO: Talk about explicit abort and retry

Achieving parallelism

Contributed by Craig Quiter

One of the most powerful aspects of the Pipeline API is the ability to achieve parallelism in a simple, succinct, and elegant syntax.

Let's look at the example of a search engine described by Brett Slatkin here (35:13):

class WordCountUrl(pipeline.Pipeline):
  def run(self, url):
    r = urlfetch.fetch(url)
    return len(r.content.split())  # urlfetch results expose the response body as .content

class Sum(pipeline.Pipeline):
  def run(self, *values):
    return sum(values)

class MySearchEnginePipeline(pipeline.Pipeline):
  def run(self, *urls):
    results = []
    for u in urls:
      results.append( (yield WordCountUrl(u)) )
    yield Sum(*results) # Barrier waits

This simple code has the effect of starting a taskqueue task for each url in urls. These tasks will tend to execute in parallel, and App Engine will start allocating additional instances of the app to accommodate the size of the job. You can control the rate and manner in which these tasks execute in your queue configuration. If you want to use a queue other than the default, call the pipeline start method with the queue_name argument like so:

stage = MySearchEnginePipeline('http://example.com/a', 'http://example.com/b')  # example URLs
stage.start(queue_name='pipelinequeue')

Once all of the word-count tasks complete, each argument to Sum will be ready; in other words, the barriers to Sum's execution will be gone. Sum will then execute and its result becomes the default output of MySearchEnginePipeline.

Tutorial

A series of articles explaining the Python Pipeline API in tutorial fashion.

http://sookocheff.com/series/pipelines-api/