## **Fact Orders**

**Data Reading**

In [0]:
# Source for the fact table: every column of the silver-layer orders table.
orders_query = "select * from databricks_etl.silver.orders"
df = spark.sql(orders_query)

# Render the frame so the incoming columns can be inspected before joining.
display(df)

order_id,customer_id,product_id,order_date,quantity,total_amount,Year
O00001,C00710,P0159,2023-03-22T00:00:00.000Z,3,2022.87,2023
O00002,C00954,P0036,2023-06-30T00:00:00.000Z,2,3560.74,2023
O00003,C01578,P0427,2023-11-06T00:00:00.000Z,3,5903.52,2023
O00004,C00962,P0332,2024-02-27T00:00:00.000Z,3,4107.99,2024
O00005,C00156,P0038,2024-10-13T00:00:00.000Z,5,5784.95,2024
O00006,C00521,P0174,2023-05-17T00:00:00.000Z,5,407.75,2023
O00007,C00982,P0352,2024-01-18T00:00:00.000Z,4,4907.64,2024
O00008,C00976,P0172,2023-01-10T00:00:00.000Z,4,7037.88,2023
O00009,C01001,P0238,2023-04-20T00:00:00.000Z,3,4076.97,2023
O00010,C00702,P0258,2023-07-07T00:00:00.000Z,4,5695.64,2023


In [0]:
# Lookup frames used to resolve dimension keys for each order. The natural
# ids are re-aliased with a `dim_` prefix so they do not collide with the
# identically named columns on the orders frame during the joins.
dimcustomers_query = "select DimCustomerKey, customer_id as dim_customer_id from databricks_etl.gold.dimcustomers"
df_dimcus = spark.sql(dimcustomers_query)

# NOTE(review): DimProductKey is just product_id under another name here,
# unlike DimCustomerKey which is selected as its own column. Presumably
# gold.dimproducts has no separate surrogate key - confirm against its schema.
dimproducts_query = "select product_id as DimProductKey, product_id as dim_product_id from databricks_etl.gold.dimproducts"
df_dimpro = spark.sql(dimproducts_query)

**Fact Dataframe**

In [0]:
# Join predicates: match each order to its customer and product lookup rows.
customer_match = df['customer_id'] == df_dimcus['dim_customer_id']
product_match = df['product_id'] == df_dimpro['dim_product_id']

# LEFT joins keep every order; an order with no matching dimension row ends
# up with a NULL key in the corresponding Dim*Key column.
df_fact = (
    df
    .join(df_dimcus, customer_match, how='left')
    .join(df_dimpro, product_match, how='left')
)

# The fact frame carries only the dimension keys, so drop the join helper
# columns and the natural ids that came from the orders frame.
columns_to_drop = ['dim_customer_id', 'dim_product_id', 'customer_id', 'product_id']
df_fact_new = df_fact.drop(*columns_to_drop)

In [0]:
# Preview the fact rows (order measures plus the resolved dimension keys).
display(df_fact_new)

order_id,order_date,quantity,total_amount,Year,DimCustomerKey,DimProductKey
O00001,2023-03-22T00:00:00.000Z,3,2022.87,2023,710,P0159
O00002,2023-06-30T00:00:00.000Z,2,3560.74,2023,954,P0036
O00003,2023-11-06T00:00:00.000Z,3,5903.52,2023,1578,P0427
O00004,2024-02-27T00:00:00.000Z,3,4107.99,2024,962,P0332
O00005,2024-10-13T00:00:00.000Z,5,5784.95,2024,156,P0038
O00006,2023-05-17T00:00:00.000Z,5,407.75,2023,521,P0174
O00007,2024-01-18T00:00:00.000Z,4,4907.64,2024,982,P0352
O00008,2023-01-10T00:00:00.000Z,4,7037.88,2023,976,P0172
O00009,2023-04-20T00:00:00.000Z,3,4076.97,2023,1001,P0238
O00010,2023-07-07T00:00:00.000Z,4,5695.64,2023,702,P0258


**Upsert on Fact Table**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert the freshly built fact rows into the gold fact table.
#   - Table already exists: MERGE on the order id plus its dimension keys,
#     updating matched rows and inserting the rest.
#   - Table does not exist yet (first run): create it from the DataFrame.
if spark.catalog.tableExists("databricks_etl.gold.FactOrders"):

    dlt_obj = DeltaTable.forName(spark, "databricks_etl.gold.FactOrders")

    # The dimension keys are produced by LEFT joins, so they are NULL for any
    # order whose customer/product has no dimension row. With plain `=` a NULL
    # key never compares equal, so those rows would never match and would be
    # inserted again as duplicates on every run. `<=>` is null-safe equality
    # (NULL <=> NULL is true) and behaves exactly like `=` for non-NULL keys.
    merge_condition = (
        "trg.order_id = src.order_id "
        "AND trg.DimCustomerKey <=> src.DimCustomerKey "
        "AND trg.DimProductKey <=> src.DimProductKey"
    )

    (
        dlt_obj.alias("trg")
        .merge(df_fact_new.alias("src"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

else:
    # First load: nothing to merge into, so write the frame out as a new
    # managed Delta table.
    df_fact_new.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("databricks_etl.gold.FactOrders")

In [0]:
%sql
-- Read the fact table back to check the result of the upsert.
SELECT *
FROM databricks_etl.gold.FactOrders

order_id,order_date,quantity,total_amount,Year,DimCustomerKey,DimProductKey
O00001,2023-03-22T00:00:00.000Z,3,2022.87,2023,710,P0159
O00002,2023-06-30T00:00:00.000Z,2,3560.74,2023,954,P0036
O00003,2023-11-06T00:00:00.000Z,3,5903.52,2023,1578,P0427
O00004,2024-02-27T00:00:00.000Z,3,4107.99,2024,962,P0332
O00005,2024-10-13T00:00:00.000Z,5,5784.95,2024,156,P0038
O00006,2023-05-17T00:00:00.000Z,5,407.75,2023,521,P0174
O00007,2024-01-18T00:00:00.000Z,4,4907.64,2024,982,P0352
O00008,2023-01-10T00:00:00.000Z,4,7037.88,2023,976,P0172
O00009,2023-04-20T00:00:00.000Z,3,4076.97,2023,1001,P0238
O00010,2023-07-07T00:00:00.000Z,4,5695.64,2023,702,P0258
