Skip to content

Commit

Permalink
Periodic checks for decapod
Browse files Browse the repository at this point in the history
Change-Id: I8daa6c955372b6cc45701c6382a8e810f70b7f8d
  • Loading branch information
9seconds committed May 25, 2017
1 parent c41ea85 commit 5b905db
Show file tree
Hide file tree
Showing 8 changed files with 547 additions and 0 deletions.
1 change: 1 addition & 0 deletions backend/admin/decapod_admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@
from decapod_admin import migration # NOQA
from decapod_admin import password_reset # NOQA
from decapod_admin import pdsh # NOQA
from decapod_admin import periodic_checks # NOQA
from decapod_admin import restore # NOQA
from decapod_admin import ssh # NOQA
30 changes: 30 additions & 0 deletions backend/admin/decapod_admin/cluster_checks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Checks for cluster health."""


import collections

from decapod_admin.cluster_checks.base import Connections # NOQA
from decapod_admin.cluster_checks import ceph_command
from decapod_admin.cluster_checks import installed_package_version
from decapod_admin.cluster_checks import repo_source


CHECKS = collections.OrderedDict()
CHECKS["ceph"] = ceph_command.Check
CHECKS["same_repo"] = repo_source.Check
CHECKS["installed_version"] = installed_package_version.Check
233 changes: 233 additions & 0 deletions backend/admin/decapod_admin/cluster_checks/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base health checker"""


import asyncio
import collections
import os
import threading

import asyncssh

from decapod_common import log


LOG = log.getLogger(__name__)
"""Logger."""


ExecuteTaskResult = collections.namedtuple(
"ExecuteTaskResult", ["ok", "errors", "cancelled"])


class Connections:

def __init__(self, private_key, event_loop):
self.connections = {}
self.private_key = private_key
self.lock = threading.RLock()
self.event_loop = event_loop

async def get(self, srv):
key = srv.model_id

if key in self.connections:
return self.connections[key]

with self.lock:
if key not in self.connections:
self.connections[key] = await self.make_connection(srv)

return self.connections[key]

async def make_connection(self, srv):
return await asyncssh.connect(
srv.ip,
known_hosts=None,
username=srv.username,
client_keys=[self.private_key],
loop=self.event_loop
)

async def async_close(self):
coros = []

for value in self.connections.values():
value.close()
coros.append(value.wait_closed())

if coros:
await asyncio.wait(coros)

def close(self):
self.event_loop.run_until_complete(self.async_close())


class Task:

def __init__(self, connections, srv):
self.srv = srv
self.connections = connections
self.exception = None

async def run(self):
return self

async def get_connection(self):
return await self.connections.get(self.srv)

@property
def name(self):
return self.srv.model_id

@property
def ok(self):
return not bool(self.exception)

@property
def completed(self):
return bool(self.exception)


class CommandTask(Task):

@staticmethod
def get_bytes(text):
return text.encode("utf-8") if isinstance(text, str) else text

@staticmethod
def get_str(text):
return text if isinstance(text, str) else text.decode("utf-8")

def __init__(self, connections, srv, cmd):
super().__init__(connections, srv)

self.cmd = cmd
self.result = None

async def run(self):
connection = await self.get_connection()
self.result = await connection.run(self.cmd, check=True)
return self

@property
def completed(self):
return self.result is not None

@property
def ok(self):
return self.code == os.EX_OK

@property
def code(self):
if not self.completed:
return -1
return self.result.exit_status

@property
def stdout_bytes(self):
if not self.completed:
return b""
return self.get_bytes(self.result.stdout)

@property
def stdout_text(self):
if not self.completed:
return ""
return self.get_str(self.result.stdout)

@property
def stdout_lines(self):
return self.stdout_text.splitlines()

@property
def stderr_bytes(self):
if not self.completed:
return b""
return self.get_bytes(self.result.stderr)

@property
def stderr_text(self):
if not self.completed:
return ""
return self.get_str(self.result.stderr)

@property
def stderr_lines(self):
return self.stderr_text.splitlines()


class Check:

def __init__(self, connections, cluster, batch_size, event_loop):
self.cluster = cluster
self.connections = connections
self.batch_size = batch_size
self.event_loop = event_loop

def verify(self):
try:
return self.event_loop.run_until_complete(self.run())
except Exception as exc:
LOG.error(
"Cluster %s has failed check: %s",
self.cluster.model_id, exc)
raise exc

async def run(self):
pass

async def execute_tasks(self, *tasks):
to_run = [
(tsk, asyncio.ensure_future(tsk.run()))
for tsk in tasks
]
await asyncio.wait([future for _, future in to_run])

ok, errors, cancelled = [], [], []
for tsk, future in to_run:
if future.cancelled():
cancelled.append(tsk)
elif future.exception():
tsk.exception = future.exception()
errors.append(tsk)
else:
ok.append(tsk)

return ExecuteTaskResult(ok, errors, cancelled)

async def execute_cmd(self, cmd, *servers):
if not servers:
return []

tasks = [CommandTask(self.connections, srv, cmd) for srv in servers]
cmd = cmd.strip()
if not cmd.startswith("sudo"):
cmd = "sudo -EHn -- {0}".format(cmd)

return await self.execute_tasks(*tasks)

def server_iter(self):
batch_size = self.batch_size
all_servers = list(self.servers)

if not self.batch_size or self.batch_size < 0:
batch_size = len(self.servers)

while all_servers:
yield all_servers[:batch_size]
all_servers = all_servers[batch_size:]
42 changes: 42 additions & 0 deletions backend/admin/decapod_admin/cluster_checks/ceph_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Check that Ceph command is installed"""


from decapod_admin.cluster_checks import base
from decapod_common import log


LOG = log.getLogger(__name__)
"""Logger."""


class Check(base.Check):

async def run(self):
which_ceph_result = await self.execute_cmd(
"which ceph", *self.cluster.server_list)

if which_ceph_result.errors:
for error in which_ceph_result.errors:
LOG.error(
"Cannot execute ceph command on %s (%s): %s",
error.srv.ip,
error.srv.model_id,
error.exception
)

raise ValueError("No all hosts have working ceph command")
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Check that Ceph command is installed from the same repository."""


from decapod_admin.cluster_checks import base
from decapod_common import log


LOG = log.getLogger(__name__)
"""Logger."""


class Check(base.Check):

async def run(self):
policy_result = await self.execute_cmd(
"dpkg-query --showformat='${Version}' --show ceph-common",
*self.cluster.server_list)

if policy_result.errors:
for error in policy_result.errors:
LOG.error(
"Cannot execute dpkg-query policy command on %s (%s): %s",
error.srv.ip,
error.srv.model_id,
error.exception
)

raise ValueError("No all hosts have installed ceph")

repo_lines = get_repo_lines(policy_result.ok)
if len({line for _, line in repo_lines}) < 2:
return

for srv, line in repo_lines:
LOG.error("Server %s has ceph-common installed with %s",
srv.ip, line)

raise ValueError("Inconsistency in repo sources")


def get_repo_lines(results):
return [(res.srv, res.stdout_text.strip()) for res in results]

0 comments on commit 5b905db

Please sign in to comment.