# Project Cyber-Trace: Bronze Layer Ingestion
**Author:** Jakub Milczarczyk  
**Pipeline Type:** Streaming Ingestion (Medallion Architecture)

## Objective
Ingest raw security logs (OTRF/Mordor dataset) from **Azure Data Lake Storage Gen2 (ADLS)** into the Bronze Layer (Delta Lake), ensuring schema resilience and data durability.

## Technical Highlights
* **Auto Loader (cloudFiles):** Utilized for scalable, incremental ingestion without listing overhead.
* **Schema Evolution:** Enabled (`mergeSchema`) to handle dynamic changes in JSON log structures without breaking the pipeline.
* **Security:** Credentials retrieved via **Azure Key Vault** (Secret Scopes) - zero hardcoded secrets.
* **Quality Gate:** "Bad Records" are automatically quarantined to the `_quarantine` path for retrospective analysis.
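
The secret-scope lookup mentioned in the Security bullet could look roughly like the sketch below. The notebook's own `setup_authentication` helper is not shown, so the function name `configure_adls_access`, its parameters, and the choice of account-key authentication (rather than, for example, a service principal) are all assumptions made for illustration. `dbutils.secrets.get` and `spark.conf.set` are the standard Databricks and Spark calls.

```python
from typing import Any


def configure_adls_access(
    spark: Any,
    dbutils: Any,
    storage_account: str,
    secret_scope: str,
    secret_key: str,
) -> str:
    """Register an ADLS Gen2 account key on the Spark session.

    Hypothetical helper: the real project may authenticate differently.
    Returns the Spark config key that was set, which is handy for logging.
    """
    # Read the credential from a secret scope (e.g. one backed by Azure Key Vault),
    # so no secret value ever appears in the notebook source.
    account_key = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    # Standard ABFS driver setting for account-key authentication.
    conf_key = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
    spark.conf.set(conf_key, account_key)
    return conf_key
```

On a cluster this would be called with the notebook's `spark` and `dbutils` objects and your own storage account, scope, and key names.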

In [0]:
# ==============================================================================
# CELL 1: CONFIGURATION & AUTHENTICATION
# ==============================================================================

# 1. Secure Authentication
from src.config import setup_authentication, ProjectConfig, Paths
from src.logger import get_logger

logger = get_logger("BronzeIngestion")

setup_authentication(spark, dbutils)

# 2. Path Definitions
base_path       = ProjectConfig.get_base_path()
raw_logs_path   = Paths.RAW_LOGS
schema_path     = Paths.SCHEMA
checkpoint_path_bronze = Paths.CHECKPOINT_BRONZE
quarantine_path = Paths.QUARANTINE

logger.info(f"CONFIG: Paths set. Ingestion target: {raw_logs_path}")

## 2. Ingestion Pipeline (Auto Loader)
**Objective:** Ingest raw JSON logs efficiently and robustly.

**Technology Stack:**
* **Databricks Auto Loader (`cloudFiles`):** An optimized file source that discovers new files incrementally as they arrive in ADLS and tracks the ones already processed, so each run avoids re-scanning the entire directory (the "S3/ADLS listing" performance bottleneck).
* **Schema Evolution:** The pipeline automatically detects and adapts to changes in the log structure (e.g., new fields in JSON events); the inferred schema and its versions are stored at `schema_path` (passed as `cloudFiles.schemaLocation`).
* **Checkpointing:** Ensures fault tolerance. If the cluster crashes, the stream resumes exactly where it left off.
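
As a sketch of how the read-side options relate to schema evolution, the helper below collects them in one place. The function name `autoloader_options` is hypothetical. The option keys are existing Auto Loader settings, but setting `cloudFiles.schemaEvolutionMode` explicitly is an assumption, since the notebook relies on the default.

```python
from typing import Dict

# Evolution modes documented for Auto Loader.
_ALLOWED_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def autoloader_options(
    schema_location: str,
    bad_records_path: str,
    evolution_mode: str = "addNewColumns",
) -> Dict[str, str]:
    """Build the option map for a JSON Auto Loader stream (illustrative helper)."""
    if evolution_mode not in _ALLOWED_MODES:
        raise ValueError(f"Unsupported schema evolution mode: {evolution_mode!r}")
    return {
        "cloudFiles.format": "json",
        # Where Auto Loader persists the inferred schema and its versions.
        "cloudFiles.schemaLocation": schema_location,
        # How the stream reacts when new columns appear in the data.
        "cloudFiles.schemaEvolutionMode": evolution_mode,
        # Where unparsable records are written instead of failing the stream.
        "badRecordsPath": bad_records_path,
    }
```

The result can be passed to the reader with `spark.readStream.format("cloudFiles").options(**opts).load(raw_logs_path)`. With `addNewColumns`, the stream is expected to stop when a new column first appears and to pick up the updated schema on restart, so a production job would typically be configured to restart automatically.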

**Output:**
The stream writes to a **Delta Lake** table (`delta` format).
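
The following is a minimal sketch of a batch-style write that processes everything currently available and then stops, which is one way to know the data has landed before moving on. The function name `write_bronze` is hypothetical, and using `trigger(availableNow=True)` is an assumption rather than the notebook's actual configuration; it requires a reasonably recent Spark or Databricks Runtime version.

```python
from typing import Any


def write_bronze(df_stream: Any, table_name: str, checkpoint_location: str) -> Any:
    """Write a streaming DataFrame to a Delta table and block until it finishes.

    Illustrative helper: processes all files available now, then terminates.
    """
    query = (
        df_stream.writeStream
        .format("delta")
        .outputMode("append")
        # The checkpoint records progress so a restart resumes where it left off.
        .option("checkpointLocation", checkpoint_location)
        # Allow the Delta sink to accept columns added by schema evolution.
        .option("mergeSchema", "true")
        # Drain the current backlog, then stop instead of running forever.
        .trigger(availableNow=True)
        .toTable(table_name)
    )
    # Block until the backlog is fully processed and committed.
    query.awaitTermination()
    return query
```

Reusing the same checkpoint location across runs is what makes the ingestion incremental: files committed by an earlier run are not processed again.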

In [0]:
# ==============================================================================
# CELL 2: DATA INGESTION PIPELINE
# ==============================================================================

# 1. Define the Stream (Auto Loader)
logger.info(f"SYSTEM: Initializing Auto Loader from {raw_logs_path}...")
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("badRecordsPath", quarantine_path)
    .load(raw_logs_path)
)

# 2. Run the Stream (append to the Bronze Delta table)
bronze_table_name = "bronze_mordor_logs_v2"

query = (df_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path_bronze + "_v2")
    .option("mergeSchema", "true")
    .toTable(bronze_table_name)
)

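logger.info(f"SYSTEM: Stream started in background (query id: {query.id}).")
# The query keeps running until it is stopped. Call query.awaitTermination() to block,
# and only report the data as durable once the relevant micro-batches have committed.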

## 3. Bad Records Validation

**Objective:** Find and inspect the malformed records that were quarantined during ingestion.

**Monitoring Quality:** Checks for records rejected by Auto Loader due to schema mismatch.

**Output:** The ingestion stream writes the rejected records to the `_quarantine` directory; this step reads them back for review.
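
Once quarantined records have been read back, a quick breakdown by rejection reason helps decide whether the problem is a one-off or systematic. The sketch below works on plain dictionaries with the `path`, `record`, and `reason` fields; the function name `summarize_bad_records` is hypothetical.

```python
from collections import Counter
from typing import Iterable, Mapping, Optional


def summarize_bad_records(rows: Iterable[Mapping[str, Optional[str]]]) -> Counter:
    """Count quarantined records per rejection reason (illustrative helper)."""
    # Records with a missing or empty reason are grouped under "unknown".
    return Counter((row.get("reason") or "unknown") for row in rows)
```

For a small quarantine set this could be fed with `summarize_bad_records(r.asDict() for r in bad_records_df.collect())`; for large volumes, an equivalent `groupBy("reason").count()` on the DataFrame avoids collecting rows to the driver.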


In [0]:
# ==============================================================================
# CELL 3: BAD RECORDS VALIDATION
# ==============================================================================

# 1. Read the quarantined records (batch read, not a stream)
logger.info(f"Checking quarantine path: {quarantine_path}")

quarantine_schema = "path STRING, record STRING, reason STRING"

bad_records_df = (spark.read
    .schema(quarantine_schema)
    .option("recursiveFileLookup", "true")
    .json(quarantine_path)
)

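# Spark reads lazily, so trigger the scan and report how many records were quarantined.
bad_count = bad_records_df.count()
logger.info(f"SYSTEM: Quarantine check complete. Bad records found: {bad_count}")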