-
Notifications
You must be signed in to change notification settings - Fork 116
/
entcertlib.py
479 lines (392 loc) · 17.7 KB
/
entcertlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
from __future__ import print_function, division, absolute_import
#
# Copyright (c) 2014 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import logging
import socket
from rhsm.certificate import Key, create_from_pem
from rhsm.certificate2 import CONTENT_ACCESS_CERT_TYPE
from subscription_manager.certdirectory import Writer
from subscription_manager import certlib
from subscription_manager import content_action_client
from subscription_manager import utils
from subscription_manager.injection import IDENTITY, require
from subscription_manager import rhelentbranding
import subscription_manager.injection as inj
from subscription_manager.i18n import ungettext, ugettext as _
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Capability name passed to uep.has_capability() before any content access
# certificate handling is attempted.
CONTENT_ACCESS_CERT_CAPABILITY = "org_level_content_access"
class EntCertActionInvoker(certlib.BaseActionInvoker):
    """Action invoker that runs an entitlement certificate update."""

    def _do_update(self):
        """Build a fresh EntCertUpdateAction and return the result of perform()."""
        return EntCertUpdateAction().perform()
# this guy is an oddball
# NOTE: this lib and EntCertDeleteAction are currently
# unused. Intention is to replace managerlib.clean_all_data
# with a CertActionClient.delete invocation
class EntCertDeleteLib(object):
    """Invoker for entitlement certificate delete actions.

    Runs an EntCertDeleteAction under a certlib.Locker so the certificates
    matching the given serial numbers are removed from ``ent_dir``.
    """

    def __init__(self, serial_numbers=None,
                 ent_dir=None):
        """Remember what to delete and where to look for it.

        :param serial_numbers: iterable of entitlement cert serial numbers to
            delete; None is treated as "nothing to delete".
        :param ent_dir: entitlement directory object handed to the action.
        """
        self.locker = certlib.Locker()
        self.ent_dir = ent_dir
        # _do_delete() reads this attribute; it used to be dropped here,
        # which made every delete() call fail with AttributeError.
        if serial_numbers is None:
            self.serial_numbers = []
        else:
            self.serial_numbers = list(serial_numbers)

    def delete(self):
        """Run the delete action through the locker."""
        self.locker.run(self._do_delete)

    def _do_delete(self):
        """Create the delete action and perform it for the stored serials."""
        # The serial numbers go to perform(), which is where
        # EntCertDeleteAction takes them.
        action = EntCertDeleteAction(ent_dir=self.ent_dir)
        return action.perform(self.serial_numbers)
# FIXME: currently unused
class EntCertDeleteAction(object):
    """Action for deleting the entitlement certs matching given serial numbers."""

    def __init__(self, ent_dir=None, serial_numbers=None):
        """Store the entitlement directory and optional default serials.

        :param ent_dir: object providing ``find(serial)``, which returns a
            cert object or None.
        :param serial_numbers: optional default serial numbers used when
            perform() is called without an argument.
        """
        self.ent_dir = ent_dir
        self.serial_numbers = serial_numbers

    def perform(self, serial_numbers=None):
        """Delete every cert found in ``ent_dir`` for the given serials.

        :param serial_numbers: serial numbers to delete; when None, the
            serial numbers given to the constructor are used.
        :returns: this action instance.
        """
        if serial_numbers is None:
            serial_numbers = self.serial_numbers
        # No serials given here or to the constructor means nothing to do.
        for sn in serial_numbers or ():
            cert = self.ent_dir.find(sn)
            if cert is None:
                # Nothing in the directory for this serial; skip it.
                continue
            cert.delete()
        return self
class EntCertUpdateAction(object):
    """Action for syncing entitlement certificates.

    EntCertUpdateAction is used to sync entitlement certs based on
    current entitlement status.

    An EntCertUpdateReport is returned containing information about the changes
    that were applied. install() and delete() methods are expected to update
    self.report.

    New and updated ent certs are installed via an EntitlementCertBundlesInstaller.
    Expired or extraneous entitlement certs are deleted.

    If there are changes applied to the EntitlementDirectory, repo_hook()
    and branding_hook() are triggered. Certificates will have been updated,
    and written to disk, and EntitlementDirectory refreshed before these hooks
    are called.

    The injected self.uep is used to query RHSM API for a list of expected
    entitlement certificate serial numbers. If local system is missing certs
    matching those serial numbers, the API is queried for the certificates
    with those serial numbers.

    rogue: ent certs installed on system but not known by RHSM API.
    missing: ent certs RHSM API knows, but are not installed on system.
    """

    def __init__(self, report=None):
        """Resolve injected collaborators and set up the report.

        :param report: report object to record changes in; when None a new
            EntCertUpdateReport is created.
        """
        self.cp_provider = inj.require(inj.CP_PROVIDER)
        self.uep = self.cp_provider.get_consumer_auth_cp()
        self.ent_dir = inj.require(inj.ENT_DIR)
        self.identity = require(IDENTITY)
        # Honor a caller-supplied report; it used to be silently ignored.
        if report is None:
            report = EntCertUpdateReport()
        self.report = report
        self.content_access_cache = inj.require(inj.CONTENT_ACCESS_CACHE)

    # NOTE: this is slightly at odds with the manual cert import
    #       path, manual import certs wont get a 'report', etc
    def perform(self):
        """Sync local entitlement certs with the serials the server expects.

        Deletes rogue certs, installs missing ones, handles content access
        certs when the server has that capability, and runs the repo and
        branding hooks if anything changed.

        :returns: self.report
        :raises Disconnected: if fetching the expected serials raises
            socket.error.
        """
        local = self._get_local_serials()
        try:
            expected = self._get_expected_serials()
        except socket.error as ex:
            log.exception(ex)
            log.error('Cannot modify subscriptions while disconnected')
            raise Disconnected()

        missing_serials = self._find_missing_serials(local, expected)
        rogue_serials = self._find_rogue_serials(local, expected)

        self.delete(rogue_serials)
        installed_serials = self.install(missing_serials)

        log.info('certs updated:\n%s', self.report)
        self.syslog_results()

        # We call EntCertlibActionInvoker.update() solo from
        # the 'attach' cli instead of an ActionClient. So
        # we need to refresh the ent_dir object before calling
        # content updating actions.
        self.ent_dir.refresh()

        cert_changed = bool(missing_serials or rogue_serials)

        if self.uep.has_capability(CONTENT_ACCESS_CERT_CAPABILITY):
            content_access_certs = self._find_content_access_certs()
            if len(content_access_certs) > 0:
                # This addresses BZs: 1448855, 1450862
                # A content access cert is obsolete when it was not just
                # installed and the server no longer lists its serial.
                obsolete_certs = []
                for cont_access_cert in content_access_certs:
                    if cont_access_cert.serial in installed_serials:
                        continue
                    if cont_access_cert.serial not in expected:
                        obsolete_certs.append(cont_access_cert)
                if len(obsolete_certs) > 0:
                    log.info('Deleting obsolete content access certificate')
                    self.delete(obsolete_certs)

            update_data = self.content_access_hook()
            if update_data is not None:
                cert_changed = True

        if cert_changed:
            self.repo_hook()

            # NOTE: Since we have the yum repos defined here now
            #       we could update product id certs here, or install
            #       them if they are needed, but missing. That way the
            #       branding installs could be more accurate.

            # reload certs and update branding
            self.branding_hook()

        # if we want the full report, we can get it, but
        # this makes CertLib.update() have same sig as reset
        # of *Lib.update
        return self.report

    def install(self, missing_serials):
        """Install any missing entitlement certificates.

        :param missing_serials: serial numbers to fetch and install.
        :returns: whatever EntitlementCertBundlesInstaller.install() returns
            for the fetched bundles.
        """
        cert_bundles = self.get_certificates_by_serial_list(missing_serials)
        ent_cert_bundles_installer = EntitlementCertBundlesInstaller(self.report)
        return ent_cert_bundles_installer.install(cert_bundles)

    def _find_content_access_certs(self):
        """Return the certs in ent_dir whose type is CONTENT_ACCESS_CERT_TYPE."""
        certs = self.ent_dir.list_with_content_access()
        return [cert for cert in certs if cert.entitlement_type == CONTENT_ACCESS_CERT_TYPE]

    def content_access_hook(self):
        """Update content access certs from the content access cache.

        :returns: the data from content_access_cache.check_for_update(), or
            None when the server lacks the capability or there are no
            content access certs.
        """
        if not self.uep.has_capability(CONTENT_ACCESS_CERT_CAPABILITY):
            return  # do nothing if we cannot check for content access cert updates
        content_access_certs = self._find_content_access_certs()
        update_data = None
        if len(content_access_certs) > 0:
            update_data = self.content_access_cache.check_for_update()
            for content_access_cert in content_access_certs:
                self.content_access_cache.update_cert(content_access_cert, update_data)
        elif self.content_access_cache.exists():
            # No content access certs left; drop the cache.
            self.content_access_cache.remove()
        if update_data is not None:
            self.ent_dir.refresh()
        return update_data

    def branding_hook(self):
        """Update branding info based on entitlement cert changes."""
        # RHELBrandsInstaller will use latest ent_dir contents
        brands_installer = rhelentbranding.RHELBrandsInstaller()
        brands_installer.install()

    def repo_hook(self):
        """Update content repos.

        Best effort: any exception is logged at debug level and swallowed.
        """
        log.debug("entcerlibaction.repo_hook")
        try:
            # NOTE: this may need a lock
            content_action = content_action_client.ContentActionClient()
            content_action.update()
        except Exception as e:
            log.debug(e)
            log.debug("Failed to update repos")

    def _find_missing_serials(self, local, expected):
        """Find serials from the server we do not have locally.

        :param local: mapping of local serial number -> cert.
        :param expected: serial numbers the server expects.
        :returns: list of expected serials absent from ``local``.
        """
        return [sn for sn in expected if sn not in local]

    def _find_rogue_serials(self, local, expected):
        """Find certs we have locally but are not on the server.

        :returns: list of the *cert objects* (values of ``local``) whose
            serial is not in ``expected``.
        """
        return [local[sn] for sn in local if sn not in expected]

    def syslog_results(self):
        """Write generated EntCertUpdateReport info to syslog."""
        for cert in self.report.added:
            utils.system_log("Added subscription for '%s' contract '%s'" %
                             (cert.order.name, cert.order.contract))
            for product in cert.products:
                utils.system_log("Added subscription for product '%s'" %
                                 (product.name))
        for cert in self.report.rogue:
            utils.system_log("Removed subscription for '%s' contract '%s'" %
                             (cert.order.name, cert.order.contract))
            for product in cert.products:
                utils.system_log("Removed subscription for product '%s'" %
                                 (product.name))

    def _get_local_serials(self):
        """Map serial number -> cert for every cert in ent_dir.

        Each serial is also appended to self.report.valid.
        """
        local = {}
        # certificates in grace period were being renamed everytime.
        # this makes sure we don't try to re-write certificates in
        # grace period
        # XXX since we don't use grace period, this might not be needed
        self.ent_dir.refresh()
        ent_certs = self.ent_dir.list() + self.ent_dir.list_with_content_access()
        # The two listings may overlap; de-duplicate.
        ent_certs = list(set(ent_certs))
        for valid in ent_certs:
            sn = valid.serial
            self.report.valid.append(sn)
            local[sn] = valid
        return local

    def get_certificate_serials_list(self):
        """Query RHSM API for list of expected ent cert serial numbers.

        :returns: list of serial numbers; empty when there is no UEP object
            or the injected identity is not valid.
        """
        results = []
        # if there is no UEP object, short circuit
        if self.uep is None:
            return results
        identity = inj.require(inj.IDENTITY)
        if not identity.is_valid():
            # We can get here on unregister, with no id or ent certs or repos,
            # but don't want to raise an exception that would be logged. So
            # empty result set is returned.
            return results
        reply = self.uep.getCertificateSerials(identity.uuid)
        for d in reply:
            results.append(d['serial'])
        return results

    def get_certificates_by_serial_list(self, sn_list):
        """Fetch a list of entitlement certificates specified by a list of serial numbers.

        :param sn_list: serial numbers; converted to strings before the
            request. A falsy value results in no request.
        :returns: list of the items returned by uep.getCertificates().
        """
        result = []
        if sn_list:
            sn_list = [str(sn) for sn in sn_list]
            # NOTE: use injected IDENTITY, need to validate this
            # handles disconnected errors properly
            reply = self.uep.getCertificates(self.identity.uuid,
                                             serials=sn_list)
            for cert in reply:
                result.append(cert)
        return result

    def _get_expected_serials(self):
        """Fetch the expected serials and record them on the report."""
        exp = self.get_certificate_serials_list()
        self.report.expected = exp
        return exp

    def delete(self, rogue):
        """Delete the given certs and record them in self.report.rogue.

        A cert whose delete() raises OSError is logged and skipped. If the
        report holds any rogue certs afterwards, the count in the report is
        printed and ent_dir is refreshed.

        :param rogue: iterable of cert objects to delete.
        """
        for cert in rogue:
            try:
                cert.delete()
                self.report.rogue.append(cert)
            except OSError as er:
                log.exception(er)
                # Logger.warn is a deprecated alias of Logger.warning.
                log.warning("Failed to delete cert")

        # If we just deleted certs, we need to refresh the now stale
        # entitlement directory before we go to delete expired certs.
        rogue_count = len(self.report.rogue)
        if rogue_count > 0:
            print(ungettext("%s local certificate has been deleted.",
                            "%s local certificates have been deleted.",
                            rogue_count) % rogue_count)
            self.ent_dir.refresh()
class EntitlementCertBundlesInstaller(object):
    """Install a list of entitlement cert bundles.

    pre_install() is triggered before any of the ent cert
    bundles are installed. post_install() is triggered after
    all of the ent cert bundles are installed.
    """

    def __init__(self, report):
        """Keep the report that installs are recorded in.

        :param report: report object; its ``added`` list is what
            get_installed() returns.
        """
        self.exceptions = []
        self.report = report

    def install(self, cert_bundles):
        """Fetch entitlement certs, install them, and update the report.

        :param cert_bundles: iterable of cert bundles, each passed to
            EntitlementCertBundleInstaller.install().
        :returns: list of serials of the bundles that were installed
            (bundles whose install returned None are left out).
        """
        # The class contract promises this hook runs before any bundle is
        # installed; it was never actually invoked.
        self.pre_install()

        bundle_installer = EntitlementCertBundleInstaller(self.report)
        installed_serials = []
        for cert_bundle in cert_bundles:
            cert_serial = bundle_installer.install(cert_bundle)
            if cert_serial is not None:
                installed_serials.append(cert_serial)

        self.exceptions = bundle_installer.exceptions
        self.post_install()
        return installed_serials

    # TODO: add subman plugin slot,conduit,hooks
    def pre_install(self):
        """Hook called before any ent cert bundles are installed."""
        log.debug("cert bundles pre_install")

    def post_install(self):
        """Hook called after all cert bundles have been installed."""
        for installed in self._get_installed():
            log.debug("cert bundles post_install: %s", installed)

    def get_installed(self):
        """Return a list of the ent cert bundles that were installed."""
        return self._get_installed()

    def _get_installed(self):
        """Return the bundles installed based on this impl's EntCertUpdateReport."""
        return self.report.added
class EntitlementCertBundleInstaller(object):
    """Install an entitlement cert bundle (cert/key).

    Split a bundle into a certificate.EntitlementCertificate and a
    certificate.Key, and persist them.

    pre_install() is called before the cert bundle is installed.
    post_install() is called after the cert bundle is installed.

    Note that EntitlementCertBundlesInstaller's pre and post install
    hooks are before and after installing the full list of ent cert
    bundles, while this is pre/post each ent cert bundle.
    """

    def __init__(self, report):
        """Keep the report that installs and failures are recorded in.

        :param report: report object with ``added`` and ``_exceptions`` lists.
        """
        self.exceptions = []
        self.report = report

    def install(self, bundle):
        """Persist an ent cert and its key after splitting it from the bundle.

        Any exception raised while building or writing the cert is handed to
        install_exception() rather than propagated; post_install() runs
        either way.

        :param bundle: mapping with 'key' and 'cert' PEM entries.
        :returns: the installed cert's serial, or None if the install failed.
        """
        self.pre_install(bundle)

        cert_bundle_writer = Writer()
        cert_serial = None
        try:
            key, cert = self.build_cert(bundle)
            cert_bundle_writer.write(key, cert)
            self.report.added.append(cert)
            cert_serial = cert.serial
        except Exception as e:
            self.install_exception(bundle, e)

        self.post_install(bundle)
        return cert_serial

    # TODO: add subman plugin, slot, and conduit
    def pre_install(self, bundle):
        """Hook called before an ent cert bundle is installed."""
        log.debug("Ent cert bundle pre_install")

    # should probably be in python-rhsm/certificate
    def build_cert(self, bundle):
        """Split a cert bundle into a EntitlementCertificate and a Key.

        :returns: ``(key, cert)`` tuple.
        """
        keypem = bundle['key']
        crtpem = bundle['cert']
        key = Key(keypem)
        cert = create_from_pem(crtpem)
        return (key, cert)

    def install_exception(self, bundle, exception):
        """Log exceptions and add them to the EntCertUpdateReport."""
        log.exception(exception)
        log.error('Bundle not loaded:\n%s\n%s', bundle, exception)
        # Also keep the exception on this installer: the bundles installer
        # copies self.exceptions, which previously stayed empty forever.
        self.exceptions.append(exception)
        self.report._exceptions.append(exception)

    def post_install(self, bundle):
        """Hook called after an ent cert bundle is installed."""
        log.debug("ent cert bundle post_install")
class Disconnected(Exception):
    """Raised by EntCertUpdateAction.perform() when a socket error prevents fetching expected serials."""
class EntCertUpdateReport(certlib.ActionReport):
    """Record of what an entitlement cert update found, added and removed."""

    name = "Entitlement Cert Updates"

    def __init__(self):
        """Start with empty serial, cert and exception lists."""
        self._exceptions = []
        self.rogue = []
        self.added = []
        self.expected = []
        self.valid = []

    def updates(self):
        """Count of ent certs added plus ent certs removed as rogue."""
        added_count = len(self.added)
        removed_count = len(self.rogue)
        return added_count + removed_count

    # need an ExceptionsReport?
    # FIXME: needs to be properties
    def exceptions(self):
        """Return the list of exceptions recorded on this report."""
        return self._exceptions

    def write(self, s, title, certificates):
        """Append a titled stanza describing ``certificates`` to the list ``s``.

        One line is added per product of each cert; a cert without products
        gets a single line using its order name. With no certificates a
        ``<NONE>`` line is added instead.
        """
        indent = ' '
        s.append(title)
        if not certificates:
            s.append('%s<NONE>' % indent)
            return
        for entry in certificates:
            products = entry.products
            if not products:
                line = '%s[sn:%d (%s) @ %s]' % (
                    indent, entry.serial, entry.order.name, entry.path)
                s.append(line)
            for product in products:
                line = '%s[sn:%d (%s,) @ %s]' % (
                    indent, entry.serial, product.name, entry.path)
                s.append(line)

    def __str__(self):
        """Render the report as text. Used in rhsm and rhsmcertd logging."""
        lines = [
            _('Total updates: %d') % self.updates(),
            _('Found (local) serial# %s') % self.valid,
            _('Expected (UEP) serial# %s') % self.expected,
        ]
        self.write(lines, _('Added (new)'), self.added)
        self.write(lines, _('Deleted (rogue):'), self.rogue)
        return '\n'.join(lines)