# Simple DNS Anomaly Detection Lab

Hands-on notebook demonstrating:
1. Load DNS dataset
2. Statistical Analysis (Z-score, IQR)
3. ML Anomaly Detection (Isolation Forest, One-Class SVM)
4. Detection Rules (Sigma, IDS/Firewall examples)


## Step 1: Load dataset

In [None]:
import pandas as pd
# Path of the CSV file holding the DNS dataset used throughout the notebook.
DATA_PATH = r"/mnt/data/zeek_dns_export.csv"

# Read the whole file into a DataFrame; later cells work on `df`.
df = pd.read_csv(DATA_PATH)

# Quick sanity check: overall shape and (at most) the first 20 column names.
print("Rows, Cols:", df.shape)
first_columns = df.columns[:20].tolist()
print("Columns:", first_columns, "...")

# Last expression of the cell -> rendered preview of the first rows.
df.head()


In [None]:
# Normalize Zeek DNS field names (when present) to the short names used below.
rename_map = {
    'source.ip': 'src_ip',
    'destination.ip': 'dst_ip',
    'zeek.dns.query': 'query',
    'zeek.dns.qtype_name': 'qtype',
    'zeek.dns.rcode': 'rcode'
}

# Keep only the mappings whose source column exists and whose target name is
# still free, then apply them in a single rename.
applicable = {
    old_name: new_name
    for old_name, new_name in rename_map.items()
    if old_name in df.columns and new_name not in df.columns
}
df = df.rename(columns=applicable)

# Guarantee a string-typed `query` column: fall back to an all-missing column
# when the dataset has none, and replace missing values with ''.
if 'query' in df.columns:
    raw_query = df['query']
else:
    raw_query = pd.Series(index=df.index, dtype='object')
df['query'] = raw_query.fillna('').astype(str)

# Length of each queried name, in characters.
df['query_len'] = df['query'].str.len()

preview_columns = ['src_ip', 'query', 'query_len']
print(df[preview_columns].head())


## Step 2: Statistical Analysis (Z-score, IQR)

In [None]:
from scipy import stats
import numpy as np

# Aggregate to one row per source IP: number of queries and mean query length.
agg = (
    df.groupby('src_ip')
      .agg(
          queries_per_ip=('query', 'count'),
          avg_query_len=('query_len', 'mean'),
      )
      .reset_index()
)

ip_counts = agg['queries_per_ip']

# Z-score of the per-IP query count; flag hosts more than 3 standard
# deviations away from the mean (in either direction).
agg['zscore'] = stats.zscore(ip_counts)
agg['zscore_flag'] = agg['zscore'].abs() > 3

# IQR fences: flag counts outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
Q1 = ip_counts.quantile(0.25)
Q3 = ip_counts.quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
agg['iqr_flag'] = ip_counts.lt(lower) | ip_counts.gt(upper)

# Show the ten busiest hosts first.
agg.sort_values('queries_per_ip', ascending=False).head(10)


## Step 3: ML Anomaly Detection (Isolation Forest, One-Class SVM)

In [None]:
from sklearn.preprocessing import RobustScaler
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

# Feature matrix: one row per source IP, missing values replaced by 0.
features = ['queries_per_ip','avg_query_len']
feature_frame = agg[features].fillna(0)
X = feature_frame.values

# Scale the features before fitting both detectors on the same matrix.
scaler = RobustScaler()
X_scaled = scaler.fit(X).transform(X)

# Isolation Forest: a prediction of -1 is recorded as an anomaly flag.
# The decision function is negated so that larger scores mean "more anomalous"
# (scikit-learn's decision_function is lower for outliers).
if_model = IsolationForest(contamination=0.05, random_state=42)
if_labels = if_model.fit(X_scaled).predict(X_scaled)
agg['if_flag'] = if_labels == -1
agg['if_score'] = -if_model.decision_function(X_scaled)

# One-Class SVM: same flag/score convention as above.
ocsvm = OneClassSVM(kernel='rbf', gamma='scale', nu=0.05)
ocsvm_labels = ocsvm.fit(X_scaled).predict(X_scaled)
agg['ocsvm_flag'] = ocsvm_labels == -1
agg['ocsvm_score'] = -ocsvm.decision_function(X_scaled)

# Side-by-side view of both detectors for the first ten hosts.
summary_columns = ['src_ip','queries_per_ip','if_score','if_flag','ocsvm_score','ocsvm_flag']
agg.loc[:, summary_columns].head(10)


## Step 4: Detection Rules

Examples of turning the anomaly-detection results into deployable detection rules.

In [None]:
import uuid
import yaml, numpy as np

# Derive threshold (95th percentile of queries_per_ip).
# int() yields a plain Python int for both the YAML dump and the rule text.
thr = int(np.percentile(agg['queries_per_ip'], 95))

# Sigma requires `id` to be a UUID. Derive a stable one from the rule's slug so
# that re-running the notebook regenerates the same identifier.
SIGMA_RULE_SLUG = 'dns-excess-queries-baseline'

# Sigma rule
sigma = {
  'title': 'Excessive DNS Queries Per Host',
  'id': str(uuid.uuid5(uuid.NAMESPACE_DNS, SIGMA_RULE_SLUG)),
  'description': 'Detect hosts with unusually high DNS queries per IP',
  'logsource': {'product':'zeek','service':'dns'},
  # `|gte` is Sigma's numeric "greater than or equal" field modifier; a nested
  # {'min': ...} map is not valid Sigma detection syntax.
  # NOTE(review): queries_per_ip is the per-host aggregate computed in this
  # notebook, not a per-event log field -- the target backend must expose such
  # a field (or the rule needs a Sigma correlation); confirm before deploying.
  'detection': {'selection': {'queries_per_ip|gte': thr}, 'condition': 'selection'},
  'level': 'high'
}

SIGMA_PATH = '/mnt/data/sigma_dns_example.yml'
with open(SIGMA_PATH,'w') as f:
    # sort_keys=False keeps the key order written above (title first) instead
    # of PyYAML's default alphabetical ordering.
    yaml.safe_dump(sigma, f, sort_keys=False)
print(f"Sigma rule written to {SIGMA_PATH}")

# IDS/firewall example (pseudo):
# NOTE(review): thr is derived from per-host totals over the whole dataset,
# while the rule below counts per 3600 s window -- treat the window as
# illustrative and confirm it against the capture duration.
print("\nIDS/Firewall example:")
print(f"alert dns any any -> any any (msg:\"Excessive DNS Queries\"; threshold: type both, track by_src, count {thr}, seconds 3600; sid:1000001;)")
