# Log Anomaly Detection mit Machine Learning

Dieses Notebook demonstriert, wie Machine Learning f√ºr die Erkennung von Anomalien in Security-Logs eingesetzt werden kann:

1. **Synthetische Logs generieren** - Normale und anomale Events simulieren
2. **Feature Engineering** - Log-Daten f√ºr ML aufbereiten
3. **Isolation Forest Training** - Anomalie-Modell trainieren
4. **Anomaly Detection** - Verd√§chtige Events identifizieren
5. **Report Generation** - Ergebnisse visualisieren und dokumentieren

**Lernziele:**
- Verstehen, wie ML bei Security-Monitoring hilft
- Isolation Forest f√ºr Anomalie-Erkennung einsetzen
- Log-Daten f√ºr ML-Modelle vorbereiten
- Anomalie-Reports erstellen und interpretieren

**Use Cases:**
- Erkennung von ungew√∂hnlichen Login-Zeiten
- Identifikation verd√§chtiger IP-Adressen
- Auff√§llige Zugriffsmuster auf Ressourcen
- Fr√ºherkennung von Insider-Threats

## Schritt 1: Setup - Imports und Vorbereitung

Zuerst importieren wir alle ben√∂tigten Bibliotheken und definieren die Pfade f√ºr unsere Demo-Dateien.

In [None]:
import datetime as dt
import hashlib
import random
from typing import List, Dict

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

# Pfade definieren
logs_file = "logs.csv"
report_file = "anomalies_report.txt"

print("‚úì Imports erfolgreich")
print(f"‚úì Log-Datei: {logs_file}")
print(f"‚úì Report-Datei: {report_file}")

---

# Teil 1: Synthetische Log-Daten generieren

## Normale vs. Anomale Events

Wir simulieren Security-Logs mit zwei Arten von Events:

**Normale Events:**
- Geschehen w√§hrend typischer Arbeitszeiten (08:00-18:00)
- Von bekannten Benutzern (alice, bob, charlie, diana)
- Von internen IP-Adressen (10.x.x.x)
- Typische Aktionen: login, logout, file_access

**Anomale Events:**
- Geschehen zu ungew√∂hnlichen Zeiten (nachts 00:00-05:00 oder 22:00-23:00)
- Manchmal von unbekannten Benutzern (eve, mallory)
- Von externen/ungew√∂hnlichen IP-Adressen (192.168.x.x)
- Gleiche Aktionen, aber verd√§chtiger Kontext

## Helper-Funktion: Zuf√§llige IP-Adresse

In [None]:
def random_ip() -> str:
    """Generiert eine zuf√§llige IPv4-Adresse im privaten Bereich 10.0.0.0/8."""
    return f"10.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"

# Test
print(f"Beispiel IP: {random_ip()}")

## Normale Events generieren

Wir erstellen 500 normale Log-Eintr√§ge w√§hrend typischer Arbeitszeiten.

In [None]:
random.seed(42)

users_normal = ["andi", "dominik", "marcel", "diana"]
actions = ["login", "logout", "file_access"]
resources = ["/etc/passwd", "/var/log/syslog", "/home/shared/report.pdf"]

n_normal = 500
rows: List[Dict[str, str]] = []
start_date = dt.datetime.now(dt.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)

for _ in range(n_normal):
    # Zeitpunkt zwischen 8 und 18 Uhr
    hour = random.randint(8, 18)
    minute = random.randint(0, 59)
    ts = start_date + dt.timedelta(hours=hour, minutes=minute) + dt.timedelta(days=random.randint(0, 2))
    
    user = random.choice(users_normal)
    ip = random_ip()
    action = random.choices(actions, weights=[0.5, 0.2, 0.3])[0]
    resource = random.choice(resources) if action == "file_access" else ""
    
    rows.append({
        "timestamp": ts.isoformat(),
        "user": user,
        "ip": ip,
        "action": action,
        "resource": resource,
        "anomaly": 0
    })

print(f"‚úì {len(rows)} normale Events generiert")
print(f"\nBeispiel (erste 3 Events):")
for row in rows[:3]:
    print(f"  {row['timestamp'][:19]} | {row['user']:8} | {row['ip']:15} | {row['action']}")

## Anomale Events generieren

Jetzt f√ºgen wir 10 verd√§chtige Events hinzu, die zu ungew√∂hnlichen Zeiten stattfinden oder von unbekannten IP-Adressen kommen.

In [None]:
users_anom = ["eve", "mallory"]
n_anom = 10

for _ in range(n_anom):
    # Ungew√∂hnliche Zeiten: nachts (0-5 Uhr) oder sp√§t abends (22-23 Uhr)
    hour = random.choice([random.randint(0, 5), random.randint(22, 23)])
    minute = random.randint(0, 59)
    ts = start_date + dt.timedelta(hours=hour, minutes=minute) + dt.timedelta(days=random.randint(0, 2))
    
    user = random.choice(users_normal + users_anom)
    # Andere IP-Range f√ºr anomale Events
    ip = f"192.168.{random.randint(0, 255)}.{random.randint(1, 254)}"
    action = random.choice(actions)
    resource = random.choice(resources) if action == "file_access" else ""
    
    rows.append({
        "timestamp": ts.isoformat(),
        "user": user,
        "ip": ip,
        "action": action,
        "resource": resource,
        "anomaly": 1
    })

print(f"‚úì {n_anom} anomale Events generiert")
print(f"‚úì Total: {len(rows)} Events")
print(f"\nBeispiel (letzte 3 Events - anomal):")
for row in rows[-3:]:
    print(f"  {row['timestamp'][:19]} | {row['user']:8} | {row['ip']:15} | {row['action']}")

## DataFrame erstellen und speichern

Wir konvertieren die Events in ein Pandas DataFrame und speichern sie als CSV.

In [None]:
df = pd.DataFrame(rows)
df.to_csv(logs_file, index=False)

print(f"‚úì Logs gespeichert in: {logs_file}")
print(f"\nDataFrame Info:")
print(f"  Shape: {df.shape}")
print(f"  Columns: {list(df.columns)}")
print(f"\nVerteilung:")
print(f"  Normale Events: {(df['anomaly'] == 0).sum()}")
print(f"  Anomale Events: {(df['anomaly'] == 1).sum()}")
print(f"\nErste 5 Zeilen:")
df.head()

---

# Teil 2: Feature Engineering

## Was ist Feature Engineering?

Machine Learning Modelle arbeiten mit numerischen Daten. Wir m√ºssen unsere Log-Daten (Text, Timestamps) in Features umwandeln:

**Zeitbasierte Features:**
- Stunde des Tages (0-23)
- Wochentag (0-6)

**Kategorische Features (Hashing):**
- Benutzername ‚Üí numerischer Hash
- IP-Adresse ‚Üí numerischer Hash
- Aktion ‚Üí numerischer Hash
- Resource ‚Üí numerischer Hash

**Warum Hashing?** Einfache Methode, um Text in konsistente Zahlen umzuwandeln.

## Hash-Funktion f√ºr kategorische Features

In [None]:
def feature_hash(value: str) -> float:
    """Konvertiert einen String in einen Float zwischen 0 und 1 via SHA256."""
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()[:8]
    return int(digest, 16) / 0xFFFFFFFF

# Test
print("Beispiel Hashes:")
print(f"  alice  ‚Üí {feature_hash('alice'):.6f}")
print(f"  bob    ‚Üí {feature_hash('bob'):.6f}")
print(f"  login  ‚Üí {feature_hash('login'):.6f}")
print(f"  logout ‚Üí {feature_hash('logout'):.6f}")

## Features aus Logs extrahieren

Jetzt laden wir die Log-Datei und extrahieren alle Features f√ºr das ML-Modell.

In [None]:
df = pd.read_csv(logs_file)

# Zeitbasierte Features
ts = pd.to_datetime(df["timestamp"], utc=True)
df["hour"] = ts.dt.hour
df["day"] = ts.dt.dayofweek

# Kategorische Features hashen
df["user_h"] = df["user"].apply(feature_hash)
df["ip_h"] = df["ip"].apply(feature_hash)
df["action_h"] = df["action"].apply(feature_hash)
df["resource_h"] = df["resource"].fillna("").apply(feature_hash)

print("‚úì Feature Engineering abgeschlossen")
print(f"\nNeue Spalten:")
print(f"  {list(df.columns)}")
print(f"\nBeispiel Features (erste 3 Zeilen):")
df[["hour", "day", "user_h", "ip_h", "action_h"]].head(3)

---

# Teil 3: Isolation Forest Training

## Was ist Isolation Forest?

Isolation Forest ist ein un√ºberwachter ML-Algorithmus zur Anomalie-Erkennung:

**Funktionsweise:**
- Erstellt zuf√§llige Decision Trees
- Anomalien sind leichter zu "isolieren" (weniger Splits n√∂tig)
- Normale Daten ben√∂tigen mehr Splits
- Score < 0: h√∂here Wahrscheinlichkeit f√ºr Anomalie

**Vorteile:**
- Keine gelabelten Daten n√∂tig
- Schnell und effizient
- Gut f√ºr hochdimensionale Daten

**Parameter:**
- `contamination`: Erwarteter Anteil von Anomalien (hier: 0.02 = 2%)

## Modell trainieren und Anomalien vorhersagen

In [None]:
# Feature-Matrix f√ºr das Modell
features = df[["hour", "day", "user_h", "ip_h", "action_h", "resource_h"]]

# Isolation Forest trainieren
contamination = 0.02  # Erwarten 2% Anomalien
model = IsolationForest(contamination=contamination, random_state=42)
model.fit(features)

# Anomalie-Scores und Predictions
df["anomaly_score"] = model.decision_function(features)
df["anomaly_pred"] = model.predict(features)

# -1 = Anomalie, 1 = Normal
anomalies = df[df["anomaly_pred"] == -1]

print(f"‚úì Modell trainiert mit {len(df)} Events")
print(f"\nErgebnisse:")
print(f"  Erkannte Anomalien: {len(anomalies)}")
print(f"  Normale Events:     {(df['anomaly_pred'] == 1).sum()}")
print(f"\nScore-Statistiken:")
print(f"  Min Score:  {df['anomaly_score'].min():.4f}")
print(f"  Max Score:  {df['anomaly_score'].max():.4f}")
print(f"  Mean Score: {df['anomaly_score'].mean():.4f}")

## Erkannte Anomalien anzeigen

In [None]:
print("üö® Erkannte Anomalien:\n")
for idx, row in anomalies.iterrows():
    print(f"[{idx}] {row['timestamp'][:19]}")
    print(f"    User:     {row['user']}")
    print(f"    IP:       {row['ip']}")
    print(f"    Action:   {row['action']}")
    print(f"    Resource: {row['resource']}")
    print(f"    Score:    {row['anomaly_score']:.4f}")
    print(f"    Tats√§chlich anomal: {'JA' if row['anomaly'] == 1 else 'NEIN'}")
    print()

# DataFrame mit relevanten Spalten
anomalies[["timestamp", "user", "ip", "action", "resource", "anomaly_score", "anomaly"]]

---

# Teil 4: Report Generation

## Anomalie-Report erstellen

Wir erstellen einen ausf√ºhrlichen Text-Report f√ºr Security-Teams.

In [None]:
with open(report_file, "w", encoding="utf-8") as f:
    total = len(df)
    num_anom = len(anomalies)
    
    # Header
    f.write("=" * 70 + "\n")
    f.write("               ANOMALY DETECTION REPORT\n")
    f.write("=" * 70 + "\n\n")
    
    # Zusammenfassung
    f.write(f"Analysierte Events:      {total}\n")
    f.write(f"Erkannte Anomalien:      {num_anom}\n")
    f.write(f"Anomalierate:            {num_anom/total*100:.2f}%\n")
    f.write(f"Modell:                  Isolation Forest\n")
    f.write(f"Contamination Parameter: {contamination}\n")
    f.write("\n" + "-" * 70 + "\n\n")
    
    if num_anom > 0:
        f.write("VERD√ÑCHTIGE EVENTS:\n\n")
        for idx, row in anomalies.iterrows():
            f.write(f"Event #{idx}\n")
            f.write(f"  Timestamp:  {row['timestamp']}\n")
            f.write(f"  User:       {row['user']}\n")
            f.write(f"  IP-Adresse: {row['ip']}\n")
            f.write(f"  Aktion:     {row['action']}\n")
            f.write(f"  Resource:   {row['resource']}\n")
            f.write(f"  Score:      {row['anomaly_score']:.4f}\n")
            f.write(f"  Stunde:     {row['hour']}:00\n")
            f.write("\n")
    else:
        f.write("Keine Anomalien gefunden.\n")
    
    f.write("-" * 70 + "\n")
    f.write("Ende des Reports\n")

print(f"‚úì Report erstellt: {report_file}")

## Report anzeigen

In [None]:
with open(report_file, "r", encoding="utf-8") as f:
    report_content = f.read()
    print(report_content)

---

# Teil 5: Modell-Evaluation

## Vergleich mit Ground Truth

Da wir die Daten synthetisch generiert haben, wissen wir, welche Events tats√§chlich anomal sind. Vergleichen wir die Vorhersagen mit der Realit√§t.

In [None]:
# Confusion Matrix manuell berechnen
true_anomalies = df[df["anomaly"] == 1]
predicted_anomalies = df[df["anomaly_pred"] == -1]

# True Positives: Korrekt als Anomalie erkannt
tp = len(df[(df["anomaly"] == 1) & (df["anomaly_pred"] == -1)])

# False Positives: F√§lschlicherweise als Anomalie markiert
fp = len(df[(df["anomaly"] == 0) & (df["anomaly_pred"] == -1)])

# False Negatives: √úbersehene Anomalien
fn = len(df[(df["anomaly"] == 1) & (df["anomaly_pred"] == 1)])

# True Negatives: Korrekt als normal erkannt
tn = len(df[(df["anomaly"] == 0) & (df["anomaly_pred"] == 1)])

# Metriken
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print("‚ïê" * 50)
print("           MODEL EVALUATION")
print("‚ïê" * 50)
print(f"\nConfusion Matrix:")
print(f"  True Positives:  {tp:3d}  (korrekt erkannte Anomalien)")
print(f"  False Positives: {fp:3d}  (falsche Alarme)")
print(f"  False Negatives: {fn:3d}  (√ºbersehene Anomalien)")
print(f"  True Negatives:  {tn:3d}  (korrekt erkannte normale Events)")
print(f"\nMetriken:")
print(f"  Precision: {precision:.2%}  (Wie viele erkannte Anomalien waren echt?)")
print(f"  Recall:    {recall:.2%}  (Wie viele echte Anomalien wurden gefunden?)")
print(f"  F1-Score:  {f1:.2%}  (Harmonic Mean von Precision & Recall)")
print(f"\nInterpretation:")
if f1 > 0.8:
    print("  ‚úì Exzellente Erkennung!")
elif f1 > 0.6:
    print("  ‚úì Gute Erkennung")
elif f1 > 0.4:
    print("  ‚ö† Moderate Erkennung - Tuning empfohlen")
else:
    print("  ‚ö† Schwache Erkennung - Parameter anpassen")

---

# Zusammenfassung & Best Practices

## Was haben wir gelernt?

**1. Synthetische Daten generieren**
- Simulierte normale und anomale Security-Events
- Zeitbasierte Muster f√ºr realistische Logs
- Ground Truth f√ºr Evaluation verf√ºgbar

**2. Feature Engineering f√ºr ML**
- Zeitbasierte Features (Stunde, Wochentag)
- Hashing f√ºr kategorische Variablen
- Vorbereitung numerischer Features

**3. Isolation Forest f√ºr Anomalie-Erkennung**
- Un√ºberwachtes Learning - keine Labels n√∂tig
- Contamination-Parameter bestimmt Sensitivit√§t
- Decision Function liefert Anomalie-Scores

**4. Report Generation**
- Strukturierte Dokumentation f√ºr Security-Teams
- √úbersichtliche Darstellung verd√§chtiger Events
- Metriken f√ºr Modell-Evaluation

## Best Practices f√ºr Production

**Datenqualit√§t:**
- Saubere, konsistente Log-Formate
- Ausreichend historische Daten
- Regelm√§√üige Daten-Updates

**Feature Engineering:**
- Dom√§nenspezifisches Wissen einbeziehen
- Feature-Wichtigkeit analysieren
- Neue Features iterativ testen

**Modell-Tuning:**
- Contamination-Parameter anpassen
- Cross-Validation verwenden
- False Positives minimieren (wichtig f√ºr SOC-Teams!)

**Monitoring:**
- Modell regelm√§√üig neu trainieren
- Drift Detection implementieren
- Feedback-Loop f√ºr falsche Alarme

**Integration:**
- SIEM-Integration f√ºr automatische Alerts
- Ticketing-Systeme anbinden
- Eskalationsprozesse definieren

## N√§chste Schritte

1. Mit realen Log-Daten experimentieren
2. Weitere Features explorieren
3. Alternative Algorithmen testen (One-Class SVM, Autoencoder)
4. Threshold-Optimierung f√ºr Production
5. Integration in Security-Pipeline

## Weitere Ressourcen

- Scikit-learn Isolation Forest Docs: https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html
- OWASP Logging Cheat Sheet: https://cheatsheetseries.owasp.org/cheatsheets/Logging_Cheat_Sheet.html