In [0]:
%pip install geoip2 netaddr
# Restart the Python process after the notebook-scoped install above; this is
# the step Databricks documents for making freshly installed packages usable.
# NOTE(review): the packages are installed without version pins, so the
# resolved versions can differ between runs — consider pinning them.
dbutils.library.restartPython()

In [0]:
import builtins
import pyspark.sql.functions as F

# Load the shared helpers (per the original note: the repo's /lib/common.py).
common = dbutils.import_notebook("lib.common")

# Attach the shared objects to `builtins` so that rule notebooks can resolve
# them (for example the @detect decorator) without importing them themselves.
_shared_builtins = {
    "detect": common.detect,
    "Output": common.Output,
    "dbutils": dbutils,
    "spark": spark,
    "F": F,
}
for _shared_name, _shared_obj in _shared_builtins.items():
    setattr(builtins, _shared_name, _shared_obj)

In [0]:
# Rule notebooks call dbutils.widgets.get("earliest") / ("latest") at import
# time, so the runner creates both widgets before any rule is imported.
#
# NOTE(review): the Databricks widgets documentation states that a widget
# cannot be created in the same cell that removes it — confirm that this
# remove-then-create sequence behaves as intended on the target runtime.

for k in ["earliest", "latest"]:
    try:
        dbutils.widgets.remove(k)
    except Exception:
        # Best effort: removal fails when the widget does not exist yet.
        # Only Exception is caught so that KeyboardInterrupt / SystemExit
        # still stop the notebook instead of being swallowed.
        pass

dbutils.widgets.text("earliest", "")  # example: 2026-02-01 00:00:00
dbutils.widgets.text("latest", "")

In [0]:
import sys

# Workspace path of the notebook that is currently running.
nb_path = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)

# The first four "/"-separated pieces (the leading empty piece included) are
# expected to form /Repos/<user>/<repo>.
_nb_path_parts = nb_path.split("/")
repo_ws_root = "/".join(_nb_path_parts[:4])
# Same location under the /Workspace prefix: /Workspace/Repos/<user>/<repo>
repo_fs_root = "/Workspace" + repo_ws_root
# /Workspace/Repos/<user>/<repo>/materialized_py
materialized_fs_root = repo_fs_root + "/materialized_py"

# Give the materialized modules import priority; skip when the entry is
# already present so that re-running the cell does not add duplicates.
if sys.path.count(materialized_fs_root) == 0:
    sys.path.insert(0, materialized_fs_root)

print("added_to_sys.path =", materialized_fs_root)
print("sys.path[0:3] =", sys.path[0:3])


In [0]:
# Databricks widgets (run parameters)
dbutils.widgets.text("rule_group", "binary")      # binary | behavioral
dbutils.widgets.text("rule_ids", "")              # optional: comma-separated rule_ids (empty = all rules)
dbutils.widgets.dropdown("dry_run", "false", ["false", "true"])  # "true" skips the table writes

dry_run = dbutils.widgets.get("dry_run").strip().lower() == "true"
rule_group = dbutils.widgets.get("rule_group").strip()
rule_ids_raw = dbutils.widgets.get("rule_ids").strip()
# An empty widget value splits into [""], which the filter below drops, so no
# separate empty-string check is needed.
rule_ids = [x.strip() for x in rule_ids_raw.split(",") if x.strip()]

# Load the enabled rules of the requested group.
# The widget values are compared as column literals instead of being
# interpolated into SQL text, so a quote character inside a parameter can
# neither break the query nor inject additional SQL.
rules_df = (
    spark.table("sandbox.audit_poc.rule_registry")
    .filter(F.col("enabled") == F.lit(True))
    .filter(F.col("rule_group") == F.lit(rule_group))
)

if rule_ids:
    # Optional narrowing to the explicitly requested rule ids.
    rules_df = rules_df.filter(F.col("rule_id").isin(rule_ids))

rules_df = rules_df.select(
    "rule_id", "rule_group", "module_path", "callable_name", "lookback_minutes"
)

display(rules_df)
print("loaded_rules_count =", rules_df.count())


In [0]:
from pyspark.sql.functions import col, current_timestamp, expr, coalesce

# Join the checkpoint table to derive the processing window of each rule:
#   window_start_ts = last_success_end_ts when the checkpoint has one,
#                     otherwise now - lookback_minutes
#   window_end_ts   = now
_checkpoint_df = spark.table("sandbox.audit_poc.rule_checkpoint").alias("c")

_lookback_start = expr("current_timestamp() - INTERVAL 1 MINUTE * lookback_minutes")
_window_start = coalesce(col("c.last_success_end_ts"), _lookback_start).alias("window_start_ts")
_window_end = current_timestamp().alias("window_end_ts")

# Columns carried over unchanged from the rule side of the join.
_rule_columns = [
    col("r.rule_id"),
    col("r.rule_group"),
    col("r.module_path"),
    col("r.callable_name"),
    col("r.lookback_minutes"),
]

# Left join: rules without a checkpoint row are kept and fall back to the
# lookback-based start.
windows_df = (
    rules_df.alias("r")
    .join(_checkpoint_df, on="rule_id", how="left")
    .select(*_rule_columns, _window_start, _window_end)
)

_windows_preview = windows_df.select(
    "rule_id", "window_start_ts", "window_end_ts", "lookback_minutes"
).orderBy("rule_id")
display(_windows_preview)
print("windows_count =", windows_df.count())


In [0]:
import os

# Resolve the repo root from this notebook's workspace path
# (expected shape: /Repos/<user>/<repo>).
nb_path = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)
repo_ws_root = "/".join(nb_path.split("/")[:4])

detections_ws_root = f"{repo_ws_root}/base/detections"
detections_fs_root = f"/Workspace{detections_ws_root}"

# Every sub-directory of base/detections is a rule group, except those whose
# name starts with "_" or ".".
rule_groups = [
    entry
    for entry in sorted(os.listdir(detections_fs_root))
    if os.path.isdir(os.path.join(detections_fs_root, entry))
    and not entry.startswith(("_", "."))
]

# One (rule_id, notebook_path, rule_group) tuple per rule file; the rule_id is
# the file name without its ".py" suffix. Files starting with "_" are skipped.
updates = [
    (fname[:-3], f"{detections_ws_root}/{rg}/{fname}", rg)
    for rg in rule_groups
    for fname in sorted(os.listdir(f"{detections_fs_root}/{rg}"))
    if fname.endswith(".py") and not fname.startswith("_")
]

df_upd = spark.createDataFrame(updates, "rule_id string, notebook_path string, rule_group string")
df_upd.createOrReplaceTempView("upd")

# Update-only merge: registry rows with a matching rule_id get their notebook
# path refreshed; there is no insert clause, so unmatched files add no rows.
_registry_merge_sql = """
MERGE INTO sandbox.audit_poc.rule_registry t
USING upd s
ON t.rule_id = s.rule_id
WHEN MATCHED THEN UPDATE SET
  t.notebook_path = s.notebook_path,
  t.rule_group = COALESCE(t.rule_group, s.rule_group),
  t.run_mode = 'NOTEBOOK',
  t.updated_at = current_timestamp()
"""
spark.sql(_registry_merge_sql)

# Per-group overview: total rules and how many have a notebook path.
_registry_overview_sql = "SELECT rule_group, COUNT(*) cnt, COUNT(notebook_path) nb_cnt FROM sandbox.audit_poc.rule_registry GROUP BY rule_group ORDER BY rule_group"
display(spark.sql(_registry_overview_sql))


In [0]:
# Loader functions for the unified findings table
import pyspark.sql.functions as F

# Table that _write_unified appends to.
UNIFIED_TBL = "sandbox.audit_poc.findings_unified"

# Column names that _payload_struct leaves out of the JSON payload.
_EXCLUDE_FROM_PAYLOAD = {
    "dedupe_key",
    "observed_at",
    "rule_id",
    "run_id",
    "window_end_ts",
    "window_start_ts",
}

def _resolve_event_ts_col(df):
    """Pick the column expression used as the event timestamp of ``df``.

    The candidate names are tried in priority order; the first one found in
    ``df.columns`` (exact, case-sensitive match) is returned cast to
    ``timestamp``. When none of them is present, the current timestamp
    expression is returned instead.
    """
    candidates = ("EVENT_TIME", "event_time", "event_ts", "EVENT_TS", "timestamp", "time")
    chosen = next((name for name in candidates if name in df.columns), None)
    if chosen is None:
        return F.current_timestamp()
    return F.col(chosen).cast("timestamp")

def _payload_struct(df):
    """Return a column expression holding the payload of ``df`` as JSON text.

    Every column of ``df`` whose name is not listed in
    ``_EXCLUDE_FROM_PAYLOAD`` is packed into a struct and serialised with
    ``to_json``. When no column remains, the literal string ``"{}"`` is
    returned instead.
    """
    payload_fields = [
        F.col(name) for name in df.columns if name not in _EXCLUDE_FROM_PAYLOAD
    ]
    if payload_fields:
        return F.to_json(F.struct(*payload_fields))
    return F.lit("{}")

def _write_unified(rule_id: str, df, run_id: str, start_ts, end_ts) -> None:
    """Append the rows of ``df`` to ``UNIFIED_TBL`` in the unified layout.

    Each input row becomes one output row with these columns:
    ``event_ts`` / ``event_date`` (from ``_resolve_event_ts_col``),
    ``rule_id``, ``run_id``, ``window_start_ts``, ``window_end_ts``
    (literals taken from the arguments), ``observed_at`` (current timestamp),
    ``payload_json`` (from ``_payload_struct``) and ``dedupe_key``.

    Args:
        rule_id: Identifier stored with every row and included in the hash.
        df: DataFrame holding the findings produced by the rule.
        run_id: Identifier of the current runner execution.
        start_ts: Start of the processing window, stored as a literal.
        end_ts: End of the processing window, stored as a literal.
    """
    event_ts = _resolve_event_ts_col(df)
    payload_json = _payload_struct(df)

    projected = df.select(
        event_ts.alias("event_ts"),
        F.to_date(event_ts).alias("event_date"),
        F.lit(rule_id).alias("rule_id"),
        F.lit(run_id).alias("run_id"),
        F.lit(start_ts).alias("window_start_ts"),
        F.lit(end_ts).alias("window_end_ts"),
        F.current_timestamp().alias("observed_at"),
        payload_json.alias("payload_json"),
    )

    # SHA-256 over rule_id, event_ts (empty string when null) and the JSON
    # payload, joined with "||".
    event_ts_text = F.coalesce(F.col("event_ts").cast("string"), F.lit(""))
    dedupe_source = F.concat_ws("||", F.col("rule_id"), event_ts_text, F.col("payload_json"))
    unified_df = projected.withColumn("dedupe_key", F.sha2(dedupe_source, 256))

    writer = unified_df.write.format("delta").mode("append")
    writer.saveAsTable(UNIFIED_TBL)


In [0]:
import time, importlib
from pyspark.sql.functions import current_timestamp, lit, sha2, concat_ws


# Run every selected rule over its window, store the findings and collect a
# per-rule summary.
#
# NOTE(review): nothing in the visible code advances
# sandbox.audit_poc.rule_checkpoint after a successful run, so window_start_ts
# never moves forward unless that happens elsewhere — confirm.
# NOTE(review): a failing rule is only reported in the summary below; the
# notebook itself still finishes without raising.
run_id = f"{rule_group}_{int(time.time())}"

# Column comparison instead of an interpolated filter string, so the widget
# value is never parsed as SQL.
selected_rules = windows_df.filter(F.col("rule_group") == F.lit(rule_group)).collect()

results = []

# Per-run bookkeeping columns that must not influence the dedupe hash.
_EXCLUDE_FROM_DEDUPE = {"observed_at", "run_id", "window_start_ts", "window_end_ts"}

# Rules whose dedupe input strings and hashes are printed for troubleshooting.
_DEBUG_DEDUPE_RULE_IDS = {"user_account_created"}

for r in selected_rules:
    rule_id = r["rule_id"]
    start_ts = r["window_start_ts"]
    end_ts = r["window_end_ts"]

    # Re-read module_path / callable_name from the registry for every rule so
    # that the latest values are used; a rule that is no longer enabled is
    # skipped. The rule_id is compared as a literal, not spliced into SQL.
    meta = (
        spark.table("sandbox.audit_poc.rule_registry")
        .filter((F.col("rule_id") == F.lit(rule_id)) & (F.col("enabled") == F.lit(True)))
        .select("module_path", "callable_name")
        .collect()
    )
    if not meta:
        continue
    module_path, callable_name = meta[0]["module_path"], meta[0]["callable_name"]

    t0 = time.time()
    status = "SUCCESS"
    err = None
    row_count = None

    try:
        mod = dbutils.import_notebook(module_path)
        fn = getattr(mod, callable_name)
        df = fn(earliest=str(start_ts), latest=str(end_ts))

        row_count = df.count()

        out_df = (
            df
            .withColumn("observed_at", current_timestamp())
            .withColumn("rule_id", lit(rule_id))
            .withColumn("run_id", lit(run_id))
            .withColumn("window_start_ts", lit(start_ts))
            .withColumn("window_end_ts", lit(end_ts))
        )

        # rule_id is always among these columns: it is added just above and is
        # not part of _EXCLUDE_FROM_DEDUPE.
        base_cols = [c for c in out_df.columns if c not in _EXCLUDE_FROM_DEDUPE]

        # Built once and reused for the hash and for the debug output below.
        dedupe_source = concat_ws(
            "||",
            *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in base_cols]
        )
        out_df = out_df.withColumn("dedupe_key", sha2(dedupe_source, 256))

        if rule_id in _DEBUG_DEDUPE_RULE_IDS:
            # Troubleshooting aid: show the exact strings being hashed and the
            # resulting hashes. This collects the rule's rows to the driver.
            print(rule_id)
            print(out_df.select(dedupe_source.alias("value")).toPandas()["value"].tolist())
            print(out_df.select(sha2(dedupe_source, 256).alias("value")).toPandas()["value"].tolist())

        target_table = f"sandbox.audit_poc.findings_{rule_id}"

        if not dry_run:
            (out_df.write.format("delta")
                .mode("append")
                .option("mergeSchema", "true")
                .saveAsTable(target_table)
            )

            # Also store the findings in the unified table.
            # NOTE(review): if this second write fails, the per-rule append
            # above has already happened while the rule is reported as FAILED.
            _write_unified(rule_id, df, run_id, start_ts, end_ts)

    except Exception as e:
        # One failing rule must not stop the remaining rules; the error text
        # (capped at 4000 characters) is kept for the summary.
        status = "FAILED"
        err = str(e)[:4000]

    duration_ms = int((time.time() - t0) * 1000)
    results.append((run_id, rule_id, str(start_ts), str(end_ts), status, row_count, duration_ms, err))

summary_df = spark.createDataFrame(
    results,
    "run_id string, rule_id string, window_start string, window_end string, status string, row_count long, duration_ms long, error string"
)
display(summary_df.orderBy("status", "rule_id"))
