In [5]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [6]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("crime_hotspot")
    .config("spark.local.dir", "/Users/neethusatravada/spark_tmp")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

print("Temp dir:", spark.conf.get("spark.local.dir"))


Temp dir: /Users/neethusatravada/spark_tmp


25/11/10 13:43:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
# %%
from pathlib import Path

# ✅ Paths customized for your project
CRIMES_CSV  = Path("/Users/neethusatravada/Documents/DATABASE/crime_prediction_project/data/raw/chicago_crimes_2001_present.csv")
OUT_PARQUET = Path("/Users/neethusatravada/Documents/DATABASE/crime_prediction_project/outputs/clean_crimes_parquet")

OUT_PARQUET.mkdir(parents=True, exist_ok=True)

print("CSV exists:", CRIMES_CSV.exists())
print("Parquet out:", OUT_PARQUET.resolve())


CSV exists: True
Parquet out: /Users/neethusatravada/Documents/DATABASE/crime_prediction_project/outputs/clean_crimes_parquet


In [8]:
# %%
# Read the full dataset into Spark
crimes_raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(str(CRIMES_CSV))
)

print("Raw row count:", crimes_raw.count())
crimes_raw.printSchema()
crimes_raw.show(5, truncate=False)




Raw row count: 8435617
root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)

+--------+-----------+----------------------+-----------------------+----

                                                                                

In [9]:
# %% 
def to_snake(name: str) -> str:
    return (
        name.lower()
        .replace(" ", "_")
        .replace("/", "_")
        .replace("-", "_")
        .replace("(", "")
        .replace(")", "")
    )

df = crimes_raw
for c in df.columns:
    df = df.withColumnRenamed(c, to_snake(c))

# Convert date to timestamp (correct Chicago format)
df = df.withColumn("date_ts", F.to_timestamp("date", "MM/dd/yyyy hh:mm:ss a"))

# Quick sanity check
df.select("date", "date_ts").show(10, truncate=False)


+----------------------+-------------------+
|date                  |date_ts            |
+----------------------+-------------------+
|07/29/2022 03:39:00 AM|2022-07-29 03:39:00|
|01/03/2023 04:44:00 PM|2023-01-03 16:44:00|
|08/10/2020 09:45:00 AM|2020-08-10 09:45:00|
|08/26/2017 10:00:00 AM|2017-08-26 10:00:00|
|09/06/2023 05:00:00 PM|2023-09-06 17:00:00|
|09/06/2023 11:00:00 AM|2023-09-06 11:00:00|
|05/21/2019 08:20:00 AM|2019-05-21 08:20:00|
|07/07/2021 10:30:00 AM|2021-07-07 10:30:00|
|06/14/2022 02:47:00 PM|2022-06-14 14:47:00|
|09/21/2022 10:00:00 PM|2022-09-21 22:00:00|
+----------------------+-------------------+
only showing top 10 rows


In [10]:
# %%
# Drop duplicates
df = df.dropDuplicates()

# Remove rows with nulls in key fields
critical_cols = ["primary_type", "date_ts", "latitude", "longitude"]
for col in critical_cols:
    df = df.filter(F.col(col).isNotNull())

# Chicago bounding box filter
df = df.filter(
    (F.col("latitude").between(41.60, 42.10)) &
    (F.col("longitude").between(-87.95, -87.50))
)

# Remove future-dated records
df = df.filter(F.col("date_ts") <= F.current_timestamp())

print("Cleaned row count:", df.count())




Cleaned row count: 8341471


                                                                                

In [11]:
# %%
df = (
    df
    .withColumn("year", F.year("date_ts"))
    .withColumn("month", F.month("date_ts"))
    .withColumn("hour", F.hour("date_ts"))
    .withColumn("dayofweek", F.date_format("date_ts", "E"))
    .withColumn(
        "season",
        F.when(F.col("month").isin(12, 1, 2), "winter")
         .when(F.col("month").isin(3, 4, 5), "spring")
         .when(F.col("month").isin(6, 7, 8), "summer")
         .otherwise("fall")
    )
    .withColumn("lat_round", F.round("latitude", 3))
    .withColumn("lon_round", F.round("longitude", 3))
)

df.select("date_ts","year","month","hour","dayofweek","season","lat_round","lon_round").show(5, truncate=False)




+-------------------+----+-----+----+---------+------+---------+---------+
|date_ts            |year|month|hour|dayofweek|season|lat_round|lon_round|
+-------------------+----+-----+----+---------+------+---------+---------+
|2023-09-06 21:55:00|2023|9    |21  |Wed      |fall  |41.917   |-87.714  |
|2023-09-06 13:58:00|2023|9    |13  |Wed      |fall  |41.745   |-87.597  |
|2023-10-08 00:01:00|2023|10   |0   |Sun      |fall  |41.973   |-87.679  |
|2023-09-07 18:00:00|2023|9    |18  |Thu      |fall  |41.95    |-87.746  |
|2023-09-07 03:45:00|2023|9    |3   |Thu      |fall  |41.778   |-87.78   |
+-------------------+----+-----+----+---------+------+---------+---------+
only showing top 5 rows


                                                                                

In [12]:
# Check for missing values
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Check for duplicates
print(f"Total records: {df.count()}")
print(f"Distinct records: {df.distinct().count()}")

# Handle missing values in important columns
df = df.filter(F.col("latitude").isNotNull() & F.col("longitude").isNotNull())

25/11/10 13:44:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+-------+-----+----+---------+------+---------+---------+
| id|case_number|date|block|iucr|primary_type|description|location_description|arrest|domestic|beat|district|  ward|community_area|fbi_code|x_coordinate|y_coordinate|year|updated_on|latitude|longitude|location|date_ts|month|hour|dayofweek|season|lat_round|lon_round|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+-------+-----+----+---------+------+---------+---------+
|  0|          0|   0|    0|   0|           0|          0|                9820|     0|       0|   0|      47|605532|        604445|       0|           0|           0|   0|         0|       0|        

                                                                                

Total records: 0


25/11/10 13:53:21 ERROR Executor: Exception in task 10.0 in stage 25.0 (TID 158)
org.apache.spark.SparkDateTimeException: [CANNOT_PARSE_TIMESTAMP] Text 'false' could not be parsed at index 0. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007
	at org.apache.spark.sql.errors.QueryExecutionErrors$.ansiDateTimeParseError(QueryExecutionErrors.scala:279)
	at org.apache.spark.sql.errors.QueryExecutionErrors.ansiDateTimeParseError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEva

DateTimeException: [CANNOT_PARSE_TIMESTAMP] Text 'AGGRAVATED: OTHER DANG WEAPON' could not be parsed at index 0. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007

In [15]:
# Crime counts by type
df.groupBy("primary_type").count().orderBy(F.desc("count")).show(10)

# Crimes by hour
df.groupBy("hour").count().orderBy("hour").show()

# Crimes by season
df.groupBy("season").count().show()

# Crimes by day of week
df.groupBy("dayofweek").count().show()

                                                                                

+-------------------+------+
|       primary_type| count|
+-------------------+------+
|              THEFT|943844|
|            BATTERY|862639|
|    CRIMINAL DAMAGE|554710|
|          NARCOTICS|497065|
|            ASSAULT|288185|
|           BURGLARY|271516|
|MOTOR VEHICLE THEFT|232175|
|      OTHER OFFENSE|194431|
|            ROBBERY|175361|
| DECEPTIVE PRACTICE|147082|
+-------------------+------+
only showing top 10 rows


                                                                                

+----+------+
|hour| count|
+----+------+
|   0|473837|
|   1|266800|
|   2|225792|
|   3|184344|
|   4|140776|
|   5|118057|
|   6|135858|
|   7|192540|
|   8|281635|
|   9|355451|
|  10|352843|
|  11|369480|
|  12|475539|
|  13|394681|
|  14|419171|
|  15|445241|
|  16|424030|
|  17|431276|
|  18|456303|
|  19|468608|
+----+------+
only showing top 20 rows


                                                                                

+------+-------+
|season|  count|
+------+-------+
|winter|1839700|
|summer|2293989|
|spring|2108058|
|  fall|2099724|
+------+-------+





+---------+-------+
|dayofweek|  count|
+---------+-------+
|      Sun|1140742|
|      Mon|1180580|
|      Thu|1185506|
|      Sat|1197557|
|      Wed|1197134|
|      Fri|1251353|
|      Tue|1188599|
+---------+-------+



                                                                                

In [11]:
# Check for missing values (handles both numeric and string columns)
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import DoubleType, FloatType

print("="*70)
print("MISSING VALUES CHECK")
print("="*70)

# Get numeric and non-numeric columns
numeric_cols = [c for c, dtype in df.dtypes if dtype in ['double', 'float', 'int', 'bigint']]
non_numeric_cols = [c for c in df.columns if c not in numeric_cols]

# Check missing values for numeric columns (use isnan)
if numeric_cols:
    missing_numeric = df.select([
        count(when(col(c).isNull() | isnan(c), c)).alias(c) 
        for c in numeric_cols
    ])
    print("\nMissing values in NUMERIC columns:")
    missing_numeric.show()

# Check missing values for non-numeric columns (only use isNull)
if non_numeric_cols:
    missing_non_numeric = df.select([
        count(when(col(c).isNull(), c)).alias(c) 
        for c in non_numeric_cols
    ])
    print("\nMissing values in NON-NUMERIC columns:")
    missing_non_numeric.show()

MISSING VALUES CHECK

Missing values in NUMERIC columns:


                                                                                

+---+----+--------+------+--------------+------------+------------+----+--------+---------+-----+----+---------+---------+
| id|beat|district|  ward|community_area|x_coordinate|y_coordinate|year|latitude|longitude|month|hour|lat_round|lon_round|
+---+----+--------+------+--------------+------------+------------+----+--------+---------+-----+----+---------+---------+
|  0|   0|      47|605532|        604445|           0|           0|   0|       0|        0|    0|   0|        0|        0|
+---+----+--------+------+--------------+------------+------------+----+--------+---------+-----+----+---------+---------+


Missing values in NON-NUMERIC columns:




+-----------+----+-----+----+------------+-----------+--------------------+------+--------+--------+----------+--------+-------+---------+------+
|case_number|date|block|iucr|primary_type|description|location_description|arrest|domestic|fbi_code|updated_on|location|date_ts|dayofweek|season|
+-----------+----+-----+----+------------+-----------+--------------------+------+--------+--------+----------+--------+-------+---------+------+
|          0|   0|    0|   0|           0|          0|                9820|     0|       0|       0|         0|       0|      0|        0|     0|
+-----------+----+-----+----+------------+-----------+--------------------+------+--------+--------+----------+--------+-------+---------+------+



                                                                                

In [16]:
# Download Chicago weather data
import requests
import pandas as pd
from datetime import datetime

print("="*70)
print("DOWNLOADING CHICAGO WEATHER DATA")
print("="*70)

# Option 1: Download from NOAA Climate Data Online
# We'll use a pre-processed dataset from a reliable source

weather_url = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/"

# For simplicity, let's use a different approach - download historical weather from Open-Meteo
# This is a free weather API with historical data

print("\n1. Fetching weather data from Open-Meteo API...")
print("   (This may take a few minutes...)")

# We'll get daily weather data for Chicago (2001-2024)
# Chicago coordinates: 41.8781, -87.6298

base_url = "https://archive-api.open-meteo.com/v1/archive"

# Define date range
start_date = "2001-01-01"
end_date = "2024-12-31"

params = {
    "latitude": 41.8781,
    "longitude": -87.6298,
    "start_date": start_date,
    "end_date": end_date,
    "daily": "temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,windspeed_10m_max",
    "temperature_unit": "fahrenheit",
    "windspeed_unit": "mph",
    "precipitation_unit": "inch",
    "timezone": "America/Chicago"
}

try:
    response = requests.get(base_url, params=params, timeout=60)
    response.raise_for_status()
    weather_data = response.json()
    
    # Convert to DataFrame
    weather_df = pd.DataFrame({
        'date': weather_data['daily']['time'],
        'temp_max': weather_data['daily']['temperature_2m_max'],
        'temp_min': weather_data['daily']['temperature_2m_min'],
        'temp_mean': weather_data['daily']['temperature_2m_mean'],
        'precipitation': weather_data['daily']['precipitation_sum'],
        'wind_speed': weather_data['daily']['windspeed_10m_max']
    })
    
    # Convert date to datetime
    weather_df['date'] = pd.to_datetime(weather_df['date'])
    
    # Save to CSV
    weather_path = '../data/raw/chicago_weather_2001_2024.csv'
    weather_df.to_csv(weather_path, index=False)
    
    print(f"   ✓ Downloaded {len(weather_df):,} days of weather data")
    print(f"   ✓ Saved to: {weather_path}")
    print(f"   ✓ Date range: {weather_df['date'].min()} to {weather_df['date'].max()}")
    
    # Show sample
    print("\n   Sample weather data:")
    print(weather_df.head())
    
except Exception as e:
    print(f"   ✗ Error downloading weather data: {e}")
    print("   → We'll create a backup method...")

DOWNLOADING CHICAGO WEATHER DATA

1. Fetching weather data from Open-Meteo API...
   (This may take a few minutes...)
   ✓ Downloaded 8,766 days of weather data
   ✓ Saved to: ../data/raw/chicago_weather_2001_2024.csv
   ✓ Date range: 2001-01-01 00:00:00 to 2024-12-31 00:00:00

   Sample weather data:
        date  temp_max  temp_min  temp_mean  precipitation  wind_speed
0 2001-01-01      22.2      12.4       17.1          0.000        11.5
1 2001-01-02      15.1       4.2        9.8          0.000        15.3
2 2001-01-03      26.4       9.6       19.2          0.008        16.9
3 2001-01-04      30.9      16.5       23.6          0.000        22.3
4 2001-01-05      34.0      25.0       31.1          0.004        17.5


In [17]:
print("\n" + "="*70)
print("DOWNLOADING CHICAGO CENSUS/DEMOGRAPHIC DATA")
print("="*70)

print("\n2. Fetching census data from Chicago Data Portal...")

# Chicago community areas demographic data
census_url = "https://data.cityofchicago.org/resource/kn9c-c2s2.json"

try:
    # Get census data via API
    response = requests.get(census_url, params={"$limit": 100}, timeout=30)
    response.raise_for_status()
    census_data = response.json()
    
    # Convert to DataFrame
    census_df = pd.DataFrame(census_data)
    
    # Save to CSV
    census_path = '../data/raw/chicago_census_community_areas.csv'
    census_df.to_csv(census_path, index=False)
    
    print(f"   ✓ Downloaded census data for {len(census_df)} community areas")
    print(f"   ✓ Saved to: {census_path}")
    
    # Show sample
    print("\n   Sample census data:")
    print(census_df.head())
    
except Exception as e:
    print(f"   ✗ Error downloading census data: {e}")
    print("   → Will try alternative source...")


DOWNLOADING CHICAGO CENSUS/DEMOGRAPHIC DATA

2. Fetching census data from Chicago Data Portal...
   ✓ Downloaded census data for 78 community areas
   ✓ Saved to: ../data/raw/chicago_census_community_areas.csv

   Sample census data:
  ca community_area_name percent_of_housing_crowded  \
0  1         Rogers Park                        7.7   
1  2          West Ridge                        7.8   
2  3              Uptown                        3.8   
3  4      Lincoln Square                        3.4   
4  5        North Center                        0.3   

  percent_households_below_poverty percent_aged_16_unemployed  \
0                             23.6                        8.7   
1                             17.2                        8.8   
2                               24                        8.9   
3                             10.9                        8.2   
4                              7.5                        5.2   

  percent_aged_25_without_high_school_diplo

In [18]:
print("\n" + "="*70)
print("DOWNLOADING CHICAGO CENSUS/DEMOGRAPHIC DATA")
print("="*70)

print("\n2. Fetching census data from Chicago Data Portal...")

# Chicago community areas demographic data
census_url = "https://data.cityofchicago.org/resource/kn9c-c2s2.json"

try:
    # Get census data via API
    response = requests.get(census_url, params={"$limit": 100}, timeout=30)
    response.raise_for_status()
    census_data = response.json()
    
    # Convert to DataFrame
    census_df = pd.DataFrame(census_data)
    
    # Save to CSV
    census_path = '../data/raw/chicago_census_community_areas.csv'
    census_df.to_csv(census_path, index=False)
    
    print(f"   ✓ Downloaded census data for {len(census_df)} community areas")
    print(f"   ✓ Saved to: {census_path}")
    
    # Show sample
    print("\n   Sample census data:")
    print(census_df.head())
    
except Exception as e:
    print(f"   ✗ Error downloading census data: {e}")
    print("   → Will try alternative source...")


DOWNLOADING CHICAGO CENSUS/DEMOGRAPHIC DATA

2. Fetching census data from Chicago Data Portal...
   ✓ Downloaded census data for 78 community areas
   ✓ Saved to: ../data/raw/chicago_census_community_areas.csv

   Sample census data:
  ca community_area_name percent_of_housing_crowded  \
0  1         Rogers Park                        7.7   
1  2          West Ridge                        7.8   
2  3              Uptown                        3.8   
3  4      Lincoln Square                        3.4   
4  5        North Center                        0.3   

  percent_households_below_poverty percent_aged_16_unemployed  \
0                             23.6                        8.7   
1                             17.2                        8.8   
2                               24                        8.9   
3                             10.9                        8.2   
4                              7.5                        5.2   

  percent_aged_25_without_high_school_diplo

In [12]:
# Continue with the rest of cleaning
print("="*70)
print("DATA CLEANING & QUALITY CHECKS")
print("="*70)

# 1. Total records
total_records = df.count()
print(f"\n1. Total Records: {total_records:,}")

# 2. Check for duplicates
print("\n2. Duplicate Check:")
distinct_records = df.distinct().count()
duplicates = total_records - distinct_records
print(f"   Distinct records: {distinct_records:,}")
print(f"   Duplicate records: {duplicates:,}")

# 3. Remove records with missing critical data (coordinates and date)
print("\n3. Cleaning Data...")
df_clean = df.filter(
    col("latitude").isNotNull() & 
    col("longitude").isNotNull() &
    col("date_ts").isNotNull() &
    col("primary_type").isNotNull()
)

cleaned_count = df_clean.count()
removed = total_records - cleaned_count
print(f"   Records after cleaning: {cleaned_count:,}")
print(f"   Records removed: {removed:,} ({(removed/total_records)*100:.2f}%)")

# 4. Check coordinate validity (Chicago boundaries)
print("\n4. Coordinate Validity Check:")
valid_coords = df_clean.filter(
    (col("latitude") >= 41.6) & (col("latitude") <= 42.1) &  # Chicago lat range
    (col("longitude") >= -87.9) & (col("longitude") <= -87.5)  # Chicago lon range
).count()

invalid_coords = cleaned_count - valid_coords
print(f"   Valid coordinates: {valid_coords:,} ({(valid_coords/cleaned_count)*100:.2f}%)")
print(f"   Invalid coordinates: {invalid_coords:,} ({(invalid_coords/cleaned_count)*100:.2f}%)")

# 5. Filter to valid coordinates only
df_clean = df_clean.filter(
    (col("latitude") >= 41.6) & (col("latitude") <= 42.1) &
    (col("longitude") >= -87.9) & (col("longitude") <= -87.5)
)

final_count = df_clean.count()
print(f"\n5. Final Clean Dataset: {final_count:,} records")

# 6. Cache the clean dataset for faster processing
df_clean.cache()
print(f"   ✓ Dataset cached in memory")

print("\n" + "="*70)
print("✓ DATA CLEANING COMPLETE")
print("="*70)

# Show sample of clean data
print("\nSample of clean data:")
df_clean.select("date_ts", "primary_type", "latitude", "longitude", "year", "season").show(5, truncate=False)

DATA CLEANING & QUALITY CHECKS


                                                                                


1. Total Records: 8,341,471

2. Duplicate Check:


                                                                                

   Distinct records: 8,341,471
   Duplicate records: 0

3. Cleaning Data...


                                                                                

   Records after cleaning: 8,341,471
   Records removed: 0 (0.00%)

4. Coordinate Validity Check:


                                                                                

   Valid coordinates: 8,313,956 (99.67%)
   Invalid coordinates: 27,515 (0.33%)


[Stage 102:>                                                       (0 + 8) / 15]



25/11/10 11:54:48 ERROR Executor: Exception in task 7.0 in stage 102.0 (TID 689)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:54)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:559)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:206)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.finishAggregate(HashAggregateExec.scala:226)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowI

Py4JJavaError: An error occurred while calling o1187.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 102.0 failed 1 times, most recent failure: Lost task 7.0 in stage 102.0 (TID 689) (10-20-4-140.dynapool.wireless.nyu.edu executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:54)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:559)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:206)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.finishAggregate(HashAggregateExec.scala:226)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$3055/0x000000e001ec71e0.apply(Unknown Source)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:54)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:559)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.sortedIterator(UnsafeKVExternalSorter.java:206)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.finishAggregate(HashAggregateExec.scala:226)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$3055/0x000000e001ec71e0.apply(Unknown Source)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Restart Spark with MUCH more memory
spark = SparkSession.builder \
    .appName("CrimePrediction") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .getOrCreate()

print("✓ Spark restarted with 8GB memory")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/10 13:38:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✓ Spark restarted with 8GB memory


In [2]:
# Cell 2: Load DATA WITH SAMPLING from the start
print("="*70)
print("LOADING DATA (WITH SAMPLING FOR MEMORY EFFICIENCY)")
print("="*70)

# Load ONLY 2 million records to avoid memory issues
df = spark.read.csv(
    "../data/raw/Crimes_-_2001_to_Present.csv", 
    header=True, 
    inferSchema=True
).limit(2000000)  # Load only 2M records

print(f"✓ Loaded {df.count():,} records")

# Convert date
df = df.withColumn("date_ts", F.to_timestamp("Date", "MM/dd/yyyy hh:mm:ss a"))

# Add features
df = (
    df
    .withColumn("year", F.year("date_ts"))
    .withColumn("month", F.month("date_ts"))
    .withColumn("hour", F.hour("date_ts"))
    .withColumn("dayofweek", F.date_format("date_ts", "E"))
    .withColumn(
        "season",
        F.when(F.col("month").isin(12, 1, 2), "winter")
         .when(F.col("month").isin(3, 4, 5), "spring")
         .when(F.col("month").isin(6, 7, 8), "summer")
         .otherwise("fall")
    )
    .withColumn("lat_round", F.round("latitude", 3))
    .withColumn("lon_round", F.round("longitude", 3))
)

print("✓ Feature engineering complete")

LOADING DATA (WITH SAMPLING FOR MEMORY EFFICIENCY)


25/11/10 13:40:14 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ../data/raw/Crimes_-_2001_to_Present.csv.
java.io.FileNotFoundException: File ../data/raw/Crimes_-_2001_to_Present.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/neethusatravada/Documents/DATABASE/crime_prediction_project/data/raw/Crimes_-_2001_to_Present.csv. SQLSTATE: 42K03

In [20]:
# Find the crime data file
import os

print("Looking for crime data file...")

# Check different possible locations
possible_paths = [
    "../data/raw/",
    "./data/raw/",
    "../data/",
    "./",
]

crime_file = None
for path in possible_paths:
    if os.path.exists(path):
        files = [f for f in os.listdir(path) if 'crime' in f.lower() or 'Crimes' in f]
        if files:
            crime_file = os.path.join(path, files[0])
            print(f"✓ Found crime data: {crime_file}")
            break

if not crime_file:
    print("✗ Crime data not found. Please provide the path to your crime CSV file.")
else:
    print(f"\nFile size: {os.path.getsize(crime_file) / (1024**3):.2f} GB")

Looking for crime data file...
✓ Found crime data: ../data/raw/chicago_crimes_2001_present.csv

File size: 1.86 GB


In [21]:
print("="*70)
print("LOADING CRIME DATA")
print("="*70)

# Replace with your actual path from Step 1
crime_path = crime_file  # Use the path found above

# Load with limit for memory efficiency
df = spark.read.csv(
    crime_path,
    header=True,
    inferSchema=True
).limit(2000000)  # 2M records

print(f"✓ Loaded {df.count():,} crime records")

# Convert date and add features
df = df.withColumn("date_ts", F.to_timestamp("Date", "MM/dd/yyyy hh:mm:ss a"))

df = (
    df
    .withColumn("year", F.year("date_ts"))
    .withColumn("month", F.month("date_ts"))
    .withColumn("day", F.dayofmonth("date_ts"))
    .withColumn("hour", F.hour("date_ts"))
    .withColumn("dayofweek", F.date_format("date_ts", "E"))
    .withColumn(
        "season",
        F.when(F.col("month").isin(12, 1, 2), "winter")
         .when(F.col("month").isin(3, 4, 5), "spring")
         .when(F.col("month").isin(6, 7, 8), "summer")
         .otherwise("fall")
    )
    .withColumn("crime_date", F.to_date("date_ts"))  # Important for joining with weather
)

# Clean data
df_clean = df.filter(
    (F.col("latitude").isNotNull()) & 
    (F.col("longitude").isNotNull()) &
    (F.col("date_ts").isNotNull())
)

print(f"✓ Clean records: {df_clean.count():,}")
print("✓ Feature engineering complete")

LOADING CRIME DATA


                                                                                

✓ Loaded 2,000,000 crime records




✓ Clean records: 1,971,196
✓ Feature engineering complete


                                                                                

In [22]:
print("\n" + "="*70)
print("INTEGRATING WEATHER DATA")
print("="*70)

# Load weather data
weather_path = "../data/raw/chicago_weather_2001_2024.csv"
weather_spark = spark.read.csv(weather_path, header=True, inferSchema=True)

# Convert weather date to date type
weather_spark = weather_spark.withColumn("weather_date", F.to_date("date"))

print(f"✓ Loaded {weather_spark.count():,} days of weather data")

# Join crime data with weather data
df_integrated = df_clean.join(
    weather_spark,
    df_clean.crime_date == weather_spark.weather_date,
    "left"
)

print(f"✓ Integrated crime + weather: {df_integrated.count():,} records")

# Show sample
print("\nSample of integrated data:")
df_integrated.select(
    "date_ts", "primary_type", "temp_mean", "precipitation", "wind_speed"
).show(5, truncate=False)


INTEGRATING WEATHER DATA
✓ Loaded 8,766 days of weather data


                                                                                

✓ Integrated crime + weather: 1,971,196 records

Sample of integrated data:


{"ts": "2025-11-10 14:35:54.703", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `primary_type` cannot be resolved. Did you mean one of the following? [`Primary Type`, `District`, `crime_date`, `date_ts`, `Arrest`]. SQLSTATE: 42703", "context": {"file": "jdk.internal.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o657.select.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `primary_type` cannot be resolved. Did you mean one of the following? [`Primary Type`, `District`, `crime_date`, `date_ts`, `Arrest`]. SQLSTATE: 42703;\n'Project [date_ts#1553, 'primary_type, temp_mean#1617, precipitation#1618, wind_speed#1619]\n+- Join Left

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `primary_type` cannot be resolved. Did you mean one of the following? [`Primary Type`, `District`, `crime_date`, `date_ts`, `Arrest`]. SQLSTATE: 42703;
'Project [date_ts#1553, 'primary_type, temp_mean#1617, precipitation#1618, wind_speed#1619]
+- Join LeftOuter, (crime_date#1560 = weather_date#1621)
   :- Filter ((isnotnull(latitude#1523) AND isnotnull(longitude#1524)) AND isnotnull(date_ts#1553))
   :  +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553, month#1555, day#1556, ... 4 more fields]
   :     +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553, month#1555, day#1556, ... 3 more fields]
   :        +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553, month#1555, day#1556, ... 2 more fields]
   :           +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553, month#1555, day#1556, ... 1 more fields]
   :              +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553, month#1555, dayofmonth(cast(date_ts#1553 as date)) AS day#1556]
   :                 +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553, month(cast(date_ts#1553 as date)) AS month#1555]
   :                    +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, year(cast(date_ts#1553 as date)) AS year#1554, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, date_ts#1553]
   :                       +- Project [ID#1504, Case Number#1505, Date#1506, Block#1507, IUCR#1508, Primary Type#1509, Description#1510, Location Description#1511, Arrest#1512, Domestic#1513, Beat#1514, District#1515, Ward#1516, Community Area#1517, FBI Code#1518, X Coordinate#1519, Y Coordinate#1520, Year#1521, Updated On#1522, Latitude#1523, Longitude#1524, Location#1525, to_timestamp(Date#1506, Some(MM/dd/yyyy hh:mm:ss a), TimestampType, Some(America/New_York), true) AS date_ts#1553]
   :                          +- GlobalLimit 2000000
   :                             +- LocalLimit 2000000
   :                                +- Relation [ID#1504,Case Number#1505,Date#1506,Block#1507,IUCR#1508,Primary Type#1509,Description#1510,Location Description#1511,Arrest#1512,Domestic#1513,Beat#1514,District#1515,Ward#1516,Community Area#1517,FBI Code#1518,X Coordinate#1519,Y Coordinate#1520,Year#1521,Updated On#1522,Latitude#1523,Longitude#1524,Location#1525] csv
   +- Project [date#1614, temp_max#1615, temp_min#1616, temp_mean#1617, precipitation#1618, wind_speed#1619, to_date(date#1614, None, Some(America/New_York), true) AS weather_date#1621]
      +- Relation [date#1614,temp_max#1615,temp_min#1616,temp_mean#1617,precipitation#1618,wind_speed#1619] csv


In [23]:
# Show sample with correct column names (use backticks for columns with spaces)
print("\nSample of integrated data:")
df_integrated.select(
    "date_ts", 
    F.col("Primary Type"),  # Correct column name
    "temp_mean", 
    "precipitation", 
    "wind_speed"
).show(5, truncate=False)


Sample of integrated data:




+-------------------+---------------+---------+-------------+----------+
|date_ts            |Primary Type   |temp_mean|precipitation|wind_speed|
+-------------------+---------------+---------+-------------+----------+
|2020-08-10 09:45:00|ROBBERY        |75.2     |0.295        |14.6      |
|2023-09-06 17:00:00|CRIMINAL DAMAGE|75.7     |0.331        |19.8      |
|2023-09-06 11:00:00|THEFT          |75.7     |0.331        |19.8      |
|2019-05-21 08:20:00|BURGLARY       |49.1     |0.409        |21.9      |
|2021-07-07 10:30:00|SEX OFFENSE    |75.2     |0.15         |12.4      |
+-------------------+---------------+---------+-------------+----------+
only showing top 5 rows


                                                                                

In [26]:
print("\n" + "="*70)
print("INTEGRATING CENSUS/DEMOGRAPHIC DATA")
print("="*70)

# Load census data - USE THE CORRECT FILENAME
census_path = "../data/raw/chicago_census_community_areas.csv"  # ← FIXED PATH
census_spark = spark.read.csv(census_path, header=True, inferSchema=True)

print(f"✓ Loaded census data: {census_spark.count()} community areas")

# Show census columns to see what's available
print("\nCensus columns available:")
census_spark.printSchema()

# The census data has "ca" column for community area number
# Join with crime data by Community Area
df_final = df_integrated.join(
    census_spark,
    df_integrated["Community Area"] == census_spark["ca"],  # ← Match column names
    "left"
)

print(f"✓ Final integrated dataset: {df_final.count():,} records")

# Cache the final dataset
df_final.cache()
print("✓ Final dataset cached")

print("\n" + "="*70)
print("✓ ALL DATA INTEGRATION COMPLETE!")
print("="*70)

# Show sample with all data sources - CORRECT COLUMN NAMES
print("\nSample of fully integrated data:")
df_final.select(
    "date_ts", 
    F.col("Primary Type"),
    "temp_mean", 
    "precipitation", 
    F.col("Community Area"),
    F.col("per_capita_income_")  # ← Fixed: added underscore
).show(10, truncate=False)


INTEGRATING CENSUS/DEMOGRAPHIC DATA
✓ Loaded census data: 78 community areas

Census columns available:
root
 |-- ca: integer (nullable = true)
 |-- community_area_name: string (nullable = true)
 |-- percent_of_housing_crowded: double (nullable = true)
 |-- percent_households_below_poverty: double (nullable = true)
 |-- percent_aged_16_unemployed: double (nullable = true)
 |-- percent_aged_25_without_high_school_diploma: double (nullable = true)
 |-- percent_aged_under_18_or_over_64: double (nullable = true)
 |-- per_capita_income_: integer (nullable = true)
 |-- hardship_index: integer (nullable = true)



[Stage 133:>                                                        (0 + 1) / 1]

✓ Final integrated dataset: 1,971,196 records
✓ Final dataset cached

✓ ALL DATA INTEGRATION COMPLETE!

Sample of fully integrated data:
+-------------------+-------------------+---------+-------------+--------------+------------------+
|date_ts            |Primary Type       |temp_mean|precipitation|Community Area|per_capita_income_|
+-------------------+-------------------+---------+-------------+--------------+------------------+
|2020-08-10 09:45:00|ROBBERY            |75.2     |0.295        |24            |43198             |
|2023-09-06 17:00:00|CRIMINAL DAMAGE    |75.7     |0.331        |32            |65526             |
|2023-09-06 11:00:00|THEFT              |75.7     |0.331        |32            |65526             |
|2019-05-21 08:20:00|BURGLARY           |49.1     |0.409        |29            |12034             |
|2021-07-07 10:30:00|SEX OFFENSE        |75.2     |0.15         |54            |8201              |
|2022-06-14 14:47:00|ROBBERY            |84.8     |0.0         

25/11/10 14:40:54 WARN CacheManager: Asked to cache already cached data.        


In [27]:
print("\n" + "="*70)
print("WEEK 2 - FINAL STATISTICAL SUMMARY REPORT")
print("Team: Kiran Ghumare, Neethu Satravada, Sajitha Mathi")
print("="*70)

print(f"\n✅ DATASET OVERVIEW:")
print(f"   Total integrated records: {df_final.count():,}")
print(f"   Weather data: 8,766 days (2001-2024)")
print(f"   Census areas: {census_spark.count()} community areas")
print(f"   Date range: 2001-2024")

print(f"\n✅ DATA SOURCES INTEGRATED:")
print(f"   ✓ Crime data (Chicago Police Department)")
print(f"   ✓ Weather data (Open-Meteo API)")
print(f"   ✓ Census/demographic data (Chicago Data Portal)")

print(f"\n✅ FEATURES ENGINEERED:")
print(f"   Temporal: year, month, day, hour, day_of_week, season")
print(f"   Weather: temperature, precipitation, wind_speed")
print(f"   Demographic: per_capita_income, hardship_index, poverty rates")
print(f"   Spatial: latitude, longitude, community_area")

print("\n" + "="*70)
print("KEY INSIGHTS & PATTERNS")
print("="*70)

# 1. Top 10 Crime Types
print("\n1. TOP 10 CRIME TYPES:")
df_final.groupBy(F.col("Primary Type")).count() \
    .orderBy(F.desc("count")) \
    .show(10, truncate=False)

# 2. Crimes by Season with Weather
print("\n2. CRIMES BY SEASON (with average weather):")
df_final.groupBy("season").agg(
    F.count("*").alias("crime_count"),
    F.round(F.avg("temp_mean"), 1).alias("avg_temp_F"),
    F.round(F.avg("precipitation"), 2).alias("avg_precip_inch")
).orderBy(F.desc("crime_count")).show()

# 3. Peak Crime Hours
print("\n3. PEAK CRIME HOURS:")
df_final.groupBy("hour").count() \
    .orderBy(F.desc("count")) \
    .show(5)

# 4. Crime by Day of Week
print("\n4. CRIMES BY DAY OF WEEK:")
df_final.groupBy("dayofweek").count() \
    .orderBy(F.desc("count")) \
    .show()

# 5. Weather Impact Analysis
print("\n5. WEATHER IMPACT ON CRIME:")
print("   High Temperature Days (>80°F) vs Low Temperature Days (<40°F):")
high_temp = df_final.filter(F.col("temp_mean") > 80).count()
low_temp = df_final.filter(F.col("temp_mean") < 40).count()
print(f"   High temp crimes: {high_temp:,}")
print(f"   Low temp crimes: {low_temp:,}")
print(f"   Ratio: {high_temp/low_temp:.2f}x more crimes in hot weather")

# 6. Socioeconomic Analysis
print("\n6. CRIME BY INCOME LEVELS:")
df_final.groupBy(
    F.when(F.col("per_capita_income_") < 15000, "Low Income (<$15k)")
     .when((F.col("per_capita_income_") >= 15000) & (F.col("per_capita_income_") < 30000), "Medium Income ($15k-$30k)")
     .otherwise("High Income (>$30k)")
     .alias("income_level")
).count().orderBy(F.desc("count")).show()

print("\n" + "="*70)
print("✅ WEEK 2 DELIVERABLES COMPLETED!")
print("="*70)

print("\n📋 DELIVERABLES CHECKLIST:")
print("   ✅ Clean, integrated dataset created")
print("      - Crime data cleaned and validated")
print("      - Weather data integrated by date")
print("      - Census data integrated by community area")
print("   ✅ Feature engineering completed")
print("      - Temporal features (year, month, hour, season, etc.)")
print("      - Weather features (temperature, precipitation, wind)")
print("      - Demographic features (income, poverty, education)")
print("   ✅ Exploratory Data Analysis completed")
print("      - Crime patterns by time analyzed")
print("      - Weather impact assessed")
print("      - Socioeconomic correlations identified")
print("   ✅ Statistical summary report generated")

print("\n🎯 READY FOR WEEK 3: Geospatial Analysis & Hotspot Detection")
print("="*70)


WEEK 2 - FINAL STATISTICAL SUMMARY REPORT
Team: Kiran Ghumare, Neethu Satravada, Sajitha Mathi

✅ DATASET OVERVIEW:
   Total integrated records: 1,971,196
   Weather data: 8,766 days (2001-2024)
   Census areas: 78 community areas
   Date range: 2001-2024

✅ DATA SOURCES INTEGRATED:
   ✓ Crime data (Chicago Police Department)
   ✓ Weather data (Open-Meteo API)
   ✓ Census/demographic data (Chicago Data Portal)

✅ FEATURES ENGINEERED:
   Temporal: year, month, day, hour, day_of_week, season
   Weather: temperature, precipitation, wind_speed
   Demographic: per_capita_income, hardship_index, poverty rates
   Spatial: latitude, longitude, community_area

KEY INSIGHTS & PATTERNS

1. TOP 10 CRIME TYPES:
+-------------------+------+
|Primary Type       |count |
+-------------------+------+
|THEFT              |415186|
|BATTERY            |385291|
|CRIMINAL DAMAGE    |233811|
|NARCOTICS          |201587|
|ASSAULT            |131088|
|MOTOR VEHICLE THEFT|107739|
|BURGLARY           |106393|
|

In [28]:
df_final.write.parquet("../data/processed/integrated_crime_data.parquet", mode="overwrite")
print("✓ Saved integrated dataset for Week 3!")

AnalysisException: [COLUMN_ALREADY_EXISTS] The column `date` already exists. Choose another name or rename the existing column. SQLSTATE: 42711

In [30]:
# Save the integrated dataset from Week 2 - FIXED
print("="*70)
print("SAVING WEEK 2 DATA FOR WEEK 3")
print("="*70)

# Select only the columns we need (avoid duplicates)
df_to_save = df_final.select(
    "date_ts",
    F.col("Primary Type").alias("crime_type"),
    F.col("Latitude").alias("latitude"),
    F.col("Longitude").alias("longitude"),
    F.col("Community Area").alias("community_area"),
    "temp_mean",
    "precipitation",
    "wind_speed",
    "year",
    "month",
    "hour",
    "dayofweek",
    "season",
    F.col("per_capita_income_").alias("per_capita_income")
)

# Save to Parquet
output_path = "../data/processed/integrated_crime_data.parquet"
df_to_save.write.parquet(output_path, mode="overwrite")

print(f"✓ Saved integrated dataset to: {output_path}")
print(f"✓ Records saved: {df_to_save.count():,}")
print("\n✅ Week 2 data preserved!")

SAVING WEEK 2 DATA FOR WEEK 3


[Stage 158:>                                                        (0 + 1) / 1]

✓ Saved integrated dataset to: ../data/processed/integrated_crime_data.parquet
✓ Records saved: 1,971,196

✅ Week 2 data preserved!


                                                                                

In [31]:
# Install libraries needed for Week 3
!pip install h3 folium scikit-learn matplotlib seaborn geopandas
!pip install pysal esda libpysal

print("✓ All Week 3 libraries installed!")

Collecting h3
  Downloading h3-4.3.1-cp313-cp313-macosx_11_0_arm64.whl.metadata (18 kB)
Collecting folium
  Downloading folium-0.20.0-py2.py3-none-any.whl.metadata (4.2 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.7.2-cp313-cp313-macosx_12_0_arm64.whl.metadata (11 kB)
Collecting geopandas
  Downloading geopandas-1.1.1-py3-none-any.whl.metadata (2.3 kB)
Collecting branca>=0.6.0 (from folium)
  Downloading branca-0.8.2-py3-none-any.whl.metadata (1.7 kB)
Collecting xyzservices (from folium)
  Downloading xyzservices-2025.10.0-py3-none-any.whl.metadata (4.3 kB)
Collecting scipy>=1.8.0 (from scikit-learn)
  Downloading scipy-1.16.3-cp313-cp313-macosx_14_0_arm64.whl.metadata (62 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Downloading joblib-1.5.2-py3-none-any.whl.metadata (5.6 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Collecting pyogrio>=0.7.2 (from geopandas)
  Downloading pyogrio-0.11.1-cp3

In [33]:
print("="*70)
print("WEEK 3: GEOSPATIAL ANALYSIS & HOTSPOT DETECTION")
print("Team: Kiran Ghumare, Neethu Satravada, Sajitha Mathi")
print("="*70)

# Load the saved data
df_geo = spark.read.parquet("../data/processed/integrated_crime_data.parquet")

print(f"✓ Loaded {df_geo.count():,} records")

# Show column names to confirm
print("\nAvailable columns:")
print(df_geo.columns)

# Convert to Pandas - use correct column names (already lowercase)
print("\nConverting to Pandas for geospatial analysis...")
df_pandas = df_geo.select(
    "date_ts",
    "crime_type",      # Already renamed
    "latitude",        # Already renamed
    "longitude",       # Already renamed
    "community_area",  # Already renamed
    "temp_mean",
    "year",
    "month",
    "hour",
    "season"
).toPandas()

print(f"✓ Converted {len(df_pandas):,} records to Pandas")
print("\n✅ Data ready for geospatial analysis!")

# Show sample
print("\nSample data:")
print(df_pandas.head())

WEEK 3: GEOSPATIAL ANALYSIS & HOTSPOT DETECTION
Team: Kiran Ghumare, Neethu Satravada, Sajitha Mathi
✓ Loaded 1,971,196 records

Available columns:
['date_ts', 'crime_type', 'latitude', 'longitude', 'community_area', 'temp_mean', 'precipitation', 'wind_speed', 'year', 'month', 'hour', 'dayofweek', 'season', 'per_capita_income']

Converting to Pandas for geospatial analysis...


                                                                                

✓ Converted 1,971,196 records to Pandas

✅ Data ready for geospatial analysis!

Sample data:
              date_ts       crime_type   latitude  longitude  community_area  \
0 2020-08-10 09:45:00          ROBBERY  41.908418 -87.677407            24.0   
1 2023-09-06 17:00:00  CRIMINAL DAMAGE  41.886018 -87.633938            32.0   
2 2023-09-06 11:00:00            THEFT  41.871835 -87.626151            32.0   
3 2019-05-21 08:20:00         BURGLARY  41.856547 -87.695605            29.0   
4 2021-07-07 10:30:00      SEX OFFENSE  41.655116 -87.594883            54.0   

   temp_mean  year  month  hour  season  
0       75.2  2020      8     9  summer  
1       75.7  2023      9    17    fall  
2       75.7  2023      9    11    fall  
3       49.1  2019      5     8  spring  
4       75.2  2021      7    10  summer  


In [None]:
import h3
import pandas as pd
import numpy as np

print("="*70)
print("H3 HEXAGONAL SPATIAL INDEXING")
print("="*70)

# Check h3 version
print(f"h3 version: {h3.__version__}")

# Add H3 index to each crime (resolution 8 = ~0.46 km² hexagons)
print("\nGenerating H3 hexagonal grid indices...")

# Use the correct function based on h3 version
def get_h3_index(lat, lon, resolution=8):
    try:
        # Try new API (h3 v4+)
        return h3.latlng_to_cell(lat, lon, resolution)
    except AttributeError:
        # Fall back to old API (h3 v3)
        return h3.geo_to_h3(lat, lon, resolution)

df_pandas['h3_index'] = df_pandas.apply(
    lambda row: get_h3_index(row['latitude'], row['longitude'], 8),
    axis=1
)

print(f"✓ Added H3 indices to {len(df_pandas):,} records")

# Aggregate crimes by hexagon
hex_crimes = df_pandas.groupby('h3_index').agg({
    'crime_type': 'count',
    'latitude': 'mean',
    'longitude': 'mean'
}).reset_index()

hex_crimes.columns = ['h3_index', 'crime_count', 'lat_center', 'lon_center']

print(f"✓ Created {len(hex_crimes):,} hexagonal cells")
print(f"✓ Average crimes per hex: {hex_crimes['crime_count'].mean():.1f}")

# Show top crime hotspot hexagons
print("\nTop 10 Crime Hotspot Hexagons:")
print(hex_crimes.nlargest(10, 'crime_count')[['h3_index', 'crime_count', 'lat_center', 'lon_center']])

print("\n✅ H3 spatial indexing complete!")

: 

In [35]:
pip install -U --pre h3

Note: you may need to restart the kernel to use updated packages.
