# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

Pyspark Imports

In [1]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)



Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
#export
def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with the all the original columns
    '''
    
    # Handling missing values
    df = df.dropna(subset=['congestion_surcharge'])  # Drop rows with missing values in 'congestion_surcharge'
    
    # Handling date/time
    df = df.withColumn('pickup_datetime', to_timestamp(col('tpep_pickup_datetime'), format='yyyy-MM-dd HH:mm:ss'))  # Convert pickup datetime to timestamp
    df = df.withColumn('dropoff_datetime', to_timestamp(col('tpep_dropoff_datetime'), format='yyyy-MM-dd HH:mm:ss'))  # Convert dropoff datetime to timestamp
    df = df.drop('tpep_pickup_datetime', 'tpep_dropoff_datetime')  # Drop original datetime columns
    
    # Handle data types
    df = df.withColumn('passenger_count', col('passenger_count').cast('int'))  # Cast passenger_count to integer
    df = df.withColumn('total_amount', col('total_amount').cast('float'))  # Cast total_amount to float
    df = df.withColumn('tip_amount', col('tip_amount').cast('float'))  # Cast tip_amount to float
    df = df.withColumn('trip_distance', col('trip_distance').cast('float'))  # Cast trip_distance to float
    df = df.withColumn('fare_amount', col('fare_amount').cast('float'))  # Cast fare_amount to float
    
    return df

### Q1.b

Find rate per person for based on how many passengers travel between pickup and dropoff locations. 

In [5]:
#export
from pyspark.sql.functions import count, sum, col, avg
def common_pair(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate
            
    per_person_rate is the total_amount per person for a given pair.
    
    '''
    # Group by pickup and dropoff location
    grouped_df = df.groupBy('PULocationID', 'DOLocationID')
    
    # Calculate total passengers and total amount per pair
    grouped_df = grouped_df.agg(
      count(col('passenger_count')).alias('total_passenger_count'),
      sum(col('fare_amount')).alias('total_amount')
    )
    
    # Calculate per-person rate (average fare amount per passenger)
    grouped_df = grouped_df.withColumn('per_person_rate', col('total_amount') / col('total_passenger_count'))
    
    return grouped_df.select('PULocationID', 'DOLocationID', 'total_passenger_count', 'per_person_rate')

### Q1.c

Find trips which trip distances generate the highest tip percentage.

In [6]:
#export
def distance_with_most_tip(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    # Add a new column for tip percentage
    df = df.withColumn('tip_percent', (col('tip_amount') / col('fare_amount')) * 100)
    
    # Group by trip distance and calculate average tip percentage
    avg_tip_by_distance = df.groupBy('trip_distance').agg(
      avg(col('tip_percent')).alias('avg_tip_percent')
    )
    
    return avg_tip_by_distance.select('trip_distance', 'avg_tip_percent')

### Q1.d

Determine the average speed at different times of day.

In [7]:
#export
def time_with_most_traffic(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    # Extract hour of the day from the pickup datetime
    df = df.withColumn('hour_of_day', hour(col('pickup_datetime')))
    
    # Define AM/PM categories
    am_pm = when(col('hour_of_day') < 12, 'AM').otherwise('PM')
    df = df.withColumn('time_of_day', am_pm)
    
    # Calculate speed
    df = df.withColumn('speed', (col('trip_distance') / (col('dropoff_datetime').cast('long') - col('pickup_datetime').cast('long'))))
    
    # Group by AM/PM and calculate average speed
    avg_speed_by_time = df.groupBy('time_of_day').agg(
      avg(col('speed')).alias('avg_speed')
    )
    
    return avg_speed_by_time

## The below cells are for you to investigate your solutions and will not be graded

In [8]:
df = load_data()
df = clean_data(df)

In [9]:
common_pair(df).show()

+------------+------------+---------------------+---------------+
|PULocationID|DOLocationID|total_passenger_count|per_person_rate|
+------------+------------+---------------------+---------------+
+------------+------------+---------------------+---------------+



In [10]:
distance_with_most_tip(df).show()

+-------------+---------------+
|trip_distance|avg_tip_percent|
+-------------+---------------+
+-------------+---------------+



In [11]:
time_with_most_traffic(df).show()

+-----------+---------+
|time_of_day|avg_speed|
+-----------+---------+
+-----------+---------+

