-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
pb.py
283 lines (227 loc) · 10.2 KB
/
pb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
from __future__ import absolute_import
from buildbot.buildslave.protocols import base
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log
from twisted.spread import pb
class Listener(base.Listener):
    """Perspective Broker listener.

    Keeps one PBManager registration per slave username and builds the
    ``Connection`` perspective handed back to PB when a slave logs in.
    """
    name = "pbListener"

    def __init__(self, master):
        assert master
        base.Listener.__init__(self, master)

        # maps username -> (password, portstr, PBManager registration)
        self._registrations = {}

    @defer.inlineCallbacks
    def updateRegistration(self, username, password, portStr):
        """Bring the PBManager registration for ``username`` up to date.

        Nothing happens when both the password and the port string match
        what is already recorded.  Otherwise any existing registration is
        unregistered and forgotten and, if ``portStr`` is truthy, a new one
        is created and recorded.

        Returns a Deferred firing with the new registration when one was
        created, and with None in every other case.
        """
        # NOTE: this method is only present on the PB protocol; others do not
        # use registrations
        oldPassword, oldPortStr, oldReg = self._registrations.get(
            username, (None, None, None))

        changed = oldPassword != password or oldPortStr != portStr
        if not changed:
            return

        # drop the stale registration before (possibly) replacing it
        if oldReg:
            yield oldReg.unregister()
            del self._registrations[username]

        if not portStr:
            return

        newReg = self.master.pbmanager.register(
            portStr, username, password, self._getPerspective)
        self._registrations[username] = (password, portStr, newReg)
        defer.returnValue(newReg)

    @defer.inlineCallbacks
    def _getPerspective(self, mind, buildslaveName):
        """Build the perspective for a slave that has just authenticated.

        This is the callback given to ``pbmanager.register``.  It logs the
        peer address, tries to enable TCP keepalives on the transport, wraps
        the slave in a ``Connection`` and offers that connection to the
        buildslave manager.

        Returns a Deferred firing with the ``Connection``; raises
        RuntimeError when the manager does not accept the connection.
        """
        manager = self.master.buildslaves
        peer = mind.broker.transport.getPeer()
        log.msg("slave '%s' attaching from %s" % (buildslaveName, peer))

        # try to use TCP keepalives; failure here is logged, not fatal
        try:
            mind.broker.transport.setTcpKeepAlive(1)
        except Exception:
            log.err("Can't set TcpKeepAlive")

        connection = Connection(
            self.master, manager.getBuildslaveByName(buildslaveName), mind)

        # inform the manager, logging any problems in the deferred
        accepted = yield manager.newConnection(connection, buildslaveName)
        if not accepted:
            # TODO: return something more useful
            raise RuntimeError("rejecting duplicate slave")

        # the Connection itself is the perspective
        defer.returnValue(connection)
class ReferenceableProxy(pb.Referenceable):
    """A ``pb.Referenceable`` that forwards attribute lookups to ``impl``.

    Subclasses set the class attribute ``ImplClass`` to the type that
    ``impl`` must be an instance of; this base class does not define it.
    """

    def __init__(self, impl):
        assert isinstance(impl, self.ImplClass)
        self.impl = impl

    def __getattr__(self, default=None):
        """Return attribute ``default`` of the wrapped ``impl`` object.

        Despite its name, ``default`` is the *name* of the attribute being
        looked up: Python passes it positionally whenever normal attribute
        lookup on the proxy fails.

        Raises AttributeError if ``impl`` itself has not been set yet, or if
        ``impl`` has no such attribute.
        """
        # We only get here for 'impl' when the instance attribute is missing
        # (e.g. __init__ has not assigned it yet).  Reading self.impl below
        # would then re-enter this method forever, so fail cleanly instead.
        if default == 'impl':
            raise AttributeError(default)
        return getattr(self.impl, default)
# The proxies below are just ReferenceableProxy subclasses, each bound to one
# of the Impl classes
class RemoteCommand(ReferenceableProxy):
    """Referenceable proxy whose ``impl`` is a ``base.RemoteCommandImpl``."""

    ImplClass = base.RemoteCommandImpl
class FileReaderProxy(ReferenceableProxy):
    """Referenceable proxy whose ``impl`` is a ``base.FileReaderImpl``."""

    ImplClass = base.FileReaderImpl
class FileWriterProxy(ReferenceableProxy):
    """Referenceable proxy whose ``impl`` is a ``base.FileWriterImpl``."""

    ImplClass = base.FileWriterImpl
class Connection(base.Connection, pb.Avatar):
    """Master-side end of a Perspective Broker connection to one buildslave.

    ``Listener._getPerspective`` creates an instance per slave login and
    returns it to PB as the perspective.  ``mind`` is the remote reference
    to the slave; it is set to None once the slave has detached.
    """

    # Impl class -> proxy class used to expose it over PB.
    # NOTE(review): presumably consumed by createArgsProxies(), which is not
    # defined in this class - confirm against base.Connection.
    proxies = {base.FileWriterImpl: FileWriterProxy, base.FileReaderImpl: FileReaderProxy}
    # TODO: configure keepalive_interval in c['protocols']['pb']['keepalive_interval']
    # pending delayed call created by startKeepaliveTimer, or None
    keepalive_timer = None
    # delay handed to reactor.callLater, i.e. seconds
    keepalive_interval = 3600
    # not assigned anywhere in this class
    info = None

    def __init__(self, master, buildslave, mind):
        """Remember the slave's remote reference ``mind``."""
        base.Connection.__init__(self, master, buildslave)
        self.mind = mind

    # methods called by the PBManager

    @defer.inlineCallbacks
    def attached(self, mind):
        """Handle the slave attaching.

        Starts the keepalive timer and tells the buildslave about this
        connection.  The ``mind`` argument is not used; the one given to
        ``__init__`` is.  Returns a Deferred firing with this avatar.
        """
        self.startKeepaliveTimer()
        # pbmanager calls perspective.attached; pass this along to the
        # buildslave
        yield self.buildslave.attached(self)
        # and then return a reference to the avatar
        defer.returnValue(self)

    def detached(self, mind):
        """Handle the slave detaching: stop the timer and drop ``mind``."""
        self.stopKeepaliveTimer()
        self.mind = None
        self.notifyDisconnected()

    # disconnection handling

    def loseConnection(self):
        """Close the transport to the slave, discarding unsent data.

        Does nothing beyond stopping the keepalive timer when the slave has
        already detached (``mind`` is None).
        """
        self.stopKeepaliveTimer()
        if self.mind is None:
            # detached() already dropped the remote reference, so there is
            # no transport left to close
            return
        tport = self.mind.broker.transport
        # this is the polite way to request that a socket be closed
        tport.loseConnection()
        try:
            # but really we don't want to wait for the transmit queue to
            # drain. The remote end is unlikely to ACK the data, so we'd
            # probably have to wait for a (20-minute) TCP timeout.
            # tport._closeSocket()
            # however, doing _closeSocket (whether before or after
            # loseConnection) somehow prevents the notifyOnDisconnect
            # handlers from being run. Bummer.
            tport.offset = 0
            # bytes literal: same value as "" on Python 2, and the correct
            # type for a transport buffer on Python 3
            tport.dataBuffer = b""
        except Exception:
            # however, these hacks are pretty internal, so don't blow up if
            # they fail or are unavailable
            log.msg("failed to accelerate the shutdown process")

    # keepalive handling

    def doKeepalive(self):
        """Send a 'print' message to the slave; returns callRemote's result."""
        return self.mind.callRemote('print', message="keepalive")

    def stopKeepaliveTimer(self):
        """Cancel the keepalive timer if it is pending and forget it."""
        timer = self.keepalive_timer
        if timer and timer.active():
            timer.cancel()
        # always drop the reference, including to a timer that already fired
        self.keepalive_timer = None

    def startKeepaliveTimer(self):
        """Schedule doKeepalive after ``keepalive_interval``.

        NOTE(review): the call is scheduled once and doKeepalive does not
        reschedule it, so as written a single keepalive is sent per attach -
        confirm that this is intended.
        """
        assert self.keepalive_interval
        self.keepalive_timer = reactor.callLater(self.keepalive_interval,
                                                 self.doKeepalive)

    # methods to send messages to the slave

    def remotePrint(self, message):
        """Ask the slave to print ``message``; returns callRemote's result."""
        return self.mind.callRemote('print', message=message)

    @defer.inlineCallbacks
    def remoteGetSlaveInfo(self):
        """Collect the slave's info dictionary.

        Tries 'getSlaveInfo' first.  If the result lacks "slave_commands",
        falls back to separate 'getCommands' and 'getVersion' calls.  A
        missing remote method is logged and skipped at every step.

        Returns a Deferred firing with the (possibly partial) info dict.
        """
        info = {}
        try:
            info = yield self.mind.callRemote('getSlaveInfo')
        except pb.NoSuchMethod:
            log.msg("BuildSlave.getSlaveInfo is unavailable - ignoring")

        # newer slaves send all info in one command
        if "slave_commands" in info:
            defer.returnValue(info)

        try:
            info["slave_commands"] = yield self.mind.callRemote('getCommands')
        except pb.NoSuchMethod:
            log.msg("BuildSlave.getCommands is unavailable - ignoring")

        try:
            info["version"] = yield self.mind.callRemote('getVersion')
        except pb.NoSuchMethod:
            log.msg("BuildSlave.getVersion is unavailable - ignoring")

        defer.returnValue(info)

    def remoteSetBuilderList(self, builders):
        """Send the builder list to the slave and cache its reply.

        The value the slave answers with is stored as ``self.builders`` and
        is also the result of the returned Deferred.
        """
        d = self.mind.callRemote('setBuilderList', builders)

        @d.addCallback
        def cache_builders(builders):
            self.builders = builders
            return builders
        return d

    def remoteStartCommand(self, remoteCommand, builderName, commandId, commandName, args):
        """Start a command on the slave-side builder named ``builderName``.

        ``remoteCommand`` is wrapped in a ``RemoteCommand`` proxy and
        ``args`` is passed through ``createArgsProxies`` before sending.
        The builder is looked up in ``self.builders``, which is set by
        remoteSetBuilderList.
        """
        slavebuilder = self.builders.get(builderName)
        remoteCommand = RemoteCommand(remoteCommand)
        args = self.createArgsProxies(args)
        return slavebuilder.callRemote('startCommand',
                                       remoteCommand, commandId, commandName, args)

    @defer.inlineCallbacks
    def remoteShutdown(self):
        """Ask the slave process to shut down.

        Calls 'shutdown' on the slave itself first; if the slave has no such
        method, falls back to calling 'shutdown' through one of its
        builders.  A lost connection counts as a successful shutdown.
        """
        # First, try the "new" way - calling our own remote's shutdown
        # method. The method was only added in 0.8.3, so ignore NoSuchMethod
        # failures.
        def new_way():
            d = self.mind.callRemote('shutdown')
            d.addCallback(lambda _: True)  # successful shutdown request

            @d.addErrback
            def check_nsm(f):
                f.trap(pb.NoSuchMethod)
                return False  # fall through to the old way

            @d.addErrback
            def check_connlost(f):
                f.trap(pb.PBConnectionLost)
                return True  # the slave is gone, so call it finished
            return d

        if (yield new_way()):
            return  # done!

        # Now, the old way. Look for a builder with a remote reference to the
        # client side slave. If we can find one, then call "shutdown" on the
        # remote builder, which will cause the slave buildbot process to exit.
        def old_way():
            d = None
            for b in self.buildslave.slavebuilders.values():
                # NOTE(review): the test is on b.remote but the call goes
                # through b.mind - confirm both attributes exist on
                # slavebuilders and that this mix is intended.
                if b.remote:
                    d = b.mind.callRemote("shutdown")
                    break

            if d:
                name = self.buildslave.slavename
                log.msg("Shutting down (old) slave: %s" % name)
                # The remote shutdown call will not complete successfully since
                # the buildbot process exits almost immediately after getting
                # the shutdown request.
                # Here we look at the reason why the remote call failed, and if
                # it's because the connection was lost, that means the slave
                # shutdown as expected.

                @d.addErrback
                def _errback(why):
                    if why.check(pb.PBConnectionLost):
                        log.msg("Lost connection to %s" % name)
                    else:
                        log.err("Unexpected error when trying to shutdown %s"
                                % name)
                return d
            log.err("Couldn't find remote builder to shut down slave")
            return defer.succeed(None)
        yield old_way()

    def remoteStartBuild(self, builderName):
        """Call 'startBuild' on the slave-side builder ``builderName``."""
        slavebuilder = self.builders.get(builderName)
        return slavebuilder.callRemote('startBuild')

    def remoteInterruptCommand(self, builderName, commandId, why):
        """Call 'interruptCommand' on the slave-side builder.

        The remote call goes through ``defer.maybeDeferred``, so the caller
        always receives a Deferred.
        """
        slavebuilder = self.builders.get(builderName)
        return defer.maybeDeferred(slavebuilder.callRemote, "interruptCommand",
                                   commandId, why)

    # perspective methods called by the slave

    def perspective_keepalive(self):
        """Slave-initiated keepalive: record that a message arrived."""
        self.buildslave.messageReceivedFromSlave()

    def perspective_shutdown(self):
        """Slave-initiated shutdown request, passed on to the buildslave."""
        self.buildslave.messageReceivedFromSlave()
        self.buildslave.shutdownRequested()