# **Clean the dataset and convert into zone-level features**

In [2]:
import geopandas as gpd
import os, sys

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, unix_timestamp, hour, dayofweek, when, to_date

In [1]:
path_taxi = "../../raw_data/tlc_trip_data/2023/yellow_tripdata_2023-01.parquet"
path_zones_lookup = "../../raw_data/tlc_trip_data/taxi_zone_lookup.csv"
path_zone_shapefile = "../../raw_data/tlc_trip_data/taxi_zones/taxi_zones.shp"

In [3]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
spark = SparkSession.builder.appName("TaxiZone-Feature Engineering").getOrCreate()

In [5]:
df_taxi = spark.read.parquet(path_taxi).dropna()
df_taxi.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

In [6]:
df_lookup = spark.read.csv(path_zones_lookup, header=True, inferSchema=True)
df_lookup.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [7]:
df_zone = gpd.read_file(path_zone_shapefile).dropna()
df_zone.drop(
    columns='geometry', inplace=True
)

In [8]:
spark_zones = spark.createDataFrame(df_zone)
zones_info = spark_zones.join(df_lookup, on=['LocationID', 'zone', 'borough'], how='left').distinct().dropna()

In [9]:
df_joined = df_taxi.join(
    zones_info.select(
        col('LocationID').alias('PULocationID'),
        col('zone'),
        col('borough')
    ),
    on="PULocationID",
    how="left"
).dropna()

In [10]:
core_df = df_joined.select(
    "VendorID",
    col("tpep_pickup_datetime").alias("pickup_time"),
    col("tpep_dropoff_datetime").alias("dropoff_time"),
    col("passenger_count").alias("P_count"),
    col("trip_distance"),
    col("fare_amount").alias("amt"),
    "PULocationID",
    "zone",
    "borough"
)

In [11]:
core_df.show(5)

+--------+-------------------+-------------------+-------+-------------+----+------------+--------------+--------+
|VendorID|        pickup_time|       dropoff_time|P_count|trip_distance| amt|PULocationID|          zone| borough|
+--------+-------------------+-------------------+-------+-------------+----+------------+--------------+--------+
|       1|2023-01-01 17:32:54|2023-01-01 18:02:09|    1.0|          8.1|28.2|          26|  Borough Park|Brooklyn|
|       1|2023-01-02 12:12:02|2023-01-02 12:45:55|    1.0|         11.4|38.5|          26|  Borough Park|Brooklyn|
|       2|2023-01-02 13:23:49|2023-01-02 13:58:48|    2.0|         5.12|33.8|          26|  Borough Park|Brooklyn|
|       1|2023-01-02 14:57:10|2023-01-02 15:22:27|    1.0|          0.0|30.5|          26|  Borough Park|Brooklyn|
|       1|2023-01-03 07:40:30|2023-01-03 08:28:42|    1.0|         16.3|54.5|          29|Brighton Beach|Brooklyn|
+--------+-------------------+-------------------+-------+-------------+----+---

**Trip Features**

In [12]:
trip_features = core_df.withColumn(
    "trip_duration",
    (unix_timestamp(col('dropoff_time')) - unix_timestamp(col('pickup_time')))/60
).withColumn(
    "pickup_hour",
    hour(col('pickup_time'))
).withColumn(
    "pickup_dow",
    dayofweek(col('pickup_time'))
)
trip_features.show(5)

+--------+-------------------+-------------------+-------+-------------+----+------------+--------------+--------+------------------+-----------+----------+
|VendorID|        pickup_time|       dropoff_time|P_count|trip_distance| amt|PULocationID|          zone| borough|     trip_duration|pickup_hour|pickup_dow|
+--------+-------------------+-------------------+-------+-------------+----+------------+--------------+--------+------------------+-----------+----------+
|       1|2023-01-01 17:32:54|2023-01-01 18:02:09|    1.0|          8.1|28.2|          26|  Borough Park|Brooklyn|             29.25|         17|         1|
|       1|2023-01-02 12:12:02|2023-01-02 12:45:55|    1.0|         11.4|38.5|          26|  Borough Park|Brooklyn| 33.88333333333333|         12|         2|
|       2|2023-01-02 13:23:49|2023-01-02 13:58:48|    2.0|         5.12|33.8|          26|  Borough Park|Brooklyn|34.983333333333334|         13|         2|
|       1|2023-01-02 14:57:10|2023-01-02 15:22:27|    1.0|

**Behavioral Flags**

In [13]:
behavioral_flags = (
    trip_features
    .withColumn(
        "is_night",
        when((col("pickup_hour") >= 21) | (col("pickup_hour") <= 5), 1).otherwise(0)
    )
    .withColumn(
        "is_weekend",
        when(col("pickup_dow").isin(1, 7), 1).otherwise(0)
    )
    .withColumn(
        "is_long_trip",
        when(col("trip_duration") > 30, 1).otherwise(0)
    )
    .withColumn(
        "is_high_fare",
        when(col("amt") > 20, 1).otherwise(0)
    )
)

**Daily Demand Normalization**

In [14]:
# Compute average no. of trips per zone per day.
daily_demand = (
    behavioral_flags
    .withColumn("trip_date", to_date("pickup_time"))
    .groupBy("PULocationID", "zone", "borough", "trip_date")
    .count()
    .withColumnRenamed("count", "daily_trip_count")
)

avg_daily_demand = (
    daily_demand
    .groupBy("PULocationID", "zone", "borough")
    .agg(
        avg("daily_trip_count").alias("avg_trips_per_day")
    )
)


**Zone-level aggregation**

In [15]:
zone_stats = (
    behavioral_flags
    .groupBy("PULocationID", "zone", "borough")
    .agg(
        avg("trip_duration").alias("avg_trip_duration"),
        avg("trip_distance").alias("avg_trip_distance"),
        avg("is_night").alias("pct_night_trips"),
        avg("is_weekend").alias("pct_weekend_trips"),
        avg("is_long_trip").alias("pct_long_trips"),
        avg("is_high_fare").alias("pct_high_fare_trips"),
        avg("amt").alias("avg_fare_amount")
    )
)

In [16]:
zone_features = (
    zone_stats
    .join(
        avg_daily_demand.select(
            "PULocationID",
            "avg_trips_per_day"
        ),
        on="PULocationID",
        how="inner"
    )
)
zone_features.show(5)

+------------+----------------+--------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+
|PULocationID|            zone| borough| avg_trip_duration| avg_trip_distance|    pct_night_trips|  pct_weekend_trips|     pct_long_trips|pct_high_fare_trips|   avg_fare_amount| avg_trips_per_day|
+------------+----------------+--------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+
|           7|         Astoria|  Queens|20.078063193439448| 2.651374819102756|0.26121562952243127|0.34081041968162085|0.08683068017366136|0.30824891461649784|16.339196816208407|           43.1875|
|          19|       Bellerose|  Queens| 17.27700421940929|5.2911392405063316|0.26582278481012656|0.12658227848101267|0.21518987341772153| 0.6329113924050633| 22.93354430379747|3.4347826086956523|
|          22|B

In [17]:
zoneFeatures = zone_features.select(
    'zone',
    'avg_trips_per_day',
    'avg_trip_duration',
    'pct_night_trips',
).orderBy(col('avg_trips_per_day').desc())

In [18]:
zoneFeatures.show(5)

+--------------------+-----------------+------------------+-------------------+
|                zone|avg_trips_per_day| avg_trip_duration|    pct_night_trips|
+--------------------+-----------------+------------------+-------------------+
|Upper East Side S...|4698.612903225807|11.732640037897344| 0.1236741110966174|
|         JFK Airport|4564.171428571429| 36.47185552898527| 0.2747987430045197|
|Upper East Side N...|4359.741935483871|12.459275359101968|0.07331744998224221|
|      Midtown Center|        4171.0625|14.497095064706683|0.15953668879332306|
|Penn Station/Madi...|         3383.125|15.540334380195883|0.18834287825605026|
+--------------------+-----------------+------------------+-------------------+
only showing top 5 rows



In [20]:
zone_features.count()

253

In [21]:
zone_features_pd = zone_features.toPandas()

In [24]:
zone_features_pd.to_parquet('../data/artifacts/zone_features.parquet', index=False)

In [28]:
df = pd.read_parquet('../data/artifacts/zone_features.parquet').dropna()

In [30]:
df

Unnamed: 0,PULocationID,zone,borough,avg_trip_duration,avg_trip_distance,pct_night_trips,pct_weekend_trips,pct_long_trips,pct_high_fare_trips,avg_fare_amount,avg_trips_per_day
0,7,Astoria,Queens,20.078063,2.651375,0.261216,0.340810,0.086831,0.308249,16.339197,43.187500
1,19,Bellerose,Queens,17.277004,5.291139,0.265823,0.126582,0.215190,0.632911,22.933544,3.434783
2,22,Bensonhurst West,Brooklyn,50.571396,5.461757,0.162162,0.256757,0.500000,0.716216,41.226081,2.551724
3,26,Borough Park,Brooklyn,31.615530,5.788977,0.079545,0.125000,0.500000,0.659091,34.042045,3.259259
4,29,Brighton Beach,Brooklyn,51.256140,12.441842,0.065789,0.171053,0.736842,0.855263,53.957237,3.040000
...,...,...,...,...,...,...,...,...,...,...,...
248,240,Van Cortlandt Park,Bronx,19.813095,7.486429,0.357143,0.571429,0.214286,0.714286,51.857143,1.400000
249,251,Westerleigh,Staten Island,14.033333,2.025714,0.000000,0.285714,0.000000,0.857143,22.914286,1.000000
250,259,Woodlawn/Wakefield,Bronx,36.434541,9.309130,0.173913,0.173913,0.579710,0.855072,40.128986,2.653846
251,261,World Trade Center,Manhattan,18.843172,4.317489,0.141923,0.314862,0.104050,0.484054,22.547083,404.580645
