In [1]:
import os
import pyspark
conf = pyspark.SparkConf()

# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', True) # enabled for debuggig 
conf.set('spark.driver.memory','12g')
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)



In [2]:
from pyspark.sql.functions import col, to_date, when, concat, lit, count, avg
from pyspark.sql.types import DateType, StructType, StructField, DoubleType, StringType
from itertools import product
from pyspark.sql import functions as F
import requests
import googlemaps
import json
from flask import Flask, request, jsonify

In [3]:
arrest_data_df = spark.read.option("header", "true").csv("data/NYPD_Arrest_Data__Year_to_Date__20231212.csv")
arrest_data_df = arrest_data_df.withColumn("ARREST_DATE", to_date(arrest_data_df["ARREST_DATE"], "MM/dd/yyyy").cast(DateType()))
arrest_data_df = arrest_data_df \
    .select(col("ARREST_DATE"), col("ARREST_BORO"), col("AGE_GROUP"), col("PERP_SEX"), col("PERP_RACE"), col("Latitude"), col("Longitude")) 
arrest_data_df = arrest_data_df \
    .filter(col("ARREST_BORO") == "S") 

arrest_data_df = arrest_data_df.dropna()
print(arrest_data_df.count())
arrest_data_df

7415


ARREST_DATE,ARREST_BORO,AGE_GROUP,PERP_SEX,PERP_RACE,Latitude,Longitude
2023-04-10,S,18-24,F,WHITE HISPANIC,40.644996,-74.077263
2023-07-24,S,25-44,M,BLACK,40.628914,-74.125568
2023-03-07,S,25-44,M,WHITE,40.644996,-74.077263
2023-03-17,S,<18,M,WHITE,40.527587,-74.191658
2023-05-30,S,25-44,F,BLACK,40.644996,-74.077263
2023-03-07,S,25-44,M,WHITE,40.511577,-74.249302
2023-02-17,S,25-44,M,BLACK,40.59726661499576,-74.05761445439731
2023-05-16,S,25-44,U,BLACK,40.623238,-74.149217
2023-01-24,S,25-44,M,WHITE HISPANIC,40.623238,-74.149217
2023-01-23,S,25-44,M,WHITE,40.61865,-74.073309


In [4]:
# Define the boundaries
# min_latitude, max_latitude = 40.7, 40.901
# min_longitude, max_longitude = -74.05, -73.899

# Define the new boundaries according to data
min_latitude, max_latitude = 40.49, 62.08
min_longitude, max_longitude = -74.26, -73.68

# Number of divisions along each axis to create 1,000,000 zones (1000x1000)
num_divisions = 500

lat_step = (max_latitude - min_latitude) / num_divisions
lon_step = (max_longitude - min_longitude) / num_divisions

In [5]:
from pyspark.sql.functions import col, lead, round, monotonically_increasing_id
from pyspark.sql.window import Window


# Generate latitude and longitude DataFrames with lead values and round off to 6 decimal places
lat_df = spark.range(0, num_divisions + 1).select(round(min_latitude + col("id") * lat_step, 6).alias("y1"))
lat_df = lat_df.withColumn("y2", round(lead("y1").over(Window.orderBy("y1")), 6))

lon_df = spark.range(0, num_divisions + 1).select(round(min_longitude + col("id") * lon_step, 6).alias("x1"))
lon_df = lon_df.withColumn("x2", round(lead("x1").over(Window.orderBy("x1")), 6))

# Remove the last row from lat_df and lon_df as they do not form complete squares
lat_df = lat_df.where(col("y2").isNotNull())
lon_df = lon_df.where(col("x2").isNotNull())

# Cross-join latitude and longitude DataFrames to create zones
zone_df = lat_df.crossJoin(lon_df)

# Assign zone IDs
zone_df = zone_df.withColumn("zone_id", monotonically_increasing_id())

# zone_df.write.csv("path_to_output.csv")

# Display some of the zones
zone_df.select("x1", "y1", "x2", "y2", "zone_id")

x1,y1,x2,y2,zone_id
-74.26,40.49,-74.25884,40.53318,0
-74.25884,40.49,-74.25768,40.53318,1
-74.25768,40.49,-74.25652,40.53318,2
-74.25652,40.49,-74.25536,40.53318,3
-74.25536,40.49,-74.2542,40.53318,4
-74.2542,40.49,-74.25304,40.53318,5
-74.25304,40.49,-74.25188,40.53318,6
-74.25188,40.49,-74.25072,40.53318,7
-74.25072,40.49,-74.24956,40.53318,8
-74.24956,40.49,-74.2484,40.53318,9


In [6]:
from pyspark.sql.functions import col, round, floor

# Round the latitude and longitude in new_df to 6 decimal places
# new_df = arrest_data_df
new_df = arrest_data_df.withColumn("Latitude", round(col("Latitude"), 6))
new_df = arrest_data_df.withColumn("Longitude", round(col("Longitude"), 6))

# Join new_df with zone_df to assign zone_id
# The join condition checks if the latitude and longitude of new_df fall within the zone boundaries
joined_df = new_df.join(
    zone_df,
    (new_df.Latitude >= zone_df.y1) &
    (new_df.Latitude < zone_df.y2) &
    (new_df.Longitude >= zone_df.x1) &
    (new_df.Longitude < zone_df.x2),
    "left_outer"
)

# Update new_df to include the zone_id
new_df_with_zone_id = joined_df.select(new_df["*"], zone_df["zone_id"])

# new_df_with_zone_id = new_df_with_zone_id.withColumn("zone", floor(lit(col("zone_id")/num_divisions)))

# Show the updated DataFrame or process further
new_df_with_zone_id.show(10)

+-----------+-----------+---------+--------+--------------+-----------------+----------+-------+
|ARREST_DATE|ARREST_BORO|AGE_GROUP|PERP_SEX|     PERP_RACE|         Latitude| Longitude|zone_id|
+-----------+-----------+---------+--------+--------------+-----------------+----------+-------+
| 2023-03-07|          S|    25-44|       M|         WHITE|        40.511577|-74.249302|      9|
| 2023-07-26|          S|    25-44|       F|         WHITE|        40.511577|-74.249302|      9|
| 2023-07-14|          S|      <18|       M|         BLACK|40.51633659090547|-74.232823|     23|
| 2023-05-28|          S|    25-44|       F|         WHITE|        40.524733|-74.203081|     49|
| 2023-03-17|          S|      <18|       M|         WHITE|        40.527587|-74.191658|     58|
| 2023-07-31|          S|    25-44|       M|         WHITE|        40.538274|-74.210493|    542|
| 2023-02-19|          S|    25-44|       M|         WHITE|      40.55591155|-74.202339|    549|
| 2023-09-07|          S|    4

In [8]:
crime_count_per_day_zone = new_df_with_zone_id.groupBy("zone_id", "ARREST_DATE").agg(count("*").alias("daily_crimes"))

# Calculate the average crime rate per day for each zone
# This assumes the dataset spans the entire period for which you want to calculate the rate
crime_rate_per_zone = crime_count_per_day_zone.groupBy("zone_id").agg(avg("daily_crimes").alias("risk_score"))

{26: 1.0,
 558: 1.0,
 541: 1.0,
 1642: 1.0,
 1145: 1.2142857142857142,
 1127: 1.0,
 54: 1.1666666666666667,
 19: 1.25,
 1594: 1.3636363636363635,
 1077: 1.1333333333333333,
 1599: 1.3571428571428572,
 564: 1.0,
 1636: 1.0,
 588: 1.0,
 602: 1.0,
 530: 1.3333333333333333,
 22: 1.0588235294117647,
 1640: 1.125,
 1157: 1.0588235294117647,
 1152: 1.2619047619047619,
 1059: 1.0,
 7: 1.0,
 532: 1.0,
 34: 1.0,
 635: 2.3333333333333335,
 551: 1.0,
 1591: 1.5,
 529: 1.0,
 624: 1.125,
 1567: 1.0,
 570: 1.3333333333333333,
 1174: 2.0,
 50: 1.1666666666666667,
 638: 1.0,
 1110: 1.0,
 1632: 1.2592592592592593,
 1119: 1.3333333333333333,
 1606: 1.173913043478261,
 629: 1.3,
 1598: 1.2,
 1607: 1.1724137931034482,
 1571: 1.0,
 32: 1.0,
 555: 1.2857142857142858,
 1128: 1.0,
 1134: 1.1428571428571428,
 1108: 1.0,
 43: 1.0,
 1089: 1.25,
 84: 1.0,
 31: 1.0,
 1069: 1.5,
 596: 2.0,
 595: 1.6,
 39: 1.0,
 561: 1.0,
 1650: 1.3725490196078431,
 1141: 1.1428571428571428,
 1639: 1.125,
 1624: 1.1481481481481481,
 

In [32]:
crime_rate_per_zone.write.mode("overwrite").csv("zone_risk.csv")

In [47]:
zone_risk_df = spark.read.option("header", "false").csv("zone_risk.csv")
rows = zone_risk_df.collect()

# Converting rows to dictionary
zone_risk_dict = {int(row[0]): float(row[1]) for row in rows}

In [43]:
def find_zone_id(latitude, longitude):
    # Input validation
    if not (min_latitude <= latitude <= max_latitude) or not (min_longitude <= longitude <= max_longitude):
        return "Invalid latitude or longitude"

    # Calculate indexes
    lat_index = int((latitude - min_latitude) / lat_step)
    lon_index = int((longitude - min_longitude) / lon_step)

    # Handle edge cases
    if lat_index == num_divisions:
        lat_index -= 1
    if lon_index == num_divisions:
        lon_index -= 1

    # Calculate zone_id
    zone_id = lat_index * num_divisions + lon_index
    return zone_id

In [44]:
def identify_routes_risk_score(all_routes_data):
    route_object = all_routes_data

    # Function to identify zones for a route
    def identify_zones_for_route(route_coordinates):
        route_zones = []
        for coord in route_coordinates:
            zone = find_zone_id(coord['lat'], coord['long'])
            route_zones.append(zone)
        return route_zones

    route_zones_data = {}
    # Iterate through each route in the object
    for route_id, route_data in route_object.items():
        route_coordinates = route_data.get("route_coordinates", [])
        
        # Identify zones for the route coordinates
        route_zones = identify_zones_for_route(route_coordinates)

        risk_count = 0
        for zone in route_zones:
            if zone in zone_risk_dict:
                risk_count += zone_risk_dict[zone]

        t = {
            "Coordinate": route_coordinates,
            "distance": route_data.get("distance"),
            "time": route_data.get("time"),
            "risk_score": risk_count/len(route_coordinates)
        }
        route_zones_data[route_id] = t
    return route_zones_data

In [45]:
def get_all_routes_with_coordinates(api_key, origin, destination):
    # Initialize the Google Maps API client
    gmaps = googlemaps.Client(key=api_key)

    # Make the directions API request
    directions_result = gmaps.directions(origin, destination, mode="walking", alternatives=True)

    # Extract and format information about each route
    all_routes_data = {}
    for i, route in enumerate(directions_result):
        route_data = {
            "distance": route['legs'][0]['distance']['text'],
            "time": route['legs'][0]['duration']['text'],
            "route_coordinates": []
        }
        count = 0
        for step in route['legs'][0]['steps']:
            if count == 0:
                start_location = step['start_location']
                route_data["route_coordinates"].append({
                    "lat": start_location['lat'],
                    "long": start_location['lng']
                })
                count += 1
            end_location = step['end_location']
            route_data["route_coordinates"].append({
                "lat": end_location['lat'],
                "long": end_location['lng']
            })

        all_routes_data[str(i)] = route_data

    return all_routes_data

In [46]:
with open("api_key.txt", 'r') as file:
    api_key = file.readline()
source_address = "2975 Richmond Ave, Staten Island, NY 10314"
destination_address = "2505 Richmond Ave, Staten Island, NY 10314"

gmaps = googlemaps.Client(key=api_key)

# Make the directions API request
gmaps.directions(source_address, destination_address, mode="walking", alternatives=True)
all_routes_data = get_all_routes_with_coordinates(api_key, source_address, destination_address)
identify_routes_risk_score(all_routes_data)

{'0': {'Coordinate': [{'lat': 40.57260429999999, 'long': -74.1693918},
   {'lat': 40.5737799, 'long': -74.1679547},
   {'lat': 40.5738709, 'long': -74.1680732},
   {'lat': 40.5757573, 'long': -74.1668808},
   {'lat': 40.5767042, 'long': -74.1687887},
   {'lat': 40.5781927, 'long': -74.16750100000002},
   {'lat': 40.5783002, 'long': -74.1677007},
   {'lat': 40.5783843, 'long': -74.16763399999999},
   {'lat': 40.5792365, 'long': -74.169243},
   {'lat': 40.5826534, 'long': -74.1687644},
   {'lat': 40.5825012, 'long': -74.1687882},
   {'lat': 40.5834517, 'long': -74.1675064},
   {'lat': 40.583406, 'long': -74.1673015},
   {'lat': 40.5855083, 'long': -74.1664566},
   {'lat': 40.5857842, 'long': -74.16737429999999},
   {'lat': 40.58606959999999, 'long': -74.16715479999999},
   {'lat': 40.5863973, 'long': -74.16769049999999}],
  'distance': '1.3 mi',
  'time': '29 mins',
  'risk_score': 1.2693977591036414},
 '1': {'Coordinate': [{'lat': 40.57260429999999, 'long': -74.1693918},
   {'lat': 40.5