Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add openssl_certificate_info module #54709

Merged
merged 12 commits into from
Apr 5, 2019
267 changes: 264 additions & 3 deletions lib/ansible/module_utils/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
# --------------------------------------------------------------
# A clearly marked portion of this file is licensed under the BSD
# Copyright (c) 2015, 2016 Paul Kehrer (@reaperhulk)
# Copyright (c) 2017 Fraser Tweedale (@frasertweedale)
# For more details, search for the function _obj2txt().


try:
import OpenSSL
from OpenSSL import crypto
except ImportError:
# An error will be raised in the calling class to let the end
Expand All @@ -35,6 +42,8 @@


import abc
import base64
import binascii
import datetime
import errno
import hashlib
Expand Down Expand Up @@ -332,6 +341,225 @@ def remove(self, module):
pass


_OID_MAP = {
# First entry is 'canonical' name
"2.5.29.37.0": ('Any Extended Key Usage', 'anyExtendedKeyUsage'),
"1.3.6.1.5.5.7.1.3": ('qcStatements', ),
"1.3.6.1.5.5.7.3.10": ('DVCS', 'dvcs'),
"1.3.6.1.5.5.7.3.7": ('IPSec User', 'ipsecUser'),
"1.3.6.1.5.5.7.1.2": ('Biometric Info', 'biometricInfo'),
}

_NORMALIZE_NAMES = {
'CN': 'commonName',
'C': 'countryName',
'L': 'localityName',
'ST': 'stateOrProvinceName',
'street': 'streetAddress',
'O': 'organizationName',
'OU': 'organizationalUnitName',
'SN': 'surname',
'GN': 'givenName',
'UID': 'userId',
'userID': 'userId',
'DC': 'domainComponent',
'jurisdictionC': 'jurisdictionCountryName',
'jurisdictionL': 'jurisdictionLocalityName',
'jurisdictionST': 'jurisdictionStateOrProvinceName',
'serverAuth': 'TLS Web Server Authentication',
'clientAuth': 'TLS Web Client Authentication',
'codeSigning': 'Code Signing',
'emailProtection': 'E-mail Protection',
'timeStamping': 'Time Stamping',
'OCSPSigning': 'OCSP Signing',
}

for dotted, names in _OID_MAP.items():
for name in names[1:]:
_NORMALIZE_NAMES[name] = names[0]


def pyopenssl_normalize_name(name):
nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(name))
if nid != 0:
b_name = OpenSSL._util.lib.OBJ_nid2ln(nid)
name = to_text(OpenSSL._util.ffi.string(b_name))
return _NORMALIZE_NAMES.get(name, name)


# #####################################################################################
# #####################################################################################
# # This excerpt is dual licensed under the terms of the Apache License, Version
# # 2.0, and the BSD License. See the LICENSE file at
# # https://github.com/pyca/cryptography/blob/master/LICENSE for complete details.
# #
# # Adapted from cryptography's hazmat/backends/openssl/decode_asn1.py
# #
# # Copyright (c) 2015, 2016 Paul Kehrer (@reaperhulk)
# # Copyright (c) 2017 Fraser Tweedale (@frasertweedale)
# #
# # Relevant commits from cryptography project (https://github.com/pyca/cryptography):
# # pyca/cryptography@719d536dd691e84e208534798f2eb4f82aaa2e07
# # pyca/cryptography@5ab6d6a5c05572bd1c75f05baf264a2d0001894a
# # pyca/cryptography@2e776e20eb60378e0af9b7439000d0e80da7c7e3
# # pyca/cryptography@fb309ed24647d1be9e319b61b1f2aa8ebb87b90b
# # pyca/cryptography@2917e460993c475c72d7146c50dc3bbc2414280d
# # pyca/cryptography@3057f91ea9a05fb593825006d87a391286a4d828
# # pyca/cryptography@d607dd7e5bc5c08854ec0c9baff70ba4a35be36f
def _obj2txt(openssl_lib, openssl_ffi, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
#
# But OIDs longer than this occur in real life (e.g. Active
# Directory makes some very long OIDs). So we need to detect
# and properly handle the case where the default buffer is not
# big enough.
#
buf_len = 80
buf = openssl_ffi.new("char[]", buf_len)

# 'res' is the number of bytes that *would* be written if the
# buffer is large enough. If 'res' > buf_len - 1, we need to
# alloc a big-enough buffer and go again.
res = openssl_lib.OBJ_obj2txt(buf, buf_len, obj, 1)
if res > buf_len - 1: # account for terminating null byte
buf_len = res + 1
buf = openssl_ffi.new("char[]", buf_len)
res = openssl_lib.OBJ_obj2txt(buf, buf_len, obj, 1)
return openssl_ffi.buffer(buf, res)[:].decode()
# #####################################################################################
# #####################################################################################


def cryptography_get_extensions_from_cert(cert):
# Since cryptography won't give us the DER value for an extension
# (that is only stored for unrecognized extensions), we have to re-do
# the extension parsing outselves.
result = dict()
backend = cert._backend
x509_obj = cert._x509

for i in range(backend._lib.X509_get_ext_count(x509_obj)):
ext = backend._lib.X509_get_ext(x509_obj, i)
if ext == backend._ffi.NULL:
continue
crit = backend._lib.X509_EXTENSION_get_critical(ext)
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
entry = dict(
critical=(crit == 1),
value=base64.b64encode(der),
)
oid = _obj2txt(backend._lib, backend._ffi, backend._lib.X509_EXTENSION_get_object(ext))
result[oid] = entry
return result


def pyopenssl_get_extensions_from_cert(cert):
# While pyOpenSSL allows us to get an extension's DER value, it won't
# give us the dotted string for an OID. So we have to do some magic to
# get hold of it.
result = dict()
ext_count = cert.get_extension_count()
for i in range(0, ext_count):
ext = cert.get_extension(i)
entry = dict(
critical=bool(ext.get_critical()),
value=base64.b64encode(ext.get_data()),
)
oid = _obj2txt(
OpenSSL._util.lib,
OpenSSL._util.ffi,
OpenSSL._util.lib.X509_EXTENSION_get_object(ext._extension)
)
# This could also be done a bit simpler:
#
# oid = _obj2txt(OpenSSL._util.lib, OpenSSL._util.ffi, OpenSSL._util.lib.OBJ_nid2obj(ext._nid))
#
# Unfortunately this gives the wrong result in case the linked OpenSSL
# doesn't know the OID. That's why we have to get the OID dotted string
# similarly to how cryptography does it.
result[oid] = entry
return result


def crpytography_name_to_oid(name):
if name in ('CN', 'commonName'):
return x509.oid.NameOID.COMMON_NAME
if name in ('C', 'countryName'):
return x509.oid.NameOID.COUNTRY_NAME
if name in ('L', 'localityName'):
return x509.oid.NameOID.LOCALITY_NAME
if name in ('ST', 'stateOrProvinceName'):
return x509.oid.NameOID.STATE_OR_PROVINCE_NAME
if name in ('street', 'streetAddress'):
return x509.oid.NameOID.STREET_ADDRESS
if name in ('O', 'organizationName'):
return x509.oid.NameOID.ORGANIZATION_NAME
if name in ('OU', 'organizationalUnitName'):
return x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME
if name in ('serialNumber', ):
return x509.oid.NameOID.SERIAL_NUMBER
if name in ('SN', 'surname'):
return x509.oid.NameOID.SURNAME
if name in ('GN', 'givenName'):
return x509.oid.NameOID.GIVEN_NAME
if name in ('title', ):
return x509.oid.NameOID.TITLE
if name in ('generationQualifier', ):
return x509.oid.NameOID.GENERATION_QUALIFIER
if name in ('x500UniqueIdentifier', ):
return x509.oid.NameOID.X500_UNIQUE_IDENTIFIER
if name in ('dnQualifier', ):
return x509.oid.NameOID.DN_QUALIFIER
if name in ('pseudonym', ):
return x509.oid.NameOID.PSEUDONYM
if name in ('UID', 'userId', 'UserID'):
return x509.oid.NameOID.USER_ID
if name in ('DC', 'domainComponent'):
return x509.oid.NameOID.DOMAIN_COMPONENT
if name in ('emailAddress', ):
return x509.oid.NameOID.EMAIL_ADDRESS
if name in ('jurisdictionC', 'jurisdictionCountryName'):
return x509.oid.NameOID.JURISDICTION_COUNTRY_NAME
if name in ('jurisdictionL', 'jurisdictionLocalityName'):
return x509.oid.NameOID.JURISDICTION_LOCALITY_NAME
if name in ('jurisdictionST', 'jurisdictionStateOrProvinceName'):
return x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME
if name in ('businessCategory', ):
return x509.oid.NameOID.BUSINESS_CATEGORY
if name in ('postalAddress', ):
return x509.oid.NameOID.POSTAL_ADDRESS
if name in ('postalCode', ):
return x509.oid.NameOID.POSTAL_CODE
if name in ('serverAuth', 'TLS Web Server Authentication'):
return x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
if name in ('clientAuth', 'TLS Web Client Authentication'):
return x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
if name in ('codeSigning', 'Code Signing'):
return x509.oid.ExtendedKeyUsageOID.CODE_SIGNING
if name in ('emailProtection', 'E-mail Protection'):
return x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION
if name in ('timeStamping', 'Time Stamping'):
return x509.oid.ExtendedKeyUsageOID.TIME_STAMPING
if name in ('OCSPSigning', 'OCSP Signing'):
return x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING
if name in ('anyExtendedKeyUsage', 'Any Extended Key Usage'):
return x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE
for dotted, names in _OID_MAP.items():
if name in names:
return x509.oid.ObjectIdentifier(dotted)
raise OpenSSLObjectError('Cannot find OID for "{0}"'.format(name))


def crpytography_oid_to_name(oid):
dotted_string = oid.dotted_string
names = _OID_MAP.get(dotted_string)
name = names[0] if names else oid._name
return _NORMALIZE_NAMES.get(name, name)


def cryptography_get_name_oid(id):
'''
Given a symbolic ID, finds the appropriate OID for use with cryptography.
Expand Down Expand Up @@ -367,7 +595,7 @@ def cryptography_get_name_oid(id):
return x509.oid.NameOID.DN_QUALIFIER
if id in ('pseudonym', ):
return x509.oid.NameOID.PSEUDONYM
if id in ('UID', 'userId'):
if id in ('UID', 'userId', 'userID'):
return x509.oid.NameOID.USER_ID
if id in ('DC', 'domainComponent'):
return x509.oid.NameOID.DOMAIN_COMPONENT
Expand Down Expand Up @@ -409,6 +637,39 @@ def cryptography_get_name(name):
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}" (potentially unsupported by cryptography backend)'.format(name))


def _get_hex(bytes):
if bytes is None:
return bytes
data = binascii.hexlify(bytes)
data = to_text(b':'.join(data[i:i + 2] for i in range(0, len(data), 2)))
return data


def cryptography_decode_name(name):
'''
Given a cryptography x509.Name object, returns a string.
Raises an OpenSSLObjectError if the name is not supported.
'''
if isinstance(name, x509.DNSName):
return 'DNS:{0}'.format(name.value)
if isinstance(name, x509.IPAddress):
return 'IP:{0}'.format(name.value.compressed)
if isinstance(name, x509.RFC822Name):
return 'email:{0}'.format(name.value)
if isinstance(name, x509.UniformResourceIdentifier):
return 'URI:{0}'.format(name.value)
if isinstance(name, x509.DirectoryName):
# FIXME: test
return 'DirName:' + ''.join(['/{0}:{1}'.format(attribute.oid._name, attribute.value) for attribute in name.value])
if isinstance(name, x509.RegisteredID):
# FIXME: test
return 'RegisteredID:{0}'.format(name.value)
if isinstance(name, x509.OtherName):
# FIXME: test
return '{0}:{1}'.format(name.type_id.dotted_string, _get_hex(name.value))
raise OpenSSLObjectError('Cannot decode name "{0}"'.format(name))


def _cryptography_get_keyusage(usage):
'''
Given a key usage identifier string, returns the parameter name used by cryptography's x509.KeyUsage().
Expand Down Expand Up @@ -474,10 +735,10 @@ def cryptography_get_ext_keyusage(usage):
if usage in ('OCSPSigning', 'OCSP Signing'):
return x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING
if usage in ('anyExtendedKeyUsage', 'Any Extended Key Usage'):
return x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE
return x509.oid.ObjectIdentifier("2.5.29.37.0")
if usage in ('qcStatements', ):
return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.3")
if usage in ('DVCS', ):
if usage in ('DVCS', 'dvcs'):
return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.10")
if usage in ('IPSec User', 'ipsecUser'):
return x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.7")
Expand Down