In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, FloatType, DoubleType

# Catalog name. It contains a hyphen, so every table reference below wraps it in backticks.
catalog_name = "e-commerce"

# Data Cleaning

### Brands

In [0]:
# Read the raw brands table from the bronze layer and preview the first 20 rows.
df_bronze = spark.read.table(f"`{catalog_name}`.bronze.brz_brands")
df_bronze.show(n=20)
# The preview shows that some columns carry leading/trailing whitespace, which should be trimmed.

In [0]:
# Trim surrounding whitespace from brand_name and preview the result.
df_silver = df_bronze.withColumn(
    "brand_name",
    F.trim(F.col("brand_name")),
)
df_silver.show(n=10)

In [0]:
# brand_code contains special characters; keep only letters and digits.
df_silver = df_silver.withColumn(
    "brand_code",
    F.regexp_replace(F.col("brand_code"), r"[^A-Za-z0-9]", ""),
)
df_silver.show(n=10)

In [0]:
# Listing a column's distinct values is a quick way to spot inconsistencies.
# Here category_code shows redundant categories spelled in different styles.
(
    df_silver
    .select("category_code")
    .distinct()
    .show()
)

In [0]:
# Map the inconsistent category codes onto their canonical spellings.
anomalies = {
    "GROCERY": "GRCY",
    "BOOKS": "BKS",
    "TOYS": "TOY",
}

df_silver = df_silver.replace(to_replace=anomalies, subset=["category_code"])
df_silver.select("category_code").distinct().show()

In [0]:
# Cleaning is complete: persist the brands to the silver layer as a Delta table.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"`{catalog_name}`.silver.slv_brands")
)

### Category

In [0]:
# Read the raw category table and preview 10 rows.
df_bronze = spark.table(f"`{catalog_name}`.bronze.brz_category")
df_bronze.show(n=10)

In [0]:
# Category codes that appear on more than one row.
df_duplicates = (
    df_bronze
    .groupBy("category_code")
    .count()
    .where(F.col("count") > 1)
)
df_duplicates.show()

In [0]:
# Keep one row per category_code. No ordering is specified, so which row survives is not guaranteed.
df_silver = df_bronze.drop_duplicates(subset=["category_code"])
display(df_silver)

In [0]:
# Upper-case the category codes; a single canonical casing helps with joins later.
df_silver = df_silver.withColumn("category_code", F.upper(F.col("category_code")))
# The earlier de-duplication ran on the raw codes, so codes that differed only in case
# (e.g. "abc" vs "ABC") survived it. They only collapse to one key now, so de-duplicate again.
df_silver = df_silver.dropDuplicates(["category_code"])
df_silver.show(10)

In [0]:
# Persist the cleaned categories to the silver layer as a Delta table.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"`{catalog_name}`.silver.slv_category")
)

### Products

In [0]:
# Read the raw products table and render it.
df_bronze = spark.table(f"`{catalog_name}`.bronze.brz_products")
display(df_bronze)

In [0]:
# Shape of the raw products table.
row_count = df_bronze.count()
column_count = len(df_bronze.columns)

print(f"Row count: {row_count}")
print(f"Column count: {column_count}")

In [0]:
# Peek at the raw weight_grams values without truncating them.
df_bronze.select(F.col("weight_grams")).show(n=5, truncate=False)

In [0]:
# weight_grams holds strings with a "g" unit; remove every "g" and cast the rest to an integer.
# NOTE(review): values that still don't parse become NULL (or raise under ANSI mode).
df_silver = df_bronze.withColumn(
    "weight_grams",
    F.regexp_replace(F.col("weight_grams"), "g", "").cast(IntegerType()),
)
df_silver.select("weight_grams").show(n=5, truncate=False)

In [0]:
# Peek at the raw length_cm values without truncating them.
df_silver.select(F.col("length_cm")).show(n=5, truncate=False)

In [0]:
# Treat commas in length_cm as decimal separators: swap them for dots, then cast to float.
df_silver = df_silver.withColumn(
    "length_cm",
    F.regexp_replace(F.col("length_cm"), ",", ".").cast(FloatType()),
)
df_silver.select("length_cm").show(n=5)

In [0]:
# Upper-case both code columns so their casing is consistent for joins.
for code_col in ("category_code", "brand_code"):
    df_silver = df_silver.withColumn(code_col, F.upper(F.col(code_col)))

df_silver.select("category_code", "brand_code").show(n=5)

In [0]:
# List the distinct materials to spot spelling inconsistencies.
df_silver.select(F.col("material")).distinct().show()

In [0]:
# Correct the misspelled values in the material column.
anomalies_dict = {
    "Coton": "Cotton",
    "Alumium": "Aluminum",
    "Ruber": "Rubber",
}

df_silver = df_silver.replace(to_replace=anomalies_dict, subset=["material"])
df_silver.select("material").distinct().show()

# An equivalent formulation chains one F.when(...) per misspelling and ends with
# .otherwise(F.col("material")) to keep all other values unchanged.

In [0]:
# Check rating_count for anomalies by listing any negative values.
(
    df_silver
    .where(F.col("rating_count") < 0)
    .select("rating_count")
    .show()
)

In [0]:
# Negative rating counts are treated as wrongly signed, so keep their absolute value.
# A missing count becomes 0.
df_silver = df_silver.withColumn(
    "rating_count",
    F.when(F.col("rating_count").isNull(), F.lit(0))
     .otherwise(F.abs(F.col("rating_count"))),
)
# This should now list no rows.
df_silver.filter(F.col("rating_count") < 0).select("rating_count").show()

In [0]:
# Persist the cleaned products to the silver layer as a Delta table.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .saveAsTable(f"`{catalog_name}`.silver.slv_products")
)

### Customers

In [0]:
# Read the raw customers table.
df_bronze = spark.table(f"`{catalog_name}`.bronze.brz_customers")

# Shape of the table.
row_count = df_bronze.count()
column_counts = len(df_bronze.columns)

print(f"Row count: {row_count}")
print(f"Column count: {column_counts}")

In [0]:
# customer_id contains NULLs; count how many.
null_count = df_bronze.where(F.col("customer_id").isNull()).count()
null_count

In [0]:
# Show a few example rows whose customer_id is NULL.
df_bronze.where(F.col("customer_id").isNull()).show(n=3)

In [0]:
# Drop rows that have no customer_id, then report how many rows remain.
df_silver = df_bronze.dropna(subset=["customer_id"])
row_count = df_silver.count()
print(f"The count of rows after dropping the Null values: {row_count}")

In [0]:
# Count NULLs in the phone column as well.
null_count = df_silver.where(F.col("phone").isNull()).count()
null_count

In [0]:
# Replace missing phone numbers with the placeholder string "Unavailable".
df_silver = df_silver.na.fill("Unavailable", subset=["phone"])

# Sanity check: the phone column should have no NULLs left.
null_count = df_silver.where(F.col("phone").isNull()).count()
null_count

In [0]:
# Persist the cleaned customers to the silver layer as a Delta table.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .saveAsTable(f"`{catalog_name}`.silver.slv_customers")
)

### Calendar/Date

In [0]:
# Read the raw calendar table and report its shape.
df_bronze = spark.table(f"`{catalog_name}`.bronze.brz_calendar")

row_count = df_bronze.count()
column_count = len(df_bronze.columns)

print(f"Row count: {row_count}")
print(f"Column count: {column_count}")

In [0]:
# Inspect the schema: the date column is typed as a string and needs converting.
df_bronze.printSchema()

In [0]:
# Parse the "dd-MM-yyyy" date strings into a DateType column.
# Uses F.to_date from the module imported at the top instead of a mid-notebook import.
# NOTE(review): strings that don't match the pattern become NULL (or raise under ANSI mode);
# confirm that every row uses this format.
df_silver = df_bronze.withColumn("date", F.to_date(F.col("date"), "dd-MM-yyyy"))

In [0]:
# Confirm that date is now DateType. printSchema() prints the schema tree itself and returns None,
# so wrapping it in print() only added a stray "None" line to the output.
df_silver.printSchema()
df_silver.show(5)

In [0]:
# Dates that occur on more than one row.
df_duplicates = df_silver.groupBy("date").count().filter("count > 1")
# Report how many dates are duplicated. Interpolating the DataFrame itself only printed its
# column-schema repr, not a number.
print(f"Total duplicates: {df_duplicates.count()}")
display(df_duplicates)

In [0]:
# Remove duplicate dates. This must build on df_silver: starting again from df_bronze
# silently threw away the string-to-date conversion done above.
# NOTE(review): rows whose date failed to parse are all NULL and collapse into a single row here.
df_silver = df_silver.dropDuplicates(["date"])

# Get the row count.
row_count = df_silver.count()
print(f"Rows after removing the duplucates: {row_count}")

In [0]:
# day_name values are inconsistently cased; normalize each word to initial-capital form.
df_silver = df_silver.withColumn("day_name", F.initcap(F.col("day_name")))
df_silver.select(F.col("day_name")).distinct().show()

In [0]:
# Some week_of_year values are negative; keep only their magnitude.
df_silver = df_silver.withColumn("week_of_year", F.abs(F.col("week_of_year")))
df_silver.show(n=20)

In [0]:
# Turn quarter into "Q<quarter>-<year>" and week_of_year into "Week<week_of_year>-<year>".
# The outer concat_ws("", ...) turns a NULL concat result (any NULL input) into "".
# NOTE(review): this overwrites the columns in place, so re-running the cell prefixes them again.
df_silver = (
    df_silver
    .withColumn(
        "quarter",
        F.concat_ws("", F.concat(F.lit("Q"), F.col("quarter"), F.lit("-"), F.col("year"))),
    )
    .withColumn(
        "week_of_year",
        F.concat_ws("", F.concat(F.lit("Week"), F.col("week_of_year"), F.lit("-"), F.col("year"))),
    )
)

df_silver.show(n=3)

In [0]:
# Shorten the column name: week_of_year becomes week.
df_silver = df_silver.withColumnRenamed("week_of_year", "week")
df_silver.show(n=3)

In [0]:
# Persist the cleaned calendar to the silver layer as a Delta table.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .saveAsTable(f"`{catalog_name}`.silver.slv_calendar")
)