## Purpose:
This notebook uses **Databricks Auto Loader** to ingest JSON batches into a Delta table with schema enforcement.

## Create the Dedicated Unity Catalog


In [0]:
%sql

-- Top-level Unity Catalog container that holds every object for this project.
CREATE CATALOG IF NOT EXISTS fraud_sentinel_catalog;

-- Make it the current catalog so the statement below resolves against it.
USE CATALOG fraud_sentinel_catalog;

-- Schema (database) inside the catalog; MANAGED LOCATION sets the storage
-- root used for this schema's managed tables.
CREATE SCHEMA IF NOT EXISTS detection_service
  MANAGED LOCATION 'abfss://fraud-sentinel@giftmapote2ete.dfs.core.windows.net/managed_tables/'

## Environment Setup & Schema Definition

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# --- Configuration: storage paths and Unity Catalog target ---
# Folder passed to the Auto Loader reader as the source of raw transaction JSON.
STORAGE_PATH = "abfss://fraud-sentinel@giftmapote2ete.dfs.core.windows.net/raw/transactions/"
# Checkpoint directory used by the bronze streaming write.
BRONZE_CHECKPOINT = "abfss://fraud-sentinel@giftmapote2ete.dfs.core.windows.net/_checkpoints/bronze"
CATALOG = "fraud_sentinel_catalog"
SCHEMA_NAME = "detection_service"

# Explicit schema for the incoming JSON records; every field is nullable.
# "location" is declared as a string -> double map to hold the nested lat/lon
# object, and "timestamp" is kept as a raw string at this layer.
json_schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("amount", DoubleType(), True)
    .add("device_id", StringType(), True)
    .add("location", MapType(StringType(), DoubleType()), True)
    .add("recipient_status", StringType(), True)
    .add("timestamp", StringType(), True)
)

print(f"Bronze Ingestion Ready. Target: {CATALOG}.{SCHEMA_NAME}.bronze_transactions")




## Auto Loader Streaming Ingestion
Auto Loader (`cloudFiles`) is the industry standard for incremental file ingestion because it handles file discovery efficiently, even as the Mock API adds hundreds of new files.

In [0]:
# Create the ingestion stream.
# Auto Loader ("cloudFiles") reads JSON files from STORAGE_PATH using the
# explicit json_schema. Two audit columns are appended to every row:
#   _ingested_at  - timestamp taken when the row is processed
#   _source_file  - path of the file the row was read from (file metadata column)
bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema)
    .load(STORAGE_PATH)
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", col("_metadata.file_path"))
)

# Write to the Delta table.
# Checkpointing ensures that if the cluster restarts, we don't re-process old files.
# trigger(availableNow=True) processes everything currently available and then stops.
bronze_query = (
    bronze_stream.writeStream
    .format("delta")
    .option("checkpointLocation", BRONZE_CHECKPOINT)
    .trigger(availableNow=True)
    .toTable(f"{CATALOG}.{SCHEMA_NAME}.bronze_transactions")
)

# toTable() only starts the query and returns a StreamingQuery handle right away.
# Block here until the available-now run has finished so that later cells
# (e.g. a SELECT on the bronze table under "Run All") see the completed load,
# and so that a failure in the stream surfaces in this cell.
bronze_query.awaitTermination()
              

In [0]:
%sql
-- Inspect the rows currently stored in the bronze transactions table.
SELECT
  *
FROM
  fraud_sentinel_catalog.detection_service.bronze_transactions