From 75dceaf9f798e8a4490fec5b0e584879fdaf2a8f Mon Sep 17 00:00:00 2001 From: Andrew Teixeira Date: Wed, 27 May 2026 10:25:23 -0400 Subject: [PATCH] feat: Allow dcvMode to be passed to the Certificates::enroll method chore: Update unit tests to account for the new enroll field --- .bumpversion.cfg | 2 +- cert_manager/__version__.py | 2 +- cert_manager/_certificates.py | 6 ++- tests/test_certificates.py | 72 ++++++++++++++++++++++++++--------- 4 files changed, 61 insertions(+), 21 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 35dc8c2..4e7a58b 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,6 +1,6 @@ [bumpversion] commit = True -current_version = 3.0.0 +current_version = 4.0.0 tag = True tag_name = {new_version} diff --git a/cert_manager/__version__.py b/cert_manager/__version__.py index 303bb19..b27725f 100644 --- a/cert_manager/__version__.py +++ b/cert_manager/__version__.py @@ -3,4 +3,4 @@ __title__ = "cert_manager" __description__ = "Python interface to the Sectigo Certificate Manager REST API" __url__ = "https://github.com/broadinstitute/python-cert_manager" -__version__ = "3.0.0" +__version__ = "4.0.0" diff --git a/cert_manager/_certificates.py b/cert_manager/_certificates.py index d499c4b..7ac6dbd 100644 --- a/cert_manager/_certificates.py +++ b/cert_manager/_certificates.py @@ -156,6 +156,9 @@ def enroll(self, **kwargs): external_requester: One or more e-mail addresses custom_fields: zero or more objects representing custom fields and their values Note: each object must have a 'name' key and a 'value' key + dcv_mode: The DCV method to use for the certificate. Allowed values are "EMAIL", + "CNAME", and "HTTP" and "HTTPS". If not provided, "CNAME is the default. + Returns: The certificate_id and the normal status messages for errors """ @@ -167,6 +170,7 @@ def enroll(self, **kwargs): subject_alt_names = kwargs.get("subject_alt_names", None) external_requester = kwargs.get("external_requester", None) custom_fields = kwargs.get("custom_fields", []) + dcv_mode = kwargs.get("dcv_mode", "CNAME") # Make sure a valid certificate type name was provided if cert_type_name not in self.types: @@ -193,7 +197,7 @@ def enroll(self, **kwargs): data = { "orgId": org_id, "csr": csr.rstrip(), "subjAltNames": final_san, "certType": type_id, "numberServers": 1, "serverType": -1, "term": term, "comments": f"Enrolled by {self._client.user_agent}", - "externalRequester": external_requester + "externalRequester": external_requester, "dcvMode": dcv_mode, } if custom_fields: data['customFields'] = custom_fields diff --git a/tests/test_certificates.py b/tests/test_certificates.py index ab9f48c..b6fcef0 100644 --- a/tests/test_certificates.py +++ b/tests/test_certificates.py @@ -359,9 +359,9 @@ def test_success(self): post_data = { "orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224, "numberServers": 1, "serverType": -1, "term": self.test_term, - "comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester + "comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester, + "dcvMode": "CNAME", } - post_json = json.dumps(post_data) # Verify all the query information self.assertEqual(resp, self.test_result) @@ -369,7 +369,8 @@ def test_success(self): self.assertEqual(responses.calls[0].request.url, self.test_types_url) self.assertEqual(responses.calls[1].request.url, self.test_customfields_url) self.assertEqual(responses.calls[2].request.url, self.test_url) - self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[2].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data) @responses.activate def test_san_list(self): @@ -393,9 +394,9 @@ def test_san_list(self): post_data = { "orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": self.test_san, "certType": 224, "numberServers": 1, "serverType": -1, "term": self.test_term, - "comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester + "comments": f"Enrolled by {self.client.user_agent}", + "externalRequester": self.test_external_requester, "dcvMode": "CNAME", } - post_json = json.dumps(post_data) # Verify all the query information self.assertEqual(resp, self.test_result) @@ -403,7 +404,40 @@ def test_san_list(self): self.assertEqual(responses.calls[0].request.url, self.test_types_url) self.assertEqual(responses.calls[1].request.url, self.test_customfields_url) self.assertEqual(responses.calls[2].request.url, self.test_url) - self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[2].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data) + + @responses.activate + def test_dcv_mode(self): + """Handle a custom dcvMode correctly.""" + # Setup the mocked responses + # We need to mock the /types and /customFields URLs as well + # since Certificates.types and Certificate.custom_fields are called from enroll + responses.add(responses.GET, self.test_types_url, json=self.types_data, status=200) + responses.add(responses.GET, self.test_customfields_url, json=self.cf_data, status=200) + responses.add(responses.POST, self.test_url, json=self.test_result, status=200) + + # Call the function + resp = self.certobj.enroll(cert_type_name=self.test_ct_name, csr=self.test_csr, term=self.test_term, + org_id=self.test_org, external_requester=self.test_external_requester, + dcv_mode="HTTP") + + # Mock up the data that should be sent with the post + post_data = { + "orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224, + "numberServers": 1, "serverType": -1, "term": self.test_term, + "comments": f"Enrolled by {self.client.user_agent}", + "externalRequester": self.test_external_requester, "dcvMode": "HTTP", + } + + # Verify all the query information + self.assertEqual(resp, self.test_result) + self.assertEqual(len(responses.calls), 3) + self.assertEqual(responses.calls[0].request.url, self.test_types_url) + self.assertEqual(responses.calls[1].request.url, self.test_customfields_url) + self.assertEqual(responses.calls[2].request.url, self.test_url) + test_json = json.loads(responses.calls[2].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data) @responses.activate def test_bad_cert_name(self): @@ -465,9 +499,8 @@ def test_mandatory_custom_fields_success(self): "orgId": self.test_org, "csr": self.test_csr.rstrip(), "subjAltNames": None, "certType": 224, "numberServers": 1, "serverType": -1, "term": self.test_term, "comments": f"Enrolled by {self.client.user_agent}", "externalRequester": self.test_external_requester, - "customFields": self.test_cf + "dcvMode": "CNAME", "customFields": self.test_cf } - post_json = json.dumps(post_data) # Verify all the query information self.assertEqual(resp, self.test_result) @@ -475,7 +508,8 @@ def test_mandatory_custom_fields_success(self): self.assertEqual(responses.calls[0].request.url, self.test_types_url) self.assertEqual(responses.calls[1].request.url, self.test_customfields_url) self.assertEqual(responses.calls[2].request.url, self.test_url) - self.assertEqual(responses.calls[2].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[2].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data) @responses.activate def test_mandatory_custom_fields_missing(self): @@ -618,13 +652,14 @@ def test_success(self): # Call the function resp = self.certobj.revoke(cert_id=self.test_id, reason="Because") - post_json = json.dumps({"reason": "Because"}) + post_json = {"reason": "Because"} # Verify all the query information self.assertEqual(resp, {}) self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.test_url) - self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[0].request.body.decode("utf-8")) + self.assertEqual(test_json, post_json) @responses.activate def test_no_reason(self): @@ -646,7 +681,8 @@ def test_failure(self): # Verify all the query information self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.test_url) - self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8")) + test_json = responses.calls[0].request.body.decode("utf-8") + self.assertEqual(test_json, post_json) class TestReplace(TestCertificates): @@ -677,13 +713,13 @@ def test_success(self): # Mock up the data that should be sent with the post post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": None, "reason": self.test_reason} - post_json = json.dumps(post_data) # Verify all the query information self.assertEqual(resp, {}) self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.test_url) - self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[0].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data) @responses.activate def test_san_string(self): @@ -700,13 +736,13 @@ def test_san_string(self): # Mock up the data that should be sent with the post post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": san_list, "reason": self.test_reason} - post_json = json.dumps(post_data) # Verify all the query information self.assertEqual(resp, {}) self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.test_url) - self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[0].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data) @responses.activate def test_failure(self): @@ -721,9 +757,9 @@ def test_failure(self): # Mock up the data that should be sent with the post post_data = {"csr": self.test_csr, "commonName": self.test_cn, "subjectAlternativeNames": None, "reason": self.test_reason} - post_json = json.dumps(post_data) # Verify all the query information self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.test_url) - self.assertEqual(responses.calls[0].request.body, post_json.encode("utf-8")) + test_json = json.loads(responses.calls[0].request.body.decode("utf-8")) + self.assertEqual(test_json, post_data)