# Case-Level MDJ + PSP Dataset with Pretrial Recidivism Labels

## Overview
This notebook builds a **final case-level analytical dataset** by:

1. Starting from a **charge-level MDJ + PSP merged dataset**
2. Cleaning duplicated and overlapping MDJ / PSP fields
3. Aggregating charge-level records to the **case level**
4. Joining **pretrial recidivism labels** created in the previous notebook

The result is a **single row per MDJ case**, suitable for modeling
and downstream analysis.

## Inputs
- `mdj_merged.csv`: Charge-level MDJ + PSP merged data
- `csv data/labels_mdj_cpmc.csv`: Case-level pretrial recidivism labels (produced by the previous notebook)

## Output
- `mdj_case_with_labels.csv`: Final modeling-ready case-level dataset

## Notes
- This notebook resolves duplicated MDJ/PSP fields by retaining
  the version with fewer missing values
- All joins are performed on standardized `id` and `docketnumber`

In [1]:
import pandas as pd
import numpy as np

In [2]:
# File locations, relative to the notebook's working directory.
MDJ_PSP_PATH   = "mdj_merged.csv"                 # charge-level MDJ + PSP merged data
LABELS_PATH    = "csv data/labels_mdj_cpmc.csv"   # case-level pretrial recidivism labels
# Final case-level dataset. This matches the file name documented in the
# notebook header and the name written by the save step at the end.
OUT_PATH       = "mdj_case_with_labels.csv"

In [3]:
# Load the charge-level MDJ + PSP table.
# low_memory=False makes pandas infer each column's dtype from the whole file
# instead of chunk by chunk, so a column cannot end up holding a mix of
# numbers and strings. NOTE(review): the recorded run emitted a warning on this
# line, presumably pandas' mixed-type DtypeWarning -- confirm it disappears.
mdj = pd.read_csv(MDJ_PSP_PATH, low_memory=False)

print("mdj (MDJ&PSP) shape:", mdj.shape)

  mdj = pd.read_csv(MDJ_PSP_PATH)


mdj (MDJ&PSP) shape: (15995064, 71)


In [4]:
# Load the case-level pretrial recidivism labels and report their dimensions.
recid = pd.read_csv(LABELS_PATH)
labels_shape = recid.shape
print("labels shape:", labels_shape)

labels shape: (452265, 10)


In [5]:
# Bring the join keys of both tables to one canonical text form
# (string, trimmed, upper-case) so they compare equal across sources.
# Note: astype(str) also turns missing keys into text, so after upper-casing
# they become the literal string "NAN".
for frame in (mdj, recid):
    keys_present = [key for key in ("id", "docketnumber") if key in frame.columns]
    for key in keys_present:
        as_text = frame[key].astype(str)
        frame[key] = as_text.str.strip().str.upper()


In [6]:
# Show every column name of the merged charge-level table.
list(mdj.columns)

['docketnumber',
 'citationcomplaintnumber',
 'otn',
 'citytownboro',
 'countyofoffense',
 'casestatus',
 'filingdate',
 'offensedate',
 'complaintdate',
 'arrestdate',
 'title',
 'section',
 'subsection',
 'sequencenumber',
 'grade_x',
 'offensedisposition',
 'offensedispositiondate',
 'defendantdisplayname',
 'dob_x',
 'gender',
 'race_x',
 'ethnicity',
 'ori_x',
 'cost',
 'costadjustment',
 'limitedaccessoffenseindicator',
 'casedisposition',
 'casecategory',
 'zipcode',
 'defenseattorneyname',
 'defenseattorneyrepresentationtyp',
 'id',
 'dobyear',
 'dobmonth',
 'dobday',
 'name',
 'sex',
 'race_y',
 'rapstatus',
 'max_year',
 'max_month',
 'max_day',
 'dv_flag',
 'dob_y',
 'dispo_status',
 'arrest_year',
 'arrest_month',
 'arrest_day',
 'county',
 'ori_y',
 'juvflag',
 'laflag',
 'arrest_date',
 'charge',
 'grade_y',
 'disp_year',
 'disp_month',
 'disp_day',
 'disp1',
 'disp2',
 'susp_flag',
 'conv_flag',
 'offense_year',
 'offense_month',
 'offense_day',
 'offense_date',
 'disp_d

In [7]:
# Drop exact duplicate column labels first. Boolean-mask selection already
# returns a new frame, so `mdj` stays untouched without an extra full copy
# of the charge-level table.
mdj_cleaned = mdj.loc[:, ~mdj.columns.duplicated()]

# MDJ / PSP fields that describe the same thing. For each pair keep the member
# with the smaller share of missing values; on a tie the second member is kept.
dup_pairs = [
    ("race_x","race_y"), ("dob_x","dob_y"), ("grade_x","grade_y"),
    ("arrestdate","arrest_date"), ("ori_x","ori_y"),
    ("gender","sex"), ("ofn_title","title"), ("ofn_section","section"),
    ("ofn_subsection","subsection")
]

# Decide every pair first, then drop once: each drop rebuilds the whole frame,
# and the pairs share no columns, so one combined drop gives the same result.
cols_to_drop = []
for a, b in dup_pairs:
    if a in mdj_cleaned.columns and b in mdj_cleaned.columns:
        miss_a = mdj_cleaned[a].isna().mean()
        miss_b = mdj_cleaned[b].isna().mean()
        cols_to_drop.append(a if miss_a >= miss_b else b)

mdj_cleaned = mdj_cleaned.drop(columns=cols_to_drop)

# The aggregation step refers to specific survivors by name, so make the
# outcome of this data-driven choice visible.
print("Dropped overlapping columns:", cols_to_drop)

In [8]:
# Columns that hold dates in the charge-level table.
date_cols = [
    "filingdate",
    "offensedate",
    "complaintdate",
    "offensedispositiondate",
    "disp_date",
    "arrest_date"
]

# Parse whichever of those columns survived the cleaning step into datetimes;
# values that cannot be parsed become NaT instead of raising.
present_date_cols = [name for name in date_cols if name in mdj_cleaned.columns]
for name in present_date_cols:
    parsed = pd.to_datetime(mdj_cleaned[name], errors="coerce")
    mdj_cleaned[name] = parsed

  mdj_cleaned[col] = pd.to_datetime(
  mdj_cleaned[col] = pd.to_datetime(


In [9]:
# ============================================================
# Helper functions for safe aggregation
# ============================================================

def safe_first(x):
    """Return the first element of ``x`` that is not null, or NaN if there is none."""
    for candidate in x:
        if pd.notna(candidate):
            return candidate
    return np.nan


def safe_flag_any(x):
    """
    Return 1 if any value in ``x`` is a non-zero number, else 0.

    Values are coerced with ``pd.to_numeric``; anything that is missing or
    not numeric counts as 0. The check is "not equal to zero" rather than an
    integer cast, so fractional values such as 0.5 are not truncated to 0.
    """
    numeric = pd.to_numeric(pd.Series(x), errors="coerce")
    return int((numeric.fillna(0) != 0).any())


def safe_min_dt(x):
    """Parse ``x`` as datetimes (unparseable values become NaT) and return the earliest."""
    parsed = pd.to_datetime(x, errors="coerce")
    earliest = parsed.min()
    return earliest


def safe_max_dt(x):
    """Parse ``x`` as datetimes (unparseable values become NaT) and return the latest."""
    parsed = pd.to_datetime(x, errors="coerce")
    latest = parsed.max()
    return latest


def join_unique_preserve_order(x, sep=", ", max_items=None):
    """
    Concatenate the distinct non-null values of ``x`` into one string.

    Values are converted to text and stripped of surrounding whitespace;
    blanks are discarded and each distinct value is kept once, in the order
    it first appears. If ``max_items`` is given, only that many leading
    distinct values are joined. Returns NaN when nothing remains.
    """
    as_text = pd.Series(x).dropna().astype(str).str.strip()
    tokens = [token for token in as_text.tolist() if token != ""]

    if not tokens:
        return np.nan

    distinct = []
    already_seen = set()
    for token in tokens:
        if token in already_seen:
            continue
        already_seen.add(token)
        distinct.append(token)

    if max_items is not None:
        distinct = distinct[:max_items]

    return sep.join(distinct)


# ============================================================
# Define aggregation rules
# ============================================================

# Column -> reducer applied to all charge rows of one case.
agg_map = {
    # Timeline fields
    "filingdate": safe_min_dt,
    "offensedate": safe_min_dt,
    "complaintdate": safe_min_dt,
    "offensedispositiondate": safe_max_dt,
    "disp_date": safe_max_dt,
    "arrest_date": safe_min_dt,

    # Case metadata
    "casestatus": safe_first,
    "casecategory": safe_first,
    "casedisposition": safe_first,
    "countyofoffense": safe_first,
    "county": safe_first,

    # Defendant / PSP attributes
    "defendantdisplayname": safe_first,
    "name": safe_first,
    "sex": safe_first,
    "race_y": safe_first,
    "ethnicity": safe_first,
    "dob_y": safe_first,

    # Flags
    "dv_flag": safe_flag_any,
    "juvflag": safe_flag_any,
    "laflag": safe_flag_any,
    "conv_flag": safe_flag_any,
    "susp_flag": safe_flag_any,

    # Monetary fields
    "cost": safe_first,
    "costadjustment": safe_first,
}

# The duplicate-field cleanup keeps whichever member of each MDJ/PSP pair has
# fewer missing values, so a column named above (e.g. "race_y", "sex") may not
# exist. Aggregating a missing column raises a KeyError, so keep only the rules
# whose column is present and report the ones that were skipped.
skipped_agg_cols = [col for col in agg_map if col not in mdj_cleaned.columns]
if skipped_agg_cols:
    print("Skipping aggregation rules for absent columns:", skipped_agg_cols)
agg_map = {
    col: reducer
    for col, reducer in agg_map.items()
    if col in mdj_cleaned.columns
}


# ============================================================
# Columns to join fully
# ============================================================

# Charge / statute fields: keep every distinct value per case as one string.
join_cols = [
    "charge",
    "title",
    "section",
    "subsection",
    "grade_x",
    "grade",
    "citation",
    "citationcomplaintnumber",
    "ofn_title",
    "ofn_section",
    "ofn_subsection",
]

for col in join_cols:
    if col in mdj_cleaned.columns:
        agg_map[col] = join_unique_preserve_order


# ============================================================
# Collapse charge-level data to case level
# ============================================================

# One output row per (id, docketnumber): apply each column's reducer from
# agg_map, then order the cases by filing date and renumber the index.
case_keys = ["id", "docketnumber"]
mdj_case_level = mdj_cleaned.groupby(case_keys, as_index=False).agg(agg_map)
mdj_case_level = mdj_case_level.sort_values("filingdate", ascending=True)
mdj_case_level = mdj_case_level.reset_index(drop=True)

n_cases = len(mdj_case_level)
print(f"Collapsed to {n_cases:,} unique cases with joined charge/statute fields.")


# ============================================================
# Safe preview
# ============================================================

# Columns worth eyeballing; any that did not make it into the case-level
# table are left out so the preview never fails on a missing column.
wanted_preview = [
    "id",
    "docketnumber",
    "charge",
    "title",
    "section",
    "subsection",
    "grade_x",
    "citation",
]

preview_cols = [name for name in wanted_preview if name in mdj_case_level.columns]
mdj_case_level.loc[:, preview_cols].head()

Collapsed to 434,235 unique cases with joined charge/statute fields.


Unnamed: 0,id,docketnumber,charge,title,section,subsection,grade_x,citation
0,99751210,MJ-57304-CR-0000001-2015,"182701A1, CC2701, CC2709, CC5503",18,"2709, 5503, 2701",A1,"S, M2","18-2701 (a)(1), 18-2701, 18-2709, 18-5503"
1,99945997,MJ-57304-CR-0000002-2015,"753802A1, 185104, VC3733A, 753733A, VC3802A1","75.0, 18.0","3309, 4581, 3736, 3802, 3323, 3714, 3361, 3334...","1, A2i, A, A1*, B","S, M, F3, M2","75-3802 (a)(1), 18-5104, 75-3733 (a)"
2,99596689,MJ-05203-CR-0000001-2015,"CC5506, CC5505, 185505",18,5506,,M3,"18-5506, 18-5505"
3,99655649,MJ-05206-CR-0000001-2015,"CC2701A1, 182701A1, 182709A1, CC2709A1",18,"2709, 2701",A1,"S, M2","18-2701 (a)(1), 18-2709 (a)(1)"
4,99871493,MJ-05003-CR-0000002-2015,"182701A1, CC2701A1",18,2701,A1,M2,18-2701 (a)(1)


In [11]:
# ============================================================
# Pretrial window duration (QA helper)
# ============================================================

def add_pretrial_durations(case_df, cap_days=1000):
    """
    Return a copy of ``case_df`` with pretrial duration columns added.

    The columns "pretrial_start", "pretrial_end" and
    "earliest_in_window_arrest" are parsed to datetimes when present
    (unparseable values become NaT). If both window bounds exist, two
    columns are added:

    - "pretrial_days": length of the pretrial window in whole days
    - "pretrial_days_capped": the same value clipped at ``cap_days``
      (for plotting / QA only; it does not affect the labels)

    The input frame is not modified.
    """
    out = case_df.copy()

    # Ensure datetime types before computing durations
    for col in ("pretrial_start", "pretrial_end", "earliest_in_window_arrest"):
        if col in out.columns:
            out[col] = pd.to_datetime(out[col], errors="coerce")

    # Compute the duration ONLY if both bounds exist
    if {"pretrial_start", "pretrial_end"}.issubset(out.columns):
        out["pretrial_days"] = (out["pretrial_end"] - out["pretrial_start"]).dt.days
        out["pretrial_days_capped"] = out["pretrial_days"].clip(upper=cap_days)

    return out


# `merged_case` is created by the merge step further down in this notebook.
# On a fresh top-to-bottom run it does not exist yet when this cell executes,
# so say so explicitly instead of skipping silently.
if "merged_case" in globals():
    merged_case = add_pretrial_durations(merged_case)
else:
    print(
        "Skipped pretrial durations: `merged_case` is not defined yet. "
        "Run this cell after the merge step."
    )


In [12]:
# ============================================================
# Standardize join keys (IDs and docket numbers)
# ============================================================

# Same canonical form as applied right after loading: text, trimmed,
# upper-case. Both key columns are required in both tables here.
for table in (mdj_case_level, recid):
    for key in ("id", "docketnumber"):
        as_text = table[key].astype(str)
        trimmed = as_text.str.strip()
        table[key] = trimmed.str.upper()


# ============================================================
# Select recidivism label columns defensively
# ============================================================

join_keys = ["id", "docketnumber"]

label_cols = [
    "id",
    "docketnumber",
    "pretrial_recidivism",
    "misdemeanor_recidivism",
    "felony_recidivism",
    "other_recidivism",
    "n_in_window_arrests",
    "earliest_in_window_arrest",
    "pretrial_start",
    "pretrial_end",
]

# The join keys and the main label are mandatory. Filtering them out silently
# would only surface later as an opaque KeyError, so fail here with a clear one.
required_cols = join_keys + ["pretrial_recidivism"]
missing_required = [c for c in required_cols if c not in recid.columns]
if missing_required:
    raise KeyError(
        f"Labels table is missing required column(s): {missing_required}"
    )

# The remaining label columns are optional: keep those that exist and report
# the ones that are absent.
absent_optional = [c for c in label_cols if c not in recid.columns]
if absent_optional:
    print("Label columns not found (skipped):", absent_optional)
label_cols = [c for c in label_cols if c in recid.columns]


# ============================================================
# Merge case-level MDJ+PSP data with recidivism labels
# ============================================================

# Left join keeps every MDJ case; cases without a label get NaN.
# validate="1:1": the left side has one row per (id, docketnumber) after the
# groupby, and the labels table must also be unique per key, so any duplicate
# on either side raises instead of silently multiplying rows.
merged_case = mdj_case_level.merge(
    recid[label_cols],
    on=join_keys,
    how="left",
    validate="1:1"
)

print(f"Merge complete: {len(merged_case):,} rows")
print(
    "Matched cases with recidivism info:",
    merged_case["pretrial_recidivism"].notna().sum()
)

Merge complete: 434,235 rows
Matched cases with recidivism info: 388837


In [13]:
# Restrict to cases that received a label, then take the mean of the
# pretrial recidivism column over those cases.
has_label = merged_case["pretrial_recidivism"].notna()
labeled = merged_case.loc[has_label]

recid_rate = labeled["pretrial_recidivism"].mean()
recid_rate

np.float64(0.2599829748712185)

In [14]:
# Write the final case-level dataset (without the row index) and confirm.
saved_name = "mdj_case_with_labels.csv"
merged_case.to_csv(saved_name, index=False)
print(f"Saved to {saved_name}")

Saved to mdj_case_with_labels.csv
