In [1]:
from road_network import fetch_road_network, extract_road_segments_DF
from accidents_montreal import fetch_accidents_montreal, extract_accidents_montreal_dataframe
from weather import get_weather
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pow, col, min, udf, monotonically_increasing_id, sin, cos, radians
from pyspark.sql.functions import atan2, sqrt, rank, hash, abs, row_number
from pyspark.sql.types import StringType
from math import fabs as mabs
import sys
import os

In [2]:
def init_spark():
    """Return a SparkSession for the "Accident Prediction" application.

    Uses ``getOrCreate``, so an already-running session is reused instead of
    starting a second one.
    """
    builder = SparkSession.builder.appName("Accident Prediction")
    return builder.getOrCreate()

# Session shared by every cell below.
spark = init_spark()

In [3]:
def _angle_to_DMS(angle):
    """Format the magnitude of a decimal-degree angle as D°M'S.SS" (no hemisphere letter)."""
    # Work in whole hundredths of an arc-second so that rounding carries into
    # the minutes/degrees fields instead of printing an invalid 60.00" value.
    centiseconds = round(mabs(angle) * 3600 * 100)
    degrees, remainder = divmod(centiseconds, 3600 * 100)
    minutes, centiseconds = divmod(remainder, 60 * 100)
    return f"{degrees}°{minutes}'{centiseconds / 100:.2f}\""

def degree_to_DMS(lat, long):
    """Convert decimal-degree coordinates to a degrees/minutes/seconds string.

    Args:
        lat: latitude in decimal degrees (negative means south).
        long: longitude in decimal degrees (negative means west).

    Returns:
        A string such as ``45°32'30.10"N 73°36'36.83"W``, or ``None`` when
        either coordinate is ``None`` (e.g. a null value reaching the function
        through a Spark UDF).
    """
    if lat is None or long is None:
        return None
    # The hemisphere letter follows the sign of the coordinate. A value of
    # exactly zero keeps the historical "N" / "W" labels.
    lat_hemisphere = "S" if lat < 0 else "N"
    long_hemisphere = "E" if long > 0 else "W"
    return f"{_angle_to_DMS(lat)}{lat_hemisphere} {_angle_to_DMS(long)}{long_hemisphere}"
print(degree_to_DMS(45.541695,-73.61023))
# NOTE(review): leftover scratch value; despite its name it holds the longitude
# used in the example above, and nothing in the visible notebook reads it.
lat = -73.61023

45°32'30.10"N 73°36'36.83"W


In [4]:
# Make sure the road network data is available locally; per the recorded
# output below, the helper skips the download when it has already been done.
fetch_road_network()
# Road network as a Spark DataFrame. Later cells read the columns street_id,
# street_name, street_type, center_lat/center_long and coord_lat/coord_long.
rnd = extract_road_segments_DF(spark)

Skip fetching road network: already downloaded
Extracting road network dataframe...
Extracting road network dataframe done


In [5]:
# Make sure the Montreal accidents dataset is available locally; per the
# recorded output below, the helper skips the download when already present.
fetch_accidents_montreal()
# Accidents as a Spark DataFrame. Later cells read the columns ACCIDENT_ID,
# LOC_LAT and LOC_LONG.
amd = extract_accidents_montreal_dataframe(spark)

Skip fetching montreal accidents dataset: already downloaded
Skip extraction of accidents montreal dataframe: already done, reading from file


In [6]:
# Distinct combinations of street identity and center coordinates, cached
# because they are cross-joined with the accidents in a later cell.
road_centers = (rnd
                .select(['street_id', 'street_name', 'street_type', 'center_long', 'center_lat'])
                .drop_duplicates()
                .persist())
# 0.1% sample of the accidents. The seed is fixed so that a re-run selects the
# same accidents (given the same input data and partitioning); without it every
# run matched a different subset and recorded outputs could not be reproduced.
# The sample is cached because it is counted here and reused by later cells.
sample_accidents = amd.sample(fraction=0.001, seed=42).persist()
print(f"Selected sample of {road_centers.count()} roads")
print(f"Selected sample of {sample_accidents.count()} accidents")

Selected sample of 95159 roads
Selected sample of 175 accidents


In [7]:
# Haversine great-circle distance.
# Source: https://www.movable-type.co.uk/scripts/latlong.html
earth_diameter = 6371 * 2 * 1000 # in meters (2 x 6371 km)

def distance_inter(lat1, long1, lat2, long2):
    """Build the intermediate haversine term between two points of a row.

    Args:
        lat1, long1: names of the columns holding the first point, in degrees.
        lat2, long2: names of the columns holding the second point, in degrees.

    Returns:
        A Spark Column expression equal to
        sin^2(dlat / 2) + sin^2(dlong / 2) * cos(lat1) * cos(lat2).
    """
    half_dlat = radians(col(lat1) - col(lat2))/2
    half_dlong = radians(col(long1) - col(long2))/2
    lat_term = pow(sin(half_dlat), 2)
    long_term = pow(sin(half_dlong), 2) * cos(radians(col(lat1))) * cos(radians(col(lat2)))
    return lat_term + long_term

# Half of the central angle, computed from a column named 'distance_inter'.
# Multiplying it by earth_diameter gives the distance in meters.
distance_measure = atan2(
    sqrt(col('distance_inter')),
    sqrt(1-col('distance_inter')),
)

# Parameters

In [20]:
# Number of closest road centers kept per accident as candidates before the
# finer match against individual road coordinates.
nb_top_road_center_preselected = 5
# Threshold on the accident-to-road distance: first matches strictly below it
# are accepted, the others are routed to the "corrected match" step.
max_distance_accepted = 10 # in meters

In [16]:
# Spark UDF wrapper around degree_to_DMS, returning the formatted string.
degree_to_DMS_UDF = udf(degree_to_DMS, StringType())

# Orders the rows of each accident by increasing distance_measure.
accidentWindow = Window.partitionBy("ACCIDENT_ID").orderBy("distance_measure")

# Pair every sampled accident with every road center, rank the road centers of
# each accident by distance, and keep only the closest candidates. rank() gives
# tied distances the same rank, so ties can yield a few extra candidates.
accidents_top_k_roads = (sample_accidents
        .select('LOC_LAT', 'LOC_LONG', 'ACCIDENT_ID')
        .crossJoin(road_centers)
        .withColumn('distance_inter', distance_inter('LOC_LAT', 'LOC_LONG', 'center_lat', 'center_long'))
        .withColumn('distance_measure', distance_measure)
        .withColumn('distance_rank', rank().over(accidentWindow))
        .where(col('distance_rank') <= nb_top_road_center_preselected)
        .select('ACCIDENT_ID', 'street_id', 'LOC_LAT', 'LOC_LONG')
        .persist())
accidents_top_k_roads.show()

+-------------+----------+---------+---------+
|  ACCIDENT_ID| street_id|  LOC_LAT| LOC_LONG|
+-------------+----------+---------+---------+
| 506806141671| 756028726| 45.58156| -73.6192|
| 506806141671|1375829732| 45.58156| -73.6192|
| 506806141671|2120002287| 45.58156| -73.6192|
| 506806141671|1192248106| 45.58156| -73.6192|
| 506806141671| 678165049| 45.58156| -73.6192|
|1142461300978|1330082358|45.451168|-73.59326|
|1142461300978|2051336403|45.451168|-73.59326|
|1142461300978|1480699038|45.451168|-73.59326|
|1142461300978|1375595716|45.451168|-73.59326|
|1142461300978| 666270826|45.451168|-73.59326|
| 944892805631| 657104228|45.476112| -73.5736|
| 944892805631|1035785479|45.476112| -73.5736|
| 944892805631|1187887361|45.476112| -73.5736|
| 944892805631| 474660343|45.476112| -73.5736|
| 944892805631| 594791159|45.476112| -73.5736|
|1709396984198| 714046868|45.559834|-73.57141|
|1709396984198|2103005111|45.559834|-73.57141|
|1709396984198| 434166247|45.559834|-73.57141|
|170939698419

In [33]:
# For each accident, look at every coordinate of its candidate roads and keep
# the single closest one (row_number() picks exactly one row per accident),
# together with its distance in meters.
accidents_roads_first_match = (accidents_top_k_roads
        .join(rnd, 'street_id')
        .withColumn('distance_inter', distance_inter('LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long'))
        .withColumn('distance_measure', distance_measure)
        .withColumn('distance_rank', row_number().over(accidentWindow))
        .filter(col('distance_rank') == 1)
        .select('ACCIDENT_ID', 'street_id', 'street_name',
                (col('distance_measure')*earth_diameter).alias('distance'))
        .persist())

# First matches that are close enough to be accepted as they are.
accidents_road_correct_match = (accidents_roads_first_match.filter(col('distance') < max_distance_accepted))

# Accidents whose first match is too far away: list every coordinate of every
# candidate road next to the accident location so that they can be inspected.
accidents_road_corrected_match = (accidents_roads_first_match
        .filter(col('distance') >= max_distance_accepted)
        .select('ACCIDENT_ID')
        .join(accidents_top_k_roads, 'ACCIDENT_ID')
        .join(rnd, 'street_id')
        .select('ACCIDENT_ID', 'street_id', 'LOC_LAT', 'LOC_LONG', 'coord_long', 'coord_lat'))

accidents_road_corrected_match.show(truncate=False)

+-------------+----------+---------+---------+-----------+----------+
|ACCIDENT_ID  |street_id |LOC_LAT  |LOC_LONG |coord_long |coord_lat |
+-------------+----------+---------+---------+-----------+----------+
|506806141670 |686645377 |45.538074|-73.58583|-73.5860001|45.5371394|
|506806141670 |686645377 |45.538074|-73.58583|-73.5857124|45.5374471|
|506806141670 |686645377 |45.538074|-73.58583|-73.5852977|45.5378872|
|506806141670 |686645377 |45.538074|-73.58583|-73.5856495|45.5375139|
|1606317769284|915001323 |45.54867 |-73.57905|-73.5787104|45.5479203|
|1606317769284|915001323 |45.54867 |-73.57905|-73.5783611|45.5484578|
|1606317769284|915001323 |45.54867 |-73.57905|-73.5785358|45.5481891|
|283467842096 |173609494 |45.60073 |-73.61335|-73.6120329|45.5991349|
|283467842096 |173609494 |45.60073 |-73.61335|-73.6123441|45.5995802|
|283467842096 |173609494 |45.60073 |-73.61335|-73.6127443|45.6000455|
|283467842096 |173609494 |45.60073 |-73.61335|-73.6129728|45.6002572|
|283467842096 |17360

# Test of the distance measure

In [9]:
import pyspark.sql.functions as f
from pyspark.sql import Window
import pandas as pd

# Sanity check of the distance computation on two rows:
#   row 0: identical points (recorded result: 0.0)
#   row 1: London vs Arlington (recorded result: about 5918 km)
DF = pd.DataFrame({
    'lat1': [0, 51.5],
    'long1': [0, 0],
    'lat2': [0, 38.8],
    'long2': [0, -77.1],
})
df = (spark.createDataFrame(DF)
        .withColumn('distance_inter', distance_inter('lat1', 'long1', 'lat2', 'long2'))
        .withColumn('distance_measure', distance_measure)
        # Convert the angular measure to meters and keep only that column.
        .select((col('distance_measure') * earth_diameter).alias('distance')))
df.show()

+-----------------+
|         distance|
+-----------------+
|              0.0|
|5918185.064088763|
+-----------------+



# Test of the top k selection

In [17]:
import pyspark.sql.functions as f
from pyspark.sql import Window
import pandas as pd

# Toy check of the "top k per group" pattern: for each value of 'a', keep the
# k rows with the smallest 'c'.
k = 2
DF = pd.DataFrame({
    'a': [1,1,1,2,2,2,3,3,3],
    'b': [1,2,3,1,2,3,1,2,3],
    'c': [3,2,1,4,5,6,7,8,9],
})

df = spark.createDataFrame(DF)

# Rank rows inside each 'a' group by increasing 'c'.
window = Window.partitionBy("a").orderBy("c")

(df.withColumn('y', f.rank().over(window))
   .where(col('y') <= k)
   .show())

+---+---+---+---+
|  a|  b|  c|  y|
+---+---+---+---+
|  1|  3|  1|  1|
|  1|  2|  2|  2|
|  3|  1|  7|  1|
|  3|  2|  8|  2|
|  2|  1|  4|  1|
|  2|  2|  5|  2|
+---+---+---+---+

