In [0]:
# Evaluate the notebook-provided SparkSession so its summary is rendered —
# a quick sanity check that a session is available before touching storage.
spark

# Data Access (ADLS Gen2 OAuth configuration)

In [0]:
# OAuth client-credentials settings for the `nyctaxidataproject` ADLS Gen2
# account, applied to the current Spark session in insertion order.
# NOTE(review): `app_id` and `Secret` are not defined anywhere in this notebook,
# so this cell only works if they already exist in the session (hidden state).
# Presumably they should come from a secret scope, e.g. dbutils.secrets.get(...) — TODO confirm.
# NOTE(review): the tenant segment of the token endpoint is the placeholder "XXX".
_adls_oauth_conf = {
    "fs.azure.account.auth.type.nyctaxidataproject.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.nyctaxidataproject.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.nyctaxidataproject.dfs.core.windows.net": app_id,
    "fs.azure.account.oauth2.client.secret.nyctaxidataproject.dfs.core.windows.net": Secret,
    "fs.azure.account.oauth2.client.endpoint.nyctaxidataproject.dfs.core.windows.net": "https://login.microsoftonline.com/XXX/oauth2/token",
}
for _conf_key, _conf_value in _adls_oauth_conf.items():
    spark.conf.set(_conf_key, _conf_value)

# Test the Storage Connection

In [0]:
# List the top-level folders of the bronze container to confirm the storage
# credentials configured above actually grant access.
dbutils.fs.ls("abfss://bronze@nyctaxidataproject.dfs.core.windows.net")

[FileInfo(path='abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_data_2021/', name='trip_data_2021/', size=0, modificationTime=1745720540000),
 FileInfo(path='abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_data_2022/', name='trip_data_2022/', size=0, modificationTime=1745720589000),
 FileInfo(path='abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_data_2023/', name='trip_data_2023/', size=0, modificationTime=1745720591000),
 FileInfo(path='abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_data_2024/', name='trip_data_2024/', size=0, modificationTime=1745720592000),
 FileInfo(path='abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_type/', name='trip_type/', size=0, modificationTime=1745676197000),
 FileInfo(path='abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_zone/', name='trip_zone/', size=0, modificationTime=1745676203000)]

# Import libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Reading Data

In [0]:
# Trip-type lookup table: CSV with a header row, column types inferred by Spark.
df_trip_type = spark.read.csv(
    "abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_type/",
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the trip-type lookup table with the notebook's rich table display.
df_trip_type.display()

trip_type,description
1,Street-hail
2,Dispatch


# Trip-Zone

In [0]:
# Taxi-zone lookup table: CSV with a header row, column types inferred by Spark.
df_trip_zone = spark.read.csv(
    "abfss://bronze@nyctaxidataproject.dfs.core.windows.net/trip_zone/",
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the taxi-zone lookup table with the notebook's rich table display.
df_trip_zone.display()

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


# Trip-Data

In [0]:
# Explicit schema for the trip records as (column name, Spark type) pairs;
# every field is nullable.
# The same schema can also be expressed as a DDL string, e.g.
#   "VendorID BIGINT, lpep_pickup_datetime TIMESTAMP, ..., congestion_surcharge DOUBLE"
# which DataFrameReader.schema() accepts as well.
# NOTE(review): `myschema` is not passed to the trip-data parquet read in this
# notebook (that read uses the files' embedded schema) — confirm whether it should be.
myschema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("VendorID", LongType()),
        ("lpep_pickup_datetime", TimestampType()),
        ("lpep_dropoff_datetime", TimestampType()),
        ("store_and_fwd_flag", StringType()),
        ("RatecodeID", LongType()),
        ("PULocationID", LongType()),
        ("DOLocationID", LongType()),
        ("passenger_count", LongType()),
        ("trip_distance", DoubleType()),
        ("fare_amount", DoubleType()),
        ("extra", DoubleType()),
        ("mta_tax", DoubleType()),
        ("tip_amount", DoubleType()),
        ("tolls_amount", DoubleType()),
        ("ehail_fee", DoubleType()),
        ("improvement_surcharge", DoubleType()),
        ("total_amount", DoubleType()),
        ("payment_type", LongType()),
        ("trip_type", LongType()),
        ("congestion_surcharge", DoubleType()),
    ]
])

In [0]:
# Root of the bronze container (value kept unchanged for any other use).
bronze_path = "abfss://bronze@nyctaxidataproject.dfs.core.windows.net/"

# Read the trip data for every year folder (trip_data_2021 ... trip_data_2024)
# in one pass via the `trip_data_*` glob. The trailing slash of `bronze_path`
# is stripped before joining so the generated path has no "//" segment.
# NOTE(review): the parquet files' own schema is used here, not `myschema`;
# if column types differ between years the multi-file read may fail — TODO confirm.
df_trip = spark.read.parquet(f"{bronze_path.rstrip('/')}/trip_data_*/trip-data/")

          

In [0]:
# Total number of trip records loaded from all year folders.
# count() is a Spark action, so running this cell scans the data.
row_count = df_trip.count()
print(f"Total number of rows: {row_count}")

Total number of rows: 3356435


# Data Transformation

### Taxi-Trip-Type

In [0]:
# Give the lookup's text column a table-specific name
# (presumably to avoid ambiguity in downstream joins).
df_trip_type = (
    df_trip_type
    .withColumnRenamed('description', 'trip_description')
)


In [0]:
# Persist the trip-type lookup to the silver layer as parquet.
# 'overwrite' (not 'append'): each run rebuilds this table from bronze, so
# appending would duplicate every row whenever the notebook is re-run.
df_trip_type.write.mode('overwrite').parquet(
    'abfss://silver@nyctaxidataproject.dfs.core.windows.net/trip_type'
)

### Trip-Zone

In [0]:
# Split compound zone names such as "Allerton/Pelham Gardens" into two parts.
# Zone1 is the text before the first '/', or the whole name when there is no '/'.
# Zone2 is the text after it, and null for single-part names. The `when(size > 1)`
# guard gives that null explicitly instead of indexing past the end of the array,
# which raises INVALID_ARRAY_INDEX when spark.sql.ansi.enabled is true.
df_trip_zone = (
    df_trip_zone
    .withColumn('Zone1', split(col('Zone'), '/').getItem(0))
    .withColumn(
        'Zone2',
        when(size(split(col('Zone'), '/')) > 1, split(col('Zone'), '/').getItem(1)),
    )
)

df_trip_zone.display()

LocationID,Borough,Zone,service_zone,Zone1,Zone2
1,EWR,Newark Airport,EWR,Newark Airport,
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay,
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton,Pelham Gardens
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City,
5,Staten Island,Arden Heights,Boro Zone,Arden Heights,
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar,Fort Wadsworth
7,Queens,Astoria,Boro Zone,Astoria,
8,Queens,Astoria Park,Boro Zone,Astoria Park,
9,Queens,Auburndale,Boro Zone,Auburndale,
10,Queens,Baisley Park,Boro Zone,Baisley Park,


In [0]:
# Persist the enriched taxi-zone lookup to the silver layer as parquet.
# 'overwrite' (not 'append'): each run rebuilds this table from bronze, so
# appending would duplicate every row whenever the notebook is re-run.
df_trip_zone.write.mode('overwrite').parquet(
    'abfss://silver@nyctaxidataproject.dfs.core.windows.net/trip_zone'
)

### Trip-Data

In [0]:
# Derive calendar columns (date, year, month) from the pickup timestamp.
df_trip = (
    df_trip
    .withColumn('trip_date', to_date('lpep_pickup_datetime'))
    .withColumn('trip_year', year('lpep_pickup_datetime'))
    .withColumn('trip_month', month('lpep_pickup_datetime'))
)

In [0]:
# Total fare revenue per pickup year. `sum` here is pyspark.sql.functions.sum:
# the wildcard import shadows Python's builtin sum.
(
    df_trip
    .groupBy('trip_year')
    .agg(sum('total_amount').alias('Revenue'))
    .display()
)

trip_year,Revenue
2023,18776563.750000037
2022,16238334.779974531
2009,360.91
2010,115.21
2008,306.45
2021,25577742.590075985
2024,16019489.130000852
2025,272.52
2020,9.3


In [0]:
# Project down to vendor, location-ID and fare columns for the silver table.
# NOTE(review): this drops trip_date/trip_year/trip_month derived earlier and
# rebinds `df_trip`, so re-running the date-derivation cell afterwards fails
# (lpep_pickup_datetime is no longer present). Consider a new variable name.
df_trip = df_trip.select(
    [
        'VendorID',
        'PULocationID',
        'DOLocationID',
        'fare_amount',
        'total_amount',
    ]
)

df_trip.display()

VendorID,PULocationID,DOLocationID,fare_amount,total_amount
2,37,37,10.0,10.3
2,92,82,20.0,20.3
2,41,167,13.0,13.3
2,134,135,7.0,8.3
2,119,247,5.5,6.8
2,169,235,5.5,6.8
2,75,217,28.0,32.05
2,66,90,25.0,34.86
2,82,56,9.0,10.3
2,66,231,10.5,17.46


In [0]:
# Persist the projected trip data to the silver layer as parquet.
# 'overwrite' (not 'append'): each run recomputes this data from all bronze
# files, so appending would duplicate every trip whenever the notebook is re-run.
# NOTE(review): the revenue-by-year output shows pickup years outside 2021-2024
# (e.g. 2008-2010, 2020, 2025). No year filter is applied before this write,
# despite the "(2021-2024)" path — confirm whether those rows belong here.
df_trip.write.mode('overwrite').parquet(
    'abfss://silver@nyctaxidataproject.dfs.core.windows.net/trip_data(2021-2024)'
)