diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..956404c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,134 @@
+/.idea
+
+# Created by .ignore support plugin (hsz.mobi)
+### Python template
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+*.secret
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..1c8c784
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,17 @@
+language: python
+python:
+  - 2.7
+  - 3.4
+  - 3.5
+  - 3.6
+  - 3.7
+  - 3.8
+
+install:
+  - pip install tox coveralls
+
+script:
+  - tox -e $(echo py$TRAVIS_PYTHON_VERSION | tr -d .)
+
+after_success:
+  - coveralls
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e76d345
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,29 @@
+BSD 3-Clause License
+
+Copyright (c) 2020, Boris Savelev
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d877bcb
--- /dev/null
+++ b/README.md
@@ -0,0 +1,29 @@
+# HTTP Monitoring tool
+
+[![Build Status](https://travis-ci.com/bsavelev/httpmon.svg?branch=master)](https://travis-ci.com/bsavelev/httpmon)
+[![Coverage Status](https://coveralls.io/repos/github/bsavelev/httpmon/badge.svg?branch=master)](https://coveralls.io/github/bsavelev/httpmon?branch=master)
+
+## About
+Distributed HTTP monitoring tool with an Apache Kafka data pipeline and Postgres as storage.
+
+## Usage
+### Producer
+
+```python ./httpmon-cli.py --kafka-server localhost:9093 --kafka-topic test -v producer -u https://www.google.com --period 2 --timeout 10 --body '.*'```
+
+Will check:
+* URL `https://www.google.com`
+* Every `2` seconds
+* With a timeout of `10` seconds
+* Matching the response body against the regex `.*`
+* Pushing each check result to Apache Kafka at `localhost:9093`
+
+### Consumer
+```python ./httpmon-cli.py --kafka-server localhost:9093 --kafka-topic test consumer --uri postgresql://postgres:secret@localhost/httpmon```
+
+Will read the check stream from Apache Kafka and write the data into the Postgres DB specified with the `--uri` option.
+
+#### Database preparation
+
+Create the database and prepare it with the initialization SQL:
+```psql postgresql://postgres:secret@localhost/httpmon < sql/init.sql```
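
Each check result travels through Kafka as one JSON message whose fields are produced by `check()` in `httpmon/producer.py` (below) and mapped onto the columns of the `checks` table by the consumer. An illustrative message — the values are examples, not real measurements:

```python
# Shape of a single check result as serialized by KafkaProducerWrapper.
message = {
    'timestamp': 1590000000.0,        # epoch seconds when the check started
    'code': 200,                      # HTTP status; 0 = request failed, 499 = timeout
    'body_check_valid': True,         # did the --body regex match the response body?
    'time': 0.123,                    # response time in seconds (= timeout on timeout)
    'url': 'https://www.google.com',  # the checked URL
}
```
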
diff --git a/httpmon-cli.py b/httpmon-cli.py
new file mode 100644
index 0000000..a3d697f
--- /dev/null
+++ b/httpmon-cli.py
@@ -0,0 +1,90 @@
+import logging
+import argparse
+
+from httpmon.producer import regex_arg, check_loop, KafkaProducerWrapper
+from httpmon.consumer import KafkaConsumerWrapper, consumer_loop, PGExporter
+from httpmon.utils import prepare_kafka_connection_args
+
+logger = logging.getLogger(__name__)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='HTTP checker with Apache Kafka processing')
+    parser.add_argument(
+        '--kafka-server', type=str,
+        required=True, help='Kafka server [host:port]')
+    parser.add_argument(
+        '--kafka-topic', type=str,
+        required=True, help='Kafka topic')
+    parser.add_argument(
+        '--kafka-ca', type=str,
+        help='Kafka SSL CA path')
+    parser.add_argument(
+        '--kafka-cert', type=str,
+        help='Kafka SSL certificate path')
+    parser.add_argument(
+        '--kafka-key', type=str,
+        help='Kafka SSL key path')
+    parser.add_argument(
+        "-v", "--verbosity", action="count",
+        help="increase output verbosity", default=0)
+    subparser = parser.add_subparsers(help='sub-command help', dest='command')
+    parser_producer = subparser.add_parser(
+        'producer', help='Perform HTTP check')
+    parser_consumer = subparser.add_parser(
+        'consumer', help='Consume message and store in SQL')
+    parser_consumer.add_argument(
+        '-u', '--uri', type=str,
+        required=True, help='PostgreSQL connection uri')
+    parser_producer.add_argument(
+        '-u', '--url', type=str, required=True, help='URL to check')
+    parser_producer.add_argument(
+        '-t', '--timeout', type=float, default=10.0, help='Check timeout')
+    parser_producer.add_argument(
+        '-p', '--period', type=float, default=5.0, help='Check period')
+    parser_producer.add_argument(
+        '-b', '--body', type=regex_arg, help='Regex for body check')
+    parser_producer.add_argument(
+        '-o', '--oneshot',
+        action='store_true', default=False, help='Run check once')
+    args = parser.parse_args()
+
+    level = logging.INFO
+    if args.verbosity > 0:
+        level = logging.DEBUG
+
+    logging.basicConfig(
+        format='%(asctime)s [%(levelname)s] '
+               '[%(module)s:%(funcName)s:%(lineno)s] %(message)s',
+        level=level
+    )
+
+    kafka_connection_args = prepare_kafka_connection_args(args)
+
+    if args.command == 'producer':
+        url = args.url
+        timeout = args.timeout
+        body_check_re = args.body
+        period = args.period
+        oneshot = args.oneshot
+
+        logger.info(
+            'run with: url=%s; period=%f; timeout=%f; body_check_re=%s' % (
+                url, period, timeout, body_check_re
+            ))
+
+        check_loop(
+            url, period, timeout, body_check_re,
+            KafkaProducerWrapper(kafka_connection_args, args.kafka_topic),
+            oneshot=oneshot,
+        )
+    elif args.command == 'consumer':
+        consumer_loop(
+            KafkaConsumerWrapper(kafka_connection_args, args.kafka_topic),
+            PGExporter(args.uri)
+        )
+
+
+if __name__ == '__main__':
+    main()
diff --git a/httpmon/__init__.py b/httpmon/__init__.py
new file mode 100644
index 0000000..e69de29
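
The CLI is a thin wrapper; the same pipeline can be driven directly from Python. A minimal sketch, assuming a plaintext (non-SSL) broker at `localhost:9093` and the topic `test` — both assumptions for the example:

```python
# Sketch: run one producer-side check without the CLI.
# KafkaArgs and prepare_kafka_connection_args are from httpmon/utils.py.
from httpmon.producer import check_loop, KafkaProducerWrapper
from httpmon.utils import KafkaArgs, prepare_kafka_connection_args

conn = prepare_kafka_connection_args(KafkaArgs(
    kafka_server='localhost:9093',
    kafka_ca=None, kafka_cert=None, kafka_key=None,  # no SSL
))
check_loop(
    'https://www.google.com', period=2, timeout=10, body_check_re='.*',
    producer=KafkaProducerWrapper(conn, 'test'),
    oneshot=True,  # a single check instead of an endless loop
)
```
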
diff --git a/httpmon/consumer.py b/httpmon/consumer.py
new file mode 100644
index 0000000..85a4860
--- /dev/null
+++ b/httpmon/consumer.py
@@ -0,0 +1,77 @@
+from copy import deepcopy
+import threading
+import json
+import logging
+from datetime import datetime
+import time
+import uuid
+from psycopg2 import pool as pgpool
+import psycopg2
+from kafka import KafkaConsumer
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaConsumerWrapper:
+    def __init__(self, connection_args, topic):
+        self.topic = topic
+        consumer_args = {
+            'auto_offset_reset': "earliest",
+            'group_id': 'httpmon',
+            'client_id': str(uuid.uuid4()),
+            'value_deserializer': lambda v: json.loads(v.decode('utf-8')),
+        }
+        connection_args.update(consumer_args)
+        self.consumer = KafkaConsumer(**connection_args)
+        self.consumer.subscribe([self.topic])
+
+    def messages(self):
+        raw_msgs = self.consumer.poll(timeout_ms=1000)
+        for tp, msgs in raw_msgs.items():
+            for msg in msgs:
+                logger.info('got msg: %s' % str(msg))
+                yield msg.value
+        self.consumer.commit()
+
+
+class PGExporter:
+    def __init__(self, connection_args, pool_size=10):
+        self.pool = pgpool.ThreadedConnectionPool(1, pool_size, connection_args)
+
+    @staticmethod
+    def worker(pool, sql, params):
+        conn = None
+        while not conn:
+            try:
+                conn = pool.getconn()
+            except pgpool.PoolError:
+                logger.warning("No free connection in pool")
+                time.sleep(1)
+
+        with conn.cursor() as curs:
+            try:
+                curs.execute(sql, params)
+                conn.commit()
+                logger.info("Put to DB: %s" % params)
+            except psycopg2.Error as e:
+                conn.rollback()
+                logger.error(e)
+        pool.putconn(conn)
+
+    def submit(self, message):
+        sql = """INSERT INTO checks
+            (url, timestamp, code, body_check_valid, time)
+            VALUES
+            (%(url)s, %(timestamp_dt)s, %(code)s, %(body_check_valid)s, %(time)s)
+        """
+        message = deepcopy(message)
+        message['timestamp_dt'] = datetime.fromtimestamp(message['timestamp'])
+        thread = threading.Thread(
+            target=self.worker, args=(self.pool, sql, message))
+        thread.start()
+
+
+def consumer_loop(consumer, exporter):
+    while True:
+        for message in consumer.messages():
+            exporter.submit(message)
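
Note the write path: `PGExporter.submit()` returns immediately; the actual `INSERT` runs on a short-lived thread that borrows a connection from psycopg2's `ThreadedConnectionPool` (capped at `pool_size`). A usage sketch, assuming the database from `sql/init.sql` exists and using illustrative credentials:

```python
# Sketch: store one check result directly, bypassing Kafka.
# The message keys mirror what check() in httpmon/producer.py emits.
from httpmon.consumer import PGExporter

exporter = PGExporter('postgresql://postgres:secret@localhost/httpmon')
exporter.submit({
    'url': 'https://www.google.com',
    'timestamp': 1590000000.0,  # converted to a datetime before the INSERT
    'code': 200,
    'body_check_valid': True,
    'time': 0.123,
})
```
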
diff --git a/httpmon/producer.py b/httpmon/producer.py
new file mode 100644
index 0000000..c91b0e4
--- /dev/null
+++ b/httpmon/producer.py
@@ -0,0 +1,97 @@
+import re
+import threading
+import time
+import requests
+import logging
+import json
+from kafka import KafkaProducer
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaProducerWrapper:
+    def __init__(self, connection_args, topic, kafka_cls=KafkaProducer):
+        self.topic = topic
+        connection_args['value_serializer'] = \
+            lambda v: json.dumps(v).encode('utf-8')
+        self.producer = kafka_cls(**connection_args)
+
+    def send(self, message):
+        logger.info('put to queue: %s' % message)
+        self.producer.send(self.topic, message)
+        self.producer.flush()
+
+
+def check_body(body, regex):
+    """match string with regex"""
+    if body and regex:
+        return bool(re.match(regex, str(body)))
+    else:
+        return False
+
+
+def check(url, body_check_re=r'', timeout=10.0, producer=None):
+    """check url with timeout and body regex check"""
+    timestamp = time.time()
+    code = 0
+    body = None
+    response_time = timeout
+
+    try:
+        response = requests.get(
+            url,
+            timeout=timeout,
+            allow_redirects=False,
+            headers={
+                'User-Agent': 'Aiven health checker/0.1'
+            }
+        )
+    except requests.Timeout:
+        code = 499  # client closed request
+    except requests.RequestException:
+        pass
+    else:
+        code = response.status_code
+        response_time = response.elapsed.total_seconds()
+        body = response.content
+
+    body_check_valid = check_body(body, body_check_re)
+
+    message = {
+        'timestamp': timestamp,
+        'code': code,
+        'body_check_valid': body_check_valid,
+        'time': response_time,
+        'url': url,
+    }
+    if producer:
+        producer.send(message)
+    return message
+
+
+def check_loop(
+        url, period=5, timeout=10, body_check_re='',
+        producer=None, oneshot=False):
+    """run a check for url every period seconds, each in its own thread
+    """
+    while True:
+        worker = threading.Thread(target=check, kwargs={
+            'url': url,
+            'timeout': timeout,
+            'body_check_re': body_check_re,
+            'producer': producer,
+        })
+        logger.info('check url=%s' % url)
+        worker.start()
+        time.sleep(period)
+        if oneshot:
+            return
+
+
+def regex_arg(regex):
+    """custom regex type from https://stackoverflow.com/a/37472037"""
+    try:
+        re.compile(regex)
+    except re.error:
+        raise ValueError
+    return regex
diff --git a/httpmon/stub.py b/httpmon/stub.py
new file mode 100644
index 0000000..b559d57
--- /dev/null
+++ b/httpmon/stub.py
@@ -0,0 +1,12 @@
+from collections import defaultdict
+
+
+class KafkaProducerStub:
+    def __init__(self, *args, **kwargs):
+        self.messages = defaultdict(list)
+
+    def send(self, topic, message):
+        self.messages[topic].append(message)
+
+    def flush(self):
+        pass
diff --git a/httpmon/utils.py b/httpmon/utils.py
new file mode 100644
index 0000000..f562ed0
--- /dev/null
+++ b/httpmon/utils.py
@@ -0,0 +1,29 @@
+from collections import namedtuple
+
+KafkaArgs = namedtuple(
+    'KafkaArgs',
+    [
+        'kafka_server',
+        'kafka_ca',
+        'kafka_cert',
+        'kafka_key',
+    ])
+
+
+def prepare_kafka_connection_args(args):
+    kafka_ssl = {}
+    for kafka_arg in ['kafka_ca', 'kafka_cert', 'kafka_key']:
+        kafka_ssl[kafka_arg] = getattr(args, kafka_arg)
+
+    kafka_connection_args = {
+        'bootstrap_servers': args.kafka_server,
+    }
+    if all(kafka_ssl.values()):
+        kafka_connection_args_ssl = {
+            'security_protocol': 'SSL',
+            'ssl_cafile': kafka_ssl['kafka_ca'],
+            'ssl_certfile': kafka_ssl['kafka_cert'],
+            'ssl_keyfile': kafka_ssl['kafka_key'],
+        }
+        kafka_connection_args.update(kafka_connection_args_ssl)
+    return kafka_connection_args
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..0b82ca1
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+kafka-python==2.0.1
+psycopg2==2.8.5
+requests==2.24.0
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..5c6311d
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,5 @@
+[bdist_wheel]
+universal=1
+
+[metadata]
+license_file = LICENSE
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..b83e807
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,19 @@
+from setuptools import setup
+
+setup(
+    name='httpmon',
+    version='0.1',
+    packages=['tests', 'httpmon'],
+    url='https://github.com/bsavelev/httpmon',
+    license='BSD',
+    author='Boris Savelev',
+    author_email='boris.savelev@gmail.com',
+    description='Distributed HTTP monitoring tool with Apache Kafka data pipeline and Postgres as storage',
+    scripts=('httpmon-cli.py',),
+    data_files=[('sql', ['sql/init.sql'])],
+    install_requires=[
+        'requests',
+        'psycopg2',
+        'kafka-python',
+    ]
+)
diff --git a/sql/init.sql b/sql/init.sql
new file mode 100644
index 0000000..dc3cfe9
--- /dev/null
+++ b/sql/init.sql
@@ -0,0 +1,13 @@
+CREATE SEQUENCE checks_id_seq;
+CREATE TABLE public.checks
+(
+    id integer NOT NULL DEFAULT nextval('checks_id_seq'),
+    created timestamp without time zone NOT NULL DEFAULT now(),
+    url text COLLATE pg_catalog."default" NOT NULL,
+    code integer NOT NULL,
+    body_check_valid boolean NOT NULL,
+    "time" real NOT NULL,
+    "timestamp" timestamp without time zone NOT NULL,
+    CONSTRAINT checks_pkey PRIMARY KEY (id)
+
+)
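
Stored results can then be inspected with any Postgres client; a small read-back sketch with plain psycopg2, reusing the same illustrative connection URI as above:

```python
# Sketch: print the five most recent checks from the table created by
# sql/init.sql. "time" and "timestamp" are quoted because the schema
# defines them as quoted identifiers.
import psycopg2

conn = psycopg2.connect('postgresql://postgres:secret@localhost/httpmon')
with conn.cursor() as curs:
    curs.execute(
        'SELECT url, code, body_check_valid, "time", "timestamp" '
        'FROM checks ORDER BY id DESC LIMIT 5')
    for row in curs.fetchall():
        print(row)
conn.close()
```
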
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_integration.py b/tests/test_integration.py
new file mode 100644
index 0000000..c6b3f4b
--- /dev/null
+++ b/tests/test_integration.py
@@ -0,0 +1,56 @@
+import os
+import tempfile
+import shutil
+import time
+import unittest
+from httpmon.producer import check, KafkaProducerWrapper
+from httpmon.consumer import PGExporter, KafkaConsumerWrapper
+from httpmon.utils import prepare_kafka_connection_args, KafkaArgs
+
+
+class IntegrationTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp()
+        for name in ['KAFKA_CRT', 'KAFKA_KEY', 'KAFKA_CA']:
+            with open(os.path.join(self.tempdir, name), 'w') as fp:
+                fp.write(os.environ.get(name, ''))
+
+    def tearDown(self):
+        shutil.rmtree(self.tempdir)
+
+    @property
+    def _kafka_connection(self):
+        args = KafkaArgs(
+            kafka_server=os.environ.get('KAFKA_SERVER'),
+            kafka_ca=os.path.join(self.tempdir, 'KAFKA_CA'),
+            kafka_cert=os.path.join(self.tempdir, 'KAFKA_CRT'),
+            kafka_key=os.path.join(self.tempdir, 'KAFKA_KEY'),
+        )
+        if os.environ.get('KAFKA_NO_SSL'):
+            # localhost testing purpose
+            args = KafkaArgs(
+                kafka_server=os.environ.get('KAFKA_SERVER'),
+                kafka_ca=None,
+                kafka_cert=None,
+                kafka_key=None,
+            )
+
+        return prepare_kafka_connection_args(args)
+
+    def test_run(self):
+        """
+        Make one check, push to kafka, read from kafka, put to DB
+        """
+        consumer = KafkaConsumerWrapper(self._kafka_connection, 'unittest')
+        exporter = PGExporter(os.environ.get('PG_URI'))
+        producer = KafkaProducerWrapper(self._kafka_connection, 'unittest')
+        need_message = True
+        while need_message:
+            # parallel test runs create many consumers;
+            # keep producing until we receive one message
+            check('https://www.google.com', producer=producer)
+            time.sleep(1)
+            for message in consumer.messages():
+                exporter.submit(message)
+                need_message = False
diff --git a/tests/test_producer.py b/tests/test_producer.py
new file mode 100644
index 0000000..b4bcff6
--- /dev/null
+++ b/tests/test_producer.py
@@ -0,0 +1,67 @@
+import unittest
+from httpmon.producer import (
+    check, check_body, check_loop, KafkaProducerWrapper
+)
+from httpmon.stub import KafkaProducerStub
+
+
+class CheckBodyTestCase(unittest.TestCase):
+    def test_empty_body(self):
+        self.assertFalse(check_body('', '.*'))
+
+    def test_empty_regex(self):
+        self.assertFalse(check_body('test', ''))
+
+    def test_match(self):
+        self.assertTrue(check_body('ffff', '.*'))
+
+    def test_not_match(self):
+        self.assertFalse(check_body('ffff', 'nonexists'))
+
+
+class CheckURLTestCase(unittest.TestCase):
+    def test_200(self):
+        r = check('https://www.google.com')
+        self.assertEqual(r['code'], 200)
+
+    def test_301(self):
+        """test not follow redirects"""
+        r = check('https://google.com')
+        self.assertEqual(r['code'], 301)
+
+    def test_nonexists_domain(self):
+        r = check('https://google1111.com')
+        self.assertEqual(r['code'], 0)
+
+    def test_request_timeout(self):
+        r = check(
+            'http://slowwly.robertomurray.co.uk'
+            '/delay/2000/url/https://www.google.com',
+            timeout=1)
+        self.assertEqual(r['code'], 499)
+        self.assertEqual(r['time'], 1)
+
+    def test_200_body_check(self):
+        r = check('https://www.google.com', body_check_re='.*Google.*')
+        self.assertTrue(r['body_check_valid'])
+
+    def test_with_producer(self):
+        topic = 'unittest'
+        producer = KafkaProducerWrapper(
+            {}, topic=topic, kafka_cls=KafkaProducerStub)
+        r = check('https://www.google.com', producer=producer)
+        self.assertEqual(r['code'], 200)
+        self.assertEqual(len(producer.producer.messages[topic]), 1)
+        self.assertEqual(producer.producer.messages[topic][0]['code'], 200)
+
+
+class CheckLoopTestCase(unittest.TestCase):
+    def test_loop(self):
+        topic = 'unittest'
+        producer = KafkaProducerWrapper(
+            {}, topic=topic, kafka_cls=KafkaProducerStub)
+        check_loop(
+            'https://www.google.com',
+            producer=producer, period=1, oneshot=True)
+        self.assertEqual(len(producer.producer.messages[topic]), 1)
+        self.assertEqual(producer.producer.messages[topic][0]['code'], 200)
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..4b4c142
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,42 @@
+import unittest
+from httpmon.utils import prepare_kafka_connection_args, KafkaArgs
+
+
+class TestPrepareKafka(unittest.TestCase):
+
+    def test_no_ssl(self):
+        args = KafkaArgs(
+            kafka_server='server',
+            kafka_ca=None,
+            kafka_cert=None,
+            kafka_key=None,
+        )
+        r = prepare_kafka_connection_args(args)
+        self.assertDictEqual(r, {'bootstrap_servers': 'server'})
+
+    def test_part_ssl(self):
+        args = KafkaArgs(
+            kafka_server='server',
+            kafka_ca=__file__,
+            kafka_cert=None,
+            kafka_key=None,
+        )
+        r = prepare_kafka_connection_args(args)
+        self.assertDictEqual(r, {'bootstrap_servers': 'server'})
+
+    def test_ssl(self):
+        args = KafkaArgs(
+            kafka_server='server',
+            kafka_ca='path',
+            kafka_cert='path',
+            kafka_key='path',
+        )
+        r = prepare_kafka_connection_args(args)
+        expected = {
+            'security_protocol': 'SSL',
+            'ssl_cafile': 'path',
+            'ssl_certfile': 'path',
+            'ssl_keyfile': 'path',
+            'bootstrap_servers': 'server'
+        }
+        self.assertDictEqual(r, expected)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..ca486d2
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,10 @@
+[tox]
+envlist = py{27,34,35,36,37,38}
+
+[testenv]
+deps =
+    pytest
+    pytest-cov
+commands =
+    py.test {posargs:--cov=httpmon}
+passenv = KAFKA_SERVER KAFKA_NO_SSL KAFKA_CA KAFKA_CRT KAFKA_KEY PG_URI