-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
pb.py
330 lines (267 loc) · 11.8 KB
/
pb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
import contextlib
from twisted.internet import defer
from twisted.python import log
from twisted.spread import pb
from buildbot.pbutil import decode
from buildbot.util import ComparableMixin
from buildbot.util import deferwaiter
from buildbot.worker.protocols import base
class Listener(base.Listener):
name = "pbListener"
def __init__(self):
super().__init__()
# username : (password, portstr, PBManager registration)
self._registrations = {}
@defer.inlineCallbacks
def updateRegistration(self, username, password, portStr):
# NOTE: this method is only present on the PB protocol; others do not
# use registrations
if username in self._registrations:
currentPassword, currentPortStr, currentReg = \
self._registrations[username]
else:
currentPassword, currentPortStr, currentReg = None, None, None
iseq = (ComparableMixin.isEquivalent(currentPassword, password) and
ComparableMixin.isEquivalent(currentPortStr, portStr))
if iseq:
return currentReg
if currentReg:
yield currentReg.unregister()
del self._registrations[username]
if portStr and password:
reg = yield self.master.pbmanager.register(portStr, username, password,
self._getPerspective)
self._registrations[username] = (password, portStr, reg)
return reg
return currentReg
@defer.inlineCallbacks
def _getPerspective(self, mind, workerName):
workers = self.master.workers
log.msg("worker '{}' attaching from {}".format(workerName, mind.broker.transport.getPeer()))
# try to use TCP keepalives
try:
mind.broker.transport.setTcpKeepAlive(1)
except Exception:
log.err("Can't set TcpKeepAlive")
worker = workers.getWorkerByName(workerName)
conn = Connection(self.master, worker, mind)
# inform the manager, logging any problems in the deferred
accepted = yield workers.newConnection(conn, workerName)
# return the Connection as the perspective
if accepted:
return conn
else:
# TODO: return something more useful
raise RuntimeError("rejecting duplicate worker")
class ReferenceableProxy(pb.Referenceable):
def __init__(self, impl):
assert isinstance(impl, self.ImplClass)
self.impl = impl
def __getattr__(self, name):
return getattr(self.impl, name)
# Proxy are just ReferenceableProxy to the Impl classes
class RemoteCommand(ReferenceableProxy):
ImplClass = base.RemoteCommandImpl
class FileReaderProxy(ReferenceableProxy):
ImplClass = base.FileReaderImpl
class FileWriterProxy(ReferenceableProxy):
ImplClass = base.FileWriterImpl
class _NoSuchMethod(Exception):
"""Rewrapped pb.NoSuchMethod remote exception"""
@contextlib.contextmanager
def _wrapRemoteException():
try:
yield
except pb.RemoteError as e:
if e.remoteType in (b'twisted.spread.flavors.NoSuchMethod',
'twisted.spread.flavors.NoSuchMethod'):
raise _NoSuchMethod(e) from e
raise
class Connection(base.Connection, pb.Avatar):
proxies = {base.FileWriterImpl: FileWriterProxy,
base.FileReaderImpl: FileReaderProxy}
# TODO: configure keepalive_interval in
# c['protocols']['pb']['keepalive_interval']
keepalive_timer = None
keepalive_interval = 3600
info = None
def __init__(self, master, worker, mind):
super().__init__(master, worker)
self.mind = mind
self._keepalive_waiter = deferwaiter.DeferWaiter()
self._keepalive_action_handler = \
deferwaiter.RepeatedActionHandler(master.reactor, self._keepalive_waiter,
self.keepalive_interval, self._do_keepalive)
# methods called by the PBManager
@defer.inlineCallbacks
def attached(self, mind):
self.startKeepaliveTimer()
# pbmanager calls perspective.attached; pass this along to the
# worker
yield self.worker.attached(self)
# and then return a reference to the avatar
return self
def detached(self, mind):
self.stopKeepaliveTimer()
self.mind = None
self.notifyDisconnected()
# disconnection handling
@defer.inlineCallbacks
def waitShutdown(self):
self.stopKeepaliveTimer()
yield self._keepalive_waiter.wait()
def loseConnection(self):
self.stopKeepaliveTimer()
tport = self.mind.broker.transport
# this is the polite way to request that a socket be closed
tport.loseConnection()
try:
# but really we don't want to wait for the transmit queue to
# drain. The remote end is unlikely to ACK the data, so we'd
# probably have to wait for a (20-minute) TCP timeout.
# tport._closeSocket()
# however, doing _closeSocket (whether before or after
# loseConnection) somehow prevents the notifyOnDisconnect
# handlers from being run. Bummer.
tport.offset = 0
tport.dataBuffer = b""
except Exception:
# however, these hacks are pretty internal, so don't blow up if
# they fail or are unavailable
log.msg("failed to accelerate the shutdown process")
# keepalive handling
def _do_keepalive(self):
return self.mind.callRemote('print', message="keepalive")
def stopKeepaliveTimer(self):
self._keepalive_action_handler.stop()
def startKeepaliveTimer(self):
assert self.keepalive_interval
self._keepalive_action_handler.start()
# methods to send messages to the worker
def remotePrint(self, message):
return self.mind.callRemote('print', message=message)
@defer.inlineCallbacks
def remoteGetWorkerInfo(self):
try:
with _wrapRemoteException():
# Try to call buildbot-worker method.
info = yield self.mind.callRemote('getWorkerInfo')
return decode(info)
except _NoSuchMethod:
yield self.remotePrint(
"buildbot-slave detected, failing back to deprecated buildslave API. "
"(Ignoring missing getWorkerInfo method.)")
info = {}
# Probably this is deprecated buildslave.
log.msg("Worker.getWorkerInfo is unavailable - falling back to "
"deprecated buildslave API")
try:
with _wrapRemoteException():
info = yield self.mind.callRemote('getSlaveInfo')
except _NoSuchMethod:
log.msg("Worker.getSlaveInfo is unavailable - ignoring")
# newer workers send all info in one command
if "slave_commands" in info:
assert "worker_commands" not in info
info["worker_commands"] = info.pop("slave_commands")
return info
# Old version buildslave - need to retrieve list of supported
# commands and version using separate requests.
try:
with _wrapRemoteException():
info["worker_commands"] = yield self.mind.callRemote(
'getCommands')
except _NoSuchMethod:
log.msg("Worker.getCommands is unavailable - ignoring")
try:
with _wrapRemoteException():
info["version"] = yield self.mind.callRemote('getVersion')
except _NoSuchMethod:
log.msg("Worker.getVersion is unavailable - ignoring")
return decode(info)
@defer.inlineCallbacks
def remoteSetBuilderList(self, builders):
builders = yield self.mind.callRemote('setBuilderList', builders)
self.builders = builders
return builders
def remoteStartCommand(self, remoteCommand, builderName, commandId, commandName, args):
workerforbuilder = self.builders.get(builderName)
remoteCommand = RemoteCommand(remoteCommand)
args = self.createArgsProxies(args)
return workerforbuilder.callRemote('startCommand',
remoteCommand, commandId, commandName, args)
@defer.inlineCallbacks
def remoteShutdown(self):
# First, try the "new" way - calling our own remote's shutdown
# method. The method was only added in 0.8.3, so ignore NoSuchMethod
# failures.
@defer.inlineCallbacks
def new_way():
try:
with _wrapRemoteException():
yield self.mind.callRemote('shutdown')
# successful shutdown request
return True
except _NoSuchMethod:
# fall through to the old way
return False
except pb.PBConnectionLost:
# the worker is gone, so call it finished
return True
if (yield new_way()):
return # done!
# Now, the old way. Look for a builder with a remote reference to the
# client side worker. If we can find one, then call "shutdown" on the
# remote builder, which will cause the worker buildbot process to exit.
def old_way():
d = None
for b in self.worker.workerforbuilders.values():
if b.remote:
d = b.mind.callRemote("shutdown")
break
if d:
name = self.worker.workername
log.msg("Shutting down (old) worker: {}".format(name))
# The remote shutdown call will not complete successfully since
# the buildbot process exits almost immediately after getting
# the shutdown request.
# Here we look at the reason why the remote call failed, and if
# it's because the connection was lost, that means the worker
# shutdown as expected.
@d.addErrback
def _errback(why):
if why.check(pb.PBConnectionLost):
log.msg("Lost connection to {}".format(name))
else:
log.err("Unexpected error when trying to shutdown {}".format(name))
return d
log.err("Couldn't find remote builder to shut down worker")
return defer.succeed(None)
yield old_way()
def remoteStartBuild(self, builderName):
workerforbuilder = self.builders.get(builderName)
return workerforbuilder.callRemote('startBuild')
def remoteInterruptCommand(self, builderName, commandId, why):
workerforbuilder = self.builders.get(builderName)
return defer.maybeDeferred(workerforbuilder.callRemote, "interruptCommand",
commandId, why)
# perspective methods called by the worker
def perspective_keepalive(self):
self.worker.messageReceivedFromWorker()
def perspective_shutdown(self):
self.worker.messageReceivedFromWorker()
self.worker.shutdownRequested()