Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACME: add support for IP identifiers #53660

Merged
merged 11 commits into from
Mar 13, 2019
2 changes: 1 addition & 1 deletion .github/BOTMETA.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ files:
<<: *docker
support: community
test/units/module_utils/facts/network/test_generic_bsd.py: *bsd
test/units/module_utils/test_acme.py: *crypto
test/units/module_utils/acme: *crypto
test/units/module_utils/xenserver/: bvitnik
test/units/modules/cloud/docker:
<<: *docker
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- "acme_certificate - add experimental support for IP address identifiers."
88 changes: 82 additions & 6 deletions lib/ansible/module_utils/acme.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,21 +822,97 @@ def update_account(self, account_data, contact=None):
return True, account_data


def cryptography_get_csr_domains(module, csr_filename):
def _normalize_ip(ip):
if ':' not in ip:
# For IPv4 addresses: remove trailing zeros per nibble
ip = '.'.join([nibble.lstrip('0') or '0' for nibble in ip.split('.')])
return ip
# For IPv6 addresses:
# 1. Make them lowercase and split
ip = ip.lower()
i = ip.find('::')
if i >= 0:
front = ip[:i].split(':') or []
back = ip[i + 2:].split(':') or []
ip = front + ['0'] * (8 - len(front) - len(back)) + back
else:
ip = ip.split(':')
# 2. Remove trailing zeros per nibble
ip = [nibble.lstrip('0') or '0' for nibble in ip]
# 3. Find longest consecutive sequence of zeros
zeros_start = -1
zeros_length = -1
current_start = -1
for i, nibble in enumerate(ip):
if nibble == '0':
if current_start < 0:
current_start = i
elif current_start >= 0:
if i - current_start > zeros_length:
zeros_start = current_start
zeros_length = i - current_start
current_start = -1
if current_start >= 0:
if 8 - current_start > zeros_length:
zeros_start = current_start
zeros_length = 8 - current_start
# 4. If the sequence has at least two elements, contract
if zeros_length >= 2:
return ':'.join(ip[:zeros_start]) + '::' + ':'.join(ip[zeros_start + zeros_length:])
# 5. If not, return full IP
return ':'.join(ip)


def openssl_get_csr_identifiers(openssl_binary, module, csr_filename):
'''
Return a set of requested domains (CN and SANs) for the CSR.
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
domains = set([])
openssl_csr_cmd = [openssl_binary, "req", "-in", csr_filename, "-noout", "-text"]
dummy, out, dummy = module.run_command(openssl_csr_cmd, check_rc=True)

identifiers = set([])
common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
if common_name is not None:
identifiers.add(('dns', common_name.group(1)))
subject_alt_names = re.search(
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.lower().startswith("dns:"):
identifiers.add(('dns', san[4:]))
elif san.lower().startswith("ip:"):
identifiers.add(('ip', _normalize_ip(san[3:])))
elif san.lower().startswith("ip address:"):
identifiers.add(('ip', _normalize_ip(san[11:])))
else:
raise ModuleFailException('Found unsupported SAN identifier "{0}"'.format(san))
return identifiers


def cryptography_get_csr_identifiers(module, csr_filename):
'''
Return a set of requested identifiers (CN and SANs) for the CSR.
Each identifier is a pair (type, identifier), where type is either
'dns' or 'ip'.
'''
identifiers = set([])
csr = cryptography.x509.load_pem_x509_csr(read_file(csr_filename), _cryptography_backend)
for sub in csr.subject:
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
domains.add(sub.value)
identifiers.add(('dns', sub.value))
for extension in csr.extensions:
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
for name in extension.value:
if isinstance(name, cryptography.x509.DNSName):
domains.add(name.value)
return domains
identifiers.add(('dns', name.value))
elif isinstance(name, cryptography.x509.IPAddress):
identifiers.add(('ip', _normalize_ip(str(name.value))))
else:
raise ModuleFailException('Found unsupported SAN identifier {0}'.format(name))
return identifiers


def cryptography_get_cert_days(module, cert_file, now=None):
Expand Down