# -*- coding: utf-8 -*-
# Copyright © 2007-2018 Red Hat, Inc. and others.
#
# This file is part of Bodhi.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
The Bodhi "Masher".
This module is responsible for the process of "pushing" updates out. It
consists of a fedmsg consumer that launches threads for each repository being
mashed.
"""
import functools
import hashlib
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
from datetime import datetime
try:
from http.client import IncompleteRead
except ImportError: # pragma: no cover
# Python 2 does not have this Exception.
IncompleteRead = None
import fedmsg.consumers
import jinja2
from six.moves import zip
from six.moves.urllib import request as urllib2
from six.moves.urllib.error import HTTPError, URLError
import six
from bodhi.server import bugs, initialize_db, log, buildsys, notifications, mail
from bodhi.server.config import config, validate_path
from bodhi.server.exceptions import BodhiException
from bodhi.server.metadata import UpdateInfoMetadata
from bodhi.server.models import (Compose, ComposeState, Update, UpdateRequest, UpdateType, Release,
UpdateStatus, ReleaseState, ContentType)
from bodhi.server.scripts import clean_old_mashes
from bodhi.server.util import (copy_container, sorted_updates, sanity_check_repodata,
transactional_session_maker)
def checkpoint(method):
"""
Decorate a method for skipping sections of the mash when resuming.
Args:
method (callable): The callable to skip if we are resuming.
Returns:
callable: A function that skips the method if it can.
"""
key = method.__name__
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
if not self.resume or not self._checkpoints.get(key):
# Call it
retval = method(self, *args, **kwargs)
if retval is not None:
raise ValueError("checkpointed functions may not return stuff")
# if it didn't raise an exception, mark the checkpoint
self._checkpoints[key] = True
self.save_state()
else:
# cool! we don't need to do anything, since we ran last time
pass
return None
return wrapper
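# Illustrative sketch of how @checkpoint is used further down: a decorated
# step such as modify_bugs() records its completion in self._checkpoints, so
# a resumed mash (resume=True) enters wrapper(), finds
# self._checkpoints['modify_bugs'] already True, and returns without running
# the step a second time. Steps must therefore communicate through saved
# state rather than return values, which is why wrapper() rejects any
# non-None return.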
class Masher(fedmsg.consumers.FedmsgConsumer):
"""
The Bodhi Masher.
A fedmsg consumer that listens for messages from releng members.
An updates "compose" consists of:
- Verify that the message was sent by someone in releng
- Determine which updates to push
- Lock repo
- track which repos were completed
- track which packages are in the push
- lock updates
- Make sure things are safe to move? (ideally we should trust our own state)
- Check with taskotron to see if updates are pushable.
- Update security bug titles
- Move build tags
- Expire buildroot overrides
- Remove pending tags
- Send fedmsgs
- mash
Things to do while we're waiting on mash:
- Add testing updates to updates-testing digest
- Generate/update updateinfo.xml
- Have a coffee. Have 7 coffees.
Once mash is done:
- inject the updateinfo into the repodata
- Sanity check the repo
- Flip the symlinks to the new repo
- Cache the new repodata
- Generate and email stable update notices
- Wait for the repo to hit the master mirror
- Update bugzillas
- Add comments to updates
- Email updates-testing digest
- request_complete
- Unlock repo
- unlock updates
- see if any updates now meet the stable criteria, and set the request
"""
config_key = 'masher'
def __init__(self, hub, db_factory=None, mash_dir=config.get('mash_dir'),
*args, **kw):
"""
Initialize the Masher.
Args:
hub (moksha.hub.hub.CentralMokshaHub): The hub this handler is consuming messages from.
It is used to look up the hub config values.
db_factory (bodhi.server.util.TransactionalSessionMaker or None): If given, used as the
db_factory for this Masher. If None (the default), a new TransactionalSessionMaker
is created and used.
mash_dir (basestring): The directory in which to place mashes.
Raises:
ValueError: If pungi.cmd is set to a path that does not exist.
"""
if not db_factory:
initialize_db(config)
self.db_factory = transactional_session_maker()
else:
self.db_factory = db_factory
buildsys.setup_buildsystem(config)
bugs.set_bugtracker()
self.mash_dir = mash_dir
prefix = hub.config.get('topic_prefix')
env = hub.config.get('environment')
self.topic = prefix + '.' + env + '.' + hub.config.get('masher_topic')
self.valid_signer = hub.config.get('releng_fedmsg_certname')
if not self.valid_signer:
log.warning('No releng_fedmsg_certname defined. '
'Cert validation disabled')
self.max_mashes_sem = threading.BoundedSemaphore(config.get('max_concurrent_mashes'))
# This will ensure that the configured paths exist, and will raise ValueError if any does
# not.
for setting in ('pungi.cmd', 'mash_dir', 'mash_stage_dir'):
try:
validate_path(config[setting])
except ValueError as e:
raise ValueError('{} Check the {} setting.'.format(str(e), setting))
super(Masher, self).__init__(hub, *args, **kw)
log.info('Bodhi masher listening on topic: %s' % self.topic)
def consume(self, msg):
"""
Receive a fedmsg and call work() with it.
Args:
msg (munch.Munch): The fedmsg that was received.
"""
self.log.info(msg)
if self.valid_signer:
if not fedmsg.crypto.validate_signed_by(msg['body'], self.valid_signer,
**self.hub.config):
self.log.error('Received message with invalid signature! '
'Ignoring.')
# TODO: send email notifications
return
self.work(msg)
def _get_composes(self, msg):
"""
Return a list of dictionaries that represent the :class:`Composes <Compose>` we should run.
This method is compatible with the unversioned masher.start message, and also version 2.
If no version is found, it will use the updates listed in the message to create new Compose
objects and return dictionary representations of them.
This method also marks the Composes as pending, which acknowledges the receipt of the
message.
Args:
msg (munch.Munch): The body of the received fedmsg.
Returns:
list: A list of dictionaries, as returned from :meth:`Compose.__json__`.
"""
with self.db_factory() as db:
if 'api_version' in msg and msg['api_version'] == 2:
composes = [Compose.from_dict(db, c) for c in msg['composes']]
elif 'updates' in msg:
updates = [db.query(Update).filter(Update.title == t).one() for t in msg['updates']]
composes = Compose.from_updates(updates)
for c in composes:
db.add(c)
# This flush is necessary so the compose finds its updates, which gives it a
# content_type when it is serialized later.
db.flush()
else:
raise ValueError('Unable to process fedmsg: {}'.format(msg))
for c in composes:
# Acknowledge that we've received the command to run these composes.
c.state = ComposeState.pending
return [c.__json__(composer=True) for c in composes]
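# For illustration (a sketch; shapes inferred from the checks above), the two
# accepted message bodies look like:
#     {'api_version': 2, 'composes': [<Compose.__json__() dicts>], ...}
#     {'updates': ['<update title>', ...], ...}   # legacy, unversioned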
def work(self, msg):
"""Begin the push process.
Here we organize & prioritize the updates, and fire off separate
threads for each repo tag being mashed.
If there are any security updates in the push, then those repositories
will be executed before all others.
Args:
msg (munch.Munch): The fedmsg that was received.
"""
body = msg['body']['msg']
resume = body.get('resume', False)
agent = body.get('agent')
notifications.publish(topic="mashtask.start", msg=dict(agent=agent), force=True)
results = []
threads = []
for compose in self._get_composes(body):
self.log.info('Now starting mashes')
masher = get_masher(ContentType.from_string(compose['content_type']))
if not masher:
self.log.error('Unsupported content type %s submitted for mashing. SKIPPING',
compose['content_type'])
continue
thread = masher(self.max_mashes_sem, compose, agent, self.log, self.db_factory,
self.mash_dir, resume)
threads.append(thread)
thread.start()
self.log.info('All of the batches are running. Now waiting for the final results')
for thread in threads:
thread.join()
for result in thread.results():
results.append(result)
self.log.info('Push complete! Summary follows:')
for result in results:
self.log.info(result)
def get_masher(content_type):
"""
Return the correct ComposerThread subclass for content_type.
Args:
content_type (bodhi.server.models.EnumSymbol): The content type we seek a masher for.
Return:
ComposerThread or None: Either a ContainerComposerThread, FlatpakComposerThread,
RPMComposerThread, or a ModuleComposerThread, as appropriate, or None if no
masher is found.
"""
mashers = [ContainerComposerThread, FlatpakComposerThread,
RPMComposerThread, ModuleComposerThread]
for possible in mashers:
if possible.ctype is content_type:
return possible
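# Usage sketch: get_masher(ContentType.rpm) returns RPMComposerThread, while a
# content type with no matching masher falls through and returns None, which
# Masher.work() logs as unsupported and skips.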
class ComposerThread(threading.Thread):
"""The base class that defines common things for all composes."""
ctype = None
def __init__(self, max_concur_sem, compose, agent, log, db_factory, mash_dir, resume=False):
"""
Initialize the ComposerThread.
Args:
max_concur_sem (threading.BoundedSemaphore): Semaphore making sure only a limited
number of ComposerThreads run at the same time.
compose (dict): A dictionary representation of the Compose to run, formatted like the
output of :meth:`Compose.__json__`.
agent (basestring): The user who is executing the mash.
log (logging.Logger): A logger to use for this mash.
db_factory (bodhi.server.util.TransactionalSessionMaker): A DB session to use while
mashing.
mash_dir (basestring): A path to a directory to generate the mash in.
resume (bool): Whether or not we are resuming a previous failed mash. Defaults to False.
"""
super(ComposerThread, self).__init__()
self.db_factory = db_factory
self.log = log
self.agent = agent
self.max_concur_sem = max_concur_sem
self._compose = compose
self.resume = resume
self.add_tags_async = []
self.move_tags_async = []
self.add_tags_sync = []
self.move_tags_sync = []
self.testing_digest = {}
self.success = False
def run(self):
"""Run the thread by managing a db transaction and calling work()."""
self.log.info('Grabbing semaphore')
self.max_concur_sem.acquire()
self.log.info('Acquired semaphore, starting')
try:
with self.db_factory() as session:
self.db = session
self.compose = Compose.from_dict(session, self._compose)
self._checkpoints = json.loads(self.compose.checkpoints)
self.log.info('Starting masher type %s for %s with %d updates',
self, str(self.compose), len(self.compose.updates))
self.save_state(ComposeState.initializing)
self.work()
except Exception as e:
with self.db_factory() as session:
self.db = session
self.compose = Compose.from_dict(session, self._compose)
self.compose.error_message = six.text_type(e)
self.save_state(ComposeState.failed)
self.log.exception('ComposerThread failed. Transaction rolled back.')
finally:
self.compose = None
self.db = None
self.max_concur_sem.release()
self.log.info('Released semaphore')
def results(self):
"""
Yield log string messages about the results of this mash run.
Yields:
basestring: A string for human readers indicating the success of the mash.
"""
attrs = ['name', 'success']
yield " name: %(name)-20s success: %(success)s" % dict(
zip(attrs, [getattr(self, attr, 'Undefined') for attr in attrs])
)
def work(self):
"""Perform the various high-level tasks for the mash."""
self.id = getattr(self.compose.release, '%s_tag' % self.compose.request.value)
# Set our thread's "name" so it shows up nicely in the logs.
# https://docs.python.org/2/library/threading.html#thread-objects
self.name = self.id
# For 'pending' branched releases, we only want to perform repo-related
# tasks for testing updates. For stable updates, we should just add the
# dist_tag and do everything else other than mashing/updateinfo, since
# the nightly build-branched cron job mashes for us.
self.skip_compose = False
if self.compose.release.state is ReleaseState.pending \
and self.compose.request is UpdateRequest.stable:
self.skip_compose = True
self.log.info('Running ComposerThread(%s)' % self.id)
notifications.publish(
topic="mashtask.mashing",
msg=dict(repo=self.id,
updates=[u.title for u in self.compose.updates],
agent=self.agent,
ctype=self.ctype.value),
force=True,
)
try:
if self.resume:
self.load_state()
else:
self.save_state()
if self.compose.request is UpdateRequest.stable:
self.perform_gating()
self.determine_and_perform_tag_actions()
self.update_security_bugs()
self.expire_buildroot_overrides()
self.remove_pending_tags()
self._compose_updates()
self._mark_status_changes()
self.save_state(ComposeState.notifying)
# Send fedmsg notifications
self.send_notifications()
# Update bugzillas
self.modify_bugs()
# Add comments to updates
self.status_comments()
# Announce stable updates to the mailing list
self.send_stable_announcements()
# Email updates-testing digest
self.send_testing_digest()
self._unlock_updates()
self.check_all_karma_thresholds()
self.obsolete_older_updates()
if config['clean_old_composes']:
# Clean old composes
self.save_state(ComposeState.cleaning)
clean_old_mashes.remove_old_composes()
self.save_state(ComposeState.success)
self.success = True
self.remove_state()
except Exception:
self.log.exception('Exception in ComposerThread(%s)' % self.id)
self.save_state()
raise
finally:
self.finish(self.success)
def check_all_karma_thresholds(self):
"""Run check_karma_thresholds() on testing Updates."""
if self.compose.request is UpdateRequest.testing:
self.log.info('Determining if any testing updates reached the karma '
'thresholds during the push')
for update in self.compose.updates:
try:
update.check_karma_thresholds(self.db, agent=u'bodhi')
except BodhiException:
self.log.exception('Problem checking karma thresholds')
def obsolete_older_updates(self):
"""Obsolete any older updates that may still be lying around."""
self.log.info('Checking for obsolete updates')
for update in self.compose.updates:
update.obsolete_older_updates(self.db)
def perform_gating(self):
"""Look for Updates that don't meet testing requirements, and eject them from the mash."""
self.log.debug('Performing gating.')
for update in self.compose.updates:
result, reason = update.check_requirements(self.db, config)
if not result:
self.log.warning("%s failed gating: %s" % (update.title, reason))
self.eject_from_mash(update, reason)
# We may have removed some updates from this compose above, and we don't want future
# reads on self.compose.updates to see those, so let's mark that attribute expired so
# sqlalchemy will requery for the updates instead of using its cached copy.
self.db.expire(self.compose, ['updates'])
def eject_from_mash(self, update, reason):
"""
Eject the given Update from the current mash for the given human-readable reason.
Args:
update (bodhi.server.models.Update): The Update being ejected.
reason (basestring): A human readable explanation for the ejection, which is used in a
comment on the update, in a log message, and in a fedmsg.
"""
update.locked = False
text = '%s ejected from the push because %r' % (update.title, reason)
log.warning(text)
update.comment(self.db, text, author=u'bodhi')
# Remove the pending tag as well
if update.request is UpdateRequest.stable:
update.remove_tag(update.release.pending_stable_tag,
koji=buildsys.get_session())
elif update.request is UpdateRequest.testing:
update.remove_tag(update.release.pending_testing_tag,
koji=buildsys.get_session())
update.request = None
notifications.publish(
topic="update.eject",
msg=dict(
repo=self.id,
update=update,
reason=reason,
request=self.compose.request,
release=self.compose.release,
agent=self.agent,
),
force=True,
)
def save_state(self, state=None):
"""
Save the state of this push so it can be resumed later if necessary.
Args:
state (bodhi.server.models.ComposeState): If not ``None``, set the Compose's state
attribute to the given state. Defaults to ``None``.
"""
self.compose.checkpoints = json.dumps(self._checkpoints)
if state is not None:
self.compose.state = state
self.db.commit()
self.log.info('Compose object updated.')
def load_state(self):
"""Load the state of this push so it can be resumed later if necessary."""
self._checkpoints = json.loads(self.compose.checkpoints)
self.log.info('Masher state loaded from %s', self.compose)
self.log.info(self.compose.state)
def remove_state(self):
"""Remove the Compose object from the database."""
self.log.info('Removing state: %s', self.compose)
self.db.delete(self.compose)
def finish(self, success):
"""
Clean up pungi configs if the mash was successful, and send logs and fedmsgs.
Args:
success (bool): True if the mash was successful, False otherwise.
"""
self.log.info('Thread(%s) finished. Success: %r' % (self.id, success))
notifications.publish(
topic="mashtask.complete",
msg=dict(success=success, repo=self.id, agent=self.agent, ctype=self.ctype.value),
force=True,
)
def update_security_bugs(self):
"""Update the bug titles for security updates."""
self.log.info('Updating bug titles for security updates')
for update in self.compose.updates:
if update.type is UpdateType.security:
for bug in update.bugs:
bug.update_details()
@checkpoint
def determine_and_perform_tag_actions(self):
"""Call _determine_tag_actions() and _perform_tag_actions()."""
self._determine_tag_actions()
self._perform_tag_actions()
def _determine_tag_actions(self):
tag_types, tag_rels = Release.get_tags(self.db)
# sync & async tagging batches
for i, batch in enumerate(sorted_updates(self.compose.updates)):
for update in batch:
add_tags = []
move_tags = []
if update.status is UpdateStatus.testing:
status = 'testing'
else:
status = 'candidate'
for build in update.builds:
from_tag = None
tags = build.get_tags()
for tag in tags:
if tag in tag_types[status]:
from_tag = tag
break
else:
reason = 'Cannot find relevant tag for %s. None of %s are in %s.'
reason = reason % (build.nvr, tags, tag_types[status])
self.eject_from_mash(update, reason)
break
if self.skip_compose:
add_tags.append((update.requested_tag, build.nvr))
else:
move_tags.append((from_tag, update.requested_tag,
build.nvr))
else:
if i == 0:
self.add_tags_sync.extend(add_tags)
self.move_tags_sync.extend(move_tags)
else:
self.add_tags_async.extend(add_tags)
self.move_tags_async.extend(move_tags)
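# The first batch yielded by sorted_updates() is tagged immediately (one koji
# call per action); later batches are queued on the *_async lists, which
# _perform_tag_actions() below submits together through a single koji
# multicall.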
def _perform_tag_actions(self):
koji = buildsys.get_session()
for i, batches in enumerate([(self.add_tags_sync, self.move_tags_sync),
(self.add_tags_async, self.move_tags_async)]):
add, move = batches
if i == 0:
koji.multicall = False
else:
koji.multicall = True
for action in add:
tag, build = action
self.log.info("Adding tag %s to %s" % (tag, build))
koji.tagBuild(tag, build, force=True)
for action in move:
from_tag, to_tag, build = action
self.log.info('Moving %s from %s to %s' % (
build, from_tag, to_tag))
koji.moveBuild(from_tag, to_tag, build, force=True)
if i != 0:
results = koji.multiCall()
failed_tasks = buildsys.wait_for_tasks([task[0] for task in results],
koji, sleep=15)
if failed_tasks:
raise Exception("Failed to move builds: %s" % failed_tasks)
def expire_buildroot_overrides(self):
"""Expire any buildroot overrides that are in this push."""
for update in self.compose.updates:
if update.request is UpdateRequest.stable:
for build in update.builds:
if build.override:
try:
log.debug("Expiring BRO for {} because it is being pushed.".format(
build.nvr))
build.override.expire()
except Exception:
log.exception('Problem expiring override')
def remove_pending_tags(self):
"""Remove all pending tags from the updates."""
self.log.debug("Removing pending tags from builds")
koji = buildsys.get_session()
koji.multicall = True
for update in self.compose.updates:
if update.request is UpdateRequest.stable:
update.remove_tag(update.release.pending_stable_tag,
koji=koji)
elif update.request is UpdateRequest.testing:
update.remove_tag(update.release.pending_signing_tag,
koji=koji)
update.remove_tag(update.release.pending_testing_tag,
koji=koji)
result = koji.multiCall()
self.log.debug('remove_pending_tags koji.multiCall result = %r',
result)
def _mark_status_changes(self):
"""Mark each update's status as fulfilling its request."""
self.log.info('Updating update statuses.')
for update in self.compose.updates:
now = datetime.utcnow()
if update.request is UpdateRequest.testing:
update.status = UpdateStatus.testing
update.date_testing = now
elif update.request is UpdateRequest.stable:
update.status = UpdateStatus.stable
update.date_stable = now
update.date_pushed = now
update.pushed = True
def _unlock_updates(self):
"""Unlock all the updates and clear their requests."""
self.log.info("Unlocking updates.")
for update in self.compose.updates:
update.request = None
update.locked = False
def add_to_digest(self, update):
"""Add an package to the digest dictionary.
{'release-id': {'build nvr': body text for build, ...}}
Args:
update (bodhi.server.models.Update): The update to add to the dict.
"""
prefix = update.release.long_name
if prefix not in self.testing_digest:
self.testing_digest[prefix] = {}
for i, subbody in enumerate(mail.get_template(
update, use_template='maillist_template')):
self.testing_digest[prefix][update.builds[i].nvr] = subbody[1]
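# e.g. after one update is added, the digest has the shape (values
# illustrative): {'Fedora 28': {'foo-1.0-1.fc28': '<mail body for foo>'}}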
def generate_testing_digest(self):
"""Generate a testing digest message for this release."""
self.log.info('Generating testing digest for %s' % self.compose.release.name)
for update in self.compose.updates:
if update.request is UpdateRequest.testing:
self.add_to_digest(update)
self.log.info('Testing digest generation for %s complete' % self.compose.release.name)
def send_notifications(self):
"""Send fedmsgs to announce completion of mashing for each update."""
self.log.info('Sending notifications')
try:
agent = os.getlogin()
except OSError: # this can happen when building on koji
agent = u'masher'
for update in self.compose.updates:
topic = u'update.complete.%s' % update.request
notifications.publish(
topic=topic,
msg=dict(update=update, agent=agent),
force=True,
)
@checkpoint
def modify_bugs(self):
"""Mark bugs on each Update as modified."""
self.log.info('Updating bugs')
for update in self.compose.updates:
self.log.debug('Modifying bugs for %s', update.title)
update.modify_bugs()
@checkpoint
def status_comments(self):
"""Add bodhi system comments to each update."""
self.log.info('Commenting on updates')
for update in self.compose.updates:
update.status_comment(self.db)
@checkpoint
def send_stable_announcements(self):
"""Send the stable announcement e-mails out."""
self.log.info('Sending stable update announcements')
for update in self.compose.updates:
if update.request is UpdateRequest.stable:
update.send_update_notice()
@checkpoint
def send_testing_digest(self):
"""Send digest mail to mailing lists."""
self.log.info('Sending updates-testing digest')
sechead = u'The following %s Security updates need testing:\n Age URL\n'
crithead = u'The following %s Critical Path updates have yet to be approved:\n Age URL\n'
testhead = u'The following builds have been pushed to %s updates-testing\n\n'
for prefix, content in six.iteritems(self.testing_digest):
release = self.db.query(Release).filter_by(long_name=prefix).one()
test_list_key = '%s_test_announce_list' % (
release.id_prefix.lower().replace('-', '_'))
test_list = config.get(test_list_key)
if not test_list:
log.warning('%r undefined. Not sending updates-testing digest',
test_list_key)
continue
log.debug("Sending digest for updates-testing %s" % prefix)
maildata = u''
security_updates = self.get_security_updates(prefix)
if security_updates:
maildata += sechead % prefix
for update in security_updates:
maildata += u' %3i %s %s\n' % (
update.days_in_testing,
update.abs_url(),
update.title)
maildata += '\n\n'
critpath_updates = self.get_unapproved_critpath_updates(prefix)
if critpath_updates:
maildata += crithead % prefix
for update in self.get_unapproved_critpath_updates(prefix):
maildata += u' %3i %s %s\n' % (
update.days_in_testing,
update.abs_url(),
update.title)
maildata += '\n\n'
maildata += testhead % prefix
updlist = sorted(content.keys())
for pkg in updlist:
maildata += u' %s\n' % pkg
maildata += u'\nDetails about builds:\n\n'
for nvr in updlist:
maildata += u"\n" + self.testing_digest[prefix][nvr]
mail.send_mail(config.get('bodhi_email'), test_list,
'%s updates-testing report' % prefix, maildata)
def get_security_updates(self, release):
"""
Return an iterable of security updates in the given release.
Args:
release (basestring): The long_name of a Release object, used to query for the matching
Release model.
Returns:
iterable: An iterable of security Update objects from the given release.
"""
release = self.db.query(Release).filter_by(long_name=release).one()
updates = self.db.query(Update).filter(
Update.type == UpdateType.security,
Update.status == UpdateStatus.testing,
Update.release == release,
Update.request.is_(None)
).all()
updates = self.sort_by_days_in_testing(updates)
return updates
def get_unapproved_critpath_updates(self, release):
"""
Return a list of unapproved critical path updates for the given release.
Builds a query for critical path updates that are testing and do not have a request, and
then returns a list of the query results reverse sorted by the number of days they have been
in testing.
Args:
release (basestring): The long_name of the Release to be queried.
Return:
list: The list of unapproved critical path updates for the given release.
"""
release = self.db.query(Release).filter_by(long_name=release).one()
updates = self.db.query(Update).filter_by(
critpath=True,
status=UpdateStatus.testing,
request=None,
release=release,
).order_by(Update.date_submitted.desc()).all()
updates = self.sort_by_days_in_testing(updates)
return updates
def sort_by_days_in_testing(self, updates):
"""
Sort the given updates by the number of days they have been in testing, reversed.
Args:
updates (iterable): The updates to be sorted.
Return:
list: The sorted updates.
"""
updates = list(updates)
updates.sort(key=lambda update: update.days_in_testing, reverse=True)
return updates
class ContainerComposerThread(ComposerThread):
"""Use skopeo to copy and tag container images."""
ctype = ContentType.container
def _compose_updates(self):
"""Use skopeo to copy images to the correct repos and tags."""
for update in self.compose.updates:
if update.request is UpdateRequest.stable:
destination_tag = 'latest'
else:
destination_tag = 'testing'
for build in update.builds:
# Using None as the destination tag on the first one will default to the
# version-release string.
for dtag in [None, build.nvr_version, destination_tag]:
copy_container(build, destination_tag=dtag)
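# e.g. a hypothetical build foo-2.0-1 requested stable is copied three times:
# with dtag=None (which defaults to its version-release string), with its
# nvr_version, and finally as 'latest' ('testing' for non-stable requests).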
class FlatpakComposerThread(ContainerComposerThread):
"""Use skopeo to copy and tag flatpak images."""
ctype = ContentType.flatpak
class PungiComposerThread(ComposerThread):
"""Compose update with Pungi."""
pungi_template_config_key = None
def __init__(self, max_concur_sem, compose, agent, log, db_factory, mash_dir, resume=False):
"""
Initialize the PungiComposerThread.
Args:
max_concur_sem (threading.BoundedSemaphore): Semaphore making sure only a limited
number of ComposerThreads run at the same time.
compose (dict): A dictionary representation of the Compose to run, formatted like the
output of :meth:`Compose.__json__`.
agent (basestring): The user who is executing the mash.
log (logging.Logger): A logger to use for this mash.
db_factory (bodhi.server.util.TransactionalSessionMaker): A DB session to use while
mashing.
mash_dir (basestring): A path to a directory to generate the mash in.
resume (bool): Whether or not we are resuming a previous failed mash. Defaults to False.
"""
super(PungiComposerThread, self).__init__(max_concur_sem, compose, agent, log, db_factory,
mash_dir, resume)
self.devnull = None
self.mash_dir = mash_dir
self.path = None
def finish(self, success):
"""
Clean up pungi configs if the mash was successful, and send logs and fedmsgs.
Args:
success (bool): True if the mash was successful, False otherwise.
"""
if hasattr(self, '_pungi_conf_dir') and os.path.exists(self._pungi_conf_dir) and success:
# Let's clean up the pungi configs we wrote
shutil.rmtree(self._pungi_conf_dir)
# The superclass will handle the logs and fedmsg.
super(PungiComposerThread, self).finish(success)
def load_state(self):
"""Set self.path if completed_repo is found in checkpoints."""
super(PungiComposerThread, self).load_state()
if 'completed_repo' in self._checkpoints:
self.path = self._checkpoints['completed_repo']
self.log.info('Resuming push with completed repo: %s' % self.path)
return
self.log.info('Resuming push without any completed repos')
def _compose_updates(self):
"""Start pungi, generate updateinfo, wait for pungi, and wait for the mirrors."""
if not os.path.exists(self.mash_dir):
self.log.info('Creating %s' % self.mash_dir)
os.makedirs(self.mash_dir)
composedone = self._checkpoints.get('compose_done')
if not self.skip_compose and not composedone:
pungi_process = self._punge()
# Things we can do while Pungi is running
self.generate_testing_digest()
if not self.skip_compose and not composedone:
uinfo = self._generate_updateinfo()
self._wait_for_pungi(pungi_process)
uinfo.insert_updateinfo(self.path)
self._sanity_check_repo()
self._wait_for_repo_signature()
self._stage_repo()
self._checkpoints['compose_done'] = True
self.save_state()
if not self.skip_compose:
# Wait for the repo to hit the master mirror
self._wait_for_sync()
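# Note the hand-rolled checkpoint here: unlike the @checkpoint-decorated
# steps, 'compose_done' is written manually so that a resumed push skips the
# compose/updateinfo/sanity-check/staging sequence and goes straight to
# waiting for the mirrors.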
def _copy_additional_pungi_files(self, pungi_conf_dir, template_env):
"""
Child classes should override this to place type-specific Pungi files in the config dir.
Args:
pungi_conf_dir (basestring): A path to the directory that Pungi's configs are being
written to.
template_env (jinja2.Environment): The jinja2 environment to be used while rendering the
variants.xml template.
Raises:
NotImplementedError: The parent class does not implement this method.
"""
raise NotImplementedError
def _create_pungi_config(self):
"""Create a temp dir and render the Pungi config templates into the dir."""
loader = jinja2.FileSystemLoader(searchpath=config.get('pungi.basepath'))
env = jinja2.Environment(loader=loader,
autoescape=False,
block_start_string='[%',
block_end_string='%]',
variable_start_string='[[',
variable_end_string=']]',
comment_start_string='[#',
comment_end_string='#]')
env.globals['id'] = self.id
env.globals['release'] = self.compose.release
env.globals['request'] = self.compose.request
env.globals['updates'] = self.compose.updates
config_template = config.get(self.pungi_template_config_key)
template = env.get_template(config_template)
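# The bracketed delimiters above keep Jinja2 markup from colliding with the
# literal braces in Pungi's own config syntax; a template can then use,
# illustratively, [[ release.id_prefix ]] or
# [% if request.value == 'stable' %] ... [% endif %] against the
# id/release/request/updates globals injected above.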