# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

Pyspark Imports

In [5]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [6]:
### DO NOT MODIFY THIS CELL ###
# Create the SparkContext for this notebook under the application name "HW3-Q1".
# NOTE(review): running this cell a second time in the same kernel fails with
# "ValueError: Cannot run multiple SparkContexts at once" because a context
# already exists -- restart the kernel (or stop the existing context) first.
sc = pyspark.SparkContext(appName="HW3-Q1")
# SQLContext built on top of the SparkContext; load_data() reads the CSV through it.
sqlContext = SQLContext(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=HW3-Q1, master=local[*]) created by __init__ at /tmp/ipykernel_24/1694043449.py:2 

Define function for loading data

In [7]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read "yellow_tripdata_2019-01_short.csv" through the module-level
    sqlContext and return it as a dataframe.

    The "header" option is enabled, so the first CSV line supplies the
    column names. No schema and no inferSchema option are set here;
    presumably every column therefore arrives as a string (Spark's CSV
    default), which is why clean_data() casts them afterwards.

    output: df a dataframe with the raw CSV columns
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.1

Perform data casting to clean incoming dataset

In [8]:
#export
def clean_data(df):
    '''
    Cast the trip columns used by the later questions to typed values.

    input: df a dataframe
    output: df a dataframe with all the original columns, where
            - passenger_count is cast to int
            - total_amount, tip_amount, trip_distance and fare_amount
              are cast to float
            - tpep_pickup_datetime and tpep_dropoff_datetime are
              converted with to_timestamp
    '''

    # START YOUR CODE HERE ---------
    # (column name, target type) pairs, applied in this order.
    numeric_casts = (
        ("passenger_count", "int"),
        ("total_amount", "float"),
        ("tip_amount", "float"),
        ("trip_distance", "float"),
        ("fare_amount", "float"),
    )
    timestamp_columns = ("tpep_pickup_datetime", "tpep_dropoff_datetime")

    # withColumn with an existing name replaces that column in place,
    # so the column set and order stay the same.
    for name, target_type in numeric_casts:
        df = df.withColumn(name, col(name).cast(target_type))

    for name in timestamp_columns:
        df = df.withColumn(name, to_timestamp(col(name)))

    # END YOUR CODE HERE -----------

    return df

### Q1.2

Find the rate per person based on how many passengers travel between pickup and dropoff locations.

In [9]:
#export
def common_pair(df):
    '''
    Rank pickup/dropoff location pairs by how many passengers travelled
    between them and report the amount paid per passenger.

    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    Only trips whose pickup and dropoff locations differ are counted.
    total_passenger_count is the summed passenger_count for a pair and
    per_person_rate is the summed total_amount divided by it (null when
    the passenger total is not positive). The 10 pairs with the highest
    total_passenger_count are returned, ties broken by the higher
    per_person_rate.

    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql import functions as F

    pair_keys = ["PULocationID", "DOLocationID"]

    # Drop trips that start and end in the same location.
    cross_location = df.filter(F.col("PULocationID") != F.col("DOLocationID"))

    pair_totals = cross_location.groupBy(*pair_keys).agg(
        F.sum("passenger_count").alias("total_passenger_count"),
        F.sum("total_amount").alias("sum_total_amount"),
    )

    passengers = F.col("total_passenger_count")
    # Guard the division so a zero passenger total yields null.
    rate = F.when(
        passengers > 0, F.col("sum_total_amount") / passengers
    ).otherwise(F.lit(None))

    with_rate = pair_totals.withColumn("per_person_rate", rate)
    projected = with_rate.select(
        *pair_keys, "total_passenger_count", "per_person_rate"
    )
    ranked = projected.orderBy(
        passengers.desc(), F.col("per_person_rate").desc()
    )

    df = ranked.limit(10)
    # END YOUR CODE HERE -----------

    return df

### Q1.3

Find which trip distances generate the highest tip percentage.

In [10]:
#export
def distance_with_most_tip(df):
    '''
    Find the trip distances with the highest average tip percentage.

    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the tip as a percent of fare_amount
    (tip_amount * 100 / fare_amount), averaged over all trips that share
    the same trip_distance after rounding up to the next whole number.
    Only trips with fare_amount > 2.0 and trip_distance > 0 are
    considered; the 15 distances with the highest average are returned.

    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql import functions as F

    fare = F.col("fare_amount")
    distance = F.col("trip_distance")

    # Keep only trips with a fare above 2.0 and a positive distance.
    eligible = df.filter((fare > 2.0) & (distance > 0))

    tip_share = (F.col("tip_amount") * F.lit(100.0)) / fare
    scored = eligible.withColumn("tip_percent", tip_share)

    # Bucket trips by distance rounded up, then average the tip percentage.
    bucketed = scored.withColumn("rounded_distance", F.ceil(distance))
    averaged = bucketed.groupBy("rounded_distance").agg(
        F.avg("tip_percent").alias("tip_percent")
    )

    best = averaged.orderBy(F.col("tip_percent").desc()).limit(15)

    df = best.select(
        F.col("rounded_distance").alias("trip_distance"),
        F.col("tip_percent"),
    )
    # END YOUR CODE HERE -----------

    return df

### Q1.4

Determine the average speed at different times of day.

In [11]:
#export
def time_with_most_traffic(df):
    '''
    Compute the average speed for each hour of the 12-hour clock, with
    separate columns for AM and PM pickups.

    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    time_of_day is the pickup hour formatted with the "K" pattern
    (hour of AM/PM, 0-11). am_avg_speed and pm_avg_speed are the average
    trip_distance divided by the average trip duration in hours for that
    hour; the speed is reported as 0.0 when the average duration is not
    positive. Rows are ordered by time_of_day numerically.

    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql import functions as F

    pickup = col("tpep_pickup_datetime")
    dropoff = col("tpep_dropoff_datetime")

    # Timestamps cast to long are differenced and divided by 3600.0 to get hours.
    elapsed_hours = (dropoff.cast("long") - pickup.cast("long")) / F.lit(3600.0)
    am_or_pm = when(col("pickup_hour") < 12, F.lit("AM")).otherwise(F.lit("PM"))

    annotated = df.withColumn("pickup_hour", hour(pickup))
    annotated = annotated.withColumn("time_of_day", date_format(pickup, "K"))
    annotated = annotated.withColumn("period", am_or_pm)
    annotated = annotated.withColumn("duration_hours", elapsed_hours)

    # Average distance and duration per (hour, AM/PM) bucket.
    bucket_averages = annotated.groupBy("time_of_day", "period").agg(
        F.avg("trip_distance").alias("avg_trip_distance"),
        F.avg("duration_hours").alias("avg_duration_hours"),
    )

    # Avoid dividing by a zero or negative average duration.
    speed = when(
        col("avg_duration_hours") > 0,
        col("avg_trip_distance") / col("avg_duration_hours"),
    ).otherwise(F.lit(0.0))

    bucket_speeds = bucket_averages.withColumn("avg_speed", speed).select(
        "time_of_day", "period", "avg_speed"
    )

    # Spread the AM and PM speeds into two columns per hour.
    pivoted = (
        bucket_speeds
        .groupBy("time_of_day")
        .pivot("period", ["AM", "PM"])
        .agg(F.first("avg_speed"))
    )
    renamed = pivoted.withColumnRenamed("AM", "am_avg_speed")
    renamed = renamed.withColumnRenamed("PM", "pm_avg_speed")

    # time_of_day is a string, so sort on a temporary integer copy (0..11).
    ordered = renamed.withColumn("_td", col("time_of_day").cast("int"))
    ordered = ordered.orderBy(col("_td")).drop("_td")

    df = ordered
    # END YOUR CODE HERE -----------

    return df

## The below cells are for you to investigate your solutions and will not be graded

In [13]:
# Load the sample CSV and apply the Q1.1 casts; the cells below reuse this df.
df = load_data()
df = clean_data(df)

In [14]:
# Print the Q1.2 result (location pairs ranked by total passengers).
common_pair(df).show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------+------------+---------------------+------------------+
|PULocationID|DOLocationID|total_passenger_count|   per_person_rate|
+------------+------------+---------------------+------------------+
|         239|         238|                   62|  4.26274198870505|
|         237|         236|                   60| 4.482500068346659|
|         263|         141|                   52|3.4190384974846473|
|         161|         236|                   42| 5.368571440378825|
|         148|          79|                   42| 4.711904752822149|
|         142|         238|                   39|  5.05487182812813|
|         141|         236|                   37| 4.355675723101641|
|         239|         143|                   37| 4.252162224537617|
|         239|         142|                   35| 3.817714350564139|
|          79|         170|                   34| 6.394705884596881|
+------------+------------+---------------------+------------------+



                                                                                

In [15]:
# Print the Q1.3 result (rounded-up trip distances ranked by average tip percent).
distance_with_most_tip(df).show()

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|            1|17.129815992473326|
|            2| 15.81552712528758|
|           17|15.796441904884075|
|           20|15.112410000027054|
|            3|14.886705735873237|
|            6|14.579695033034238|
|            5|14.245405810737791|
|            4|13.831569499212133|
|            9|13.814476541860179|
|            8| 12.07259673796427|
|           19| 11.95263232603509|
|           10|11.880490472296412|
|            7|10.800575637356776|
|           21| 10.73901997840823|
|           18|10.696822232896201|
+-------------+------------------+



In [16]:
# Print the Q1.4 result (AM/PM average speed per hour of the 12-hour clock).
time_with_most_traffic(df).show()

+-----------+------------------+-------------------+
|time_of_day|      am_avg_speed|       pm_avg_speed|
+-----------+------------------+-------------------+
|          0| 9.377696196631234|               NULL|
|          1|10.845483413697353|  5.125214305177561|
|          3|              NULL|                0.0|
|          4|              NULL|                0.0|
|          5|              NULL| 0.5137660239764732|
|          6|              NULL|  9.989847870647605|
|          7|              NULL|0.18415305490417713|
|          8|              NULL| 0.5183127622697896|
|         10|              NULL| 0.6147483972627696|
|         11|              NULL|  4.650958285207579|
+-----------+------------------+-------------------+

