In [65]:
import os
import math

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, avg, stddev, max, count, min
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType

import altair as alt

from math import radians, cos, sin, asin, sqrt

spark = SparkSession.builder.master('local').getOrCreate()
df = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/DataSample.csv'))
df.count()

22025

In [66]:
#since geoinfo isn't actually a column name, I'm pretty sure it's the lat and long for each request

# Question 1
df1 = df.dropDuplicates([' TimeSt','Latitude','Longitude'])
df1.count()

19999

In [67]:
dfPoi = spark.read.options(
        header='True',
        inferSchema='True',
        delimiter=',',
    ).csv(os.path.expanduser('~/data/POIList.csv'))

In [68]:
# Apparently one of the points of interest is a duplicate, and the Lat has a whitespace infront of it.  
# Also renaming the column names as they're too similar to the datasheets names.
dfPoi = dfPoi.dropDuplicates([' Latitude','Longitude'])
dfPoi = dfPoi.withColumnRenamed(' Latitude','poiLat').withColumnRenamed('Longitude', 'poiLong')

In [69]:
# Haversine formula stolen from 
# https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
def calcDistance(rLat, rLong, tLat, tLong):
    print(rLat)
    lon1 = math.radians(rLat)
    lat1 = math.radians(rLong)
    lon2 = math.radians(tLat)
    lat2 = math.radians(tLong)
    
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371 #Radius of earth in kilometers. Not going to use 3956 for miles since Canada. :>
    b = c * r
    return b

cCalcDistance = udf(lambda a,b,c,d:calcDistance(a,b,c,d))
    

In [70]:
# So my first attempt at this one, was to take the data, use a withColumn function to pass it in, along with the
# Points of Interest spots, and then compare each poi with the lat and long, and see which is the closest.
# But I learned that you can't nest withColumn functions as it looks like they lock the thread to increase the
# effeicency for applying the function to the data.  Which means I couldn't use that approach.

# So I thought about it, and while this isn't as scalable, it'll do: to apply the lat and long of each request to all the
# poi's, then by grouping by their original ID, you can find out which had the lowest distance, and then just grab the
# poi number itself.  And then delete the others.

# One issue I had was that the udf returns a string for some reason?  Was able to bypass that by using withColumn to apply 
# all the data to be casted as a DoubleType() and delete the previous column all in one go. (not technical terms, but will do)

# Question 2
dfPoi = dfPoi.select('POIID',col('poiLat'), col('poiLong'))
df1=df1.crossJoin(dfPoi)
df1 = df1.withColumn('distance', cCalcDistance(df1['Latitude'],df1['Longitude'], df1['poiLat'], df1['poiLong']))
df1 = df1.withColumn('distance', df1['distance'].cast(DoubleType()))

tmpDf1 = df1.groupBy('_ID').min('distance')
df1=df1.join(tmpDf1,(df1['_ID'] == tmpDf1['_ID']) & (df1['distance'] == tmpDf1['min(distance)'])).drop(tmpDf1._ID)
df1=df1.drop('distance')
df1.show()



+--------------------+-------+--------+-------------+--------+----------+-----+---------+-----------+-------+------------------+
|              TimeSt|Country|Province|         City|Latitude| Longitude|POIID|   poiLat|    poiLong|    _ID|     min(distance)|
+--------------------+-------+--------+-------------+--------+----------+-----+---------+-----------+-------+------------------+
|2017-06-21 00:22:...|     CA|      ON|    Etobicoke| 43.6381|  -79.5787| POI3|45.521629| -73.566024|5613403| 670.2586351276966|
|2017-06-21 00:34:...|     CA|      ON|      Toronto| 43.6606|  -79.4635| POI3|45.521629| -73.566024|5013924| 657.4592243551863|
|2017-06-21 00:42:...|     CA|      AB|      Calgary| 51.1188| -113.9471| POI1|53.546167|-113.485734|5122425| 120.0615960353242|
|2017-06-21 01:01:...|     CA|      AB|      Calgary| 51.0876| -114.0214| POI1|53.546167|-113.485734|4571908|125.18334295783798|
|2017-06-21 01:08:...|     CA|      ON|      Toronto|43.66341| -79.38597| POI3|45.521629| -73.566

In [71]:
# df6 = df1.groupBy('POIID')
# alt.Chart(df6).mark_circle(
#     color='blue',
#     opacity=0.3
# ).encode(
# x='Latitude',
# y='Longitude')

In [72]:
# Question 3 a
df3 = df1.groupBy('POIID').agg(avg('min(distance)').alias('Average distance'), stddev('min(distance)').alias('Standard Deviation'))

df3.show()


+-----+-----------------+------------------+
|POIID| Average distance|Standard Deviation|
+-----+-----------------+------------------+
| POI4|664.0431071829552| 2072.764962751564|
| POI1|296.9202099001768| 571.2873049876008|
| POI3|590.8752886807175|309.04612101412664|
+-----+-----------------+------------------+



In [73]:
#3 b was not the best for the wording, and I spent many hours on attempting to draw a circle to visualize the data, 
# that I didn't need to do.  

# Question 3 b
df3b = df1.groupBy('POIID').agg(max('min(distance)').alias('Radius'), count('min(distance)').alias('Count'))
df3b = df3b.withColumn('Density', df3b['Count']/(df3b['Radius']**2*math.pi))
df3b.show()

+-----+------------------+-----+--------------------+
|POIID|            Radius|Count|             Density|
+-----+------------------+-----+--------------------+
| POI4|12784.457896075026|  477|9.289746689640268E-7|
| POI1|14376.059009320134| 9704|1.494587073110212...|
| POI3| 2210.180854641291| 9818|6.397606583284486E-4|
+-----+------------------+-----+--------------------+



In [74]:
# So basically I need to get the IQR, then filter the data that falls inside that range
# group that data by POIID, and create new columns for radius without the outliers, and see how many there are
# Then find the density by deviding the count by the area of the circle

# To get the data and rescale it to have the selected min and max ranges, I'll be using 
# newvalue= (b-a)*(x-minimum)/(maximum-minimum)+a
# (taken after some googling from https://stats.stackexchange.com/questions/70801/how-to-normalize-data-to-0-1-range and modifying)
# 

# Question 4 a
Q1 = df1.approxQuantile('min(distance)', [0.35], 0.05)
Q3 = df1.approxQuantile('min(distance)', [0.65], 0.05)
IQR = Q3[0] - Q1[0]
lowerRange = Q1[0] - 1.5*IQR
upperRange = Q3[0] + 1.5*IQR

df4 = df1[(df1['min(distance)'] > lowerRange) & (df1['min(distance)'] < upperRange)].groupBy('POIID').agg(max('min(distance)').alias('Radius'), count('min(distance)').alias('Count'))
df4 = df4.withColumn('density', df4['Count']/(df4['Radius']**2*math.pi))

# now we use the model formula
mini = df4.agg(min('density')).first()[0]
maxi = df4.agg(max('density')).first()[0]
a = -10
b = 10
print(mini)
df4a = df4.withColumn('density(adjusted)', ((b - a) * (df4['density']-mini)/(maxi-mini)+a))
df4a.show()



0.00010498152119975096
+-----+------------------+-----+--------------------+-----------------+
|POIID|            Radius|Count|             density|density(adjusted)|
+-----+------------------+-----+--------------------+-----------------+
| POI4|1183.5582054834815|  462|1.049815211997509...|            -10.0|
| POI1|1401.4928282958247| 9386|0.001521069008350...|5.272995535688684|
| POI3|1248.9641974323345| 9602|0.001959349230326935|9.999999999999996|
+-----+------------------+-----+--------------------+-----------------+

