In [0]:
import json

# Parse the JSON payload supplied through the "input" notebook widget
flights_data_params = json.loads(dbutils.widgets.get("input"))

In [0]:
from pyspark.sql.functions import current_timestamp, input_file_name, col, when, lit

# Define the generic Auto Loader function
def ingest_flights_data(table_params):
    """
    Start an Auto Loader ingestion stream for a single table configuration.

    Args:
        table_params: Mapping indexed with the keys ``source_name`` (used
            only in the progress message), ``checkpoint`` (streaming
            checkpoint location; the schema location is this value with
            ``_schema`` appended), ``raw_path`` (location passed to ``load``)
            and ``target_table`` (table name passed to ``toTable``).

    Returns:
        The result of ``toTable`` on the configured stream writer.
    """
    print(f"Ingesting: {table_params['source_name']}...")

    # Source: CSV files with a header row, read through the cloudFiles format
    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "csv")
    reader = reader.option(
        "cloudFiles.schemaLocation", f"{table_params['checkpoint']}_schema"
    )
    reader = reader.option("cloudFiles.schemaEvolutionMode", "rescue")
    reader = reader.option("header", "true")
    raw_stream = reader.load(table_params['raw_path'])

    # Audit columns: ingestion timestamp and the originating file path
    with_ingest_time = raw_stream.withColumn(
        "_bronze_ingested_at", current_timestamp()
    )
    bronze_stream = with_ingest_time.withColumn(
        "_source_file", col("_metadata.file_path")
    )

    # Sink: append to the target Delta table
    writer = bronze_stream.writeStream.format("delta")
    writer = writer.option("checkpointLocation", table_params['checkpoint'])
    writer = writer.outputMode("append")
    writer = writer.trigger(availableNow=True)  # or trigger(once=True)
    return writer.toTable(table_params['target_table'])

In [0]:
# Run the ingestion with the parameters parsed from the "input" widget
ingest_flights_data(flights_data_params)