Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[bumpversion]
commit = True
current_version = 3.0.0
current_version = 4.0.0
tag = True
tag_name = {new_version}

Expand Down
2 changes: 1 addition & 1 deletion cert_manager/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
__title__ = "cert_manager"
__description__ = "Python interface to the Sectigo Certificate Manager REST API"
__url__ = "https://github.com/broadinstitute/python-cert_manager"
__version__ = "3.0.0"
__version__ = "4.0.0"
6 changes: 5 additions & 1 deletion cert_manager/_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ def enroll(self, **kwargs):
external_requester: One or more e-mail addresses
custom_fields: zero or more objects representing custom fields and their values
Note: each object must have a 'name' key and a 'value' key
dcv_mode: The DCV method to use for the certificate. Allowed values are "EMAIL",
"CNAME", and "HTTP" and "HTTPS". If not provided, "CNAME is the default.

Returns:
The certificate_id and the normal status messages for errors
"""
Expand All @@ -167,6 +170,7 @@ def enroll(self, **kwargs):
subject_alt_names = kwargs.get("subject_alt_names", None)
external_requester = kwargs.get("external_requester", None)
custom_fields = kwargs.get("custom_fields", [])
dcv_mode = kwargs.get("dcv_mode", "CNAME")

# Make sure a valid certificate type name was provided
if cert_type_name not in self.types:
Expand All @@ -193,7 +197,7 @@ def enroll(self, **kwargs):
data = {
"orgId": org_id, "csr": csr.rstrip(), "subjAltNames": final_san, "certType": type_id,
"numberServers": 1, "serverType": -1, "term": term, "comments": f"Enrolled by {self._client.user_agent}",
"externalRequester": external_requester
"externalRequester": external_requester, "dcvMode": dcv_mode,
}
if custom_fields:
data['customFields'] = custom_fields
Expand Down
72 changes: 54 additions & 18 deletions tests/test_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,18 @@ def test_success(self):
post_data = {
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224,
"numberServers": 1, "serverType": -1, "term": self.test_term,
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester,
"dcvMode": "CNAME",
}
post_json = json.dumps(post_data)

# Verify all the query information
self.assertEqual(resp, self.test_result)
self.assertEqual(len(responses.calls), 3)
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
self.assertEqual(responses.calls[2].request.url, self.test_url)
self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)

@responses.activate
def test_san_list(self):
Expand All @@ -393,17 +394,50 @@ def test_san_list(self):
post_data = {
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": self.test_san, "certType": 224,
"numberServers": 1, "serverType": -1, "term": self.test_term,
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester
"comments": f"Enrolled by {self.client.user_agent}",
"externalRequester": self.test_external_requester, "dcvMode": "CNAME",
}
post_json = json.dumps(post_data)

# Verify all the query information
self.assertEqual(resp, self.test_result)
self.assertEqual(len(responses.calls), 3)
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
self.assertEqual(responses.calls[2].request.url, self.test_url)
self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)

@responses.activate
def test_dcv_mode(self):
"""Handle a custom dcvMode correctly."""
# Setup the mocked responses
# We need to mock the /types and /customFields URLs as well
# since Certificates.types and Certificate.custom_fields are called from enroll
responses.add(responses.GET, self.test_types_url, json=self.types_data, status=200)
responses.add(responses.GET, self.test_customfields_url, json=self.cf_data, status=200)
responses.add(responses.POST, self.test_url, json=self.test_result, status=200)

# Call the function
resp = self.certobj.enroll(cert_type_name=self.test_ct_name, csr=self.test_csr, term=self.test_term,
org_id=self.test_org, external_requester=self.test_external_requester,
dcv_mode="HTTP")

# Mock up the data that should be sent with the post
post_data = {
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224,
"numberServers": 1, "serverType": -1, "term": self.test_term,
"comments": f"Enrolled by {self.client.user_agent}",
"externalRequester": self.test_external_requester, "dcvMode": "HTTP",
}

# Verify all the query information
self.assertEqual(resp, self.test_result)
self.assertEqual(len(responses.calls), 3)
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
self.assertEqual(responses.calls[2].request.url, self.test_url)
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)

@responses.activate
def test_bad_cert_name(self):
Expand Down Expand Up @@ -465,17 +499,17 @@ def test_mandatory_custom_fields_success(self):
"orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224,
"numberServers": 1, "serverType": -1, "term": self.test_term,
"comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester,
"customFields": self.test_cf
"dcvMode": "CNAME", "customFields": self.test_cf
}
post_json = json.dumps(post_data)

# Verify all the query information
self.assertEqual(resp, self.test_result)
self.assertEqual(len(responses.calls), 3)
self.assertEqual(responses.calls[0].request.url, self.test_types_url)
self.assertEqual(responses.calls[1].request.url, self.test_customfields_url)
self.assertEqual(responses.calls[2].request.url, self.test_url)
self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[2].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)

@responses.activate
def test_mandatory_custom_fields_missing(self):
Expand Down Expand Up @@ -618,13 +652,14 @@ def test_success(self):
# Call the function
resp = self.certobj.revoke(cert_id=self.test_id, reason="Because")

post_json = json.dumps({"reason": "Because"})
post_json = {"reason": "Because"}

# Verify all the query information
self.assertEqual(resp, {})
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.test_url)
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
self.assertEqual(test_json, post_json)

@responses.activate
def test_no_reason(self):
Expand All @@ -646,7 +681,8 @@ def test_failure(self):
# Verify all the query information
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.test_url)
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
test_json = responses.calls[0].request.body.decode("utf-8")
self.assertEqual(test_json, post_json)


class TestReplace(TestCertificates):
Expand Down Expand Up @@ -677,13 +713,13 @@ def test_success(self):
# Mock up the data that should be sent with the post
post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": None,
"reason": self.test_reason}
post_json = json.dumps(post_data)

# Verify all the query information
self.assertEqual(resp, {})
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.test_url)
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)

@responses.activate
def test_san_string(self):
Expand All @@ -700,13 +736,13 @@ def test_san_string(self):
# Mock up the data that should be sent with the post
post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": san_list,
"reason": self.test_reason}
post_json = json.dumps(post_data)

# Verify all the query information
self.assertEqual(resp, {})
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.test_url)
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)

@responses.activate
def test_failure(self):
Expand All @@ -721,9 +757,9 @@ def test_failure(self):
# Mock up the data that should be sent with the post
post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": None,
"reason": self.test_reason}
post_json = json.dumps(post_data)

# Verify all the query information
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.test_url)
self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8"))
test_json = json.loads(responses.calls[0].request.body.decode("utf-8"))
self.assertEqual(test_json, post_data)
Loading