From ecb8314a551a54bbec4a3ec6909a84ccc7b0c356 Mon Sep 17 00:00:00 2001 From: Afonso Costa Date: Sun, 13 Feb 2022 14:32:05 +0000 Subject: [PATCH] Fixed writing algorithm and enhanced log line cases --- intrusion_monitor/db.py | 88 ++++++++++++++++------------------- intrusion_monitor/log_line.py | 30 ++++++------ intrusion_monitor/watchdog.py | 31 ++++++------ 3 files changed, 71 insertions(+), 78 deletions(-) diff --git a/intrusion_monitor/db.py b/intrusion_monitor/db.py index 6ab9a4d..b0340b4 100644 --- a/intrusion_monitor/db.py +++ b/intrusion_monitor/db.py @@ -7,6 +7,8 @@ from influxdb import InfluxDBClient +from .log_line import LogLine + INFLUXDB_HOST = os.getenv('INFLUXDB_HOST', 'localhost') INFLUXDB_PORT = os.getenv('INFLUXDB_PORT', '8086') INFLUXDB_DATABASE = os.getenv('INFLUXDB_DATABASE', 'intrusion_monitor') @@ -55,64 +57,52 @@ def check_status(self): return ver + def query_last_stored_ip_data(self, ip, time='1w'): + q = f'select * from failed_logins where ip=\'{ip}\' and time > now() - {time} order by time desc limit 1;' + res = list(self.conn.query(q)) - def write_log_line(self, log_line, ip_info): - if not ip_info: - logging.debug('IP info is not available. Discarding') - measure = [ - { - "measurement": "failed_logins", - "tags": { - "username": log_line.attempt_username, - "port": log_line.attempt_port, - "ip": log_line.attempt_ip - }, - "fields": { - "value": 1 - } - } - ] + if len(res) > 0: + res = res[0][0] else: - logging.debug('IP info is available') - measure = [ - { - "measurement": "failed_logins", - "tags": { - "geohash": ip_info['geohash'], - "latitude": ip_info['latitude'], - "longitude": ip_info['longitude'], - "username": log_line.attempt_username, - "port": log_line.attempt_port, - "ip": log_line.attempt_ip, - 'type': ip_info['type'], - 'continent_code': ip_info['continent_code'], - 'continent_name': ip_info['continent_name'], - 'country_code': ip_info['country_code'], - 'country_name': ip_info['country_name'], - 'region_code': ip_info['region_code'], - 'region_name': ip_info['region_name'], - 'city': ip_info['city'], - 'zip': ip_info['zip'], - 'country_flag_emoji': ip_info['location']['country_flag_emoji'], - 'capital': ip_info['location']['capital'], - 'calling_code': ip_info['location']['calling_code'], - 'is_eu': ip_info['location']['is_eu'] - }, - "fields": { - "value": 1 - } + # No results + pass + return 1 + + def write_log_line(self, log_line: LogLine, ip_info: dict): + + # + measure = { + "measurement": "failed_logins", + "tags": { + "host": log_line.hostname, + "log_process_name": log_line.process_name, + "log_process_id": log_line.process_id, + "log_line": log_line.log_line, + "username": log_line.attempt_username, + "port": log_line.attempt_port, + "ip": log_line.attempt_ip + }, + "fields": { + "success": 0 } - ] + } + + # If ip_info available, extend tags + if ip_info: + measure["api_response"] = ip_info + measure['tags'] = {**measure['tags'], **ip_info} - if log_line.timestamp and isinstance(log_line.timestamp, datetime): + log_line_timestamp = log_line.timestamp + if log_line_timestamp and isinstance(log_line_timestamp, datetime): logging.debug('Setting timestamp, since it is available') - measure[0]['time'] = log_line.timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') + measure['time'] = log_line_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') - self.conn.write_points(measure) + logging.debug(f'Generated measure to be inserted: {measure}') + status = self.conn.write_points([measure]) - return measure + return status @staticmethod diff --git a/intrusion_monitor/log_line.py b/intrusion_monitor/log_line.py index c25d09e..8f3d616 100644 --- a/intrusion_monitor/log_line.py +++ b/intrusion_monitor/log_line.py @@ -4,12 +4,19 @@ from datetime import datetime import pytz +from .api import api_call from .utils import extract_ip_info TZ = pytz.timezone(os.getenv('TZ', 'Europe/London')) TZ_UTC = pytz.utc class LogLine: + failed_possibilities = ['Failed password for invalid user ', # This one must be tested first, otherwise it isnt caught by the next one + 'Failed password for ', + 'Connection closed by invalid user ', + 'Connection closed by authenticating user ', + 'Invalid user ' + ] def __init__(self, log_line): self.log_line = log_line @@ -39,10 +46,9 @@ def is_login_attempt(self): Returns a tuple with two values: line is login attempt and reason to consider as such. """ - possibilities = ('Connection closed by', 'Failed password for', ) status = (False, None,) - for p in possibilities: + for p in self.failed_possibilities: if self.message.startswith(p): status = (True, p,) break @@ -172,15 +178,9 @@ def attempt_username(self): In all case, the username is always preceeded by either 'for' or 'user' """ - possibilities = ['Failed password for invalid user ', # This one must be tested first, otherwise it isnt caught by the next one - 'Failed password for ', - 'Connection closed by invalid user ', - 'Connection closed by authenticating user ' - ] - is_found = False - for p in possibilities: + for p in self.failed_possibilities: suffix_p = self.message.split(p) if len(suffix_p) == 1: logging.debug(f'No matches for username prefix: "{p}"') @@ -202,9 +202,13 @@ def get_ip_info(self, db): Before attempting API call, it will search the database for an IP match in the previous week. """ - #data_db = db.query_last_stored_ip_data(self.attempt_ip) - #todo https://realpython.com/caching-external-api-requests/ - # if data_db: + + try: + data = api_call(self.attempt_ip) + return extract_ip_info(data) + except: + logging.error('API returned a non 200 code or response is not valid', exc_info=True) + return None + - return extract_ip_info(self.attempt_ip) diff --git a/intrusion_monitor/watchdog.py b/intrusion_monitor/watchdog.py index da3dbf0..1a0db4e 100644 --- a/intrusion_monitor/watchdog.py +++ b/intrusion_monitor/watchdog.py @@ -3,30 +3,29 @@ import os from .db import InfluxDB -from .log_parser import LogLine +from .log_line import LogLine SLEEP_SECONDS = 1 class Watchdog: - def __init__(self, log_path): + def __init__(self, log_path, db='influxdb'): self.log_path = log_path - self.db = InfluxDB() - def start(self): - - logging.debug('Checking connection status...') - status = self.db.check_status() - if status: - logging.debug(f'Got connection successfully. InfluxDB version {status}') + if db == 'influxdb': + self.db = InfluxDB() else: - logging.error('Connection timmed out') - raise ConnectionError('Could not connect. Connection timed out') + err = f'Database type {db} is not currently supported' + logging.error(err) + raise NotImplementedError(err) + + def start(self): # Start watching logs with open(self.log_path, 'r') as log_file: - logging.debug(f'Opened context manager for file {self.log_path}') + logging.debug(f'Opened context manager for file {self.log_path.absolute()}') + logging.info(f'Listening for new lines on {self.log_path.absolute()}...') for new_lines in self.line_watcher(log_file): logging.debug(f'Processing the lines found...') new_lines = self.filter_lines(new_lines) @@ -36,14 +35,14 @@ def start(self): log_line = LogLine(line) line_class = log_line.is_login_attempt() if line_class[0]: - logging.debug(f'Line is a login attempt. Reason: {line_class[1]}. Going to be processed...') - ip_info = log_line.get_ip_info() + logging.debug(f'Line is a login attempt. Reason: Matches "{line_class[1]}". Going to be processed...') + ip_info = log_line.get_ip_info(self.db) stored_data = self.db.write_log_line(log_line, ip_info) logging.info(f'Log line successfully written: {stored_data}') else: - logging.debug(f'Line is not a login attempt. Reason: {line_class[1]}. Skipping...') + logging.debug(f'Line is not a login attempt. Reason: Matches "{line_class[1]}". Skipping...') - logging.debug(f'Closed context manager to {self.log_path}') + logging.debug(f'Closed context manager to {self.log_path.absolute()}') logging.info(f'Watchdog has stopped') def line_watcher(self, file):