# NYC Taxi Data Engineering with Azure Databricks

This notebook demonstrates the Extract, Transform, Load (ETL) process for New York City Taxi data using Azure Databricks. We'll process both Yellow and Green taxi trip data from the NYC Taxi & Limousine Commission (TLC) and prepare it for model training in Azure Machine Learning.

## Overview of the ETL Process

1. **Extract**: 
   - Download Yellow and Green taxi trip data from TLC website
   - Fetch taxi zone lookup data and latitude/longitude information

2. **Transform**:
   - Clean and standardize taxi trip data
   - Add derived features (trip duration, time-based features)
   - Join with location data for geographical context
   - Aggregate data for analysis

3. **Load**:
   - Prepare data in the required format
   - Create MLTable configuration
   - Upload to Azure ML datastore

Let's begin with setting up our environment!

# 1. Setup Environment

We'll start by importing the necessary Python packages and setting up our Databricks environment. This includes:
- Standard Python libraries for data manipulation
- PySpark functions for distributed data processing
- Creating input/output directories in Databricks File System (DBFS)

In [0]:
# Install holidays library for feature engineering in a later step
!pip install holidays

Collecting holidays
  Downloading holidays-0.77-py3-none-any.whl.metadata (46 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/46.4 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━[0m [32m41.0/46.4 kB[0m [31m1.2 MB/s[0m eta [36m0:00:01[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.4/46.4 kB[0m [31m946.0 kB/s[0m eta [36m0:00:00[0m
Downloading holidays-0.77-py3-none-any.whl (1.2 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.2 MB[0m [31m3.2 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.4/1.2 MB[0m [31m5.3 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━[0m [32m1.0/1.2 MB[0m [31m9.6 MB/s[0m e

In [0]:
# Import standard Python libraries for data manipulation and analysis
import os
import math
import datetime
import numpy as np
import pandas as pd
import holidays

# Import specific PySpark functions for data processing
from pyspark.sql.functions import (
    col,           # Column operations
    lit,           # Create literal values
    when,          # Conditional statements
    month,         # Date functions
    year,
    dayofmonth,
    date_format,
    unix_timestamp,
    udf            # User-defined functions
)
from pyspark.sql.types import IntegerType

In [0]:
# Create input and output directories in Databricks File System (DBFS)
# These directories will store our raw data and processed results
dbutils.fs.mkdirs("/FileStore/tables/Input/")
dbutils.fs.mkdirs("/FileStore/tables/Output/")

True

# 2. Extract Data (E)

In this section, we'll download the required data from various sources:

1. **NYC TLC Trip Data**: 
   - Yellow taxi trip records (2020-2024)
   - Green taxi trip records (2020-2024)
   
2. **Supporting Data**:
   - Taxi zone lookup table
   - Location latitude/longitude information

The data will be downloaded using `wget` command in shell through the `%sh` magic command in Databricks.

In [0]:
%sh
cd /dbfs/FileStore/tables/Input/

# Base URL for TLC trip data
base_url="https://d37ci6vzurychx.cloudfront.net/trip-data/"

# Define file types and months to download
file_types=("yellow" "green")
months=("01" "02" "03" "04" "05" "06" "07" "08" "09" "10" "11" "12")

# Set year range
start_year=2020
end_year=2024

# Initialize counters for download statistics
downloaded_files=0
skipped_missing=0
skipped_existing=0

# Loop through years, months, and file types to download data
for year in $(seq $start_year $end_year); do
  for month in "${months[@]}"; do
    for file_type in "${file_types[@]}"; do
      mkdir -p "$file_type"
      cd "$file_type"

      file_name="${file_type}_tripdata_${year}-${month}.parquet"
      url="${base_url}${file_name}"

      # Skip if file already exists
      if [ -f "$file_name" ]; then
        echo "Already exists, skipping: $file_name"
        ((skipped_existing++))
        cd ..
        continue
      fi

      # Download file if it exists on server
      wget --spider "$url"
      if [ $? -eq 0 ]; then
        wget -O "$file_name" "$url"
        echo "Downloaded: $file_name"
        ((downloaded_files++))
      else
        echo "Not found on server, skipped: $file_name"
        ((skipped_missing++))
      fi
      cd ..
    done
  done
done

# Print download statistics
echo "Downloaded files: $downloaded_files"
echo "Skipped (already existing): $skipped_existing"
echo "Skipped (missing on server): $skipped_missing"

Already exists, skipping: yellow_tripdata_2020-01.parquet
Already exists, skipping: green_tripdata_2020-01.parquet
Already exists, skipping: yellow_tripdata_2020-02.parquet
Already exists, skipping: green_tripdata_2020-02.parquet
Already exists, skipping: yellow_tripdata_2020-03.parquet
Already exists, skipping: green_tripdata_2020-03.parquet
Already exists, skipping: yellow_tripdata_2020-04.parquet
Already exists, skipping: green_tripdata_2020-04.parquet
Already exists, skipping: yellow_tripdata_2020-05.parquet
Already exists, skipping: green_tripdata_2020-05.parquet
Already exists, skipping: yellow_tripdata_2020-06.parquet
Already exists, skipping: green_tripdata_2020-06.parquet
Already exists, skipping: yellow_tripdata_2020-07.parquet
Already exists, skipping: green_tripdata_2020-07.parquet
Already exists, skipping: yellow_tripdata_2020-08.parquet
Already exists, skipping: green_tripdata_2020-08.parquet
Already exists, skipping: yellow_tripdata_2020-09.parquet
Already exists, skippi

### Download supporting data files

In [0]:
%sh

cd /dbfs/FileStore/tables/Input/

# Download taxi zone lookup table
wget 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv' -O taxi_zone_lookup.csv

# Download latitude/longitude information
wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1mNh-xQhaU790aH8q_ZLCVXp6OZTNa5Wz' -O taxi_zones_latlong.csv

--2025-07-27 09:57:16--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.125.148, 54.230.125.120, 54.230.125.110, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.125.148|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’

     0K .......... ..                                         100% 1.08M=0.01s

2025-07-27 09:57:17 (1.08 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]

--2025-07-27 09:57:17--  https://docs.google.com/uc?export=download&id=1mNh-xQhaU790aH8q_ZLCVXp6OZTNa5Wz
Resolving docs.google.com (docs.google.com)... 142.250.198.142, 2404:6800:4005:804::200e
Connecting to docs.google.com (docs.google.com)|142.250.198.142|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1mNh-xQhaU790

### List downloaded files

In [0]:
%sh
ls /dbfs/FileStore/tables/Input/yellow/

yellow_tripdata_2020-01.parquet
yellow_tripdata_2020-02.parquet
yellow_tripdata_2020-03.parquet
yellow_tripdata_2020-04.parquet
yellow_tripdata_2020-05.parquet
yellow_tripdata_2020-06.parquet
yellow_tripdata_2020-07.parquet
yellow_tripdata_2020-08.parquet
yellow_tripdata_2020-09.parquet
yellow_tripdata_2020-10.parquet
yellow_tripdata_2020-11.parquet
yellow_tripdata_2020-12.parquet
yellow_tripdata_2021-01.parquet
yellow_tripdata_2021-02.parquet
yellow_tripdata_2021-03.parquet
yellow_tripdata_2021-04.parquet
yellow_tripdata_2021-05.parquet
yellow_tripdata_2021-06.parquet
yellow_tripdata_2021-07.parquet
yellow_tripdata_2021-08.parquet
yellow_tripdata_2021-09.parquet
yellow_tripdata_2021-10.parquet
yellow_tripdata_2021-11.parquet
yellow_tripdata_2021-12.parquet
yellow_tripdata_2022-01.parquet
yellow_tripdata_2022-02.parquet
yellow_tripdata_2022-03.parquet
yellow_tripdata_2022-04.parquet
yellow_tripdata_2022-05.parquet
yellow_tripdata_2022-06.parquet
yellow_tripdata_2022-07.parquet
yellow_t

# 3. Transform Data (T)

## 3.1 Process Location Data

Before proceeding with Green Taxi data, let's process the location data from our supporting files:
1. Load and clean taxi zone lookup data
2. Process latitude/longitude information
3. Merge location datasets
4. Add location details to taxi trip data

This will enrich our taxi data with geographic context.

In [0]:
# Load taxi zone lookup data
tz_df_S = spark.read.options(header='True', inferSchema='True', delimiter=',').csv("/FileStore/tables/Input/taxi_zone_lookup.csv")
print("\nTaxi Zone Lookup Data Sample:")
display(tz_df_S.limit(5))

# Load latitude/longitude data
tz_latlong_S = spark.read.options(header='True', inferSchema='True', delimiter=',').csv("/FileStore/tables/Input/taxi_zones_latlong.csv")
print("\nLatitude/Longitude Data Sample:")
display(tz_latlong_S.limit(5))

# Clean and standardize location data
# Fix ID mapping and remove unnecessary columns
tz_latlong_S = tz_latlong_S.withColumn('LocationID', col('OBJECTID')) \
                           .drop('OBJECTID', 'Shape_Leng', 'Shape_Area')

# Standardize column names
tz_latlong_S = tz_latlong_S.withColumnRenamed('borough', 'Borough') \
                           .withColumnRenamed('zone', 'Zone') \
                           .withColumnRenamed('X', 'Latitude') \
                           .withColumnRenamed('Y', 'Longitude')

# Merge location datasets
# Add service zone information to the latitude/longitude data
tz_latlong_S = tz_latlong_S.join(
    tz_df_S.select('LocationID', 'service_zone'), 
    on='LocationID', 
    how='left'
)

print("\nProcessed Location Data Sample:")
display(tz_latlong_S.limit(5))


Taxi Zone Lookup Data Sample:


LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone



Latitude/Longitude Data Sample:


X,Y,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough
-74.1767857452143,40.6895156480431,1,0.116357453189,0.0007823067885,Newark Airport,1,EWR
-73.8261257703202,40.6257242377511,2,0.43346966679,0.00486634037837,Jamaica Bay,2,Queens
-73.8494789238597,40.8658875419774,3,0.0843411059012,0.000314414156821,Allerton/Pelham Gardens,3,Bronx
-73.9770229219339,40.7241521436714,4,0.0435665270921,0.000111871946192,Alphabet City,4,Manhattan
-74.1899296712375,40.550340123832,5,0.0921464898574,0.000497957489363,Arden Heights,5,Staten Island



Processed Location Data Sample:


LocationID,Latitude,Longitude,Zone,Borough,service_zone
1,-74.1767857452143,40.6895156480431,Newark Airport,EWR,EWR
2,-73.8261257703202,40.6257242377511,Jamaica Bay,Queens,Boro Zone
3,-73.8494789238597,40.8658875419774,Allerton/Pelham Gardens,Bronx,Boro Zone
4,-73.9770229219339,40.7241521436714,Alphabet City,Manhattan,Yellow Zone
5,-74.1899296712375,40.550340123832,Arden Heights,Staten Island,Boro Zone


## 3.2 Transform Yellow Taxi Data

In this section, we'll process the Yellow Taxi trip data through these steps:
1. Load Parquet files into a Spark DataFrame
2. Calculate trip duration
3. Clean and standardize vendor information
4. Add location details from lookup tables
5. Clean and filter invalid records
6. Rename columns for consistency

Let's start by loading the Yellow Taxi data:

In [0]:
# Define paths for Yellow Taxi data
directory_path = '/dbfs/FileStore/tables/Input/yellow/'
directory_path_2 = '/FileStore/tables/Input/yellow/'

# Get list of all Parquet files
parquet_files = [file for file in os.listdir(directory_path) if file.endswith(".parquet")]

# Initialize DataFrame
yt_df_S = None

# Read and combine all Parquet files
for file in parquet_files:
    file_path = os.path.join(directory_path_2, file)
    df = spark.read.parquet(file_path)
    if yt_df_S is None:
         yt_df_S = df
    else:
        # Use unionByName to combine DataFrames with matching column names
        yt_df_S = yt_df_S.unionByName(df)

# Display initial data
print("Initial Yellow Taxi Data Summary:")
print(f"Number of Records: {yt_df_S.count()}")
print(f"Number of Columns: {len(yt_df_S.columns)}")
display(yt_df_S.limit(5))

Initial Yellow Taxi Data Summary:
Number of Records: 174689444
Number of Columns: 19


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2020-01-01T00:28:15,2020-01-01T00:33:03,1.0,1.2,1.0,N,238,239,1,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5,
1,2020-01-01T00:35:39,2020-01-01T00:43:04,1.0,1.2,1.0,N,239,238,1,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5,
1,2020-01-01T00:47:41,2020-01-01T00:53:52,1.0,0.6,1.0,N,238,238,1,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5,
1,2020-01-01T00:55:23,2020-01-01T01:00:14,1.0,0.8,1.0,N,238,151,1,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0,
2,2020-01-01T00:01:58,2020-01-01T00:04:16,1.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0,


### Initial Transformations on Yellow Taxi Data

In [0]:
# Calculate Trip Duration (in seconds)
yt_df_S = yt_df_S.withColumn("TripDuration", 
    unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))

# Clean Vendor Information
yt_df_S = yt_df_S.withColumn("VendorID", 
    when(col("VendorID") == 1, "Creative Mobile Technologies")
    .when(col("VendorID") == 2, "VeriFone Inc.")
    .otherwise(col("VendorID")))

# Standardize column names
yt_df_S = yt_df_S.withColumnRenamed('VendorID', 'RecordProviderName') \
                 .withColumnRenamed('tpep_pickup_datetime', 'PickupDatetime') \
                 .withColumnRenamed('tpep_dropoff_datetime', 'DropoffDatetime') \
                 .withColumnRenamed('trip_distance', 'TripDistanceMiles') \
                 .withColumnRenamed('fare_amount', 'MeterFareAmount') \
                 .withColumnRenamed('mta_tax', 'tax')

# Display sample of transformed data
display(yt_df_S.limit(5))

RecordProviderName,PickupDatetime,DropoffDatetime,passenger_count,TripDistanceMiles,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,MeterFareAmount,extra,tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,TripDuration
Creative Mobile Technologies,2020-01-01T00:28:15,2020-01-01T00:33:03,1.0,1.2,1.0,N,238,239,1,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5,,288
Creative Mobile Technologies,2020-01-01T00:35:39,2020-01-01T00:43:04,1.0,1.2,1.0,N,239,238,1,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5,,445
Creative Mobile Technologies,2020-01-01T00:47:41,2020-01-01T00:53:52,1.0,0.6,1.0,N,238,238,1,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5,,371
Creative Mobile Technologies,2020-01-01T00:55:23,2020-01-01T01:00:14,1.0,0.8,1.0,N,238,151,1,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0,,291
VeriFone Inc.,2020-01-01T00:01:58,2020-01-01T00:04:16,1.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0,,138


### Add Location Details to Yellow Taxi Data

In [0]:
# Add pickup location details
yt_df_S = yt_df_S.join(
    tz_latlong_S.select('LocationID', 'Borough', 'Zone', 'service_zone', 'Latitude', 'Longitude')
                .withColumnRenamed('Borough', 'PickupBorough')
                .withColumnRenamed('Zone', 'PickupZone')
                .withColumnRenamed('service_zone', 'PickupServiceZone')
                .withColumnRenamed('Latitude', 'PULatitude')
                .withColumnRenamed('Longitude', 'PULongitude'),
    col('PULocationID') == col('LocationID'),
    'left'
).drop('LocationID')

# Add dropoff location details
yt_df_S = yt_df_S.join(
    tz_latlong_S.select('LocationID', 'Borough', 'Zone', 'service_zone', 'Latitude', 'Longitude')
                .withColumnRenamed('Borough', 'DropoffBorough')
                .withColumnRenamed('Zone', 'DropoffZone')
                .withColumnRenamed('service_zone', 'DropoffServiceZone')
                .withColumnRenamed('Latitude', 'DOLatitude')
                .withColumnRenamed('Longitude', 'DOLongitude'),
    col('DOLocationID') == col('LocationID'),
    'left'
).drop('LocationID')

display(yt_df_S.limit(5))

RecordProviderName,PickupDatetime,DropoffDatetime,passenger_count,TripDistanceMiles,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,MeterFareAmount,extra,tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,TripDuration,PickupBorough,PickupZone,PickupServiceZone,PULatitude,PULongitude,DropoffBorough,DropoffZone,DropoffServiceZone,DOLatitude,DOLongitude
Creative Mobile Technologies,2020-01-01T00:28:15,2020-01-01T00:33:03,1.0,1.2,1.0,N,238,239,1,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5,,288,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Manhattan,Upper West Side South,Yellow Zone,-73.9782733418486,40.7841073645538
Creative Mobile Technologies,2020-01-01T00:35:39,2020-01-01T00:43:04,1.0,1.2,1.0,N,239,238,1,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5,,445,Manhattan,Upper West Side South,Yellow Zone,-73.9782733418486,40.7841073645538,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824
Creative Mobile Technologies,2020-01-01T00:47:41,2020-01-01T00:53:52,1.0,0.6,1.0,N,238,238,1,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5,,371,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824
Creative Mobile Technologies,2020-01-01T00:55:23,2020-01-01T01:00:14,1.0,0.8,1.0,N,238,151,1,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0,,291,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Manhattan,Manhattan Valley,Yellow Zone,-73.9678083909732,40.797866268004
VeriFone Inc.,2020-01-01T00:01:58,2020-01-01T00:04:16,1.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0,,138,Queens,Queensbridge/Ravenswood,Boro Zone,-73.9402863033051,40.7617244307638,Queens,Queensbridge/Ravenswood,Boro Zone,-73.9402863033051,40.7617244307638


### Data Cleaning for Yellow Taxi Data

Now we'll clean the data by removing invalid records:
1. Remove trips with zero or negative distance
2. Remove trips with duration less than 10 seconds
3. Remove trips with invalid location IDs
4. Handle disputed payments

In [0]:
# Remove trips with zero or negative distance
yt_df_S = yt_df_S.filter(yt_df_S['TripDistanceMiles'] > 0)

# Remove trips with very short duration (less than 10 seconds)
yt_df_S = yt_df_S.filter(yt_df_S["TripDuration"] >= 10)

# Remove trips with invalid location IDs (valid range: 1-263)
yt_df_S = yt_df_S.filter(
    (col('PULocationID') >= 1) & (col('PULocationID') <= 263) &
    (col('DOLocationID') >= 1) & (col('DOLocationID') <= 263)
)

# Handle disputed payment records (remove negative duplicates)
yt_df_S = yt_df_S.filter(~((col('total_amount') < 0) & (col('payment_type') == 4)))

# Drop unnecessary columns
yt_df_S = yt_df_S.drop('store_and_fwd_flag', 'NoOfPassengers', 'FareType', 'PaymentType', 'improvement_surcharge', 'airport_fee')

# Add ride type identifier
yt_df_S = yt_df_S.withColumn("RideType", lit("Yellow Taxi"))

# Display processed data
print("Processed Yellow Taxi Data Summary:")
print(f"Number of Records: {yt_df_S.count()}")
print(f"Number of Columns: {len(yt_df_S.columns)}")
display(yt_df_S.limit(5))

Processed Yellow Taxi Data Summary:
Number of Records: 168518626
Number of Columns: 28


RecordProviderName,PickupDatetime,DropoffDatetime,passenger_count,TripDistanceMiles,RatecodeID,PULocationID,DOLocationID,payment_type,MeterFareAmount,extra,tax,tip_amount,tolls_amount,total_amount,congestion_surcharge,TripDuration,PickupBorough,PickupZone,PickupServiceZone,PULatitude,PULongitude,DropoffBorough,DropoffZone,DropoffServiceZone,DOLatitude,DOLongitude,RideType
Creative Mobile Technologies,2020-01-01T00:28:15,2020-01-01T00:33:03,1.0,1.2,1.0,238,239,1,6.0,3.0,0.5,1.47,0.0,11.27,2.5,288,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Manhattan,Upper West Side South,Yellow Zone,-73.9782733418486,40.7841073645538,Yellow Taxi
Creative Mobile Technologies,2020-01-01T00:35:39,2020-01-01T00:43:04,1.0,1.2,1.0,239,238,1,7.0,3.0,0.5,1.5,0.0,12.3,2.5,445,Manhattan,Upper West Side South,Yellow Zone,-73.9782733418486,40.7841073645538,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Yellow Taxi
Creative Mobile Technologies,2020-01-01T00:47:41,2020-01-01T00:53:52,1.0,0.6,1.0,238,238,1,6.0,3.0,0.5,1.0,0.0,10.8,2.5,371,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Yellow Taxi
Creative Mobile Technologies,2020-01-01T00:55:23,2020-01-01T01:00:14,1.0,0.8,1.0,238,151,1,5.5,0.5,0.5,1.36,0.0,8.16,0.0,291,Manhattan,Upper West Side North,Yellow Zone,-73.9728145174503,40.7917662467824,Manhattan,Manhattan Valley,Yellow Zone,-73.9678083909732,40.797866268004,Yellow Taxi
VeriFone Inc.,2020-01-01T00:09:44,2020-01-01T00:10:37,1.0,0.03,1.0,7,193,2,2.5,0.5,0.5,0.0,0.0,3.8,0.0,53,Queens,Astoria,Boro Zone,-73.9214905669465,40.761084729151,Queens,Queensbridge/Ravenswood,Boro Zone,-73.9402863033051,40.7617244307638,Yellow Taxi


## 3.3 Transform Green Taxi Data

Now we'll apply similar transformations to the Green Taxi data. We'll follow the same steps:
1. Load the data
2. Apply transformations
3. Clean invalid records

The process is similar to Yellow Taxi data, but we need to handle some Green Taxi specific fields.

In [0]:
# Define paths for Green Taxi data
directory_path = '/dbfs/FileStore/tables/Input/green/'
directory_path_2 = '/FileStore/tables/Input/green/'

# Get list of Parquet files
parquet_files = [file for file in os.listdir(directory_path) if file.endswith(".parquet")]

# Initialize DataFrame
gt_df_S = None

# Read and combine Parquet files
for file in parquet_files:
    file_path = os.path.join(directory_path_2, file)
    df = spark.read.parquet(file_path)
    if gt_df_S is None:
         gt_df_S = df
    else:
        gt_df_S = gt_df_S.unionByName(df)

# Display initial data
print("Initial Green Taxi Data Summary:")
print(f"Number of Records: {gt_df_S.count()}")
print(f"Number of Columns: {len(gt_df_S.columns)}")
display(gt_df_S.limit(5))

Initial Green Taxi Data Summary:
Number of Records: 5090611
Number of Columns: 20


VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
2,2019-12-18T15:52:30,2019-12-18T15:54:39,N,1.0,264,264,5.0,0.0,3.5,0.5,0.5,0.01,0.0,,0.3,4.81,1.0,1.0,0.0
2,2020-01-01T00:45:58,2020-01-01T00:56:39,N,5.0,66,65,2.0,1.28,20.0,0.0,0.0,4.06,0.0,,0.3,24.36,1.0,2.0,0.0
2,2020-01-01T00:41:38,2020-01-01T00:52:49,N,1.0,181,228,1.0,2.47,10.5,0.5,0.5,3.54,0.0,,0.3,15.34,1.0,1.0,0.0
1,2020-01-01T00:52:46,2020-01-01T01:14:21,N,1.0,129,263,2.0,6.3,21.0,3.25,0.5,0.0,0.0,,0.3,25.05,2.0,1.0,2.75
1,2020-01-01T00:19:57,2020-01-01T00:30:56,N,1.0,210,150,1.0,2.3,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,1.0,1.0,0.0


### Initial Transformations on Green Taxi Data

In [0]:
# Calculate Trip Duration (in seconds)
gt_df_S = gt_df_S.withColumn("TripDuration", 
    unix_timestamp("lpep_dropoff_datetime") - unix_timestamp("lpep_pickup_datetime"))

# Clean vendor information
gt_df_S = gt_df_S.withColumn("VendorID", 
    when(col("VendorID") == 1, "Creative Mobile Technologies")
    .when(col("VendorID") == 2, "VeriFone Inc.")
    .otherwise(col("VendorID")))

# Standardize column names
gt_df_S = gt_df_S.withColumnRenamed('VendorID', 'RecordProviderName') \
                 .withColumnRenamed('lpep_pickup_datetime', 'PickupDatetime') \
                 .withColumnRenamed('lpep_dropoff_datetime', 'DropoffDatetime') \
                 .withColumnRenamed('trip_distance', 'TripDistanceMiles') \
                 .withColumnRenamed('fare_amount', 'MeterFareAmount')\
                 .withColumnRenamed('mta_tax', 'tax')

display(gt_df_S.limit(5))

RecordProviderName,PickupDatetime,DropoffDatetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,TripDistanceMiles,MeterFareAmount,extra,tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,TripDuration
VeriFone Inc.,2019-12-18T15:52:30,2019-12-18T15:54:39,N,1.0,264,264,5.0,0.0,3.5,0.5,0.5,0.01,0.0,,0.3,4.81,1.0,1.0,0.0,129
VeriFone Inc.,2020-01-01T00:45:58,2020-01-01T00:56:39,N,5.0,66,65,2.0,1.28,20.0,0.0,0.0,4.06,0.0,,0.3,24.36,1.0,2.0,0.0,641
VeriFone Inc.,2020-01-01T00:41:38,2020-01-01T00:52:49,N,1.0,181,228,1.0,2.47,10.5,0.5,0.5,3.54,0.0,,0.3,15.34,1.0,1.0,0.0,671
Creative Mobile Technologies,2020-01-01T00:52:46,2020-01-01T01:14:21,N,1.0,129,263,2.0,6.3,21.0,3.25,0.5,0.0,0.0,,0.3,25.05,2.0,1.0,2.75,1295
Creative Mobile Technologies,2020-01-01T00:19:57,2020-01-01T00:30:56,N,1.0,210,150,1.0,2.3,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,1.0,1.0,0.0,659


### Add Location Details to Green Taxi Data

In [0]:
# Add pickup location details
gt_df_S = gt_df_S.join(
    tz_latlong_S.select('LocationID', 'Borough', 'Zone', 'service_zone', 'Latitude', 'Longitude')
                .withColumnRenamed('Borough', 'PickupBorough')
                .withColumnRenamed('Zone', 'PickupZone')
                .withColumnRenamed('service_zone', 'PickupServiceZone')
                .withColumnRenamed('Latitude', 'PULatitude')
                .withColumnRenamed('Longitude', 'PULongitude'),
    col('PULocationID') == col('LocationID'),
    'left'
).drop('LocationID')

# Add dropoff location details
gt_df_S = gt_df_S.join(
    tz_latlong_S.select('LocationID', 'Borough', 'Zone', 'service_zone', 'Latitude', 'Longitude')
                .withColumnRenamed('Borough', 'DropoffBorough')
                .withColumnRenamed('Zone', 'DropoffZone')
                .withColumnRenamed('service_zone', 'DropoffServiceZone')
                .withColumnRenamed('Latitude', 'DOLatitude')
                .withColumnRenamed('Longitude', 'DOLongitude'),
    col('DOLocationID') == col('LocationID'),
    'left'
).drop('LocationID')

display(gt_df_S.limit(5))

RecordProviderName,PickupDatetime,DropoffDatetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,TripDistanceMiles,MeterFareAmount,extra,tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,TripDuration,PickupBorough,PickupZone,PickupServiceZone,PULatitude,PULongitude,DropoffBorough,DropoffZone,DropoffServiceZone,DOLatitude,DOLongitude
VeriFone Inc.,2019-12-18T15:52:30,2019-12-18T15:54:39,N,1.0,264,264,5.0,0.0,3.5,0.5,0.5,0.01,0.0,,0.3,4.81,1.0,1.0,0.0,129,,,,,,,,,,
VeriFone Inc.,2020-01-01T00:45:58,2020-01-01T00:56:39,N,5.0,66,65,2.0,1.28,20.0,0.0,0.0,4.06,0.0,,0.3,24.36,1.0,2.0,0.0,641,Brooklyn,DUMBO/Vinegar Hill,Boro Zone,-73.9863827475546,40.701732471687,Brooklyn,Downtown Brooklyn/MetroTech,Boro Zone,-73.9855710635353,40.6953726093294
VeriFone Inc.,2020-01-01T00:41:38,2020-01-01T00:52:49,N,1.0,181,228,1.0,2.47,10.5,0.5,0.5,3.54,0.0,,0.3,15.34,1.0,1.0,0.0,671,Brooklyn,Park Slope,Boro Zone,-73.979044896845,40.672019140546,Brooklyn,Sunset Park West,Boro Zone,-74.009960471569,40.6536182069019
Creative Mobile Technologies,2020-01-01T00:52:46,2020-01-01T01:14:21,N,1.0,129,263,2.0,6.3,21.0,3.25,0.5,0.0,0.0,,0.3,25.05,2.0,1.0,2.75,1295,Queens,Jackson Heights,Boro Zone,-73.8874063794919,40.7590574739636,Manhattan,Yorkville West,Yellow Zone,-73.9512079916544,40.7784958687768
Creative Mobile Technologies,2020-01-01T00:19:57,2020-01-01T00:30:56,N,1.0,210,150,1.0,2.3,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,1.0,1.0,0.0,659,Brooklyn,Sheepshead Bay,Boro Zone,-73.9443360454416,40.5937616085004,Brooklyn,Manhattan Beach,Boro Zone,-73.9428842620734,40.5802623241253


### Data Cleaning for Green Taxi Data

In [0]:
# Remove trips with zero or negative distance
gt_df_S = gt_df_S.filter(gt_df_S['TripDistanceMiles'] > 0)

# Remove trips with very short duration (less than 10 seconds)
gt_df_S = gt_df_S.filter(gt_df_S["TripDuration"] >= 10)

# Remove invalid locations
gt_df_S = gt_df_S.filter(
    (col('PULocationID') >= 1) & (col('PULocationID') <= 263) &
    (col('DOLocationID') >= 1) & (col('DOLocationID') <= 263)
)

# Handle disputed payments
gt_df_S = gt_df_S.filter(~((col('total_amount') < 0) & (col('payment_type') == 4)))

# Drop unnecessary columns
gt_df_S = gt_df_S.drop('store_and_fwd_flag', 'ehail_fee', 'NoOfPassengers', 'FareType', 'PaymentType', 'improvement_surcharge', 'trip_type')

# Add ride type identifier
gt_df_S = gt_df_S.withColumn("RideType", lit("Green Taxi"))

# Display processed data
print("Processed Green Taxi Data Summary:")
print(f"Number of Records: {gt_df_S.count()}")
print(f"Number of Columns: {len(gt_df_S.columns)}")
display(gt_df_S.limit(10))

Processed Green Taxi Data Summary:
Number of Records: 4812410
Number of Columns: 28


RecordProviderName,PickupDatetime,DropoffDatetime,RatecodeID,PULocationID,DOLocationID,passenger_count,TripDistanceMiles,MeterFareAmount,extra,tax,tip_amount,tolls_amount,total_amount,payment_type,congestion_surcharge,TripDuration,PickupBorough,PickupZone,PickupServiceZone,PULatitude,PULongitude,DropoffBorough,DropoffZone,DropoffServiceZone,DOLatitude,DOLongitude,RideType
VeriFone Inc.,2020-01-01T00:45:58,2020-01-01T00:56:39,5.0,66,65,2.0,1.28,20.0,0.0,0.0,4.06,0.0,24.36,1.0,0.0,641,Brooklyn,DUMBO/Vinegar Hill,Boro Zone,-73.9863827475546,40.701732471687,Brooklyn,Downtown Brooklyn/MetroTech,Boro Zone,-73.9855710635353,40.6953726093294,Green Taxi
VeriFone Inc.,2020-01-01T00:41:38,2020-01-01T00:52:49,1.0,181,228,1.0,2.47,10.5,0.5,0.5,3.54,0.0,15.34,1.0,0.0,671,Brooklyn,Park Slope,Boro Zone,-73.979044896845,40.672019140546,Brooklyn,Sunset Park West,Boro Zone,-74.009960471569,40.6536182069019,Green Taxi
Creative Mobile Technologies,2020-01-01T00:52:46,2020-01-01T01:14:21,1.0,129,263,2.0,6.3,21.0,3.25,0.5,0.0,0.0,25.05,2.0,2.75,1295,Queens,Jackson Heights,Boro Zone,-73.8874063794919,40.7590574739636,Manhattan,Yorkville West,Yellow Zone,-73.9512079916544,40.7784958687768,Green Taxi
Creative Mobile Technologies,2020-01-01T00:19:57,2020-01-01T00:30:56,1.0,210,150,1.0,2.3,10.0,0.5,0.5,0.0,0.0,11.3,1.0,0.0,659,Brooklyn,Sheepshead Bay,Boro Zone,-73.9443360454416,40.5937616085004,Brooklyn,Manhattan Beach,Boro Zone,-73.9428842620734,40.5802623241253,Green Taxi
Creative Mobile Technologies,2020-01-01T00:52:33,2020-01-01T01:09:54,1.0,35,39,1.0,3.0,13.5,0.5,0.5,0.0,0.0,14.8,1.0,0.0,1041,Brooklyn,Brownsville,Boro Zone,-73.9124814882782,40.6638098226151,Brooklyn,Canarsie,Boro Zone,-73.899773871952,40.6388789299243,Green Taxi
VeriFone Inc.,2020-01-01T00:10:18,2020-01-01T00:22:16,1.0,25,61,1.0,2.77,11.0,0.5,0.5,0.0,0.0,12.3,2.0,0.0,718,Brooklyn,Boerum Hill,Boro Zone,-73.9864589876713,40.685614589464,Brooklyn,Crown Heights North,Boro Zone,-73.9412820292214,40.6738428461316,Green Taxi
VeriFone Inc.,2020-01-01T01:03:14,2020-01-01T01:29:45,1.0,225,89,1.0,4.98,20.5,0.5,0.5,0.0,0.0,21.8,2.0,0.0,1591,Brooklyn,Stuyvesant Heights,Boro Zone,-73.9314448522357,40.6887850672001,Brooklyn,Flatbush/Ditmas Park,Boro Zone,-73.9626937337469,40.64098273381,Green Taxi
VeriFone Inc.,2020-01-01T00:04:11,2020-01-01T00:09:48,1.0,129,129,1.0,0.71,5.5,0.5,0.5,0.0,0.0,6.8,2.0,0.0,337,Queens,Jackson Heights,Boro Zone,-73.8874063794919,40.7590574739636,Queens,Jackson Heights,Boro Zone,-73.8874063794919,40.7590574739636,Green Taxi
VeriFone Inc.,2020-01-01T00:25:52,2020-01-01T00:32:16,1.0,129,83,1.0,0.8,5.5,0.5,0.5,0.0,0.0,6.8,2.0,0.0,384,Queens,Jackson Heights,Boro Zone,-73.8874063794919,40.7590574739636,Queens,Elmhurst/Maspeth,Boro Zone,-73.889221829475,40.7401456031773,Green Taxi
VeriFone Inc.,2020-01-01T00:47:32,2020-01-01T00:59:25,1.0,82,173,1.0,1.52,9.5,0.5,0.5,0.0,0.0,10.8,2.0,0.0,713,Queens,Elmhurst,Boro Zone,-73.8723440095934,40.7384639203673,Queens,North Corona,Boro Zone,-73.8630837007824,40.7517792439212,Green Taxi


## 3.4 Final Data Transformations

Now we'll:
1. Merge Yellow and Green taxi data
2. Add time-based features
3. Create daily aggregations by Date, Pickup borough, & Ride type
4. Add additional features (seasons, holidays, workdays)

In [0]:
# Merge Yellow and Green taxi data
merged_df = yt_df_S.unionByName(gt_df_S)

# Convert pickup datetime to timestamp and extract date
merged_df = merged_df.withColumn('PickupDatetime', merged_df['PickupDatetime'].cast('timestamp'))
merged_df = merged_df.withColumn('Date', merged_df['PickupDatetime'].cast('date'))

# Create daily aggregations
daily_agg_df = merged_df.groupBy('Date', 'PickupBorough', 'RideType') \
                       .count() \
                       .withColumnRenamed('count', 'RideCount')

# Filter date range
daily_agg_df = daily_agg_df.filter((col('Date') >= '2020-01-01') & (col('Date') < '2025-01-01'))

# Add time-based features i.e. breakdown the date into year, month, day
daily_agg_df = daily_agg_df.withColumn('Year', year(col('Date'))) \
                          .withColumn('Month', month(col('Date'))) \
                          .withColumn('Day', dayofmonth(col('Date')))

# Add season
daily_agg_df = daily_agg_df.withColumn('Season', 
    when(col('Month').between(3, 5), 'Spring')
    .when(col('Month').between(6, 8), 'Summer')
    .when(col('Month').between(9, 11), 'Autumn')
    .otherwise('Winter'))

### Add a binary column for public holidays in New York

In [0]:
# Create the holidays object
nyc_holidays = holidays.country_holidays('US', subdiv='NY')

# Broadcast the holidays to Spark workers
broadcast_holidays = spark.sparkContext.broadcast(nyc_holidays)

# Define a User-defined function (UDF) to check if a date is a holiday
def is_public_holiday(date):
    return 1 if date in broadcast_holidays.value else 0
holiday_udf = udf(is_public_holiday, IntegerType())

# Apply the UDF to your DataFrame
daily_agg_df = daily_agg_df.withColumn("PublicHoliday", holiday_udf(daily_agg_df["Date"]))

In [0]:
# Add a coulum for day of the week
daily_agg_df = daily_agg_df.withColumn('DayOfWeek', date_format(col('Date'), 'E'))

# Add workday indicator
daily_agg_df = daily_agg_df.withColumn('Workday', 
    when(col('DayOfWeek').isin(['Mon', 'Tue', 'Wed', 'Thu', 'Fri']), 1)
    .otherwise(0))

# Drop the date column
daily_agg_df = daily_agg_df.drop('Date')

# Display final dataset structure
print("\nFinal Dataset Summary:")
print(f"Number of records: {daily_agg_df.count()}")
display(daily_agg_df.limit(10))


Final Dataset Summary:
Number of records: 18519


PickupBorough,RideType,RideCount,Year,Month,Day,Season,PublicHoliday,DayOfWeek,Workday
EWR,Yellow Taxi,8,2020,1,2,Winter,0,Thu,1
EWR,Yellow Taxi,4,2020,1,22,Winter,0,Wed,1
Brooklyn,Yellow Taxi,1612,2020,1,2,Winter,0,Thu,1
Brooklyn,Yellow Taxi,2546,2020,1,7,Winter,0,Tue,1
Queens,Yellow Taxi,11357,2020,1,19,Winter,0,Sun,0
EWR,Yellow Taxi,6,2020,1,19,Winter,0,Sun,0
Brooklyn,Yellow Taxi,1808,2020,1,21,Winter,0,Tue,1
Manhattan,Yellow Taxi,197971,2020,1,15,Winter,0,Wed,1
Queens,Yellow Taxi,12787,2020,1,16,Winter,0,Thu,1
Manhattan,Yellow Taxi,222564,2020,1,30,Winter,0,Thu,1


# 4. Load Data (L)

In this final section, we will:
1. Save the processed data as CSV
2. Create MLTable configuration for Azure ML
3. Mount Azure storage
4. Copy the data to Azure ML datastore

### Save the processed data as CSV

In [0]:
# Save the processed DataFrame as a CSV file
daily_agg_df.coalesce(1).write.format('csv').mode('overwrite').option('header', 'true').save("/FileStore/tables/Output/")

In [0]:
%sh

# Verify the output folder (there shoud be a .csv file with name like part-00000*)
cd /dbfs/FileStore/tables/Output/
ls *.csv

part-00000-tid-234508123809289828-d269518f-16ff-40e5-80ec-8d0b16bcd192-888-1-c000.csv


In [0]:
%sh

cd /dbfs/FileStore/tables/Output/
# Rename the final CSV file
mv ./part-00000* ./nyc_daily_aggregated.csv
ls *.csv

nyc_daily_aggregated.csv


### Create `MLTable` configuration file for Azure ML

In [0]:
%sh

# Create MLTable configuration for Azure ML
cd /dbfs/FileStore/tables/Output/
echo "# MLTable definition file
paths:
  - file: ./nyc_daily_aggregated.csv
transformations:
  - read_delimited:
      delimiter: ','
      encoding: 'ascii'" > MLTable

### Mount Azure storage

In [0]:
# Replace these with your actual values
container_name = "azureml-blobstore-95bbd763-1d6c-4c3c-9f2b-c3d9b1457146"
storage_account_name = "atsazureml0329310979"
storage_account_key = "/QLBKgKRC3lkqnuGDl1PuZdzUM-EXAMPLE-lrDgkVndfozKmplxwY/Ulh/YqPCGgIou+AStC9zx0w=="

# Mount the container to DBFS
dbutils.fs.mount(
    source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
    mount_point = "/mnt/AzureMLDatastore",
    extra_configs = {
        f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
    }
)

True

### Copy the data to Azure ML Datastore

In [0]:
# Copy the CSV file
dbutils.fs.cp("/FileStore/tables/Output/nyc_daily_aggregated.csv", "/mnt/AzureMLDatastore/nyctaxi/nyc_daily_aggregated.csv")

# Copy the MLTable file
dbutils.fs.cp("/FileStore/tables/Output/MLTable", "/mnt/AzureMLDatastore/nyctaxi/MLTable")

True

### Verify the files in the Azure Storage account
![Verify Azure ML Datastore](https://atsailabstorage.blob.core.windows.net/misc/VerifyAzureMLDatastore.png)

# Conclusion

We have successfully completed the ETL process for NYC Taxi data:

1. **Extracted** data from NYC Taxi and Limousine Commission (TLC) server:
   - Yellow Taxi trip records
   - Green Taxi trip records
   - Taxi zone lookup tables
   - Location coordinates

2. **Transformed** the data by:
   - Cleaning invalid records
   - Standardizing column names
   - Merging Yellow and Green taxi data
   - Creating daily aggregations by date, borough, & ride type
   - Adding derived features

3. **Loaded** the processed data:
   - Saved as CSV file
   - Created MLTable configuration
   - Copied the files to Azure ML datastore

The final dataset is now ready for model training in Azure Machine Learning studio.