# Lakehouse Monitoring Setup

This notebook automates the setup of Databricks Lakehouse Monitoring for data quality profiling.

**Features:**
- Create monitors for Delta tables using the Databricks SDK
- Configure TimeSeries, Snapshot, and InferenceLog monitors
- Set up drift detection and baseline comparisons
- Schedule monitoring jobs
- Access metrics and dashboards

**Requirements:**
- Unity Catalog enabled
- Lakehouse Monitoring available
- Delta tables to monitor
- Databricks SDK for Python

## Configuration

In [None]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
    MonitorTimeSeries,
    MonitorSnapshot,
    MonitorInferenceLog,
    MonitorInferenceLogProblemType
)
from pyspark.sql import functions as F
import re

# Databricks SDK client used by every helper in this notebook
w = WorkspaceClient()

# Identity of the user running the notebook (used for the assets directory)
user_email = spark.sql("SELECT current_user()").collect()[0][0]

# Notebook widgets, created in the order they should be defined
dbutils.widgets.text("catalog", "nonprod_natapcd", "Catalog Name")
dbutils.widgets.text("schema", "observability", "Schema Name")
dbutils.widgets.text("table_to_monitor", "", "Table to Monitor (catalog.schema.table)")
dbutils.widgets.dropdown("monitor_type", "TimeSeries", ["TimeSeries", "Snapshot", "InferenceLog"], "Monitor Type")
dbutils.widgets.text("timestamp_col", "created_at", "Timestamp Column")
dbutils.widgets.text("granularity", "1 day", "Granularity")
dbutils.widgets.dropdown("refresh_if_exists", "false", ["true", "false"], "Refresh if Monitor Exists")

# Read the widget values back in one pass; "refresh_if_exists" is read later,
# at the point where it is needed
catalog, schema, table_to_monitor, monitor_type, timestamp_col, granularity = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "catalog",
        "schema",
        "table_to_monitor",
        "monitor_type",
        "timestamp_col",
        "granularity",
    )
)

# Echo the effective configuration, one setting per line
print(
    "Lakehouse Monitoring Configuration:",
    f"  Catalog: {catalog}",
    f"  Schema: {schema}",
    f"  Table to Monitor: {table_to_monitor}",
    f"  Monitor Type: {monitor_type}",
    f"  Timestamp Column: {timestamp_col}",
    f"  Granularity: {granularity}",
    f"  User: {user_email}",
    sep="\n",
)

## Helper Functions

In [None]:
def get_assets_dir(table_name: str) -> str:
    """
    Build the Workspace folder path that holds a monitor's assets

    Args:
        table_name: Full table name (catalog.schema.table)

    Returns:
        Path of the form
        /Workspace/Users/<user_email>/databricks_lakehouse_monitoring/<table>,
        where <table> is the table name with unsafe characters replaced
    """
    # Every character that is not a letter, digit, underscore or dot becomes "_"
    unsafe_chars = re.compile(r'[^a-zA-Z0-9_.]')
    path_component = unsafe_chars.sub('_', table_name)

    return f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{path_component}"

def create_timeseries_monitor(
    table_name: str,
    timestamp_col: str,
    granularities: list = None,
    output_schema: str = None,
    baseline_table: str = None,
    slicing_exprs: list = None,
    custom_metrics: list = None
):
    """
    Create a time series monitor for a Delta table

    Args:
        table_name: Full table name (catalog.schema.table)
        timestamp_col: Column to use for time series analysis
        granularities: List of time granularities (e.g., ["1 hour", "1 day"]);
            defaults to ["1 day"] when omitted
        output_schema: Schema to store monitoring metrics, as catalog.schema
            (defaults to the monitored table's own catalog.schema)
        baseline_table: Optional baseline table for drift detection
        slicing_exprs: Optional list of SQL expressions for data slicing
        custom_metrics: Optional list of custom metric definitions

    Returns:
        The object returned by w.quality_monitors.create, or None when
        anything fails (the error is printed rather than raised)
    """

    print(f"Creating time series monitor for {table_name}...")

    try:
        # Build the default list per call instead of sharing one list object
        # through a mutable default argument
        if granularities is None:
            granularities = ["1 day"]

        # Honour the documented default: drop the table part of
        # catalog.schema.table to obtain catalog.schema
        if output_schema is None:
            output_schema = table_name.rsplit(".", 1)[0]

        assets_dir = get_assets_dir(table_name)

        monitor_info = w.quality_monitors.create(
            table_name=table_name,
            assets_dir=assets_dir,
            output_schema_name=output_schema,
            time_series=MonitorTimeSeries(
                timestamp_col=timestamp_col,
                granularities=granularities
            ),
            baseline_table_name=baseline_table,
            slicing_exprs=slicing_exprs,
            custom_metrics=custom_metrics
        )

        print(f"✅ Monitor created successfully!")
        # monitor_version is a configuration version, not an identifier
        print(f"   Monitor Version: {monitor_info.monitor_version}")
        print(f"   Profile Metrics Table: {monitor_info.profile_metrics_table_name}")
        print(f"   Drift Metrics Table: {monitor_info.drift_metrics_table_name}")
        print(f"   Dashboard ID: {monitor_info.dashboard_id}")
        print(f"   Assets Directory: {assets_dir}")

        return monitor_info

    except Exception as e:
        # Best effort by design: callers test the return value for None
        print(f"❌ Error creating monitor: {e}")
        return None

def create_snapshot_monitor(
    table_name: str,
    output_schema: str = None,
    baseline_table: str = None,
    slicing_exprs: list = None,
    custom_metrics: list = None
):
    """
    Create a snapshot monitor for a Delta table

    Args:
        table_name: Full table name (catalog.schema.table)
        output_schema: Schema to store monitoring metrics, as catalog.schema
            (defaults to the monitored table's own catalog.schema)
        baseline_table: Optional baseline table for drift detection
        slicing_exprs: Optional list of SQL expressions for data slicing
        custom_metrics: Optional list of custom metric definitions

    Returns:
        The object returned by w.quality_monitors.create, or None when
        anything fails (the error is printed rather than raised)
    """

    print(f"Creating snapshot monitor for {table_name}...")

    try:
        # Without an explicit output schema, drop the table part of
        # catalog.schema.table and write metrics next to the monitored table
        if output_schema is None:
            output_schema = table_name.rsplit(".", 1)[0]

        assets_dir = get_assets_dir(table_name)

        monitor_info = w.quality_monitors.create(
            table_name=table_name,
            assets_dir=assets_dir,
            output_schema_name=output_schema,
            snapshot=MonitorSnapshot(),
            baseline_table_name=baseline_table,
            slicing_exprs=slicing_exprs,
            custom_metrics=custom_metrics
        )

        print(f"✅ Monitor created successfully!")
        # monitor_version is a configuration version, not an identifier
        print(f"   Monitor Version: {monitor_info.monitor_version}")
        print(f"   Profile Metrics Table: {monitor_info.profile_metrics_table_name}")
        print(f"   Drift Metrics Table: {monitor_info.drift_metrics_table_name}")
        print(f"   Dashboard ID: {monitor_info.dashboard_id}")
        print(f"   Assets Directory: {assets_dir}")

        return monitor_info

    except Exception as e:
        # Best effort by design: callers test the return value for None
        print(f"❌ Error creating monitor: {e}")
        return None

def create_inference_monitor(
    table_name: str,
    timestamp_col: str,
    model_id_col: str,
    prediction_col: str,
    problem_type: str,
    label_col: str = None,
    granularities: list = None,
    output_schema: str = None,
    custom_metrics: list = None
):
    """
    Create an inference log monitor for ML model predictions

    Args:
        table_name: Full table name (catalog.schema.table)
        timestamp_col: Column with prediction timestamp
        model_id_col: Column with model identifier
        prediction_col: Column with model predictions
        problem_type: ML problem type ('classification' or 'regression',
            case-insensitive)
        label_col: Optional column with ground truth labels
        granularities: List of time granularities; defaults to ["1 day"]
            when omitted
        output_schema: Schema to store monitoring metrics, as catalog.schema
            (defaults to the monitored table's own catalog.schema)
        custom_metrics: Optional list of custom metric definitions

    Returns:
        The object returned by w.quality_monitors.create, or None when
        anything fails, including an invalid problem_type (the error is
        printed rather than raised)
    """

    print(f"Creating inference log monitor for {table_name}...")

    try:
        # Convert problem_type string to enum
        if problem_type.lower() == 'classification':
            problem_type_enum = MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION
        elif problem_type.lower() == 'regression':
            problem_type_enum = MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION
        else:
            # Caught by the handler below, so the caller sees a printed
            # message and a None return rather than an exception
            raise ValueError(f"Invalid problem_type: {problem_type}. Must be 'classification' or 'regression'")

        # Build the default list per call instead of sharing one list object
        # through a mutable default argument
        if granularities is None:
            granularities = ["1 day"]

        # Without an explicit output schema, drop the table part of
        # catalog.schema.table and write metrics next to the monitored table
        if output_schema is None:
            output_schema = table_name.rsplit(".", 1)[0]

        assets_dir = get_assets_dir(table_name)

        monitor_info = w.quality_monitors.create(
            table_name=table_name,
            assets_dir=assets_dir,
            output_schema_name=output_schema,
            inference_log=MonitorInferenceLog(
                problem_type=problem_type_enum,
                prediction_col=prediction_col,
                timestamp_col=timestamp_col,
                granularities=granularities,
                model_id_col=model_id_col,
                label_col=label_col
            ),
            custom_metrics=custom_metrics
        )

        print(f"✅ Monitor created successfully!")
        # monitor_version is a configuration version, not an identifier
        print(f"   Monitor Version: {monitor_info.monitor_version}")
        print(f"   Profile Metrics Table: {monitor_info.profile_metrics_table_name}")
        print(f"   Drift Metrics Table: {monitor_info.drift_metrics_table_name}")
        print(f"   Dashboard ID: {monitor_info.dashboard_id}")
        print(f"   Assets Directory: {assets_dir}")

        return monitor_info

    except Exception as e:
        # Best effort by design: callers test the return value for None
        print(f"❌ Error creating monitor: {e}")
        return None

def get_monitor_info(table_name: str):
    """
    Fetch the monitor attached to a table

    Returns whatever w.quality_monitors.get returns. Any exception (a missing
    monitor as well as any other failure) is printed as a warning and turned
    into a None return.
    """
    try:
        return w.quality_monitors.get(table_name=table_name)
    except Exception as err:
        print(f"⚠️  Monitor not found or error: {err}")
    return None

def refresh_monitor(table_name: str):
    """
    Start a manual refresh of a table's monitor metrics

    Returns the object produced by w.quality_monitors.run_refresh, or None if
    an exception occurred (the error is printed).
    """
    try:
        started = w.quality_monitors.run_refresh(table_name=table_name)
        print(f"✅ Monitor refresh started for {table_name}")
        print(f"   Refresh ID: {started.refresh_id}")
    except Exception as err:
        print(f"❌ Error refreshing monitor: {err}")
        return None
    return started

def update_monitor(
    table_name: str,
    output_schema: str = None,
    baseline_table: str = None,
    slicing_exprs: list = None,
    custom_metrics: list = None
):
    """
    Send new settings for an existing monitor to w.quality_monitors.update

    Returns the object produced by the update call, or None if an exception
    occurred (the error is printed).
    """
    # Map this function's parameters to the SDK's keyword names
    new_settings = dict(
        output_schema_name=output_schema,
        baseline_table_name=baseline_table,
        slicing_exprs=slicing_exprs,
        custom_metrics=custom_metrics,
    )

    try:
        updated = w.quality_monitors.update(table_name=table_name, **new_settings)
        print(f"✅ Monitor updated for {table_name}")
        return updated
    except Exception as err:
        print(f"❌ Error updating monitor: {err}")
        return None

def delete_monitor(table_name: str):
    """
    Remove the monitor attached to a table

    Prints a confirmation on success and the error on failure; returns
    nothing in either case.
    """
    try:
        w.quality_monitors.delete(table_name=table_name)
        print(f"✅ Monitor deleted for {table_name}")
    except Exception as err:
        print(f"❌ Error deleting monitor: {err}")

# Visible confirmation that this cell ran and the helpers above now exist
print("✅ Helper functions defined")

## Create Monitor for Specified Table

In [None]:
if not table_to_monitor:
    print("ℹ️  No table specified. Use the 'table_to_monitor' widget to specify a table.")
else:
    # Look for a monitor that is already attached to the table
    existing_monitor = get_monitor_info(table_to_monitor)

    if not existing_monitor:
        # No monitor yet: create one of the requested type
        output_schema = f"{catalog}.{schema}"

        if monitor_type == "InferenceLog":
            # The widgets do not collect the extra InferenceLog settings,
            # so only print guidance here
            print(
                "⚠️  InferenceLog monitor requires additional configuration:",
                "     - model_id_col: Column with model identifier",
                "     - prediction_col: Column with predictions",
                "     - problem_type: 'classification' or 'regression'",
                "     - label_col (optional): Column with ground truth labels",
                "\n     Please use the create_inference_monitor() function directly",
                sep="\n",
            )
        elif monitor_type == "Snapshot":
            monitor_info = create_snapshot_monitor(
                table_name=table_to_monitor,
                output_schema=output_schema
            )
        elif monitor_type == "TimeSeries":
            monitor_info = create_timeseries_monitor(
                table_name=table_to_monitor,
                timestamp_col=timestamp_col,
                granularities=[granularity],
                output_schema=output_schema
            )
        else:
            print(f"⚠️  Unknown monitor type: {monitor_type}")
    else:
        # Report the monitor that is already in place
        print(f"ℹ️  Monitor already exists for {table_to_monitor}")
        print(f"   Status: {existing_monitor.status}")
        print(f"   Monitor Version: {existing_monitor.monitor_version}")
        print(f"   Profile Metrics: {existing_monitor.profile_metrics_table_name}")
        print(f"   Dashboard ID: {existing_monitor.dashboard_id}")

        # Refresh only when the widget asks for it
        refresh = dbutils.widgets.get("refresh_if_exists")
        if refresh and refresh.lower() == "true":
            refresh_monitor(table_to_monitor)

## Bulk Monitor Setup

In [None]:
def setup_monitors_for_schema(
    catalog_name: str,
    schema_name: str,
    timestamp_col_map: dict = None,
    exclude_tables: list = None,
    output_schema: str = None
):
    """
    Set up monitors for all tables in a schema

    Tables listed in timestamp_col_map get a time series monitor (daily
    granularity); all other tables get a snapshot monitor. Tables that
    already have a monitor are left untouched.

    Skipped without an entry in the result list:
      - tables named in exclude_tables
      - temporary views (SHOW TABLES reports them with isTemporary = true,
        but they are not part of the schema)
      - tables whose name ends in _profile_metrics or _drift_metrics, the
        suffixes Lakehouse Monitoring uses for its own output tables, so a
        re-run over the output schema does not monitor the monitors

    Args:
        catalog_name: Catalog name
        schema_name: Schema name
        timestamp_col_map: Dict mapping table names to timestamp columns
        exclude_tables: List of table names to exclude
        output_schema: Schema for monitor output (defaults to catalog.observability)

    Returns:
        List of {"table": <name>, "status": <status>} dicts, where status is
        one of "exists", "created_timeseries", "created_snapshot", "failed"
    """

    # Get all tables in schema
    tables = spark.sql(f"""
        SHOW TABLES IN {catalog_name}.{schema_name}
    """).collect()

    excluded = set(exclude_tables or [])
    timestamp_col_map = timestamp_col_map or {}
    monitor_output_suffixes = ("_profile_metrics", "_drift_metrics")
    results = []

    # Default output schema
    if not output_schema:
        output_schema = f"{catalog_name}.observability"

    for table in tables:
        table_name = table.tableName

        if table.isTemporary:
            print(f"⏭️  Skipping {table_name} (temporary view)")
            continue

        if table_name in excluded:
            print(f"⏭️  Skipping {table_name} (excluded)")
            continue

        if table_name.endswith(monitor_output_suffixes):
            print(f"⏭️  Skipping {table_name} (monitor output table)")
            continue

        full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

        # Check if already monitored
        if get_monitor_info(full_table_name):
            print(f"ℹ️  {table_name} already has a monitor")
            results.append({"table": table_name, "status": "exists"})
            continue

        # A configured timestamp column selects a time series monitor;
        # otherwise fall back to a snapshot monitor
        timestamp_col = timestamp_col_map.get(table_name)

        if timestamp_col:
            monitor_info = create_timeseries_monitor(
                table_name=full_table_name,
                timestamp_col=timestamp_col,
                granularities=["1 day"],
                output_schema=output_schema
            )
            created_status = "created_timeseries"
        else:
            monitor_info = create_snapshot_monitor(
                table_name=full_table_name,
                output_schema=output_schema
            )
            created_status = "created_snapshot"

        # The create_* helpers return None on failure
        results.append({
            "table": table_name,
            "status": created_status if monitor_info else "failed"
        })

    return results

# Example: Set up monitors for all tables in a schema
# The example is held in a triple-quoted string so that it does not execute.
# To run it, remove the opening and closing triple quotes and customize the
# table names, schema and timestamp columns as needed.
"""
timestamp_columns = {
    "customers": "created_at",
    "orders": "order_date",
    "transactions": "transaction_timestamp"
}

results = setup_monitors_for_schema(
    catalog_name=catalog,
    schema_name="bronze",
    timestamp_col_map=timestamp_columns,
    exclude_tables=["temp_table", "staging_table"],
    output_schema=f"{catalog}.observability"
)

print(f"\n✅ Setup complete for {len(results)} tables")
for result in results:
    print(f"   {result['table']}: {result['status']}")
"""

## Query Monitoring Metrics

In [None]:
def get_profile_metrics(table_name: str, limit: int = 100):
    """
    Read the newest rows of a monitored table's profile metrics

    Args:
        table_name: Full name of the monitored table
        limit: Maximum number of rows to return

    Returns:
        DataFrame ordered by window start (newest first), or None when the
        table has no monitor
    """
    info = get_monitor_info(table_name)

    if info:
        source_table = info.profile_metrics_table_name
        return spark.sql(f"""
        SELECT *
        FROM {source_table}
        ORDER BY window.start DESC
        LIMIT {limit}
    """)

    print(f"⚠️  No monitor found for {table_name}")
    return None

def get_drift_metrics(table_name: str, limit: int = 100):
    """
    Get drift metrics for a monitored table

    Args:
        table_name: Full name of the monitored table
        limit: Maximum number of rows to return

    Returns:
        DataFrame ordered by window start (newest first), or None when the
        table has no monitor or the monitor reports no drift metrics table
    """

    monitor_info = get_monitor_info(table_name)

    if not monitor_info:
        print(f"⚠️  No monitor found for {table_name}")
        return None

    drift_table = monitor_info.drift_metrics_table_name

    # Without this guard the query below would read "FROM None"
    if not drift_table:
        print(f"⚠️  No drift metrics table available for {table_name}")
        return None

    drift_df = spark.sql(f"""
        SELECT *
        FROM {drift_table}
        ORDER BY window.start DESC
        LIMIT {limit}
    """)

    return drift_df

# Example: Query metrics for the monitored table
if table_to_monitor:
    existing_monitor = get_monitor_info(table_to_monitor)

    if existing_monitor:
        print(f"\n📊 Profile Metrics for {table_to_monitor}:")
        profile_metrics = get_profile_metrics(table_to_monitor)

        # The getters signal "nothing to show" with None, so test for that
        # explicitly instead of relying on the truth value of a DataFrame
        if profile_metrics is not None:
            display(profile_metrics)

        print(f"\n📊 Drift Metrics for {table_to_monitor}:")
        drift_metrics = get_drift_metrics(table_to_monitor)

        if drift_metrics is not None:
            display(drift_metrics)

## Monitoring Analysis Queries

### Column Statistics Over Time

In [None]:
if table_to_monitor:
    monitor_info = get_monitor_info(table_to_monitor)

    if monitor_info and monitor_info.profile_metrics_table_name:
        # The profile metrics table names these statistics num_nulls,
        # percent_null and distinct_count; they are aliased so the displayed
        # result keeps the null_count / null_percentage / num_distinct headings.
        spark.sql(f"""
        SELECT
          window.start as time_window,
          column_name,
          num_nulls as null_count,
          percent_null as null_percentage,
          distinct_count as num_distinct,
          min,
          max,
          avg,
          stddev
        FROM {monitor_info.profile_metrics_table_name}
        ORDER BY window.start DESC, column_name
        """).display()

### Drift Detection Results

In [None]:
if table_to_monitor:
    monitor_info = get_monitor_info(table_to_monitor)

    if monitor_info and monitor_info.drift_metrics_table_name:
        # The drift metrics table has no drift_score / threshold columns; it
        # exposes statistical tests (ks_test, chi_squared_test) as
        # struct<statistic, pvalue> plus distance metrics. A row is flagged
        # as drift when either test's p-value falls below this level.
        # NOTE(review): 0.05 is a conventional default - tune per use case.
        drift_pvalue_threshold = 0.05

        spark.sql(f"""
        SELECT
          window.start as time_window,
          column_name,
          drift_type,
          ks_test.pvalue as ks_pvalue,
          chi_squared_test.pvalue as chi_squared_pvalue,
          js_distance,
          wasserstein_distance,
          population_stability_index,
          CASE
            WHEN ks_test.pvalue < {drift_pvalue_threshold}
              OR chi_squared_test.pvalue < {drift_pvalue_threshold} THEN 'DRIFT_DETECTED'
            ELSE 'NO_DRIFT'
          END as drift_status
        FROM {monitor_info.drift_metrics_table_name}
        WHERE ks_test.pvalue IS NOT NULL OR chi_squared_test.pvalue IS NOT NULL
        ORDER BY LEAST(ks_test.pvalue, chi_squared_test.pvalue) ASC, window.start DESC
        """).display()

## Schedule Monitor Refreshes

### Create Monitoring Job

To schedule regular monitor refreshes, create a Databricks job that:
1. Runs this notebook or calls `w.quality_monitors.run_refresh()` for each monitored table
2. Schedules based on data freshness requirements (hourly, daily, etc.)
3. Sends notifications on failures

Example using Databricks SDK:

```python
from databricks.sdk import WorkspaceClient
# NOTE(review): JobSettings is imported but not used in this example
from databricks.sdk.service.jobs import (
    JobSettings,
    Task,
    NotebookTask,
    CronSchedule,
    JobEmailNotifications
)

w = WorkspaceClient()

# One-task job that re-runs the monitoring notebook on a schedule
job = w.jobs.create(
    name="Lakehouse Monitoring Refresh",
    tasks=[
        Task(
            task_key="refresh_monitors",
            notebook_task=NotebookTask(
                # Placeholder: replace with the Workspace path of this notebook
                notebook_path="/path/to/this/notebook",
                # Keys match the widget names defined in the Configuration cell;
                # "refresh_if_exists" makes the run refresh an existing monitor
                base_parameters={
                    "catalog": "main",
                    "schema": "observability",
                    "table_to_monitor": "main.bronze.customers",
                    "refresh_if_exists": "true"
                }
            ),
            # Placeholder: replace with a real cluster ID
            existing_cluster_id="xxx-xxxxxx-xxxxxxx"
        )
    ],
    schedule=CronSchedule(
        # Quartz fields: second minute hour day-of-month month day-of-week.
        # "0 0 * * * ?" fires at the top of every hour.
        quartz_cron_expression="0 0 * * * ?",
        timezone_id="America/Los_Angeles"
    ),
    email_notifications=JobEmailNotifications(
        on_failure=["data-team@company.com"]
    )
)

print(f"Created job: {job.job_id}")
```

## List All Monitors

In [None]:
def list_all_monitors(catalog_name: str = None, schema_name: str = None):
    """
    List all monitors in the workspace or specific catalog/schema

    The quality monitors service of the SDK is addressed per table (create,
    get, update, delete, refresh); its public reference shows no call that
    enumerates monitors. This function therefore walks the Unity Catalog
    tables in scope and looks up each table's monitor. That is one API
    request per table, so narrow the scope on large catalogs.

    Args:
        catalog_name: Restrict the scan to this catalog (all catalogs when omitted)
        schema_name: Restrict the scan to this schema name (all schemas
            except information_schema when omitted); also honoured when
            catalog_name is omitted

    Returns:
        DataFrame with string columns table_name, status, monitor_version
        and dashboard_id, or None when no monitor was found or the scan failed
    """
    # Local import so the function carries its own dependency
    from databricks.sdk.errors import DatabricksError, NotFound

    try:
        if catalog_name:
            catalog_names = [catalog_name]
        else:
            catalog_names = [c.name for c in w.catalogs.list()]

        monitor_rows = []
        for cat in catalog_names:
            try:
                if schema_name:
                    schema_names = [schema_name]
                else:
                    schema_names = [
                        s.name
                        for s in w.schemas.list(catalog_name=cat)
                        if s.name != "information_schema"
                    ]
            except DatabricksError as e:
                print(f"⚠️  Could not list schemas in {cat}: {e}")
                continue

            for sch in schema_names:
                try:
                    tables = list(w.tables.list(catalog_name=cat, schema_name=sch))
                except NotFound:
                    # The requested schema does not exist in this catalog
                    continue
                except DatabricksError as e:
                    print(f"⚠️  Could not list tables in {cat}.{sch}: {e}")
                    continue

                for table in tables:
                    try:
                        monitor = w.quality_monitors.get(table_name=table.full_name)
                    except NotFound:
                        # Table has no monitor
                        continue
                    except DatabricksError as e:
                        print(f"⚠️  Could not read monitor for {table.full_name}: {e}")
                        continue

                    # Flatten to plain strings: the status is an enum in the
                    # SDK, and an explicit string schema avoids type-inference
                    # failures on all-None columns
                    status = getattr(monitor.status, "value", monitor.status)
                    version = monitor.monitor_version
                    monitor_rows.append((
                        monitor.table_name,
                        None if status is None else str(status),
                        None if version is None else str(version),
                        monitor.dashboard_id,
                    ))

        if not monitor_rows:
            return None

        return spark.createDataFrame(
            monitor_rows,
            schema="table_name string, status string, monitor_version string, dashboard_id string"
        )
    except Exception as e:
        print(f"⚠️  Error listing monitors: {e}")
        return None

# List all monitors
all_monitors = list_all_monitors(catalog)

# list_all_monitors returns None when it has nothing to show; test for that
# explicitly instead of relying on the truth value of a DataFrame
if all_monitors is not None:
    print(f"📊 Active Monitors:")
    display(all_monitors)
else:
    print(f"ℹ️  No monitors found in catalog '{catalog}'")

## Example: Create Inference Log Monitor

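The example below is wrapped in a triple-quoted string so that it does not run. Remove the surrounding triple quotes and customize it for ML inference monitoring: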

In [None]:
# The example is held in a triple-quoted string so that it does not execute.
# To run it, remove the opening and closing triple quotes.
"""
# Example: Monitor ML model predictions
inference_monitor = create_inference_monitor(
    table_name="main.ml_models.customer_churn_predictions",
    timestamp_col="prediction_timestamp",
    model_id_col="model_version",
    prediction_col="churn_probability",
    problem_type="classification",
    label_col="actual_churn",  # Optional: for accuracy tracking
    granularities=["30 minutes", "1 day"],
    output_schema="main.observability"
)
"""