This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Pipelines

Michael LaCorte edited this page Jan 28, 2014 · 21 revisions

When dealing with realized values, it’s easy to chain functions together:

> (def f #(-> % inc inc))
#'f
> (f 1)
3

But what if the value isn’t realized? We can use on-realized to register a callback on an async-promise:

(on-realized (task 1)
  #(println (-> % inc inc))
  #(println "error:" %))

But once we’ve printed out the value, we can’t do anything further with it. We can’t compose on-realized callbacks the way we can compose functions. Luckily, pipelines give us a way around this:

> (def p (pipeline inc inc))
#'p

A pipeline is a series of single-argument functions, called stages. A pipeline is itself a function that accepts a single value; when invoked, it returns an async-promise representing that value passed through each stage, first to last:

> (p 1)
<< 3 >>
> @(p 1)
3

This may seem very similar to Clojure’s comp (albeit with reversed order of execution), but it has an interesting property: if any stage is given an async-promise rather than a value, then evaluation of that stage will wait for the async-promise to become realized. This means that for the pipeline, 1 and (task 1) are equivalent:

> (p (task 1))
<< ... >>
> @(p (task 1))
3

Notice that in the first case, the async-promise is unrealized. This is because in order for the result from the pipeline to become realized, it must first wait for all of its inputs to be realized.

Any of the stages can return async-promises. This pipeline is functionally equivalent to the original pipeline:

(pipeline
  #(task (inc %))
  #(task (inc %)))

To test this, we can use the somewhat more convenient run-pipeline, which takes an initial value followed by zero or more stages. (run-pipeline 1 inc inc) is equivalent to ((pipeline inc inc) 1).

> @(run-pipeline 1
     #(task (inc %))
     #(task (inc %)))
3

Since pipelines are functions that take a single value and return a result, they can be used as a stage within another pipeline. All of these pipelines are equivalent:

(pipeline inc inc)

(pipeline (pipeline inc inc))

(pipeline (pipeline inc) inc)

(pipeline (pipeline (pipeline inc inc)))

error-handler

If an error occurs at any stage, all subsequent stages will be skipped, and the result returned by the pipeline will be realized as that error. You may define an explicit error-handler if you like, which acts as an error callback for all stages in the pipeline:

(pipeline
  {:error-handler (fn [ex] (println "error:" ex))}
  something-that-may-fail)

(run-pipeline 1
  {:error-handler (fn [ex] ...)}
  something-that-may-fail)

If an error occurs and no error-handler is defined, then the error will be automatically logged. If you don’t want to log the error, you can define an empty error-handler callback.
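As a minimal sketch of that last point, the pipeline below fails in its only stage but supplies a no-op :error-handler, so the error should not be logged. It assumes Lamina is on the classpath and that run-pipeline is exposed by lamina.core; always-fails is a hypothetical stage invented for illustration.

```clojure
;; Assumption: Lamina is on the classpath and lamina.core exposes run-pipeline.
(require '[lamina.core :refer [run-pipeline]])

;; Hypothetical stage that always throws.
(defn always-fails [_]
  (throw (Exception. "boom")))

;; The handler ignores the exception and returns nil, which suppresses
;; the default logging without changing the pipeline's result.
(def failed
  (run-pipeline 1
    {:error-handler (fn [_ex])}
    always-fails))
```

The async-promise held by failed is still realized as the error; the handler only replaces the logging side effect.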

merge-results

Since each stage in a pipeline only takes a single value, it can be awkward to combine async-promises together. While it is possible to do so by closing over a value in a sub-pipeline, given values a and b:

(run-pipeline a
  #(run-pipeline b
     (partial vector %)))

this is not particularly clear or concise. Luckily, there’s merge-results, which accepts zero or more async-promises or values. It returns an async-promise representing a list of everything passed into it, which will only be realized once all of its inputs have been realized.

> (merge-results 1 2 3)
<< (1 2 3) >>
> (merge-results (task 1) 2 (task 3))
<< ... >>
> @(merge-results (task 1) 2 (task 3))
(1 2 3)

This means that the earlier example can be reduced to:

(merge-results a b)

If any of the inputs are realized as an error, the async-promise returned by merge-results will also be realized as that error.
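Because a pipeline waits for an async-promise input to be realized, the result of merge-results can be used directly as a pipeline's initial value. The sketch below assumes Lamina is on the classpath, that merge-results and run-pipeline come from lamina.core, and that task comes from lamina.executor; sum-pair is a hypothetical stage that destructures the merged list.

```clojure
;; Assumptions: Lamina is on the classpath; the namespaces below are
;; where these functions live.
(require '[lamina.core :refer [run-pipeline merge-results]]
         '[lamina.executor :refer [task]])

;; Hypothetical stage: receives the realized list and adds its two elements.
(defn sum-pair [[a b]]
  (+ a b))

;; The stage runs only after (task 1) has been realized.
(def total
  (run-pipeline (merge-results (task 1) 2)
    sum-pair))

@total ; should yield 3
```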

pipeline options

In addition to :error-handler, a variety of other values may be provided in the pipeline’s option map:

:finally a function which is called with zero arguments when the pipeline completes, either due to success or error
:value the result-channel into which the pipeline’s result will be forwarded; when set, invoking the pipeline returns nil
:timeout the max duration of the pipeline’s invocation. Once the timeout elapses, the result for the pipeline will be realized as a timeout error. If the pipeline times out in the middle of a stage it won’t terminate computation, but it won’t continue onto the next stage.
:implicit? if true, pipeline stages will show up in sub-traces of instrumented functions
:unwrap? if true, and the pipeline does not need to pause between stages, the pipeline will return an actual value rather than an async-promise
:with-bindings? if true, conveys the thread-local binding context of the initial invocation of the pipeline onto any deferred stages
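Options are passed in the same map as :error-handler. The following sketch registers a zero-argument :finally callback that records completion in an atom; it assumes Lamina is on the classpath and that run-pipeline is exposed by lamina.core. The names finished? and result are hypothetical.

```clojure
;; Assumption: Lamina is on the classpath and lamina.core exposes run-pipeline.
(require '[lamina.core :refer [run-pipeline]])

;; Flag flipped by the :finally callback when the pipeline completes.
(def finished? (atom false))

(def result
  (run-pipeline 1
    {:finally #(reset! finished? true)}
    inc
    inc))

@result ; should yield 3
```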

pipeline redirects

Pipelines are useful for simple data flow, but having a predefined, linear list of stages can make it difficult to have more complex flow control such as branching, short-circuits, or loops. These are not often required, but when they are, Lamina allows any stage to return a redirect signal. There are three redirect signals: redirect, restart, and complete.

(redirect pipeline value) allows you to pass the flow of execution to a new pipeline. The value returned by this new pipeline will be forwarded into the async-promise returned by the original pipeline. This redirection is tail-recursive, allowing pipelines to recur mutually:

(declare decrement-until-zero)

(def check-for-zero
  (pipeline 
    (fn [n]
      (if (zero? n)
        :success!
        (redirect decrement-until-zero n)))))

(def decrement-until-zero
  (pipeline
    (fn [n]
      (redirect check-for-zero (dec n)))))

(decrement-until-zero 1e6)

These pipelines will forward values back and forth, until the number finally reaches zero. Note that the redirect signal is just a value, and must be returned from the stage. Simply calling redirect elsewhere in a stage won’t do anything.

(complete value) works much like redirect, except that it redirects to a pipeline that immediately returns with value. This allows intermediate stages to end the pipeline.

restart is a redirect signal that always returns to the top of the current pipeline. It can either be called as (restart val), or simply as (restart) to restart with the original initial value passed into the pipeline.

When a redirection signal is returned, the pipeline is assumed to have “ended”, and the :finally callback, if any was defined, will be called.
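The complete and restart signals described above can be sketched as follows, assuming Lamina is on the classpath and that lamina.core exposes run-pipeline, complete, and restart. The names early-exit and count-to-ten are hypothetical.

```clojure
;; Assumption: Lamina is on the classpath and lamina.core exposes these names.
(require '[lamina.core :refer [run-pipeline complete restart]])

;; Short-circuit: the first stage ends the pipeline, so inc never runs.
(def early-exit
  (run-pipeline 1
    (fn [n] (if (odd? n) (complete :odd) n))
    inc))

;; Loop: jump back to the first stage with the new value until it reaches 10.
(def count-to-ten
  (run-pipeline 0
    inc
    (fn [n] (if (< n 10) (restart n) n))))

@early-exit   ; should yield :odd
@count-to-ten ; should yield 10
```

In both cases the signal is the return value of the stage, which is what lets the pipeline act on it.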

redirects and :error-handler

By default, the :error-handler callback will simply perform a side-effect in response to the error; the callback’s return value does not affect the pipeline’s return value. However, if the error-handler returns a redirect signal, the pipeline will be redirected. This can be used to implement retry-on-error behavior, among other things.
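A minimal sketch of retry-on-error, assuming Lamina is on the classpath and that lamina.core exposes run-pipeline and restart. The flaky stage and the attempts counter are hypothetical; flaky throws on its first two calls and succeeds on the third.

```clojure
;; Assumption: Lamina is on the classpath and lamina.core exposes these names.
(require '[lamina.core :refer [run-pipeline restart]])

;; Counts how many times the stage has been invoked.
(def attempts (atom 0))

;; Hypothetical stage: fails twice, then passes its input through.
(defn flaky [x]
  (if (< (swap! attempts inc) 3)
    (throw (Exception. "transient failure"))
    x))

;; Returning (restart) from the handler reruns the pipeline with the
;; original initial value instead of realizing the result as an error.
(def retried
  (run-pipeline :ok
    {:error-handler (fn [_ex] (restart))}
    flaky))
```

If the description above holds, dereferencing retried yields :ok after three attempts. A real handler would usually cap the number of retries so that a permanent failure does not loop forever.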


To learn more about how async-promises can be created and used within Lamina, read the entry on result channels.