Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to letsencrypt_mgmt_profile.py #237

Merged
merged 21 commits into from Aug 31, 2021
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
261 changes: 194 additions & 67 deletions cert_mgmt/letsencrypt_mgmt_profile.py
@@ -1,30 +1,67 @@
'''
Modified https://github.com/diafygi/acme-tiny/blob/master/acme_tiny.py for Avi Controller
###
# Name: letsencrypt_mgmt_profile.py
# Version: 0.9.0
# License: MIT
#
# Description -
# This is a python script used for automatically requesting and renewing certificates
# from and via Let's Encrypt.
#
# Setup -
# 1. This content needs to be imported in the Avi Controller in the settings menu
# at <<Templates - Security - Certificate Management>>.
# 2. Create at least following script params: user, password (sensitive).
# 3. Go to <<Templates - Security - SSL/TLS Certificates>>, click on <<Create>>
# and then <<Application Certificate>>.
# 4. Specify a suitable name to identify this certificate in Avi Controller.
# (Like sub.domain.tld RSA or sub.domain2.tld ECDSA)
# 5. Change <<Type>> to <<CSR>>
# 6. Set <<Common Name>> to the domain to which the certificate should be issued to
# and select the "Certificate Management" as previously created.
# 7. Save and wait a few seconds for the certificate to be requested and imported.
#
# Note -
# 1. This script can issue RSA and ECDSA certificates, as specified when
# creating an application certificate (CSR) via UI.
# 2. This REQUIRES a L7 Virtual Service (HTTP/S) due to the requirement
# of HTTP-Policy-Sets.
#
# Parameters -
# user - Avi user name (Default None)
# password - Password of the above user (Default None)
# tenant - Avi tenant name (default is 'admin')
# dryrun - True/False. If True letsencrypt's staging server will be used. (Default False)
# contact - E-mail address sent to letsencrypt for account creation. (Default None.)
# (set this only once until updated, otherwise an update request will be sent every time.)
#
# Useful links -
# Ratelimiting - https://letsencrypt.org/docs/rate-limits/
#
# Source -
# https://github.com/avinetworks/devops/blob/master/cert_mgmt/letsencrypt_mgmt_profile.py
#
# Authors/Credits -
# acme-tiny, modified for Avi Controller <https://github.com/diafygi/acme-tiny>
# Nikhil Kumar Yadav <kumaryadavni@vmware.com>
# Patrik Kernstock <pkernstock@vmware.com>
###
'''

'''
Parameters -
user - Avi user name
password - Password of the above user
tenant - Avi tenant name
dryrun - True/False. If True letsencrypt's staging server will be used.

Useful links -
Ratelimiting - https://letsencrypt.org/docs/rate-limits/
'''

import os, subprocess, json, base64, binascii, time, hashlib, re
import base64, binascii, datetime, hashlib, os, json, re, ssl, subprocess, sys, time
from urllib.request import urlopen, Request # Python 3
from tempfile import NamedTemporaryFile

from avi.sdk.avi_api import ApiSession

VERSION = "0.9.0"

DEFAULT_CA = "https://acme-v02.api.letsencrypt.org" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD
DEFAULT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
DEFAULT_STAGING_DIRECTORY_URL = "https://acme-staging-v02.api.letsencrypt.org/directory"
ACCOUNT_KEY_PATH = "/var/lib/avi/ca/private/letsencrypt.key"

def get_crt(user, password, tenant, api_version, csr, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None):
def get_crt(user, password, tenant, api_version, csr, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, debug=False):
directory, acct_headers, alg, jwk = None, None, None, None # global variables

# helper functions - base64 encode for jose spec
Expand All @@ -40,9 +77,15 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
return out

# helper function - make request and automatically parse json response
def _do_request(url, data=None, err_msg="Error", depth=0):
def _do_request(url, data=None, err_msg="Error", depth=0, verify=True):
try:
resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}))
ctx = ssl.create_default_context()
if not verify:
# disable certificate verify
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# open request.
resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}), context=ctx)
patschi marked this conversation as resolved.
Show resolved Hide resolved
resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
except IOError as e:
resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
Expand Down Expand Up @@ -98,12 +141,13 @@ def _do_request_avi(url, method, data=None, error_msg="Error"):
else:
raise Exception("Unsupported API method")
if rsp.status_code >= 300:
err = error_msg + " url - {}. Method - {}. Response status - {}. Response - ".format(url,method,rsp.status_code,rsp.json())
err = error_msg + " url - {}. Method - {}. Response status - {}. Response - {}".format(url, method, rsp.status_code, rsp.json())
raise Exception(err)
return rsp

if os.path.exists(ACCOUNT_KEY_PATH):
print ("Reusing account key.")
if debug:
print ("Reusing account key.")
else:
print ("Account key not found. Generating account key...")
out = _cmd(["openssl", "genrsa", "4096"], err_msg="OpenSSL Error")
Expand Down Expand Up @@ -163,6 +207,8 @@ def _do_request_avi(url, method, data=None, error_msg="Error"):

# get the authorizations that need to be completed
for auth_url in order['authorizations']:
if debug:
print ("Authorization URL is: {}".format(auth_url))
authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges")
domain = authorization['identifier']['value']
print ("Verifying {0}...".format(domain))
Expand All @@ -172,84 +218,134 @@ def _do_request_avi(url, method, data=None, error_msg="Error"):
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
keyauthorization = "{0}.{1}".format(token, thumbprint)

# Update vs
rsp = _do_request_avi("vsvip/?search=(fqdn,{})".format(domain), "GET").json()
# Get VSVIPs/VSs, based on FQDN
Copy link

@chitr chitr Jun 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break this flow to smaller functions based on related functionality.

rsp = _do_request_avi("vsvip/?fqdn={}".format(domain), "GET").json()
vhMode = False
if debug:
print ("Found {} matching VSVIP FQDNs".format(rsp["count"]))
if rsp["count"] == 0:
raise Exception("Could not find a VSVIP with fqdn = {}".format(domain))
vsvip_uuid = rsp["results"][0]["uuid"]
rsp = _do_request_avi("virtualservice?search=(vsvip_ref,{})".format(vsvip_uuid), "GET").json()
print ("Warning: Could not find a VSVIP with fqdn = {}".format(domain))
# As a fallback we search for VirtualHosting entries with that domain
vhMode = True
search_term = "vh_domain_name.contains={}".format(domain)
else:
vsvip_uuid = rsp["results"][0]["uuid"]
search_term = "vsvip_ref={}".format(vsvip_uuid)

rsp = _do_request_avi("virtualservice/?{}".format(search_term), "GET").json()
if debug:
print ("Found {} matching VSs".format(rsp["count"]))
if rsp['count'] == 0:
raise Exception("Could not find a VS with common name = {}".format(domain))
raise Exception("Could not find a VS with fqdn = {}".format(domain))

vs_uuid = rsp["results"][0]["uuid"]
print ("Found vs {} with fqdn {}".format(vs_uuid, domain))
# Check if the vs is servering on port 80
print ("Found VS {} with fqdn {}".format(vs_uuid, domain))

# Let's check if VS is enabled, otherwise challenge can never successfully complete.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets move avi specific flow to separate function.(for next iteration )

if not rsp["results"][0]["enabled"]:
raise Exception("VS with fqdn {} is not enabled.".format(domain))

# Special handling for virtualHosting: if child, get services from parent.
if vhMode and rsp["results"][0]["type"] == "VS_TYPE_VH_CHILD":
# vh_parent_vs_ref is schema of https://avi.domain.tld/api/virtualservice/virtualservice-UUID, hence picking the last part
vs_uuid_parent = rsp["results"][0]["vh_parent_vs_ref"].split("/")[-1]
vhRsp = _do_request_avi("virtualservice/?uuid={}".format(vs_uuid_parent), "GET").json()
if debug:
print ("Parent VS of Child-VS is {} and found {} matches".format(vs_uuid_parent, vhRsp['count']))
if vhRsp['count'] == 0:
raise Exception("Could not find parent VS {} of child VS UUID = {}".format(vs_uuid_parent, vs_uuid))

# we just copy it over. more transparent for further logic.
rsp["results"][0]["services"] = vhRsp["results"][0]["services"]

# Check if the vs is serving on port 80
serving_on_port_80 = False
service_on_port_80_data = None
for service in rsp["results"][0]["services"]:
if service["port"] == 80:
if service["port"] == 80 and not service["enable_ssl"]:
serving_on_port_80 = True
print ("VS serving on port 80")
if debug:
print ("VS serving on port 80")
break

# Update vs
# create HTTP policy
httppolicy_data = {
"name": (domain + "LetsEncryptHTTPpolicy"),
"name": (domain + "-LetsEncryptHTTPpolicy"),
"http_security_policy": {
"rules": [{
"name": "Rule 1",
"index": 1,
"enable": True,
"match": {
"vs_port": {
"match_criteria": "IS_IN",
"ports": [80]
"rules": [{
"name": "Rule 1",
"index": 1,
"enable": True,
"match": {
"path": {
"match_criteria": "CONTAINS",
"match_case": "SENSITIVE",
"match_str": [
".well-known/acme-challenge/{}".format(token)
]
}
},
"path": {
"match_criteria": "CONTAINS",
"match_case": "SENSITIVE",
"match_str": [
".well-known/acme-challenge/{}".format(token)
]
"action": {
"action": "HTTP_SECURITY_ACTION_SEND_RESPONSE",
"status_code": "HTTP_LOCAL_RESPONSE_STATUS_CODE_200",
"file": {
"content_type": "text/plain",
"file_content": keyauthorization
}
}
},
"action": {
"action": "HTTP_SECURITY_ACTION_SEND_RESPONSE",
"status_code": "HTTP_LOCAL_RESPONSE_STATUS_CODE_200",
"file": {
"content_type": "text/plain",
"file_content": keyauthorization
}
}
}]
}]
},
"is_internal_policy": False
}

try:
rsp = _do_request_avi("httppolicyset", "POST", data=httppolicy_data).json()
httppolicy_uuid = rsp["uuid"]
print ("Created HTTP policy with uuid {}".format(httppolicy_uuid))

patch_data = {"add" : {"http_policies": [{"http_policy_set_ref": "/api/httppolicyset/{}".format(httppolicy_uuid), "index":1000001}]}}
if not serving_on_port_80:
# Add to port to virtualservice
# Add port to virtualservice
print ("Adding port 80 to VS")
service_on_port_80_data = {
service_on_port_80_data = {
"enable_http2": False,
"enable_ssl": False,
"port": 80,
"port_range_end": 80
}
patch_data["add"]["services"] = [service_on_port_80_data]
_do_request_avi("virtualservice/{}".format(vs_uuid), "PATCH", patch_data)
print ("Added HTTPPolicy to VS")

if vhMode: # if VH, we set the rule on the parent. Without SNI (so HTTP) it will go to the parent.
_do_request_avi("virtualservice/{}".format(vs_uuid_parent), "PATCH", patch_data)
print ("Added HTTPPolicy to parent-VS {}".format(vs_uuid_parent))
else:
_do_request_avi("virtualservice/{}".format(vs_uuid), "PATCH", patch_data)
print ("Added HTTPPolicy to VS {}".format(vs_uuid))

# check that the file is in place
try:
if not disable_check:
print ("Validating token from Avi Controller...")
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization)
except (AssertionError, ValueError) as e:
raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format('wellknown_path', 'wellknown_url', e))
try:
maxVerifyAttempts = 5 # maximal amount of verification attempts
# retrying logic. Otherwise race-condition can occurr between avi controller pushing config and the token validation request
for verifyAttempt in range(maxVerifyAttempts):
reqToken = _do_request(wellknown_url, verify=False)
if reqToken[0] != keyauthorization:
print ("Internal token validation failed, {0} of {1} attempts. Retrying in 2 seconds."
.format((verifyAttempt + 1), maxVerifyAttempts))
time.sleep(2)
else:
break
else:
raise Exception("All {2} internal token verifications failed. Got '{0}' but expected '{1}'."
.format(reqToken[0], keyauthorization, maxVerifyAttempts))
except Exception as e:
raise ValueError("Wrote file, but Avi couldn't verify token at {0}. Exception: {1}".format(wellknown_url, str(e)))
else:
print ("Waiting 5 seconds before letting LetsEncrypt validating the challenge as validation disabled. Give controller time to push configs.")
time.sleep(5) # wait 5 secs if not validating, due to above mentioned race condition

print ("Challenge Completed, notifying LetsEncrypt")
# say the challenge is done
Expand All @@ -260,12 +356,17 @@ def _do_request_avi(url, method, data=None, error_msg="Error"):
print ("Challenge Passed")

finally:
print ("Cleaning up...")
# Update the vs
patch_data = {"delete" : {"http_policies": [{"http_policy_set_ref": "/api/httppolicyset/{}".format(httppolicy_uuid), "index":1000001}]}}
if not serving_on_port_80:
patch_data["delete"]["services"] = [service_on_port_80_data]
_do_request_avi("virtualservice/{}".format(vs_uuid), "PATCH", patch_data)
print ("Removed HTTPPolicy from VS")
if vhMode: # if VH, we set the rule on the parent. Without SNI (so HTTP) it will go to the parent.
_do_request_avi("virtualservice/{}".format(vs_uuid_parent), "PATCH", patch_data)
print ("Removed HTTPPolicy from parent-VS {}".format(vs_uuid_parent))
else:
_do_request_avi("virtualservice/{}".format(vs_uuid), "PATCH", patch_data)
print ("Removed HTTPPolicy from VS {}".format(vs_uuid))
_do_request_avi("httppolicyset/{}".format(httppolicy_uuid), "DELETE")
print ("Deleted HTTPPolicy")

Expand All @@ -290,32 +391,58 @@ def _do_request_avi(url, method, data=None, error_msg="Error"):
def certificate_request(csr, common_name, kwargs):
user = kwargs.get('user', None)
password = kwargs.get('password', None)
tenant = kwargs.get('tenant', '*')
dry_run = kwargs.get('dryrun', '')
tenant = kwargs.get('tenant', None)
dry_run = kwargs.get('dryrun', "false")
contact = kwargs.get('contact', None)
api_version = kwargs.get('api_version', '20.1.1')
disable_check = kwargs.get('disable_check', "false")
debug = kwargs.get('debug', "false")

print ("Running version {}".format(VERSION))

if debug.lower() == "true":
debug = True
print ("Debug enabled.")
else:
debug = False

if dry_run.lower() == "true":
dry_run = True
else:
dry_run = False
print ("dry_run is: {}".format(str(dry_run)))

if disable_check.lower() == "true":
disable_check = True
else:
disable_check = False
print ("disable_check is: {}".format(str(disable_check)))

directory_url = DEFAULT_DIRECTORY_URL
if dry_run:
directory_url = DEFAULT_STAGING_DIRECTORY_URL
print ("directory_url is {}".format(directory_url))

csr_temp_file = NamedTemporaryFile(mode='w',delete=False)
csr_temp_file.close()

with open(csr_temp_file.name, 'w') as f:
f.write(csr)

if tenant == None:
print ("Using default tenant. You might want to define a specific tenant.".format(tenant))

if contact != None and "@" in contact:
contact = [ "mailto:{}".format(contact) ] # contact must be array as of ACME RFC
print ("Contact set to: {}".format(contact))

signed_crt = None
try:
signed_crt = get_crt(user, password, tenant, api_version, csr_temp_file.name, directory_url=directory_url, contact=contact)
signed_crt = get_crt(user, password, tenant, api_version, csr_temp_file.name,
disable_check=disable_check, directory_url=directory_url,
contact=contact, debug=debug)
finally:
patschi marked this conversation as resolved.
Show resolved Hide resolved
os.remove(csr_temp_file.name)

print (signed_crt)
return signed_crt