diff --git a/zaza/openstack/charm_tests/policyd/tests.py b/zaza/openstack/charm_tests/policyd/tests.py index 8599d0289..7030a9afb 100644 --- a/zaza/openstack/charm_tests/policyd/tests.py +++ b/zaza/openstack/charm_tests/policyd/tests.py @@ -25,22 +25,6 @@ verify that the charm has implemented policy overrides and ensured that the service actually picks them up). -In order to use the generic tests, just include them in the specific test -class. The KeystonePolicydTest as an example does: - - class KeystonePolicydTest(PolicydTest, - ch_keystone.BaseKeystoneTest, - test_utils.OpenStackBaseTest): - - @classmethod - def setUpClass(cls, application_name=None): - super(KeystonePolicydTest, cls).setUpClass(application_name) - -Note that the generic test class (PolicyDTest) comes first, and then the -ch_keystone.BaseKeystoneTest, followed by the test_utils.OpenStackBaseTest. -This is to get the order of super().setUpClass(...) calls to work with -application_name. - If a charm doesn't require a specific test, then the GenericPolicydTest class can be used that just includes the two generic tests. The config in the tests.yaml would stil be required. See the PolicydTest class docstring for @@ -54,6 +38,7 @@ def setUpClass(cls, application_name=None): import zipfile import keystoneauth1 +import neutronclient.exceptions import zaza.model as zaza_model @@ -127,47 +112,6 @@ def _set_policy_with(self, rules): zaza_model.block_until_wl_status_info_starts_with( self.application_name, "PO:", negate_match=False) - def test_002_policyd_bad_yaml(self): - """Test bad yaml file in the zip file is handled.""" - bad = { - "file2.yaml": "{'rule': '!}" - } - bad_zip_path = self._make_zip_file_from('bad.zip', bad) - logging.info("Attaching bad zip file as a resource") - zaza_model.attach_resource(self.application_name, - 'policyd-override', - bad_zip_path) - zaza_model.block_until_all_units_idle() - logging.debug("Now setting config to true") - self._set_config(True) - # ensure that the workload status info line starts with PO (broken): - # to show that it didn't work - logging.info( - "Checking for workload status line starts with PO (broken):") - zaza_model.block_until_wl_status_info_starts_with( - self.application_name, "PO (broken):") - logging.debug("App status is valid for broken yaml file") - zaza_model.block_until_all_units_idle() - # now verify that no file got landed on the machine - path = os.path.join( - "/etc", self._service_name, "policy.d", 'file2.yaml') - logging.info("Now checking that file {} is not present.".format(path)) - zaza_model.block_until_file_missing(self.application_name, path) - self._set_config(False) - zaza_model.block_until_all_units_idle() - logging.info("OK") - - -class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest): - """Generic policyd test for any charm without a specific test. - - It includes a single additional test to test_002_policyd_bad_yaml() in the - base class PolicydTest which checks that a good yaml file is placed into - the correct directory. This additional test (test_001_policyd_good_yaml) - is not needed in the specialised tests as the feature is implicitly tested - when the specialised test fails with the rule overrided. - """ - def test_001_policyd_good_yaml(self): """Test that the policyd with a good zipped yaml file.""" good = { @@ -215,6 +159,42 @@ def test_001_policyd_good_yaml(self): logging.info("OK") + def test_002_policyd_bad_yaml(self): + """Test bad yaml file in the zip file is handled.""" + bad = { + "file2.yaml": "{'rule': '!}" + } + bad_zip_path = self._make_zip_file_from('bad.zip', bad) + logging.info("Attaching bad zip file as a resource") + zaza_model.attach_resource(self.application_name, + 'policyd-override', + bad_zip_path) + zaza_model.block_until_all_units_idle() + logging.debug("Now setting config to true") + self._set_config(True) + # ensure that the workload status info line starts with PO (broken): + # to show that it didn't work + logging.info( + "Checking for workload status line starts with PO (broken):") + zaza_model.block_until_wl_status_info_starts_with( + self.application_name, "PO (broken):") + logging.debug("App status is valid for broken yaml file") + zaza_model.block_until_all_units_idle() + # now verify that no file got landed on the machine + path = os.path.join( + "/etc", self._service_name, "policy.d", 'file2.yaml') + logging.info("Now checking that file {} is not present.".format(path)) + zaza_model.block_until_file_missing(self.application_name, path) + self._set_config(False) + zaza_model.block_until_all_units_idle() + logging.info("OK") + + +class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest): + """Generic policyd test for any charm without a specific test.""" + + pass + class PolicydOperationFailedException(Exception): """This is raised by the get_client_and_attempt_operation() method. @@ -248,14 +228,22 @@ class BasePolicydSpecialization(PolicydTest, The test will fail if the first call fails for any reason, or if the 2nd call doesn't raise PolicydOperationFailedException or raises any other exception. - """ + To use this class, follow the keystone example: + + class KeystonePolicydTest(BasePolicydSpecialization): + + _rule = "{'identity:list_services': '!'}" + + def get_client_and_attempt_operation(self, keystone_session): + ... etc. + """ # this needs to be defined as the rule that gets placed into a yaml policy # override. It is a string of the form: 'some-rule: "!"' # i.e. disable some policy and then try and test it. _rule = None - # the name to log at the beginning of the test + # Optional: the name to log at the beginning of the test _test_name = None @classmethod @@ -275,7 +263,7 @@ def get_client_and_attempt_operation(self, keystone_session): :param keystone_session: the keystone session to use to obtain the client necessary for the test. - :type keystone_session: ??? + :type keystone_session: keystoneauth1.session.Session :raises: PolicydOperationFailedException if operation fails. """ raise NotImplementedError("This method must be overridden") @@ -286,7 +274,7 @@ def _get_keystone_session(self, ip): :param ip: the IP address to get the session against. :type ip: str :returns: a keystone session to the IP address - :rtype: ??? + :rtype: keystoneauth1.session.Session """ logging.info('keystone IP {}'.format(ip)) openrc = { @@ -374,14 +362,14 @@ class KeystonePolicydTest(BasePolicydSpecialization): _rule = "{'identity:list_services': '!'}" def get_client_and_attempt_operation(self, keystone_session): - """Override this method to perform the operation. + """Attempt to list services. If it fails, raise an exception. This operation should pass normally for the demo_user, and fail when the rule has been overriden (see the `rule` class variable. :param keystone_session: the keystone session to use to obtain the client necessary for the test. - :type keystone_session: ??? + :type keystone_session: keystoneauth1.session.Session :raises: PolicydOperationFailedException if operation fails. """ keystone_client = openstack_utils.get_keystone_session_client( @@ -392,117 +380,25 @@ def get_client_and_attempt_operation(self, keystone_session): raise PolicydOperationFailedException() -class OldKeystonePolicydTest(PolicydTest, - ch_keystone.BaseKeystoneTest, - test_utils.OpenStackBaseTest): - """Specific test for policyd for keystone charm.""" +class NeutronApiTest(BasePolicydSpecialization): + """Test the policyd override using the keystone client.""" - @classmethod - def setUpClass(cls, application_name=None): - """Run class setup for running KeystonePolicydTest tests.""" - super(KeystonePolicydTest, cls).setUpClass(application_name) - - def test_disable_service(self): - """Test that service can be disabled.""" - logging.info("Doing policyd override to disable listing domains") - self._set_policy_with( - {'rule.yaml': "{'identity:list_services': '!'}"}) - - # verify that the policy.d override does disable the endpoint - with self.config_change( - {'preferred-api-version': self.default_api_version, - 'use-policyd-override': 'False'}, - {'preferred-api-version': '3', - 'use-policyd-override': 'True'}, - application_name="keystone"): - zaza_model.block_until_all_units_idle() - for ip in self.keystone_ips: - try: - logging.info('keystone IP {}'.format(ip)) - openrc = { - 'API_VERSION': 3, - 'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER, - 'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD, - 'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip), - 'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, - 'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, - } - if self.tls_rid: - openrc['OS_CACERT'] = \ - openstack_utils.KEYSTONE_LOCAL_CACERT - openrc['OS_AUTH_URL'] = ( - openrc['OS_AUTH_URL'].replace('http', 'https')) - logging.info('keystone IP {}'.format(ip)) - keystone_session = openstack_utils.get_keystone_session( - openrc, scope='DOMAIN') - keystone_client = ( - openstack_utils.get_keystone_session_client( - keystone_session)) - keystone_client.services.list() - raise zaza_exceptions.PolicydError( - 'Retrieve service list as admin with project scoped ' - 'token passed and should have failed. IP = {}' - .format(ip)) - except keystoneauth1.exceptions.http.Forbidden: - logging.info("keystone IP:{} policyd override disabled " - "services listing by demo user" - .format(ip)) - - # now verify (with the config off) that we can actually access - # these points - with self.config_change( - {'preferred-api-version': self.default_api_version}, - {'preferred-api-version': '3'}, - application_name="keystone"): - zaza_model.block_until_all_units_idle() - for ip in self.keystone_ips: - try: - logging.info('keystone IP {}'.format(ip)) - openrc = { - 'API_VERSION': 3, - 'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER, - 'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD, - 'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip), - 'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, - 'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN, - } - if self.tls_rid: - openrc['OS_CACERT'] = \ - openstack_utils.KEYSTONE_LOCAL_CACERT - openrc['OS_AUTH_URL'] = ( - openrc['OS_AUTH_URL'].replace('http', 'https')) - logging.info('keystone IP {}'.format(ip)) - keystone_session = openstack_utils.get_keystone_session( - openrc, scope='DOMAIN') - keystone_client = ( - openstack_utils.get_keystone_session_client( - keystone_session)) - keystone_client.services.list() - logging.info("keystone IP:{} without policyd override " - "services list working" - .format(ip)) - except keystoneauth1.exceptions.http.Forbidden: - raise zaza_exceptions.PolicydError( - 'Retrieve services list as demo user with project ' - 'scoped token passed and should have passed. IP = {}' - .format(ip)) - - logging.info('OK') - - -# class NeutronAPITest(PolicydTest, - # test_utils.OpenStackBaseTest): - # """Specific test for policyd for neutron-api charm.""" - - # def test_disable_service(self): - # """Test that service can be disabled.""" - # logging.info("Doing policyd override to disable listing domains") - # self._set_policy_with( - # {'rule.yaml': "{'identity:list_services': '!'}"}) - # # Get authenticated clients - # keystone_client = openstack_utils.get_keystone_session_client( - # keystone_session) - # neutron_client = openstack_utils.get_neutron_session_client( - # keystone_session) + _rule = "{'get_network': '!'}" + def get_client_and_attempt_operation(self, keystone_session): + """Attempt to ???? + + This operation should pass normally for the demo_user, and fail when + the rule has been overriden (see the `rule` class variable. + :param keystone_session: the keystone session to use to obtain the + client necessary for the test. + :type keystone_session: keystoneauth1.session.Session + :raises: PolicydOperationFailedException if operation fails. + """ + neutron_client = openstack_utils.get_neutron_session_client( + keystone_session) + try: + neutron_client.list_networks() + except neutronclient.exceptions.Forbidden: + raise PolicydOperationFailedException()