In [219]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local').getOrCreate()
df = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/DataSample.csv'))

poi = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/POIList.csv'))

In [220]:
df.show(10,False)

+-------+-----------------------+-------+--------+---------+--------+---------+
|_ID    | TimeSt                |Country|Province|City     |Latitude|Longitude|
+-------+-----------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:00.143|CA     |ON      |Waterloo |43.49347|-80.49123|
|4516547|2017-06-21 18:00:00.193|CA     |ON      |London   |42.9399 |-81.2709 |
|4516550|2017-06-21 15:00:00.287|CA     |ON      |Guelph   |43.5776 |-80.2201 |
|4516600|2017-06-21 15:00:00.307|CA     |ON      |Stratford|43.3716 |-80.9773 |
|4516613|2017-06-21 15:00:00.497|CA     |ON      |Stratford|43.3716 |-80.9773 |
|4516693|2017-06-21 14:00:00.597|CA     |ON      |Kitchener|43.4381 |-80.5099 |
|4516771|2017-06-21 10:00:00.873|CA     |ON      |Sarnia   |42.961  |-82.373  |
|4516831|2017-06-21 12:00:00.950|CA     |ON      |London   |43.0091 |-81.1765 |
|4516915|2017-06-21 15:00:01.310|CA     |ON      |London   |43.0091 |-81.1765 |
|4516953|2017-06-21 16:00:01.700|CA     

In [221]:
df.count()

22025

In [222]:
poi.show(10,False)

+-----+---------+-----------+
|POIID| Latitude|Longitude  |
+-----+---------+-----------+
|POI1 |53.546167|-113.485734|
|POI2 |53.546167|-113.485734|
|POI3 |45.521629|-73.566024 |
|POI4 |45.22483 |-63.232729 |
+-----+---------+-----------+



In [223]:
df.where(length(col(" TimeSt")) == 23).count()

22025

In [224]:
df.select(col("_ID")).distinct().count()

22025

1. CLEANUP / FINDING SUSPICIOUS IDs

In [237]:
p=(df.groupBy([' TimeSt', 'Latitude', 'Longitude']).agg(collect_list("_ID").alias("id")).
    where(size("id") > 1)).select(explode("id").alias("_ID"))
data=df.join(p, df["_ID"] == p["_ID"], "left_anti").drop(p["_ID"]) 

In [238]:
data.count()

17973

2. LABEL

In [239]:
def getDistance(lat1,lon1,lat2,lon2):
    R=6371
    lon1=toRadians(lon1)
    lat1=toRadians(lat1)
    lon2=toRadians(lon2)
    lat2=toRadians(lat2)
    a=sin((lat1-lat2)/2)**2+cos(lat1)*cos(lat2)*sin((lon1-lon2)/2)**2
    c=2*atan2(sqrt(a),sqrt(1-a))
    d=R*c
    return d

In [240]:
fullData=data.crossJoin(poi.withColumnRenamed(" Latitude", "POI_Latitude").withColumnRenamed("Longitude", "POI_Longitude"))

In [241]:
fullDataWithDistance=fullData.withColumn("distance", getDistance(col("Latitude"),col("Longitude"), col("POI_Latitude"),col("POI_Longitude")))

In [242]:
d = fullDataWithDistance.groupBy(col("_ID")).agg(min("distance"))

In [243]:
fullDataWithMinDistance = d.join(fullDataWithDistance,(d["_ID"] == fullDataWithDistance["_ID"]) & (fullDataWithDistance["distance"] == d["min(distance)"])).drop("distance").drop(d["_ID"]).distinct()

3. ANALYSIS

In [244]:
k = fullDataWithDistance.groupBy("_ID").agg(mean("distance").alias("avgDistance"),stddev("distance").alias("stddevDistance"))
fullDataWithMinDistanceStats = k.join(fullDataWithMinDistance,["_ID"])

In [245]:
import numpy as np

a=fullDataWithMinDistanceStats.groupBy('_ID').agg(max("min(distance)").alias('Radius'), count("min(distance)").alias('Count'))
b = a.withColumn('Density',a['Count']/(a['Radius']**2*np.pi)).drop("Count")
fullDataWithMinDistanceStatsRadiusDensity = fullDataWithMinDistanceStats.join(b,["_ID"])

4. MODEL : scale =(high-low)*(x-min)/(max-min)+low; high=10, low=-10

In [246]:
def getScale(density,lowestDensity,highestDensity):
    s = 20*(density-lowestDensity)/(highestDensity-lowestDensity)-10
    return s

In [247]:
lowestDensity=fullDataWithMinDistanceStatsRadiusDensity.select([min("Density")]).collect()[0][0]
highestDensity=fullDataWithMinDistanceStatsRadiusDensity.select([max("Density")]).collect()[0][0]

In [248]:
final=fullDataWithMinDistanceStatsRadiusDensity.withColumn("Scale",getScale(col("Density"),lit(lowestDensity),lit(highestDensity)))

In [249]:
final.show()

+-------+------------------+------------------+------------------+--------------------+-------+--------+----------------+--------+---------+-----+------------+-------------+------------------+--------------------+------------------+
|    _ID|       avgDistance|    stddevDistance|     min(distance)|              TimeSt|Country|Province|            City|Latitude|Longitude|POIID|POI_Latitude|POI_Longitude|            Radius|             Density|             Scale|
+-------+------------------+------------------+------------------+--------------------+-------+--------+----------------+--------+---------+-----+------------+-------------+------------------+--------------------+------------------+
|4517905|1895.4265435455209| 839.2083047880213| 832.9559044776652|2017-06-21 17:00:...|     CA|      ON|         Windsor| 42.2957| -82.9599| POI3|   45.521629|   -73.566024| 832.9559044776652|4.587817198969043E-7|-9.999998263903002|
|4526426|1742.2775784880255|  1522.53566250839|219.46152613300868|20