In [0]:
from databricks.sdk import WorkspaceClient
from pyspark.sql import Row
from pyspark.sql import functions as F

TARGET_TABLE_NAME = dbutils.widgets.get("user_id_map_table_name")
print(f"TARGET_TABLE_NAME: {TARGET_TABLE_NAME}")

if TARGET_TABLE_NAME.count(".") != 2:
    raise ValueError("TARGET_TABLE_NAME should contain two '.'")

CATALOG, SCHEMA, TABLE = TARGET_TABLE_NAME.split(".")

wc = WorkspaceClient()
users = list(wc.users.list(attributes=['id', 'name', 'userName']))

rows = []
for u in users:
    rows.append(Row(
        id=u.id,
        email=getattr(u, "user_name", None),
        family_name=getattr(getattr(u, "name", None), "family_name", None),
        given_name=getattr(getattr(u, "name", None), "given_name", None),
    ))

df = spark.createDataFrame(rows)

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

table_exists = spark.catalog.tableExists(TARGET_TABLE_NAME)

if not table_exists:
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .clusterBy("id")
        .saveAsTable(TARGET_TABLE_NAME)
    )
    print(f"Created new table {TARGET_TABLE_NAME} with {df.count()} users.")
else:
    existing = spark.table(TARGET_TABLE_NAME).select("id").withColumn("id", F.col("id").cast(df.schema["id"].dataType))
    new_df = (
        df
        .withColumn("id", F.col("id").cast(existing.schema["id"].dataType))
        .join(existing, on="id", how="left_anti")
    )

    new_count = new_df.count()
    if new_count > 0:
        (
            new_df.write
            .format("delta")
            .mode("append")
            .saveAsTable(TARGET_TABLE_NAME)
        )
        print(f"Added {new_count} new users.")
    else:
        print("No new users to add.")
