In [1]:
import os
from math import radians, cos, sin, asin, sqrt

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
def dist(lat1, long1, lat2, long2, radius_km=6371.0):
    """Great-circle distance between two points via the haversine formula.

    Parameters
    ----------
    lat1, long1, lat2, long2 : float
        Coordinates of the two points in decimal degrees.
    radius_km : float, optional
        Sphere radius; defaults to 6371 (Earth's mean radius in km).

    Returns
    -------
    float
        Distance in the unit of ``radius_km`` (kilometres by default).
        NaN inputs propagate to a NaN result.
    """
    lat1, long1, lat2, long2 = map(radians, [lat1, long1, lat2, long2])
    dlon = long2 - long1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, which would make asin() raise "math domain error". Clamp it;
    # the `>` comparison is False for NaN, so NaN still propagates.
    if a > 1.0:
        a = 1.0
    c = 2 * asin(sqrt(a))
    return radius_km * c

spark = SparkSession.builder.master('local').getOrCreate()


def _read_csv(path):
    """Load a headered, comma-delimited CSV with Spark, inferring column types.

    Column names are stripped of surrounding whitespace: the source files
    contain padded headers such as " TimeSt" and " Latitude", and stripping
    all of them avoids renaming each known offender individually.
    """
    frame = spark.read.options(
        header='True',
        inferSchema='True',
        delimiter=',',
    ).csv(os.path.expanduser(path))
    return frame.toDF(*[name.strip() for name in frame.columns])


df = _read_csv('~/data/DataSample.csv')
poi = _read_csv('~/data/POIList.csv')

df.printSchema()
poi.printSchema()

root
 |-- _ID: integer (nullable = true)
 |-- TimeSt: timestamp (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)

root
 |-- POIID: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [2]:
# Requests sharing the same (TimeSt, Latitude, Longitude) are treated as
# duplicates. dropDuplicates() does not specify which row of each group it
# keeps, so keep the row with the smallest _ID to make the result reproducible.
dedup_window = Window.partitionBy('TimeSt', 'Latitude', 'Longitude').orderBy('_ID')
df2 = (
    df.withColumn('_dup_rank', F.row_number().over(dedup_window))
    .filter(F.col('_dup_rank') == 1)
    .drop('_dup_rank')
    .sort('_ID')
)
df2.show()

+-------+--------------------+-------+--------+------------+--------+---------+
|    _ID|              TimeSt|Country|Province|        City|Latitude|Longitude|
+-------+--------------------+-------+--------+------------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON|    Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|      London| 42.9399| -81.2709|
|4516550|2017-06-21 15:00:...|     CA|      ON|      Guelph| 43.5776| -80.2201|
|4516600|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773|
|4516613|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773|
|4516693|2017-06-21 14:00:...|     CA|      ON|   Kitchener| 43.4381| -80.5099|
|4516771|2017-06-21 10:00:...|     CA|      ON|      Sarnia|  42.961|  -82.373|
|4516831|2017-06-21 12:00:...|     CA|      ON|      London| 43.0091| -81.1765|
|4516915|2017-06-21 15:00:...|     CA|      ON|      London| 43.0091| -81.1765|
|4516953|2017-06-21 16:00:...|     CA|  

In [5]:
df2pd = df2.toPandas()
poipd = poi.toPandas()

# Materialise the POI rows once as plain (POIID, Latitude, Longitude) tuples.
# A row-wise DataFrame.apply over the POIs for every request builds a pandas
# Series per POI row per request and is needlessly slow.
poi_points = list(
    poipd[['POIID', 'Latitude', 'Longitude']].itertuples(index=False, name=None)
)


def find_nearest(Latitude, Longitude):
    """Return the POIID of the POI closest to the given point.

    Distances come from ``dist`` (haversine, km). On ties the POI that comes
    first in ``poipd`` wins, matching the original ``Series.idxmin`` choice.
    POIs whose distance is NaN (missing coordinates) are skipped. If no
    distance is finite (e.g. the request itself has missing coordinates),
    returns None.

    Raises
    ------
    ValueError
        If the POI list is empty.
    """
    if not poi_points:
        raise ValueError("POI list is empty; cannot assign a nearest POI")
    best_id, best_km = None, float('inf')
    for poi_id, poi_lat, poi_lon in poi_points:
        km = dist(Latitude, Longitude, poi_lat, poi_lon)
        # Strict `<` keeps the first POI on ties and is False for NaN.
        if km < best_km:
            best_id, best_km = poi_id, km
    return best_id


df2pd['POIID'] = [
    find_nearest(lat, lon)
    for lat, lon in zip(df2pd['Latitude'], df2pd['Longitude'])
]
# NOTE: this rebinds `df` (previously the raw request log) to the enriched table.
df = spark.createDataFrame(df2pd)
df.write.mode("overwrite").option("header", True).csv("DataSample")
df.show()

+-------+--------------------+-------+--------+------------+--------+---------+-----+
|    _ID|              TimeSt|Country|Province|        City|Latitude|Longitude|POIID|
+-------+--------------------+-------+--------+------------+--------+---------+-----+
|4516516|2017-06-21 00:00:...|     CA|      ON|    Waterloo|43.49347|-80.49123| POI3|
|4516547|2017-06-21 18:00:...|     CA|      ON|      London| 42.9399| -81.2709| POI3|
|4516550|2017-06-21 15:00:...|     CA|      ON|      Guelph| 43.5776| -80.2201| POI3|
|4516600|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773| POI3|
|4516613|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773| POI3|
|4516693|2017-06-21 14:00:...|     CA|      ON|   Kitchener| 43.4381| -80.5099| POI3|
|4516771|2017-06-21 10:00:...|     CA|      ON|      Sarnia|  42.961|  -82.373| POI3|
|4516831|2017-06-21 12:00:...|     CA|      ON|      London| 43.0091| -81.1765| POI3|
|4516915|2017-06-21 15:00:...|     CA|      ON|      L