# Databricks Auto Loader

- **Databricks needs permission to read our files from ADLS, so we have to create an External Location in Unity Catalog.**
- **Unity Catalog enables centralized governance, fine-grained access control, auditing, and secure access across workspaces. These capabilities are typically required for enterprise-grade security and compliance.**
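
As a rough sketch of the registration step described above, the helper below builds the two Unity Catalog statements usually involved: one to create the external location and one to grant read access. The location name `landing_loc`, the storage credential `adls_cred`, and the group `data_engineers` are hypothetical placeholders; only the container URL comes from this notebook. The helper only builds SQL text, so each statement would still have to be run with `spark.sql` (or in a `%sql` cell) by someone with the required privileges.

```python
from typing import List


def external_location_sql(name: str, url: str, credential: str, principal: str) -> List[str]:
    """Build the statements that register a storage path and grant read access to it."""
    return [
        f"CREATE EXTERNAL LOCATION IF NOT EXISTS `{name}` "
        f"URL '{url}' WITH (STORAGE CREDENTIAL `{credential}`)",
        f"GRANT READ FILES ON EXTERNAL LOCATION `{name}` TO `{principal}`",
    ]


statements = external_location_sql(
    name="landing_loc",            # hypothetical location name
    url="abfss://landing@hisadls.dfs.core.windows.net/",
    credential="adls_cred",        # hypothetical; the credential must already exist
    principal="data_engineers",    # hypothetical group
)

for stmt in statements:
    print(stmt)  # in a notebook: spark.sql(stmt)
```

Once the location exists, it should appear in the output of `SHOW EXTERNAL LOCATIONS`.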

In [0]:
%sql
SHOW EXTERNAL LOCATIONS

%md
**Databricks Auto Loader incrementally ingests new files from cloud storage with exactly-once guarantees at the file level using checkpointing, supports schema evolution, and is typically used for raw ingestion in a medallion architecture.**


- **Auto Loader detects only new files added to a directory and processes them exactly once.**
- **It achieves idempotency at the file level using checkpointing and file metadata.**
- **Auto Loader tracks files, not rows.**
- **It supports schema inference and schema evolution, allowing new columns to be added safely.**
- **Auto Loader does not perform upserts; it only appends data. Upserts are implemented later in the silver layer using Delta Lake MERGE logic.**
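
The file-level behaviour listed above can be pictured with a small toy model. This is not how Auto Loader is implemented internally; the `FileTracker` class and the file names are made up purely to illustrate "track files, not rows": a file path that has been seen once is never returned again, regardless of what rows it contains.

```python
class FileTracker:
    """Toy stand-in for the checkpoint: remembers which file paths were already ingested."""

    def __init__(self):
        self.seen = set()  # plays the role of the persisted checkpoint state

    def new_files(self, listing):
        """Return the paths not processed before and record them as processed."""
        fresh = [path for path in listing if path not in self.seen]
        self.seen.update(fresh)
        return fresh


tracker = FileTracker()

# Each call represents one run of the ingestion job against the same directory.
run1 = tracker.new_files(["a.parquet", "b.parquet"])
run2 = tracker.new_files(["a.parquet", "b.parquet", "c.parquet"])
run3 = tracker.new_files(["a.parquet", "b.parquet", "c.parquet"])

print(run1)  # ['a.parquet', 'b.parquet']
print(run2)  # ['c.parquet']
print(run3)  # []
```

Because the state is keyed by file path, two different files that contain the same row would both be appended, which is why deduplication and upserts are left to a later layer.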

- **The job must run (continuous or triggered); Auto Loader itself does not run automatically.**
- **On shared clusters with Unity Catalog permission enforcement enabled, Auto Loader is NOT allowed to read from ADLS directly. That means we must register an external location for the path in Unity Catalog.**
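
Since Auto Loader only appends, the upsert mentioned above happens downstream. A minimal sketch of what that silver-layer step might look like, assuming a hypothetical target table `nyc_yellow.silver.trips`, a hypothetical temporary view `bronze_updates` holding the deduplicated bronze rows, and a hypothetical key column `trip_id` (none of these appear in this notebook):

```python
from typing import List


def build_merge_sql(target: str, source_view: str, keys: List[str]) -> str:
    """Build a Delta Lake MERGE that updates matching rows and inserts new ones."""
    on_clause = " AND ".join(f"t.{key} = s.{key}" for key in keys)
    return (
        f"MERGE INTO {target} AS t "
        f"USING {source_view} AS s "
        f"ON {on_clause} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


merge_sql = build_merge_sql(
    target="nyc_yellow.silver.trips",  # hypothetical silver table
    source_view="bronze_updates",      # hypothetical temp view over new bronze rows
    keys=["trip_id"],                  # hypothetical business key
)

print(merge_sql)  # in a notebook: spark.sql(merge_sql)
```

The source view should contain at most one row per key; otherwise the MERGE can fail because several source rows match the same target row.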

In [0]:
checkpoint_path = "abfss://nycsync@hisadls.dfs.core.windows.net/checkpoint"
file_path = "abfss://landing@hisadls.dfs.core.windows.net/Nyc_taxi"


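# Read new Parquet files incrementally with Auto Loader.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Setting schemaLocation turns on schema inference and stores the inferred schema.
    # Parquet files carry their own column types, so no extra inference option is needed.
    .option("cloudFiles.schemaLocation", checkpoint_path)
    # Add newly discovered columns to the schema instead of dropping them.
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load(file_path)
)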

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Add lineage columns: the source file path and the ingestion timestamp.
brf = (
    df.withColumn("input_filename", col("_metadata.file_path"))
    .withColumn("ingested_at", current_timestamp())
)


In [0]:
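# Append to the bronze Delta table; trigger(once=True) processes the available files, then stops.
# Note: newer runtimes deprecate once=True in favor of trigger(availableNow=True).
(
    brf.writeStream.format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_path)
    .toTable("nyc_yellow.bronzee.nyc_yellow")
)
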

#### Here I'm using Auto Loader with trigger(once) to get the benefits of incremental file processing and checkpointing while still running it as a batch job. This keeps the pipeline future-proof if ingestion frequency increases.
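
One way to make that switch explicit is to keep the trigger choice in a single place. The sketch below is only an illustration: the mode names `"batch"` and `"micro_batch"` and the five-minute interval are assumptions, not settings from this notebook. It returns keyword arguments that can be passed to `writeStream.trigger(...)`, and it uses `availableNow`, which Databricks recommends over `once` on runtimes that support it.

```python
def trigger_options(mode: str) -> dict:
    """Map a hypothetical pipeline setting to keyword arguments for writeStream.trigger()."""
    if mode == "batch":
        # Process everything that is currently available, then stop (scheduled job).
        return {"availableNow": True}
    if mode == "micro_batch":
        # Keep the stream running and check for new files at a fixed interval.
        return {"processingTime": "5 minutes"}
    raise ValueError(f"Unknown trigger mode: {mode!r}")


print(trigger_options("batch"))        # {'availableNow': True}
print(trigger_options("micro_batch"))  # {'processingTime': '5 minutes'}

# In the notebook, the write would then look like:
#   brf.writeStream.format("delta").trigger(**trigger_options("batch")) ...
```

Because the checkpoint already records which files were ingested, changing the mode later should not cause files to be reprocessed.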