# Umgang mit Mirrored Databases / gespiegelte Datenbanken

## 1 | Allgemeine Informationen über Mirrored Databases

- https://www.element61.be/en/resource/fabric-mirroring-replacing-e-etl
- https://learn.microsoft.com/en-us/fabric/mirroring/sql-server
- https://data-mozart.com/how-to-bring-sql-server-data-into-microsoft-fabric/



# 2 | Details zu einer gespiegelten Datenbank ermitteln

In [26]:
# ---------------------------------------------------------
# Mirrored Database Overview – alles in EINER Funktion
# (nur REST, ohne sempy_labs; Helfer als Unterfunktionen)
# ---------------------------------------------------------
from typing import Tuple, Optional, Dict, Any, List
import base64, json
import pandas as pd
from sempy.fabric import FabricRestClient


def get_mirrored_database_overview(
    workspace_id: str,
    item_id: str,
    *,
    include_definition: bool = True,
    include_empty: bool = False,  # wenn True, zeige auch None/""-Werte im Summary
) -> Tuple[pd.DataFrame, pd.DataFrame, int, Optional[int]]:
    """
    Vereinigt:
      - "normales" Item (GET /items/{itemId})  -> Zeiten, CreatedBy/ModifiedBy, Folder (falls vorhanden)
      - Mirrored-DB (GET /mirroredDatabases/{itemId}) -> properties.oneLakeTablesPath, defaultSchema, sqlEndpointProperties.*
      - Mirroring-Status (POST getMirroringStatus)  -> status
      - Tabellenstatus (POST getTablesMirroringStatus) -> schema, tableName, status, metrics.*
      - Optional: Definition (POST getDefinition) dekodiert -> properties.source/target/mountedTables
      - NEU: Source-Infos (Type/Server/DB/Connection) werden in tables_df als Zusatzspalten integriert

    Gibt zurück: (summary_df[Property|Value], tables_df, table_count, None)
    """

    # ---------- Unterfunktionen ----------
    def _now_utc() -> pd.Timestamp:
        return pd.Timestamp.now(tz="UTC")

    def _try_json(resp):
        try:
            return resp.json()
        except Exception:
            return {}

    def _safe_get(d: dict, *path, default=None):
        cur = d
        for p in path:
            if not isinstance(cur, dict):
                return default
            cur = cur.get(p, default)
        return cur

    def _decode_definition_payload(def_resp_json: Dict[str, Any]) -> Dict[str, Any]:
        parts = _safe_get(def_resp_json, "definition", "parts", default=[])
        if not parts:
            return {}
        part = next((p for p in parts if str(p.get("path", "")).lower().endswith(".json")), parts[0])
        b64 = part.get("payload")
        if not b64:
            return {}
        try:
            raw = base64.b64decode(b64).decode("utf-8")
            return json.loads(raw)
        except Exception:
            return {}

    def _fetch_tables_mirroring_status(client: FabricRestClient) -> List[Dict[str, Any]]:
        """POST getTablesMirroringStatus (mit Pagination via continuationUri) → Liste der Zeilen."""
        rows: List[Dict[str, Any]] = []

        def _extend(payload_part):
            if isinstance(payload_part, list):
                rows.extend(payload_part)
            elif isinstance(payload_part, dict):
                block = payload_part.get("data") or payload_part.get("value") or []
                if isinstance(block, list):
                    rows.extend(block)

        resp = client.post(f"v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getTablesMirroringStatus")
        payload = _try_json(resp)
        _extend(payload)

        cont = payload.get("continuationUri") if isinstance(payload, dict) else None
        while cont:
            r2 = client.post(cont)  # komplette URL ist erlaubt
            p2 = _try_json(r2)
            _extend(p2)
            cont = p2.get("continuationUri") if isinstance(p2, dict) else None

        return rows

    def _parse_tables_mirroring_status(rows_or_payload: Dict[str, Any] | List[Dict[str, Any]]) -> pd.DataFrame:
        """Tolerant gegen 'data'/'value'/Liste; flacht 'metrics' auf Spalten ab und fügt Kontext an."""
        if isinstance(rows_or_payload, list):
            rows = rows_or_payload
        elif isinstance(rows_or_payload, dict):
            rows = rows_or_payload.get("data") or rows_or_payload.get("value") or []
        else:
            rows = []

        flat: List[Dict[str, Any]] = []
        for r in rows:
            m = r.get("metrics") or {}
            flat.append({
                "schema":           r.get("sourceSchemaName"),
                "tableName":        r.get("sourceTableName"),
                "status":           r.get("status"),
                "processedBytes":   m.get("processedBytes"),
                "processedRows":    m.get("processedRows"),
                "lastSyncDateTime": m.get("lastSyncDateTime"),
            })

        df = pd.DataFrame(flat)
        if not df.empty:
            if "lastSyncDateTime" in df.columns:
                df["lastSyncDateTime"] = pd.to_datetime(df["lastSyncDateTime"], errors="coerce", utc=True)
                df["minutesSinceLastSync"] = (_now_utc() - df["lastSyncDateTime"]).dt.total_seconds() / 60.0
            for col in ("processedBytes", "processedRows"):
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors="coerce")

            df.insert(0, "workspaceId", workspace_id)
            df.insert(1, "itemId", item_id)
            df = df.sort_values(["schema", "tableName"], kind="stable", na_position="last").reset_index(drop=True)

        return df

    def _resolve_source_info(src_type_code: Optional[str], conn_id: Optional[str]) -> Dict[str, Any]:
        """
        Liefert Quellinfos der Mirrored DB (auch bei eingeschränkten Scopes):
          {
            'sourceType','sourceTypeLabel','connectionId',
            'server','database','connectionType',
            'resolutionPath', 'warning'
          }
        """
        SOURCE_LABELS = {
            "MSSQL": "SQL Server",
            "SqlServer": "SQL Server",
            "AzureSqlDatabase": "Azure SQL Database",
            "Snowflake": "Snowflake",
            "CosmosDb": "Azure Cosmos DB",
            "Dataverse": "Microsoft Dataverse",
            "PostgreSql": "PostgreSQL",
            "MySql": "MySQL",
        }

        info = {
            "sourceType": src_type_code,
            "sourceTypeLabel": SOURCE_LABELS.get(src_type_code, src_type_code),
            "connectionId": conn_id,
            "server": None,
            "database": None,
            "connectionType": None,
            "resolutionPath": "definitionOnly",
            "warning": None,
        }

        if not conn_id:
            return info

        # 1) Bevorzuge item-lokale Connections (oft erlaubt)
        try:
            lst = _try_json(client.get(f"v1/workspaces/{workspace_id}/items/{item_id}/connections"))
            rows = lst if isinstance(lst, list) else lst.get("value", [])
            hit = None
            for r in rows or []:
                if r.get("id") == conn_id or r.get("connectionId") == conn_id:
                    hit = r
                    break
            if hit:
                details = hit.get("connectionDetails") or hit.get("properties") or {}
                path = details.get("path") or ""
                server = database = None
                if ";" in path:
                    server, database = path.split(";", 1)
                elif path:
                    server = path
                info.update({
                    "server": server,
                    "database": database,
                    "connectionType": details.get("type"),
                    "resolutionPath": "itemConnections",
                })
                return info
        except Exception:
            pass

        # 2) Fallback: globale Connections-API (kann 403 liefern)
        try:
            conn = _try_json(client.get(f"v1/connections/{conn_id}"))
            details = conn.get("connectionDetails") or {}
            path = details.get("path") or ""
            server = database = None
            if ";" in path:
                server, database = path.split(";", 1)
            elif path:
                server = path
            info.update({
                "server": server,
                "database": database,
                "connectionType": details.get("type"),
                "resolutionPath": "connectionsApi",
            })
            return info
        except Exception:
            info["warning"] = "InsufficientScopes: Zugriff auf /v1/connections/{id} verweigert."
            return info

    # ---------- Hauptlogik ----------
    client = FabricRestClient()

    # A) Normales Item
    item_norm = _try_json(client.get(f"v1/workspaces/{workspace_id}/items/{item_id}"))
    if (item_norm.get("type") or "").lower() != "mirroreddatabase":
        alt_type = (item_norm.get("type") or "").lower()
        if alt_type and alt_type != "mirroreddatabase":
            raise ValueError(f"Item {item_id} ist kein 'MirroredDatabase' (type={item_norm.get('type')}).")

    folder_id = (
        item_norm.get("workspaceFolderId")
        or _safe_get(item_norm, "workspaceFolder", "id")
        or _safe_get(item_norm, "folder", "id")
    )
    folder_path = (
        _safe_get(item_norm, "workspaceFolder", "path")
        or _safe_get(item_norm, "folder", "path")
        or _safe_get(item_norm, "path")
    )
    folder_name = (
        _safe_get(item_norm, "workspaceFolder", "name")
        or _safe_get(item_norm, "folder", "name")
    )
    created_by_user = _safe_get(item_norm, "createdBy", "user", "displayName")
    created_by_mail = _safe_get(item_norm, "createdBy", "user", "email")
    modified_by_user = _safe_get(item_norm, "modifiedBy", "user", "displayName")
    modified_by_mail = _safe_get(item_norm, "modifiedBy", "user", "email")

    # B) Mirrored-DB (Get mirrored database)
    mdb = _try_json(client.get(f"v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}"))
    props = mdb.get("properties") or {}
    sql_ep_cs   = _safe_get(props, "sqlEndpointProperties", "connectionString")
    sql_ep_id   = _safe_get(props, "sqlEndpointProperties", "id")
    sql_ep_stat = _safe_get(props, "sqlEndpointProperties", "provisioningStatus")

    # C) Mirroring-Status
    status = _try_json(client.post(f"v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getMirroringStatus"))

    # D) Tabellenstatus (inkl. Pagination)
    table_rows = _fetch_tables_mirroring_status(client)
    tables_df = _parse_tables_mirroring_status(table_rows)
    table_count = len(tables_df) if not tables_df.empty else 0
    column_count: Optional[int] = None

    # E) Optional: Definition dekodieren + Source-Infos (Type/Connection/Server/DB)
    src_type = src_conn = src_db = None
    tgt_default_schema = tgt_format = tgt_retention = None
    mounted_tables_count = None
    src_info: Dict[str, Any] = {}
    if include_definition:
        def_raw = _try_json(client.post(f"v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getDefinition"))
        decoded = _decode_definition_payload(def_raw)
        src_type = _safe_get(decoded, "properties", "source", "type")
        src_conn = _safe_get(decoded, "properties", "source", "typeProperties", "connection")
        src_db   = _safe_get(decoded, "properties", "source", "typeProperties", "database")
        tgt_default_schema = _safe_get(decoded, "properties", "target", "typeProperties", "defaultSchema")
        tgt_format         = _safe_get(decoded, "properties", "target", "typeProperties", "format")
        tgt_retention      = _safe_get(decoded, "properties", "target", "typeProperties", "retentionInDays")
        mounted            = _safe_get(decoded, "properties", "mountedTables", default=[])
        mounted_tables_count = len(mounted) if isinstance(mounted, list) else None

        # >>> NEU: Source-Infos aus Connections auflösen (mit Fallbacks)
        src_info = _resolve_source_info(src_type, src_conn)

        # >>> tables_df um Source-Spalten erweitern (auch wenn leer – Spalten entstehen trotzdem)
        if tables_df is None:
            tables_df = pd.DataFrame()
        tables_df["src.type"]           = src_info.get("sourceTypeLabel") or src_info.get("sourceType")
        tables_df["src.typeCode"]       = src_info.get("sourceType")
        tables_df["src.connectionId"]   = src_info.get("connectionId")
        tables_df["src.connectionType"] = src_info.get("connectionType")
        tables_df["src.server"]         = src_info.get("server")
        tables_df["src.database"]       = src_info.get("database")
        tables_df["src.resolutionPath"] = src_info.get("resolutionPath")
        tables_df["src.warning"]        = src_info.get("warning")

    # F) Summary (nur existierende Felder, außer include_empty=True)
    pairs: List[tuple[str, Any]] = []

    def add(label: str, value: Any):
        if include_empty or (value not in (None, "", [], {})):
            pairs.append((label, value))

    # Normales Item
    add("workspaceId",            item_norm.get("workspaceId") or workspace_id)
    add("itemId",                 item_norm.get("id") or item_id)
    add("Item.DisplayName",       item_norm.get("displayName"))
    add("Item.Description",       item_norm.get("description"))
    add("Item.Type",              item_norm.get("type"))
    add("Item.Created",           item_norm.get("createdDateTime"))
    add("Item.LastModified",      item_norm.get("lastModifiedDateTime"))
    add("Item.CreatedBy.DisplayName", created_by_user)
    add("Item.CreatedBy.Email",       created_by_mail)
    add("Item.ModifiedBy.DisplayName", modified_by_user)
    add("Item.ModifiedBy.Email",       modified_by_mail)
    add("Item.WorkspaceFolder.Id",     folder_id)
    add("Item.WorkspaceFolder.Path",   folder_path)
    add("Item.WorkspaceFolder.Name",   folder_name)

    # Mirrored Database (Properties lt. „Get mirrored database“)
    add("Properties.OneLakeTablesPath", _safe_get(props, "oneLakeTablesPath"))
    add("Properties.DefaultSchema",     _safe_get(props, "defaultSchema"))
    add("SqlEndpoint.ConnectionString", sql_ep_cs)
    add("SqlEndpoint.Id",               sql_ep_id)
    add("SqlEndpoint.Provisioning",     sql_ep_stat)

    # Mirroring-Status
    add("Mirror.Status",       status.get("status"))
    add("Mirror.LastSyncTime", status.get("lastSyncTime"))
    add("Mirror.IsPaused",     status.get("isPaused"))
    add("Mirror.Error",        status.get("errorMessage") or status.get("error"))

    # Definition (dekodiert)
    add("Def.Source.Type",            src_type)
    add("Def.Source.ConnectionId",    src_conn)
    add("Def.Source.Database",        src_db)
    add("Def.Target.DefaultSchema",   tgt_default_schema)
    add("Def.Target.Format",          tgt_format)
    add("Def.Target.RetentionInDays", tgt_retention)
    add("Def.MountedTables.Count",    mounted_tables_count)

    # Optional ein paar der aufgelösten Source-Infos auch in die Summary schreiben
    if src_info:
        add("Source.Type.Label",        src_info.get("sourceTypeLabel") or src_info.get("sourceType"))
        add("Source.Server",            src_info.get("server"))
        add("Source.Database",          src_info.get("database"))
        add("Source.ConnectionType",    src_info.get("connectionType"))
        add("Source.ResolutionPath",    src_info.get("resolutionPath"))
        if src_info.get("warning"):
            add("Source.Warning",       src_info.get("warning"))

    # Tabellen-Zahl
    add("Tables.Count", table_count)

    summary_df = pd.DataFrame(pairs, columns=["Property", "Value"])
    return summary_df, tables_df, table_count, column_count


StatementMeta(, fc65f526-0e8c-447d-8df9-4c30e0623993, 28, Finished, Available, Finished)

In [27]:
WSID   = "fa1fd3b0-75bd-4969-9060-1d4aa94d0634"
ITEMID = "c0d2ac7d-ced6-458b-b65e-4b0868905a15"

summary_df, tables_df, t_count, c_count = get_mirrored_database_overview(WSID, ITEMID)

# Ausgabe der Zusammenfassung
summary_df

StatementMeta(, fc65f526-0e8c-447d-8df9-4c30e0623993, 29, Finished, Available, Finished)

Unnamed: 0,Property,Value
0,workspaceId,fa1fd3b0-75bd-4969-9060-1d4aa94d0634
1,itemId,c0d2ac7d-ced6-458b-b65e-4b0868905a15
2,Item.DisplayName,BISTAGE
3,Item.Type,MirroredDatabase
4,Properties.OneLakeTablesPath,https://onelake.dfs.fabric.microsoft.com/fa1fd...
5,Properties.DefaultSchema,dbo
6,SqlEndpoint.ConnectionString,naoauqylgy7ulje7jpwe6hjqiu-wdjr76v5ovuutedadvf...
7,SqlEndpoint.Id,1c537973-a845-4bc7-8a85-6ccaa10e571f
8,SqlEndpoint.Provisioning,Success
9,Mirror.Status,Running


In [28]:
# Tables
tables_df

StatementMeta(, fc65f526-0e8c-447d-8df9-4c30e0623993, 30, Finished, Available, Finished)

Unnamed: 0,workspaceId,itemId,schema,tableName,status,processedBytes,processedRows,lastSyncDateTime,minutesSinceLastSync,src.type,src.typeCode,src.connectionId,src.connectionType,src.server,src.database,src.resolutionPath,src.warning
0,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,CustomerClustering,Replicating,606058,3814,2025-08-20 14:19:24.187940400+00:00,6901.667534,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
1,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,DIM_LW_Product,Replicating,24687048,59593,2025-08-24 23:30:07.603624100+00:00,590.943939,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
2,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,DIM_SAP_Product,Replicating,152710345,131215,2025-08-24 22:31:50.794772800+00:00,649.224086,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
3,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,DIM_Supplier_SAP,Replicating,1774689,3955,2025-08-20 15:02:41.213504900+00:00,6858.383774,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
4,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_Delivery_SAP,Replicating,21322402700,9285790,2025-08-25 07:33:17.012830300+00:00,107.787119,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
5,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_Purchase_SAP,Replicating,7936892160,5266788,2025-08-25 07:22:16.937326+00:00,118.788377,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
6,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_StockMovement_CBC,Replicating,481981268,601203,2025-08-23 13:01:50.725474200+00:00,2659.225241,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
7,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_VendorInvoice_SAP,Replicating,6595600507,6672461,2025-08-25 08:34:43.469896400+00:00,46.346168,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
8,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,tmp,DIM_LW_Pricelist,Replicating,84636773,178375,2025-08-20 14:53:26.964897700+00:00,6867.621251,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,


In [30]:
# Alle Tabellen inkl. deren Status
tables_df

StatementMeta(, fc65f526-0e8c-447d-8df9-4c30e0623993, 32, Finished, Available, Finished)

Unnamed: 0,workspaceId,itemId,schema,tableName,status,processedBytes,processedRows,lastSyncDateTime,minutesSinceLastSync,src.type,src.typeCode,src.connectionId,src.connectionType,src.server,src.database,src.resolutionPath,src.warning
0,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,CustomerClustering,Replicating,606058,3814,2025-08-20 14:19:24.187940400+00:00,6901.706606,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
1,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,DIM_LW_Product,Replicating,24687048,59593,2025-08-24 23:30:07.603624100+00:00,590.983011,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
2,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,DIM_SAP_Product,Replicating,152710345,131215,2025-08-24 22:31:50.794772800+00:00,649.263159,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
3,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,DIM_Supplier_SAP,Replicating,1774689,3955,2025-08-20 15:02:41.213504900+00:00,6858.422846,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
4,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_Delivery_SAP,Replicating,21322402700,9285790,2025-08-25 07:33:17.012830300+00:00,107.826191,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
5,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_Purchase_SAP,Replicating,7936892160,5266788,2025-08-25 07:22:16.937326+00:00,118.827449,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
6,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_StockMovement_CBC,Replicating,481981268,601203,2025-08-23 13:01:50.725474200+00:00,2659.264314,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
7,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,dbo,SAP_VendorInvoice_SAP,Replicating,6595600507,6672461,2025-08-25 08:34:43.469896400+00:00,46.38524,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,
8,fa1fd3b0-75bd-4969-9060-1d4aa94d0634,c0d2ac7d-ced6-458b-b65e-4b0868905a15,tmp,DIM_LW_Pricelist,Replicating,84636773,178375,2025-08-20 14:53:26.964897700+00:00,6867.660323,SQL Server,MSSQL,43871a55-6986-4ad8-893f-bd5af2196080,SQL,SRV-RZ-BI-03,BI_STAGE,itemConnections,


In [32]:
# fehlerhafte Sync
status_norm = tables_df["status"].fillna("").astype(str).str.casefold()
mask = status_norm.ne("replicating")  # alles ≠ "replicating"
bad_tables = tables_df.loc[mask].copy()

print(f"Problematische Tabellen: {len(bad_tables)} von {len(tables_df)}")
cols = [c for c in ["schema","tableName","status","lastSyncDateTime","processedRows","processedBytes","minutesSinceLastSync"]
        if c in bad_tables.columns]
bad_tables = bad_tables[cols].sort_values(["schema","tableName"])
bad_tables

StatementMeta(, fc65f526-0e8c-447d-8df9-4c30e0623993, 34, Finished, Available, Finished)

Problematische Tabellen: 0 von 9


Unnamed: 0,schema,tableName,status,lastSyncDateTime,processedRows,processedBytes,minutesSinceLastSync


# 3 | Abfragen gegen die gespiegelte DB


## 3.1 | Microsoft Lösung

Microsoft schlägt vor, dass man 

- https://learn.microsoft.com/en-us/fabric/mirroring/explore (Explore data in your mirrored database using Microsoft Fabric)
- https://learn.microsoft.com/en-us/fabric/mirroring/explore-onelake-shortcut (Explore data in your mirrored database with notebooks)

## 3.2 | SQL-Endpunkt der gespiegelten Datenbank

### 3.2.1 | Einführung

#### 3.2.1.1 | Kernprinzip in 3 Schritten

1. Connection String des SQL-Endpoints holen (Warehouse, Lakehouse-SQL-Endpoint oder Mirrored DB). In Fabric ist das i. d. R. ein *.fabric.microsoft.com-Server; die Doku zeigt, wo du ihn findest. [Connect to your SQL database in Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/database/sql/connect)
2. AAD-Token im Notebook beziehen – in Fabric via mssparkutils.credentials.getToken(...). [Microsoft Spark Utilities (MSSparkUtils) for Fabric](https://learn.microsoft.com/en-us/fabric/data-engineering/microsoft-spark-utilities)
3. ODBC Driver 18 verwenden und das Access-Token per attrs_before (1256) an pyodbc übergeben; Authentication ist Entra ID (ohne Benutzer/Passwort).  [Warehouse connectivity in Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/data-warehouse/connectivity) & [Connect to and query Azure SQL Database using Python and the pyodbc driver](https://learn.microsoft.com/en-us/azure/azure-sql/database/azure-sql-python-quickstart?view=azuresql)


#### 3.2.1.2 | Lesestoff / How-To-Quellen

- [mssparkutils (Fabric)](https://learn.microsoft.com/en-us/fabric/data-engineering/microsoft-spark-utilities) – offizielle Referenz: so holst du im Notebook Access Tokens. *(mssparkutils.credentials.getToken) * 
- [Warehouse/Lakehouse SQL-Endpoints](https://learn.microsoft.com/en-us/fabric/data-warehouse/connectivity) – Konnektivität: offizielle Seite, nennt ODBC 18+ und Entra-ID-Auth für externe Tools/Skripte.
(gilt auch für Notebook-Code mit pyodbc) 
- SQL-/Endpoint-Connection-Strings in Fabric finden (Patterns wie tcp:<servername>.fabric.microsoft.com,1433, Initial Catalog=...).
(hilft, Server/DB korrekt zu parsen) 
Microsoft Learn

- Azure SQL + Python/pyodbc (passwortlos) – Microsoft-Quickstart mit AAD-Token: zeigt das Prinzip, das in Fabric identisch gilt (TDS-Endpoint).
(Token an pyodbc, kein User/Pass) 
Microsoft Learn
+1

- Entra-ID-Authentication für Warehouse/Lakehouse – Doku (Fabric Docs Repo): Tenant/Workspace-Settings & Berechtigungen, wenn Tokens nicht akzeptiert werden. 
GitHub

- Notebook-Beispiel: SQL-Endpoint mit AAD-Token – Blog mit fertigem Code (Connection-String programmatic, Token, pyodbc).
(genau dein Szenario in Fabric-Notebooks) 
Prodata

- Praxisartikel: SQL-Endpoint aus Notebook abfragen – Überblick & Beispiele (pyodbc/JDBC, Hinweise auf Treiber/Token). 
red-gate.com

- Community-Threads (gelöst): Lakehouse-SQL-Endpoint aus Notebook – Diskussion + akzeptierte Lösung (pyodbc/JDBC Muster). (hilfreich bei Fehlermeldungen) 
community.fabric.microsoft.com


### 3.2.2 | SQL-Endpunkt eines gespiegelten Datenbank testen

#### 3.2.2.1 | Select Abfrage

Diese Methode ist notwendig, um zB Tabellen wie INFORMATION_SCHEMA auszuführen, da dazu keine ShortCuts möglich sind. 

In [9]:
from typing import Dict, Tuple, Optional
import re
import pandas as pd
import pyodbc
from sempy.fabric import FabricRestClient
from notebookutils import mssparkutils

def getData(wsid: str, sychdbid: str, sql: str, *, debug: bool = False) -> pd.DataFrame:
    """
    Führt ein SELECT gegen den SQL-Endpoint einer Mirrored-DB aus (AAD-Token),
    robust für unterschiedliche Connection-String-Formate (ODBC/ADO.NET/JDBC).

    Nur SELECT-Statements (Mirror-Endpoint ist read-only).
    """
    if not isinstance(sql, str) or not sql.strip().lower().startswith("select"):
        raise ValueError("Nur SELECT-Statements sind erlaubt (read-only).")

    client = FabricRestClient()
    md = client.get(f"v1/workspaces/{wsid}/mirroredDatabases/{sychdbid}").json()
    conn_str = (
        (md.get("properties") or {})
        .get("sqlEndpointProperties", {})
        .get("connectionString")
    )
    if not conn_str:
        raise RuntimeError("SQL-Endpoint-ConnectionString nicht gefunden.")

    # ---------- CS-Parsing ----------
    def _parse_kv(cs: str) -> Dict[str, str]:
        kv = {}
        # klassische key=value;key=value
        for part in [p for p in cs.split(";") if p.strip()]:
            if "=" in part:
                k, v = part.split("=", 1)
                kv[k.strip().lower()] = v.strip()
        return kv

    def _strip_tcp(host: str) -> str:
        # z.B. "tcp:server,1433" -> "server,1433"
        return host[4:] if host.lower().startswith("tcp:") else host

    def _from_jdbc(cs: str) -> Tuple[Optional[str], Optional[str]]:
        # jdbc:sqlserver://host[:port][;prop=val...]
        m = re.match(r"jdbc:sqlserver://([^;]+)", cs, flags=re.IGNORECASE)
        server = _strip_tcp(m.group(1).strip()) if m else None
        # ;databaseName=... oder ;database=...
        mdb = re.search(r"(?:;|\b)(databaseName|database)=([^;]+)", cs, flags=re.IGNORECASE)
        database = mdb.group(2).strip() if mdb else None
        return server, database

    def _extract_server_db(cs: str) -> Tuple[Optional[str], Optional[str]]:
        cs_stripped = cs.strip()
        if cs_stripped.lower().startswith("jdbc:sqlserver://"):
            return _from_jdbc(cs_stripped)

        kv = _parse_kv(cs_stripped)
        # mögliche Alias-Schlüssel
        server = (
            kv.get("server")
            or kv.get("data source")
            or kv.get("addr")
            or kv.get("address")
            or kv.get("network address")
            or kv.get("host")
        )
        database = (
            kv.get("database")
            or kv.get("initial catalog")
            or kv.get("databasename")
        )
        # tcp:-Präfix entfernen (z.B. Server=tcp:foo,1433)
        if server:
            server = _strip_tcp(server)
        return server, database

    server, database = _extract_server_db(conn_str)

    # ➕ Fallbacks
    # - Wenn DB fehlt, probiere das Item-Id (Fabric nutzt oft die Item-GUID als DB-Namen)
    if not database:
        database = md.get("id") or sychdbid
    # - Wenn Server Port separat enthält (z. B. ;port=1433), füge ihn als ",1433" an
    if server:
        kv_all = _parse_kv(conn_str)
        port = kv_all.get("port")
        if port and "," not in server:
            server = f"{server},{port}"

    if debug:
        print(f"[DEBUG] Parsed server={server}, database={database}")
        print(f"[DEBUG] Original connectionString: {conn_str}")

    if not server:
        raise RuntimeError(
            "Konnte 'Server' aus dem ConnectionString nicht ermitteln. "
            "Bitte gib mir (anonymisiert) den connectionString, dann erweitere ich den Parser."
        )

    # ---------- AAD-Token ----------
    def _get_sql_token() -> str:
        raw = mssparkutils.credentials.getToken("https://database.windows.net/")
        if isinstance(raw, str):
            return raw
        if hasattr(raw, "token"):
            return raw.token
        if isinstance(raw, dict):
            return raw.get("token") or raw.get("accessToken") or raw.get("access_token")
        raise RuntimeError(f"Unbekanntes Token-Format: {type(raw)}")

    token = _get_sql_token()
    token_bytes = token.encode("utf-16-le")  # ODBC erwartet UTF-16-LE

    # ---------- ODBC-Connection-String ----------
    # Wichtig: KEIN Authentication=... setzen; Token kommt via attrs_before (1256)
    odbc_cs = (
        "Driver={ODBC Driver 18 for SQL Server};"
        "Encrypt=yes;"
        "TrustServerCertificate=no;"
        f"Server={server};"
        f"Database={database};"
    )

    if debug:
        print(f"[DEBUG] ODBC CS: {odbc_cs}")

    # ---------- Ausführen ----------
    conn = pyodbc.connect(odbc_cs, attrs_before={1256: token_bytes})
    try:
        df = pd.read_sql_query(sql, conn)
    finally:
        conn.close()
    return df


StatementMeta(, 7dd8dd66-e7fc-429f-9900-9848695965d3, 11, Finished, Available, Finished)

In [10]:
WSID = "fa1fd3b0-75bd-4969-9060-1d4aa94d0634"
MDB  = "c0d2ac7d-ced6-458b-b65e-4b0868905a15"

df_result = getData(
    WSID, MDB,
    """
    SELECT TOP 100 schema_name = s.name, t.name AS table_name, COUNT(*) AS col_count
    FROM sys.tables t
    JOIN sys.schemas s ON s.schema_id = t.schema_id
    JOIN sys.columns c ON c.object_id = t.object_id
    GROUP BY s.name, t.name
    ORDER BY col_count DESC
    """
)
display(df_result.head(10))


StatementMeta(, 7dd8dd66-e7fc-429f-9900-9848695965d3, 12, Finished, Available, Finished)

RuntimeError: Konnte 'Server' aus dem ConnectionString nicht ermitteln. Bitte gib mir (anonymisiert) den connectionString, dann erweitere ich den Parser.

In [None]:
df = sare