# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

PySpark Imports

In [None]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [None]:
### DO NOT MODIFY THIS CELL ###
# Create the notebook's SparkContext and wrap it in an SQLContext.
# load_data() below reads its CSV through this module-level `sqlContext`.
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Define a function for loading the data

In [None]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read yellow_tripdata_2019-01_short.csv (a path relative to the current
    working directory) into a dataframe using the module-level sqlContext.

    The first CSV row is used as the header. No schema inference is
    requested, so every column is read as a string; clean_data() is
    responsible for casting them to proper types.

    output: df, a dataframe
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.1

Cast each column to its correct data type to clean the incoming dataset.

In [None]:
#export
def clean_data(df):
    '''
    Cast the raw (string-typed) columns of the taxi-trip dataframe to
    proper Spark types.

    input: df, a dataframe whose columns are all strings (as returned by
           load_data)
    output: df, a dataframe with all the original columns in their original
            order; the columns named below are replaced by typed versions
            and every other column is passed through unchanged
    '''

    # START YOUR CODE HERE ---------
    # Pickup/dropoff datetimes use to_timestamp's default parsing pattern.
    for ts_name in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
        df = df.withColumn(ts_name, to_timestamp(col(ts_name)))

    # Target type per numeric column. Insertion order matches the order in
    # which the casts are applied. Amounts and distances are deliberately
    # "float" (single precision), not "double".
    numeric_casts = {
        "VendorID": "integer",
        "passenger_count": "integer",
        "RatecodeID": "integer",
        "PULocationID": "integer",
        "DOLocationID": "integer",
        "payment_type": "integer",
        "trip_distance": "float",
        "fare_amount": "float",
        "extra": "float",
        "mta_tax": "float",
        "tip_amount": "float",
        "tolls_amount": "float",
        "improvement_surcharge": "float",
        "total_amount": "float",
    }
    for name, spark_type in numeric_casts.items():
        # withColumn on an existing name replaces it in place, so the
        # column order of df is preserved.
        df = df.withColumn(name, col(name).cast(spark_type))

    # END YOUR CODE HERE -----------

    return df

### Q1.2

Find the per-person rate based on how many passengers travel between each pair of pickup and dropoff locations.

In [None]:
#export
def common_pair(df):
    '''
    input: df a dataframe (with the typed columns produced by clean_data)
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    per_person_rate is the total_amount per person for a given pair.

    Returns the 10 (PULocationID, DOLocationID) pairs with the largest
    total_passenger_count. Ties are broken by per_person_rate (descending)
    so that the selected rows do not depend on partitioning.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import sum as _sum, desc

    # Total passengers and total amount collected for each location pair.
    pairs = df.groupBy("PULocationID", "DOLocationID").agg(
        _sum("passenger_count").alias("total_passenger_count"),
        _sum("total_amount").alias("total_amount_sum"),
    )

    # Guarded division: a pair whose passenger counts sum to 0 gets a null
    # rate. Without the guard, Spark in ANSI mode raises on division by zero
    # (non-ANSI Spark already returns null, so results there are unchanged).
    pairs = pairs.withColumn(
        "per_person_rate",
        when(col("total_passenger_count") != 0,
             col("total_amount_sum") / col("total_passenger_count")),
    )

    # NOTE(review): confirm against the assignment whether pairs where
    # PULocationID == DOLocationID should be excluded.
    result = (
        pairs.select("PULocationID", "DOLocationID",
                     "total_passenger_count", "per_person_rate")
        # Secondary key makes the top-10 cut deterministic when counts tie.
        .orderBy(desc("total_passenger_count"), desc("per_person_rate"))
        .limit(10)
    )

    # END YOUR CODE HERE -----------

    return result

### Q1.3

Find which trip distances generate the highest tip percentage.

In [None]:
#export
def distance_with_most_tip(df):
    '''
    input: df a dataframe (with the typed columns produced by clean_data)
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount

    Trip distances are rounded up to whole miles (ceil) to form buckets.
    tip_percent is the average of the unrounded per-trip percentages in each
    bucket. The 15 buckets with the highest average tip_percent are returned.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import avg, desc

    # Keep only trips with a positive fare. A null fare fails the comparison
    # as well, and this prevents dividing by zero below.
    # NOTE(review): confirm the fare/distance filter thresholds against the
    # assignment spec.
    paid = df.filter(col("fare_amount") > 0)

    # Rounding each trip's percentage up before averaging would bias every
    # bucket upward, so the percentage stays unrounded. The rounding is
    # applied to the distance instead: grouping on the raw float distance
    # yields near-singleton groups dominated by single outlier trips.
    per_trip = paid.select(
        ceil(col("trip_distance")).alias("trip_distance"),
        ((col("tip_amount") / col("fare_amount")) * 100).alias("tip_percent"),
    )

    result = (
        per_trip.groupBy("trip_distance")
        .agg(avg("tip_percent").alias("tip_percent"))
        # trip_distance as a secondary key makes ties deterministic.
        .orderBy(desc("tip_percent"), col("trip_distance"))
        .limit(15)
    )

    # END YOUR CODE HERE -----------

    return result

### Q1.4

Determine the average speed at different times of day.

In [None]:
#export
def time_with_most_traffic(df):
    '''
    input: df a dataframe (with the typed columns produced by clean_data)
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    am_avg_speed and pm_avg_speed are the average trip distance / average trip time calculated for each hour

    time_of_day is the pickup hour on a 12-hour clock (0-11). Trip time is
    measured in hours, so speeds are in trip_distance units per hour. Trips
    with a non-positive or null duration are excluded. A time_of_day with no
    AM (or PM) trips gets a null speed for that half. Rows are returned
    ordered by time_of_day ascending.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import avg, unix_timestamp

    # Pickup hour (0-23) and trip duration in hours.
    trips = (
        df.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
        .withColumn(
            "duration_hours",
            (unix_timestamp(col("tpep_dropoff_datetime"))
             - unix_timestamp(col("tpep_pickup_datetime"))) / 3600,
        )
        # Drop zero/negative durations; the comparison also drops nulls.
        .filter(col("duration_hours") > 0)
        # 12-hour clock: 0-11 maps to itself, 12-23 maps to 0-11.
        .withColumn("time_of_day", col("pickup_hour") % 12)
    )

    is_am = col("pickup_hour") < 12
    is_pm = col("pickup_hour") >= 12

    # when() without otherwise() yields null for the other half of the day,
    # and avg() ignores nulls, so each average only sees AM (or PM) trips.
    result = (
        trips.groupBy("time_of_day")
        .agg(
            (avg(when(is_am, col("trip_distance")))
             / avg(when(is_am, col("duration_hours")))).alias("am_avg_speed"),
            (avg(when(is_pm, col("trip_distance")))
             / avg(when(is_pm, col("duration_hours")))).alias("pm_avg_speed"),
        )
        .select("time_of_day", "am_avg_speed", "pm_avg_speed")
        # groupBy output order is arbitrary; present the hours in sequence.
        .orderBy("time_of_day")
    )

    # END YOUR CODE HERE -----------

    return result

## The cells below are for investigating your solutions and will not be graded

In [None]:
# Load the raw CSV and cast its columns once; the cells below reuse `df`.
df = clean_data(load_data())

In [None]:
# Display the result (these are show()'s default arguments, spelled out).
common_pair(df).show(n=20, truncate=True)

In [None]:
# Display the result (these are show()'s default arguments, spelled out).
distance_with_most_tip(df).show(n=20, truncate=True)

In [None]:
# Display the result (these are show()'s default arguments, spelled out).
time_with_most_traffic(df).show(n=20, truncate=True)