## SILVER LAYER SCRIPT

#### DEFINING CREDENTIALS TO ACCESS THE DATA FROM DATALAKE

In [0]:
# Configure OAuth (service principal) access to the ADLS Gen2 account.
# SECURITY: the client secret must never be hardcoded in the notebook; it is
# read from a Databricks secret scope at run time. The secret that was
# previously committed here should be treated as compromised and rotated.
# TODO: point `scope`/`key` at the secret scope configured in your workspace.
service_credential = dbutils.secrets.get(scope="nyc-taxi-kv", key="nyc-sp-client-secret")

spark.conf.set("fs.azure.account.auth.type.nycdatalakestore.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.nycdatalakestore.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.nycdatalakestore.dfs.core.windows.net", "854a7cbb-3823-433d-90c7-79d255f4394f")
spark.conf.set("fs.azure.account.oauth2.client.secret.nycdatalakestore.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.nycdatalakestore.dfs.core.windows.net", "https://login.microsoftonline.com/18ffd786-c707-4f2c-b8b0-5f49463d2c32/oauth2/token")

#### Test the connection

In [0]:
# List the root of the bronze container; a successful listing confirms that
# the OAuth settings configured for the storage account are working.
bronze_root = "abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/"
dbutils.fs.ls(bronze_root)

[FileInfo(path='abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/Trip_Zone/', name='Trip_Zone/', size=0, modificationTime=1745668151000),
 FileInfo(path='abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/Trip_type/', name='Trip_type/', size=0, modificationTime=1745668138000),
 FileInfo(path='abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/trip-data/', name='trip-data/', size=0, modificationTime=1745678615000)]

### READING DATA

#### Importing Necessary Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#### Reading CSV Data
  1. Trip Type Data
  2. Trip Zone Data

In [0]:
# Load the trip-type lookup from the bronze container as CSV, treating the
# first row as the header and letting Spark infer the column types.
df_trip_type = (
    spark.read
         .options(header=True, inferschema=True)
         .csv('abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/Trip_type')
)

In [0]:
# Render the trip-type lookup to inspect what was loaded.
display(df_trip_type)

trip_type,description
1,Street-hail
2,Dispatch


In [0]:
# Load the trip-zone lookup from the bronze container as CSV, treating the
# first row as the header and letting Spark infer the column types.
df_trip_zone = (
    spark.read
         .options(header=True, inferschema=True)
         .csv('abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/Trip_Zone')
)

In [0]:
# df_trip_zone.display()

#### Trip Data - 2024
 If your files are spread across multiple folders, enable recursive file lookup with .option('recursiveFileLookup', True), and
 pass .schema(schema_name) if you have defined your own schema.

In [0]:
# Schema of the trip data in DDL format.
# It is defined in the same cell as (and before) the read that uses it, so the
# notebook runs top to bottom on a fresh kernel; previously the read referenced
# `my_schema` before the cell that created it, raising a NameError.
my_schema = '''
                VendorId BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE
            '''

# Read the parquet trip files from the bronze container with the explicit
# schema above. The 'header' option is a CSV setting and is not a documented
# option for the parquet source, so it is not set here.
df_trip_2024 = spark.read.format('parquet')\
                         .schema(my_schema)\
                         .load('abfss://nyc-bronze@nycdatalakestore.dfs.core.windows.net/trip-data/')

In [0]:
# df_trip_2024.display()

### DATA TRANSFORMATIONS AND WRITING DATA TO SILVER CONTAINER

#### Trip Type
  withColumnRenamed() - Rename the column 'description' with 'trip_description' 

In [0]:
# Give the lookup's description column a table-specific name.
df_trip_type = df_trip_type.withColumnRenamed(
    existing='description',
    new='trip_description',
)

In [0]:
# Persist the trip-type lookup to the silver container as parquet.
# 'overwrite' keeps the write idempotent: the full lookup is re-read from
# bronze on every run, so 'append' would add a duplicate copy of every row
# each time the notebook is re-executed.
df_trip_type.write.format('parquet')\
                 .mode('overwrite')\
                 .option("path", "abfss://nyc-silver@nycdatalakestore.dfs.core.windows.net/Trip_type")\
                 .save()

In [0]:
# Render the trip-type lookup to confirm the renamed column.
display(df_trip_type)

trip_type,trip_description
1,Street-hail
2,Dispatch


#### Trip Zone Lookup
 A few records have multiple zones. Split the Zone column where it has multiple zones and create 2 new columns, namely Zone_1 and Zone_2, from index [0] and index [1] respectively.
  1. split() - will split the Zone string into an array using / as the separator.

In [0]:
# Break the Zone string on '/' once, then expose the first and second parts
# as their own columns.
zone_parts = split(col('Zone'), '/')
df_trip_zone = (
    df_trip_zone
    .withColumn('Zone_1', zone_parts.getItem(0))
    .withColumn('Zone_2', zone_parts.getItem(1))
)


In [0]:
# Persist the trip-zone lookup to the silver container as parquet.
# 'overwrite' keeps the write idempotent: the full lookup is re-read from
# bronze on every run, so 'append' would add a duplicate copy of every row
# each time the notebook is re-executed.
df_trip_zone.write.format('parquet')\
                 .mode('overwrite')\
                 .option("path", "abfss://nyc-silver@nycdatalakestore.dfs.core.windows.net/Trip_Zone")\
                 .save()

In [0]:
# df_trip_zone.display()

### Green Taxi Trip Data - 2024
 1. Convert timestamp to date, month and year for column 'lpep_pickup_datetime' by creating 3 new columns, namely
     PickUp_Date, PickUp_Month and PickUp_Year
 2. Convert timestamp to Date for column 'lpep_dropoff_datetime' by creating new column 'DropOff_Date'


In [0]:
# Derive calendar fields from the pickup and drop-off timestamps.
pickup_ts = col('lpep_pickup_datetime')
dropoff_ts = col('lpep_dropoff_datetime')

df_trip_2024 = (
    df_trip_2024
    .withColumn('PickUp_Date', to_date(pickup_ts))
    .withColumn('PickUp_Month', month(pickup_ts))
    .withColumn('PickUp_Year', year(pickup_ts))
    .withColumn('DropOff_Date', to_date(dropoff_ts))
)
# df_trip_2024.display()


#### Select only required columns instead of writing all the columns using select()

In [0]:
# Columns kept for the silver trip table, in output order.
silver_trip_columns = [
    'VendorId',
    'PickUp_Date',
    'DropOff_Date',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'tip_amount',
    'total_amount',
]
df_trip_2024 = df_trip_2024.select(*silver_trip_columns)

# df_trip_2024.display()

In [0]:
# Persist the transformed trip data to the silver container as parquet.
# 'overwrite' keeps the write idempotent: df_trip_2024 is loaded from the whole
# bronze trip-data folder on every run, so 'append' would write a second copy
# of every trip each time the notebook is re-executed.
df_trip_2024.write.format('parquet')\
                 .mode('overwrite')\
                 .option("path", "abfss://nyc-silver@nycdatalakestore.dfs.core.windows.net/trip_data")\
                 .save()

### Data Analysis and Visualization

#### 1. Number of Zones for each Borough

In [0]:
# Count zone rows per borough and show the largest boroughs first.
zones_per_borough = df_trip_zone.groupBy('Borough').count()
display(zones_per_borough.orderBy(col('count').desc()))

Borough,count
Queens,69
Manhattan,69
Brooklyn,61
Bronx,43
Staten Island,20
EWR,1
Unknown,1
,1


Databricks visualization. Run in Databricks to view.

#### 2. Service Zone Distribution

In [0]:
# Count zone rows per service zone and show the most common first.
zones_per_service_zone = df_trip_zone.groupBy('service_zone').count()
display(zones_per_service_zone.orderBy(col('count').desc()))

service_zone,count
Boro Zone,205
Yellow Zone,55
,2
Airports,2
EWR,1


Databricks visualization. Run in Databricks to view.

#### 3. Average Fare, Tip, Total

In [0]:
# Average fare, tip and total across all trips, rounded to 2 decimals.
# NOTE: `round` and `avg` here are the pyspark.sql.functions versions brought
# in by the star import, not the Python builtins.
fare_summary = df_trip_2024.agg(
    round(avg("fare_amount"), 2).alias("avg_fare"),
    round(avg("tip_amount"), 2).alias("avg_tip"),
    round(avg("total_amount"), 2).alias("avg_total"),
)
display(fare_summary)

avg_fare,avg_tip,avg_total
18.36,2.57,24.26


#### 4. Top 10 pickup zones

In [0]:
# Rank pickup locations by number of trips and keep the ten busiest.
top_pickup_zones = (
    df_trip_2024
    .groupBy("PULocationID")
    .count()
    .orderBy(desc("count"))
    .limit(10)
)
display(top_pickup_zones)

PULocationID,count
74,149788
75,91276
166,33400
95,32720
43,32616
82,30643
41,29069
97,21437
65,19116
130,15317


Databricks visualization. Run in Databricks to view.