Skip to content

Commit

Permalink
Fixed writing algorithm and enhanced log line cases
Browse files Browse the repository at this point in the history
  • Loading branch information
afonsoc12 committed Feb 13, 2022
1 parent 1269605 commit ecb8314
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 78 deletions.
88 changes: 39 additions & 49 deletions intrusion_monitor/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from influxdb import InfluxDBClient

from .log_line import LogLine

INFLUXDB_HOST = os.getenv('INFLUXDB_HOST', 'localhost')
INFLUXDB_PORT = os.getenv('INFLUXDB_PORT', '8086')
INFLUXDB_DATABASE = os.getenv('INFLUXDB_DATABASE', 'intrusion_monitor')
Expand Down Expand Up @@ -55,64 +57,52 @@ def check_status(self):

return ver

def query_last_stored_ip_data(self, ip, time='1w'):
q = f'select * from failed_logins where ip=\'{ip}\' and time > now() - {time} order by time desc limit 1;'

res = list(self.conn.query(q))

def write_log_line(self, log_line, ip_info):
if not ip_info:
logging.debug('IP info is not available. Discarding')
measure = [
{
"measurement": "failed_logins",
"tags": {
"username": log_line.attempt_username,
"port": log_line.attempt_port,
"ip": log_line.attempt_ip
},
"fields": {
"value": 1
}
}
]
if len(res) > 0:
res = res[0][0]

else:
logging.debug('IP info is available')
measure = [
{
"measurement": "failed_logins",
"tags": {
"geohash": ip_info['geohash'],
"latitude": ip_info['latitude'],
"longitude": ip_info['longitude'],
"username": log_line.attempt_username,
"port": log_line.attempt_port,
"ip": log_line.attempt_ip,
'type': ip_info['type'],
'continent_code': ip_info['continent_code'],
'continent_name': ip_info['continent_name'],
'country_code': ip_info['country_code'],
'country_name': ip_info['country_name'],
'region_code': ip_info['region_code'],
'region_name': ip_info['region_name'],
'city': ip_info['city'],
'zip': ip_info['zip'],
'country_flag_emoji': ip_info['location']['country_flag_emoji'],
'capital': ip_info['location']['capital'],
'calling_code': ip_info['location']['calling_code'],
'is_eu': ip_info['location']['is_eu']
},
"fields": {
"value": 1
}
# No results
pass
return 1

def write_log_line(self, log_line: LogLine, ip_info: dict):

#
measure = {
"measurement": "failed_logins",
"tags": {
"host": log_line.hostname,
"log_process_name": log_line.process_name,
"log_process_id": log_line.process_id,
"log_line": log_line.log_line,
"username": log_line.attempt_username,
"port": log_line.attempt_port,
"ip": log_line.attempt_ip
},
"fields": {
"success": 0
}
]
}

# If ip_info available, extend tags
if ip_info:
measure["api_response"] = ip_info
measure['tags'] = {**measure['tags'], **ip_info}

if log_line.timestamp and isinstance(log_line.timestamp, datetime):
log_line_timestamp = log_line.timestamp
if log_line_timestamp and isinstance(log_line_timestamp, datetime):
logging.debug('Setting timestamp, since it is available')
measure[0]['time'] = log_line.timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
measure['time'] = log_line_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')

self.conn.write_points(measure)
logging.debug(f'Generated measure to be inserted: {measure}')
status = self.conn.write_points([measure])

return measure
return status


@staticmethod
Expand Down
30 changes: 17 additions & 13 deletions intrusion_monitor/log_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
from datetime import datetime
import pytz

from .api import api_call
from .utils import extract_ip_info

TZ = pytz.timezone(os.getenv('TZ', 'Europe/London'))
TZ_UTC = pytz.utc

class LogLine:
failed_possibilities = ['Failed password for invalid user ', # This one must be tested first, otherwise it isnt caught by the next one
'Failed password for ',
'Connection closed by invalid user ',
'Connection closed by authenticating user ',
'Invalid user '
]

def __init__(self, log_line):
self.log_line = log_line
Expand Down Expand Up @@ -39,10 +46,9 @@ def is_login_attempt(self):
Returns a tuple with two values: line is login attempt and reason to consider as such.
"""
possibilities = ('Connection closed by', 'Failed password for', )

status = (False, None,)
for p in possibilities:
for p in self.failed_possibilities:
if self.message.startswith(p):
status = (True, p,)
break
Expand Down Expand Up @@ -172,15 +178,9 @@ def attempt_username(self):
In all case, the username is always preceeded by either 'for' or 'user'
"""

possibilities = ['Failed password for invalid user ', # This one must be tested first, otherwise it isnt caught by the next one
'Failed password for ',
'Connection closed by invalid user ',
'Connection closed by authenticating user '
]

is_found = False

for p in possibilities:
for p in self.failed_possibilities:
suffix_p = self.message.split(p)
if len(suffix_p) == 1:
logging.debug(f'No matches for username prefix: "{p}"')
Expand All @@ -202,9 +202,13 @@ def get_ip_info(self, db):
Before attempting API call, it will search the database for an IP match in the previous week.
"""
#data_db = db.query_last_stored_ip_data(self.attempt_ip)
#todo https://realpython.com/caching-external-api-requests/
# if data_db:

try:
data = api_call(self.attempt_ip)
return extract_ip_info(data)
except:
logging.error('API returned a non 200 code or response is not valid', exc_info=True)
return None



return extract_ip_info(self.attempt_ip)
31 changes: 15 additions & 16 deletions intrusion_monitor/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,29 @@
import os

from .db import InfluxDB
from .log_parser import LogLine
from .log_line import LogLine

SLEEP_SECONDS = 1


class Watchdog:

def __init__(self, log_path):
def __init__(self, log_path, db='influxdb'):
self.log_path = log_path
self.db = InfluxDB()

def start(self):

logging.debug('Checking connection status...')
status = self.db.check_status()
if status:
logging.debug(f'Got connection successfully. InfluxDB version {status}')
if db == 'influxdb':
self.db = InfluxDB()
else:
logging.error('Connection timmed out')
raise ConnectionError('Could not connect. Connection timed out')
err = f'Database type {db} is not currently supported'
logging.error(err)
raise NotImplementedError(err)

def start(self):

# Start watching logs
with open(self.log_path, 'r') as log_file:
logging.debug(f'Opened context manager for file {self.log_path}')
logging.debug(f'Opened context manager for file {self.log_path.absolute()}')
logging.info(f'Listening for new lines on {self.log_path.absolute()}...')
for new_lines in self.line_watcher(log_file):
logging.debug(f'Processing the lines found...')
new_lines = self.filter_lines(new_lines)
Expand All @@ -36,14 +35,14 @@ def start(self):
log_line = LogLine(line)
line_class = log_line.is_login_attempt()
if line_class[0]:
logging.debug(f'Line is a login attempt. Reason: {line_class[1]}. Going to be processed...')
ip_info = log_line.get_ip_info()
logging.debug(f'Line is a login attempt. Reason: Matches "{line_class[1]}". Going to be processed...')
ip_info = log_line.get_ip_info(self.db)
stored_data = self.db.write_log_line(log_line, ip_info)
logging.info(f'Log line successfully written: {stored_data}')
else:
logging.debug(f'Line is not a login attempt. Reason: {line_class[1]}. Skipping...')
logging.debug(f'Line is not a login attempt. Reason: Matches "{line_class[1]}". Skipping...')

logging.debug(f'Closed context manager to {self.log_path}')
logging.debug(f'Closed context manager to {self.log_path.absolute()}')
logging.info(f'Watchdog has stopped')

def line_watcher(self, file):
Expand Down

0 comments on commit ecb8314

Please sign in to comment.