In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, FloatType

catalog_name='ecommerce'

In [0]:
df_bronze=spark.table(f'{catalog_name}.bronze.brz_brands')
df_bronze.show()

In [0]:
df_silver=df_bronze.withColumn('brand_name', F.trim(F.col('brand_name')))
df_silver.show(10)

In [0]:
df_silver=df_silver.withColumn('brand_code',F.regexp_replace(F.col('brand_code'), r'[^A-Za-z0-9]', ''))

df_silver.show(10)

In [0]:
df_silver.select('category_code').distinct().show()

In [0]:
anamolies={
    'GROCERY':'GRCY',
    'BOOKS':'BKS',
    'TOYS':'TOY'
}

df_silver = df_silver.replace(anamolies, subset="category_code")

df_silver.select('category_code').distinct().show(10)

In [0]:
df_silver.write.format('delta') \
    .mode('overwrite') \
    .option('mergeSchema','true') \
    .saveAsTable(f'{catalog_name}.silver.slv_brands')

Category

In [0]:
 df_bronze=spark.table(f'{catalog_name}.bronze.brz_category')
 df_bronze.show()

In [0]:
df_duplicates=df_bronze.groupBy('category_code').count().filter(F.col('count')>1)
df_duplicates.show()

In [0]:
df_silver=df_bronze.dropDuplicates(['category_code'])
df_silver.show()

In [0]:
df_silver=df_silver.withColumn('category_code',F.upper(F.col('category_code')))
display(df_silver)

In [0]:
df_silver.write.format('delta') \
    .mode('overwrite') \
    .option('mergeSchema','true') \
    .saveAsTable(f'{catalog_name}.silver.slv_category')

Products

In [0]:
df_bronze=spark.table(f'{catalog_name}.bronze.brz_products')
df_bronze.display()

In [0]:
df_bronze.count()

In [0]:
len(df_bronze.columns)

In [0]:
df_bronze.select('weight_grams').show(5,truncate=False)

In [0]:
df_silver=df_bronze.withColumn(
    'weight_grams',
    F.regexp_replace(F.col('weight_grams'), 'g','').cast(IntegerType())
)

df_silver.select('weight_grams').show(5)

In [0]:
df_silver.select('length_cm').show(5)

In [0]:
df_silver=df_silver.withColumn(
    'length_cm',
    F.regexp_replace(F.col('length_cm'), ',', '.').cast(FloatType())
)

df_silver.select('length_cm').show()

In [0]:
df_silver.select('brand_code','category_code').show(5)

In [0]:
df_silver=df_silver.withColumn(
    'category_code',
    F.upper(F.col('category_code'))    
).withColumn(
    'brand_code',
    F.upper(F.col('brand_code'))
)

df_silver.select('category_code','brand_code').show(5)

In [0]:
df_silver.select('material').distinct().show()

In [0]:
df_silver=df_silver.withColumn(
    'material',
    F.when(F.col('material')=='Coton','Cotton')
    .when(F.col('material')=='Alumium','Aluminum')
    .when(F.col('material')=='Ruber','Rubber')
    .otherwise(F.col('material'))
)

df_silver.select('material').distinct().show()

In [0]:
df_silver.filter(F.col('rating_count')<0).select('rating_count').show()

In [0]:
df_silver=df_silver.withColumn(
    'rating_count',
    F.when(F.col('rating_count').isNotNull(),
           F.abs(F.col('rating_count')))
      .otherwise(F.lit(0)) #if null replace with 0
        
)

In [0]:
df_silver.select(
    'weight_grams',
    'length_cm',
    'brand_code',
    'category_code',
    'material',
    'rating_count'
).show(10,truncate=10)

In [0]:
df_silver.write.format('delta') \
    .mode('overwrite') \
    .option('mergeSchema','true') \
    .saveAsTable(f'{catalog_name}.silver.slv_products')


Customers

In [0]:
df_bronze=spark.read.table(f'{catalog_name}.bronze.brz_customers')

row_count, column_count=df_bronze.count(), len(df_bronze.columns)

In [0]:
display(row_count,'.',column_count)

In [0]:
null_Count=df_bronze.filter(F.col('customer_id').isNull()).count()
null_Count

In [0]:
df_bronze.filter(F.col('customer_id').isNull()).show(3)

In [0]:
df_silver=df_bronze.dropna(subset=['customer_id'])

row_count=df_silver.count()
row_count

In [0]:
df_silver.filter(F.col('phone').isNull()).count()

In [0]:
df_silver=df_silver.fillna('Not Available',subset=['phone'])

df_silver.filter(F.col('phone').isNull()).count()

In [0]:
df_silver.write.format('delta') \
    .mode('overwrite') \
    .option('mergeSchema', 'true') \
    .saveAsTable(f'{catalog_name}.silver.slv_customers')

Calender

In [0]:
df_bronze = spark.read.table(f'{catalog_name}.bronze.brz_calendar')


In [0]:
row_count, column_count = df_bronze.count(), len(df_bronze.columns)

display(row_count, '\n')

display(column_count)

In [0]:
df_bronze.printSchema()

In [0]:
from pyspark.sql.functions import to_date

df_silver=df_bronze.withColumn('date',to_date(df_bronze['date'],'dd-MM-yyyy'))



In [0]:
df_silver.printSchema()

In [0]:
df_silver.show(5)

In [0]:
duplicates = df_silver.groupBy('date').count().filter('count>1')

duplicates.show()

In [0]:
df_silver=df_silver.dropDuplicates(['date'])

row_count=df_silver.count()

In [0]:
row_count

In [0]:
df_silver=df_silver.withColumn(
    'day_name',
    F.initcap(F.col('day_name'))
)

df_silver.show(5)

In [0]:
df_silver=df_silver.withColumn('week_of_year',
                               F.abs(F.col('week_of_year')))

df_silver.show(3)

In [0]:
df_silver=df_silver.withColumn('quarter', F.concat_ws('',F.concat(F.lit('Q'), F.col('quarter'), F.lit('-'), F.col('year'))))

df_silver=df_silver.withColumn('week_of_year', F.concat_ws('', F.concat(F.lit('Week'), F.col('week_of_year'), F.lit('-'), F.col('year'))))



In [0]:
df_silver=df_silver.withColumnRenamed('week_of_year','week')



In [0]:
df_silver.write.format('delta') \
    .mode('overwrite') \
    .option('mergeSchema','true') \
    .saveAsTable(f'{catalog_name}.silver.slv_calendar')