In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Catalog name that prefixes every `<catalog>.<schema>.<table>` reference in this notebook.
# NOTE(review): the identifier is spelled "catlogName" (sic); it is referenced by every
# read/write cell, so it is left unchanged here.
catlogName = 'ecommerce'

In [0]:
# Load the bronze brands table from the configured catalog.
brands_source = f"{catlogName}.bronze.brz_brands"
df_brands = spark.table(brands_source)

In [0]:
# Preview the first five brand rows.
brands_preview = df_brands.limit(5)
display(brands_preview)

In [0]:
# Strip leading and trailing whitespace from brand_name, then preview the result.
brand_name_trimmed = F.trim(F.col('brand_name'))
df_brands = df_brands.withColumn('brand_name', brand_name_trimmed)
display(df_brands.limit(10))

In [0]:
# Remove every character that is not an ASCII letter or digit from brand_code.
non_alnum_pattern = r'[^a-zA-Z0-9]'
df_brands = df_brands.withColumn(
    'brand_code',
    F.regexp_replace(F.col('brand_code'), non_alnum_pattern, ''),
)
display(df_brands.limit(10))

In [0]:
# List the distinct category codes present in the brands data.
brand_category_codes = df_brands.select('category_code').distinct()
display(brand_category_codes)

In [0]:
# Mapping from each anomalous category code to the code that replaces it.
anomalies = dict(BOOKS="BKS", GROCERY="GRCY", TOYS="TOY")

# Apply the mapping to category_code only, then re-list the distinct codes to verify.
target_columns = ['category_code']
df_brands = df_brands.replace(anomalies, subset=target_columns)
display(df_brands.select('category_code').distinct())

In [0]:
# Save the brands DataFrame as the silver Delta table, overwriting any existing contents.
(
    df_brands.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catlogName}.silver.slv_brands")
)

Category

In [0]:
# Load the bronze category table and preview the first ten rows.
category_source = f"{catlogName}.bronze.brz_category"
df_category = spark.table(category_source)
display(df_category.limit(10))

In [0]:
# List the distinct category codes present in the category data.
category_codes = df_category.select('category_code').distinct()
display(category_codes)

In [0]:
# Count rows per category_code and keep only the codes that occur more than once.
rows_per_code = df_category.groupBy('category_code').count()
cat_dups = rows_per_code.where(F.col('count') > 1)
display(cat_dups)

In [0]:
# Keep a single row per category_code, then re-run the duplicate check as a sanity test.
df_category = df_category.dropDuplicates(['category_code'])
remaining_dups = (
    df_category
    .groupBy('category_code')
    .count()
    .where(F.col('count') > 1)
)
display(remaining_dups)

In [0]:
# Normalise category_code to upper case.
df_category = df_category.withColumn("category_code", F.upper(F.col('category_code')))

# Upper-casing can make codes that previously differed only by letter case identical,
# so de-duplicate on the normalised value to keep category_code unique.
df_category = df_category.dropDuplicates(['category_code'])
display(df_category.limit(5))

In [0]:
# Save the category DataFrame as the silver Delta table, overwriting any existing contents.
(
    df_category.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catlogName}.silver.slv_category")
)

Products

In [0]:
# Load the bronze products table and preview the first ten rows.
products_source = f"{catlogName}.bronze.brz_products"
df_products = spark.read.table(products_source)
display(df_products.limit(10))

In [0]:
# Report the size of the products DataFrame: number of rows and number of columns.
row_count = df_products.count()
column_count = len(df_products.columns)

print(f"Row count: {row_count}")
print(f"Column count: {column_count}")

In [0]:
# Inspect a few weight_grams values before cleaning them.
weight_sample = df_products.select('weight_grams').limit(5)
weight_sample.show()

In [0]:
# Remove every literal "g" from weight_grams and cast what is left to an integer.
weight_without_unit = F.regexp_replace(F.col("weight_grams"), "g", "")
df_products = df_products.withColumn(
    "weight_grams",
    weight_without_unit.cast(IntegerType()),
)
df_products.select("weight_grams").show(5, truncate=False)

In [0]:
# Inspect a few length_cm values before cleaning them.
length_sample = df_products.select('length_cm')
length_sample.show(5)

In [0]:
# Replace commas with dots in length_cm (decimal-comma notation) and cast to float.
length_with_dot = F.regexp_replace(F.col("length_cm"), ",", ".")
df_products = df_products.withColumn("length_cm", length_with_dot.cast(FloatType()))
df_products.select("length_cm").show(3)

In [0]:
# Inspect the code columns before normalising their case.
codes_sample = df_products.select("category_code", "brand_code")
codes_sample.show(2)

In [0]:
# Upper-case both code columns, one column at a time.
for code_column in ("category_code", "brand_code"):
    df_products = df_products.withColumn(code_column, F.upper(F.col(code_column)))

df_products.select("category_code", "brand_code").show(2)

In [0]:
# List the distinct material values to spot misspellings.
material_values = df_products.select("material").distinct()
material_values.show()

In [0]:
# Correct three known misspellings in material; every other value is kept as-is.
material = F.col("material")
material_corrected = (
    F.when(material == "Coton", "Cotton")
    .when(material == "Alumium", "Aluminum")
    .when(material == "Ruber", "Rubber")
    .otherwise(material)
)
df_products = df_products.withColumn("material", material_corrected)

# Re-list the distinct values to confirm the corrections.
df_products.select("material").distinct().show()

In [0]:
# Show a few rows whose rating_count is negative.
negative_ratings = df_products.where(F.col('rating_count') < 0).select("rating_count")
negative_ratings.show(3)

In [0]:
# Replace a null rating_count with 0; otherwise take its absolute value so that
# negative counts become positive.
rating = F.col("rating_count")
df_products = df_products.withColumn(
    "rating_count",
    F.when(rating.isNull(), F.lit(0)).otherwise(F.abs(rating)),
)

In [0]:
# Preview the first five product rows.
products_preview = df_products.limit(5)
display(products_preview)

In [0]:
# Write the products DataFrame to the silver layer
# (catalog: ecommerce, schema: silver, table: slv_products), overwriting existing contents.
(
    df_products.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catlogName}.silver.slv_products")
)

Customers

In [0]:
# Read the raw data from the bronze customers table (<catalog>.bronze.brz_customers).
customers_source = f"{catlogName}.bronze.brz_customers"
df_cus = spark.read.table(customers_source)

# Report the size of the customers DataFrame: number of rows and number of columns.
row_count = df_cus.count()
column_count = len(df_cus.columns)

print(f"Row count: {row_count}")
print(f"Column count: {column_count}")

df_cus.show(10)

In [0]:
# Count the rows whose customer_id is null.
missing_customer_id = F.col("customer_id").isNull()
null_count = df_cus.where(missing_customer_id).count()
null_count

In [0]:
# Show the rows whose customer_id is null.
rows_without_id = df_cus.where(F.col('customer_id').isNull())
rows_without_id.show()

In [0]:
# Keep only the rows that have a customer_id, then show the result.
has_customer_id = F.col('customer_id').isNotNull()
df_cus = df_cus.where(has_customer_id)
df_cus.show()


In [0]:
# Re-count the rows whose customer_id is null.
missing_customer_id = F.col("customer_id").isNull()
null_count = df_cus.where(missing_customer_id).count()
null_count

In [0]:
# Count the rows whose phone is null.
missing_phone = F.col("phone").isNull()
null_count = df_cus.where(missing_phone).count()
print(f"Number of nulls in phone: {null_count}")

In [0]:
# Show the rows whose phone is null.
rows_without_phone = df_cus.where(F.col('phone').isNull())
rows_without_phone.show()

In [0]:
# Replace null phone values with the placeholder 'Not Available'.
phone_placeholder = "Not Available"
df_cus = df_cus.fillna(phone_placeholder, subset=["phone"])

# Sanity check: this should show no rows if every null phone was filled.
still_missing_phone = df_cus.where(F.col("phone").isNull())
still_missing_phone.show()

In [0]:
# Write the customers DataFrame to the silver layer
# (catalog: ecommerce, schema: silver, table: slv_customers), overwriting existing contents.
(
    df_cus.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catlogName}.silver.slv_customers")
)

Calendar / Date

In [0]:
# Read the raw calendar data from the bronze layer.
# NOTE(review): the table is addressed as `brz_calander`, while the silver table written at
# the end of this notebook is `slv_calendar` — confirm the bronze table really uses this spelling.
calendar_source = f"{catlogName}.bronze.brz_calander"
df_calander = spark.read.table(calendar_source)

# Report the size of the calendar DataFrame: number of rows and number of columns.
row_count = df_calander.count()
column_count = len(df_calander.columns)

print(f"Row count: {row_count}")
print(f"Column count: {column_count}")

df_calander.show(3)

In [0]:
# Print the calendar DataFrame's schema to check column types before converting `date`.
df_calander.printSchema()

In [0]:
# NOTE(review): this import duplicates what the `F` alias already provides; it is kept so
# any other cell that uses the bare `to_date` name still resolves.
from pyspark.sql.functions import to_date

# Parse the `date` column into a date type using the day-month-year pattern.
date_pattern = "dd-MM-yyyy"
df_calander = df_calander.withColumn("date", to_date(F.col("date"), date_pattern))

In [0]:
# printSchema() writes the schema to stdout itself and returns None, so it is called
# directly; wrapping it in print() emitted a stray "None" line after the schema.
df_calander.printSchema()

# Preview a few rows to confirm the parsed dates.
df_calander.show(5)

In [0]:
# Count rows per date and keep the dates that occur more than once.
rows_per_date = df_calander.groupBy('date').count()
duplicates = rows_per_date.filter("count > 1")

# The printed figure is the number of distinct dates that are duplicated.
duplicated_dates = duplicates.count()
print("Total duplicated Rows: ", duplicated_dates)
display(duplicates)

In [0]:
# Keep a single row per date.
dedupe_keys = ['date']
df_calander = df_calander.dropDuplicates(dedupe_keys)

# Report how many rows remain after de-duplication.
row_count = df_calander.count()
print("Rows After removing Duplicates: ", row_count)

In [0]:
# Title-case day_name: first letter of each word upper case, the rest lower case.
day_name_titled = F.initcap(F.col("day_name"))
df_calander = df_calander.withColumn("day_name", day_name_titled)

df_calander.show(5)

In [0]:
# Take the absolute value of week_of_year so that negative week numbers become positive.
week_number = F.abs(F.col("week_of_year"))
df_calander = df_calander.withColumn("week_of_year", week_number)

df_calander.show(3)

In [0]:
# Build year-suffixed labels: "Q<quarter>-<year>" and "Week<week_of_year>-<year>".
# NOTE(review): each F.concat result is wrapped in a single-argument F.concat_ws, so the
# separator given to concat_ws is never inserted between values. The wrapper is kept
# unchanged here — confirm whether it is intentional.
year_col = F.col("year")
quarter_label = F.concat(F.lit("Q"), F.col("quarter"), F.lit("-"), year_col)
week_label = F.concat(F.lit("Week"), F.col("week_of_year"), F.lit("-"), year_col)

df_calander = df_calander.withColumn("quarter", F.concat_ws("", quarter_label))
df_calander = df_calander.withColumn("week_of_year", F.concat_ws("-", week_label))

df_calander.show(3)

In [0]:
# Rename week_of_year to week.
old_name, new_name = "week_of_year", "week"
df_calander = df_calander.withColumnRenamed(old_name, new_name)
df_calander.show(3)

In [0]:
# Save the calendar DataFrame as the silver Delta table, overwriting any existing contents.
(
    df_calander.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catlogName}.silver.slv_calendar")
)