In [2]:
#original version
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Set Java environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# Install PySpark
!pip install pyspark

# Import and setup
from pyspark.sql import SparkSession
from google.colab import drive

# Mount Drive
drive.mount('/content/drive')

# Initialize Spark
spark = SparkSession.builder \
    .appName("CSV ETL Pipeline") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()
sc = spark.sparkContext

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.82)] [Waiting for headers] [Connected to r2u.stat.                                                                                                    Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Hit:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Fetched 257 kB in 1s (194 kB/s)
Reading p

In [3]:
# Print the version of the `java` binary found on PATH to confirm the OpenJDK 11 install.
!java -version

openjdk version "11.0.26" 2025-01-21
OpenJDK Runtime Environment (build 11.0.26+4-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.26+4-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)


In [4]:
#FIXME:change directory to where the csv file are stored in EC2
# Single place to retarget the input directory (e.g. for the EC2 run mentioned above).
DATA_DIR = "/content/drive/MyDrive/405 final"

# header=True takes column names from the first row; inferSchema=True lets Spark
# infer each column's type from the data instead of reading everything as string.
crime = spark.read.csv(f"{DATA_DIR}/crime.csv", header=True, inferSchema=True)
#business = spark.read.csv(f"{DATA_DIR}/business.csv", header=True, inferSchema=True)
ins = spark.read.csv(f"{DATA_DIR}/ins.csv", header=True, inferSchema=True)

crime cleaning:
Crime: keep only rows where the Premis Desc and Crm Cd 1 columns are not null \\
convert 'Date Rptd' into datetime, and extract year and month \\
convert 'DATE OCC' into a timestamp and find DayOfWeek \\
convert 'TIME OCC' into the hour by floor-dividing by 100, and call it hour \\
Vict Sex, Vict Descent: change null to 'Unidentified'

In [5]:
# Tally the null values in every column of the crime table
from pyspark.sql.functions import col, count, when

# when() without otherwise() yields null for rows that fail the test and count()
# skips nulls, so each aggregate counts exactly the rows where that column is null.
crime_null_tallies = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in crime.columns
]
crime.select(crime_null_tallies).show()

+-----+---------+--------+--------+----+---------+-----------+--------+------+-----------+-------+--------+--------+------------+---------+-----------+--------------+-----------+------+-----------+--------+--------+--------+--------+--------+------------+---+---+
|DR_NO|Date Rptd|DATE OCC|TIME OCC|AREA|AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc|Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc|Weapon Used Cd|Weapon Desc|Status|Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION|Cross Street|LAT|LON|
+-----+---------+--------+--------+----+---------+-----------+--------+------+-----------+-------+--------+--------+------------+---------+-----------+--------------+-----------+------+-----------+--------+--------+--------+--------+--------+------------+---+---+
|    0|        0|       0|       0|   0|        0|          0|       0|     0|          0| 151692|       0|  144720|      144732|       16|        588|        677816|     677816|     1|          0|      11|  

In [6]:
# Print the column names and the types Spark inferred for the crime table.
crime.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- L

In [7]:
from pyspark.sql.functions import col, dayofweek, to_timestamp, when

# Shape and parse format of the raw date strings, e.g. "03/01/2020 12:00:00 AM".
CRIME_TS_REGEX = r"\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2} [AP]M"
CRIME_TS_FORMAT = "MM/dd/yyyy hh:mm:ss a"
# dayofweek() numbers the days 1..7 starting at Sunday; index i-1 is the label for day i.
DAY_NAMES = ("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday")


def clean_crime(df):
    """Return a cleaned copy of the raw crime DataFrame.

    Steps (the output schema matches what the original inline code produced):
      * drop rows whose "Premis Desc" or "Crm Cd 1" is null;
      * parse "Date Rptd" and "DATE OCC" into timestamps; values that do not
        match CRIME_TS_REGEX become null;
      * replace "TIME OCC" (HHMM integer) with the hour, i.e. HHMM / 100 cast to int;
      * add "DayOfWeek OCC" holding the weekday name of "DATE OCC";
      * fill null "Vict Sex" / "Vict Descent" with "Unidentified".

    Args:
        df: the crime DataFrame as loaded from the CSV (date columns still strings).

    Returns:
        A new DataFrame; `df` itself is not modified.
    """
    df = df.filter(col("Premis Desc").isNotNull() & col("Crm Cd 1").isNotNull())

    # convert Date Rptd and Date OCC column from string to datetime
    # (when() with no otherwise() already yields null for non-matching values)
    for date_col in ("Date Rptd", "DATE OCC"):
        df = df.withColumn(
            date_col,
            when(
                col(date_col).rlike(CRIME_TS_REGEX),
                to_timestamp(col(date_col), CRIME_TS_FORMAT),
            ),
        )

    # extract the day of the week info and hour info for OCC
    day_number = dayofweek(col("DATE OCC"))
    day_label = when(day_number == 1, DAY_NAMES[0])
    for number, label in enumerate(DAY_NAMES[1:], start=2):
        day_label = day_label.when(day_number == number, label)

    df = df.withColumn("TIME OCC", (col("TIME OCC") / 100).cast("int"))
    df = df.withColumn("DayOfWeek OCC", day_label)

    #fill null value
    for victim_col in ("Vict Sex", "Vict Descent"):
        df = df.withColumn(
            victim_col,
            when(col(victim_col).isNull(), "Unidentified").otherwise(col(victim_col)),
        )
    return df


# Guard against re-running this cell on already-cleaned data: a second pass would
# divide TIME OCC by 100 again and test the date regex against columns that no
# longer hold the original MM/dd/yyyy strings. "DayOfWeek OCC" only exists after
# cleaning, so it serves as the marker.
if "DayOfWeek OCC" in crime.columns:
    print("crime is already cleaned - skipping (reload the CSV to clean again)")
else:
    crime = clean_crime(crime)


SPACER

Building_and_Safety_Inspections:
keep only rows where Latitude/Longitude is not null \\
then extract latitude and longitude as floats from Latitude/Longitude \\

convert 'Inspection Date' into datetime, and extract year and month \\




In [8]:
# Preview the first 10 inspection rows; truncate=False prints full cell values.
ins.show(10, truncate = False)

+-------------------------+-----------------+--------------+---------------+---------------------+-----------------+----------------------+
|ADDRESS                  |PERMIT           |Permit Status |Inspection Date|Inspection Type      |Inspection Result|Latitude/Longitude    |
+-------------------------+-----------------+--------------+---------------+---------------------+-----------------+----------------------+
|10000 W SANTA MONICA BLVD|14044 10000 02293|Issued        |07/20/2016     |Rough-Ventilation    |Partial Approval |(34.06364, -118.41437)|
|1000 S SANTA FE AVE      |15016 10000 18196|Permit Finaled|07/22/2016     |Smoke Detectors      |Insp Cancelled   |(34.03143, -118.22981)|
|3680 N BUENA PARK DR     |15014 10000 04931|Issued        |07/18/2016     |Insulation           |Approved         |(34.13745, -118.38853)|
|1001 N LINDENWOOD LANE   |16042 90000 14712|Permit Finaled|07/20/2016     |Final                |Permit Finaled   |(34.07732, -118.48578)|
|2836 S ANCHOR AVE  

In [9]:
# Print the column names and the types Spark inferred for the inspections table.
ins.printSchema()

root
 |-- ADDRESS: string (nullable = true)
 |-- PERMIT: string (nullable = true)
 |-- Permit Status: string (nullable = true)
 |-- Inspection Date: string (nullable = true)
 |-- Inspection Type: string (nullable = true)
 |-- Inspection Result: string (nullable = true)
 |-- Latitude/Longitude: string (nullable = true)



In [10]:
# Null tally for the inspections table: one aggregate per column, shown as a single row.
ins.select(
    *(count(when(col(field).isNull(), field)).alias(field) for field in ins.columns)
).show()

+-------+------+-------------+---------------+---------------+-----------------+------------------+
|ADDRESS|PERMIT|Permit Status|Inspection Date|Inspection Type|Inspection Result|Latitude/Longitude|
+-------+------+-------------+---------------+---------------+-----------------+------------------+
|      1|     0|       481120|         205257|         481137|           481419|            481120|
+-------+------+-------------+---------------+---------------+-----------------+------------------+



In [11]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import year, month

# Clean the inspections table in one chained expression:
#   1. keep only rows that have a "(lat, lon)" string;
#   2. pull latitude / longitude out of that string as floats;
#   3. drop the combined "Latitude/Longitude" column;
#   4. parse "Inspection Date" (MM/dd/yyyy) into a timestamp, null when the value
#      does not match the date pattern;
#   5. extract the year and month info from inspection date.
ins = (
    ins.filter(col("Latitude/Longitude").isNotNull())
    .withColumn(
        "latitude",
        regexp_extract(col("Latitude/Longitude"), r"\((-?\d+\.\d+),", 1).cast("float"),
    )
    .withColumn(
        "longitude",
        regexp_extract(col("Latitude/Longitude"), r", (-?\d+\.\d+)\)", 1).cast("float"),
    )
    .drop("Latitude/Longitude")
    .withColumn(
        "Inspection Date",
        when(
            col("Inspection Date").rlike(r"\d{2}/\d{2}/\d{4}"),
            to_timestamp(col("Inspection Date"), "MM/dd/yyyy"),
        ),
    )
    # From here on "Inspection Date" refers to the parsed timestamp column.
    .withColumn("year", year(col("Inspection Date")))
    .withColumn("month", month(col("Inspection Date")))
)

In [None]:
#save file as parquet, I used the cell below to save csv during production
# mode("overwrite") makes the cell re-runnable: without an explicit mode the write
# fails once the target path already exists. This matches the CSV export cell.
ins.write.mode("overwrite").parquet("/content/drive/MyDrive/405 final/ins_processed.parquet")
crime.write.mode("overwrite").parquet("/content/drive/MyDrive/405 final/crime_processed.parquet")

In [None]:
#I used this one: save file as 1 single csv
# coalesce(1) collapses each table to one partition so each output directory holds a
# single CSV part file; existing output is overwritten and a header row is written.
for frame, out_dir in (
    (ins, "/content/drive/MyDrive/405 final/ins_csv"),
    (crime, "/content/drive/MyDrive/405 final/crime_csv"),
):
    frame.coalesce(1).write.csv(out_dir, mode="overwrite", header=True)

In [12]:
# Spot-check the first 3 rows of the crime table (values truncated by show()'s default).
crime.show(3)

+---------+-------------------+-------------------+--------+----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------------+------------+-------+---------+-------------+
|    DR_NO|          Date Rptd|           DATE OCC|TIME OCC|AREA|AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|       Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|Cross Street|    LAT|      LON|DayOfWeek OCC|
+---------+-------------------+-------------------+--------+----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+-----------+------+------------+--------+--------+--------+--------+--------------

In [13]:
# Spot-check the first 3 rows of the inspections table, including the derived
# latitude / longitude / year / month columns.
ins.show(3)

+--------------------+-----------------+--------------+-------------------+-----------------+-----------------+--------+----------+----+-----+
|             ADDRESS|           PERMIT| Permit Status|    Inspection Date|  Inspection Type|Inspection Result|latitude| longitude|year|month|
+--------------------+-----------------+--------------+-------------------+-----------------+-----------------+--------+----------+----+-----+
|10000 W SANTA MON...|14044 10000 02293|        Issued|2016-07-20 00:00:00|Rough-Ventilation| Partial Approval|34.06364|-118.41437|2016|    7|
| 1000 S SANTA FE AVE|15016 10000 18196|Permit Finaled|2016-07-22 00:00:00|  Smoke Detectors|   Insp Cancelled|34.03143|-118.22981|2016|    7|
|3680 N BUENA PARK DR|15014 10000 04931|        Issued|2016-07-18 00:00:00|       Insulation|         Approved|34.13745|-118.38853|2016|    7|
+--------------------+-----------------+--------------+-------------------+-----------------+-----------------+--------+----------+----+-----+

Spacer: the cells below contain experimental code that is not used in the final script

In [None]:
from pyspark.sql.functions import col
# Sedona imports. NOTE(review): no cell shown in this notebook installs the sedona
# package or adds its jars to the Spark session - confirm how they are provided.
from sedona.register import SedonaRegistrator
# NOTE(review): Adapter is not referenced in any cell shown here.
from sedona.utils.adapter import Adapter
from pyspark.sql.functions import col, expr, count

# Presumably registers Sedona's SQL functions on the session; the ST_* expressions
# used in the spatial-join cell depend on this call having run first.
SedonaRegistrator.registerAll(spark)


  SedonaRegistrator.registerAll(spark)


In [None]:

# Turn off automatic broadcast joins (-1 disables the size threshold); this is the
# workaround Spark itself suggests in its "not enough memory to build and broadcast" error.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [None]:
# Search radius around each crime location, in metres.
BUFFER_METERS = 200
# Planar ST_Buffer measures distance in the units of the geometry's coordinates.
# The points below are raw longitude/latitude degrees, so the radius must be
# converted to degrees first; passing 200 directly means 200 degrees, which makes
# almost every crime/inspection pair match.
# 111,320 m per degree is the usual approximation for one degree of latitude.
# East-west the buffer is narrower by cos(latitude) (roughly 17% at 34 N).
# NOTE(review): for exact metric buffers, project both layers to a metric CRS
# (ST_Transform) before buffering - check the axis-order rules of the Sedona
# version in use.
METERS_PER_DEGREE = 111_320
buffer_degrees = BUFFER_METERS / METERS_PER_DEGREE

# Convert `ins` (inspection locations) to a spatial DataFrame
ins = ins.withColumn("ins_point", expr("ST_Point(longitude, latitude)"))

# Convert `crime` (crime locations) to a spatial DataFrame
crime = crime.withColumn("crime_point", expr("ST_Point(Lon, Lat)"))

# Create the buffer around each crime location. ST_Buffer already returns a
# geometry, so no WKT round trip is needed before the join.
crime = crime.withColumn(
    "crime_buffer", expr(f"ST_Buffer(crime_point, {buffer_degrees})")
)

# Perform spatial join: Find inspections inside crime buffers.
# The left join keeps crimes with no nearby inspection; count() ignores the
# null ins_point of those rows, so they get inspection_count = 0.
joined_df = (
    crime.alias("c")
    .join(
        ins.alias("i"),
        expr("ST_Contains(c.crime_buffer, i.ins_point)"),  # Check if inspection is within buffer
        "left",
    )
    .groupby("c.Lon", "c.Lat")
    .agg(count("i.ins_point").alias("inspection_count"))
)


# Show results
joined_df.show()

Py4JJavaError: An error occurred while calling o239.showString.
: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.notEnoughMemoryToBuildAndBroadcastTableError(QueryExecutionErrors.scala:2213)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:187)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, ArrayType, StructType, StructField, DoubleType, StringType
from math import radians, cos, sin, asin, sqrt
import json

# Haversine distance calculation UDF
@F.udf(returnType=IntegerType())
def haversine_distance(lon1, lat1, lon2, lat2):
    """Great-circle distance between two lon/lat points, in whole metres.

    Args:
        lon1, lat1: longitude / latitude of the first point, in decimal degrees.
        lon2, lat2: longitude / latitude of the second point, in decimal degrees.

    Returns:
        The distance truncated to an int, or None when any coordinate is None.
    """
    # Spark hands SQL NULL to a Python UDF as None; propagate it instead of
    # raising a TypeError inside the executor and failing the whole job.
    if lon1 is None or lat1 is None or lon2 is None or lat2 is None:
        return None

    # Convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Floating-point rounding can push `a` marginally outside [0, 1] (e.g. for
    # near-antipodal points); clamp so sqrt/asin stay inside their domains.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))
    r = 6371000  # Radius of earth in meters
    return int(c * r)

# Create grid IDs with slightly higher precision for better performance
def get_grid_id(lat, lon, precision=0.01):
    """Get grid cell id for approximate location bucketing.

    Args:
        lat: latitude in decimal degrees, or None.
        lon: longitude in decimal degrees, or None.
        precision: cell size in degrees (default 0.01).

    Returns:
        "<lat_cell>,<lon_cell>" where each part is floor(coordinate / precision),
        or None when either coordinate is None (a SQL NULL passed through a UDF).
    """
    import math  # local import so the function stays self-contained when shipped as a UDF

    if lat is None or lon is None:
        return None
    # floor, not int(): int() truncates toward zero, which merges the two cells on
    # either side of 0 into one double-width cell and numbers negative coordinates
    # inconsistently with positive ones.
    return f"{math.floor(lat / precision)},{math.floor(lon / precision)}"

# Add grid IDs to both dataframes
# Wrap get_grid_id once and reuse the same UDF for both frames.
grid_id_udf = F.udf(get_grid_id, StringType())

# The crime table's coordinate columns are LAT / LON (see its schema); only the
# inspections table has the lower-case latitude / longitude columns.
crime = crime.withColumn("grid_id", grid_id_udf(F.col("LAT"), F.col("LON")))

ins = ins.withColumn("grid_id", grid_id_udf(F.col("latitude"), F.col("longitude")))

# Create a map of grid cells to inspection points
ins_by_grid = ins.groupBy("grid_id").agg(
    F.collect_list(
        F.struct("longitude", "latitude")
    ).alias("inspection_points")
)

# Convert to a Python dictionary for broadcasting
# NOTE(review): collect() pulls every inspection point onto the driver - confirm
# this fits in the configured driver memory.
ins_grid_dict = {row["grid_id"]: row["inspection_points"]
                 for row in ins_by_grid.collect()}

# Broadcast the dictionary to all executors
ins_grid_broadcast = spark.sparkContext.broadcast(ins_grid_dict)

NEARBY_RADIUS_METERS = 500
EARTH_RADIUS_METERS = 6371000  # Radius of earth in meters


def _haversine_meters(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres between two lon/lat points given in degrees."""
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Clamp against floating-point rounding so sqrt/asin stay inside their domains.
    a = min(1.0, max(0.0, a))
    return 2 * asin(sqrt(a)) * EARTH_RADIUS_METERS


# Create a UDF that counts nearby inspections using the broadcast variable
@F.udf(returnType=IntegerType())
def count_nearby_inspections(grid_id, crime_lon, crime_lat):
    """Count inspections within 500m of a crime location.

    Args:
        grid_id: the crime's "<lat_cell>,<lon_cell>" id as produced by get_grid_id.
        crime_lon: crime longitude in decimal degrees.
        crime_lat: crime latitude in decimal degrees.

    Returns:
        Number of inspection points within NEARBY_RADIUS_METERS, or None when
        the crime has no grid id or coordinates.
    """
    if grid_id is None or crime_lon is None or crime_lat is None:
        return None

    lat_cell, lon_cell = (int(part) for part in grid_id.split(","))
    points_by_cell = ins_grid_broadcast.value

    # A crime near a cell edge can be within 500 m of inspections in an adjacent
    # cell, so scan the 3x3 block around the crime's cell rather than only its own.
    # A 0.01-degree cell is about 1.1 km north-south and stays wider than 500 m
    # east-west up to roughly 63 degrees latitude, so one ring is enough there.
    nearby = 0
    for lat_step in (-1, 0, 1):
        for lon_step in (-1, 0, 1):
            cell_key = f"{lat_cell + lat_step},{lon_cell + lon_step}"
            for point in points_by_cell.get(cell_key, ()):
                ins_lon = point["longitude"]
                ins_lat = point["latitude"]
                # Skip inspections whose coordinates failed to parse.
                if ins_lon is None or ins_lat is None:
                    continue
                if _haversine_meters(crime_lon, crime_lat, ins_lon, ins_lat) <= NEARBY_RADIUS_METERS:
                    nearby += 1

    return nearby

# Apply the UDF to count nearby inspections for each crime
result = crime.withColumn(
    "nearby_inspections_count",
    count_nearby_inspections(
        F.col("grid_id"),
        F.col("LON"),
        F.col("LAT")
    )
)

# Display result
result.select("LAT", "LON", "nearby_inspections_count").show()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 