In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

def _first_row_per_window(df, window):
    """Return only the first-ranked row of each partition of ``window``.

    Rows are numbered with ``row_number()`` over ``window``; the row numbered 1
    in every partition is kept and the temporary ``rn`` column is dropped.

    Args:
        df: DataFrame to reduce.
        window: Window spec defining the partitioning and the ordering.

    Returns:
        A DataFrame with the same columns as ``df`` and one row per partition.
    """
    return (
        df.withColumn("rn", row_number().over(window))
        .filter(col("rn") == 1)
        .drop("rn")
    )


# Silver-layer payment records that the fact table is built from.
silver_order_df = spark.read.format("delta").load(
    'abfss://silver@telcostoragelayer.dfs.core.windows.net/payment/'
)

# Get latest customer dimension records.
# The surrogate key is a secondary sort key: ordering by timestamp alone leaves
# the winner arbitrary when two rows of a customer share a timestamp, which
# would make the selected key change between runs.
dim_customer_df = spark.read.table("telco_catalog.gold.dim_customer")
customer_window = Window.partitionBy("customer_id").orderBy(
    col("timestamp").desc(),
    col("dim_customer_key").desc(),
)
latest_dim_customer = _first_row_per_window(dim_customer_df, customer_window)

# Get latest address dimension records (same deterministic tie-break as above).
dim_address_df = spark.read.table("telco_catalog.gold.dim_address")
address_window = Window.partitionBy("customer_id").orderBy(
    col("timestamp").desc(),
    col("dim_address_key").desc(),
)
latest_dim_address = _first_row_per_window(dim_address_df, address_window)



In [0]:
# Key lookups: only the join column and the surrogate key of each dimension
# are carried into the fact frame.
customer_key_lookup = latest_dim_customer.select("customer_id", "dim_customer_key")
address_key_lookup = latest_dim_address.select("customer_id", "dim_address_key")

# Left joins keep every silver row even when no dimension row matches; the
# corresponding key column is then null.
fact_df = (
    silver_order_df
    .join(customer_key_lookup, on="customer_id", how="left")
    .join(address_key_lookup, on="customer_id", how="left")
)
display(fact_df)


In [0]:
# Columns that make up the fact table: payment attributes first, then the
# surrogate keys resolved from the customer and address dimensions.
fact_payment_columns = [
    "payment_id",
    "amount",
    "payment_date",
    "currency",
    "payment_method",
    "transaction_id",
    "payment_status",
    "invoice_id",
    "dim_customer_key",
    "dim_address_key",
]

fact_order_df = fact_df.select(*fact_payment_columns)
fact_order_df.display()

In [0]:
from delta.tables import DeltaTable

# Catalog name of the gold fact table. The variable keeps its original name,
# although the value is a table name rather than a path.
fact_table_path = "telco_catalog.gold.fact_payment"
# External storage location used when the table is created for the first time.
fact_storage_path = "abfss://gold@telcostoragelayer.dfs.core.windows.net/fact_payment"

if spark.catalog.tableExists(fact_table_path):
    # Open the merge target through the same catalog name that was just checked,
    # so the existence test and the MERGE always refer to the same table.
    delta_table = DeltaTable.forName(spark, fact_table_path)

    # Upsert keyed on payment_id: matching rows are fully overwritten, new
    # payments are inserted.
    # NOTE(review): MERGE fails when several source rows match one target row;
    # this assumes payment_id is unique in fact_order_df - confirm upstream.
    (
        delta_table.alias("target")
        .merge(
            fact_order_df.alias("source"),
            "target.payment_id = source.payment_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First load: write the data to the external location and register it in
    # the catalog under the fact table name.
    (
        fact_order_df.write.format("delta")
        .mode("overwrite")
        .option("path", fact_storage_path)
        .saveAsTable(fact_table_path)
    )


In [0]:
# Read the gold fact table back from the catalog to inspect the written result.
fact_payment_preview = spark.read.table("telco_catalog.gold.fact_payment")
display(fact_payment_preview)