# Accessing Data

In [0]:
# Configure OAuth (service-principal) access to the ADLS Gen2 account for this Spark session.
#
# SECURITY: the client secret must never be stored in the notebook. It is read from a
# Databricks secret scope at run time. A secret that was previously committed in plain
# text has to be treated as leaked and rotated in Azure AD.
# NOTE(review): the scope/key names below are placeholders - point them at the secret
# scope that exists in your workspace.
storage_host = "nycadlsstorage.dfs.core.windows.net"
sp_client_secret = dbutils.secrets.get(scope="nyc-taxi", key="sp-client-secret")

spark.conf.set(f"fs.azure.account.auth.type.{storage_host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
# Application (client) ID of the service principal - an identifier, not a credential.
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{storage_host}",
    "a2275355-99ed-4019-b0fa-82c90940b62c",
)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_host}", sp_client_secret)
# Token endpoint for the tenant that owns the service principal.
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_host}",
    "https://login.microsoftonline.com/57ffcb2e-8a4d-45b8-8d5d-77568777e677/oauth2/token",
)

In [0]:
# List the root of the bronze container; this also exercises the storage configuration set above.
bronze_root = 'abfss://bronze@nycadlsstorage.dfs.core.windows.net/'
dbutils.fs.ls(bronze_root)

# Data Reading

**Import Libraries**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

**Read CSV Data**

In [0]:
# Load the trip-type lookup CSV from bronze. The first row is used as the header
# and Spark infers the column types from the data.
df_trip_type = (
    spark.read.format('csv')
    .options(inferschema=True, header=True)
    .load('abfss://bronze@nycadlsstorage.dfs.core.windows.net/trip_type')
)

In [0]:
# Preview the trip-type lookup as loaded from bronze.
display(df_trip_type)

In [0]:
# Load the zone lookup CSV from bronze. The first row is used as the header
# and Spark infers the column types from the data.
df_trip_zone = (
    spark.read.format('csv')
    .options(inferschema=True, header=True)
    .load('abfss://bronze@nycadlsstorage.dfs.core.windows.net/trip_zone')
)

In [0]:
# Preview the zone lookup as loaded from bronze.
display(df_trip_zone)

**Read trips data for every Month**

We define our own schema instead of letting Spark infer it. Tip: an explicit schema is a good idea when using recursive file lookup, because the data sits in nested folders with many files in each, and Spark would otherwise have to inspect those files to work out the schema.


In [0]:
# Inspect the folder layout of the 2023 trip data before reading it.
trips_root = 'abfss://bronze@nycadlsstorage.dfs.core.windows.net/trips2023data/'
dbutils.fs.ls(trips_root)


In [0]:
# Explicit schema for the trip files, written as a DDL string (column name + Spark SQL type).
# It is handed to the reader via .schema(...) so Spark does not derive a schema itself.
# NOTE(review): the types declared here must agree with what is physically stored in the
# files - confirm against one file (e.g. read a single file and call printSchema()).
myschema = '''
                VendorID BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE

      '''

In [0]:
# Read every trip file under trips2023data/ into a single DataFrame.
#   - schema(myschema): use the explicit schema instead of letting Spark derive one.
#   - recursiveFileLookup: descend into the sub-folders and pick up the files inside them.
# The 'header' option was dropped: it is a CSV reader option and has no effect on parquet,
# which carries its column names in the file metadata.
df_trip = (
    spark.read.format('parquet')
    .schema(myschema)
    .option('recursiveFileLookup', True)
    .load('abfss://bronze@nycadlsstorage.dfs.core.windows.net/trips2023data/')
)

In [0]:
# Preview the combined trip data.
display(df_trip)

# Transformation

**Taxi Trip Type**

In [0]:
# Give the generic 'description' column a table-specific name.
df_trip_type = df_trip_type.withColumnRenamed(existing='description', new='trip_description')

In [0]:
# Confirm the column rename.
display(df_trip_type)

In [0]:
# Persist the trip-type lookup to the silver container as parquet.
# The notebook always rebuilds this table from the complete bronze file, so the target is
# overwritten. With 'append', every re-run would add another full copy of the same rows.
(
    df_trip_type.write.format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver@nycadlsstorage.dfs.core.windows.net/trip_type')
    .save()
)

**Trip Zone**

In [0]:
# Look at the zone data before transforming it.
display(df_trip_zone)

In [0]:
# Next step: split the Zone column into two columns for rows where two zones are listed (separated by '/').


In [0]:
# Some Zone values hold two names separated by '/'. Split once and expose each side
# as its own column.
# NOTE(review): rows without a '/' have no second element; with ANSI mode off that
# yields null for zone2 - confirm the cluster setting.
zone_parts = split(col('Zone'), '/')
df_trip_zone = (
    df_trip_zone
    .withColumn('zone1', zone_parts[0])
    .withColumn('zone2', zone_parts[1])
)

In [0]:
# Check the new zone1 / zone2 columns.
display(df_trip_zone)

In [0]:
# Persist the zone lookup to the silver container as parquet.
# The notebook always rebuilds this table from the complete bronze file, so the target is
# overwritten. With 'append', every re-run would add another full copy of the same rows.
(
    df_trip_zone.write.format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver@nycadlsstorage.dfs.core.windows.net/trip_zone')
    .save()
)

**Trip Data**

In [0]:
# Look at the trip data before transforming it.
display(df_trip)

Convert the pickup timestamp into three separate columns:
date, year, and month.


In [0]:
# Derive calendar columns from the pickup timestamp: the date, the year and the month number.
pickup_ts = col('lpep_pickup_datetime')
df_trip = (
    df_trip
    .withColumn('trip_date', to_date(pickup_ts))
    .withColumn('trip_year', year(pickup_ts))
    .withColumn('trip_month', month(pickup_ts))
)

In [0]:
# Check the new trip_date / trip_year / trip_month columns.
display(df_trip)

In [0]:
# Keep only the columns needed downstream.
# The derived trip_date / trip_year / trip_month columns are kept as well: the original
# select dropped them straight after they were computed, so they never reached silver.
# NOTE(review): this widens the silver schema. If the target folder already holds files
# written with the narrower schema, rewrite the table rather than appending to it.
df_trip = df_trip.select(
    'VendorID',
    'PULocationID',
    'DOLocationID',
    'trip_date',
    'trip_year',
    'trip_month',
    'fare_amount',
    'total_amount',
)
df_trip.display()

In [0]:
# Persist the trimmed trip data to the silver container as parquet.
# The notebook re-reads everything under bronze/trips2023data on each run, so the target is
# overwritten. With 'append', every re-run would duplicate all trips already written.
(
    df_trip.write.format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver@nycadlsstorage.dfs.core.windows.net/trips2023data')
    .save()
)

# Analysis

In [0]:
# Render the trimmed trip DataFrame; the Databricks visualization for this section
# is built on this cell's output.
display(df_trip)

Databricks visualization. Run in Databricks to view.