# Sync Tags

Databricks job to sync tags from the tagsonomy Flask app to Unity Catalog.

This job fetches tag mappings from the Flask app and applies them to UC objects,
using a difference-based approach to minimize API calls (the tag API is slow).


In [None]:
import requests
from databricks.sdk import WorkspaceClient

# Tag-count ceiling that sync_tags checks before it modifies an entity.
# NOTE(review): presumably mirrors the Unity Catalog per-object tag limit — confirm.
MAX_TAGS = 50


In [None]:
# Declare the job's text widgets with their defaults, then read the values back
_widget_defaults = {"prefix": "TYPE_", "mappings_url": ""}
for _widget_name, _widget_default in _widget_defaults.items():
    dbutils.widgets.text(_widget_name, _widget_default)

prefix = dbutils.widgets.get("prefix")
mappings_url = dbutils.widgets.get("mappings_url")

# Echo the effective parameters so they show up in the job output
print(f"Prefix: {prefix}", f"Mappings URL: {mappings_url}", sep="\n")


In [None]:
def sync_tags(
    wc: WorkspaceClient,
    prefix: str,
    entity_type: str,
    entity_name: str,
    desired_tags: list[str],
):
    """
    Sync the prefixed tags on an entity to match the desired state.

    Only tags whose key starts with ``prefix`` are managed by this function;
    any other tag on the entity is left untouched. Only the difference between
    the current and desired state is applied, to minimize API calls.

    Args:
        wc: Workspace client used for the tag assignment API calls.
        prefix: String prepended to every desired tag; also selects which of
            the entity's existing tags are considered managed.
        entity_type: One of "catalogs", "schemas", "tables", "columns", "volumes"
        entity_name: Fully qualified name of the entity
        desired_tags: Tag names (without the prefix) that should be present.

    Returns:
        None. If applying the change would leave the entity with more than
        ``MAX_TAGS`` tags, an error is printed and nothing is modified.
    """
    # Build the set of desired tag keys, with prefix
    new_tags = {prefix + tag for tag in desired_tags}

    # Get current tags and keep only the keys that carry our prefix
    current_tags = list(wc.entity_tag_assignments.list(entity_type, entity_name))
    old_tags = {t.tag_key for t in current_tags if t.tag_key.startswith(prefix)}

    # Compute the difference
    to_add = new_tags - old_tags
    to_del = old_tags - new_tags

    # Check tag limit. The count includes tags we do not manage, and ending up
    # with exactly MAX_TAGS is still allowed -- only exceeding it is an error.
    resulting_count = len(current_tags) + len(to_add) - len(to_del)
    if resulting_count > MAX_TAGS:
        print(f"ERROR: too many tags on {entity_type} {entity_name}!")
        return

    # Delete before adding so the entity never temporarily exceeds the limit
    if to_del:
        print(f"Removing tags {to_del} from {entity_type} {entity_name}")
        for tag in to_del:
            wc.entity_tag_assignments.delete(entity_type, entity_name, tag)

    if to_add:
        # Imported here so the function is self-contained. The SDK's create()
        # expects an EntityTagAssignment object rather than a plain dict.
        from databricks.sdk.service.catalog import EntityTagAssignment

        print(f"Adding tags {to_add} to {entity_type} {entity_name}")
        for tag in to_add:
            wc.entity_tag_assignments.create(
                tag_assignment=EntityTagAssignment(
                    entity_type=entity_type,
                    entity_name=entity_name,
                    tag_key=tag,
                )
            )


In [None]:
# Sanity-check that the workspace client can produce authentication headers.
# Never print header values: the Authorization header carries the bearer
# token, and anything printed here ends up in the notebook/job run output.
wc = WorkspaceClient()
h = wc.config.authenticate()
print(f"Auth header names: {sorted(h)}")
# Report only the scheme (the part before the first space), not the credential
auth_scheme = h.get("Authorization", "").partition(" ")[0]
print(f"Authorization scheme: {auth_scheme or '<missing>'}")


In [None]:
# Fetch the desired tag mappings from the Flask app and apply them to UC objects.
# Fail early with a clear message: the widget default for mappings_url is empty.
if not mappings_url.strip():
    raise ValueError("The 'mappings_url' job parameter must be set")

wc = WorkspaceClient()
print(f"Fetching mappings from {mappings_url}")

# Bound the request: without a timeout, requests waits indefinitely on an
# unresponsive server and the job would hang instead of failing.
response = requests.get(mappings_url, headers=wc.config.authenticate(), timeout=60)
if not response.ok:
    # Surface the server's error body for debugging before raising
    print(response.text)
response.raise_for_status()
mappings = response.json()
print(f"Retrieved {len(mappings)} mappings")

# Each mapping supplies the entity type, its fully qualified name and its tags
for mapping in mappings:
    sync_tags(wc, prefix, mapping["type"], mapping["name"], mapping["tags"])

print("Sync complete")
