Skip to content

Commit

Permalink
consume change messages, removing change polling and subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
djmitche committed Apr 28, 2012
1 parent a55cc88 commit c405a42
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 210 deletions.
76 changes: 1 addition & 75 deletions master/buildbot/master.py
Expand Up @@ -96,8 +96,6 @@ def __init__(self, basedir, configFileName="master.cfg", umask=None):
self.log_rotation = LogRotation()

# subscription points
self._change_subs = \
subscription.SubscriptionPoint("changes")
self._new_buildrequest_subs = \
subscription.SubscriptionPoint("buildrequest_additions")
self._new_buildset_subs = \
Expand Down Expand Up @@ -492,12 +490,9 @@ def handle_deprec(oldname, old, newname, new, default=None,
yield wfd
change = wfd.getResult()

# old-style notification
# log, being careful to handle funny characters
msg = u"added change %s to database" % change
log.msg(msg.encode('utf-8', 'replace'))
# only deliver messages immediately if we're not polling
if not self.config.db['db_poll_interval']:
self._change_subs.deliver(change)

# new-style notification
msg = dict()
Expand All @@ -507,15 +502,6 @@ def handle_deprec(oldname, old, newname, new, default=None,

yield change

def subscribeToChanges(self, callback):
"""
Request that C{callback} be called with each Change object added to the
cluster.
Note: this method will go away in 0.9.x
"""
return self._change_subs.subscribe(callback)

@defer.deferredGenerator
def addBuildset(self, scheduler, **kwargs):
"""
Expand Down Expand Up @@ -661,72 +647,12 @@ def pollDatabase(self):
# simultaneously. Each particular poll method handles errors itself,
# although catastrophic errors are handled here
d = defer.gatherResults([
self.pollDatabaseChanges(),
self.pollDatabaseBuildRequests(),
# also unclaim
])
d.addErrback(log.err, 'while polling database')
return d

_last_processed_change = None
@defer.inlineCallbacks
def pollDatabaseChanges(self):
# Older versions of Buildbot had each scheduler polling the database
# independently, and storing a "last_processed" state indicating the
# last change it had processed. This had the advantage of allowing
# schedulers to pick up changes that arrived in the database while
# the scheduler was not running, but was horribly inefficient.

# This version polls the database on behalf of the schedulers, using a
# similar state at the master level.

timer = metrics.Timer("BuildMaster.pollDatabaseChanges()")
timer.start()

need_setState = False

# get the last processed change id
if self._last_processed_change is None:
self._last_processed_change = \
yield self._getState('last_processed_change')

# if it's still None, assume we've processed up to the latest changeid
if self._last_processed_change is None:
lpc = yield self.db.changes.getLatestChangeid()
# if there *are* no changes, count the last as '0' so that we don't
# skip the first change
if lpc is None:
lpc = 0
self._last_processed_change = lpc

need_setState = True

if self._last_processed_change is None:
timer.stop()
return

while True:
changeid = self._last_processed_change + 1
chdict = yield self.db.changes.getChange(changeid)

# if there's no such change, we've reached the end and can
# stop polling
if not chdict:
break

change = yield changes.Change.fromChdict(self, chdict)

self._change_subs.deliver(change)

self._last_processed_change = changeid
need_setState = True

# write back the updated state, if it's changed
if need_setState:
yield self._setState('last_processed_change',
self._last_processed_change)
timer.stop()

_last_unclaimed_brids_set = None
_last_claim_cleanup = 0
@defer.inlineCallbacks
Expand Down
83 changes: 51 additions & 32 deletions master/buildbot/schedulers/base.py
Expand Up @@ -85,7 +85,7 @@ def __init__(self, name, builderNames, properties):
self.master = None

# internal variables
self._change_subscription = None
self._change_consumer = None
self._change_consumption_lock = defer.DeferredLock()
self._objectid = None

Expand Down Expand Up @@ -191,37 +191,56 @@ def startConsumingChanges(self, fileIsImportant=None, change_filter=None,
assert fileIsImportant is None or callable(fileIsImportant)

# register for changes with master
assert not self._change_subscription
def changeCallback(change):
# ignore changes delivered while we're not running
if not self._change_subscription:
return
assert not self._change_consumer
self._change_consumer = self.master.mq.startConsuming(
lambda k,m : self._changeCallback(k, m, fileIsImportant,
change_filter, onlyImportant),
'change.*.new')

if change_filter and not change_filter.filter_change(change):
return
if fileIsImportant:
try:
important = fileIsImportant(change)
if not important and onlyImportant:
return
except:
log.err(failure.Failure(),
'in fileIsImportant check for %s' % change)
return defer.succeed(None)

@defer.deferredGenerator
def _changeCallback(self, key, msg, fileIsImportant, change_filter,
onlyImportant):

# ignore changes delivered while we're not running
if not self._change_consumer:
return

# get a change object, since the API requires it
wfd = defer.waitForDeferred(
self.master.db.changes.getChange(msg['changeid']))
yield wfd
chdict = wfd.getResult()

wfd = defer.waitForDeferred(
changes.Change.fromChdict(self.master, chdict))
yield wfd
change = wfd.getResult()

# filter it
if change_filter and not change_filter.filter_change(change):
return
if fileIsImportant:
try:
important = fileIsImportant(change)
if not important and onlyImportant:
return
else:
important = True

# use change_consumption_lock to ensure the service does not stop
# while this change is being processed
d = self._change_consumption_lock.acquire()
d.addCallback(lambda _ : self.gotChange(change, important))
def release(x):
self._change_consumption_lock.release()
d.addBoth(release)
d.addErrback(log.err, 'while processing change')
self._change_subscription = self.master.subscribeToChanges(changeCallback)
except:
log.err(failure.Failure(),
'in fileIsImportant check for %s' % change)
return
else:
important = True

return defer.succeed(None)
# use change_consumption_lock to ensure the service does not stop
# while this change is being processed
d = self._change_consumption_lock.acquire()
d.addCallback(lambda _ : self.gotChange(change, important))
def release(x):
self._change_consumption_lock.release()
d.addBoth(release)
d.addErrback(log.err, 'while processing change')

def _stopConsumingChanges(self):
# (note: called automatically in stopService)
Expand All @@ -230,9 +249,9 @@ def _stopConsumingChanges(self):
# consumption is complete before we are done stopping consumption
d = self._change_consumption_lock.acquire()
def stop(x):
if self._change_subscription:
self._change_subscription.unsubscribe()
self._change_subscription = None
if self._change_consumer:
self._change_consumer.stopConsuming()
self._change_consumer = None
self._change_consumption_lock.release()
d.addBoth(stop)
return d
Expand Down
39 changes: 31 additions & 8 deletions master/buildbot/status/master.py
Expand Up @@ -57,9 +57,8 @@ def startService(self):
self._build_request_sub = \
self.master.subscribeToBuildRequests(
self._buildRequestCallback)
self._change_sub = \
self.master.subscribeToChanges(
self.changeAdded)
self._change_consumer = self.master.mq.startConsuming(
self.change_consumer_cb, 'change.*.new')

return service.MultiService.startService(self)

Expand All @@ -81,9 +80,16 @@ def reconfigService(self, new_config):

def stopService(self):
self._buildset_completion_sub.unsubscribe()
self._buildset_completion_sub = None

self._buildset_sub.unsubscribe()
self._buildset_sub = None

self._build_request_sub.unsubscribe()
self._change_sub.unsubscribe()
self._build_request_sub = None

self._change_consumer.stopConsuming()
self._change_consumer = None

return service.MultiService.stopService(self)

Expand Down Expand Up @@ -367,10 +373,27 @@ def slaveDisconnected(self, name):
if hasattr(t, 'slaveDisconnected'):
t.slaveDisconnected(name)

def changeAdded(self, change):
for t in self.watchers:
if hasattr(t, 'changeAdded'):
t.changeAdded(change)
@defer.deferredGenerator
def change_consumer_cb(self, key, msg):
# get a list of watchers - no sense querying the change
# if nobody's listening
interested = [ t for t in self.watchers
if hasattr(t, 'changeAdded') ]
if not interested:
return

wfd = defer.waitForDeferred(
self.master.db.changes.getChange(msg['changeid']))
yield wfd
chdict = wfd.getResult()

wfd = defer.waitForDeferred(
changes.Change.fromChdict(self.master, chdict))
yield wfd
change = wfd.getResult()

for t in interested:
t.changeAdded(change)

def asDict(self):
result = {}
Expand Down
83 changes: 2 additions & 81 deletions master/buildbot/test/unit/test_master.py
Expand Up @@ -29,7 +29,7 @@
from buildbot.process.users import users
from buildbot.status.results import SUCCESS

class Subscriptions(dirs.DirsMixin, unittest.TestCase):
class GlobalMessages(dirs.DirsMixin, unittest.TestCase):

"""These tests coerce the master into performing some action that should be
accompanied by some messages, and then verifies that the messages were sent
Expand All @@ -53,11 +53,7 @@ def tearDown(self):
# master.$masterid.{started,stopped} are checked in
# StartupAndReconfig.test_startup_ok, below

def test_change_subscription(self):
cb = mock.Mock()
sub = self.master.subscribeToChanges(cb)
self.assertIsInstance(sub, subscription.Subscription)

def test_change_message(self):
d = self.master.addChange(author='warner', branch='warnerdb',
category='devel', comments='fix whitespace',
files=[u'master/buildbot/__init__.py'],
Expand All @@ -66,12 +62,6 @@ def test_change_subscription(self):
revlink='http://warner/0e92a098b',
when_timestamp=epoch2datetime(256738404))
def check(change):
# addChange (probably) returned the right value
self.assertEqual(change.who, 'warner')

# and the notification sub was called correctly
cb.assert_called_with(change)

# check the correct message was received
self.assertEqual(self.master.mq.productions, [
( 'change.500.new', {
Expand Down Expand Up @@ -522,75 +512,6 @@ def deliverBuildRequestAddition(self, notif):

# tests

def test_pollDatabaseChanges_empty(self):
self.db.insertTestData([
fakedb.Object(id=22, name=self.master_name,
class_name='buildbot.master.BuildMaster'),
])
d = self.master.pollDatabaseChanges()
def check(_):
self.assertEqual(self.gotten_changes, [])
self.assertEqual(self.gotten_buildset_additions, [])
self.assertEqual(self.gotten_buildset_completions, [])
self.db.state.assertState(22, last_processed_change=0)
d.addCallback(check)
return d

def test_pollDatabaseChanges_catchup(self):
# with no existing state, it should catch up to the most recent change,
# but not process anything
self.db.insertTestData([
fakedb.Object(id=22, name=self.master_name,
class_name='buildbot.master.BuildMaster'),
fakedb.Change(changeid=10),
fakedb.Change(changeid=11),
])
d = self.master.pollDatabaseChanges()
def check(_):
self.assertEqual(self.gotten_changes, [])
self.assertEqual(self.gotten_buildset_additions, [])
self.assertEqual(self.gotten_buildset_completions, [])
self.db.state.assertState(22, last_processed_change=11)
d.addCallback(check)
return d

def test_pollDatabaseChanges_multiple(self):
self.db.insertTestData([
fakedb.Object(id=53, name=self.master_name,
class_name='buildbot.master.BuildMaster'),
fakedb.ObjectState(objectid=53, name='last_processed_change',
value_json='10'),
fakedb.Change(changeid=10),
fakedb.Change(changeid=11),
fakedb.Change(changeid=12),
])
d = self.master.pollDatabaseChanges()
def check(_):
self.assertEqual([ ch.number for ch in self.gotten_changes],
[ 11, 12 ]) # note 10 was already seen
self.assertEqual(self.gotten_buildset_additions, [])
self.assertEqual(self.gotten_buildset_completions, [])
self.db.state.assertState(53, last_processed_change=12)
d.addCallback(check)
return d

def test_pollDatabaseChanges_nothing_new(self):
self.db.insertTestData([
fakedb.Object(id=53, name='master',
class_name='buildbot.master.BuildMaster'),
fakedb.ObjectState(objectid=53, name='last_processed_change',
value_json='10'),
fakedb.Change(changeid=10),
])
d = self.master.pollDatabaseChanges()
def check(_):
self.assertEqual(self.gotten_changes, [])
self.assertEqual(self.gotten_buildset_additions, [])
self.assertEqual(self.gotten_buildset_completions, [])
self.db.state.assertState(53, last_processed_change=10)
d.addCallback(check)
return d

def test_pollDatabaseBuildRequests_empty(self):
d = self.master.pollDatabaseBuildRequests()
def check(_):
Expand Down

0 comments on commit c405a42

Please sign in to comment.