# In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, asc, udf, dayofmonth, desc,avg
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, TimestampType, FloatType, LongType
import math
import time
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window

spark = SparkSession.builder.appName("q2_parquet").getOrCreate()
sc = spark.sparkContext

path_trip = "hdfs://master:9000/yellow_tripdata_1m.parquet"
path_ven = "hdfs://master:9000/yellow_tripvendors_1m.parquet"

# Read through the SparkSession. The previous SQLContext(sc) call referenced a
# name that was never imported (NameError); SparkSession.read is the
# supported replacement for SQLContext.read.
trip_data = spark.read.parquet(path_trip)
trip_vendors = spark.read.parquet(path_ven)

t1 = time.time()

# Keep only trips whose start AND end coordinates fall inside the box
# latitude [40, 45] x longitude [-80, -71] (bounds inclusive, as with
# the original >= / <= comparisons).
trip_data = trip_data.filter(
    col("trip_start_lat").between(40, 45)
    & col("trip_end_lat").between(40, 45)
    & col("trip_start_long").between(-80, -71)
    & col("trip_end_long").between(-80, -71)
)


def haversine(long1, lat1, long2, lat2):
    """Return the great-circle distance in kilometres between two points.

    Coordinates are in decimal degrees (the script filters them to the
    ranges lat 40..45 / long -80..-71 before calling this). They are
    converted to radians before any trigonometry is applied.

    Args:
        long1, lat1: longitude and latitude of the first point, in degrees.
        long2, lat2: longitude and latitude of the second point, in degrees.

    Returns:
        Distance along the sphere of radius 6371 km, as a float.
    """
    earth_radius_km = 6371
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(long2 - long1)

    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    # Rounding can push `a` marginally above 1 for near-antipodal points,
    # which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, a)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * c

# Rebind the name to a Spark UDF wrapping the pure-Python function; the
# column it produces is typed DoubleType.
haversine = udf(f=haversine, returnType=DoubleType())

  
def duration(start, end):
    """Return the elapsed time from *start* to *end* in minutes.

    Args:
        start: trip start, either a ``"%Y-%m-%d %H:%M:%S"`` string or a
            ``datetime`` (what Spark passes for TimestampType columns).
        end: trip end, same accepted forms as *start*.

    Returns:
        ``(end - start)`` in minutes as a float (negative if *end* precedes
        *start*), or None when either timestamp is None, so a row with a
        missing value yields a null instead of raising inside the UDF.
    """
    if start is None or end is None:
        return None

    timestamp_format = "%Y-%m-%d %H:%M:%S"
    start_time = start if isinstance(start, datetime) else datetime.strptime(start, timestamp_format)
    end_time = end if isinstance(end, datetime) else datetime.strptime(end, timestamp_format)
    return (end_time - start_time).total_seconds() / 60.0
    
# Rebind the name to a Spark UDF (DoubleType result) around the Python function.
duration = udf(f=duration, returnType=DoubleType())

# Derive per-trip distance and duration columns via the two UDFs.
trip_data = (
    trip_data
    .withColumn(
        "Distance",
        haversine(
            col("trip_start_long"),
            col("trip_start_lat"),
            col("trip_end_long"),
            col("trip_end_lat"),
        ),
    )
    .withColumn("Duration", duration(col("trip_start_time"), col("trip_end_time")))
)

# Expose the inputs and the joined result as SQL temp views.
trip_data.createOrReplaceTempView("trip_data")
trip_vendors.createOrReplaceTempView("trip_vendors")

trip_joined = trip_data.join(trip_vendors, "trip_id", "inner")
trip_joined.createOrReplaceTempView("trip_joined")

# For each vendor keep the row(s) whose Distance equals that vendor's maximum.
w = Window.partitionBy("vendor_id")
res = (
    trip_joined
    .withColumn("max_distance", F.max("Distance").over(w))
    .filter(col("Distance") == col("max_distance"))
    .drop("max_distance")
)
res.select("vendor_id", "Duration", "Distance").sort("vendor_id").show()

# Wall-clock time from just after the reads through the show() action.
t2 = time.time()
total_time = t2 - t1
print('total time to run: ', total_time)

sc.stop()


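# Recorded cell output (kept for reference). Its column order (Distance,
# Duration) differs from the select() above, so it came from an earlier
# version of the query. Its distances (~18,700 km) are also impossible for
# trips inside a box only a few degrees across.
# +---------+------------------+------------------+
# |vendor_id|          Distance|          Duration|
# +---------+------------------+------------------+
# |        1|18739.202499298084| 5.233333333333333|
# |        2|18445.843523416574|14.233333333333333|
# +---------+------------------+------------------+
#
# total time to run:  261.3669967651367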
