In [0]:
%sql
-- Create the bronze schema in the workspace catalog; IF NOT EXISTS makes the
-- statement safe to re-run when the schema is already present.
CREATE SCHEMA IF NOT EXISTS workspace.bronze;



In [0]:
%sql
-- Switch the session's current schema to workspace.bronze.
USE workspace.bronze;


In [0]:
# DEV version - ready for promotion


In [0]:
from pyspark.sql.functions import current_timestamp, col, lit

# 1. Load the raw CSV sample files and tag every row with its source file path.
#    The reader is configured first (header row present, column types inferred),
#    then pointed at the dataset directory.
csv_reader = (
    spark.read
         .option("header", "true")
         .option("inferSchema", "true")
)
raw_df = csv_reader.csv("/databricks-datasets/samples/population-vs-price").withColumn(
    "file_path", col("_metadata.file_path")
)

# 2. Read the ingestion audit (control) table
audit_df = spark.read.table("workspace.metadata.file_ingestion_audit")

# 3. Paths of files that were already ingested successfully.
#    NULL paths are excluded: a NULL inside the NOT IN list built in step 4
#    would make the predicate evaluate to NULL for every non-matching row, and
#    filter() would then silently discard all new files.
processed_files = (
    audit_df
    .filter((col("status") == "SUCCESS") & col("file_path").isNotNull())
    .select("file_path")
    .distinct()
)

# 4. Keep only rows from files that have not been ingested yet.
#    The control table is small, so its paths are collected to the driver and
#    applied as a NOT IN filter instead of a join.
processed_paths = [row.file_path for row in processed_files.collect()]
new_files_df = raw_df.filter(~col("file_path").isin(processed_paths))
display(new_files_df)

# 5. Bronze transformation
#    Map the raw CSV headers (which contain spaces and leading digits) to
#    snake_case column names, then stamp each row with the ingestion time.
column_renames = {
    "2014 rank": "rank_2014",
    "State Code": "state_code",
    "2014 Population estimate": "population_estimate_2014",
    "2015 median sales price": "median_sales_price_2015",
}

bronze_df = new_files_df
for raw_name, bronze_name in column_renames.items():
    bronze_df = bronze_df.withColumnRenamed(raw_name, bronze_name)
bronze_df = bronze_df.withColumn("ingestion_timestamp", current_timestamp())

# 6. Append the newly ingested rows to the Bronze Delta table.
#    mergeSchema is enabled on the write, as in the rest of this pipeline.
bronze_writer = (
    bronze_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
)
bronze_writer.saveAsTable("workspace.bronze.population_raw")

# 7. Update metadata table
#    Build one SUCCESS row per file that was just loaded into Bronze.
files_ingested_df = (
    bronze_df
    .select(col("file_path"))
    .distinct()
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("status", lit("SUCCESS"))
)

# Append the audit rows to the control table that step 3 reads, so these files
# are skipped on the next run. Previously this step appended bronze_df to the
# Bronze table a second time: the data was duplicated and the audit table was
# never updated, so every run re-ingested the same files.
# NOTE(review): assumes the audit table has file_path, ingestion_time and
# status columns - confirm. mergeSchema is intentionally not set here, so a
# schema mismatch fails loudly instead of silently evolving the control table.
(
    files_ingested_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("workspace.metadata.file_ingestion_audit")
)

display(bronze_df)
