In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Data Reading

### trip_type

In [0]:
df_trip_type = spark.read.format('csv')\
                        .option('inferSchema',True)\
                        .option('header',True)\
                        .load('abfss://bronze@nyctaxidatalake00.dfs.core.windows.net/trip_type/')
df_trip_type.display()

trip_type,description
1,Street-hail
2,Dispatch


### trip_zone

In [0]:
df_trip_zone = spark.read.format('csv')\
                        .option('inferSchema',True)\
                        .option('header',True)\
                        .load('abfss://bronze@nyctaxidatalake00.dfs.core.windows.net/trip_zone')
df_trip_zone.limit(10).display()

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


Defining Schema

In [0]:
my_schema = '''
            VendorID BIGINT,
            lpep_pickup_datetime TIMESTAMP,
            lpep_dropoff_datetime TIMESTAMP,
            store_and_fwd_flag STRING,
            RatecodeID BIGINT,
            PULocationID INT,
            DOLocationID INT,
            passenger_count BIGINT,
            trip_distance DOUBLE,
            fare_amount DOUBLE,
            extra DOUBLE,
            mta_tax DOUBLE,
            tip_amount DOUBLE,
            tolls_amount DOUBLE,
            ehail_fee DOUBLE,
            improvement_surcharge DOUBLE,
            total_amount DOUBLE,
            payment_type BIGINT,
            trip_type BIGINT,
            congestion_surcharge DOUBLE
'''

RecursiveFileLookup

In [0]:
df_trip = spark.read.format('parquet')\
                    .schema(my_schema)\
                    .option('header',True)\
                    .option('recursiveFileLookup',True)\
                    .load('abfss://bronze@nyctaxidatalake00.dfs.core.windows.net/trips2024/')
df_trip.limit(10).display()

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
2,2024-05-01T00:07:08.000Z,2024-05-01T00:15:03.000Z,N,1,65,49,1,1.24,9.3,1.0,0.5,2.0,0.0,,1.0,13.8,1,1,0.0
2,2024-05-01T00:30:48.000Z,2024-05-01T00:35:49.000Z,N,1,7,179,1,0.94,7.2,1.0,0.5,1.94,0.0,,1.0,11.64,1,1,0.0
2,2024-05-01T00:34:13.000Z,2024-05-01T00:38:07.000Z,N,1,74,42,1,0.84,6.5,1.0,0.5,0.0,0.0,,1.0,9.0,2,1,0.0
2,2024-05-01T00:58:01.000Z,2024-05-01T01:14:41.000Z,N,1,75,235,1,6.07,25.4,1.0,0.5,5.0,0.0,,1.0,32.9,1,1,0.0
2,2024-05-01T00:11:45.000Z,2024-05-01T00:20:38.000Z,N,1,256,49,2,2.06,12.1,1.0,0.5,2.92,0.0,,1.0,17.52,1,1,0.0
1,2024-05-01T00:29:06.000Z,2024-05-01T00:36:03.000Z,N,1,210,210,1,1.3,9.3,1.0,1.5,1.0,0.0,,1.0,12.8,1,1,0.0
2,2024-05-01T00:06:23.000Z,2024-05-01T00:18:18.000Z,N,1,66,4,5,4.35,19.8,1.0,0.5,3.0,0.0,,1.0,28.05,1,1,2.75
2,2024-05-01T00:06:36.000Z,2024-05-01T00:18:03.000Z,N,1,95,95,1,2.02,13.5,1.0,0.5,0.0,0.0,,1.0,16.0,2,1,0.0
2,2024-05-01T00:58:01.000Z,2024-05-01T01:07:35.000Z,N,1,24,143,1,2.35,12.8,1.0,0.5,3.0,0.0,,1.0,21.05,1,1,2.75
2,2024-05-01T00:54:12.000Z,2024-05-01T00:58:51.000Z,N,5,210,210,1,1.3,8.0,0.0,0.0,0.0,0.0,,1.0,9.0,1,2,0.0


# Data Transformation

### Trip_Type

In [0]:
df_trip_type = df_trip_type.withColumnRenamed('description','trip_description')
df_trip_type.display()

trip_type,trip_description
1,Street-hail
2,Dispatch


In [0]:
df_trip_type.write.format('parquet')\
                        .mode('append')\
                        .option('path','abfss://silver@nyctaxidatalake00.dfs.core.windows.net/trip_type')\
                        .save()

### Trip_Zone

In [0]:
df_trip_zone = df_trip_zone.withColumn('zone1',split(col('Zone'),'/')[0])

In [0]:
df_trip_zone.limit(10).display()

LocationID,Borough,Zone,service_zone,zone1
1,EWR,Newark Airport,EWR,Newark Airport
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City
5,Staten Island,Arden Heights,Boro Zone,Arden Heights
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar
7,Queens,Astoria,Boro Zone,Astoria
8,Queens,Astoria Park,Boro Zone,Astoria Park
9,Queens,Auburndale,Boro Zone,Auburndale
10,Queens,Baisley Park,Boro Zone,Baisley Park


In [0]:
df_trip_zone.write.format('parquet')\
                        .mode('append')\
                        .option('path','abfss://silver@nyctaxidatalake00.dfs.core.windows.net/trip_zone')\
                        .save()

### Trip Data

In [0]:
df_trip = df_trip.withColumn('Trip_Date',to_date(col('lpep_pickup_datetime')))\
                    .withColumn('Trip_Year',year(col('Trip_Date')))\
                    .withColumn('Trip_Month',month(col('Trip_Date')))\
                    .withColumn('Trip_Day',dayofmonth(col('Trip_Date')))
df_trip.limit(10).display()

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,Trip_Date,Trip_Year,Trip_Month,Trip_Day
2,2024-05-01T00:07:08.000Z,2024-05-01T00:15:03.000Z,N,1,65,49,1,1.24,9.3,1.0,0.5,2.0,0.0,,1.0,13.8,1,1,0.0,2024-05-01,2024,5,1
2,2024-05-01T00:30:48.000Z,2024-05-01T00:35:49.000Z,N,1,7,179,1,0.94,7.2,1.0,0.5,1.94,0.0,,1.0,11.64,1,1,0.0,2024-05-01,2024,5,1
2,2024-05-01T00:34:13.000Z,2024-05-01T00:38:07.000Z,N,1,74,42,1,0.84,6.5,1.0,0.5,0.0,0.0,,1.0,9.0,2,1,0.0,2024-05-01,2024,5,1
2,2024-05-01T00:58:01.000Z,2024-05-01T01:14:41.000Z,N,1,75,235,1,6.07,25.4,1.0,0.5,5.0,0.0,,1.0,32.9,1,1,0.0,2024-05-01,2024,5,1
2,2024-05-01T00:11:45.000Z,2024-05-01T00:20:38.000Z,N,1,256,49,2,2.06,12.1,1.0,0.5,2.92,0.0,,1.0,17.52,1,1,0.0,2024-05-01,2024,5,1
1,2024-05-01T00:29:06.000Z,2024-05-01T00:36:03.000Z,N,1,210,210,1,1.3,9.3,1.0,1.5,1.0,0.0,,1.0,12.8,1,1,0.0,2024-05-01,2024,5,1
2,2024-05-01T00:06:23.000Z,2024-05-01T00:18:18.000Z,N,1,66,4,5,4.35,19.8,1.0,0.5,3.0,0.0,,1.0,28.05,1,1,2.75,2024-05-01,2024,5,1
2,2024-05-01T00:06:36.000Z,2024-05-01T00:18:03.000Z,N,1,95,95,1,2.02,13.5,1.0,0.5,0.0,0.0,,1.0,16.0,2,1,0.0,2024-05-01,2024,5,1
2,2024-05-01T00:58:01.000Z,2024-05-01T01:07:35.000Z,N,1,24,143,1,2.35,12.8,1.0,0.5,3.0,0.0,,1.0,21.05,1,1,2.75,2024-05-01,2024,5,1
2,2024-05-01T00:54:12.000Z,2024-05-01T00:58:51.000Z,N,5,210,210,1,1.3,8.0,0.0,0.0,0.0,0.0,,1.0,9.0,1,2,0.0,2024-05-01,2024,5,1


In [0]:
df_trip = df_trip.select('VendorID','PULocationID','DOLocationID','trip_distance','fare_amount','total_amount')
df_trip.limit(10).display()

VendorID,PULocationID,DOLocationID,trip_distance,fare_amount,total_amount
2,65,49,1.24,9.3,13.8
2,7,179,0.94,7.2,11.64
2,74,42,0.84,6.5,9.0
2,75,235,6.07,25.4,32.9
2,256,49,2.06,12.1,17.52
1,210,210,1.3,9.3,12.8
2,66,4,4.35,19.8,28.05
2,95,95,2.02,13.5,16.0
2,24,143,2.35,12.8,21.05
2,210,210,1.3,8.0,9.0


In [0]:
df_trip.write.format('parquet')\
                        .mode('append')\
                        .option('path','abfss://silver@nyctaxidatalake00.dfs.core.windows.net/trips2024')\
                        .save()