# Exploring PySpark
Exploring PySpark with NYC Yellow Taxi trip data

In [None]:
#!pip install pyspark

In [None]:
#import pyspark

PySpark will not run unless Java is installed on the computer. PySpark 4.x requires Java 17 or 21; PySpark 3.5 runs on Java 8, 11, or 17.

In [None]:
#pyspark.__version__

In [None]:
#pip uninstall pyspark -y

In [None]:
# Force install pyspark version
#!pip install pyspark==3.5.1

In [None]:
import pyspark

In [None]:
from pyspark.sql.functions import expr,col, regexp_replace, coalesce, lit, when
from pyspark.sql import SparkSession

In [None]:
# Build (or reuse, if one is already running) a Spark session named 'Practice'
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)
spark

In [None]:
# Read the January 2024 trip CSV.
#   header=True      -> the first row supplies the column names
#   inferSchema=True -> infer column types; without it every column is read as a string
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(r'C:\Users\User\Documents\PORTFOLIO\NYC TAXI DATA\yellow_tripdata\csv_trip_data\yellow_tripdata_2024-01.csv')
)

In [None]:
# Another way to read the same file: only the header option is set, so no schema
# inference happens and all columns come back as strings. This replaces `df`.
df = spark.read.csv(
    r'C:\Users\User\Documents\PORTFOLIO\NYC TAXI DATA\yellow_tripdata\csv_trip_data\yellow_tripdata_2024-01.csv',
    header='true',
)

In [None]:
df.show(n=20)  # same as the default: first 20 rows

In [None]:
# Confirm the object is a Spark DataFrame (not a pandas one)
df.__class__

In [None]:
#Print Schema: column names, data types and nullability of the CSV-backed frame
df.printSchema()

In [None]:
# Select columns in pyspark: project two columns and display them
df.select(col('VendorID'), col('tpep_pickup_datetime')).show()

In [None]:
# Read the same month from the Parquet file. Parquet stores its own schema,
# so no header/inferSchema options are needed.
df_parq = spark.read.format('parquet').load(
    r'C:\Users\User\Documents\PORTFOLIO\NYC TAXI DATA\yellow_tripdata\yellow_tripdata_2024-01.parquet'
)

In [None]:
df_parq.show(n=1)  # peek at a single row

In [None]:
# Schema of the Parquet-backed frame (types come from the file itself)
df_parq.printSchema()

In [None]:
# Select columns and show the first 5 rows
preview_cols = ['VendorID', 'tpep_pickup_datetime', 'store_and_fwd_flag']
df_parq.select(preview_cols).show(5)

In [None]:
# Distinct values of the raw store-and-forward flag
df_parq.select('store_and_fwd_flag').dropDuplicates().show(5)

In [None]:
# (column, type) pairs for the whole frame and for a two-column projection.
# Returned together as a tuple so Jupyter displays both; as two separate lines,
# only the last expression of the cell would be shown.
(
    df_parq.dtypes,
    df_parq.select('VendorID', 'tpep_pickup_datetime').dtypes,
)

In [None]:
# Summary statistics per column (count, mean, stddev, min, max)
summary_stats = df_parq.describe()
summary_stats.show()

# Transformation Steps

In [None]:
# Add a column: df.withColumn('new_column_name', column_expression).
# col("name") refers to an existing column of the DataFrame.
# This cell only previews the result with show(1); df_parq itself is not reassigned.
vendor_name_expr = (
    when(col("VendorID") == 1, "Creative Mobile Technologies, LLC")
    .when(col("VendorID") == 2, "Curb Mobility, LLC")
    .when(col("VendorID") == 6, "Myle Technologies Inc")
    .when(col("VendorID") == 7, "Helix")
    .otherwise("No Vendor")
)
# Strip double quotes from the flag, and replace nulls with "N/A"
store_fwd_expr = coalesce(regexp_replace(col("store_and_fwd_flag"), '"', ''), lit("N/A"))

(
    df_parq
    .withColumn("Vendor_name", vendor_name_expr)
    .withColumn("store_and_forward_trip_flag", store_fwd_expr)
    .show(1)
)

In [None]:
# Use SQL CASE WHEN inside PySpark via expr().

## Add additional columns: to add several, keep chaining .withColumn(...).
## Wrapping the chain in parentheses lets it span several lines without trailing
## backslashes (a backslash is Python's explicit line-continuation character).
## Column names below use the schema's exact casing (VendorID, RatecodeID) so the
## expressions still resolve if spark.sql.caseSensitive is enabled.
## NOTE: payment_type is overwritten in place, so re-running this cell would no longer
## see the numeric codes — re-run from the Parquet read instead.

df_parq = (
    df_parq
    .withColumn(
        "vendor_name",
        expr("""
            case
                when VendorID = 1 then 'Creative Mobile Technologies, LLC'
                when VendorID = 2 then 'Curb Mobility, LLC'
                when VendorID = 6 then 'Myle Technologies Inc'
                when VendorID = 7 then 'Helix'
                else 'No Vendor'
            end
        """)
    )
    .withColumn(
        "rate_code_type",
        expr("""
            case
                when RatecodeID = 1 then 'Standard rate'
                when RatecodeID = 2 then 'JFK'
                when RatecodeID = 3 then 'Newark'
                when RatecodeID = 4 then 'Nassau or Westchester'
                when RatecodeID = 5 then 'Negotiated fare'
                when RatecodeID = 6 then 'Group ride'
                when RatecodeID = 99 then 'Unknown'
                else 'N/A'
            end
        """)
    )
    .withColumn(
        "payment_type",  # the column already exists, so the numeric code is replaced by its label
        expr("""
            case
                when payment_type = 0 then 'Flex Fare trip'
                when payment_type = 1 then 'Credit card'
                when payment_type = 2 then 'Cash'
                when payment_type = 3 then 'No charge'
                when payment_type = 4 then 'Dispute'
                when payment_type = 5 then 'Unknown'
                when payment_type = 6 then 'Voided trip'
                else 'N/A'
            end
        """)
    )
)


In [None]:
# Add the columns that were missed above:
#   trip_duration_mins          - (dropoff - pickup) in seconds, divided by 60
#   store_and_forward_trip_flag - store_and_fwd_flag with double quotes removed, nulls -> 'N/A'
duration_expr = expr("(unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)) / 60")
flag_expr = expr("coalesce(regexp_replace(store_and_fwd_flag, '\"', ''), 'N/A')")

df_parq = (
    df_parq
    .withColumn("trip_duration_mins", duration_expr)
    .withColumn("store_and_forward_trip_flag", flag_expr)
)

In [None]:
df_parq.show(n=5)  # inspect the newly added columns

In [None]:
# Schema after adding the derived columns
df_parq.printSchema()

In [None]:
## Drop the raw flag column; store_and_forward_trip_flag (derived from it) is kept
df_parq = df_parq.drop(*['store_and_fwd_flag'])

In [None]:
df_parq.show(n=1)  # confirm store_and_fwd_flag is gone

In [None]:
## Rename columns
#df_parq.withColumnRenamed('Airport_fee','airport_fee')

In [None]:
## Rename multiple columns: (old name, new name) pairs applied in order.
## NOTE: 'droppoff_zone' is misspelled, but the column-reorder list refers to it by
## that name, so it is kept as-is here.
column_renames = [
    ('VendorID', 'vendor_id'),
    ('RatecodeID', 'rate_code_id'),
    ('tpep_pickup_datetime', 'pickup_time'),
    ('tpep_dropoff_datetime', 'dropoff_time'),
    ('DOLocationID', 'droppoff_zone'),
    ('PULocationID', 'pickup_zone'),
    ('Airport_fee', 'airport_fee'),
    ('extra', 'extra_fees'),
]
for old_name, new_name in column_renames:
    df_parq = df_parq.withColumnRenamed(old_name, new_name)

In [None]:
df_parq.show(n=2)  # check the renamed columns

In [None]:
# Reorder columns: select() returns them in exactly this list order
cols = [
    "vendor_id", "vendor_name", "pickup_time", "dropoff_time", "trip_duration_mins",
    "passenger_count", "trip_distance", "rate_code_id", "rate_code_type",
    "store_and_forward_trip_flag", "pickup_zone", "droppoff_zone", "payment_type",
    "fare_amount", "extra_fees", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge", "airport_fee",
]
df_parq = df_parq.select(*cols)

In [None]:
df_parq.show(n=3)  # check the new column order

In [None]:
# A filter can be written either as a SQL string or as a Column expression:
#   df_parq.filter('trip_distance <= 0').show()
# where() is an alias of filter().
df_parq.where(col('trip_distance') <= 0).show(1)

In [None]:
# Filter the rows first, then keep only the columns of interest
df_parq.where('trip_distance <= 0').select('vendor_name', 'trip_distance').show()

In [None]:
# Combine conditions with & (and) / | (or); each comparison needs its own parentheses
zero_distance_with_tip = (col('trip_distance') <= 0) & (col('tip_amount') > 0)
(
    df_parq
    .filter(zero_distance_with_tip)
    .select('vendor_name', 'pickup_time', 'dropoff_time', 'trip_distance',
            'tip_amount', 'fare_amount', 'payment_type')
    .show()
)


In [None]:
# Total number of rows (count() is an action, so it triggers a Spark job)
df_parq.count()

In [None]:
# Invalid trip data, stored for data-quality checks.
# A trip is invalid if distance, fare or passenger count is non-positive OR missing.
# The isNull() checks matter: a comparison against null is null (not true), so without
# them a row with e.g. a null passenger_count would fall into neither this frame nor
# the cleaned frame built from the "> 0" rules.
df_invalid_tripdata = df_parq.filter(
    df_parq['trip_distance'].isNull() | (df_parq['trip_distance'] <= 0)
    | df_parq['fare_amount'].isNull() | (df_parq['fare_amount'] <= 0)
    | df_parq['passenger_count'].isNull() | (df_parq['passenger_count'] <= 0)
)

In [None]:
invalid_preview_cols = ['vendor_name', 'pickup_time', 'dropoff_time', 'trip_distance',
                        'tip_amount', 'fare_amount', 'passenger_count']
df_invalid_tripdata.select(*invalid_preview_cols).show(5)

In [None]:
# Option 1 (not used): set difference against the invalid rows.
#   df_clean1 = df_parq.subtract(df_invalid_tripdata)
# subtract() behaves like EXCEPT DISTINCT: it compares whole rows across the full dataset,
# and rows that match neither filter (such as nulls) stay in the result. That makes it
# expensive and a poor fit for ETL.
#
# Option 2 (used): keep only the rows that satisfy every rule. A comparison against null
# is not true, so rows with nulls in these columns are excluded as well.
valid_trip = (col('trip_distance') > 0) & (col('fare_amount') > 0) & (col('passenger_count') > 0)
df_clean = df_parq.filter(valid_trip)


In [None]:
# Validation check: no row in df_clean may violate the cleaning rules.
# Columns are resolved on df_clean itself. No select() is needed just to count rows.
violations = df_clean.filter(
    (col('trip_distance') <= 0) | (col('fare_amount') <= 0) | (col('passenger_count') <= 0)
).count()
assert violations == 0, f"{violations} invalid rows survived cleaning"
violations

In [None]:
# Flag anomalous trips: unusually long in time OR distance, OR unusually expensive.
# The thresholds match the anomaly rules used for anomalies_data. Combining them with &
# only matched trips that were extreme on all three at once.
(
    df_clean
    .filter((col('trip_duration_mins') > 180) | (col('trip_distance') > 100) | (col('fare_amount') > 500))
    .select('vendor_name', 'pickup_time', 'dropoff_time', 'trip_distance',
            'fare_amount', 'trip_duration_mins')
    .show()
)

In [None]:
from pyspark.sql.functions import array, when, col, size, lit

# Each rule is (condition, label); a trip may trigger several rules
anomaly_rules = [
    (col("fare_amount") > 500, "high_fare"),
    (col("trip_distance") > 100, "long_distance"),
    (col("trip_duration_mins") > 180, "long_duration"),
    (col("passenger_count") > 6, "high_passenger_count"),
]

# when() without otherwise() yields null for a rule that does not match
anomaly_labels = array(*[when(condition, lit(label)) for condition, label in anomaly_rules])

anomalies_data = (
    df_clean
    .withColumn("anomaly_type", anomaly_labels)
    # drop the null entries so the array lists only the anomalies that fired
    .withColumn("anomaly_type", expr("filter(anomaly_type, x -> x is not null)"))
    # keep only trips with at least one anomaly
    .filter(size(col("anomaly_type")) > 0)
)


In [None]:
# Trips that triggered more than one anomaly rule; truncate=False shows the full arrays
multi_anomaly_trips = anomalies_data.filter(size(col("anomaly_type")) > 1)
multi_anomaly_trips.select(
    'vendor_name', 'trip_distance', 'fare_amount', 'trip_duration_mins', 'anomaly_type'
).show(truncate=False)

In [None]:
# Number of valid trips (df_clean3 was never defined and raised NameError)
df_clean.count()

## Other transformation functions (for reference only — not required for this project)

In [None]:
#Drop rows
#df_parq.select('trip_distance').distinct().show()

In [None]:
# df.na.drop(how="any", thresh=2, subset=['columnname']).show()
#
# Parameters of na.drop() (kept as comments: plain prose in a code cell is a SyntaxError):
#   how    - accepts one of two values:
#              'any': drop a row if any of its values is null
#              'all': drop a row only if all of its values are null
#   thresh - minimum number of non-null values a row needs to be kept;
#            with thresh=2, rows with fewer than 2 non-null values are dropped.
#            When thresh is given it overrides `how`.
#   subset - only consider nulls in the listed columns


In [None]:
# Non-null count per column (count(col) skips nulls), one output column per input column
non_null_counts = [expr(f"count({c}) as {c}") for c in df_parq.columns]
df_parq.select(*non_null_counts).show()


In [None]:
# Rows where trip_distance is missing
df_parq.where(col("trip_distance").isNull()).show(5)
# Count the null values in one column:
#   df_parq.filter(col("passenger_count").isNull()).count()
# Keep only rows with at least 20 non-null values:
#   df_drop_test = df_parq.na.drop(how="any",thresh=20)


In [None]:
# Keep only rows with at least 20 non-null values (thresh overrides how="any").
# Defined here so this cell runs on a fresh kernel instead of raising NameError.
df_drop_test = df_parq.na.drop(how="any", thresh=20)

# Remaining nulls in one column:
#   df_drop_test.filter(col("passenger_count").isNull()).count()

# Non-null count per column after the drop (count(col) skips nulls)
df_drop_test.selectExpr(
    *[f"count({c}) as {c}" for c in df_drop_test.columns]
).show()


In [None]:
# Filling missing values: df.na.fill(value, subset)
# First look at the distinct passenger_count values (null included)
df_parq.select(col('passenger_count')).dropDuplicates().show()

In [None]:
# na.fill only fills columns whose type matches the fill value, so a string value is
# ignored for a numeric column. passenger_count is assumed numeric (it is compared with
# numbers elsewhere — confirm with printSchema()), so filling it with 'Unknown' silently
# did nothing. Cast it to string first so missing counts actually become 'Unknown'.
# NOTE: this reuses the name `df`, replacing the CSV-backed frame.
df = (
    df_parq
    .withColumn('passenger_count', col('passenger_count').cast('string'))
    .na.fill('Unknown', subset=['passenger_count'])
)

In [None]:
# Distinct passenger_count values after the fill
df.select(col('passenger_count')).dropDuplicates().show()