# CSE6242 - HW3 - Q1

Pyspark Imports

In [1]:
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce, avg
from pyspark.sql.functions import *

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/23 20:23:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with the all the original columns
    '''
    
    # START YOUR CODE HERE ---------
    df =df.withColumn("passenger_count",col("passenger_count").cast('integer'))
    df =df.withColumn("total_amount",col("total_amount").cast('float'))
    df =df.withColumn("tip_amount",col("tip_amount").cast('float'))
    df =df.withColumn("trip_distance",col("trip_distance").cast('float'))
    df =df.withColumn("fare_amount",col("fare_amount").cast('float'))
    # from string to timestamp
    df=df.withColumn("tpep_pickup_datetime",col("tpep_pickup_datetime").cast('timestamp'))
    df=df.withColumn("tpep_dropoff_datetime",col("tpep_dropoff_datetime").cast('timestamp')) 

    # END YOUR CODE HERE -----------
    
    return df

### Q1.b

Find rate per person for based on how many passengers travel between pickup and dropoff locations. 

In [5]:
def common_pair(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - passenger_count
            - per_person_rate
            
    per_person_rate is the total_amount per person for a given pair.
    
    '''
    
    # START YOUR CODE HERE ---------
    df=df.groupBy("PULocationID",
                  "DOLocationID").agg({"total_amount":"avg","trip_distance": "avg", "PULocationID":"count"})
    df=df.withColumn("per_person_rate", col("avg(total_amount)") /
                     col("avg(trip_distance)"))
    df=df.sort(col('count(PULocationID)').desc(), col('per_person_rate').desc())
    df=df.select(col('PULocationID'), col('DOLocationID'),col('count(PULocationID)').alias('passenger_count'), col('per_person_rate')).limit(10)
    # END YOUR CODE HERE -----------
    
    return df

### Q1.c

Find trips which trip distances generate the highest tip percentage.

In [6]:
def distance_with_most_tip(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    
    # START YOUR CODE HERE ---------
    df = df.filter((col("fare_amount").cast("double") > 2.0) & (col("trip_distance").cast("double") > 0))
    df = df.withColumn("tip_percent", (col("tip_amount").cast("double") * 100) / col("fare_amount").cast("double"))
    df = df.withColumn("rounded_trip_distance", round(col("trip_distance").cast("double")))
    df = df.groupBy("rounded_trip_distance").agg(
        avg("tip_percent").alias("avg_tip_percent")
    )
    df = df.orderBy(col("avg_tip_percent").desc())
    df = df.limit(15)
    df = df.select(col("rounded_trip_distance").alias("trip_distance"), col("avg_tip_percent").alias("tip_percent"))
    # END YOUR CODE HERE -----------
    
    return df

### Q1.d

Determine the average speed at different times of day.

In [22]:
def time_with_most_traffic(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    
    # START YOUR CODE HERE ---------
    # Extract the hour from the pickup time
    df = df.withColumn("hour", hour(col("tpep_pickup_datetime")))

    # Calculate trip time in seconds
    df = df.withColumn("trip_time_in_secs", (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))))

    # Categorize each trip as AM or PM based on the hour
    df = df.withColumn("ampm", when(col("hour") < 12, "AM").otherwise("PM"))

    # Calculate average speed (distance per hour) for each AM and PM
    df = df.groupBy("hour", "ampm").agg(
        avg(col("trip_distance") / (col("trip_time_in_secs") / 3600)).alias("avg_speed")
    )

    # Format the time of day in 12-hour time
    df = df.withColumn("formatted_hour", when(col("hour") < 12, col("hour")).otherwise(col("hour") - 12))

    # Pivot the data to have AM and PM columns
    df = df.groupBy("formatted_hour").pivot("ampm").agg(avg(col("avg_speed")))

    # Ensure all hours from 0-11 are included (AM and PM)
    df = df.union(
        df.select(
            (col("formatted_hour") + 12).alias("formatted_hour"),
            col("AM"),
            col("PM")
        )
    ).orderBy("formatted_hour")

    # Rename the columns
    df = df.select(
        col("formatted_hour").alias("time_of_day"),
        col("AM").alias("am_avg_speed"),
        col("PM").alias("pm_avg_speed")
    )
    # END YOUR CODE HERE -----------
    
    return df

### The below cells are for you to investigate your solutions and will not be graded
## Ensure they are commented out prior to submitting to Gradescope to avoid errors

In [8]:
##df = load_data()
##df = clean_data(df)

In [9]:
#common_pair(df).show()

[Stage 1:>                                                          (0 + 1) / 1]

+------------+------------+---------------+------------------+
|PULocationID|DOLocationID|passenger_count|   per_person_rate|
+------------+------------+---------------+------------------+
|         264|         264|             97| 5.482259531398455|
|         239|         238|             34| 8.395489315120459|
|         237|         236|             34|7.1150794250423965|
|         236|         236|             24|12.230708730972086|
|          79|          79|             23|10.641212116864102|
|         142|         239|             23|10.056728351507015|
|         148|          79|             23|  9.72959679025766|
|         263|         141|             23| 7.301437441278104|
|         141|         263|             22| 6.897755674061171|
|         170|         170|             21| 9.681594815392343|
+------------+------------+---------------+------------------+



                                                                                

In [10]:
#distance_with_most_tip(df).show()

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|          1.0|16.948128912934433|
|          0.0| 15.42138785436327|
|          2.0|15.316306992357612|
|         17.0|15.221903908010676|
|          5.0| 14.79129696426021|
|          3.0|14.467840957137605|
|         21.0|14.318693304544306|
|         19.0|14.024168248325521|
|          9.0|13.566757631743657|
|          4.0|13.548640673302767|
|          6.0|13.301328970560771|
|          8.0|11.935883822973539|
|         23.0|11.666666666666666|
|         10.0|11.469710538185623|
|         18.0| 11.40584794093632|
+-------------+------------------+



In [23]:
#time_with_most_traffic(df).show()

+-----------+-----------------+-------------------+
|time_of_day|     am_avg_speed|       pm_avg_speed|
+-----------+-----------------+-------------------+
|          0|12.17564301026818|               NULL|
|          1|10.87329879921813|  4.486486396274051|
|          3|             NULL|                0.0|
|          4|             NULL|                0.0|
|          5|             NULL| 1.8609359136845776|
|          6|             NULL|  9.989847870647605|
|          7|             NULL|0.18415305490417713|
|          8|             NULL|  6.433583983527856|
|         10|             NULL| 11.452775401547134|
|         11|             NULL|  13.29377144720091|
|         12|12.17564301026818|               NULL|
|         13|10.87329879921813|  4.486486396274051|
|         15|             NULL|                0.0|
|         16|             NULL|                0.0|
|         17|             NULL| 1.8609359136845776|
|         18|             NULL|  9.989847870647605|
|         19