mgr/cephadm: set HEALTH warnings during apply phase in serve #43376

Merged
merged 7 commits on Oct 11, 2021
25 changes: 17 additions & 8 deletions src/pybind/mgr/cephadm/module.py
@@ -418,6 +418,7 @@ def __init__(self, *args: Any, **kwargs: Any):
         self.agent_refresh_rate = 0
         self.endpoint_port = 0
         self.agent_starting_port = 0
+        self.apply_spec_fails: List[Tuple[str, str]] = []

         self.notify('mon_map', None)
         self.config_notify()
@@ -1533,16 +1534,10 @@ def _set_maintenance_healthcheck(self) -> None:

         in_maintenance = self.inventory.get_host_with_state("maintenance")
         if not in_maintenance:
-            if 'HOST_IN_MAINTENANCE' in self.health_checks:
-                del self.health_checks["HOST_IN_MAINTENANCE"]
+            self.remove_health_warning('HOST_IN_MAINTENANCE')
         else:
             s = "host is" if len(in_maintenance) == 1 else "hosts are"
-            self.health_checks["HOST_IN_MAINTENANCE"] = {
-                "severity": "warning",
-                "summary": f"{len(in_maintenance)} {s} in maintenance mode",
-                "detail": [f"{h} is in maintenance" for h in in_maintenance],
-            }
-        self.set_health_checks(self.health_checks)
+            self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [f"{h} is in maintenance" for h in in_maintenance])

     @handle_orch_error
     @host_exists()
@@ -2271,6 +2266,20 @@ def _apply(self, spec: GenericSpec) -> str:

         return self._apply_service_spec(cast(ServiceSpec, spec))

+    def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
+        self.health_checks[name] = {
+            'severity': 'warning',
+            'summary': summary,
+            'count': count,
+            'detail': detail,
+        }
+        self.set_health_checks(self.health_checks)
+
+    def remove_health_warning(self, name: str) -> None:
+        if name in self.health_checks:
+            del self.health_checks[name]
+            self.set_health_checks(self.health_checks)
+
     def _plan(self, spec: ServiceSpec) -> dict:
         if spec.service_type == 'osd':
             return {'service_name': spec.service_name(),
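For readers unfamiliar with the mgr module API, the two helpers added above reduce to the following standalone sketch. Here publish() is a hypothetical stand-in for MgrModule.set_health_checks(); everything else mirrors the hunk:

    from typing import Any, Dict, List

    health_checks: Dict[str, Dict[str, Any]] = {}

    def publish(checks: Dict[str, Dict[str, Any]]) -> None:
        # stand-in for MgrModule.set_health_checks(), which pushes the dict to the cluster
        print(checks)

    def set_health_warning(name: str, summary: str, count: int, detail: List[str]) -> None:
        # store a WARN-level check and republish the whole set, as module.py now does
        health_checks[name] = {
            'severity': 'warning',
            'summary': summary,
            'count': count,
            'detail': detail,
        }
        publish(health_checks)

    def remove_health_warning(name: str) -> None:
        # clearing is a no-op if the check was never raised
        if name in health_checks:
            del health_checks[name]
            publish(health_checks)

    set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1,
                       ["'ceph orch resume' to resume"])
    remove_health_warning('CEPHADM_PAUSED')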
117 changes: 52 additions & 65 deletions src/pybind/mgr/cephadm/serve.py
@@ -135,17 +135,9 @@ def _serve_sleep(self) -> None:

     def _update_paused_health(self) -> None:
         if self.mgr.paused:
-            self.mgr.health_checks['CEPHADM_PAUSED'] = {
-                'severity': 'warning',
-                'summary': 'cephadm background work is paused',
-                'count': 1,
-                'detail': ["'ceph orch resume' to resume"],
-            }
-            self.mgr.set_health_checks(self.mgr.health_checks)
+            self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, ["'ceph orch resume' to resume"])
         else:
-            if 'CEPHADM_PAUSED' in self.mgr.health_checks:
-                del self.mgr.health_checks['CEPHADM_PAUSED']
-                self.mgr.set_health_checks(self.mgr.health_checks)
+            self.mgr.remove_health_warning('CEPHADM_PAUSED')

     def _autotune_host_memory(self, host: str) -> None:
         total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
@@ -385,47 +377,24 @@ def refresh(host: str) -> None:

         self.mgr.config_checker.run_checks()

-        health_changed = False
         for k in [
                 'CEPHADM_HOST_CHECK_FAILED',
                 'CEPHADM_FAILED_DAEMON',
                 'CEPHADM_REFRESH_FAILED',
         ]:
-            if k in self.mgr.health_checks:
-                del self.mgr.health_checks[k]
-                health_changed = True
+            self.mgr.remove_health_warning(k)
         if bad_hosts:
-            self.mgr.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
-                'severity': 'warning',
-                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
-                'count': len(bad_hosts),
-                'detail': bad_hosts,
-            }
-            health_changed = True
+            self.mgr.set_health_warning('CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
         if failures:
-            self.mgr.health_checks['CEPHADM_REFRESH_FAILED'] = {
-                'severity': 'warning',
-                'summary': 'failed to probe daemons or devices',
-                'count': len(failures),
-                'detail': failures,
-            }
-            health_changed = True
+            self.mgr.set_health_warning('CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
         failed_daemons = []
         for dd in self.mgr.cache.get_daemons():
             if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
                 failed_daemons.append('daemon %s on %s is in %s state' % (
                     dd.name(), dd.hostname, dd.status_desc
                 ))
         if failed_daemons:
-            self.mgr.health_checks['CEPHADM_FAILED_DAEMON'] = {
-                'severity': 'warning',
-                'summary': '%d failed cephadm daemon(s)' % len(failed_daemons),
-                'count': len(failed_daemons),
-                'detail': failed_daemons,
-            }
-            health_changed = True
-        if health_changed:
-            self.mgr.set_health_checks(self.mgr.health_checks)
+            self.mgr.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(failed_daemons), failed_daemons)

     def _check_host(self, host: str) -> Optional[str]:
         if host not in self.mgr.inventory:
@@ -533,8 +502,7 @@ def _check_for_strays(self) -> None:
         self.log.debug('_check_for_strays')
         for k in ['CEPHADM_STRAY_HOST',
                   'CEPHADM_STRAY_DAEMON']:
-            if k in self.mgr.health_checks:
-                del self.mgr.health_checks[k]
+            self.mgr.remove_health_warning(k)
         if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
             ls = self.mgr.list_servers()
             managed = self.mgr.cache.get_daemon_names()
@@ -579,23 +547,9 @@ def _check_for_strays(self) -> None:
                         'stray host %s has %d stray daemons: %s' % (
                             host, len(missing_names), missing_names))
         if self.mgr.warn_on_stray_hosts and host_detail:
-            self.mgr.health_checks['CEPHADM_STRAY_HOST'] = {
-                'severity': 'warning',
-                'summary': '%d stray host(s) with %s daemon(s) '
-                'not managed by cephadm' % (
-                    len(host_detail), host_num_daemons),
-                'count': len(host_detail),
-                'detail': host_detail,
-            }
+            self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
         if self.mgr.warn_on_stray_daemons and daemon_detail:
-            self.mgr.health_checks['CEPHADM_STRAY_DAEMON'] = {
-                'severity': 'warning',
-                'summary': '%d stray daemon(s) not managed by cephadm' % (
-                    len(daemon_detail)),
-                'count': len(daemon_detail),
-                'detail': daemon_detail,
-            }
-        self.mgr.set_health_checks(self.mgr.health_checks)
+            self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)

     def _apply_all_services(self) -> bool:
         r = False
@@ -615,30 +569,45 @@ def _apply_all_services(self) -> bool:
         else:
             for sn, spec in self.mgr.spec_store.active_specs.items():
                 specs.append(spec)
+        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
+            self.mgr.remove_health_warning(name)
+        self.mgr.apply_spec_fails = []
         for spec in specs:
             try:
                 if self._apply_service(spec):
                     r = True
             except Exception as e:
-                self.log.exception('Failed to apply %s spec %s: %s' % (
-                    spec.service_name(), spec, e))
+                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
+                self.log.exception(msg)
                 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
+                warnings = []
+                for x in self.mgr.apply_spec_fails:
+                    warnings.append(f'{x[0]}: {x[1]}')
+                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
+                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
+                                            len(self.mgr.apply_spec_fails),
+                                            warnings)

         return r

     def _apply_service_config(self, spec: ServiceSpec) -> None:
         if spec.config:
             section = utils.name_to_config_section(spec.service_name())
+            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
+                self.mgr.remove_health_warning(name)
+            invalid_config_options = []
+            options_failed_to_set = []
             for k, v in spec.config.items():
                 try:
                     current = self.mgr.get_foreign_ceph_option(section, k)
                 except KeyError:
-                    self.log.warning(
-                        f'Ignoring invalid {spec.service_name()} config option {k}'
-                    )
+                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
+                    self.log.warning(msg)
                     self.mgr.events.for_service(
                         spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                     )
+                    invalid_config_options.append(msg)
                     continue
                 if current != v:
                     self.log.debug(f'setting [{section}] {k} = {v}')
@@ -650,9 +619,14 @@ def _apply_service_config(self, spec: ServiceSpec) -> None:
                             'who': section,
                         })
                     except MonCommandFailed as e:
-                        self.log.warning(
-                            f'Failed to set {spec.service_name()} option {k}: {e}'
-                        )
+                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
+                        self.log.warning(msg)
+                        options_failed_to_set.append(msg)
+
+            if invalid_config_options:
+                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(invalid_config_options), invalid_config_options)
+            if options_failed_to_set:
+                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(options_failed_to_set), options_failed_to_set)

     def _apply_service(self, spec: ServiceSpec) -> bool:
         """
@@ -737,9 +711,17 @@ def matches_network(host):
                 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
             self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
         except OrchestratorError as e:
-            self.log.error('Failed to apply %s spec %s: %s' % (
-                spec.service_name(), spec, e))
+            msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
+            self.log.error(msg)
             self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+            self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
+            warnings = []
+            for x in self.mgr.apply_spec_fails:
+                warnings.append(f'{x[0]}: {x[1]}')
+            self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
+                                        f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
+                                        len(self.mgr.apply_spec_fails),
+                                        warnings)
             return False

         r = None
@@ -815,6 +797,7 @@ def update_progress() -> None:
             svc.fence_old_ranks(spec, rank_map, len(all_slots))

         # create daemons
+        daemon_place_fails = []
         for slot in slots_to_add:
             # first remove daemon on conflicting port?
             if slot.ports:
@@ -865,6 +848,7 @@ def update_progress() -> None:
                        f"on {slot.hostname}: {e}")
                 self.mgr.events.for_service(spec, 'ERROR', msg)
                 self.mgr.log.error(msg)
+                daemon_place_fails.append(msg)
                 # only return "no change" if no one else has already succeeded.
                 # later successes will also change to True
                 if r is None:
@@ -881,6 +865,9 @@
             )
             daemons.append(sd)

+        if daemon_place_fails:
+            self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(daemon_place_fails), daemon_place_fails)
+
         # remove any?
         def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
             daemon_ids = [d.daemon_id for d in remove_daemons]
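The serve.py changes all follow one pattern: clear the relevant warning names at the start of a pass, accumulate failure messages while applying, then raise a single aggregated warning whose count and detail cover every failure seen so far. A minimal self-contained sketch of that pattern; MiniMgr and failing_apply are hypothetical stand-ins, not cephadm code:

    from typing import Any, Callable, Dict, List, Tuple

    class MiniMgr:
        """Hypothetical stand-in for the cephadm mgr, mirroring the helpers added in module.py."""
        def __init__(self) -> None:
            self.health_checks: Dict[str, Dict[str, Any]] = {}
            self.apply_spec_fails: List[Tuple[str, str]] = []

        def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
            self.health_checks[name] = {'severity': 'warning', 'summary': summary,
                                        'count': count, 'detail': detail}

        def remove_health_warning(self, name: str) -> None:
            self.health_checks.pop(name, None)

    def apply_all(mgr: MiniMgr, specs: List[str], apply_one: Callable[[str], None]) -> None:
        # each pass starts clean, so a warning disappears once its failure is fixed
        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            mgr.remove_health_warning(name)
        mgr.apply_spec_fails = []
        for spec in specs:
            try:
                apply_one(spec)
            except Exception as e:
                # record (service, error), then (re)raise one aggregated warning
                mgr.apply_spec_fails.append((spec, str(e)))
                mgr.set_health_warning(
                    'CEPHADM_APPLY_SPEC_FAIL',
                    f"Failed to apply {len(mgr.apply_spec_fails)} service(s): "
                    + ','.join(x[0] for x in mgr.apply_spec_fails),
                    len(mgr.apply_spec_fails),
                    [f'{svc}: {err}' for svc, err in mgr.apply_spec_fails])

    def failing_apply(spec: str) -> None:
        raise RuntimeError('boom')

    mgr = MiniMgr()
    apply_all(mgr, ['mgr', 'mon'], failing_apply)
    print(mgr.health_checks['CEPHADM_APPLY_SPEC_FAIL']['summary'])
    # -> Failed to apply 2 service(s): mgr,mon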
39 changes: 39 additions & 0 deletions src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -888,6 +888,45 @@ def test_daemon_add_fail(self, _run_cephadm, entity, success, spec, cephadm_modu
                 'entity': entity,
             })

+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_daemon_place_fail_health_warning(self, _run_cephadm, cephadm_module):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'test'):
+            _run_cephadm.side_effect = OrchestratorError('fail')
+            ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+            r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+            assert not r
+            assert cephadm_module.health_checks.get('CEPHADM_DAEMON_PLACE_FAIL') is not None
+            assert cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['count'] == 1
+            assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['summary']
+            assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['detail']
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'test'):
+            CephadmServe(cephadm_module)._apply_all_services()
+            ps = PlacementSpec(hosts=['fail'], count=1)
+            r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
+            assert not r
+            assert cephadm_module.apply_spec_fails
+            assert cephadm_module.health_checks.get('CEPHADM_APPLY_SPEC_FAIL') is not None
+            assert cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['count'] == 1
+            assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['summary']
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'test'):
+            ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+            get_foreign_ceph_option.side_effect = KeyError
+            CephadmServe(cephadm_module)._apply_service_config(ServiceSpec('mgr', placement=ps, config={'test': 'foo'}))
+            assert cephadm_module.health_checks.get('CEPHADM_INVALID_CONFIG_OPTION') is not None
+            assert cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['count'] == 1
+            assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['summary']
+            assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['detail']
+
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
     @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())
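All three tests inspect health_checks entries of the same shape. Assembled from the summary and detail strings visible in the diff (illustrative only, not captured output), the CEPHADM_DAEMON_PLACE_FAIL entry checked by the first test would look roughly like this:

    # Approximate entry written by set_health_warning() for the first test above.
    expected_place_fail = {
        'severity': 'warning',
        'summary': 'Failed to place 1 daemon(s)',
        'count': 1,
        'detail': ['Failed while placing mgr.a on test: fail'],
    }

    # count always mirrors the number of detail lines
    assert expected_place_fail['count'] == len(expected_place_fail['detail'])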