Skip to content

Commit

Permalink
Add neutron-api charm policyd test
Browse files Browse the repository at this point in the history
  • Loading branch information
ajkavanagh committed Oct 3, 2019
1 parent fe70f56 commit 6b75086
Showing 1 changed file with 70 additions and 174 deletions.
244 changes: 70 additions & 174 deletions zaza/openstack/charm_tests/policyd/tests.py
Expand Up @@ -25,22 +25,6 @@
verify that the charm has implemented policy overrides and ensured that the
service actually picks them up).
In order to use the generic tests, just include them in the specific test
class. The KeystonePolicydTest as an example does:
class KeystonePolicydTest(PolicydTest,
ch_keystone.BaseKeystoneTest,
test_utils.OpenStackBaseTest):
@classmethod
def setUpClass(cls, application_name=None):
super(KeystonePolicydTest, cls).setUpClass(application_name)
Note that the generic test class (PolicyDTest) comes first, and then the
ch_keystone.BaseKeystoneTest, followed by the test_utils.OpenStackBaseTest.
This is to get the order of super().setUpClass(...) calls to work with
application_name.
If a charm doesn't require a specific test, then the GenericPolicydTest class
can be used that just includes the two generic tests. The config in the
tests.yaml would stil be required. See the PolicydTest class docstring for
Expand All @@ -54,6 +38,7 @@ def setUpClass(cls, application_name=None):
import zipfile

import keystoneauth1
import neutronclient.exceptions

import zaza.model as zaza_model

Expand Down Expand Up @@ -127,47 +112,6 @@ def _set_policy_with(self, rules):
zaza_model.block_until_wl_status_info_starts_with(
self.application_name, "PO:", negate_match=False)

def test_002_policyd_bad_yaml(self):
"""Test bad yaml file in the zip file is handled."""
bad = {
"file2.yaml": "{'rule': '!}"
}
bad_zip_path = self._make_zip_file_from('bad.zip', bad)
logging.info("Attaching bad zip file as a resource")
zaza_model.attach_resource(self.application_name,
'policyd-override',
bad_zip_path)
zaza_model.block_until_all_units_idle()
logging.debug("Now setting config to true")
self._set_config(True)
# ensure that the workload status info line starts with PO (broken):
# to show that it didn't work
logging.info(
"Checking for workload status line starts with PO (broken):")
zaza_model.block_until_wl_status_info_starts_with(
self.application_name, "PO (broken):")
logging.debug("App status is valid for broken yaml file")
zaza_model.block_until_all_units_idle()
# now verify that no file got landed on the machine
path = os.path.join(
"/etc", self._service_name, "policy.d", 'file2.yaml')
logging.info("Now checking that file {} is not present.".format(path))
zaza_model.block_until_file_missing(self.application_name, path)
self._set_config(False)
zaza_model.block_until_all_units_idle()
logging.info("OK")


class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
"""Generic policyd test for any charm without a specific test.
It includes a single additional test to test_002_policyd_bad_yaml() in the
base class PolicydTest which checks that a good yaml file is placed into
the correct directory. This additional test (test_001_policyd_good_yaml)
is not needed in the specialised tests as the feature is implicitly tested
when the specialised test fails with the rule overrided.
"""

def test_001_policyd_good_yaml(self):
"""Test that the policyd with a good zipped yaml file."""
good = {
Expand Down Expand Up @@ -215,6 +159,42 @@ def test_001_policyd_good_yaml(self):

logging.info("OK")

def test_002_policyd_bad_yaml(self):
"""Test bad yaml file in the zip file is handled."""
bad = {
"file2.yaml": "{'rule': '!}"
}
bad_zip_path = self._make_zip_file_from('bad.zip', bad)
logging.info("Attaching bad zip file as a resource")
zaza_model.attach_resource(self.application_name,
'policyd-override',
bad_zip_path)
zaza_model.block_until_all_units_idle()
logging.debug("Now setting config to true")
self._set_config(True)
# ensure that the workload status info line starts with PO (broken):
# to show that it didn't work
logging.info(
"Checking for workload status line starts with PO (broken):")
zaza_model.block_until_wl_status_info_starts_with(
self.application_name, "PO (broken):")
logging.debug("App status is valid for broken yaml file")
zaza_model.block_until_all_units_idle()
# now verify that no file got landed on the machine
path = os.path.join(
"/etc", self._service_name, "policy.d", 'file2.yaml')
logging.info("Now checking that file {} is not present.".format(path))
zaza_model.block_until_file_missing(self.application_name, path)
self._set_config(False)
zaza_model.block_until_all_units_idle()
logging.info("OK")


class GenericPolicydTest(PolicydTest, test_utils.OpenStackBaseTest):
"""Generic policyd test for any charm without a specific test."""

pass


class PolicydOperationFailedException(Exception):
"""This is raised by the get_client_and_attempt_operation() method.
Expand Down Expand Up @@ -248,14 +228,22 @@ class BasePolicydSpecialization(PolicydTest,
The test will fail if the first call fails for any reason, or if the 2nd
call doesn't raise PolicydOperationFailedException or raises any other
exception.
"""
To use this class, follow the keystone example:
class KeystonePolicydTest(BasePolicydSpecialization):
_rule = "{'identity:list_services': '!'}"
def get_client_and_attempt_operation(self, keystone_session):
... etc.
"""
# this needs to be defined as the rule that gets placed into a yaml policy
# override. It is a string of the form: 'some-rule: "!"'
# i.e. disable some policy and then try and test it.
_rule = None

# the name to log at the beginning of the test
# Optional: the name to log at the beginning of the test
_test_name = None

@classmethod
Expand All @@ -275,7 +263,7 @@ def get_client_and_attempt_operation(self, keystone_session):
:param keystone_session: the keystone session to use to obtain the
client necessary for the test.
:type keystone_session: ???
:type keystone_session: keystoneauth1.session.Session
:raises: PolicydOperationFailedException if operation fails.
"""
raise NotImplementedError("This method must be overridden")
Expand All @@ -286,7 +274,7 @@ def _get_keystone_session(self, ip):
:param ip: the IP address to get the session against.
:type ip: str
:returns: a keystone session to the IP address
:rtype: ???
:rtype: keystoneauth1.session.Session
"""
logging.info('keystone IP {}'.format(ip))
openrc = {
Expand Down Expand Up @@ -374,14 +362,14 @@ class KeystonePolicydTest(BasePolicydSpecialization):
_rule = "{'identity:list_services': '!'}"

def get_client_and_attempt_operation(self, keystone_session):
"""Override this method to perform the operation.
"""Attempt to list services. If it fails, raise an exception.
This operation should pass normally for the demo_user, and fail when
the rule has been overriden (see the `rule` class variable.
:param keystone_session: the keystone session to use to obtain the
client necessary for the test.
:type keystone_session: ???
:type keystone_session: keystoneauth1.session.Session
:raises: PolicydOperationFailedException if operation fails.
"""
keystone_client = openstack_utils.get_keystone_session_client(
Expand All @@ -392,117 +380,25 @@ def get_client_and_attempt_operation(self, keystone_session):
raise PolicydOperationFailedException()


class OldKeystonePolicydTest(PolicydTest,
ch_keystone.BaseKeystoneTest,
test_utils.OpenStackBaseTest):
"""Specific test for policyd for keystone charm."""
class NeutronApiTest(BasePolicydSpecialization):
"""Test the policyd override using the keystone client."""

@classmethod
def setUpClass(cls, application_name=None):
"""Run class setup for running KeystonePolicydTest tests."""
super(KeystonePolicydTest, cls).setUpClass(application_name)

def test_disable_service(self):
"""Test that service can be disabled."""
logging.info("Doing policyd override to disable listing domains")
self._set_policy_with(
{'rule.yaml': "{'identity:list_services': '!'}"})

# verify that the policy.d override does disable the endpoint
with self.config_change(
{'preferred-api-version': self.default_api_version,
'use-policyd-override': 'False'},
{'preferred-api-version': '3',
'use-policyd-override': 'True'},
application_name="keystone"):
zaza_model.block_until_all_units_idle()
for ip in self.keystone_ips:
try:
logging.info('keystone IP {}'.format(ip))
openrc = {
'API_VERSION': 3,
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
}
if self.tls_rid:
openrc['OS_CACERT'] = \
openstack_utils.KEYSTONE_LOCAL_CACERT
openrc['OS_AUTH_URL'] = (
openrc['OS_AUTH_URL'].replace('http', 'https'))
logging.info('keystone IP {}'.format(ip))
keystone_session = openstack_utils.get_keystone_session(
openrc, scope='DOMAIN')
keystone_client = (
openstack_utils.get_keystone_session_client(
keystone_session))
keystone_client.services.list()
raise zaza_exceptions.PolicydError(
'Retrieve service list as admin with project scoped '
'token passed and should have failed. IP = {}'
.format(ip))
except keystoneauth1.exceptions.http.Forbidden:
logging.info("keystone IP:{} policyd override disabled "
"services listing by demo user"
.format(ip))

# now verify (with the config off) that we can actually access
# these points
with self.config_change(
{'preferred-api-version': self.default_api_version},
{'preferred-api-version': '3'},
application_name="keystone"):
zaza_model.block_until_all_units_idle()
for ip in self.keystone_ips:
try:
logging.info('keystone IP {}'.format(ip))
openrc = {
'API_VERSION': 3,
'OS_USERNAME': ch_keystone.DEMO_ADMIN_USER,
'OS_PASSWORD': ch_keystone.DEMO_ADMIN_USER_PASSWORD,
'OS_AUTH_URL': 'http://{}:5000/v3'.format(ip),
'OS_USER_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
'OS_DOMAIN_NAME': ch_keystone.DEMO_DOMAIN,
}
if self.tls_rid:
openrc['OS_CACERT'] = \
openstack_utils.KEYSTONE_LOCAL_CACERT
openrc['OS_AUTH_URL'] = (
openrc['OS_AUTH_URL'].replace('http', 'https'))
logging.info('keystone IP {}'.format(ip))
keystone_session = openstack_utils.get_keystone_session(
openrc, scope='DOMAIN')
keystone_client = (
openstack_utils.get_keystone_session_client(
keystone_session))
keystone_client.services.list()
logging.info("keystone IP:{} without policyd override "
"services list working"
.format(ip))
except keystoneauth1.exceptions.http.Forbidden:
raise zaza_exceptions.PolicydError(
'Retrieve services list as demo user with project '
'scoped token passed and should have passed. IP = {}'
.format(ip))

logging.info('OK')


# class NeutronAPITest(PolicydTest,
# test_utils.OpenStackBaseTest):
# """Specific test for policyd for neutron-api charm."""

# def test_disable_service(self):
# """Test that service can be disabled."""
# logging.info("Doing policyd override to disable listing domains")
# self._set_policy_with(
# {'rule.yaml': "{'identity:list_services': '!'}"})
# # Get authenticated clients
# keystone_client = openstack_utils.get_keystone_session_client(
# keystone_session)
# neutron_client = openstack_utils.get_neutron_session_client(
# keystone_session)
_rule = "{'get_network': '!'}"

def get_client_and_attempt_operation(self, keystone_session):
"""Attempt to ????
This operation should pass normally for the demo_user, and fail when
the rule has been overriden (see the `rule` class variable.
:param keystone_session: the keystone session to use to obtain the
client necessary for the test.
:type keystone_session: keystoneauth1.session.Session
:raises: PolicydOperationFailedException if operation fails.
"""
neutron_client = openstack_utils.get_neutron_session_client(
keystone_session)
try:
neutron_client.list_networks()
except neutronclient.exceptions.Forbidden:
raise PolicydOperationFailedException()

0 comments on commit 6b75086

Please sign in to comment.