Skip to content

Commit

Permalink
Merge pull request #43 from ethanrowe/release-0.6
Browse files Browse the repository at this point in the history
Release 0.6
  • Loading branch information
ethanrowe committed Aug 19, 2016
2 parents f61db5e + 94d650f commit 534c19e
Show file tree
Hide file tree
Showing 11 changed files with 2,923 additions and 45 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# Version 0.6.0

* Adds the `channelmethod` decorator as a callable-oriented version of `channelproperty` (Issue #41)
* Includes ipython notebook user guide (thanks to Patrick Rusk!)
* Suppresses the "future exception not handled" warnings from tornado for channels. (Issue #30)

# Version 0.5.0

* Adds the new `flowz.channels.tools` module for utility functions/helpers
Expand Down
40 changes: 31 additions & 9 deletions flowz/channels/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from flowz import util


class ChannelDone(Exception):
"""
Exception throw when trying to access a completed channel.
Expand All @@ -20,6 +21,27 @@ class ChannelDone(Exception):
pass


def set_channel_done_exception(fut, loc):
"""
HACK This tweaks the internal state of the future in such a way that:
-- it is still regarded as representing an exception, but
-- its traceback logger no longer has a formatted_tb value
The reason for this is to suppress the checks done by tornado
(in tornado.concurrent._TracebackLogger.__del__)
that print out "Future exception was never retrieved" messages.
This is highly reliant on internals of tornado that might change,
which is why it is wrapped in a broad try-except.
@param fut: the future on which to set the `ChannelDone` exception
@param loc: the location in the code (for logging purposes)
"""
try:
fut.set_exception(ChannelDone("Channel is done (%s)" % loc))
fut._tb_logger.formatted_tb = None
except:
pass


class Channel(object):
"""
An asynchronous, dependency-oriented message transport.
Expand Down Expand Up @@ -135,7 +157,7 @@ def reader(channel):
"""
if self.__done__:
raise ChannelDone("Channel is done")
raise ChannelDone("Channel is done (Channel.next)")

# Only one reader may advance the state at a time.
yield self.__read_blocker__.acquire()
Expand Down Expand Up @@ -376,7 +398,7 @@ def __reader__(self, thischan):
yield read_f
yield gen.moment
except ChannelDone:
head.set_exception(ChannelDone("Channel is done"))
set_channel_done_exception(head, "ReadChannel.__reader__")
except:
# Capture exception information, including traceback
# TODO Possible issue when Python3-compatibility is important
Expand Down Expand Up @@ -480,7 +502,7 @@ def __reader__(self, thischan):
yield gen.moment

except ChannelDone:
head.set_exception(ChannelDone("Channel is done"))
set_channel_done_exception(head, "FlatMapChannel.__reader__")
except:
# Capture exception information, including traceback
# TODO Possible issue when Python3-compatibility is important
Expand Down Expand Up @@ -636,7 +658,7 @@ def __next_item__(self):
# If it's none, we've already been here before; this
# channel is done (tail has been released)
if win is None:
raise ChannelDone("Channel is done")
raise ChannelDone("Channel is done (WindowChannel.__next_item__)")
# If here, this is the first time the input channel through
# the ChannelDone, so we release the windower's tail and
# clear the windower from the channel.
Expand Down Expand Up @@ -775,7 +797,7 @@ def __item_ready__(self, f):
read_f = cc.Future()
head_old.set_result((head_new, read_f, f.result()))
if self._read_done and not self._waiting:
self.__head__.set_exception(ChannelDone("Channel is done"))
set_channel_done_exception(self.__head__, "ReadyFutureChannel.__item_ready__")


@gen.coroutine
Expand Down Expand Up @@ -883,7 +905,7 @@ def put(self, item, exception=False):
yield self.__ready__
last_f = self.__head__
if last_f.done():
raise ChannelDone()
raise ChannelDone("Channel is done (ProducerChannel.put)")
if exception:
last_f.set_exception(item)
else:
Expand All @@ -905,7 +927,7 @@ def close(self):
any other channel.
"""
if not self.__head__.done():
self.__head__.set_exception(ChannelDone("Channel is done"))
set_channel_done_exception(self.__head__, "ProducerChannel.close")


class IterChannel(ProducerChannel):
Expand Down Expand Up @@ -999,7 +1021,7 @@ def __next_item__(self):
raise gen.Return(v)
except ChannelDone:
chn.pop(0)
raise ChannelDone("Channel is done.")
raise ChannelDone("Channel is done (ChainChannel.__next_item__)")



Expand Down Expand Up @@ -1114,7 +1136,7 @@ def __next_item__(self):
# If there are no next_reads, there is no more work to be done.
if not next_reads:
# No qualified keys remaining. We're done.
raise ChannelDone("Channel is done")
raise ChannelDone("Channel is done (CoGroupChannel.__next_item__)")

# Propagates the guys with the lowest qualifying key to the state
# list, and asynchronously fetch their respective channel's next vals.
Expand Down
102 changes: 100 additions & 2 deletions flowz/channels/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def __getitem__(self, key):
raise KeyError("No such key %s" % repr(key))


def _channelproperty(manager_name, fn):
def _channelmethod(manager_name, fn):
@functools.wraps(fn)
def wrapped(self):
manager = getattr(self, manager_name)
Expand All @@ -157,7 +157,11 @@ def wrapped(self):
except KeyError:
manager.add_builder(name, lambda: fn(self))
return manager[name]
return property(wrapped)
return wrapped

def _channelproperty(manager_name, fn):
return property(_channelmethod(manager_name, fn))


def channelproperty(fn_or_name):
"""
Expand Down Expand Up @@ -252,3 +256,97 @@ def pairs(self):
return _channelproperty('channel_manager', fn_or_name)
return lambda fn: _channelproperty(fn_or_name, fn)


def channelmethod(fn_or_name):
"""
Decorates a builder method to act as an access-managed channel get method.
Expects to decorate a callable for an object that has a
:class:`ChannelManager` instance. The callable should return a
channel to be managed.
The result is a method that fronts access to the corresponding
channel via the :class:`ChannelManager`, meaning that first call
of the method will cause the build logic to run, returning the
underlying channel, while each subsequent call of the method will
return a new :meth:`flowz.channels.core.Channel.tee` result.
By default, the :class:`ChannelManager` instance is expected to be
found at the ``channel_manager`` property of the object. However,
this can be overridden; call the decorator with an alternate string
name instead of a function, and you'll get a new decorator function
bound to that opposite name.
An example::
from __future__ import print_function
from flowz import app
from flowz.channels import core
from flowz.channels import management as mgmt
class StupidExample(object):
def __init__(self, count):
self.channel_manager = mgmt.ChannelManager()
self.count = count
# We get a channel of numbers from 0 to count.
@mgmt.channelmethod
def numbers(self):
return core.IterChannel(range(self.count))
# Whereas this is the doubling of those numbers.
@mgmt.channelmethod
def doubles(self):
# self.numbers might be the raw IterChannel, or
# it might be a tee thereof, depending on what
# has happened first.
return self.numbers().map(lambda i: i*2)
# This pairs them up. It is guaranteed that there
# will be at least one tee of self.numbers, because
# it is accessed twice (once directly, once via doubles)
@mgmt.channelmethod
def pairs(self):
return self.numbers().zip(self.doubles())
# This will print
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)
app.Flo([StupidExample(5).pairs().map(print)]).run()
If you wanted to name your :class:`ChannelManager` something else,
this would do it::
class OtherExample(object):
def __init__(self, count):
self.mgr = mgmt.ChannelManager()
self.count = count
@mgmt.channelmethod('mgr')
def numbers(self):
return core.IterChannel(range(self.count))
@mgmt.channelmethod('mgr')
def doubles(self):
return self.numbers().map(lambda i: i*2)
@mgmt.channelmethod('mgr')
def pairs(self):
return self.numbers().zip(self.doubles())
Args:
fn_or_name (callable or string): if a callable, the callable to
be used as the underlying ``buildfunc`` for the managed channel
associated with this method. Otherwise, the name of attribute
at which the :class:`ChannelManager` would be found.
Returns a new method if `fn_or_name` was a callable, and a new decorator
function bound to the alternate attribute name otherwise.
"""
if callable(fn_or_name):
return _channelmethod('channel_manager', fn_or_name)
return lambda fn: _channelmethod(fn_or_name, fn)

0 comments on commit 534c19e

Please sign in to comment.