In [0]:
from pyspark.sql import functions as F

# 1. Setup paths
# Source directory of the incremental CSV chunk, plus separate locations for
# the stream checkpoint and for Auto Loader's schema tracking.
csv_source_path = "/Volumes/main/db_project/raw_data/chunks/chunk2_incremental"
checkpoint_path = "/Volumes/main/db_project/raw_data/_checkpoints/bronze_csv_dlt_alt"
schema_path = "/Volumes/main/db_project/raw_data/_checkpoints/bronze_csv_schema_alt"

# 2. Ingest via Auto Loader
# Read the CSV files as a stream, append two audit columns, and write the
# result to the bronze table.
bronze_csv_query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", schema_path)
    .load(csv_source_path)
    .select(
        "*",
        F.current_timestamp().alias("processing_time"),
        # The source path is read from the hidden `_metadata` column of the
        # file source rather than from input_file_name().
        F.col("_metadata.file_path").alias("source_file"),
    )
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    # availableNow: process everything currently in the source, then stop.
    .trigger(availableNow=True)
    .toTable("main.db_project.bronze_csv_data_dlt")
)

# Starting the query returns immediately. Block until the batch has finished
# so that later cells (which COMMENT ON / ALTER this table) never run before
# the table exists, and so that a failed stream raises here instead of going
# unnoticed.
bronze_csv_query.awaitTermination()

In [0]:
%sql
-- Attach human-readable descriptions to the bronze table and to the two
-- audit columns added during ingestion.
COMMENT ON TABLE main.db_project.bronze_csv_data_dlt 
IS 'Bronze layer: Final CSV chunk ingested via Auto Loader (Alternative to DLT for Free Edition).';

ALTER TABLE main.db_project.bronze_csv_data_dlt 
ALTER COLUMN processing_time COMMENT 'The exact time this record was processed by the ingestion stream.';

-- source_file is filled from the file source's _metadata.file_path column,
-- so the description names that column as the origin of the value.
ALTER TABLE main.db_project.bronze_csv_data_dlt 
ALTER COLUMN source_file COMMENT 'The original CSV file path, taken from the file metadata column (_metadata.file_path).';