In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Catalog that holds the bronze (source) and silver (target) schemas used below.
catalog_name = "ecommerce"

### Brands Table

In [0]:
# Load the bronze brands table and preview the first 10 rows.
brands_bronze_table = f"{catalog_name}.bronze.brz_brands"
df_bronze = spark.table(brands_bronze_table)
df_bronze.show(10)

In [0]:
# Strip leading/trailing whitespace from brand_name.
df_silver = df_bronze.withColumn(
    "brand_name",
    F.trim(F.col("brand_name")),
)
df_silver.show(20)

In [0]:
# Keep only ASCII letters and digits in brand_code.
# The character class must be 0-9: the previous "0-0" range matched only the
# digit 0, so digits 1-9 were stripped out of every brand code.
df_silver = df_silver.withColumn(
    "brand_code",
    F.regexp_replace(F.col("brand_code"), r"[^A-Za-z0-9]", ""),
)
df_silver.show(20)

In [0]:
# List the distinct category codes to spot inconsistent spellings.
(
    df_silver
    .select("category_code")
    .distinct()
    .show()
)

In [0]:
# Anomalies: long-form category names mapped to the short code they are
# replaced with in category_code.
anamolies = {
    "GROCERY": "GRCY",
    "BOOKS": "BKS",
    "TOYS": "TOY",
}

df_silver = df_silver.replace(to_replace=anamolies, subset=["category_code"])

# Re-check the distinct codes after the replacement.
(
    df_silver
    .select("category_code")
    .distinct()
    .show()
)

In [0]:
# Overwrite the silver brands table (data and schema) with the cleaned frame.
(
    df_silver.write
    .format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog_name}.silver.slv_brands")
)

### Category Table

In [0]:
# Load the bronze category table and preview the first 10 rows.
category_bronze_table = f"{catalog_name}.bronze.brz_category"
df_bronze = spark.table(category_bronze_table)
display(df_bronze.limit(10))

In [0]:
# Category codes that appear more than once in bronze.
# show() returns None, so the DataFrame is bound first and displayed
# separately; df_duplicate then stays usable in later cells.
df_duplicate = (
    df_bronze
    .groupBy("category_code")
    .count()
    .filter(F.col("count") > 1)
)
df_duplicate.show()

In [0]:
# Normalise category_code to upper case, then keep one row per code.
# Both steps are chained on the same frame: previously the upper-casing step
# started again from df_bronze, which discarded the de-duplication result.
# De-duplicating after upper-casing also collapses codes that differ only in
# letter case.
df_silver = (
    df_bronze
    .withColumn("category_code", F.upper(F.col("category_code")))
    .dropDuplicates(["category_code"])
)
display(df_silver)

In [0]:
# Overwrite the silver category table with the cleaned frame.
(
    df_silver.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(f"{catalog_name}.silver.slv_category")
)

### Product Table

In [0]:
# Load the bronze products table and report its size.
df_bronze = spark.read.table(f"{catalog_name}.bronze.brz_products")

row_count = df_bronze.count()
column_count = len(df_bronze.columns)

for size_line in (
    f"RowCount : {row_count}",
    f"columnCount : {column_count}",
):
    print(size_line)

# Preview the first 5 rows.
display(df_bronze.limit(5))

In [0]:
# Look at the raw weight_grams values without truncating them.
(
    df_bronze
    .select("weight_grams")
    .show(5, truncate=False)
)

In [0]:
# Product clean-up, applied as one chained pass over the bronze frame:
#   1. weight_grams  - remove every "g" character, cast to integer
#   2. length_cm     - replace commas with dots, cast to float
#   3. category_code / brand_code - upper-case
#   4. material      - correct three known misspellings, leave other values as-is
#   5. rating_count  - absolute value; nulls become 0
df_silver = (
    df_bronze
    .withColumn(
        "weight_grams",
        F.regexp_replace(F.col("weight_grams"), "g", "").cast(IntegerType()),
    )
    .withColumn(
        "length_cm",
        F.regexp_replace(F.col("length_cm"), ",", ".").cast(FloatType()),
    )
    .withColumn("category_code", F.upper(F.col("category_code")))
    .withColumn("brand_code", F.upper(F.col("brand_code")))
    .withColumn(
        "material",
        F.when(F.col("material") == "Coton", "Cotton")
        .when(F.col("material") == "Alumium", "Aluminium")
        .when(F.col("material") == "Ruber", "Rubber")
        .otherwise(F.col("material")),
    )
    .withColumn(
        "rating_count",
        F.when(
            F.col("rating_count").isNotNull(),
            F.abs(F.col("rating_count")),
        ).otherwise(F.lit(0)),
    )
)

display(df_silver)

In [0]:
# Check length_cm: bronze values use a comma as the decimal separator.
# df_silver already carries the comma-to-dot conversion and float cast from
# the combined clean-up cell, so this cell only inspects the result.
display(df_silver)

In [0]:
# category_code and brand_code arrive in lower case; convert both to upper case.
df_silver = (
    df_silver
    .withColumn("category_code", F.upper(F.col("category_code")))
    .withColumn("brand_code", F.upper(F.col("brand_code")))
)
display(df_silver)



In [0]:
# List the distinct material values to spot spelling mistakes.
(
    df_silver
    .select("material")
    .distinct()
    .show()
)


In [0]:
# Correct the three known misspellings in material; any other value
# (including null) passes through unchanged via otherwise().
df_silver = df_silver.withColumn(
    "material",
    F.when(F.col("material") == "Coton", "Cotton")
    .when(F.col("material") == "Alumium", "Aluminium")
    .when(F.col("material") == "Ruber", "Rubber")
    .otherwise(F.col("material")),
)
display(df_silver)


In [0]:
# Show up to 5 bronze rows whose rating_count is negative.
(
    df_bronze
    .where(F.col("rating_count") < 0)
    .select("rating_count")
    .show(5)
)

In [0]:
# rating_count: take the absolute value of non-null entries and replace
# nulls with 0.
df_silver = df_silver.withColumn(
    "rating_count",
    F.when(
        F.col("rating_count").isNotNull(),
        F.abs(F.col("rating_count")),
    ).otherwise(F.lit(0)),
)

# Confirm the resulting column types before writing.
df_silver.printSchema()

In [0]:
# Write the cleaned product data to the silver layer, overwriting the
# existing table contents.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.slv_products")
)

### Customers Table

In [0]:
# Load the bronze customer table, report its size, and preview 10 rows.
df_bronze = spark.read.table(f"{catalog_name}.bronze.brz_customer")

row_count = df_bronze.count()
column_count = len(df_bronze.columns)

for size_line in (
    f"RowCount : {row_count}",
    f"ColumnCount : {column_count}",
):
    print(size_line)
df_bronze.show(10)

In [0]:
# Count rows whose customer_id is NULL.
# The count is printed explicitly: a bare `null_count` expression is only
# rendered when it is the last statement of a cell, so it was never shown here.
null_count = df_bronze.filter(F.col("customer_id").isNull()).count()
print(f"null_count : {null_count}")

display(df_bronze)

In [0]:
# Count rows whose phone is NULL and show up to 3 of them.
phone_is_null = F.col("phone").isNull()

null_count = df_bronze.filter(phone_is_null).count()
print(f"null_count : {null_count}")

df_bronze.filter(phone_is_null).show(3)

In [0]:
# Replace NULL phone values with 0; all other columns are left untouched.
df_silver = df_bronze.na.fill({"phone": 0})

display(df_silver)



In [0]:
# Overwrite the silver customer table with the cleaned frame.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.slv_customer")
)

### Date Table

In [0]:
# Load the bronze date table, report its size, preview 10 rows, and print
# the schema.
df_bronze = spark.read.table(f"{catalog_name}.bronze.brz_date")

row_count = df_bronze.count()
column_count = len(df_bronze.columns)

for size_line in (
    f"RowCount : {row_count}",
    f"ColumnCount : {column_count}",
):
    print(size_line)
display(df_bronze.limit(10))
df_bronze.printSchema()

In [0]:
# Find date values that occur more than once in bronze (inspection only;
# nothing is removed in this cell).
duplicates = (
    df_bronze
    .groupBy("date")
    .count()
    .filter("count > 1")
)
print("Duplicates: ", duplicates.count())
display(duplicates)

In [0]:
# Build the silver date frame:
#   - keep one row per date
#   - day_name in initcap form
#   - week_of_year: absolute value, then "Week<week>-<year>", renamed to "week"
#   - quarter: "Q<quarter>-<year>"
df_silver = (
    df_bronze
    .dropDuplicates(["date"])
    .withColumn("day_name", F.initcap(F.col("day_name")))
    .withColumn("week_of_year", F.abs(F.col("week_of_year")))
    .withColumn(
        "week_of_year",
        F.concat_ws("", F.lit("Week"), F.col("week_of_year"), F.lit("-"), F.col("year")),
    )
    .withColumn(
        "quarter",
        F.concat_ws("", F.lit("Q"), F.col("quarter"), F.lit("-"), F.col("year")),
    )
    .withColumnRenamed("week_of_year", "week")
)

display(df_silver.limit(10))

In [0]:
# Overwrite the silver date table with the transformed frame.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.slv_date")
)