In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable


In [0]:
%run /Workspace/Users/itishagajbhiye7@gmail.com/consolidated_pipeline/2_dimension_data_processing/utilities

In [0]:
# Echo the layer schema names to confirm they are in scope
# (presumably defined by the %run'd utilities notebook — confirm).
print(bronze_schema, silver_schema, gold_schema)

In [0]:
# Declare the notebook's text widgets; arguments are (name, default value, label).
dbutils.widgets.text("catalog","fmcg","Catalog")
dbutils.widgets.text("data_source","customer","Data Source")

In [0]:
# Resolve the widget values that parameterize the paths and table names below.
catalog, data_source = (
    dbutils.widgets.get(widget_name) for widget_name in ("catalog", "data_source")
)
print(catalog, data_source)

In [0]:
# Glob matching every CSV file under the selected data source's folder in the bucket.
base_path = f's3://sportsbar-dp-db/{data_source}/*.csv'
print(base_path)

In [0]:
# First look at the files using the CSV reader's default options
# (no header/inferSchema options are set here).
df = spark.read.csv(base_path)
display(df.limit(10))

**Bronze Layer**

In [0]:
# Bronze ingest: parse the CSVs with a header row and inferred column types, stamp
# each row with the load time, and append the source file's name and size taken
# from the reader's _metadata column.
df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(base_path)
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)
display(df.limit(10))

In [0]:
# Inspect the inferred column types before persisting the frame.
df.printSchema()

In [0]:
# Persist the ingested frame as the bronze Delta table, overwriting any earlier load.
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{bronze_schema}.{data_source}")
)

**Silver Layer**

In [0]:
# Read the bronze table back as the starting point for the silver clean-up.
df_bronze = spark.sql(f"SELECT * FROM {catalog}.{bronze_schema}.{data_source};")
df_bronze.show(10)

In [0]:
# List every customer_id that appears on more than one bronze row.
df_duplicates = (
    df_bronze
    .groupBy("customer_id")
    .count()
    .where("count > 1")
)
display(df_duplicates)


In [0]:
# Remove duplicate customer records.
# The bronze frame carries ingestion metadata (read_timestamp, file_name, file_size)
# that was added at load time. Comparing on those columns would keep the same
# record twice whenever it arrives in two different files, so they are left out of
# the comparison; all remaining columns must match for a row to count as a duplicate.
dedup_columns = [
    column
    for column in df_bronze.columns
    if column not in ("read_timestamp", "file_name", "file_size")
]
print("Rows before duplicates",df_bronze.count())
df_silver = df_bronze.dropDuplicates(dedup_columns)
print('Rows after duplicates dropped: ', df_silver.count())

In [0]:
# Show the rows whose customer_name differs from its trimmed form,
# i.e. names carrying leading or trailing whitespace.
display(df_silver.where(F.col("customer_name") != F.trim(F.col("customer_name"))))

In [0]:
# Strip surrounding whitespace from customer_name.
df_silver = df_silver.withColumn(
    "customer_name",
    F.trim(F.col("customer_name")),
)
                                 

In [0]:
# List the distinct city spellings present before normalisation.
df_silver.select('city').distinct().show()


In [0]:
# Misspelling -> canonical city, built from one list of known typos per city.
city_mapping = {
    typo: canonical
    for canonical, typos in {
        "Bengaluru": ["Bengaluruu", "Bengalore"],
        "Hyderabad": ["Hyderabadd", "Hyderbad"],
        "New Delhi": ["NewDelhi", "NewDheli", "NewDelhee"],
    }.items()
    for typo in typos
}

# Only these values are kept in the city column.
allowed = ["Bengaluru", "Hyderabad", "New Delhi"]

# Rewrite the known typos first, then blank out whatever is still not an allowed
# city. when() without otherwise() yields null for every non-matching row, which
# covers both null cities and unrecognised ones.
df_silver = df_silver.replace(city_mapping, subset=["city"]).withColumn(
    "city",
    F.when(F.col("city").isin(allowed), F.col("city")),
)

In [0]:
# Re-check the distinct city values after normalisation.
df_silver.select('city').distinct().show()

In [0]:
# Title case fix
df_silver = df_silver.withColumn(
    "customer_name",
    F.when(F.col("customer_name").isNull(),None)
    .otherwise(F.initcap("customer_name"))
)
df_silver.select("customer_name").distinct().show()

In [0]:
# Show, untruncated, the rows that still have no city after normalisation.
df_silver.filter(F.col("city").isNull()).show(truncate=False)


In [0]:
# Pull every row for a hand-picked set of customer names
# (presumably the customers surfaced by the null-city check — confirm).
null_customer_names = [
    "Sprintx Nutrition",
    "Zenathlete Foods",
    "Primefuel Nutrition",
    "Recovery Lane",
]
df_silver.where(F.col("customer_name").isin(null_customer_names)).show(truncate=False)


In [0]:
# Manual city assignments keyed by customer_id.
customer_city_fix = {
    789403: "New Delhi",   # Sprintx Nutrition
    789420: "Bengaluru",   # Zenathlete Foods
    789521: "Hyderabad",   # Primefuel Nutrition
    789603: "Hyderabad",   # Recovery Lane
}

# Turn the mapping into a two-column lookup frame (customer_id, fixed_city).
df_fix = spark.createDataFrame(
    list(customer_city_fix.items()),
    ["customer_id", "fixed_city"],
)

display(df_fix)

In [0]:
# Fill missing cities from the manual lookup: left-join on customer_id so every
# silver row is kept, prefer the existing city, fall back to fixed_city, then
# remove the helper column.
# The helper column is dropped by its exact name. DataFrame.drop ignores names it
# cannot resolve, so the previous "Fixed_city" spelling only worked under
# case-insensitive column resolution and would otherwise leave the column behind.
df_silver = (
    df_silver.join(df_fix, on="customer_id", how="left")
    .withColumn("city", F.coalesce(F.col("city"), F.col("fixed_city")))
    .drop("fixed_city")
)
display(df_silver)


In [0]:
# Store customer_id as a string from here on.
df_silver = df_silver.withColumn("customer_id", F.col("customer_id").cast("string"))
# printSchema() prints the schema itself and returns None, so it is called directly;
# wrapping it in print() would emit an extra "None" line.
df_silver.printSchema()

In [0]:
# Add the descriptive columns:
#   customer - "<customer_name>-<city>", using "Unknown" when city is null
#   market / platform / channel - constant values for every row
df_silver = (
    df_silver
    .withColumn(
        "customer",
        F.concat_ws(
            "-",
            F.col("customer_name"),
            F.coalesce(F.col("city"), F.lit("Unknown")),
        ),
    )
    .withColumn("market", F.lit("India"))
    .withColumn("platform", F.lit("Sports Bar"))
    .withColumn("channel", F.lit("Acquisition"))
)

In [0]:
# Confirm the silver frame's columns and types after the transformations above.
df_silver.printSchema()

In [0]:
display(df_silver.limit(5))

# Persist the cleaned frame as the silver Delta table. The gold step reads
# {catalog}.{silver_schema}.{data_source}, and no other cell in this notebook
# writes it, so without this write a fresh run would fail or pick up stale data.
# overwriteSchema lets the overwrite replace an older table schema (customer_id
# is cast to string earlier in this notebook).
(
    df_silver.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{silver_schema}.{data_source}")
)

**Gold Layer**

In [0]:
# Load the silver table and keep only the columns that make up the gold dimension.
df_silver = spark.sql(f"SELECT * FROM {catalog}.{silver_schema}.{data_source};")

df_gold = df_silver.select(
    "customer_id",
    "customer_name",
    "city",
    "customer",
    "market",
    "platform",
    "channel",
)

In [0]:
# Inspect the schema of the silver frame the gold columns were selected from.
df_silver.printSchema()

In [0]:
# Persist the gold dimension for this data source as sb_dim_<data_source>,
# overwriting any earlier version.
(
    df_gold.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
)

Merging data source with parent

In [0]:
# Merge target: the parent customer dimension.
# NOTE(review): assumed to live in the same catalog/schema as this notebook's gold
# output (it was previously hard-coded as fmcg.gold.dim_customers) — confirm.
delta_table = DeltaTable.forName(spark, f"{catalog}.{gold_schema}.dim_customers")

# Merge source: the gold dimension written above, read back under the same
# parameterised name it was saved with (a hard-coded "sb_dim_customers" does not
# match the table produced when data_source is "customer"). customer_id is
# exposed as customer_code, the column the merge condition joins on.
df_child_customers = spark.table(
    f"{catalog}.{gold_schema}.sb_dim_{data_source}"
).select(
    F.col("customer_id").alias("customer_code"),
    "customer",
    "market",
    "platform",
    "channel",
)

In [0]:
# Preview the rows that will be merged into the parent dimension.
display(df_child_customers)

In [0]:
# List any customer_code that occurs more than once in the merge source.
(
    df_child_customers
    .groupBy("customer_code")
    .count()
    .filter("count > 1")
    .show()
)

Removing duplicates from df_child_customers again

In [0]:
# Keep a single row per customer_code so each target row in the merge below
# matches at most one source row.
df_child_customers_dedup = df_child_customers.dropDuplicates(["customer_code"])


In [0]:
# Upsert the child customers into the parent dimension on customer_code:
# matching rows have all columns updated, unmatched source rows are inserted.
(
    delta_table.alias("target")
    .merge(
        source=df_child_customers_dedup.alias("source"),
        condition="target.customer_code = source.customer_code",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)