# Admin Tasks


These are practical, copy-pasteable patterns that admins commonly automate with Semantic Link Labs (plus SemPy) in Fabric environments.
They're written to be idempotent, service-principal friendly, and easy to drop into a Notebook activity inside a Data Pipeline so you can schedule them.

Setup (once per notebook)

In [None]:
%pip install -U semantic-link-labs semantic-link

In [None]:
import time, json, pandas as pd
import sempy_labs as labs
from sempy_labs import admin, report, graph
from sempy import fabric  # for workspace/items helpers

In [None]:
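# (Optional) SPN auth via Key Vault; otherwise use user auth.
# In recent sempy_labs releases the key_vault_* arguments are, as far as I know,
# the names of Key Vault secrets rather than raw values; check your version's docs.
kv_uri = "https://<your-keyvault>.vault.azure.net/"
with labs.service_principal_authentication(
    key_vault_uri=kv_uri,
    key_vault_tenant_id="<tenant-id>",
    key_vault_client_id="<client-id>",
    key_vault_client_secret="<client-secret>"
):
    # The SPN context ends when this block exits, so every call that should
    # run as the service principal must be indented inside it.
    tenant_settings = admin.list_tenant_settings()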

In [None]:
# 1) “Dataset swap night”: rebind every report from TEST → PROD

# When you promote a Direct Lake/Import model, rebind all downstream reports in one go.

old_ds = "Enterprise Model (TEST)"
new_ds = "Enterprise Model (PROD)"

# Rebind all reports that point to the old dataset, across all workspaces
report.report_rebind_all(dataset=old_ds, new_dataset=new_ds, report_workspace=None)

# Safety: verify nothing still points at the old dataset
dangling = admin.list_reports(referenced_dataset=old_ds)
assert len(dangling) == 0, f"Dangling reports still bound to {old_ds}"

In [None]:
# 2) Auto-heal failed refreshes with backoff + owner notification

# Useful when a transient failure is common (gateway hiccup, throttling).

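target_ds = "Ops Finance Model"
ws = "Ops BI"
max_attempts = 3

hist = admin.list_refresh_history(dataset=target_ds, workspace=ws, top=1)
if len(hist) and hist.iloc[0]["Status"] == "Failed":
    last_error = "refresh did not reach 'Completed'"
    for attempt in range(1, max_attempts + 1):
        try:
            admin.trigger_dataset_refresh(dataset=target_ds, workspace=ws)
            time.sleep(30 * attempt)  # linear backoff: 30s, 60s, 90s
            latest = admin.list_refresh_history(dataset=target_ds, workspace=ws, top=1)
            if len(latest) and latest.iloc[0]["Status"] == "Completed":
                break
        except Exception as e:
            last_error = repr(e)  # remember the error, then try again
    else:
        # Runs only when no attempt succeeded, whether the refresh kept
        # failing quietly or the calls raised an exception
        owner = admin.get_dataset_owner(dataset=target_ds, workspace=ws)
        graph.send_mail(
            user="me@contoso.com",
            to_recipients=[owner],
            subject=f"[Fabric] Refresh failed after retries: {target_ds}",
            content=f"<p>Last error: {last_error}</p>"
        )
        raise RuntimeError(f"Refresh of {target_ds} failed after {max_attempts} attempts: {last_error}")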
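The retry-and-notify flow can be factored into a small helper that is testable without a Fabric connection. This is a minimal sketch, not a library function: `retry_with_backoff` and its parameters are hypothetical, and the `trigger` and `get_status` callables stand in for whatever refresh and history calls you actually use.

```python
import time


def retry_with_backoff(trigger, get_status, max_attempts=3, base_delay=30, sleep=time.sleep):
    """Call trigger() up to max_attempts times.

    After each trigger, wait base_delay * attempt seconds, then check
    get_status(). Returns (succeeded, last_error).
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            trigger()
            sleep(base_delay * attempt)  # linear backoff
            if get_status() == "Completed":
                return True, None
            last_error = "status was not 'Completed'"
        except Exception as exc:
            last_error = repr(exc)  # keep the error and try again
    return False, last_error


# Success on the second attempt; sleep is replaced so nothing actually waits
statuses = iter(["Failed", "Completed"])
waits = []
ok, err = retry_with_backoff(
    trigger=lambda: None,
    get_status=lambda: next(statuses),
    sleep=waits.append,
)


# Every attempt raises: the helper reports failure instead of propagating
def always_fails():
    raise RuntimeError("gateway unreachable")


ok2, err2 = retry_with_backoff(
    always_fails, lambda: "Failed", max_attempts=2, sleep=lambda seconds: None
)
print(ok, waits, ok2, err2)
```

Injecting `sleep` keeps the helper fast to test, and returning `(succeeded, last_error)` leaves the decision to notify an owner or raise with the caller.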

In [None]:
# 3) Capacity hot-spot watch: flag top refresh consumers & throttle with rules

# Great to catch models hammering your capacity during business hours.

cap_use = admin.get_refreshables()  # returns refreshables + recent durations/status
offenders = (cap_use[cap_use["LastRefreshDuration"] > 1800]  # >30 minutes
             .sort_values("LastRefreshDuration", ascending=False)
             .head(10))

# Email a daily digest
if len(offenders):
    html = "<h3>Top Refresh Consumers (last 24h)</h3>" + offenders.to_html(index=False)
    graph.send_mail(
        user="me@contoso.com",
        to_recipients=["bi-admins@contoso.com"],
        subject="[Fabric] Capacity hot-spot watch",
        content=html
    )

In [None]:
# 4) Drift detection: tenant & delegated settings baseline compare

# Catch unapproved changes to admin/tenant settings.

# Load yesterday's baseline from Lakehouse/Files (parquet/json), compare to current.
# pandas reads through the default lakehouse mount, hence the /lakehouse/default prefix.
base = "/lakehouse/default/Files/fabric_admin/tenant_settings"
current = admin.list_tenant_settings()
yesterday = pd.read_parquet(f"{base}/prev_snapshot.parquet")

diff = current.merge(yesterday, on="SettingName", how="outer", suffixes=("_now", "_prev"), indicator=True)
# Keep settings that were added, removed, or whose value changed
changed = diff.query("_merge != 'both' or Value_now != Value_prev")

if len(changed):
    changed.to_parquet(f"{base}/diff.parquet", index=False)
    graph.send_mail(
        user="me@contoso.com",
        to_recipients=["fabric-admins@contoso.com"],
        subject="[Fabric] Tenant settings drift detected",
        content=f"<p>Diff saved to {base}/diff.parquet</p>" + changed.to_html(index=False),
    )
# Roll the baseline forward so the next run compares against today's snapshot
current.to_parquet(f"{base}/prev_snapshot.parquet", index=False)
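The baseline comparison boils down to classifying each setting as added, removed, or changed. A dependency-free sketch of that logic follows; `diff_settings`, the `MISSING` marker, and the sample setting names are all hypothetical and chosen only for illustration.

```python
MISSING = "<missing>"  # marker for a setting absent from one snapshot


def diff_settings(current, previous):
    """Return {name: (previous_value, current_value)} for every setting
    that was added, removed, or changed between two snapshots."""
    changed = {}
    for name in sorted(set(current) | set(previous)):
        now = current.get(name, MISSING)
        prev = previous.get(name, MISSING)
        if now != prev:
            changed[name] = (prev, now)
    return changed


# Hypothetical snapshots keyed by setting name
previous = {
    "PublishToWeb": "Disabled",
    "ExportToExcel": "Enabled",
    "GuestAccess": "Disabled",
}
current = {
    "PublishToWeb": "Enabled",
    "ExportToExcel": "Enabled",
    "ServicePrincipalAccess": "Enabled",
}

drift = diff_settings(current, previous)
for name, (before, after) in drift.items():
    print(f"{name}: {before} -> {after}")
```

Using an explicit marker rather than `None` keeps "setting removed" distinct from "setting present with an empty value".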


In [None]:
# 5) “Publish to web” & org-wide links sweeper (weekly)

# Reduce exposure risk.

wide = admin.list_widely_shared_artifacts()            # org-wide links + publish-to-web
risky = wide.query("IsPublishedToWeb == True or IsOrgWide == True")
if len(risky):
    graph.send_mail(
        user="me@contoso.com",
        to_recipients=["security@contoso.com","bi-admins@contoso.com"],
        subject="[Fabric] Widely shared artifacts detected",
        content=risky.to_html(index=False)
    )

In [None]:
# 6) “Leaver” automation: remove user access across workspaces & models

# Hook this up to your HR offboarding feed.

ex_user = "former.user@contoso.com"
# Remove from every workspace they belong to
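ent = admin.list_access_entities(user_email_address=ex_user)
failed = []
for _, r in ent.iterrows():
    try:
        admin.remove_workspace_user(workspace=r["WorkspaceName"], principal=ex_user)
    except Exception as e:
        # Record failures instead of swallowing them so leftover access is followed up
        failed.append((r["WorkspaceName"], repr(e)))

# Remove from all dataset roles
roles = admin.list_dataset_roles(user_email_address=ex_user)
for _, r in roles.iterrows():
    admin.remove_dataset_role_member(dataset=r["DatasetName"], workspace=r["WorkspaceName"], principal=ex_user)

if failed:
    print(f"Could not remove {ex_user} from {len(failed)} workspace(s): {failed}")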

In [None]:
# 7) Gateway hygiene: verify bindings + orphaned data sources

# Catch datasets that should use a gateway but aren’t bound, and fix them.

targets = admin.list_datasets(needs_gateway=True)  # filter helper
gws = labs.list_gateways()
default_gw = gws.iloc[0]['Id']  # pick your standard cluster

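for _, row in targets.iterrows():
    try:
        labs.bind_dataset_to_gateway(
            dataset=row["Name"], workspace=row["WorkspaceName"], gateway=default_gw
        )
    except Exception as e:
        # Surface the failure; a silent pass would hide datasets that stay unbound
        print(f"Could not bind {row['Name']} in {row['WorkspaceName']}: {e}")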

In [None]:
# 8) Naming policy enforcement (workspaces/items)

# Keep your catalog clean (domain-prefix, owner code, environment suffix).

policy = {"Workspace": r"^(WM|FICM|ECM)-[A-Za-z0-9\-]+-(DEV|TEST|PROD)$"}

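# fabric.list_items() is scoped to a single workspace by default, so loop over
# workspaces for tenant-wide coverage. This block assumes the frame exposes
# WorkspaceName and WorkspaceOwner columns; adapt to what your version returns.
items = fabric.list_items()
ws_names = items.drop_duplicates("WorkspaceName")
# A boolean mask sidesteps DataFrame.query's limited support for .str methods
bad_ws = ws_names[~ws_names["WorkspaceName"].str.match(policy["Workspace"], na=False)]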
if len(bad_ws):
    # Send owners a gentle “please rename” notice (include examples)
    owners = bad_ws.groupby("WorkspaceName")["WorkspaceOwner"].first().tolist()
    graph.send_mail(
        user="me@contoso.com",
        to_recipients=owners,
        subject="[Fabric] Workspace naming policy reminder",
        content=bad_ws[["WorkspaceName"]].to_html(index=False)
    )
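The naming policy itself can be exercised without any Fabric calls, which is a cheap way to confirm the regex behaves as intended before emailing owners. A minimal sketch using the same pattern; `find_noncompliant` and the sample names are hypothetical.

```python
import re

# Same pattern as the policy above: domain prefix, free-form middle, environment suffix
WORKSPACE_PATTERN = re.compile(r"^(WM|FICM|ECM)-[A-Za-z0-9\-]+-(DEV|TEST|PROD)$")


def find_noncompliant(names, pattern=WORKSPACE_PATTERN):
    """Return the names that do not match the naming policy, in input order."""
    return [name for name in names if not pattern.match(name)]


sample = ["WM-Research-PROD", "WM-Research-Prod", "ECM-KPIs-TEST", "Sandbox"]
bad = find_noncompliant(sample)
print(bad)
```

The pattern is case-sensitive, so a suffix such as `-Prod` is flagged. Either normalize names before matching or compile the pattern with `re.IGNORECASE` if mixed-case suffixes should pass.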

In [None]:
# 9) Bulk assign workspaces to capacity (after tenant re-org)
target_capacity = "Premium-Capacity-A"
to_move = ["WM-Research-Prod","WM-OCIO-Prod"]

for ws in to_move:
    admin.assign_workspace_to_capacity(workspace=ws, capacity_name=target_capacity)


In [None]:
# 10) Direct Lake migration assist: update connections then rebind
# 1) Convert/point dataset to Direct Lake source (tables=None -> all)
from sempy_labs import directlake
directlake.update_direct_lake_model_connection(dataset="Trading DL", workspace="Ops BI")

# 2) Rebind dependent reports (see #1)
report.report_rebind_all(dataset="Trading DL (Old)", new_dataset="Trading DL")

In [None]:
# 11) Programmatic refresh schedules (standardize cadence)
targets = [
  {"dataset":"WM Sales Model","workspace":"WM Analytics","days":["Monday","Tuesday","Wednesday","Thursday","Friday"],"times":["06:30","12:00"]},
  {"dataset":"ECM KPIs","workspace":"ECM Analytics","days":["Monday"],"times":["07:00"]}
]

for t in targets:
    admin.update_refresh_schedule(
        dataset=t["dataset"], workspace=t["workspace"],
        days=t["days"], times=t["times"], time_zone="Central Standard Time"
    )
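Validating each schedule entry before pushing it avoids a half-applied rollout when one entry has a typo. The sketch below is an assumption-laden illustration: `validate_schedule` is a hypothetical helper, and the restriction to half-hour slots reflects my understanding of what the service accepts, so confirm it against the refresh schedule documentation.

```python
import re

VALID_DAYS = {
    "Monday", "Tuesday", "Wednesday", "Thursday",
    "Friday", "Saturday", "Sunday",
}
# Assumption: times are 24-hour HH:MM on the hour or half hour
TIME_RE = re.compile(r"^([01]\d|2[0-3]):(00|30)$")


def validate_schedule(entry):
    """Return a list of problems found in one schedule entry (empty if none)."""
    problems = []
    days = entry.get("days", [])
    times = entry.get("times", [])
    if not days:
        problems.append("no days given")
    if not times:
        problems.append("no times given")
    for day in days:
        if day not in VALID_DAYS:
            problems.append(f"unknown day: {day}")
    for t in times:
        if not TIME_RE.match(t):
            problems.append(f"bad time: {t}")
    return problems


good = {"dataset": "WM Sales Model", "days": ["Monday", "Friday"], "times": ["06:30", "12:00"]}
bad = {"dataset": "ECM KPIs", "days": ["Mon"], "times": ["7:15"]}

good_problems = validate_schedule(good)
bad_problems = validate_schedule(bad)
print(good_problems, bad_problems)
```

Running the validator over the whole `targets` list first, and applying schedules only when every entry comes back clean, keeps the cadence consistent across models.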

In [None]:
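# 12) RLS audit: enumerate roles, members, and orphaned emails
rls = admin.list_dataset_roles()
# Find members no longer in Microsoft Entra ID (join to your HR export / Entra dump)
aad = pd.read_csv("/lakehouse/default/Files/security/entra_users.csv")  # columns: UserPrincipalName
# Compare case-insensitively; UPNs and role member emails often differ only by case
active = set(aad["UserPrincipalName"].str.lower())
rls["IsActiveAAD"] = rls["MemberEmail"].str.lower().isin(active)
stale = rls[~rls["IsActiveAAD"]]
# Write under Files/: a raw parquet file dropped into Tables/ is not a Delta table
stale.to_parquet("/lakehouse/default/Files/security/rls_orphans.parquet", index=False)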
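Matching role members against a directory export is easy to get wrong when addresses differ only by case or stray whitespace. A minimal, dependency-free sketch of the normalization follows; `find_orphaned_members` and the sample records are hypothetical.

```python
def normalize(address):
    """Lower-case and trim an email/UPN so comparisons ignore case and padding."""
    return address.strip().lower()


def find_orphaned_members(role_members, active_upns):
    """Return role member records whose email is not in the active directory list."""
    active = {normalize(upn) for upn in active_upns}
    return [m for m in role_members if normalize(m["MemberEmail"]) not in active]


# Hypothetical data: one member differs only by case, one has left the company
role_members = [
    {"DatasetName": "WM Sales Model", "Role": "Advisors", "MemberEmail": "Alice.Ng@contoso.com"},
    {"DatasetName": "WM Sales Model", "Role": "Advisors", "MemberEmail": "former.user@contoso.com"},
    {"DatasetName": "ECM KPIs", "Role": "Leads", "MemberEmail": " bob@contoso.com "},
]
active_upns = ["alice.ng@contoso.com", "bob@contoso.com"]

orphans = find_orphaned_members(role_members, active_upns)
print([m["MemberEmail"] for m in orphans])
```

Without normalization, the first and third members would be reported as orphans and could lose access during cleanup.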

In [None]:
# 13) Git hygiene: report on connected vs. unconnected workspaces
git = admin.list_git_connections()              # connected
all_ws = admin.list_workspaces()                # all
joined = all_ws.merge(git[["WorkspaceName","RepositoryUrl"]], how="left", on="WorkspaceName")
unconnected = joined[joined["RepositoryUrl"].isna()]

In [None]:
# 14) “No owner” & “no contact” find-and-fix
ws = admin.list_workspaces()
no_owner = ws[ws["WorkspaceOwner"].isna() | (ws["WorkspaceOwner"]=="")]
for _, r in no_owner.iterrows():
    # Add a default owner group to ensure accountability
    admin.add_workspace_user(workspace=r["WorkspaceName"], principal="bi-admins@contoso.com", role="Admin")

In [None]:
# 15) License/cost hygiene: unused artifacts & “zombie” models cleanup
threshold_days = 45  # your threshold
unused = admin.list_unused_artifacts(days=threshold_days)
# Boolean mask instead of query(): method calls like .isna() need the python engine there
zombies = unused[(unused["ArtifactType"] == "Dataset") & unused["LastRefreshDate"].isna()]
# Notify owners; after grace period, delete or archive
for _, r in zombies.iterrows():
    graph.send_mail(
        user="me@contoso.com",
        to_recipients=[r["OwnerEmail"]],
        subject=f"[Fabric] {r['Name']} scheduled for archive",
        content=f"<p>No refresh and no usage in {threshold_days} days. Reply if you need it retained.</p>"
    )
    # admin.delete_dataset(dataset=r['Name'], workspace=r['WorkspaceName'])  # if you truly want to automate

How I’d operationalize this in Fabric

Put each block in its own notebook (or parameterize one “admin runbook” notebook).

Orchestrate with a Data Pipeline (sequential or parallel activities).

Land outputs (diffs, inventories, audits) to Lakehouse tables (Delta).

Build a tiny Admin Dashboard on top (Direct Lake) for proof and traceability.

Gate any destructive actions (delete, rebind) with a feature flag variable so you can run in “report-only” first.
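One way to implement the "report-only" gate is a thin wrapper that logs what would happen and only calls the action when the flag is off. This is a sketch under stated assumptions: `run_action` and its parameters are hypothetical, and in a pipeline the flag would typically come from a notebook parameter.

```python
def run_action(description, action, *, report_only=True, log=print):
    """Run a destructive action only when report_only is False.

    In report-only mode the action is never called; the intent is logged instead.
    """
    if report_only:
        log(f"[report-only] would run: {description}")
        return None
    log(f"[executing] {description}")
    return action()


# Demonstration with a harmless stand-in for a delete call
deleted = []
messages = []

run_action(
    "delete dataset 'Old Model'",
    lambda: deleted.append("Old Model"),
    report_only=True,
    log=messages.append,
)
after_dry_run = list(deleted)  # still empty: nothing was executed

run_action(
    "delete dataset 'Old Model'",
    lambda: deleted.append("Old Model"),
    report_only=False,
    log=messages.append,
)
print(after_dry_run, deleted, messages)
```

Defaulting `report_only` to `True` means a forgotten parameter results in a dry run, not a deletion.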
