# 🏦 Investment Banking Market Data Quality - Anomaly Detection Demo

## Business Context

**Scenario**: Monitor real-time market data feeds to detect:
- Pricing anomalies and stale quotes
- Volume spikes that may indicate market manipulation
- Feed latency issues
- Off-hours trading anomalies

**Data**: Market data feed with prices, volumes, spreads, and timestamps across multiple exchanges

## What You'll Learn (30-45 min)

1. **Temporal Patterns**: Datetime encoding for trading hours
2. **Multi-Asset Monitoring**: Exchange-specific baselines
3. **Performance Optimization**: row_filter and merge_columns for high-frequency data
4. **Feature Contributions**: Triaging different anomaly types
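5. **Ensemble Models**: Confidence intervals for financial data
6. **Drift Detection**: Spotting market regime changes that call for retraining
7. **Production Integration**: Combined checks, quarantine, and scheduled scoring jobs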

---

## Section 1: Setup & Data Generation (5 min)

First, install DQX with anomaly support if not already installed:
```bash
%pip install databricks-labs-dqx[anomaly]
```

In [None]:
# Imports
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (
    BooleanType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType,
)
from datetime import datetime, timedelta
import random
import numpy as np

from databricks.labs.dqx.anomaly import AnomalyEngine, has_no_anomalies, AnomalyParams, IsolationForestConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQDatasetRule, DQRowRule
from databricks.sdk import WorkspaceClient

# Initialize
spark = SparkSession.builder.getOrCreate()
ws = WorkspaceClient()
anomaly_engine = AnomalyEngine(ws)
dq_engine = DQEngine(ws)

# Set seeds for reproducibility
random.seed(42)
np.random.seed(42)

print("✅ Setup complete!")
print(f"   Spark version: {spark.version}")

### Generate Realistic Market Data

We'll create high-frequency market data with:
- **Mixed data types**: Numeric (prices, volume), categorical (exchange, symbol), datetime (timestamp), boolean (is_official_hours)
- **Exchange-specific patterns**: Different baselines for NASDAQ, NYSE, LSE
- **Injected anomalies**: Pricing errors, stale quotes, volume spikes
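
The generator sizes each spread in basis points (1 bp = 1/10,000 of the price) and derives `is_official_hours` from the clock time only, so a weekend timestamp at 10:00 would still count as official. A minimal stdlib sketch of both calculations, assuming a 09:30-16:00 Monday-to-Friday session; `quote_from_bp` and `in_official_hours` are hypothetical helpers, not part of DQX:

```python
from datetime import datetime, time


def quote_from_bp(base_price: float, spread_bp: float) -> tuple[float, float, float]:
    """Return (bid, ask, spread) for a quote with bid base_price and width spread_bp basis points."""
    spread = base_price * spread_bp / 10_000  # 1 bp = 1/10,000 of the price
    bid = round(base_price, 2)
    ask = round(base_price + spread, 2)
    return bid, ask, round(ask - bid, 3)


# Assumed session: 09:30-16:00 exchange-local time, Monday to Friday
SESSION_OPEN = time(9, 30)
SESSION_CLOSE = time(16, 0)


def in_official_hours(ts: datetime) -> bool:
    """True if ts falls on a weekday inside the assumed trading session."""
    is_weekday = ts.weekday() < 5  # Monday=0 ... Sunday=6
    return is_weekday and SESSION_OPEN <= ts.time() <= SESSION_CLOSE


print(quote_from_bp(100.0, 8))                          # NYSE-like baseline: price 100, 8 bp
print(in_official_hours(datetime(2024, 12, 2, 10, 0)))  # Monday 10:00 -> True
print(in_official_hours(datetime(2024, 12, 1, 10, 0)))  # Sunday 10:00 -> False
```

Checking the weekday as well as the clock time keeps weekend quotes out of the "official" bucket, which the clock-only test in the generator does not do.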

In [None]:
# Generate market data feed
def generate_market_data(num_rows=2000, anomaly_rate=0.05):
    data = []
    exchanges = ["NASDAQ", "NYSE", "LSE"]
    symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META", "NVDA", "JPM", "BAC", "GS"]
    
    # Exchange-specific baseline patterns
    exchange_patterns = {
        "NASDAQ": {"base_price": 150, "spread_bp": 5, "volume": 50000, "lag_ms": 15},
        "NYSE": {"base_price": 100, "spread_bp": 8, "volume": 30000, "lag_ms": 20},
        "LSE": {"base_price": 80, "spread_bp": 12, "volume": 20000, "lag_ms": 35},
    }
    
    start_time = datetime(2024, 12, 2, 9, 30)  # Monday, market open
    
    for i in range(num_rows):
        exchange = random.choice(exchanges)
        symbol = random.choice(symbols)
        pattern = exchange_patterns[exchange]
        
        # Generate timestamp within the 09:30-16:00 session (off-hours rows are injected below)
        minutes_offset = random.randint(0, 390)  # 6.5 hours = 390 min
        timestamp = start_time + timedelta(minutes=minutes_offset)
        
        # Trading hours: 9:30-16:00
        is_official_hours = 9.5 <= timestamp.hour + timestamp.minute/60 <= 16
        
        # Normal patterns
        if random.random() > anomaly_rate:
            base = pattern["base_price"] * random.uniform(0.95, 1.05)
            spread_bp = max(1, pattern["spread_bp"] + np.random.normal(0, 2))
            spread = base * (spread_bp / 10000)
            
            bid_price = round(base, 2)
            ask_price = round(base + spread, 2)
            volume = int(np.random.normal(pattern["volume"], pattern["volume"] * 0.3))
            lag_ms = int(np.random.normal(pattern["lag_ms"], 5))
        else:
            # Inject anomalies
            anomaly_type = random.choice(["pricing_error", "stale_quote", "volume_spike", "off_hours"])
            
            if anomaly_type == "pricing_error":
                base = pattern["base_price"] * random.uniform(1.5, 3.0)  # 50-200% price jump
                spread = base * random.uniform(0.05, 0.15)  # Wide spread
                bid_price = round(base, 2)
                ask_price = round(base + spread, 2)
                volume = int(pattern["volume"] * random.uniform(0.5, 1.0))
                lag_ms = int(pattern["lag_ms"])
            
            elif anomaly_type == "stale_quote":
                base = pattern["base_price"] * random.uniform(0.95, 1.05)
                spread = base * (pattern["spread_bp"] / 10000)
                bid_price = round(base, 2)
                ask_price = round(base + spread, 2)
                volume = int(pattern["volume"] * random.uniform(0.8, 1.2))
                lag_ms = int(pattern["lag_ms"] * random.uniform(10, 30))  # 10-30x normal lag
            
            elif anomaly_type == "volume_spike":
                base = pattern["base_price"] * random.uniform(0.98, 1.02)
                spread = base * (pattern["spread_bp"] / 10000)
                bid_price = round(base, 2)
                ask_price = round(base + spread, 2)
                volume = int(pattern["volume"] * random.uniform(5, 15))  # 5-15x normal volume
                lag_ms = int(pattern["lag_ms"])
            
            else:  # off_hours
                base = pattern["base_price"] * random.uniform(0.95, 1.05)
                spread = base * (pattern["spread_bp"] / 10000)
                bid_price = round(base, 2)
                ask_price = round(base + spread, 2)
                volume = int(pattern["volume"] * random.uniform(0.1, 0.3))
                lag_ms = int(pattern["lag_ms"])
                # Force off-hours timestamp
                timestamp = timestamp.replace(hour=random.choice([2, 3, 4, 22, 23]))
                is_official_hours = False
        
        # Generate unique quote_id (primary key)
        quote_id = f"Q{i+1:06d}"
        
        data.append((
            quote_id,
            symbol,
            exchange,
            timestamp,
            bid_price,
            ask_price,
            int(max(100, volume)),
            round(ask_price - bid_price, 3),
            max(1, lag_ms),
            is_official_hours
        ))
    
    return data

# Generate data
market_data = generate_market_data(num_rows=2000, anomaly_rate=0.05)

schema = StructType([
    StructField("quote_id", StringType(), False),
    StructField("symbol", StringType(), False),
    StructField("exchange", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("bid_price", DoubleType(), False),
    StructField("ask_price", DoubleType(), False),
    StructField("volume", IntegerType(), False),
    StructField("spread", DoubleType(), False),
    StructField("last_update_lag_ms", IntegerType(), False),
    StructField("is_official_hours", BooleanType(), False),
])

df_market = spark.createDataFrame(market_data, schema)

print("\n📊 Sample of market data feed:")
display(df_market.orderBy("timestamp"))
print(f"\n✅ Generated {df_market.count()} rows with ~5% injected anomalies")
print("💡 Note: quote_id is the primary key for efficient merge operations")

---

## Section 2: Auto-Discovery with Temporal Patterns (10 min)

### 2.1 Zero-Config Auto-Discovery

Train a model with auto-discovery, which selects the feature columns itself and automatically extracts temporal features from the `timestamp` column.
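
Cyclical encoding places each time component on a circle, so 23:00 and 00:00 end up close together, which a raw hour number cannot express. A minimal sketch of the five features listed in the training cell, assuming a 24-hour period for the hour, a 7-day period with Monday = 0 for the day of week, and Saturday/Sunday as the weekend; DQX's exact formulas and origins may differ, and `encode_datetime` is a hypothetical helper:

```python
import math
from datetime import datetime


def encode_datetime(ts: datetime) -> dict[str, float]:
    """Encode a timestamp as cyclical hour/day-of-week features plus a weekend flag."""
    hour = ts.hour + ts.minute / 60  # fractional hour, so 09:30 sits between 09:00 and 10:00
    day_of_week = ts.weekday()       # assumed origin: Monday = 0 ... Sunday = 6
    return {
        "hour_sin": math.sin(2 * math.pi * hour / 24),
        "hour_cos": math.cos(2 * math.pi * hour / 24),
        "day_of_week_sin": math.sin(2 * math.pi * day_of_week / 7),
        "day_of_week_cos": math.cos(2 * math.pi * day_of_week / 7),
        "is_weekend": 1.0 if day_of_week >= 5 else 0.0,
    }


def circle_distance(a: dict[str, float], b: dict[str, float]) -> float:
    """Euclidean distance between two encoded timestamps in (hour_sin, hour_cos) space."""
    return math.hypot(a["hour_sin"] - b["hour_sin"], a["hour_cos"] - b["hour_cos"])


late = encode_datetime(datetime(2024, 12, 2, 23, 0))
midnight = encode_datetime(datetime(2024, 12, 3, 0, 0))
noon = encode_datetime(datetime(2024, 12, 2, 12, 0))

# On the encoded circle, 23:00 is much closer to 00:00 than to 12:00
print(circle_distance(late, midnight) < circle_distance(late, noon))  # True
```

Using both sine and cosine is what makes the mapping unambiguous: sine alone gives the same value for 03:00 and 09:00.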

In [None]:
# Save to table
catalog = spark.sql("SELECT current_catalog()").first()[0]
schema_name = "dqx_demo"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema_name}")

table_name = f"{catalog}.{schema_name}.market_data_feed"
df_market.write.mode("overwrite").saveAsTable(table_name)

# Define unique registry table for this demo
registry_table = f"{catalog}.{schema_name}.anomaly_model_registry_investment_banking"

# Clean up old table if it exists (ensures new nested schema)
spark.sql(f"DROP TABLE IF EXISTS {registry_table}")
print("🗑️  Cleaned up old registry table (if it existed)")

print(f"✅ Data saved to: {table_name}")
print(f"📋 Model registry: {registry_table}")


In [None]:
# Train with ZERO configuration (auto-discovery)
print("🎯 Training with AUTO-DISCOVERY (zero config)...\n")

model_uri_auto = anomaly_engine.train(
    df=spark.table(table_name),
    # NO columns specified - auto-discovered!
    model_name="market_data_auto",  # Specify name for later reference
    registry_table=registry_table
)

print("\n✅ Auto-discovery model trained!")
print(f"   Model URI: {model_uri_auto}")

# Check what was auto-discovered
registry_df = spark.table(registry_table)
auto_model = registry_df.filter(F.col("identity.model_uri") == model_uri_auto).first()

print("\n📋 Auto-Discovered Configuration:")
print(f"   Columns: {auto_model['training']['columns']}")
print(f"   Column types: {auto_model['features']['column_types']}")
print("\n💡 Datetime columns are automatically encoded as 5 features (4 cyclical + 1 binary):")
print("   - hour_sin, hour_cos (daily cycle)")
print("   - day_of_week_sin, day_of_week_cos (weekly cycle)")
print("   - is_weekend (binary)")

In [None]:
# Score with auto-discovered model
checks_auto = [
    DQDatasetRule(
        check_func=has_no_anomalies,
        check_func_kwargs={
            "merge_columns": ["quote_id"],  # Simple primary key
            "model": "market_data_auto",  # Reference the auto-discovered model
            "score_threshold": 0.5,
            "registry_table": registry_table
        }
    )
]

df_scored_auto = dq_engine.apply_checks(df_market, checks_auto)
anomalies_auto = df_scored_auto.filter(F.col("_info.anomaly.score") >= 0.5)

print(f"\n⚠️  Auto-discovery found {anomalies_auto.count()} anomalies:\n")
display(anomalies_auto.orderBy(F.col("_info.anomaly.score").desc()).select(
    "symbol", "exchange", "timestamp", "bid_price", "volume",
    F.round("_info.anomaly.score", 3).alias("score")
).limit(10))

### 2.2 Manual Column Selection & Parameter Tuning

Now let's manually select columns and tune hyperparameters for financial data.
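
For intuition on `num_trees`, `subsampling_rate`, and the 0.5 `score_threshold`: in the standard Isolation Forest formulation, a point's score is s = 2^(-E[h(x)] / c(n)), where E[h(x)] is its average path length across trees and c(n) is the expected path length for a sample of n points. Scores near 1 mean the point is isolated quickly; 0.5 means its path length is average. A minimal sketch of that formula; whether DQX normalizes its scores exactly this way, and whether `subsampling_rate` is the per-tree sample size n, are assumptions:

```python
import math

EULER_GAMMA = 0.5772156649


def expected_path_length(n: int) -> float:
    """c(n): average path length of an unsuccessful binary-search-tree lookup over n points."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    harmonic = math.log(n - 1) + EULER_GAMMA  # approximation of the harmonic number H(n - 1)
    return 2 * harmonic - 2 * (n - 1) / n


def isolation_score(avg_path_length: float, n: int) -> float:
    """Standard Isolation Forest score s = 2 ** (-E[h(x)] / c(n))."""
    return 2 ** (-avg_path_length / expected_path_length(n))


n = 1024  # per-tree sample size (assumed to correspond to subsampling_rate)
c = expected_path_length(n)
print(round(c, 2))
print(isolation_score(c, n))      # average path length -> exactly 0.5
print(isolation_score(c / 4, n))  # isolated in a quarter of the usual depth -> about 0.84
```

More trees reduce the variance of the averaged path length E[h(x)], while a larger sample raises c(n), giving the trees more depth over which to separate ordinary points from outliers.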


In [None]:
# Train with MANUAL configuration and tuned parameters
print("🎯 Training with MANUAL tuning for financial data...\n")

model_uri_manual = anomaly_engine.train(
    df=spark.table(table_name),
    columns=["bid_price", "ask_price", "spread", "volume", "last_update_lag_ms", "timestamp"],  # Manual selection
    model_name="market_data_tuned",
    params=AnomalyParams(
        algorithm_config=IsolationForestConfig(
            contamination=0.05,  # Expected 5% anomaly rate
            num_trees=200,    # More trees for financial data stability
            subsampling_rate=1024,    # Larger subsample for accuracy
            random_seed=42
        ),
        sample_fraction=1.0,
    ),
    registry_table=registry_table
)

print("\n✅ Manually tuned model trained!")
print(f"   Model URI: {model_uri_manual}")
print("\n💡 Financial Data Tuning:")
print("   • Higher num_trees (200) for stability")
print("   • Larger subsampling_rate (1024) for accuracy")
print("   • Datetime features automatically extracted")


---

## Section 3: Multi-Asset Monitoring (8 min)

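Exchanges differ in price level, spread width, volume, and feed latency (see the NASDAQ, NYSE, and LSE baselines in the data generator). A single model would judge every venue against one pooled baseline, so train one model per exchange with `segment_by`.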
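
Why segmentation matters: a value that is routine on one venue can be extreme on another, and a pooled baseline hides that. The sketch below uses a simple z-score (not the Isolation Forest that DQX trains per segment) on hypothetical spreads in basis points, roughly following the generator's baselines of 5 bp for NASDAQ and 12 bp for LSE:

```python
from statistics import mean, pstdev

# Hypothetical spread observations in basis points, per exchange
spreads_bp = {
    "NASDAQ": [4.0, 5.0, 6.0, 5.0, 4.0, 6.0],
    "LSE": [11.0, 12.0, 13.0, 12.0, 11.0, 13.0],
}


def z_score(value: float, sample: list[float]) -> float:
    """Number of (population) standard deviations value lies from the sample mean."""
    return (value - mean(sample)) / pstdev(sample)


pooled = [x for values in spreads_bp.values() for x in values]
wide_nasdaq_quote = 11.0  # routine on LSE, but wide for NASDAQ

print(round(z_score(wide_nasdaq_quote, spreads_bp["NASDAQ"]), 2))  # per-exchange baseline: clearly unusual
print(round(z_score(wide_nasdaq_quote, pooled), 2))                # pooled baseline: looks ordinary
```

Pooling mixes two venue distributions, which inflates the standard deviation and lets a venue-specific outlier pass as normal.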

In [None]:
# Train with exchange segmentation
print("🌍 Training exchange-specific models...\n")

model_uri_segmented = anomaly_engine.train(
    df=spark.table(table_name),
    columns=["bid_price", "ask_price", "spread", "volume", "last_update_lag_ms"],
    segment_by=["exchange"],
    model_name="market_data_by_exchange",
    registry_table=registry_table
)

print("\n✅ Exchange-specific models trained!")

# Show exchange baselines
exchange_models = spark.table(registry_table).filter(
    F.col("identity.model_name") == "market_data_by_exchange"
)

print("\n📊 Exchange-Specific Baselines:\n")
display(exchange_models.select("segment_values.exchange", "training.training_rows", "training.baseline_stats"))

---

## Section 4: High-Frequency Scoring with Performance (8 min)

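Optimize scoring for high-frequency data with two options: `row_filter` limits scoring to rows matching a SQL predicate, and `merge_columns` names the key used to join scores back to the input rows. Filtering on `is_official_hours = true` also skips the injected off-hours anomalies, so cover off-hours quotes with a separate check.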
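
Conceptually, the pattern is to score only the rows that match the filter and then join the scores back to the full dataset on the key, leaving filtered-out rows unscored. A minimal pure-Python sketch of that flow; `score_with_filter` and the `toy_score` stand-in model are hypothetical, and this is not how DQX implements the join internally:

```python
from typing import Callable

Row = dict[str, object]


def score_with_filter(rows: list[Row], row_filter: Callable[[Row], bool],
                      score: Callable[[Row], float], key: str) -> list[Row]:
    """Score only rows passing row_filter, then merge scores back onto every row by key."""
    # 1. Score the filtered subset, indexed by the merge key
    scores = {row[key]: score(row) for row in rows if row_filter(row)}
    # 2. Join back on the key; rows outside the filter get no score (None)
    return [{**row, "anomaly_score": scores.get(row[key])} for row in rows]


def toy_score(row: Row) -> float:
    """Hypothetical stand-in for a model: higher feed lag means a higher score."""
    return min(1.0, row["last_update_lag_ms"] / 500)


quotes = [
    {"quote_id": "Q000001", "is_official_hours": True, "last_update_lag_ms": 15},
    {"quote_id": "Q000002", "is_official_hours": True, "last_update_lag_ms": 400},
    {"quote_id": "Q000003", "is_official_hours": False, "last_update_lag_ms": 20},
]

scored = score_with_filter(quotes, lambda r: r["is_official_hours"], toy_score, key="quote_id")
for row in scored:
    print(row["quote_id"], row["anomaly_score"])
```

A unique key such as `quote_id` keeps the join one-to-one; a non-unique key would attach one row's score to several rows.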

In [None]:
# Score only official trading hours data with optimized joins
checks_optimized = [
    DQDatasetRule(
        check_func=has_no_anomalies,
        check_func_kwargs={
            "merge_columns": ["quote_id"],  # Simple primary key
            "model": "market_data_by_exchange",
            "score_threshold": 0.5,
            "row_filter": "is_official_hours = true",  # Filter before scoring
            "registry_table": registry_table
        }
    )
]

df_scored_optimized = dq_engine.apply_checks(df_market, checks_optimized)
print(f"✅ Scored {df_scored_optimized.count()} rows with optimized join")

---

## Section 5: Feature Contributions for Investigation (5 min)

Use SHAP to understand which features drove anomaly scores and triage by type.

In [None]:
# Score with feature contributions
checks_contrib = [
    DQDatasetRule(
        check_func=has_no_anomalies,
        check_func_kwargs={
            "merge_columns": ["quote_id"],  # Simple primary key
            "model": "market_data_by_exchange",
            "score_threshold": 0.5,
            "include_contributions": True,
            "registry_table": registry_table
        }
    )
]

df_with_contrib = dq_engine.apply_checks(df_market, checks_contrib)

print("🔍 Top Anomalies with Feature Contributions:\n")
anomalies_contrib = df_with_contrib.filter(F.col("_info.anomaly.score") >= 0.5).orderBy(
    F.col("_info.anomaly.score").desc()
).limit(10)

display(anomalies_contrib.select(
    "symbol", "exchange", "bid_price", "volume", "last_update_lag_ms",
    F.round("_info.anomaly.score", 3).alias("score"),
    "_info.anomaly.contributions",
))

print("\n💡 Triage by contribution pattern:")
print("   - High 'last_update_lag_ms' → Stale quote, route to data feed team")
print("   - High 'bid_price' + 'spread' → Pricing error, route to pricing team")
print("   - High 'volume' → Potential manipulation, route to compliance")
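
The triage rules printed above amount to picking the feature with the largest contribution and mapping it to an owning team. A minimal sketch, assuming contributions arrive as a feature-to-value mapping like `_info.anomaly.contributions`; the routing table, team names, and the `route_anomaly` helper are hypothetical:

```python
# Hypothetical routing table mirroring the triage rules above
ROUTES = {
    "last_update_lag_ms": "data feed team",  # stale quotes
    "bid_price": "pricing team",             # pricing errors
    "ask_price": "pricing team",             # assumed to route with bid_price
    "spread": "pricing team",
    "volume": "compliance",                  # potential manipulation
}


def top_contributor(contributions: dict[str, float]) -> str:
    """Return the feature with the highest contribution value."""
    return max(contributions, key=contributions.get)


def route_anomaly(contributions: dict[str, float], default: str = "manual review") -> str:
    """Map an anomaly's dominant feature to the team that should investigate it."""
    return ROUTES.get(top_contributor(contributions), default)


print(route_anomaly({"last_update_lag_ms": 0.71, "volume": 0.12, "spread": 0.05}))  # data feed team
print(route_anomaly({"volume": 0.64, "bid_price": 0.20}))                          # compliance
```

Falling back to a default queue keeps anomalies driven by unmapped features from being silently dropped.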

---

## Section 6: Ensemble Models & Confidence (4 min)

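For financial data, train an ensemble: several models score each row, and the standard deviation of their scores shows how far the members agree, giving a rough confidence measure for each anomaly score.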
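
The idea behind the confidence output: every ensemble member scores the same row, and the standard deviation of those scores measures how much the members agree. A minimal sketch with made-up member scores; whether DQX uses the population or the sample standard deviation is an assumption (population is used here), and `ensemble_summary` is a hypothetical helper:

```python
from statistics import mean, pstdev


def ensemble_summary(member_scores: list[float]) -> tuple[float, float]:
    """Return (mean score, population std dev) across ensemble members."""
    return mean(member_scores), pstdev(member_scores)


# Made-up scores from a 3-member ensemble for two quotes
clear_case = [0.81, 0.79, 0.83]      # members agree: confident anomaly
ambiguous_case = [0.35, 0.72, 0.58]  # members disagree around the 0.5 threshold

for name, scores in [("clear", clear_case), ("ambiguous", ambiguous_case)]:
    avg, std = ensemble_summary(scores)
    print(f"{name}: score={avg:.3f} std_dev={std:.3f}")
```

Both rows exceed the 0.5 threshold on average, but only the first one does so with members in agreement.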

In [None]:
# Train ensemble model (3 models)
print("🎲 Training ensemble model (3 members)...\n")

model_uri_ensemble = anomaly_engine.train(
    df=spark.table(table_name),
    columns=["bid_price", "ask_price", "spread", "volume", "last_update_lag_ms"],
    segment_by=["exchange"],
    model_name="market_data_ensemble",
    params=AnomalyParams(
        ensemble_size=3  # 3-model ensemble
    ),
    registry_table=registry_table
)

print("\n✅ Ensemble trained!")

---

## Section 7: Drift Detection & Retraining (6 min)

Market conditions change. Detect when models become stale and need retraining.
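
The scoring cell below passes `drift_threshold: 3.0`, described as a z-score threshold. One common way to compute such a score is the shift in a column's mean measured in baseline standard deviations. A minimal sketch under that assumption (DQX's actual drift statistic may differ), using hypothetical samples; `drift_score` and `drifted_columns` are hypothetical helpers:

```python
from statistics import mean, pstdev


def drift_score(baseline: list[float], current: list[float]) -> float:
    """Shift of the current mean, in baseline standard deviations."""
    base_std = pstdev(baseline)
    if base_std == 0:
        return 0.0 if mean(current) == mean(baseline) else float("inf")
    return abs(mean(current) - mean(baseline)) / base_std


def drifted_columns(baseline: dict[str, list[float]], current: dict[str, list[float]],
                    threshold: float = 3.0) -> dict[str, float]:
    """Return {column: drift score} for columns whose score exceeds threshold."""
    scores = {col: drift_score(baseline[col], current[col]) for col in baseline}
    return {col: round(s, 2) for col, s in scores.items() if s > threshold}


# Hypothetical samples: latency shifts sharply, volume barely moves
baseline = {"last_update_lag_ms": [10, 12, 11, 13, 9, 10, 12, 11],
            "volume": [50_000, 52_000, 48_000, 51_000, 49_000, 50_000, 50_500, 49_500]}
current = {"last_update_lag_ms": [20, 22, 19, 21, 18, 20, 21, 19],
           "volume": [50_500, 51_000, 49_000, 50_000, 50_200, 49_800, 50_300, 49_700]}

print(drifted_columns(baseline, current))  # {'last_update_lag_ms': 7.35}
```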


In [None]:
# Simulate market regime change (increased volatility, wider spreads)
def generate_volatile_market_data(num_rows=300):
    """Generate data with shifted volatility (market stress period)."""
    data = []
    exchanges = ["NASDAQ", "NYSE", "LSE"]
    symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META", "NVDA", "JPM", "BAC", "GS"]
    
    # NEW PATTERNS: Higher volatility, wider spreads, increased volume
    volatile_patterns = {
        "NASDAQ": {"base_price": 150, "spread_bp": 15, "volume": 75000, "lag_ms": 25},  # +200% spread
        "NYSE": {"base_price": 100, "spread_bp": 20, "volume": 45000, "lag_ms": 35},    # +150% spread
        "LSE": {"base_price": 80, "spread_bp": 25, "volume": 30000, "lag_ms": 50},      # +108% spread
    }
    
    start_time = datetime(2024, 12, 16, 9, 30)  # New period (Monday, two weeks later)
    
    for i in range(num_rows):
        exchange = random.choice(exchanges)
        symbol = random.choice(symbols)
        pattern = volatile_patterns[exchange]
        
        minutes_offset = random.randint(0, 390)
        timestamp = start_time + timedelta(minutes=minutes_offset)
        is_official_hours = 9.5 <= timestamp.hour + timestamp.minute/60 <= 16
        
        base = pattern["base_price"] * random.uniform(0.90, 1.10)  # +100% price volatility
        spread_bp = max(1, pattern["spread_bp"] + np.random.normal(0, 5))
        spread = base * (spread_bp / 10000)
        
        bid_price = round(base, 2)
        ask_price = round(base + spread, 2)
        volume = int(np.random.normal(pattern["volume"], pattern["volume"] * 0.4))  # +33% vol volatility
        lag_ms = int(np.random.normal(pattern["lag_ms"], 10))  # +100% lag variability
        
        # Generate unique quote_id
        quote_id = f"Q{2000+i+1:06d}"  # Continue from main dataset
        
        data.append((quote_id, symbol, exchange, timestamp, bid_price, ask_price, int(max(100, volume)), 
                    round(ask_price - bid_price, 3), max(1, lag_ms), is_official_hours))
    
    return data

# Generate volatile market data
volatile_data = generate_volatile_market_data(num_rows=300)
df_volatile = spark.createDataFrame(volatile_data, schema)

print("📊 Normal vs Volatile Market Comparison:\n")
print("Normal Market (original):")
display(df_market.agg(
    F.avg("spread").alias("avg_spread"),
    F.avg("volume").alias("avg_volume"),
    F.avg("last_update_lag_ms").alias("avg_lag")))

print("Volatile Market (stress period):")
display(df_volatile.agg(
    F.avg("spread").alias("avg_spread"),
    F.avg("volume").alias("avg_volume"),
    F.avg("last_update_lag_ms").alias("avg_lag")))

print("✅ Market regime changed (generator baselines, normal → volatile):")
print("   • Spreads: +108-200% (market stress)")
print("   • Volume: +50% (panic trading)")
print("   • Latency: +43-75% (system overload)")


In [None]:
# Score volatile data with drift detection
checks_with_drift = [
    DQDatasetRule(
        check_func=has_no_anomalies,
        check_func_kwargs={
            "merge_columns": ["quote_id"],  # Simple primary key
            "model": "market_data_by_exchange",
            "drift_threshold": 3.0,  # Z-score threshold
            "registry_table": registry_table
        }
    )
]

print("🔍 Scoring volatile market data (watch for drift warnings)...\n")
df_drift_scored = dq_engine.apply_checks(df_volatile, checks_with_drift)

print("\nℹ️  Drift warnings indicate a distribution shift!")
print("   Example: 'Data drift detected in columns: spread, last_update_lag_ms (drift score: 5.3)'")
print("   Action: Retrain the model to adapt to the new market regime")
print("\n💡 In production: Set up alerts when drift_score > 3.0")


In [None]:
# Retrain with combined data (normal + volatile periods)
# unionByName aligns columns by name rather than by position
df_combined = df_market.unionByName(df_volatile)

print("🔄 Retraining model with combined market conditions...\n")

model_uri_retrained = anomaly_engine.train(
    df=df_combined,
    columns=["bid_price", "ask_price", "spread", "volume", "last_update_lag_ms"],
    segment_by=["exchange"],
    model_name="market_data_by_exchange",  # Same name = new version
    params=AnomalyParams(
        algorithm_config=IsolationForestConfig(
            contamination=0.05,
            num_trees=200,
            subsampling_rate=1024,
            random_seed=42
        )
    ),
    registry_table=registry_table
)

print("\n✅ Model retrained!")
print("   • Old model automatically archived")
print("   • New model adapts to both normal and volatile market conditions")
print("   • Baseline now includes wider spreads and higher volatility")
print("\n💡 Best Practice:")
print("   • Monitor drift_score in production dashboards")
print("   • Set up automated retraining when drift > threshold")
print("   • Retrain quarterly or when market regimes change")
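
The best-practice notes printed above combine two triggers: drift past a threshold and a fixed schedule. A minimal sketch of that decision; `should_retrain` is a hypothetical helper, the drift scores would come from whatever drift monitoring you run, and the 90-day maximum model age (roughly quarterly) is an assumed setting:

```python
from datetime import date, timedelta


def should_retrain(drift_scores: dict[str, float], last_trained: date, today: date,
                   drift_threshold: float = 3.0, max_age_days: int = 90) -> tuple[bool, str]:
    """Decide whether to retrain, returning (decision, reason)."""
    drifted = sorted(col for col, score in drift_scores.items() if score > drift_threshold)
    if drifted:
        return True, f"drift in {', '.join(drifted)}"
    if today - last_trained > timedelta(days=max_age_days):
        return True, "model older than schedule allows"
    return False, "model current"


print(should_retrain({"spread": 5.3, "volume": 1.1}, date(2024, 12, 2), date(2024, 12, 16)))
print(should_retrain({"spread": 0.4}, date(2024, 9, 1), date(2024, 12, 16)))
print(should_retrain({"spread": 0.4}, date(2024, 12, 2), date(2024, 12, 16)))
```

Checking drift first means a regime change triggers retraining immediately rather than waiting for the scheduled cycle.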


---

## Section 8: Production Integration & Quarantine (6 min)

Integrate anomaly detection into production workflows with automated quarantine.
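
The workflow in this section sends each row to one of two places: rows that fail a rule-based check or score at or above the anomaly threshold are quarantined with a reason, and the rest flow to the clean output. A minimal pure-Python sketch of that split, with rules mirroring a subset of the `DQRowRule` checks in the next cell; `split_for_quarantine`, the rule names, and the reason strings are hypothetical (the quarantine cell further down quarantines on the anomaly score alone):

```python
from typing import Callable

Row = dict[str, object]

# Hypothetical rule set mirroring some of the DQRowRule checks in the next cell
RULES: dict[str, Callable[[Row], bool]] = {
    "symbol_not_null": lambda r: r.get("symbol") is not None,
    "bid_price_in_range": lambda r: r.get("bid_price") is not None and 0 <= r["bid_price"] <= 10_000,
    "spread_in_range": lambda r: r.get("spread") is not None and 0 <= r["spread"] <= 100,
}


def split_for_quarantine(rows: list[Row], score_threshold: float = 0.5) -> tuple[list[Row], list[Row]]:
    """Split rows into (clean, quarantined), tagging each quarantined row with its reasons."""
    clean, quarantined = [], []
    for row in rows:
        reasons = [name for name, passes in RULES.items() if not passes(row)]
        score = row.get("anomaly_score")
        if score is not None and score >= score_threshold:
            reasons.append("anomaly_detected")
        if reasons:
            quarantined.append({**row, "quarantine_reason": reasons})
        else:
            clean.append(row)
    return clean, quarantined


rows = [
    {"quote_id": "Q1", "symbol": "AAPL", "bid_price": 150.1, "spread": 0.08, "anomaly_score": 0.12},
    {"quote_id": "Q2", "symbol": "MSFT", "bid_price": 310.0, "spread": 31.0, "anomaly_score": 0.91},
    {"quote_id": "Q3", "symbol": None, "bid_price": 99.9, "spread": 0.07, "anomaly_score": 0.20},
]
clean, quarantined = split_for_quarantine(rows)
print([r["quote_id"] for r in clean])
for r in quarantined:
    print(r["quote_id"], r["quarantine_reason"])
```

Recording every failing reason, rather than only the first, lets one quarantined row be routed to more than one team.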


In [None]:
# Combine anomaly detection with traditional DQ checks
from databricks.labs.dqx.check_funcs import is_not_null, is_in_range

checks_combined = [
    # Traditional data quality checks
    DQRowRule(check_func=is_not_null, check_func_kwargs={"column": "symbol"}),
    DQRowRule(check_func=is_not_null, check_func_kwargs={"column": "exchange"}),
    DQRowRule(check_func=is_not_null, check_func_kwargs={"column": "timestamp"}),
    DQRowRule(check_func=is_in_range, check_func_kwargs={"column": "bid_price", "min_limit": 0, "max_limit": 10000}),
    DQRowRule(check_func=is_in_range, check_func_kwargs={"column": "spread", "min_limit": 0, "max_limit": 100}),
    DQRowRule(check_func=is_in_range, check_func_kwargs={"column": "last_update_lag_ms", "min_limit": 0, "max_limit": 5000}),
    
    # ML-based anomaly detection with explanations
    DQDatasetRule(
        check_func=has_no_anomalies,
        check_func_kwargs={
            "model": "market_data_by_exchange",
            "score_threshold": 0.5,
            "include_contributions": True,  # For root cause
            "drift_threshold": 3.0,
            "merge_columns": ["quote_id"],  # Simple primary key for efficient merge
            "registry_table": registry_table
        }
    )
]

# Apply all checks in single pass
df_full_dq = dq_engine.apply_checks(df_market, checks_combined)

print("📊 Full Data Quality Summary:\n")
total_rows = df_full_dq.count()
anomalies_found = df_full_dq.filter(F.col("_info.anomaly.score") >= 0.5).count()

print(f"Total Rows: {total_rows}")
print(f"Anomalies Detected: {anomalies_found}")
print(f"Records Below Anomaly Threshold: {total_rows - anomalies_found}")
print(f"Anomaly Rate: {(anomalies_found/total_rows)*100:.2f}%")
print("\n✅ All checks (traditional + ML) applied in a single pass!")


In [None]:
# Quarantine anomalies for investigation
quarantine_table = f"{catalog}.{schema_name}.market_data_quarantine"

quarantine_df = df_full_dq.filter(
    F.col("_info.anomaly.score") >= 0.5
).select(
    "*",
    F.current_timestamp().alias("quarantine_timestamp"),
    F.lit("anomaly_detected").alias("quarantine_reason"),
    # Extract the feature with the highest contribution value for triage:
    # sort map entries by value (descending), then take the first (element_at is 1-based).
    # `_info` is a struct, so only its name is backtick-quoted; quoting the whole
    # dotted path would look for a top-level column literally named "_info.anomaly.contributions".
    F.expr("""
        element_at(
            array_sort(
                transform(
                    map_entries(`_info`.anomaly.contributions),
                    x -> named_struct('feature', x.key, 'contrib', x.value)
                ),
                (left, right) -> case
                    when left.contrib > right.contrib then -1
                    when left.contrib < right.contrib then 1
                    else 0
                end
            ),
            1
        ).feature
    """).alias("top_contributor")
)

quarantine_df.write.mode("overwrite").saveAsTable(quarantine_table)

print(f"✅ Quarantined {quarantine_df.count()} anomalies to: {quarantine_table}")
print("\n📋 Quarantine Summary by Exchange and Issue Type:")
display(spark.table(quarantine_table).groupBy("exchange", "top_contributor").agg(
    F.count("*").alias("count"),
    F.avg("_info.anomaly.score").alias("avg_score")
).orderBy("exchange", F.desc("count")))

print("\n💡 Automated Triage Workflow:")
print("   1. Anomalies automatically quarantined with contributions")
print("   2. Route by top_contributor:")
print("      • 'last_update_lag_ms' → Data feed team")
print("      • 'bid_price' or 'spread' → Pricing team")
print("      • 'volume' → Compliance/surveillance team")
print("   3. Investigate using _info.anomaly.contributions map")
print("   4. False positives → Adjust threshold or retrain")


### YAML Configuration for Production

For automated workflows, define checks in YAML:

```yaml
run_configs:
  - name: market_data_quality_monitoring
    input_config:
      location: catalog.schema.market_data_feed
    
    # Traditional checks
    quality_checks:
      - function: is_not_null
        arguments:
          column: symbol
      - function: is_not_null
        arguments:
          column: exchange
      - function: is_not_null
        arguments:
          column: timestamp
      - function: is_in_range
        arguments:
          column: bid_price
          min_limit: 0
          max_limit: 10000
      - function: is_in_range
        arguments:
          column: spread
          min_limit: 0
          max_limit: 100
    
    # Anomaly detection
    anomaly_config:
      columns: [bid_price, ask_price, spread, volume, last_update_lag_ms, timestamp]
      segment_by: [exchange]
      model_name: market_data_by_exchange
      registry_table: catalog.schema.anomaly_model_registry
      params:
        isolation_forest:
          contamination: 0.05
          num_trees: 200
          subsampling_rate: 1024
          random_seed: 42
        sample_fraction: 1.0
      
      # Scoring options
      score_options:
        score_threshold: 0.5
        include_contributions: true
        drift_threshold: 3.0
        row_filter: "is_official_hours = true"
        merge_columns: [quote_id]
    
    # Quarantine configuration
    quarantine_config:
      enabled: true
      table: catalog.schema.market_data_quarantine
      condition: "_info.anomaly.score >= 0.5"
      
    # Output configuration
    output_config:
      location: catalog.schema.market_data_clean
      save_mode: overwrite
```

**Run with Databricks Asset Bundles** (the job names below are examples defined in your own bundle):
```bash
# Initial model training (one-time or scheduled monthly)
databricks bundle run market_data_anomaly_trainer

# Daily quality checks and scoring (scheduled)
databricks bundle run market_data_quality_checker

# Drift monitoring (scheduled weekly)
databricks bundle run market_data_drift_monitor
```

**Alternative: Databricks Workflows**
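```python
# In a Databricks job notebook; the "task" widget selects which step runs
from databricks.labs.dqx.anomaly import AnomalyEngine, has_no_anomalies
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQDatasetRule
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
table_name = "catalog.schema.market_data_feed"
registry_table = "catalog.schema.anomaly_model_registry"
task = dbutils.widgets.get("task")

# Task 1: Train (runs weekly)
if task == "train":
    AnomalyEngine(ws).train(
        df=spark.table(table_name),
        columns=["bid_price", "ask_price", "spread", "volume", "last_update_lag_ms"],
        segment_by=["exchange"],
        model_name="market_data_by_exchange",
        registry_table=registry_table,
    )

# Task 2: Score (runs hourly for real-time feeds)
elif task == "score":
    checks = [DQDatasetRule(check_func=has_no_anomalies, check_func_kwargs={
        "model": "market_data_by_exchange", "score_threshold": 0.5,
        "merge_columns": ["quote_id"], "registry_table": registry_table,
    })]
    df_scored = DQEngine(ws).apply_checks(spark.table(table_name), checks)
    df_scored.write.mode("append").saveAsTable("catalog.schema.scored_data")
```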


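---

### Section 6 (continued): Scoring with Confidence Intervals

Score with the `market_data_ensemble` model trained in Section 6. Setting `include_confidence=True` adds the standard deviation of the member scores to each row.
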
In [None]:
# Score with confidence intervals
checks_ensemble = [
    DQDatasetRule(
        check_func=has_no_anomalies,
        check_func_kwargs={
            "merge_columns": ["quote_id"],  # Simple primary key
            "model": "market_data_ensemble",
            "score_threshold": 0.5,
            "include_confidence": True,  # Add standard deviation of scores
            "registry_table": registry_table
        }
    )
]

df_ensemble_scored = dq_engine.apply_checks(df_market, checks_ensemble)

print("📊 Anomalies with Confidence Intervals:\n")
display(df_ensemble_scored.filter(F.col("_info.anomaly.score") >= 0.5).select(
    "symbol", "exchange", "bid_price", "volume",
    F.round("_info.anomaly.score", 3).alias("score"),
    F.round("_info.anomaly.confidence_std", 3).alias("std_dev")
).orderBy(F.desc("_info.anomaly.score")))

print("\n💡 Interpretation:")
print("   - Low std_dev → High-confidence anomaly (all models agree)")
print("   - High std_dev → Ambiguous case (models disagree, may need retraining)")

---

## 🎓 Summary

### What You Learned (Comprehensive 45-min Demo):

1. ✅ **Auto-Discovery vs Manual Tuning** - Zero-config start, then refine for financial data
2. ✅ **Parameter Tuning** - Higher num_trees and subsampling_rate for financial stability
3. ✅ **Temporal Patterns** - Datetime encoding captures trading hour patterns automatically
4. ✅ **Multi-Asset Monitoring** - Exchange-specific baselines (NASDAQ vs NYSE vs LSE)
5. ✅ **Performance Optimization** - row_filter and merge_columns for high-frequency data
6. ✅ **Feature Contributions** - SHAP-based triage (pricing vs lag vs volume)
7. ✅ **Ensemble Models** - Confidence intervals for ambiguous cases
8. ✅ **Drift Detection** - Detect market regime changes, automated retraining signals
9. ✅ **Production Integration** - DQEngine + YAML workflows + quarantine

### Key Takeaways:

- **Start simple**: Auto-discovery first, then tune for financial data requirements
- **Temporal encoding**: Hour-of-day patterns captured via cyclical sin/cos features
- **Exchange segmentation**: Critical for multi-venue data with different characteristics
- **Performance**: row_filter + merge_columns essential for high-frequency scoring (millions of ticks)
- **Ensembles**: Use for high-stakes financial data to quantify model uncertainty
- **Drift monitoring**: Market regimes change - set up automated retraining triggers
- **Triage**: Feature contributions enable automated routing (feed team vs pricing team vs compliance)
- **Quarantine workflow**: Automate investigation with root cause explanations

### Model Comparison:

| Approach | Use Case | Configuration |
|----------|----------|---------------|
| Auto-discovery | Quick start, exploration | Zero config, auto column selection |
| Manual tuned | Production, optimal performance | num_trees=200, subsampling_rate=1024 |
| Exchange-segmented | Multi-venue monitoring | Separate baselines per exchange |
| Ensemble | High-stakes decisions | ensemble_size=3, includes confidence intervals |

### Production Deployment Checklist:

- ✅ Train models per exchange with tuned hyperparameters
- ✅ Set up drift monitoring (drift_threshold=3.0, alert on warnings)
- ✅ Configure quarantine workflow with automated triage
- ✅ Use row_filter + merge_columns for high-frequency scoring
- ✅ Enable include_contributions for investigation
- ✅ Schedule retraining: weekly/monthly or on drift detection
- ✅ Integrate with alerting (PagerDuty, Slack) for critical anomalies

### Next Steps:

1. **Apply to your data**: `anomaly_engine.train(df=spark.table("your_market_data"))`
2. **Set up YAML workflows**: Copy configuration above, customize for your tables
3. **Configure quarantine**: Route anomalies to appropriate teams by contribution
4. **Monitor drift**: Set up dashboards for drift_score, automate retraining
5. **Optimize performance**: Use merge_columns with your primary keys
6. **Test ensemble**: Evaluate if confidence intervals help decision-making

### Resources:

- [DQX Anomaly Detection Documentation](https://databrickslabs.github.io/dqx/guide/anomaly_detection)
- [Performance Optimization Guide](https://databrickslabs.github.io/dqx/guide/anomaly_detection#performance-optimization)
- [API Reference](https://databrickslabs.github.io/dqx/reference/quality_checks#has_no_anomalies)
- [GitHub Repository](https://github.com/databrickslabs/dqx)

---

**Questions? Feedback?** Open an issue on GitHub or contact the DQX team!