In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable


In [0]:
%run /Workspace/Users/idowusangotade082@gmail.com/Sports_pipeline/Setup/utilities

In [0]:
print(bronze_schema)
print(silver_schema)

bronze
silver


In [0]:
dbutils.widgets.text("catalog", "fmcg", "Catalog")
dbutils.widgets.text("data_source", "customers", "Data Source")

catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

base_path = f's3://sportstorage20/{data_source}/*.csv'
print(base_path)

s3://sportstorage20/customers/*.csv


In [0]:
df = (
    spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(base_path)
        .withColumn("read_timestamp", F.current_timestamp())
        .select("*", "_metadata.file_name", "_metadata.file_size")
)
display(df)

customer_id,customer_name,city,read_timestamp,file_name,file_size
789201,FitFuel Market,Bengaluru,2026-01-16T22:02:05.255Z,customers.csv,1404
789202,FitFuel Market,Hyderabad,2026-01-16T22:02:05.255Z,customers.csv,1404
789203,FitFuel Market,New Delhi,2026-01-16T22:02:05.255Z,customers.csv,1404
789301,Athlete's Choice Store,Bengaluru,2026-01-16T22:02:05.255Z,customers.csv,1404
789303,Athlete's Choice Store,New Delhi,2026-01-16T22:02:05.255Z,customers.csv,1404
789101,Endurance Foods,Bengalore,2026-01-16T22:02:05.255Z,customers.csv,1404
789102,Endurance Foods,Hyderabad,2026-01-16T22:02:05.255Z,customers.csv,1404
789103,Endurance Foods,New Delhi,2026-01-16T22:02:05.255Z,customers.csv,1404
789121,HydroBoost Nutrition,Hyderabad,2026-01-16T22:02:05.255Z,customers.csv,1404
789122,HydroBoost Nutrition,New Delhi,2026-01-16T22:02:05.255Z,customers.csv,1404


In [0]:
df.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = false)
 |-- file_name: string (nullable = false)
 |-- file_size: long (nullable = false)



## Send the Raw Data to Bronze Schema

In [0]:
df.write\
    .format("delta") \
    .option("enableChangeDataFeed", "true") \
    .mode("Overwrite") \
    .saveAsTable(f"{catalog}.{bronze_schema}.{data_source}")


## SILVER PROCESSING FOR Customers table

In [0]:
# Read bronze table
bronze_df = spark.sql(f"SELECT * FROM {catalog}.{bronze_schema}.{data_source}")
display(bronze_df )

customer_id,customer_name,city,read_timestamp,file_name,file_size
789201,FitFuel Market,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404
789202,FitFuel Market,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404
789203,FitFuel Market,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404
789301,Athlete's Choice Store,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404
789303,Athlete's Choice Store,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404
789101,Endurance Foods,Bengalore,2026-01-16T22:02:16.595Z,customers.csv,1404
789102,Endurance Foods,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404
789103,Endurance Foods,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404
789121,HydroBoost Nutrition,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404
789122,HydroBoost Nutrition,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404


In [0]:
bronze_df.show(10)

+-----------+--------------------+---------+--------------------+-------------+---------+
|customer_id|       customer_name|     city|      read_timestamp|    file_name|file_size|
+-----------+--------------------+---------+--------------------+-------------+---------+
|     789201|      FitFuel Market|Bengaluru|2026-01-16 22:02:...|customers.csv|     1404|
|     789202|      FitFuel Market|Hyderabad|2026-01-16 22:02:...|customers.csv|     1404|
|     789203|      FitFuel Market|New Delhi|2026-01-16 22:02:...|customers.csv|     1404|
|     789301|Athlete's Choice ...|Bengaluru|2026-01-16 22:02:...|customers.csv|     1404|
|     789303|Athlete's Choice ...|New Delhi|2026-01-16 22:02:...|customers.csv|     1404|
|     789101|     Endurance Foods|Bengalore|2026-01-16 22:02:...|customers.csv|     1404|
|     789102|     Endurance Foods|Hyderabad|2026-01-16 22:02:...|customers.csv|     1404|
|     789103|     Endurance Foods|New Delhi|2026-01-16 22:02:...|customers.csv|     1404|
|     7891

In [0]:
bronze_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = true)
 |-- file_name: string (nullable = true)
 |-- file_size: long (nullable = true)



In [0]:
#check for duplicates
bronze_duplicates = bronze_df.groupBy("customer_id").count().filter("count > 1")
display(bronze_duplicates)

customer_id,count
789321,2
789503,2
789522,2
789603,2


In [0]:
#Drop duplicates

silver_df = bronze_df.dropDuplicates(["customer_id"])
display(silver_df)

customer_id,customer_name,city,read_timestamp,file_name,file_size
789201,FitFuel Market,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404
789202,FitFuel Market,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404
789203,FitFuel Market,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404
789301,Athlete's Choice Store,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404
789303,Athlete's Choice Store,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404
789101,Endurance Foods,Bengalore,2026-01-16T22:02:16.595Z,customers.csv,1404
789102,Endurance Foods,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404
789103,Endurance Foods,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404
789121,HydroBoost Nutrition,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404
789122,HydroBoost Nutrition,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404


In [0]:

print('rows before duplicates dropped: ', bronze_df.count())
print('rows after duplicates dropped: ', silver_df.count())

rows before duplicates dropped:  39
rows after duplicates dropped:  35


In [0]:
# check for customer nmaes with white  spaces
silver_df.filter(F.col("customer_name").contains(" ")).show()

+-----------+--------------------+----------+--------------------+-------------+---------+
|customer_id|       customer_name|      city|      read_timestamp|    file_name|file_size|
+-----------+--------------------+----------+--------------------+-------------+---------+
|     789201|      FitFuel Market| Bengaluru|2026-01-16 22:02:...|customers.csv|     1404|
|     789202|      FitFuel Market| Hyderabad|2026-01-16 22:02:...|customers.csv|     1404|
|     789203|      FitFuel Market| New Delhi|2026-01-16 22:02:...|customers.csv|     1404|
|     789301|Athlete's Choice ...| Bengaluru|2026-01-16 22:02:...|customers.csv|     1404|
|     789303|Athlete's Choice ...| New Delhi|2026-01-16 22:02:...|customers.csv|     1404|
|     789101|     Endurance Foods| Bengalore|2026-01-16 22:02:...|customers.csv|     1404|
|     789102|     Endurance Foods| Hyderabad|2026-01-16 22:02:...|customers.csv|     1404|
|     789103|     Endurance Foods| New Delhi|2026-01-16 22:02:...|customers.csv|     1404|

In [0]:
## Remove the spaces
silver_df = silver_df.withColumn("customer_name", F.trim(F.col("customer_name")))

In [0]:
display(
    silver_df.filter(F.col("customer_name")!= F.trim(F.col("customer_name")))
)

customer_id,customer_name,city,read_timestamp,file_name,file_size


In [0]:
## Tnspect City columns

silver_df.select("city").distinct().show()


+----------+
|      city|
+----------+
| Bengaluru|
| Hyderabad|
| New Delhi|
| Bengalore|
|Hyderabadd|
|      NULL|
|  Hyderbad|
| NewDelhee|
|  NewDelhi|
|Bengaluruu|
|  NewDheli|
+----------+



In [0]:
# # typo dictionary
# city_typos = {
#     'Bengaluru': ['Bengaluruu', 'Bengaluruu', 'Bengalore'],
#     'Hyderabad': ['Hyderabadd', 'Hyderbad'],
#     'New Delhi': ['NewDelhi', 'NewDheli', 'NewDelhee']
# }

# typos â†’ correct names
city_mapping = {
    'Bengaluruu': 'Bengaluru',
    'Bengalore': 'Bengaluru',

    'Hyderabadd': 'Hyderabad',
    'Hyderbad': 'Hyderabad',

    'NewDelhi': 'New Delhi',
    'NewDheli': 'New Delhi',
    'NewDelhee': 'New Delhi'
}


allowed = ["Bengaluru", "Hyderabad", "New Delhi"]

silver_df = (
    silver_df
    .replace(city_mapping, subset=["city"])
    .withColumn(
        "city",
        F.when(F.col("city").isNull(), None)
         .when(F.col("city").isin(allowed), F.col("city"))
         .otherwise(None)
    )
)

silver_df.select("city").distinct().show()

+---------+
|     city|
+---------+
|Bengaluru|
|Hyderabad|
|New Delhi|
|     NULL|
+---------+



In [0]:
## Check Case issue
silver_df.select('customer_name').distinct().show()

+--------------------+
|       customer_name|
+--------------------+
|      FitFuel Market|
|Athlete's Choice ...|
|     Endurance Foods|
|HydroBoost Nutrition|
|MacroBite Superfoods|
|MacroBite superfoods|
|      PowerSnack Hub|
|      PowerSnack hub|
|   SprintX nutrition|
|   SprintX Nutrition|
|    ZenAthlete foods|
|    ZenAthlete Foods|
|Peak performance ...|
|Peak Performance ...|
| PrimeFuel Nutrition|
|       Recovery Lane|
|      StaminaX Store|
|EliteAthlete Nutr...|
|      GamePlan Foods|
|   Champion's choice|
+--------------------+
only showing top 20 rows


## We need to correct the issue of  sending name having capital & lower case in some cases

In [0]:
# Title case fix
silver_df = silver_df.withColumn(
    "customer_name",
    F.when(F.col("customer_name").isNull(), None)
     .otherwise(F.initcap("customer_name"))
)

In [0]:
silver_df.select('customer_name').distinct().show(5)

+--------------------+
|       customer_name|
+--------------------+
|      Fitfuel Market|
|Athlete's Choice ...|
|     Endurance Foods|
|Hydroboost Nutrition|
|Macrobite Superfoods|
+--------------------+
only showing top 5 rows


In [0]:
## Check the missing columns
silver_df.filter(F.col("city").isNull()).show(truncate=False)



+-----------+-------------------+----+--------------------------+-------------+---------+
|customer_id|customer_name      |city|read_timestamp            |file_name    |file_size|
+-----------+-------------------+----+--------------------------+-------------+---------+
|789403     |Sprintx Nutrition  |NULL|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789420     |Zenathlete Foods   |NULL|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789521     |Primefuel Nutrition|NULL|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789603     |Recovery Lane      |NULL|2026-01-16 22:02:16.595808|customers.csv|1404     |
+-----------+-------------------+----+--------------------------+-------------+---------+



In [0]:
null_customer_names = ['Sprintx Nutrition', 'Zenathlete Foods', 'Primefuel Nutrition', 'Recovery Lane']
silver_df.filter(F.col("customer_name").isin(null_customer_names)).show(truncate=False)

+-----------+-------------------+---------+--------------------------+-------------+---------+
|customer_id|customer_name      |city     |read_timestamp            |file_name    |file_size|
+-----------+-------------------+---------+--------------------------+-------------+---------+
|789401     |Sprintx Nutrition  |Bengaluru|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789402     |Sprintx Nutrition  |Hyderabad|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789403     |Sprintx Nutrition  |NULL     |2026-01-16 22:02:16.595808|customers.csv|1404     |
|789420     |Zenathlete Foods   |NULL     |2026-01-16 22:02:16.595808|customers.csv|1404     |
|789421     |Zenathlete Foods   |Hyderabad|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789422     |Zenathlete Foods   |New Delhi|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789520     |Primefuel Nutrition|Bengaluru|2026-01-16 22:02:16.595808|customers.csv|1404     |
|789521     |Primefuel Nutrition|NULL     |2026-01

In [0]:
# Business Confirmation Note: City corrections confirmed by business team
customer_city_fix = {
    # Sprintx Nutrition
    789403: "New Delhi",

    # Zenathlete Foods
    789420: "Bengaluru",

    # Primefuel Nutrition
    789521: "Hyderabad",

    # Recovery Lane
    789603: "Hyderabad"
}

df_fix = spark.createDataFrame(
    [(k, v) for k, v in customer_city_fix.items()],
    ["customer_id", "fixed_city"]
)

display(df_fix)

customer_id,fixed_city
789403,New Delhi
789420,Bengaluru
789521,Hyderabad
789603,Hyderabad


In [0]:
silver_df = (
    silver_df
    .join(df_fix, "customer_id", "left")
    .withColumn(
        "city",
        F.coalesce("city", "fixed_city")   # Replace null with fixed city
    )
    .drop("fixed_city")
)

In [0]:
# check  for confirmation
silver_df.filter(F.col("city").isNull()).show(truncate=False)


+-----------+-------------+----+--------------+---------+---------+
|customer_id|customer_name|city|read_timestamp|file_name|file_size|
+-----------+-------------+----+--------------+---------+---------+
+-----------+-------------+----+--------------+---------+---------+



In [0]:
# Convert cutomer_id tostring
silver_df = silver_df.withColumn("customer_id", F.col("customer_id").cast("string"))
print(silver_df.printSchema())

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- read_timestamp: timestamp (nullable = true)
 |-- file_name: string (nullable = true)
 |-- file_size: long (nullable = true)

None


## Update the Child company  table to match the Parent Company

In [0]:
silver_df = (
    silver_df
    # Build final customer column to concatenante customer name and city
    # "Unknown" if city is null or "Unknown"
    .withColumn(
        "customer",
        F.concat_ws("-", "customer_name", F.coalesce(F.col("city"), F.lit("Unknown")))
    )
    
    # constant attributes aligned with parent data model
    .withColumn("market", F.lit("India"))
    .withColumn("platform", F.lit("Sports Bar"))
    .withColumn("channel", F.lit("Acquisition"))
)

In [0]:
display(silver_df.limit(10))

customer_id,customer_name,city,read_timestamp,file_name,file_size,customer,market,platform,channel
789503,Peak Performance Store,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404,Peak Performance Store-New Delhi,India,Sports Bar,Acquisition
789420,Zenathlete Foods,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404,Zenathlete Foods-Bengaluru,India,Sports Bar,Acquisition
789703,Staminax Store,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404,Staminax Store-New Delhi,India,Sports Bar,Acquisition
789621,Eliteathlete Nutrition,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404,Eliteathlete Nutrition-Hyderabad,India,Sports Bar,Acquisition
789101,Endurance Foods,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404,Endurance Foods-Bengaluru,India,Sports Bar,Acquisition
789220,Macrobite Superfoods,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404,Macrobite Superfoods-Bengaluru,India,Sports Bar,Acquisition
789720,Gameplan Foods,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404,Gameplan Foods-Bengaluru,India,Sports Bar,Acquisition
789601,Recovery Lane,Bengaluru,2026-01-16T22:02:16.595Z,customers.csv,1404,Recovery Lane-Bengaluru,India,Sports Bar,Acquisition
789122,Hydroboost Nutrition,New Delhi,2026-01-16T22:02:16.595Z,customers.csv,1404,Hydroboost Nutrition-New Delhi,India,Sports Bar,Acquisition
789402,Sprintx Nutrition,Hyderabad,2026-01-16T22:02:16.595Z,customers.csv,1404,Sprintx Nutrition-Hyderabad,India,Sports Bar,Acquisition


In [0]:
silver_df.write\
 .format("delta") \
 .option("delta.enableChangeDataFeed", "true") \
 .option("mergeSchema", "true") \
 .mode("overwrite") \
 .saveAsTable(f"{catalog}.{silver_schema}.{data_source}")

## Gold Layer

In [0]:
silver_df = spark.sql(f"SELECT * FROM {catalog}.{silver_schema}.{data_source}")


# select req columns only
# "customer_id, customer_name, city, read_timestamp, file_name, file_size, customer, market, platform, channel"
gold_df= silver_df.select("customer_id", "customer_name", "city", "customer", "market", "platform", "channel")

In [0]:
display(gold_df.limit(10))


customer_id,customer_name,city,customer,market,platform,channel
789503,Peak Performance Store,New Delhi,Peak Performance Store-New Delhi,India,Sports Bar,Acquisition
789420,Zenathlete Foods,Bengaluru,Zenathlete Foods-Bengaluru,India,Sports Bar,Acquisition
789703,Staminax Store,New Delhi,Staminax Store-New Delhi,India,Sports Bar,Acquisition
789621,Eliteathlete Nutrition,Hyderabad,Eliteathlete Nutrition-Hyderabad,India,Sports Bar,Acquisition
789101,Endurance Foods,Bengaluru,Endurance Foods-Bengaluru,India,Sports Bar,Acquisition
789220,Macrobite Superfoods,Bengaluru,Macrobite Superfoods-Bengaluru,India,Sports Bar,Acquisition
789720,Gameplan Foods,Bengaluru,Gameplan Foods-Bengaluru,India,Sports Bar,Acquisition
789601,Recovery Lane,Bengaluru,Recovery Lane-Bengaluru,India,Sports Bar,Acquisition
789122,Hydroboost Nutrition,New Delhi,Hydroboost Nutrition-New Delhi,India,Sports Bar,Acquisition
789402,Sprintx Nutrition,Hyderabad,Sprintx Nutrition-Hyderabad,India,Sports Bar,Acquisition


In [0]:
gold_df.write\
 .format("delta") \
 .option("delta.enableChangeDataFeed", "true") \
 .mode("overwrite") \
 .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")

In [0]:
delta_table = DeltaTable.forName(spark, "fmcg.gold.dim_customers")
df_child_customers = spark.table("fmcg.gold.sb_dim_customers").select(
    F.col("customer_id").alias("customer_code"),
    "customer",
    "market",
    "platform",
    "channel"
)

In [0]:
delta_table.alias("target").merge(
    source=df_child_customers.alias("source"),
    condition="target.customer_code = source.customer_code"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]