# HW3 - Q3 [35 pts]

## Important Notices

<div class="alert alert-block alert-danger">
    WARNING: <strong>REMOVE</strong> any print statements added to cells with "#export" that are used for debugging purposes befrore submitting because they will crash the autograder in Gradescope. Any additional cells can be used for testing purposes at the bottom. 
</div>

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> import any additional libraries into this workbook.
</div>

All instructions, code comments, etc. in this notebook **are part of the assignment instructions**. That is, if there is instructions about completing a task in this notebook, that task is not optional.  

<div class="alert alert-block alert-info">
    You <strong>must</strong> implement the following functions in this notebook to receive credit.
</div>

`user()`

`long_trips()`

`manhattan_trips()`

`weighted_profit()`

`final_output()`

Each method will be auto-graded using different sets of parameters or data, to ensure that values are not hard-coded.  You may assume we will only use your code to work with data from the NYC-TLC dataset during auto-grading.

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove or modify the following utility functions:
</div>

`load_data()`

`main()`

<div class="alert alert-block alert-info">
    Do <strong>not</strong> change the below cell. Run it to initialize your PySpark instance. If you don't get any output, make sure your Notebook's Kernel is set to "PySpark" in the top right corner.
</div>

In [25]:
sc

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=livy-session-0>

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remodify the below cell. It contains the function for loading data and all imports, and the function for running your code.
</div>

In [26]:
#export
from pyspark.sql.functions import col
from pyspark.sql import *

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
#### DO NOT CHANGE ANYTHING IN THIS CELL ####

def load_data(size='small'):
    # Loads the data for this question. Do not change this function.
    # This function should only be called with the parameter 'small' or 'large'
    
    if size != 'small' and size != 'large':
        print("Invalid size parameter provided. Use only 'small' or 'large'.")
        return
    
    input_bucket = "s3://cse6242-hw3-q3"
    
    # Load Trip Data
    trip_path = '/'+size+'/yellow_tripdata*'
    trips = spark.read.csv(input_bucket + trip_path, header=True, inferSchema=True)
    print("Trip Count: ",trips.count()) # Prints # of trips (# of records, as each record is one trip)
    
    # Load Lookup Data
    lookup_path = '/'+size+'/taxi*'
    lookup = spark.read.csv(input_bucket + lookup_path, header=True, inferSchema=True)
    
    return trips, lookup

def main(size, bucket):
    # Runs your functions implemented above.
    
    print(user())
    trips, lookup = load_data(size=size)
    trips = long_trips(trips)
    mtrips = manhattan_trips(trips, lookup)
    wp = weighted_profit(trips, mtrips)
    final = final_output(wp, lookup)
    
    # Outputs the results for you to visually see
    final.show()
    
    # Writes out as a CSV to your bucket.
    final.write.csv(bucket)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Implement the below functions for this assignment:
<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> change any function inputs or outputs, and ensure that the dataframes your code returns align with the schema definitions commented in each function. Do <strong>NOT</strong> remove the #export comment from each of the code blocks either. This can prevent your code from being converted to a python file.
</div>

## 3a. [1 pt] Update the `user()` function
This function should return your GT username, eg: gburdell3

In [28]:
#export
def user():
    # Returns a string consisting of your GT username.
    return 'ygan41'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3b. [2 pts] Update the `long_trips()` function
This function filters trips to keep only trips greater than or equal to 2 miles.

In [29]:
#export
def long_trips(trips):
    # Returns a Dataframe (trips) with Schema the same as :trips:
    trips = trips.where(col("trip_distance")>=2)
    return trips

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3c. [6 pts] Update the `manhattan_trips()` function

This function determines the top 20 locations with a `DOLocationID` in manhattan by passenger_count (pcount).

Example output formatting:

```
+--------------+--------+
| DOLocationID | pcount |
+--------------+--------+
|             5|      15|
|            16|      12| 
+--------------+--------+
```

In [30]:
#export
def manhattan_trips(trips, lookup):
    # Returns a Dataframe (mtrips) with Schema: DOLocationID, pcount
    cond = [trips.DOLocationID==lookup.LocationID]
    results = trips.join(lookup, cond)\
                    .filter(col("Borough") == "Manhattan")\
                    .withColumn("passenger_count",col("passenger_count").cast("int"))\
                    .groupBy("LocationID")\
                    .sum("passenger_count")\
                    .withColumnRenamed("LocationID","DOLocationID")\
                    .withColumnRenamed("sum(passenger_count)","pcount")\
                    .orderBy("pcount",ascending=False)\
                    .limit(20)
    return results

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3d. [6 pts] Update the `weighted_profit()` function
This function should determine the average `total_amount`, the total count of trips, and the total count of trips ending in the top 20 destinations and return the `weighted_profit` as discussed in the homework document.

Example output formatting:
```
+--------------+-------------------+
| PULocationID |  weighted_profit  |
+--------------+-------------------+
|            18| 33.784444421924436| 
|            12| 21.124577637149223| 
+--------------+-------------------+
```

In [31]:
#export
def weighted_profit(trips, mtrips): 
    # Returns a Dataframe (wp) with Schema: PULocationID, weighted_profit
    # Note: Use decimal datatype for weighted profit (NOTE: DON'T USE FLOAT)
    # Our grader will be only be checking the first 8 characters for each value in the dataframe
    trip_count = trips.groupBy("PULocationID")\
                      .count()
    
    trip_sum = trips.withColumn("total_amount", col("total_amount").cast("double"))\
                    .groupBy("PULocationID")\
                    .sum("total_amount")\
                    .withColumnRenamed("sum(total_amount)", "total_amount")
    
    trip_avg = trip_sum.join(trip_count, "PULocationID")\
                       .withColumn("avg_total_amount", col("total_amount") / col("count"))\
                       .drop("total_amount")\
                       .drop("count")

    trip_popular_count = trips.join(mtrips, "DOLocationID")\
                              .groupBy("PULocationID")\
                              .count()\
                              .withColumnRenamed("count", "pop_count")
                                
    trip_pop = trip_count.join(trip_popular_count, "PULocationID")\
                         .withColumn("avg_count", col("pop_count") / col("count"))\
                         .drop("pop_count")\
                         .drop("count")

    trip_final = trip_pop.join(trip_avg, "PULocationID")\
                         .withColumn("weighted_profit", col("avg_total_amount")*col("avg_count"))\
                         .orderBy("weighted_profit", ascending=False)\
                         .drop("avg_count")\
                         .drop("avg_total_amount")
    return trip_final

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3e. [5 pts] Update the `final_output()` function
This function will take the results of `weighted_profit`, links it to the `borough` and `zone` and returns the top 20 locations with the highest `weighted_profit`.

Example output formatting:
```
+------------+---------+-------------------+
|    Zone    | Borough |  weighted_profit  |
+----------------------+-------------------+
| JFK Airport|   Queens|  16.95897820117925|
|     Jamaica|   Queens| 14.879835188762488|
+------------+---------+-------------------+
```

In [32]:
#export
def final_output(wp, lookup): 
    # Returns a Dataframe (final) with Schema: Zone, Borough, weighted_profit
    # Note: Use decimal datatype for weighted profit (NOTE: DON'T USE FLOAT)
    # Our grader will be only be checking the first 8 characters for each value in the dataframe
    lookup_data = lookup.select("LocationID", "Zone", "Borough")
    wp_data = wp.withColumnRenamed("PULocationID", "LocationID")

    final_data = lookup_data.join(wp_data, "LocationID")\
                    .drop("LocationID")\
                    .orderBy("weighted_profit", ascending=False)\
                    .limit(20)
    return final_data

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Testing

<div class="alert alert-block alert-info">
    You may use the below cell for any additional testing you need to do, however any code implemented below will not be run or used when grading.
</div>

In [33]:
main("large","s3://cse6242-ygan41/output-large3")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ygan41
Trip Count:  187203269
+--------------------+-------------+------------------+
|                Zone|      Borough|   weighted_profit|
+--------------------+-------------+------------------+
|        Baisley Park|       Queens| 29.36045577913085|
|Flushing Meadows-...|       Queens|27.304845733617682|
|       South Jamaica|       Queens|26.294916239873476|
|     Randalls Island|    Manhattan| 24.15098994022752|
|        Astoria Park|       Queens| 21.70641711214753|
|Briarwood/Jamaica...|       Queens| 19.94506463178933|
|Springfield Garde...|       Queens|19.468309288781914|
|             Jamaica|       Queens|  19.2839430001379|
|              Corona|       Queens|18.228769248155974|
|   LaGuardia Airport|       Queens|18.181338808373003|
|         Jamaica Bay|       Queens|17.100529446757903|
|             Maspeth|       Queens|17.005450640079545|
|Eltingville/Annad...|Staten Island|16.837764756944445|
|         JFK Airport|       Queens| 16.77772534824964|
|        Battery P