In [1]:
    %run ../notebooks/00_setup_paths.ipynb

utils/ folder added to Python import path
project_root:     C:\Users\akaas\crime-projectMain
raw_dir:          C:\Users\akaas\crime-projectMain\data
parquet_dir:      C:\Users\akaas\crime-projectMain\data_parquet
processed_dir:    C:\Users\akaas\crime-projectMain\data_processed
models_dir:       C:\Users\akaas\crime-projectMain\models
logs_dir:         C:\Users\akaas\crime-projectMain\logs
utils_dir:        C:\Users\akaas\crime-projectMain\utils


In [2]:
# Start the Spark session used by every later cell.
# spark_init is a project module (presumably found via the utils/ folder that
# the setup notebook puts on the import path — confirm).
from spark_init import init_spark

# The first argument matches the name echoed in the "Spark Initialized" message.
# driver_memory is handed to init_spark as-is; presumably it sizes the driver
# heap — confirm in spark_init.
spark = init_spark("CrimeProject_Phase2", driver_memory="12g")
# Bare expression so the notebook renders the session object as cell output.
spark

Spark Initialized: CrimeProject_Phase2


In [3]:
from pyspark.sql.functions import (
    col, collect_set, collect_list, count, sum as spark_sum
)
from pathlib import Path

In [4]:
# Map each segment name to a DataFrame read from that segment's sub-folder of
# parquet_dir. Entries that are not directories, and directories without any
# .parquet file, are skipped.
segment_dfs = {}

for segment_folder in parquet_dir.iterdir():
    if not segment_folder.is_dir():
        continue

    parquet_paths = [str(p) for p in segment_folder.glob("*.parquet")]
    if len(parquet_paths) == 0:
        continue

    print(f"Loading {segment_folder.name} ({len(parquet_paths)} files)")
    segment_dfs[segment_folder.name] = spark.read.parquet(*parquet_paths)

print("All segments loaded into segment_dfs dictionary.")

Loading administrative_segment (11 files)
Loading arrestee_segment (11 files)
Loading batch_header (1 files)
Loading group_b_arrest_report_segment (11 files)
Loading offender_segment (11 files)
Loading offense_segment (11 files)
Loading property_segment (11 files)
Loading victim_segment (11 files)
Loading window_arrestee_segment (9 files)
Loading window_exceptional_clearance_segment (9 files)
Loading window_recovered_property_segment (9 files)
All segments loaded into segment_dfs dictionary.


In [5]:
from pyspark.sql.functions import col, lit
# Load all parquet files in a folder and ensure a consistent schema across them.
def safe_load_and_cast(segment_folder):
    """Load every parquet file in a folder into one all-string DataFrame.

    Each ``*.parquet`` file directly inside ``segment_folder`` is read on its
    own, aligned to the union of the column names seen across all files
    (columns a file lacks are added as NULL), and every column is cast to
    string. The per-file frames are then combined with ``unionByName``.

    Args:
        segment_folder: ``pathlib.Path`` of the folder holding the parquet
            files.

    Returns:
        A Spark DataFrame whose columns are the alphabetically sorted union of
        all file columns, each of type string.

    Raises:
        ValueError: if the folder contains no ``*.parquet`` file.
    """
    # Sorted so the union order does not depend on directory listing order.
    files = sorted(segment_folder.glob("*.parquet"))
    if not files:
        raise ValueError(f"No parquet files found in {segment_folder}")

    # Read each file once and reuse the frame for both schema discovery and
    # normalization.
    frames = [spark.read.parquet(str(f)) for f in files]

    # Step 1: every column name that appears in any file, in a fixed order.
    all_cols = sorted(set().union(*(frame.columns for frame in frames)))

    def _normalize(frame):
        """Project one file's frame onto all_cols, everything cast to string."""
        present = set(frame.columns)
        # A single select instead of repeated withColumn calls: every
        # withColumn adds another projection to the query plan.
        return frame.select([
            (col(c) if c in present else lit(None)).cast("string").alias(c)
            for c in all_cols
        ])

    # Step 2 + 3: normalize each frame and union them by column name.
    final_df = _normalize(frames[0])
    for frame in frames[1:]:
        final_df = final_df.unionByName(_normalize(frame))

    return final_df

In [6]:
import shutil
# Rewrite every loaded segment as a schema-aligned, all-string parquet dataset
# under <processed_dir>/normalized/<segment>.
normalized_dir = processed_dir / "normalized"
# parents=True so this also works when processed_dir does not exist yet.
normalized_dir.mkdir(parents=True, exist_ok=True)

# Only the segment names are needed: the data is re-read file by file inside
# safe_load_and_cast, not taken from the DataFrames stored in segment_dfs.
for segment in segment_dfs:
    print(f"Processing {segment} safely…")

    # Remove any output left over from a previous run before writing.
    out_dir = normalized_dir / segment
    if out_dir.exists():
        shutil.rmtree(out_dir)

    safe_df = safe_load_and_cast(parquet_dir / segment)

    safe_df.write.mode("overwrite").parquet(str(out_dir))
    print(f"Saved normalized: {segment} \n")


Processing administrative_segment safely…
Saved normalized: administrative_segment 

Processing arrestee_segment safely…
Saved normalized: arrestee_segment 

Processing batch_header safely…
Saved normalized: batch_header 

Processing group_b_arrest_report_segment safely…
Saved normalized: group_b_arrest_report_segment 

Processing offender_segment safely…
Saved normalized: offender_segment 

Processing offense_segment safely…
Saved normalized: offense_segment 

Processing property_segment safely…
Saved normalized: property_segment 

Processing victim_segment safely…
Saved normalized: victim_segment 

Processing window_arrestee_segment safely…
Saved normalized: window_arrestee_segment 

Processing window_exceptional_clearance_segment safely…
Saved normalized: window_exceptional_clearance_segment 

Processing window_recovered_property_segment safely…
Saved normalized: window_recovered_property_segment 



In [7]:
# Read back the normalized (all-string) segments that feed the master table.
# The names on the left line up one-to-one with the folder names below.
admin, offense, victim, offender, property_df, arrestee = (
    spark.read.parquet(str(normalized_dir / segment_name))
    for segment_name in (
        "administrative_segment",
        "offense_segment",
        "victim_segment",
        "offender_segment",
        "property_segment",
        "arrestee_segment",
    )
)

In [8]:
# Per incident: the distinct UCR offense codes and the number of offense rows.
offense_agg = offense.groupBy("unique_incident_id").agg(
    collect_set("ucr_offense_code").alias("offense_codes"),
    count("*").alias("num_offenses"),
)

In [9]:
# Per incident: number of rows in the victim segment.
victim_agg = victim.groupBy("unique_incident_id").agg(
    count("*").alias("num_victims"),
)

In [10]:
# Per incident: number of rows in the offender segment.
offender_agg = offender.groupBy("unique_incident_id").agg(
    count("*").alias("num_offenders"),
)

In [11]:
from pyspark.sql.functions import col, sum as spark_sum, expr

# Per incident: summed property value. The column is stored as a string, so it
# is parsed with try_cast; values that are NULL or do not parse as a double
# contribute 0 to the sum.
property_agg = property_df.groupBy("unique_incident_id").agg(
    spark_sum(
        expr("coalesce(try_cast(value_of_property AS double), 0)")
    ).alias("total_property_value"),
)

In [12]:
# Per incident: number of rows in the arrestee segment.
arrestee_agg = arrestee.groupBy("unique_incident_id").agg(
    count("*").alias("num_arrestees"),
)

In [13]:
# Build the incident-level master table: every administrative row, enriched
# with the per-incident aggregates. Left joins keep incidents that have no
# rows in a given segment.
incident_master = (
    admin
    .join(offense_agg, "unique_incident_id", "left")
    .join(victim_agg, "unique_incident_id", "left")
    .join(offender_agg, "unique_incident_id", "left")
    .join(property_agg, "unique_incident_id", "left")
    .join(arrestee_agg, "unique_incident_id", "left")
    # A left join leaves the count columns NULL when an incident has no rows
    # in that segment; "no rows" is a count of zero, so store 0 instead.
    # total_property_value is deliberately left as-is: NULL there still means
    # "no property rows for this incident".
    .fillna(
        0,
        subset=["num_offenses", "num_victims", "num_offenders", "num_arrestees"],
    )
)

print("Master table created.")
incident_master.printSchema()

Master table created.
root
 |-- unique_incident_id: string (nullable = true)
 |-- city_submissions: string (nullable = true)
 |-- cleared_exceptionally: string (nullable = true)
 |-- exceptional_clearance_date: string (nullable = true)
 |-- incident_date: string (nullable = true)
 |-- incident_date_hour: string (nullable = true)
 |-- incident_number: string (nullable = true)
 |-- ori: string (nullable = true)
 |-- report_date_indicator: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_abb: string (nullable = true)
 |-- total_arrestee_segments: string (nullable = true)
 |-- total_offender_segments: string (nullable = true)
 |-- total_offense_segments: string (nullable = true)
 |-- total_victim_segments: string (nullable = true)
 |-- year: string (nullable = true)
 |-- offense_codes: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- num_offenses: long (nullable = true)
 |-- num_victims: long (nullable = true)
 |-- num_offenders: long (n

In [14]:
# Persist the master table as parquet under <processed_dir>/incidents_master,
# replacing whatever an earlier run wrote there.
master_dir = processed_dir / "incidents_master"
master_dir.mkdir(exist_ok=True)
(
    incident_master.write
    .mode("overwrite")
    .parquet(str(master_dir))
)
print(f"Master incident table saved to: {master_dir}")

Master incident table saved to: C:\Users\akaas\crime-projectMain\data_processed\incidents_master
