In [0]:
# import the essential libraries
from pyspark.sql import functions as F
from delta.tables import DeltaTable

### Bronze Processing

In [0]:
%run /Workspace/Users/lavouver96@gmail.com/Consolidated_Pipeline/1_setup/utilities

In [0]:
# Confirm that the bronze/silver/gold schema names are defined before they are
# used to build table names further down
print(bronze_schema, silver_schema, gold_schema)

In [0]:
# Register the notebook's text widgets as (name, default value, label) so the
# catalog and the data source can be chosen at run time
widget_specs = [
    ("catalog", "fmcg", "Catalog"),
    ("data_source", "customers", "Data Source"),
]
for widget_name, widget_default, widget_label in widget_specs:
    dbutils.widgets.text(widget_name, widget_default, widget_label)

In [0]:
# Read the current widget values
catalog = dbutils.widgets.get("catalog")
data_source = dbutils.widgets.get("data_source")

# Glob that matches every CSV file stored under the data source's S3 folder
source_bucket = "s3://sports-db-side-project"
base_path = f"{source_bucket}/{data_source}/*.csv"

In [0]:
# Load the CSV files from S3, treating the first row as the header and letting
# Spark infer the column types
raw_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(base_path)
)

# Add lineage columns: the read time plus the source file's name and size
df = (
    raw_csv
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

display(df.limit(10))

In [0]:
# Persist the raw frame as the bronze Delta table, overwriting its current contents
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(bronze_table)
)

### Silver Processing

In [0]:
# Read the bronze table back as the starting point for the silver clean-up
bronze_query = f"SELECT * FROM {catalog}.{bronze_schema}.{data_source};"
df_bronze = spark.sql(bronze_query)
df_bronze.show(10)

In [0]:
# Inspect the inferred schema to decide whether any column needs a data type change
df_bronze.printSchema()

#### Deal with Duplicates

In [0]:
# List every customer_id that occurs more than once, with its occurrence count
appears_more_than_once = F.col("count") > 1
df_duplicates = (
    df_bronze
    .groupBy("customer_id")
    .count()
    .filter(appears_more_than_once)
)
display(df_duplicates)

In [0]:
# Keep a single row per customer_id and report the row count on both sides
rows_before = df_bronze.count()
print("Row before duplicates dropped: ", rows_before)

df_silver = df_bronze.dropDuplicates(["customer_id"])

rows_after = df_silver.count()
print("Row after duplicates dropped: ", rows_after)

#### Trim the space

In [0]:
# Show the rows whose customer_name has leading or trailing spaces
customer_name_col = F.col("customer_name")
has_extra_space = customer_name_col != F.trim(customer_name_col)
display(df_silver.filter(has_extra_space))

In [0]:
# Strip leading and trailing spaces from customer_name
trimmed_name = F.trim(F.col("customer_name"))
df_silver = df_silver.withColumn("customer_name", trimmed_name)

#### Fix the typo

In [0]:
# Misspelled city -> correct city. The correct names must be confirmed with the
# business users. The lookup is declared per correct name and then flattened.
city_mapping = {
    misspelling: correct_name
    for correct_name, misspellings in {
        'Bengaluru': ['Bengaluruu', 'Bengalore'],
        'Hyderabad': ['Hyderabadd', 'Hyderbad'],
        'New Delhi': ['NewDelhi', 'NewDheli', 'NewDelhee'],
    }.items()
    for misspelling in misspellings
}

In [0]:
# Only these three city names are valid; any other value is treated as garbage
allowed = ["Bengaluru", "Hyderabad", "New Delhi"]

# First map the known misspellings onto their correct names
df_city_corrected = df_silver.replace(city_mapping, subset=["city"])

# when() without otherwise() yields null for every unmatched row, so null
# cities and cities outside the allowed list both end up as null
df_silver = df_city_corrected.withColumn(
    "city",
    F.when(F.col("city").isin(allowed), F.col("city")),
)

#### Fix the case with initcap

In [0]:
# Capitalize the first letter of every word in customer_name.
# initcap() returns null for a null input, so null names stay null.
df_silver = df_silver.withColumn(
    "customer_name",
    F.initcap(F.col("customer_name")),
)

In [0]:
# Sanity check: list the distinct customer names after the clean-up steps
(
    df_silver
    .select("customer_name")
    .distinct()
    .show()
)

#### Handle missing cities

In [0]:
# Show, untruncated, every row that still has no city
city_is_missing = F.col("city").isNull()
df_silver.filter(city_is_missing).show(truncate=False)

In [0]:
# Customer names that have at least one row with a null city; show all of
# their rows so the missing city can be compared with the populated ones
null_customer_names = [
    'Sprintx Nutrition',
    'Zenathlete Foods',
    'Primefuel Nutrition',
    'Recovery Lane',
]
is_affected_customer = F.col("customer_name").isin(null_customer_names)
df_silver.filter(is_affected_customer).show(truncate=False)

In [0]:
# Each affected customer name follows a specific rule for its city.
# Business confirmation note: city corrections confirmed by the business team.
# Keys are customer_id values; values are the city to fill in.
customer_city_fix = {
    789403: "New Delhi",   # Sprintx Nutrition
    789420: "Bengaluru",   # Zenathlete Foods
    789521: "Hyderabad",   # Primefuel Nutrition
    789603: "Hyderabad",   # Recovery Lane
}

# Turn the lookup into a two-column DataFrame (customer_id, fixed_city)
fix_rows = list(customer_city_fix.items())
df_fix = spark.createDataFrame(fix_rows, ["customer_id", "fixed_city"])

display(df_fix)

In [0]:
# Left-join the corrections onto silver so every silver row is kept
df_with_fix = df_silver.join(df_fix, on="customer_id", how="left")

# Keep an existing city; fall back to the confirmed fix only when city is null.
# The helper column is dropped afterwards.
df_silver = (
    df_with_fix
    .withColumn("city", F.coalesce("city", "fixed_city"))
    .drop("fixed_city")
)

In [0]:
# Sanity check: re-display the previously affected customers after the city fix
null_customer_names = [
    'Sprintx Nutrition',
    'Zenathlete Foods',
    'Primefuel Nutrition',
    'Recovery Lane',
]
is_fixed_customer = F.col("customer_name").isin(null_customer_names)
df_silver.filter(is_fixed_customer).show(truncate=False)

#### Convert customer_id to string

In [0]:
# Convert customer_id to string
df_silver = df_silver.withColumn("customer_id", F.col("customer_id").cast("string"))

# printSchema() writes the schema to stdout itself and returns None, so wrapping
# it in print() would emit an extra "None" line after the schema
df_silver.printSchema()

#### Standardizing Customer Attributes to Match Parent Company Data Model

In [0]:
# The parent company's dim_customer table stores the customer as name + city,
# so the final value is "CustomerName-City", or "CustomerName-Unknown" when the
# city is missing
city_or_unknown = F.coalesce(F.col("city"), F.lit("Unknown"))
customer_label = F.concat_ws("-", "customer_name", city_or_unknown)

df_silver = (
    df_silver
    .withColumn("customer", customer_label)
    # static attributes aligned with the parent data model
    .withColumn("market", F.lit("India"))
    .withColumn("platform", F.lit("Sports Bar"))
    .withColumn("channel", F.lit("Acquisition"))
)

In [0]:
# Preview a few rows of the silver frame before it is written
display(df_silver.limit(5))

In [0]:
# Write the cleaned frame into the silver layer as a Delta table, overwriting
# its current contents
silver_table = f"{catalog}.{silver_schema}.{data_source}"
(
    df_silver.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(silver_table)
)

### Gold Processing

In [0]:
# Reload silver from the table that was just written
silver_query = f"SELECT * FROM {catalog}.{silver_schema}.{data_source};"
df_silver = spark.sql(silver_query)

# Take the required columns only. Silver also carries read_timestamp,
# file_name and file_size, which are left out of gold.
gold_columns = [
    "customer_id",
    "customer_name",
    "city",
    "customer",
    "market",
    "platform",
    "channel",
]
df_gold = df_silver.select(*gold_columns)

In [0]:
# "sb" is the abbreviation of the child company's name
# Write the child company's customer dimension into the gold layer, overwriting
# its current contents
child_gold_table = f"{catalog}.{gold_schema}.sb_dim_{data_source}"
(
    df_gold.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(child_gold_table)
)

### Merging the data source with the parent

In [0]:
# Build both gold table names from the notebook parameters instead of
# hard-coding "fmcg.gold.*". The child table name matches the one the gold
# write step saves to, so the merge always reads what this run produced, even
# when the widgets are not left at their defaults.
# NOTE(review): assumes the parent dimension lives in the same catalog and gold
# schema as the child table and is named dim_<data_source> - confirm.
parent_gold_table = f"{catalog}.{gold_schema}.dim_{data_source}"
child_gold_table = f"{catalog}.{gold_schema}.sb_dim_{data_source}"

# Delta table handle for the parent's gold customer table (the merge target)
delta_table = DeltaTable.forName(spark, parent_gold_table)

# Child customers (the merge source): customer_id is exposed as customer_code,
# the key the merge condition compares on
df_child_customers = spark.table(child_gold_table).select(
    F.col("customer_id").alias("customer_code"),
    "customer",
    "market",
    "platform",
    "channel"
)

In [0]:
# Upsert the child's gold customers (source) into the parent's gold table (target):
#   - customer_code already in the target -> update all columns
#   - customer_code not in the target     -> insert all columns
(
    delta_table.alias("target")
    .merge(
        source=df_child_customers.alias("source"),
        condition="target.customer_code = source.customer_code",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)