In [1]:
from road_network import fetch_road_network, extract_road_segments_DF
from accidents_montreal import fetch_accidents_montreal, extract_accidents_montreal_dataframe
from weather import get_weather
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pow, col, min, udf, monotonically_increasing_id, sin, cos, radians
from pyspark.sql.functions import atan2, sqrt, rank, hash, abs, row_number, avg, conv, rint
from pyspark.sql.types import StringType
from math import fabs as mabs
import sys
import os

In [2]:
def init_spark():
    spark = (SparkSession
        .builder
        .appName("Accident Prediction")
        .getOrCreate())
    return spark
spark = init_spark()

In [3]:
def degree_to_DMS(lat, long):
    return (f"{int(mabs(lat))}°{int(mabs(lat)%1*60):.0f}'{(60*(((mabs(lat)%1)*60)%1)):.2f}\"N " +
        f"{int(mabs(long))}°{int(mabs(long)%1*60):.0f}'{(60*(((mabs(long)%1)*60)%1)):.2f}\"W")
print(degree_to_DMS(45.541695,-73.61023))
lat = -73.61023

45°32'30.10"N 73°36'36.83"W


In [4]:
fetch_road_network()
rnd = extract_road_segments_DF(spark)

Skip fetching road network: already downloaded
Skip extraction of road network dataframe: already done, reading from file


In [5]:
fetch_accidents_montreal()
amd = extract_accidents_montreal_dataframe(spark)

Skip fetching montreal accidents dataset: already downloaded
Skip extraction of accidents montreal dataframe: already done, reading from file


In [6]:
road_centers = (rnd
                .select(['street_id', 'street_name', 'street_type', 'center_long', 'center_lat'])
                .drop_duplicates()
                .persist())
sample_accidents = amd.sample(0.001)
print(f"Selected sample of {road_centers.count()} roads")
print(f"Selected sample of {sample_accidents.count()} accidents")

Selected sample of 95159 roads
Selected sample of 176 accidents


# Test preprocess function

In [10]:
from preprocess import match_accidents_with_roads

matches = match_accidents_with_roads(rnd, sample_accidents)
matches.show()
print(sample_accidents.count())
print(matches.count())

+-------------+----------+
|  ACCIDENT_ID| street_id|
+-------------+----------+
|1391569404362| 541424963|
| 670014898363| 845059405|
|1709396984529| 748834309|
|1005022347993|1908015220|
|  25769803871|1787623946|
|1494648619236|1429650182|
|1382979469585| 143884344|
| 506806141269|1138308306|
| 214748365481| 114209011|
| 910533067178| 372260410|
|1503238553709|1118321050|
|1348619730956|2038544023|
| 343597383806| 112208452|
| 412316860470| 996478651|
| 858993459314|2088518077|
|  25769804479|1067710633|
|1443109011489|1188637550|
|1400159339160| 801610511|
| 730144440322| 455627072|
|1666447311316| 313702200|
+-------------+----------+
only showing top 20 rows

176
176


# End test

In [18]:
# Source: https://www.movable-type.co.uk/scripts/latlong.html
earth_diameter = 6371 * 2 * 1000 # in meter
def distance_intermediate_formul(lat1, long1, lat2, long2):
    return (pow(sin(radians(col(lat1) - col(lat2))/2), 2)
            + (pow(sin(radians(col(long1) - col(long2))/2), 2) 
                * cos(radians(col(lat1))) * cos(radians(col(lat2)))))
distance_measure = atan2(sqrt(col('distance_inter')), sqrt(1-col('distance_inter')))

# Parameters

In [19]:
nb_top_road_center_preselected = 5
max_distance_accepted = 10 # in meters

In [20]:
degree_to_DMS_UDF = udf(degree_to_DMS, StringType())

# Compute distance between accident and road centers to identify the top nb_top_road_center_preselected closest roads
accidentWindow = Window.partitionBy("ACCIDENT_ID").orderBy("distance_measure")
accidents_top_k_roads = (sample_accidents
        .select('LOC_LAT', 'LOC_LONG', 'ACCIDENT_ID')
        .crossJoin(road_centers)
        .withColumn('distance_inter', distance_inter('LOC_LAT', 'LOC_LONG', 'center_lat', 'center_long'))
        .withColumn('distance_measure', distance_measure)
        .select('ACCIDENT_ID', 'street_id', 'distance_measure', 'LOC_LAT', 'LOC_LONG',
               rank().over(accidentWindow).alias('distance_rank'))
        .filter(col('distance_rank') <= nb_top_road_center_preselected)
        .drop('distance_measure', 'distance_rank')
        .persist())
accidents_top_k_roads.show()

+-------------+----------+---------+----------+
|  ACCIDENT_ID| street_id|  LOC_LAT|  LOC_LONG|
+-------------+----------+---------+----------+
| 549755813928|1648199549| 45.49889| -73.56224|
| 549755813928|1575796321| 45.49889| -73.56224|
| 549755813928|1311753815| 45.49889| -73.56224|
| 549755813928|1947818980| 45.49889| -73.56224|
| 549755813928| 716870755| 45.49889| -73.56224|
| 652835029049|1000643935|45.499893| -73.56596|
| 652835029049|1928234879|45.499893| -73.56596|
| 652835029049|    922576|45.499893| -73.56596|
| 652835029049|1648726532|45.499893| -73.56596|
| 652835029049|1170194706|45.499893| -73.56596|
|1142461301222|2055454547| 45.47162|-73.613754|
|1142461301222|1765955708| 45.47162|-73.613754|
|1142461301222| 334197242| 45.47162|-73.613754|
|1142461301222| 806718475| 45.47162|-73.613754|
|1142461301222|1228812616| 45.47162|-73.613754|
| 592705487425| 658190714| 45.49538| -73.56088|
| 592705487425| 573124083| 45.49538| -73.56088|
| 592705487425|1957202794| 45.49538| -73

In [86]:
# For each accident identify road point closest 
accidents_roads_first_match = (accidents_top_k_roads
        .join(rnd, 'street_id')
        .withColumn('distance_inter', distance_inter('LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long'))
        .withColumn('distance_measure', distance_measure)
        .select('ACCIDENT_ID', 'LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long', 'street_id', 'street_name', 
                row_number().over(accidentWindow).alias('distance_rank'), 'distance_measure')
        .filter(col('distance_rank') == 1)
        .withColumn('distance', col('distance_measure')*earth_diameter)
        .drop('distance_rank', 'distance_measure', 'LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long')
        .persist())

# If the distance is lower than max_distance_accepted we keep the accident/street matches
accidents_road_correct_match = (accidents_roads_first_match
                                .filter(col('distance') < max_distance_accepted)
                                .select('ACCIDENT_ID', 'street_id'))

# If not, we try to get a better match by adding intermediate points on the preselected streets

# For unsatisfying matches, retrieves the top nb_top_road_center_preselected streets with their points 
accidents_close_streets_coords = (accidents_roads_first_match
        .filter(col('distance') >= max_distance_accepted)
        .select('ACCIDENT_ID')
        .join(accidents_top_k_roads, 'ACCIDENT_ID')
        .join(rnd, 'street_id')
        .select('ACCIDENT_ID', 'street_id', 'LOC_LAT', 'LOC_LONG', 'coord_long', 'coord_lat')
        .persist())

# Add the intermediate points
street_rolling_window = Window.partitionBy('street_id').orderBy("coord_long").rowsBetween(0, +1)
accidents_close_streets_with_additional_coords = \
    (accidents_close_streets_coords
     .select('ACCIDENT_ID', 'street_id', 'LOC_LAT', 'LOC_LONG',
             avg('coord_long').over(street_rolling_window).alias('coord_long'),
             avg('coord_lat').over(street_rolling_window).alias('coord_lat'))
     .union(accidents_close_streets_coords)
     .dropDuplicates())

# Recompute distances between accident and new set of points and use closest point to identify street
accidents_roads_first_match_with_additional_coords = \
    (accidents_close_streets_with_additional_coords
     .withColumn('distance_inter', distance_inter('LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long'))
     .withColumn('distance_measure', distance_measure)
     .select('ACCIDENT_ID', 'street_id', 'LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long', 
                row_number().over(accidentWindow).alias('distance_rank'))
     .filter(col('distance_rank') == 1)
     .drop('distance_rank', 'LOC_LAT', 'LOC_LONG', 'coord_lat', 'coord_long'))

# Union accidents matched correctly with first method with the accidents for which we used more street points
accidents_road_correct_final_match =(accidents_road_correct_match
                                     .union(accidents_roads_first_match_with_additional_coords))

accidents_road_correct_final_match.show()
accidents_road_correct_final_match.count()

+-------------+----------+
|  ACCIDENT_ID| street_id|
+-------------+----------+
| 549755813928|1648199549|
| 652835029049|1000643935|
|1142461301222|2055454547|
| 592705487425| 658190714|
| 609885356553|1926341698|
|1340029796354|1645977632|
|1511828488243|1121234898|
|          677| 683515792|
| 962072674896|1695987729|
| 214748365007| 152730429|
|1322849927827|1258876348|
| 523986010346|1613469741|
| 816043786746|1824439941|
|1279900254698| 549880162|
| 171798692143|1400054421|
| 300647711308| 492773211|
| 850403525297| 681483415|
| 386547057013|1101431915|
| 876173328859|1805128626|
|1563368096346|1399926608|
+-------------+----------+
only showing top 20 rows



144

# Comparaison of new results with previous results

In [77]:
print('Distance still high')
(accidents_roads_first_match
 .filter(col('distance') >= max_distance_accepted)
 .withColumnRenamed('distance', 'old_distance')
  .withColumnRenamed('street_name', 'old_street_name')
   .withColumnRenamed('street_id', 'old_street_id')
 .join(accidents_roads_first_match_with_additional_coords, 'ACCIDENT_ID')
 .join(sample_accidents, 'ACCIDENT_ID')
 .withColumn('LOC', degree_to_DMS_UDF(col('LOC_LAT'), col('LOC_LONG')))
 .select('LOC', rint(col('old_distance')).alias('old_distance'), rint(col('distance')).alias('distance'), 'old_street_name', 'street_name')#, 'old_street_id', 'street_id')
 .filter(col('distance') > 20)
 .show(truncate=False))
print('Street changed: ')
(accidents_roads_first_match
 .filter(col('distance') >= max_distance_accepted)
 .withColumnRenamed('distance', 'old_distance')
  .withColumnRenamed('street_name', 'old_street_name')
   .withColumnRenamed('street_id', 'old_street_id')
 .join(accidents_roads_first_match_with_additional_coords, 'ACCIDENT_ID')
 .join(sample_accidents, 'ACCIDENT_ID')
 .withColumn('LOC', degree_to_DMS_UDF(col('LOC_LAT'), col('LOC_LONG')))
 .select('LOC', rint(col('old_distance')).alias('old_distance'), rint(col('distance')).alias('distance'), 'old_street_name', 'street_name', 'street_id', 'old_street_id')#, 'old_street_id', 'street_id')
 .filter(col('old_street_id') != col('street_id'))
 .drop('street_id', 'old_street_id')
 .show(truncate=False))
print(f'Number of detected high distance: {accidents_roads_first_match.filter(col("distance") >= max_distance_accepted).count()}')

Distance still high
+---------------------------+------------+--------+-----------------------+-----------------------+
|LOC                        |old_distance|distance|old_street_name        |street_name            |
+---------------------------+------------+--------+-----------------------+-----------------------+
|45°27'29.10"N 73°46'49.95"W|73.0        |56.0    |Avenue Deslauriers     |Avenue Deslauriers     |
|45°32'56.36"N 73°39'59.50"W|22.0        |22.0    |Boulevard Saint-Laurent|Boulevard Saint-Laurent|
|45°33'22.97"N 73°33'37.34"W|40.0        |40.0    |Boulevard Pie-IX       |Boulevard Pie-IX       |
+---------------------------+------------+--------+-----------------------+-----------------------+

Street changed: 
+---------------------------+------------+--------+------------------------------+--------------------+
|LOC                        |old_distance|distance|old_street_name               |street_name         |
+---------------------------+------------+--------+---

# Point in middle formula

In order to get the midpoint between to GPS coordinates, we would need to make some advance calculation again, but we don't necessarily want the midpoint but just one point between the two points corresponding to the GPS coordinates. Therefore we can just use the point (lat1+lat2)/2 (long1+long2)/2.

# Test of the distance measure

In [11]:
import pyspark.sql.functions as f
from pyspark.sql import Window
import pandas as pd

# First example distance between identical points, and second example between London and Arlington 
DF = pd.DataFrame({'lat1': [0, 51.5],
                   'long1': [0, 0],
                   'lat2': [0, 38.8],
                   'long2': [0, -77.1]
                  })
df = (spark.createDataFrame(DF)
        .withColumn('distance_inter', distance_inter('lat1', 'long1', 'lat2', 'long2'))
        .withColumn('distance_measure', distance_measure)
        .withColumn('distance', col('distance_measure') * earth_diameter)
         .select('distance'))
df.show()

+-----------------+
|         distance|
+-----------------+
|              0.0|
|5918185.064088763|
+-----------------+



# Test of the top k selection

In [12]:
import pyspark.sql.functions as f

k = 2
DF = pd.DataFrame({'a': [1,1,1,2,2,2,3,3,3],
                   'b': [1,2,3,1,2,3,1,2,3],
                   'c': [3,2,1,4,5,6,7,8,9]
                  })

df = spark.createDataFrame(DF)

window = Window.partitionBy("a").orderBy("c")

df.select('a', 'b', 'c', f.rank().over(window).alias('y')).filter(col('y') <= k).show()

+---+---+---+---+
|  a|  b|  c|  y|
+---+---+---+---+
|  1|  3|  1|  1|
|  1|  2|  2|  2|
|  3|  1|  7|  1|
|  3|  2|  8|  2|
|  2|  1|  4|  1|
|  2|  2|  5|  2|
+---+---+---+---+



# Test create several rows from a row

In [10]:
from pyspark.sql.functions import explode
import pandas as pd
from pyspark.sql import SparkSession, Window
DF = pd.DataFrame({
                   'p': [1, 1, 1, 0, 0, 0],
                   'a': [1, 2, 3, 4, 5, 6],
                   'b': [1, 2, 3, 6, 5, 4] 
                  })

w = Window.partitionBy('p').orderBy("b").rowsBetween(0, +1)
df = spark.createDataFrame(DF)
df2 = spark.createDataFrame(DF).select('p', avg('a').over(w).alias('p'), avg('b').over(w).alias('p'))
df.union(df2).dropDuplicates().orderBy('p', 'a').show()

+---+---+---+
|  p|  a|  b|
+---+---+---+
|  0|4.0|6.0|
|  0|4.5|5.5|
|  0|5.0|5.0|
|  0|5.5|4.5|
|  0|6.0|4.0|
|  1|1.0|1.0|
|  1|1.5|1.5|
|  1|2.0|2.0|
|  1|2.5|2.5|
|  1|3.0|3.0|
+---+---+---+

