In [0]:
# Incremental file ingestion: read new JSON files with Auto Loader, tag each
# row with ingestion metadata, append to a Delta table, then stop.
from pyspark.sql import functions as F

# --- Configuration ---
# Replace these with your actual paths/table names
source_path = "s3://your-bucket/raw-data/"
checkpoint_path = "/mnt/telemetry/checkpoints/my_job_name/"
target_table = "catalog.schema.my_processed_table"

# 1. Read the data using Auto Loader
# `spark` is not defined in this cell; it is expected to be the SparkSession
# supplied by the notebook runtime.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")  # or csv, parquet, etc.
    # Stores the inferred schema. The same directory is reused below as the
    # stream checkpoint location.
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(source_path)
)

# 2. Add some 'snake_case' metadata for better tracking
# NOTE(review): input_file_name() may be unavailable on some cluster access
# modes; the `_metadata.file_path` column is the usual replacement -- confirm
# against the target runtime before switching.
processed_df = (
    df
    .withColumn("ingested_at", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
)

# 3. Write to Delta Table and shut down after processing all new files
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)  # Key: Processes new data then stops
    .toTable(target_table)
)

# Block until the availableNow run has finished.
query.awaitTermination()