Notebook: Sync Dataverse to OneLake

Author: Jen Beiser

Runtime: PySpark (Fabric Notebook)

Frequency: Every 12 hours (Fabric Job Scheduler)

In [34]:
import os
import json
import time
import uuid
import random
import requests
from typing import Dict, Any, List, Tuple, Optional, Set

from pyspark.sql import SparkSession, Row, functions 
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Session handle used by every Spark/Delta call in the cells below.
spark = SparkSession.builder.getOrCreate()

StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 36, Finished, Available, Finished)

## Config

In [35]:
# Web API version segment; dv_get_json / retrieve_metadata_changes build URLs as
# {ORG_BASE}/api/data/{API_VERSION}/...
API_VERSION = "v9.2"
# Schema that prefixes every control, log and target table name defined below.
SCHEMA = "crm_meta"

# Environment name -> organization base URL (no trailing slash; paths are appended with "/").
ENVIRONMENTS = {
    "UAT": "https://org5c6f9bc3.api.crm.dynamics.com",
    # "PROD": "https://<prod>.api.crm.dynamics.com",
}

# Environment this run targets; also stored as environment_name in the control and run-log tables.
TARGET_ENV = "UAT"
# Raises KeyError at cell execution time if TARGET_ENV is not a key of ENVIRONMENTS.
ORG_BASE = ENVIRONMENTS[TARGET_ENV]

# Language codes kept by the extractors; labels in any other language are dropped.
LOCALES = [1033, 1036, 3082]  # English, French, Spanish


# --- Entra ID (Service Principal) ---
# TENANT_ID / CLIENT_ID fall back to literal placeholder strings when the env vars are unset;
# only CLIENT_SECRET is checked for presence (get_token raises RuntimeError when it is empty).
TENANT_ID = os.environ.get("DV_TENANT_ID", "<TENANT_ID>")
CLIENT_ID = os.environ.get("DV_CLIENT_ID", "<CLIENT_ID>")
CLIENT_SECRET = os.environ.get("DV_CLIENT_SECRET", None)

# Run mode read from DV_MODE; the value is not validated here.
MODE = os.environ.get("DV_MODE", "incremental")  # "full" or "incremental"


# Control + logs
# CONTROL_TABLE holds the last ServerVersionStamp per (environment, scope); RUN_LOG_TABLE gets one row per run.
CONTROL_TABLE = f"{SCHEMA}.retrievemetadatachanges_state"
RUN_LOG_TABLE = f"{SCHEMA}.run_logs"

# Target tables (Delta tables)
TBL_GLOBAL = f"{SCHEMA}.globaloptionsetmetadata"   # global option set labels
TBL_OPTIONS = f"{SCHEMA}.optionsetmetadata"        # picklist / multi-select option labels per attribute
TBL_STATE = f"{SCHEMA}.statemetadata"              # state attribute option labels
TBL_STATUS = f"{SCHEMA}.statusmetadata"            # status attribute option labels
TBL_TARGET = f"{SCHEMA}.targetmetadata"            # entity display-name labels


StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 37, Finished, Available, Finished)

## Schemas

In [36]:
# Control table: one row per (environment_name, scope_name) holding the last
# ServerVersionStamp; upsert_server_stamp merges on exactly those two columns.
control_schema = StructType([
    StructField("environment_name", StringType(), True),
    StructField("scope_name",       StringType(), True),
    StructField("server_version_stamp", StringType(), True),
    StructField("last_run_ts",      TimestampType(), True),
])

# One row per (option set or attribute, option value, language). Used for both
# TBL_GLOBAL and TBL_OPTIONS; global rows leave the entity/attribute columns NULL,
# attribute rows leave optionset_name NULL.
global_schema = StructType([
    StructField("optionset_scope",        StringType(),  True),
    StructField("optionset_name",         StringType(),  True),
    StructField("entity_logical_name",    StringType(),  True),
    StructField("attribute_logical_name", StringType(),  True),
    StructField("attribute_metadata_id",  StringType(),  True),
    StructField("option_value",           IntegerType(), True),
    StructField("language_code",          IntegerType(), True),
    StructField("label",                  StringType(),  True),
])

# Alias, not a copy: attr_schema and global_schema are the same StructType object.
attr_schema = global_schema

# State/status rows carry no optionset_scope / optionset_name columns.
state_schema = StructType([
    StructField("entity_logical_name",    StringType(),  True),
    StructField("attribute_logical_name", StringType(),  True),
    StructField("attribute_metadata_id",  StringType(),  True),
    StructField("option_value",           IntegerType(), True),
    StructField("language_code",          IntegerType(), True),
    StructField("label",                  StringType(),  True),
])

# Alias, not a copy: status rows share the state layout.
status_schema = state_schema

# Entity display-name labels: one row per (entity, language).
target_schema = StructType([
    StructField("entity_logical_name", StringType(), True),
    StructField("language_code",       IntegerType(), True),
    StructField("label",               StringType(), True),
])

# Run log: one row per run, appended by log_run. log_run builds its Row with the
# fields in this same order, so keep both in sync when adding columns.
run_log_schema = StructType([
    StructField("run_id",               StringType(),   True),
    StructField("environment_name",     StringType(),   True),
    StructField("mode",                 StringType(),   True),
    StructField("start_ts",             TimestampType(),True),
    StructField("end_ts",               TimestampType(),True),
    StructField("duration_seconds",     LongType(),     True),
    StructField("global_row_count",     LongType(),     True),
    StructField("options_row_count",    LongType(),     True),
    StructField("state_row_count",      LongType(),     True),
    StructField("status_row_count",     LongType(),     True),
    StructField("target_row_count",     LongType(),     True),
    StructField("deleted_attributes",   IntegerType(),  True),
    StructField("server_version_stamp", StringType(),   True),
    StructField("changed_entities",     IntegerType(),  True),
])

# Ensure schema + tables exist
# Save mode "ignore" makes these writes a no-op when the table already exists,
# so re-running this cell never truncates the control table or the run log.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}")
spark.createDataFrame([], schema=control_schema).write.mode("ignore").format("delta").saveAsTable(CONTROL_TABLE)
spark.createDataFrame([], schema=run_log_schema).write.mode("ignore").format("delta").saveAsTable(RUN_LOG_TABLE)

StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 38, Finished, Available, Finished)

## Helpers

In [37]:
def _sleep_with_jitter(base_seconds: float):
    """Block for ``base_seconds`` plus a random extra of up to 0.25 seconds."""
    jitter = 0.25 * random.random()
    time.sleep(base_seconds + jitter)

def http_request_with_retry(method: str, url: str, *, headers: Dict[str, str], params=None, data=None, json_body=None,
                            timeout=60, max_attempts=6) -> requests.Response:
    """Send an HTTP request, retrying only failures that are worth retrying.

    Retried (with exponential backoff capped at 30s, plus jitter):
      * transport-level failures raised by ``requests.request``;
      * HTTP 429 (honouring a numeric ``Retry-After`` header when present);
      * HTTP 500 / 502 / 503 / 504.

    Any other error status (e.g. 400, 401, 403, 404) is raised immediately:
    repeating the same request cannot fix it and only delays the failure.

    Args:
        method: HTTP verb passed to ``requests.request``.
        url: Absolute URL.
        headers: Request headers.
        params: Optional query-string parameters.
        data: Optional form/body payload.
        json_body: Optional JSON payload (sent as ``json=``).
        timeout: Per-attempt timeout in seconds.
        max_attempts: Total number of attempts, including the first one.

    Returns:
        The successful ``requests.Response``.

    Raises:
        requests.HTTPError: on a non-retryable error status, or when a
            retryable status persists through the last attempt.
        requests.RequestException: when the transport keeps failing through
            the last attempt.
    """
    retryable_statuses = (429, 500, 502, 503, 504)
    attempt = 0
    while True:
        attempt += 1
        backoff = min(2 ** attempt, 30)

        # Only the network call is guarded: raise_for_status() below must NOT be
        # inside this try, because HTTPError is itself a RequestException and
        # would otherwise be swallowed and retried.
        try:
            r = requests.request(
                method,
                url,
                headers=headers,
                params=params,
                data=data,
                json=json_body,
                timeout=timeout,
            )
        except requests.RequestException:
            if attempt >= max_attempts:
                raise
            _sleep_with_jitter(backoff)
            continue

        if r.status_code in retryable_statuses and attempt < max_attempts:
            wait = backoff
            if r.status_code == 429:
                retry_after = r.headers.get("Retry-After")
                if retry_after:
                    try:
                        wait = max(0.0, float(retry_after))
                    except ValueError:
                        # Retry-After may also be an HTTP-date; fall back to
                        # the computed backoff instead of crashing.
                        wait = backoff
            _sleep_with_jitter(wait)
            continue

        # Success, a non-retryable error, or a retryable error on the last attempt.
        r.raise_for_status()
        return r

def dv_headers(token: str) -> Dict[str, str]:
    """Return the HTTP headers sent with every Dataverse Web API request.

    The bearer ``token`` goes into ``Authorization``; the remaining headers ask
    for a JSON response and set both OData version headers to 4.0.
    """
    odata_version = "4.0"
    headers: Dict[str, str] = {"Authorization": f"Bearer {token}"}
    headers["Accept"] = "application/json"
    headers["OData-MaxVersion"] = odata_version
    headers["OData-Version"] = odata_version
    return headers

def dv_get_json(path_or_url: str, token: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
    """GET a Dataverse resource and return the decoded JSON body.

    ``path_or_url`` is used verbatim when it starts with "http" (e.g. an
    ``@odata.nextLink``); otherwise it is treated as a path relative to
    ``{ORG_BASE}/api/data/{API_VERSION}/`` and any leading slashes are dropped.
    ``params`` defaults to an empty query string.
    """
    is_absolute = path_or_url.startswith("http")
    url = path_or_url if is_absolute else f"{ORG_BASE}/api/data/{API_VERSION}/{path_or_url.lstrip('/')}"
    response = http_request_with_retry("GET", url, headers=dv_headers(token), params=params or {})
    return response.json()

def dv_get_all(path: str, token: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """Fetch every page of a collection and return the concatenated 'value' items.

    The first request uses ``path`` and ``params``; each following request uses
    the previous page's ``@odata.nextLink`` as-is, without extra parameters.
    A missing or null 'value' contributes nothing.
    """
    collected: List[Dict[str, Any]] = []
    page = dv_get_json(path, token, params=params)
    while True:
        collected.extend(page.get("value", []) or [])
        following = page.get("@odata.nextLink")
        if not following:
            return collected
        page = dv_get_json(following, token)

StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 39, Finished, Available, Finished)

## Auth

In [38]:
def get_token() -> str:
    """Obtain an access token for ORG_BASE via the client-credentials flow.

    Posts CLIENT_ID / CLIENT_SECRET to the tenant's v2.0 token endpoint with
    scope ``{ORG_BASE}/.default`` and returns the ``access_token`` field.

    Raises:
        RuntimeError: if CLIENT_SECRET is empty or unset.
    """
    if not CLIENT_SECRET:
        raise RuntimeError("CLIENT_SECRET is missing. Set DV_CLIENT_SECRET env var or use a secret store.")

    form = dict(
        grant_type="client_credentials",
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scope=f"{ORG_BASE}/.default",
    )
    response = http_request_with_retry(
        "POST",
        f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token",
        headers={"Accept": "application/json"},
        data=form,
    )
    payload = response.json()
    return payload["access_token"]



StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 40, Finished, Available, Finished)

## Control table (ServerVersionStamp)

In [39]:
def get_server_stamp(env_name: str, scope: str) -> Optional[str]:
    """Return the stored ServerVersionStamp for (env_name, scope), or None.

    Reads CONTROL_TABLE and takes at most one matching row.
    """
    matches_env = col("environment_name") == env_name
    matches_scope = col("scope_name") == scope
    first_match = (
        spark.table(CONTROL_TABLE)
        .where(matches_env & matches_scope)
        .limit(1)
        .collect()
    )
    if not first_match:
        return None
    return first_match[0]["server_version_stamp"]

def upsert_server_stamp(env_name: str, scope: str, stamp: str):
    """Insert or update the ServerVersionStamp row for (env_name, scope).

    The values are handed to Spark as DataFrame data (via a temp view) instead
    of being spliced into the SQL text, so quotes, backslashes or any other
    character in ``env_name``, ``scope`` or the server-supplied ``stamp``
    cannot break or alter the MERGE statement.

    Args:
        env_name: Environment key stored in ``environment_name``.
        scope: Logical scope stored in ``scope_name``.
        stamp: ServerVersionStamp to persist; ``None`` is stored as "".

    Side effects:
        Registers/overwrites the temp view ``server_stamp_upsert_src`` and
        merges one row into CONTROL_TABLE; ``last_run_ts`` is set to the
        current timestamp.
    """
    source_df = spark.createDataFrame(
        [(env_name, scope, stamp or "")],
        schema="environment_name string, scope_name string, server_version_stamp string",
    ).withColumn("last_run_ts", functions.current_timestamp())

    # Column names mirror control_schema so that INSERT * resolves by name.
    source_df.createOrReplaceTempView("server_stamp_upsert_src")

    spark.sql(f"""
      MERGE INTO {CONTROL_TABLE} AS t
      USING server_stamp_upsert_src AS s
      ON t.environment_name = s.environment_name AND t.scope_name = s.scope_name
      WHEN MATCHED THEN UPDATE SET
          t.server_version_stamp = s.server_version_stamp,
          t.last_run_ts = s.last_run_ts
      WHEN NOT MATCHED THEN INSERT *
    """)

StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 41, Finished, Available, Finished)

## RetrieveMetadataChanges (incremental marker)

In [40]:
# Fully qualified enum type name; retrieve_metadata_changes prefixes it to the quoted
# member name to form the DeletedMetadataFilters parameter value (e.g. <ENUM_FQN>'Attribute').
ENUM_FQN = "Microsoft.Dynamics.CRM.DeletedMetadataFilters"

def build_simple_entity_query() -> dict:
    """Build the EntityQueryExpression payload sent to RetrieveMetadataChanges.

    The query has no filter conditions. It requests LogicalName, DisplayName
    and Attributes for entities, and LogicalName and AttributeType for their
    attributes. A fresh dict (with fresh nested lists) is returned on every call.
    """
    def _unfiltered() -> dict:
        return {"FilterOperator": "And", "Conditions": []}

    attribute_query = {
        "@odata.type": "Microsoft.Dynamics.CRM.AttributeQueryExpression",
        "Properties": {"PropertyNames": ["LogicalName", "AttributeType"]},
        "Criteria": _unfiltered(),
    }
    entity_query = {
        "@odata.type": "Microsoft.Dynamics.CRM.EntityQueryExpression",
        "Properties": {"PropertyNames": ["LogicalName", "DisplayName", "Attributes"]},
        "Criteria": _unfiltered(),
    }
    # Added last so the serialized key order stays type, properties, criteria, attribute query.
    entity_query["AttributeQuery"] = attribute_query
    return entity_query

def retrieve_metadata_changes(token: str, query: dict, client_version_stamp: Optional[str] = None,
                              deleted_filters: Optional[str] = None) -> Dict[str, Any]:
    """Call the RetrieveMetadataChanges function and return the decoded JSON.

    Arguments are passed through parameter aliases: ``@p1`` carries the
    compact-JSON ``query``; ``@p2`` (only when ``client_version_stamp`` is
    truthy) carries the stamp in single quotes; ``@p3`` (only when
    ``deleted_filters`` is truthy) carries ``ENUM_FQN`` followed by the quoted
    member name.
    """
    # (function parameter, alias, alias value) in the order they appear in the URL
    arguments = [("Query", "@p1", json.dumps(query, separators=(",", ":")))]
    if client_version_stamp:
        arguments.append(("ClientVersionStamp", "@p2", f"'{client_version_stamp}'"))
    if deleted_filters:
        arguments.append(("DeletedMetadataFilters", "@p3", f"{ENUM_FQN}'{deleted_filters}'"))

    signature = ",".join(f"{name}={alias}" for name, alias, _ in arguments)
    alias_values = {alias: value for _, alias, value in arguments}

    url = f"{ORG_BASE}/api/data/{API_VERSION}/RetrieveMetadataChanges({signature})"
    response = http_request_with_retry("GET", url, headers=dv_headers(token), params=alias_values)
    return response.json()

def extract_changed_entity_logical_names(changed_entities: List[Dict[str, Any]]) -> List[str]:
    """Return the distinct, non-empty LogicalName values in first-seen order.

    ``None`` or an empty list yields ``[]``; entries without a LogicalName are skipped.
    """
    candidate_names = (entity.get("LogicalName") for entity in changed_entities or [])
    # dict.fromkeys keeps the first occurrence of each key and preserves insertion order.
    return list(dict.fromkeys(name for name in candidate_names if name))

def parse_deleted_attribute_ids(resp: Dict[str, Any]) -> Set[str]:
    """Collect the ids of deleted attributes from a RetrieveMetadataChanges response.

    Two layouts under ``resp["DeletedMetadata"]`` are understood:
      * ``{"Attribute": [ {...}, ... ]}``
      * ``{"DeletedMetadataCollection": [ {"DeletedMetadataFilters": "Attribute", ...}, ... ]}``
        (the filter value "Attributes" is accepted as well).
    For each entry the first truthy value among MetadataId, AttributeId and
    ObjectId is used; entries with none of them are ignored.

    NOTE(review): both layouts are assumptions of this notebook, and every entry
    is expected to be a dict - confirm against a real response payload.
    """
    def _entry_id(entry: Dict[str, Any]):
        return entry.get("MetadataId") or entry.get("AttributeId") or entry.get("ObjectId")

    deleted = resp.get("DeletedMetadata") or {}

    direct_entries = deleted.get("Attribute") or []
    tagged_entries = [
        entry
        for entry in (deleted.get("DeletedMetadataCollection") or [])
        if entry.get("DeletedMetadataFilters") in ("Attribute", "Attributes")
    ]

    return {
        found
        for found in map(_entry_id, [*direct_entries, *tagged_entries])
        if found
    }

def retrieve_metadata_incremental(token: str) -> Tuple[str, List[str], Set[str]]:
    """Ask Dataverse what changed since the stored "attributes" stamp.

    Returns:
        ``(new_stamp, changed_entity_names, deleted_attr_ids)`` where the names
        come from the response's ``EntityMetadata`` and the ids from
        ``parse_deleted_attribute_ids``.

    Raises:
        RuntimeError: if no previous stamp is stored for TARGET_ENV, or if the
            response carries no ``ServerVersionStamp``.
    """
    previous_stamp = get_server_stamp(TARGET_ENV, "attributes")
    if not previous_stamp:
        raise RuntimeError("No previous ServerVersionStamp found. Run MODE=full once to bootstrap.")

    response = retrieve_metadata_changes(
        token,
        build_simple_entity_query(),
        client_version_stamp=previous_stamp,
        deleted_filters="Attribute",  # singular enum member name
    )

    server_stamp = response.get("ServerVersionStamp")
    if not server_stamp:
        raise RuntimeError(f"RetrieveMetadataChanges returned no ServerVersionStamp. Keys: {list(response.keys())}")

    return (
        server_stamp,
        extract_changed_entity_logical_names(response.get("EntityMetadata") or []),
        parse_deleted_attribute_ids(response),
    )


StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 42, Finished, Available, Finished)

## Extractors

In [41]:
def extract_entity_display_labels(token: str, entity_logical: str) -> List[Dict[str, Any]]:
    """Return one row per configured locale for an entity's display name.

    Fetches ``EntityDefinitions(LogicalName='<entity_logical>')`` and keeps the
    DisplayName localized labels whose LanguageCode is in LOCALES. Each row has
    the keys entity_logical_name, language_code and label.
    """
    definition = dv_get_json(
        f"EntityDefinitions(LogicalName='{entity_logical}')",
        token,
        params={
            "$select": "LogicalName,DisplayName",
            "LabelLanguages": ",".join(map(str, LOCALES)),
        },
    )
    localized_labels = (definition.get("DisplayName") or {}).get("LocalizedLabels") or []
    return [
        {
            "entity_logical_name": entity_logical,
            "language_code": localized.get("LanguageCode"),
            "label": localized.get("Label"),
        }
        for localized in localized_labels
        if localized.get("LanguageCode") in LOCALES
    ]

def extract_global_options(token: str) -> List[Dict[str, Any]]:
    """Flatten every global option set into (option, language) label rows.

    Lists ``GlobalOptionSetDefinitions``, then fetches each definition by its
    MetadataId (definitions without one are skipped) and emits one row per
    option label whose LanguageCode is in LOCALES. Rows use the
    ``global_schema`` keys with scope "Global" and the entity/attribute
    columns set to None.
    """
    rows: List[Dict[str, Any]] = []

    for definition in dv_get_all("GlobalOptionSetDefinitions", token):
        metadata_id = definition.get("MetadataId")
        if not metadata_id:
            continue
        optionset_name = definition.get("Name")

        detail = dv_get_json(
            f"GlobalOptionSetDefinitions({metadata_id})",
            token,
            params={"$format": "application/json;odata.metadata=none"},
        )

        for option in detail.get("Options") or []:
            for localized in (option.get("Label") or {}).get("LocalizedLabels") or []:
                language_code = localized.get("LanguageCode")
                if language_code not in LOCALES:
                    continue
                rows.append({
                    "optionset_scope": "Global",
                    "optionset_name": optionset_name,
                    "entity_logical_name": None,
                    "attribute_logical_name": None,
                    "attribute_metadata_id": None,
                    "option_value": option.get("Value"),
                    "language_code": language_code,
                    "label": localized.get("Label"),
                })

    return rows

def _option_label_rows(attribute: Dict[str, Any], option_set_key: str,
                       leading_columns: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten attribute[option_set_key].Options into one row per option label whose language is in LOCALES."""
    rows: List[Dict[str, Any]] = []
    for opt in ((attribute.get(option_set_key) or {}).get("Options") or []):
        for ll in ((opt.get("Label") or {}).get("LocalizedLabels") or []):
            if ll.get("LanguageCode") in LOCALES:
                rows.append({
                    **leading_columns,
                    "option_value": opt.get("Value"),
                    "language_code": ll.get("LanguageCode"),
                    "label": ll.get("Label"),
                })
    return rows


def extract_entity_attribute_options(token: str, entity_logical: str) -> Tuple[List[Dict[str, Any]],
                                                                             List[Dict[str, Any]],
                                                                             List[Dict[str, Any]]]:
    """Returns: picklist_rows, state_rows, status_rows for a single entity.

    Four attribute collections of ``entity_logical`` are queried, in this order:
    picklist, multi-select picklist, state, status.

    * picklist_rows use the ``attr_schema`` keys. For every picklist attribute
      the options of its expanded ``OptionSet`` come first (scope "Attribute"),
      followed by the options of its expanded ``GlobalOptionSet`` (scope
      "GlobalAttached"). ``optionset_name`` is always None.
    * state_rows / status_rows use the ``state_schema`` keys and are built from
      the expanded ``OptionSet`` only.

    Only labels whose LanguageCode is in LOCALES are kept.
    """
    attributes_path = f"EntityDefinitions(LogicalName='{entity_logical}')/Attributes"

    def _attribute_columns(a: Dict[str, Any]) -> Dict[str, Any]:
        # Identification columns shared by every row of one attribute.
        return {
            "entity_logical_name": entity_logical,
            "attribute_logical_name": a.get("LogicalName"),
            "attribute_metadata_id": a.get("MetadataId"),
        }

    # Picklists (single & multi) with expand
    picklist_rows: List[Dict[str, Any]] = []
    for cast in [
        "Microsoft.Dynamics.CRM.PicklistAttributeMetadata",
        "Microsoft.Dynamics.CRM.MultiSelectPicklistAttributeMetadata",
    ]:
        attrs = dv_get_all(
            f"{attributes_path}/{cast}",
            token,
            params={"$select": "LogicalName,MetadataId", "$expand": "OptionSet,GlobalOptionSet"},
        )
        for a in attrs:
            attribute_columns = _attribute_columns(a)
            # Local option set first, then the attached global one.
            for option_set_key, scope in (("OptionSet", "Attribute"), ("GlobalOptionSet", "GlobalAttached")):
                picklist_rows.extend(_option_label_rows(
                    a,
                    option_set_key,
                    {"optionset_scope": scope, "optionset_name": None, **attribute_columns},
                ))

    def _state_like_rows(cast: str) -> List[Dict[str, Any]]:
        # State and status attributes share one shape: OptionSet must be expanded with its Options.
        attrs = dv_get_all(
            f"{attributes_path}/{cast}",
            token,
            params={"$select": "LogicalName,MetadataId", "$expand": "OptionSet($select=Options)"},
        )
        rows: List[Dict[str, Any]] = []
        for a in attrs:
            rows.extend(_option_label_rows(a, "OptionSet", _attribute_columns(a)))
        return rows

    state_rows = _state_like_rows("Microsoft.Dynamics.CRM.StateAttributeMetadata")
    status_rows = _state_like_rows("Microsoft.Dynamics.CRM.StatusAttributeMetadata")

    return picklist_rows, state_rows, status_rows


def full_extract(token: str) -> Tuple[List[Dict[str, Any]],
                                     List[Dict[str, Any]],
                                     List[Dict[str, Any]],
                                     List[Dict[str, Any]],
                                     List[Dict[str, Any]]]:
    """Extract label rows for every entity in the environment.

    Returns:
        ``(global_rows, picklist_rows, state_rows, status_rows, target_rows)``.
        Entities without a LogicalName are ignored throughout; target rows keep
        only DisplayName labels whose LanguageCode is in LOCALES.
    """
    # Entities (paged)
    entities = dv_get_all(
        "EntityDefinitions",
        token,
        params={"$select": "LogicalName,DisplayName", "LabelLanguages": ",".join(map(str, LOCALES))},
    )

    # Global options
    global_rows = extract_global_options(token)

    named_entities = [entity for entity in entities if entity.get("LogicalName")]

    # Target display labels
    target_rows: List[Dict[str, Any]] = [
        {
            "entity_logical_name": entity.get("LogicalName"),
            "language_code": localized.get("LanguageCode"),
            "label": localized.get("Label"),
        }
        for entity in named_entities
        for localized in ((entity.get("DisplayName") or {}).get("LocalizedLabels") or [])
        if localized.get("LanguageCode") in LOCALES
    ]

    # Attribute-level rows, one round of requests per entity
    picklist_rows: List[Dict[str, Any]] = []
    state_rows: List[Dict[str, Any]] = []
    status_rows: List[Dict[str, Any]] = []
    for entity in named_entities:
        picks, states, statuses = extract_entity_attribute_options(token, entity.get("LogicalName"))
        picklist_rows += picks
        state_rows += states
        status_rows += statuses

    return global_rows, picklist_rows, state_rows, status_rows, target_rows


def incremental_extract(token: str, changed_entity_logicals: List[str]) -> Tuple[List[Dict[str, Any]],
                                                                                List[Dict[str, Any]],
                                                                                List[Dict[str, Any]],
                                                                                List[Dict[str, Any]]]:
    """Re-extract label rows for the given entities only.

    Returns:
        ``(picklist_rows, state_rows, status_rows, target_rows)`` accumulated
        over ``changed_entity_logicals`` in the order given. Note that, unlike
        ``full_extract``, no global option set rows are produced.
    """
    picklist_rows: List[Dict[str, Any]] = []
    state_rows: List[Dict[str, Any]] = []
    status_rows: List[Dict[str, Any]] = []
    target_rows: List[Dict[str, Any]] = []

    for entity_name in changed_entity_logicals:
        # Display labels first (targetmetadata), then the attribute-level rows.
        target_rows += extract_entity_display_labels(token, entity_name)

        picks, states, statuses = extract_entity_attribute_options(token, entity_name)
        picklist_rows += picks
        state_rows += states
        status_rows += statuses

    return picklist_rows, state_rows, status_rows, target_rows

StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 43, Finished, Available, Finished)

## Writers

In [42]:

from typing import Any, Dict, List, Set
from delta.tables import DeltaTable
from pyspark.sql import functions as F, Window, Row
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame


def _merge_delete_keys(table_name: str, delete_keys_df: DataFrame, key_cols: List[str]) -> None:
    """
    Delete from ``table_name`` every row whose ``key_cols`` equal a row of ``delete_keys_df``.

    Implemented as a Delta MERGE with only a WHEN MATCHED THEN DELETE clause.
    Nothing is executed when ``delete_keys_df`` has no rows.
    """
    if not delete_keys_df.rdd.isEmpty():
        # Equality on every key column, target aliased "t" and key set aliased "d".
        match_condition = " AND ".join(f"t.{key} = d.{key}" for key in key_cols)
        (
            DeltaTable.forName(spark, table_name)
            .alias("t")
            .merge(delete_keys_df.alias("d"), match_condition)
            .whenMatchedDelete()
            .execute()
        )

def write_delta_init(rows: List[Dict[str, Any]], table_name: str, schema_struct: StructType):
    """Replace the contents of ``table_name`` with ``rows`` (full load).

    ``rows`` are loaded with ``schema_struct``; ``option_value`` and
    ``language_code`` are cast to int when present, then the Delta table is
    written in overwrite mode.
    """
    frame = spark.createDataFrame(rows, schema=schema_struct)

    # Cast safety for the integer columns, where the schema has them
    int_columns = [name for name in ("option_value", "language_code") if name in frame.columns]
    for name in int_columns:
        frame = frame.withColumn(name, F.col(name).cast("int"))

    writer = frame.write.mode("overwrite").format("delta")
    writer.saveAsTable(table_name)

# Columns that identify one label row within an attribute-level table when deleting.
_ATTRIBUTE_ROW_KEYS = ["attribute_metadata_id", "option_value", "language_code"]


def _merge_upsert(table_name, source_df, view_name, key_cols, update_cols):
    """Register source_df as temp view view_name and MERGE it into table_name (update update_cols on key match, else INSERT *)."""
    source_df.createOrReplaceTempView(view_name)
    on_clause = " AND ".join(f"tgt.{c} = src.{c}" for c in key_cols)
    set_clause = ", ".join(f"tgt.{c} = src.{c}" for c in update_cols)
    spark.sql(f"""
      MERGE INTO {table_name} AS tgt
      USING {view_name} AS src
      ON  {on_clause}
      WHEN MATCHED THEN UPDATE SET {set_clause}
      WHEN NOT MATCHED THEN INSERT *
    """)


def _delete_stale_attribute_rows(table_name, current_rows_df, deleted_attr_ids):
    """Delete rows of table_name that vanished from attributes present in current_rows_df, plus all rows of deleted attributes."""
    current_keys_df = current_rows_df.select(*_ATTRIBUTE_ROW_KEYS).dropDuplicates()
    touched_attr_ids_df = current_keys_df.select("attribute_metadata_id").distinct()
    table_keys_df = spark.table(table_name).select(*_ATTRIBUTE_ROW_KEYS)

    # Stale = stored for an attribute seen in this batch, but no longer returned for it.
    # The scope join is unconditional on purpose: with an empty batch it yields no
    # candidates at all, so the anti-join below can never select the whole table.
    delete_keys_df = (
        table_keys_df
        .join(F.broadcast(touched_attr_ids_df), on="attribute_metadata_id", how="inner")
        .join(F.broadcast(current_keys_df), on=_ATTRIBUTE_ROW_KEYS, how="left_anti")
    )

    # Also delete every row under attributes Dataverse reported as deleted.
    if deleted_attr_ids:
        deleted_attr_ids_df = spark.createDataFrame(
            [(x,) for x in deleted_attr_ids], ["attribute_metadata_id"]
        )
        deleted_attr_rows_df = table_keys_df.join(
            F.broadcast(deleted_attr_ids_df), on="attribute_metadata_id", how="inner"
        )
        delete_keys_df = delete_keys_df.union(deleted_attr_rows_df).dropDuplicates()

    _merge_delete_keys(table_name, delete_keys_df, _ATTRIBUTE_ROW_KEYS)


def write_delta_incremental(
    picklist_rows,
    state_rows,
    status_rows,
    target_rows,
    deleted_attr_ids: Set[str]
):
    """Apply one incremental batch to the option, state, status and target tables.

    For each table the batch rows are upserted (MERGE on the natural key), then
    rows that are stored for an attribute/entity present in the batch but are
    missing from the batch are deleted. Rows belonging to ``deleted_attr_ids``
    are deleted from the option, state and status tables.

    Fixes relative to the previous implementation:
      * With an empty ``picklist_rows`` and a non-empty ``deleted_attr_ids``
        the stale-row anti-join ran against the whole option table and an empty
        "current" set, deleting every row. Stale detection is now always
        limited to attributes present in the batch.
      * State/status rows of deleted attributes are now removed even when the
        batch contains no state/status rows.

    Args:
        picklist_rows: dict rows matching ``attr_schema``.
        state_rows: dict rows matching ``state_schema``.
        status_rows: dict rows matching ``status_schema``.
        target_rows: dict rows matching ``target_schema``.
        deleted_attr_ids: attribute ids reported deleted (may be empty or None).

    Side effects:
        Registers the temp views picklist_rows, state_rows, status_rows and
        target_rows for the non-empty inputs, and modifies TBL_OPTIONS,
        TBL_STATE, TBL_STATUS and TBL_TARGET.
    """
    attribute_match_keys = [
        "entity_logical_name",
        "attribute_logical_name",
        "attribute_metadata_id",
        "option_value",
        "language_code",
    ]

    # -------------------------
    # 1) PICKLISTS (OptionSet)
    # -------------------------
    if picklist_rows or deleted_attr_ids:
        raw_pick_df = spark.createDataFrame(picklist_rows or [], schema=attr_schema)

        if picklist_rows:
            # The same option can arrive twice (local and attached global option set).
            # Keep one row per key. Priority: Attribute (best) > GlobalAttached (fallback) > everything else
            scope_priority = (
                F.when(F.col("optionset_scope") == "Attribute", F.lit(0))
                 .when(F.col("optionset_scope") == "GlobalAttached", F.lit(1))
                 .otherwise(F.lit(2))
            )
            by_key = Window.partitionBy(*attribute_match_keys).orderBy(F.col("_scope_pri").asc())
            pick_df = (
                raw_pick_df
                .withColumn("_scope_pri", scope_priority)
                .withColumn("_rn", F.row_number().over(by_key))
                .where(F.col("_rn") == 1)
                .drop("_rn", "_scope_pri")
            )
            _merge_upsert(TBL_OPTIONS, pick_df, "picklist_rows", attribute_match_keys,
                          ["label", "optionset_scope"])

        _delete_stale_attribute_rows(TBL_OPTIONS, raw_pick_df, deleted_attr_ids)

    # ---------------------------
    # 2) STATE and 3) STATUS labels
    # ---------------------------
    for rows, schema, table_name, view_name in (
        (state_rows, state_schema, TBL_STATE, "state_rows"),
        (status_rows, status_schema, TBL_STATUS, "status_rows"),
    ):
        if not rows and not deleted_attr_ids:
            continue
        rows_df = spark.createDataFrame(rows or [], schema=schema)
        if rows:
            _merge_upsert(table_name, rows_df, view_name, attribute_match_keys, ["label"])
        _delete_stale_attribute_rows(table_name, rows_df, deleted_attr_ids)

    # -------------------------
    # 4) TARGET entity labels
    # -------------------------
    if target_rows:
        target_key_cols = ["entity_logical_name", "language_code"]
        target_df = spark.createDataFrame(target_rows, schema=target_schema)
        _merge_upsert(TBL_TARGET, target_df, "target_rows", target_key_cols, ["label"])

        # Delete stale entity labels, limited to entities touched in this batch
        current_target_keys = target_df.select(*target_key_cols).dropDuplicates()
        delete_target_keys = (
            spark.table(TBL_TARGET).select(*target_key_cols)
            .join(
                F.broadcast(current_target_keys.select("entity_logical_name").distinct()),
                on="entity_logical_name", how="inner",
            )
            .join(F.broadcast(current_target_keys), on=target_key_cols, how="left_anti")
        )
        _merge_delete_keys(TBL_TARGET, delete_target_keys, target_key_cols)


StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 44, Finished, Available, Finished)

## Metrics + Logging

In [43]:
def table_count(tbl: str) -> int:
    """Return the number of rows in ``tbl``, or 0 when the count cannot be read.

    Args:
        tbl: Table name; it is interpolated directly into a
            ``SELECT COUNT(*)`` statement, so pass trusted identifiers only.

    Returns:
        The row count as an ``int``. If the query raises any ``Exception``
        the function reports the failure on stdout and returns 0.
    """
    try:
        return int(spark.sql(f"SELECT COUNT(*) c FROM {tbl}").collect()[0]["c"])
    except Exception as exc:
        # Best-effort metric: never abort the run because of a count, but make
        # the failure visible so a logged 0 is not mistaken for an empty table.
        print(f"WARNING: table_count({tbl}) failed, reporting 0 ({type(exc).__name__}: {exc})")
        return 0

def log_run(run_id: str, start_ts, end_ts, mode: str, deleted_attrs: int, changed_entities: int):
    """Append a single summary row describing this run to ``RUN_LOG_TABLE``.

    Args:
        run_id: Identifier of the run being logged.
        start_ts: Run start; must expose ``.timestamp()``.
        end_ts: Run end; must expose ``.timestamp()``.
        mode: Mode label stored with the row.
        deleted_attrs: Number of deleted attributes (coerced with ``int``).
        changed_entities: Number of changed entities (coerced with ``int``).
    """
    elapsed_seconds = int(end_ts.timestamp() - start_ts.timestamp())

    # Current size of every target table, read via table_count.
    global_n = table_count(TBL_GLOBAL)
    options_n = table_count(TBL_OPTIONS)
    state_n = table_count(TBL_STATE)
    status_n = table_count(TBL_STATUS)
    target_n = table_count(TBL_TARGET)

    deleted_n = int(deleted_attrs)
    # Fall back to an empty string when no stamp is returned.
    stamp = get_server_stamp(TARGET_ENV, "attributes") or ""
    changed_n = int(changed_entities)

    # Field order is kept exactly as before; the row is written against
    # run_log_schema.
    entry = Row(
        run_id=run_id,
        environment_name=TARGET_ENV,
        mode=mode,
        start_ts=start_ts,
        end_ts=end_ts,
        duration_seconds=elapsed_seconds,
        global_row_count=global_n,
        options_row_count=options_n,
        state_row_count=state_n,
        status_row_count=status_n,
        target_row_count=target_n,
        deleted_attributes=deleted_n,
        server_version_stamp=stamp,
        changed_entities=changed_n,
    )

    run_log_df = spark.createDataFrame([entry], schema=run_log_schema)
    run_log_df.write.mode("append").format("delta").saveAsTable(RUN_LOG_TABLE)


StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 45, Finished, Available, Finished)

## Runner

In [44]:
def ensure_target_tables_exist():
    """Write an empty Delta table for every target, using save mode "ignore".

    Intended to make sure the tables exist before the first run so that the
    later merges do not fail.
    """
    table_specs = [
        (TBL_GLOBAL, global_schema),
        (TBL_OPTIONS, attr_schema),
        (TBL_STATE, state_schema),
        (TBL_STATUS, status_schema),
        (TBL_TARGET, target_schema),
    ]
    for table_name, table_schema in table_specs:
        empty_df = spark.createDataFrame([], schema=table_schema)
        empty_df.write.mode("ignore").format("delta").saveAsTable(table_name)

def run():
    """Run one metadata sync for ``TARGET_ENV`` in the configured ``MODE``.

    Steps:
      1. Make sure the target tables exist.
      2. ``MODE == "full"`` (case-insensitive): extract everything and write
         all five target tables. Any other value is treated as incremental:
         fetch the change marker, extract the changed entities, refresh the
         global option sets and apply merges/deletes.
      3. Persist the server version stamp only after the writes succeeded and
         only when a non-empty stamp was returned.
      4. Append a row to the run log and print a console summary.
    """
    ensure_target_tables_exist()

    # Normalise once so the branch, the run log and the summary all agree.
    mode = MODE.lower()

    run_id = f"{uuid.uuid4()}_{TARGET_ENV}_{MODE}"
    start_ts = spark.sql("SELECT current_timestamp() ts").collect()[0]["ts"]
    t0 = time.time()

    token = get_token()

    deleted_attr_ids: Set[str] = set()
    changed_entity_logicals: List[str] = []

    if mode == "full":
        # Capture the stamp BEFORE extracting: a stamp taken after the extract
        # would make the next incremental run skip any metadata change made
        # while the extract was in progress. Taking it first means such changes
        # are picked up again by the next incremental run.
        resp0 = retrieve_metadata_changes(token, build_simple_entity_query())
        stamp0 = resp0.get("ServerVersionStamp")

        # Full extract
        global_rows, picklist_rows, state_rows, status_rows, target_rows = full_extract(token)

        # Write
        write_delta_init(global_rows,  TBL_GLOBAL,  global_schema)
        write_delta_init(picklist_rows, TBL_OPTIONS, attr_schema)
        write_delta_init(state_rows,   TBL_STATE,   state_schema)
        write_delta_init(status_rows,  TBL_STATUS,  status_schema)
        write_delta_init(target_rows,  TBL_TARGET,  target_schema)

        # Persist the bootstrap stamp only after the full load was written.
        if stamp0:
            upsert_server_stamp(TARGET_ENV, "attributes", stamp0)

    else:
        # Incremental marker: new stamp plus what changed / was deleted since
        # the stored one.
        new_stamp, changed_entity_logicals, deleted_attr_ids = retrieve_metadata_incremental(token)

        picklist_rows, state_rows, status_rows, target_rows = incremental_extract(token, changed_entity_logicals)

        # Always refresh global option sets (cheap + correct for choice edits)
        global_rows = extract_global_options(token)
        write_delta_init(global_rows, TBL_GLOBAL, global_schema)

        # Apply merges/deletes
        write_delta_incremental(picklist_rows, state_rows, status_rows, target_rows, deleted_attr_ids)

        # Persist the new stamp only after successful writes, and never replace
        # the stored marker with an empty value (same guard as the full branch).
        if new_stamp:
            upsert_server_stamp(TARGET_ENV, "attributes", new_stamp)
        else:
            print("WARNING: no server version stamp returned; keeping the previously stored stamp")

    end_ts = spark.sql("SELECT current_timestamp() ts").collect()[0]["ts"]
    log_run(run_id, start_ts, end_ts, mode, deleted_attrs=len(deleted_attr_ids), changed_entities=len(changed_entity_logicals))

    # Console summary
    print("✅ Dataverse Metadata Sync Complete")
    print(f"  run_id            : {run_id}")
    print(f"  env               : {TARGET_ENV}")
    print(f"  mode              : {mode}")
    print(f"  duration_seconds  : {int(time.time() - t0)}")
    print(f"  changed_entities  : {len(changed_entity_logicals)}")
    print(f"  deleted_attributes: {len(deleted_attr_ids)}")
    print(f"  stamp             : {get_server_stamp(TARGET_ENV, 'attributes')}")

# Execute the sync when this cell runs.
run()

StatementMeta(, ca89b146-d56f-4647-aad5-c130a141d080, 46, Finished, Available, Finished)

✅ Dataverse Metadata Sync Complete
  run_id            : 008a41c9-1043-4988-8639-af151e25f1b8_UAT_incremental
  env               : UAT
  mode              : incremental
  duration_seconds  : 66
  changed_entities  : 1
  deleted_attributes: 0
  stamp             : 5934330!12/18/2025 17:30:18
