### Generate incidents

In [0]:
import uuid

spark.sql("USE demo_subledger_monitoring")

# Clear table data for repeatable demo
spark.sql("TRUNCATE TABLE incident_log")


def write_incidents(check_name: str, severity: str, category: str, sql_stmt: str):
    """
    sql_stmt must return:
    posting_date, account_id, currency, metric_name, metric_value, expected_value, details
    """
    incident_id = str(uuid.uuid4())

    spark.sql(f"""
        INSERT INTO incident_log
        SELECT
            '{incident_id}' AS incident_id,
            current_timestamp() AS incident_ts,
            '{severity}' AS severity,
            '{category}' AS category,
            '{check_name}' AS check_name,
            posting_date,
            account_id,
            currency,
            metric_name,
            metric_value,
            expected_value,
            details,
            'OPEN' AS status
        FROM ({sql_stmt})
    """)

# Check 1 - missing transaction_id
write_incidents(
    check_name="missing_transaction_id",
    severity="HIGH",
    category="DATA_QUALITY",
    sql_stmt="""
        SELECT
            posting_date,
            account_id,
            currency,
            'missing_transaction_id_count' AS metric_name,
            CAST(COUNT(*) AS STRING) AS metric_value,
            '0' AS expected_value,
            'Found rows with NULL transaction_id' AS details
        FROM transactions_raw
        WHERE transaction_id IS NULL
        GROUP BY posting_date, account_id, currency
        HAVING COUNT(*) > 0
    """
)

# Check 2 - negative amounts
write_incidents(
    check_name="negative_amounts",
    severity="MEDIUM",
    category="DATA_QUALITY",
    sql_stmt="""
        SELECT
            posting_date,
            account_id,
            currency,
            'negative_amount_count' AS metric_name,
            CAST(COUNT(*) AS STRING) AS metric_value,
            '0' AS expected_value,
            'Found rows with amount < 0' AS details
        FROM transactions_raw
        WHERE amount < 0
        GROUP BY posting_date, account_id, currency
        HAVING COUNT(*) > 0
    """
)

# Check 3 - reconciliation mismatch vs expected totals
write_incidents(
    check_name="reconciliation_mismatch",
    severity="HIGH",
    category="RECONCILIATION",
    sql_stmt="""
        SELECT
            s.posting_date,
            s.account_id,
            s.currency,
            'total_amount' AS metric_name,
            CAST(s.total_amount AS STRING) AS metric_value,
            CAST(e.expected_total AS STRING) AS expected_value,
            CONCAT('subledger=', CAST(s.total_amount AS STRING), ', expected=', CAST(e.expected_total AS STRING)) AS details
        FROM subledger_daily s
        JOIN expected_daily_totals e
        ON s.posting_date = e.posting_date
        AND s.account_id = e.account_id
        AND s.currency = e.currency
        WHERE s.total_amount <> e.expected_total
    """
)

display(spark.sql("""
    SELECT
        incident_ts, 
        severity, 
        category, 
        check_name,
        posting_date, 
        account_id, 
        currency,
        metric_name, 
        metric_value, 
        expected_value, 
        status
    FROM incident_log
    ORDER BY incident_ts DESC, severity
"""))


incident_ts,severity,category,check_name,posting_date,account_id,currency,metric_name,metric_value,expected_value,status
2026-01-11T21:12:15.746Z,HIGH,RECONCILIATION,reconciliation_mismatch,2026-01-10,4010,EUR,total_amount,5.0,6.0,OPEN
2026-01-11T21:12:13.972Z,MEDIUM,DATA_QUALITY,negative_amounts,2026-01-10,4010,EUR,negative_amount_count,1.0,0.0,OPEN
2026-01-11T21:12:11.815Z,HIGH,DATA_QUALITY,missing_transaction_id,2026-01-10,4010,EUR,missing_transaction_id_count,1.0,0.0,OPEN
