From 5c33c055a5185738991ad6acf269531e9562595d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jaworski?= Date: Tue, 28 Oct 2014 15:05:57 +0100 Subject: [PATCH] initial version of ianitor --- README.md | 84 ++++++++++++++- requirements-tests.txt | 2 + requirements.txt | 2 +- setup.py | 2 +- src/ianitor/args_parser.py | 184 +++++++++++++++++++++++++++++++++ src/ianitor/script.py | 83 +++++++++++++++ src/ianitor/service.py | 115 +++++++++++++++++++++ test-requirements.txt | 1 - tests/test_args_parser.py | 36 +++++++ tests/test_service.py | 205 +++++++++++++++++++++++++++++++++++++ tox.ini | 2 +- 11 files changed, 710 insertions(+), 6 deletions(-) create mode 100644 requirements-tests.txt create mode 100644 src/ianitor/args_parser.py create mode 100644 src/ianitor/script.py create mode 100644 src/ianitor/service.py delete mode 100644 test-requirements.txt create mode 100644 tests/test_args_parser.py create mode 100644 tests/test_service.py diff --git a/README.md b/README.md index 5c4efe8..08e6a50 100644 --- a/README.md +++ b/README.md @@ -18,12 +18,92 @@ your existing process/service supervision tool like Simply install with pip: - pip install ianitor + $ pip install ianitor And you're ready to go with: - ianitor - yourapp --some-switch + $ ianitor appname -- ./yourapp --some-switch +You can check if service is registered diggin' into consul DNS service: + + $ dig @localhost -p 8600 appname.service.consul + ; <<>> DiG 9.9.3-P1 <<>> @localhost -p 8600 appname.service.consul + ; (1 server found) + ;; global options: +cmd + ;; Got answer: + ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 25966 + ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0 + ;; WARNING: recursion requested but not available + + ;; QUESTION SECTION: + ;appname.service.consul. IN A + + ;; ANSWER SECTION: + appname.service.consul. 0 IN A 10.54.54.214 + + ;; Query time: 44 msec + ;; SERVER: 127.0.0.1#8600(127.0.0.1) + ;; WHEN: Tue Oct 28 13:53:09 CET 2014 + ;; MSG SIZE rcvd: 78 + +Full usage: + + usage: ianitor [-h] [--consul-agent hostname[:port]] [--ttl seconds] + [--heartbeat seconds] [--tags tag] [--id ID] [--port PORT] [-v] + service-name -- command [arguments] + + Doorkeeper for consul discovered services. + + positional arguments: + service-name service name in consul cluster + + optional arguments: + -h, --help show this help message and exit + --consul-agent=hostname[:port] set consul agent address + --ttl=seconds set TTL of service in consul cluster + --heartbeat=seconds set rocess poll heartbeat (defaults to + ttl/10) + --tags=tag set service tags in consul cluster (can be + used multiple times) + --id=ID set service id - must be node unique + (defaults to service name) + --port=PORT set service port + -v, --verbose enable logging to stdout (use multiple times + to increase verbosity) + + +## How does ianitor work? + +ianitor spawns process using python's `subprocess.Popen()` with command line +specified after `--` . It redirects its own stdin to child's stdin and +childs stdout/stderr to his own stdout/stderr. + +This way ianitor does not interfere with logging of managed service if it +logs to stdout. Moreover ianitor does not log anything to make it easier to +plug it in your existing process supervision tool. + +ianitor handles service registration in consul agent as well as keeping +registered service entry in consul in "healthy" state by continously requesting +it's [TTL health check endpoint](http://www.consul.io/docs/agent/checks.html). + +## Example supervisord config + +Assuming that you have some service under supervisord supervision: + + [program:rabbitmq] + command=/usr/sbin/rabbitmq-server + priority=0 + + autostart=true + +Simply wrap it with ianitor call: + + [program:rabbitmq] + command=/usr/local/bin/ianitor rabbitmq -- /usr/sbin/rabbitmq-server + priority=0 + + autostart=true + ## Licence This code is under [WTFPL](https://en.wikipedia.org/wiki/WTFPL). diff --git a/requirements-tests.txt b/requirements-tests.txt new file mode 100644 index 0000000..68e751a --- /dev/null +++ b/requirements-tests.txt @@ -0,0 +1,2 @@ +pytest +mock \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 663bd1f..6024d6c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -requests \ No newline at end of file +python-consul \ No newline at end of file diff --git a/setup.py b/setup.py index cb3d1a8..4448d39 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ def get_version(version_tuple): name='ianitor', version=VERSION, author='MichaƂ Jaworski', - author_email='swistakm@gmail.com', + author_email='m.jaworski@clearcode.cc', description='Doorkeeper for consul discovered services.', long_description=README, diff --git a/src/ianitor/args_parser.py b/src/ianitor/args_parser.py new file mode 100644 index 0000000..2aed076 --- /dev/null +++ b/src/ianitor/args_parser.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +import sys +import argparse +import logging + +logger = logging.getLogger(__name__) + +DEFAULT_CONSUL_HTTP_API_PORT = 8500 +DEFAULT_TTL = 10 + + +class CustomFormatter(argparse.HelpFormatter): + def __init__(self, prog): + # default max_help_position increased for readability + super(CustomFormatter, self).__init__(prog, max_help_position=50) + + def _format_action_invocation(self, action): + """ + Hack _format_action_invocation to display metavar for only one + of options + """ + if not action.option_strings: + metavar, = self._metavar_formatter(action, action.dest)(1) + return metavar + + else: + parts = [] + + # if the Optional doesn't take a value, format is: + # -s, --long + if action.nargs == 0: + parts.extend(action.option_strings) + + # if the Optional takes a value, format is: + # -s ARGS, --long ARGS + else: + default = action.dest.upper() + args_string = self._format_args(action, default) + + # here is the hack: do not add args to first option part + # if it is both long and short + if len(action.option_strings) > 1: + parts.append(action.option_strings[0] + "") + remaining = action.option_strings[1:] + else: + remaining = action.option_strings + + for option_string in remaining: + parts.append('%s=%s' % (option_string, args_string)) + + return ', '.join(parts) + + def add_usage(self, usage, actions, groups, prefix=None): + """ + Hack add_usage to add fake "-- command [arguments]" to usage + """ + actions.append(argparse._StoreAction( + option_strings=[], + dest="-- command [arguments]" + )) + return super(CustomFormatter, self).add_usage( + usage, actions, groups, prefix + ) + + +def coordinates(coordinates_string): + """ parse coordinates string + :param coordinates_string: string in "hostname" or "hostname:port" format + :return: (hostname, port) two-tuple + """ + if ':' in coordinates_string: + try: + hostname, port = coordinates_string.split(":") + port = int(port) + + if not hostname: + raise ValueError() + + except ValueError: + raise ValueError("Coordinate should be hostname or hostname:port ") + else: + hostname = coordinates_string + port = DEFAULT_CONSUL_HTTP_API_PORT + + return hostname, port + + +def get_parser(): + """ Create ianotor argument parser with a set of reasonable defaults + :return: argument parser + """ + parser = argparse.ArgumentParser( + "ianitor", + description="Doorkeeper for consul discovered services.", + formatter_class=CustomFormatter, + ) + + parser.add_argument( + "--consul-agent", + metavar="hostname[:port]", type=coordinates, default="localhost", + help="set consul agent address" + ) + + parser.add_argument( + "--ttl", + metavar="seconds", type=float, default=DEFAULT_TTL, + help="set TTL of service in consul cluster" + ) + + parser.add_argument( + "--heartbeat", + metavar="seconds", type=float, default=None, + help="set rocess poll heartbeat (defaults to ttl/10)", + ) + + parser.add_argument( + "--tags", + action="append", metavar="tag", + help="set service tags in consul cluster (can be used multiple times)", + ) + + parser.add_argument( + "--id", + help="set service id - must be node unique (defaults to service name)" + ) + + parser.add_argument( + "--port", + help="set service port", + ) + + parser.add_argument( + "-v", "--verbose", + action="count", + help="enable logging to stdout (use multiple times to increase verbosity)", # noqa + ) + + parser.add_argument( + metavar="service-name", + dest="service_name", + help="service name in consul cluster", + ) + + return parser + + +def parse_args(): + """ + Parse program arguments. + + This function ensures that argv arguments after '--' won't be parsed by + `argparse` and will be returned as separate list. + + :return: (args, command) two-tuple + """ + + parser = get_parser() + + try: + split_point = sys.argv.index('--') + + except ValueError: + if "--help" in sys.argv or "-h" in sys.argv or len(sys.argv) == 1: + parser.print_help() + exit(0) + else: + parser.print_usage() + print(parser.prog, ": error: command missing") + exit(1) + + else: + argv = sys.argv[1:split_point] + invocation = sys.argv[split_point + 1:] + + args = parser.parse_args(argv) + + # set default heartbeat to ttl / 10. if not specified + if not args.heartbeat: + args.heartbeat = args.ttl / 10. + logger.debug( + "heartbeat not specified, setting to %s" % args.heartbeat + ) + + return args, invocation diff --git a/src/ianitor/script.py b/src/ianitor/script.py new file mode 100644 index 0000000..5aea5f7 --- /dev/null +++ b/src/ianitor/script.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +from time import sleep +import signal +import logging + +import consul + +from ianitor.service import Service +from ianitor.args_parser import parse_args + + +SIGNALS = [ + member + for member + in dir(signal) + if member.startswith("SIG") and '_' not in member +] + + +logger = logging.getLogger(__name__) + + +def setup_logging(verbosity): + ilogger = logging.getLogger('ianitor') + + if verbosity: + handler = logging.StreamHandler() + if verbosity == 1: + handler.setLevel(logging.ERROR) + if verbosity == 2: + handler.setLevel(logging.WARNING) + if verbosity >= 3: + handler.setLevel(logging.DEBUG) + else: + handler = logging.NullHandler() + + formatter = logging.Formatter( + '[%(levelname)s] %(name)s: %(message)s' + ) + + handler.setFormatter(formatter) + ilogger.setLevel(logging.DEBUG) + ilogger.addHandler(handler) + + +def main(): + args, command = parse_args() + setup_logging(args.verbose) + + session = consul.Consul(*args.consul_agent) + + service = Service( + command, + session=session, + ttl=args.ttl, + service_name=args.service_name, + service_id=args.id, + tags=args.tags, + port=args.port + ) + + service.start() + + def signal_handler(signal_number, *_): + service.process.send_signal(signal_number) + + for signal_name in SIGNALS: + try: + signum = getattr(signal, signal_name) + signal.signal(signum, signal_handler) + except RuntimeError: + # signals that cannot be catched will raise RuntimeException + pass + + while sleep(args.heartbeat) or service.is_up(): + service.keep_alive() + + logger.info("process quit with errorcode %s %s" % ( + service.process.poll(), + "(signal)" if service.process.poll() < 0 else "" + )) + + service.deregister() diff --git a/src/ianitor/service.py b/src/ianitor/service.py new file mode 100644 index 0000000..2a389b2 --- /dev/null +++ b/src/ianitor/service.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +from contextlib import contextmanager +import subprocess +import logging +from requests import ConnectionError + +logger = logging.getLogger(__name__) + + +@contextmanager +def ignore_connection_errors(action="unknown"): + try: + yield + except ConnectionError: + logger.error("connection error on <%s> action failed" % action) + + +class Service(object): + def __init__(self, command, session, ttl, service_name, + service_id=None, tags=None, port=None): + self.command = command + self.session = session + self.process = None + + self.ttl = ttl + self.service_name = service_name + self.tags = tags or [] + self.port = port + self.service_id = service_id or service_name + + self.check_id = "service:" + self.service_id + + def start(self): + """ Start service process. + + :return: + """ + logger.debug("starting service: %s" % " ".join(self.command)) + self.process = subprocess.Popen(self.command) + self.register() + + def is_up(self): + """ + Poll service process to check if service is up. + + :return: + """ + logger.debug("polling service") + return bool(self.process) and self.process.poll() is None + + def kill(self): + """ + Kill service process and make sure it is deregistered from consul + cluster. + + :return: + """ + logger.debug("killing service") + if self.process is None: + raise RuntimeError("Process does not exist") + + self.process.kill() + self.deregister() + + def register(self): + """ + Register service in consul cluster. + + :return: None + """ + logger.debug("registering service") + with ignore_connection_errors(): + self.session.agent.service.register( + name=self.service_name, + service_id=self.service_id, + port=self.port, + tags=self.tags, + # format it into XXXs format + ttl="%ss" % self.ttl, + ) + + def deregister(self): + """ + Deregister service from consul cluster. + + :return: None + """ + logger.debug("deregistering service") + + with ignore_connection_errors("deregister"): + self.session.agent.service.deregister(self.service_id) + + def keep_alive(self): + """ + Keep alive service in consul cluster marking TTL check pass + on consul agent. + + If some cases it can happen that service registry disappeared from + consul cluster. This method registers service again if it happens. + + :return: None + """ + with ignore_connection_errors("ttl_pass"): + if not self.session.health.check.ttl_pass(self.check_id): + # register and ttl_pass again if it failed + logger.warning("service keep-alive failed, re-registering") + self.register() + self.session.health.check.ttl_pass(self.check_id) + + def __del__(self): + """ + Cleanup processes on del + """ + if self.process and self.process.poll() is None: + self.kill() diff --git a/test-requirements.txt b/test-requirements.txt deleted file mode 100644 index 55b033e..0000000 --- a/test-requirements.txt +++ /dev/null @@ -1 +0,0 @@ -pytest \ No newline at end of file diff --git a/tests/test_args_parser.py b/tests/test_args_parser.py new file mode 100644 index 0000000..0de46b9 --- /dev/null +++ b/tests/test_args_parser.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +import pytest +from mock import patch + +from ianitor import args_parser + + +def test_coordinates(): + assert args_parser.coordinates("localhost:1222") == ("localhost", 1222) + + assert args_parser.coordinates("localhost") == ( + "localhost", args_parser.DEFAULT_CONSUL_HTTP_API_PORT + ) + + with pytest.raises(ValueError): + args_parser.coordinates("localhost:12:") + + with pytest.raises(ValueError): + args_parser.coordinates("localhost:") + + with pytest.raises(ValueError): + args_parser.coordinates(":123") + + +@patch('sys.argv', ["ianitor", "tailf", '--', 'tailf', 'something']) +def test_parse_args(): + args, invocation = args_parser.parse_args() + assert invocation == ['tailf', 'something'] + +TEST_TTL = 100 + + +@patch('sys.argv', ["ianitor", "tailf", '--ttl', str(TEST_TTL), '--', 'tailf', 'something']) # noqa +def test_default_heartbeat(): + args, invocation = args_parser.parse_args() + assert args.heartbeat == TEST_TTL / 10. diff --git a/tests/test_service.py b/tests/test_service.py new file mode 100644 index 0000000..ac060af --- /dev/null +++ b/tests/test_service.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- +from time import sleep +import mock + +from ianitor import service +from consul import Consul + + +def get_tailf_service(session): + return service.Service( + ["tailf", "/dev/null"], + session=session, + service_name="tailf", + # small ttl for faster testing + ttl=1, + ) + + +def test_service_start(): + session = Consul() + tailf = get_tailf_service(session) + + with mock.patch.object(service.Service, "register") as register_method: + tailf.start() + + assert bool(tailf.is_up()) + register_method.assert_any_call() + + +def test_is_up_false_if_not_started(): + session = Consul() + tailf = get_tailf_service(session) + + assert not tailf.is_up() + + +def test_remove_services(): + """ + ~ Yo dawg I herd that you like tests so we put test inside your test so + you can test while you test + + Note: this is not a tests of `ianitor` but a tests helper. Still we must be + sure that this helper works so we test it. + """ + session = Consul() + agent = session.agent + + services = agent.services() + + for srv, description in services.items(): + if description["ID"] != 'consul': + agent.service.deregister(description["ID"]) + + # this is consul 0.4.1 behavior - consul is one of services + services = agent.services() + if 'consul' in services: + services.pop('consul') + + assert not services + + +def test_service_register(): + session = Consul() + agent = session.agent + + tailf = get_tailf_service(session) + tailf.start() + + test_remove_services() + tailf.register() + + assert agent.services() + + +def test_deregister(): + session = Consul() + agent = session.agent + + tailf = get_tailf_service(session) + tailf.start() + + test_remove_services() + tailf.register() + tailf.deregister() + + # this is consul 0.4.1 behavior - consul is one of services + services = agent.services() + if 'consul' in services: + services.pop('consul') + assert not services + + +def test_kill(): + test_remove_services() + + session = Consul() + agent = session.agent + + tailf = get_tailf_service(session) + + # test service registered after start + tailf.start() + assert agent.services() + + # and deregistered after restart + with mock.patch.object(service.Service, "deregister") as r: + tailf.kill() + r.assert_any_call() + + +def _get_service_status(session, service_obj): + _, check_response = session.health.service(service_obj.service_name) + + try: + checks = check_response[0]["Checks"] + except IndexError: + # return none because check does not even exist + return + + # from pprint import pprint; pprint(checks) + service_check = list(filter( + lambda check: check["ServiceName"] == service_obj.service_name, checks + )).pop() + + return service_check["Status"] + + +def test_keep_alive(): + """ + Integration test for keeping service alive in consul cluster + """ + test_remove_services() + session = Consul() + tailf = get_tailf_service(session) + + # assert that there is no checks yet + _, checks = session.health.service(tailf.service_name) + assert not checks + + # test that service health check is unknown or critical after registration + tailf.register() + # small sleep for cluster consensus + sleep(0.1) + assert _get_service_status(session, tailf) in ("unknown", "critical") + + # assert service is healthy back again after keep alive + tailf.keep_alive() + sleep(0.1) + assert _get_service_status(session, tailf) == "passing" + + # assert service health check fails after ttl passed + sleep(tailf.ttl + 0.5) + assert _get_service_status(session, tailf) == "critical" + + +def test_keepalive_reregister(): + """ + Integration test that keep-alive registers service again if it disapears + """ + test_remove_services() + session = Consul() + tailf = get_tailf_service(session) + + # [integration] assert service is healthy + tailf.register() + tailf.keep_alive() + sleep(0.1) + assert _get_service_status(session, tailf) == "passing" + + # [integration] assert that check + test_remove_services() + assert _get_service_status(session, tailf) is None + + # [integration] assert that keepalive makes service registered again + tailf.keep_alive() + sleep(0.1) + assert _get_service_status(session, tailf) == "passing" + + +def test_ignore_connection_failures(): + session = Consul(host="invalid") + + tailf = get_tailf_service(session) + + # assert service starts + tailf.start() + assert tailf.process.poll() is None + + with mock.patch('ianitor.service.logger') as logger: + tailf.register() + assert logger.error.called + + with mock.patch('ianitor.service.logger') as logger: + tailf.keep_alive() + assert logger.error.called + + with mock.patch('ianitor.service.logger') as logger: + tailf.deregister() + assert logger.error.called + + tailf.deregister() + + # assert service can be killed + tailf.kill() + assert tailf.process.poll() diff --git a/tox.ini b/tox.ini index bdb7089..9cfa472 100644 --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,7 @@ envlist = py27,py34,pep8 [testenv] deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt + -r{toxinidir}/requirements-tests.txt setenv = VIRTUAL_ENV = {envdir} commands = py.test {posargs} sitepackages = False