forked from openstack-charmers/zaza-openstack-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tests.py
189 lines (161 loc) · 7.27 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# Copyright 2019 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Encapsulate policyd testing."""
import logging
import os
import shutil
import tempfile
import unittest
import zipfile

from juju.errors import JujuError

import zaza
import zaza.model as zaza_model
import zaza.utilities.juju as zaza_juju
import zaza.openstack.charm_tests.test_utils as test_utils
import zaza.openstack.utilities.openstack as openstack_utils
class PolicydTest(test_utils.OpenStackBaseTest):
    """Charm operation tests.

    The policyd test needs some config from the tests.yaml in order to work
    properly.  A top level key of "tests_options".  Under that key is
    'policyd', and then the k:v of 'service': <name>.  e.g. for keystone

        tests_options:
          policyd:
            service: keystone
    """

    @classmethod
    def setUpClass(cls, application_name=None):
        """Run the base class setup and prepare the policyd test fixtures.

        The whole class is skipped when the deployed release is older than
        xenial_queens.  Otherwise the service name is read from the test
        config and a temporary directory is created for the zip files the
        tests build.

        :param application_name: passed through to the base class setup
        :type application_name: Optional[str]
        :raises unittest.SkipTest: if the release is before xenial_queens
        """
        super(PolicydTest, cls).setUpClass(application_name)
        if (openstack_utils.get_os_release() <
                openstack_utils.get_os_release('xenial_queens')):
            # The skip exception has to be raised; merely constructing it
            # (or calling a non-existent attribute) never skips anything.
            raise unittest.SkipTest("Test not valid before xenial_queens")
        # Read the config before creating the temp dir so that a missing key
        # does not leave an orphaned directory behind (tearDownClass is not
        # run when setUpClass fails).
        cls._service_name = \
            cls.test_config['tests_options']['policyd']['service']
        cls._tmp_dir = tempfile.mkdtemp()

    @classmethod
    def tearDownClass(cls):
        """Run the base class teardown and remove the temporary directory.

        Cleanup is best effort: a failure is logged and not re-raised.
        """
        super(PolicydTest, cls).tearDownClass()
        try:
            shutil.rmtree(cls._tmp_dir, ignore_errors=True)
        except Exception as e:
            logging.error("Removing the policyd tempdir/files failed: {}"
                          .format(str(e)))

    def _set_config(self, state):
        """Set the 'use-policyd-override' option on the application.

        :param state: truthy to enable the override, falsy to disable it
        :type state: bool
        """
        s = "True" if state else "False"
        config = {"use-policyd-override": s}
        logging.info("Setting config to {}".format(config))
        zaza_model.set_application_config(self.application_name, config)

    def _make_zip_file_from(self, name, files):
        """Make a zip file from a dictionary of filename: string.

        The archive is written into the class temporary directory.

        :param name: the name of the zip file
        :type name: PathLike
        :param files: a dict of name: string to construct the files from.
        :type files: Dict[str, str]
        :returns: path of the zip file that was written.
        :rtype: PathLike
        """
        path = os.path.join(self._tmp_dir, name)
        with zipfile.ZipFile(path, "w") as zfp:
            # Use a distinct loop name so the 'name' parameter is not
            # shadowed by the archive member names.
            for member_name, contents in files.items():
                zfp.writestr(member_name, contents)
        return path

    def test_policyd_good_yaml(self):
        """Check a good zipped yaml override is applied and then removed."""
        # Test that the policyd with a good zipped yaml file puts the yaml file
        # in the right directory
        good = {
            'file1.yaml': "{'rule1': '!'}"
        }
        good_zip_path = self._make_zip_file_from('good.zip', good)
        logging.info("About to attach the resource")
        zaza_model.attach_resource(self.application_name,
                                   'policyd-override',
                                   good_zip_path)
        logging.info("... waiting for idle")
        zaza_model.block_until_all_units_idle()
        logging.info("Now setting config to true")
        self._set_config(True)
        # check that the file gets to the right location
        path = os.path.join(
            "/etc", self._service_name, "policy.d", 'file1.yaml')
        logging.info("Now checking for file contents: {}".format(path))
        zaza_model.block_until_file_has_contents(self.application_name,
                                                 path,
                                                 "rule1: '!'")
        # ensure that the workload status info line starts with PO:
        logging.info("Checking for workload status line starts with PO:")
        block_until_wl_status_info_starts_with(self.application_name, "PO:")
        logging.info("App status is valid")

        # disable the policy override
        logging.info("Disabling policy override ...")
        self._set_config(False)
        # check that the status no longer has "PO:" on it.
        # we have to do it twice due to async races and that some info lines
        # erase the PO: bit prior to actually getting back to idle.  The
        # double check verifies that the charms have started, the idle waits
        # until it is finished, and then the final check really makes sure
        # they got switched off.
        block_until_wl_status_info_starts_with(
            self.application_name, "PO:", negate_match=True)
        zaza_model.block_until_all_units_idle()
        block_until_wl_status_info_starts_with(
            self.application_name, "PO:", negate_match=True)

        # verify that the file no longer exists
        logging.info("Checking that {} has been removed".format(path))
        block_until_file_missing(self.application_name, path)

        logging.info("...done")
async def async_block_until_wl_status_info_starts_with(
        app, status, model_name=None, negate_match=False, timeout=2700):
    """Block until the workload status info of all units starts with status.

    With negate_match set the wait is inverted: it ends once no unit's
    workload status info starts with status.

    :param app: the application to check against
    :type app: str
    :param status: Status to wait for at the start of the string
    :type status: str
    :param model_name: Name of model to query.
    :type model_name: Union[None, str]
    :param negate_match: Wait until the match is not true; i.e. none match
    :type negate_match: Union[None, bool]
    :param timeout: Time to wait for unit to achieved desired status
    :type timeout: float
    """
    async def _infos_match():
        current = await zaza_model.async_get_status()
        unit_map = current.applications[app]['units']
        # Collect the info line of every unit that belongs to the app.
        infos = []
        for unit_name, unit_data in unit_map.items():
            if unit_name.split('/')[0] == app:
                infos.append(unit_data['workload-status']['info'])
        if negate_match:
            return not any(info.startswith(status) for info in infos)
        return all(info.startswith(status) for info in infos)

    async with zaza_model.run_in_model(model_name):
        await zaza_model.async_block_until(_infos_match, timeout=timeout)
# Variant of async_block_until_wl_status_info_starts_with produced by
# zaza.sync_wrapper; this is the name the test method calls directly.
block_until_wl_status_info_starts_with = zaza.sync_wrapper(
    async_block_until_wl_status_info_starts_with)
async def async_block_until_file_missing(
        app, path, model_name=None, timeout=2700):
    """Block until the file at path is missing on every unit of app.

    Each unit is polled with 'test -e <path>; echo $?' and the file counts
    as missing on that unit when the output contains "1".

    :param app: the application whose units are checked
    :type app: str
    :param path: the path of the file to check for on each unit
    :type path: str
    :param model_name: Name of model to query.
    :type model_name: Union[None, str]
    :param timeout: Time to wait for the file to be missing on all units
    :type timeout: float
    """
    async def _check_for_file(model):
        units = model.applications[app].units
        results = []
        for unit in units:
            try:
                output = await unit.run('test -e {}; echo $?'.format(path))
                contents = output.data.get('results')['Stdout']
                results.append("1" in contents)
            # libjuju throws a generic error for connection failure, so a
            # connectivity issue cannot be told apart from other failures
            # here.  Treat the unit as "not yet confirmed missing" so that
            # polling simply continues.
            except JujuError:
                results.append(False)
        return all(results)

    # Actually poll the check; without this the function returned
    # immediately and never waited for the file to disappear.
    async with zaza_model.run_in_model(model_name) as model:
        await zaza_model.async_block_until(
            lambda: _check_for_file(model), timeout=timeout)
# Variant of async_block_until_file_missing produced by zaza.sync_wrapper;
# this is the name the test method calls directly.
block_until_file_missing = zaza.sync_wrapper(async_block_until_file_missing)