### SIEM

In [None]:
import requests
import time
import re
import os
import logging
from dataclasses import dataclass
from typing import Generator, Optional
from io import IOBase
from datetime import datetime, timedelta

from dotenv import load_dotenv

# Shared logger for this notebook. With the level at ERROR, the info/warning
# records written by the detection loops are dropped; only error/critical
# records get through.
logger = logging.getLogger('SIEM')
logger.setLevel(logging.ERROR)

# load_dotenv() (python-dotenv) runs before the lookup so that a value kept in
# a local .env file can be picked up; api_token is None when API_TOKEN is
# not set in the environment.
load_dotenv()
api_token = os.environ.get('API_TOKEN')

In [None]:
def tail_follow(file: IOBase) -> Generator[str, None, None]:
    """Yield lines appended to *file*, polling forever.

    The stream is first positioned at its end, so content that is already
    present is skipped. Whenever ``readline()`` returns nothing, the generator
    sleeps for one second and tries again; it never finishes on its own.

    Args:
        file: An open, seekable stream supporting ``readline()``.

    Yields:
        Each line exactly as ``readline()`` returned it.
    """
    file.seek(0, 2)  # whence=2: offset is relative to the end of the stream
    while True:
        chunk = file.readline()
        if not chunk:
            # Nothing new yet: wait before polling again.
            time.sleep(1)
            continue
        yield chunk

In [None]:
def papertrail_tail(token: str) -> Generator[str, None, None]:
    """Poll the Papertrail event search endpoint and yield event messages.

    Runs forever: each round issues one GET request, yields the ``message``
    of every entry in the response's ``events`` list, then sleeps 5 seconds.

    Args:
        token: Value sent in the ``X-Papertrail-Token`` request header.

    Yields:
        The ``message`` field of each returned event.

    Raises:
        Whatever ``raise_for_status()`` raises for an HTTP error status, and
        ``KeyError`` if the response JSON lacks ``events`` (or an event lacks
        ``message``).
    """
    endpoint = "https://papertrailapp.com/api/v1/events/search.json"
    auth_headers = {"X-Papertrail-Token": token}
    page_size = 10000

    # State carried from the previous response.
    # NOTE(review): presumably `tail` means the search reached the newest
    # event and `max_id` is the id to resume from - confirm against the
    # Papertrail API documentation.
    at_tail = False
    resume_id = None

    while True:
        query = {"limit": page_size}
        if at_tail and resume_id:
            # Continue from where the previous response ended.
            query["min_id"] = resume_id
        else:
            # No usable resume point: ask for roughly the last 10 seconds.
            query["min_time"] = int(time.time()) - 10

        response = requests.get(endpoint, params=query, headers=auth_headers, timeout=30)
        response.raise_for_status()
        payload = response.json()

        at_tail = payload.get("tail", False)
        resume_id = payload.get("max_id", None)

        for event in payload["events"]:
            yield event["message"]
        time.sleep(5)

In [None]:
@dataclass
class PacketInfo:
    """One parsed packet-log line, with every field kept as raw text.

    ``parse_tcpdump`` builds instances from its regex capture groups, passed
    positionally in the field order below.
    """
    timestamp: str  # time-of-day text, digits in H:M:S.fraction form
    protocol: str   # single word that follows the timestamp
    src: str        # sender token, found to the left of " > "
    dst: str        # receiver token, found to the right of " > "
    data: str       # remainder of the line after "<dst>: "
    

In [None]:
# TODO Syslog format parsing?

# Compiled once when the cell runs instead of on every call. Capture groups,
# in order: timestamp, protocol, source, destination, remaining text.
_TCPDUMP_LINE = re.compile(r'(\d+:\d+:\d+\.\d+) (\w+) ([a-zA-Z0-9.]+) > ([a-zA-Z0-9.]+): (.+)$')


def parse_tcpdump(line: str) -> Optional[PacketInfo]:
    """Parse one packet-log line into a ``PacketInfo``.

    The line must start with ``<time> <protocol> <src> > <dst>: <data>``;
    matching is anchored at the beginning of the string, so any leading
    prefix makes the line unparseable.

    Args:
        line: A single log line.

    Returns:
        A ``PacketInfo`` built from the five captured fields, or ``None``
        when the line does not fit the expected layout.
    """
    match = _TCPDUMP_LINE.match(line)
    if match is None:
        return None
    # groups() is already a 5-tuple in PacketInfo field order.
    return PacketInfo(*match.groups())

In [None]:
# Live smoke test: stream Papertrail messages, show the raw parse result
# (None when the line did not match), then a readable summary or an error.
for line in papertrail_tail(api_token):
    pkt = parse_tcpdump(line)
    print(pkt)
    if pkt:
        print(f"Packet from {pkt.src} to {pkt.dst}: {pkt.data}")
    else:
        print("Packet format error")

In [None]:
# Port scan detection
#
# Counts packets per destination inside a fixed time window and logs a
# critical record whenever a destination's count exceeds `packet_threshold`.
# NOTE(review): the counter is keyed on the whole `dst` string, so this
# measures packet volume to one destination rather than the number of
# distinct ports probed by one source - confirm that this is the intent.
packet_counts = {}

time_window = timedelta(seconds=10)
packet_threshold = 5

# Timestamp of the first packet in the current window (None until one is seen).
start_time = None

for line in papertrail_tail(api_token):
    pkt = parse_tcpdump(line)
    if not pkt:
        logger.warning(f"Packet format error: {line}")
        continue

    logger.info(f"Packet from {pkt.src} to {pkt.dst}: {pkt.data}")

    try:
        pkt_time = datetime.strptime(pkt.timestamp, "%H:%M:%S.%f")
    except ValueError:
        # The regex only checks for digits; values such as "99:99:99.1" still
        # fail here. Skip the line instead of killing the whole monitor.
        logger.warning(f"Packet format error: {line}")
        continue

    # NOTE(review): timestamps carry no date, so across midnight the
    # difference below becomes negative and the window is not reset -
    # confirm whether that case matters.
    if start_time is None or pkt_time - start_time > time_window:
        packet_counts.clear()
        start_time = pkt_time

    # Count every packet, including the first one seen for a destination
    # (previously the first packet was recorded as 0, undercounting by one).
    packet_counts[pkt.dst] = packet_counts.get(pkt.dst, 0) + 1

    if packet_counts[pkt.dst] > packet_threshold:
        logger.critical(f"Port scan attack detected from: {pkt.src}")

In [None]:
# DOS (SYN flood) detection
#
# Counts, per destination and inside a fixed time window, the packets whose
# protocol is "TCP" and whose data contains "S"; logs a critical record
# whenever a destination's count exceeds `packet_threshold`.
syn_counts = {}

time_window = timedelta(seconds=10)
packet_threshold = 1000

# Timestamp of the first packet in the current window (None until one is seen).
start_time = None

for line in papertrail_tail(api_token):
    pkt = parse_tcpdump(line)
    if not pkt:
        logger.warning(f"Packet format error: {line}")
        continue

    logger.info(f"Packet from {pkt.src} to {pkt.dst}: {pkt.data}")

    try:
        pkt_time = datetime.strptime(pkt.timestamp, "%H:%M:%S.%f")
    except ValueError:
        # The regex only checks for digits; values such as "99:99:99.1" still
        # fail here. Skip the line instead of killing the whole monitor.
        logger.warning(f"Packet format error: {line}")
        continue

    # NOTE(review): timestamps carry no date, so across midnight the
    # difference below becomes negative and the window is not reset -
    # confirm whether that case matters.
    if start_time is None or pkt_time - start_time > time_window:
        syn_counts.clear()
        start_time = pkt_time

    # NOTE(review): `"S" in pkt.data` is a plain substring test, so any
    # capital S in the payload text counts - confirm the expected data format.
    if pkt.protocol == "TCP" and "S" in pkt.data:
        # Count every matching packet, including the first for a destination.
        syn_counts[pkt.dst] = syn_counts.get(pkt.dst, 0) + 1

        # Checked only after a counted packet: looking up a destination that
        # has no entry yet used to raise KeyError, and the alert should name
        # the sender of a SYN rather than of an unrelated packet.
        if syn_counts[pkt.dst] > packet_threshold:
            logger.critical(f"DOS (SYN flood) attack from: {pkt.src}")