In [440]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').getOrCreate()

In [441]:
import os

# Load the df(s) from file

df = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/DataSample.csv'))

POI = spark.read.options(
    header='True',
    inferSchema='True',
    delimiter=',',
).csv(os.path.expanduser('~/data/POIList.csv'))

# Display for testing & debugging
POI.show()
df.show()

+-----+---------+-----------+
|POIID| Latitude|  Longitude|
+-----+---------+-----------+
| POI1|53.546167|-113.485734|
| POI2|53.546167|-113.485734|
| POI3|45.521629| -73.566024|
| POI4| 45.22483| -63.232729|
+-----+---------+-----------+

+-------+--------------------+-------+--------+------------+--------+---------+
|    _ID|              TimeSt|Country|Province|        City|Latitude|Longitude|
+-------+--------------------+-------+--------+------------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON|    Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|      London| 42.9399| -81.2709|
|4516550|2017-06-21 15:00:...|     CA|      ON|      Guelph| 43.5776| -80.2201|
|4516600|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773|
|4516613|2017-06-21 15:00:...|     CA|      ON|   Stratford| 43.3716| -80.9773|
|4516693|2017-06-21 14:00:...|     CA|      ON|   Kitchener| 43.4381| -80.5099|
|4516771|2017-06-21 10:00:...|     CA| 

In [442]:
# Step 1: Clean the data. 
df = df.dropDuplicates([' TimeSt', 'Latitude', 'Longitude'])

# Assumption: Two POIs at the same geolocation should be cleaned - they are the exact same location. It is impossible to assign a request to "the closest" POI if there are two at the same location.
POI = POI.dropDuplicates([' Latitude', 'Longitude'])

In [449]:
# Step 2: Assigning the requests to their nearest POI.
# In order to accomplish this, each request needs have its distance calculated to all POIs in the POI df in order to determine the minimum (linear search)
# This needs to be done for each request -> O(n * m) where n = size(requests) & m = size(POI). Can be accomplished with a nested loop.
# However, Apache Spark does not seem to support iteration by row, and instead prefers RDD. In order to accomplish this, the POI data needs to be changed to a list.
# A function can be written to iterate the different POIs and then this function can be called in the lambda function call for the RDD.
# Perhaps not the most scalable solution, as collecting a larger list can cause inefficiencies and even crashes, but for this use case it seems appropriate and convienient.

POILatitudes = [float(row[' Latitude']) for row in POI.collect()]
POILongitudes = [float(row['Longitude']) for row in POI.collect()]
POIIds = [float(i) for i in range(len(POILatitudes))]


In [444]:
# Useful helper functions for step 2.

# haversine distance formula implementation (distance between two points, given longitude & lattitude) 
import math
def haversine(lat1, lon1, lat2, lon2):
    # Calculate in KM
    R = 6370
    
    # Convert each latitude and longitude to radians
    lat1 = radians(lat1)  
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)

    # Take the absolute distance in radians and convert using formula
    dlon = abs(lon2 - lon1)
    dlat = abs(lat2 - lat1)

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return R * c

# Returns the closest POI and the actual distance of it. Compares to ALL POIs in the df to find the minimum.
def smallest_geo(lat, long):
    # Simple linear search for minimum in a list - all elements need to be checked.
    smallestDist = float('inf')
    POI = -1
    for i in range(len(POILatitudes)):
        dist = haversine(lat, long, POILatitudes[i], POILongitudes[i])
        if dist < smallestDist:
            smallestDist = dist
            POI = i
    return smallestDist, POI

In [445]:
# Apply the functions to the requests dataframe and add their results to the table.
rdd = df.rdd.map(lambda x:            
                 (x[0], x[1], x[2], x[3], x[4], x[5], x[6], smallest_geo(x[5], x[6]) [0], smallest_geo(x[5], x[6]) [1])
                )
df = rdd.toDF(['_ID', 'TimeSt', 'Country', 'Province', 'City', 'Latitude', 'Longitude', 'ClosestPOI (distance)', 'ClosestPOI (id)'])

# Display new table
df.show()

+-------+--------------------+-------+--------+-------------+--------+----------+---------------------+---------------+
|    _ID|              TimeSt|Country|Province|         City|Latitude| Longitude|ClosestPOI (distance)|ClosestPOI (id)|
+-------+--------------------+-------+--------+-------------+--------+----------+---------------------+---------------+
|4516516|2017-06-21 00:00:...|     CA|      ON|     Waterloo|43.49347| -80.49123|     593.320298328476|              1|
|4519209|2017-06-21 00:00:...|     CA|      ON|      Hanover| 44.1517|  -81.0266|      607.32332418404|              1|
|4518130|2017-06-21 00:00:...|     CA|      ON|       London| 43.0004|  -81.2343|    671.4556427927032|              1|
|5368841|2017-06-21 00:00:...|     CA|      ON|       Nepean| 45.2778|  -75.7563|   173.11156962958216|              1|
|4521574|2017-06-21 00:00:...|     CA|      ON|    Brantford| 43.1508|  -80.2094|     590.126274312235|              1|
|4523455|2017-06-21 00:00:...|     CA|  

In [464]:
from pyspark.sql import Row

# Lists to store averages, std deviations, radii & densities - though not as scalable, in this instance (the count of the POI df is only 4) it is a simple and efficient sol'n
POIAvgs = list()
POIStdDevs = list()
radii = list()
densities = list()
 
# Iterate the POIs -> perform operations to fill the lists, POI by POI.                 
for i in range(POI.count()):
    
    # Filter the df to iterate by POI
    df_i = df.filter(df[8] == i)
    
    # Radius, density calculations. Radius will be the distance for the point furthest away to this POI, for which this POI is still the closest POI.
    radius = df_i.agg({'closestPOI (distance)' : 'max'}).collect()[0]['max(closestPOI (distance))']
    radii.append(radius)
    area = math.pi * radius * radius
    density = df_i.count() / area
    densities.append(density)
    
    # print('Density: ', density)
    
    # Calculate the mean distance to POI & use it to calculate the std. dev.
    POIAvgs.append(df_i.agg({'closestPOI (distance)' : 'avg'}).collect()[0]['avg(closestPOI (distance))'])
    rdd = df_i.rdd.map(lambda x:
                 (1, abs(x[7] - POIAvgs[i]))
                      )
    df_i = rdd.toDF(['filler', 'value'])
    sum_deviation = df_i.rdd.map(lambda x: (1, x[1])).reduceByKey(lambda x,y: x+y).collect()[0][1]
    sum_deviation = sum_deviation ** 2
    
    # Legacy error handling for duplicate POIs, resulting in POIs with no requests. Left in for the case where this is actually ocurring.
    try:
        POIStdDevs.append(sqrt(sum_deviation / df_i.count()))
    # Division by 0
    except ZeroDivisionError: 
        POIStdDevs.append(0)

# Display final results in tabular form
data = [POIIds, POIAvgs, POIStdDevs, radii, densities]
R = Row('POI ID', 'Mean Dist', 'Std Dev', 'Radius', 'Density')
spark.sparkContext.parallelize([R(*r) for r in zip(*data)]).toDF().show()



+------+------------------+------------------+------------------+--------------------+
|POI ID|         Mean Dist|           Std Dev|            Radius|             Density|
+------+------------------+------------------+------------------+--------------------+
|   0.0|497.20067338146583|12491.535071025728| 9348.105250039951|1.737486022149189...|
|   1.0| 451.4566830481075|15361.656814412267|1474.3495099861857|0.001434345146092434|
|   2.0| 301.8594700072927|  20334.7761251102|11530.010783047906|2.329000685027570...|
+------+------------------+------------------+------------------+--------------------+



In [467]:
# Step 4: Modelling
# In order to visualize the popularity of each POI and map it to a range, my idea is to apply the sigmoid function or hyperbolic tangent to the product of each POI's density & mean distance to request.
# My choice for using this function is that these functions have the highest values of the derivative near the middle values, making small changes/differences near the centre be more sensitive.
# There is a slow, gradual acceleration/deceleration at the end points. The sigmoid function also outputs a value between 0 and 1, allowing us to easily scale & shift the output to be -10 to 10.

def sigmoid(a,b):
    x = a * b
    return (1/(1+math.pow(math.e, -x)) * 20) - 10

def tanh(a,b):
    x = a * b
    return (math.pow(math.e, x) - math.pow(math.e, -x)) / (math.pow(math.e, x) + math.pow(math.e, -x)) * 10

def ranking(a,b):
    return sigmoid(a,b), tanh(a,b)

for i in range(len(POIAvgs)):
    print(ranking(POIAvgs[i], densities[i]))

(0.004319395832389716, 0.00863879005302761)
(3.1291390755399267, 5.700147555941087)
(0.03515140084101809, 0.07030193301660036)
