# Unified Schema for Network Security Analytics

Normalize Zeek `conn.log` and `dns.log` into a single, consistent schema
ready for enrichment, detection rules, and cross-source correlation.

In [None]:
import sys
from pathlib import Path

# Put the parent of the current working directory at the front of sys.path so
# the `scripts.*` imports below resolve (assumes the notebook is launched from
# a directory directly under the project root — TODO confirm).
sys.path.insert(0, str(Path.cwd().parent))

import pandas as pd

from scripts.zeek_to_dataframe import load_zeek_log, CONN_SCHEMA, DNS_SCHEMA
from scripts.normalize import (
    normalize_conn,
    normalize_dns,
    merge_normalized,
    UNIFIED_SCHEMA,
)

# Lift the column cap and allow 200-character-wide output when frames are rendered.
pd.options.display.max_columns = None
pd.options.display.width = 200

## 1 — Load raw Zeek logs

In [None]:
# Directory holding the sample Zeek logs, relative to the working directory.
ZEEK_DIR = Path("../data/zeek_logs/sample")

# Parse each log with its matching schema.
conn_raw = load_zeek_log(ZEEK_DIR / "conn.log", schema=CONN_SCHEMA)
dns_raw = load_zeek_log(ZEEK_DIR / "dns.log", schema=DNS_SCHEMA)

# Report the row/column count of each raw frame (labels padded to align).
for _label, _frame in (("conn.log:", conn_raw), ("dns.log: ", dns_raw)):
    print(f"{_label} {len(_frame)} rows, {len(_frame.columns)} columns")

In [None]:
# List the native Zeek column names of each log; the two sets are not the same.
print("conn.log columns:", list(conn_raw.columns), end="\n\n")
print("dns.log columns:", list(dns_raw.columns))

## 2 — Normalize to unified schema

Field mapping:

| Zeek field | Unified field | Notes |
|------------|---------------|-------|
| `ts` | `timestamp` | Already datetime64[ns, UTC] |
| `id.orig_h` | `src_ip` | Source IP address |
| `id.orig_p` | `src_port` | Source port |
| `id.resp_h` | `dst_ip` | Destination IP address |
| `id.resp_p` | `dst_port` | Destination port |
| `proto` | `protocol` | tcp, udp, icmp |
| `query` | `dns_query` | DNS-specific |
| `qtype_name` | `dns_qtype` | DNS-specific |
| `rcode_name` | `dns_rcode` | DNS-specific |

In [None]:
# Run each raw frame through its normalizer.
conn_norm = normalize_conn(conn_raw)
dns_norm = normalize_dns(dns_raw)

# Report the row/column count of each normalized frame (labels padded to align).
for _label, _frame in (("Normalized conn:", conn_norm), ("Normalized dns: ", dns_norm)):
    print(f"{_label} {len(_frame)} rows, {len(_frame.columns)} columns")

In [None]:
# Column names of the normalized conn frame; dns_norm is expected to carry the same set.
print("Unified columns:", [*conn_norm.columns])

In [None]:
# Preview the first normalized conn rows, restricted to the core columns.
conn_norm.head()[
    ["timestamp", "log_type", "src_ip", "dst_ip", "dst_port", "protocol", "service"]
]

In [None]:
# Preview the first normalized dns rows, including the dns_* columns.
dns_norm.head()[
    ["timestamp", "log_type", "src_ip", "dst_ip", "protocol", "dns_query", "dns_rcode"]
]

In [None]:
# Print the per-column dtypes of both normalized frames so they can be compared by eye.
print("conn_norm dtypes:", conn_norm.dtypes.to_string(), sep="\n", end="\n\n")
print("dns_norm dtypes:", dns_norm.dtypes.to_string(), sep="\n")

## 3 — Merge into single unified DataFrame

In [None]:
# Combine both normalized frames into a single DataFrame.
unified = merge_normalized(conn_norm, dns_norm)

# Report the total size and how many rows came from each log type.
print(f"Unified DataFrame: {len(unified)} rows")
print("Log type distribution:")  # plain literal: no placeholders, so no f-prefix
print(unified["log_type"].value_counts().to_string())

In [None]:
# First 15 merged rows; conn and dns events should appear interleaved if
# merge_normalized returns them in timestamp order (TODO confirm).
unified.head(15)[
    ["timestamp", "log_type", "src_ip", "dst_ip", "dst_port", "protocol", "dns_query"]
]

## 4 — Ready for enrichment

The unified DataFrame can now be passed to enrichment pipelines that expect
consistent field names. Example enrichments:

- **GeoIP lookup** on `src_ip` and `dst_ip`
- **ASN lookup** to identify hosting providers
- **Threat intel matching** against `dst_ip` or `dns_query`
- **Internal/external tagging** based on RFC 1918 private IPv4 ranges

In [None]:
# Example: tag internal vs external destination IPs
import ipaddress

# RFC 1918 private IPv4 blocks, parsed once so each lookup only does membership tests.
PRIVATE_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]


def is_internal(ip_str: str) -> bool:
    """Tell whether an address falls inside one of ``PRIVATE_NETWORKS``.

    Args:
        ip_str: Textual IP address; may also be a missing value (None/NaN).

    Returns:
        True when the address belongs to an RFC 1918 block. Missing values
        and strings that do not parse as an IP address yield False.
    """
    if pd.isna(ip_str):
        return False
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        # Not a valid IPv4/IPv6 literal — treat as not internal.
        return False
    for network in PRIVATE_NETWORKS:
        if addr in network:
            return True
    return False

# Flag every row by whether its destination address is in a private range.
unified["dst_internal"] = unified["dst_ip"].apply(is_internal)

# Summarize how many destinations landed in each class.
print(
    "Destination IP classification:",
    unified["dst_internal"].value_counts().to_string(),
    sep="\n",
)

In [None]:
# Keep only rows whose destination was not tagged internal — the usual
# threat-hunting scope — and preview the first ten.
external = unified.loc[~unified["dst_internal"]]
external.head(10)[
    ["timestamp", "log_type", "src_ip", "dst_ip", "dst_port", "dns_query"]
]

In [None]:
# Export the unified dataset (including the dst_internal tag) as Parquet.
out_path = Path("../data/processed/unified.parquet")
# The output directory may not exist yet (e.g. on a fresh checkout); writing
# would fail on a missing parent, so create it first. No-op when present.
out_path.parent.mkdir(parents=True, exist_ok=True)
unified.to_parquet(out_path, index=False)
print(f"Wrote {len(unified)} rows to {out_path}")