In [0]:
from pyspark.sql.functions import current_timestamp, col, lit

# =================================================
# Step 1 - Location of the landing files
# =================================================
source_path = "/databricks-datasets/retail-org/sales_orders"

# =================================================
# Step 2 - Load the files as raw text
# =================================================
# NOTE(review): the original author marked this read as "Unity Catalog safe",
# presumably because the source file is taken from `_metadata.file_path`
# rather than `input_file_name()` — confirm.
raw_text_df = spark.read.text(source_path)

# Tag every row with the path of the file it was read from, so later steps
# can decide per file whether it still needs to be ingested.
raw_df = raw_text_df.withColumn("file_path", col("_metadata.file_path"))

# =================================================
# Step 3 - Look up files that were already ingested
# =================================================
# NOTE(review): assumes the audit table already exists — confirm it is created
# before the first run of this notebook.
audit_df = spark.read.table("workspace.metadata.sales_orders_file_audit")

# Keep only audit entries that finished successfully, then reduce them to one
# row per file path.
successful_audit_df = audit_df.where(col("status") == "SUCCESS")
processed_files_df = successful_audit_df.select("file_path").distinct()

# -------------------------------------------------
# 4. Incremental filter
# -------------------------------------------------
# Pin the batch: resolve the set of not-yet-processed files exactly ONCE and
# bring it to the driver. `raw_df` and `processed_files_df` are lazy, so every
# action on a DataFrame derived from them re-lists the source and re-reads the
# audit table. Without pinning, the bronze write, the audit write and the
# final display could each see a different set of "new" files:
#   * a file landing between the two writes would be recorded as SUCCESS in
#     the audit table without ever having been written to bronze;
#   * the display, running after the audit append, would filter out the very
#     files that were just ingested.
new_file_paths = [
    row["file_path"]
    for row in (
        raw_df
        .select("file_path")
        .distinct()
        .join(processed_files_df, on="file_path", how="left_anti")
        .collect()
    )
]

# Local (driver-side) snapshot of the batch; it no longer depends on the
# source listing or on the audit table. An explicit schema is given so that an
# empty batch still produces a valid DataFrame.
new_file_paths_df = spark.createDataFrame(
    [(path,) for path in new_file_paths],
    schema="file_path string",
)

# Rows belonging to the pinned files only.
new_files_df = raw_df.join(
    new_file_paths_df,
    on="file_path",
    how="left_semi"
)

# -------------------------------------------------
# 5. Add ingestion timestamp
# -------------------------------------------------
bronze_df = (
    new_files_df
    .withColumn("ingestion_timestamp", current_timestamp())
)

# -------------------------------------------------
# 6. Write to Bronze
# -------------------------------------------------
bronze_df.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("workspace.bronze.sales_orders_raw")

# -------------------------------------------------
# 7. Update audit table
# -------------------------------------------------
# Built from the pinned file list, so the audit records exactly the files that
# were part of the bronze write above.
# NOTE(review): the bronze append and the audit append are two separate
# writes; if this one fails after the bronze write succeeded, a re-run will
# ingest the same files again. Consider deduplicating downstream.
files_ingested_df = (
    new_file_paths_df
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("status", lit("SUCCESS"))
)

files_ingested_df.write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("workspace.metadata.sales_orders_file_audit")

# Preview of the batch. `bronze_df` is re-evaluated lazily here, so the
# `ingestion_timestamp` shown reflects display time, not the stored value.
display(bronze_df)


In [0]:
%sql
-- Total number of rows currently stored in the bronze table.
SELECT
  COUNT(*)
FROM
  workspace.bronze.sales_orders_raw;


In [0]:
%sql
-- Show every row and column of the file audit table.
SELECT * FROM workspace.metadata.sales_orders_file_audit;