In [1]:
from __future__ import absolute_import
from __future__ import print_function

from tornado import gen
from flowz import app
from flowz.channels import *

In [2]:
import logging
from logging import config as logconf
# Send every log record at DEBUG level and above to stdout,
# without silencing loggers that were configured earlier.
logconf.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'loggers': {},
    'root': {
        'level': 'DEBUG',
        'handlers': ['default_handler'],
    },
    'handlers': {
        'default_handler': {
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',
        },
    },
})
logger = logging.getLogger('foo')

# Introduction to Channels

## Channel

A `Channel` gives a means of asynchronous iteration over messages coming from some upstream source.

A consumer of a `Channel` uses its `next()` method to iteratively receive messages as the channel makes them available; when the channel is exhausted, subsequent invocations of `next()` result in a `ChannelDone` exception.  Those calls to `next()`, however, would need to be made in the context of a tornado coroutine, which gets complex pretty quickly.
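
To make that protocol concrete, here is a minimal sketch of a consumer loop. It uses the standard library's `asyncio` (Python 3.7+) in place of tornado, and `ToyIterChannel` and the local `ChannelDone` class are hypothetical stand-ins, not flowz's own classes. Each `await chan.next()` either produces the next message or raises `ChannelDone` once the source is exhausted.

```python
import asyncio


class ChannelDone(Exception):
    """Hypothetical stand-in for flowz's ChannelDone exception."""


class ToyIterChannel(object):
    """Hypothetical channel that serves the items of an iterable."""

    def __init__(self, iterable):
        self._it = iter(iterable)

    async def next(self):
        # Give other coroutines a chance to run before producing an item.
        await asyncio.sleep(0)
        try:
            return next(self._it)  # the built-in next(), not this method
        except StopIteration:
            raise ChannelDone()


async def drain(chan):
    """Call next() until the channel signals exhaustion; collect the items."""
    items = []
    while True:
        try:
            items.append(await chan.next())
        except ChannelDone:
            return items


print(asyncio.run(drain(ToyIterChannel(range(5)))))  # [0, 1, 2, 3, 4]
```

Even in this toy form, the consumer has to manage the loop and the exception itself, which is the bookkeeping that `app.Flo` takes off your hands.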

In practice, though, channels are set up declaratively and passed to an `app.Flo` object, whose `run()` method iterates through them until they are exhausted.  This guide will generally do that via this function:

In [3]:
def print_chans(*chans):
    app.Flo([chan.map(print) for chan in chans]).run()

The plain `Channel` class is effectively abstract; the useful implementations are its subclasses, and many of the important ones are covered in this guide.

## IterChannel

An `IterChannel` converts an iterable into a channel.  This is particularly helpful in this guide for illustrative purposes:

In [4]:
chan = IterChannel(xrange(5))
print_chans(chan)

0
1
2
3
4


Those innocuous lines did the following things:
1. Wrapped an `xrange` object with an `IterChannel`.
2. Wrapped that channel with a `MapChannel` (see below) that prints each element of the wrapped channel.
3. Passed that `MapChannel` to an `app.Flo` object.
4. Fully iterated (asynchronously) over that channel.

For now, that seems like a lot of hullabaloo for iterating over five numbers, but the value will become clearer as the guide goes on.
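
For readers more used to plain `asyncio` (Python 3.7+), the four steps listed above can be mimicked with async generators. This is only an analogy under stated assumptions: `iter_channel`, `map_channel` and `run` are hypothetical helpers, and flowz's real `IterChannel`, `MapChannel` and `app.Flo` run on tornado and are not written like this.

```python
import asyncio


async def iter_channel(iterable):
    # Step 1: expose a plain iterable as an asynchronous source.
    for item in iterable:
        await asyncio.sleep(0)  # yield control to the event loop
        yield item


async def map_channel(source, mapper):
    # Step 2: apply `mapper` to each item of the wrapped source.
    async for item in source:
        yield mapper(item)


def run(*sources):
    # Steps 3 and 4: hand the sources to a runner that drains them all.
    async def drain(source):
        async for _ in source:
            pass

    async def main():
        await asyncio.gather(*(drain(s) for s in sources))

    asyncio.run(main())


seen = []
# seen.append stands in for print so that the result can be checked.
run(map_channel(iter_channel(range(5)), seen.append))
print(seen)  # [0, 1, 2, 3, 4]
```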

## TeeChannel [.tee()]

A `TeeChannel` wraps another channel to provide a way to _independently_ iterate over the wrapped channel starting from the same point forwards.  It doesn't create a copy of the channel or the objects in it, but just presents the same objects in the same order via an independent iterator.

In practice, you would never use the `TeeChannel` class directly; you would, instead, just call `.tee()` on an existing channel.

In [5]:
chan1 = IterChannel(xrange(5))
chan2 = chan1.tee()
print_chans(chan1)
print('----')
print_chans(chan2)

0
1
2
3
4
----
0
1
2
3
4


In almost every respect, you can effectively think of `chan2` as being an `IterChannel` as well (though `type(chan2)` would not agree).
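
The standard library's `itertools.tee` is a close synchronous analogue of that behavior, and it illustrates the same two points: each tee'd iterator independently replays the items from the point of the split, and the items themselves are shared rather than copied. This is an analogy only, not flowz code.

```python
import itertools

records = [{"n": i} for i in range(3)]
first, second = itertools.tee(iter(records))

from_first = list(first)
from_second = list(second)

# Both iterators see the same items in the same order...
assert from_first == from_second == records
# ...and they are the very same objects, not copies.
assert all(a is b for a, b in zip(from_first, from_second))
print([r["n"] for r in from_second])  # [0, 1, 2]
```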

A slight modification of that example gives a first chance to demonstrate the asynchronous nature of the iteration.

In [6]:
chan1 = IterChannel(xrange(5))
chan2 = chan1.tee()
print_chans(chan1, chan2)

0
0
1
1
2
2
3
3
4
4


Note that the printed numbers interleave between the two channels.  That is a consequence of the logic running on a tornado loop, with each channel yielding control back and forth to the other (and to any other coroutines involved).  That asynchrony becomes very important and useful as channels get more complicated and start accessing cloud-based storage and other sources that are themselves best accessed asynchronously.
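
The interleaving is ordinary cooperative scheduling. A minimal sketch with `asyncio` (Python 3.7+) standing in for tornado, assuming each consumer yields control to the event loop after every item (the hypothetical `consume` coroutine does so with `asyncio.sleep(0)`), produces the same 0, 0, 1, 1, ... pattern:

```python
import asyncio


async def consume(name, items, log):
    for item in items:
        log.append((name, item))
        # Hand control back to the event loop so other consumers can run.
        await asyncio.sleep(0)


async def main():
    log = []
    await asyncio.gather(
        consume("chan1", range(5), log),
        consume("chan2", range(5), log),
    )
    return log


log = asyncio.run(main())
print([item for _, item in log])  # [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
```

If `consume` never awaited anything, the first consumer would run to completion before the second started, and the output would no longer interleave.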

## MapChannel [.map(mapper)]

A `MapChannel` wraps another channel and applies a function to each of the elements in the underlying channel.

In [7]:
chan = MapChannel(IterChannel(xrange(5)), lambda x: x*2)
print_chans(chan)

0
2
4
6
8


The same logic can be performed on any channel with the helper method `.map(mapper)`:

In [8]:
chan = IterChannel(xrange(5)).map(lambda x: x*2)
print_chans(chan)

0
2
4
6
8


Almost all of the useful `Channel` subclasses are accessible via helper methods on the `Channel` class, so the rest of this guide will generally demonstrate the helper methods only.
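
The helper-method style is a form of method chaining: each helper wraps `self` in the appropriate subclass and returns the new channel. The synchronous sketch below shows only the shape of that design. `ToyChannel`, `ToyIterChannel` and `ToyMapChannel` are hypothetical names, and the real flowz classes are asynchronous and considerably richer.

```python
class ToyChannel(object):
    """Hypothetical base class: subclasses implement __iter__."""

    def __iter__(self):
        raise NotImplementedError

    def map(self, mapper):
        # Helper method: wrap this channel in the mapping subclass.
        return ToyMapChannel(self, mapper)


class ToyIterChannel(ToyChannel):
    def __init__(self, iterable):
        self.iterable = iterable

    def __iter__(self):
        return iter(self.iterable)


class ToyMapChannel(ToyChannel):
    def __init__(self, channel, mapper):
        self.channel = channel
        self.mapper = mapper

    def __iter__(self):
        for item in self.channel:
            yield self.mapper(item)


doubled = ToyIterChannel(range(5)).map(lambda x: x * 2)
print(list(doubled))  # [0, 2, 4, 6, 8]
```

Because every helper returns another channel, helpers can be chained freely, which is what makes the one-line pipelines later in this guide possible.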

### Indexing

An occasionally handy variant on mapping is to apply the standard Python indexing operator `[]` to a channel, which performs that indexing operation on each element of the channel.

In [9]:
chan = IterChannel(({'first': 'John', 'last': 'Cleese'}, {'first': 'Eric', 'last': 'Idle'}))['last']
print_chans(chan)

Cleese
Idle
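
Element by element, the result matches mapping `operator.itemgetter('last')` over the items. Here is that equivalence in plain Python, as a sketch of the semantics only, not of how flowz implements indexing:

```python
from operator import itemgetter

people = (
    {"first": "John", "last": "Cleese"},
    {"first": "Eric", "last": "Idle"},
)

# chan['last'] behaves, item by item, like chan.map(itemgetter('last')).
last_names = list(map(itemgetter("last"), people))
print(last_names)  # ['Cleese', 'Idle']
```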


## FlatMapChannel [.flat_map(mapper)]

A variant on the `MapChannel` is a `FlatMapChannel`.  Its mapper returns an iterable for each element, and the channel emits the items of that iterable one by one.  Note the difference in behavior below.

In [10]:
chan = IterChannel(xrange(5))
map_chan = chan.map(lambda x: [x for i in range(x)])
flat_map_chan = chan.tee().flat_map(lambda x: [x for i in range(x)])
print_chans(map_chan)
print('----')
print_chans(flat_map_chan)

[]
[1]
[2, 2]
[3, 3, 3]
[4, 4, 4, 4]
----
1
2
2
3
3
3
4
4
4
4


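Note also the care taken to call `.tee()` on the source channel before building the second pipeline, so that `map_chan` and `flat_map_chan` each iterate independently over all five elements.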
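
In plain Python terms, `map` corresponds to the built-in `map`, while `flat_map` corresponds to mapping and then flattening one level, as `itertools.chain.from_iterable` does. The sketch below is a synchronous analogy using the same mapper as the example, not flowz's implementation. Note in particular that the empty list produced for `0` contributes no items at all to the flattened output.

```python
from itertools import chain


def mapper(x):
    # Same mapper as the example: x copies of x.
    return [x for _ in range(x)]


mapped = list(map(mapper, range(5)))
flat_mapped = list(chain.from_iterable(map(mapper, range(5))))

print(mapped)       # [[], [1], [2, 2], [3, 3, 3], [4, 4, 4, 4]]
print(flat_mapped)  # [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
```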

## FilterChannel [.filter(predicate)]

A `FilterChannel` wraps another channel and applies a function to each of the elements in the underlying channel, passing an element through only if the function returns a true value.

In [11]:
chan = IterChannel(xrange(5)).filter(lambda x: x % 2 == 0)
print_chans(chan)

0
2
4


And since these examples are looking a lot like the standard map/filter examples in Python tutorials, we might as well string them together!

In [12]:
chan = IterChannel(xrange(5)).filter(lambda x: x % 2 == 0).map(lambda x: x*2)
print_chans(chan)

0
4
8
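
For comparison, here is the same pipeline written with Python's built-in `filter` and `map`. It is a synchronous equivalent of the element-wise results only; the channel version additionally runs asynchronously.

```python
# Keep the even numbers, then double them: filter first, map second.
evens_doubled = list(map(lambda x: x * 2, filter(lambda x: x % 2 == 0, range(5))))
print(evens_doubled)  # [0, 4, 8]
```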


## ZipChannel [.zip(*channels)]

A `ZipChannel` returns the items from multiple channels grouped together in a way akin to the built-in `zip` function.  In the resulting `ZipChannel`, the items in `self` and all channels specified will be zipped together on a per-item basis.  The channel on which you're invoking `zip` will be the first, and items from the other channels will follow their order of specification in parameters.

In [14]:
chan1 = IterChannel(xrange(5))
chan2 = chan1.tee().map(lambda x: x * 2)
chan3 = chan1.tee().map(lambda x: x ** 2)
print_chans(chan1.zip(chan2, chan3))

(0, 0, 0)
(1, 2, 1)
(2, 4, 4)
(3, 6, 9)
(4, 8, 16)
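
Element by element, the zipped channel yields the same tuples as the built-in `zip` over tee'd copies of the source. The following synchronous analogy uses `itertools.tee` in place of `.tee()`; it illustrates the grouping only, not how `ZipChannel` schedules its reads.

```python
from itertools import tee

# Three independent iterators over the same source, like chan1.tee() twice.
base, doubled_src, squared_src = tee(range(5), 3)
doubled = (x * 2 for x in doubled_src)
squared = (x ** 2 for x in squared_src)

zipped = list(zip(base, doubled, squared))
print(zipped)
# [(0, 0, 0), (1, 2, 1), (2, 4, 4), (3, 6, 9), (4, 8, 16)]
```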


## ChainChannel [.chain(*channels)]

A `ChainChannel` simply chains together multiple channels into one channel, as though concatenating them.

In [16]:
print_chans(IterChannel(xrange(3)).chain(IterChannel(xrange(10,13)), IterChannel(xrange(100,103))))

0
1
2
10
11
12
100
101
102
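
The standard library's `itertools.chain` is the synchronous analogue: it yields every item of the first iterable, then every item of the second, and so on. The sketch reproduces the order shown above; it says nothing about how `ChainChannel` itself reads its inputs.

```python
from itertools import chain

chained = list(chain(range(3), range(10, 13), range(100, 103)))
print(chained)  # [0, 1, 2, 10, 11, 12, 100, 101, 102]
```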


## ObserveChannel [.observe(observer)]

An `ObserveChannel` wraps another channel and passes along its items untouched, but also has the opportunity to run its `observer` function against them.

In [19]:
print_chans(IterChannel(xrange(3)).observe(lambda x: print('I saw %d' % x)))

I saw 0
0
I saw 1
1
I saw 2
2
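
An observer is a side effect that leaves the items unchanged. A minimal synchronous sketch, using a hypothetical `observe` generator rather than flowz's `ObserveChannel`, reproduces the ordering above: the observer runs on each item just before that item is passed downstream, and its return value is ignored.

```python
def observe(iterable, observer):
    """Hypothetical helper: call observer(item), then pass item through."""
    for item in iterable:
        observer(item)  # return value is discarded
        yield item


log = []
for item in observe(range(3), lambda x: log.append("I saw %d" % x)):
    log.append(item)

print(log)  # ['I saw 0', 0, 'I saw 1', 1, 'I saw 2', 2]
```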


That's it for basic c