## Databricks ELT Pipeline

</br>

Will implement a medallion architecture to injest and preprocess data for model training.

In [0]:
import pandas as pd
from pyspark.sql.functions import to_date
from pyspark.sql.functions import when, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType
from pyspark.sql.functions import col
from pyspark.sql.types import *

### Mount to Data Store
---

Will mount Azure Data Lake Storage to Databricks

In [0]:
STORAGE_ACCOUNT_NAME = "databrickssa83764"
STORAGE_ACCOUNT_KEY  = ""
CONTAINER_NAME       = "data"

In [0]:
configs = {
    f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net": STORAGE_ACCOUNT_KEY
}

if any(mount.mountPoint == f"/mnt/{CONTAINER_NAME}" for mount in dbutils.fs.mounts()):    
    dbutils.fs.unmount(f"/mnt/{CONTAINER_NAME}")

dbutils.fs.mount(
    source = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/",
    mount_point = f"/mnt/{CONTAINER_NAME}",
    extra_configs = configs
)

/mnt/data has been unmounted.


True

In [0]:
%fs ls mnt/data/bronze

path,name,size,modificationTime
dbfs:/mnt/data/bronze/nyc_taxi.csv,nyc_taxi.csv,1289928,1752786683000
dbfs:/mnt/data/bronze/nyc_weather.csv,nyc_weather.csv,1368,1752729624000


### Bronze Layer
---

Will injest the following raw datasets:
  - nyctaxi dataset from databricks-datasets
  - nyc weather dataset from github

In [0]:
nyc_taxi_df = spark.read.option("header", "true").csv("dbfs:/mnt/data/bronze/nyc_taxi.csv")
nyc_taxi_df.createOrReplaceTempView("nyc_taxi")

In [0]:
weather_df = spark.read.option("header", "true").csv("dbfs:/mnt/data/bronze/nyc_weather.csv")
weather_df.createOrReplaceTempView("weather")

In [0]:
display(nyc_taxi_df)

In [0]:
display(weather_df)

### Silver Layer
---

Will perform the following:
  - Filter datasets by timeframe
  - Join datasets
  - Schema enforcement and validation
  - Preprocessing (handling nulls, removing columns, etc.)

In [0]:
# Fix date time formats for datasets

weather_df = weather_df.withColumn("EST", to_date("EST", "M/d/yyyy"))
weather_df.createOrReplaceTempView("weather")

nyc_taxi_df = (nyc_taxi_df.withColumn("pickup_datetime",  to_date("pickup_datetime", "yyyy-MM-dd"))
                          .withColumn("dropoff_datetime", to_date("dropoff_datetime", "yyyy-MM-dd")))
nyc_taxi_df.createOrReplaceTempView("nyc_taxi")

Display joined datasets with correct time frame

In [0]:
%sql

SELECT * FROM nyc_taxi, weather 
WHERE EST = pickup_datetime 
LIMIT 5

vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount,EST,Temperature,DewPoint,Humidity,Sea Level PressureIn,VisibilityMiles,WindSpeedMPH,PrecipitationIn,CloudCover,Events,WindDirDegrees
2,2016-01-08,2016-01-08,5,1.98,-73.9764633178711,40.75135040283203,1,N,-73.99659729003906,40.7318000793457,2,10.5,1.0,0.5,0.0,0,0.3,2016-01-08,39,29,64,30.2,10,4.0,0,8,,79
2,2016-01-18,2016-01-18,2,1.69,-73.9912109375,40.74996566772461,1,N,-73.98192596435547,40.73429489135742,2,9.0,0.0,0.5,0.0,0,0.3,2016-01-18,25,6,53,29.83,9,12.0,T,2,Snow,293
1,2016-01-08,2016-01-08,1,2.3,-73.98847198486328,40.738731384277344,1,N,-73.96260070800781,40.7586555480957,1,10.0,0.5,0.5,2.25,0,0.3,2016-01-08,39,29,64,30.2,10,4.0,0,8,,79
1,2016-01-02,2016-01-02,1,1.3,-73.98456573486328,40.72883224487305,1,N,-73.99445343017578,40.74049377441406,1,7.5,1.0,0.5,1.85,0,0.3,2016-01-02,36,18,46,30.02,10,7.0,0,3,,275
2,2016-01-22,2016-01-22,2,0.91,-74.00942993164062,40.72370910644531,1,N,-74.0058364868164,40.71738815307617,2,5.0,1.0,0.5,0.0,0,0.3,2016-01-22,26,6,41,30.21,9,,0.01,3,Snow,34


In [0]:
# Perform join operation

silver_df = spark.sql("SELECT * FROM nyc_taxi, weather WHERE EST = pickup_datetime")
silver_df.createOrReplaceTempView("silver")
display(silver_df)

In [0]:
# Drop irrelevant columns
silver_df = silver_df.drop("pickup_datetime", "dropoff_datetime", "pickup_longitude", 
                           "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "precipitationIn")

# Rename column to date
silver_df = silver_df.withColumnRenamed("EST", "date")  

# Rename values in events column
silver_df = silver_df.withColumn("events", when(col("events").isNull(), "normal").otherwise(col("events")))

# remove rows with null values
silver_df = silver_df.dropna()

silver_df.createOrReplaceTempView("silver")

In [0]:
%sql

SELECT * FROM silver

Will now cast all attributes to appropriate types

In [0]:
silver_df = (silver_df
    .withColumn("vendor_id", col("vendor_id").cast(IntegerType())) 
    .withColumn("passenger_count", col("passenger_count").cast(IntegerType())) 
    .withColumn("trip_distance", col("trip_distance").cast(DoubleType())) 
    .withColumn("rate_code_id", col("rate_code_id").cast(IntegerType())) 
    .withColumn("store_and_fwd_flag", col("store_and_fwd_flag").cast(StringType())) 
    .withColumn("payment_type", col("payment_type").cast(IntegerType())) 
    .withColumn("fare_amount", col("fare_amount").cast(DoubleType())) 
    .withColumn("extra", col("extra").cast(DoubleType())) 
    .withColumn("mta_tax", col("mta_tax").cast(DoubleType())) 
    .withColumn("tip_amount", col("tip_amount").cast(DoubleType())) 
    .withColumn("tolls_amount", col("tolls_amount").cast(DoubleType())) 
    .withColumn("total_amount", col("total_amount").cast(DoubleType())) 
    .withColumn("date", col("date").cast(DateType())) 
    .withColumn("temperature", col("temperature").cast(IntegerType())) 
    .withColumn("dewpoint", col("dewpoint").cast(IntegerType())) 
    .withColumn("humidity", col("humidity").cast(IntegerType())) 
    .withColumn("sealevel_pressure", col("sea level pressureIn").cast(DoubleType())) 
    .withColumn("visibilityMiles", col("visibilityMiles").cast(IntegerType())) 
    .withColumn("windspeedmph", col("windspeedmph").cast(IntegerType())) 
    .withColumn("cloudcover", col("cloudcover").cast(IntegerType())) 
    .withColumn("events", col("events").cast(StringType()))
    .withColumn("winddirdegrees", col("winddirdegrees").cast(IntegerType())))

silver_df = silver_df.drop("sea level pressureIn")


In [0]:
display(silver_df)

In [0]:
(silver_df.write.format("delta")
               .mode("overwrite")
               .save("/mnt/data/silver"))

### Gold Layer
---

Will perform the following:
  - Run aggregations to perform basic feature engineering
  - One hot encoding on categorical attributes

In [0]:
(spark.read.format("delta")
            .load("/mnt/data/silver")
            .write.format("delta")
            .options(mergeSchema=True)
            .saveAsTable("gold_table"))

In [0]:
display(spark.sql("select * from gold_table"))

In [0]:
%sql
ALTER TABLE gold_table SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

In [0]:
%sql

ALTER TABLE gold_table DROP COLUMN rate_code_id;
ALTER TABLE gold_table DROP COLUMN store_and_fwd_flag;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4528019126338522>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;130;01m\n[39;00m[38;5;124mALTER TABLE gold_table DROP COLUMN rate_code_id;[39m[38;5;130;01m\n[39;00m[38;5;124mALTER TABLE gold_table DROP COLUMN store_and_fwd_flag;[39m[38;5;130;01m\n[39;00m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2541[0m, in [0;36mInteractiveShell.run_cell_magic[0;34m(self, magic_name, line, cell)[0m
[1;32m   2539[0m [38;5;28;01mwith[39;00m [38;5;28mself[39m[38;5;241m.[39mbuiltin_trap:
[1;32m   2540[0m     args [38;5;241m=[39m (magic_arg_s, cell)
[0;32m-> 2541[0m     result 

In [0]:
%sql
SELECT * FROM gold_table

In [0]:
%sql

UPDATE gold_table
SET total_amount = fare_amount + extra + mta_tax + tip_amount + tolls_amount;

ALTER TABLE gold_table
DROP COLUMNS fare_amount, extra, mta_tax, tip_amount, tolls_amount;

ALTER TABLE gold_table
ADD COLUMN psg_per_mile DOUBLE;

UPDATE gold_table
SET psg_per_mile = passenger_count / trip_distance;

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4528019126338530>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;130;01m\n[39;00m[38;5;124mUPDATE gold_table[39m[38;5;130;01m\n[39;00m[38;5;124mSET total_amount = fare_amount + extra + mta_tax + tip_amount + tolls_amount;[39m[38;5;130;01m\n[39;00m[38;5;130;01m\n[39;00m[38;5;124mALTER TABLE gold_table[39m[38;5;130;01m\n[39;00m[38;5;124mDROP COLUMNS fare_amount, extra, mta_tax, tip_amount, tolls_amount;[39m[38;5;130;01m\n[39;00m[38;5;130;01m\n[39;00m[38;5;124mALTER TABLE gold_table[39m[38;5;130;01m\n[39;00m[38;5;124mADD COLUMN psg_per_mile DOUBLE;[39m[38;5;130;01m\n[39;00m[38;5;130;01m\n[39;00m[38;5;124mUPDATE gold_table[3

In [0]:
%sql
ALTER TABLE gold_table
ADD COLUMNS weather_normal INT, weather_snow INT, weather_fog INT, weather_rain INT;

In [0]:
%sql

UPDATE gold_table
SET weather_normal = CASE
    WHEN events = "normal" THEN 1
    ELSE 0
END;

UPDATE gold_table
SET weather_snow = CASE
    WHEN events = "Snow" THEN 1
    ELSE 0
END;

UPDATE gold_table
SET weather_fog = CASE
    WHEN events = "Fog-snow" THEN 1
    ELSE 0
END;

UPDATE gold_table
SET weather_rain = CASE
    WHEN events = "Rain" THEN 1
    ELSE 0
END;


ALTER TABLE gold_table
DROP COLUMN events;

In [0]:
%sql

SELECT * FROM gold_table

In [0]:
gold_df = spark.sql("SELECT * FROM gold_table")

(gold_df.write.format("delta")
               .mode("overwrite")
               .save("/mnt/data/gold"))