### Bronze Data Processing

In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Consolidated_pipeline/1_setup/utilities

In [0]:
# Sanity check: echo the three schema names used for table paths below.
# NOTE(review): these names are not defined in this notebook — presumably they come from the
# %run'd utilities notebook; confirm.
print(bronze_schema, silver_schema, gold_schema)

In [0]:
# Notebook parameters as (widget name, default value, label) triples.
widget_definitions = [
    ("catalog", "fmcg", "Catalog"),
    ("data_source", "products", "Data Source"),
]
for widget_name, default_value, widget_label in widget_definitions:
    dbutils.widgets.text(widget_name, default_value, widget_label)

In [0]:
# Pull the current widget values; they drive every table name and the source path below.
catalog, data_source = (
    dbutils.widgets.get(widget_name) for widget_name in ("catalog", "data_source")
)

# Glob matching every CSV file under the data source's folder in the landing bucket.
base_path = f's3://sportsbar-dj047/{data_source}/*.csv'
print(base_path)

In [0]:
# CSV reader: first line is the header, column types are inferred from the data.
csv_reader = (
    spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", True)
)

# Load all matching files, stamp each row with the read time, and keep the
# source file's name and size (from the _metadata column) for lineage.
df = (
    csv_reader.load(base_path)
    .withColumn("read_timestamp", current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

display(df)

In [0]:
# Review the column types Spark inferred (inferSchema is enabled on the read above).
df.printSchema()

In [0]:
# Persist the raw load as the bronze Delta table, replacing any previous contents.
#
# Change Data Feed is requested with the `delta.`-prefixed property key, the same key the
# silver and gold writes in this notebook use. Delta only stores writer options as table
# properties when they carry the `delta.` prefix, so the bare "enableChangeDataFeed" key
# used previously was not applied to the table.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"

(
    df.write.format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(bronze_table)
)

### Silver Data Processing

In [0]:
# Re-read the bronze table that was just written so silver starts from persisted data.
bronze_table_name = f"{catalog}.{bronze_schema}.{data_source}"
df_bronze = spark.table(bronze_table_name)

# Preview a handful of rows without truncating long values.
bronze_preview = df_bronze.limit(10)
bronze_preview.display(truncate=False)

#### Transformations

1. drop duplicates

In [0]:
# Report row counts on either side of the de-duplication so removed rows are visible.
rows_before = df_bronze.count()
print("rows before duplication check: ", rows_before)

# Keep a single row per product_id.
df_silver = df_bronze.dropDuplicates(subset=['product_id'])

rows_after = df_silver.count()
print("rows after duplication check: ", rows_after)


2. Title case fix

In [0]:
# List the distinct category values as they currently appear in df_silver.
df_silver.select(col('category')).distinct().display()

In [0]:
# Preview the distinct values initcap() would produce; df_silver itself is not modified here.
df_silver.select(initcap(col('category'))).distinct().display()

In [0]:
# Title-case every non-null category. A when() with no otherwise() yields NULL when the
# condition is false, so missing categories stay NULL.
category_col = col("category")
title_cased_category = when(category_col.isNotNull(), initcap(category_col))

df_silver = df_silver.withColumn("category", title_cased_category)


In [0]:
# Re-check the distinct category values after the title-case step above.
df_silver.select(col('category')).distinct().display()

3. Fix Spelling Mistake

In [0]:
# Replace the misspelling "Protien" (matched case-insensitively) with "Protein"
# in both text columns that can contain it.
for text_column in ("product_name", "category"):
    corrected_text = regexp_replace(col(text_column), "(?i)Protien", "Protein")
    df_silver = df_silver.withColumn(text_column, corrected_text)


In [0]:
# Spot-check the first 10 rows after the spelling correction.
display(df_silver.limit(10))

#### Standardizing Product Attributes to Match Parent Company Data Model

In [0]:
# --- 1: division -----------------------------------------------------------------------
# Map each category onto its division; anything not listed (including NULL) becomes "Other".
division_by_category = {
    "Energy Bars": "Nutrition Bars",
    "Protein Bars": "Nutrition Bars",
    "Granola & Cereals": "Breakfast Foods",
    "Recovery Dairy": "Dairy & Recovery",
    "Healthy Snacks": "Healthy Snacks",
    "Electrolyte Mix": "Hydration & Electrolytes",
}

# Build the same when(...).when(...) chain, one branch per mapping entry, in order.
division_expr = None
for category_name, division_name in division_by_category.items():
    is_category = col("category") == category_name
    if division_expr is None:
        division_expr = when(is_category, division_name)
    else:
        division_expr = division_expr.when(is_category, division_name)
division_expr = division_expr.otherwise("Other")

# --- 2: variant ------------------------------------------------------------------------
# Text inside the first pair of parentheses in product_name; regexp_extract returns an
# empty string when the name has no parenthesised part.
variant_expr = regexp_extract(col("product_name"), r"\((.*?)\)", 1)

# --- 3: product_code -------------------------------------------------------------------
# Deterministic key: SHA-256 hex digest of the product name.
product_code_expr = sha2(col("product_name").cast("string"), 256)

# --- 4: product_id ---------------------------------------------------------------------
# Invalid product_ids are replaced with a fallback value to avoid losing fact records and
# to keep downstream joins consistent. Only all-digit ids are kept; the result is a string.
FALLBACK_PRODUCT_ID = 999999
product_id_as_text = col("product_id").cast("string")
clean_product_id_expr = when(
    product_id_as_text.rlike("^[0-9]+$"), product_id_as_text
).otherwise(lit(FALLBACK_PRODUCT_ID).cast("string"))

df_silver = (
    df_silver
    .withColumn("division", division_expr)
    .withColumn("variant", variant_expr)
    .withColumn("product_code", product_code_expr)
    .withColumn("product_id", clean_product_id_expr)
    # Rename product_name -> product
    .withColumnRenamed("product_name", "product")
)



In [0]:
# Final silver layout: business columns first, then ingestion/lineage columns.
silver_columns = [
    "product_code",
    "division",
    "category",
    "product",
    "variant",
    "product_id",
    "read_timestamp",
    "file_name",
    "file_size",
]
df_silver = df_silver.select(*silver_columns)

In [0]:
# Show the reshaped silver DataFrame before it is written out.
display(df_silver)

In [0]:
# Overwrite the silver Delta table with the cleaned products, with Change Data Feed and
# schema merging requested on the write.
silver_table = f"{catalog}.{silver_schema}.{data_source}"

(
    df_silver.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(silver_table)
)

### Gold Data Processing

In [0]:
# Read the persisted silver table back and keep only the dimension attributes
# (the ingestion/lineage columns are dropped for gold).
silver_query = f"SELECT * FROM {catalog}.{silver_schema}.{data_source};"
df_silver = spark.sql(silver_query)

gold_columns = ["product_code", "product_id", "division", "category", "product", "variant"]
df_gold = df_silver.select(*gold_columns)
df_gold.show(5)

In [0]:
# Overwrite the child company's gold dimension table (sb_dim_<data_source>).
gold_table = f"{catalog}.{gold_schema}.sb_dim_{data_source}"

(
    df_gold.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(gold_table)
)

#### Merging Data Source with Parent

In [0]:
# Resolve the merge target (parent dimension) and the merge source (child dimension) from
# the notebook parameters instead of hard-coded "fmcg.gold" names. The child table is the
# one the gold write above saves as {catalog}.{gold_schema}.sb_dim_{data_source}; a
# hard-coded name reads a different (possibly stale) table when the parameters change.
# NOTE(review): the parent is assumed to live in the same catalog/schema and to be named
# dim_<data_source> (dim_products with the default widget values) — confirm.
parent_table = f"{catalog}.{gold_schema}.dim_{data_source}"
child_table = f"{catalog}.{gold_schema}.sb_dim_{data_source}"

delta_table = DeltaTable.forName(spark, parent_table)

df_child_products = (
    spark.table(child_table)
    .select("product_code", "division", "category", "product", "variant")
    # product_code is the merge key, but it is a hash of the product name, while the
    # silver step de-duplicated on product_id. Two ids sharing one name would therefore
    # yield duplicate keys, which can make MERGE fail (several source rows matching one
    # target row) or insert duplicate dimension rows. Keep one row per key.
    .dropDuplicates(["product_code"])
)
df_child_products.show(5)

In [0]:
# Upsert the child products into the parent dimension, matching rows on product_code.
merge_condition = "target.product_code = source.product_code"

# Descriptive attributes refreshed when a product_code already exists in the target.
update_assignments = {
    "division": "source.division",
    "category": "source.category",
    "product": "source.product",
    "variant": "source.variant",
}

# New product_codes are inserted with the key plus the same attributes.
insert_assignments = {"product_code": "source.product_code", **update_assignments}

(
    delta_table.alias("target")
    .merge(source=df_child_products.alias("source"), condition=merge_condition)
    .whenMatchedUpdate(set=update_assignments)
    .whenNotMatchedInsert(values=insert_assignments)
    .execute()
)