In [5]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 6.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=7651bceb591c93b4a3474d1856e5c7dd4bfe8dd5d727a11303681279cc578bc8
  Stored in directory: /Users/phuocdang/Library/Caches/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [1]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook: an existing session is reused,
# otherwise a new one is created under the application name "NYC Taxi".
spark = (
    SparkSession.builder
    .appName("NYC Taxi")
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/09 21:09:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DoubleType, DateType, TimestampType, LongType

# Explicit schema for the yellow-taxi trip records.
# Each entry is (column name, Spark type); every field is declared nullable.
df_taxi_schema = StructType(
    fields=[
        StructField(column_name, column_type, True)
        for column_name, column_type in [
            ("VendorID", IntegerType()),
            ("tpep_pickup_datetime", TimestampType()),
            ("tpep_dropoff_datetime", TimestampType()),
            ("passenger_count", LongType()),
            ("trip_distance", DoubleType()),
            ("RatecodeID", LongType()),
            ("store_and_fwd_flag", StringType()),
            ("PULocationID", IntegerType()),
            ("DOLocationID", IntegerType()),
            ("payment_type", LongType()),
            ("fare_amount", DoubleType()),
            ("extra", DoubleType()),
            ("mta_tax", DoubleType()),
            ("tip_amount", DoubleType()),
            ("tolls_amount", DoubleType()),
            ("improvement_surcharge", DoubleType()),
            ("total_amount", DoubleType()),
            ("congestion_surcharge", DoubleType()),
            ("Airport_fee", DoubleType()),
        ]
    ]
)

# Explicit schema for the taxi-zone lookup table: an integer zone id followed
# by three descriptive string columns. Every field is declared nullable.
location_schema = StructType(
    fields=[
        StructField(column_name, column_type, True)
        for column_name, column_type in [
            ("LocationID", IntegerType()),
            ("Borough", StringType()),
            ("Zone", StringType()),
            ("service_zone", StringType()),
        ]
    ]
)
                               

In [3]:
# Load the yellow-taxi trip records from Parquet, applying the explicit
# df_taxi_schema. Parquet files carry no header row, so no 'header' option is
# set here (that option only applies to text formats such as CSV).
taxi_df = (
    spark.read
    .schema(df_taxi_schema)
    .parquet('yellow_tripdata_2024-01.parquet')
)

# Load the taxi-zone lookup table from CSV. The first line of the file is
# treated as the header, and column types come from location_schema.
location_df = spark.read.csv(
    "taxi_zone_lookup.csv",
    schema=location_schema,
    header=True,
)

In [4]:
from pyspark.sql.functions import col

# Pickup copy of the zone lookup: every column gets a 'PU' prefix, which turns
# LocationID into the join key PULocationID and yields PUBorough, PUZone and
# PUservice_zone. No column is exempt from the prefix, because location_df has
# no column that is already named PULocationID.
location_df_pu = location_df.select([col(c).alias(f"PU{c}") for c in location_df.columns])

# Left join so that every trip is kept even if its pickup id has no lookup row.
df = taxi_df.join(location_df_pu, on="PULocationID", how="left")

# Drop-off copy of the zone lookup, built the same way with a 'DO' prefix
# (DOLocationID, DOBorough, DOZone, DOservice_zone).
location_df_do = location_df.select([col(c).alias(f"DO{c}") for c in location_df.columns])

# Second left join adds the drop-off attributes to the already-joined trips.
df = df.join(location_df_do, on="DOLocationID", how="left")

In [5]:
# Inspect the column names after both joins: the two join keys come first,
# then the remaining trip fields, then the PU*/DO* lookup columns.
df.columns

['DOLocationID',
 'PULocationID',
 'VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'PUBorough',
 'PUZone',
 'PUservice_zone',
 'DOBorough',
 'DOZone',
 'DOservice_zone']

In [6]:
# Narrow the frame to the 16 columns kept for the analysis; every other column
# from the joined frame is dropped.
df = df.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "tip_amount",
    "payment_type",
    "PUBorough",
    "PUZone",
    "PUservice_zone",
    "DOBorough",
    "DOZone",
    "DOservice_zone",
)

In [7]:
# Fare amounts are all treated as positive: each value is replaced by its
# absolute value.
# Note: `abs` and `sum` imported below are Spark column functions; from this
# cell onward they shadow the Python builtins of the same name.
from pyspark.sql.functions import abs, avg, when, col, sum, median
df = df.withColumn("fare_amount", abs(col("fare_amount")))

In [8]:
# Impute missing tips with the column mean. The mean is pulled back to the
# driver as a plain Python value and then used as the replacement for nulls.
avg_tip = df.agg(avg("tip_amount")).first()[0]
df = df.na.fill({"tip_amount": avg_tip})

                                                                                

In [9]:
# Keep only trips whose fare and distance are both strictly greater than zero.
df = df.filter(col("fare_amount") > 0).filter(col("trip_distance") > 0)

In [127]:
# Impute missing passenger counts with the approximate median
# (0.5 quantile, relative error 0.01).
# NOTE(review): this cell's execution count is out of sequence with its
# neighbours and the null check later in the notebook still reports null
# passenger_count values, so it appears not to have been run in the same
# session. Re-run the notebook top to bottom to confirm the fill takes effect.
median_passenger_count = df.stat.approxQuantile("passenger_count", [0.5], 0.01)[0]
df = df.na.fill({"passenger_count": median_passenger_count})

                                                                                

In [10]:
# Repair RatecodeID in one ordered pipeline. The stages must run in this order
# because each one reads the RatecodeID produced by the stage before it.
df = (
    df
    # 1) Re-code trips currently marked 4: with a known pickup borough, a
    #    drop-off in EWR becomes 3 and a drop-off in any other known borough
    #    becomes 1. All remaining rows keep their current code.
    .withColumn(
        "RatecodeID",
        when(
            (col("RatecodeID") == 4) & (col("PUBorough") != "Unknown") & (col("DOBorough") == "EWR"),
            3,
        )
        .when(
            (col("RatecodeID") == 4) & (col("PUBorough") != "Unknown") & (col("DOBorough") != "Unknown"),
            1,
        )
        .otherwise(col("RatecodeID")),
    )
    # 2) Drop trips whose pickup or drop-off borough is 'Unknown'. Rows with a
    #    null borough are dropped as well, since the comparison is then null.
    .filter((col("PUBorough") != "Unknown") & (col("DOBorough") != "Unknown"))
    # 3) Treat the code 99 as missing by turning it into null.
    .withColumn(
        "RatecodeID",
        when(col("RatecodeID") == 99, None).otherwise(col("RatecodeID")),
    )
    # 4) For rows that are still null, infer the code from the route:
    #    Manhattan <-> JFK Airport gives 2, a Newark Airport drop-off gives 3.
    .withColumn(
        "RatecodeID",
        when(
            (col("PUBorough") == "Manhattan") & (col("DOZone") == "JFK Airport") & col("RatecodeID").isNull(),
            2,
        )
        .when(
            (col("PUZone") == "JFK Airport") & (col("DOBorough") == "Manhattan") & col("RatecodeID").isNull(),
            2,
        )
        .when((col("DOZone") == "Newark Airport") & col("RatecodeID").isNull(), 3)
        .otherwise(col("RatecodeID")),
    )
    # 5) Anything still missing falls back to code 1.
    .fillna({"RatecodeID": 1})
)

In [14]:
from pyspark.sql.functions import col, sum

# Count the nulls in every column: each is-null flag is cast to 0/1 and summed,
# and the total is reported under the column's own name.
null_counts = df.select(
    *(sum(col(name).isNull().cast("int")).alias(name) for name in df.columns)
)

# Print the single-row summary table.
null_counts.show()


                                                                                

+--------------------+---------------------+----------+------------+------------+---------------+-------------+-----------+----------+------------+---------+------+--------------+---------+------+--------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|tip_amount|payment_type|PUBorough|PUZone|PUservice_zone|DOBorough|DOZone|DOservice_zone|
+--------------------+---------------------+----------+------------+------------+---------------+-------------+-----------+----------+------------+---------+------+--------------+---------+------+--------------+
|                   0|                    0|         0|           0|           0|         116953|            0|          0|         0|           0|        0|     0|             0|        0|     0|             0|
+--------------------+---------------------+----------+------------+------------+---------------+-------------+-----------+----------+------------+-----