In [2]:
import os
import pyspark
# Build the Spark configuration used by this notebook session.
conf = pyspark.SparkConf()

# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', True) # enabled for debugging
conf.set('spark.driver.memory','12g')
# Start the SparkContext with that configuration; `spark` is the SQL entry
# point that the cells below use for reading CSV files.
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)



In [3]:
from pyspark.sql.functions import col, to_date, when, concat, lit, count, avg
from pyspark.sql.types import DateType, StructType, StructField, DoubleType, StringType
from itertools import product
from pyspark.sql import functions as F
import requests
import googlemaps
import json
import requests
import re
from flask import Flask, request, jsonify

In [4]:
# Load the raw arrest records, parse the arrest date, keep only the columns
# used for zone scoring, and discard rows with any missing value.
arrest_data_df = (
    spark.read
    .option("header", "true")
    .csv("/content/NYPD_ARREST_DATA.csv")
    .withColumn(
        "ARREST_DATE",
        to_date(col("ARREST_DATE"), "MM/dd/yyyy").cast(DateType()),
    )
    .select(
        "ARREST_DATE",
        "ARREST_BORO",
        "AGE_GROUP",
        "PERP_SEX",
        "PERP_RACE",
        "Latitude",
        "Longitude",
    )
    .dropna()
)

# Row count after cleaning, followed by a preview of the frame.
print(arrest_data_df.count())
arrest_data_df

5100805


ARREST_DATE,ARREST_BORO,AGE_GROUP,PERP_SEX,PERP_RACE,Latitude,Longitude
2019-01-26,M,45-64,M,BLACK,40.800694331000045,-73.94110928599997
2016-01-06,K,25-44,M,BLACK,40.64865008500004,-73.95033556299995
2018-11-15,K,25-44,M,BLACK,40.67458330800008,-73.93022154099998
2006-09-13,K,45-64,M,BLACK,40.67125445700003,-73.926713851
2018-10-24,M,45-64,M,WHITE HISPANIC,40.716195914000025,-73.99749074599998
2019-04-23,K,45-64,M,BLACK HISPANIC,40.67458330800008,-73.93022154099998
2019-05-04,B,25-44,M,BLACK,40.883382579000056,-73.90233330899997
2019-05-23,M,45-64,F,BLACK,40.81485028900005,-73.95668184799996
2018-10-23,M,25-44,M,WHITE HISPANIC,40.76443435900006,-73.988265529
2018-11-10,M,25-44,M,WHITE HISPANIC,40.72025522300004,-74.00709027999993


In [5]:
# Define the new boundaries according to data
# NOTE(review): max_latitude = 62.08 is far above every latitude shown in this
# notebook's sample output (about 40.6-40.9) — presumably driven by an outlier
# row; confirm. With these bounds lat_step is roughly 37x larger than lon_step
# (in degrees), so the grid cells are strongly elongated, not square.
min_latitude, max_latitude = 40.49, 62.08
min_longitude, max_longitude = -74.26, -73.68

# Number of divisions along each axis to create 25,000,000 zones (5000x5000)
num_divisions = 5000

# Size of one grid cell along each axis, in degrees.
lat_step = (max_latitude - min_latitude) / num_divisions
lon_step = (max_longitude - min_longitude) / num_divisions

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def find_zone_id(latitude, longitude):
    """Map a coordinate to the id of the grid zone that contains it.

    The grid is described by the notebook-level constants ``min_latitude``,
    ``max_latitude``, ``min_longitude``, ``max_longitude``, ``num_divisions``,
    ``lat_step`` and ``lon_step``.

    Args:
        latitude: Latitude as a number, or None.
        longitude: Longitude as a number, or None.

    Returns:
        The integer ``lat_index * num_divisions + lon_index``, or None when a
        coordinate is missing or lies outside the grid bounds.
    """
    # Missing values cannot be compared against the bounds; treat as "no zone".
    if latitude is None or longitude is None:
        return None

    # Out-of-bounds input yields None rather than a message string: this
    # function is registered as an IntegerType UDF, so every result must be
    # an int or null.
    if not (min_latitude <= latitude <= max_latitude) or not (min_longitude <= longitude <= max_longitude):
        return None

    # A coordinate exactly on the upper bound would index one past the last
    # cell, so clamp it into the final row/column.
    lat_index = min(int((latitude - min_latitude) / lat_step), num_divisions - 1)
    lon_index = min(int((longitude - min_longitude) / lon_step), num_divisions - 1)

    # Row-major cell number within the num_divisions x num_divisions grid.
    return lat_index * num_divisions + lon_index

# Wrap find_zone_id as a Spark UDF whose declared result type is integer.
find_zone_id_udf = udf(find_zone_id, IntegerType())

In [7]:
from pyspark.sql.functions import col, round, floor

# Round both coordinates to 6 decimal places before assigning zones.
new_df = (
    arrest_data_df
    .withColumn("Latitude", round(col("Latitude"), 6))
    .withColumn("Longitude", round(col("Longitude"), 6))
)

# Attach the grid zone of every arrest as a new `zone_id` column.
new_df_with_zone_id = new_df.withColumn(
    'zone_id',
    find_zone_id_udf(col("Latitude"), col("Longitude")),
)

# Preview the first rows of the zoned data.
new_df_with_zone_id.show(10)

+-----------+-----------+---------+--------+--------------+---------+----------+-------+
|ARREST_DATE|ARREST_BORO|AGE_GROUP|PERP_SEX|     PERP_RACE| Latitude| Longitude|zone_id|
+-----------+-----------+---------+--------+--------------+---------+----------+-------+
| 2019-01-26|          M|    45-64|       M|         BLACK|40.800694|-73.941109| 357749|
| 2016-01-06|          K|    25-44|       M|         BLACK| 40.64865|-73.950336| 182669|
| 2018-11-15|          K|    25-44|       M|         BLACK|40.674583|-73.930222| 212842|
| 2006-09-13|          K|    45-64|       M|         BLACK|40.671254|-73.926714| 207873|
| 2018-10-24|          M|    45-64|       M|WHITE HISPANIC|40.716196|-73.997491| 262263|
| 2019-04-23|          K|    45-64|       M|BLACK HISPANIC|40.674583|-73.930222| 212842|
| 2019-05-04|          B|    25-44|       M|         BLACK|40.883383|-73.902333| 458083|
| 2019-05-23|          M|    45-64|       F|         BLACK| 40.81485|-73.956682| 377614|
| 2018-10-23|        

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def find_zone_id(latitude, longitude):
    """Map a coordinate to the id of the grid zone that contains it.

    The grid is described by the notebook-level constants ``min_latitude``,
    ``max_latitude``, ``min_longitude``, ``max_longitude``, ``num_divisions``,
    ``lat_step`` and ``lon_step``.

    Args:
        latitude: Latitude as a number, or None.
        longitude: Longitude as a number, or None.

    Returns:
        The integer ``lat_index * num_divisions + lon_index``, or None when a
        coordinate is missing or lies outside the grid bounds.
    """
    # NOTE(review): find_zone_id is also defined in an earlier cell; this later
    # definition is the one bound when subsequent cells call find_zone_id.

    # Missing values cannot be compared against the bounds; treat as "no zone".
    if latitude is None or longitude is None:
        return None

    # Out-of-bounds input yields None rather than a message string: this
    # function is registered as an IntegerType UDF, so every result must be
    # an int or null.
    if not (min_latitude <= latitude <= max_latitude) or not (min_longitude <= longitude <= max_longitude):
        return None

    # A coordinate exactly on the upper bound would index one past the last
    # cell, so clamp it into the final row/column.
    lat_index = min(int((latitude - min_latitude) / lat_step), num_divisions - 1)
    lon_index = min(int((longitude - min_longitude) / lon_step), num_divisions - 1)

    # Row-major cell number within the num_divisions x num_divisions grid.
    return lat_index * num_divisions + lon_index

# Wrap find_zone_id as a Spark UDF whose declared result type is integer.
find_zone_id_udf = udf(find_zone_id, IntegerType())

In [9]:
# Number of arrests recorded in each zone on each date.
crime_count_per_day_zone = (
    new_df_with_zone_id
    .groupBy("zone_id", "ARREST_DATE")
    .agg(count("*").alias("daily_crimes"))
)

# Risk score of a zone = mean of its daily arrest counts over the dates that
# appear for that zone.
crime_rate_per_zone = (
    crime_count_per_day_zone
    .groupBy("zone_id")
    .agg(avg("daily_crimes").alias("risk_score"))
)

# Display only: the null-free view is shown but not assigned back.
crime_rate_per_zone.dropna()

zone_id,risk_score
417823,1.195945945945946
197258,1.234866828087167
249022,1.1428571428571428
277349,1.3571428571428572
438279,1.4150641025641026
438386,1.46078431372549
413073,1.2421052631578948
403389,1.2300884955752212
269295,1.0
237337,1.0754716981132075


In [10]:
# Persist the per-zone risk scores as CSV, replacing any previous output.
# NOTE(review): this path is relative, while the read-back cell uses
# "/content/ZONE_RISK.csv" — presumably the working directory is /content; confirm.
crime_rate_per_zone.write.mode("overwrite").csv("ZONE_RISK.csv")

In [13]:
# Read the persisted risk scores back (the file has no header row) and drop
# incomplete rows.
zone_risk_df = (
    spark.read
    .option("header", "false")
    .csv("/content/ZONE_RISK.csv")
    .dropna()
)
rows = zone_risk_df.collect()

# Build an in-memory lookup: zone id (int) -> risk score (float).
zone_risk_dict = {int(record[0]): float(record[1]) for record in rows}
zone_risk_dict

{417823: 1.195945945945946,
 197258: 1.234866828087167,
 249022: 1.1428571428571428,
 277349: 1.3571428571428572,
 438279: 1.4150641025641026,
 438386: 1.4607843137254901,
 413073: 1.2421052631578948,
 403389: 1.2300884955752212,
 269295: 1.0,
 237337: 1.0754716981132075,
 312801: 1.2682926829268293,
 237810: 1.193798449612403,
 118989: 1.075,
 262389: 1.4055829228243022,
 218049: 1.2830188679245282,
 177496: 1.394736842105263,
 102524: 1.1754385964912282,
 242387: 1.0,
 313148: 1.625,
 282396: 1.2696078431372548,
 382774: 1.3772321428571428,
 378053: 1.2513089005235603,
 208595: 1.0666666666666667,
 217119: 1.3703703703703705,
 352731: 1.421487603305785,
 218175: 1.2465986394557824,
 372890: 1.3148148148148149,
 167071: 1.0925925925925926,
 198038: 1.125984251968504,
 377793: 1.2537313432835822,
 397863: 1.4341317365269461,
 198173: 1.1224489795918366,
 292608: 1.25,
 413791: 1.2857142857142858,
 423378: 1.2439024390243902,
 192988: 1.15,
 268622: 1.1710526315789473,
 213312: 1.184210

In [14]:
def identify_routes_risk_score(all_routes_data):
    """Compute an average zone risk score for every route.

    Each coordinate of a route is mapped to a grid zone with ``find_zone_id``
    and looked up in ``zone_risk_dict``. Zones without an entry contribute 0
    to the sum but still count in the average.

    Args:
        all_routes_data: Mapping of route id to a dict holding
            ``"route_coordinates"`` (a list of ``{"lat": ..., "long": ...}``)
            and optionally ``"distance"``/``"total_distance"`` and
            ``"time"``/``"total_duration"``.

    Returns:
        Mapping of route id to a dict with keys ``"Coordinate"``,
        ``"distance"``, ``"time"`` and ``"risk_score"``. ``"risk_score"`` is
        None for a route that has no coordinates.
    """
    def identify_zones_for_route(route_coordinates):
        # One zone id per coordinate, in route order.
        return [find_zone_id(coord['lat'], coord['long']) for coord in route_coordinates]

    route_zones_data = {}
    for route_id, route_data in all_routes_data.items():
        route_coordinates = route_data.get("route_coordinates") or []
        route_zones = identify_zones_for_route(route_coordinates)

        # Sum the known risks; zones with no recorded score add nothing.
        risk_total = sum(zone_risk_dict.get(zone, 0) for zone in route_zones)

        # A route without coordinates has no defined average: report None
        # instead of dividing by zero.
        if route_coordinates:
            risk_score = risk_total / len(route_coordinates)
        else:
            risk_score = None

        route_zones_data[route_id] = {
            "Coordinate": route_coordinates,
            # The route builder in this notebook stores these values under
            # "total_distance"/"total_duration"; fall back to those keys so
            # they are not silently reported as None.
            "distance": route_data.get("distance", route_data.get("total_distance")),
            "time": route_data.get("time", route_data.get("total_duration")),
            "risk_score": risk_score,
        }
    return route_zones_data

In [15]:
def get_lat_lng(api_key, address, timeout=10):
    """Geocode an address to a ``(lat, lng)`` pair via the gomaps.pro Geocoding API.

    Args:
        api_key: API key sent as the ``key`` query parameter.
        address: Address text sent as the ``address`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``; without
            one the call can block indefinitely on an unresponsive server.

    Returns:
        Tuple ``(lat, lng)`` taken from the first result's ``geometry.location``.

    Raises:
        Exception: If the HTTP status is not 200, or the response contains no
            results for the address.
    """
    geocode_url = "https://maps.gomaps.pro/maps/api/geocode/json"
    params = {
        "address": address,
        "key": api_key
    }

    # Bounded wait so a stalled connection cannot hang the notebook.
    response = requests.get(geocode_url, params=params, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Error geocoding address: {response.status_code}, {response.text}")

    geocode_data = response.json()

    # An absent or empty "results" list means the address could not be geocoded.
    results = geocode_data.get("results")
    if not results:
        raise Exception("No results found for the provided address")

    location = results[0]["geometry"]["location"]
    return location["lat"], location["lng"]


In [16]:
def get_all_routes_with_coordinates(api_key, origin_address, destination_address, timeout=10):
    """Fetch all walking routes between two addresses with per-step coordinates.

    Both addresses are geocoded with ``get_lat_lng`` and the gomaps.pro
    Directions API is then queried in walking mode with alternatives enabled.
    Only the first leg of each returned route is read.

    Args:
        api_key: API key sent as the ``key`` query parameter.
        origin_address: Start address.
        destination_address: End address.
        timeout: Seconds passed to ``requests.get`` as its ``timeout`` for the
            directions request; without one the call can block indefinitely.

    Returns:
        Dict keyed by the route index as a string. Each value holds:
        ``"steps"`` (total distance and duration lines followed by one text
        line per step), ``"route_coordinates"`` (the end location of each step
        as ``{"lat": ..., "long": ...}``), ``"total_distance"`` and
        ``"total_duration"``.

    Raises:
        Exception: If the HTTP status is not 200, the body is not valid JSON,
            or the response has no ``"routes"`` key.
    """
    # Convert addresses to latitude and longitude
    origin_lat, origin_lng = get_lat_lng(api_key, origin_address)
    destination_lat, destination_lng = get_lat_lng(api_key, destination_address)

    base_url = "https://maps.gomaps.pro/maps/api/directions/json"
    params = {
        "origin": f"{origin_lat},{origin_lng}",
        "destination": f"{destination_lat},{destination_lng}",
        "mode": "walking",
        "alternatives": "true",
        "key": api_key
    }

    # Bounded wait so a stalled connection cannot hang the notebook.
    response = requests.get(base_url, params=params, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Error fetching directions: {response.status_code}, {response.text}")
    try:
        directions_result = response.json()
    except ValueError:
        raise Exception("Error parsing JSON response from Gomaps Directions API")

    if "routes" not in directions_result:
        raise Exception("No routes found in the Directions API response")

    def clean_html(instruction):
        # An opening <div> starts a separate note inside the instruction; turn
        # it into a space so the note is not glued to the preceding word
        # (e.g. "BoweryPass by ...").
        text = re.sub(r'<div[^>]*>', ' ', instruction)
        # Drop every remaining tag, then collapse runs of whitespace.
        text = re.sub(r'<[^>]*>', '', text)
        return re.sub(r'\s+', ' ', text).strip()

    all_routes_steps = {}

    for route_idx, route in enumerate(directions_result.get("routes", [])):
        leg = route["legs"][0]
        total_distance = leg["distance"]["text"]
        total_duration = leg["duration"]["text"]

        # The step list starts with the route totals.
        route_steps = [
            f"Total Distance: {total_distance}",
            f"Total Duration: {total_duration}",
        ]
        route_coordinates = []  # end point of every step, for risk calculation

        for step in leg["steps"]:
            instruction = step.get("html_instructions", "No instruction provided.")
            distance = step["distance"]["text"]
            duration = step["duration"]["text"]

            end_location = step["end_location"]
            route_coordinates.append({"lat": end_location["lat"], "long": end_location["lng"]})

            route_steps.append(f"{clean_html(instruction)} ({distance}, {duration})")

        all_routes_steps[str(route_idx)] = {
            "steps": route_steps,
            "route_coordinates": route_coordinates,
            "total_distance": total_distance,
            "total_duration": total_duration
        }

    return all_routes_steps

In [18]:
# Read the API key from a local file so it is not hardcoded in the notebook.
with open("api_key.txt", 'r') as file:
    # readline() keeps the trailing newline; strip it so it is not sent as
    # part of the key.
    api_key = file.readline().strip()
source_address = "70 Washington Square S, New York, NY 10012"
destination_address = "6 MetroTech Center, Brooklyn, NY 11201"

# Fetch every walking route between the two addresses, then score each one.
all_routes_data = get_all_routes_with_coordinates(api_key, source_address, destination_address)
route_risk_scores = identify_routes_risk_score(all_routes_data)

# Print each route's risk score followed by its step-by-step directions.
for route_id, route_info in route_risk_scores.items():
    print(f"Route {route_id}:")

    print(f"Risk Score: {route_info.get('risk_score', 'Unknown')}")

    for step in all_routes_data[route_id]['steps']:
        print(step)

    print("\n")


Route 0:
Risk Score: 1.0694396595033464
Total Distance: 2.8 mi
Total Duration: 1 hour 5 mins
Head southeast on Washington Square S toward Schwartz Plaza (128 ft, 1 min)
Continue onto West 4th Street (0.1 mi, 3 mins)
At Duane Reade, continue onto E 4th St (0.2 mi, 4 mins)
Turn right onto BoweryPass by Bank of America Financial Center (on the right in 0.7 mi) (0.8 mi, 20 mins)
Slight left toward Manhattan Bridge Pedestrian Path (154 ft, 1 min)
Turn left onto Manhattan Bridge Pedestrian Path (1.2 mi, 28 mins)
Slight right to stay on Manhattan Bridge Pedestrian Path (308 ft, 1 min)
Turn right toward Jay St (56 ft, 1 min)
Sharp left onto Jay StParts of this road are closed Mon–Fri 7:00 AM – 7:00 PMDestination will be on the left (0.3 mi, 7 mins)


Route 1:
Risk Score: 0.9836122564220768
Total Distance: 3.1 mi
Total Duration: 1 hour 12 mins
Head southeast on Washington Square S toward Schwartz Plaza (128 ft, 1 min)
Continue onto West 4th Street (499 ft, 2 mins)
Turn right onto Mercer St (0.7