diff --git a/requirements-build.txt b/requirements-build.txt index 20fb9df..df5d468 100644 --- a/requirements-build.txt +++ b/requirements-build.txt @@ -1,3 +1,4 @@ +pip==23.3.2 pytest==7.4.3 pytest-cov==4.1.0 safety==2.3.5 diff --git a/requirements.txt b/requirements.txt index e99ab19..373b517 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ requests==2.31.0 python-dateutil==2.8.2 -cryptography==40.0.2 +cryptography==42.0.2 six==1.16.0 ruamel.yaml==0.18.5 pynacl==1.5.0 diff --git a/setup.py b/setup.py index 96d2b34..e0f556b 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ url="https://github.com/Venafi/vcert-python", packages=['vcert', 'vcert.parser', 'vcert.policy'], install_requires=['requests==2.31.0', 'python-dateutil==2.8.2', 'certvalidator<=0.11.1', 'six==1.16.0', - 'cryptography==40.0.2', 'ruamel.yaml==0.17.31', 'pynacl==1.5.0'], + 'cryptography==42.0.2', 'ruamel.yaml==0.17.31', 'pynacl==1.5.0'], description='Python client library for Venafi Trust Protection Platform and Venafi Cloud.', long_description=long_description, long_description_content_type="text/markdown", diff --git a/tests/test_env.py b/tests/test_env.py index a2125c7..3c2326d 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -30,6 +30,7 @@ CLOUD_URL = environ.get('CLOUD_URL') CLOUD_APIKEY = environ.get('CLOUD_APIKEY') CLOUD_ZONE = environ.get('CLOUD_ZONE') +VAAS_ZONE_ONLY_EC = environ.get('VAAS_ZONE_ONLY_EC') CLOUD_TEAM = environ.get('CLOUD_TEAM') TPP_PM_ROOT = environ.get('TPP_PM_ROOT') diff --git a/tests/test_local_methods.py b/tests/test_local_methods.py index f2878fe..96049db 100644 --- a/tests/test_local_methods.py +++ b/tests/test_local_methods.py @@ -200,10 +200,10 @@ def test_parse_tpp_policy1(self): conn = TPPConnection(url="http://example.com/", user="", password="") raw_data = json.loads(POLICY_TPP1) p = conn._parse_zone_config_to_policy(raw_data) - self.assertEqual(len(p.key_types), 7) + self.assertEqual(len(p.key_types), 8) raw_data['Policy']['KeyPair']['KeySize']['Locked'] = True p = conn._parse_zone_config_to_policy(raw_data) - self.assertEqual(len(p.key_types), 4) + self.assertEqual(len(p.key_types), 5) raw_data['Policy']['KeyPair']['KeyAlgorithm']['Locked'] = True p = conn._parse_zone_config_to_policy(raw_data) self.assertEqual(len(p.key_types), 1) diff --git a/tests/test_tpp_token.py b/tests/test_tpp_token.py index f3cd9fc..54b8b8c 100644 --- a/tests/test_tpp_token.py +++ b/tests/test_tpp_token.py @@ -26,7 +26,7 @@ from assets import TEST_KEY_ECDSA, TEST_KEY_RSA_4096, TEST_KEY_RSA_2048_ENCRYPTED from test_env import TPP_ZONE, TPP_ZONE_ECDSA, TPP_USER, TPP_PASSWORD, TPP_TOKEN_URL from test_utils import (random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, - enroll_with_zone_update, simple_enroll) + enroll_with_zone_update, simple_enroll, retire_by_id, retire_by_thumbprint) from vcert import (CustomField, KeyType, RevocationRequest, CertificateRequest, IssuerHint, logger, TPPTokenConnection) from vcert.errors import ClientBadData, ServerUnexptedBehavior @@ -175,6 +175,7 @@ def test_token_revoke_normal(self): with self.assertRaises(Exception): self.tpp_conn.renew_cert(req) + def test_token_revoke_without_disable(self): req, cert = simple_enroll(self.tpp_conn, self.tpp_zone) rev_req = RevocationRequest(req_id=req.id, disable=False) @@ -267,3 +268,20 @@ def test_revoke_access_token(self): cn = f"{random_word(10)}.venafi.example.com" with self.assertRaises(Exception): enroll(self.tpp_conn, self.tpp_zone, cn) + + def test_tpp_token_retire_cert_id(self): + try: + req, cert = simple_enroll(self.tpp_conn, self.tpp_zone) + ret_data = retire_by_id(self.tpp_conn, req.id) + assert ret_data['Success'] is True + except Exception as err: + self.fail(f"Error in tpp retire by id test: {err}") + + def test_tpp_token_retire_cert_thumbprint(self): + try: + req, cert = simple_enroll(self.tpp_conn, self.tpp_zone) + cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend()) + ret_data = retire_by_thumbprint(self.tpp_conn, cert) + assert ret_data['Success'] is True + except Exception as err: + self.fail(f"Error in tpp retire by thumbprint test: {err}") \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py index 21b0ab2..9f7f0a3 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -28,6 +28,7 @@ from test_env import RANDOM_DOMAIN from vcert import CertificateRequest, FakeConnection, TPPConnection, TPPTokenConnection, CSR_ORIGIN_SERVICE +from vcert.common import RetireRequest def random_word(length): @@ -209,3 +210,18 @@ def renew_by_thumbprint(conn, prev_cert): print(prev_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)) assert cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == prev_cert.subject.get_attributes_for_oid( NameOID.COMMON_NAME) + + +def retire_by_id(conn, prev_cert_id): + print("trying to retire by id") + ret_request = RetireRequest(req_id=prev_cert_id) + retire_data = conn.retire_cert(ret_request) + return retire_data + + +def retire_by_thumbprint(conn, prev_cert): + print("Trying to retire by thumbprint") + thumbprint = binascii.hexlify(prev_cert.fingerprint(hashes.SHA1())).decode() + ret_request = RetireRequest(thumbprint=thumbprint) + retire_data = conn.retire_cert(ret_request) + return retire_data \ No newline at end of file diff --git a/tests/test_vaas.py b/tests/test_vaas.py index 8d46ade..83422e6 100644 --- a/tests/test_vaas.py +++ b/tests/test_vaas.py @@ -25,12 +25,13 @@ from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey from cryptography.x509.oid import NameOID -from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN +from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN, VAAS_ZONE_ONLY_EC from test_pm import get_policy_obj, get_defaults_obj from test_utils import random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, simple_enroll, \ get_vaas_zone from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE from vcert.policy import KeyPair, DefaultKeyPair, PolicySpecification +from vcert.common import RetireRequest log = logger.get_child("test-vaas") @@ -38,6 +39,7 @@ class TestVaaSMethods(unittest.TestCase): def __init__(self, *args, **kwargs): self.cloud_zone = CLOUD_ZONE + self.vaas_zone_ec = VAAS_ZONE_ONLY_EC self.cloud_conn = CloudConnection(token=CLOUD_APIKEY, url=CLOUD_URL) super(TestVaaSMethods, self).__init__(*args, **kwargs) @@ -170,29 +172,14 @@ def test_cloud_enroll_service_generated_csr(self): log.info(f"PKCS12 created successfully for certificate with CN: {cn}") def test_enroll_ec_key_certificate(self): - policy = get_policy_obj() - kp = KeyPair( - key_types=['EC'], - elliptic_curves=['P521', 'P384'], - reuse_allowed=False) - policy.key_pair = kp + zone = self.vaas_zone_ec - defaults = get_defaults_obj() - defaults.key_pair = DefaultKeyPair( - key_type='EC', - elliptic_curve='P521') - - policy_spec = PolicySpecification() - policy_spec.policy = policy - policy_spec.defaults = defaults - - zone = get_vaas_zone() - - self.cloud_conn.set_policy(zone, policy_spec) password = 'FooBarPass123' + random_name = f"{random_word(10)}.vfidev.com" request = CertificateRequest( - common_name=f"{random_word(10)}.venafi.example", + common_name=random_name, + san_dns=[random_name], key_type=KeyType( key_type="ec", option="P384" @@ -214,3 +201,15 @@ def test_enroll_ec_key_certificate(self): if p_key: self.assertIsInstance(p_key, EllipticCurvePrivateKey, "returned private key is not of type Elliptic Curve") self.assertEqual(p_key.curve.key_size, 384, f"Private Key expected curve: 384. Got: {p_key.curve.key_size}") + + def test_cloud_retire_by_thumbprint(self): + try: + req, cert = simple_enroll(self.cloud_conn, self.cloud_zone) + cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend()) + fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode() + time.sleep(1) + ret_request = RetireRequest(thumbprint=fingerprint) + ret_data = self.cloud_conn.retire_cert(ret_request) + assert ret_data is True + except Exception as e: + log.error(msg=f"Error retiring certificate by thumbprint: {e.message}") diff --git a/vcert/common.py b/vcert/common.py index feabf23..d862634 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -109,7 +109,7 @@ def get_ip_address(): class KeyType: ALLOWED_SIZES = [2048, 3072, 4096, 8192] - ALLOWED_CURVES = ["p256", "p384", "p521"] + ALLOWED_CURVES = ["p256", "p384", "p521", "ed25519"] RSA = 'rsa' ECDSA = 'ec' @@ -125,7 +125,7 @@ def __init__(self, key_type, option): raise BadData elif self.key_type == KeyType.ECDSA: option = {"secp521r1": "p521", "secp384r1": "p384", "secp256r1": "p256", "p256": "p256", "p384": "p384", - "p521": "p521"}[option.lower().strip()] + "p521": "p521", "ed25519": "ed25519"}[option.lower().strip()] if option not in KeyType.ALLOWED_CURVES: log.error(f"unknown curve: {option}, should be one of {KeyType.ALLOWED_CURVES}") raise BadData @@ -593,6 +593,20 @@ def __init__(self, req_id=None, thumbprint=None, reason=RevocationReasons.NoRea self.disable = disable +class RetireRequest: + def __init__(self, req_id=None, thumbprint=None, guid=None, description=None): + """ + :param req_id: + :param thumbprint: + :param guid: + :param description: + """ + self.id = req_id + self.thumbprint = thumbprint + self.guid = guid + self.description = description + + class Authentication: def __init__(self, user=None, password=None, access_token=None, refresh_token=None, api_key=None, state=None, token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM): diff --git a/vcert/connection_cloud.py b/vcert/connection_cloud.py index a26dde8..84f3c5d 100644 --- a/vcert/connection_cloud.py +++ b/vcert/connection_cloud.py @@ -85,6 +85,7 @@ def __init__(self): CERTIFICATE_STATUS = CERTIFICATE_REQUESTS + "/{}" CERTIFICATE_RETRIEVE = API_BASE_PATH + "certificates/{}/contents" CERTIFICATE_SEARCH = API_BASE_PATH + "certificatesearch" + CERTIFICATE_RETIRE = API_BASE_PATH + "certificates/retirement" APPLICATIONS = API_BASE_PATH + "applications" APP_BY_ID = APPLICATIONS + "/{}" CERTIFICATE_TEMPLATE_BY_ID = APP_BY_ID + "/certificateissuingtemplates/{}" @@ -477,6 +478,43 @@ def revoke_cert(self, request): # not supported in Venafi Cloud raise NotImplementedError + def retire_cert(self, request): + cert_id = None + if not request.id and not request.thumbprint: + log.error("id or thumbprint must be specified for retiring certificate") + raise ClientBadData + + if request.id: + cert_id = request.id + + elif request.thumbprint: + response = self.search_by_thumbprint(request.thumbprint) + cert_ids = response.certificateIds + if len(cert_ids) > 1: + log.error(f"multiple certificates matching thumbprint found") + raise VenafiError + cert_id = cert_ids[0] + + retire_data = { + 'certificateIds': [ + cert_id + ] + } + + status, data = self._post(URLS.CERTIFICATE_RETIRE, retire_data) + if status == HTTPStatus.OK: + if len(data) == 0: + log.error(f"certificate retirement was not successful for {cert_id}") + raise VenafiError + else: + return True + elif status == HTTPStatus.BAD_REQUEST or status == HTTPStatus.PRECONDITION_FAILED: + log.error("bad request for certificate retirement") + raise ClientBadData + else: + log.error("unexpected status returned") + raise ServerUnexptedBehavior + def renew_cert(self, request, reuse_key=False): cert_request_id = None if not request.id and not request.thumbprint: diff --git a/vcert/connection_tpp.py b/vcert/connection_tpp.py index 84baabd..2bf1ee1 100644 --- a/vcert/connection_tpp.py +++ b/vcert/connection_tpp.py @@ -76,6 +76,17 @@ def post(self, args): return self._post(url=url, data=data) + def put(self, args): + """ + + :param dict args: + :rtype: tuple[Any, Any] + """ + url = args[self.ARG_URL] if self.ARG_URL in args else None + data = args[self.ARG_DATA] if self.ARG_DATA in args else None + + return self._put(url=url, data=data) + def _get(self, url="", params=None): if not self._token or self._token[1] < time.time() + 1: self.auth() @@ -106,6 +117,22 @@ def _post(self, url, data=None): raise ClientBadData return self.process_server_response(r) + def _put(self, url, data=None): + if not self._token or self._token[1] < time.time() + 1: + self.auth() + log.debug(f"Token is {self._token[0]}, timeout is {self._token[1]}") + + if isinstance(data, dict): + r = requests.put(f"{self._base_url}{url}", + headers={TOKEN_HEADER_NAME: self._token[0], + 'content-type': MIME_JSON, + 'cache-control': "no-cache"}, + json=data, + **self._http_request_kwargs) # nosec B113 + else: + log.error(f"Unexpected client data type: {type(data)} for {url}") + raise ClientBadData + return self.process_server_response(r) @staticmethod def _normalize_and_verify_base_url(u): if u.startswith('http://'): # nosec diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 0dba95e..840f01a 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py @@ -59,9 +59,12 @@ class URLS: CERTIFICATE_REVOKE = API_BASE_URL + "certificates/revoke" CERTIFICATE_RENEW = API_BASE_URL + "certificates/renew" CERTIFICATE_SEARCH = API_BASE_URL + "certificates/" + CERTIFICATE_UPDATE = API_BASE_URL + "certificates/" CERTIFICATE_IMPORT = API_BASE_URL + "certificates/import" ZONE_CONFIG = API_BASE_URL + "certificates/checkpolicy" CONFIG_READ_DN = API_BASE_URL + "Config/ReadDn" + CONFIG_DN_TO_GUID = API_BASE_URL + "Config/DnToGuid" + CONFIG_GUID_TO_DN = API_BASE_URL + "Config/GuidToDn" POLICY_IS_VALID = API_BASE_URL + "config/isvalid" POLICY_CREATE = API_BASE_URL + "config/create" @@ -339,8 +342,6 @@ def revoke_cert(self, request): d['Thumbprint'] = request.thumbprint else: raise ClientBadData - if request.comments: - d['Comments'] = request.comments # TODO: Change _post() with post(args) status, data = self._post(URLS.CERTIFICATE_REVOKE, data=d) if status in (HTTPStatus.OK, HTTPStatus.ACCEPTED): @@ -348,6 +349,48 @@ def revoke_cert(self, request): raise ServerUnexptedBehavior + def retire_cert(self, request): + if not (request.id or request.thumbprint or request.guid): + raise ClientBadData + data = { + 'AttributeData': [ + { + 'Name': 'Disabled', + 'Value': [ + '1' + ] + } + ] + } + + if request.guid: + cert_guid = request.guid + elif request.id: + cert_guid = self.get_certificate_guid_from_dn(request.id) + elif request.thumbprint: + req_id = self.search_by_thumbprint(request.thumbprint) + cert_guid = self.get_certificate_guid_from_dn(req_id) + else: + raise ClientBadData + + if request.description: + data['AttributeData'] += { + 'Name': 'Description', + 'Value': [ + request.description + ] + } + args = { + self.ARG_URL: URLS.CERTIFICATE_UPDATE+cert_guid, + self.ARG_DATA: data + } + status, data = self.put(args) + if status in (HTTPStatus.OK, HTTPStatus.ACCEPTED): + return data + + raise ServerUnexptedBehavior + + def import_cert(self, request): raise NotImplementedError @@ -731,6 +774,14 @@ def post(self, args): """ raise NotImplementedError + def put(self, args): + """ + + :param dict args: + :rtype: tuple[Any, Any] + """ + raise NotImplementedError + # ======================================== API IMPLEMENTATION ENDS ======================================== # # ========================================================================================================= # @@ -1064,3 +1115,28 @@ def validate_identity(self, prefixed_universal): status, response = self._post(URLS.POLICY_VALIDATE_IDENTITY, data=data) identity = build_identity_entry(response['ID']) return identity + + def get_certificate_guid_from_dn(self, cert_dn): + request_data = { + 'ObjectDN': cert_dn + } + args = { + self.ARG_URL: URLS.CONFIG_DN_TO_GUID, + self.ARG_DATA: request_data + } + status, response = self.post(args) + cert_guid = response['GUID'] + return cert_guid + + def get_certificate_dn_from_guid(self, cert_guid): + request_data = { + 'ObjectGUID': cert_guid + } + args = { + self.ARG_URL: URLS.CONFIG_GUID_TO_DN, + self.ARG_DATA: request_data + } + status, response = self.post(args) + cert_dn = response['ObjectDN'] + return cert_dn + diff --git a/vcert/connection_tpp_token.py b/vcert/connection_tpp_token.py index 5159daf..cdf940c 100644 --- a/vcert/connection_tpp_token.py +++ b/vcert/connection_tpp_token.py @@ -85,6 +85,18 @@ def post(self, args): include_token_header = args[self.ARG_INCLUDE_TOKEN_HEADER] if self.ARG_INCLUDE_TOKEN_HEADER in args else True return self._post(url=url, data=data, check_token=check_token, include_token_header=include_token_header) + def put(self, args): + """ + + :param dict args: + :rtype: tuple[Any, Any] + """ + url = args[self.ARG_URL] if self.ARG_URL in args else None + data = args[self.ARG_DATA] if self.ARG_DATA in args else None + check_token = args[self.ARG_CHECK_TOKEN] if self.ARG_CHECK_TOKEN in args else True + include_token_header = args[self.ARG_INCLUDE_TOKEN_HEADER] if self.ARG_INCLUDE_TOKEN_HEADER in args else True + + return self._put(url=url, data=data, check_token=check_token, include_token_header=include_token_header) def _get(self, url=None, params=None, check_token=True, include_token_header=True): if check_token: @@ -121,6 +133,27 @@ def _post(self, url=None, data=None, check_token=True, include_token_header=True raise ClientBadData return self.process_server_response(r) + def _put(self, url, data=None, check_token=True, include_token_header=True): + if check_token: + self._check_token() + + headers = { + 'content-type': MIME_JSON, + 'cache-control': "no-cache" + } + if include_token_header: + token = self._get_auth_header_value(self._auth.access_token) + headers[HEADER_AUTHORIZATION] = token + + if isinstance(data, dict): + log.debug(f"POST Request\n\tURL: {self._base_url + url}\n\tHeaders:{headers}\n\tBody:{data}\n") + r = requests.put(self._base_url + url, headers=headers, json=data, + **self._http_request_kwargs) # nosec B113 + else: + log.error(f"Unexpected client data type: {type(data)} for {url}") + raise ClientBadData + return self.process_server_response(r) + def _check_token(self): if not self._auth.access_token: self.get_access_token()