In [69]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Initialize the Spark session

In [70]:
# Entry point for all DataFrame work below. getOrCreate() returns the
# already-running session if there is one, otherwise it starts a new one
# with the application name "taxi_tripdata".
spark = (
    SparkSession.builder
    .appName("taxi_tripdata")
    .getOrCreate()
)

Load data

In [71]:
# Reader settings shared by both files: use the first line as column names
# and let Spark infer each column's type from the data.
options = {"header": True, "inferSchema": True}

# Trip records.
taxi_tripdata = spark.read.csv(
    r"/Users/calin.iorga/Documents/Python projects/Spark learning/taxi_analytics/data/taxi_tripdata.csv",
    **options,
)

# Zone lookup table (LocationID -> Borough / Zone / service_zone).
taxi_zone = spark.read.csv(
    r"/Users/calin.iorga/Documents/Python projects/Spark learning/taxi_analytics/data/taxi_zone_lookup.csv",
    **options,
)

Print the schemas and a few rows

In [72]:
# Show the inferred column names and types of both frames, trips first.
for frame in (taxi_tripdata, taxi_zone):
    frame.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = tru

In [73]:
# Peek at the first three trip rows.
taxi_tripdata.show(n=3)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2021-07-01 00:30:52|  2021-07-01 00:35:36|                 N|         1|          74|         168|              1|          1.2|        6.0|  0.5|    0.

In [74]:
# Peek at the first three rows of the zone lookup table.
taxi_zone.show(n=3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



Clean and normalize data

In [75]:
# Clean the trip records in one pass:
#   1. drop rows where a core measure is missing;
#   2. keep only rows where ALL core measures are positive -- combining the
#      conditions with OR would keep, for example, a zero-fare row just
#      because it has a passenger, and such rows break the tip ratio
#      computed from fare_amount later on;
#   3. treat missing tolls / congestion surcharge as 0;
#   4. remove the ehail_fee column.
taxi_tripdata = (
    taxi_tripdata
    .na.drop(subset=["passenger_count", "trip_distance", "fare_amount"])
    .where(
        (col("passenger_count") > 0)
        & (col("trip_distance") > 0)
        & (col("fare_amount") > 0)
    )
    .na.fill(value=0, subset=["tolls_amount", "congestion_surcharge"])
    # Dropping by name is a no-op when the column is already gone, so this
    # cell can be re-run; attribute access (taxi_tripdata.ehail_fee) would
    # raise AttributeError on the second run.
    .drop("ehail_fee")
)


Create new columns

In [76]:
# Derived columns:
#   tip_ratio   -- tip as a fraction of the fare. The division is guarded so
#                  that a zero fare yields null instead of raising
#                  DIVIDE_BY_ZERO when Spark runs in ANSI mode; with ANSI
#                  mode off the result is the same null as an unguarded
#                  division would give.
#   pickup_hour -- hour of day (0-23) extracted from the pickup timestamp.
taxi_tripdata = (
    taxi_tripdata
    .withColumn(
        "tip_ratio",
        when(col("fare_amount") != 0, col("tip_amount") / col("fare_amount")),
    )
    .withColumn("pickup_hour", hour(col("lpep_pickup_datetime")))
)
taxi_tripdata.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+-------------------+-----------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|          tip_ratio|pickup_hour|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+-------------------+-----------+
|       1| 2021-07-01 00:30:52|  2021-07-01 00:35:36|                 N|         1|          74| 

Joins

In [79]:
# Enrich every trip with the attributes of its pickup zone.
# The lookup table is aliased so its columns can be told apart from the trip
# columns; a left join keeps trips whose PULocationID has no match in the
# lookup (their PU_* columns are null).
tz_pu = taxi_zone.alias("tz_pu")
taxi_tripdata_pu_join = (
    taxi_tripdata.alias("t")
    .join(
        tz_pu,
        on=col("t.PULocationID") == col("tz_pu.LocationID"),
        how="left",
    )
    .select(
        "t.*",
        col("tz_pu.Zone").alias("PU_Zone"),
        col("tz_pu.Borough").alias("PU_Borough"),
        col("tz_pu.service_zone").alias("PU_service_zone"),
    )
)

In [80]:
# Second lookup: add the drop-off zone attributes on top of the
# pickup-enriched frame. Same pattern as the pickup join, keyed on
# DOLocationID; unmatched drop-off IDs keep the trip row with null DO_* values.
tz_do = taxi_zone.alias("tz_do")
taxi_tripdata_pu_du_join = (
    taxi_tripdata_pu_join.alias("t_pu")
    .join(
        tz_do,
        on=col("t_pu.DOLocationID") == col("tz_do.LocationID"),
        how="left",
    )
    .select(
        "t_pu.*",
        col("tz_do.Zone").alias("DO_Zone"),
        col("tz_do.Borough").alias("DO_Borough"),
        col("tz_do.service_zone").alias("DO_service_zone"),
    )
)

In [81]:
# Final working frame: trips enriched with both pickup and drop-off zone
# attributes. This is a second name for the same DataFrame, not a copy.
full_taxi_tripdata = taxi_tripdata_pu_du_join

# Display the first 20 rows (the default row count of show()).
full_taxi_tripdata.show(n=20)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+-------------------+-----------+--------------------+----------+---------------+--------------------+-------------+---------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|          tip_ratio|pickup_hour|             PU_Zone|PU_Borough|PU_service_zone|             DO_Zone|   DO_Borough|DO_service_zone|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+-----------

25/05/26 15:03:00 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
