# Data Access

In [0]:
# Pull the service principal's client secret from a Databricks secret scope so
# the secret itself never appears in the notebook source.
service_credential = dbutils.secrets.get(scope="<secret-scope>", key="<service-credential-key>")

# OAuth client-credentials settings for ABFS access, keyed per storage account.
# NOTE: the <...> placeholders must be filled in before running; the paths used
# in the rest of this notebook point at the 'nyctaxistorageansh' account.
oauth_conf = {
    "fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net": "<application-id>",
    "fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net": service_credential,
    "fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net": "https://login.microsoftonline.com/<directory-id>/oauth2/token",
}

# dicts preserve insertion order, so the settings are applied in the order listed.
for conf_key, conf_value in oauth_conf.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
# List the entries at the root of the bronze container.
bronze_root = 'abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/'
dbutils.fs.ls(bronze_root)

[FileInfo(path='abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/trip_type/', name='trip_type/', size=0, modificationTime=1731341292000),
 FileInfo(path='abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/trip_zone/', name='trip_zone/', size=0, modificationTime=1731341300000),
 FileInfo(path='abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/trips2023data/', name='trips2023data/', size=0, modificationTime=1731343601000)]

# Data Reading

**Importing Libraries**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

**Reading CSV Data**

**Trip Type Data**

In [0]:
# Load the trip-type lookup CSV from bronze; the first row is the header and
# column types are inferred by Spark.
trip_type_path = 'abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/trip_type'
df_trip_type = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load(trip_type_path)
)

In [0]:
# Render the trip-type lookup table.
display(df_trip_type)

trip_type,description
1,Street-hail
2,Dispatch


**Trip Zone**

In [0]:
# Load the trip-zone lookup CSV from bronze; the first row is the header and
# column types are inferred by Spark.
trip_zone_path = 'abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/trip_zone'
df_trip_zone = (
    spark.read
    .format('csv')
    .options(inferSchema=True, header=True)
    .load(trip_zone_path)
)

In [0]:
# Render the trip-zone lookup table.
display(df_trip_zone)

**Trip Data**

In [0]:
# DDL-style schema string (column name followed by Spark SQL type) for the trip
# data. It is handed to DataFrameReader.schema() when the trips are read, so
# the column types are declared up front rather than inferred.
myschema = '''
                VendorID BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE

      '''

In [0]:
# Read every Parquet file under the bronze trips folder, including files in
# nested sub-folders (recursiveFileLookup), using the explicit schema declared
# in `myschema`.
# The former `.option('header', True)` was dropped: 'header' is a CSV reader
# option and has no meaning for Parquet, which stores column names in the file
# metadata.
df_trip = spark.read.format('parquet')\
              .schema(myschema)\
              .option('recursiveFileLookup',True)\
              .load('abfss://bronze@nyctaxistorageansh.dfs.core.windows.net/trips2023data/')

In [0]:
# Render the trip DataFrame as read from bronze.
display(df_trip)

# Data Transformation

**Taxi Trip Type**

In [0]:
# Show the trip-type table as loaded, before any transformation is applied.
display(df_trip_type)

In [0]:
# Rename the lookup's 'description' column to 'trip_description'.
df_trip_type = df_trip_type.withColumnRenamed(
    existing='description',
    new='trip_description',
)
display(df_trip_type)

In [0]:
# Persist the renamed trip-type lookup to the silver container as Parquet.
# The DataFrame is a full reload of the bronze folder, so the target is
# overwritten: with mode('append'), every re-run of this notebook would add
# another copy of the same rows to silver.
df_trip_type.write.format('parquet')\
            .mode('overwrite')\
            .save("abfss://silver@nyctaxistorageansh.dfs.core.windows.net/trip_type")

**Trip Zone**

In [0]:
# Show the trip-zone table as loaded, before any transformation is applied.
display(df_trip_zone)

In [0]:
# Split 'Zone' on '/' and expose the first two pieces as 'zone1' and 'zone2'.
zone_parts = split(col('Zone'), '/')
df_trip_zone = (
    df_trip_zone
    .withColumn('zone1', zone_parts.getItem(0))
    .withColumn('zone2', zone_parts.getItem(1))
)

display(df_trip_zone)

In [0]:
# Persist the trip-zone lookup (with zone1/zone2 added) to the silver container
# as Parquet.
# The DataFrame is a full reload of the bronze folder, so the target is
# overwritten: with mode('append'), every re-run of this notebook would add
# another copy of the same rows to silver.
df_trip_zone.write.format('parquet')\
          .mode('overwrite')\
          .save('abfss://silver@nyctaxistorageansh.dfs.core.windows.net/trip_zone')

**Trip Data**

In [0]:
# Show the trip data before the derived date columns are added.
display(df_trip)

In [0]:
# Derive the calendar date, year and month from the pickup timestamp.
pickup_ts = col('lpep_pickup_datetime')
df_trip = (
    df_trip
    .withColumn('trip_date', to_date(pickup_ts))
    .withColumn('trip_year', year(pickup_ts))
    .withColumn('trip_month', month(pickup_ts))
)
                  

In [0]:
# Show the trip data with the trip_date / trip_year / trip_month columns added.
display(df_trip)

In [0]:
# Keep only the vendor, pickup/drop-off location and amount columns.
# NOTE(review): this projection also drops trip_date / trip_year / trip_month,
# which were derived from lpep_pickup_datetime earlier — confirm that is intended.
kept_columns = ['VendorID', 'PULocationID', 'DOLocationID', 'fare_amount', 'total_amount']
df_trip = df_trip.select(*kept_columns)
display(df_trip)

In [0]:
# Persist the trimmed trip data to the silver container as Parquet.
# Fixes relative to the previous version of this cell:
#   * The target now carries the 'abfss://' scheme. Without it the string is
#     not an ABFS URI, so Spark would treat it as a relative path on the
#     default filesystem instead of writing to the silver container like the
#     other two writes in this notebook.
#   * mode('overwrite') replaces mode('append'): the DataFrame is a full reload
#     of the bronze folder, so appending would duplicate every trip on re-run.
df_trip.write.format('parquet')\
            .mode('overwrite')\
            .save('abfss://silver@nyctaxistorageansh.dfs.core.windows.net/trips2023data')

# Analysis

In [0]:
# Render the trip DataFrame held in `df_trip` at this point in the notebook
# (after the column selection, it contains only the five selected columns).
display(df_trip)