In [0]:
%pip install geoip2 netaddr
dbutils.library.restartPython()  # restart Python so the newly installed packages are importable; earlier Python state is discarded
# Installs the extra packages this notebook/rule code relies on (geoip2, netaddr).
# NOTE(review): versions are not pinned — consider pinning (pkg==x.y.z) for reproducible runs.

In [0]:
import builtins
import pyspark.sql.functions as F

# Load the shared helpers from the repo's /lib/common.py, then publish them (together with
# the Databricks runtime handles) on `builtins` so rule notebooks imported later through
# dbutils.import_notebook can reference these names without importing them.
common = dbutils.import_notebook("lib.common")

_shared_builtins = {
    "detect": common.detect,   # @detect decorator that rule notebooks look up
    "Output": common.Output,
    "dbutils": dbutils,
    "spark": spark,
    "F": F,
}
for _builtin_name, _builtin_value in _shared_builtins.items():
    setattr(builtins, _builtin_name, _builtin_value)
del _builtin_name, _builtin_value

In [0]:
# Rule notebooks call dbutils.widgets.get("earliest"/"latest") when they are imported,
# so the runner creates those widgets up front.
# Any existing copies are removed first; this is best effort because on a first run the
# widgets do not exist yet and remove() raises.
# NOTE(review): Databricks documents that a widget removed in a cell cannot be re-created in
# that same cell — if the widgets below do not show up, move the .text() calls to a new cell.
for widget_name in ["earliest", "latest"]:
    try:
        dbutils.widgets.remove(widget_name)
    except Exception:
        # Widget was not present; nothing to remove. (Unlike a bare `except:`, this does not
        # swallow KeyboardInterrupt / SystemExit.)
        pass

dbutils.widgets.text("earliest", "")
dbutils.widgets.text("latest", "")

In [0]:
import sys

# Derive the repo root from this notebook's workspace path and put the repo's
# materialized_py folder at the front of sys.path (only once per session).
nb_path = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)
_nb_path_parts = nb_path.split("/")
repo_ws_root = "/".join(_nb_path_parts[:4])                  # /Repos/<user>/<repo>
repo_fs_root = "/Workspace" + repo_ws_root                   # /Workspace/Repos/<user>/<repo>
materialized_fs_root = repo_fs_root + "/materialized_py"     # .../<repo>/materialized_py

already_on_path = materialized_fs_root in sys.path
if not already_on_path:
    sys.path.insert(0, materialized_fs_root)

print("added_to_sys.path =", materialized_fs_root)
print("sys.path[0:3] =", sys.path[0:3])


In [0]:
# Databricks widgets (notebook / job parameters)
dbutils.widgets.text("rule_group", "binary")      # binary | behavioral
dbutils.widgets.text("rule_ids", "")              # optional: comma-separated rule_ids (empty = all)

rule_group = dbutils.widgets.get("rule_group").strip()
rule_ids_raw = dbutils.widgets.get("rule_ids").strip()
# "".split(",") yields [""], which the filter drops, so an empty widget gives [].
rule_ids = [x.strip() for x in rule_ids_raw.split(",") if x.strip()]

# Load the enabled rules of the requested group from sandbox.audit_poc.rule_registry.
# Widget values are passed as column predicates (not pasted into a SQL string), so a quote
# in a widget value cannot break or inject into the query.
rules_df = (
    spark.table("sandbox.audit_poc.rule_registry")
    .where(F.col("enabled") == F.lit(True))
    .where(F.col("rule_group") == F.lit(rule_group))
)
if rule_ids:
    rules_df = rules_df.where(F.col("rule_id").isin(rule_ids))
rules_df = rules_df.select("rule_id", "rule_group", "module_path", "callable_name", "lookback_minutes")

display(rules_df)
print("loaded_rules_count =", rules_df.count())


In [0]:
from pyspark.sql.functions import col, current_timestamp, expr, coalesce

# checkpoint 조인해서 window 계산
# earliest = last_success_end_ts가 있으면 그 값, 없으면 now - lookback_minutes
# latest   = now
# Per-rule detection window, via a left join to the checkpoint table:
#   window_start_ts = c.last_success_end_ts when a checkpoint row exists,
#                     otherwise now - lookback_minutes
#   window_end_ts   = now
# Spark evaluates current_timestamp() once per query, so each action on windows_df
# (display / count / collect in later cells) observes its own "now".
checkpoints = spark.table("sandbox.audit_poc.rule_checkpoint").alias("c")
lookback_start = expr("current_timestamp() - INTERVAL 1 MINUTE * lookback_minutes")

window_cols = [
    col("r.rule_id"),
    col("r.rule_group"),
    col("r.module_path"),
    col("r.callable_name"),
    col("r.lookback_minutes"),
    coalesce(col("c.last_success_end_ts"), lookback_start).alias("window_start_ts"),
    current_timestamp().alias("window_end_ts"),
]
windows_df = (
    rules_df.alias("r")
    .join(checkpoints, on="rule_id", how="left")
    .select(*window_cols)
)

preview_df = windows_df.select("rule_id","window_start_ts","window_end_ts","lookback_minutes").orderBy("rule_id")
display(preview_df)
print("windows_count =", windows_df.count())


In [0]:
import os

# Sync notebook_path / run_mode in the registry from the rule files in this repo.
# Every <repo>/base/detections/<group>/<rule_id>.py (files starting with "_" are skipped)
# yields one (rule_id, notebook_path) pair; the MERGE only updates rule_ids that already
# exist in the registry (there is no WHEN NOT MATCHED clause).
nb_path = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)
repo_ws_root = "/".join(nb_path.split("/")[:4])  # /Repos/<user>/<repo>

updates = [
    (fname[:-3], f"{repo_ws_root}/base/detections/{group}/{fname}")
    for group in ("binary", "behavioral")
    for fname in sorted(os.listdir(f"/Workspace{repo_ws_root}/base/detections/{group}"))
    if fname.endswith(".py") and not fname.startswith("_")
]

df_upd = spark.createDataFrame(updates, "rule_id string, notebook_path string")
df_upd.createOrReplaceTempView("upd")

merge_sql = """
MERGE INTO sandbox.audit_poc.rule_registry t
USING upd s
ON t.rule_id = s.rule_id
WHEN MATCHED THEN UPDATE SET
  t.notebook_path = s.notebook_path,
  t.run_mode = 'NOTEBOOK',
  t.updated_at = current_timestamp()
"""
spark.sql(merge_sql)

registry_summary_sql = "SELECT rule_group, COUNT(*) cnt, COUNT(notebook_path) nb_cnt FROM sandbox.audit_poc.rule_registry GROUP BY rule_group"
display(spark.sql(registry_summary_sql))


In [0]:
# Smoke-test one rule (the first rule_id in windows_df) via dbutils.import_notebook.

first_window = windows_df.orderBy("rule_id").limit(1).collect()
if not first_window:
    # Fail with an explicit message instead of a bare IndexError from collect()[0].
    raise ValueError(f"No enabled rules loaded for rule_group={rule_group!r}; nothing to test.")
one = first_window[0]
rule_id = one["rule_id"]
start_ts = one["window_start_ts"]
end_ts = one["window_end_ts"]

# windows_df already selects module_path / callable_name from the rule registry, so no
# second (string-built) registry query is needed here.
module_path = one["module_path"]      # e.g. base.detections.binary.attempted_logon_from_denied_ip
callable_name = one["callable_name"]  # e.g. attempted_logon_from_denied_ip

print("TEST RULE:", rule_id, module_path, callable_name)
print("WINDOW:", start_ts, "->", end_ts)

mod = dbutils.import_notebook(module_path)
fn = getattr(mod, callable_name)

result_df = fn(earliest=str(start_ts), latest=str(end_ts))

print("result_count =", result_df.count())
display(result_df.limit(50))


In [0]:
from pyspark.sql.functions import coalesce, col, concat_ws, current_timestamp, lit, sha2

target_table = f"sandbox.audit_poc.findings_{rule_id}"

# dedupe_key = sha256 over rule_id + the rule's own output columns.
# - Run metadata (observed_at, run_id, window_*) is excluded on purpose: observed_at is
#   current_timestamp(), so hashing it made every key unique and defeated deduplication.
# - NULLs are replaced with "\N" because concat_ws skips NULL inputs, which would make
#   e.g. (NULL, "x") and ("x", NULL) hash to the same key.
dedupe_key_expr = sha2(
    concat_ws(
        "||",
        lit(rule_id),
        *[coalesce(result_df[c].cast("string"), lit("\\N")) for c in result_df.columns],
    ),
    256,
)

# Attach the common columns. The key is evaluated on result_df first (temporary column) so
# it only sees the rule's output, then exposed as the last column "dedupe_key" as before.
out_df = (
    result_df
    .withColumn("_dedupe_key_tmp", dedupe_key_expr)
    .withColumn("observed_at", current_timestamp())
    .withColumn("rule_id", lit(rule_id))
    .withColumn("run_id", lit("manual_test"))
    .withColumn("window_start_ts", lit(start_ts))
    .withColumn("window_end_ts", lit(end_ts))
    .withColumn("dedupe_key", col("_dedupe_key_tmp"))
    .drop("_dedupe_key_tmp")
)

# Create / append the Delta table (with 0 rows this still creates the table and its schema)
(out_df.write
 .format("delta")
 .mode("append")
 .option("mergeSchema", "true")
 .saveAsTable(target_table)
)

print("saved_to:", target_table)
display(spark.sql(f"SELECT COUNT(*) AS cnt FROM {target_table}"))


In [0]:
import time, importlib
from pyspark.sql.functions import coalesce, col, concat_ws, current_timestamp, lit, sha2

run_id = f"manual_binary_{int(time.time())}"

# Binary rules only
binary_rules = windows_df.filter("rule_group = 'binary'").collect()

results = []

for r in binary_rules:
    rule_id = r["rule_id"]
    start_ts = r["window_start_ts"]
    end_ts = r["window_end_ts"]

    # Re-read module_path / callable_name from the registry to pick up the latest values
    # (a rule disabled in the meantime is skipped). Column predicates are used instead of
    # an f-string SQL literal, so a quote in rule_id cannot break the query.
    meta = (
        spark.table("sandbox.audit_poc.rule_registry")
        .where((col("rule_id") == lit(rule_id)) & (col("enabled") == lit(True)))
        .select("module_path", "callable_name")
        .collect()
    )
    if not meta:
        continue
    module_path, callable_name = meta[0]["module_path"], meta[0]["callable_name"]

    t0 = time.time()
    status = "SUCCESS"
    err = None
    row_count = None

    try:
        mod = dbutils.import_notebook(module_path)
        fn = getattr(mod, callable_name)
        df = fn(earliest=str(start_ts), latest=str(end_ts))

        row_count = df.count()

        # dedupe_key: sha256 over rule_id + the rule's own output columns only. Run metadata
        # (observed_at = current_timestamp(), run_id, window_*) is excluded so identical
        # findings from different runs share a key; NULLs become "\N" because concat_ws
        # would otherwise skip them and collide e.g. (NULL, "x") with ("x", NULL).
        dedupe_key_expr = sha2(
            concat_ws(
                "||",
                lit(rule_id),
                *[coalesce(df[c].cast("string"), lit("\\N")) for c in df.columns],
            ),
            256,
        )

        out_df = (
            df
            .withColumn("_dedupe_key_tmp", dedupe_key_expr)  # evaluated on the rule output only
            .withColumn("observed_at", current_timestamp())
            .withColumn("rule_id", lit(rule_id))
            .withColumn("run_id", lit(run_id))
            .withColumn("window_start_ts", lit(start_ts))
            .withColumn("window_end_ts", lit(end_ts))
            .withColumn("dedupe_key", col("_dedupe_key_tmp"))  # keep dedupe_key as last column
            .drop("_dedupe_key_tmp")
        )

        target_table = f"sandbox.audit_poc.findings_{rule_id}"
        (out_df.write.format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .saveAsTable(target_table)
        )

    except Exception as e:
        # Record the failure and keep going with the remaining rules.
        status = "FAILED"
        err = str(e)[:4000]

    duration_ms = int((time.time() - t0) * 1000)
    results.append((run_id, rule_id, str(start_ts), str(end_ts), status, row_count, duration_ms, err))

# NOTE(review): this loop never advances sandbox.audit_poc.rule_checkpoint on SUCCESS, so the
# next run's window_start_ts is unchanged — confirm where checkpoints are meant to be updated.

# Summary output
summary_df = spark.createDataFrame(
    results,
    "run_id string, rule_id string, window_start string, window_end string, status string, row_count long, duration_ms long, error string"
)
display(summary_df.orderBy("status", "rule_id"))


In [0]:
import time, importlib
from pyspark.sql.functions import coalesce, col, concat_ws, current_timestamp, lit, sha2

run_id = f"manual_behavioral_{int(time.time())}"

# Behavioral rules only
behavioral_rules = windows_df.filter("rule_group = 'behavioral'").collect()

results = []

for r in behavioral_rules:
    rule_id = r["rule_id"]
    start_ts = r["window_start_ts"]
    end_ts = r["window_end_ts"]

    # Re-read module_path / callable_name from the registry to pick up the latest values
    # (a rule disabled in the meantime is skipped). Column predicates are used instead of
    # an f-string SQL literal, so a quote in rule_id cannot break the query.
    meta = (
        spark.table("sandbox.audit_poc.rule_registry")
        .where((col("rule_id") == lit(rule_id)) & (col("enabled") == lit(True)))
        .select("module_path", "callable_name")
        .collect()
    )
    if not meta:
        continue
    module_path, callable_name = meta[0]["module_path"], meta[0]["callable_name"]

    t0 = time.time()
    status = "SUCCESS"
    err = None
    row_count = None

    try:
        mod = dbutils.import_notebook(module_path)
        fn = getattr(mod, callable_name)
        df = fn(earliest=str(start_ts), latest=str(end_ts))

        row_count = df.count()

        # dedupe_key: sha256 over rule_id + the rule's own output columns only. Run metadata
        # (observed_at = current_timestamp(), run_id, window_*) is excluded so identical
        # findings from different runs share a key; NULLs become "\N" because concat_ws
        # would otherwise skip them and collide e.g. (NULL, "x") with ("x", NULL).
        dedupe_key_expr = sha2(
            concat_ws(
                "||",
                lit(rule_id),
                *[coalesce(df[c].cast("string"), lit("\\N")) for c in df.columns],
            ),
            256,
        )

        out_df = (
            df
            .withColumn("_dedupe_key_tmp", dedupe_key_expr)  # evaluated on the rule output only
            .withColumn("observed_at", current_timestamp())
            .withColumn("rule_id", lit(rule_id))
            .withColumn("run_id", lit(run_id))
            .withColumn("window_start_ts", lit(start_ts))
            .withColumn("window_end_ts", lit(end_ts))
            .withColumn("dedupe_key", col("_dedupe_key_tmp"))  # keep dedupe_key as last column
            .drop("_dedupe_key_tmp")
        )

        target_table = f"sandbox.audit_poc.findings_{rule_id}"
        (out_df.write.format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .saveAsTable(target_table)
        )

    except Exception as e:
        # Record the failure and keep going with the remaining rules.
        status = "FAILED"
        err = str(e)[:4000]

    duration_ms = int((time.time() - t0) * 1000)
    results.append((run_id, rule_id, str(start_ts), str(end_ts), status, row_count, duration_ms, err))

# NOTE(review): this loop never advances sandbox.audit_poc.rule_checkpoint on SUCCESS, so the
# next run's window_start_ts is unchanged — confirm where checkpoints are meant to be updated.

# Summary output
summary_df = spark.createDataFrame(
    results,
    "run_id string, rule_id string, window_start string, window_end string, status string, row_count long, duration_ms long, error string"
)
display(summary_df.orderBy("status", "rule_id"))


In [0]:
# Unified findings table: configuration and per-rule severity lookup
import pyspark.sql.functions as F

UNIFIED_TBL = "sandbox.audit_poc.findings_unified"
# Rule registry — the same table the rule-loading cells query. It must be defined here;
# otherwise the lookups below fail with NameError on a fresh kernel.
REGISTRY_TBL = "sandbox.audit_poc.rule_registry"
DEFAULT_SEVERITY = "medium"

# If rule_registry has a severity column, use it (NULL/empty falls back to the default);
# otherwise every rule gets DEFAULT_SEVERITY.
_registry_cols = set(spark.table(REGISTRY_TBL).columns)
_has_severity = "severity" in _registry_cols

if _has_severity:
    _sev_rows = spark.table(REGISTRY_TBL).select("rule_id", "severity").collect()
    SEVERITY_MAP = {r["rule_id"]: (r["severity"] or DEFAULT_SEVERITY) for r in _sev_rows}
else:
    SEVERITY_MAP = {}

def _resolve_event_ts_col(df):
    """Return the event-time column of ``df`` cast to timestamp.

    Candidate column names are tried in a fixed priority order and the first one present
    in ``df.columns`` is used. If none is present, ``F.current_timestamp()`` is returned.
    """
    candidates = ("EVENT_TIME", "event_time", "event_ts", "EVENT_TS", "timestamp", "time")
    present = df.columns
    chosen = next((name for name in candidates if name in present), None)
    if chosen is None:
        return F.current_timestamp()
    return F.col(chosen).cast("timestamp")

def _write_unified(rule_id: str, df, run_id: str, start_ts, end_ts) -> None:
    """Append one rule's findings to UNIFIED_TBL (Delta) in the common unified layout.

    Output columns, one row per input row:
      event_ts, event_date  - from _resolve_event_ts_col(df) and its date,
      rule_id, severity     - severity from SEVERITY_MAP, DEFAULT_SEVERITY when absent,
      run_id, window_start_ts, window_end_ts - the values passed in,
      observed_at           - current_timestamp(),
      payload_json          - to_json of all original columns ("{}" when df has no columns),
      dedupe_key            - sha256 of rule_id || event_ts (NULL -> "") || payload_json.
    """
    ts_col = _resolve_event_ts_col(df)

    if df.columns:
        payload_col = F.to_json(F.struct(*[F.col(name) for name in df.columns]))
    else:
        payload_col = F.lit("{}")

    rule_severity = SEVERITY_MAP.get(rule_id, DEFAULT_SEVERITY)

    projected = df.select(
        ts_col.alias("event_ts"),
        F.to_date(ts_col).alias("event_date"),
        F.lit(rule_id).alias("rule_id"),
        F.lit(rule_severity).alias("severity"),
        F.lit(run_id).alias("run_id"),
        F.lit(start_ts).alias("window_start_ts"),
        F.lit(end_ts).alias("window_end_ts"),
        F.current_timestamp().alias("observed_at"),
        payload_col.alias("payload_json"),
    )

    key_parts = [
        F.col("rule_id"),
        F.coalesce(F.col("event_ts").cast("string"), F.lit("")),
        F.col("payload_json"),
    ]
    unified_df = projected.withColumn("dedupe_key", F.sha2(F.concat_ws("||", *key_parts), 256))

    writer = unified_df.write.format("delta").mode("append")
    writer.saveAsTable(UNIFIED_TBL)
