In [0]:
from pyspark.sql.functions import col, trim

# Work out the high-water mark: the newest ingested_at value already in Silver.
# When Silver is missing or holds no rows, fall back to a date early enough
# that every Bronze row counts as new.
table_exists = spark.catalog.tableExists("nhs_silver_workforce")

last_processed_ts = '1900-01-01 00:00:00'
if table_exists:
    # max() over an empty table yields None, in which case the fallback stays.
    latest_in_silver = spark.sql("SELECT max(ingested_at) FROM nhs_silver_workforce").collect()[0][0]
    if latest_in_silver is not None:
        last_processed_ts = latest_in_silver

print(f"Processing data ingested after: {last_processed_ts}")

# Read only the Bronze rows that arrived after the Silver high-water mark.
bronze_df = spark.table("nhs_bronze_workforce").filter(col("ingested_at") > last_processed_ts)

# Text columns whose leading/trailing whitespace is stripped.
text_columns = [
    "TYPE",
    "NHSE_REGION_CODE",
    "NHSE_REGION_NAME",
    "ICS_CODE",
    "ICS_NAME",
    "REPORTING_ORG_NAME",
    "REPORTING_ORG_CODE",
    "CLUSTER_GROUP",
    "BENCHMARK_GROUP",
    "STAFF_GROUP_1_NAME",
]

silver_df = bronze_df
for text_column in text_columns:
    # Overwriting a column under its own name keeps its position in the schema.
    silver_df = silver_df.withColumn(text_column, trim(col(text_column)))

# Give the measures numeric types: HC as integer, FTE as double.
silver_df = silver_df.withColumn("HC", col("HC").cast("integer"))
silver_df = silver_df.withColumn("FTE", col("FTE").cast("double"))

# Drop rows that have no reporting organisation code.
# NOTE(review): a code that was only whitespace is an empty string after the
# trim above, not null, so it passes this filter - confirm that is intended.
silver_df = silver_df.filter(col("REPORTING_ORG_CODE").isNotNull())



# Upsert the cleaned increment into Silver.
silver_table = "nhs_silver_workforce"

# Register the increment under the name the MERGE statement reads from.
silver_df.createOrReplaceTempView("incremental_bronze")

if not spark.catalog.tableExists(silver_table):
    # First run: MERGE INTO needs an existing target table, so seed Silver
    # directly from the increment instead of merging.
    # NOTE(review): assumes Silver is meant to be a Delta table, since it is the
    # target of MERGE INTO - confirm the table format for this workspace.
    silver_df.write.format("delta").saveAsTable(silver_table)
else:
    # Rows are matched on organisation code + staff group.
    # ingested_at is refreshed on matched rows as well: the next run derives its
    # watermark from max(ingested_at) in Silver, so leaving it stale would make
    # update-only increments get re-read from Bronze on every run.
    # NOTE(review): if one increment can hold several rows for the same
    # (REPORTING_ORG_CODE, STAFF_GROUP_1_NAME) pair, the merge may be rejected
    # because several source rows match one target row - confirm Bronze's grain.
    spark.sql(f"""
      MERGE INTO {silver_table} AS target
      USING incremental_bronze AS source
      ON target.REPORTING_ORG_CODE = source.REPORTING_ORG_CODE
         AND target.STAFF_GROUP_1_NAME = source.STAFF_GROUP_1_NAME
      WHEN MATCHED THEN
        UPDATE SET
          target.HC = source.HC,
          target.FTE = source.FTE,
          target.ingested_at = source.ingested_at
      WHEN NOT MATCHED THEN
        INSERT *
    """)

# Sanity check: print the column types of the cleaned increment.
silver_df.printSchema()

# Show the first ten rows of the increment for a quick visual check.
preview_rows = silver_df.head(10)
display(preview_rows)