# CSE6242 - HW3 - Q1

Pyspark Imports

In [1]:
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
# Create the Spark application context (app name "HW3-Q1") and wrap it in a
# SQLContext; load_data() below reads the CSV through `sqlContext`.
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/29 18:26:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read "yellow_tripdata_2019-01_short.csv" through the module-level
    sqlContext, treating the first line as a header row.

    output: df a dataframe with one column per CSV header field.
            No schema or inferSchema option is set here, so the columns are
            presumably all strings until clean_data() casts them - TODO confirm.
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
def clean_data(df):
    '''
    Cast the columns used by the later questions to concrete types.

    input: df a dataframe
    output: df a dataframe with all the original columns, where
            - passenger_count is cast to integer
            - total_amount, tip_amount, fare_amount and trip_distance
              are cast to float
            - tpep_pickup_datetime and tpep_dropoff_datetime are converted
              with to_timestamp
    '''

    # START YOUR CODE HERE ---------
    cleaned = df.withColumn("passenger_count", col("passenger_count").cast("integer"))

    # withColumn on an existing name replaces it in place, so the column
    # order of the input is preserved.
    for float_column in ("total_amount", "tip_amount", "fare_amount", "trip_distance"):
        cleaned = cleaned.withColumn(float_column, col(float_column).cast("float"))

    for timestamp_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
        cleaned = cleaned.withColumn(timestamp_column, to_timestamp(timestamp_column))

    # END YOUR CODE HERE -----------

    return cleaned

### Q1.b

Find the rate per person based on how many passengers travel between each pair of pickup and dropoff locations.

In [5]:
def common_pair(df):
    '''
    Rank pickup/dropoff location pairs by how many passengers travelled
    between them and report what each passenger paid on average.

    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - passenger_count
            - per_person_rate

    Only trips whose pickup and dropoff locations differ are considered.
    passenger_count is the sum of passenger_count over the pair.
    per_person_rate is the total_amount per person for a given pair, i.e.
    sum(total_amount) / sum(passenger_count).

    The result holds at most 10 rows, ordered by passenger_count and then
    per_person_rate, both descending.
    '''
    # START YOUR CODE HERE ---------
    # Reach sum() through the module so Python's builtin sum is not shadowed.
    fn = pyspark.sql.functions

    pair_totals = (
        df.filter(col("PULocationID") != col("DOLocationID"))
        .groupBy("PULocationID", "DOLocationID")
        .agg(
            fn.sum("passenger_count").alias("passenger_count"),
            fn.sum("total_amount").alias("total_amount_sum"),
        )
    )

    # Divide the pair's total revenue by its total passengers; an average of
    # total_amount would be a per-trip figure, not a per-person one.
    # NOTE(review): a pair whose passenger_count sums to 0 presumably yields
    # a null rate under Spark's default (non-ANSI) division - confirm.
    df = (
        pair_totals
        .withColumn("per_person_rate", col("total_amount_sum") / col("passenger_count"))
        # Explicit select pins the documented column order.
        .select("PULocationID", "DOLocationID", "passenger_count", "per_person_rate")
        .orderBy(["passenger_count", "per_person_rate"], ascending=[False, False])
        .limit(10)
    )

    # END YOUR CODE HERE -----------

    return df

### Q1.c

Find which trip distances generate the highest tip percentage.

In [6]:
def distance_with_most_tip(df):
    '''
    Average the tip percentage per (rounded-up) trip distance and keep the
    15 distances with the highest average.

    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount

    Only trips with fare_amount > 2 and trip_distance > 0 are used, and
    trip_distance is rounded up with ceil before grouping.
    '''

    # START YOUR CODE HERE ---------
    has_valid_fare = col("fare_amount") > 2
    has_valid_distance = col("trip_distance") > 0

    top_distances = (
        df.filter(has_valid_fare & has_valid_distance)
        .withColumn("tip_p", (col("tip_amount") * 100) / col("fare_amount"))
        .withColumn("trip_distance", pyspark.sql.functions.ceil(col("trip_distance")))
        .groupBy("trip_distance")
        .agg({"tip_p": "avg"})
        .orderBy("avg(tip_p)", ascending=False)
        .limit(15)
        .withColumnRenamed("avg(tip_p)", "tip_percent")
    )
    # END YOUR CODE HERE -----------

    return top_distances

### Q1.d

Determine the average speed at different times of day.

In [21]:
def time_with_most_traffic(df):
    '''
    Compare average trip speed per hour between the AM and PM halves of the day.

    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    time_of_day is the pickup hour formatted with date_format(..., "KK").
    am_avg_speed / pm_avg_speed is the average trip_distance divided by the
    average trip duration in hours, for pickups with hour < 12 and
    hour >= 12 respectively. An hour missing from one half of the day has a
    null speed for that half. Rows are ordered by time_of_day ascending.
    '''

    # START YOUR CODE HERE ---------
    def _hourly_averages(trips, prefix):
        # Average duration and distance per formatted hour for one half of
        # the day, with columns named <prefix>_time and <prefix>_trip.
        return (
            trips.withColumn("t_of_day", date_format(col("tpep_pickup_datetime"), "KK"))
            .groupBy("t_of_day")
            .agg({"time_diff": "avg", "trip_distance": "avg"})
            .withColumnRenamed("avg(time_diff)", prefix + "_time")
            .withColumnRenamed("avg(trip_distance)", prefix + "_trip")
        )

    df = df.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance")
    df = df.withColumn("p_hour", hour("tpep_pickup_datetime"))
    # Casting a timestamp to double gives seconds; dividing by 3600 converts
    # the dropoff - pickup difference to hours.
    df = df.withColumn(
        "time_diff",
        (col("tpep_dropoff_datetime").cast("double") - col("tpep_pickup_datetime").cast("double")) / 3600.0,
    )

    df_am = _hourly_averages(df.filter(col("p_hour") < 12), "am")
    df_pm = _hourly_averages(df.filter(col("p_hour") >= 12), "pm")

    # Joining on the column name yields a single t_of_day column that is
    # already coalesced across both sides of the full outer join.
    df = df_am.join(df_pm, on="t_of_day", how="full_outer")

    df = df.withColumn("am_avg_speed", col("am_trip") / col("am_time"))
    df = df.withColumn("pm_avg_speed", col("pm_trip") / col("pm_time"))
    df = df.withColumnRenamed("t_of_day", "time_of_day")

    df = df.select("time_of_day", "am_avg_speed", "pm_avg_speed")

    df = df.orderBy("time_of_day", ascending=True)
    # END YOUR CODE HERE -----------

    return df

### The below cells are for you to investigate your solutions and will not be graded
## Ensure they are commented out prior to submitting to Gradescope to avoid errors