# Data Access

In [0]:
# Configure OAuth 2.0 client-credentials (service principal) access to the
# nyctaxistoragedarsh ADLS Gen2 account for this Spark session.
#
# SECURITY: the client secret is read from a Databricks secret scope instead of being
# hardcoded, so it no longer lives in the notebook source or its revision history.
# TODO(review): the scope/key names below are placeholders — create them, and rotate the
# service-principal secret that was previously committed here (treat it as leaked).
_account_suffix = "nyctaxistoragedarsh.dfs.core.windows.net"
_client_secret = dbutils.secrets.get(scope="nyctaxi-scope", key="sp-client-secret")

_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "c18af45e-8d55-4ccf-9ad4-122b4ea74a87",
    "fs.azure.account.oauth2.client.secret": _client_secret,
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/187c0287-bcdf-4b7d-9372-2fd7110be1ae/oauth2/token",
}

# Each setting is scoped to this storage account by appending the account's DFS host name,
# producing keys such as "fs.azure.account.auth.type.nyctaxistoragedarsh.dfs.core.windows.net".
for _key, _value in _oauth_settings.items():
    spark.conf.set(f"{_key}.{_account_suffix}", _value)


In [0]:
# List the root of the bronze container — a quick check that the storage credentials grant access.
bronze_root = 'abfss://bronze@nyctaxistoragedarsh.dfs.core.windows.net/'
dbutils.fs.ls(bronze_root)

# Data Reading

**Importing Libraries**

In [0]:
from pyspark.sql.functions import*
from pyspark.sql.types import*

**Reading CSV Data**

**Trip type code**

In [0]:
# Load the trip-type CSV data from bronze; the first row is the header and
# column types are inferred from the data.
df_trip_type = spark.read.csv(
    "abfss://bronze@nyctaxistoragedarsh.dfs.core.windows.net/trip_type",
    header=True,
    inferSchema=True,
)


In [0]:
# Print the trip-type schema: column names plus the types chosen by inferSchema.
df_trip_type.printSchema()


**Trip Zone**

In [0]:
# Load the taxi-zone CSV data from bronze (header row, inferred column types).
# Assigned to df_trip_zone — NOT df_trip_type — so the trip-type DataFrame loaded
# earlier is not overwritten before it is renamed and written to silver.
df_trip_zone = spark.read.format("csv") \
    .option("inferSchema", True) \
    .option("header", True) \
    .load("abfss://bronze@nyctaxistoragedarsh.dfs.core.windows.net/trip_zone")


In [0]:
# Show df_trip_type.
# NOTE(review): this cell sits under the "Trip Zone" heading — confirm whether the
# zone DataFrame was meant to be displayed here instead.
display(df_trip_type)

**Trip Data**

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

# Explicit schema for the trip Parquet data, as (column name, Spark type) pairs.
# Every field is declared nullable.
# NOTE(review): if the source files store columns such as passenger_count, RatecodeID,
# payment_type or trip_type as double rather than int64, reading them as LongType may
# fail — confirm against the actual file schema.
_trip_fields = [
    ("VendorID", LongType()),
    ("lpep_pickup_datetime", TimestampType()),
    ("fare_amount", DoubleType()),
    ("extra", DoubleType()),
    ("mta_tax", DoubleType()),
    ("tip_amount", DoubleType()),
    ("tolls_amount", DoubleType()),
    ("lpep_dropoff_datetime", TimestampType()),
    ("store_and_fwd_flag", StringType()),
    ("RatecodeID", LongType()),
    ("PULocationID", LongType()),
    ("DOLocationID", LongType()),
    ("passenger_count", LongType()),
    ("trip_distance", DoubleType()),
    ("ehail_fee", DoubleType()),
    ("improvement_surcharge", DoubleType()),
    ("total_amount", DoubleType()),
    ("payment_type", LongType()),
    ("trip_type", LongType()),
    ("congestion_surcharge", DoubleType()),
]

myschema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _trip_fields]
)


In [0]:
# Read all trip Parquet files under trips2023data, including files in subdirectories,
# applying the explicit schema defined above.
_trips_reader = spark.read.schema(myschema).option("recursiveFileLookup", True)
df_trip = _trips_reader.parquet(
    "abfss://bronze@nyctaxistoragedarsh.dfs.core.windows.net/trips2023data/"
)


In [0]:
# Show the raw trip data as loaded from bronze.
display(df_trip)

# Data Transformation

**Taxi Trip Type**

In [0]:
# Show the trip-type DataFrame before transforming it.
display(df_trip_type)


In [0]:
# Rename 'description' to 'trip_description', then print the resulting schema.
df_trip_type = df_trip_type.withColumnRenamed(
    existing='description',
    new='trip_description',
)
df_trip_type.printSchema()


In [0]:
# Write the trip-type data to the silver container as Snappy-compressed Parquet.
# "overwrite" (instead of "append") makes the cell idempotent: re-running the notebook
# replaces the folder rather than adding another duplicate copy of the same rows.
df_trip_type.write.format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("path", "abfss://silver@nyctaxistoragedarsh.dfs.core.windows.net/trip_type") \
    .save()


**Trip Zone**

In [0]:
# Load the single taxi-zone lookup CSV (header row, inferred types) and show it.
df_trip_zone = spark.read.csv(
    "abfss://bronze@nyctaxistoragedarsh.dfs.core.windows.net/trip_zone/taxi_zone_lookup.csv",
    header=True,
    inferSchema=True,
)

display(df_trip_zone)


In [0]:
# Preview the first '/'-separated part of Zone without modifying df_trip_zone.
# The column is named zone1 (previously the typo 'Zonel') to match the zone1 column
# that is persisted when the split is applied.
from pyspark.sql.functions import col, split

df_trip_zone.withColumn('zone1', split(col('Zone'), '/')[0]).display()


In [0]:
from pyspark.sql.functions import col, size, split, when

# Split composite zone names of the form "A/B" into zone1 ("A") and zone2 ("B").
# zone2 is read only when a second part exists. Indexing past the end of the array
# returns null with ANSI mode off but raises when spark.sql.ansi.enabled=true; the
# explicit size() guard yields null for single-part names in both modes.
_zone_parts = split(col("Zone"), "/")

df_trip_zone = (
    df_trip_zone
    .withColumn("zone1", _zone_parts[0])
    .withColumn("zone2", when(size(_zone_parts) > 1, _zone_parts[1]))
)

df_trip_zone.display()


In [0]:
# Write the zone lookup (with zone1/zone2) to its own silver folder as Snappy Parquet.
# - Path is .../trip_zone. Writing to .../trip_type would mix zone rows and trip-type
#   rows, which have different schemas, in one folder.
# - "overwrite" keeps the cell idempotent across re-runs (no duplicated rows).
df_trip_zone.write.format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("path", "abfss://silver@nyctaxistoragedarsh.dfs.core.windows.net/trip_zone") \
    .save()


**Trip Data**

In [0]:
# Show the trip data before deriving date columns.
display(df_trip)

In [0]:
from pyspark.sql.functions import to_date, year, month

# Derive calendar fields (date, year, month) from the pickup timestamp.
# NOTE(review): this reads lpep_pickup_datetime from df_trip in place, so it fails if
# re-run after df_trip has been narrowed to a column subset without that column.
_pickup_col = "lpep_pickup_datetime"
df_trip = (
    df_trip
    .withColumn("trip_date", to_date(_pickup_col))
    .withColumn("trip_year", year(_pickup_col))
    .withColumn("trip_month", month(_pickup_col))
)

display(df_trip)


In [0]:
# Narrow the trip data to vendor, pickup/drop-off location IDs, and amounts.
# NOTE(review): every other column is discarded — including trip_date/trip_year/trip_month
# if they were derived earlier — confirm that dropping them before the silver write is intended.
_kept_trip_columns = ['VendorID', 'PULocationID', 'DOLocationID', 'fare_amount', 'total_amount']
df_trip = df_trip.select(*_kept_trip_columns)
display(df_trip)



Databricks visualization. Run in Databricks to view.

In [0]:
# Write the curated trip data to the silver container as Parquet.
# - The path carries the abfss:// scheme. Without it, Spark treats the string as a
#   relative path on the cluster's default filesystem, not the ADLS silver container.
# - "overwrite" keeps the cell idempotent. The source cell re-reads all of trips2023data,
#   so appending would add a full duplicate copy on every re-run.
df_trip.write.format('parquet') \
    .mode('overwrite') \
    .option('path', 'abfss://silver@nyctaxistoragedarsh.dfs.core.windows.net/trips2023data') \
    .save()

# Analysis

In [0]:
# Render the curated trip data (Databricks visualizations attach to this output).
df_trip.display()

Databricks visualization. Run in Databricks to view.