**Reading Data from Bronze Lakehouse and writing it to Silver Lakehouse as one big file**

In [None]:
from pyspark.sql.functions import to_date

# 1. Read Bronze files from Lakehouse (CSV format in 'Files' section)
df_account = spark.read.format("csv").option("header", True).option("inferschema", True).load("Files/tbl_account/tbl_account.csv")
df_demand = spark.read.format("csv").option("header", True).option("inferschema", True).load("Files/tbl_demand/tbl_demand.csv")
df_product = spark.read.format("csv").option("header", True).option("inferschema", True).load("Files/tbl_product/tbl_product.csv")
df_social = spark.read.format("csv").option("header", True).option("inferschema", True).load("Files/tbl_social_media/tbl_social_media.csv")
df_web = spark.read.format("csv").option("header", True).option("inferschema", True).load("Files/tbl_web_activity/tbl_web_activity.csv")
df_assets = spark.read.format("csv").option("header", True).option("inferschema", True).load("Files/tbl_assets/tbl_assets.csv")  # Optional

# 2. # === RENAME conflicting columns before joins ===
df_account = df_account \
    .withColumn("account_created_date", to_date("created_datetime_date", "dd/MM/yyyy")) \
    .withColumnRenamed("email", "account_email") \
    .drop("created_datetime_date")

df_demand = df_demand \
    .withColumn("demand_created_date", to_date("created_datetime_date", "dd/MM/yyyy")) \
    .withColumnRenamed("product_name", "demand_product_name") \
    .drop("created_datetime_date")

df_social = df_social = df_social \
    .withColumn("social_date", to_date("interaction_datetime_date", "dd/MM/yyyy")) \
    .withColumnRenamed("interaction_datetime_time", "social_time") \
    .drop("interaction_datetime_date")

df_web = df_web \
    .withColumn("web_visit_date", to_date("visit_datetime_date", "dd/MM/yyyy")) \
    .withColumnRenamed("visit_datetime_time", "web_visit_time") \
    .withColumnRenamed("email", "web_email") \
    .drop("visit_datetime_date")

# 3. Join demand with product
df_demand_full = df_demand.join(df_product, on="product_id", how="left")

# 4. Join user account with demand
df_user_demand = df_account.join(df_demand_full, on="user_id", how="left")

# 5. Join social interactions
df_user_social = df_user_demand.join(df_social, on="user_id", how="left")

# 6. Join web activity
df_final = df_user_social.join(df_web, on="user_id", how="left")

# 7. Drop duplicates and unnecessary columns (optional)
df_final_clean = df_final.dropDuplicates(["user_id", "product_id", "social_date", "web_visit_date"])

# 8. Write unified fact table to Silver Lakehouse as ONE BIG FILE
df_final_clean.coalesce(1).write.format("delta").mode("overwrite").save("abfss://Project_WS@onelake.dfs.fabric.microsoft.com/Silver_Lakehouse.Lakehouse/Files/Fact_user_engagement")
