In [326]:
sc

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=livy-session-0>

In [327]:
#export
from pyspark.sql.functions import col
from pyspark.sql import *

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [328]:
#### DO NOT CHANGE ANYTHING IN THIS CELL ####

def load_data(size='small'):
    # Loads the data for this question. Do not change this function.
    # This function should only be called with the parameter 'small' or 'large'
    
    if size != 'small' and size != 'large':
        print("Invalid size parameter provided. Use only 'small' or 'large'.")
        return
    
    input_bucket = "s3://nyc-taxi-trip"
    
    # Load Trip Data
    trip_path = '/'+size+'/yellow_tripdata*'
    trips = spark.read.csv(input_bucket + trip_path, header=True, inferSchema=True)
    print("Trip Count: ",trips.count()) # Prints # of trips (# of records, as each record is one trip)
    
    # Load Lookup Data
    lookup_path = '/'+size+'/taxi*'
    lookup = spark.read.csv(input_bucket + lookup_path, header=True, inferSchema=True)
    
    return trips, lookup

def main(size, bucket):
    # Runs your functions implemented above.
    
    print(user())
    trips, lookup = load_data(size=size)
    trips = long_trips(trips)
    mtrips = manhattan_trips(trips, lookup)
    wp = weighted_profit(trips, mtrips)
    final = final_output(wp, lookup)
    
    # Outputs the results for you to visually see
    final.show()
    
    # Writes out as a CSV to your bucket.
    final.write.csv(bucket)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [330]:
#export
def long_trips(trips):
    # Returns a Dataframe (trips) with Schema the same as :trips:
    
    trips = trips.filter(col("trip_distance") >= 2)
    return trips

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [331]:
#export
def manhattan_trips(trips, lookup):
    # Returns a Dataframe (mtrips) with Schema: DOLocationID, pcount
    
    mtrips = trips.withColumn("passenger_count", col("passenger_count").cast("Int")).join(lookup, [trips.DOLocationID == lookup.LocationID]).filter(lookup.Borough == "Manhattan").groupBy("DOLocationID").sum("passenger_count") \
                    .orderBy(col("sum(passenger_count)").desc()).select("DOLocationID", col("sum(passenger_count)").alias("pcount")).limit(20)
    
    return mtrips

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [332]:
#export
def weighted_profit(trips, mtrips): 
    # Returns a Dataframe (wp) with Schema: PULocationID, weighted_profit
    # Note: Use decimal datatype for weighted profit (NOTE: DON'T USE FLOAT)
    # Our grader will be only be checking the first 8 characters for each value in the dataframe
    
    cnt_total_trips = trips.groupBy("PULocationID").count().withColumn("cnt_total_trips", col("count")).select("PULocationID", "cnt_total_trips")
    avg_total_amount = trips.withColumn("total_amount", col("total_amount").cast("double")).groupBy("PULocationID").agg({"total_amount" : 'avg'}) \
                            .withColumn("avg_total_amount", col("avg(total_amount)").cast("decimal(38, 10)")).select("PULocationID", "avg_total_amount")
    
    wp = mtrips.join(trips, mtrips.DOLocationID == trips.DOLocationID).groupBy(col("PULocationID")).count()
    wp = wp.withColumn("cnt_top_trips", col("count")).join(cnt_total_trips, wp.PULocationID == cnt_total_trips.PULocationID, "inner").select(wp.PULocationID, col("cnt_top_trips"), col("cnt_total_trips"))
    wp = wp.join(avg_total_amount, wp.PULocationID == avg_total_amount.PULocationID).select(wp.PULocationID, col("cnt_top_trips"), col("cnt_total_trips"), col("avg_total_amount"))

    wp = wp.withColumn("weighted_profit", (col("avg_total_amount")*(col("cnt_top_trips")/col("cnt_total_trips"))).cast("double")).select("PULocationID", col("weighted_profit"))
    
    return wp

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [333]:
#export
def final_output(wp, lookup): 
    # Returns a Dataframe (final) with Schema: Zone, Borough, weighted_profit
    # Note: Use decimal datatype for weighted profit (NOTE: DON'T USE FLOAT)
    # Our grader will be only be checking the first 8 characters for each value in the dataframe
    
    final = wp.join(lookup, wp.PULocationID == lookup.LocationID).select("Zone", "Borough", "weighted_profit").orderBy(wp.weighted_profit.desc()).limit(20)
    
    return final

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [334]:
trips, lookup = load_data(size='large')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Trip Count:  187203269

In [335]:
trips = long_trips(trips)
mtrips = manhattan_trips(trips, lookup)
wp = weighted_profit(trips, mtrips)
final = final_output(wp, lookup)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [337]:
trips_small, lookup_small = load_data(size='small')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Trip Count:  7667792

In [338]:
trips_small = long_trips(trips_small)
mtrips_small = manhattan_trips(trips_small, lookup_small)
wp_small = weighted_profit(trips_small, mtrips_small)
final = final_output(wp_small, lookup_small)
final.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------+------------------+
|                Zone|      Borough|   weighted_profit|
+--------------------+-------------+------------------+
|Arrochar/Fort Wad...|Staten Island|31.784444444433333|
|     Freshkills Park|Staten Island|           29.3425|
|       Port Richmond|Staten Island|24.357777777766664|
|Flushing Meadows-...|       Queens|24.330137731667936|
|        Baisley Park|       Queens| 23.62469433348418|
|     Randalls Island|    Manhattan| 23.12457751515951|
|       South Jamaica|       Queens| 22.17411493945043|
|     Mariners Harbor|Staten Island|21.786556927288885|
|Saint George/New ...|Staten Island|21.491893491126923|
|       Arden Heights|Staten Island|          20.29625|
|        Astoria Park|       Queens|19.965360000000004|
|   LaGuardia Airport|       Queens|17.923153949607844|
|  Murray Hill-Queens|       Queens|17.585450424363888|
|         JFK Airport|       Queens|16.958978201182898|
|             Jamaica|       Queens|14.879835188