# Bronze to Silver: Data Cleaning and Transformation for Dimension Tables

In [0]:
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, FloatType
import pyspark.sql.functions as F

# Catalog holding the `bronze` (input) and `silver` (output) schemas used below.
catalog_name = "ecommerce"

# Brands

In [0]:
# Load the raw brands table from the bronze layer and preview it.
df_bronze = spark.table(f'{catalog_name}.bronze.brz_brands')
df_bronze.show()

In [0]:
# Clean brand_code by removing every non-alphanumeric character, then
# upper-case the result. The Products cleaning further down upper-cases
# brand_code as well, so the same normalization here keeps brand codes in one
# canonical form across both silver tables (a no-op if already upper case).
# brand_name only gets its surrounding whitespace trimmed.
df_silver = (
    df_bronze
    .withColumn(
        "brand_code",
        F.upper(F.regexp_replace(F.col("brand_code"), r"([^A-Za-z0-9])", "")),
    )
    .withColumn("brand_name", F.trim(F.col("brand_name")))
)
df_silver.show()
                                

In [0]:
# List the distinct category_code values to spot anomalies (full names instead of codes).
df_silver.select("category_code").dropDuplicates().show()

In [0]:
# Map category values that were written out in full onto their short codes.
anomalies = dict(
    GROCERY="GRCY",
    BOOKS="BKS",
    TOYS="TOY",
)

# Exact-value replacement restricted to category_code; other values pass through unchanged.
df_silver = df_silver.na.replace(anomalies, subset=["category_code"])
df_silver.select("category_code").dropDuplicates().show()


In [0]:
# Overwrite the silver brands table. overwriteSchema (rather than mergeSchema)
# makes the table schema match this DataFrame exactly on every run: under
# mergeSchema an overwrite keeps columns the DataFrame no longer produces and
# cannot change an existing column's type.
df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.silver.slv_brands")

# Category

In [0]:
# Load the raw category table from the bronze layer and preview it.
df_bronze = spark.read.table(f"{catalog_name}.bronze.brz_category")
df_bronze.show()

In [0]:
# Check for duplicates
df_duplicates = df_bronze.groupBy("category_code").count().filter(F.col("count") > 1)
display(df_duplicates)

In [0]:
# Keep a single row per category_code (which row survives is not specified).
df_silver = df_bronze.drop_duplicates(subset=["category_code"])
display(df_silver)

In [0]:
# Normalize category_code to upper case, then de-duplicate again. The earlier
# dropDuplicates ran on the raw codes, so values that differed only in letter
# case (e.g. "abc" vs "ABC") survived it and would collapse onto the same key
# here, leaving duplicate keys in the dimension table.
df_silver = (
    df_silver
    .withColumn("category_code", F.upper(F.col("category_code")))
    .dropDuplicates(["category_code"])
)
display(df_silver)

In [0]:
# Overwrite the silver category table. overwriteSchema (rather than
# mergeSchema) makes the table schema match this DataFrame exactly on every
# run: under mergeSchema an overwrite keeps columns the DataFrame no longer
# produces and cannot change an existing column's type.
df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.silver.slv_category")

# Products

In [0]:
# Load the raw products table and report its size.
df_bronze = spark.table(f"{catalog_name}.bronze.brz_products")

row_count = df_bronze.count()
column_count = len(df_bronze.columns)

print(f"Number of rows in bronze Products table: {row_count}")
print(f"Number of columns in bronze Products table: {column_count}")

In [0]:
# Show the first five bronze product rows.
display(df_bronze.limit(num=5))

## Check weight_grams (values carry a 'g' unit suffix)

In [0]:
# Inspect raw weight values without truncation.
df_bronze.select(F.col("weight_grams")).show(n=5, truncate=False)

In [0]:
# Remove every lowercase "g" from weight_grams and cast the remainder to an
# integer column (values that still fail to parse become NULL unless ANSI mode
# is on — NOTE(review): confirm the session's ANSI setting).
weight_without_unit = F.regexp_replace("weight_grams", "g", "")
df_silver = df_bronze.withColumn("weight_grams", weight_without_unit.cast("int"))
df_silver.select("weight_grams").show(n=5, truncate=False)

## Check length_cm (uses a comma instead of a dot as the decimal separator)

In [0]:
# Inspect raw length values.
df_silver.select(F.col("length_cm")).show()

In [0]:
# Swap the decimal comma for a dot, then cast length_cm to a float column.
dotted_length = F.regexp_replace("length_cm", ",", ".")
df_silver = df_silver.withColumn("length_cm", dotted_length.cast("float"))

df_silver.select(F.col("length_cm")).show()

## category_code and brand_code are in lower case; convert them to upper case

In [0]:
# Inspect the code columns before normalizing their case.
df_silver.select(F.col("category_code"), F.col("brand_code")).show()

In [0]:
# Upper-case both code columns so they match the dimension tables' key format.
for code_col in ("category_code", "brand_code"):
    df_silver = df_silver.withColumn(code_col, F.upper(F.col(code_col)))

## Fix spelling mistakes in the material column

In [0]:
# List every distinct material value to spot misspellings.
df_silver.select("material").dropDuplicates().show()

In [0]:
# Correct known misspellings by exact-value replacement; any other value
# (including NULL) is left as is.
material_fixes = {
    "Coton": "Cotton",
    "Ruber": "Rubber",
    "Alumium": "Aluminium",
}
df_silver = df_silver.na.replace(material_fixes, subset=["material"])

## Fix negative values in rating_count

In [0]:
# Show the offending negative rating counts.
df_silver.where(F.col("rating_count") < F.lit(0)).select("rating_count").show()

In [0]:
# Missing rating counts default to 0; present ones take their absolute value,
# so negative counts become positive.
rating_col = F.col("rating_count")
df_silver = df_silver.withColumn(
    "rating_count",
    F.when(rating_col.isNull(), F.lit(0)).otherwise(F.abs(rating_col)),
)

In [0]:
# Review every column touched by the cleaning steps above.
columns_to_review = [
    "category_code",
    "brand_code",
    "weight_grams",
    "length_cm",
    "material",
    "rating_count",
]
df_silver.select(*columns_to_review).show()

In [0]:
# Overwrite the silver products table. This cell changes column types
# (weight_grams -> int, length_cm -> float); overwriteSchema lets the table
# schema follow such changes on re-runs, whereas mergeSchema cannot change an
# existing column's type and keeps columns the DataFrame no longer produces.
df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.silver.slv_products")

# Customers

In [0]:
# Load the raw customers table and report its size.
df_bronze = spark.table(f"{catalog_name}.bronze.brz_customers")

row_count = df_bronze.count()
column_count = len(df_bronze.columns)
print(f"row_count: {row_count}, column_count: {column_count}")


## Handle NULL values in the customer_id column

In [0]:
# Count and sample the rows that have no customer_id.
missing_ids = df_bronze.where(F.col("customer_id").isNull())
null_count = missing_ids.count()
print(f"null_count: {null_count}")

missing_ids.show(10)

### Drop rows with a NULL customer_id

In [0]:
# Rows without a customer_id cannot be keyed, so remove them.
df_silver = df_bronze.na.drop(subset=["customer_id"])

row_count = df_silver.count()
print(f"row_count: {row_count}")


### Handle NULLs in the phone column

In [0]:
# Count and sample the rows whose phone is missing.
missing_phone = df_silver.where(F.col("phone").isNull())
null_count = missing_phone.count()
print(f"Nulls in Phone Column: {null_count}")

missing_phone.show(10)

In [0]:
# Replace missing phone numbers with a placeholder string. The fill only
# applies if phone is a string column, so the check below should come back
# empty — NOTE(review): confirm phone's type in bronze.
df_silver = df_silver.na.fill("Not Available", subset=["phone"])

df_silver.filter(F.col("phone").isNull()).show(10)

In [0]:
# Overwrite the silver customers table, replacing its schema with this DataFrame's.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.slv_customers")
)

# Calendar/Date


In [0]:
# Load the raw calendar table, report its size and preview it.
df_bronze_date = spark.table(f"{catalog_name}.bronze.brz_calendar")

row_count = df_bronze_date.count()
column_count = len(df_bronze_date.columns)
print(f"row_count: {row_count}, column_count: {column_count}")

df_bronze_date.show(n=10)

In [0]:
# Print the bronze calendar schema (column names and types) to see which
# columns still need type conversion.
df_bronze_date.printSchema()

### Convert the date column from string to date

In [0]:
# Parse the "dd-MM-yyyy" strings in `date` into a proper date column.
# NOTE(review): strings that do not match the pattern become NULL (or raise
# under ANSI mode) — confirm the bronze data is uniformly formatted.
df_silver_date = df_bronze_date.withColumn(
    "date",
    F.to_date(F.col("date"), "dd-MM-yyyy"),
)

df_silver_date.printSchema()

df_silver_date.show(10)

### Remove duplicates

In [0]:
# Find dates that appear on more than one row.
duplicates = (
    df_silver_date.groupBy("date")
    .count()
    .where(F.col("count") > 1)
)

print(f"Duplicate Dates: {duplicates.count()}")

display(duplicates)


In [0]:
# Keep one row per date. NULL dates are treated as equal here, so all
# unparseable dates (if any) collapse into a single row.
df_silver_date = df_silver_date.drop_duplicates(subset=["date"])

row_count = df_silver_date.count()
print(f"row_count: {row_count}")

## Normalize day_name casing

In [0]:
# Title-case day names so e.g. "MONDAY" and "monday" both become "Monday".
df_silver_date = df_silver_date.withColumn("day_name", F.initcap("day_name"))

df_silver_date.show(n=10)

### Convert negative `week_of_year` to positive

In [0]:
# Negative week numbers are treated as sign errors; keep their magnitude.
df_silver_date = df_silver_date.withColumn("week_of_year", F.abs("week_of_year"))

df_silver_date.show(n=10)

### Enhance the `quarter` and `week_of_year` columns

In [0]:
# Turn the numeric quarter/week into labels such as "Q1-2024" and "Week5-2024".
# If any part is NULL the label becomes "" (empty string) rather than NULL.
# NOTE(review): the week label has no separator between "Week" and the number
# — confirm that is the intended format.
quarter_label = F.concat(F.lit("Q"), F.col("quarter"), F.lit("-"), F.col("year"))
week_label = F.concat(F.lit("Week"), F.col("week_of_year"), F.lit("-"), F.col("year"))

df_silver_date = df_silver_date.withColumn("quarter", F.coalesce(quarter_label, F.lit("")))
df_silver_date = df_silver_date.withColumn("week_of_year", F.coalesce(week_label, F.lit("")))

df_silver_date.show(10)


### Rename columns

In [0]:
# week_of_year now holds a label rather than a number; expose it as `week`.
df_silver_date = df_silver_date.withColumnRenamed(existing="week_of_year", new="week")

In [0]:
# Write data to silver.
# overwriteSchema (rather than mergeSchema) is required here: this cell turns
# `date` into a date column, turns quarter/week into string labels and renames
# week_of_year -> week. Under mergeSchema an overwrite cannot change an
# existing column's type and keeps the old week_of_year column alongside `week`,
# forcing a manual DROP TABLE before re-running.

df_silver_date.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog_name}.silver.slv_calendar")

In [0]:
# spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.silver.slv_calendar")