#### Pyspark Imports
<span style="color:red">*Please don't modify the below cell*</span>

In [None]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import *

#### Define Spark Context
<span style="color:red">*Please don't modify the below cell*</span>

In [None]:
sc
sqlContext = SQLContext(sc)

In [None]:
### Student Section - Please complete all the functions below

#### Function to return GT Username

#### Function to load data

In [None]:
#export
def load_data(gcp_storage_path):
    """
        Read a CSV file from a storage path into a Spark dataframe.

        The first row is treated as the header, column types are inferred
        by Spark, and fields are split on commas.

        :param gcp_storage_path: string (full gs path including file name e.g gs://bucket_name/data.csv)
        :return: spark dataframe
    """
    # NOTE(review): `spark` is not defined anywhere in this notebook (only
    # `sqlContext` is) -- presumably the PySpark kernel injects it; confirm.
    csv_options = {"header": "True", "inferSchema": "True", "delimiter": ","}
    reader = spark.read.options(**csv_options)
    return reader.csv(gcp_storage_path)

#### Function to exclude trips that don't have a pickup location

In [None]:
#export
def exclude_no_pickuplocations(df):
    """
        Keep only the trips that have a pickup location.

        Rows whose ``pulocationid`` is missing or equal to zero are removed.

        :param df: spark dataframe of nyc taxi trips
        :return: spark dataframe
    """
    # Missing values are dropped first, then the zero placeholder is removed.
    with_pickup = df.dropna(subset="pulocationid")
    return with_pickup.filter(with_pickup["pulocationid"] != 0)

#### Function to exclude trips with no distance

In [None]:
#export
def exclude_no_tripdistance(df):
    """
        Keep only the trips that have a trip distance.

        The pickup-location filter is applied first, so the result is
        cumulative. ``trip_distance`` is then cast to ``decimal(38,10)`` and
        rows where it is missing or equal to zero are removed.

        :param df: spark dataframe of nyc taxi trips
        :return: spark dataframe (with ``trip_distance`` as a decimal column)
    """
    trips = exclude_no_pickuplocations(df)

    # Replace the column with its decimal form before filtering on it.
    decimal_distance = trips["trip_distance"].cast("decimal(38,10)")
    trips = trips.withColumn("trip_distance", decimal_distance)

    trips = trips.dropna(subset="trip_distance")
    return trips.filter(trips["trip_distance"] != 0)

#### Function to include fare amount between the range of 20 to 60 Dollars

In [None]:
#export
def include_fare_range(df):
    """
        Keep only the trips whose fare is between 20 and 60 dollars.

        Both bounds are inclusive. The pickup-location and trip-distance
        filters are applied first, so the result is cumulative.
        ``fare_amount`` is cast to ``decimal(38,10)`` before comparing.

        :param df: spark dataframe of nyc taxi trips
        :return: spark dataframe (with ``fare_amount`` as a decimal column)
    """
    trips = exclude_no_tripdistance(df)

    decimal_fare = trips["fare_amount"].cast("decimal(38,10)")
    trips = trips.withColumn("fare_amount", decimal_fare)

    # between() is inclusive on both ends: 20 <= fare_amount <= 60.
    in_range = trips["fare_amount"].between(20, 60)
    return trips.filter(in_range)

#### Function to get the highest tip amount

In [None]:
#export
def get_highest_tip(df):
    """
        Return the largest tip amount among the trips in the fare range.

        The fare-range filter (and the filters it chains) is applied first.
        ``tip_amount`` is cast to ``decimal(38,2)`` before taking the maximum.

        :param df: spark dataframe of nyc taxi trips
        :return: decimal (rounded to 2 digits)  (NOTE: DON'T USE FLOAT)
    """
    trips = include_fare_range(df)

    decimal_tip = trips["tip_amount"].cast("decimal(38,2)")
    trips = trips.withColumn("tip_amount", decimal_tip)

    # A global aggregate yields one row with one column: the maximum itself.
    aggregated_rows = trips.agg({"tip_amount": "max"}).collect()
    highest_tip = aggregated_rows[0][0]
    return highest_tip

#### Function to get total toll amount

In [None]:
#export
def get_total_toll(df):
    """
        Return the sum of toll amounts over the trips in the fare range.

        The fare-range filter (and the filters it chains) is applied first.
        ``tolls_amount`` is cast to ``decimal(38,2)`` before summing.

        :param df: spark dataframe of nyc taxi trips
        :return: decimal (rounded to 2 digits)  (NOTE: DON'T USE FLOAT)
    """
    trips = include_fare_range(df)

    decimal_tolls = trips["tolls_amount"].cast("decimal(38,2)")
    trips = trips.withColumn("tolls_amount", decimal_tolls)

    # A global aggregate yields one row with one column: the total itself.
    aggregated_rows = trips.agg({"tolls_amount": "sum"}).collect()
    total_toll = aggregated_rows[0][0]
    return total_toll

### Run above functions and print

#### Uncomment the cells below and test your implemented functions

#### Load data from yellow_tripdata09-08-2021.csv

In [None]:
#gcp_storage_path = "gs://<replace_with_your_storage_bucket>/yellow_tripdata09-08-2021.csv"
#df = load_data(gcp_storage_path)
#df.printSchema()

#### Print total number of rows in the dataframe

In [None]:
#df.count()

#### Print total number of rows in the dataframe after excluding trips with no pickup location

In [None]:
#df_no_pickup_locations = exclude_no_pickuplocations(df)
#df_no_pickup_locations.count()

#### Print total number of rows in the dataframe after excluding trips with no distance

In [None]:
#df_no_trip_distance = exclude_no_tripdistance(df_no_pickup_locations)
#df_no_trip_distance.count()

#### Print total number of rows in the dataframe after including trips with fare amount between the range of 20 to 60 Dollars

In [None]:
#df_include_fare_range = include_fare_range(df_no_trip_distance)
#df_include_fare_range.count()

#### Print the highest tip amount

In [None]:
#max_tip = get_highest_tip(df_include_fare_range)
#print(max_tip)

#### Print the total toll amount

In [None]:
#total_toll = get_total_toll(df_include_fare_range)
#print(total_toll)