In [0]:
from pyspark.sql.functions import regexp_extract, to_timestamp, col

# Volume paths used by this ingestion cell.
# Streaming checkpoint, passed to the writer as "checkpointLocation".
checkpoint_location = "/Volumes/odace/map/checkpoints/_bronze_logement_checkpoint/"
# Location handed to Auto Loader as "cloudFiles.schemaLocation".
schema_location = "/Volumes/odace/map/schemas/_bronze_logement_schema/"
# Directory containing the CSV files to ingest.
input_path = "/Volumes/odace/map/files/logement/"

# Read the CSV files under input_path as a stream using Auto Loader (cloudFiles).
# The files are ";"-delimited and carry a header row.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("delimiter", ";")
    .option("header", True)
    # Auto Loader does not use the plain CSV reader's "inferSchema" option for
    # schema inference; column types are only inferred when
    # cloudFiles.inferColumnTypes is set (otherwise CSV columns are inferred
    # as strings).
    # NOTE(review): a schema already persisted at schema_location is presumably
    # reused as-is, so this only matters for a fresh schema inference -- confirm
    # against the existing target table before re-running.
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schema_location)
    .load(input_path)
)

# Derive "ingestion_timestamp" from the source file name: capture the
# "<8 digits>_<6 digits>" stamp that follows an underscore, then parse it
# with the yyyyMMdd_HHmmss pattern.
file_stamp = regexp_extract(col("_metadata.file_name"), r"_(\d{8}_\d{6})", 1)
df = df.withColumn("ingestion_timestamp", to_timestamp(file_stamp, "yyyyMMdd_HHmmss"))

# Append the stream to the Delta table odace.map.bronze_logement.
# "mergeSchema" is set so the write can evolve the table schema, and the
# availableNow trigger is used instead of a continuously running query.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkpoint_location)
    .table("odace.map.bronze_logement")
)

# Note: This cell starts a streaming query with the AvailableNow trigger: it processes the files available when the query starts, then stops. Files added to the input path afterwards are ingested the next time the cell is run.
# The schema and checkpoint are stored in the specified volume paths.
# The column 'ingestion_timestamp' records the timestamp extracted from the filename for each ingested row.
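# Schema evolution is enabled (mergeSchema) so new columns in incoming data are added to the Delta table. Note that with Auto Loader's default schema evolution mode the stream stops when it first detects new columns and must be re-run to pick them up.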