# A) Log Detections
Goal: parse a synthetic web access log, detect anomalies, and export alerts.

In [1]:
from pathlib import Path
import re, datetime
import pandas as pd
BASE = Path('..').resolve()
print(BASE)

C:\Project\Miss Shoes\01_Case\A_Log_Detections


In [2]:
log_path = BASE / 'data' / 'web_access.log'
with open(log_path) as f:
    for i in range(5):
        print(next(f).rstrip())

192.0.2.77 - - [11/Sep/2025:16:00:01 +0000] "GET /wp-admin HTTP/1.1" 403 954 "-" "Chrome/124.0"
10.0.0.44 - - [11/Sep/2025:16:00:02 +0000] "GET /category/boots HTTP/1.1" 200 544 "-" "Mozilla/5.0"
10.0.0.42 - - [11/Sep/2025:16:00:03 +0000] "GET / HTTP/1.1" 404 1928 "-" "curl/8.0.1"
10.0.0.44 - - [11/Sep/2025:16:00:09 +0000] "GET /search?q=shoes HTTP/1.1" 304 2578 "-" "Edge/118.0"
10.0.0.58 - - [11/Sep/2025:16:00:16 +0000] "GET /category/boots HTTP/1.1" 200 2576 "-" "Chrome/124.0"


In [None]:
LOG_RE = re.compile(r'(?P<ip>\S+) \S+ \S+ \[(?P<ts>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) [^"]+" (?P<status>\d{3}) (?P<size>\d+) "[^"]*" "(?P<ua>[^\"]*)"')
TS_FMT = '%d/%b/%Y:%H:%M:%S %z'
rows = []
with open(log_path) as f:
    for line in f:
        m = LOG_RE.search(line)
        if m:
            d = m.groupdict()
            d['status'] = int(d['status'])
            d['ts'] = datetime.datetime.strptime(d['ts'], TS_FMT)
            rows.append(d)

df = pd.DataFrame(rows)
df.head()

Unnamed: 0,ip,ts,method,path,status,size,ua
0,192.0.2.77,2025-09-11 16:00:01+00:00,GET,/wp-admin,403,954,Chrome/124.0
1,10.0.0.44,2025-09-11 16:00:02+00:00,GET,/category/boots,200,544,Mozilla/5.0
2,10.0.0.42,2025-09-11 16:00:03+00:00,GET,/,404,1928,curl/8.0.1
3,10.0.0.44,2025-09-11 16:00:09+00:00,GET,/search?q=shoes,304,2578,Edge/118.0
4,10.0.0.58,2025-09-11 16:00:16+00:00,GET,/category/boots,200,2576,Chrome/124.0


In [4]:
from collections import defaultdict, deque
RULES = {
 'R001_BRUTE': {'desc':'>=10 failed (401/403) from same IP within 5 min','sev':'High'},
 'R002_TRAVERSAL': {'desc':'Path traversal pattern ../ or %2e%2e','sev':'High'},
 'R003_SQLI': {'desc':'SQL injection (UNION SELECT / OR 1=1)','sev':'High'},
 'R004_ADMIN_PROBE': {'desc':'Admin probing','sev':'Medium'},
 'R005_500_SPIKE': {'desc':'>=5 HTTP 500 from same IP within 5 min','sev':'Medium'},
 'R006_BAD_UA': {'desc':'Scanner UA (sqlmap/nikto)','sev':'Medium'},
}

def detect(df):
    alerts = []
    df = df.sort_values('ts')
    # content rules
    for _,r in df.iterrows():
        p = str(r['path']).lower()
        if '../' in p or '%2e%2e' in p:
            alerts.append((r['ts'], r['ip'], 'R002_TRAVERSAL', r['path'], r['status'], 1))
        if 'union%20select' in p or 'union select' in p or ' or 1=1' in p:
            alerts.append((r['ts'], r['ip'], 'R003_SQLI', r['path'], r['status'], 1))
        if any(x in p for x in ['/admin','/wp-admin','/phpmyadmin']):
            alerts.append((r['ts'], r['ip'], 'R004_ADMIN_PROBE', r['path'], r['status'], 1))
        if 'sqlmap' in str(r['ua']).lower() or 'nikto' in str(r['ua']).lower():
            alerts.append((r['ts'], r['ip'], 'R006_BAD_UA', r['path'], r['status'], 1))
    # burst rules
    for rule, cond, th in [
        ('R001_BRUTE', lambda s: s in [401,403], 10),
        ('R005_500_SPIKE', lambda s: s==500, 5),
    ]:
        by_ip = defaultdict(list)
        for _,r in df.iterrows():
            if cond(r['status']):
                by_ip[r['ip']].append(r['ts'])
        for ip, times in by_ip.items():
            dq = deque()
            for ts in sorted(times):
                dq.append(ts)
                while (ts - dq[0]).total_seconds() > 300:
                    dq.popleft()
                if len(dq) >= th:
                    alerts.append((ts, ip, rule, '<aggregate>', 0, len(dq)))
    out = pd.DataFrame(alerts, columns=['timestamp','ip','rule_id','sample_path','status','count'])
    if out.empty: return out
    out['severity'] = out['rule_id'].map(lambda r: RULES[r]['sev'])
    out['description'] = out['rule_id'].map(lambda r: RULES[r]['desc'])
    return out

alerts = detect(df)
alerts.head(10)

Unnamed: 0,timestamp,ip,rule_id,sample_path,status,count,severity,description
0,2025-09-11 16:00:01+00:00,192.0.2.77,R004_ADMIN_PROBE,/wp-admin,403,1,Medium,Admin probing
1,2025-09-11 16:01:05+00:00,10.0.0.12,R003_SQLI,/search?q=%27%20UNION%20SELECT%20credit_card%2...,500,1,High,SQL injection (UNION SELECT / OR 1=1)
2,2025-09-11 16:01:31+00:00,192.0.2.77,R003_SQLI,/search?q=%27%20UNION%20SELECT%20credit_card%2...,404,1,High,SQL injection (UNION SELECT / OR 1=1)
3,2025-09-11 16:01:31+00:00,192.0.2.77,R006_BAD_UA,/search?q=%27%20UNION%20SELECT%20credit_card%2...,404,1,Medium,Scanner UA (sqlmap/nikto)
4,2025-09-11 16:02:45+00:00,198.51.100.25,R004_ADMIN_PROBE,/phpmyadmin,403,1,Medium,Admin probing
5,2025-09-11 16:03:01+00:00,198.51.100.25,R004_ADMIN_PROBE,/admin,401,1,Medium,Admin probing
6,2025-09-11 16:04:35+00:00,192.0.2.77,R002_TRAVERSAL,/../../etc/passwd,404,1,High,Path traversal pattern ../ or %2e%2e
7,2025-09-11 16:05:09+00:00,198.51.100.25,R004_ADMIN_PROBE,/admin,500,1,Medium,Admin probing
8,2025-09-11 16:05:09+00:00,198.51.100.25,R006_BAD_UA,/admin,500,1,Medium,Scanner UA (sqlmap/nikto)
9,2025-09-11 16:05:27+00:00,192.0.2.77,R004_ADMIN_PROBE,/phpmyadmin,404,1,Medium,Admin probing


In [6]:
# Alerts evidence
out_csv = BASE / 'outputs' / 'alerts_evidence.csv'
alerts.to_csv(out_csv, index=False)
out_csv

WindowsPath('C:/Project/Miss Shoes/01_Case/outputs/alerts_evidence.csv')

In [7]:
# Quick metrics for one-pager
alerts.groupby(['severity','rule_id']).size().reset_index(name='count')

Unnamed: 0,severity,rule_id,count
0,High,R002_TRAVERSAL,7
1,High,R003_SQLI,8
2,Medium,R004_ADMIN_PROBE,28
3,Medium,R006_BAD_UA,9
