Skip to content

Commit

Permalink
approle - fix metadata for generated secret IDs, re-add wrap_ttl (#782
Browse files Browse the repository at this point in the history
)

* approle.generate_secret_id(): send metadata only if it not empty

* update integration tests (also for custom secret_id)

* update unit tests (also for custom secret_id)

* fix custom_secret_id metadata processing

* blackify

* add back support for wrap_ttl

---------

Co-authored-by: Brian Scholer <1260690+briantist@users.noreply.github.com>
  • Loading branch information
Prividen and briantist committed Mar 1, 2023
1 parent b11a2ec commit 92f68c9
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 28 deletions.
21 changes: 17 additions & 4 deletions hvac/api/auth_methods/approle.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def generate_secret_id(
cidr_list=None,
token_bound_cidrs=None,
mount_point=DEFAULT_MOUNT_POINT,
wrap_ttl=None,
):
"""
Generates and issues a new Secret ID on a role in the auth method.
Expand All @@ -255,6 +256,9 @@ def generate_secret_id(
:type token_bound_cidrs: list
:param mount_point: The "path" the method/backend was mounted on.
:type mount_point: str | unicode
:param wrap_ttl: Returns the request as a response-wrapping token.
Can be either an integer number of seconds or a string duration of seconds (`15s`), minutes (`20m`), or hours (`25h`).
:type wrap_ttl: int | str
:return: The JSON response of the read_role_id request.
:rtype: dict
"""
Expand All @@ -267,7 +271,9 @@ def generate_secret_id(
)
)

params = {"metadata": json.dumps(metadata) if metadata else metadata}
params = {}
if metadata:
params = {"metadata": json.dumps(metadata)}

list_of_strings_params = {
"cidr_list": cidr_list,
Expand All @@ -286,7 +292,7 @@ def generate_secret_id(
mount_point=mount_point,
role_name=role_name,
)
return self._adapter.post(url=api_path, json=params)
return self._adapter.post(url=api_path, json=params, wrap_ttl=wrap_ttl)

def create_custom_secret_id(
self,
Expand All @@ -296,6 +302,7 @@ def create_custom_secret_id(
cidr_list=None,
token_bound_cidrs=None,
mount_point=DEFAULT_MOUNT_POINT,
wrap_ttl=None,
):
"""
Generates and issues a new Secret ID on a role in the auth method.
Expand All @@ -315,6 +322,9 @@ def create_custom_secret_id(
:type token_bound_cidrs: list
:param mount_point: The "path" the method/backend was mounted on.
:type mount_point: str | unicode
:param wrap_ttl: Returns the request as a response-wrapping token.
Can be either an integer number of seconds or a string duration of seconds (`15s`), minutes (`20m`), or hours (`25h`).
:type wrap_ttl: int | str
:return: The JSON response of the read_role_id request.
:rtype: dict
"""
Expand All @@ -327,7 +337,10 @@ def create_custom_secret_id(
)
)

params = {"secret_id": secret_id, "metadata": metadata}
params = {"secret_id": secret_id}

if metadata:
params["metadata"] = json.dumps(metadata)

list_of_strings_params = {
"cidr_list": cidr_list,
Expand All @@ -346,7 +359,7 @@ def create_custom_secret_id(
mount_point=mount_point,
role_name=role_name,
)
return self._adapter.post(url=api_path, json=params)
return self._adapter.post(url=api_path, json=params, wrap_ttl=wrap_ttl)

def read_secret_id(self, role_name, secret_id, mount_point=DEFAULT_MOUNT_POINT):
"""
Expand Down
51 changes: 38 additions & 13 deletions tests/integration_tests/api/auth_methods/test_approle.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,20 @@ def test_read_role_id(self):

@parameterized.expand(
[
("good request", None),
("bad metadata option", exceptions.ParamValidationError),
("good request, no metadata", None, None, None),
("good request, good metadata", None, {"a": "val1", "B": "two"}, 300),
("good request, good metadata", None, {"a": "val1", "B": "two"}, "5m"),
("bad metadata option", exceptions.ParamValidationError, "bad", None),
]
)
def test_generate_secret_id(self, test_label, raises):
def test_generate_secret_id(self, test_label, raises, metadata, wrap_ttl):
if raises is not None:
with self.assertRaises(raises) as cm:
self.client.auth.approle.generate_secret_id(
role_name=self.TEST_ROLE_NAME,
metadata="metadata string",
metadata=metadata,
mount_point=self.TEST_MOUNT_POINT,
wrap_ttl=wrap_ttl,
)
self.assertIn(
member="unsupported metadata argument", container=str(cm.exception)
Expand All @@ -122,24 +125,38 @@ def test_generate_secret_id(self, test_label, raises):
role_name=self.TEST_ROLE_NAME,
cidr_list=["127.0.0.1/32"],
mount_point=self.TEST_MOUNT_POINT,
metadata=metadata,
wrap_ttl=wrap_ttl,
)
self.assertIn(member="secret_id", container=response["data"])
if wrap_ttl is not None:
assert "wrap_info" in response
assert isinstance(response["wrap_info"]["ttl"], int)
assert (
response["wrap_info"]["ttl"] == 300
) # NOTE: hardcoded for now because of string formats
else:
self.assertIn(
member="secret_id", container=response["data"], msg=response
)

@parameterized.expand(
[
("good request", None),
("bad metadata option", exceptions.ParamValidationError),
("good request, no metadata", None, None, None),
("good request, good metadata", None, {"a": "val1", "B": "two"}, 300),
("good request, good metadata", None, {"a": "val1", "B": "two"}, "5m"),
("bad metadata option", exceptions.ParamValidationError, "bad", None),
]
)
def test_create_custom_secret_id(self, test_label, raises):
def test_create_custom_secret_id(self, test_label, raises, metadata, wrap_ttl):
if raises is not None:
with self.assertRaises(raises) as cm:
self.client.auth.approle.create_custom_secret_id(
role_name=self.TEST_ROLE_NAME,
secret_id=self.TEST_SECRET_ID,
cidr_list=["127.0.0.1/32"],
metadata="metadata string",
metadata=metadata,
mount_point=self.TEST_MOUNT_POINT,
wrap_ttl=wrap_ttl,
)
self.assertIn(
member="unsupported metadata argument", container=str(cm.exception)
Expand All @@ -150,11 +167,19 @@ def test_create_custom_secret_id(self, test_label, raises):
secret_id=self.TEST_SECRET_ID,
cidr_list=["127.0.0.1/32"],
mount_point=self.TEST_MOUNT_POINT,
metadata=metadata,
wrap_ttl=wrap_ttl,
)

self.assertEqual(
first=self.TEST_SECRET_ID, second=response["data"]["secret_id"]
)
if wrap_ttl is not None:
assert "wrap_info" in response
assert isinstance(response["wrap_info"]["ttl"], int)
assert (
response["wrap_info"]["ttl"] == 300
) # NOTE: hardcoded for now because of string formats
else:
self.assertEqual(
first=self.TEST_SECRET_ID, second=response["data"]["secret_id"]
)

def test_read_secret_id(self):
secret_id_response = self._secret_id()
Expand Down
82 changes: 71 additions & 11 deletions tests/unit_tests/api/auth_methods/test_approle.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,28 @@ def test_update_role_id(self, test_label, mount_point, requests_mocker):

@parameterized.expand(
[
("default mount point", DEFAULT_MOUNT_POINT, None),
("custom mount point", "approle-test", exceptions.ParamValidationError),
("default mount point", DEFAULT_MOUNT_POINT, None, None, None),
(
"metadata as dict",
DEFAULT_MOUNT_POINT,
None,
{"a": "val1", "b": "two"},
300,
),
(
"invalid metadata",
DEFAULT_MOUNT_POINT,
exceptions.ParamValidationError,
"bad metadata",
None,
),
("custom mount point", "approle-test", None, None, "5m"),
]
)
@requests_mock.Mocker()
def test_generate_secret_id(self, test_label, mount_point, raises, requests_mocker):
def test_generate_secret_id(
self, test_label, mount_point, raises, metadata, wrap_ttl, requests_mocker
):
expected_status_code = 200
role_name = "testrole"

Expand All @@ -271,10 +287,11 @@ def test_generate_secret_id(self, test_label, mount_point, raises, requests_mock
"warnings": None,
"wrap_info": None,
}

mock_url = "http://localhost:8200/v1/auth/{mount_point}/role/{role_name}/secret-id".format(
mount_point=mount_point, role_name=role_name
)
requests_mocker.register_uri(
adapter = requests_mocker.register_uri(
method="POST",
url=mock_url,
status_code=expected_status_code,
Expand All @@ -287,28 +304,58 @@ def test_generate_secret_id(self, test_label, mount_point, raises, requests_mock
with self.assertRaises(raises) as cm:
app_role.generate_secret_id(
role_name=role_name,
metadata="metadata string",
metadata=metadata,
mount_point=mount_point,
wrap_ttl=wrap_ttl,
)
self.assertIn(
member="unsupported metadata argument", container=str(cm.exception)
)
assert adapter.call_count == 0

else:
response = app_role.generate_secret_id(
role_name=role_name, cidr_list=["127.0.0.1/32"], mount_point=mount_point
role_name=role_name,
cidr_list=["127.0.0.1/32"],
mount_point=mount_point,
metadata=metadata,
wrap_ttl=wrap_ttl,
)

self.assertEqual(first=mock_response, second=response)
assert adapter.call_count == 1
last_request = adapter.last_request
assert ("metadata" in last_request.json()) == (metadata is not None)

if wrap_ttl is None:
assert "X-Vault-Wrap-TTL" not in last_request.headers
else:
assert "X-Vault-Wrap-TTL" in last_request.headers
assert last_request.headers["X-Vault-Wrap-TTL"] == str(wrap_ttl)

@parameterized.expand(
[
("default mount point", DEFAULT_MOUNT_POINT, None),
("custom mount point", "approle-test", exceptions.ParamValidationError),
("default mount point", DEFAULT_MOUNT_POINT, None, None, None),
(
"metadata as dict",
DEFAULT_MOUNT_POINT,
None,
{"a": "val1", "b": "two"},
300,
),
(
"invalid metadata",
DEFAULT_MOUNT_POINT,
exceptions.ParamValidationError,
"bad metadata",
None,
),
("custom mount point", "approle-test", None, None, "5m"),
]
)
@requests_mock.Mocker()
def test_create_custom_secret_id(
self, test_label, mount_point, raises, requests_mocker
self, test_label, mount_point, raises, metadata, wrap_ttl, requests_mocker
):
expected_status_code = 200
role_name = "testrole"
Expand All @@ -330,7 +377,7 @@ def test_create_custom_secret_id(
mock_url = "http://localhost:8200/v1/auth/{mount_point}/role/{role_name}/custom-secret-id".format(
mount_point=mount_point, role_name=role_name
)
requests_mocker.register_uri(
adapter = requests_mocker.register_uri(
method="POST",
url=mock_url,
status_code=expected_status_code,
Expand All @@ -345,21 +392,34 @@ def test_create_custom_secret_id(
role_name=role_name,
secret_id=secret_id,
cidr_list=["127.0.0.1/32"],
metadata="metadata string",
metadata=metadata,
mount_point=mount_point,
wrap_ttl=wrap_ttl,
)
self.assertIn(
member="unsupported metadata argument", container=str(cm.exception)
)
assert adapter.call_count == 0
else:
response = app_role.create_custom_secret_id(
role_name=role_name,
secret_id=secret_id,
cidr_list=["127.0.0.1/32"],
mount_point=mount_point,
metadata=metadata,
wrap_ttl=wrap_ttl,
)

self.assertEqual(first=mock_response, second=response)
assert adapter.call_count == 1
last_request = adapter.last_request
assert ("metadata" in last_request.json()) == (metadata is not None)

if wrap_ttl is None:
assert "X-Vault-Wrap-TTL" not in last_request.headers
else:
assert "X-Vault-Wrap-TTL" in last_request.headers
assert last_request.headers["X-Vault-Wrap-TTL"] == str(wrap_ttl)

@parameterized.expand(
[
Expand Down

0 comments on commit 92f68c9

Please sign in to comment.