# Silver Drift Monitoring Notebook

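This notebook implements lightweight drift monitoring for the Silver layer of a Databricks Medallion architecture. It runs metadata-only checks: schema drift is read from the table schema, volume drift from Delta table history, and quality drift is left as a placeholder. Detected drift is logged, together with the baseline snapshot used by the next run, to a Delta audit table. The notebook is intended to run on Databricks Serverless compute.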

**Author:** Senior Data Architect

---

## Notebook Structure
- **1. Imports and Spark Session**
- **2. Utility Functions**
- **3. Drift Monitoring Logic**
- **4. Main Execution**
- **5. Results and Audit Table**

> _All code and explanations are in English for professional portfolio use. Please refer to the README and documentation for further details._

## 1. Imports and Spark Session

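This section imports the required libraries and obtains the Spark session. In a Databricks notebook, including on Serverless compute, `SparkSession.builder.getOrCreate()` returns the session the runtime already provides. When running from a local IDE with Databricks Connect, `DatabricksSession.builder.getOrCreate()` from the `databricks.connect` package is the documented entry point instead.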

In [None]:
# Import required libraries (only what the notebook actually uses)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import datetime

# Obtain the Spark session (in a Databricks notebook this returns the existing `spark`)
spark = SparkSession.builder.getOrCreate()


## 2. Utility Functions

This section defines the configuration and the helper functions for schema extraction, Delta history metrics, baseline loading, and audit table writes. Each helper takes the Spark session as an argument, so it can be reused in other monitoring notebooks.
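The audit table stores each baseline as a JSON string with `schema` and `metrics` keys. Because `DESCRIBE HISTORY` returns the commit `timestamp` as a Python `datetime`, a plain `json.dumps` call on that snapshot raises `TypeError`. The following is a minimal sketch of a serialize/parse pair, assuming the baseline shape built in `run_silver_drift_monitoring`; the helper names `serialize_baseline` and `parse_baseline` and the column names are hypothetical, not part of the notebook.

```python
import datetime
import json


def serialize_baseline(schema: dict, metrics: dict) -> str:
    """Encode a schema/metrics snapshot as JSON.

    default=str converts non-JSON types such as datetime into strings
    (e.g. '2024-01-01 00:00:00') instead of raising TypeError.
    """
    return json.dumps({"schema": schema, "metrics": metrics}, default=str, sort_keys=True)


def parse_baseline(payload: str) -> dict:
    """Decode a stored baseline; timestamps come back as plain strings."""
    return json.loads(payload)


# Illustrative snapshot (hypothetical columns, not real table data)
snapshot = serialize_baseline(
    {"pdv_id": "StringType()", "qty": "LongType()"},
    {"numOutputRows": 1000, "numFiles": 4, "timestamp": datetime.datetime(2024, 1, 1)},
)
restored = parse_baseline(snapshot)
print(restored["metrics"]["timestamp"])  # 2024-01-01 00:00:00
```

The round trip loses the `datetime` type, which is harmless here because the drift checks compare only the schema and the numeric metrics.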

In [None]:
# --- Config ---
CATALOG = "workspace"
SCHEMA = "default"
SILVER_TABLES = [
    "silver_master_pdv",
    "silver_master_products",
    "silver_price_audit",
    "silver_sell_in"
]
AUDIT_TABLE = f"{CATALOG}.{SCHEMA}.silver_drift_history"

# --- Utility: Get current schema as dict ---
def get_table_schema(spark, table):
    schema = spark.table(table).schema
    return {f.name: str(f.dataType) for f in schema.fields}

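# --- Utility: Get metrics of the latest Delta commit ---
# Note: operationMetrics describe the last operation (e.g. rows written by the
# latest WRITE/MERGE), not the total size of the table.
def get_delta_history_metrics(spark, table):
    hist = spark.sql(f"DESCRIBE HISTORY {table} LIMIT 1").first()
    # operationMetrics can be empty or null for operations that write no data
    metrics = hist["operationMetrics"] or {}
    return {
        "numOutputRows": int(metrics.get("numOutputRows", 0)),
        "numFiles": int(metrics.get("numFiles", 0)),
        "timestamp": hist["timestamp"]
    }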

# --- Utility: Load the latest baseline for a table from the audit table ---
def load_baseline(spark, table):
    import json
    try:
        # Public Catalog API (PySpark 3.3+); the private _jsparkSession handle
        # is not available on Serverless / Spark Connect sessions
        if not spark.catalog.tableExists(AUDIT_TABLE):
            return None
        df = spark.table(AUDIT_TABLE).filter(col("table_name") == table).orderBy(col("timestamp").desc())
        last = df.first()
        if last:
            return json.loads(last["baseline_json"])
    except Exception:
        return None
    return None


In [None]:
# --- Utility: Save drift event to audit table ---
def save_drift_event(spark, table, drift_type, severity, details, baseline_json):
    from pyspark.sql import Row
    import json
    event = Row(
        table_name=table,
        drift_type=drift_type,
        severity=severity,
        # default=str serializes the Delta commit timestamp (a datetime)
        details=json.dumps(details, default=str),
        baseline_json=json.dumps(baseline_json, default=str),
        timestamp=datetime.datetime.now()
    )
    df = spark.createDataFrame([event])
    df.write.format("delta").mode("append").saveAsTable(AUDIT_TABLE)


## 3. Drift Monitoring Logic

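This section implements the drift detection logic. `compare_schema` and `compare_metrics` are pure functions that compare the current snapshot with the stored baseline; `run_silver_drift_monitoring` applies them to every Silver table, assigns a severity, and writes events to the audit table. Quality drift is not implemented yet and is marked as a placeholder in the loop.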
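Schema and volume findings can each imply a different severity, and when both occur the reported level should be the more severe one. A minimal sketch that ranks the levels and keeps the maximum, assuming the same `LOW`/`MEDIUM`/`HIGH` labels and the drift dictionary shape returned by `compare_schema`; `SEVERITY_RANK`, `combine_severity`, and `classify` are hypothetical names, not part of the notebook.

```python
SEVERITY_RANK = {"LOW": 0, "MEDIUM": 1, "HIGH": 2}


def combine_severity(*levels: str) -> str:
    """Return the most severe label among the given levels (LOW if none)."""
    if not levels:
        return "LOW"
    return max(levels, key=SEVERITY_RANK.__getitem__)


def classify(schema_drift: dict, volume_drift) -> str:
    """Map schema/volume findings to a single severity label."""
    levels = []
    if schema_drift.get("missing_columns") or schema_drift.get("type_changes"):
        levels.append("HIGH")
    elif schema_drift.get("new_columns"):
        levels.append("MEDIUM")
    if volume_drift:
        levels.append("MEDIUM")
    return combine_severity(*levels)


# A missing column plus a volume change stays HIGH instead of dropping to MEDIUM
print(classify({"missing_columns": ["price"], "type_changes": [], "new_columns": []},
               {"metric": "numOutputRows"}))  # HIGH
```

Ranking makes the rule order-independent: adding another check (for example, the planned quality drift) only appends a level to the list.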

In [None]:
# --- Utility: Compare schemas ---
def compare_schema(current, baseline):
    drift = {"new_columns": [], "missing_columns": [], "type_changes": []}
    if not baseline:
        return drift
    # `name` (not `col`) avoids shadowing pyspark.sql.functions.col
    for name in current:
        if name not in baseline:
            drift["new_columns"].append(name)
        elif current[name] != baseline[name]:
            drift["type_changes"].append({"column": name, "old": baseline[name], "new": current[name]})
    for name in baseline:
        if name not in current:
            drift["missing_columns"].append(name)
    return drift

# --- Utility: Compare metrics ---
def compare_metrics(current, baseline, key="numOutputRows", threshold=0.2):
    if not baseline or key not in baseline or key not in current:
        return None
    prev = baseline[key]
    curr = current[key]
    if prev == 0:
        return None
    change = (curr - prev) / prev
    if abs(change) > threshold:
        return {"metric": key, "prev": prev, "curr": curr, "change": change}
    return None
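`compare_metrics` flags volume drift when the relative change in a metric exceeds `threshold` (default `0.2`) in either direction. Worked with illustrative numbers, not real table data:

$$
\text{change} = \frac{\text{curr} - \text{prev}}{\text{prev}}, \qquad \text{drift} \iff |\text{change}| > 0.2
$$

$$
\frac{1300 - 1000}{1000} = 0.30 > 0.2 \;\Rightarrow\; \text{flagged}, \qquad \frac{900 - 1000}{1000} = -0.10,\; |{-0.10}| \le 0.2 \;\Rightarrow\; \text{not flagged}
$$

When `prev` is `0` the function returns `None`, so growth from an empty write is never flagged. Because `get_delta_history_metrics` reads only the latest commit, `prev` and `curr` are the row counts of individual write operations (for example, one append batch), not total table sizes.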


In [None]:
# --- Main monitoring loop ---
def run_silver_drift_monitoring(spark):
    for table in SILVER_TABLES:
        full_table = f"{CATALOG}.{SCHEMA}.{table}"
        # 1. Get current schema and metrics
        schema_now = get_table_schema(spark, full_table)
        metrics_now = get_delta_history_metrics(spark, full_table)
        # 2. Load baseline (None on the first run for this table)
        baseline = load_baseline(spark, table)
        # 3. Compare schema
        schema_drift = compare_schema(schema_now, baseline["schema"] if baseline else None)
        # 4. Compare volume
        volume_drift = compare_metrics(metrics_now, baseline["metrics"] if baseline else None)
        # 5. Quality drift: use Silver logs if available (placeholder)
        # (You can extend this to read from a Silver quality log table)
        # 6. Severity logic: keep the highest severity found
        severity = "LOW"
        details = []
        drift_types = []
        if schema_drift["missing_columns"] or schema_drift["type_changes"]:
            severity = "HIGH"
            drift_types.append("schema")
            details.append(f"Missing/type-changed columns: {schema_drift}")
        elif schema_drift["new_columns"]:
            severity = "MEDIUM"
            drift_types.append("schema")
            details.append(f"New columns: {schema_drift['new_columns']}")
        if volume_drift:
            # Do not downgrade a HIGH schema finding
            if severity != "HIGH":
                severity = "MEDIUM"
            drift_types.append("volume")
            details.append(f"Volume drift: {volume_drift}")
        # 7. Save an event if drift was detected, or record an initial
        #    baseline so that later runs have something to compare against
        if details or baseline is None:
            save_drift_event(
                spark,
                table=table,
                drift_type="/".join(drift_types) or "baseline",
                severity=severity,
                details=details,
                baseline_json={"schema": schema_now, "metrics": metrics_now}
            )


## 4. Main Execution

This section runs drift monitoring for all Silver tables and then displays the ten most recent events from the audit table. The events themselves are written by `run_silver_drift_monitoring`, so they remain available for later observability queries.
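For a per-table overview beyond the ten most recent rows, the audit events can be tallied by table and severity. A minimal sketch in plain Python, assuming the rows have been collected to the driver as dictionaries (for example `[r.asDict() for r in spark.table(AUDIT_TABLE).collect()]`, which is reasonable only while the audit table is small); `summarize_events` is a hypothetical helper and the sample rows are illustrative.

```python
from collections import Counter


def summarize_events(events):
    """Count drift events per (table_name, severity) pair."""
    return Counter((e["table_name"], e["severity"]) for e in events)


# Illustrative rows shaped like the audit table (not real results)
sample = [
    {"table_name": "silver_sell_in", "severity": "MEDIUM"},
    {"table_name": "silver_sell_in", "severity": "HIGH"},
    {"table_name": "silver_master_pdv", "severity": "MEDIUM"},
    {"table_name": "silver_sell_in", "severity": "MEDIUM"},
]
for (table, severity), n in sorted(summarize_events(sample).items()):
    print(f"{table:<20} {severity:<6} {n}")
```

For larger audit tables, the same aggregation is better pushed to Spark with `groupBy("table_name", "severity").count()`.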

In [None]:
# Run drift monitoring for Silver tables
run_silver_drift_monitoring(spark)

# Display recent drift events from audit table (if exists)
try:
    # Public Catalog API instead of the private _jsparkSession handle
    if spark.catalog.tableExists(AUDIT_TABLE):
        display(spark.table(AUDIT_TABLE).orderBy(col("timestamp").desc()).limit(10))
    else:
        print("Audit table does not exist yet.")
except Exception as e:
    print(f"Error displaying audit table: {e}")


## 5. Results and Audit Table

The last cell of Section 4 displays the most recent drift events for the Silver tables. All events are logged to the Delta audit table `workspace.default.silver_drift_history` (`AUDIT_TABLE`) for traceability and observability. Review these results to monitor data health, and act on any `HIGH`-severity event, which indicates missing columns or changed column types.
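To automate the "take action" step, a scheduled job could fail when the current run logged a `HIGH`-severity event. A minimal sketch in plain Python, assuming events are dictionaries with `table_name`, `severity`, and `timestamp` keys as written by `save_drift_event`; `HighSeverityDriftError` and `check_high_severity` are hypothetical names and the sample events are illustrative.

```python
import datetime


class HighSeverityDriftError(RuntimeError):
    """Raised when HIGH-severity drift was logged during the current run."""


def check_high_severity(events, since):
    """Raise if any event logged at or after `since` is HIGH.

    Otherwise return the number of events logged since `since`.
    """
    recent = [e for e in events if e["timestamp"] >= since]
    high_tables = sorted({e["table_name"] for e in recent if e["severity"] == "HIGH"})
    if high_tables:
        raise HighSeverityDriftError(f"HIGH-severity drift in: {', '.join(high_tables)}")
    return len(recent)


# Illustrative events (not real audit data)
run_start = datetime.datetime(2024, 1, 2, 6, 0)
events = [
    {"table_name": "silver_price_audit", "severity": "HIGH",
     "timestamp": datetime.datetime(2024, 1, 1, 6, 5)},
    {"table_name": "silver_sell_in", "severity": "MEDIUM",
     "timestamp": datetime.datetime(2024, 1, 2, 6, 3)},
]
print(check_high_severity(events, since=run_start))  # 1 (the HIGH event predates this run)
```

In the notebook, `run_start` could be captured with `datetime.datetime.now()` immediately before `run_silver_drift_monitoring(spark)`, since `save_drift_event` stamps events with the same local clock. An uncaught exception fails the job task, which surfaces the drift in the job's run history.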

---

> _For more details, see the monitoring/README.md and docs/data_dictionary.md files in the repository._