In [None]:
import os
import sys
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk1.8.0_212"
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder \
    .appName('poi_exercise') \
    .master('local[*]') \
    .config('spark.sql.execution.arrow.pyspark.enabled', True) \
    .config('spark.driver.memory','10G') \
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .getOrCreate()

In [16]:
spark

In [3]:
from pyspark.sql import functions as F
import math

def haversine_formula(lat1, lon1, lat2, lon2):
    """
    Return the distance between two coordinates, in meters.
    """
    lat1 = math.pi / 180.0 * lat1
    lon1 = math.pi / 180.0 * lon1
    lat2 = math.pi / 180.0 * lat2
    lon2 = math.pi / 180.0 * lon2
    radius = 6371  # km
    meters = 1000

    # Use the haversine formula:
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = math.pow(math.sin(dlat/2),2) + math.cos(lat1) * math.cos(lat2) * math.pow(math.sin(dlon/2),2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    meters = radius * c * meters
    return meters

In [5]:
df_rides = spark.read.option("header", "true").option("inferSchema", "true").csv("data/rides_small_ds.csv")
df_poi = spark.read.option("header", "true").option("inferSchema", "true").csv("data/poi_small_ds.csv").withColumn('Lat', F.col("Lat").cast("double"))

df_rides.show(5)
df_poi.show(5)

+----------------+-------+--------+------+
|       Date/Time|    Lat|     Lon|  Base|
+----------------+-------+--------+------+
|4/1/2014 0:11:00| 40.769|-73.9549|B02512|
|4/1/2014 0:17:00|40.7267|-74.0345|B02512|
|4/1/2014 0:21:00|40.7316|-73.9873|B02512|
|4/1/2014 0:28:00|40.7588|-73.9776|B02512|
|4/1/2014 0:33:00|40.7594|-73.9722|B02512|
+----------------+-------+--------+------+
only showing top 5 rows

+---------+----------+----+----------------+
|      Lat|       Lon|Type|            Name|
+---------+----------+----+----------------+
|40.640761|-73.699404|cafe|       Starbucks|
|41.789708|-87.601708|cafe|Starbucks Coffee|
|29.911807|-95.685513|cafe|       Starbucks|
|29.845172|-95.646238|cafe|       Starbucks|
|40.529512|-74.540861|bank|           Chase|
+---------+----------+----+----------------+
only showing top 5 rows



In [7]:
# Format the date and extract only the event that are on the morning or evening range
df_morning_event_rides = df_rides\
                          .select(F.to_timestamp(F.col("Date/Time"), "M/d/y H:mm:ss").alias("arrival_time"), F.col("Lat"), F.col("Lon"))\
                          .withColumn("arrival_hour", F.hour(F.col("arrival_time")))\
                          .where("(arrival_hour between 6 and 11) or (arrival_hour between 17 and 23)")
df_morning_event_rides.show(5)

df_morning_event_rides = df_morning_event_rides.filter(F.col('Lat').isNotNull() & F.col('Lon').isNotNull())
df_poi = df_poi.filter(F.col('Lat').isNotNull() & F.col('Lon').isNotNull())

+-------------------+-------+--------+------------+
|       arrival_time|    Lat|     Lon|arrival_hour|
+-------------------+-------+--------+------------+
|2014-04-01 06:00:00|40.7507|-73.9703|           6|
|2014-04-01 06:01:00|40.7337|-73.9979|           6|
|2014-04-01 06:02:00|40.7236|-74.0111|           6|
|2014-04-01 06:03:00|40.7151|-74.0464|           6|
|2014-04-01 06:06:00|40.7701|-73.9588|           6|
+-------------------+-------+--------+------------+
only showing top 5 rows



In [20]:
# Solution #1 - Use a UDF for the haversine_formula - Takes ~45 seconds
from pyspark.sql import types as T
udf_haversine_formula = F.udf(haversine_formula)

# Attach to each POI all drop offs and pickups that are less than 100 meters apart, using the UDF.
# df_combined = df_morning_event_rides\
#                 .crossJoin(df_poi)\
#                 .where(udf_haversine_formula(df_morning_event_rides["Lat"], df_morning_event_rides["Lon"], df_poi["Lat"], df_poi["Lon"]) < 300)\
#                 .cache()
# df_combined.show(5)

df_morning_event_rides = df_morning_event_rides.filter(F.col('Lat').isNotNull() & F.col('Lon').isNotNull())
df_poi = df_poi.filter(F.col('Lat').isNotNull() & F.col('Lon').isNotNull())

df_combined = df_morning_event_rides\
                .crossJoin(df_poi)\
                .withColumn('distance', udf_haversine_formula(df_morning_event_rides["Lat"], df_morning_event_rides["Lon"], df_poi["Lat"], df_poi["Lon"]))\
                .filter(F.col('distance') < 300)\
                .cache()
df_combined.show(5)

+-------------------+-------+--------+------------+---------+----------+----+-----+------------------+
|       arrival_time|    Lat|     Lon|arrival_hour|      Lat|       Lon|Type| Name|          distance|
+-------------------+-------+--------+------------+---------+----------+----+-----+------------------+
|2014-04-02 07:30:00|40.7347|-73.9797|           7|40.737221|-73.978682|bank|Chase| 293.1508966472016|
|2014-04-02 07:44:00|40.7377|-73.9802|           7|40.737221|-73.978682|bank|Chase|138.54379820956495|
|2014-04-02 07:52:00|40.7364|-73.9797|           7|40.737221|-73.978682|bank|Chase|125.26236330434024|
|2014-04-02 08:07:00|40.7369|-73.9789|           8|40.737221|-73.978682|bank|Chase| 40.14211725233168|
|2014-04-02 08:20:00|40.7349|-73.9772|           8|40.737221|-73.978682|bank|Chase|286.70291962886296|
+-------------------+-------+--------+------------+---------+----------+----+-----+------------------+
only showing top 5 rows



In [15]:
# Solution #2 - Implement the Haversine Formula with SparkSQL's functions to improve the performance - Takes ~2 seconds
# To understand why UDFs in Python are not always the best option, read here: https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance

pi = math.pi
radius = 6371  # km
meters = 1000

# Prepare the two main dataframes
df_morning_event_rides_hav = df_morning_event_rides\
                              .withColumn("lat_hav_rides",(pi / 180.0 * F.col("Lat")))\
                              .withColumn("lon_hav_rides", (pi / 180.0 * F.col("Lon")))

df_poi_hav = df_poi\
              .withColumn("lat_hav_poi", (pi / 180.0 * F.col("Lat")))\
              .withColumn("lon_hav_poi", (pi / 180.0 * F.col("Lon")))

# Crossjoin and apply haversine formula
df_combined = df_morning_event_rides_hav\
                  .crossJoin(df_poi_hav.drop("Lat", "Lon"))\
                  .withColumn('dlon', F.col("lon_hav_poi") - F.col("lon_hav_rides"))\
                  .withColumn('dlat', F.col("lat_hav_poi") - F.col("lat_hav_rides"))\
                  .withColumn('a', F.pow(F.sin(F.col('dlat')/2),2)+ F.cos(F.col("lat_hav_rides"))*F.cos(F.col("lat_hav_poi"))*F.pow((F.sin(F.col('dlon')/2)),2))\
                  .withColumn('c', 2 * F.atan2(F.sqrt(F.col('a')), F.sqrt(1-F.col('a'))))\
                  .withColumn('distance', radius * meters * F.col('c'))\
                  .filter(F.col('distance') < 500)\
                  .cache()
df_combined.show(5)

+-------------------+-------+--------+------------+------------------+-------------------+----+-----+-----------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|       arrival_time|    Lat|     Lon|arrival_hour|     lat_hav_rides|      lon_hav_rides|Type| Name|      lat_hav_poi|        lon_hav_poi|                dlon|                dlat|                   a|                   c|          distance|
+-------------------+-------+--------+------------+------------------+-------------------+----+-----+-----------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|2014-04-01 07:46:00|40.7396| -73.981|           7| 0.711040155945482| -1.291212033917925|bank|Chase|0.710998634562577|-1.2911715771858638|4.045673206110045E-5|-4.15213829050298...|6.659223914694289...|5.161094425089494E-5|328.81332582245165|
|2014-04-01 22:42:00|40.7401

In [13]:
df_combined.limit(5)

arrival_time,Lat,Lon,arrival_hour,Lat.1,Lon.1,Type,Name,distance
2014-04-02 07:30:00,40.7347,-73.9797,7,40.737221,-73.978682,bank,Chase,293.1508966472016
2014-04-02 07:44:00,40.7377,-73.9802,7,40.737221,-73.978682,bank,Chase,138.54379820956495
2014-04-02 07:52:00,40.7364,-73.9797,7,40.737221,-73.978682,bank,Chase,125.26236330434024
2014-04-02 08:07:00,40.7369,-73.9789,8,40.737221,-73.978682,bank,Chase,40.14211725233168
2014-04-02 08:20:00,40.7349,-73.9772,8,40.737221,-73.978682,bank,Chase,286.70291962886296


In [32]:
# Sort the most popular POIs

# Option 1 - With SparkSQL
df_sorted = df_combined.groupby('Name', 'type').count().sort("count", ascending=False).withColumnRenamed('count', 'num_of_visits')

df_sorted.show()

+--------------------+----------------+-------------+
|                Name|            type|num_of_visits|
+--------------------+----------------+-------------+
|           Starbucks|            cafe|          982|
|               Chase|            bank|          610|
|         Duane Reade|        pharmacy|          592|
|    Starbucks Coffee|            cafe|          378|
|            Citibank|            bank|          351|
|           Starbucks|         toilets|           17|
|     Municipal Lot D|         parking|            5|
|     Municipal Lot B|         parking|            5|
|     Municipal Lot G|         parking|            4|
|                 CVS|        pharmacy|            3|
|Saint Francis Church|place_of_worship|            2|
|Rose of Sharon Ho...|place_of_worship|            2|
| private parking lot|         parking|            1|
+--------------------+----------------+-------------+



In [None]:
# option 2 - With SparkSQL Context
df_combined.createOrReplaceTempView("combined")
df_sorted = spark.sql("select Name, count(*) as num_of_visits from combined group by Name order by 2 desc")