<a href="https://colab.research.google.com/github/besimorhino/ai-workshop/blob/main/apache_log_analysis_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Apache Log Analysis: Classic Stats vs. ML

This notebook demonstrates how standard statistical techniques often match or beat ML for common log-analysis tasks while using fewer CPU cycles.

**What you'll do:**
1. Load a sample Apache access log (or generate one).
2. Parse to a tidy DataFrame.
3. Perform common security analysis tasks with *simple statistics* (counts, rates, thresholds, z/MAD).
4. Attempt the same tasks with an *ML method* (Isolation Forest), and compare results.
5. Measure **wall time** (`%%time`) and **CPU effort** (process user+sys time deltas) for each step.

> TL;DR: For many operational questions (top talkers, spikes, brute-force/scans, hot endpoints), classic stats are faster, cheaper, and easier to explain. This notebook lets you measure the difference on your own logs.

> Note: this notebook expects Apache/Nginx logs in Combined Log Format. Supporting other log types would take substantial rework, starting with the parsing regex in Section 2.

---



## 0) Environment Setup
Installs dependencies. The helper `cpu_meter` context reports per-cell CPU effort.
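
To make the difference between wall time and CPU time concrete, here is a minimal stdlib-only sketch. It is independent of `cpu_meter`, and the helper names `measure`, `sleeper`, and `spinner` are made up for this illustration. A sleeping function takes wall time but almost no CPU, while a busy loop takes both.

```python
import time

def measure(fn):
    """Return (wall_seconds, cpu_seconds) spent running fn()."""
    wall0 = time.perf_counter()
    cpu0 = time.process_time()  # user + system CPU time of this process
    fn()
    return time.perf_counter() - wall0, time.process_time() - cpu0

def sleeper():
    time.sleep(0.2)  # waits without doing work on the CPU

def spinner():
    total = 0
    for i in range(2_000_000):  # busy loop that keeps the CPU occupied
        total += i * i
    return total

sleep_wall, sleep_cpu = measure(sleeper)
spin_wall, spin_cpu = measure(spinner)
print(f"sleep: wall={sleep_wall:.3f}s cpu={sleep_cpu:.3f}s")
print(f"spin:  wall={spin_wall:.3f}s cpu={spin_cpu:.3f}s")
```

This is why the notebook reports both numbers: `%%time` alone can hide whether a cell was computing or waiting.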

In [None]:
%%capture
!pip -q install pandas numpy matplotlib scikit-learn psutil python-dateutil

In [None]:
import psutil, time
from contextlib import contextmanager

@contextmanager
def cpu_meter(label="work"):
    """Context manager to measure user+system CPU time deltas for the current process."""
    p = psutil.Process()
    t0 = p.cpu_times()
    try:
        yield
    finally:
        t1 = p.cpu_times()
        user = t1.user - t0.user
        sysc = t1.system - t0.system
        print(f"[CPU] {label}: user={user:.4f}s system={sysc:.4f}s total={user+sysc:.4f}s")

## 1) Get Logs
Pick **one** of the options below.

- **A. Upload**: Use the file picker.
- **B. Download**: Supply a URL to a public log in Combined Log Format.
- **C. Generate**: Create synthetic logs with realistic patterns (scans, brute-force, spikes).

In [None]:
# A) Upload from your machine
from google.colab import files
print("Upload your Apache access log (Combined Log Format).")
uploaded = files.upload()  # choose one file
LOG_PATH = next(iter(uploaded.keys())) if uploaded else None
LOG_PATH

In [None]:
%%time
# B) Download from a URL (optional). Set URL then run the cell.
with cpu_meter("download"):
    URL = ""  # e.g., "https://example.com/sample_access.log"
    if URL:
        import urllib.request
        LOG_PATH = "downloaded_access.log"
        urllib.request.urlretrieve(URL, LOG_PATH)
LOG_PATH if 'LOG_PATH' in globals() else None

In [None]:
%%time
# C) Generate synthetic logs (set N_LINES to control size)
import random, datetime, ipaddress
from dateutil.tz import tzutc
from pathlib import Path

N_LINES = 200_000  # scale up/down for benchmarking
random.seed(42)

def rand_ip():
    return str(ipaddress.IPv4Address(random.getrandbits(32)))

def gen_log(n=10_000, path="synthetic_access.log"):
    start = datetime.datetime(2024, 1, 1, 0, 0, tzinfo=tzutc())
    methods = ["GET","POST","HEAD"]
    statuses = [200, 200, 200, 301, 302, 403, 404, 500]  # weighted
    urls = [
        "/", "/index.html", "/login", "/admin", "/api/v1/items",
        "/api/v1/items/1", "/wp-login.php", "/phpmyadmin/index.php",
        "/robots.txt", "/search?q=test", "/favicon.ico"
    ]
    uas = [
        "Mozilla/5.0", "curl/7.88.1", "python-requests/2.31", "sqlmap/1.7",
        "ZGrab/0.x", "Nmap Scripting Engine"
    ]
    # Hot/legit clients
    legit_ips = [rand_ip() for _ in range(200)]
    # A few noisy scanners/brute-forcers
    bad_ips = [rand_ip() for _ in range(8)]

    with open(path, "w") as f:
        t = start
        for i in range(n):
            # time advances 0-3 s per request, with an occasional longer gap
            dt = random.randint(0,3)
            if i % 5000 == 0:
                dt += random.randint(0,40)  # extra idle time before the next request
            t = t + datetime.timedelta(seconds=dt)
            ip = random.choice(legit_ips if random.random()>0.04 else bad_ips)
            method = random.choice(methods)
            url = random.choice(urls)
            status = random.choice(statuses)
            size = random.randint(120, 5000)
            ua = random.choice(uas)
            referer = "-"
            # occasional brute-force pattern
            if ip in bad_ips and random.random() < 0.3:
                url = "/login"
                status = random.choice([401,403,200])
            # Combined Log Format line
            ts = t.strftime("%d/%b/%Y:%H:%M:%S %z")
            line = f"{ip} - - [{ts}] \"{method} {url} HTTP/1.1\" {status} {size} \"{referer}\" \"{ua}\"\n"
            f.write(line)
    return path

with cpu_meter("generate synthetic"):
    LOG_PATH = gen_log(N_LINES)
LOG_PATH

## 2) Parse Logs
Supports Apache **Combined Log Format**.
> Note: if you want to test with other logs, you must adjust the regex used here.
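
As a quick illustration of what parsing one line yields, here is a minimal sketch. The sample lines are made up, and the regex `simple` is a simplified stand-in for this example only (it does not handle escaped quotes or trailing fields the way the notebook's pattern does). It also shows that a Common Log Format line, which lacks the referer and user-agent fields, does not match.

```python
import re
from datetime import datetime

# Hypothetical sample lines (not taken from real traffic).
combined = ('203.0.113.7 - - [01/Jan/2024:00:00:05 +0000] '
            '"GET /login HTTP/1.1" 401 512 "-" "curl/7.88.1"')
common = ('203.0.113.7 - - [01/Jan/2024:00:00:05 +0000] '
          '"GET /login HTTP/1.1" 401 512')

# Simplified pattern for illustration only.
simple = re.compile(
    r'^(?P<ip>\S+) (?P<ident>\S+) (?P<user>\S+) \[(?P<ts>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<url>\S+) (?P<proto>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<ua>[^"]*)"$'
)

rec = simple.match(combined).groupdict()
rec["status"] = int(rec["status"])
rec["size"] = 0 if rec["size"] == "-" else int(rec["size"])
rec["dt"] = datetime.strptime(rec["ts"], "%d/%b/%Y:%H:%M:%S %z")

print(rec["ip"], rec["method"], rec["url"], rec["status"], rec["dt"].isoformat())
print("Common Log Format line matches:", simple.match(common) is not None)
```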

In [None]:
%%time
import re, pandas as pd
from datetime import datetime

# Pattern for Apache Combined Log Format
pattern = re.compile(r'''
^
(?P<ip>\S+)\s+                    # Client IP address
(?P<ident>\S+)\s+                 # Identity of the user, '-' if not available
(?P<user>\S+)\s+                  # Userid of the person requesting the document, '-' if not available
\[(?P<ts>[^\]]+)\]\s+             # Timestamp
"(?P<method>\S+)\s+               # Request method (GET, POST, etc.)
(?P<url>[^\s]+)\s+                # Requested URL
(?P<proto>[^"]+)"\s+              # Protocol (HTTP/1.1)
(?P<status>\d{3})\s+              # HTTP status code
(?P<size>\S+)\s+                  # Size of the object returned, '-' if none
"(?P<referer>(?:[^"\\]|\\.)*)"\s+ # Referer header (handles escaped quotes)
"(?P<ua>(?:[^"\\]|\\.)*)"         # User-Agent header (handles escaped quotes)
(?:.*?)\s*                        # Match any additional fields lazily (non-greedy)
$
''', re.VERBOSE)


def parse_lines(path):
    unmatched_lines = []
    with open(path, 'r', errors='ignore') as f:
        for line in f:
            m = pattern.match(line)
            if not m:
                unmatched_lines.append(line.strip())
                continue
            d = m.groupdict()
            try:
                d['status'] = int(d['status'])
            except ValueError:
                d['status'] = -1 # Indicate parsing error
            try:
                d['size'] = 0 if d['size'] == '-' else int(d['size'])
            except ValueError:
                 d['size'] = -1 # Indicate parsing error
            try:
                d['dt'] = datetime.strptime(d['ts'], "%d/%b/%Y:%H:%M:%S %z")
            except ValueError:
                 d['dt'] = None # Indicate parsing error
            yield d
    if unmatched_lines:
        print(f"Warning: {len(unmatched_lines)} lines did not match the regex pattern. First 10 unmatched lines:")
        for i, line in enumerate(unmatched_lines[:10]):
            print(f"  {i+1}: {line}")


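with cpu_meter("parse->DataFrame"):
    df = pd.DataFrame(parse_lines(LOG_PATH))
    # normalize to one tz-aware dtype; mixed UTC offsets (e.g., across a DST change)
    # would otherwise leave an object column that cannot be resampled
    df['dt'] = pd.to_datetime(df['dt'], utc=True)
df.head()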

In [None]:
df.shape, df.dtypes

## 3) Classic Stats: Core Questions
Fast, interpretable baselines that often suffice in operations.
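
The requests-per-minute cell in this section flags a minute when either its z-score or its MAD-based modified z-score exceeds 3 in absolute value. The stdlib-only sketch below, on made-up values, shows why both are used: a single large spike inflates the standard deviation enough to hide itself from the plain z-score, while the median-based score still catches it.

```python
import statistics

# Made-up requests-per-minute values with one obvious spike.
rpm = [10, 12, 11, 9, 10, 11, 10, 200]

mean = statistics.mean(rpm)
std = statistics.stdev(rpm)  # sample standard deviation, as in pandas .std()
median = statistics.median(rpm)
mad = statistics.median([abs(x - median) for x in rpm])  # median absolute deviation

z = [(x - mean) / std for x in rpm]
mad_z = [0.6745 * (x - median) / mad for x in rpm]  # modified z-score

flagged_z = [x for x, s in zip(rpm, z) if abs(s) > 3]
flagged_mad = [x for x, s in zip(rpm, mad_z) if abs(s) > 3]

print("z-score of the spike:", round(z[-1], 2))
print("flagged by z:", flagged_z)
print("flagged by MAD z:", flagged_mad)
```

With only eight samples the spike's z-score stays below 3, so only the MAD-based rule flags it.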

In [None]:
%%time
import numpy as np
with cpu_meter("groupby counts"):
    by_ip = df.groupby('ip').size().sort_values(ascending=False).rename('count')
by_ip.head(10)

In [None]:
%%time
with cpu_meter("status dist"):
    status_dist = df['status'].value_counts().sort_index()
status_dist

In [None]:
%%time
with cpu_meter("requests per minute + anomalies"):
    per_min = df.set_index('dt').resample('1min').size().rename('rpm').to_frame()
    per_min['z'] = (per_min['rpm'] - per_min['rpm'].mean()) / (per_min['rpm'].std() + 1e-9)
    mad = (per_min['rpm'] - per_min['rpm'].median()).abs().median() + 1e-9
    per_min['mad_z'] = 0.6745*(per_min['rpm']-per_min['rpm'].median())/mad
    anomalies = per_min[(per_min['z'].abs()>3) | (per_min['mad_z'].abs()>3)]
anomalies.head()

In [None]:
%%time
with cpu_meter("simple rules: 404 and login abuse"):
    by_ip_status = df.pivot_table(index='ip', columns='status', values='url', aggfunc='count', fill_value=0)
    by_ip_status['total'] = by_ip_status.sum(1)
    by_ip_status['rate_404'] = by_ip_status.get(404,0) / by_ip_status['total'].replace(0, np.nan)
    brute_candidates = (by_ip_status.get(401,0) + by_ip_status.get(403,0)) > 50
    scans_404 = (by_ip_status['rate_404'] > 0.5) & (by_ip_status['total']>30)
    flagged_ips = by_ip_status[ brute_candidates | scans_404 ].sort_values('total', ascending=False)
flagged_ips.head(10)

## 4) ML Approach (Isolation Forest)
We'll aggregate features per IP and run an outlier detector. Measure fit + score time and compare flagged entities.

In [None]:
%%time
with cpu_meter("feature engineering (per IP)"):
    feats = df.groupby('ip').agg(
        total=('size','sum'),                        # total bytes returned to this IP
        uniq_urls=('url','nunique'),                 # distinct URLs requested
        mean_size=('size','mean'),                   # mean response size in bytes
        p404=('status', lambda x: (x==404).mean()),  # share of 404 responses
        p401=('status', lambda x: (x==401).mean()),  # share of 401 responses
        p403=('status', lambda x: (x==403).mean()),  # share of 403 responses
        p5xx=('status', lambda x: (x>=500).mean())   # share of 5xx responses
    )
    # fill NaNs; no scaling step, since Isolation Forest splits one feature at a time
    # and does not depend on feature scale
    X = feats.fillna(0)
X.head()

In [None]:
%%time
from sklearn.ensemble import IsolationForest
with cpu_meter("IsolationForest fit+score"):
    iso = IsolationForest(n_estimators=100, contamination=0.02, random_state=42, n_jobs=-1)
    preds = iso.fit_predict(X)
    scores = iso.decision_function(X)
X.assign(pred=preds, score=scores).sort_values('score').head(10)

### Compare with Simple Rules
Which IPs did each approach flag? Are they similar? How much compute did each take?
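
One way to put a number on "are they similar?" is the Jaccard index, the size of the intersection divided by the size of the union. This is an addition for illustration rather than something the notebook computes; the helper name `compare_flags` and the IP addresses are made up.

```python
def compare_flags(a, b):
    """Summarize agreement between two collections of flagged IPs."""
    a, b = set(a), set(b)
    union = a | b
    return {
        "overlap": len(a & b),
        "only_a": len(a - b),
        "only_b": len(b - a),
        # Two empty sets are treated as identical.
        "jaccard": len(a & b) / len(union) if union else 1.0,
    }

# Hypothetical flagged sets (documentation-range addresses).
ml_example = {"198.51.100.1", "198.51.100.2", "198.51.100.3"}
rules_example = {"198.51.100.2", "198.51.100.3", "198.51.100.4", "198.51.100.5"}

summary = compare_flags(ml_example, rules_example)
print(summary)
```

A Jaccard value near 1 means the two approaches flag nearly the same IPs; a value near 0 means they disagree, and the "only" sets are worth inspecting by hand.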

In [None]:
#%%time
with cpu_meter("compare sets"):
    ml_flagged = set(X.assign(pred=preds).query("pred==-1").index)
    rules_flagged = set(flagged_ips.index)
    overlap = ml_flagged & rules_flagged
    only_ml = ml_flagged - rules_flagged
    only_rules = rules_flagged - ml_flagged
    print("Counts -> ML:", len(ml_flagged), "Rules:", len(rules_flagged), "Overlap:", len(overlap))
    print("Only ML (sample up to 10):", list(sorted(only_ml))[:10])
    print("Only Rules (sample up to 10):", list(sorted(only_rules))[:10])

## 5) Throughput Benchmarks (scaling sense)
Estimate records/second for parsing & aggregation on your hardware.

In [None]:
%%time
with cpu_meter("parse throughput (re-parse)"):
    import itertools
    # re-parse up to the first 100k lines for a bounded, repeatable measurement
    N = 100_000
    gen = itertools.islice(parse_lines(LOG_PATH), N)  # lazy: nothing is parsed yet
    start_time = time.perf_counter()
    df100k = pd.DataFrame(gen)  # consuming the generator does the parsing
    elapsed_time = time.perf_counter() - start_time
    # divide by the rows actually parsed; the log may hold fewer than N matching lines
    rate = len(df100k) / max(1e-9, elapsed_time)
    print(f"Parsing rate: {rate:.2f} lines/sec")
df100k.shape

In [None]:
%%time
with cpu_meter("groupby throughput"):
    ip_counts = df.groupby('ip').size()  # avoid `_`, which IPython uses for the last output
print("IPs:", ip_counts.shape[0])

## 6) Visual sanity checks (optional)
Simple Matplotlib plots; feel free to skip for pure benchmarking.

In [None]:
%%time
import matplotlib.pyplot as plt
with cpu_meter("plot rpm"):
    per_min['rpm'].plot(title='Requests per minute')
    plt.xlabel('time')
    plt.ylabel('rpm')
    plt.show()
    if not anomalies.empty:  # skip the second plot when no minute was flagged
        anomalies['rpm'].plot(style='o', title='Anomalous minutes')
        plt.show()

## 7) Takeaways
- **Speed & CPU**: Group-bys, counts, and simple thresholds need no model fitting; compare their `[CPU]` totals with those of the feature-engineering and Isolation Forest cells.
- **Transparency**: Stats-based rules are easy to explain and tune.
- **ML overhead**: Feature engineering + model fitting add CPU cost; benefits appear mainly on complex, high-dimensional patterns or when labels/ground-truth are absent and rules are hard to craft.
- **Balanced approach**: Start with stats; add ML only where it clearly wins.