In [None]:
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType,
    DateType, DoubleType, ShortType, DecimalType
)


In [None]:
%%sql
-- Create the `bronze` schema that the ingestion cells below write their tables
-- into; IF NOT EXISTS keeps the statement safe to re-run.
CREATE SCHEMA IF NOT EXISTS bronze


In [None]:
# product
# Reads Files/raw/product.csv (header row, ';' delimiter) with an explicit
# schema and overwrites the bronze.product table with the result.
# NOTE(review): Spark file sources may relax nullable=False to nullable on
# read — confirm before relying on these flags as NOT NULL constraints.

schema = (
    StructType()
    .add("product_id", IntegerType(), nullable=False)
    .add("product_subcategory_id", IntegerType(), nullable=True)
    .add("product", StringType(), nullable=False)
    .add("color", StringType(), nullable=True)
    .add("size", StringType(), nullable=True)
    .add("product_line", StringType(), nullable=True)
    .add("class", StringType(), nullable=True)
    .add("style", StringType(), nullable=True)
    .add("product_model_id", IntegerType(), nullable=True)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/product.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.product")
)

In [None]:
# product_category
# Reads Files/raw/product_category.csv (header row, ';' delimiter) with an
# explicit two-column schema and overwrites bronze.product_category.

schema = (
    StructType()
    .add("product_category_id", IntegerType(), nullable=False)
    .add("product_category", StringType(), nullable=False)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/product_category.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.product_category")
)

In [None]:
# product_subcategory
# Reads Files/raw/product_subcategory.csv (header row, ';' delimiter) with an
# explicit schema and overwrites bronze.product_subcategory.

schema = (
    StructType()
    .add("product_subcategory_id", IntegerType(), nullable=False)
    .add("product_category_id", IntegerType(), nullable=False)
    .add("product_subcategory", StringType(), nullable=False)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/product_subcategory.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.product_subcategory")
)

In [None]:
# customer
# Reads Files/raw/customer.csv (header row, ';' delimiter) with an explicit
# schema and overwrites bronze.customer.

schema = (
    StructType()
    .add("customer_id", IntegerType(), nullable=False)
    .add("first_name", StringType(), nullable=False)
    .add("middle_name", StringType(), nullable=True)
    .add("last_name", StringType(), nullable=False)
    .add("address_line_1", StringType(), nullable=False)
    # NOTE(review): "addess_line_2" looks like a typo of "address_line_2".
    # Kept unchanged because it becomes the table's column name and other
    # notebooks may already reference it — confirm before renaming.
    .add("addess_line_2", StringType(), nullable=True)
    .add("city", StringType(), nullable=False)
    .add("state_province_code", StringType(), nullable=False)
    .add("country_region_code", StringType(), nullable=False)
    .add("state_name", StringType(), nullable=False)
    .add("territory_name", StringType(), nullable=False)
    .add("territory_group", StringType(), nullable=False)
    .add("start_date", DateType(), nullable=True)
    .add("end_date", DateType(), nullable=True)
    .add("is_active", IntegerType(), nullable=False)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/customer.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.customer")
)

In [None]:
# sales_order_detail
# Reads Files/raw/sales_order_detail.csv (header row, ';' delimiter) with an
# explicit schema and overwrites bronze.sales_order_detail.
# Every column in this schema is declared nullable.

schema = (
    StructType()
    .add("sales_order_id", IntegerType(), nullable=True)
    .add("sales_order_detail_id", IntegerType(), nullable=True)
    .add("sales_detail_item", IntegerType(), nullable=True)
    .add("items_per_order", IntegerType(), nullable=True)
    .add("order_qty", ShortType(), nullable=True)
    .add("product_id", IntegerType(), nullable=True)
    .add("unit_price", DecimalType(19, 4), nullable=True)
    .add("percentage_discount", DecimalType(19, 4), nullable=True)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/sales_order_detail.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.sales_order_detail")
)

In [None]:
# sales_order_header
# Reads Files/raw/sales_order_header.csv (header row, ';' delimiter) with an
# explicit schema and overwrites bronze.sales_order_header.

schema = (
    StructType()
    .add("sales_order_id", IntegerType(), nullable=False)
    .add("order_date", DateType(), nullable=True)
    .add("ship_date", DateType(), nullable=True)
    .add("due_date", DateType(), nullable=True)
    .add("customer_id", IntegerType(), nullable=False)
    .add("tax_amount", DecimalType(19, 4), nullable=False)
    .add("freight", DecimalType(19, 4), nullable=False)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/sales_order_header.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.sales_order_header")
)

In [None]:
# product_cost_history
# Reads Files/raw/product_cost_history.csv (header row, ';' delimiter) with an
# explicit schema and overwrites bronze.product_cost_history.

schema = (
    StructType()
    .add("product_id", IntegerType(), nullable=False)
    .add("start_date", DateType(), nullable=False)
    .add("end_date", DateType(), nullable=True)
    .add("is_active", IntegerType(), nullable=False)
    .add("standard_cost", DecimalType(19, 4), nullable=False)
)

df = (
    spark.read.format("csv")
    .options(header=True, delimiter=";")
    .schema(schema)
    .load("Files/raw/product_cost_history.csv")
)

# Overwrite mode with the overwriteSchema option set on the writer.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze.product_cost_history")
)

In [None]:
%%sql

-- Option with SQL: set delta.enableChangeDataFeed = true explicitly on each
-- of the seven bronze tables written by the ingestion cells above, one
-- ALTER TABLE statement per table.
ALTER TABLE bronze.customer SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE bronze.product SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE bronze.product_category SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE bronze.product_subcategory SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE bronze.sales_order_detail SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE bronze.sales_order_header SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
ALTER TABLE bronze.product_cost_history SET TBLPROPERTIES (delta.enableChangeDataFeed = true);


In [None]:
# Apply CDF (Change Data Feed) to all tables of the schema.
# Lists the tables in `schema` and sets delta.enableChangeDataFeed = true on
# each one with an ALTER TABLE statement.

schema = "bronze"

# SHOW TABLES also returns session-local temporary views, whatever schema is
# requested. They do not live in `schema`, so ALTER TABLE {schema}.<view>
# would fail; keep only rows where isTemporary is false.
tbls = (
    spark.sql(f"SHOW TABLES IN {schema}")
    .where("NOT isTemporary")
    .select("tableName")
    .collect()
)

for r in tbls:
    # Backtick-quote the table name so identifiers with unusual characters
    # still parse in the generated statement.
    t = f"{schema}.`{r['tableName']}`"
    spark.sql(f"""
        ALTER TABLE {t}
        SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """)
    