## Dim_Customer Ingestion And Transformations With AutoLoaders

In [0]:
# Incrementally read the customer parquet files from the bronze container with
# Auto Loader ("cloudFiles"). The inferred schema is tracked at schemaLocation.
# Auto Loader options are namespaced: the evolution mode must be passed as
# "cloudFiles.schemaEvolutionMode" for the requested mode to be honoured.
df_customer = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_customer_enr/checkpoint_location")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@trsdatalakejess.dfs.core.windows.net/dim_customer-sales_data")
)

In [0]:
# Preview the raw customer stream before any transformation.
# NOTE(review): df_customer comes from spark.readStream, so this presumably
# starts a live streaming query that keeps running until stopped — confirm.
df_customer.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Step 1: give the surrogate key and sign-up date their proper types.
df_customer = (
    df_customer
    .withColumn("customer_sk", col("customer_sk").cast(IntegerType()))
    .withColumn("signup_date", col("signup_date").cast(DateType()))
)

# Step 2: drop the _rescued_data column, keep one row per customer_sk and
# expose the business key under the name customer_id.
df_customer = (
    df_customer
    .drop("_rescued_data")
    .dropDuplicates(["customer_sk"])
    .withColumnRenamed("customer_code", "customer_id")
)

df_customer.display()


In [0]:
# Stamp every row with the processing time.
df_customer = df_customer.withColumn("last_updated", current_timestamp())

# Relabel loyalty tiers. Each (pattern, label) pair is applied in order as a
# regex replacement on loyalty_tier, so any occurrence of the pattern inside
# the value is rewritten (the patterns are not anchored).
loyalty_tier_labels = [
    ("None", "Bronze Tier"),
    ("Silver", "Silver Tier"),
    ("Gold", "Gold Tier"),
    ("Platinum", "Platinum Tier"),
]
for tier_pattern, tier_label in loyalty_tier_labels:
    df_customer = df_customer.withColumn(
        "loyalty_tier",
        regexp_replace(col("loyalty_tier"), tier_pattern, tier_label),
    )

df_customer.display()


In [0]:
# Shows the transformed customer stream once more; df_customer has not been
# modified since the display() at the end of the previous cell.
df_customer.display()

In [0]:
# Append the transformed customer stream to the silver Delta table, storing the
# data at the given external path and stream progress at checkpointLocation.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True) used for the same
# run-and-finish behaviour.
(
    df_customer.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_customer_enr/checkpoint_location")
    .option("path", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_customer_enr/dim_customer_data")
    .trigger(availableNow=True)
    .toTable("sales_project.silver.dim_customer_enr")
)

## Dim_Product Ingestion And Transformations With AutoLoaders

In [0]:
# Incrementally read the product parquet files from the bronze container with
# Auto Loader ("cloudFiles"). The inferred schema is tracked at schemaLocation.
# Auto Loader options are namespaced: the evolution mode must be passed as
# "cloudFiles.schemaEvolutionMode" for the requested mode to be honoured.
df_products = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_products_enr/checkpoint_location")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@trsdatalakejess.dfs.core.windows.net/dim_product-sales_data")
)

In [0]:
# Preview the raw product stream before any transformation.
# NOTE(review): df_products comes from spark.readStream, so this presumably
# starts a live streaming query that keeps running until stopped — confirm.
df_products.display()

In [0]:
# Add the processing timestamp and cast the surrogate key and list price.
df_products = (
    df_products
    .withColumn("last_updated", current_timestamp())
    .withColumn("product_sk", col("product_sk").cast(IntegerType()))
    .withColumn("list_price", col("list_price").cast(DoubleType()))
)

# Drop the _rescued_data column and keep a single row per product_sk.
df_products = (
    df_products
    .drop("_rescued_data")
    .dropDuplicates(["product_sk"])
)

df_products.display()

In [0]:
# Remove the uom column, expose the business key as product_id and cast the
# supplier surrogate key to an integer.
df_products = (
    df_products
    .drop("uom")
    .withColumnRenamed("product_code", "product_id")
    .withColumn("supplier_sk", col("supplier_sk").cast(IntegerType()))
)

df_products.display()

In [0]:
# Append the transformed product stream to the silver Delta table, storing the
# data at the given external path and stream progress at checkpointLocation.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True) used for the same
# run-and-finish behaviour.
(
    df_products.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_products_enr/checkpoint_location")
    .option("path", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_products_enr/dim_products_data")
    .trigger(availableNow=True)
    .toTable("sales_project.silver.dim_products_enr")
)

## Dim_Store Ingestion And Transformations With AutoLoaders

In [0]:
# Incrementally read the store parquet files from the bronze container with
# Auto Loader ("cloudFiles"). The inferred schema is tracked at schemaLocation.
# Auto Loader options are namespaced: the evolution mode must be passed as
# "cloudFiles.schemaEvolutionMode" for the requested mode to be honoured.
df_stores = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_stores_enr/checkpoint_location")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@trsdatalakejess.dfs.core.windows.net/dim_store-sales_data")
)

# Preview the raw store stream.
df_stores.display()


In [0]:
# Clean the store dimension: add the processing timestamp, cast the key, date
# and area columns, drop the _rescued_data column, keep one row per store_sk
# and expose the business key as store_id.
# The chain is wrapped in parentheses instead of backslash continuations; the
# previous version ended on a dangling "\" that only parsed because a
# whitespace-only line happened to follow it.
df_stores = (
    df_stores
    .withColumn("last_updated", current_timestamp())
    .withColumn("store_sk", col("store_sk").cast(IntegerType()))
    .drop("_rescued_data")
    .dropDuplicates(["store_sk"])
    .withColumnRenamed("store_code", "store_id")
    .withColumn("open_date", col("open_date").cast(DateType()))
    .withColumn("sq_ft", col("sq_ft").cast(IntegerType()))
)

df_stores.display()

In [0]:
# Normalise the spelling of the north-east region: every occurrence of
# "Northeast" inside the region value becomes "NorthEast".
region_normalised = regexp_replace("region", "Northeast", "NorthEast")
df_stores = df_stores.withColumn("region", region_normalised)

df_stores.display()

In [0]:
# Append the transformed store stream to the silver Delta table, storing the
# data at the given external path and stream progress at checkpointLocation.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True) used for the same
# run-and-finish behaviour.
(
    df_stores.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_stores_enr/checkpoint_location")
    .option("path", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_stores_enr/dim_stores_data")
    .trigger(availableNow=True)
    .toTable("sales_project.silver.dim_stores_enr")
)


## Dim_Date Ingestion And Transformations With AutoLoaders

In [0]:
# Incrementally read the date parquet files from the bronze container with
# Auto Loader ("cloudFiles"). The inferred schema is tracked at schemaLocation.
# Auto Loader options are namespaced: the evolution mode must be passed as
# "cloudFiles.schemaEvolutionMode" for the requested mode to be honoured.
df_date = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_date_enr/checkpoint_location")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@trsdatalakejess.dfs.core.windows.net/dim_date-sales_data")
)

# Preview the raw date stream.
df_date.display()


In [0]:
# Type the date dimension. Casting an existing column with withColumn keeps its
# position, so the order in which the casts run does not affect the schema.
# The previous version ended on a dangling "\" line continuation at the end of
# the cell; the loops and the parenthesised chain below avoid continuations.
date_int_columns = ["date_sk", "day", "month", "quarter", "year", "day_of_week"]
date_flag_columns = [
    "is_weekend",
    "is_month_end",
    "is_month_start",
    "is_quarter_end",
    "is_quarter_start",
]

for date_column in date_int_columns:
    df_date = df_date.withColumn(date_column, col(date_column).cast(IntegerType()))

df_date = df_date.withColumn("date", col("date").cast(DateType()))

for date_column in date_flag_columns:
    df_date = df_date.withColumn(date_column, col(date_column).cast(BooleanType()))

# Stamp the processing time last (appended as the final column) and keep a
# single row per date_sk.
df_date = (
    df_date
    .withColumn("last_updated", current_timestamp())
    .dropDuplicates(["date_sk"])
)

In [0]:
# Remove the _rescued_data column from the date stream, then preview the result.
# NOTE(review): _rescued_data is presumably the column Auto Loader adds for
# values that did not fit the inferred schema — confirm.
df_date = df_date.drop("_rescued_data")
df_date.display()

In [0]:
# Append the transformed date stream to the silver Delta table, storing the
# data at the given external path and stream progress at checkpointLocation.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True) used for the same
# run-and-finish behaviour.
(
    df_date.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_date_enr/checkpoint_location")
    .option("path", "abfss://silver@trsdatalakejess.dfs.core.windows.net/dim_date_enr/dim_date_data")
    .trigger(availableNow=True)
    .toTable("sales_project.silver.dim_date_enr")
)

## FactSales Ingestion And Transformations With AutoLoaders

In [0]:
# Incrementally read the sales fact parquet files from the bronze container
# with Auto Loader ("cloudFiles"). The inferred schema is tracked at
# schemaLocation. Auto Loader options are namespaced: the evolution mode must
# be passed as "cloudFiles.schemaEvolutionMode" for the requested mode to be
# honoured.
df_facts = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/fact_sales_enr/checkpoint_location")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@trsdatalakejess.dfs.core.windows.net/fact_sales-sales_data")
)

# Preview the raw sales stream.
df_facts.display()


In [0]:
# Columns grouped by the type they are cast to.
fact_int_columns = ["sales_id", "store_sk", "product_sk", "date_sk", "customer_sk", "quantity"]
fact_amount_columns = ["unit_price", "gross_amount", "discount_amount", "net_amount"]

# Identifiers, foreign keys and quantity become integers.
for fact_column in fact_int_columns:
    df_facts = df_facts.withColumn(fact_column, col(fact_column).cast(IntegerType()))

df_facts = df_facts.drop("promotion_sk")

# Monetary columns become doubles.
for fact_column in fact_amount_columns:
    df_facts = df_facts.withColumn(fact_column, col(fact_column).cast(DoubleType()))

# Stamp the processing time, drop payment_method and keep one row per sales_id.
df_facts = (
    df_facts
    .withColumn("last_updated", current_timestamp())
    .drop("payment_method")
    .dropDuplicates(["sales_id"])
)

df_facts.display()

In [0]:
# Remove the _rescued_data column from the sales stream, then preview the result.
# NOTE(review): _rescued_data is presumably the column Auto Loader adds for
# values that did not fit the inferred schema — confirm.
df_facts = df_facts.drop("_rescued_data")
df_facts.display()



In [0]:
# Append the transformed sales stream to the silver Delta table, storing the
# data at the given external path and stream progress at checkpointLocation.
# trigger(availableNow=True) processes everything currently available and then
# stops; it replaces the deprecated trigger(once=True) used for the same
# run-and-finish behaviour.
(
    df_facts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@trsdatalakejess.dfs.core.windows.net/fact_sales_enr/checkpoint_location")
    .option("path", "abfss://silver@trsdatalakejess.dfs.core.windows.net/fact_sales_enr/fact_sales_data")
    .trigger(availableNow=True)
    .toTable("sales_project.silver.fact_sales_enr")
)
