# Fabric Notebook: Monitor ALL Mirrored Database Tables

#### Monitors all tables in ALL mirrored databases in the workspace and FAILS the notebook if any are unhealthy.

#### Prerequisites:
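- The notebook identity needs the `MirroredDatabase.Read.All` or `Item.Read.All` scope for the Fabric REST API calls, plus read access to the workspace's OneLake storage (each mirrored database's `Tables` folder is listed to discover its tables)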

## IMPORTS & CONSTANTS

In [None]:
import notebookutils
import requests
from datetime import datetime
from typing import Optional

# ------------------------------------------------------------------------------
# Table statuses that count as "replication is healthy".
# A table whose status is not listed here is reported as failed, so adjust the
# entries to match the status values documented for the Fabric API.
# ------------------------------------------------------------------------------
HEALTHY_STATUSES = [
    "Replicating",
    "Running",
    "Succeeded",
]

## CONFIGURATION

In [None]:
# ------------------------------------------------------------------------------
# Resolve the current workspace from the notebookutils runtime context
# (no Spark session required)
# ------------------------------------------------------------------------------
RUNTIME_CONTEXT = notebookutils.runtime.context

# Workspace ID: try the plain key first, then the "current"-prefixed key.
WORKSPACE_ID = RUNTIME_CONTEXT.get("workspaceId")
if not WORKSPACE_ID:
    WORKSPACE_ID = RUNTIME_CONTEXT.get("currentWorkspaceId")

# Workspace name: same lookup order, with a placeholder when neither is set.
WORKSPACE_NAME = RUNTIME_CONTEXT.get("workspaceName")
if not WORKSPACE_NAME:
    WORKSPACE_NAME = RUNTIME_CONTEXT.get("currentWorkspaceName")
if not WORKSPACE_NAME:
    WORKSPACE_NAME = "Unknown"

# Nothing below can work without a workspace ID, so stop here if it is missing.
if not WORKSPACE_ID:
    raise Exception("Could not determine workspace ID from notebookutils.runtime.context")

# Root of this workspace in OneLake; item folders are appended to it later.
ONELAKE_WORKSPACE_ROOT = f"abfss://{WORKSPACE_ID}@onelake.dfs.fabric.microsoft.com/"

print(f"Workspace ID:   {WORKSPACE_ID}")
print(f"Workspace Name: {WORKSPACE_NAME}")
print(f"OneLake Root:   {ONELAKE_WORKSPACE_ROOT}")

## HELPER FUNCTIONS

In [None]:
def get_fabric_token() -> str:
    """
    Fetch an access token for Fabric REST API calls.

    Delegates to notebookutils.credentials.getToken using the Fabric API
    endpoint as the audience and returns that call's result unchanged.
    """
    fabric_audience = "https://api.fabric.microsoft.com"
    access_token = notebookutils.credentials.getToken(fabric_audience)
    return access_token


def get_all_mirrored_databases(workspace_id: str) -> list[dict]:
    """
    Returns ALL MirroredDatabase items in the workspace.

    Calls the workspace "items" endpoint and keeps only the entries whose
    "type" is "MirroredDatabase". The listing is paginated: every page is
    fetched by following the "continuationUri" field of each response until
    it is absent or empty, so workspaces with many items are fully covered.

    Each item has: { "id": "...", "displayName": "...", "type": "MirroredDatabase", ... }

    Args:
        workspace_id: ID of the workspace whose items are listed.

    Returns:
        List of item dicts; empty list if none found.

    Raises:
        requests.HTTPError: if any page request returns an error status.
        requests.Timeout: if a page request does not complete in time.
    """
    headers = {"Authorization": f"Bearer {get_fabric_token()}"}
    next_url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items"

    mirrors = []
    while next_url:
        # Explicit timeout: requests waits forever by default, which would
        # hang the notebook (and any calling pipeline) on a stalled connection.
        response = requests.get(next_url, headers=headers, timeout=60)
        response.raise_for_status()
        payload = response.json()

        # Filter to only MirroredDatabase items ("value" may be missing or null)
        mirrors.extend(
            item for item in (payload.get("value") or [])
            if item.get("type") == "MirroredDatabase"
        )

        # Absent/null continuationUri means this was the last page.
        next_url = payload.get("continuationUri")

    return mirrors

In [None]:
def resolve_mirror_tables_path(
    onelake_workspace_root: str,
    mirrored_db_id: str,
    mirrored_db_name: str
) -> Optional[str]:
    """
    Attempts to resolve the Tables folder path for a mirrored database.

    Two candidate locations are probed, in order, by listing them with
    notebookutils.fs.ls:
      1. {root}{item id}/Tables
      2. {root}{display name}.MountedRelationalDatabase/Tables

    The probe is best-effort: a failing candidate never raises. If no
    candidate can be listed, the reason each one failed is printed so that a
    permissions problem can be told apart from a missing folder.

    Args:
        onelake_workspace_root: OneLake root of the workspace; candidates are
            appended directly to it, so it is expected to end with "/".
        mirrored_db_id: Item ID of the mirrored database.
        mirrored_db_name: Display name of the mirrored database.

    Returns:
        The first candidate path that could be listed, or None if neither
        candidate could be listed.
    """
    candidates = [
        f"{onelake_workspace_root}{mirrored_db_id}/Tables",
        f"{onelake_workspace_root}{mirrored_db_name}.MountedRelationalDatabase/Tables",
    ]

    probe_errors = []
    for path in candidates:
        try:
            notebookutils.fs.ls(path)
        except Exception as exc:
            # Deliberately broad: any failure just means "try the next one".
            probe_errors.append((path, exc))
        else:
            return path

    # Nothing resolved: surface why, instead of silently dropping the errors.
    for path, exc in probe_errors:
        # Keep only the first line, truncated, in case the message is long.
        detail = (str(exc).splitlines() or [""])[0][:200]
        print(f"  Tables path candidate not usable: {path} ({type(exc).__name__}: {detail})")

    return None


def discover_tables_in_mirror(tables_root_path: str) -> list[str]:
    """
    List every table found beneath a mirrored database's Tables folder.

    The folder is walked two levels deep, treating the layout as
    Tables/{schema}/{table_name}/. Only directory entries are considered at
    both levels.

    Returns 'schema.table' strings, in listing order.
    """
    list_dir = notebookutils.fs.ls

    # First level: every directory directly under Tables is a schema.
    schema_names = [entry.name for entry in list_dir(tables_root_path) if entry.isDir]

    # Second level: every directory under a schema is a table.
    discovered = []
    for schema_name in schema_names:
        schema_dir = f"{tables_root_path}/{schema_name}"
        discovered.extend(
            f"{schema_name}.{entry.name}"
            for entry in list_dir(schema_dir)
            if entry.isDir
        )

    return discovered

In [None]:
def get_tables_mirroring_status(workspace_id: str, mirrored_db_id: str) -> dict:
    """
    Call the Get Tables Mirroring Status API for a specific mirrored database.

    All result pages are fetched by following the "continuationUri" field of
    each response until it is absent or empty.

    Args:
        workspace_id: ID of the workspace that owns the mirrored database.
        mirrored_db_id: Item ID of the mirrored database.

    Returns:
        Dict keyed by 'schema.table' (lowercase) for easy lookup. Each value
        is a dict with keys "schema", "table", "status", "metrics" (always a
        dict, possibly empty) and "error_message" (str or None).

    Raises:
        requests.HTTPError: if any page request returns an error status.
        requests.Timeout: if a page request does not complete in time.
    """
    headers = {"Authorization": f"Bearer {get_fabric_token()}"}
    next_url = (
        f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
        f"/mirroredDatabases/{mirrored_db_id}/getTablesMirroringStatus"
    )

    # Build lookup dict keyed by schema.table (lowercase)
    result = {}
    while next_url:
        # Explicit timeout: requests waits forever by default.
        response = requests.post(next_url, headers=headers, timeout=60)
        response.raise_for_status()
        payload = response.json()

        for table in payload.get("data") or []:
            # "or" also covers explicit nulls, which dict.get defaults do not.
            schema = table.get("sourceSchemaName") or ""
            name = table.get("sourceTableName") or ""

            # Prefer the flat "errorMessage" field; otherwise fall back to
            # the "message" of a nested "error" object.
            # NOTE(review): confirm the error field shape against the API docs.
            error_message = table.get("errorMessage")
            error_detail = table.get("error")
            if not error_message and isinstance(error_detail, dict):
                error_message = error_detail.get("message")

            result[f"{schema}.{name}".lower()] = {
                "schema": schema,
                "table": name,
                "status": table.get("status") or "Unknown",
                "metrics": table.get("metrics") or {},
                "error_message": error_message,
            }

        # Absent/null continuationUri means this was the last page.
        next_url = payload.get("continuationUri")

    return result

In [None]:
def check_tables(
    tables_to_monitor: list[str],
    status_lookup: dict,
    healthy_statuses: list[str]
) -> tuple[list[dict], list[dict]]:
    """
    Check each table in the monitor list against the API response.

    Args:
        tables_to_monitor: 'schema.table' names to check. Matching is done on
            the lowercased, stripped name; the original spelling is reported.
        status_lookup: Dict keyed by lowercase 'schema.table' whose values
            carry "status", "metrics" and "error_message".
        healthy_statuses: Status values that count as healthy (exact match).

    Returns: (healthy_tables, failed_tables)
        - healthy_tables: list of dicts with table, status, last_sync and
          processed_rows
        - failed_tables: list of dicts with table, status, reason and
          last_sync (None when the table was not found at all)
    """
    healthy = []
    failed = []

    for table_spec in tables_to_monitor:
        key = table_spec.lower().strip()
        info = status_lookup.get(key)

        # Table not found in mirroring response
        if info is None:
            failed.append({
                "table": table_spec,
                "status": "NOT_FOUND",
                "reason": "Table not found in mirroring status - may be removed or mirroring stopped",
                "last_sync": None
            })
            continue

        status = info.get("status", "Unknown")
        # A missing or null "metrics" entry must not crash the whole check.
        metrics = info.get("metrics") or {}
        last_sync = metrics.get("lastSyncDateTime", "N/A")

        if status in healthy_statuses:
            healthy.append({
                "table": table_spec,
                "status": status,
                "last_sync": last_sync,
                "processed_rows": metrics.get("processedRows", 0)
            })
        else:
            failed.append({
                "table": table_spec,
                "status": status,
                # Prefer the reported error; otherwise explain the mismatch.
                "reason": info.get("error_message") or f"Status '{status}' not in healthy list",
                "last_sync": last_sync
            })

    return healthy, failed

In [None]:
def print_db_summary(db_name: str, healthy: list[dict], failed: list[dict]) -> None:
    """
    Write the per-database health report to stdout.

    The report has a header with the database name, a HEALTHY section (one
    line per table with status and last sync) and a FAILED section (table and
    status, the failure reason, and the last sync when it is truthy). An
    empty section shows "(none)".
    """
    divider = "─" * 60

    # Header
    print(f"\n{divider}")
    print(f"Mirrored DB: {db_name}")
    print(divider)

    # Healthy section
    print(f"\n  ✅ HEALTHY ({len(healthy)}):")
    if not healthy:
        print("     (none)")
    for row in healthy:
        print(f"     {row['table']:<40} {row['status']:<15} Last sync: {row['last_sync']}")

    # Failed section
    print(f"\n  ❌ FAILED ({len(failed)}):")
    if not failed:
        print("     (none)")
    for row in failed:
        print(f"     {row['table']:<40} {row['status']:<15}")
        print(f"        Reason: {row['reason']}")
        if row['last_sync']:
            print(f"        Last sync: {row['last_sync']}")


def print_final_summary(
    total_dbs: int,
    total_healthy: int,
    total_failed: int,
    all_failures: list[dict]
) -> None:
    """
    Print the final aggregated summary across all mirrored databases.

    Args:
        total_dbs: Number of mirrored databases checked.
        total_healthy: Number of healthy tables across all databases.
        total_failed: Number of failed tables across all databases.
        all_failures: Accumulated failure records. Not used by this function;
            kept so that existing callers keep working.
    """
    # Local import: the file's import cell only brings in `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # timestamp formats to exactly the same text.
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    rule = "=" * 60

    print("\n" + rule)
    print(f"FINAL SUMMARY - {timestamp} UTC")
    print(rule)
    print(f"Mirrored Databases Checked: {total_dbs}")
    print(f"Total Healthy Tables:       {total_healthy}")
    print(f"Total Failed Tables:        {total_failed}")
    print(rule)

## MAIN EXECUTION

In [None]:
# ------------------------------------------------------------------------------
# Step 1: List the mirrored databases that exist in this workspace
# ------------------------------------------------------------------------------
mirrored_databases = get_all_mirrored_databases(WORKSPACE_ID)

if mirrored_databases:
    # Show what will be monitored: display name plus item ID for each one.
    print(f"Found {len(mirrored_databases)} mirrored database(s) in workspace '{WORKSPACE_NAME}':")
    for db in mirrored_databases:
        print(f"  - {db['displayName']} ({db['id']})")
else:
    print("⚠️  No mirrored databases found in this workspace. Nothing to monitor.")

In [None]:
# ------------------------------------------------------------------------------
# Step 2: Walk every mirrored database and evaluate the health of its tables
# ------------------------------------------------------------------------------
# Failure records from all DBs: {"db_name": ..., "db_id": ..., "table": ..., ...}
all_failures = []
total_healthy_count = total_failed_count = dbs_processed = 0

section_rule = "=" * 60

for mirror_item in mirrored_databases:
    db_id, db_name = mirror_item["id"], mirror_item["displayName"]

    print(f"\n{section_rule}")
    print(f"Processing: {db_name}")
    print(section_rule)

    # 2a: Locate the Tables folder in OneLake
    tables_root_path = resolve_mirror_tables_path(ONELAKE_WORKSPACE_ROOT, db_id, db_name)

    if tables_root_path is None:
        print(f"  ⚠️  Could not resolve Tables path for '{db_name}'. Skipping.")
        # An unresolvable path is recorded as one failure for the whole database.
        total_failed_count += 1
        all_failures.append({
            "db_name": db_name,
            "db_id": db_id,
            "table": "(entire database)",
            "status": "PATH_NOT_FOUND",
            "reason": "Could not resolve Tables folder path in OneLake"
        })
        continue

    print(f"  Tables path: {tables_root_path}")

    # 2b: Enumerate the tables stored under that folder
    tables_to_monitor = discover_tables_in_mirror(tables_root_path)

    if not tables_to_monitor:
        # Nothing to check; this database is neither counted nor failed.
        print(f"  ⚠️  No tables discovered under '{db_name}'. Skipping.")
        continue

    print(f"  Discovered {len(tables_to_monitor)} tables.")

    # 2c: Fetch the per-table mirroring status from the API
    status_lookup = get_tables_mirroring_status(WORKSPACE_ID, db_id)
    print(f"  API returned status for {len(status_lookup)} tables.")

    # 2d: Split the discovered tables into healthy and failed
    healthy_tables, failed_tables = check_tables(
        tables_to_monitor,
        status_lookup,
        HEALTHY_STATUSES
    )

    # 2e: Report this database
    print_db_summary(db_name, healthy_tables, failed_tables)

    # 2f: Fold this database's results into the running totals
    dbs_processed += 1
    total_healthy_count += len(healthy_tables)
    total_failed_count += len(failed_tables)

    # Each failure is tagged with its database so the final error can name it.
    all_failures.extend(
        {"db_name": db_name, "db_id": db_id, **failure}
        for failure in failed_tables
    )

In [None]:
# ------------------------------------------------------------------------------
# Step 3: Report the totals, then fail the notebook when anything is unhealthy
# ------------------------------------------------------------------------------
print_final_summary(dbs_processed, total_healthy_count, total_failed_count, all_failures)

if not all_failures:
    print("\n✅ All monitored tables across all mirrored databases are healthy - notebook completed successfully.")
else:
    # Header lines first, then one line per recorded failure.
    error_lines = [
        "Mirrored table replication failure detected!",
        "",
        f"Failed tables ({len(all_failures)}):",
    ]
    error_lines += [
        f"  - [{failure['db_name']}] {failure['table']}: {failure['status']} - {failure.get('reason', 'Unknown')}"
        for failure in all_failures
    ]

    error_msg = "\n".join(error_lines)

    # An unhandled exception fails the notebook (and any calling pipeline).
    raise Exception(error_msg)