## Data Access

In [0]:
# List the entries registered in the secret scope used by this notebook.
secret_scope = "databricks-key12"
dbutils.secrets.list(scope=secret_scope)

### Retrieving the keys from Secrets

In [0]:
# Fetch the three credentials (client secret, application id, directory id)
# from the secret scope, in the same order as the target variables.
secret_scope = "databricks-key12"
secret, app_id, dir_id = (
    dbutils.secrets.get(scope=secret_scope, key=key_name)
    for key_name in ("secret-id", "app-id", "dir-id")
)

### Configure Azure credentials to access Azure storage

In [0]:
# OAuth client-credentials settings for the storage account. Every Spark conf
# key is the base setting name suffixed with the storage account host.
storage_host = "nyctaxistorageabhinav.dfs.core.windows.net"

oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": app_id,
    "fs.azure.account.oauth2.client.secret": secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{dir_id}/oauth2/token",
}

# dicts preserve insertion order, so the settings are applied in the order listed above.
for conf_key, conf_value in oauth_settings.items():
    spark.conf.set(f"{conf_key}.{storage_host}", conf_value)

In [0]:
# Smoke test for the credentials above: list the root of the bronze container.
bronze_root = "abfss://bronze@nyctaxistorageabhinav.dfs.core.windows.net"
dbutils.fs.ls(bronze_root)

## Data Ingestion

### Importing libraries

In [0]:
  from pyspark.sql.functions import *
  from pyspark.sql.types import *

### Reading csv data

#### Trip Type data

In [0]:
# Load the trip-type CSV files from bronze; the first row is the header and
# column types are inferred from the data.
trip_type_path = "abfss://bronze@nyctaxistorageabhinav.dfs.core.windows.net/trip_type/"

df_trip_type = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(trip_type_path)
)

display(df_trip_type)

#### Trip Zone data

In [0]:
# Load the trip-zone CSV files from bronze; the first row is the header and
# column types are inferred from the data.
df_trip_zone = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@nyctaxistorageabhinav.dfs.core.windows.net/trip_zone")
)

# Preview with limit(5) rather than head(5): head() collects a Python list of
# Row objects to the driver, while limit() keeps the preview a DataFrame, which
# matches how the other previews in this notebook are displayed.
display(df_trip_zone.limit(5))

### Reading parquet data

#### trip data

In [0]:
# Show every entry in the trip-data folder of the bronze container.
trip_data_dir = "abfss://bronze@nyctaxistorageabhinav.dfs.core.windows.net/trips2024/trip-data/"
display(dbutils.fs.ls(trip_data_dir))


In [0]:
# Explicit schema for the trip parquet files, written as a DDL string.
# The pickup/dropoff timestamp columns use the `lpep_` prefix.
my_schema = """
VendorID BIGINT,
lpep_pickup_datetime TIMESTAMP,
lpep_dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
RatecodeID BIGINT,
PULocationID BIGINT,
DOLocationID BIGINT,
passenger_count BIGINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type BIGINT,
trip_type BIGINT,
congestion_surcharge DOUBLE
"""

# recursiveFileLookup makes the reader pick up files in nested sub-folders
# of the trip-data directory as well.
trip_data_path = "abfss://bronze@nyctaxistorageabhinav.dfs.core.windows.net/trips2024/trip-data/"
trip_reader = (
    spark.read.format("parquet")
    .schema(my_schema)
    .option("recursiveFileLookup", "true")
)
df_trip = trip_reader.load(trip_data_path)

display(df_trip.limit(5))


## Data Transformation

### Trip Type

In [0]:
# Spark DataFrames are immutable: withColumnRenamed returns a NEW DataFrame.
# The result must be assigned back, otherwise the rename is silently lost and
# the later display/write still see the original `description` column.
df_trip_type = df_trip_type.withColumnRenamed('description', 'trip_description')
display(df_trip_type)

In [0]:
# Persist the trip-type data to the silver container as parquet.
# NOTE(review): mode "append" adds to whatever is already at the path, so
# re-running this cell writes the same rows again — confirm this is intended.
trip_type_silver_path = "abfss://silver@nyctaxistorageabhinav.dfs.core.windows.net/trip_type"

(
    df_trip_type.write
    .format("parquet")
    .mode("append")
    .option("path", trip_type_silver_path)
    .save()
)

### Trip Zone

In [0]:
# Show the trip-zone DataFrame as loaded, before the zone columns are derived.
display(df_trip_zone)

In [0]:
# Split the "Zone" column on "/" and expose the first two pieces as
# separate columns, zone1 and zone2.
zone_parts = split(col("Zone"), "/")

df_trip_zone = df_trip_zone.withColumn("zone1", zone_parts.getItem(0)).withColumn(
    "zone2", zone_parts.getItem(1)
)

display(df_trip_zone.limit(5))

In [0]:
# Persist the trip-zone data (including zone1/zone2) to the silver container.
# NOTE(review): mode "append" adds to whatever is already at the path, so
# re-running this cell writes the same rows again — confirm this is intended.
trip_zone_silver_path = "abfss://silver@nyctaxistorageabhinav.dfs.core.windows.net/trip_zone"

(
    df_trip_zone.write
    .format("parquet")
    .mode("append")
    .option("path", trip_zone_silver_path)
    .save()
)

### Trip Data

In [0]:
# Show the trip DataFrame as loaded, before the date columns are derived.
display(df_trip)

In [0]:
# Derive calendar columns (date, year, month) from the pickup timestamp.
pickup_ts = col("lpep_pickup_datetime")

df_trip = (
    df_trip
    .withColumn("trip_date", to_date(pickup_ts))
    .withColumn("trip_year", year(pickup_ts))
    .withColumn("trip_month", month(pickup_ts))
)

display(df_trip)

In [0]:
# Keep only the columns that are written to the silver layer.
# NOTE(review): this projection does not include trip_date / trip_year /
# trip_month, so those derived columns are dropped here — confirm that is intended.
silver_trip_columns = [
    "VendorID",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
    "total_amount",
]
df_trip = df_trip.select(*silver_trip_columns)
display(df_trip)

In [0]:
# Persist the selected trip columns to the silver container as parquet.
# NOTE(review): mode "append" adds to whatever is already at the path, so
# re-running this cell writes the same rows again — confirm this is intended.
trip_silver_path = "abfss://silver@nyctaxistorageabhinav.dfs.core.windows.net/trip/trips2024"

(
    df_trip.write
    .format("parquet")
    .mode("append")
    .option("path", trip_silver_path)
    .save()
)

## Analysis

In [0]:
# Render the trip DataFrame; the notebook's visualization is attached to this output.
display(df_trip)

Databricks visualization. Run in Databricks to view.