Skip to content

Commit

Permalink
Handle "Misses" for Identity Secrets Lookups (#331)
Browse files Browse the repository at this point in the history
* Regression tests for group lookup misses

* Return None for group lookup misses

* Regression tests for entity lookup misses

* Return None for entity lookup misses

* Also update docstrings
  • Loading branch information
jeffwecan committed Nov 1, 2018
1 parent a609d8f commit 5b42620
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 68 deletions.
24 changes: 18 additions & 6 deletions hvac/api/secrets_engines/identity.py
@@ -1,12 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Identity secret engine module."""
import logging

from hvac import exceptions
from hvac.api.vault_api_base import VaultApiBase
from hvac.constants.identity import ALLOWED_GROUP_TYPES

DEFAULT_MOUNT_POINT = 'identity'

logger = logging.getLogger(__name__)


class Identity(VaultApiBase):
"""Identity Secrets Engine (API).
Expand Down Expand Up @@ -970,8 +974,8 @@ def lookup_entity(self, name=None, entity_id=None, alias_id=None, alias_name=Non
:type alias_mount_accessor: str | unicode
:param mount_point: The "path" the method/backend was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: dict
:return: The JSON response of the request if a entity / entity alias is found in the lookup, None otherwise.
:rtype: dict | None
"""
params = {}
if name is not None:
Expand All @@ -988,7 +992,11 @@ def lookup_entity(self, name=None, entity_id=None, alias_id=None, alias_name=Non
url=api_path,
json=params,
)
return response.json()
if response.status_code == 204:
logger.debug('Identity.lookup_entity: no entities found with params: {params}'.format(params=params))
return None
else:
return response.json()

def lookup_group(self, name=None, group_id=None, alias_id=None, alias_name=None, alias_mount_accessor=None, mount_point=DEFAULT_MOUNT_POINT):
"""Query a group based on the given criteria.
Expand All @@ -1010,8 +1018,8 @@ def lookup_group(self, name=None, group_id=None, alias_id=None, alias_name=None,
:type alias_mount_accessor: str | unicode
:param mount_point: The "path" the method/backend was mounted on.
:type mount_point: str | unicode
:return: The JSON response of the request.
:rtype: dict
:return: The JSON response of the request if a group / group alias is found in the lookup, None otherwise.
:rtype: dict | None
"""
params = {}
if name is not None:
Expand All @@ -1028,4 +1036,8 @@ def lookup_group(self, name=None, group_id=None, alias_id=None, alias_name=None,
url=api_path,
json=params,
)
return response.json()
if response.status_code == 204:
logger.debug('Identity.lookup_group: no groups found with params: {params}'.format(params=params))
return None
else:
return response.json()
148 changes: 86 additions & 62 deletions hvac/tests/integration_tests/api/secrets_engines/test_identity.py
Expand Up @@ -1294,27 +1294,36 @@ def test_list_group_aliases(self, label, method='LIST', raises=None, exception_m
'lookup entity alias',
criteria=['alias_id'],
),
param(
'lookup missing entity',
criteria=['entity_id'],
create_first=False,
),
])
def test_lookup_entity(self, label, criteria, raises=None, exception_message=''):
def test_lookup_entity(self, label, criteria, create_first=True, raises=None, exception_message=''):
lookup_params = {}
create_entity_response = self.client.secrets.identity.create_or_update_entity(
name=self.TEST_ENTITY_NAME,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_entity_response: %s' % create_entity_response)
entity_id = create_entity_response['data']['id']
create_alias_response = self.client.secrets.identity.create_or_update_entity_alias(
name=self.TEST_ALIAS_NAME,
canonical_id=entity_id,
mount_accessor=self.test_approle_accessor,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_alias_response: %s' % create_alias_response)
alias_id = create_alias_response['data']['id']
if 'entity_id' in criteria:
lookup_params['entity_id'] = entity_id
elif 'alias_id' in criteria:
lookup_params['alias_id'] = alias_id
if create_first:
create_entity_response = self.client.secrets.identity.create_or_update_entity(
name=self.TEST_ENTITY_NAME,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_entity_response: %s' % create_entity_response)
entity_id = create_entity_response['data']['id']
create_alias_response = self.client.secrets.identity.create_or_update_entity_alias(
name=self.TEST_ALIAS_NAME,
canonical_id=entity_id,
mount_accessor=self.test_approle_accessor,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_alias_response: %s' % create_alias_response)
alias_id = create_alias_response['data']['id']
if 'entity_id' in criteria:
lookup_params['entity_id'] = entity_id
elif 'alias_id' in criteria:
lookup_params['alias_id'] = alias_id
else:
for key in criteria:
lookup_params[key] = key
logging.debug('lookup_params: %s' % lookup_params)
if raises:
with self.assertRaises(raises) as cm:
Expand All @@ -1332,16 +1341,19 @@ def test_lookup_entity(self, label, criteria, raises=None, exception_message='')
**lookup_params
)
logging.debug('lookup_entity_response: %s' % lookup_entity_response)
if 'entity_id' in criteria:
self.assertEqual(
first=lookup_entity_response['data']['name'],
second=self.TEST_ENTITY_NAME,
)
elif 'alias_id' in criteria:
self.assertEqual(
first=lookup_entity_response['data']['aliases'][0]['name'],
second=self.TEST_ALIAS_NAME,
)
if create_first:
if 'entity_id' in criteria:
self.assertEqual(
first=lookup_entity_response['data']['name'],
second=self.TEST_ENTITY_NAME,
)
elif 'alias_id' in criteria:
self.assertEqual(
first=lookup_entity_response['data']['aliases'][0]['name'],
second=self.TEST_ALIAS_NAME,
)
else:
self.assertIsNone(obj=lookup_entity_response)

@parameterized.expand([
param(
Expand All @@ -1360,33 +1372,42 @@ def test_lookup_entity(self, label, criteria, raises=None, exception_message='')
'lookup alias',
criteria=['alias_name', 'alias_mount_accessor'],
),
param(
'lookup missing group',
criteria=['group_id'],
create_first=False,
),
])
def test_lookup_group(self, label, criteria, raises=None, exception_message=''):
def test_lookup_group(self, label, criteria, create_first=True, raises=None, exception_message=''):
lookup_params = {}
create_group_response = self.client.secrets.identity.create_or_update_group(
name=self.TEST_GROUP_NAME,
group_type='external',
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_group_response: %s' % create_group_response)
group_id = create_group_response['data']['id']
create_alias_response = self.client.secrets.identity.create_or_update_group_alias(
name=self.TEST_GROUP_ALIAS_NAME,
canonical_id=group_id,
mount_accessor=self.test_approle_accessor,
if create_first:
create_group_response = self.client.secrets.identity.create_or_update_group(
name=self.TEST_GROUP_NAME,
group_type='external',
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_alias_response: %s' % create_alias_response)
alias_id = create_alias_response['data']['id']
if 'group_id' in criteria:
lookup_params['group_id'] = group_id
elif 'alias_id' in criteria:
lookup_params['alias_id'] = alias_id
elif 'name' in criteria:
lookup_params['name'] = self.TEST_GROUP_NAME
elif 'alias_name' in criteria and 'alias_mount_accessor' in criteria:
lookup_params['alias_name'] = self.TEST_GROUP_ALIAS_NAME
lookup_params['alias_mount_accessor'] = self.test_approle_accessor
logging.debug('create_group_response: %s' % create_group_response)
group_id = create_group_response['data']['id']
create_alias_response = self.client.secrets.identity.create_or_update_group_alias(
name=self.TEST_GROUP_ALIAS_NAME,
canonical_id=group_id,
mount_accessor=self.test_approle_accessor,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_alias_response: %s' % create_alias_response)
alias_id = create_alias_response['data']['id']
if 'group_id' in criteria:
lookup_params['group_id'] = group_id
elif 'alias_id' in criteria:
lookup_params['alias_id'] = alias_id
elif 'name' in criteria:
lookup_params['name'] = self.TEST_GROUP_NAME
elif 'alias_name' in criteria and 'alias_mount_accessor' in criteria:
lookup_params['alias_name'] = self.TEST_GROUP_ALIAS_NAME
lookup_params['alias_mount_accessor'] = self.test_approle_accessor
else:
for key in criteria:
lookup_params[key] = key
logging.debug('lookup_params: %s' % lookup_params)
if raises:
with self.assertRaises(raises) as cm:
Expand All @@ -1404,13 +1425,16 @@ def test_lookup_group(self, label, criteria, raises=None, exception_message=''):
**lookup_params
)
logging.debug('lookup_group_response: %s' % lookup_group_response)
if 'group_id' in criteria or 'name' in criteria:
self.assertEqual(
first=lookup_group_response['data']['name'],
second=self.TEST_GROUP_NAME,
)
elif 'alias_id' in criteria or ('alias_name' in criteria and 'alias_mount_accessor' in criteria):
self.assertEqual(
first=lookup_group_response['data']['alias']['name'],
second=self.TEST_GROUP_ALIAS_NAME,
)
if create_first:
if 'group_id' in criteria or 'name' in criteria:
self.assertEqual(
first=lookup_group_response['data']['name'],
second=self.TEST_GROUP_NAME,
)
elif 'alias_id' in criteria or ('alias_name' in criteria and 'alias_mount_accessor' in criteria):
self.assertEqual(
first=lookup_group_response['data']['alias']['name'],
second=self.TEST_GROUP_ALIAS_NAME,
)
else:
self.assertIsNone(obj=lookup_group_response)

0 comments on commit 5b42620

Please sign in to comment.