In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Ecommerce/consolidate_pipeline/Setup/utilities


In [0]:
# Declare the notebook parameters as text widgets: (name, default value, label).
for widget_name, widget_default, widget_label in (
    ("catalog", "fmcg", "Catalog"),
    ("data_source", "products", "Data source"),
):
    dbutils.widgets.text(widget_name, widget_default, widget_label)

In [0]:
# Read the current widget values and derive the S3 glob for the source files.
catalog, data_source = (
    dbutils.widgets.get(widget_name) for widget_name in ("catalog", "data_source")
)
# One folder per data source inside the child-company bucket.
base_path = f's3://child-sportsbar-cd/{data_source}/*csv'
print(base_path)

In [0]:
# Load the raw CSV files: first row is the header, column types are inferred.
csv_reader = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
)
raw_rows = csv_reader.load(base_path)

# Stamp each row with the read time and keep the source file name/size from
# the hidden _metadata column alongside all data columns.
stamped_rows = raw_rows.withColumn('read_timestamp', F.current_timestamp())
df = stamped_rows.select("*", "_metadata.file_name", "_metadata.file_size")

# Preview a few rows, then show the inferred schema.
display(df.limit(10))

df.printSchema()

## Bronze layer processing

In [0]:
# `df` already holds the raw CSV load (header + inferred schema, plus
# read_timestamp, file_name and file_size) built from `base_path` earlier in
# this notebook. Re-declaring the identical read here only repeated the
# schema-inference scan of the source files and risked the two copies drifting
# apart, so the bronze layer reuses that DataFrame as-is.
display(df.limit(10))


In [0]:
# Persist the raw load as the bronze Delta table, replacing any previous
# contents. `bronze_schema` is not defined in this notebook — presumably it
# comes from the %run utilities notebook; confirm.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
(
    df.write
    .format('delta')
    .option("delta.enableChangeDataFeed", "true")
    .mode('overwrite')
    .saveAsTable(bronze_table)
)


## Silver layer processing


In [0]:
# Read the bronze table back as the starting point for the silver layer.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
df_bronze = spark.sql(f"SELECT * FROM {bronze_table}")

### Drop duplicates


In [0]:
# Count rows per product_id and keep only the ids that occur more than once.
rows_per_product = df_bronze.groupBy('product_id').count()
df_duplicates = rows_per_product.where(F.col('count') > 1)
display(df_duplicates)

In [0]:
# Keep a single row per product_id and report the row count on both sides of
# the de-duplication. The second message previously repeated the "before"
# label and re-counted df_bronze, so the effect of the drop was never shown.
# NOTE(review): dropDuplicates does not let us choose which of the duplicate
# rows survives — confirm that is acceptable for this source.
print('Rows before duplicates dropped: ', df_bronze.count())
df_silver = df_bronze.dropDuplicates(['product_id'])
print('Rows after duplicates dropped: ', df_silver.count())

In [0]:
# Inspect the distinct category values before any cleaning.
category_values = df_silver.select('category').distinct()
category_values.show()

In [0]:
# Normalise category to Title Case (first letter of each word upper-cased).
# initcap already yields NULL for NULL input, so the explicit
# when(isNull, None).otherwise(...) guard was redundant and is dropped.
df_silver = df_silver.withColumn('category', F.initcap(F.col('category')))

In [0]:
# Re-check the distinct category values after the case normalisation.
category_values = df_silver.select('category').distinct()
category_values.show()

In [0]:
# Correct the "Protien" misspelling (matched in any letter case) in both the
# category and the product name.
typo_pattern, corrected_word = '(?i)Protien', 'Protein'
category_fixed = F.regexp_replace(F.col('category'), typo_pattern, corrected_word)
product_name_fixed = F.regexp_replace(F.col('product_name'), typo_pattern, corrected_word)

df_silver = (
    df_silver
    .withColumn('category', category_fixed)
    .withColumn('product_name', product_name_fixed)
)

In [0]:
# Confirm the distinct category values after the spelling fix.
category_values = df_silver.select('category').distinct()
category_values.show()

#### Standardizing Product Attributes to Match Parent Company Data Model

In [0]:
# Derive the parent-company `division` from the cleaned `category`.
# Matching is done on the trimmed category: the previous lookup compared
# against ' Electrolyte Mix' (leading space), so a clean 'Electrolyte Mix'
# value silently fell through to 'Other'. Trimming makes the lookup work
# whether or not the source value carries stray surrounding whitespace.
# A NULL category still ends up in 'Other', as before.
category_key = F.trim(F.col('category'))

df_silver = df_silver.withColumn(
    'division',
    F.when(category_key.isin('Energy Bars', 'Protein Bars'), 'Nutrition Bars')
     .when(category_key == 'Granola & Cereals', 'Breakfast Foods')
     .when(category_key == 'Recovery Dairy', 'Dairy & Recovery')
     .when(category_key == 'Healthy Snacks', 'Healthy Snacks')
     .when(category_key == 'Electrolyte Mix', 'Hydration & Electrolytes')
     .otherwise('Other')
)

display(df_silver.limit(10))

In [0]:
# Capture the text inside the first pair of parentheses in product_name as
# the product variant (non-greedy, capture group 1).
variant_expr = F.regexp_extract(F.col("product_name"), r"\((.*?)\)", 1)
df_silver = df_silver.withColumn("variant", variant_expr)

In [0]:
# product_code: first 12 hex characters of the SHA-256 of the product name.
name_digest = F.sha2(F.col("product_name").cast("string"), 256)
product_code_expr = F.substring(name_digest, 1, 12)

# product_id: kept (as a string) only when it consists purely of digits;
# anything else is replaced by the placeholder "999999".
product_id_text = F.col("product_id").cast("string")
product_id_expr = (
    F.when(product_id_text.rlike("^[0-9]+$"), product_id_text)
     .otherwise(F.lit(999999).cast("string"))
)

df_silver = (
    df_silver
    .withColumn("product_code", product_code_expr)
    .withColumn("product_id", product_id_expr)
    .withColumnRenamed("product_name", "product")
)


In [0]:
# Fix the silver column set and order: business attributes first, then the
# ingestion metadata.
silver_columns = [
    'product_code',
    'category',
    'division',
    'variant',
    'product',
    'product_id',
    'read_timestamp',
    'file_name',
    'file_size',
]
df_silver = df_silver.select(*silver_columns)
display(df_silver)

In [0]:
# Persist the cleaned products as the silver Delta table, replacing previous
# contents and allowing schema merging on write. `silver_schema` is not
# defined in this notebook — presumably it comes from the %run utilities
# notebook; confirm.
silver_table = f"{catalog}.{silver_schema}.{data_source}"
(
    df_silver.write
    .format('delta')
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode('overwrite')
    .saveAsTable(silver_table)
)

### Gold Layer processing

In [0]:
# Read the silver table back as the input for the gold layer.
silver_table = f"{catalog}.{silver_schema}.{data_source}"
df_silver = spark.sql(f"SELECT * FROM {silver_table}")

In [0]:
# The gold dimension keeps only the business attributes of a product.
gold_columns = ["product_code", "division", "variant", "product", "category"]
df_gold = df_silver.select(*gold_columns)
df_gold.show()

In [0]:
# Persist the child-company product dimension (sb_dim_<data_source>) as a gold
# Delta table, replacing previous contents. `gold_schema` is not defined in
# this notebook — presumably it comes from the %run utilities notebook; confirm.
gold_table = f"{catalog}.{gold_schema}.sb_dim_{data_source}"
(
    df_gold.write
    .format('delta')
    .option("delta.enableChangeDataFeed", "true")
    .mode('overwrite')
    .saveAsTable(gold_table)
)

In [0]:
# Resolve the merge source and target from the same catalog / schema /
# data-source parameters the gold write uses. The names were previously
# hard-coded to fmcg.gold.*, so running the notebook with any other widget
# value would have merged from a table this run never wrote.
# NOTE(review): assumes the parent dimension (dim_<data_source>) lives in the
# same catalog and gold schema as the child table — confirm.
gold_namespace = f"{catalog}.{gold_schema}"
parent_dim_table = f"{gold_namespace}.dim_{data_source}"
child_dim_table = f"{gold_namespace}.sb_dim_{data_source}"

delta_table = DeltaTable.forName(spark, parent_dim_table)
df_child_products = spark.sql(
    f"SELECT product_code, division, category, product, variant FROM {child_dim_table}"
)
df_child_products.show(5)

In [0]:
# Upsert the child-company products into the parent dimension, keyed on
# product_code: matching rows get their descriptive attributes refreshed,
# unmatched rows are inserted.
attribute_columns = ["division", "category", "product", "variant"]
update_assignments = {name: f"source.{name}" for name in attribute_columns}
insert_assignments = {"product_code": "source.product_code", **update_assignments}

merge_builder = delta_table.alias("target").merge(
    source=df_child_products.alias("source"),
    condition="target.product_code = source.product_code",
)
(
    merge_builder
    .whenMatchedUpdate(set=update_assignments)
    .whenNotMatchedInsert(values=insert_assignments)
    .execute()
)