
Commit

initial
Boris Savelev committed Aug 22, 2020
commit ec868bb (0 parents)
Showing 19 changed files with 726 additions and 0 deletions.
134 changes: 134 additions & 0 deletions .gitignore
@@ -0,0 +1,134 @@
/.idea

# Created by .ignore support plugin (hsz.mobi)
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
*.secret
17 changes: 17 additions & 0 deletions .travis.yml
@@ -0,0 +1,17 @@
language: python
python:
- 2.7
- 3.4
- 3.5
- 3.6
- 3.7
- 3.8

install:
- pip install tox coveralls

script:
- tox -e $(echo py$TRAVIS_PYTHON_VERSION | tr -d .)

after_success:
- coveralls
29 changes: 29 additions & 0 deletions LICENSE
@@ -0,0 +1,29 @@
BSD 3-Clause License

Copyright (c) 2020, Boris Savelev
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 changes: 29 additions & 0 deletions README.md
@@ -0,0 +1,29 @@
# HTTP Monitoring tool

[![Build Status](https://travis-ci.com/bsavelev/httpmon.svg?branch=master)](https://travis-ci.com/bsavelev/httpmon)
[![Coverage Status](https://coveralls.io/repos/github/bsavelev/httpmon/badge.svg?branch=master)](https://coveralls.io/github/bsavelev/httpmon?branch=master)

## About
Distributed HTTP monitoring tool that uses Apache Kafka as the data pipeline and PostgreSQL as storage.

## Usage
### Producer

```shell
python ./httpmon-cli.py --kafka-server localhost:9093 --kafka-topic test -v producer -u https://www.google.com --period 2 --timeout 10 --body '.*<html.*>'
```

This command will:
* Check the URL `https://www.google.com`
* Every `2` seconds
* With a timeout of `10` seconds
* Validate the response body against the regex `.*<html.*>`
* Push each check result to Apache Kafka at `localhost:9093` (an example message is sketched below)
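
The exact payload format is defined in `httpmon/producer.py`, which is not included in the excerpt of this commit shown above. The snippet below is therefore only a hypothetical sketch of a single check-result message, with field names inferred from the consumer's `INSERT` statement and JSON deserializer in `httpmon/consumer.py`.

```python
# Hypothetical sketch of one check-result message as it would travel through
# Kafka. Field names are inferred from httpmon/consumer.py; the real producer
# (httpmon/producer.py, not shown in this diff) may use a different layout.
import json
import time

check_result = {
    "url": "https://www.google.com",  # URL that was checked
    "timestamp": time.time(),         # Unix timestamp of the check
    "code": 200,                      # HTTP status code
    "body_check_valid": True,         # did the body match the regex?
    "time": 0.123,                    # response time in seconds (assumed unit)
}

# The consumer deserializes messages with json.loads(value.decode('utf-8')),
# so the producer presumably serializes them as UTF-8 encoded JSON.
payload = json.dumps(check_result).encode("utf-8")
print(payload)
```

On the consumer side, `PGExporter` converts `timestamp` to a `datetime` and writes one row per message.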

### Consumer
```shell
python ./httpmon-cli.py --kafka-server localhost:9093 --kafka-topic test consumer --uri postgresql://postgres:secret@localhost/httpmon
```

Reads the check-result stream from Apache Kafka and writes the data into the PostgreSQL database specified with the `--uri` option.

#### Database preparation

Create the database and prepare it with the initialization SQL:

```shell
psql postgresql://postgres:secret@localhost/httpmon < sql/init.sql
```
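
The initialization script `sql/init.sql` is part of this commit but not shown in the excerpt above. Below is a minimal sketch of a schema that would satisfy the consumer's `INSERT INTO checks (...)` statement; the column types (and the extra `id` column) are assumptions, and the actual script may differ.

```python
# Hypothetical schema sketch derived from the columns referenced in
# httpmon/consumer.py. The real sql/init.sql (not shown here) may differ.
import psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS checks (
    id BIGSERIAL PRIMARY KEY,
    url TEXT NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    code INTEGER,
    body_check_valid BOOLEAN,
    time DOUBLE PRECISION
);
"""

conn = psycopg2.connect("postgresql://postgres:secret@localhost/httpmon")
with conn, conn.cursor() as curs:  # commits on success, rolls back on error
    curs.execute(DDL)
conn.close()
```
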
90 changes: 90 additions & 0 deletions httpmon-cli.py
@@ -0,0 +1,90 @@
import logging
import argparse

from httpmon.producer import regex_arg, check_loop, KafkaProducerWrapper
from httpmon.consumer import KafkaConsumerWrapper, consumer_loop, PGExporter
from httpmon.utils import prepare_kafka_connection_args

logger = logging.getLogger(__name__)


def main():
    parser = argparse.ArgumentParser(
        description='HTTP checker with Apache Kafka processing')
    parser.add_argument(
        '--kafka-server', type=str,
        required=True, help='Kafka server [host:port]')
    parser.add_argument(
        '--kafka-topic', type=str,
        required=True, help='Kafka topic')
    parser.add_argument(
        '--kafka-ca', type=argparse.FileType('r'),
        help='Kafka SSL CA path')
    parser.add_argument(
        '--kafka-cert', type=argparse.FileType('r'),
        help='Kafka SSL certificate path')
    parser.add_argument(
        '--kafka-key', type=argparse.FileType('r'),
        help='Kafka SSL key path')
    parser.add_argument(
        "-v", "--verbosity", action="count",
        help="increase output verbosity", default=0)
    subparser = parser.add_subparsers(help='sub-command help', dest='command')
    parser_producer = subparser.add_parser(
        'producer', help='Perform HTTP check')
    parser_consumer = subparser.add_parser(
        'consumer', help='Consume message and store in SQL')
    parser_consumer.add_argument(
        '-u', '--uri', type=str,
        required=True, help='PostgreSQL connection uri')
    parser_producer.add_argument(
        '-u', '--url', type=str, required=True, help='URL to check')
    parser_producer.add_argument(
        '-t', '--timeout', type=float, default=10.0, help='Check timeout')
    parser_producer.add_argument(
        '-p', '--period', type=float, default=5.0, help='Check period')
    parser_producer.add_argument(
        '-b', '--body', type=regex_arg, help='Regex for body check')
    parser_producer.add_argument(
        '-o', '--oneshot',
        action='store_true', default=False, help='Run check once')
    args = parser.parse_args()

    level = logging.INFO
    if args.verbosity > 0:
        level = logging.DEBUG

    logging.basicConfig(
        format='%(asctime)s [%(levelname)s] '
               '[%(module)s:%(funcName)s:%(lineno)s] %(message)s',
        level=level
    )

    kafka_connection_args = prepare_kafka_connection_args(args)

    if args.command == 'producer':
        url = args.url
        timeout = args.timeout
        body_check_re = args.body
        period = args.period
        oneshot = args.oneshot

        logger.info(
            'run with: url=%s; period=%f; timeout=%f; body_check_re=%s' % (
                url, period, timeout, body_check_re
            ))

        check_loop(
            url, period, timeout, body_check_re,
            KafkaProducerWrapper(kafka_connection_args, args.kafka_topic),
            oneshot=oneshot,
        )
    elif args.command == 'consumer':
        consumer_loop(
            KafkaConsumerWrapper(kafka_connection_args, args.kafka_topic),
            PGExporter(args.uri)
        )


if __name__ == '__main__':
    main()
Empty file added httpmon/__init__.py
Empty file.
76 changes: 76 additions & 0 deletions httpmon/consumer.py
@@ -0,0 +1,76 @@
from copy import deepcopy
import threading
import json
import logging
from datetime import datetime
import time
import uuid
from psycopg2 import pool as pgpool
import psycopg2
from kafka import KafkaConsumer

logger = logging.getLogger(__name__)


class KafkaConsumerWrapper:
    def __init__(self, connection_args, topic):
        self.topic = topic
        consumer_args = {
            'auto_offset_reset': "earliest",
            'group_id': 'httpmon',
            # kafka-python expects client_id as a string, so stringify the UUID
            'client_id': str(uuid.uuid4()),
            'value_deserializer': lambda v: json.loads(v.decode('utf-8')),
        }
        connection_args.update(consumer_args)
        self.consumer = KafkaConsumer(**connection_args)
        self.consumer.subscribe([self.topic])

    def messages(self):
        # Poll one batch of records, yield their deserialized values and
        # commit offsets once the whole batch has been handed over.
        raw_msgs = self.consumer.poll(timeout_ms=1000)
        for tp, msgs in raw_msgs.items():
            for msg in msgs:
                logger.info('got msg: %s' % str(msg))
                yield msg.value
        self.consumer.commit()


class PGExporter:
    def __init__(self, connection_args, pool_size=10):
        self.pool = pgpool.ThreadedConnectionPool(1, pool_size, connection_args)

    @staticmethod
    def worker(pool, sql, params):
        # Block until a connection becomes available in the pool.
        conn = None
        while not conn:
            try:
                conn = pool.getconn()
            except pgpool.PoolError:
                logger.warning("No free connection in pool")
                time.sleep(1)

        with conn.cursor() as curs:
            try:
                curs.execute(sql, params)
                conn.commit()
                logger.info("Put to DB: %s" % params)
            except psycopg2.Error as e:
                logger.error(e)
                # Roll back so the connection is returned to the pool
                # in a clean state after a failed insert.
                conn.rollback()
        pool.putconn(conn)

    def submit(self, message):
        sql = """INSERT INTO checks
            (url, timestamp, code, body_check_valid, time)
        VALUES
            (%(url)s, %(timestamp_dt)s, %(code)s, %(body_check_valid)s, %(time)s)
        """
        message = deepcopy(message)
        message['timestamp_dt'] = datetime.fromtimestamp(message['timestamp'])
        # Write each message from a separate thread so slow inserts do not
        # block the consumer loop.
        thread = threading.Thread(
            target=self.worker, args=(self.pool, sql, message))
        thread.start()


def consumer_loop(consumer, exporter):
    while True:
        for message in consumer.messages():
            exporter.submit(message)
