In [0]:
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

def load_config(path):
    """Load a pipeline configuration from a YAML file.

    The document is parsed with ``yaml.safe_load``, so the config cannot
    instantiate arbitrary Python objects.

    Args:
        path: Filesystem path to the YAML file
            (e.g. ``"/dbfs/configs/example_pipeline.yaml"``).

    Returns:
        The parsed YAML document. This is a ``dict`` when the top level is a
        mapping, and ``None`` when the file is empty.
    """
    # Binary mode lets PyYAML detect the encoding from the byte stream
    # (UTF-8, or UTF-16 with a BOM) instead of using the platform default.
    with open(path, "rb") as f:
        return yaml.safe_load(f)

def _resolve_output_mode(mode):
    """Validate ``conf["mode"]`` and return the streaming output mode to use.

    ``DataStreamWriter.outputMode`` only understands ``"append"``,
    ``"complete"`` and ``"update"``. ``"overwrite"`` is a batch save mode,
    and Spark rejects it for streams. The Delta sink accepts only
    ``"append"`` and ``"complete"``, and ``"complete"`` also requires a
    streaming aggregation, which this pipeline does not perform. That
    leaves ``"append"`` as the only mode that can work here.

    Args:
        mode: The configured mode. It is compared case-insensitively after
            surrounding whitespace is stripped.

    Returns:
        The normalized mode string (``"append"``).

    Raises:
        ValueError: For any other value, including ``"overwrite"``.
    """
    normalized = str(mode).strip().lower()
    if normalized != "append":
        raise ValueError(
            f"Unsupported mode {mode!r} for a streaming Auto Loader pipeline: "
            "only 'append' is supported ('overwrite' is a batch-only save "
            "mode and cannot be used with writeStream)."
        )
    return normalized


def run_autoloader(conf):
    """Stream files from ``source_path`` into a Delta table using Auto Loader.

    The function:

    - reads ``conf["source_path"]`` through the ``cloudFiles`` source with
      column-type inference enabled;
    - adds an ``ingested_at`` timestamp column;
    - writes the rows as Delta to ``<target_base>/<medallion>/<subject_area>``.

    It blocks until the streaming query terminates.

    Args:
        conf: Pipeline config mapping, e.g. as returned by ``load_config``.

            Required keys: ``format``, ``schema`` (with ``hasHeader``),
            ``source_path``, ``target_base``, ``medallion``,
            ``subject_area``, ``pipeline_name``, ``mode``.

            Optional key: ``schema_location``, which defaults to the
            checkpoint directory.

    Raises:
        ValueError: If ``conf["mode"]`` is not ``"append"``. This is checked
            before any Spark session is created.
    """
    # Validate first so a bad config fails fast with a clear message,
    # instead of failing inside Spark when the query starts.
    output_mode = _resolve_output_mode(conf["mode"])

    spark = SparkSession.builder.getOrCreate()

    # Delta table folder for the chosen medallion layer, plus a per-pipeline
    # checkpoint. Keep these paths stable across runs: the checkpoint records
    # which source files the stream has already processed.
    target = f"{conf['target_base']}/{conf['medallion']}/{conf['subject_area']}"
    checkpoint = f"{target}/_checkpoints/{conf['pipeline_name']}"
    # Schema inference (no explicit schema is given) needs a place to store
    # the inferred schema. Reusing the checkpoint directory follows
    # Databricks' Auto Loader examples.
    schema_location = conf.get("schema_location", checkpoint)

    raw = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", conf["format"])
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", schema_location)
        .option("header", conf["schema"]["hasHeader"])
        .load(conf["source_path"])
    )

    # Stamp each row with the processing time of the micro-batch that
    # ingested it.
    enriched = raw.withColumn("ingested_at", current_timestamp())

    query = (
        enriched.writeStream
        .format("delta")
        .outputMode(output_mode)
        .option("checkpointLocation", checkpoint)
        .start(target)
    )
    query.awaitTermination()

if __name__ == "__main__":
    import sys

    # Expects the path to the pipeline YAML config as the first argument,
    # e.g. "/dbfs/configs/example_pipeline.yaml". Extra arguments are
    # ignored, as before.
    if len(sys.argv) < 2:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit(f"usage: {sys.argv[0]} <pipeline_config.yaml>")
    conf_path = sys.argv[1]
    conf = load_config(conf_path)
    run_autoloader(conf)
