# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

PySpark Imports

In [None]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [None]:
### DO NOT MODIFY THIS CELL ###
# Spark entry points for this notebook; load_data() reads the CSV through sqlContext.
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Define function for loading data

In [1]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read yellow_tripdata_2019-01_short.csv (first row used as the header)
    into a DataFrame using the module-level sqlContext.

    No schema or inferSchema option is given, so every column comes back as
    a string; clean_data() casts the columns the later queries need.
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.1

Cast columns to their proper types to clean the incoming dataset.

In [1]:
#export
def clean_data(df):
    '''
    Cast the columns used by the later queries to numeric / timestamp types.

    input: df a dataframe
    output: df a dataframe with all the original columns; the columns listed
            in `casts` below keep their names but get the given Spark type
    '''
    
    # START YOUR CODE HERE ---------
    casts = [
        ('passenger_count', 'int'),
        ('total_amount', 'float'),
        ('tip_amount', 'float'),
        ('trip_distance', 'float'),
        ('fare_amount', 'float'),
        ('tpep_pickup_datetime', 'timestamp'),
        ('tpep_dropoff_datetime', 'timestamp'),
    ]
    for column_name, spark_type in casts:
        # withColumn with an existing name replaces that column rather than adding one.
        df = df.withColumn(column_name, col(column_name).cast(spark_type))
    # END YOUR CODE HERE -----------
    
    return df

### Q1.2

Find the per-person rate for each pickup/dropoff location pair, based on how many passengers travel between the two locations.

In [None]:
#export
def common_pair(df):
    '''
    Rank pickup/dropoff location pairs by how many passengers travel them.

    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    per_person_rate is the total_amount per person for a given pair.
    Trips whose pickup and dropoff locations are equal are excluded. Rows are
    sorted by total_passenger_count, then per_person_rate (both descending),
    and only the first 10 are kept.
    '''
    
    # START YOUR CODE HERE ---------
    different_locations = df.where(col("PULocationID") != col("DOLocationID"))

    totals = different_locations.groupBy("PULocationID", "DOLocationID").agg(
        {"passenger_count": "sum", "total_amount": "sum"}
    )

    # Dict-style agg names its outputs "sum(<column>)".
    passengers = col("sum(passenger_count)")
    per_pair = totals.select(
        "PULocationID",
        "DOLocationID",
        passengers.alias("total_passenger_count"),
        (col("sum(total_amount)") / passengers).alias("per_person_rate"),
    )

    df = per_pair.orderBy(
        col("total_passenger_count").desc(),
        col("per_person_rate").desc(),
    ).limit(10)
    # END YOUR CODE HERE -----------
    
    return df

### Q1.3

Find which trip distances generate the highest tip percentage.

In [None]:
#export
def distance_with_most_tip(df):
    '''
    Average tip percentage per whole-unit trip distance bucket.

    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount (tip_amount * 100 /
    fare_amount), averaged over every trip whose distance rounds up (ceil) to
    the same trip_distance. Only trips with fare_amount > 2.00 and
    trip_distance > 0 are counted. The 15 buckets with the highest
    tip_percent are returned, highest first.
    '''
    
    # START YOUR CODE HERE ---------
    eligible = df.filter((col("fare_amount") > 2.00) & (col("trip_distance") > 0))

    # Bucket each trip by its distance rounded up, replacing trip_distance in place.
    per_trip = eligible.select(
        ceil(col("trip_distance")).alias("trip_distance"),
        (col("tip_amount") * 100 / col("fare_amount")).alias("tip_percent_calc"),
    )

    bucketed = per_trip.groupBy("trip_distance").agg({"tip_percent_calc": "avg"})
    ranked = bucketed.withColumnRenamed("avg(tip_percent_calc)", "tip_percent")

    df = ranked.select("trip_distance", "tip_percent") \
               .orderBy(col("tip_percent").desc()) \
               .limit(15)
    # END YOUR CODE HERE -----------
    
    return df

### Q1.4

Determine the average speed at different times of day.

In [None]:
#export
def time_with_most_traffic(df):
    '''
    Average speed for each clock hour, split into the AM and PM halves of the day.

    input: df a dataframe with tpep_pickup_datetime / tpep_dropoff_datetime as
           timestamps and numeric trip_distance (e.g. the output of clean_data)
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    am_avg_speed and pm_avg_speed are the average trip distance / average trip
    time (in hours) calculated for each hour, bucketing trips by pickup hour.
    Row k combines pickup hour k (AM) with pickup hour k + 12 (PM), and
    time_of_day labels it on a 12-hour clock ("12", "1", ..., "11").
    Trips with a non-positive duration or distance are ignored. If an hour
    has no qualifying trips in one half of the day, that half's speed is null.
    '''
    
    # START YOUR CODE HERE ---------
    # Not part of the shared import cell, so import locally; without this the
    # function failed with NameError.
    from pyspark.sql.functions import avg, unix_timestamp

    # The loaded/cleaned data uses the tpep_* column names.
    pickup = col("tpep_pickup_datetime")
    dropoff = col("tpep_dropoff_datetime")

    df = df.withColumn("pickup_hour", hour(pickup))
    df = df.withColumn(
        "duration_hours",
        (unix_timestamp(dropoff) - unix_timestamp(pickup)) / 3600.0,
    )
    df = df.filter((col("duration_hours") > 0) & (col("trip_distance") > 0))

    # Fold the 24 hours onto 12 slots; each slot then holds an AM and a PM hour.
    df = df.withColumn("hour_slot", col("pickup_hour") % 12)
    is_am = col("pickup_hour") < 12

    # avg() skips the nulls produced by when() without otherwise(), so each
    # average covers only the trips in its own half of the day.
    def _half_speed(in_half):
        return (avg(when(in_half, col("trip_distance")))
                / avg(when(in_half, col("duration_hours"))))

    speeds = df.groupBy("hour_slot").agg(
        _half_speed(is_am).alias("am_avg_speed"),
        _half_speed(~is_am).alias("pm_avg_speed"),
    )

    # NOTE(review): labels hours on a 12-hour clock ("12" for midnight/noon,
    # then "1".."11"), matching the previous date_format("h") output; confirm
    # this is the expected time_of_day format.
    speeds = speeds.withColumn(
        "time_of_day",
        when(col("hour_slot") == 0, 12).otherwise(col("hour_slot")).cast("string"),
    )

    df = speeds.orderBy("hour_slot").select(
        "time_of_day",
        "am_avg_speed",
        "pm_avg_speed",
    )
    # END YOUR CODE HERE -----------

    return df

## The below cells are for you to investigate your solutions and will not be graded

In [None]:
# Load the sample CSV and cast its columns once; the cells below reuse `df`.
df = load_data()
df = clean_data(df)

In [None]:
# Top pickup/dropoff location pairs by total passengers.
common_pair(df).show()

In [None]:
# Distance buckets with the highest average tip percentage.
distance_with_most_tip(df).show()

In [None]:
# Average AM / PM speed for each hour of the day.
time_with_most_traffic(df).show()