Baseline analytics + anomaly detection

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import Window
import pyspark.sql.types as T
import datetime


RAW_TABLE = "raw_events_demo"  
LOOKBACK_DAYS = 7        
METRICS_TABLE = "curated_metrics_demo" 
INCIDENTS_TABLE = "monitoring_incidents"


StatementMeta(, a526cad2-784f-4328-b401-553161cc39e8, 3, Finished, Available, Finished)

In [2]:
#Aggregation produces agg_df with cnt, errors, error_rate, avg_latency
from pyspark.sql import functions as F
from pyspark.sql import types as T


raw_df = spark.table(RAW_TABLE)

cols = set(raw_df.columns)
print("Columns in raw table:", cols)

#ensure event_ts is timestamp type
if "event_ts" in cols:
    raw_df = raw_df.withColumn("event_ts", F.to_timestamp("event_ts"))
else:
    #if there is another time column you can map it here; else use ingest_ts as proxy
    if "ingest_ts" in cols:
        raw_df = raw_df.withColumn("event_ts", F.to_timestamp("ingest_ts"))
    else:
        # no timestamps cannot time-aggregate meaningfully create synthetic event_ts = now
        raw_df = raw_df.withColumn("event_ts", F.current_timestamp())

#ensure ingest_ts exists
if "ingest_ts" not in cols:
    raw_df = raw_df.withColumn("ingest_ts", F.current_timestamp())

#derive latency_ms (ingest_ts - event_ts)
raw_df = raw_df.withColumn(
    "latency_ms",
    (F.col("ingest_ts").cast("long") - F.col("event_ts").cast("long")) * F.lit(1000)
)

#derive error flag with heuristics

error_expr = None

if "error" in cols:
    # if 'error' exists 
    error_expr = F.when(F.col("error").isin([True, "true", "True", 1, "1"]), F.lit(1)).otherwise(F.lit(0))
elif {"status_code", "http_status", "status"}.intersection(cols):
    # check several common names
    # normalize statuses to int if possible
    status_col = None
    for c in ["status_code", "http_status", "status"]:
        if c in cols:
            status_col = c
            break
    # cast to int then check >=400
    error_expr = F.when(F.col(status_col).cast("int") >= 400, F.lit(1)).otherwise(F.lit(0))
elif "payload" in cols:
    # payload often a large JSON string â€” do a contains check for common keywords
    payload = F.lower(F.coalesce(F.col("payload").cast("string"), F.lit("")))
    error_expr = F.when(
        (payload.contains("error")) | (payload.contains("exception")) | (payload.contains("failed")) | (payload.contains("fail")),
        F.lit(1)
    ).otherwise(F.lit(0))
else:
    # no way to determine errors assume zero (safe default)
    error_expr = F.lit(0)

raw_df = raw_df.withColumn("error_flag", error_expr.cast("int"))

# Filter lookback window and aggregate into hourly (or minute) bins
agg_df = raw_df \
    .filter(F.col("event_ts") >= F.current_timestamp() - F.expr(f"INTERVAL {LOOKBACK_DAYS} DAYS")) \
    .withColumn("hour", F.date_trunc("hour", F.col("event_ts"))) \
    .groupBy("hour", "source") \
    .agg(
        F.count(F.lit(1)).alias("cnt"),
        F.sum("error_flag").alias("errors"),
        F.avg(F.col("latency_ms")).alias("avg_latency")
    ) \
    .withColumn("error_rate", F.col("errors") / F.col("cnt"))

# Turn null avg_latency into null or 0 as you prefer; keep null to indicate missing
agg_df = agg_df.select("hour","source","cnt","errors","error_rate","avg_latency")
agg_df.write.format("delta").mode("overwrite").saveAsTable(METRICS_TABLE)
display(agg_df.orderBy(F.col("hour").desc()).limit(50))


StatementMeta(, a526cad2-784f-4328-b401-553161cc39e8, 4, Finished, Available, Finished)

Columns in raw table: {'ingest_ts', 'event_type', 'event_ts', 'payload', 'source', 'actor', 'temp_C', 'text_description', 'station', 'repo', 'wind_speed_mps'}


SynapseWidget(Synapse.DataFrame, 135ca534-bb40-4437-8ced-5107dd51c99b)

In [3]:
#create incidents table (managed Delta table) if not exists
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {INCIDENTS_TABLE} (
  incident_id STRING,
  detected_ts TIMESTAMP,
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  source STRING,
  metric STRING,
  value DOUBLE,
  baseline DOUBLE,
  stddev DOUBLE,
  zscore DOUBLE,
  threshold DOUBLE,
  status STRING,
  evidence_path STRING
)
USING DELTA
""")


StatementMeta(, a526cad2-784f-4328-b401-553161cc39e8, 5, Finished, Available, Finished)

DataFrame[]

In [4]:
# compute rolling mean/std and zscore on 'cnt' (count)
N = 7  # window size in rows (hours) for rolling baseline
win = Window.partitionBy("source").orderBy("hour").rowsBetween(-N, 0)

metrics = agg_df \
  .withColumn("mean_cnt", F.avg("cnt").over(win)) \
  .withColumn("std_cnt", F.stddev_samp("cnt").over(win)) \
  .withColumn("zscore_cnt", (F.col("cnt") - F.col("mean_cnt")) / F.col("std_cnt")) \
  .withColumn("mean_err_rate", F.avg("error_rate").over(win)) \
  .withColumn("std_err_rate", F.stddev_samp("error_rate").over(win)) \
  .withColumn("zscore_err", (F.col("error_rate") - F.col("mean_err_rate")) / F.col("std_err_rate"))

# handle divide-by-zero / null std by replacing with 0 (or null) so zscore logic works
metrics = metrics.fillna({"zscore_cnt": 0.0, "zscore_err": 0.0})
display(metrics.orderBy(F.col("hour").desc()).limit(50))


StatementMeta(, a526cad2-784f-4328-b401-553161cc39e8, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 317275ed-54fd-4e6b-aa10-d36da75b0209)

In [7]:
# detect anomalies threshold
Z_THRESH = 0.5

anoms = metrics.filter((F.col("zscore_cnt") > Z_THRESH) | (F.col("zscore_err") > Z_THRESH)) \
    .select(
        F.concat_ws("-", F.col("source"), F.col("hour").cast("string"), F.monotonically_increasing_id()).alias("incident_id"),
        F.current_timestamp().alias("detected_ts"),
        F.col("hour").alias("window_start"),
        (F.col("hour") + F.expr("interval 1 hour")).alias("window_end"),
        F.col("source"),
        F.when(F.col("zscore_cnt") > F.col("zscore_err"), F.lit("count")).otherwise(F.lit("error_rate")).alias("metric"),
        F.when(F.col("zscore_cnt") > F.col("zscore_err"), F.col("cnt")).otherwise(F.col("error_rate")).cast("double").alias("value"),
        F.when(F.col("zscore_cnt") > F.col("zscore_err"), F.col("mean_cnt")).otherwise(F.col("mean_err_rate")).cast("double").alias("baseline"),
        F.when(F.col("zscore_cnt") > F.col("zscore_err"), F.col("std_cnt")).otherwise(F.col("std_err_rate")).cast("double").alias("stddev"),
        F.when(F.col("zscore_cnt") > F.col("zscore_err"), F.col("zscore_cnt")).otherwise(F.col("zscore_err")).alias("zscore"),
        F.lit(float(Z_THRESH)).alias("threshold"),
        F.lit("open").alias("status"),
        F.lit("").alias("evidence_path")
    )

# If there are no anomalies this DataFrame will be empty
display(anoms)

# Append anomalies into monitoring_incidents table
if anoms.rdd.isEmpty():
    print("No anomalies detected")
else:
    anoms.write.format("delta").mode("append").saveAsTable(INCIDENTS_TABLE)
    print("Inserted", anoms.count(), "incident(s) into", INCIDENTS_TABLE)


StatementMeta(, a526cad2-784f-4328-b401-553161cc39e8, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b047374e-d57c-43a2-babf-06d94fdea35f)

Inserted 4 incident(s) into monitoring_incidents


In [8]:
# read back incidents
inc = spark.sql(f"SELECT * FROM {INCIDENTS_TABLE} ORDER BY detected_ts DESC LIMIT 50")
display(inc)


StatementMeta(, a526cad2-784f-4328-b401-553161cc39e8, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 771d8bab-6c6e-41ad-9601-8f61261ffb49)