#### Import required libs

In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run /Workspace/FMCG_Project/01_setup/02_schema_utilities

#### Set up widgets

In [0]:
# Notebook parameters: each widget is declared with its default and read back
# immediately so the declaration and its value sit side by side.
dbutils.widgets.text("catalog", "fmcg", "catalog")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("data_source", "customers", "data_source")
data_source = dbutils.widgets.get("data_source")

# Echo the effective parameters so the run log records what was processed.
print(f"{catalog} - {data_source}")

fmcg - customers


#### Set up base_path

In [0]:
# Glob that matches every CSV file dropped for this data source.
# NOTE(review): the catalog segment is hard-coded to "fmcg" rather than taken
# from the `catalog` widget, and the folder names are spelled "souce_data" /
# "chaild_company" - presumably these match the real volume layout; confirm
# before changing either.
base_path = (
    f"/Volumes/fmcg/bronze/souce_data/chaild_company/full_load/{data_source}/*.csv"
)
base_path

'/Volumes/fmcg/bronze/souce_data/chaild_company/full_load/customers/*.csv'

## Bronze Layer

In [0]:
# Read the raw customer CSVs with a header row and inferred column types,
# stamp each row with the ingestion time, then append the source file name
# and size taken from the hidden `_metadata` column.
csv_reader = spark.read.format("csv").options(inferSchema=True, header=True)

df_cust = (
    csv_reader.load(base_path)
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

In [0]:
# Persist the raw customer rows as the bronze Delta table, replacing whatever
# a previous run stored there.
#
# Change Data Feed is a Delta *table property*, so it has to be passed with
# the "delta." prefix. The bare "enableChangeDataFeed" key is not a recognised
# Delta writer option, which meant CDF was never actually enabled.
(
    df_cust.write.format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(f"{catalog}.{bronze_schema}.{data_source}")
)
             

## Silver Layer

In [0]:
# The silver pipeline starts from the bronze table that was just written.
bronze_table_name = f"{catalog}.{bronze_schema}.{data_source}"
df_silver = spark.read.table(bronze_table_name)

# Quick look at the first rows before any cleaning.
df_silver.limit(10).display()

customer_id,customer_name,city,read_timestamp,file_name,file_size
789201,FitFuel Market,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789202,FitFuel Market,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404
789203,FitFuel Market,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404
789301,Athlete's Choice Store,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789303,Athlete's Choice Store,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404
789101,Endurance Foods,Bengalore,2025-11-29T10:07:14.134Z,customers.csv,1404
789102,Endurance Foods,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404
789103,Endurance Foods,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404
789121,HydroBoost Nutrition,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404
789122,HydroBoost Nutrition,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404


In [0]:
# List the customer_id values that appear on more than one row.
id_counts = df_silver.groupBy("customer_id").count()
display(id_counts.where(F.col("count") > 1))

customer_id,count
789321,2
789503,2
789522,2
789603,2


In [0]:
# Keep a single row per customer_id and report the row count on both sides
# of the de-duplication.
# NOTE(review): no ordering is applied before dropDuplicates, so which of the
# duplicate rows is kept is not controlled here.
rows_before = df_silver.count()
print(f"Records before drop duplicates : {rows_before}")

df_silver = df_silver.dropDuplicates(["customer_id"])

rows_after = df_silver.count()
print(f"Records after drop duplicates : {rows_after}")

Records before drop duplicates : 39
Records after drop duplicates : 35


In [0]:
# Show rows whose customer_name carries leading or trailing whitespace,
# i.e. the value differs from its trimmed form.
name_col = F.col("customer_name")
df_silver.where(name_col != F.trim(name_col)).display()

customer_id,customer_name,city,read_timestamp,file_name,file_size
789121,HydroBoost Nutrition,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404
789401,SprintX nutrition,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789420,ZenAthlete foods,,2025-11-29T10:07:14.134Z,customers.csv,1404
789421,ZenAthlete Foods,Hyderbad,2025-11-29T10:07:14.134Z,customers.csv,1404
789521,PrimeFuel Nutrition,,2025-11-29T10:07:14.134Z,customers.csv,1404
789702,StaminaX Store,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404


In [0]:
# Strip surrounding whitespace from the two free-text columns.
df_silver = (
    df_silver
    .withColumn("customer_name", F.trim("customer_name"))
    .withColumn("city", F.trim("city"))
)

In [0]:
# Re-run the whitespace check; an empty result confirms the trim took effect.
name_col = F.col("customer_name")
df_silver.where(name_col != F.trim(name_col)).display()

customer_id,customer_name,city,read_timestamp,file_name,file_size


In [0]:
# Normalise casing: first letter of each word upper-case, the rest lower-case.
df_silver = df_silver.withColumn(
    "customer_name",
    F.initcap("customer_name"),
)

In [0]:
# Inspect every distinct city spelling, sorted, to spot typos and blanks.
df_silver.select("city").distinct().sort("city").display()

city
""
Bengalore
Bengaluru
Bengaluruu
Hyderabad
Hyderabadd
Hyderbad
New Delhi
NewDelhee
NewDelhi


In [0]:
# Mapping discussed and confirmed with the upstream team. Only the allowed
# cities are kept; every other value is set to null in a later step.

# Canonical city -> misspellings observed or expected in the feed.
city_variants = {
    "Bengaluru": ["Bengalore", "Bengaluruu"],
    "Hyderabad": ["Hyderabadd", "Hyderbad"],
    "New Delhi": ["NewDelhee", "NewDelhi", "NewDheli", "New Dehli"],
}

# Flatten into the {misspelling: canonical} shape that DataFrame.replace expects.
city_mapping = {
    variant: canonical
    for canonical, variants in city_variants.items()
    for variant in variants
}

allowed_cities = ["Bengaluru", "Hyderabad", "New Delhi"]

df_silver = df_silver.replace(to_replace=city_mapping, subset=["city"])

In [0]:

# Confirm that only canonical spellings (plus blanks) remain after the mapping.
df_silver.select("city").distinct().sort("city").display()


city
""
Bengaluru
Hyderabad
New Delhi


In [0]:
# Keep a city only when it is on the approved list. A `when` without an
# `otherwise` yields null, so null, empty-string and unmapped values all end
# up null.
city_is_allowed = F.col("city").isin(allowed_cities)

df_silver = df_silver.withColumn(
    "city",
    F.when(city_is_allowed, F.col("city")),
)

In [0]:
# Customers that still have no city after the clean-up.
missing_city = df_silver.where(F.col("city").isNull())
missing_city.select("customer_id", "customer_name").distinct().display()

customer_id,customer_name
789403,Sprintx Nutrition
789420,Zenathlete Foods
789521,Primefuel Nutrition
789603,Recovery Lane


In [0]:
# Business confirmation note: these per-customer city corrections were
# confirmed by the business team.
customer_city_fix = {
    789403: "New Delhi",   # Sprintx Nutrition
    789420: "Bengaluru",   # Zenathlete Foods
    789521: "Hyderabad",   # Primefuel Nutrition
    789603: "Hyderabad",   # Recovery Lane
}

# Turn the lookup into a two-column frame so it can be joined onto df_silver.
fix_rows = list(customer_city_fix.items())
df_fix_city = spark.createDataFrame(fix_rows, ["customer_id", "fixed_city"])

display(df_fix_city)


customer_id,fixed_city
789403,New Delhi
789420,Bengaluru
789521,Hyderabad
789603,Hyderabad


In [0]:
# Left-join the manual corrections and use them only where city is still null;
# the helper column is dropped afterwards.
df_silver = (
    df_silver
    .join(df_fix_city, on="customer_id", how="left")
    .withColumn("city", F.coalesce(F.col("city"), F.col("fixed_city")))
    .drop("fixed_city")
)

In [0]:
# Store the customer key as text rather than a number.
customer_id_as_text = F.col("customer_id").cast("string")
df_silver = df_silver.withColumn("customer_id", customer_id_as_text)

In [0]:
# Review the cleaned customer rows.
display(df_silver)

customer_id,customer_name,city,read_timestamp,file_name,file_size
789503,Peak Performance Store,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404
789420,Zenathlete Foods,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789703,Staminax Store,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404
789621,Eliteathlete Nutrition,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404
789101,Endurance Foods,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789220,Macrobite Superfoods,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789720,Gameplan Foods,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789601,Recovery Lane,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404
789122,Hydroboost Nutrition,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404
789402,Sprintx Nutrition,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404


In [0]:
# Display label "<customer_name>-<city>"; a missing city is shown as "Unknown".
customer_label = F.concat_ws(
    "-",
    "customer_name",
    F.coalesce(F.col("city"), F.lit("Unknown")),
)

df_silver = (
    df_silver
    .withColumn("customer", customer_label)
    # Additional constant attributes attached to every row of this load.
    .withColumn("market", F.lit("India"))
    .withColumn("platform", F.lit("Sports Bar"))
    .withColumn("Channel", F.lit("acquisition"))
)


In [0]:
# Spot-check the newly derived columns on a handful of rows.
df_silver.limit(5).display()

customer_id,customer_name,city,read_timestamp,file_name,file_size,customer,market,platform,Channel
789503,Peak Performance Store,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404,Peak Performance Store-New Delhi,India,Sports Bar,acquisition
789420,Zenathlete Foods,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404,Zenathlete Foods-Bengaluru,India,Sports Bar,acquisition
789703,Staminax Store,New Delhi,2025-11-29T10:07:14.134Z,customers.csv,1404,Staminax Store-New Delhi,India,Sports Bar,acquisition
789621,Eliteathlete Nutrition,Hyderabad,2025-11-29T10:07:14.134Z,customers.csv,1404,Eliteathlete Nutrition-Hyderabad,India,Sports Bar,acquisition
789101,Endurance Foods,Bengaluru,2025-11-29T10:07:14.134Z,customers.csv,1404,Endurance Foods-Bengaluru,India,Sports Bar,acquisition


In [0]:
# Persist the cleaned customers as the silver Delta table, replacing the
# previous contents and letting new columns be merged into the schema.
#
# Change Data Feed is a Delta *table property*, so it has to be passed with
# the "delta." prefix. The bare "enablechangeDataFeed" key is not a recognised
# Delta writer option, which meant CDF was never actually enabled.
(
    df_silver.write.format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", True)
    .saveAsTable(f"{catalog}.{silver_schema}.{data_source}")
)

## Gold Layer

In [0]:
# Gold keeps only the business-facing columns; the ingestion metadata
# (read_timestamp, file_name, file_size) is left behind in silver.
gold_columns = [
    "customer_id",
    "customer_name",
    "city",
    "customer",
    "market",
    "platform",
    "Channel",
]

df_gold = spark.read.table(f"{catalog}.{silver_schema}.{data_source}")
df_gold = df_gold.select(*gold_columns)

In [0]:
# Persist the gold projection as the `sb_dim_<data_source>` Delta table,
# replacing the previous contents.
#
# Change Data Feed is a Delta *table property*, so it has to be passed with
# the "delta." prefix. The bare "enableChangeDataFeed" key is not a recognised
# Delta writer option, which meant CDF was never actually enabled.
(
    df_gold.write.format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
)

# Merge the sb_dim table into the gold dim table

In [0]:
# Upsert the `sb_dim_<data_source>` rows into the `dim_<data_source>` table.
delta_target_customer = DeltaTable.forName(
    spark,
    f"{catalog}.{gold_schema}.dim_{data_source}",
)

# Project the source onto the columns used for the merge; customer_id is
# exposed as the string key `customer_code` that the join condition uses.
df_src_customer = (
    spark.read.table(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
    .select(
        F.col("customer_id").cast("string").alias("customer_code"),
        "customer",
        "market",
        "platform",
        "channel",
    )
)

# Matched keys are updated with all source columns, unmatched keys inserted.
(
    delta_target_customer.alias("trg")
    .merge(
        df_src_customer.alias("src"),
        condition="trg.customer_code = src.customer_code",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]