Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
edf0efd
adds functions to get cert guid from dn and dn from guid
Pmaraveyias Nov 10, 2023
971856e
Merge branch 'Venafi:master' into retire-cert
Pmaraveyias Nov 10, 2023
2cc1a6f
adds retire functionality for tpp
Pmaraveyias Nov 10, 2023
23cf139
adds retire functionality for vaas
Pmaraveyias Nov 20, 2023
231d4a2
adds RetireRequest class
Pmaraveyias Nov 30, 2023
df063df
minor updates to RetireRequest class
Pmaraveyias Nov 30, 2023
b9a98ef
adds put method functions for TPPTokenConnection
Pmaraveyias Nov 30, 2023
936b264
minor bugfixes for tpp retire
Pmaraveyias Nov 30, 2023
5b87d1b
minor bugfixes for vaas retire
Pmaraveyias Nov 30, 2023
24e1abd
adds tpp retire by thumbprint and tests for tpp and vaas
Pmaraveyias Dec 12, 2023
b637f57
bumps cryptography dependency
Pmaraveyias Jan 18, 2024
2b8566c
adds pip dependency for tests
Pmaraveyias Jan 31, 2024
9c4e714
bumps cryptography dependency
Pmaraveyias Jan 31, 2024
2c85ce6
fixes revoke and retire tests
Pmaraveyias Jan 31, 2024
140235d
fixes additional tests
Pmaraveyias Jan 31, 2024
5cc767f
reverts small test change
Pmaraveyias Feb 2, 2024
d223bf4
fixes ec test
Pmaraveyias Feb 2, 2024
a35ba70
fixes ec test
Pmaraveyias Feb 2, 2024
542f942
updates cyrptography dependency in setup
Pmaraveyias Feb 2, 2024
2020477
increases key types to 8 for local tests
Pmaraveyias Feb 2, 2024
af1cfef
increases ec key types to 5 for local tests
Pmaraveyias Feb 2, 2024
6a5a459
reverts revoke changes and minor logic update to vaas retire
Pmaraveyias Feb 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-build.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pip==23.3.2
pytest==7.4.3
pytest-cov==4.1.0
safety==2.3.5
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
requests==2.31.0
python-dateutil==2.8.2
cryptography==40.0.2
cryptography==42.0.2
six==1.16.0
ruamel.yaml==0.18.5
pynacl==1.5.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
url="https://github.com/Venafi/vcert-python",
packages=['vcert', 'vcert.parser', 'vcert.policy'],
install_requires=['requests==2.31.0', 'python-dateutil==2.8.2', 'certvalidator<=0.11.1', 'six==1.16.0',
'cryptography==40.0.2', 'ruamel.yaml==0.17.31', 'pynacl==1.5.0'],
'cryptography==42.0.2', 'ruamel.yaml==0.17.31', 'pynacl==1.5.0'],
description='Python client library for Venafi Trust Protection Platform and Venafi Cloud.',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
1 change: 1 addition & 0 deletions tests/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
CLOUD_URL = environ.get('CLOUD_URL')
CLOUD_APIKEY = environ.get('CLOUD_APIKEY')
CLOUD_ZONE = environ.get('CLOUD_ZONE')
VAAS_ZONE_ONLY_EC = environ.get('VAAS_ZONE_ONLY_EC')
CLOUD_TEAM = environ.get('CLOUD_TEAM')

TPP_PM_ROOT = environ.get('TPP_PM_ROOT')
Expand Down
4 changes: 2 additions & 2 deletions tests/test_local_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ def test_parse_tpp_policy1(self):
conn = TPPConnection(url="http://example.com/", user="", password="")
raw_data = json.loads(POLICY_TPP1)
p = conn._parse_zone_config_to_policy(raw_data)
self.assertEqual(len(p.key_types), 7)
self.assertEqual(len(p.key_types), 8)
raw_data['Policy']['KeyPair']['KeySize']['Locked'] = True
p = conn._parse_zone_config_to_policy(raw_data)
self.assertEqual(len(p.key_types), 4)
self.assertEqual(len(p.key_types), 5)
raw_data['Policy']['KeyPair']['KeyAlgorithm']['Locked'] = True
p = conn._parse_zone_config_to_policy(raw_data)
self.assertEqual(len(p.key_types), 1)
Expand Down
20 changes: 19 additions & 1 deletion tests/test_tpp_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from assets import TEST_KEY_ECDSA, TEST_KEY_RSA_4096, TEST_KEY_RSA_2048_ENCRYPTED
from test_env import TPP_ZONE, TPP_ZONE_ECDSA, TPP_USER, TPP_PASSWORD, TPP_TOKEN_URL
from test_utils import (random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse,
enroll_with_zone_update, simple_enroll)
enroll_with_zone_update, simple_enroll, retire_by_id, retire_by_thumbprint)
from vcert import (CustomField, KeyType, RevocationRequest, CertificateRequest, IssuerHint, logger, TPPTokenConnection)
from vcert.errors import ClientBadData, ServerUnexptedBehavior

Expand Down Expand Up @@ -175,6 +175,7 @@ def test_token_revoke_normal(self):
with self.assertRaises(Exception):
self.tpp_conn.renew_cert(req)


def test_token_revoke_without_disable(self):
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
rev_req = RevocationRequest(req_id=req.id, disable=False)
Expand Down Expand Up @@ -267,3 +268,20 @@ def test_revoke_access_token(self):
cn = f"{random_word(10)}.venafi.example.com"
with self.assertRaises(Exception):
enroll(self.tpp_conn, self.tpp_zone, cn)

def test_tpp_token_retire_cert_id(self):
try:
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
ret_data = retire_by_id(self.tpp_conn, req.id)
assert ret_data['Success'] is True
except Exception as err:
self.fail(f"Error in tpp retire by id test: {err}")

def test_tpp_token_retire_cert_thumbprint(self):
try:
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
ret_data = retire_by_thumbprint(self.tpp_conn, cert)
assert ret_data['Success'] is True
except Exception as err:
self.fail(f"Error in tpp retire by thumbprint test: {err}")
16 changes: 16 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from test_env import RANDOM_DOMAIN
from vcert import CertificateRequest, FakeConnection, TPPConnection, TPPTokenConnection, CSR_ORIGIN_SERVICE
from vcert.common import RetireRequest


def random_word(length):
Expand Down Expand Up @@ -209,3 +210,18 @@ def renew_by_thumbprint(conn, prev_cert):
print(prev_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME))
assert cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == prev_cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)


def retire_by_id(conn, prev_cert_id):
print("trying to retire by id")
ret_request = RetireRequest(req_id=prev_cert_id)
retire_data = conn.retire_cert(ret_request)
return retire_data


def retire_by_thumbprint(conn, prev_cert):
print("Trying to retire by thumbprint")
thumbprint = binascii.hexlify(prev_cert.fingerprint(hashes.SHA1())).decode()
ret_request = RetireRequest(thumbprint=thumbprint)
retire_data = conn.retire_cert(ret_request)
return retire_data
39 changes: 19 additions & 20 deletions tests/test_vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.x509.oid import NameOID

from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN
from test_env import CLOUD_ZONE, CLOUD_APIKEY, CLOUD_URL, RANDOM_DOMAIN, VAAS_ZONE_ONLY_EC
from test_pm import get_policy_obj, get_defaults_obj
from test_utils import random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, simple_enroll, \
get_vaas_zone
from vcert import CloudConnection, KeyType, CertificateRequest, CustomField, logger, CSR_ORIGIN_SERVICE
from vcert.policy import KeyPair, DefaultKeyPair, PolicySpecification
from vcert.common import RetireRequest

log = logger.get_child("test-vaas")


class TestVaaSMethods(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.cloud_zone = CLOUD_ZONE
self.vaas_zone_ec = VAAS_ZONE_ONLY_EC
self.cloud_conn = CloudConnection(token=CLOUD_APIKEY, url=CLOUD_URL)
super(TestVaaSMethods, self).__init__(*args, **kwargs)

Expand Down Expand Up @@ -170,29 +172,14 @@ def test_cloud_enroll_service_generated_csr(self):
log.info(f"PKCS12 created successfully for certificate with CN: {cn}")

def test_enroll_ec_key_certificate(self):
policy = get_policy_obj()
kp = KeyPair(
key_types=['EC'],
elliptic_curves=['P521', 'P384'],
reuse_allowed=False)
policy.key_pair = kp
zone = self.vaas_zone_ec

defaults = get_defaults_obj()
defaults.key_pair = DefaultKeyPair(
key_type='EC',
elliptic_curve='P521')

policy_spec = PolicySpecification()
policy_spec.policy = policy
policy_spec.defaults = defaults

zone = get_vaas_zone()

self.cloud_conn.set_policy(zone, policy_spec)
password = 'FooBarPass123'
random_name = f"{random_word(10)}.vfidev.com"

request = CertificateRequest(
common_name=f"{random_word(10)}.venafi.example",
common_name=random_name,
san_dns=[random_name],
key_type=KeyType(
key_type="ec",
option="P384"
Expand All @@ -214,3 +201,15 @@ def test_enroll_ec_key_certificate(self):
if p_key:
self.assertIsInstance(p_key, EllipticCurvePrivateKey, "returned private key is not of type Elliptic Curve")
self.assertEqual(p_key.curve.key_size, 384, f"Private Key expected curve: 384. Got: {p_key.curve.key_size}")

def test_cloud_retire_by_thumbprint(self):
try:
req, cert = simple_enroll(self.cloud_conn, self.cloud_zone)
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
time.sleep(1)
ret_request = RetireRequest(thumbprint=fingerprint)
ret_data = self.cloud_conn.retire_cert(ret_request)
assert ret_data is True
except Exception as e:
log.error(msg=f"Error retiring certificate by thumbprint: {e.message}")
18 changes: 16 additions & 2 deletions vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_ip_address():

class KeyType:
ALLOWED_SIZES = [2048, 3072, 4096, 8192]
ALLOWED_CURVES = ["p256", "p384", "p521"]
ALLOWED_CURVES = ["p256", "p384", "p521", "ed25519"]
RSA = 'rsa'
ECDSA = 'ec'

Expand All @@ -125,7 +125,7 @@ def __init__(self, key_type, option):
raise BadData
elif self.key_type == KeyType.ECDSA:
option = {"secp521r1": "p521", "secp384r1": "p384", "secp256r1": "p256", "p256": "p256", "p384": "p384",
"p521": "p521"}[option.lower().strip()]
"p521": "p521", "ed25519": "ed25519"}[option.lower().strip()]
if option not in KeyType.ALLOWED_CURVES:
log.error(f"unknown curve: {option}, should be one of {KeyType.ALLOWED_CURVES}")
raise BadData
Expand Down Expand Up @@ -593,6 +593,20 @@ def __init__(self, req_id=None, thumbprint=None, reason=RevocationReasons.NoRea
self.disable = disable


class RetireRequest:
def __init__(self, req_id=None, thumbprint=None, guid=None, description=None):
"""
:param req_id:
:param thumbprint:
:param guid:
:param description:
"""
self.id = req_id
self.thumbprint = thumbprint
self.guid = guid
self.description = description


class Authentication:
def __init__(self, user=None, password=None, access_token=None, refresh_token=None, api_key=None, state=None,
token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM):
Expand Down
38 changes: 38 additions & 0 deletions vcert/connection_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(self):
CERTIFICATE_STATUS = CERTIFICATE_REQUESTS + "/{}"
CERTIFICATE_RETRIEVE = API_BASE_PATH + "certificates/{}/contents"
CERTIFICATE_SEARCH = API_BASE_PATH + "certificatesearch"
CERTIFICATE_RETIRE = API_BASE_PATH + "certificates/retirement"
APPLICATIONS = API_BASE_PATH + "applications"
APP_BY_ID = APPLICATIONS + "/{}"
CERTIFICATE_TEMPLATE_BY_ID = APP_BY_ID + "/certificateissuingtemplates/{}"
Expand Down Expand Up @@ -477,6 +478,43 @@ def revoke_cert(self, request):
# not supported in Venafi Cloud
raise NotImplementedError

def retire_cert(self, request):
cert_id = None
if not request.id and not request.thumbprint:
log.error("id or thumbprint must be specified for retiring certificate")
raise ClientBadData

if request.id:
cert_id = request.id

elif request.thumbprint:
response = self.search_by_thumbprint(request.thumbprint)
cert_ids = response.certificateIds
if len(cert_ids) > 1:
log.error(f"multiple certificates matching thumbprint found")
raise VenafiError
cert_id = cert_ids[0]

retire_data = {
'certificateIds': [
cert_id
]
}

status, data = self._post(URLS.CERTIFICATE_RETIRE, retire_data)
if status == HTTPStatus.OK:
if len(data) == 0:
log.error(f"certificate retirement was not successful for {cert_id}")
raise VenafiError
else:
return True
elif status == HTTPStatus.BAD_REQUEST or status == HTTPStatus.PRECONDITION_FAILED:
log.error("bad request for certificate retirement")
raise ClientBadData
else:
log.error("unexpected status returned")
raise ServerUnexptedBehavior

def renew_cert(self, request, reuse_key=False):
cert_request_id = None
if not request.id and not request.thumbprint:
Expand Down
27 changes: 27 additions & 0 deletions vcert/connection_tpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ def post(self, args):

return self._post(url=url, data=data)

def put(self, args):
"""

:param dict args:
:rtype: tuple[Any, Any]
"""
url = args[self.ARG_URL] if self.ARG_URL in args else None
data = args[self.ARG_DATA] if self.ARG_DATA in args else None

return self._put(url=url, data=data)

def _get(self, url="", params=None):
if not self._token or self._token[1] < time.time() + 1:
self.auth()
Expand Down Expand Up @@ -106,6 +117,22 @@ def _post(self, url, data=None):
raise ClientBadData
return self.process_server_response(r)

def _put(self, url, data=None):
if not self._token or self._token[1] < time.time() + 1:
self.auth()
log.debug(f"Token is {self._token[0]}, timeout is {self._token[1]}")

if isinstance(data, dict):
r = requests.put(f"{self._base_url}{url}",
headers={TOKEN_HEADER_NAME: self._token[0],
'content-type': MIME_JSON,
'cache-control': "no-cache"},
json=data,
**self._http_request_kwargs) # nosec B113
else:
log.error(f"Unexpected client data type: {type(data)} for {url}")
raise ClientBadData
return self.process_server_response(r)
@staticmethod
def _normalize_and_verify_base_url(u):
if u.startswith('http://'): # nosec
Expand Down
Loading