/
base.py
328 lines (273 loc) · 12.4 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
from buildbot import config
from buildbot import interfaces
from buildbot.changes import changes
from buildbot.process.properties import Properties
from buildbot.util.service import ClusteredBuildbotService
from buildbot.util.state import StateMixin
from twisted.internet import defer
from twisted.python import failure
from twisted.python import log
from zope.interface import implements
class BaseScheduler(ClusteredBuildbotService, StateMixin):
implements(interfaces.IScheduler)
DEFAULT_CODEBASES = {'': {}}
compare_attrs = ClusteredBuildbotService.compare_attrs + \
('builderNames', 'properties', 'codebases')
def __init__(self, name, builderNames, properties=None,
codebases=DEFAULT_CODEBASES):
super(BaseScheduler, self).__init__(name=name)
ok = True
if not isinstance(builderNames, (list, tuple)):
ok = False
else:
for b in builderNames:
if not isinstance(b, basestring):
ok = False
if not ok:
config.error(
"The builderNames argument to a scheduler must be a list "
"of Builder names.")
self.builderNames = builderNames
if properties is None:
properties = {}
self.properties = Properties()
self.properties.update(properties, "Scheduler")
self.properties.setProperty("scheduler", name, "Scheduler")
self.objectid = None
# Set the codebases that are necessary to process the changes
# These codebases will always result in a sourcestamp with or without
# changes
known_keys = set(['branch', 'repository', 'revision'])
if codebases is None:
config.error("Codebases cannot be None")
elif isinstance(codebases, list):
codebases = dict((codebase, {}) for codebase in codebases)
elif not isinstance(codebases, dict):
config.error("Codebases must be a dict of dicts, or list of strings")
else:
for codebase, attrs in codebases.iteritems():
if not isinstance(attrs, dict):
config.error("Codebases must be a dict of dicts")
else:
unk = set(attrs) - known_keys
if unk:
config.error(
"Unknown codebase keys %s for codebase %s"
% (', '.join(unk), codebase))
self.codebases = codebases
# internal variables
self._change_consumer = None
self._change_consumption_lock = defer.DeferredLock()
# activity handling
def activate(self):
return defer.succeed(None)
def deactivate(self):
return defer.maybeDeferred(self._stopConsumingChanges)
# service handling
def _getServiceId(self):
return self.master.data.updates.findSchedulerId(self.name)
def _claimService(self):
return self.master.data.updates.trySetSchedulerMaster(self.serviceid,
self.master.masterid)
def _unclaimService(self):
return self.master.data.updates.trySetSchedulerMaster(self.serviceid,
None)
# status queries
# deprecated: these aren't compatible with distributed schedulers
def listBuilderNames(self):
return self.builderNames
def getPendingBuildTimes(self):
return []
# change handling
@defer.inlineCallbacks
def startConsumingChanges(self, fileIsImportant=None, change_filter=None,
onlyImportant=False):
assert fileIsImportant is None or callable(fileIsImportant)
# register for changes with the data API
assert not self._change_consumer
self._change_consumer = yield self.master.data.startConsuming(
lambda k, m: self._changeCallback(k, m, fileIsImportant,
change_filter, onlyImportant),
{},
('changes',))
@defer.inlineCallbacks
def _changeCallback(self, key, msg, fileIsImportant, change_filter,
onlyImportant):
# ignore changes delivered while we're not running
if not self._change_consumer:
return
# get a change object, since the API requires it
chdict = yield self.master.db.changes.getChange(msg['changeid'])
change = yield changes.Change.fromChdict(self.master, chdict)
# filter it
if change_filter and not change_filter.filter_change(change):
return
if change.codebase not in self.codebases:
log.msg(format='change contains codebase %(codebase)s that is '
'not processed by scheduler %(name)s',
codebase=change.codebase, name=self.name)
return
if fileIsImportant:
try:
important = fileIsImportant(change)
if not important and onlyImportant:
return
except Exception:
log.err(failure.Failure(),
'in fileIsImportant check for %s' % change)
return
else:
important = True
# use change_consumption_lock to ensure the service does not stop
# while this change is being processed
d = self._change_consumption_lock.run(
self.gotChange, change, important)
d.addErrback(log.err, 'while processing change')
def _stopConsumingChanges(self):
# (note: called automatically in deactivate)
# acquire the lock change consumption lock to ensure that any change
# consumption is complete before we are done stopping consumption
def stop():
if self._change_consumer:
self._change_consumer.stopConsuming()
self._change_consumer = None
return self._change_consumption_lock.run(stop)
def gotChange(self, change, important):
raise NotImplementedError
# starting builds
@defer.inlineCallbacks
def addBuildsetForSourceStampsWithDefaults(self, reason, sourcestamps=None,
waited_for=False, properties=None, builderNames=None,
**kw):
if sourcestamps is None:
sourcestamps = []
# convert sourcestamps to a dictionary keyed by codebase
stampsByCodebase = {}
for ss in sourcestamps:
cb = ss['codebase']
if cb in stampsByCodebase:
raise RuntimeError("multiple sourcestamps with same codebase")
stampsByCodebase[cb] = ss
# Merge codebases with the passed list of sourcestamps
# This results in a new sourcestamp for each codebase
stampsWithDefaults = []
for codebase in self.codebases:
cb = yield self.getCodebaseDict(codebase)
ss = {
'codebase': codebase,
'repository': cb.get('repository', ''),
'branch': cb.get('branch', None),
'revision': cb.get('revision', None),
'project': '',
}
# apply info from passed sourcestamps onto the configured default
# sourcestamp attributes for this codebase.
ss.update(stampsByCodebase.get(codebase, {}))
stampsWithDefaults.append(ss)
# fill in any supplied sourcestamps that aren't for a codebase in the
# scheduler's codebase dictionary
for codebase in set(stampsByCodebase) - set(self.codebases):
cb = stampsByCodebase[codebase]
ss = {
'codebase': codebase,
'repository': cb.get('repository', ''),
'branch': cb.get('branch', None),
'revision': cb.get('revision', None),
'project': '',
}
stampsWithDefaults.append(ss)
rv = yield self.addBuildsetForSourceStamps(
sourcestamps=stampsWithDefaults, reason=reason,
waited_for=waited_for, properties=properties,
builderNames=builderNames, **kw)
defer.returnValue(rv)
def getCodebaseDict(self, codebase):
# Hook for subclasses to change codebase parameters when a codebase does
# not have a change associated with it.
try:
return defer.succeed(self.codebases[codebase])
except KeyError:
return defer.fail()
@defer.inlineCallbacks
def addBuildsetForChanges(self, waited_for=False, reason='',
external_idstring=None, changeids=None, builderNames=None,
properties=None,
**kw):
if changeids is None:
changeids = []
changesByCodebase = {}
def get_last_change_for_codebase(codebase):
return max(changesByCodebase[codebase], key=lambda change: change["changeid"])
# Changes are retrieved from database and grouped by their codebase
for changeid in changeids:
chdict = yield self.master.db.changes.getChange(changeid)
changesByCodebase.setdefault(chdict["codebase"], []).append(chdict)
sourcestamps = []
for codebase in self.codebases:
if codebase not in changesByCodebase:
# codebase has no changes
# create a sourcestamp that has no changes
cb = yield self.getCodebaseDict(codebase)
ss = {
'codebase': codebase,
'repository': cb.get('repository', ''),
'branch': cb.get('branch', None),
'revision': cb.get('revision', None),
'project': '',
}
else:
lastChange = get_last_change_for_codebase(codebase)
ss = lastChange['sourcestampid']
sourcestamps.append(ss)
# add one buildset, using the calculated sourcestamps
bsid, brids = yield self.addBuildsetForSourceStamps(
waited_for, sourcestamps=sourcestamps, reason=reason,
external_idstring=external_idstring, builderNames=builderNames,
properties=properties, **kw)
defer.returnValue((bsid, brids))
@defer.inlineCallbacks
def addBuildsetForSourceStamps(self, waited_for=False, sourcestamps=None,
reason='', external_idstring=None, properties=None,
builderNames=None, **kw):
if sourcestamps is None:
sourcestamps = []
# combine properties
if properties:
properties.updateFromProperties(self.properties)
else:
properties = self.properties
# apply the default builderNames
if not builderNames:
builderNames = self.builderNames
# Get the builder id
builderids = list()
for bldr in (yield self.master.data.get(('builders', ))):
if bldr['name'] in builderNames:
builderids.append(bldr['builderid'])
# translate properties object into a dict as required by the
# addBuildset method
properties_dict = properties.asDict()
bsid, brids = yield self.master.data.updates.addBuildset(
scheduler=self.name, sourcestamps=sourcestamps, reason=reason,
waited_for=waited_for, properties=properties_dict, builderids=builderids,
external_idstring=external_idstring, **kw)
defer.returnValue((bsid, brids))
@defer.inlineCallbacks
def reconfigService(self, *args, **kwargs):
if self.running:
yield self.stopService()
yield self.startService()