Skip to content

Commit

Permalink
make startConsuming return a Deferred
Browse files Browse the repository at this point in the history
This will be necessary for some MQ plugins.  Fixes #2669.
  • Loading branch information
djmitche committed May 5, 2014
1 parent f73865d commit a12242d
Show file tree
Hide file tree
Showing 31 changed files with 147 additions and 89 deletions.
4 changes: 3 additions & 1 deletion master/buildbot/data/connector.py
Expand Up @@ -120,9 +120,11 @@ def get(self, path, filters=None, fields=None, order=None,
rv = resultSpec.apply(rv)
defer.returnValue(rv)

@defer.inlineCallbacks
def startConsuming(self, callback, options, path):
endpoint, kwargs = self.getEndpoint(path)
return endpoint.startConsuming(callback, options, kwargs)
ref = yield endpoint.startConsuming(callback, options, kwargs)
defer.returnValue(ref)

def control(self, action, args, path):
endpoint, kwargs = self.getEndpoint(path)
Expand Down
3 changes: 2 additions & 1 deletion master/buildbot/mq/simple.py
Expand Up @@ -18,6 +18,7 @@
from buildbot import config
from buildbot.mq import base
from buildbot.util import tuplematch
from twisted.internet import defer
from twisted.python import log


Expand Down Expand Up @@ -53,7 +54,7 @@ def startConsuming(self, callback, filter, persistent_name=None):
else:
qref = QueueRef(self, callback, filter)
self.qrefs.append(qref)
return qref
return defer.succeed(qref)


class QueueRef(base.QueueRef):
Expand Down
8 changes: 5 additions & 3 deletions master/buildbot/process/botmaster.py
Expand Up @@ -143,6 +143,7 @@ def getBuildernames(self):
def getBuilders(self):
return self.builders.values()

@defer.inlineCallbacks
def startService(self):
def buildRequestAdded(key, msg):
self.maybeStartBuildsForBuilder(msg['buildername'])
Expand All @@ -154,13 +155,14 @@ def buildRequestAdded(key, msg):
# self.buildrequest_consumer_new = self.master.mq.startConsuming(
# buildRequestAdded,
# ('buildrequests', None, None, None, 'new'))
self.buildrequest_consumer_new = self.master.mq.startConsuming(
startConsuming = self.master.mq.startConsuming
self.buildrequest_consumer_new = yield startConsuming(
buildRequestAdded,
('buildsets', None, 'builders', None, 'buildrequests', None, 'new'))
self.buildrequest_consumer_unclaimed = self.master.mq.startConsuming(
self.buildrequest_consumer_unclaimed = yield startConsuming(
buildRequestAdded,
('buildrequests', None, None, None, 'unclaimed'))
return service.AsyncMultiService.startService(self)
yield service.AsyncMultiService.startService(self)

@defer.inlineCallbacks
def reconfigService(self, new_config):
Expand Down
4 changes: 2 additions & 2 deletions master/buildbot/schedulers/base.py
Expand Up @@ -113,18 +113,18 @@ def getPendingBuildTimes(self):

# change handling

@defer.inlineCallbacks
def startConsumingChanges(self, fileIsImportant=None, change_filter=None,
onlyImportant=False):
assert fileIsImportant is None or callable(fileIsImportant)

# register for changes with the data API
assert not self._change_consumer
self._change_consumer = self.master.data.startConsuming(
self._change_consumer = yield self.master.data.startConsuming(
lambda k, m: self._changeCallback(k, m, fileIsImportant,
change_filter, onlyImportant),
{},
('changes',))
return defer.succeed(None)

@defer.inlineCallbacks
def _changeCallback(self, key, msg, fileIsImportant, change_filter,
Expand Down
4 changes: 2 additions & 2 deletions master/buildbot/schedulers/dependent.py
Expand Up @@ -46,12 +46,12 @@ def __init__(self, name, upstream, builderNames, properties={}, **kwargs):
def activate(self):
yield base.BaseScheduler.deactivate(self)

self._buildset_new_consumer = self.master.data.startConsuming(
self._buildset_new_consumer = yield self.master.data.startConsuming(
self._buildset_new_cb,
{}, ('buildsets',))
# TODO: refactor to subscribe only to interesting buildsets, and
# subscribe to them directly, via the data API
self._buildset_complete_consumer = self.master.mq.startConsuming(
self._buildset_complete_consumer = yield self.master.mq.startConsuming(
self._buildset_complete_cb,
('buildsets', None, 'complete'))

Expand Down
22 changes: 15 additions & 7 deletions master/buildbot/schedulers/triggerable.py
Expand Up @@ -20,6 +20,7 @@
from buildbot.schedulers import base
from twisted.internet import defer
from twisted.python import failure
from twisted.python import log


class Triggerable(base.BaseScheduler):
Expand Down Expand Up @@ -62,13 +63,18 @@ def trigger(self, waited_for, sourcestamps=None, set_props=None,
def setup_waiter(ids):
bsid, brids = ids
self._waiters[bsid] = (resultsDeferred, brids)
self._updateWaiters()
return ids
d = self._updateWaiters()
d.addCallback(lambda _: ids)
return d

idsDeferred.addCallback(setup_waiter)
return idsDeferred, resultsDeferred

@defer.inlineCallbacks
def stopService(self):
# finish any _updateWaiters calls
yield self._updateWaiters.stop()

# cancel any outstanding subscription
if self._buildset_complete_consumer:
self._buildset_complete_consumer.stopConsuming()
Expand All @@ -81,11 +87,13 @@ def stopService(self):
d.errback(failure.Failure(RuntimeError(msg)))
self._waiters = {}

return base.BaseScheduler.stopService(self)
yield base.BaseScheduler.stopService(self)

@defer.inlineCallbacks
def _updateWaiters(self):
if self._waiters and not self._buildset_complete_consumer:
self._buildset_complete_consumer = self.master.mq.startConsuming(
startConsuming = self.master.mq.startConsuming
self._buildset_complete_consumer = yield startConsuming(
self._buildset_complete_cb,
('buildsets', None, 'complete'))
elif not self._waiters and self._buildset_complete_consumer:
Expand All @@ -96,10 +104,10 @@ def _buildset_complete_cb(self, key, msg):
if msg['bsid'] not in self._waiters:
return

# pop this bsid from the waiters list, and potentially stop consuming
# buildset completion notifications
# pop this bsid from the waiters list,
d, brids = self._waiters.pop(msg['bsid'])
self._updateWaiters()
# ..and potentially stop consuming buildset completion notifications
self._updateWaiters().addErrback(log.err, "in _updateWaiters")

# fire the callback to indicate that the triggered build is complete
d.callback((msg['results'], brids))
22 changes: 12 additions & 10 deletions master/buildbot/schedulers/trysched.py
Expand Up @@ -277,7 +277,7 @@ def gotBuild(key, msg):
return subscriber.callRemote('newbuild',
RemoteBuild(self.master, msg, self.builderName),
self.builderName)
self.consumer = self.master.data.startConsuming(
self.consumer = yield self.master.data.startConsuming(
gotBuild, {}, ('builders', builderId, 'builds'))
subscriber.notifyOnDisconnect(lambda _:
self.remote_unsubscribe(subscriber))
Expand Down Expand Up @@ -306,6 +306,7 @@ def __init__(self, master, builddict, builderName):
self.builderName = builderName
self.consumer = None

@defer.inlineCallbacks
def remote_subscribe(self, subscriber, interval):
# subscribe to any new steps..
def stepChanged(key, msg):
Expand All @@ -315,7 +316,7 @@ def stepChanged(key, msg):
elif key[-1] == 'finished':
return subscriber.callRemote('stepFinished',
self.builderName, self, msg['name'], None, msg['results'])
self.consumer = self.master.data.startConsuming(
self.consumer = yield self.master.data.startConsuming(
stepChanged, {},
('builds', self.builddict['buildid'], 'steps'))
subscriber.notifyOnDisconnect(lambda _:
Expand All @@ -326,19 +327,20 @@ def remote_unsubscribe(self, subscriber):
self.consumer.stopConsuming()
self.consumer = None

@defer.inlineCallbacks
def remote_waitUntilFinished(self):
d = defer.Deferred()
cons = []

def buildEvent(key, msg):
if key[-1] != 'finished':
return
cons[0].stopConsuming()
d.callback(self) # callers expect result=self
cons.append(self.master.data.startConsuming(
if key[-1] == 'finished':
d.callback(None)
consumer = yield self.master.data.startConsuming(
buildEvent, {},
('builds', self.builddict['buildid'])))
return d
('builds', self.builddict['buildid']))

yield d # wait for event
consumer.stopConsuming()
defer.returnValue(self) # callers expect result=self

@defer.inlineCallbacks
def remote_getResults(self):
Expand Down
3 changes: 2 additions & 1 deletion master/buildbot/status/buildset.py
Expand Up @@ -80,7 +80,8 @@ class BuildSetSummaryNotifierMixin:
_buildsetCompleteConsumer = None

def summarySubscribe(self):
self._buildsetCompleteConsumer = self.master.mq.startConsuming(
startConsuming = self.master.mq.startConsuming
self._buildsetCompleteConsumer = yield startConsuming(
self._buildsetComplete,
('buildsets', None, 'complete'))

Expand Down
3 changes: 2 additions & 1 deletion master/buildbot/status/mail.py
Expand Up @@ -472,7 +472,8 @@ def setServiceParent(self, parent):

def startService(self):
if self.buildSetSummary:
self.summarySubscribe()
# TODO: handle deferred
self.summarySubscribe().addErrback(twlog.err, 'while subscribing')

base.StatusReceiverMultiService.startService(self)

Expand Down
11 changes: 6 additions & 5 deletions master/buildbot/status/master.py
Expand Up @@ -57,18 +57,19 @@ def __init__(self, master):

# service management

@defer.inlineCallbacks
def startService(self):
# subscribe to the things we need to know about
self._buildset_new_consumer = self.master.mq.startConsuming(
self._buildset_new_consumer = yield self.master.mq.startConsuming(
self.bs_new_consumer_cb, ('buildsets', None, 'new'))
self._buildset_complete_consumer = self.master.mq.startConsuming(
self._buildset_complete_consumer = yield self.master.mq.startConsuming(
self.bs_complete_consumer_cb, ('buildsets', None, 'complete'))
self._br_consumer = self.master.mq.startConsuming(
self._br_consumer = yield self.master.mq.startConsuming(
self.br_consumer_cb, ('buildrequests', None, None, None, 'new'))
self._change_consumer = self.master.mq.startConsuming(
self._change_consumer = yield self.master.mq.startConsuming(
self.change_consumer_cb, ('changes', None, 'new'))

return service.AsyncMultiService.startService(self)
yield service.AsyncMultiService.startService(self)

@defer.inlineCallbacks
def reconfigService(self, new_config):
Expand Down
4 changes: 3 additions & 1 deletion master/buildbot/status/status_gerrit.py
Expand Up @@ -31,6 +31,7 @@
from distutils.version import LooseVersion
from twisted.internet import reactor
from twisted.internet.protocol import ProcessProtocol
from twisted.python import log

# Cache the version that the gerrit server is running for this many seconds
GERRIT_VERSION_CACHE_TIMEOUT = 600
Expand Down Expand Up @@ -212,7 +213,8 @@ def setServiceParent(self, parent):
def startService(self):
print """Starting up."""
if self.summaryCB:
self.summarySubscribe()
# TODO: handle deferred
self.summarySubscribe().addErrback(log.err, 'while subscribing')

StatusReceiverMultiService.startService(self)

Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/status/words.py
Expand Up @@ -445,8 +445,9 @@ def watchForCompleteEvent(key, msg):
return self.watchedBuildFinished(msg)

for build in builds:
handle = self.master.data.startConsuming(watchForCompleteEvent, {},
('builds', str(build['buildid'])))
startConsuming = self.master.data.startConsuming
handle = yield startConsuming(watchForCompleteEvent, {},
('builds', str(build['buildid'])))
self.build_subscriptions.append((build['buildid'], handle))

if self.useRevisions:
Expand Down
2 changes: 1 addition & 1 deletion master/buildbot/test/fake/fakemq.py
Expand Up @@ -71,7 +71,7 @@ def startConsuming(self, callback, filter, persistent_name=None):
qref.filter = filter
qref.persistent_name = persistent_name
self.qrefs.append(qref)
return qref
return defer.succeed(qref)

def clearProductions(self):
"Clear out the cached productions"
Expand Down
4 changes: 2 additions & 2 deletions master/buildbot/test/unit/test_data_builders.py
Expand Up @@ -131,8 +131,8 @@ def check(builders):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {},
expected_filter=('builders', None, None))
return self.callStartConsuming({}, {},
expected_filter=('builders', None, None))


class Builder(interfaces.InterfaceTests, unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_buildsets.py
Expand Up @@ -81,8 +81,9 @@ def check(buildset):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {'bsid': 13},
expected_filter=('buildsets', '13', 'complete'))
return self.callStartConsuming({}, {'bsid': 13},
expected_filter=('buildsets', '13',
'complete'))


class BuildsetsEndpoint(endpoint.EndpointMixin, unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_changes.py
Expand Up @@ -95,8 +95,9 @@ def check(changes):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {},
expected_filter=('changes', None, 'new'))
return self.callStartConsuming({}, {},
expected_filter=('changes',
None, 'new'))


class Change(interfaces.InterfaceTests, unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_changesources.py
Expand Up @@ -158,8 +158,9 @@ def check(changesources):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {},
expected_filter=('changesources', None, None))
return self.callStartConsuming({}, {},
expected_filter=('changesources',
None, None))


class ChangeSource(interfaces.InterfaceTests, unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_connector.py
Expand Up @@ -210,13 +210,14 @@ def check(gotten):
ep.get.assert_called_once_with(mock.ANY, {})
return d

@defer.inlineCallbacks
def test_startConsuming(self):
ep = self.patchFooPattern()
ep.startConsuming = mock.Mock(name='MyEndpoint.startConsuming')
ep.startConsuming.return_value = 'qref'
ep.startConsuming.return_value = defer.succeed('qref')

# since startConsuming is a mock, there's no need for real mq stuff
qref = self.data.startConsuming('cb', {}, ('foo', '10', 'bar'))
qref = yield self.data.startConsuming('cb', {}, ('foo', '10', 'bar'))
self.assertEqual(qref, 'qref')
ep.startConsuming.assert_called_with('cb', {}, dict(fooid=10))

Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_masters.py
Expand Up @@ -142,8 +142,9 @@ def check(masters):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {},
expected_filter=('masters', None, None))
return self.callStartConsuming({}, {},
expected_filter=('masters',
None, None))


class Master(interfaces.InterfaceTests, unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_schedulers.py
Expand Up @@ -150,8 +150,9 @@ def check(schedulers):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {},
expected_filter=('schedulers', None, None))
return self.callStartConsuming({}, {},
expected_filter=('schedulers',
None, None))


class Scheduler(interfaces.InterfaceTests, unittest.TestCase):
Expand Down
5 changes: 3 additions & 2 deletions master/buildbot/test/unit/test_data_sourcestamps.py
Expand Up @@ -99,8 +99,9 @@ def check(sourcestamps):
return d

def test_startConsuming(self):
self.callStartConsuming({}, {},
expected_filter=('sourcestamps', None, None))
return self.callStartConsuming({}, {},
expected_filter=('sourcestamps',
None, None))


class SourceStamp(unittest.TestCase):
Expand Down

0 comments on commit a12242d

Please sign in to comment.