# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

Pyspark Imports

In [None]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce
from pyspark.sql.functions import *

Initialize PySpark Context

In [None]:
### DO NOT MODIFY THIS CELL ###
# Create the SparkContext for this notebook under the application name "HW3-Q1".
sc = pyspark.SparkContext(appName="HW3-Q1")
# SQLContext built on that SparkContext; load_data() reads the CSV through it.
sqlContext = SQLContext(sc)

Define function for loading data

In [None]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read the taxi-trip CSV file into a dataframe.

    input: none
    output: df a dataframe read from "yellow_tripdata_2019-01_short.csv"
            (relative path), with the first line of the file treated as
            the header.

    No schema is supplied here, so the columns are presumably read as
    strings -- see clean_data() for the casts applied afterwards.
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

In [None]:
# Load the raw trip data; the columns are cast later by clean_data().
df = load_data()

### Q1.a

Perform data casting to clean incoming dataset

In [None]:
#export
def clean_data(df):
    '''
    Cast the raw columns to the types used by the later questions.

    input: df a dataframe
    output: df a dataframe with all the original columns, where
            - passenger_count is cast to int
            - total_amount, tip_amount, trip_distance and fare_amount
              are cast to float
            - tpep_pickup_datetime and tpep_dropoff_datetime are
              converted with to_timestamp
    '''

    # START YOUR CODE HERE ---------
    # (column, target type) pairs, applied in this order. withColumn on an
    # existing column name replaces it in place, so the column order of the
    # input dataframe is kept.
    numeric_casts = [
        ("passenger_count", "int"),
        ("total_amount", "float"),
        ("tip_amount", "float"),
        ("trip_distance", "float"),
        ("fare_amount", "float"),
    ]
    for column_name, target_type in numeric_casts:
        df = df.withColumn(column_name, df[column_name].cast(target_type))

    # Pickup / dropoff times become timestamps.
    for column_name in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
        df = df.withColumn(column_name, to_timestamp(column_name))
    # END YOUR CODE HERE -----------

    return df

In [None]:
# Replace the raw dataframe with its type-cast version.
df = clean_data(df)

### Q1.b

Find the rate per person based on how many passengers travel between pickup and dropoff locations.

In [None]:
#export
def common_pair(df):
    '''
    Rank pickup/dropoff location pairs by the number of passengers carried.

    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    per_person_rate is the total_amount per person for a given pair, i.e.
    sum(total_amount) / sum(passenger_count) over all trips of that pair.

    Trips whose pickup and dropoff location are the same are excluded.
    At most 10 rows are returned, sorted by total_passenger_count
    descending, then per_person_rate descending.
    '''

    # START YOUR CODE HERE ---------
    # Explicit namespace so F.sum cannot be confused with Python's builtin
    # sum (the notebook star-imports pyspark.sql.functions).
    from pyspark.sql import functions as F

    different_endpoints = F.col("PULocationID") != F.col("DOLocationID")

    df = (
        df.filter(different_endpoints)
        .groupBy("PULocationID", "DOLocationID")
        .agg(
            F.sum("passenger_count").alias("total_passenger_count"),
            # Total (not average) fare collected on the pair: dividing the
            # per-trip average by the passenger total would shrink the rate
            # as the number of trips grows.
            F.sum("total_amount").alias("pair_total_amount"),
        )
        .withColumn(
            "per_person_rate",
            F.col("pair_total_amount") / F.col("total_passenger_count"),
        )
        .orderBy(
            F.col("total_passenger_count").desc(),
            F.col("per_person_rate").desc(),
        )
        .select(
            "PULocationID",
            "DOLocationID",
            "total_passenger_count",
            "per_person_rate",
        )
        .limit(10)
    )
    # END YOUR CODE HERE -----------

    return df

### Q1.c

Find which trip distances generate the highest tip percentage.

In [None]:
#export
def distance_with_most_tip(df):
    '''
    Find the trip distances (rounded up to whole units) with the highest
    average tip percentage.

    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount, averaged over all
    trips that share the same rounded-up trip_distance.

    Only trips with fare_amount > 2.00 and trip_distance > 0 are considered.
    At most 15 rows are returned, sorted by tip_percent descending.
    '''

    # START YOUR CODE HERE ---------
    # Explicit namespace so the Spark functions cannot be confused with
    # Python builtins (the notebook star-imports pyspark.sql.functions).
    from pyspark.sql import functions as F

    valid_trip = (F.col("fare_amount") > 2.00) & (F.col("trip_distance") > 0)
    tip_percent_per_trip = F.col("tip_amount") * 100 / F.col("fare_amount")

    df = (
        df.filter(valid_trip)
        # ceil, not round(x + 0.5): Spark's round is HALF_UP, so a distance
        # of exactly 1.0 would become round(1.5) = 2 instead of staying 1.
        .withColumn("trip_distance", F.ceil(F.col("trip_distance")))
        .groupBy("trip_distance")
        .agg(F.avg(tip_percent_per_trip).alias("tip_percent"))
        .orderBy(F.col("tip_percent").desc())
        .limit(15)
        .select("trip_distance", "tip_percent")
    )
    # END YOUR CODE HERE -----------

    return df

### Q1.d

Determine the average speed at different times of day.

In [None]:
#export
def time_with_most_traffic(df):
    '''
    Compute the average speed per hour of the 12-hour clock, split into
    an AM column and a PM column.

    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    time_of_day is the pickup hour modulo 12 (0-11). am_avg_speed is
    computed from trips picked up before noon, pm_avg_speed from trips
    picked up at noon or later; both are average trip_distance divided by
    average trip time in hours. Rows are sorted by time_of_day ascending.
    '''

    # START YOUR CODE HERE ---------
    # Explicit namespace so the Spark functions cannot be confused with
    # Python builtins (the notebook star-imports pyspark.sql.functions).
    from pyspark.sql import functions as F

    pickup = F.col("tpep_pickup_datetime")
    dropoff = F.col("tpep_dropoff_datetime")

    trips = (
        # A timestamp cast to long is in seconds, so / 3600 gives hours.
        df.withColumn(
            "trip_time",
            (dropoff.cast("long") - pickup.cast("long")) / 3600,
        )
        .withColumn("hour_of_day", F.hour(pickup))
        # 0-11 bucket shared by the AM hour and its PM counterpart.
        .withColumn("time_of_day", F.col("hour_of_day") % 12)
    )

    is_am = F.col("hour_of_day") < 12
    is_pm = F.col("hour_of_day") >= 12

    def _avg_speed(in_half):
        # when() without otherwise() yields null for rows outside the half,
        # and avg() skips nulls, so each half is averaged on its own rows.
        avg_distance = F.avg(F.when(in_half, F.col("trip_distance")))
        avg_time = F.avg(F.when(in_half, F.col("trip_time")))
        return avg_distance / avg_time

    # The AM/PM split has to happen inside the aggregation: hour_of_day is
    # not available after groupBy("time_of_day").
    df = (
        trips.groupBy("time_of_day")
        .agg(
            _avg_speed(is_am).alias("am_avg_speed"),
            _avg_speed(is_pm).alias("pm_avg_speed"),
        )
        .orderBy("time_of_day")
    )
    # END YOUR CODE HERE -----------

    return df

## The below cells are for you to investigate your solutions and will not be graded

In [None]:
# Start again from a freshly loaded and type-cast dataframe, so the checks
# below do not depend on what earlier cells did to df.
df = load_data()
df = clean_data(df)

In [None]:
# Q1.b: display the location pairs returned by common_pair().
common_pair(df).show()

In [None]:
# Q1.c: display the distances returned by distance_with_most_tip().
distance_with_most_tip(df).show()

In [None]:
# Q1.d: display the result of time_with_most_traffic().
time_with_most_traffic(df).show()