# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

Pyspark Imports

In [1]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.1

Perform data casting to clean incoming dataset

In [4]:
#export
def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with the all the original columns
    '''
    
    # START YOUR CODE HERE ---------

    df = df.withColumn("passenger_count", col("passenger_count").cast("integer"))
    df = df.withColumn("total_amount", col("total_amount").cast("float"))
    df = df.withColumn("tip_amount", col("tip_amount").cast("float"))
    df = df.withColumn("trip_distance", col("trip_distance").cast("float"))
    df = df.withColumn("fare_amount", col("fare_amount").cast("float"))
    df = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
    df = df.withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))

    # END YOUR CODE HERE -----------
    
    return df

### Q1.2

Find rate per person for based on how many passengers travel between pickup and dropoff locations. 

In [5]:
#export
def common_pair(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate
            
    per_person_rate is the total_amount per person for a given pair.
    
    '''
    
    # START YOUR CODE HERE ---------
    grouped_df = df.groupBy("PULocationID", "DOLocationID") \
                   .agg({"passenger_count": "sum", "total_amount": "sum"}) \
                   .withColumnRenamed("sum(passenger_count)", "total_passenger_count") \
                   .withColumn("per_person_rate", col("sum(total_amount)") / col("total_passenger_count"))
    
    filtered_df = grouped_df.filter(col("PULocationID") != col("DOLocationID"))
    
    sorted_df = filtered_df.orderBy(col("total_passenger_count").desc(), col("per_person_rate").desc()).drop("sum(total_amount)")
    
    df = sorted_df.limit(10)
    
    # END YOUR CODE HERE -----------
    
    return df

### Q1.3

Find trips which trip distances generate the highest tip percentage.

In [6]:
#export
def distance_with_most_tip(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    
    # START YOUR CODE HERE ---------
    
    filtered_df = df.filter((col("fare_amount") > 2.00) & (col("trip_distance") > 0))
    
    # Calculate tip_percent for each trip
    df_with_tip_percent = filtered_df.withColumn("tip_percent", (col("tip_amount") * 100) / col("fare_amount"))
    
    # Round trip distance up to the nearest mile
    df_with_rounded_distance = df_with_tip_percent.withColumn("trip_distance", ceil(col("trip_distance")))
    
    # Group by rounded trip distance and calculate the average tip_percent
    df_grouped = df_with_rounded_distance.groupBy("trip_distance") \
                   .agg({"tip_percent": "avg"}) \
                   .withColumnRenamed("avg(tip_percent)", "tip_percent")
    
    # Sort by tip_percent in descending order and limit to top 15
    df = df_grouped.orderBy(col("tip_percent").desc()).limit(15)
    

    
    # END YOUR CODE HERE -----------
    
    return df

### Q1.4

Determine the average speed at different times of day.

In [24]:
#export
def time_with_most_traffic(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed
            
    am_avg_speed and pm_avg_speed are the average trip distance / average trip time calculated for each hour
    
    '''
    # START YOUR CODE HERE ---------
    
    df = df.withColumn('pickup_hour', hour(to_timestamp(col('tpep_pickup_datetime')))) \
           .withColumn('trip_duration', 
                       (col('tpep_dropoff_datetime').cast('long') - col('tpep_pickup_datetime').cast('long')))
    
    # Calculate average speed
    df = df.withColumn('avg_speed', col('trip_distance') / (col('trip_duration') / 3600))

    # Create a formatted time_of_day column with 12-hour format and AM/PM notation
    df = df.withColumn('time_of_day', date_format(col('tpep_pickup_datetime'), 'h'))

    # Separate AM and PM speeds
    df_am = df.filter(col('pickup_hour') < 12).groupBy('time_of_day') \
              .agg(pyspark.sql.functions.avg('avg_speed').alias('am_avg_speed'))
    df_pm = df.filter(col('pickup_hour') >= 12).groupBy('time_of_day') \
              .agg(pyspark.sql.functions.avg('avg_speed').alias('pm_avg_speed'))
    
    # Join AM and PM dataframes on time_of_day
    df = df_am.join(df_pm, on='time_of_day', how='outer').orderBy('time_of_day')

    
    # END YOUR CODE HERE -----------
    
    return df

## The below cells are for you to investigate your solutions and will not be graded

In [8]:
df = load_data()
df = clean_data(df)
df.dtypes

[('VendorID', 'string'),
 ('tpep_pickup_datetime', 'timestamp'),
 ('tpep_dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('trip_distance', 'float'),
 ('RatecodeID', 'string'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'string'),
 ('DOLocationID', 'string'),
 ('payment_type', 'string'),
 ('fare_amount', 'float'),
 ('extra', 'string'),
 ('mta_tax', 'string'),
 ('tip_amount', 'float'),
 ('tolls_amount', 'string'),
 ('improvement_surcharge', 'string'),
 ('total_amount', 'float'),
 ('congestion_surcharge', 'string')]

In [9]:
common_pair(df).show()

+------------+------------+---------------------+------------------+
|PULocationID|DOLocationID|total_passenger_count|   per_person_rate|
+------------+------------+---------------------+------------------+
|         239|         238|                   62|  4.26274198870505|
|         237|         236|                   60| 4.482500068346659|
|         263|         141|                   52|3.4190384974846473|
|         161|         236|                   42| 5.368571440378825|
|         148|          79|                   42| 4.711904752822149|
|         142|         238|                   39|  5.05487182812813|
|         141|         236|                   37| 4.355675723101641|
|         239|         143|                   37| 4.252162224537617|
|         239|         142|                   35| 3.817714350564139|
|          79|         170|                   34| 6.394705884596881|
+------------+------------+---------------------+------------------+



In [10]:
distance_with_most_tip(df).show()

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|            1|17.129815971513313|
|            2|15.815527155632552|
|           17|15.796441782308916|
|           20| 15.11240992123345|
|            3|14.886705727113446|
|            6|14.579695131601051|
|            5|14.245405861990653|
|            4|13.831569507473274|
|            9|13.814476557648435|
|            8|12.072596772433315|
|           19|11.952632334985276|
|           10|11.880490518902954|
|            7| 10.80057562837643|
|           21|10.739019886973427|
|           18|10.696822158448429|
+-------------+------------------+



In [25]:
time_with_most_traffic(df).show()

+-----------+-----------------+-------------------+
|time_of_day|     am_avg_speed|       pm_avg_speed|
+-----------+-----------------+-------------------+
|          1|10.87329879921813|  4.486486396274051|
|         10|             null| 11.452775401547134|
|         11|             null|  13.29377144720091|
|         12|12.17564301026818|               null|
|          3|             null|                0.0|
|          4|             null|                0.0|
|          5|             null| 1.8609359136845776|
|          6|             null|  9.989847870647605|
|          7|             null|0.18415305490417713|
|          8|             null|  6.433583983527856|
+-----------+-----------------+-------------------+

