# Automating Workflow Jobs with Schedules and Triggers

In **Lakeflow Jobs**, you can configure a job to run automatically in any of the following situations:

- **On a time-based schedule**
- **On the arrival of files** to a Unity Catalog storage location
- **Continuously**

You can also trigger job runs **manually** or through **external orchestration tools**.

---

## Job Schedules and Triggers

| **Trigger Type** | **Behavior** |
|------------------|--------------|
| **Scheduled** | Triggers a job run based on a time-based schedule. |
| **File arrival** | Triggers a job run when new files arrive in a monitored Unity Catalog storage location. |
| **Continuous** | Keeps the job always running: a new run starts whenever the current run completes or fails. |
| **None (manual)** | Runs are triggered manually with the **Run now** button or programmatically using other orchestration tools. |

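As a rough sketch, each trigger type in the table corresponds to a different fragment of the job settings sent to the Jobs API. The field names below (`schedule`, `trigger.file_arrival`, `continuous`) follow the Jobs API 2.1 job settings; the helper names, the job name, and the volume path are hypothetical and used only for illustration. For example, `scheduled("0 0 2 * * ?")` describes a daily run at 02:00 in the given time zone.

```python
# Hypothetical helpers: each returns the job-settings fragment for one trigger type.

def scheduled(cron: str, timezone: str = "UTC") -> dict:
    """Time-based schedule using a Quartz cron expression."""
    return {
        "schedule": {
            "quartz_cron_expression": cron,
            "timezone_id": timezone,
            "pause_status": "UNPAUSED",
        }
    }

def file_arrival(url: str, min_seconds_between_runs: int = 60) -> dict:
    """Run when new files land in a monitored Unity Catalog location."""
    return {
        "trigger": {
            "pause_status": "UNPAUSED",
            "file_arrival": {
                "url": url,
                "min_time_between_triggers_seconds": min_seconds_between_runs,
            },
        }
    }

def continuous() -> dict:
    """Keep exactly one run active at all times."""
    return {"continuous": {"pause_status": "UNPAUSED"}}

def manual() -> dict:
    """No trigger block: runs start only via Run now or an external tool."""
    return {}

# Assumed job name; merge one fragment into the rest of the job settings.
job_settings = {"name": "nightly_sales_ingest", **scheduled("0 0 2 * * ?")}
```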
---


# 🔄 What is Auto Loader in Databricks?

**Auto Loader** is a **Databricks** feature that **incrementally and efficiently ingests new data files** as they arrive in cloud storage (such as Amazon S3, Azure Data Lake Storage, or Google Cloud Storage), typically into Delta Lake tables.

---

## ✅ Key Highlights

| Feature              | Description                                                                 |
|----------------------|-----------------------------------------------------------------------------|
| **Incremental**       | Automatically detects and loads only **new files** added to a directory.    |
| **Scalable**          | Designed to **scale to millions of files**, unlike a plain batch `spark.read`, which re-reads every file on each run. |
| **Schema Evolution**  | Can **automatically detect new columns** and update the schema if enabled.  |
| **State Management**  | Tracks file ingestion state with **checkpoints** to avoid duplicates.       |
| **Optimized**         | Can use **cloud notification services** (e.g., Azure Event Grid, AWS SNS/SQS) to discover new files instead of listing directories. |

---

## 📦 Supported File Formats
* CSV
* JSON
* Parquet
* Avro
* ORC
* Binary

---
## 📥 Auto Loader `readStream` Options

| **Option**                        | **Description**                                         | **Example**            |
| --------------------------------- | ------------------------------------------------------- | ---------------------- |
| `cloudFiles.format`               | File format (`csv`, `json`, `parquet`, etc.)            | `"csv"`                |
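| `cloudFiles.schemaLocation`       | Location that stores the inferred schema and its later changes; required when the schema is inferred | `"/mnt/schema/bronze"` |
| `cloudFiles.inferColumnTypes`     | Infer exact column types during schema inference; by default JSON and CSV columns are inferred as strings | `"true"`               |
| `cloudFiles.includeExistingFiles` | Include files already in the input path on the first run, rather than only files that arrive later | `"true"`               |
| `cloudFiles.schemaEvolutionMode`  | How the stream reacts to new columns: `addNewColumns`, `rescue`, `failOnNewColumns`, or `none` | `"addNewColumns"`      |
| `cloudFiles.allowOverwrites`      | Allow files that are modified after ingestion to be processed again | `"true"`               |
| `cloudFiles.useNotifications`     | Use file notification mode (cloud event and queue services) instead of directory listing to discover files | `"true"`               |
| `cloudFiles.connectionString`     | Azure only: connection string for the storage account (account key or SAS based), used in file notification mode | `"...?sig=..."`        |
| `cloudFiles.partitionColumns`     | Comma-separated Hive-style partition columns to infer from the directory structure (e.g., `year=2024/month=01/`) | `"year,month"`         |
| `cloudFiles.maxBytesPerTrigger`   | Soft maximum of new bytes processed per micro-batch     | `"104857600"` (100 MB) |
| `cloudFiles.maxFilesPerTrigger`   | Maximum number of new files processed per micro-batch (default 1000) | `"100"`                |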
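| `cloudFiles.enforceSchema`        | Not listed among the documented `cloudFiles.*` options; verify before use (`enforceSchema`, without the prefix, is a CSV reader option) | `"false"`              |
| `cloudFiles.namingHint`           | Not listed among the documented `cloudFiles.*` options; verify before use | `"bronze-data"`        |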
| `cloudFiles.validateOptions`      | Validate Auto Loader options and raise an error for unknown or inconsistent ones | `"true"`               |

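A minimal sketch of how a few of the options above might be assembled and passed to the reader in one call. The helper names (`autoloader_options`, `read_landing`) and all paths are hypothetical; the option keys and the four schema evolution modes are the documented ones. Building the options as a plain dictionary keeps them easy to review and reuse, and `read_landing` only works on a Databricks runtime where the `cloudFiles` source is available.

```python
# Hypothetical helper: collect Auto Loader options in one dictionary.
ALLOWED_EVOLUTION_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}

def autoloader_options(file_format, schema_location, *, infer_types=True,
                       evolution_mode="addNewColumns", max_files_per_trigger=None):
    if evolution_mode not in ALLOWED_EVOLUTION_MODES:
        raise ValueError(f"Unknown schema evolution mode: {evolution_mode}")
    options = {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
        "cloudFiles.inferColumnTypes": str(infer_types).lower(),  # "true" / "false"
        "cloudFiles.schemaEvolutionMode": evolution_mode,
    }
    if max_files_per_trigger is not None:
        # Option values are passed as strings, as in the table above.
        options["cloudFiles.maxFilesPerTrigger"] = str(max_files_per_trigger)
    return options

def read_landing(spark, path, options):
    """Apply the options to an Auto Loader stream (needs a Databricks runtime)."""
    return spark.readStream.format("cloudFiles").options(**options).load(path)

# Assumed paths, for illustration only.
bronze_options = autoloader_options("csv", "/mnt/schema/bronze", max_files_per_trigger=100)
```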
---
## 📤 writeStream Options (Delta Sink)
| **Setting**          | **Description**                                          | **Example**                           |
| -------------------- | -------------------------------------------------------- | ------------------------------------- |
| `checkpointLocation` | Option; required to track stream progress                | `"/mnt/checkpoints/myjob/"`           |
| `path`               | Option; output path if not passed to `.start()`          | `"/mnt/delta/bronze/"`                |
| `mergeSchema`        | Option; adds new columns from the stream to the existing Delta table schema | `"true"`           |
| `outputMode`         | Method `.outputMode(...)`, not an option: `append`, `complete`, or `update` | `.outputMode("append")` |
| `trigger`            | Method `.trigger(...)`, not an option; controls how often micro-batches run | `.trigger(processingTime="5 minutes")` |
| `partitionBy`        | Method `.partitionBy(...)`, not an option; partition columns for the Delta sink | `.partitionBy("year", "month")` |
| `maxFilesPerTrigger` | Not a sink setting: a `readStream` rate-limit option (with Auto Loader, `cloudFiles.maxFilesPerTrigger`) | `"100"` |
| `maxBytesPerTrigger` | Not a sink setting: a `readStream` rate-limit option (with Auto Loader, `cloudFiles.maxBytesPerTrigger`) | `"104857600"` |
| `ignoreChanges`      | Not a sink setting: a `readStream` option for streaming *from* a Delta table whose rows are updated | `"true"` |
| `ignoreDeletes`      | Not a sink setting: a `readStream` option for streaming *from* a Delta table; ignores deletes at partition boundaries | `"true"` |
| `replaceWhere`       | Batch write option: overwrites only the rows matching a predicate (with `overwrite` mode); not used by a streaming append | `"year = 2024"` |

---
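The writer settings above can be sketched as a single function that sets the output mode, trigger, and partitioning through `DataStreamWriter` methods, while the checkpoint location and `mergeSchema` go through `.option(...)`. The function name `write_bronze` and its parameters are hypothetical; this is one possible arrangement rather than a required pattern.

```python
def write_bronze(df, target_path, checkpoint_path, partition_cols=()):
    """Start a streaming write of `df` to a Delta path (hypothetical helper)."""
    writer = (
        df.writeStream
        .format("delta")
        .outputMode("append")                           # method call, not an option
        .option("checkpointLocation", checkpoint_path)  # required for progress tracking
        .option("mergeSchema", "true")                  # let new columns reach the table
        .trigger(availableNow=True)                     # method call, not an option
    )
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)    # method call, not an option
    return writer.start(target_path)                    # returns the StreamingQuery
```

Passing `partition_cols=["year", "month"]` would partition the sink by those two columns; leaving it empty writes an unpartitioned table.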

## 🔄 Types of Auto Loader Triggers in Databricks
| Trigger Type     | Description | Code Example | Use Case |
| ---------------- | ----------- | ------------ | -------- |
| `once`           | **One-time micro-batch.** Processes all available data in a single batch, then stops. Deprecated in recent runtimes in favor of `availableNow`. | `.trigger(once=True)` | Nightly or ad hoc runs when files arrive at a known time. |
| `processingTime` | **Fixed-interval micro-batches.** Starts a batch every interval (e.g., 5 minutes, 1 hour). | `.trigger(processingTime="5 minutes")` | Near real-time ingestion with controlled resource usage. |
| `availableNow`   | Runs as many batches as needed to process all currently available data, then stops. It is not the default: with no trigger set, the stream keeps running micro-batches, each starting as soon as the previous one finishes. | `.trigger(availableNow=True)` | Scheduled, batch-style incremental loads where compute should stop after catching up. |

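One way to keep the trigger choice configurable is to map a mode name to the keyword argument that PySpark's `DataStreamWriter.trigger` expects. The helper names `trigger_kwargs` and `apply_trigger` are hypothetical; the keyword names (`once`, `processingTime`, `availableNow`) are the PySpark ones, and `availableNow` assumes a reasonably recent Spark or Databricks runtime.

```python
def trigger_kwargs(mode, interval="5 minutes"):
    """Translate a trigger type from the table into .trigger(...) keyword arguments."""
    if mode == "availableNow":
        return {"availableNow": True}
    if mode == "processingTime":
        return {"processingTime": interval}  # interval string such as "5 minutes"
    if mode == "once":
        return {"once": True}                # deprecated in favor of availableNow
    raise ValueError(f"Unsupported trigger mode: {mode}")

def apply_trigger(writer, mode, interval="5 minutes"):
    """Attach the chosen trigger to an existing DataStreamWriter."""
    return writer.trigger(**trigger_kwargs(mode, interval))
```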
---
## 🔧 How It Works

```python
from pyspark.sql.functions import col, current_timestamp

# Step 1: Read from cloud storage using Auto Loader
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")  # Change to "json", "parquet", etc. if needed
    .option("cloudFiles.schemaLocation", "/mnt/schema/sales/")  # Stores the inferred schema and tracks changes
    .option("header", "true")  # Assuming the CSV files have a header row
    .load("/mnt/raw/mydata/")  # Folder that receives the incoming files
)

# Step 2: Add metadata columns (source file path and load timestamp)
# _metadata.file_path is used instead of input_file_name(), which is deprecated on recent Databricks runtimes
df_with_metadata = (
    df.withColumn("Metadata_source_file_name", col("_metadata.file_path"))
    .withColumn("Metadata_load_timestamp", current_timestamp())
)

# Step 3: Write to a Delta table
query = (
    df_with_metadata.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/sales_bronze/")
    .start("/mnt/delta/sales_bronze/")  # Or end with .toTable(...) to write to a managed table
)
```

---
## 🔄 Auto Loader vs COPY INTO 
| Feature / Aspect        | 🔄 **Auto Loader**                                          | 📥 **COPY INTO**                                     |
| ----------------------- | ----------------------------------------------------------- | ---------------------------------------------------- |
| **Ingestion Mode**      | **Streaming** (incremental; runs continuously or on a trigger) | **Batch** (manual or scheduled execution)         |
| **Source Monitoring**   | Discovers **new files** by directory listing or notifications while the stream runs | No monitoring between runs; each execution scans the source path |
| **State Management**    | Uses the stream **checkpoint** to track ingested files      | Tracks already-loaded files with the **target Delta table**, so re-runs skip them |
| **Latency**             | Near real-time (trigger every few seconds/minutes)          | Depends on how often the statement runs; **higher latency** |
| **Schema Evolution**    | Supported (e.g., with `addNewColumns` mode)                 | Supported with `COPY_OPTIONS ('mergeSchema' = 'true')` |
| **File Deduplication**  | Built in: ingested files are recorded in the checkpoint, so each file is processed once | Idempotent: files already loaded are skipped unless `'force' = 'true'` is set |
| **Trigger Options**     | `once`, `processingTime`, `availableNow`                    | No streaming; must be triggered via SQL or jobs      |
| **Format Support**      | CSV, JSON, Parquet, Avro, ORC, Binary                       | Same (via `FILEFORMAT`)                              |
| **Ease of Use**         | Ideal for large-scale pipelines with many incoming files    | Simpler for small, ad hoc, or periodic loads         |
| **Cost Efficiency**     | Efficient for **frequent ingestion** of large directories   | Better for **one-time** or low-frequency batch loads |
| **Catalog Integration** | Fully compatible with Unity Catalog + file arrival triggers | Compatible with Unity Catalog (manual registration)  |

---

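```sql
-- Load files from the source path that have not been loaded yet
COPY INTO my_table
FROM '/mnt/raw/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');  -- allow the target table schema to evolve
```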