-
Notifications
You must be signed in to change notification settings - Fork 93
/
x509.py
223 lines (180 loc) · 8.28 KB
/
x509.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# This file is part of fedmsg.
# Copyright (C) 2012 - 2014 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Ralph Bean <rbean@redhat.com>
#
""" ``fedmsg.crypto.x509`` - X.509 backend for :mod:`fedmsg.crypto`. """
import logging
import warnings
import six
try:
# Else we need M2Crypto and m2ext
import M2Crypto
import m2ext
_m2crypto = True
except ImportError:
_m2crypto = False
from .utils import _load_remote_cert, validate_policy
from .x509_ng import _cryptography, sign as _crypto_sign, validate as _crypto_validate
import fedmsg.crypto # noqa: E402
import fedmsg.encoding # noqa: E402
_log = logging.getLogger(__name__)
if six.PY3:
long = int
def _disabled_sign(*args, **kwargs):
"""A fallback function that emits a warning when no crypto is being used."""
warnings.warn('Message signing is disabled because "cryptography" and '
'"pyopenssl" or "m2crypto" are not available.')
def _disabled_validate(*args, **kwargs):
"""A fallback function that emits a warning when no crypto is being used."""
warnings.warn('Message signature validation is disabled because ("cryptography"'
' and "pyopenssl") or "m2crypto" are not available.')
def _m2crypto_sign(message, ssldir=None, certname=None, **config):
""" Insert two new fields into the message dict and return it.
Those fields are:
- 'signature' - the computed RSA message digest of the JSON repr.
- 'certificate' - the base64 X509 certificate of the sending host.
"""
if ssldir is None or certname is None:
error = "You must set the ssldir and certname keyword arguments."
raise ValueError(error)
message['crypto'] = 'x509'
certificate = M2Crypto.X509.load_cert(
"%s/%s.crt" % (ssldir, certname)).as_pem()
# Opening this file requires elevated privileges in stg/prod.
rsa_private = M2Crypto.RSA.load_key(
"%s/%s.key" % (ssldir, certname))
digest = M2Crypto.EVP.MessageDigest('sha1')
digest.update(fedmsg.encoding.dumps(message))
signature = rsa_private.sign(digest.digest())
# Return a new dict containing the pairs in the original message as well
# as the new authn fields.
return dict(message.items() + [
('signature', signature.encode('base64')),
('certificate', certificate.encode('base64')),
])
def _m2crypto_validate(message, ssldir=None, **config):
""" Return true or false if the message is signed appropriately.
Four things must be true:
1) The X509 cert must be signed by our CA
2) The cert must not be in our CRL.
3) We must be able to verify the signature using the RSA public key
contained in the X509 cert.
4) The topic of the message and the CN on the cert must appear in the
:ref:`conf-routing-policy` dict.
"""
if ssldir is None:
raise ValueError("You must set the ssldir keyword argument.")
def fail(reason):
_log.warn("Failed validation. %s" % reason)
return False
# Some sanity checking
for field in ['signature', 'certificate']:
if field not in message:
return fail("No %r field found." % field)
if not isinstance(message[field], six.text_type):
_log.error('msg[%r] is not a unicode string' % field)
try:
# Make an effort to decode it, it's very likely utf-8 since that's what
# is hardcoded throughout fedmsg. Worst case scenario is it'll cause a
# validation error when there shouldn't be one.
message[field] = message[field].decode('utf-8')
except UnicodeError as e:
_log.error("Unable to decode the message '%s' field: %s", field, str(e))
return False
# Peal off the auth datums
signature = message['signature'].decode('base64')
certificate = message['certificate'].decode('base64')
message = fedmsg.crypto.strip_credentials(message)
# Build an X509 object
cert = M2Crypto.X509.load_cert_string(certificate)
# Validate the cert. Make sure it is signed by our CA.
# validate_certificate will one day be a part of M2Crypto.SSL.Context
# https://bugzilla.osafoundation.org/show_bug.cgi?id=11690
default_ca_cert_loc = 'https://fedoraproject.org/fedmsg/ca.crt'
cafile = _load_remote_cert(
config.get('ca_cert_location', default_ca_cert_loc),
config.get('ca_cert_cache', '/etc/pki/fedmsg/ca.crt'),
config.get('ca_cert_cache_expiry', 0),
**config)
ctx = m2ext.SSL.Context()
ctx.load_verify_locations(cafile=cafile)
if not ctx.validate_certificate(cert):
return fail("X509 certificate is not valid.")
# Load and check against the CRL
crl = None
if 'crl_location' in config and 'crl_cache' in config:
crl = _load_remote_cert(
config.get('crl_location', 'https://fedoraproject.org/fedmsg/crl.pem'),
config.get('crl_cache', '/var/cache/fedmsg/crl.pem'),
config.get('crl_cache_expiry', 1800),
**config)
if crl:
crl = M2Crypto.X509.load_crl(crl)
# FIXME -- We need to check that the CRL is signed by our own CA.
# See https://bugzilla.osafoundation.org/show_bug.cgi?id=12954#c2
# if not ctx.validate_certificate(crl):
# return fail("X509 CRL is not valid.")
# FIXME -- we check the CRL, but by doing string comparison ourselves.
# This is not what we want to be doing.
# There is a patch into M2Crypto to handle this for us. We should use it
# once its integrated upstream.
# See https://bugzilla.osafoundation.org/show_bug.cgi?id=12954#c2
revoked_serials = [long(line.split(': ')[1].strip(), base=16)
for line in crl.as_text().split('\n')
if 'Serial Number:' in line]
if cert.get_serial_number() in revoked_serials:
subject = cert.get_subject()
signer = '(no CN)'
if subject.nid.get('CN'):
entry = subject.get_entries_by_nid(subject.nid['CN'])[0]
if entry:
signer = entry.get_data().as_text()
return fail("X509 cert %r, %r is in the Revocation List (CRL)" % (
signer, cert.get_serial_number()))
# If the cert is good, then test to see if the signature in the messages
# matches up with the provided cert.
rsa_public = cert.get_pubkey().get_rsa()
digest = M2Crypto.EVP.MessageDigest('sha1')
digest.update(fedmsg.encoding.dumps(message))
try:
if not rsa_public.verify(digest.digest(), signature):
raise M2Crypto.RSA.RSAError("RSA signature failed to validate.")
except M2Crypto.RSA.RSAError as e:
return fail(str(e))
# Now we know that the cert is valid. The message is *authenticated*.
# * Next step: Authorization *
# Load our policy from the config dict.
routing_policy = config.get('routing_policy', {})
# Determine the name of the signer of the message.
# This will be something like "shell-pkgs01.stg.phx2.fedoraproject.org"
subject = cert.get_subject()
signer = subject.get_entries_by_nid(subject.nid['CN'])[0]\
.get_data().as_text()
return validate_policy(
message.get('topic'), signer, routing_policy, config.get('routing_nitpicky', False))
# Maintain the ``sign`` and ``validate`` APIs while preferring cryptography and
# pyOpenSSL over M2Crypto.
if _cryptography:
sign = _crypto_sign
validate = _crypto_validate
elif _m2crypto:
sign = _m2crypto_sign
validate = _m2crypto_validate
else:
sign = _disabled_sign
validate = _disabled_validate