In [1]:
# Bronze-layer inputs (lakehouse-relative paths) and the Silver output table.
# Both Bronze folders are read as headered CSV in the next cell.
initial_path, incremental_path = (
    "Files/Bronze/Raw_Data/Sales_Data_Raw",                  # initial extract
    "Files/Bronze/Incremental_Data/Incremental_Sales_Data",  # incremental extract
)
silver_table_name = "Merged_Data_Silver"  # Delta table this notebook writes

StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 3, Finished, Available, Finished)

In [2]:
# Load both Bronze extracts as CSV, taking column names from the header row.
# No schema / inferSchema is given, so columns arrive as strings; they are
# typed in later cells.
initial_df = spark.read.csv(initial_path, header=True)
incremental_df = spark.read.csv(incremental_path, header=True)

StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 4, Finished, Available, Finished)

In [3]:
# Parse LAST_UPDATED_DATE (a string from CSV) into a timestamp on both inputs,
# so the watermark filter and the dedup ordering below compare instants rather
# than text. NOTE(review): with ANSI mode off, unparseable values become null.
from pyspark.sql.functions import col

initial_df, incremental_df = (
    frame.withColumn("LAST_UPDATED_DATE", col("LAST_UPDATED_DATE").cast("timestamp"))
    for frame in (initial_df, incremental_df)
)


StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 5, Finished, Available, Finished)

In [4]:
# Stack the initial and incremental extracts into one Bronze frame.
# unionByName matches columns by name rather than position, so the two CSV
# files only need matching headers. It does not deduplicate; that happens later.
bronze_df = (
    initial_df
    .unionByName(incremental_df)
)

StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 6, Finished, Available, Finished)

In [5]:
# Incremental merge. If the Silver table already exists, keep its rows and append
# only the Bronze rows newer than Silver's high-water mark. On the first run
# (no table yet), use every Bronze row.
from pyspark.sql.functions import col
# Alias the aggregate so the Python builtin max() is not shadowed notebook-wide.
from pyspark.sql.functions import max as spark_max

if spark.catalog.tableExists(silver_table_name):
    silver_df = spark.read.table(silver_table_name)

    # High-water mark: the latest LAST_UPDATED_DATE already stored in Silver.
    max_timestamp = silver_df.select(spark_max("LAST_UPDATED_DATE")).collect()[0][0]

    if max_timestamp is None:
        # The table is empty, or all its timestamps are null. Comparing against
        # NULL yields NULL, which would filter out every row, so treat all Bronze
        # rows as new instead.
        new_records_df = bronze_df
    else:
        # Strictly greater: rows stamped exactly at the watermark are assumed to
        # already be in Silver. Bronze rows with a null timestamp are excluded.
        new_records_df = bronze_df.filter(col("LAST_UPDATED_DATE") > max_timestamp)

    # NOTE(review): bronze_df columns other than LAST_UPDATED_DATE are strings,
    # while Silver may hold typed columns. unionByName relies on Spark's type
    # coercion here, and the later typing cell re-casts. Confirm this is intended.
    combined_df = silver_df.unionByName(new_records_df)
else:
    # First run: no Silver table yet, so all Bronze data is used.
    combined_df = bronze_df

StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 7, Finished, Available, Finished)

In [11]:
# Keep one row per SALES_ID: the version with the latest LAST_UPDATED_DATE.
# desc() places nulls last, so a null-timestamp row wins only when no row for
# that SALES_ID has a timestamp. Ties on the timestamp are broken arbitrarily.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = (
    Window
    .partitionBy("SALES_ID")
    .orderBy(col("LAST_UPDATED_DATE").desc())
)

dedup_df = (
    combined_df
    .withColumn("rn", row_number().over(window_spec))
    .where(col("rn") == 1)
    .drop("rn")
)


StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 13, Finished, Available, Finished)

In [12]:
# Persist the deduplicated rows as the Silver Delta table, replacing its contents.
# On later runs the existing table may have different column types than this
# run's union produced (e.g. after a typed rewrite). Delta rejects an overwrite
# that changes the schema unless overwriteSchema is enabled.
(
    dedup_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable(silver_table_name)
)


StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 14, Finished, Available, Finished)

In [13]:
# Render the freshly written Silver table for a quick visual check.
display(spark.table(silver_table_name))

StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e54c0f47-a07a-4358-aa33-9b8997499135)

In [17]:
# Re-read the current Silver table. Using the shared silver_table_name constant
# (not a hardcoded name) keeps this cell pointed at the table the notebook writes.
silver_df = spark.read.table(silver_table_name)


StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 20, Finished, Available, Finished)

In [18]:
# Apply the Silver schema. CSV columns arrive as strings, so cast the id and the
# measures to numeric types and parse SALES_DATE ("yyyy-MM-dd") into a date.
# LAST_UPDATED_DATE was already cast to timestamp upstream and is left untouched.
from pyspark.sql.functions import to_date

dedup_df = (
    dedup_df
    .withColumn("SALES_ID", col("SALES_ID").cast("int"))
    .withColumn("SALES_DATE", to_date("SALES_DATE", "yyyy-MM-dd"))
    .withColumn("PRODUCT_NAME", col("PRODUCT_NAME").cast("string"))
    .withColumn("CATEGORY", col("CATEGORY").cast("string"))
    .withColumn("SUB_CATEGORY", col("SUB_CATEGORY").cast("string"))
    .withColumn("CUSTOMER_NAME", col("CUSTOMER_NAME").cast("string"))
    .withColumn("REGION", col("REGION").cast("string"))
    .withColumn("CUSTOMER_TYPE", col("CUSTOMER_TYPE").cast("string"))
    .withColumn("QUANTITY_SOLD", col("QUANTITY_SOLD").cast("int"))
    .withColumn("UNIT_PRICE", col("UNIT_PRICE").cast("double"))
    .withColumn("DISCOUNT", col("DISCOUNT").cast("double"))
    .withColumn("TOTAL_AMOUNT", col("TOTAL_AMOUNT").cast("double"))
)

dedup_df.printSchema()


StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 21, Finished, Available, Finished)

root
 |-- SALES_ID: integer (nullable = true)
 |-- SALES_DATE: date (nullable = true)
 |-- PRODUCT_NAME: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- SUB_CATEGORY: string (nullable = true)
 |-- CUSTOMER_NAME: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- CUSTOMER_TYPE: string (nullable = true)
 |-- QUANTITY_SOLD: integer (nullable = true)
 |-- UNIT_PRICE: double (nullable = true)
 |-- DISCOUNT: double (nullable = true)
 |-- TOTAL_AMOUNT: double (nullable = true)
 |-- LAST_UPDATED_DATE: timestamp (nullable = true)



In [19]:
# Write the TYPED frame back to Silver. The casts from the previous cell exist
# only on dedup_df; writing silver_df (the table as just re-read) would discard
# them. The table may still hold string columns from the earlier write, which
# ran before the casts. overwriteSchema lets Delta accept the changed types.
(
    dedup_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable(silver_table_name)
)


StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 22, Finished, Available, Finished)

In [20]:
# Read back the final Silver table and render it for inspection.
df = spark.table(silver_table_name)
display(df)

StatementMeta(, 22a7ea88-54ec-4ab5-aeba-b5c65d925a05, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5da21260-8db4-4ee4-a0ac-57c521db496d)