In [28]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [29]:
spark = SparkSession.builder.master('local').getOrCreate()
df = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/DataSample.csv'))
df = df.select([F.col(col).alias(col.replace(' ', '')) for col in df.columns])
df.printSchema()

root
 |-- _ID: integer (nullable = true)
 |-- TimeSt: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [30]:
poi = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/POIList.csv'))
poi = poi.select([F.col(col).alias(col.replace(' ', '')) for col in poi.columns])
poi.printSchema()

root
 |-- POIID: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [31]:
df.show()
poi.show()

+-------+--------------------+-------+--------+------------+--------+---------+
|    _ID|              TimeSt|Country|Province|        City|Latitude|Longitude|
+-------+--------------------+-------+--------+------------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON|    Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|      London| 42.9399| -81.2709|
|4516550|2017-06-21 15:00:...|     CA|      ON|      Guelph| 43.5776| -80.2201|
|4516600|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773|
|4516613|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773|
|4516693|2017-06-21 14:00:...|     CA|      ON|   Kitchener| 43.4381| -80.5099|
|4516771|2017-06-21 10:00:...|     CA|      ON|      Sarnia|  42.961|  -82.373|
|4516831|2017-06-21 12:00:...|     CA|      ON|      London| 43.0091| -81.1765|
|4516915|2017-06-21 15:00:...|     CA|      ON|      London| 43.0091| -81.1765|
|4516953|2017-06-21 16:00:...|     CA|  

In [32]:
df.count()

22025

# 1. Cleanup

removing duplicate entries via timest, latitude, longitude

In [33]:
df = df.dropDuplicates(['TimeSt', 'Latitude', 'Longitude'])

In [34]:
df.count()

19999

# 2. Label

assigning closest poi to each entry.

In [35]:
from geopy import distance

In [37]:
df = df.crossJoin(poi.withColumnRenamed('Latitude', 'lat').withColumnRenamed('Longitude', 'lon'))

In [38]:
df.show()

+-------+--------------------+-------+--------+---------+--------+---------+-----+---------+-----------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude|POIID|      lat|        lon|
+-------+--------------------+-------+--------+---------+--------+---------+-----+---------+-----------+
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI1|53.546167|-113.485734|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI2|53.546167|-113.485734|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI3|45.521629| -73.566024|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787| POI4| 45.22483| -63.232729|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635| POI1|53.546167|-113.485734|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635| POI2|53.546167|-113.485734|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toront

In [40]:
df.withColumn("distance", distance.distance(('Latitude', 'Longitude'), ('lat', 'lon')).km).show()

ValueError: could not convert string to float: 'Latitude'

In [41]:
pwd()

'/home/jovyan'

In [14]:
coords_1 = (52.2296756, 21.0122287)
coords_2 = (52.406374, 16.9251681)

print(distance.distance(coords_1, coords_2).km)

279.35290160430094
