In [0]:
%run ./01_Project_config

### Data Ingestion with Auto Loader

**`readStream`**
Creates a streaming DataFrame with Spark Structured Streaming, so new data is ingested incrementally instead of being re-read on every run.

**`cloudFiles`**
The source format name for **Auto Loader**, which incrementally discovers and processes new files as they land in cloud storage and scales to very large file counts.

**`cloudFiles.format`**
Specifies the input file type (e.g., `csv`).

**`cloudFiles.inferColumnTypes`**
Samples the input files to infer a data type for each column. Without it, Auto Loader reads every CSV column as a string.

**`cloudFiles.schemaLocation`**
Directory where Auto Loader persists the inferred schema. It is required for schema inference and evolution, and it keeps the schema consistent across restarts.

**`cloudFiles.schemaEvolutionMode`**
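Set to `addNewColumns` (the default when no schema is supplied). When source files bring new columns, Auto Loader adds them to the stored schema and stops the stream with an error; the next run picks up the new columns.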

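**`cloudFiles.rescuedDataColumn`**
Collects values that cannot be parsed against the schema (for example, type mismatches) into the named column as JSON, so they are preserved instead of being lost.

**`header`**
Treats the first line of each CSV file as the column names.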

**`load`**
Sets the landing-zone path to read from. Nothing is actually read until the write query starts.

**`withColumn`**
Adds audit columns: `ingestion_timestamp` (load time) and `source_file` (the input file path, taken from the `_metadata` column).

**`writeStream`**
Defines how the processed data is committed to the destination.

**`format("delta")`**
Writes output in **Delta Lake** format for ACID transactions and versioning.

**`outputMode("append")`**
Adds new records to the table without overwriting existing data.

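**`checkpointLocation`**
Stores the stream's progress (offsets and the files already processed) so a restarted job resumes where it stopped without reprocessing committed data.

**`mergeSchema`**
Lets the Delta write add columns that are new in the incoming data to the target table's schema.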

**`trigger(availableNow=True)`**
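Processes all data available when the query starts, possibly across several micro-batches, and then stops the query. It does not shut down the cluster; it simply lets a streaming pipeline run as a scheduled, batch-style job.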

**`toTable`**
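Starts the streaming query, writes to the named table (creating it if it does not exist), and returns a `StreamingQuery` handle. With a three-level `catalog.schema.table` name and no explicit path, the result is a Unity Catalog managed table.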
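
To make `cloudFiles.inferColumnTypes` and `cloudFiles.rescuedDataColumn` more concrete, here is a deliberately simplified, pure-Python model of what they do to one CSV batch. It is an illustration under stated assumptions, not Auto Loader's implementation: `infer_schema`, `parse_row`, and the sample columns `txn_id`, `amount`, and `merchant` are hypothetical, and only `int`, `double`, and `string` are distinguished (Spark's real CSV inference recognizes more types).

```python
import json

# Hypothetical, simplified model of two Auto Loader read options:
#   cloudFiles.inferColumnTypes  -> infer_schema()
#   cloudFiles.rescuedDataColumn -> parse_row() routing unparseable values to "_rescued_data"

CASTS = {"int": int, "double": float, "string": str}


def infer_type(values):
    """Return the narrowest of int/double/string that fits every sampled value."""
    def fits(cast):
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False

    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"


def infer_schema(header, sample_rows):
    """Infer one type per column from a sample of raw CSV rows."""
    columns = zip(*sample_rows)
    return {name: infer_type(values) for name, values in zip(header, columns)}


def parse_row(header, row, schema, rescue_col="_rescued_data"):
    """Cast each field to its schema type; keep values that do not fit as JSON."""
    record, rescued = {}, {}
    for name, raw in zip(header, row):
        try:
            record[name] = CASTS[schema[name]](raw)
        except ValueError:
            record[name] = None   # type mismatch: the typed column gets null...
            rescued[name] = raw   # ...and the original text is kept in the rescue column
    record[rescue_col] = json.dumps(rescued) if rescued else None
    return record


header = ["txn_id", "amount", "merchant"]
sample = [["1", "12.50", "acme"], ["2", "7", "globex"]]
schema = infer_schema(header, sample)
# schema == {"txn_id": "int", "amount": "double", "merchant": "string"}

bad = parse_row(header, ["3", "N/A", "initech"], schema)
# bad == {"txn_id": 3, "amount": None, "merchant": "initech", "_rescued_data": '{"amount": "N/A"}'}
```

The point of the rescue column is visible in `bad`: the row still loads, and the value that broke the type contract stays queryable rather than silently disappearing.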

---


In [0]:

from pyspark.sql.functions import col, current_timestamp

# Bronze source: Auto Loader incrementally picks up new CSV files from the landing zone
bronze_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        # Persist the inferred schema so it survives restarts and can evolve
        .option("cloudFiles.schemaLocation", f"{paths['checkpoints']}/schema/card_txn")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.rescuedDataColumn", "_rescued_data")
        .option("header", "true")
        .load(paths["transactions"])
)


# Audit columns: load time plus the originating file path from the _metadata column
bronze_enriched = (
    bronze_df
        .withColumn("ingestion_timestamp", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))
)

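query = (
    bronze_enriched.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{paths['checkpoints']}/bronze_card_txn")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        # Dynamic table naming based on config
        .toTable(f"{catalog}.{bronze_schema}.card_transactions")
)

# toTable() returns immediately; wait for the available-now run to finish
# so the audit query in the next cell sees the newly written rows.
query.awaitTermination()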
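
The resume-where-it-stopped behavior of `checkpointLocation`, combined with `trigger(availableNow=True)`, can be sketched in plain Python. This is a hypothetical model only: Spark's real checkpoint holds stream offsets and Auto Loader's own file-discovery state rather than a JSON list, and `run_available_now` and `max_files_per_batch` are illustrative names (the latter merely plays the role that `cloudFiles.maxFilesPerTrigger` plays for real micro-batch sizing).

```python
import json
import os
import tempfile

# Hypothetical model of checkpointed file tracking + trigger(availableNow=True).
# Not Spark's implementation: a JSON list of processed files stands in for the checkpoint.


def run_available_now(landing_files, checkpoint_path, max_files_per_batch=2):
    """Process every file not yet in the checkpoint, in micro-batches, then return."""
    seen = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            seen = set(json.load(f))

    pending = sorted(p for p in landing_files if p not in seen)
    batches = []
    for i in range(0, len(pending), max_files_per_batch):
        batch = pending[i:i + max_files_per_batch]
        batches.append(batch)              # stand-in for writing the micro-batch
        seen.update(batch)
        with open(checkpoint_path, "w") as f:
            json.dump(sorted(seen), f)     # commit progress after each micro-batch
    return batches                          # all available data drained: the query stops


with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "bronze_card_txn.json")
    first = run_available_now(["a.csv", "b.csv", "c.csv"], ckpt)
    second = run_available_now(["a.csv", "b.csv", "c.csv", "d.csv"], ckpt)
    third = run_available_now(["a.csv", "b.csv", "c.csv", "d.csv"], ckpt)

print(first)   # [['a.csv', 'b.csv'], ['c.csv']]
print(second)  # [['d.csv']]
print(third)   # []
```

Each run drains whatever is pending and returns; a rerun with no new files does nothing, which is what makes it safe to schedule this notebook repeatedly.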

In [0]:
%sql
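-- Rows per input file; assumes 01_Project_config sets catalog = fraud_lakehouse and bronze_schema = bronze
SELECT source_file, COUNT(*) AS row_count
FROM fraud_lakehouse.bronze.card_transactions
GROUP BY source_file
ORDER BY source_file;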
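
Rerunning the notebook also drives schema evolution. With `addNewColumns`, Auto Loader stops the stream with an error when a file brings columns the stored schema lacks, after appending those columns to the schema recorded under `cloudFiles.schemaLocation`; the next run proceeds with the wider schema. The sketch below models that fail-then-restart cycle in plain Python. `read_with_schema_evolution`, `SchemaChanged`, and the `channel` column are hypothetical, and the real schema location stores far more than a column list.

```python
import json
import os
import tempfile

# Hypothetical model of cloudFiles.schemaEvolutionMode = "addNewColumns".


class SchemaChanged(Exception):
    """Stands in for the error the real stream raises when it sees new columns."""


def read_with_schema_evolution(header, schema_path):
    """Return the known column list, or widen it, persist it, and stop."""
    if os.path.exists(schema_path):
        with open(schema_path) as f:
            known = json.load(f)
    else:
        known = list(header)               # first run: initialize the stored schema
        with open(schema_path, "w") as f:
            json.dump(known, f)

    new_cols = [c for c in header if c not in known]
    if new_cols:
        # Persist the widened schema (new columns appended) before failing,
        # so the next run starts with it.
        with open(schema_path, "w") as f:
            json.dump(known + new_cols, f)
        raise SchemaChanged(f"new columns detected: {new_cols}")
    return known


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "card_txn_schema.json")
    v1 = read_with_schema_evolution(["txn_id", "amount"], path)
    try:
        read_with_schema_evolution(["txn_id", "amount", "channel"], path)
        failed = False
    except SchemaChanged:
        failed = True
    v2 = read_with_schema_evolution(["txn_id", "amount", "channel"], path)

# v1 == ['txn_id', 'amount'], failed is True, v2 == ['txn_id', 'amount', 'channel']
```

In a scheduled job this is why a one-off failure after new columns appear is expected behavior: a retry or the next scheduled run picks up the evolved schema.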