In [1]:
# ================================================================
# Threat Hunting AI – AWS Hackathon 
# ================================================================
# Runs on the Boss of the SOC (BOTS) Dataset Version 1 (https://github.com/splunk/botsv1).
# If the dataset is missing, or AWS permissions for CloudTrail/CloudWatch are restricted
# (no ingested data), it falls back to synthetic/generated events so the pipeline still runs.

import os, re, io, json, gzip, random
from pathlib import Path
from collections import Counter
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import IsolationForest


In [2]:
#This makes the pipeline environment-aware: if AWS access is restricted, it automatically falls back to synthetic logs.

In [3]:

# Optional AWS SDK import. When boto3/botocore import cleanly the notebook may try to pull
# CloudTrail/CloudWatch logs; otherwise AWS_AVAILABLE is False and the AWS pull functions
# return empty frames so the pipeline runs on local/synthetic data.
# Note: this flag only reflects whether the SDK is importable, not whether credentials
# or permissions are valid.

try:
    import boto3
    from botocore.exceptions import ClientError, EndpointConnectionError
    AWS_AVAILABLE = True
except Exception:
    AWS_AVAILABLE = False

plt.rcParams["figure.facecolor"] = "white"


## Synthetic SSH/auth-like logs
def generate_synthetic(n_normal=800, n_attacks=60, seed=None) -> pd.DataFrame:
    """Generate shuffled synthetic SSH/auth-style log lines.

    Args:
        n_normal: number of benign messages to render.
        n_attacks: number of attack-like messages to render.
        seed: optional seed. When given, a private ``random.Random(seed)`` is used so
            the output is reproducible; when None the global ``random`` state is used
            (the original behaviour).

    Returns:
        DataFrame with columns ``message`` and ``source`` (always "synthetic").
        The columns are present even when both counts are 0.
    """
    rng = random if seed is None else random.Random(seed)
    normals = [
        "Accepted password for user from 10.0.0.{i} port {p} ssh2",
        "session opened for user ubuntu by (uid=1000)",
        "CRON[pid]: (root) CMD (run-parts /etc/cron.hourly)",
        "systemd[1]: Started Daily apt download",
        "sudo: user : TTY=pts/{t} ; PWD=/home/user ; USER=root ; COMMAND=/usr/bin/apt update",
        "Accepted publickey for ec2-user from 10.0.1.{i} port {p} ssh2",
    ]
    attacks = [
        "Failed password for root from 185.213.2.{i} port {p} ssh2",
        "Failed password for invalid user admin from 45.83.1.{i} port {p} ssh2",
        "sudo: pam_unix(sudo:auth): authentication failure; tty=/dev/pts/{t} rhost=193.0.2.{i}",
        "Suspicious lateral movement: new SSH key added for user ubuntu from 198.51.100.{i}",
        "Port scan detected from 203.0.113.{i}",
    ]

    def _render(templates):
        # Draw order (template, i, p, t) matches the original so seeded runs of the
        # global generator keep producing the same sequence.
        template = rng.choice(templates)
        return template.format(
            i=rng.randint(1, 240), p=rng.randint(1000, 65000), t=rng.randint(0, 6)
        )

    rows = [{"message": _render(normals)} for _ in range(n_normal)]
    rows += [{"message": _render(attacks)} for _ in range(n_attacks)]
    rng.shuffle(rows)
    # Explicit columns keep the schema stable when no rows were requested.
    df = pd.DataFrame(rows, columns=["message"])
    df["source"] = "synthetic"
    return df

In [4]:
import os, glob
import pandas as pd
# ---- CloudTrail regions to try (skip or edit as needed)
# Default region list for pull_cloudtrail_event_history(); one client is created per region.
CLOUDTRAIL_REGIONS = ("us-east-1","us-east-2","us-west-2","ap-southeast-1","ap-southeast-2") 

# Glob patterns for locally saved BOTSv1 exports (gzip-compressed CSV and JSON).
BOTSV1_LOCAL_GLOBS = [
    "/tmp/botsv1/*.csv.gz",        
    "/tmp/botsv1/*.json.gz"        
]
# S3 prefix that is also searched for BOTSv1 files; a falsy value skips the S3 listing.
BOTSV1_S3_PREFIX ="s3://datalakethreathunting/raw/botsv1/"
# Maximum number of rows read from each BOTSv1 file (default for ingest_botsv1).
BOTSV1_MAX_PER_FILE = 20_000      

# Columns joined as "key=value" pairs to build a message when a frame has no `_raw` column.
_BOTSV1_FALLBACK_COLS = [
    "signature","event_type","message","msg","query","uri","method","user",
    "src","src_ip","srcaddr","dest","dest_ip","dstaddr",
    "src_port","sport","dest_port","dport","protocol","action",
    "severity","status","rcode","host","sourcetype"
]


In [5]:

def _assemble_message(df: pd.DataFrame) -> pd.Series:
    if "_raw" in df.columns:
        return df["_raw"].astype(str)
    keep = [c for c in _BOTSV1_FALLBACK_COLS if c in df.columns]
    if keep:
        return df[keep].astype(str).agg(lambda r: " ".join(f"{k}={v}" for k,v in r.items()), axis=1)
    return df.astype(str).agg(" ".join, axis=1)

def _read_any(path: str, nrows: int | None) -> pd.DataFrame:
    p = path.lower()
    if p.endswith(".csv") or p.endswith(".csv.gz"):
        return pd.read_csv(path, compression="infer", low_memory=False, nrows=nrows)
    if p.endswith(".json") or p.endswith(".json.gz"):
        return pd.read_json(path, lines=True, compression="infer", nrows=nrows)
    return pd.read_csv(path, compression="infer", low_memory=False, nrows=nrows)

def _list_bots_files() -> list[str]:
    """Return the sorted list of BOTSv1 file paths found locally and on S3.

    Local paths come from expanding every pattern in ``BOTSV1_LOCAL_GLOBS``. When
    ``BOTSV1_S3_PREFIX`` is set, keys under that prefix ending in a supported
    extension are added as ``s3://`` URLs. Any S3 listing failure (including a
    missing ``s3fs`` package) is printed and otherwise ignored.
    """
    found = [match for pattern in BOTSV1_LOCAL_GLOBS for match in glob.glob(pattern)]
    if BOTSV1_S3_PREFIX:
        supported = (".csv", ".csv.gz", ".json", ".json.gz")
        try:
            import s3fs
            fs = s3fs.S3FileSystem()
            prefix = BOTSV1_S3_PREFIX.rstrip("/") + "/"
            found += ["s3://" + key for key in fs.find(prefix) if key.endswith(supported)]
        except Exception as e:
            print("[BOTSv1] s3fs listing error:", e)
    return sorted(found)

def ingest_botsv1(max_per_file: int = BOTSV1_MAX_PER_FILE) -> pd.DataFrame:
    """Load every discoverable BOTSv1 file into one ``message``/``source`` frame.

    Args:
        max_per_file: row cap passed to the reader for each file.

    Returns:
        DataFrame with columns ``message`` (whitespace-collapsed, stripped text)
        and ``source`` ("botsv1:<file name without .gz>"). An empty frame with
        those two columns is returned when no file is found or none yields rows.
        Files that fail to read are reported and skipped.
    """
    paths = _list_bots_files()
    if len(paths) == 0:
        print("[BOTSv1] No files found. Set BOTSV1_LOCAL_GLOBS or BOTSV1_S3_PREFIX.")
        return pd.DataFrame(columns=["message","source"])

    loaded = []
    for path in paths:
        try:
            raw = _read_any(path, nrows=max_per_file)
            if not raw.empty:
                text = _assemble_message(raw)
                label = "botsv1:" + os.path.basename(path).replace(".gz","")
                loaded.append(pd.DataFrame({"message": text, "source": label}))
                print(f"[BOTSv1] loaded {len(raw):>7} rows from {path}")
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole ingestion.
            print(f"[BOTSv1] read failed: {path} -> {type(e).__name__}: {e}")

    if len(loaded) == 0:
        return pd.DataFrame(columns=["message","source"])

    bots = pd.concat(loaded, ignore_index=True).dropna(subset=["message"])
    collapsed = bots["message"].astype(str).str.replace(r"\s+", " ", regex=True)
    bots["message"] = collapsed.str.strip()
    return bots


# ---- Try BOTSv1 ingestion ----
# ingest_botsv1() returns an empty frame when no files are found or every read fails.
bots_df = ingest_botsv1()

if not bots_df.empty:
    # BOTSv1 found → use BOTH BOTSv1 + synthetic
    synth_df = generate_synthetic()
    df = pd.concat([bots_df[["message","source"]], synth_df], ignore_index=True)
    base_source = "BOTSv1 + synthetic"
    print(f"Using BOTSv1 ({len(bots_df)}) + synthetic ({len(synth_df)}) logs")
else:
    # BOTSv1 missing → fallback to synthetic only
    df = generate_synthetic()
    base_source = "synthetic"
    print("Using synthetic generator only, rows=", len(df))

# `df` (columns: message, source) is the base dataset that later cells extend and score.
print(f"Base dataset → {base_source}, rows={len(df)}")
display(df.head(3))



[BOTSv1] loaded   20000 rows from /tmp/botsv1/WinSecurity.csv.gz
[BOTSv1] loaded   20000 rows from /tmp/botsv1/stream-dns.csv.gz
[BOTSv1] loaded   20000 rows from /tmp/botsv1/stream-http.csv.gz
[BOTSv1] loaded   20000 rows from /tmp/botsv1/suricata.csv.gz
[BOTSv1] loaded   20000 rows from /tmp/botsv1/sysmon.csv.gz
Using BOTSv1 (100000) + synthetic (860) logs
Base dataset → BOTSv1 + synthetic, rows=100860


Unnamed: 0,message,source
0,08/28/2016 23:59:00 PM LogName=Security Source...,botsv1:WinSecurity.csv
1,08/28/2016 23:59:00 PM LogName=Security Source...,botsv1:WinSecurity.csv
2,08/28/2016 23:59:00 PM LogName=Security Source...,botsv1:WinSecurity.csv


In [6]:
## generative logs – CloudTrail & CloudWatch 
def pull_cloudtrail_event_history(regions=CLOUDTRAIL_REGIONS, max_per_region=400):
    """Pull recent CloudTrail events via ``lookup_events`` from each region.

    Args:
        regions: iterable of AWS region names to query in turn.
        max_per_region: hard cap on the number of events collected per region.

    Returns:
        DataFrame with columns ``message`` (one summary line per event) and
        ``source`` ("cloudtrail:<region>"). Empty when boto3 is unavailable or
        every region fails. Per-region errors are printed and that region is skipped.
    """
    if not AWS_AVAILABLE:
        print("[CloudTrail] boto3 not available; skipping.")
        return pd.DataFrame()
    events = []
    for rg in regions:
        try:
            ct = boto3.client("cloudtrail", region_name=rg)
            token = None
            pulled = 0
            while pulled < max_per_region:
                # Ask only for what is still needed (the page size was 50 before) so the
                # cap is exact instead of overshooting by up to a full page.
                kwargs = {"MaxResults": min(50, max_per_region - pulled)}
                if token:
                    kwargs["NextToken"] = token
                resp = ct.lookup_events(**kwargs)
                for e in resp.get("Events", []):
                    rec = json.loads(e["CloudTrailEvent"])
                    # Tolerate an explicit null userIdentity instead of aborting the region.
                    identity = rec.get("userIdentity") or {}
                    msg = f'{rec.get("eventTime")} {rec.get("eventName")} user={identity.get("type")} src={rec.get("sourceIPAddress")} svc={rec.get("eventSource")}'
                    events.append({"message": msg, "source": f"cloudtrail:{rg}"})
                    pulled += 1
                token = resp.get("NextToken")
                if not token:
                    break
            print(f"[CloudTrail] {rg}: {pulled} events")
        except ClientError as ce:
            print(f"[CloudTrail] {rg}: ClientError -> {ce.response.get('Error', {}).get('Code')}")
        except EndpointConnectionError:
            print(f"[CloudTrail] {rg}: endpoint not reachable")
        except Exception as ex:
            print(f"[CloudTrail] {rg}: {type(ex).__name__}: {ex}")
    return pd.DataFrame(events)

def pull_cloudwatch_logs(sample_groups=2, max_lines=400):
    """Sample log lines from the most recently active stream of a few log groups.

    Args:
        sample_groups: number of log groups to request and inspect.
        max_lines: ``limit`` passed to ``get_log_events`` for each sampled stream.

    Returns:
        DataFrame with columns ``message`` (non-empty, stripped lines) and
        ``source`` ("cloudwatch:<log group name>"). Empty when boto3 is
        unavailable; errors are printed and whatever was collected is returned.
    """
    if not AWS_AVAILABLE:
        print("[CloudWatch] boto3 not available; skipping.")
        return pd.DataFrame()
    rows = []
    try:
        logs = boto3.client("logs")
        described = logs.describe_log_groups(limit=sample_groups)
        sampled = described.get("logGroups", [])[:sample_groups]
        for group in sampled:
            name = group["logGroupName"]
            listing = logs.describe_log_streams(
                logGroupName=name, orderBy="LastEventTime", descending=True, limit=1
            )
            newest = listing.get("logStreams")
            if not newest:
                # Group has no streams; nothing to read.
                continue
            fetched = logs.get_log_events(
                logGroupName=name, logStreamName=newest[0]["logStreamName"], limit=max_lines
            )
            for entry in fetched.get("events", []):
                text = (entry.get("message","") or "").strip()
                if text:
                    rows.append({"message": text, "source": f"cloudwatch:{name}"})
        print(f"[CloudWatch] loaded {len(rows)} lines from {len(sampled)} group(s)")
    except ClientError as ce:
        print(f"[CloudWatch] ClientError -> {ce.response.get('Error', {}).get('Code')}")
    except Exception as ex:
        print(f"[CloudWatch] {type(ex).__name__}: {ex}")
    return pd.DataFrame(rows)

def synthesize_cloudtrail_like(n=500, seed=42):
    """Generate ``n`` CloudTrail-looking summary lines for when no AWS data is available.

    Args:
        n: number of events to generate.
        seed: seed for the NumPy generator. The default (42) reproduces the
            sequence this function produced when the seed was hard-coded.

    Returns:
        DataFrame with columns ``message`` and ``source`` (always
        "cloudtrail:synthetic"); the columns are present even when ``n`` is 0.
    """
    svc = ["ec2.amazonaws.com","iam.amazonaws.com","s3.amazonaws.com","sts.amazonaws.com"]
    names = ["RunInstances","TerminateInstances","AssumeRole","PutBucketPolicy","CreateUser","AttachRolePolicy","ConsoleLogin","CreateAccessKey"]
    users = ["IAMUser","AssumedRole","Root"]
    rng = np.random.default_rng(seed)
    rows = []
    for _ in range(n):
        # Draws happen in the same order as the fields appear in the message, which
        # keeps the default-seed output identical to the previous implementation.
        minute = rng.integers(0,60)
        event_name = rng.choice(names)
        identity = rng.choice(users)
        last_octet = rng.integers(1,255)
        service = rng.choice(svc)
        rows.append({
            "message": f'2025-09-21T12:{minute:02d}:33Z {event_name} user={identity} src=203.0.113.{last_octet} svc={service}',
            "source": "cloudtrail:synthetic"
        })
    print(f"[Synthetic] generated {n} CloudTrail-like events")
    return pd.DataFrame(rows, columns=["message", "source"])

# Try CloudTrail → CloudWatch → fallback synthetic
# CloudWatch is only queried when CloudTrail returned nothing.
ct_df = pull_cloudtrail_event_history()
cw_df = pull_cloudwatch_logs() if ct_df.empty else pd.DataFrame()
# NOTE(review): despite its name, `synthetic_df` holds real CloudTrail/CloudWatch rows
# whenever either pull succeeds; it is only generated data when both come back empty.
synthetic_df = pd.concat([ct_df, cw_df], ignore_index=True) if not cw_df.empty else ct_df

if synthetic_df.empty:
    synthetic_df = synthesize_cloudtrail_like(600)

print("synthetic events:", synthetic_df.shape)
display(synthetic_df.head(3))


[CloudTrail] us-east-1: ClientError -> AccessDeniedException
[CloudTrail] us-east-2: ClientError -> AccessDeniedException
[CloudTrail] us-west-2: ClientError -> AccessDeniedException
[CloudTrail] ap-southeast-1: ClientError -> AccessDeniedException
[CloudTrail] ap-southeast-2: ClientError -> AccessDeniedException
[CloudWatch] loaded 185 lines from 2 group(s)
synthetic events: (185, 2)


Unnamed: 0,message,source
0,[I 2025-09-21 15:19:48.622 ServerApp] Saving f...,cloudwatch:/aws/sagemaker/NotebookInstances
1,[I 2025-09-21 13:49:58.660 ServerApp] Starting...,cloudwatch:/aws/sagemaker/studio
2,[W 2025-09-21 13:50:00.042 ServerApp] No sessi...,cloudwatch:/aws/sagemaker/studio


In [None]:
# Merge into main df
# NOTE(review): this reassigns `df` from itself, so re-running the cell appends
# `synthetic_df` again; run it once per kernel session.
df = pd.concat([df, synthetic_df], ignore_index=True).dropna(subset=["message"])
print("Combined dataset rows:", len(df))

# %% [Vectorize → IsolationForest anomaly detection]
# Word uni- and bi-grams over the lower-cased message text; the token pattern also keeps
# single-character tokens. The vocabulary is capped at 4000 features.
vectorizer = TfidfVectorizer(
    lowercase=True,
    max_features=4000,
    ngram_range=(1,2),
    token_pattern=r"(?u)\b\w+\b"
)
X = vectorizer.fit_transform(df["message"])

# The model is fitted and scored on the same data (unsupervised, no hold-out set).
iso = IsolationForest(
    n_estimators=200,
    contamination=0.04,   # ~4% anomalies (tune if needed)
    random_state=42,
    n_jobs=-1
)
iso.fit(X)
pred = iso.predict(X)               # -1 = anomaly, 1 = normal
scores = -iso.score_samples(X)      # higher = more suspicious

df["anomaly_score"] = scores
df["prediction"]   = np.where(pred==-1, "anomaly", "normal")

print("Prediction counts:\n", df["prediction"].value_counts())

# %% [Visualization – anomaly scores over time + top events table]
# The x-axis is the row position in `df`, not a timestamp.
plt.figure(figsize=(10,4))
plt.plot(df["anomaly_score"].values)
plt.title("Threat Hunting – Anomaly Scores Over Time")
plt.xlabel("Event index")
plt.ylabel("Suspiciousness (higher = more anomalous)")
plt.show()

# Table of the TOP_N highest-scoring events.
TOP_N = 15
top = df.sort_values("anomaly_score", ascending=False).head(TOP_N)[["anomaly_score","message","source"]]
display(top)


Combined dataset rows: 101045


In [None]:

# Maps those to MITRE ATT&CK techniques
def explain(msg: str, score: float, high_thr: float, med_thr: float):
    """Describe why a message looks suspicious and how severe it is.

    Args:
        msg: log message text.
        score: anomaly score of the message.
        high_thr: scores at or above this value are rated HIGH.
        med_thr: scores at or above this value (but below ``high_thr``) are MEDIUM.

    Returns:
        "<SEVERITY> | <reason>; <reason>...", where the reasons are those of every
        matching rule in table order, or a generic text-anomaly reason if none match.
    """
    # (case-insensitive pattern, reason) pairs, evaluated in this order.
    rules = (
        (r"Failed password|authentication failure|invalid user",
         "Multiple failed authentication attempts (possible brute force) [MITRE T1110]"),
        (r"port scan|scan detected",
         "Port scanning behavior [Reconnaissance]"),
        (r"sudo:.*authentication failure",
         "Privilege escalation attempt via sudo [Privilege Escalation]"),
        (r"new SSH key|authorized_keys",
         "SSH key tampering (persistence / lateral movement) [TA0003/TA0008]"),
        (r"\broot\b",
         "Activity targeting root account (high impact if compromised)"),
    )
    reasons = [reason for pattern, reason in rules if re.search(pattern, msg, re.I)]
    if not reasons:
        reasons = ["Unusual pattern vs baseline (text anomaly)"]

    if score >= high_thr:
        severity = "HIGH"
    elif score >= med_thr:
        severity = "MEDIUM"
    else:
        severity = "LOW"
    return f"{severity} | {'; '.join(reasons)}"

# Severity cut-offs are percentiles of the scores over the whole dataset.
high_thr = np.percentile(df["anomaly_score"], 97)   # top 3% = HIGH
med_thr  = np.percentile(df["anomaly_score"], 90)   # 90–97% = MEDIUM

#Shows Top 50 suspicious events with explanations.
results = df.sort_values("anomaly_score", ascending=False).head(50).copy()
results["explanation"] = results.apply(lambda r: explain(r["message"], r["anomaly_score"], high_thr, med_thr), axis=1)

print("=== Findings (Top 50 with explanations) ===")
display(results[["anomaly_score","explanation","message","source"]].head(20))

# NOTE(review): `results` is only the 50 highest scores while HIGH means "top 3% of df";
# on a large dataset (the run above has ~101k rows) all 50 rows are expected to be HIGH,
# so `escalate` will generally equal `results` — confirm that is intended.
escalate = results[results["explanation"].str.startswith("HIGH")].copy()
print("\n=== ESCALATE to analyst (HIGH only) ===")
display(escalate[["anomaly_score","explanation","message","source"]])

# Written to the notebook's current working directory.
results.to_csv("threat_hunting_findings.csv", index=False)
escalate.to_csv("threat_hunting_escalations.csv", index=False)
print("Saved CSVs: threat_hunting_findings.csv, threat_hunting_escalations.csv")

# %% Agentic to do correlation + hypotheses + next actions
# Dotted-quad candidates; octet ranges are not validated here.
ip_re = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# Username after "for". The optional prefix skips the filler words "invalid user" / "user"
# so "for invalid user admin" yields "admin" and "for user ubuntu" yields "ubuntu".
# The lookahead keeps "user" itself as the name in "for user from ..." / "for user by ...".
user_re = re.compile(r"\bfor\s+(?:(?:invalid\s+)?user\s+(?!(?:from|by)\b))?([A-Za-z0-9_\-]+)\b")

def extract_indicators(msg: str):
    """Extract IP-like strings and user names from a log message.

    Returns:
        ``(ips, users)``: two lists in order of appearance. "root" is appended to
        ``users`` when it occurs as a space-delimited word in the message and was
        not already captured.
    """
    ips = ip_re.findall(msg)
    users = user_re.findall(msg)
    if " root " in f" {msg.lower()} " and "root" not in users:
        users.append("root")
    return ips, users
##hypothesis
def classify_tag(msg: str):
    """Tag a log message with playbook categories and MITRE ATT&CK identifiers.

    Returns:
        A list of ``(tag, mitre_id)`` tuples, one per matching rule in the order
        below; ``[("Anomalous-Behavior", "Unknown")]`` when no rule matches. Each
        tag is a key of ``PLAYBOOK``.
    """
    tags = []
    if re.search(r"Failed password|authentication failure|invalid user", msg, re.I):
        tags.append(("BruteForce", "T1110"))
    if re.search(r"port scan|scan detected", msg, re.I):
        tags.append(("Recon/PortScan", "TA0043"))
    if re.search(r"sudo:.*authentication failure", msg, re.I):
        tags.append(("PrivEsc-Sudo", "T1068/T1548"))
    if re.search(r"new SSH key|authorized_keys", msg, re.I):
        # T1098.004 = Account Manipulation: SSH Authorized Keys (persistence);
        # T1021.004 = Remote Services: SSH (lateral movement).
        # The previous "T1090" is the Proxy technique and did not describe this behaviour.
        tags.append(("Persistence-SSHKey", "T1098.004/T1021.004"))
    if re.search(r"\broot\b", msg, re.I):
        tags.append(("HighValueAccount-root", "TA0001"))
    if not tags:
        tags.append(("Anomalous-Behavior", "Unknown"))
    return tags

# Triage playbook keyed by the tag names produced by classify_tag(). Each entry has:
#   hypothesis  - one-sentence working theory for the analyst
#   next_checks - follow-up checks, joined with " | " in the agent output
#   look_in     - data sources to consult, joined with ", " in the agent output
PLAYBOOK = {
    "BruteForce": {
        "hypothesis": "Adversary attempting credential brute force against SSH.",
        "next_checks": [
            "Count failed logins per source IP & username (last 15m)",
            "Check for successful login after bursts of failures",
            "Geo-IP & reputation of source addresses",
            "Verify lockout thresholds & MFA"
        ],
        "look_in": ["auth.log / CloudWatch", "IAM/SSO logs", "VPC Flow Logs"]
    },
    "Recon/PortScan": {
        "hypothesis": "External host scanning ports to enumerate services.",
        "next_checks": [
            "Aggregate destination ports & rate per source IP",
            "Check WAF/VPC Flow logs for resets/denies",
            "Look for subsequent exploit attempts from same IP"
        ],
        "look_in": ["VPC Flow Logs", "WAF logs", "ALB/NLB access logs"]
    },
    "PrivEsc-Sudo": {
        "hypothesis": "Privilege escalation attempt via sudo or misconfig.",
        "next_checks": [
            "List recent sudo failures & successes",
            "Review /etc/sudoers changes & package installs",
            "Correlate with new services or cron edits"
        ],
        "look_in": ["sudo logs", "OS config audit", "CloudTrail/SSM"]
    },
    "Persistence-SSHKey": {
        "hypothesis": "SSH authorized_keys modified for persistence/lateral movement.",
        "next_checks": [
            "Diff authorized_keys across hosts/users",
            "Trace SSH from suspect IP to other hosts",
            "Rotate keys / force MFA revalidation"
        ],
        "look_in": ["/home/*/.ssh/authorized_keys", "SSH server logs", "EDR telemetry"]
    },
    "HighValueAccount-root": {
        "hypothesis": "Activity targeting root account; high impact if compromised.",
        "next_checks": [
            "Confirm root SSH login disabled",
            "Search for password changes or sudo to root",
            "Enable alerting on any root session events"
        ],
        "look_in": ["auth.log", "Hardening baseline", "SIEM rules"]
    },
    "Anomalous-Behavior": {
        "hypothesis": "Unusual pattern vs. baseline; requires triage.",
        "next_checks": [
            "Cluster similar messages vs. historical frequency",
            "Check if host/user/IP is new or rare"
        ],
        "look_in": ["SIEM baseline", "Asset inventory", "Change management"]
    }
}

# Work on the top-50 findings and keep only rows whose explanation starts with HIGH or MEDIUM.
subset = results.copy()
subset["severity"] = subset["explanation"].str.extract(r"^(HIGH|MEDIUM|LOW)")
subset = subset[subset["severity"].isin(["HIGH","MEDIUM"])].copy()

rows = []
# Count how often each IP address and user name appears across the selected messages.
ip_counter, user_counter = Counter(), Counter()
# Same 97th-percentile cut-off as `high_thr`, recomputed here under a different name.
hi_thr = np.percentile(df["anomaly_score"], 97)

for _, r in subset.iterrows():
    msg = r["message"]
    score = float(r["anomaly_score"])
    tags = classify_tag(msg)
    ips, users = extract_indicators(msg)
    for ip in ips: ip_counter[ip] += 1
    for u in users: user_counter[u] += 1
    # One output row per (message, tag) pair, so a message matching several rules
    # appears several times.
    for tag, mitre in tags:
        pb = PLAYBOOK[tag]
        rows.append({
            "severity": "HIGH" if score >= hi_thr else "MEDIUM",
            "type": tag,
            "mitre": mitre,
            "anomaly_score": round(score, 6),
            "ip": ", ".join(ips) if ips else "-",
            "user": ", ".join(users) if users else "-",
            "hypothesis": pb["hypothesis"],
            "next_checks": " | ".join(pb["next_checks"]),
            "look_in": ", ".join(pb["look_in"]),
            "message": (msg[:160] + "...") if len(msg) > 160 else msg
        })

# Build the agent table. Ascending sort on severity puts "HIGH" before "MEDIUM"
# because of alphabetical order; within a severity, highest score first.
agent_df = pd.DataFrame(rows).sort_values(["severity","anomaly_score"], ascending=[True, False]).reset_index(drop=True)

print("=== Agentic Correlation & Hypotheses ===")
display(agent_df[["severity","type","mitre","anomaly_score","ip","user","hypothesis","next_checks"]].head(15))

print("\n=== Hot Indicators (correlated) ===")
hot_ips = pd.DataFrame(ip_counter.most_common(10), columns=["ip","count"])
hot_users = pd.DataFrame(user_counter.most_common(10), columns=["user","count"])
display(hot_ips)
display(hot_users)

# Only HIGH rows are written to the escalation CSV.
agent_df[agent_df["severity"]=="HIGH"].to_csv("agent_escalation_package.csv", index=False)
print("\nSaved escalation package: agent_escalation_package.csv")


In [None]:
from datetime import datetime, timedelta, timezone
import re

# --- Helper to safely parse ISO timestamps ---
def parse_iso(ts):
    """Parse an ISO-8601 timestamp string into a timezone-aware UTC datetime.

    A trailing "Z" is accepted. Strings without an offset are assumed to be UTC, so
    the result can be mixed with the timezone-aware fallback timestamps built from
    ``datetime.now(timezone.utc)`` without naive/aware comparison errors.

    Returns:
        A UTC ``datetime``, or None when ``ts`` is not a parseable string.
    """
    try:
        parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    except Exception:
        # Covers malformed strings as well as non-string input such as None.
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)

# --- Timeline extraction ---
# Take the first "YYYY-MM-DDTHH:MM:SS" substring of each message, if any. The pattern
# requires the "T" separator and does not capture a trailing "Z" or offset, so the
# string handed to parse_iso() never carries timezone information.
ts = []
iso_re = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
for m in df["message"].astype(str).tolist():
    match = iso_re.search(m)
    ts.append(parse_iso(match.group(0)) if match else None)

# Start of the made-up timeline used for rows without a parseable timestamp
# (timezone-aware, UTC).
base = datetime.now(timezone.utc) - timedelta(minutes=len(df)//2)

# Rows without a timestamp get placeholder times spaced 5 seconds apart by row position;
# these are for plotting only and are not real event times.
df["ts"] = [
    t if t else base + timedelta(seconds=i*5)
    for i, t in enumerate(ts)
]


In [None]:
# Severity bands: 97th percentile and above is HIGH, 90th up to the 97th is MEDIUM.
high_thr, med_thr = np.percentile(df["anomaly_score"], [97, 90])

def sev(s):
    """Map an anomaly score to "HIGH", "MEDIUM" or "LOW" using the global thresholds."""
    if s >= high_thr:
        return "HIGH"
    if s >= med_thr:
        return "MEDIUM"
    return "LOW"

df["severity"] = df["anomaly_score"].map(sev)

# Plot A: anomaly score over time, one colour per severity band.
colors = {"HIGH":"crimson","MEDIUM":"orange","LOW":"steelblue"}
fig_a, ax_a = plt.subplots(figsize=(11,4))
for sv in ["LOW","MEDIUM","HIGH"]:
    band = df.loc[df["severity"]==sv]
    ax_a.scatter(band["ts"], band["anomaly_score"], s=12, alpha=0.65, label=sv, c=colors[sv])
ax_a.set_title("Threat Hunting Timeline – Suspicious Events by Severity")
ax_a.set_xlabel("Time")
ax_a.set_ylabel("Anomaly Score")
ax_a.legend()
fig_a.tight_layout()
plt.show()

# Plot B: rows whose source contains "botsv1" versus rows whose source contains
# "synthetic" (case-insensitive substring match on the `source` column).
fig_b, ax_b = plt.subplots(figsize=(11,5))
for src, color in [("botsv1", "steelblue"), ("synthetic", "crimson")]:
    matching = df.loc[df["source"].str.contains(src, case=False, na=False)]
    ax_b.scatter(matching["ts"], matching["anomaly_score"], s=12, alpha=0.6, label=src, c=color)
ax_b.set_title("Threat Hunting Timeline – BOTSv1 vs Synthetic Events")
ax_b.set_xlabel("Time")
ax_b.set_ylabel("Anomaly Score")
ax_b.legend()
fig_b.tight_layout()
plt.show()


In [None]:
# ==== CONFIDENCE SCORE (Model + Rules) ====
import re
#Rule-based detection
def rule_signal(msg: str):
    """Score a message against keyword rules and return a value between 0.0 and 1.0.

    Each matching (case-insensitive) pattern adds its weight; the sum is capped at 1.0.
    """
    weighted_patterns = (
        (r"Failed password|invalid user|authentication failure", 0.35),
        (r"new SSH key|suspicious lateral movement|tampering", 0.40),
        (r"root|admin", 0.25),
    )
    sig = 0.0
    for pattern, weight in weighted_patterns:
        if re.search(pattern, msg, re.I):
            sig += weight
    return min(sig, 1.0)

# Per-message rule score between 0 and 1 (see rule_signal).
df["rule_signal"] = df["message"].astype(str).map(rule_signal)
#Hybrid confidence score
# 60% model score + 40% rule score. NOTE(review): anomaly_score is used as-is (not
# rescaled) — confirm its observed range lets this blend reach the response threshold.
df["confidence_score"] = 0.6*df["anomaly_score"] + 0.4*df["rule_signal"]

def explain_event(msg):
    """Return a one-line label for a message based on the first matching keyword.

    Keywords are checked case-insensitively in priority order: "failed",
    "ssh key", "root"; anything else is labelled a general anomaly.
    """
    lowered = msg.lower()
    labels = (
        ("failed", "Multiple failed authentication attempts"),
        ("ssh key", "SSH key tampering / lateral movement"),
        ("root", "Root account activity"),
    )
    for needle, label in labels:
        if needle in lowered:
            return label
    return "General anomaly detected"

# Short keyword-based label per message (this writes a column on `df`, separate from the
# longer "explanation" text stored on `results`).
df["explanation"] = df["message"].apply(explain_event)

In [None]:
# Show top suspicious events.
# display() is needed because this expression is not the last statement of the cell;
# a bare expression in the middle of a cell is evaluated but never rendered.
display(
    df.sort_values("confidence_score", ascending=False).head(10)[
        ["confidence_score","anomaly_score","rule_signal","explanation","message"]
    ]
)

# ==== ATTACK PATH GRAPH (IP ↔ Users) ====
import networkx as nx

# Build (source IP, user) edges from messages that contain both "from <ip>" and "user <name>".
ip_pat = re.compile(r"from (\d+\.\d+\.\d+\.\d+)")
user_pat = re.compile(r"user (\w+)")

edges = []
for text in df["message"].astype(str):
    ip_hit = ip_pat.search(text)
    user_hit = user_pat.search(text)
    if ip_hit is not None and user_hit is not None:
        edges.append((ip_hit.group(1), user_hit.group(1)))

# Undirected graph; duplicate edges collapse into one.
G = nx.Graph()
G.add_edges_from(edges)

plt.figure(figsize=(9,6))
nx.draw(G, with_labels=True, node_color="lightblue", font_size=8, node_size=1200)
plt.title("Attack Path Graph – IPs ↔ Users")
plt.show()


In [None]:
# ==== REAL-TIME STREAMING THREAT HUNTING SIMULATION ====
import random
import time
from IPython.display import clear_output

# Define event templates
# Each template is filled with str.format(user=..., ip=...); a template may use only one
# of the two placeholders (the "new SSH key" one has no {ip}).
event_templates = [
    "Failed password for invalid user {user} from {ip} port 22 ssh2",
    "Accepted password for user {user} from {ip} port 22 ssh2",
    "Suspicious lateral movement: new SSH key added for {user}",
    "sudo: pam_unix(sudo:auth): authentication failure; user={user} from {ip}",
    "Process tampering detected: {user} attempted privilege escalation from {ip}"
]

# Pools the simulated stream samples from: account names, then private 192.168.1.x
# addresses plus a block of 185.213.2.x addresses.
users = ["admin", "root", "ubuntu", "dbuser", "testuser"]
ips = [f"192.168.1.{i}" for i in range(2,50)] + [f"185.213.2.{i}" for i in range(200,220)]




In [None]:
# Live streaming loop (10 iterations demo)
# Scores here are random numbers for display purposes; the fitted model is not used.
from datetime import datetime, timezone

stream_data = []
for i in range(10):
    msg = random.choice(event_templates).format(user=random.choice(users), ip=random.choice(ips))
    anomaly_score = round(random.uniform(0.2,0.9), 3)
    # Named `severity` rather than `sev` so the sev() helper defined in the severity-band
    # cell is not overwritten by a string.
    severity = "HIGH" if anomaly_score > 0.7 else ("MEDIUM" if anomaly_score > 0.5 else "LOW")

    event = {
        # datetime.utcnow() is deprecated; an aware UTC "now" formats to the same string.
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "message": msg,
        "anomaly_score": anomaly_score,
        "severity": severity
    }
    stream_data.append(event)

    # Refresh display
    clear_output(wait=True)
    print(f"=== Streaming Threat Hunting Agent (demo) – Iteration {i+1}/10 ===\n")
    for e in stream_data[-5:]:  # show last 5 events
        print(f"[{e['ts']}] {e['severity']} | {e['message']} | score={e['anomaly_score']}")

    time.sleep(1.5)  # delay to mimic real feed

# ==== Automated Response (SOAR / WAF / NACL) ====
import re, json, time, ipaddress, pandas as pd
from datetime import datetime

# While True, soar_block_ip() only prints what it would do; set to False (and configure
# SOAR_WEBHOOK_URL) to send real requests.
DRY_RUN = True                # Integrate to enable real actions
# Minimum score that triggers the automated response; tune to the organization's needs.
CONF_THRESHOLD = 0.90         # Minimum confidence threshold to trigger the response, can change depends on organization needs

from IPython.display import clear_output

# ANSI color codes
# Keyed by severity label, plus "ENDC" (reset) and "BOLD" used when printing stream events.
COLORS = {
    "LOW": "\033[92m",      # Green
    "MEDIUM": "\033[93m",   # Yellow
    "HIGH": "\033[91m",     # Red
    "ENDC": "\033[0m",      # Reset
    "BOLD": "\033[1m"
}

def automated_response(event):
    """Print a simulated SOAR response for a high-risk stream event.

    The response fires when the event's ``anomaly_score`` is at least
    ``CONF_THRESHOLD`` or its ``severity`` is "HIGH". Nothing is blocked or
    disabled for real; every action is a printed message.

    Args:
        event: dict with at least ``message``, ``severity`` and ``anomaly_score``.
    """
    ip_match = re.search(r"from (\d+\.\d+\.\d+\.\d+)", event["message"])
    ip = None if ip_match is None else ip_match.group(1)
    sev = event["severity"]

    should_respond = event["anomaly_score"] >= CONF_THRESHOLD or sev == "HIGH"
    if not should_respond:
        return

    banner = COLORS['BOLD'] + COLORS['HIGH'] + "🚨 [AUTO-RESPONSE TRIGGERED]" + COLORS['ENDC']
    print("\n" + banner)
    print(f"   Suspicious event: {event['message']}")
    if ip:
        # Only events that carry a "from <ip>" fragment get the block-IP line.
        print(f"   ➡️ Blocking IP {ip} in AWS WAF / NACL (simulated)")
    print("   ➡️ Disabling suspicious account (simulated)")
    print("   ➡️ Escalating alert to SOC analyst via Slack/Email (simulated)\n")

# Streaming loop with color-coded severities
# Scores here are random numbers for display purposes; the fitted model is not used.
from datetime import datetime, timezone

stream_data = []
for i in range(10):
    msg = random.choice(event_templates).format(user=random.choice(users), ip=random.choice(ips))
    anomaly_score = round(random.uniform(0.2,0.99), 3)
    # Named `severity` rather than `sev` so the sev() helper defined in the severity-band
    # cell is not overwritten by a string.
    severity = "HIGH" if anomaly_score > 0.7 else ("MEDIUM" if anomaly_score > 0.5 else "LOW")

    event = {
        # datetime.utcnow() is deprecated; an aware UTC "now" formats to the same string.
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "message": msg,
        "anomaly_score": anomaly_score,
        "severity": severity
    }
    stream_data.append(event)

    # Refresh display
    clear_output(wait=True)
    print(f"=== Streaming Threat Hunting Agent (demo) – Iteration {i+1}/10 ===\n")
    for e in stream_data[-5:]:  # show last 5 events
        color = COLORS[e["severity"]]
        endc = COLORS["ENDC"]
        print(f"{color}[{e['ts']}] {e['severity']} | {e['message']} | score={e['anomaly_score']}{endc}")

    # Call fake SOAR
    automated_response(event)

    time.sleep(1.5)


In [None]:
#  Extract suspicious public IPs from high-confidence alerts
# Fail early with a clear message if the scoring cell has not been run in this kernel.
if "confidence_score" not in df.columns:
    raise RuntimeError("Column confidence_score not found. Please run scoring cell first.")

# Rows at or above the response threshold.
candidates = df[df["confidence_score"] >= CONF_THRESHOLD].copy()
# Dotted-quad candidates; octet ranges are not validated by the pattern itself.
ip_re = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

def is_public_ip(s):
    """Return True when ``s`` parses as an IP address that ``ipaddress`` reports as global.

    Anything that cannot be parsed (malformed text, wrong type) yields False.
    """
    try:
        parsed = ipaddress.ip_address(s)
        verdict = parsed.is_global
    except Exception:
        verdict = False
    return verdict

# Unique public IPs mentioned anywhere in the high-confidence messages.
block_ips = {
    ip
    for text in candidates["message"].astype(str)
    for ip in ip_re.findall(text)
    if is_public_ip(ip)
}

print(f"[AUTO-RESPONSE] Alerts >= {CONF_THRESHOLD}: {len(candidates)}")
print(f"[AUTO-RESPONSE] Candidate IPs to block: {sorted(block_ips)}")
if len(block_ips) == 0:
    print("[AUTO-RESPONSE] No public IPs found, stopping here.")


In [None]:
#SOAR API endpoint
# Leave both blank for the demo; with no URL configured every call is a dry run.
SOAR_WEBHOOK_URL = ""        
SOAR_API_KEY     = ""        

def soar_block_ip(ip, vendor="SOAR"):
    """Ask the SOAR webhook to block ``ip``, or only print the intent in dry-run mode.

    Dry-run mode applies when the global ``DRY_RUN`` is true or no
    ``SOAR_WEBHOOK_URL`` is configured.

    Args:
        ip: address to block.
        vendor: label used in log lines and in the returned record.

    Returns:
        dict with ``status`` ("dry-run", the HTTP status code, or "error"),
        ``ip`` and ``vendor``; ``err`` holds the error text on failure.
    """
    if DRY_RUN or not SOAR_WEBHOOK_URL:
        print(f"[DRY-RUN] {vendor}: Would block IP => {ip}")
        return {"status":"dry-run","ip":ip,"vendor":vendor}
    try:
        import requests
        from datetime import datetime, timezone
        payload = {
            "action": "block_ip",
            "ip": ip,
            "source": "ThreatHuntingAgent",
            "confidence": "very_high",
            # datetime.utcnow() is deprecated; build the same "...Z" UTC string from an
            # aware datetime instead.
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        }
        headers = {"Content-Type":"application/json"}
        if SOAR_API_KEY:
            headers["Authorization"] = f"Bearer {SOAR_API_KEY}"
        r = requests.post(SOAR_WEBHOOK_URL, json=payload, headers=headers, timeout=10)
        print(f"[SOAR] {ip} -> {r.status_code} {r.text[:200]}")
        return {"status":r.status_code,"ip":ip,"vendor":vendor}
    except Exception as e:
        print(f"[SOAR] Error: {e}")
        return {"status":"error","ip":ip,"vendor":vendor,"err":str(e)}


In [None]:
# === Unified Auto-Response Pipeline ===
#SOAR+WAF+NACL

DRY_RUN = True

def auto_response_pipeline(block_ips, 
                           soar=True, waf=True, nacl=False, 
                           dry_run=True):
    """
    Run the selected blocking integrations for a set of IPs and log the outcome.

    block_ips: set/list of IPs to block
    soar, waf, nacl: choose which actions to run
    dry_run: if True, no real AWS/SOAR changes (safe demo mode)

    Returns the list of action records; when it is non-empty it is also written to
    auto_response_actions.csv.

    NOTE(review): within this function `dry_run` only changes the status label of the
    WAF records; soar_block_ip() decides on its own from the global DRY_RUN.
    NOTE(review): WAF_IPSET_ID, WAF_NAME, waf_block_ips and nacl_deny_ip are not defined
    in the visible part of this notebook — confirm they exist before calling with
    waf=True (the default) or nacl=True.
    """
    actions_log = []
    if not block_ips:
        print("[AUTO] No IPs to process.")
        return actions_log

    print(f"[AUTO] Processing {len(block_ips)} suspicious IP(s): {sorted(block_ips)}")

    # --- SOAR Integration ---
    if soar:
        for ip in sorted(block_ips):
            actions_log.append(soar_block_ip(ip, vendor="SOAR"))
    
    # --- AWS WAFv2 Integration ---
    # Skipped unless both WAF identifiers are set (truthy).
    if waf and WAF_IPSET_ID and WAF_NAME:
        added = waf_block_ips(block_ips)
        for a in added:
            actions_log.append({
                "status": "ok" if not dry_run else "dry-run",
                "ip": a, "vendor": "WAFv2"
            })
    
    # --- AWS VPC NACL Integration ---
    if nacl:
        for ip in sorted(block_ips):
            actions_log.append(nacl_deny_ip(ip))
    
    # Save results
    if actions_log:
        pd.DataFrame(actions_log).to_csv("auto_response_actions.csv", index=False)
        print("[AUTO] Saved action log: auto_response_actions.csv")
    else:
        print("[AUTO] No automated actions executed.")

    return actions_log
# === Suspicious IPs (detected from pipeline) ===
# Hard-coded sample addresses for the demo below (not read from `block_ips`).
suspicious_ips = ["185.213.2.205", "203.0.113.77"]

# === SOAR Webhook Demo (DRY-RUN) ===
DRY_RUN = True
SOAR_WEBHOOK_URL = ""   # keep blank for demo
SOAR_API_KEY     = ""   # optional

def soar_block_ip(ip, vendor="SOAR"):
    """Ask the SOAR webhook to block ``ip``, or only print the intent in dry-run mode.

    This redefines the function of the same name from the SOAR cell above with the
    same contract. Dry-run mode applies when ``DRY_RUN`` is true or no
    ``SOAR_WEBHOOK_URL`` is configured.

    Args:
        ip: address to block.
        vendor: label used in log lines and in the returned record.

    Returns:
        dict with ``status`` ("dry-run", the HTTP status code, or "error"),
        ``ip`` and ``vendor``; ``err`` holds the error text on failure.
    """
    if DRY_RUN or not SOAR_WEBHOOK_URL:
        print(f"[DRY-RUN] {vendor}: Would block IP => {ip}")
        return {"status":"dry-run","ip":ip,"vendor":vendor}
    try:
        import requests
        from datetime import datetime, timezone
        payload = {
            "action": "block_ip",
            "ip": ip,
            "source": "ThreatHuntingAgent",
            "confidence": "very_high",
            # datetime.utcnow() is deprecated; build the same "...Z" UTC string from an
            # aware datetime instead.
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        }
        headers = {"Content-Type":"application/json"}
        if SOAR_API_KEY:
            headers["Authorization"] = f"Bearer {SOAR_API_KEY}"
        r = requests.post(SOAR_WEBHOOK_URL, json=payload, headers=headers, timeout=10)
        print(f"[SOAR] {ip} -> {r.status_code} {r.text[:200]}")
        return {"status":r.status_code,"ip":ip,"vendor":vendor}
    except Exception as e:
        print(f"[SOAR] Error: {e}")
        return {"status":"error","ip":ip,"vendor":vendor,"err":str(e)}

# === Run demo ===
# Calls soar_block_ip() directly for each hard-coded IP; auto_response_pipeline() is
# defined above but not invoked here.
print(f"[AUTO] Processing suspicious IPs: {suspicious_ips}")
for ip in suspicious_ips:
    soar_block_ip(ip)


