In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Users/amank0639@gmail.com/fmcg_repo/consolidated_pipeline/1_Setup/utilities

In [0]:
# Declare the notebook parameters as text widgets: (widget name, default value).
for widget_name, default_value in (("Data_Source", "orders"), ("Catalog", "fmcg")):
    dbutils.widgets.text(widget_name, default_value)

In [0]:
# Pull the current widget values into plain variables used by the rest of the notebook.
catalog = dbutils.widgets.get("Catalog")
data_source = dbutils.widgets.get("Data_Source")

In [0]:
# Storage locations for this data source: files are read from landing/ and
# later moved to processed/ by the archive step further down.
base_path = "s3://sportsbar-bucket/{}".format(data_source)
landing_path = "{}/landing".format(base_path)
processed_path = "{}/processed".format(base_path)

for location in (base_path, landing_path, processed_path):
    print(location)

# Fully qualified table names, one per layer.
# bronze_schema / silver_schema / gold_schema are not defined in this notebook;
# presumably they come from the utilities notebook pulled in via %run -- confirm.
bronze_table = "{}.{}.{}".format(catalog, bronze_schema, data_source)
silver_table = "{}.{}.{}".format(catalog, silver_schema, data_source)
gold_table = "{}.{}.sb_fact_{}".format(catalog, gold_schema, data_source)

In [0]:
# Read every CSV in the landing folder (header row, inferred column types) and
# tag each row with the read time plus the source file's name and size.
landing_reader = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
)
landing_csv = landing_reader.load(f"{landing_path}/*.csv")

df = landing_csv.select(
    "*",
    F.current_timestamp().alias("read_timestamp"),
    "_metadata.file_name",
    "_metadata.file_size",
)

display(df)

In [0]:
# Append this batch to the bronze Delta table (history is kept across runs).
bronze_writer = (
    df.write
    .format("delta")
    .mode("append")
    .option("delta.enableChangeDataFeed", "true")
)
bronze_writer.saveAsTable(bronze_table)
    

In [0]:
# Overwrite the bronze staging table so it holds only the current batch;
# the silver step reads from this table rather than from the full bronze table.
bronze_staging_table = f"{catalog}.{bronze_schema}.staging_{data_source}"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(bronze_staging_table)
)


In [0]:
# Archive the ingested files so they are not picked up again on the next run.
# The read step only loads `*.csv`, so only those files are moved here; any
# other entry in the landing folder was never ingested and must stay in place
# instead of being silently filed away as "processed".
files = dbutils.fs.ls(landing_path)
ingested_files = [entry for entry in files if entry.name.endswith(".csv")]

for file in ingested_files:
    dbutils.fs.mv(
        file.path,
        f"{processed_path}/{file.name}",
        True
    )

### Silver Processing

In [0]:
# Start silver processing from the current batch only (bronze staging table),
# keeping just the rows that carry an order quantity.
bronze_staging_table = f"{catalog}.{bronze_schema}.staging_{data_source}"
bronze_df = (
    spark.read.table(bronze_staging_table)
    .where(F.col("order_qty").isNotNull())
)
display(bronze_df)

In [0]:
# Normalise customer_id: values made up only of digits are kept, anything else
# (including nulls, which fail the regex test) is replaced by the placeholder
# "999999". The result is always a string column.
has_numeric_customer_id = F.col("customer_id").rlike("^[0-9]+$")
customer_id_clean = (
    F.when(has_numeric_customer_id, F.col("customer_id"))
    .otherwise("999999")
    .cast("string")
)

bronze_df = bronze_df.withColumn("customer_id", customer_id_clean)
display(bronze_df)

In [0]:
# Remove a leading run of letters that is immediately followed by a comma
# (presumably a weekday name such as "Tuesday, " -- confirm against the data)
# so the remaining text can be parsed as a date in the next step.
leading_word_and_comma = r"^[A-Za-z]+,\s*"
bronze_df = bronze_df.withColumn(
    "order_placement_date",
    F.regexp_replace(F.col("order_placement_date"), leading_word_and_comma, ""),
)

display(bronze_df)

In [0]:
# Parse order_placement_date by trying each known format in order; the first
# format that yields a date wins, and rows matching none of them become null.
accepted_date_formats = ["yyyy/MM/dd", "dd-MM-yyyy", "dd/MM/yyyy", "MMMM dd, yyyy"]
parsed_order_date = F.coalesce(
    *[
        F.try_to_date(F.col("order_placement_date"), date_format)
        for date_format in accepted_date_formats
    ]
)

bronze_df = bronze_df.withColumn("order_placement_date", parsed_order_date)
display(bronze_df)

In [0]:
# Collapse rows that are identical on the columns below; rows that differ in
# any of them (including order_qty) are kept as separate records.
dedup_columns = [
    "order_id",
    "order_placement_date",
    "customer_id",
    "product_id",
    "order_qty",
]
bronze_df = bronze_df.dropDuplicates(dedup_columns)
display(bronze_df)

In [0]:
# Load the silver products table used to look up product_code.
# The name is built from the Catalog widget and silver_schema, like every other
# table in this notebook, so a non-default catalog is honoured here too
# (with the defaults this resolves to fmcg.silver.products, as before).
products_df = spark.read.table(f"{catalog}.{silver_schema}.products")
display(products_df)

In [0]:
# Attach product_code to each order row. This is an inner join, so orders whose
# product_id has no match in the products table are dropped from joined_df.
orders_side = bronze_df.alias("t1")
products_side = products_df.alias("t2")
same_product = F.col("t1.product_id") == F.col("t2.product_id")

joined_df = (
    orders_side
    .join(products_side, same_product, "inner")
    .select(F.col("t1.*"), F.col("t2.product_code"))
)
display(joined_df)

In [0]:
# Upsert the cleaned batch into the silver table.
# If the table already exists, rows matching on all merge keys are updated and
# the rest inserted; otherwise the table is created from this batch.
silver_merge_keys = ("order_id", "order_placement_date", "customer_id", "product_code")

if spark.catalog.tableExists(silver_table):
    deltaTable = DeltaTable.forName(spark, silver_table)
    # "silver" is the existing table (target), "bronze" is the incoming batch.
    silver_match_condition = " AND ".join(
        f"silver.{key}=bronze.{key}" for key in silver_merge_keys
    )
    (
        deltaTable.alias("silver")
        .merge(joined_df.alias("bronze"), silver_match_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        joined_df.write
        .format("delta")
        .mode("overwrite")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .saveAsTable(silver_table)
    )

In [0]:
# Persist the current batch to the silver staging table (overwritten each run);
# the gold and parent-merge steps below read their input from this table.
silver_staging_table = f"{catalog}.{silver_schema}.staging_{data_source}"
(
    joined_df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable(silver_staging_table)
)

### Gold Processing

In [0]:
# Project the silver staging batch onto the gold fact layout, renaming
# order_placement_date -> date, customer_id -> customer_code and
# order_qty -> sold_quantity.
silver_staging_df = spark.read.table(f"{catalog}.{silver_schema}.staging_{data_source}")

gold_df = silver_staging_df.select(
    F.col("order_id"),
    F.col("order_placement_date").alias("date"),
    F.col("customer_id").alias("customer_code"),
    F.col("product_code"),
    F.col("product_id"),
    F.col("order_qty").alias("sold_quantity"),
)
display(gold_df)

In [0]:
# Upsert the batch into the gold fact table, creating it on the first run.
gold_merge_keys = ("date", "order_id", "customer_code", "product_code")

if spark.catalog.tableExists(gold_table):
    deltaTable = DeltaTable.forName(spark, gold_table)
    # NOTE: despite its name, the alias "source" is the existing gold table
    # (the merge target); the alias "gold" is the incoming batch (gold_df).
    gold_match_condition = " AND ".join(
        f"source.{key}=gold.{key}" for key in gold_merge_keys
    )
    (
        deltaTable.alias("source")
        .merge(gold_df.alias("gold"), gold_match_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        gold_df.write
        .format("delta")
        .mode("overwrite")
        .option("delta.enableChangeDataFeed", "true")
        .option("mergeSchema", "true")
        .saveAsTable(gold_table)
    )

### Merging With Parent Company

In [0]:
# Work out which calendar months are touched by the current batch and expose
# them as the temp view `incremental_vw` (one row per month start date).
df_child = spark.read.table(f"{catalog}.{silver_schema}.staging_{data_source}")

month_start = F.trunc(F.col("order_placement_date"), "MM").alias("start_month")
incremental_df = df_child.select(month_start).distinct()

incremental_df.show()
incremental_df.createOrReplaceTempView("incremental_vw")


In [0]:
# Pull every gold fact row whose month appears in incremental_vw, i.e. all rows
# of the affected months, not only the rows from the current batch.
monthly_query = f"""
              SELECT date,product_code,customer_code,sold_quantity
              From {gold_table} sbf
              inner join incremental_vw ivw
              on trunc(sbf.date,'MM')=ivw.start_month
              """
monthly_tbl = spark.sql(monthly_query)

print(monthly_tbl.count())
display(monthly_tbl)


In [0]:
# Re-aggregate the affected months: snap each date to the first day of its
# month, then total sold_quantity per (month, product, customer).
month_bucket = F.trunc("date", "MM")
total_sold = F.sum("sold_quantity").alias("sold_quantity")

df_monthly_recalc = (
    monthly_tbl
    .withColumn("date", month_bucket)
    .groupBy("date", "product_code", "customer_code")
    .agg(total_sold)
)

print(df_monthly_recalc.count())
display(df_monthly_recalc)


In [0]:
# Upsert the recalculated monthly totals into the parent fact table:
# matching (date, product_code, customer_code) rows are overwritten with the
# new values, all other rows are inserted.
parent_fact_table = f"{catalog}.{gold_schema}.fact_{data_source}"
parent_merge_keys = ("date", "product_code", "customer_code")
parent_match_condition = " AND ".join(
    f"parent.{key}=child.{key}" for key in parent_merge_keys
)

deltaTable = DeltaTable.forName(spark, parent_fact_table)
(
    deltaTable.alias("parent")
    .merge(df_monthly_recalc.alias("child"), parent_match_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


### Cleanup operations

In [0]:
# Drop the bronze staging table created earlier in this run.
# The name is built from the same widget values used to create it (instead of
# the hard-coded fmcg.bronze.staging_orders), so the cleanup targets the right
# table for any Catalog / Data_Source. IF EXISTS keeps a re-run of this cell
# from failing once the table is already gone.
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{bronze_schema}.staging_{data_source}")

In [0]:
# Drop the silver staging table created earlier in this run.
# The name is built from the same widget values used to create it (instead of
# the hard-coded fmcg.silver.staging_orders), so the cleanup targets the right
# table for any Catalog / Data_Source. IF EXISTS keeps a re-run of this cell
# from failing once the table is already gone.
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{silver_schema}.staging_{data_source}")