#### Pyspark Imports

In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import *

#### Define Spark Context


In [2]:
# NOTE(review): `sc` is never created in this notebook; this cell assumes the
# PySpark kernel has already defined a SparkContext named `sc` — confirm for
# your environment. The SQLContext built on top of it is what load_data reads with.
sqlContext = SQLContext(sparkContext=sc)

#### Function to load data

In [4]:
def load_data(gcp_storage_path):
    """
        Read a CSV file whose first line is a header row into a Spark dataframe.

        No schema is supplied and schema inference is not enabled, so every
        column comes back as a string.

        :param gcp_storage_path: string (full gs path including file name, e.g. gs://bucket_name/data.csv)
        :return: spark dataframe
    """
    return sqlContext.read.csv(gcp_storage_path, header=True)

#### Function to exclude trips that don't have a pickup location

In [24]:
def exclude_no_pickuplocations(df):
    """
        Drop trips whose pickup location id is null or equal to the string '0'.

        :param df: nyc taxi trips spark dataframe
        :return: spark dataframe containing only trips that have a pickup location
    """
    pickup = df.pulocationid
    # The id is compared as a string ('0'), matching the string column load_data produces.
    keep = pickup.isNotNull() & (pickup != '0')
    return df.where(keep)

#### Function to exclude trips with no distance

In [6]:
def exclude_no_tripdistance(df):
    """
        Drop trips whose trip_distance is null or zero.

        trip_distance is cast to decimal(38,10) in the returned dataframe; a value
        that is null after the cast is excluded.

        :param df: nyc taxi trips spark dataframe
        :return: spark dataframe containing only trips with a non-zero distance
    """
    as_decimal = df.withColumn("trip_distance", df["trip_distance"].cast("decimal(38,10)"))
    distance = as_decimal["trip_distance"]
    return as_decimal.where(distance.isNotNull() & (distance != 0))

#### Function to keep only trips with a fare amount between 20 and 60 dollars (inclusive)

In [7]:
def include_fare_range(df, min_fare=20, max_fare=60):
    """
        Keep only trips whose fare_amount lies in the inclusive range [min_fare, max_fare].

        fare_amount is cast to decimal(38,10) in the returned dataframe. Rows whose
        fare_amount is null are dropped, because the range comparison on null is null.

        :param df: nyc taxi trips spark dataframe
        :param min_fare: lower bound in dollars, inclusive (default 20)
        :param max_fare: upper bound in dollars, inclusive (default 60)
        :return: spark dataframe
    """
    # Decimal rather than float so fare comparisons are exact.
    df = df.withColumn("fare_amount", df["fare_amount"].cast("decimal(38,10)"))
    # Column.between is inclusive on both ends, i.e. min_fare <= fare_amount <= max_fare.
    return df.filter(df["fare_amount"].between(min_fare, max_fare))

#### Function to get the highest tip amount

In [8]:
def get_highest_tip(df):
    """
        Return the largest tip_amount in the dataframe.

        :param df: nyc taxi trips spark dataframe
        :return: decimal (rounded to 2 digits)  (NOTE: DON'T USE FLOAT), or None when
                 the dataframe has no non-null tip_amount (e.g. it is empty)
    """
    # Cast to decimal rather than float so the money value stays exact.
    df = df.withColumn("tip_amount", df["tip_amount"].cast("decimal(38,10)"))
    # The global aggregate yields exactly one row. Read its single column by position
    # instead of relying on Spark's auto-generated "max(tip_amount)" column name.
    max_tip = df.agg({"tip_amount": "max"}).first()[0]
    if max_tip is None:
        # max over zero non-null values is null; round(None, 2) would raise TypeError.
        return None
    # round() on a Decimal returns a Decimal, rounded using the decimal context
    # (ROUND_HALF_EVEN by default).
    return round(max_tip, 2)

#### Function to get total toll amount

In [9]:
def get_total_toll(df):
    """
        Return the sum of tolls_amount over all trips in the dataframe.

        :param df: nyc taxi trips spark dataframe
        :return: decimal (rounded to 2 digits)  (NOTE: DON'T USE FLOAT); Decimal("0.00")
                 when the dataframe has no non-null tolls_amount (e.g. it is empty)
    """
    from decimal import Decimal

    # Cast to decimal rather than float so the summed money value stays exact.
    df = df.withColumn("tolls_amount", df["tolls_amount"].cast("decimal(38,10)"))
    # The global aggregate yields exactly one row. Read its single column by position
    # instead of relying on Spark's auto-generated "sum(tolls_amount)" column name.
    total_toll = df.agg({"tolls_amount": "sum"}).first()[0]
    if total_toll is None:
        # Spark's sum over zero non-null values is null; round(None, 2) would raise
        # TypeError, and a total of no tolls is zero.
        return Decimal("0.00")
    # round() on a Decimal returns a Decimal, rounded using the decimal context
    # (ROUND_HALF_EVEN by default).
    return round(total_toll, 2)

### Run the above functions and print the results

#### Load data from yellow_tripdata09-08-2021.csv

In [17]:
#gcp_storage_path = "gs://dcheng65/yellow_tripdata09-08-2021.csv"
#df = load_data(gcp_storage_path)
#df.printSchema()

root
 |-- vendorid: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pulocationid: string (nullable = true)
 |-- dolocationid: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



#### Print the total number of rows in the dataframe

In [18]:
#df.count()

7667792

#### Print total number of rows in the dataframe after excluding trips with no pickup location

In [25]:
#df_no_pickup_locations = exclude_no_pickuplocations(df)
#df_no_pickup_locations.count()

3833896

#### Print the total number of rows in the dataframe after excluding trips with no distance

In [13]:
#df_no_trip_distance = exclude_no_tripdistance(df)
#df_no_trip_distance.count()

#### Print the total number of rows in the dataframe after keeping only trips with a fare amount between 20 and 60 dollars

In [14]:
#df_include_fare_range = include_fare_range(df)
#df_include_fare_range.count()

#### Print the highest tip amount

In [15]:
#max_tip = get_highest_tip(df)
#print(max_tip)

#### Print the total toll amount

In [16]:
#total_toll = get_total_toll(df)
#print(total_toll)