# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

Pyspark Imports

In [1]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

In [2]:
from pyspark import SparkContext
# If a SparkContext is still active (for example from an earlier run of this
# notebook), stop it so a new context can be created afterwards.
if SparkContext._active_spark_context:
    SparkContext._active_spark_context.stop()


Initialize PySpark Context

In [3]:
### DO NOT MODIFY THIS CELL ###
# Create the SparkContext for this notebook; the application is named "HW3-Q1".
sc = pyspark.SparkContext(appName="HW3-Q1")
# Wrap the context in a SQLContext; load_data() reads the CSV through it.
sqlContext = SQLContext(sc)



Define function for loading data

In [4]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read the trip CSV file into a dataframe.

    input: none (the file name "yellow_tripdata_2019-01_short.csv" is
           hard-coded and read through the module-level sqlContext)
    output: df a dataframe read with the "header" option set to True
    '''
    # No schema or inferSchema option is passed to the reader here.
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.1

Perform data casting to clean the incoming dataset.

In [5]:
#export
def clean_data(df):
    '''
    Cast the numeric and timestamp columns used by the later questions.

    input: df a dataframe
    output: df a dataframe with all the original columns, where
            - passenger_count is cast to integer
            - total_amount, tip_amount, trip_distance and fare_amount
              are cast to float
            - tpep_pickup_datetime and tpep_dropoff_datetime are cast
              to timestamp
    '''

    # START YOUR CODE HERE ---------
    # (column name, target type) pairs, applied one after another in this order.
    target_types = (
        ("passenger_count", "integer"),
        ("total_amount", "float"),
        ("tip_amount", "float"),
        ("trip_distance", "float"),
        ("fare_amount", "float"),
        ("tpep_pickup_datetime", "timestamp"),
        ("tpep_dropoff_datetime", "timestamp"),
    )
    for column_name, spark_type in target_types:
        # Re-using the existing column name overwrites that column with its cast.
        df = df.withColumn(column_name, df[column_name].cast(spark_type))

    # END YOUR CODE HERE -----------

    return df

### Q1.2

Find the rate per person based on how many passengers travel between each pair of pickup and dropoff locations.

In [6]:
#export
def common_pair(df):
    '''
    Rank pickup/dropoff location pairs by how many passengers they carried.

    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    per_person_rate is the summed total_amount of a pair divided by the
    summed passenger_count of that pair.

    Trips whose pickup and dropoff locations are equal are left out. Only
    the first 10 pairs are returned, ordered by total_passenger_count and
    then per_person_rate, both descending.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql import functions as F

    # Column expressions, named up front so the pipeline below reads top-down.
    distinct_endpoints = F.col("PULocationID") != F.col("DOLocationID")
    passengers = F.sum("passenger_count").alias("total_passenger_count")
    revenue = F.sum("total_amount").alias("total_amount")
    rate = F.col("total_amount") / F.col("total_passenger_count")

    df = (
        df.filter(distinct_endpoints)
        .groupBy("PULocationID", "DOLocationID")
        .agg(passengers, revenue)
        .withColumn("per_person_rate", rate)
        .orderBy(F.desc("total_passenger_count"), F.desc("per_person_rate"))
        # total_amount was only needed to derive the rate.
        .drop("total_amount")
        .limit(10)
    )

    # END YOUR CODE HERE -----------

    return df

### Q1.3

Find which trip distances generate the highest tip percentage.

In [7]:
#export
def distance_with_most_tip(df):
    '''
    Average the tip percentage per rounded-up trip distance.

    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount, averaged over
    all trips that share the same trip_distance after it is rounded up to
    a whole number.

    Only trips with fare_amount > 2 and trip_distance > 0 are counted, and
    only the 15 distances with the highest tip_percent are returned, in
    descending order of tip_percent.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql import functions as F

    # Column expressions, named up front so the pipeline below reads top-down.
    is_counted = (F.col("fare_amount") > 2) & (F.col("trip_distance") > 0)
    tip_share = (F.col("tip_amount") * 100) / F.col("fare_amount")
    whole_distance = F.ceil(F.col("trip_distance")).cast("integer")

    df = (
        df.filter(is_counted)
        .withColumn("tip_percent", tip_share)
        .withColumn("rounded_trip_distance", whole_distance)
        .groupBy("rounded_trip_distance")
        .agg(F.avg("tip_percent").alias("tip_percent"))
        .orderBy(F.col("tip_percent").desc())
        .limit(15)
        # Expose the rounded bucket under the column name the output requires.
        .withColumnRenamed("rounded_trip_distance", "trip_distance")
    )
    # END YOUR CODE HERE -----------

    return df

### Q1.4

Determine the average speed at different times of day.

In [8]:
#export
def time_with_most_traffic(df):
    '''
    Compare the average speed per clock hour between AM and PM pickups.

    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    am_avg_speed and pm_avg_speed are the average trip distance / average
    trip time calculated for each hour. time_of_day is the pickup hour
    modulo 12, returned as a string; rows are ordered by the numeric hour.

    Trip time is the dropoff minus pickup difference divided by 3600; any
    trip whose difference is not greater than 0 is given a trip time of
    0.001 instead.
    '''

    # START YOUR CODE HERE ---------
    # Column expressions, named up front so the pipelines below read top-down.
    elapsed = (
        col("tpep_dropoff_datetime").cast("long")
        - col("tpep_pickup_datetime").cast("long")
    ) / 3600
    trip_time = when(elapsed > 0, elapsed).otherwise(0.001)
    half_of_day = when(col("pickup_hour") < 12, "AM").otherwise("PM")
    # Hour 0 already maps to 0 under modulo 12, so no special case is needed.
    clock_hour = col("pickup_hour") % 12

    trips = (
        df.withColumn("pickup_hour", hour("tpep_pickup_datetime"))
        .withColumn("trip_time", trip_time)
        .withColumn("AMorPM", half_of_day)
        .withColumn("time_of_day", clock_hour)
    )

    # One row per (clock hour, AM/PM): average distance over average time.
    hourly = trips.groupBy("time_of_day", "AMorPM").agg(
        {"trip_distance": "avg", "trip_time": "avg"}
    )
    hourly = hourly.withColumn(
        "avg_speed", col("avg(trip_distance)") / col("avg(trip_time)")
    )

    # Each (hour, AM/PM) cell holds a single avg_speed, so max() just picks it.
    pivoted = (
        hourly.groupBy("time_of_day")
        .pivot("AMorPM", ["AM", "PM"])
        .max("avg_speed")
        .withColumnRenamed("AM", "am_avg_speed")
        .withColumnRenamed("PM", "pm_avg_speed")
        .orderBy("time_of_day")
    )
    # Cast after ordering so the sort above uses the numeric hour.
    df = pivoted.withColumn("time_of_day", pivoted["time_of_day"].cast("string"))

    # END YOUR CODE HERE -----------

    return df

## The below cells are for you to investigate your solutions and will not be graded

In [9]:
# Load the raw CSV and apply the Q1.1 casts in one step.
df = clean_data(load_data())

In [10]:
# Display the Q1.2 result for the cleaned dataframe.
common_pair(df).show()

+------------+------------+---------------------+------------------+
|PULocationID|DOLocationID|total_passenger_count|   per_person_rate|
+------------+------------+---------------------+------------------+
|         239|         238|                   62|  4.26274198870505|
|         237|         236|                   60| 4.482500068346659|
|         263|         141|                   52|3.4190384974846473|
|         161|         236|                   42| 5.368571440378825|
|         148|          79|                   42| 4.711904752822149|
|         142|         238|                   39|  5.05487182812813|
|         141|         236|                   37| 4.355675723101641|
|         239|         143|                   37| 4.252162224537617|
|         239|         142|                   35| 3.817714350564139|
|          79|         170|                   34| 6.394705884596881|
+------------+------------+---------------------+------------------+



In [11]:
# Display the Q1.3 result for the cleaned dataframe.
distance_with_most_tip(df).show()

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|            1|17.129815971513313|
|            2|15.815527155632552|
|           17|15.796441782308916|
|           20| 15.11240992123345|
|            3|14.886705727113446|
|            6|14.579695131601051|
|            5|14.245405861990653|
|            4|13.831569507473274|
|            9|13.814476557648435|
|            8|12.072596772433315|
|           19|11.952632334985276|
|           10|11.880490518902954|
|            7| 10.80057562837643|
|           21|10.739019886973427|
|           18|10.696822158448429|
+-------------+------------------+



In [12]:
# Display the Q1.4 result for the cleaned dataframe.
time_with_most_traffic(df).show()

+-----------+------------------+-------------------+
|time_of_day|      am_avg_speed|       pm_avg_speed|
+-----------+------------------+-------------------+
|          0| 9.377661781258988|               NULL|
|          1|10.845483413697353|  5.125214305177561|
|          3|              NULL|                0.0|
|          4|              NULL|                0.0|
|          5|              NULL| 0.5137660239764732|
|          6|              NULL|  9.989847870647605|
|          7|              NULL|0.18415305490417713|
|          8|              NULL| 0.5183127622697896|
|         10|              NULL| 0.6147483972627696|
|         11|              NULL|  4.650958285207579|
+-----------+------------------+-------------------+

