### Data Access

In [0]:
# Storage account that hosts the containers read and written by this notebook.
storage_account = "nyctaxistoragejubin"
# NOTE(review): placeholder value. A real key should be fetched from a secret
# store (e.g. a Databricks secret scope) instead of being typed into the notebook.
access_key = "sample"

# Register the account key in the Spark session configuration for this account.
account_key_conf = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(account_key_conf, access_key)

In [0]:
# List the contents of the bronze container root.
dbutils.fs.ls("abfss://bronze@nyctaxistoragejubin.dfs.core.windows.net/")


### Data Reading

#### Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#### Reading CSV Data

##### Trip Type Data

In [0]:
# Load the trip_type CSV data from the bronze container. The first row is
# treated as the header and column types are inferred from the data.
df_trip_type = (
    spark.read.format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@nyctaxistoragejubin.dfs.core.windows.net/trip_type")
)

In [0]:
# Preview the trip_type data as loaded from bronze.
display(df_trip_type)

In [0]:
# Load the trip_zone CSV data from the bronze container. The first row is
# treated as the header and column types are inferred from the data.
df_trip_zone = (
    spark.read.format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@nyctaxistoragejubin.dfs.core.windows.net/trip_zone")
)

In [0]:
# Preview the trip_zone data as loaded from bronze.
display(df_trip_zone)

#### Trip data

In [0]:
# Explicit DDL-style schema (column name + Spark SQL type) that is passed to
# `.schema(myschema)` when the trip parquet files are read.
myschema = '''
                VendorID BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE     
            '''

In [0]:
# Read every parquet file under trips2023data/ using the explicit schema.
# recursiveFileLookup makes Spark descend into nested sub-folders, so files
# spread across several folders are all picked up in a single read.
# The CSV-only 'header' option is deliberately not set: it is not a parquet
# reader option and has no effect on this read.
df_trip = (
    spark.read.format('parquet')
    .schema(myschema)
    .option('recursiveFileLookup', True)
    .load('abfss://bronze@nyctaxistoragejubin.dfs.core.windows.net/trips2023data/')
)

In [0]:
# Preview the trip data as loaded from bronze.
display(df_trip)

### Data Transformations

#### Taxi trip type

In [0]:
# Show the trip_type frame before it is transformed.
display(df_trip_type)

In [0]:
# Rename 'description' to 'trip_description', then preview the result.
df_trip_type = df_trip_type.withColumnRenamed('description', 'trip_description')
display(df_trip_type)

In [0]:
# Available save modes: append, overwrite, error and ignore.
# This frame is a full reload of the bronze trip_type data, so 'append' would
# add a second copy of every row each time the notebook is re-run. 'overwrite'
# replaces the silver output instead, which keeps this cell idempotent.
(
    df_trip_type.write.format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver@nyctaxistoragejubin.dfs.core.windows.net/trip_type')
    .save()
)

#### Trip zone

In [0]:
# Show the trip_zone frame before it is transformed.
display(df_trip_zone)

In [0]:
# Split the 'Zone' column on '/' and expose the first two pieces as
# 'zone1' and 'zone2'.
zone_parts = split(col('Zone'), '/')
df_trip_zone = (
    df_trip_zone
    .withColumn('zone1', zone_parts[0])
    .withColumn('zone2', zone_parts[1])
)

In [0]:
# Preview trip_zone with the new zone1 / zone2 columns.
display(df_trip_zone)

In [0]:
# Persist the transformed trip_zone data to the silver container as parquet.
# The frame is a full reload from bronze, so 'overwrite' is used instead of
# 'append' to avoid duplicating every row each time the notebook is re-run.
(
    df_trip_zone.write.format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver@nyctaxistoragejubin.dfs.core.windows.net/trip_zone')
    .save()
)

#### Trip data

In [0]:
# Show the trip frame before it is transformed.
display(df_trip)

In [0]:
# Derive calendar columns (date, year, month) from the pickup timestamp.
pickup_ts = col('lpep_pickup_datetime')
df_trip = (
    df_trip
    .withColumn('trip_date', to_date(pickup_ts))
    .withColumn('trip_year', year(pickup_ts))
    .withColumn('trip_month', month(pickup_ts))
)

In [0]:
# Preview the trip data with the derived date columns.
display(df_trip)

In [0]:
# Keep only the columns that are written to the silver layer, then preview.
# NOTE(review): this projection does not include trip_date / trip_year /
# trip_month, so those derived columns are dropped here. Confirm this is intended.
silver_trip_columns = [
    'VendorID',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
    'fare_amount',
    'total_amount',
]
df_trip = df_trip.select(*silver_trip_columns)
display(df_trip)

In [0]:
# Persist the selected trip columns to the silver container as parquet.
# The frame is rebuilt from every file under the bronze trips2023data/ folder,
# so 'overwrite' is used instead of 'append' to avoid duplicating all trips
# each time the notebook is re-run.
(
    df_trip.write.format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver@nyctaxistoragejubin.dfs.core.windows.net/trip2023data')
    .save()
)

### Analysis

In [0]:
# Render the trip frame with the notebook's display helper; the saved output
# of this cell is a Databricks visualization.
display(df_trip)

Databricks visualization. Run in Databricks to view.