Skip to content

Commit

Permalink
eventual.py: add eventual-send operator, copied (with tests) from Foo…
Browse files Browse the repository at this point in the history
…lscap
  • Loading branch information
warner committed Feb 15, 2010
1 parent 4e102fd commit 3785c59
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 0 deletions.
82 changes: 82 additions & 0 deletions buildbot/eventual.py
@@ -0,0 +1,82 @@

# copied from foolscap

from twisted.internet import reactor, defer
from twisted.python import log

class _SimpleCallQueue(object):
# XXX TODO: merge epsilon.cooperator in, and make this more complete.
def __init__(self):
self._events = []
self._flushObservers = []
self._timer = None
self._in_turn = False

def append(self, cb, args, kwargs):
self._events.append((cb, args, kwargs))
if not self._timer:
self._timer = reactor.callLater(0, self._turn)

def _turn(self):
self._timer = None
self._in_turn = True
# flush all the messages that are currently in the queue. If anything
# gets added to the queue while we're doing this, those events will
# be put off until the next turn.
events, self._events = self._events, []
for cb, args, kwargs in events:
try:
cb(*args, **kwargs)
except:
log.err()
self._in_turn = False
if self._events and not self._timer:
self._timer = reactor.callLater(0, self._turn)
if not self._events:
observers, self._flushObservers = self._flushObservers, []
for o in observers:
o.callback(None)

def flush(self):
"""Return a Deferred that will fire (with None) when the call queue
is completely empty."""
if not self._events and not self._in_turn:
return defer.succeed(None)
d = defer.Deferred()
self._flushObservers.append(d)
return d


_theSimpleQueue = _SimpleCallQueue()

def eventually(cb, *args, **kwargs):
"""This is the eventual-send operation, used as a plan-coordination
primitive. The callable will be invoked (with args and kwargs) in a later
reactor turn. Doing 'eventually(a); eventually(b)' guarantees that a will
be called before b.
Any exceptions that occur in the callable will be logged with log.err().
If you really want to ignore them, be sure to provide a callable that
catches those exceptions.
This function returns None. If you care to know when the callable was
run, be sure to provide a callable that notifies somebody.
"""
_theSimpleQueue.append(cb, args, kwargs)


def fireEventually(value=None):
"""This returns a Deferred which will fire in a later reactor turn, after
the current call stack has been completed, and after all other deferreds
previously scheduled with callEventually().
"""
d = defer.Deferred()
eventually(d.callback, value)
return d

def flushEventualQueue(_ignored=None):
"""This returns a Deferred which fires when the eventual-send queue is
finally empty. This is useful to wait upon as the last step of a Trial
test method.
"""
return _theSimpleQueue.flush()
70 changes: 70 additions & 0 deletions buildbot/test/test_eventual.py
@@ -0,0 +1,70 @@

from twisted.trial import unittest
from twisted.internet import defer

from buildbot.eventual import eventually, fireEventually, flushEventualQueue

class TestEventual(unittest.TestCase):

def tearDown(self):
return flushEventualQueue()

def testSend(self):
results = []
eventually(results.append, 1)
self.failIf(results)
def _check():
self.failUnlessEqual(results, [1])
eventually(_check)
def _check2():
self.failUnlessEqual(results, [1,2])
eventually(results.append, 2)
eventually(_check2)

def testFlush(self):
results = []
eventually(results.append, 1)
eventually(results.append, 2)
d = flushEventualQueue()
def _check(res):
self.failUnlessEqual(results, [1,2])
d.addCallback(_check)
return d

def testFlush2(self):
added = []
called = []
done_d = defer.Deferred()
def _then():
called.append("f1")
added.append("f2")
d = flushEventualQueue()
d.addCallback(lambda ign: called.append("f2"))
def _second_flush_done(ign):
done_d.callback(None)
d.addCallback(_second_flush_done)
added.append("f1")
eventually(_then)
added.append(1)
eventually(called.append, 1)
added.append(2)
eventually(called.append, 2)
d = flushEventualQueue()
d.addCallback(flushEventualQueue)
d.addCallback(lambda ign: done_d)
def _check(res):
self.failUnlessEqual(called, ["f1", 1, 2, "f2"])
self.failUnlessEqual(added, called)
d.addCallback(_check)
return d

def testFire(self):
results = []
fireEventually(1).addCallback(results.append)
fireEventually(2).addCallback(results.append)
self.failIf(results)
def _check(res):
self.failUnlessEqual(results, [1,2])
d = flushEventualQueue()
d.addCallback(_check)
return d

0 comments on commit 3785c59

Please sign in to comment.