Skip to content

Commit

Permalink
Introducing adapter for PB
Browse files (browse the repository at this point in the history)
  • Loading branch information
Michael Mayorov authored and djmitche committed Sep 29, 2013
1 parent 94e11e9 commit 6885d46
Show file tree
Hide file tree
Showing 20 changed files with 541 additions and 279 deletions.
149 changes: 42 additions & 107 deletions master/buildbot/buildslave/base.py
Expand Up @@ -26,14 +26,15 @@

from buildbot.status.slave import SlaveStatus
from buildbot.status.mail import MailNotifier
from buildbot.process import metrics, botmaster
from buildbot.process import metrics
from buildbot.interfaces import IBuildSlave, ILatentBuildSlave
from buildbot.process.properties import Properties
from buildbot.util import subscription
from buildbot.util.eventual import eventually
from buildbot import config

class AbstractBuildSlave(config.ReconfigurableServiceMixin, pb.Avatar,

class AbstractBuildSlave(config.ReconfigurableServiceMixin,
service.MultiService):
"""This is the master-side representative for a remote buildbot slave.
There is exactly one for each slave described in the config file (the
Expand Down Expand Up @@ -73,17 +74,15 @@ def __init__(self, name, password, max_builds=None,
self.slavename = name
self.password = password

# PB registration
# protocol registration
self.registration = None
self.registered_port = None

# these are set when the service is started, and unset when it is
# stopped
self.botmaster = None
self.master = None

self.slave_status = SlaveStatus(name)
self.slave = None # a RemoteReference to the Bot, when connected
self.slave_commands = None
self.slavebuilders = {}
self.max_builds = max_builds
Expand All @@ -108,6 +107,9 @@ def __init__(self, name, password, max_builds=None,
self.missing_timer = None
self.keepalive_interval = keepalive_interval

# a protocol connection, if we're currently connected
self.conn = None

self.detached_subs = None

self._old_builder_list = None
Expand Down Expand Up @@ -167,7 +169,7 @@ def releaseLocks(self):

def _lockReleased(self):
"""One of the locks for this slave was released; try scheduling
builds."""
builds.""" # TODO: info{} might have other keys
if not self.botmaster:
return # oh well..
self.botmaster.maybeStartBuildsForSlave(self.slavename)
Expand Down Expand Up @@ -226,19 +228,12 @@ def reconfigService(self, new_config):
new = self.findNewSlaveInstance(new_config)

assert self.slavename == new.slavename
self.password = new.password

# do we need to re-register?
if (not self.registration or
self.password != new.password or
new_config.protocols['pb']['port'] != self.registered_port):
if self.registration:
yield self.registration.unregister()
self.registration = None
self.password = new.password
self.registered_port = new_config.protocols['pb']['port']
self.registration = self.master.pbmanager.register(
self.registered_port, self.slavename,
self.password, self.getPerspective)
# update our records with the buildslave manager
if not self.registration:
self.registration = yield self.master.buildslaves.register(self)
yield self.registration.update(new, new_config)

# adopt new instance's configuration parameters
self.max_builds = new.max_builds
Expand Down Expand Up @@ -267,12 +262,13 @@ def reconfigService(self, new_config):
yield config.ReconfigurableServiceMixin.reconfigService(self,
new_config)

@defer.inlineCallbacks
def stopService(self):
if self.registration:
self.registration.unregister()
yield self.registration.unregister()
self.registration = None
self.stopMissingTimer()
return service.MultiService.stopService(self)
yield service.MultiService.stopService(self)

def findNewSlaveInstance(self, new_config):
# TODO: called multiple times per reconfig; use 1-element cache?
Expand All @@ -292,39 +288,13 @@ def stopMissingTimer(self):
self.missing_timer.cancel()
self.missing_timer = None

def getPerspective(self, mind, slavename):
assert slavename == self.slavename
metrics.MetricCountEvent.log("attached_slaves", 1)

# record when this connection attempt occurred
if self.slave_status:
self.slave_status.recordConnectTime()

# try to use TCP keepalives
try:
mind.broker.transport.setTcpKeepAlive(1)
except:
pass

if self.isConnected():
# duplicate slave - send it to arbitration
arb = botmaster.DuplicateSlaveArbitrator(self)
return arb.getPerspective(mind, slavename)
else:
log.msg("slave '%s' attaching from %s" % (slavename, mind.broker.transport.getPeer()))
return self

def doKeepalive(self):
self.keepalive_timer = reactor.callLater(self.keepalive_interval,
self.doKeepalive)
if not self.slave:
return
d = self.slave.callRemote("print", "Received keepalive from master")
d.addErrback(log.msg, "Keepalive failed for '%s'" % (self.slavename, ))
pass # TODO: this should be done at the protocol level

def stopKeepaliveTimer(self):
if self.keepalive_timer:
self.keepalive_timer.cancel()
self.keepalive_timer = None

def startKeepaliveTimer(self):
assert self.keepalive_interval
Expand All @@ -333,7 +303,7 @@ def startKeepaliveTimer(self):
self.doKeepalive()

def isConnected(self):
return self.slave
return self.conn

def _missing_timer_fired(self):
self.missing_timer = None
Expand Down Expand Up @@ -367,7 +337,7 @@ def updateSlave(self):
@return: a Deferred that indicates when an attached slave has
accepted the new builders and/or released the old ones."""
if self.slave:
if self.conn:
return self.sendBuilderList()
else:
return defer.succeed(None)
Expand All @@ -379,7 +349,7 @@ def updateSlaveStatus(self, buildStarted=None, buildFinished=None):
self.slave_status.buildFinished(buildFinished)

@metrics.countMethod('AbstractBuildSlave.attached()')
def attached(self, bot):
def attached(self, conn):
"""This is called when the slave connects.
@return: a Deferred that fires when the attachment is complete
Expand All @@ -390,7 +360,7 @@ def attached(self, bot):

metrics.MetricCountEvent.log("AbstractBuildSlave.attached_slaves", 1)

# set up the subscription point for eventual detachment
# set up the subscription point for eventual detachment; TODO: get rid of this and use the conn directly
self.detached_subs = subscription.SubscriptionPoint("detached")

# now we go through a sequence of calls, gathering information, then
Expand All @@ -410,24 +380,25 @@ def attached(self, bot):

@d.addCallback
def _log_attachment_on_slave(res):
d1 = bot.callRemote("print", "attached")
d1 = conn.remotePrint(message="attached")
d1.addErrback(lambda why: None)
return d1

@d.addCallback
def _get_info(res):
d1 = bot.callRemote("getSlaveInfo")
d1 = conn.remoteGetSlaveInfo()
def _got_info(info):
log.msg("Got slaveinfo from '%s'" % self.slavename)
# TODO: info{} might have other keys
state["admin"] = info.get("admin")
state["host"] = info.get("host")
state["access_uri"] = info.get("access_uri", None)
state["slave_environ"] = info.get("environ", {})
state["slave_basedir"] = info.get("basedir", None)
state["slave_system"] = info.get("system", None)
state["version"] = info.get("version", "(unknown)")
state["slave_commands"] = info.get("commands", {})
def _info_unavailable(why):
why.trap(pb.NoSuchMethod)
# why.trap(pb.NoSuchMethod) # TODO: do we need this?
# maybe an old slave, doesn't implement remote_getSlaveInfo
log.msg("BuildSlave.info_unavailable")
log.err(why)
Expand All @@ -436,43 +407,19 @@ def _info_unavailable(why):

d.addCallback(lambda _: self.startKeepaliveTimer())

@d.addCallback
def _get_version(_):
d = bot.callRemote("getVersion")
def _got_version(version):
state["version"] = version
def _version_unavailable(why):
why.trap(pb.NoSuchMethod)
# probably an old slave
state["version"] = '(unknown)'
d.addCallbacks(_got_version, _version_unavailable)
return d

@d.addCallback
def _get_commands(_):
d1 = bot.callRemote("getCommands")
def _got_commands(commands):
state["slave_commands"] = commands
def _commands_unavailable(why):
# probably an old slave
if why.check(AttributeError):
return
log.msg("BuildSlave.getCommands is unavailable - ignoring")
log.err(why)
d1.addCallbacks(_got_commands, _commands_unavailable)
return d1

@d.addCallback
def _accept_slave(res):
self.slave_status.setConnected(True)

self._applySlaveInfo(state)

self.slave_commands = state.get("slave_commands")
self.slave_environ = state.get("slave_environ")
self.slave_basedir = state.get("slave_basedir")
self.slave_system = state.get("slave_system")
self.slave = bot

self.conn = conn
self.conn.notifyOnDisconnect(self.detached)

if self.slave_system == "nt":
self.path_module = namedModule("ntpath")
else:
Expand All @@ -485,25 +432,20 @@ def _accept_slave(res):
self.botmaster.master.status.slaveConnected(self.slavename)

d.addCallback(lambda _: self._saveSlaveInfoDict())

d.addCallback(lambda _: self.updateSlave())

d.addCallback(lambda _:
self.botmaster.maybeStartBuildsForSlave(self.slavename))

# Finally, the slave gets a reference to this BuildSlave. They
# receive this later, after we've started using them.
d.addCallback(lambda _: self)
return d

def messageReceivedFromSlave(self):
now = time.time()
self.lastMessageReceived = now
self.slave_status.setLastMessageReceived(now)

def detached(self, mind):
def detached(self):
metrics.MetricCountEvent.log("AbstractBuildSlave.attached_slaves", -1)
self.slave = None
self.conn = None
self._old_builder_list = []
self.slave_status.removeGracefulWatcher(self._gracefulChanged)
self.slave_status.setConnected(False)
Expand Down Expand Up @@ -593,20 +535,13 @@ def sendBuilderList(self):
if blist == self._old_builder_list:
return defer.succeed(None)

d = self.slave.callRemote("setBuilderList", blist)
d = self.conn.remoteSetBuilderList(builders=blist)
def sentBuilderList(ign):
self._old_builder_list = blist
return ign
d.addCallback(sentBuilderList)
return d

def perspective_keepalive(self):
self.messageReceivedFromSlave()

def perspective_shutdown(self):
log.msg("slave %s wants to shut down" % self.slavename)
self.slave_status.setGraceful(True)

def addSlaveBuilder(self, sb):
self.slavebuilders[sb.builder_name] = sb

Expand Down Expand Up @@ -695,11 +630,11 @@ def new_way():
d = self.slave.callRemote('shutdown')
d.addCallback(lambda _ : True) # successful shutdown request
def check_nsm(f):
f.trap(pb.NoSuchMethod)
f.trap(pb.NoSuchMethod) # TODO: handle this in buildslave/protocols
return False # fall through to the old way
d.addErrback(check_nsm)
def check_connlost(f):
f.trap(pb.PBConnectionLost)
f.trap(pb.PBConnectionLost) # TODO: handle this in buildslave/protocols
return True # the slave is gone, so call it finished
d.addErrback(check_connlost)
return d
Expand Down Expand Up @@ -769,11 +704,11 @@ def _sent(slist):
if not slist:
return
dl = []
for name, remote in slist.items():
for name in slist:
# use get() since we might have changed our mind since then
b = self.botmaster.builders.get(name)
if b:
d1 = b.attached(self, remote, self.slave_commands)
d1 = b.attached(self, self.slave_commands)
dl.append(d1)
return defer.DeferredList(dl)
def _set_failed(why):
Expand All @@ -784,8 +719,8 @@ def _set_failed(why):
d.addCallbacks(_sent, _set_failed)
return d

def detached(self, mind):
AbstractBuildSlave.detached(self, mind)
def detached(self):
AbstractBuildSlave.detached(self)
self.botmaster.slaveLost(self)
self.startMissingTimer()

Expand Down Expand Up @@ -890,8 +825,8 @@ def attached(self, bot):
return defer.fail(RuntimeError(msg))
return AbstractBuildSlave.attached(self, bot)

def detached(self, mind):
AbstractBuildSlave.detached(self, mind)
def detached(self):
AbstractBuildSlave.detached(self)
if self.substantiation_deferred is not None:
d = self._substantiate(self.substantiation_build)
d.addErrback(log.err, 'while re-substantiating')
Expand Down

0 comments on commit 6885d46

Please sign in to comment.