diff --git a/.gitignore b/.gitignore index a153d65da..96c60ffda 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,7 @@ test/*.log # sphinx build folder docs/_build/ + +.idea/ +venv/ +.envrc diff --git a/CHANGELOG.md b/CHANGELOG.md index 4001bc021..0652eb324 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.8.0 (March 26th, 2019) + +IMPROVMENTS: + +* Support for the Kubernetes auth method + +BUG FIXES: + +* Fix for comparision `recovery_threshold` and `recovery_shares` during initialization. + ## 0.7.2 (January 1st, 2019) IMPROVEMENTS: diff --git a/hvac/api/auth_methods/__init__.py b/hvac/api/auth_methods/__init__.py index e4b5b7832..ef809fc85 100644 --- a/hvac/api/auth_methods/__init__.py +++ b/hvac/api/auth_methods/__init__.py @@ -5,6 +5,7 @@ from hvac.api.auth_methods.azure import Azure from hvac.api.auth_methods.gcp import Gcp from hvac.api.auth_methods.github import Github +from hvac.api.auth_methods.kubernetes import Kubernetes from hvac.api.auth_methods.ldap import Ldap from hvac.api.auth_methods.mfa import Mfa from hvac.api.auth_methods.okta import Okta @@ -16,6 +17,7 @@ 'Azure', 'Gcp', 'Github', + 'Kubernetes', 'Ldap', 'Mfa', 'Okta' @@ -28,6 +30,7 @@ class AuthMethods(VaultApiCategory): Azure, Github, Gcp, + Kubernetes, Ldap, Mfa, Okta @@ -38,7 +41,6 @@ class AuthMethods(VaultApiCategory): 'AliCloud', 'Aws', 'Jwt', - 'Kubernetes', 'Radius', 'Cert', 'Token', diff --git a/hvac/api/auth_methods/kubernetes.py b/hvac/api/auth_methods/kubernetes.py new file mode 100644 index 000000000..0532099e7 --- /dev/null +++ b/hvac/api/auth_methods/kubernetes.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Kubernetes methods module.""" +from hvac import exceptions +from hvac.api.vault_api_base import VaultApiBase +from hvac.utils import validate_list_of_strings_param, comma_delimited_to_list, validate_pem_format + +DEFAULT_MOUNT_POINT = 'kubernetes' + + +class Kubernetes(VaultApiBase): + """Kubernetes Auth Method (API). + + Reference: https://www.vaultproject.io/api/auth/kubernetes/index.html + """ + def configure(self, kubernetes_host, kubernetes_ca_cert='', token_reviewer_jwt='', pem_keys=None, + mount_point=DEFAULT_MOUNT_POINT): + """Configure the connection parameters for Kubernetes. + + This path honors the distinction between the create and update capabilities inside ACL policies. + + Supported methods: + POST: /auth/{mount_point}/config. Produces: 204 (empty body) + + :param kubernetes_host: Host must be a host string, a host:port pair, or a URL to the base of the + Kubernetes API server. Example: https://k8s.example.com:443 + :type kubernetes_host: str | unicode + :param kubernetes_ca_cert: PEM encoded CA cert for use by the TLS client used to talk with the Kubernetes API. + NOTE: Every line must end with a newline: \n + :type kubernetes_ca_cert: str | unicode + :param token_reviewer_jwt: A service account JWT used to access the TokenReview API to validate other + JWTs during login. If not set the JWT used for login will be used to access the API. + :type token_reviewer_jwt: str | unicode + :param pem_keys: Optional list of PEM-formatted public keys or certificates used to verify the signatures of + Kubernetes service account JWTs. If a certificate is given, its public key will be extracted. Not every + installation of Kubernetes exposes these keys. + :type pem_keys: list + :param mount_point: The "path" the method/backend was mounted on. + :type mount_point: str | unicode + :return: The response of the configure_method request. + :rtype: requests.Response + """ + if pem_keys is None: + pem_keys = [] + + list_of_pem_params = { + 'kubernetes_ca_cert': kubernetes_ca_cert, + 'pem_keys': pem_keys + } + for param_name, param_argument in list_of_pem_params.items(): + validate_pem_format( + param_name=param_name, + param_argument=param_argument, + ) + + params = { + 'kubernetes_host': kubernetes_host, + 'kubernetes_ca_cert': kubernetes_ca_cert, + 'token_reviewer_jwt': token_reviewer_jwt, + 'pem_keys': pem_keys, + } + api_path = '/v1/auth/{mount_point}/config'.format( + mount_point=mount_point + ) + return self._adapter.post( + url=api_path, + json=params, + ) + + def read_config(self, mount_point=DEFAULT_MOUNT_POINT): + """Return the previously configured config, including credentials. + + Supported methods: + GET: /auth/{mount_point}/config. Produces: 200 application/json + + :param mount_point: The "path" the kubernetes auth method was mounted on. + :type mount_point: str | unicode + :return: The data key from the JSON response of the request. + :rtype: dict + """ + api_path = '/v1/auth/{mount_point}/config'.format(mount_point=mount_point) + response = self._adapter.get( + url=api_path, + ) + return response.json().get('data') + + def create_role(self, name, bound_service_account_names, bound_service_account_namespaces, ttl="", max_ttl="", + period="", policies=None, mount_point=DEFAULT_MOUNT_POINT): + """Create a role in the method. + + Registers a role in the auth method. Role types have specific entities that can perform login operations + against this endpoint. Constraints specific to the role type must be set on the role. These are applied to + the authenticated entities attempting to login. + + Supported methods: + POST: /auth/{mount_point}/role/{name}. Produces: 204 (empty body) + + :param name: Name of the role. + :type name: str | unicode + :param bound_service_account_names: List of service account names able to access this role. If set to "*" + all names are allowed, both this and bound_service_account_namespaces can not be "*". + :type bound_service_account_names: list | str | unicode + :param bound_service_account_namespaces: List of namespaces allowed to access this role. If set to "*" all + namespaces are allowed, both this and bound_service_account_names can not be set to "*". + :type bound_service_account_namespaces: list | str | unicode + :param ttl: The TTL period of tokens issued using this role in seconds. + :type ttl: str | unicode + :param max_ttl: The maximum allowed lifetime of tokens issued in seconds using this role. + :type max_ttl: str | unicode + :param period: If set, indicates that the token generated using this role should never expire. The token should + be renewed within the duration specified by this value. At each renewal, the token's TTL will be set to the + value of this parameter. + :type period: str | unicode + :param policies: Policies to be set on tokens issued using this role. + :type policies: list | str | unicode + :param mount_point: The "path" the azure auth method was mounted on. + :type mount_point: str | unicode + :return: The response of the request. + :rtype: requests.Response + """ + list_of_strings_params = { + 'bound_service_account_names': bound_service_account_names, + 'bound_service_account_namespaces': bound_service_account_namespaces, + 'policies': policies + } + for param_name, param_argument in list_of_strings_params.items(): + validate_list_of_strings_param( + param_name=param_name, + param_argument=param_argument, + ) + + if bound_service_account_names in ("*", ["*"]) and bound_service_account_namespaces in ("*", ["*"]): + error_msg = 'unsupported combination of `bind_service_account_names` and ' \ + '`bound_service_account_namespaces` arguments. Both of them can not be set to `*`' + raise exceptions.ParamValidationError(error_msg) + + params = { + 'bound_service_account_names': comma_delimited_to_list(bound_service_account_names), + 'bound_service_account_namespaces': comma_delimited_to_list(bound_service_account_namespaces), + 'ttl': ttl, + 'max_ttl': max_ttl, + 'period': period, + 'policies': comma_delimited_to_list(policies), + } + + api_path = '/v1/auth/{mount_point}/role/{name}'.format(mount_point=mount_point, name=name) + return self._adapter.post( + url=api_path, + json=params, + ) + + def read_role(self, name, mount_point=DEFAULT_MOUNT_POINT): + """Returns the previously registered role configuration. + + Supported methods: + POST: /auth/{mount_point}/role/{name}. Produces: 200 application/json + + :param name: Name of the role. + :type name: str | unicode + :param mount_point: The "path" the kubernetes auth method was mounted on. + :type mount_point: str | unicode + :return: The "data" key from the JSON response of the request. + :rtype: dict + """ + api_path = '/v1/auth/{mount_point}/role/{name}'.format( + mount_point=mount_point, + name=name, + ) + response = self._adapter.get( + url=api_path, + ) + return response.json().get('data') + + def list_roles(self, mount_point=DEFAULT_MOUNT_POINT): + """List all the roles that are registered with the plugin. + + Supported methods: + LIST: /auth/{mount_point}/roles. Produces: 200 application/json + + :param mount_point: The "path" the kubernetes auth method was mounted on. + :type mount_point: str | unicode + :return: The "data" key from the JSON response of the request. + :rtype: dict + """ + api_path = '/v1/auth/{mount_point}/roles'.format(mount_point=mount_point) + response = self._adapter.list( + url=api_path, + ) + return response.json().get('data') + + def delete_role(self, name, mount_point=DEFAULT_MOUNT_POINT): + """Delete the previously registered role. + + Supported methods: + DELETE: /auth/{mount_point}/role/{name}. Produces: 204 (empty body) + + + :param name: Name of the role. + :type name: str | unicode + :param mount_point: The "path" the kubernetes auth method was mounted on. + :type mount_point: str | unicode + :return: The response of the request. + :rtype: requests.Response + """ + api_path = '/v1/auth/{mount_point}/role/{name}'.format( + mount_point=mount_point, + name=name, + ) + return self._adapter.delete( + url=api_path, + ) + + def login(self, role, jwt, use_token=True, mount_point=DEFAULT_MOUNT_POINT): + """Fetch a token. + + This endpoint takes a signed JSON Web Token (JWT) and a role name for some entity. It verifies the JWT signature + to authenticate that entity and then authorizes the entity for the given role. + + Supported methods: + POST: /auth/{mount_point}/login. Produces: 200 application/json + + :param role: Name of the role against which the login is being attempted. + :type role: str | unicode + :param jwt: Signed JSON Web Token (JWT) from Azure MSI. + :type jwt: str | unicode + :param use_token: if True, uses the token in the response received from the auth request to set the "token" + attribute on the the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. + :type use_token: bool + :param mount_point: The "path" the azure auth method was mounted on. + :type mount_point: str | unicode + :return: The JSON response of the request. + :rtype: dict + """ + params = { + 'role': role, + 'jwt': jwt, + } + + api_path = '/v1/auth/{mount_point}/login'.format(mount_point=mount_point) + response = self._adapter.login( + url=api_path, + use_token=use_token, + json=params, + ) + return response diff --git a/hvac/utils.py b/hvac/utils.py index 8f8079e6c..0a3d6535c 100644 --- a/hvac/utils.py +++ b/hvac/utils.py @@ -196,8 +196,10 @@ def validate_list_of_strings_param(param_name, param_argument): """ if param_argument is None: param_argument = [] + if isinstance(param_argument, str): + param_argument = param_argument.split(',') if not isinstance(param_argument, list) or not all([isinstance(p, str) for p in param_argument]): - error_msg = 'unsupported {param} argument provided "{arg}" ({arg_type}), required type: List[str]"' + error_msg = 'unsupported {param} argument provided "{arg}" ({arg_type}), required type: List[str]' raise exceptions.ParamValidationError(error_msg.format( param=param_name, arg=param_argument, @@ -216,3 +218,45 @@ def list_to_comma_delimited(list_param): if list_param is None: list_param = [] return ','.join(list_param) + + +def comma_delimited_to_list(list_param): + """Convert comma-delimited list / string into a list of strings + + :param list_param: Comma-delimited string + :type list_param: str | unicode + :return: A list of strings + :rtype: list + """ + if isinstance(list_param, list): + return list_param + if isinstance(list_param, str): + return list_param.split(',') + else: + return [] + + +def validate_pem_format(param_name, param_argument): + """Validate that an argument is a PEM-formatted public key or certificate + + :param param_name: The name of the parameter being validate. Used in any resulting exception messages. + :type param_name: str | unicode + :param param_argument: The argument to validate + :type param_argument: str | unicode + :return True if the argument is validate False otherwise + :rtype: bool + """ + + def _check_pem(arg): + arg = arg.strip() + if not arg.startswith('-----BEGIN CERTIFICATE-----') \ + or not arg.endswith('-----END CERTIFICATE-----'): + return False + return True + + if isinstance(param_argument, str): + param_argument = [param_argument] + + if not isinstance(param_argument, list) or not all(_check_pem(p) for p in param_argument): + error_msg = 'unsupported {param} public key / certificate format, required type: PEM' + raise exceptions.ParamValidationError(error_msg.format(param=param_name)) diff --git a/tests/integration_tests/api/auth_methods/test_gcp.py b/tests/integration_tests/api/auth_methods/test_gcp.py index 567d7813b..6b67a98bb 100644 --- a/tests/integration_tests/api/auth_methods/test_gcp.py +++ b/tests/integration_tests/api/auth_methods/test_gcp.py @@ -139,9 +139,7 @@ def test_delete_config(self, label, write_config_first=True, raises=None): param( 'success iam', role_type='iam', - extra_params=dict( - bound_service_accounts=['*'], - ) + bound_service_accounts=['*'] ), param( 'iam no bound service account', @@ -159,19 +157,10 @@ def test_delete_config(self, label, write_config_first=True, raises=None): raises=exceptions.ParamValidationError, exception_message='unsupported role_type argument provided', ), - param( - 'wrong policy arg type', - role_type='iam', - policies='cats', - raises=exceptions.ParamValidationError, - exception_message='unsupported policies argument provided', - ) ]) - def test_create_role(self, label, role_type, policies=None, extra_params=None, raises=None, exception_message=''): + def test_create_role(self, label, role_type, policies=None, bound_service_accounts=None, raises=None, exception_message=''): role_name = 'hvac' project_id = 'test-hvac-project-not-a-real-project' - if extra_params is None: - extra_params = {} if raises: with self.assertRaises(raises) as cm: self.client.auth.gcp.create_role( @@ -179,8 +168,8 @@ def test_create_role(self, label, role_type, policies=None, extra_params=None, r role_type=role_type, project_id=project_id, policies=policies, + bound_service_accounts=bound_service_accounts, mount_point=self.TEST_MOUNT_POINT, - **extra_params ) self.assertIn( member=exception_message, @@ -192,8 +181,8 @@ def test_create_role(self, label, role_type, policies=None, extra_params=None, r role_type=role_type, project_id=project_id, policies=policies, + bound_service_accounts=bound_service_accounts, mount_point=self.TEST_MOUNT_POINT, - **extra_params ) logging.debug('create_role_response: %s' % create_role_response) if utils.vault_version_lt('0.10.0'): diff --git a/tests/integration_tests/api/auth_methods/test_kubernetes.py b/tests/integration_tests/api/auth_methods/test_kubernetes.py new file mode 100644 index 000000000..ca061dfda --- /dev/null +++ b/tests/integration_tests/api/auth_methods/test_kubernetes.py @@ -0,0 +1,297 @@ +import logging +from unittest import TestCase +from unittest import skipIf + +from parameterized import parameterized, param + +from hvac import exceptions +from tests import utils +from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase + + +@skipIf(utils.vault_version_lt('0.8.3'), "Kubernetes auth method not available before Vault version 0.8.3") +class TestKubernetes(HvacIntegrationTestCase, TestCase): + TEST_MOUNT_POINT = 'kubernetes-test' + + def setUp(self): + super(TestKubernetes, self).setUp() + if '%s/' % self.TEST_MOUNT_POINT not in self.client.list_auth_backends(): + self.client.enable_auth_backend( + backend_type='kubernetes', + mount_point=self.TEST_MOUNT_POINT, + ) + + def tearDown(self): + super(TestKubernetes, self).tearDown() + self.client.disable_auth_backend( + mount_point=self.TEST_MOUNT_POINT, + ) + + @parameterized.expand([ + param( + 'success', + + ), + param( + 'set invalid kubernetes_ca_cert', + kubernetes_ca_cert='ca_cert', + raises=exceptions.ParamValidationError, + exception_message='required type: PEM' + ), + param( + 'set invalid pem_key', + kubernetes_ca_cert='-----BEGIN CERTIFICATE-----\\n.....\\n-----END CERTIFICATE-----', + pem_keys=['pem_key'], + raises=exceptions.ParamValidationError, + exception_message='required type: PEM' + ), + param( + 'set invalid token_reviewer_jwt', + kubernetes_ca_cert='-----BEGIN CERTIFICATE-----\\n.....\\n-----END CERTIFICATE-----', + token_reviewer_jwt='reviewer_jwt', + raises=exceptions.InternalServerError, + exception_message='* not a compact JWS' + ) + ]) + def test_configure(self, label, kubernetes_ca_cert=None, token_reviewer_jwt=None, pem_keys=None, + raises=None, exception_message=''): + kubernetes_host = 'https://192.168.99.100:8443' + if raises: + with self.assertRaises(raises) as cm: + self.client.auth.kubernetes.configure( + kubernetes_host=kubernetes_host, + kubernetes_ca_cert=kubernetes_ca_cert, + token_reviewer_jwt=token_reviewer_jwt, + pem_keys=pem_keys, + mount_point=self.TEST_MOUNT_POINT + ) + self.assertIn( + member=exception_message, + container=str(cm.exception) + ) + else: + configure_response = self.client.auth.kubernetes.configure( + kubernetes_host=kubernetes_host, + kubernetes_ca_cert='-----BEGIN CERTIFICATE-----\\n.....\\n-----END CERTIFICATE-----', + mount_point=self.TEST_MOUNT_POINT + ) + logging.debug('configure_response: %s' % configure_response) + self.assertEqual( + first=configure_response.status_code, + second=204, + ) + + @parameterized.expand([ + param( + 'success', + ), + param( + 'no config written yet', + write_config_first=False, + raises=exceptions.InvalidPath + ) + ]) + def test_read_config(self, label, write_config_first=True, raises=None): + expected_config = { + 'kubernetes_host': 'https://192.168.99.100:8443', + 'kubernetes_ca_cert': '-----BEGIN CERTIFICATE-----\\n.....\\n-----END CERTIFICATE-----', + } + if write_config_first: + self.client.auth.kubernetes.configure( + mount_point=self.TEST_MOUNT_POINT, + **expected_config + ) + if raises is not None: + with self.assertRaises(raises): + self.client.auth.kubernetes.read_config( + mount_point=self.TEST_MOUNT_POINT, + ) + else: + read_config_response = self.client.auth.kubernetes.read_config( + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('read_config_response: %s' % read_config_response) + for k, v in expected_config.items(): + self.assertEqual( + first=v, + second=read_config_response[k], + ) + + @parameterized.expand([ + param( + 'success', + bound_service_account_names=['vault-auth'], + bound_service_account_namespaces=['default'], + ), + param( + 'both bounds wildcard', + bound_service_account_names=['*'], + bound_service_account_namespaces=['*'], + raises=exceptions.ParamValidationError, + ), + ]) + def test_create_role(self, label, bound_service_account_names=None, bound_service_account_namespaces=None, + raises=None, exception_message=''): + role_name = 'test-role' + if raises: + with self.assertRaises(raises) as cm: + self.client.auth.kubernetes.create_role( + name=role_name, + bound_service_account_names=bound_service_account_names, + bound_service_account_namespaces=bound_service_account_namespaces, + mount_point=self.TEST_MOUNT_POINT, + ) + self.assertIn( + member=exception_message, + container=str(cm.exception), + ) + else: + create_role_response = self.client.auth.kubernetes.create_role( + name=role_name, + bound_service_account_names=bound_service_account_names, + bound_service_account_namespaces=bound_service_account_namespaces, + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('create_role_response: %s' % create_role_response) + self.assertEqual( + first=create_role_response.status_code, + second=204, + ) + + @parameterized.expand([ + param( + 'success', + ), + param( + 'nonexistent role name', + create_role_first=False, + raises=exceptions.InvalidPath, + ), + ]) + def test_read_role(self, label, create_role_first=True, raises=None, exception_message=''): + role_name = 'test-role' + expected_role_config = { + 'name': role_name, + 'bound_service_account_names': ['vault-auth'], + 'bound_service_account_namespaces': ['default'], + } + role_name = 'test-role' + + if create_role_first: + self.client.auth.kubernetes.create_role( + mount_point=self.TEST_MOUNT_POINT, + **expected_role_config + ) + if raises: + with self.assertRaises(raises) as cm: + self.client.auth.kubernetes.read_role( + name=role_name, + mount_point=self.TEST_MOUNT_POINT, + ) + self.assertIn( + member=exception_message, + container=str(cm.exception), + ) + else: + read_role_response = self.client.auth.kubernetes.read_role( + name=role_name, + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('read_role_response: %s' % read_role_response) + self.assertEqual( + first=read_role_response['bound_service_account_names'], + second=expected_role_config['bound_service_account_names'] + ) + self.assertEqual( + first=read_role_response['bound_service_account_namespaces'], + second=expected_role_config['bound_service_account_namespaces'] + ) + + @parameterized.expand([ + # param( + # 'success', # TODO: figure out why this is returning a "InvalidPath" exception ("unsupported path") + # ), + param( + 'no roles', + num_roles_to_create=0, + raises=exceptions.InvalidPath, + ), + param( + 'no config', + write_config_first=False, + raises=exceptions.InvalidPath, + ), + ]) + def test_list_roles(self, label, num_roles_to_create=1, write_config_first=True, raises=None): + if write_config_first: + self.client.auth.kubernetes.configure( + kubernetes_host='https://192.168.99.100:8443', + kubernetes_ca_cert='-----BEGIN CERTIFICATE-----\n.....\n-----END CERTIFICATE-----', + mount_point=self.TEST_MOUNT_POINT, + ) + roles_to_create = ['hvac{}'.format(str(n)) for n in range(0, num_roles_to_create)] + bound_service_account_names = ['vault-auth'] + bound_service_account_namespaces = ['default'] + logging.debug('roles_to_create: %s' % roles_to_create) + for role_to_create in roles_to_create: + create_role_response = self.client.auth.kubernetes.create_role( + name=role_to_create, + bound_service_account_names=bound_service_account_names, + bound_service_account_namespaces=bound_service_account_namespaces, + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('create_role_response: %s' % create_role_response) + + if raises: + with self.assertRaises(raises): + self.client.auth.kubernetes.list_roles( + mount_point=self.TEST_MOUNT_POINT, + ) + else: + list_roles_response = self.client.auth.kubernetes.list_roles( + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('list_roles_response: %s' % list_roles_response) + self.assertEqual( + first=list_roles_response['keys'], + second=roles_to_create, + ) + + @parameterized.expand([ + param( + 'success', + ), + param( + 'nonexistent role name', + configure_role_first=False, + ), + ]) + def test_delete_role(self, label, configure_role_first=True, raises=None): + role_name = 'test-role' + bound_service_account_names = ['vault-auth'] + bound_service_account_namespaces = ['default'] + if configure_role_first: + create_role_response = self.client.auth.kubernetes.create_role( + name=role_name, + bound_service_account_names=bound_service_account_names, + bound_service_account_namespaces=bound_service_account_namespaces, + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('create_role_response: %s' % create_role_response) + + if raises is not None: + with self.assertRaises(raises): + self.client.auth.kubernetes.delete_role( + name=role_name, + mount_point=self.TEST_MOUNT_POINT, + ) + else: + delete_role_response = self.client.auth.kubernetes.delete_role( + name=role_name, + mount_point=self.TEST_MOUNT_POINT, + ) + logging.debug('delete_role_response: %s' % delete_role_response) + self.assertEqual( + first=delete_role_response.status_code, + second=204, + ) diff --git a/tests/unit_tests/api/auth_methods/test_kubernetes.py b/tests/unit_tests/api/auth_methods/test_kubernetes.py new file mode 100644 index 000000000..47bd74f2f --- /dev/null +++ b/tests/unit_tests/api/auth_methods/test_kubernetes.py @@ -0,0 +1,74 @@ +import logging +from unittest import TestCase +from unittest import skipIf + +import requests_mock +from parameterized import parameterized + +from hvac.adapters import Request +from hvac.api.auth_methods import Kubernetes +from tests import utils + + +@skipIf(utils.vault_version_lt('0.8.3'), "Kubernetes auth method not available before Vault version 0.8.3") +class TestKubernetes(TestCase): + TEST_MOUNT_POINT = 'kubernetes-test' + + @parameterized.expand([ + ('success', dict(), None,), + ]) + @requests_mock.Mocker() + def test_login(self, label, test_params, raises, requests_mocker): + role_name = 'hvac' + test_policies = [ + "default", + "dev", + "prod", + ] + expected_status_code = 200 + mock_url = 'http://localhost:8200/v1/auth/{mount_point}/login'.format( + mount_point=self.TEST_MOUNT_POINT, + ) + mock_response = { + "auth": { + "client_token": "38fe9691-e623-7238-f618-c94d4e7bc674", + "accessor": "78e87a38-84ed-2692-538f-ca8b9f400ab3", + "policies": test_policies, + "metadata": { + "role": role_name, + "service_account_name": "vault-auth", + "service_account_namespace": "default", + "service_account_secret_name": "vault-auth-token-pd21c", + "service_account_uid": "aa9aa8ff-98d0-11e7-9bb7-0800276d99bf" + }, + "lease_duration": 2764800, + "renewable": True, + }, + } + requests_mocker.register_uri( + method='POST', + url=mock_url, + status_code=expected_status_code, + json=mock_response, + ) + kubernetes = Kubernetes(adapter=Request()) + if raises is not None: + with self.assertRaises(raises): + kubernetes.login( + role=role_name, + jwt='my-jwt', + mount_point=self.TEST_MOUNT_POINT, + **test_params + ) + else: + login_response = kubernetes.login( + role=role_name, + jwt='my-jwt', + mount_point=self.TEST_MOUNT_POINT, + **test_params + ) + logging.debug('login_response: %s' % login_response) + self.assertEqual( + first=login_response['auth']['policies'], + second=test_policies, + ) diff --git a/tox.ini b/tox.ini index f0a718e7d..c9307855c 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ skip_missing_interpreters = true [flake8] max-line-length = 160 -exclude: .git,.venv,.tox +exclude: .git,.venv,.tox,.idea,venv [testenv] commands = nosetests -s --with-coverage --cover-package=hvac --cover-html {posargs}