# Exercise #2 - Popular POI
You are given two datasets (upload both to Databricks):
* Pickup/drop-off locations of taxi rides in New York - `rides_small_ds.csv`
* POI (points of interest) of New York City - `poi_small_ds.csv`

Your goal is to find the most popular places visited by people during morning and evening hours (morning is 6 AM to 11 AM; evening is 5 PM to 11 PM, i.e. 17:00 to 23:00).

We assume that a place was visited if it is located within 500 meters of a pickup or drop-off location.

Note that even though the input files are 'small', some operations (a cross join between rides and POIs, for example) can produce a lot of data, resulting in long waiting times (~5 minutes per job).

### Haversine Formula
The haversine formula determines the great-circle distance between two points on a sphere given their longitudes and latitudes. You should use it to find the distance in meters between two geographic locations.

A method that implements it is already defined below. You can use it as a UDF, or use it as a reference and implement the algorithm in another way.
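
For reference, this is the `atan2` form of the haversine formula that the method computes, written with latitudes $\varphi$ and longitudes $\lambda$ in radians, $\Delta\varphi = \varphi_2 - \varphi_1$, $\Delta\lambda = \lambda_2 - \lambda_1$, and $R = 6371$ km as the Earth radius used in the code (the symbol names are chosen here for illustration):

$$
a = \sin^2\left(\frac{\Delta\varphi}{2}\right) + \cos\varphi_1 \, \cos\varphi_2 \, \sin^2\left(\frac{\Delta\lambda}{2}\right), \qquad
c = 2\,\operatorname{atan2}\left(\sqrt{a}, \sqrt{1-a}\right), \qquad
d = R\,c
$$

Multiplying $d$ by 1000 converts the result from kilometers to meters.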

In [2]:
from pyspark.sql import functions as F
import math

def haversine_formula(lat1, lon1, lat2, lon2):
    """
    Return the distance between two coordinates, in meters.
    """
    lat1 = math.pi / 180.0 * lat1
    lon1 = math.pi / 180.0 * lon1
    lat2 = math.pi / 180.0 * lat2
    lon2 = math.pi / 180.0 * lon2
    radius = 6371  # km
    meters = 1000

    # Use the haversine formula:
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = math.pow(math.sin(dlat/2),2) + math.cos(lat1) * math.cos(lat2) * math.pow(math.sin(dlon/2),2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    meters = radius * c * meters
    return meters
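
Before using the distance function in a join, it can help to sanity-check it on inputs whose answer is known. The sketch below is self-contained, so it repeats the same computation under the hypothetical name `haversine_m`. The coordinates are arbitrary illustrative values, not taken from the datasets.

```python
import math

EARTH_RADIUS_M = 6371 * 1000  # same Earth radius as in the notebook, in meters


def haversine_m(lat1, lon1, lat2, lon2):
    """Standalone copy of the haversine computation (hypothetical name), in meters."""
    phi1, lam1, phi2, lam2 = map(math.radians, (lat1, lon1, lat2, lon2))
    a = (math.sin((phi2 - phi1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin((lam2 - lam1) / 2) ** 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return EARTH_RADIUS_M * c


# Check 1: a point is 0 m away from itself.
same_point = haversine_m(40.758, -73.9855, 40.758, -73.9855)

# Check 2: moving one degree of latitude along a meridian covers
# R * pi / 180 meters (roughly 111 km with this radius).
one_degree = haversine_m(40.0, -74.0, 41.0, -74.0)
expected_one_degree = EARTH_RADIUS_M * math.pi / 180

print(same_point, one_degree, expected_one_degree)
```

If both checks agree, the 500-meter threshold used later can be trusted to mean what the exercise intends.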

In [3]:
# Write your code here
df_rides = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/rides_small_ds.csv")
df_poi = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/poi_small_ds.csv").withColumn('Lat', F.col("Lat").cast("double"))

df_rides.show(5)
df_poi.show(5)

In [4]:
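# Parse the timestamp and keep only the events that fall in the morning or evening range.
# Note: BETWEEN is inclusive, so this keeps hours 6..11 and 17..23 (i.e. up to 11:59 and 23:59).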
df_morning_event_rides = df_rides\
                          .select(F.to_timestamp(F.col("Date/Time"), "MM/dd/yyyy HH:mm:ss").alias("arrival_time"), F.col("Lat"), F.col("Lon"))\
                          .withColumn("arrival_hour", F.hour(F.col("arrival_time")))\
                          .where("(arrival_hour between 6 and 11) or (arrival_hour between 17 and 23)")
df_morning_event_rides.show(5)
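
The hour filter above is inclusive on both ends, which is easy to overlook. The following plain-Python sketch mirrors the same rule outside Spark so the boundary behavior can be checked quickly. The function name `in_time_window` and the sample timestamps are made up for illustration, and the timestamp format is assumed to match the pattern passed to `to_timestamp`.

```python
from datetime import datetime

MORNING_HOURS = range(6, 12)   # hours 6..11, matching "between 6 and 11"
EVENING_HOURS = range(17, 24)  # hours 17..23, matching "between 17 and 23"


def in_time_window(raw_timestamp):
    """Return True if the timestamp's hour is in the morning or evening window."""
    # Assumed input format: month/day/year hour:minute:second
    hour = datetime.strptime(raw_timestamp, "%m/%d/%Y %H:%M:%S").hour
    return hour in MORNING_HOURS or hour in EVENING_HOURS


# Illustrative timestamps around the window boundaries (not from the dataset).
samples = [
    "04/01/2014 05:59:00",
    "04/01/2014 06:00:00",
    "04/01/2014 11:59:00",
    "04/01/2014 12:00:00",
    "04/01/2014 23:30:00",
]
results = [in_time_window(ts) for ts in samples]
print(list(zip(samples, results)))
```

If the windows are meant to end exactly at 11:00 and 23:00, the upper bounds would be hours 10 and 22 instead.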

In [5]:
# Solution #1 - Use a UDF for the haversine_formula - Takes ~45 seconds
from pyspark.sql import types as T

# Declare the return type explicitly; without it, F.udf defaults to StringType.
udf_haversine_formula = F.udf(haversine_formula, T.DoubleType())

# Attach to each POI all pickups and drop-offs that are less than 500 meters away, using the UDF.
df_combined = df_morning_event_rides\
                .crossJoin(df_poi)\
                .withColumn('distance', udf_haversine_formula(df_morning_event_rides["Lat"], df_morning_event_rides["Lon"], df_poi["Lat"], df_poi["Lon"]))\
                .filter(F.col('distance') < 500)\
                .cache()
df_combined.show(5)

In [6]:
# Solution #2 - Implement the Haversine Formula with SparkSQL's functions to improve the performance - Takes ~2 seconds
# To understand why UDFs in Python are not always the best option, read here: https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance

pi = math.pi
radius = 6371  # km
meters = 1000

# Prepare the two main dataframes
df_morning_event_rides_hav = df_morning_event_rides\
                              .withColumn("lat_hav_rides",(pi / 180.0 * F.col("Lat")))\
                              .withColumn("lon_hav_rides", (pi / 180.0 * F.col("Lon")))

df_poi_hav = df_poi\
              .withColumn("lat_hav_poi", (pi / 180.0 * F.col("Lat")))\
              .withColumn("lon_hav_poi", (pi / 180.0 * F.col("Lon")))

# Crossjoin and apply haversine formula
df_combined = df_morning_event_rides_hav\
                  .crossJoin(df_poi_hav)\
                  .withColumn('dlon', F.col("lon_hav_poi") - F.col("lon_hav_rides"))\
                  .withColumn('dlat', F.col("lat_hav_poi") - F.col("lat_hav_rides"))\
                  .withColumn('a', F.pow(F.sin(F.col('dlat')/2),2)+ F.cos(F.col("lat_hav_rides"))*F.cos(F.col("lat_hav_poi"))*F.pow((F.sin(F.col('dlon')/2)),2))\
                  .withColumn('c', 2 * F.atan2(F.sqrt(F.col('a')), F.sqrt(1-F.col('a'))))\
                  .withColumn('distance', radius * meters * F.col('c'))\
                  .filter(F.col('distance') < 500)\
                  .cache()
df_combined.show(5)
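
The cross join evaluates the full trigonometric formula for every ride/POI pair. One optional way to cut that work, not part of the original solution, is a cheap bounding-box test that discards pairs that are clearly too far apart before computing the exact distance. The sketch below computes approximate box half-widths in degrees. The helper name `bounding_box_deltas` is hypothetical, and the reference latitude of 41.0 is an assumption (a value at or north of all points in the data, so the longitude width errs on the large side).

```python
import math

EARTH_RADIUS_M = 6371 * 1000  # same Earth radius as in the notebook, in meters


def bounding_box_deltas(lat_deg, radius_m):
    """Approximate half-widths (in degrees) of a box around a circle of radius_m.

    lat_deg is the reference latitude used to scale the longitude width.
    """
    # Angular radius of the circle, expressed in degrees of latitude.
    dlat_deg = math.degrees(radius_m / EARTH_RADIUS_M)
    # Meridians converge away from the equator, so a degree of longitude
    # covers less distance and the box must be wider in longitude.
    dlon_deg = dlat_deg / math.cos(math.radians(lat_deg))
    return dlat_deg, dlon_deg


# Assumed reference latitude for the New York area; 500 m is the exercise's radius.
dlat, dlon = bounding_box_deltas(41.0, 500)
print(dlat, dlon)
```

In the notebook, these two values could feed a pre-filter such as `F.abs(df_morning_event_rides_hav["Lat"] - df_poi_hav["Lat"]) <= dlat` (and likewise for `Lon`) placed before the `withColumn` calls. The exact `distance < 500` filter should stay in place, since the box is only an approximation and a small safety margin on the radius may be advisable.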

In [7]:
# Sort the POIs by popularity (number of nearby pickups/drop-offs)

# Option 1 - With the DataFrame API
df_sorted = df_combined.groupby('Name').count().sort("count", ascending=False).withColumnRenamed('count', 'num_of_visits')

# Option 2 - With a Spark SQL query on a temporary view (same result as option 1)
df_combined.createOrReplaceTempView("combined")
df_sorted = spark.sql("select Name, count(*) as num_of_visits from combined group by Name order by 2 desc")

df_sorted.show()