In [0]:
from pyspark.sql.functions import *
from delta.tables import *
from pyspark.sql.window import Window

In [0]:
%run /Workspace/consolidated_pipeline/1_setup/utilities

In [0]:
# Sanity check: the layer schema names (presumably set by the %run utilities
# notebook) resolved as expected.
print(*[bronze_schema, silver_schema, gold_schema])

bronze silver gold


In [0]:
# Notebook parameters (Databricks widgets). Defaults reproduce the original run.
dbutils.widgets.text("catalog","fmcg","Catalog")
dbutils.widgets.text("data_source","gross_price","Data Source")
# The landing bucket used to be hard-coded in base_path. It is now a widget so
# the same notebook can read from another bucket without a code change.
dbutils.widgets.text("source_bucket","pranshu-sports-bar","Source Bucket")

catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")
source_bucket = dbutils.widgets.get("source_bucket")

# Glob over every CSV file in the data source's folder.
base_path = f's3://{source_bucket}/{data_source}/*.csv'
print(base_path)

s3://pranshu-sports-bar/gross_price/*.csv


## Bronze

In [0]:
# Read every raw CSV for this data source and tag each row with load lineage:
# the read time plus the source file's name and size (from _metadata).
csv_reader = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
df = (
    csv_reader
    .load(base_path)
    .withColumn("read_timestamp", current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

In [0]:
# Inspect the inferred schema of the raw read, including the lineage columns.
df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- gross_price: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = false)
 |-- file_name: string (nullable = false)
 |-- file_size: long (nullable = false)



In [0]:
# Preview the first 10 raw rows.
df.limit(10).display()

product_id,month,gross_price,read_timestamp,file_name,file_size
25891101,2025/07/01,-84,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891101,01/08/2025,unknown,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891101,2025/09/01,84,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891101,2025-10-01,83,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891101,2025-11-01,83,2026-01-19T21:06:39.878Z,gross_price.csv,2741
88888888,2025-12-01,-83,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891102,2025-07-01,68,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891102,2025-08-01,68,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891102,2025-09-01,68,2026-01-19T21:06:39.878Z,gross_price.csv,2741
25891102,2025-10-01,69,2026-01-19T21:06:39.878Z,gross_price.csv,2741


In [0]:
# Persist the raw rows as the bronze Delta table (full overwrite each run),
# passing delta.enableChangeDataFeed=true on the write.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(bronze_table)
)

## Silver processing

In [0]:
# Silver processing starts from the persisted bronze table.
df_bronze = spark.table(f"{catalog}.{bronze_schema}.{data_source}")
df_bronze.show(10)

+----------+----------+-----------+--------------------+---------------+---------+
|product_id|     month|gross_price|      read_timestamp|      file_name|file_size|
+----------+----------+-----------+--------------------+---------------+---------+
|  25891101|2025/07/01|        -84|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|01/08/2025|    unknown|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025/09/01|         84|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025-10-01|         83|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025-11-01|         83|2026-01-19 21:06:...|gross_price.csv|     2741|
|  88888888|2025-12-01|        -83|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891102|2025-07-01|         68|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891102|2025-08-01|         68|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891102|2025-09-01|         68|2026-01-19 21:06:...|gross_price.csv|     2741|
|  2

### Transformation

In [0]:
# Step 1: fix the `month` column. First list its distinct raw values to see
# which date layouts occur in the data.

display(df_bronze.select("month").distinct())

month
2025/07/01
01/08/2025
2025/09/01
2025-10-01
2025-11-01
2025-12-01
2025-07-01
2025-08-01
2025-09-01
2025/11/01


In [0]:
# Parse `month` from the mixed raw layouts into a date column.
# try_to_date returns NULL instead of failing when a string does not match a
# format. coalesce keeps the first successful parse, so formats are tried in
# list order (order matters for ambiguous strings). A value that matches none
# of the formats ends up NULL.
date_formats = ["yyyy/MM/dd", "dd/MM/yyyy", "yyyy-MM-dd", "dd-MM-yyyy"]

df_silver = df_bronze.withColumn(
    "month",
    # Built from `date_formats`, so the list is the single source of truth
    # (previously it was unused and the formats were repeated inline).
    coalesce(*[try_to_date(col("month"), fmt) for fmt in date_formats]),
)

In [0]:
# Verify that every month now has a single, normalized date value.
display(df_silver.select("month").distinct())

month
2025-07-01
2025-08-01
2025-09-01
2025-10-01
2025-11-01
2025-12-01


In [0]:
# Step 2: fix `gross_price`. Inspect the rows (with parsed months) first.

display(df_silver)

product_id,month,gross_price,read_timestamp,file_name,file_size
25891101,2025-07-01,-84,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,2025-08-01,unknown,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,2025-09-01,84,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,2025-10-01,83,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,2025-11-01,83,2026-01-19T21:06:52.941Z,gross_price.csv,2741
88888888,2025-12-01,-83,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,2025-07-01,68,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,2025-08-01,68,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,2025-09-01,68,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,2025-10-01,69,2026-01-19T21:06:52.941Z,gross_price.csv,2741


In [0]:
# Clean gross_price. A numeric string (optional leading '-', optional decimal
# part, surrounding whitespace ignored) becomes its absolute value as a double.
# Anything else (e.g. "unknown", NULL) becomes 0.0, and the is_zero flag
# computed in the gold step keys off that 0.
from pyspark.sql.functions import abs as spark_abs

price_str = trim(col("gross_price"))
df_silver = df_silver.withColumn(
    "gross_price",
    when(
        price_str.rlike(r'^-?\d+(\.\d+)?$'),
        # Negative prices are treated as sign errors and flipped positive.
        spark_abs(price_str.cast("double")),
    ).otherwise(lit(0.0))
)

In [0]:
df_silver.show(10)

+----------+----------+-----------+--------------------+---------------+---------+
|product_id|     month|gross_price|      read_timestamp|      file_name|file_size|
+----------+----------+-----------+--------------------+---------------+---------+
|  25891101|2025-07-01|       84.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025-08-01|        0.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025-09-01|       84.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025-10-01|       83.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891101|2025-11-01|       83.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  88888888|2025-12-01|       83.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891102|2025-07-01|       68.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891102|2025-08-01|       68.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  25891102|2025-09-01|       68.0|2026-01-19 21:06:...|gross_price.csv|     2741|
|  2

In [0]:
# # Ensure gross_price is double and positive
# from pyspark.sql.functions import abs

# df_silver = df_silver.withColumn(
#     "gross_price",
#     when(
#         col("gross_price").rlike(r'^-?\d+(\.\d+)?$'),
#         abs(col("gross_price").cast("double"))
#     ).otherwise(0.0)
# )
# df_silver = df_silver.withColumn("gross_price", col("gross_price").cast("double"))
# display(df_silver)

In [0]:
# Step 3: enrich the prices with product_code by joining the silver products
# table on product_id. The table name is built from the catalog widget and the
# silver schema instead of being hard-coded to "fmcg.silver.products".
# The inner join drops price rows whose product_id is absent from products
# (presumably why product_id 88888888 from bronze is not in the joined preview).
# NOTE(review): assumes product_id is unique in products; duplicates would
# multiply price rows. TODO confirm.
products_table = f"{catalog}.{silver_schema}.products"
df_products = spark.table(products_table)
df_joined = df_silver.join(
    df_products.select("product_id", "product_code"),
    on="product_id",
    how="inner",
)
df_joined = df_joined.select(
    "product_id", "product_code", "month", "gross_price",
    "read_timestamp", "file_name", "file_size",
)

In [0]:
# Preview the enriched silver rows before writing.
display(df_joined)

product_id,product_code,month,gross_price,read_timestamp,file_name,file_size
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-07-01,84.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-08-01,0.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-09-01,84.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-10-01,83.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-11-01,83.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-07-01,68.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-08-01,68.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-09-01,68.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-10-01,69.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-11-01,69.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741


In [0]:
# Persist the cleaned, enriched rows as the silver Delta table (full overwrite).
# mergeSchema is passed so the write may evolve the table's schema.
silver_table = f"{catalog}.{silver_schema}.{data_source}"
silver_writer = (
    df_joined.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
)
silver_writer.saveAsTable(silver_table)

## Gold

In [0]:
# Gold processing starts from the persisted silver table.
# NOTE: this rebinds df_silver (previously the in-memory silver frame).
df_silver = spark.table(f"{catalog}.{silver_schema}.{data_source}")
df_silver.display()

product_id,product_code,month,gross_price,read_timestamp,file_name,file_size
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-07-01,84.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-08-01,0.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-09-01,84.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-10-01,83.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891101,e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,2025-11-01,83.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-07-01,68.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-08-01,68.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-09-01,68.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-10-01,69.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741
25891102,e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,2025-11-01,69.0,2026-01-19T21:06:52.941Z,gross_price.csv,2741


In [0]:
# Gold projection: keep only the columns the price dimension needs.
gold_columns = ["product_code", "gross_price", "month"]
df_gold = df_silver.select(*gold_columns)
df_gold.show(5)

+--------------------+-----------+----------+
|        product_code|gross_price|     month|
+--------------------+-----------+----------+
|e91ba9d665f90254d...|       84.0|2025-07-01|
|e91ba9d665f90254d...|        0.0|2025-08-01|
|e91ba9d665f90254d...|       84.0|2025-09-01|
|e91ba9d665f90254d...|       83.0|2025-10-01|
|e91ba9d665f90254d...|       83.0|2025-11-01|
+--------------------+-----------+----------+
only showing top 5 rows


In [0]:
# Persist the monthly gold prices as sb_dim_<data_source> (full overwrite).
sb_dim_table = f"{catalog}.{gold_schema}.sb_dim_{data_source}"
(
    df_gold.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(sb_dim_table)
)

## Merging with parent dataset

In [0]:
# Reload the monthly gold price table. The name is built from the same widget
# values as the gold write, so a non-default catalog/data_source reads the
# table that was actually written rather than a fixed "fmcg.gold..." name.
df_gold_price = spark.table(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
df_gold_price.show(5)

+--------------------+-----------+----------+
|        product_code|gross_price|     month|
+--------------------+-----------+----------+
|e91ba9d665f90254d...|       84.0|2025-07-01|
|e91ba9d665f90254d...|        0.0|2025-08-01|
|e91ba9d665f90254d...|       84.0|2025-09-01|
|e91ba9d665f90254d...|       83.0|2025-10-01|
|e91ba9d665f90254d...|       83.0|2025-11-01|
+--------------------+-----------+----------+
only showing top 5 rows


In [0]:
# Step 1: the parent table keeps price_inr at yearly grain, so derive the
# calendar year from `month`. Also flag placeholder prices: is_zero = 1 when
# gross_price == 0 (the value silver cleaning assigns to non-numeric prices),
# otherwise 0.

df_gold_price = df_gold_price.withColumns(
    {
        "year": year("month"),
        "is_zero": when(col("gross_price") == 0, 1).otherwise(0),
    }
)

In [0]:
# Preview the monthly prices with the derived year and is_zero columns.
display(df_gold_price)

product_code,gross_price,month,year,is_zero
e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,84.0,2025-07-01,2025,0
e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,0.0,2025-08-01,2025,1
e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,84.0,2025-09-01,2025,0
e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,83.0,2025-10-01,2025,0
e91ba9d665f90254da5809bfdebe3db2be01a52f50b6fd96b57eed238392b843,83.0,2025-11-01,2025,0
e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,68.0,2025-07-01,2025,0
e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,68.0,2025-08-01,2025,0
e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,68.0,2025-09-01,2025,0
e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,69.0,2025-10-01,2025,0
e92c739a8d78cd6cbe954648c2f9dd75ed61fcfd99b03e10dca65c3082d0728e,69.0,2025-11-01,2025,0


In [0]:
# Rank rows within each (product_code, year): real prices (is_zero = 0) before
# zero placeholders, then the most recent month first.
rank_order = [col("is_zero").asc(), col("month").desc()]
window = Window.partitionBy("product_code", "year").orderBy(*rank_order)

In [0]:
# Keep exactly one row per (product_code, year): the top-ranked one. That is
# the latest non-zero price when one exists, otherwise the latest zero
# placeholder. Ties (same is_zero and month) are broken arbitrarily by
# row_number.
ranked_prices = df_gold_price.withColumn("rank", row_number().over(window))
df_gold_latest_price = ranked_prices.filter(col("rank") == 1)

In [0]:
# Preview the selected latest price per product and year.
df_gold_latest_price.display()

product_code,gross_price,month,year,is_zero,rank
062f5574bbdf4386b2c7c6075483b417b4a00b172fcba919dbba7dae1b774379,281.0,2025-12-01,2025,0,1
0cb7b2f42657b625f754e833aa1cf6a967be26f17415f5342302ebb0e90c8a28,100.0,2025-10-01,2025,0,1
102628255d24304d6bbe0438b1ac992054f262e0814d306d0a34d7356cef3268,86.0,2025-12-01,2025,0,1
2e387cef1424d6e7b162b45622d4b1a788d11776e33d05cc8552f4ecd2ea1896,108.0,2025-12-01,2025,0,1
3cab59f05924285270313afcfe40a08983bb03dd88f432e34fc6336914c14345,493.0,2025-12-01,2025,0,1
451f7167b28a25bde73995910e31c07dfa26411f1db47847f19e16747effbdaa,187.0,2025-12-01,2025,0,1
716fa4e54b7894c910180276e0535d49afb25cdcfac09533fb74ae00689e5742,440.0,2025-11-01,2025,0,1
778c2a7aa27bfdb211fd5ece048de80d00fbf3d6924bd908d91054796ba16ab6,296.0,2025-12-01,2025,0,1
77b6f538a9d0e0cf845db5c2cbecec46fdd30303b501e06f64baf1d4dc0e66f9,50.0,2025-12-01,2025,0,1
889c67757ece9c973791dfbc2d47b026a3342cc7255e47a3170329d158e897c2,138.0,2025-12-01,2025,0,1


In [0]:
# Shape the frame to the parent table's columns, in order: product_code,
# price_inr (renamed from gross_price), and year cast to string (presumably
# to match the parent's year column type — confirm).
df_gold_latest_price = df_gold_latest_price.select(
    "product_code",
    col("gross_price").alias("price_inr"),
    col("year").cast("string").alias("year"),
)

df_gold_latest_price.show(5)

+--------------------+---------+----+
|        product_code|price_inr|year|
+--------------------+---------+----+
|062f5574bbdf4386b...|    281.0|2025|
|0cb7b2f42657b625f...|    100.0|2025|
|102628255d24304d6...|     86.0|2025|
|2e387cef1424d6e7b...|    108.0|2025|
|3cab59f0592428527...|    493.0|2025|
+--------------------+---------+----+
only showing top 5 rows


In [0]:
# Confirm the final schema (product_code, price_inr, year) before writing and merging.
df_gold_latest_price.printSchema()

root
 |-- product_code: string (nullable = true)
 |-- price_inr: double (nullable = true)
 |-- year: string (nullable = true)



In [0]:
# Persist the latest price per (product_code, year) as its own gold table
# (full overwrite). The same frame is then merged into the parent dimension.
latest_price_table = f"{catalog}.{gold_schema}.sb_dim_latest_gross_price"
(
    df_gold_latest_price.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(latest_price_table)
)

In [0]:
# Upsert the latest yearly prices into the parent gross price dimension.
# Rows are matched on (product_code, year) because the source holds one row per
# product per year. Matching on product_code alone would let several source
# years hit the same target row, which Delta rejects for MERGE, or would
# overwrite one year's price with another's.
# NOTE(review): assumes the parent table holds one price_inr per
# (product_code, year) — TODO confirm against its definition.
parent_table = f"{catalog}.{gold_schema}.dim_gross_price"
delta_table = DeltaTable.forName(spark, parent_table)

(
    delta_table.alias("target")
    .merge(
        source=df_gold_latest_price.alias("source"),
        condition="target.product_code = source.product_code AND target.year = source.year",
    )
    # year is part of the match key, so only the price needs updating.
    .whenMatchedUpdate(
        set={
            "price_inr": "source.price_inr",
        }
    )
    .whenNotMatchedInsert(
        values={
            "product_code": "source.product_code",
            "price_inr": "source.price_inr",
            "year": "source.year",
        }
    )
    .execute()
)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]