Skip to content

Commit

Permalink
Merge pull request #1773 from tardyp/wampmq
Browse files Browse the repository at this point in the history
Wampmq
  • Loading branch information
Mikhail Sobolev committed Aug 19, 2015
2 parents 7d2ccfe + 987b9b3 commit a57d74d
Show file tree
Hide file tree
Showing 14 changed files with 659 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -23,6 +23,7 @@ master/docs/buildbot.info-2
.project
.pydevproject
.settings
*/build/
master/docs/buildbot
master/docs/version.texinfo
master/docs/docs.tgz
Expand Down
2 changes: 2 additions & 0 deletions master/buildbot/config.py
Expand Up @@ -354,6 +354,8 @@ def copy_str_param(name, alt_key=None):
error("Both c['slavePortnum'] and c['protocols']['pb']['port']"
" defined, recommended to remove slavePortnum and leave"
" only c['protocols']['pb']['port']")
if proto == "wamp":
self.check_wamp_proto(options)
else:
error("c['protocols'] must be dict")
return
Expand Down
18 changes: 14 additions & 4 deletions master/buildbot/master.py
Expand Up @@ -55,6 +55,7 @@
from buildbot.util import datetime2epoch
from buildbot.util import service
from buildbot.util.eventual import eventually
from buildbot.wamp import connector as wampconnector
from buildbot.www import service as wwwservice

#
Expand Down Expand Up @@ -156,6 +157,9 @@ def create_child_services(self):
self.db = dbconnector.DBConnector(self.basedir)
self.db.setServiceParent(self)

self.wamp = wampconnector.WampConnector()
self.wamp.setServiceParent(self)

self.mq = mqconnector.MQConnector()
self.mq.setServiceParent(self)

Expand Down Expand Up @@ -186,7 +190,8 @@ def heartbeat():
yield self.data.updates.expireMasters((self.masterHouskeepingTimer % (24 * 60)) == 0)
self.masterHouskeepingTimer += 1
self.masterHeartbeatService = internet.TimerService(60, heartbeat)
self.masterHeartbeatService.setServiceParent(self)
# we do setServiceParent only when the master is configured
# master should advertise itself only at that time

# setup and reconfig handling

Expand Down Expand Up @@ -284,12 +289,12 @@ def sigusr1(*args):
log.msg("BuildMaster is running")

@defer.inlineCallbacks
def stopService(self):
if self.running:
yield service.AsyncMultiService.stopService(self)
def stopService(self, _reactor=reactor):
if self.masterid is not None:
yield self.data.updates.masterStopped(
name=self.name, masterid=self.masterid)
if self.running:
yield service.AsyncMultiService.stopService(self)

log.msg("BuildMaster is stopped")
self._master_initialized = False
Expand Down Expand Up @@ -328,6 +333,11 @@ def cleanup(res):
self.reconfig()
return res

@d.addCallback
def advertiseItself(res):
if self.masterHeartbeatService.parent is None:
self.masterHeartbeatService.setServiceParent(self)

d.addErrback(log.err, 'while reconfiguring')

return d # for tests
Expand Down
4 changes: 4 additions & 0 deletions master/buildbot/mq/connector.py
Expand Up @@ -24,6 +24,10 @@ class MQConnector(service.ReconfigurableServiceMixin, service.AsyncMultiService)
'class': "buildbot.mq.simple.SimpleMQ",
'keys': set(['debug']),
},
'wamp': {
'class': "buildbot.mq.wamp.WampMQ",
'keys': set(["router_url", "debug", "realm", "debug_websockets", "debug_lowlevel"]),
},
}
name = 'mq'

Expand Down
108 changes: 108 additions & 0 deletions master/buildbot/mq/wamp.py
@@ -0,0 +1,108 @@
# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
from __future__ import unicode_literals
from autobahn.wamp.exception import TransportLost
from autobahn.wamp.types import PublishOptions
from autobahn.wamp.types import SubscribeOptions

from twisted.internet import defer

from buildbot.mq import base
from buildbot.util import service
from buildbot.util import toJson
from twisted.python import log

import json


class WampMQ(service.ReconfigurableServiceMixin, base.MQBase):
NAMESPACE = "org.buildbot.mq"

def __init__(self):
base.MQBase.__init__(self)

def produce(self, routingKey, data):
d = self._produce(routingKey, data)
d.addErrback(log.err, "Problem while producing message on topic " + repr(routingKey))

@classmethod
def messageTopic(cls, routingKey):
ifNone = lambda v, default: default if v is None else v
# replace None values by "" in routing key
routingKey = [ifNone(key, "") for key in routingKey]
# then join them with "dot", and add the prefix
return cls.NAMESPACE + "." + ".".join(routingKey)

@classmethod
def routingKeyFromMessageTopic(cls, topic):
# just split the topic, and remove the NAMESPACE prefix
return tuple(topic[len(WampMQ.NAMESPACE) + 1:].split("."))

def _produce(self, routingKey, data):
_data = json.loads(json.dumps(data, default=toJson))
options = PublishOptions(exclude_me=False)
return self.master.wamp.publish(self.messageTopic(routingKey), _data, options=options)

def startConsuming(self, callback, _filter, persistent_name=None):
if persistent_name is not None:
print "wampmq: persistant queues are not persisted!", persistent_name, _filter

qr = QueueRef(callback)

self._startConsuming(qr, callback, _filter)
return defer.succeed(qr)

def _startConsuming(self, qr, callback, _filter, persistent_name=None):
return qr.subscribe(self.master.wamp, _filter)


class QueueRef(base.QueueRef):

def __init__(self, callback):
base.QueueRef.__init__(self, callback)
self.unreg = None

@defer.inlineCallbacks
def subscribe(self, service, _filter):
self.filter = _filter
self.emulated = False
options = dict(details_arg=str('details'))
if None in _filter:
options["match"] = "wildcard"
options = SubscribeOptions(**options)
_filter = WampMQ.messageTopic(_filter)
self.unreg = yield service.subscribe(self.invoke, _filter, options=options)
if self.callback is None:
yield self.stopConsuming()

def invoke(self, msg, details):
if details.topic is not None:
# in the case of a wildcard, wamp router sends the topic
topic = WampMQ.routingKeyFromMessageTopic(details.topic)
else:
# in the case of an exact match, then we can use our own topic
topic = self.filter
return base.QueueRef.invoke(self, topic, msg)

@defer.inlineCallbacks
def stopConsuming(self):
self.callback = None
if self.unreg is not None:
unreg = self.unreg
self.unreg = None
try:
yield unreg.unsubscribe()
except TransportLost:
pass
16 changes: 7 additions & 9 deletions master/buildbot/test/unit/test_master.py
Expand Up @@ -224,26 +224,24 @@ def check(_):
self.assertFalse(self.master.data.updates.thisMasterActive)
return d

@defer.inlineCallbacks
def test_reconfig(self):
reactor = self.make_reactor()
self.master.reconfigServiceWithBuildbotConfig = mock.Mock(
side_effect=lambda n: defer.succeed(None))

d = self.master.startService(_reactor=reactor)
d.addCallback(lambda _: self.master.reconfig())
d.addCallback(lambda _: self.master.stopService())

@d.addCallback
def check(_):
self.master.reconfigServiceWithBuildbotConfig.assert_called_with(mock.ANY)
return d
self.master.masterHeartbeatService = mock.Mock()
yield self.master.startService(_reactor=reactor)
yield self.master.reconfig()
yield self.master.stopService()
self.master.reconfigServiceWithBuildbotConfig.assert_called_with(mock.ANY)

@defer.inlineCallbacks
def test_reconfig_bad_config(self):
reactor = self.make_reactor()
self.master.reconfigService = mock.Mock(
side_effect=lambda n: defer.succeed(None))

self.master.masterHeartbeatService = mock.Mock()
yield self.master.startService(_reactor=reactor)

# reset, since startService called reconfigService
Expand Down

0 comments on commit a57d74d

Please sign in to comment.