# Case Events from Judge Favorable Dispositions
    Builds “Judge×Charge Event Playbooks.” It sticks to your CSV schemas and outputs three artifacts:
        1. rates_by_judge_charge – favorable rates by Judge_Name × current_offense_description
        2. tokens_by_judge_charge_topK – top event tokens (lift vs. each judge×charge baseline) within 180 days pre-Disposition_Date
        3. bigrams_by_judge_charge_topK – top event bigrams (likely orders of actions) in that same window

In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
from pyspark.sql import Window as W
from delta import *

# Load settings from a local .env file; override=True lets values in .env
# replace variables that are already set in the process environment.
load_dotenv(override=True)

# Point the Hadoop client libraries at a local Hadoop install (Windows path).
os.environ["HADOOP_HOME"] = r"C:\hadoop\hadoop-3.4.0"
os.environ["hadoop.home.dir"] = r"C:\hadoop\hadoop-3.4.0"

# Azure Data Lake credentials and local Delta database names, all read from
# the environment (each is None when the variable is not set).
blobacct = os.getenv("AZURE_DATALAKE_ACCOUNT_NAME")
accesskey = os.getenv("AZURE_DATALAKE_ACCOUNT_KEY")

bronze_db = os.getenv("LOCAL_DELTA_BRONZE_DB")
silver_db = os.getenv("LOCAL_DELTA_SILVER_DB")

# Maven artifacts needed in addition to Delta Lake itself.
# configure_spark_with_delta_pip() sets "spark.jars.packages" on the builder,
# which replaces any value configured beforehand, so extra artifacts have to
# be handed to it through `extra_packages` or they are never loaded.
# The Delta artifact is added by that helper to match the installed
# delta-spark package.
azure_packages = [
    "org.apache.hadoop:hadoop-azure:3.4.0",
    "org.apache.hadoop:hadoop-common:3.4.0",
]

clerkSession = (
    SparkSession.builder.appName("Clerk_Favorable_Outcome_Events_Analysis_Bronze")
    .master("local[*]")
    # Enable Delta Lake SQL extensions and the Delta catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.azurebfs.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
    # Storage-account key used to authenticate against the Data Lake endpoint.
    .config(f"fs.azure.account.key.{blobacct}.dfs.core.windows.net", accesskey)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
)

sparkClerk = configure_spark_with_delta_pip(
    clerkSession, extra_packages=azure_packages
).getOrCreate()

### Load the Delta Tables into Spark Dataframes
    Set up configuration variables

In [16]:
import pymodules.CaseDeltaLake as cdl

# --- Analysis parameters -----------------------------------------------------
WINDOW_DAYS = 180        # length (days) of the pre-disposition event window
TOP_K_PER_GROUP = 15     # patterns kept per (Judge, Charge) group after ranking
MIN_GROUP_CASES = 10     # stability filter: min total cases per (Judge, Charge)
MIN_PATTERN_CASES = 3    # stability filter: token/bigram must appear in ≥ this many cases


def _load_delta_table(table_name):
    """Return the result of CaseDeltaLake.delta_table_schema for *table_name*.

    A fresh CaseDeltaLake instance is created for every call and the shared
    `sparkClerk` session is passed through.
    """
    return cdl.CaseDeltaLake().delta_table_schema(table_name, sparkClerk)


# --- Source tables -----------------------------------------------------------
case_df = _load_delta_table("cases")
charge_df = _load_delta_table("charges")
disposition_df = _load_delta_table("dispositions")
event_df = _load_delta_table("events")

# Print each schema as a quick sanity check of the loaded tables.
for _frame in (event_df, case_df, charge_df, disposition_df):
    _frame.printSchema()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|         Case_Number|   string|   NULL|
|State_Reporting_N...|   string|   NULL|
|             Caption|   string|   NULL|
|           Case_Type|   string|   NULL|
|      Case_Type_Code|   string|   NULL|
|          Court_Type|   string|   NULL|
|     Court_Type_Code|   string|   NULL|
|          Filed_Date|     date|   NULL|
|        Created_Date|     date|   NULL|
|    Disposition_Code|   string|   NULL|
|    Disposition_Date|     date|   NULL|
|  Disposition_Status|   string|   NULL|
|      Court_Location|   string|   NULL|
|     Magistrate_Name|   string|   NULL|
|          Judge_Name|   string|   NULL|
|                BCCN|   string|   NULL|
+--------------------+---------+-------+

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|        Offense_Date|     date|   NULL|
|       Charge_

### NORMALIZATIONS & FLAGS
    charges: felony flag from current_offense_degree

In [3]:
# charges: fail fast when a column used by the rest of the notebook is absent.
charge_required = {"Case_Number","Charge_Number","Current_Offense_Degree","Current_Offense_Description"}
charge_cols = set(charge_df.columns)
if not charge_cols >= charge_required:
    missing = charge_required - charge_cols
    raise ValueError(f"Delta Table for charge_df missing required columns: {missing}")

# A charge is flagged as a felony when its upper-cased degree text contains
# "(F". The flag is NULL when Current_Offense_Degree is NULL.
degree_upper = F.upper(F.col("Current_Offense_Degree"))
charge_df = charge_df.withColumn("is_felony", degree_upper.contains("(F"))

# dispositions: favorable flag from disposition_summary
disposition_required = {"Case_Number","Charge_Number","Disposition_Summary"}
disposition_cols = set(disposition_df.columns)
if not disposition_cols >= disposition_required:
    missing = disposition_required - disposition_cols
    raise ValueError(f"Delta Table for disposition_df missing required columns: {missing}")

# Any of these phrases in the upper-cased summary marks the disposition as
# favorable. Both names are kept bound to the same pattern.
v_regex = "ADJUDICATION WITHHELD|WITHHELD|NOT GUILTY|ACQUITTED|DISMISSED|NOLLE PROSEQUI|NOL PROS"
fav_regex = v_regex
disposition_df = (
    disposition_df
    .withColumn("disposition_upper", F.upper(F.col("Disposition_Summary")))
    .withColumn("is_favorable", F.col("disposition_upper").rlike(v_regex))
)

# cases: must have Case_Number, Judge_Name, Disposition_Date
case_required = {"Case_Number","Judge_Name","Disposition_Date"}
case_cols = set(case_df.columns)
if not case_cols >= case_required:
    missing = case_required - case_cols
    raise ValueError(f"Delta Table for case_df missing required columns: {missing}")

### BUILD CHARGE-LEVEL DATASET (join charges + dispositions + cases)

In [4]:
# Outcome flag per charge, keyed by (Case_Number, Charge_Number).
charge_outcomes = disposition_df.select("Case_Number", "Charge_Number", "is_favorable")

# Judge and disposition date per case.
case_attrs = case_df.select("Case_Number", "Judge_Name", "Disposition_Date")

# Left joins keep every charge, even when it has no disposition or case row.
# The second join uses a column expression, so the result carries the
# Case_Number column from both sides.
cd = (
    charge_df
    .join(charge_outcomes, on=["Case_Number", "Charge_Number"], how="left")
    .join(case_attrs, on=charge_df.Case_Number == case_df.Case_Number, how="left")
)

# Focus on felony charges only (rows with a NULL flag are excluded as well).
felony = cd.where(F.col("is_felony") == True)

### FAVORABLE RATES BY JUDGE × CHARGE DESCRIPTION

In [5]:
# Grouping keys: one output row per judge × offense description × statute.
rate_keys = ["Judge_Name", "Current_Offense_Description", "Current_Offense_Statute"]

# 1 for a favorable charge, 0 otherwise (a NULL flag counts as 0).
favorable_flag = F.when(F.col("is_favorable") == True, 1).otherwise(0)

charge_totals = felony.groupBy(*rate_keys).agg(
    F.count(F.lit(1)).alias("total_charges"),
    F.sum(favorable_flag).alias("favorable"),
)

# Share of felony charge rows in the group that ended favorably.
rates_by_judge_charge = charge_totals.withColumn(
    "favorable_rate", F.col("favorable") / F.col("total_charges")
)

### 180-DAY PRE-DISPOSITION EVENT WINDOW & TOKENIZATION
    Token priority: EventDocuments, then Description (AdditionalText is not used as a fallback by the code below)

In [None]:
from pyspark.sql.functions import col


# Validate before touching any column so a malformed table fails with a clear
# message instead of part-way through the transformations.
if not {"Case_Number","EventDate"}.issubset(set(event_df.columns)):
    raise ValueError("event_df.csv must have 'Case_Number' and parsable 'EventDate'")

# Clean EventDocuments: remove pipe characters and the "eFile:" marker.
event_df = event_df.withColumn("EventDocuments", F.regexp_replace(F.col("EventDocuments"), r"\|", ""))
event_df = event_df.withColumn("EventDocuments", F.regexp_replace(F.col("EventDocuments"), "eFile:", ""))

# Token source priority: EventDocuments first, Description as the fallback.
# Emptiness is tested on the trimmed text so that a value consisting only of
# spaces falls through to Description instead of producing an empty token.
# NOTE: AdditionalText is not consulted here.
docs_text = F.trim(F.col("EventDocuments"))
desc_text = F.trim(F.col("Description"))
pref = F.when(F.length(docs_text) > 0, docs_text) \
        .otherwise(F.when(F.length(desc_text) > 0, desc_text))

# Upper-case the chosen text; rows with no usable text (NULL) or with a
# single-character token are dropped.
events_tok = (
    event_df
    .withColumn("token", F.upper(pref))
    .filter(F.length("token") > 1)
)

# Join judge + Disposition_Date onto events
evj = (
    events_tok
    .join(case_df
          .select("Case_Number","Judge_Name","Disposition_Date"),
          events_tok.Case_Number == case_df.Case_Number,
          "left")
)

# The expression join keeps both Case_Number columns; drop the case-side one.
evj = evj.drop(case_df.Case_Number)

# Filter to the window: EventDate ∈ [Disposition_Date - WINDOW_DAYS, Disposition_Date].
# Events without a date, or whose case has no disposition date, are excluded.
ev_win = (
    evj.select("Case_Number", "Judge_Name", "token", "EventDate", "Disposition_Date")
    .filter((F.col("Disposition_Date").isNotNull()) & (F.col("EventDate").isNotNull()))
    .filter(F.col("EventDate") <= F.col("Disposition_Date"))
    .filter(F.col("EventDate") >= F.col("Disposition_Date") - F.expr(f"INTERVAL {WINDOW_DAYS} DAYS"))
)

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|      Description|   string|   NULL|
|      EventAmount|   string|   NULL|
|   AdditionalText|   string|   NULL|
|        Date2Desc|   string|   NULL|
|            Date2|   string|   NULL|
|        PartyList|   string|   NULL|
|       Party2List|   string|   NULL|
|        CauseList|   string|   NULL|
|EventDocumentList|   string|   NULL|
|      Case_Number|   string|   NULL|
|   EventDocuments|   string|   NULL|
|        EventDate|     date|   NULL|
+-----------------+---------+-------+



### CASE-LEVEL OUTCOMES BY (case, Judge, charge_description)

In [12]:

# Remove the case-side Case_Number so the name is unambiguous from here on.
felony = felony.drop(case_df.Case_Number)

# CASE-LEVEL OUTCOMES BY (case, Judge, charge_description)
# A (case, judge, offense, statute) combination is favorable when at least
# one of its charge rows is favorable; it is NULL when no row has a flag.
outcome_keys = ["Case_Number", "Judge_Name", "Current_Offense_Description", "Current_Offense_Statute"]
any_favorable = F.max(F.col("is_favorable").cast("int"))

case_charge_outcomes = (
    felony.select(*outcome_keys, "is_favorable")
          .groupBy(*outcome_keys)
          .agg(any_favorable.alias("is_fav_int"))
          .withColumn("is_favorable", F.col("is_fav_int") == 1)
          .drop("is_fav_int")
)

# Baselines: per (Judge, charge_description, statute)
# p_fav is the share of the group's cases with a favorable outcome.
baseline_keys = ["Judge_Name", "Current_Offense_Description", "Current_Offense_Statute"]
favorable_case = F.when(F.col("is_favorable") == True, 1).otherwise(0)

group_case_counts = case_charge_outcomes.groupBy(*baseline_keys).agg(
    F.countDistinct("Case_Number").alias("total_cases"),
    F.sum(favorable_case).alias("fav_cases"),
)
base = group_case_counts.withColumn("p_fav", F.col("fav_cases") / F.col("total_cases"))


### TOKEN LIFT by (Judge, charge_description)

In [13]:
from pyspark.sql import functions as F, Window as W

# 1) Unique (Case, token) within the event window: a token counts at most
#    once per case, however often it occurs.
case_token = ev_win.select("Case_Number", "token").dropDuplicates()

# 2) Attach case-level attributes and outcomes. The left join keeps tokens
#    from cases that have no row in case_charge_outcomes (group columns NULL).
ct_jc = (
    case_token
    .join(case_charge_outcomes, "Case_Number", "left")
    .select(
        "Case_Number",
        "Judge_Name",
        "Current_Offense_Description",
        "Current_Offense_Statute",
        "token",
        "is_favorable"
    )
    .dropDuplicates([
        "Case_Number",
        "Judge_Name",
        "Current_Offense_Description",
        "Current_Offense_Statute",
        "token"
    ])
)

# 3) Counts per (Judge, Offense, Statute, token)
#    cases_with_pattern      – distinct cases in which the token appears
#    fav_cases_with_pattern  – how many of those cases are favorable
tok_counts = (
    ct_jc.groupBy("Judge_Name", "Current_Offense_Description", "Current_Offense_Statute", "token")
         .agg(
             F.countDistinct("Case_Number").alias("cases_with_pattern"),
             F.coalesce(F.sum(F.col("is_favorable").cast("int")), F.lit(0)).alias("fav_cases_with_pattern")
         )
)

# 4) Join baselines and compute conditional p and lift (guarded)
# Expect `base` to have: Judge_Name, Current_Offense_Description, Current_Offense_Statute, total_cases, p_fav
# lift = P(favorable | token) / P(favorable) within the same group; it is NULL
# when the group baseline is missing or zero.
tok_stats = (
    tok_counts
    .join(base, ["Judge_Name","Current_Offense_Description","Current_Offense_Statute"], "left")
    .withColumn(
        "p_fav_given_pattern",
        F.when(F.col("cases_with_pattern") > 0,
               F.col("fav_cases_with_pattern").cast("double") / F.col("cases_with_pattern").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
    .withColumn(
        "lift",
        F.when((F.col("p_fav").isNotNull()) & (F.col("p_fav") > F.lit(0)),
               F.col("p_fav_given_pattern") / F.col("p_fav").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
)

# 5) Stability filters: drop small groups and rarely seen tokens. Rows whose
#    group found no baseline have a NULL total_cases and are removed here too.
stable_tok = tok_stats.filter(
    (F.col("total_cases") >= F.lit(MIN_GROUP_CASES)) &
    (F.col("cases_with_pattern") >= F.lit(MIN_PATTERN_CASES))
)

# 6) Rank within each (Judge, Offense, Statute)
#    Ties on every metric are common (e.g. several tokens with identical
#    counts). `token` is the final tie-breaker so that row_number() assigns
#    the same ranks, and the same top-K, on every run.
w_tok = W.partitionBy("Judge_Name","Current_Offense_Description","Current_Offense_Statute") \
         .orderBy(
             F.col("lift").desc_nulls_last(),
             F.col("p_fav_given_pattern").desc_nulls_last(),
             F.col("fav_cases_with_pattern").desc(),
             F.col("cases_with_pattern").desc(),
             F.col("token").asc()
         )

ranked_tok = (
    stable_tok
    .withColumn("rank", F.row_number().over(w_tok))
    .filter(F.col("rank") <= F.lit(TOP_K_PER_GROUP))
)


### BIGRAM LIFT by (Judge, charge_description)
    For bigrams, we need per‑case, time‑ordered tokens within the window joined to charge groups

In [14]:
from pyspark.sql import functions as F, Window as W

# Aliases
ev = ev_win.alias("ev")
cco = case_charge_outcomes.select(
    "Case_Number",
    "Judge_Name",
    "Current_Offense_Description",
    "Current_Offense_Statute",
    "is_favorable"
).alias("cco")

# 1) Join with explicit column projection to avoid collisions.
#    Judge_Name is taken from the outcomes side, not from the event window.
ev_win_jc = (
    ev.join(cco, on="Case_Number", how="left")
      .select(
          "ev.Case_Number",
          "ev.token",
          "ev.EventDate",
          "cco.Judge_Name",
          "cco.Current_Offense_Description",
          "cco.Current_Offense_Statute",
          "cco.is_favorable"
      )
)

# Cast the ordering column to timestamp.
ev_win_jc = (
    ev_win_jc
    .withColumn("EventDate", F.col("EventDate").cast("timestamp"))
)

# 2) Build bigrams with stable ordering per (case, judge, charge).
#    EventDate only has day granularity, so events filed on the same day tie.
#    `token` is used as a secondary sort key so that lead() pairs the same
#    tokens on every run. Same-day order is therefore alphabetical rather than
#    chronological; replace "token" with a real sequence column (event
#    timestamp or id) if one becomes available.
order_cols = ["EventDate", "token"]
w = W.partitionBy(
        "Case_Number","Judge_Name","Current_Offense_Description","Current_Offense_Statute"
    ).orderBy(*order_cols)

ev_with_next = ev_win_jc.withColumn("next_token", F.lead("token").over(w))

bigrams = (
    ev_with_next
    # The last event in each partition has no successor and forms no bigram.
    .filter(F.col("next_token").isNotNull())
    .withColumn("bigram", F.concat_ws(" | ", F.col("token"), F.col("next_token")))
    .select(
        "Case_Number",
        "Judge_Name",
        "Current_Offense_Description",
        "Current_Offense_Statute",
        "bigram",
        "is_favorable"
    )
    # Presence of bigram in a case (avoid multiple counts per case)
    .dropDuplicates([
        "Case_Number",
        "Judge_Name",
        "Current_Offense_Description",
        "Current_Offense_Statute",
        "bigram"
    ])
)

# 3) Aggregate counts per group
#    cases_with_pattern      – distinct cases containing the bigram
#    fav_cases_with_pattern  – how many of those cases are favorable
bg_counts = (
    bigrams.groupBy("Judge_Name","Current_Offense_Description","Current_Offense_Statute","bigram")
           .agg(
               F.countDistinct("Case_Number").alias("cases_with_pattern"),
               F.coalesce(F.sum(F.col("is_favorable").cast("int")), F.lit(0)).alias("fav_cases_with_pattern")
           )
)

# 4) Join baseline and compute probabilities + lift (null/zero-safe)
# base must contain: Judge_Name, Current_Offense_Description, Current_Offense_Statute, total_cases, p_fav
# lift = P(favorable | bigram) / P(favorable) within the same group; it is
# NULL when the group baseline is missing or zero.
bg_stats = (
    bg_counts
    .join(base, ["Judge_Name","Current_Offense_Description","Current_Offense_Statute"], "left")
    .withColumn(
        "p_fav_given_pattern",
        F.when(F.col("cases_with_pattern") > 0,
               F.col("fav_cases_with_pattern").cast("double") /
               F.col("cases_with_pattern").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
    .withColumn(
        "lift",
        F.when((F.col("p_fav").isNotNull()) & (F.col("p_fav") > F.lit(0)),
               F.col("p_fav_given_pattern") / F.col("p_fav").cast("double"))
         .otherwise(F.lit(None).cast("double"))
    )
)

# 5) Stability thresholds: drop small groups and rarely seen bigrams. Rows
#    whose group found no baseline have a NULL total_cases and are removed too.
stable_bg = bg_stats.filter(
    (F.col("total_cases") >= F.lit(MIN_GROUP_CASES)) &
    (F.col("cases_with_pattern") >= F.lit(MIN_PATTERN_CASES))
)

# 6) Rank top-K per (Judge, Offense, Statute)
#    `bigram` is the final tie-breaker so that row_number() yields the same
#    ranks, and the same top-K, on every run even when the metrics tie.
w_bg = W.partitionBy("Judge_Name","Current_Offense_Description","Current_Offense_Statute").orderBy(
    F.col("lift").desc_nulls_last(),
    F.col("p_fav_given_pattern").desc_nulls_last(),
    F.col("fav_cases_with_pattern").desc(),
    F.col("cases_with_pattern").desc(),
    F.col("bigram").asc()
)

ranked_bg = (
    stable_bg
    .withColumn("rank", F.row_number().over(w_bg))
    .filter(F.col("rank") <= F.lit(TOP_K_PER_GROUP))
)



### Write Event Tokens by Judge and Charge by Favorable Cases - to Azure Delta Lake

In [None]:
import pymodules.CaseDeltaLake as cdl

# Persist both ranked result sets through CaseDeltaLake.write_delta_bronze,
# one target path per DataFrame. A new CaseDeltaLake instance is created for
# each write and the shared `sparkClerk` session is passed along.
for _ranked_frame, _target_path in (
    (ranked_tok, "_delta_views/vw_EventsRankedByToken"),
    (ranked_bg, "_delta_views/vw_EventsRankedByBigram"),
):
    cdl.CaseDeltaLake().write_delta_bronze(_ranked_frame, _target_path, sparkClerk)

DataFrame name: _delta_views/vw_EventsRankedByToken
DataFrame name: _delta_views/vw_EventsRankedByBigram
