In [70]:
#This codeblock contains all the import statments and the spark setup, run this before running any other blocks of code.

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos

spark = SparkSession.builder.master('local').getOrCreate()

In [72]:
#This function takes 4 inputs, the name of the file to clean, the list of columns on which we will check for duplicates, a name for the new file, 
#and a Boolean which controls whether the file writes to a CSV or simply returns the dataframe so that it can be used directly as the input to another function.

def clean_data_sample (file_name, list_of_columns_to_check, new_file_name, write_csv):

    #checks to see if the inputs given are correct
    if file_name.endswith('.csv') and type(list_of_columns_to_check) == list:
        
        #reads the data file into a dataframe
        df = spark.read.options(
            header='True', 
            inferSchema='True', 
            delimiter=',',
        ).csv(os.path.expanduser('~/data/' + file_name))
        
        #provides a row count so that we can tell how many rows were removed
        initial_rows = df.count()
        final_df = df.dropDuplicates(list_of_columns_to_check)
        final_rows = final_df.count()
        
        #writes dataframe to csv if true, otherwise shows some rows of the dataframe and returns it so that you can use the dataframe with aditional functions
        if write_csv == True:
            final_df.write.csv(new_file_name)
            print(str(initial_rows - final_rows) + ' rows were dropped due to duplicate/suspicious data')
            
        else: 
            final_df.show(10)
            print(str(initial_rows - final_rows) + ' rows were dropped due to duplicate/suspicious data')
            return final_df
    
    #prints an error and does nothing else if the function is given the wrong inputs    
    else:
        print('Error: Please check the file name and columns provided')

In [69]:
#runs the data cleaning function
clean_data_sample('DataSample.csv', [' TimeSt', 'Latitude', 'Longitude'], 'DataSampleClean', False)

+-------+--------------------+-------+--------+---------+--------+---------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude|
+-------+--------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|
|4519209|2017-06-21 00:00:...|     CA|      ON|  Hanover| 44.1517| -81.0266|
|4518130|2017-06-21 00:00:...|     CA|      ON|   London| 43.0004| -81.2343|
|5368841|2017-06-21 00:00:...|     CA|      ON|   Nepean| 45.2778| -75.7563|
|4521574|2017-06-21 00:00:...|     CA|      ON|Brantford| 43.1508| -80.2094|
|4523455|2017-06-21 00:00:...|     CA|      ON|   London| 43.0091| -81.1765|
|4522231|2017-06-21 00:00:...|     CA|      ON|  Chatham| 42.4247| -82.1755|
|4522376|2017-06-21 00:00:...|     CA|      ON| Waterloo| 43.4634| -80.5201|
|4524947|2017-06-21 00:00:...|     CA|      ON|Kitchener| 43.4306| -80.4877|
|4526599|2017-06-21 00:00:...|     CA|      ON| Ancaster|  43.208| -79.9652|

DataFrame[_ID: int,  TimeSt: timestamp, Country: string, Province: string, City: string, Latitude: double, Longitude: double]

In [73]:
#This function takes 4 inputs, the name of the file that needs POIs assigned, the name of the POI file we are using to assign POIs, a name for the new file created, 
#and a Boolean which controls whether the file writes to a CSV or simply returns the dataframe so that it can be used directly as the input to another function.

def assign_POI(data_file_name, POI_file_name, new_file_name, write_csv):
    #reads the data file into a dataframe
    data_df = spark.read.options(
        header='True', 
        inferSchema='True', 
        delimiter=',',
    ).csv(os.path.expanduser('~/data/DataSample.csv'))
    
    #reads the POI file into a dataframe
    POI_df = spark.read.options(
        header='True', 
        inferSchema='True', 
        delimiter=',',
    ).csv(os.path.expanduser('~/data/POIList.csv'))
    
    #renames columns so that they dont share a name after a join
    POI_df = POI_df.withColumnRenamed(" Latitude","Latitude2") \
        .withColumnRenamed("Longitude","Longitude2")

    #cross joins the data and POI tables and calculates the haversine distance between points then takes the row with the closest POI 
    #you can find more info on the haversine function at the end of the notebook
    final_df = data_df.crossJoin(POI_df) \
        .withColumn("dlong", radians(col("Longitude2")) - radians(col("Longitude"))) \
        .withColumn("dlat", radians(col("Latitude2")) - radians(col("Latitude"))) \
        .withColumn("haversine_dist", asin(sqrt(sin(col("dlat") / 2) ** 2 + cos(radians(col("Latitude"))) * cos(radians(col("Latitude2"))) * sin(col("dlong") / 2) ** 2)) * 2 * 6371) \
        .drop("dlong", "dlat") \
        .groupBy("_ID", " TimeSt", "Country", "Province", "City", "Latitude", "Longitude", "POIID") \
        .min("haversine_dist") \
        .drop("min(haversine_dist)")
    
    #writes dataframe to csv if true, otherwise shows some rows of the dataframe and returns it so that you can use the dataframe with aditional functions
    if write_csv == True:
        final_df.write.csv(new_file_name)
        
    else: 
        final_df.show(10)
        return final_df

In [74]:
#runs the POI assigning function
assign_POI('DataSample.csv', 'POIList.csv', 'DataSamplePOI', False)

+-------+--------------------+-------+--------+-------------+--------+---------+-----+
|    _ID|              TimeSt|Country|Province|         City|Latitude|Longitude|POIID|
+-------+--------------------+-------+--------+-------------+--------+---------+-----+
|4517999|2017-06-21 12:00:...|     CA|      ON|    Kitchener| 43.4553| -80.4845| POI1|
|4524928|2017-06-21 01:00:...|     CA|      ON|    Kitchener|43.44144|-80.50167| POI1|
|4530905|2017-06-21 04:00:...|     CA|      AB|      Calgary| 51.0412|-114.0762| POI4|
|4533231|2017-06-21 18:00:...|     CA|      ON|  Mississauga| 43.5769| -79.6283| POI4|
|4533920|2017-06-21 22:00:...|     CA|      AB|Sherwood Park| 53.5263| -113.287| POI2|
|4534602|2017-06-21 22:00:...|     CA|      AB|     Red Deer| 52.2529| -113.807| POI1|
|4535473|2017-06-21 19:00:...|     CA|      BC|Prince George| 53.9448| -122.752| POI3|
|4545805|2017-06-21 06:01:...|     CA|      AB|     Red Deer| 52.3116| -113.842| POI3|
|4549850|2017-06-21 05:01:...|     CA|     

DataFrame[_ID: int,  TimeSt: timestamp, Country: string, Province: string, City: string, Latitude: double, Longitude: double, POIID: string]

In [71]:
# Not for any of the questions, just for further information and a brief explanation of why i choose to solve the POI problem the way i did.

#this is a version of the haversine calculation done in the assign_POI function that is easier to read/understand. It takes lat-long coordinates and calculates the distance between them
#on a sphere. I chose not to simply apply this function to a new column because user defined functions in spark are typically not as well optimized and as such should be avoided when
#optimized spark functions exist that can do the same job.

#here is a link to the wikipedia page of the haversine formula. https://en.wikipedia.org/wiki/Haversine_formula

import math

def haversine_distance (lat1, long1, lat2, long2):
    earth_radius = 6371

    dif_lat = math.radians(lat2 - lat1)
    dif_long = math.radians(long2 - long1)
    
    a = (math.sin(dif_lat / 2) * math.sin(dif_lat / 2) + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dif_long / 2) * math.sin(dif_long / 2))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    d = earth_radius * c

    return d