### Using Semantic Link Labs to run Best Practice Analyzer

https://github.com/microsoft/semantic-link-labs

https://semantic-link-labs.readthedocs.io/en/latest/

In [None]:
import sempy
import sempy.fabric as fabric
import sempy_labs as labs

In [None]:
# Analyze one semantic model with the Best Practice Analyzer; the findings are
# displayed as an HTML table in the notebook output.

bpa_results = labs.run_model_bpa(dataset="My Semantic Model", workspace="My Workspace")

In [None]:
# Build the list of workspace names that belong to one Fabric capacity.

capacityID = 'My capacity GUID'
filter_condition = [capacityID]

# Start from every workspace returned by list_workspaces, then keep only the rows
# whose 'Capacity Id' matches the capacity above.
x = fabric.list_workspaces()
in_capacity = x['Capacity Id'].isin(filter_condition)
x = x.loc[in_capacity]

workspace_names = x['Name'].tolist()

In [None]:
# Run BPA against every semantic model in each of the listed workspaces,
# leaving out the models named in skip_models.
# The results will be saved as a Delta table within the default lakehouse attached to the notebook.

labs.run_model_bpa_bulk(
    skip_models=[
        'ModelBPA',
        'Fabric Capacity Metrics',
        'Report Usage Metrics Model',
        'Usage Metrics Report',
        'Dashboard Usage Metrics Model',
    ],
    workspace=workspace_names,
)

In [None]:
# Create a direct lake semantic model using the table we created above
# (the BPA results table written by the bulk run).
# The call passes no arguments, so the library's defaults decide names and locations.

labs.create_model_bpa_semantic_model()

In [None]:
# Create a default report using the model we created above.
# The report helper lives in the sempy_labs.report submodule, which is imported here.
# The call passes no arguments, so the library's defaults apply.

import sempy_labs.report as report
report.create_model_bpa_report()

### Get Activity Events to retain audit logs from your Fabric/PBI tenant
The sample code below connects to the Activity Events endpoint and saves the audit logs to a lakehouse as Parquet files.

https://learn.microsoft.com/en-us/rest/api/power-bi/admin/get-activity-events

In [None]:
# Initial imports and configuration

# Enable V-Order and optimized writes for this Spark session.
# The V-Order key must start with "spark." -- a misspelled key is accepted silently
# and has no effect.
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")  # 1024**3

import json
import logging
import requests
import msal
import pandas as pd
from datetime import date, timedelta
from delta.tables import *
from pyspark.sql.functions import col, year, month, dayofmonth

# Look up the current workspace ID from the Spark session, then resolve the
# lakehouse by name within that workspace and keep its ID.
workspace_id = spark.conf.get("trident.workspace.id")
bronze_lakehouse = notebookutils.lakehouse.get("Your Lakehouse Name", workspace_id)
lakehouse_bronze_id = bronze_lakehouse['id']

In [None]:
# Authenticate using a service principal

# Connection settings for the service principal; replace the placeholders with real values.
config = {
    "authority": "https://login.microsoftonline.com/<YOUR TENANT ID>",
    "client_id": "<YOUR CLIENT ID>",
    "scope": ["https://analysis.windows.net/powerbi/api/.default"],
    "secret": "<YOUR CLIENT SECRET>",
    "endpoint": "https://api.powerbi.com/v1.0/myorg/admin/activityevents",
}

app = msal.ConfidentialClientApplication(
    config["client_id"],
    client_credential=config["secret"],
    authority=config["authority"],
)

# Try the token cache first; fall back to requesting a fresh token.
result = app.acquire_token_silent(config["scope"], account=None)

if not result:
    logging.info("No suitable token exists in cache. Get a new one from AAD.")
    result = app.acquire_token_for_client(scopes=config["scope"])

if "access_token" not in result:
    # Authentication failed: show the details returned in the result.
    print(result.get("error"))
    print(result.get("error_description"))
    print(result.get("correlation_id"))
else:
    logging.info("Got the Token")
    print("Got the Token")

In [None]:
# Specify a start & end date manually, or retrieve the last day of records.
# You cannot currently retrieve audit logs older than 30 days, so retrieve your logs often.
# start_date = date(2024, 8, 26)
# end_date = date(2024, 8, 26)
start_date = date.today() - timedelta(days=1)
end_date = date.today() - timedelta(days=1)

delta = timedelta(days=1)

# The bearer token and request headers are the same for every day, so build them once.
access_token = result['access_token']
header = {'Content-Type':'application/json', 'Authorization':f'Bearer {access_token}'}

# Save the audit logs to a lakehouse as Parquet files under this folder
targetPath = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_bronze_id}/Files/activity_events/in"

while start_date <= end_date:
    activityDate = start_date.strftime("%Y-%m-%d")

    # Each request covers a single calendar day, from 00:00:00 to 23:59:59.
    url = "https://api.powerbi.com/v1.0/myorg/admin/activityevents?startDateTime='" + activityDate + "T00:00:00'&endDateTime='" + activityDate + "T23:59:59'"

    response = requests.get(url=url, headers=header)
    # Fail with the HTTP error itself rather than a KeyError on the missing payload keys.
    response.raise_for_status()

    response_obj = response.json()
    event_entities = response_obj.get("activityEventEntities") or []
    continuation_uri = response_obj.get("continuationUri")
    continuation_token = response_obj.get("continuationToken")
    activity_events = list(event_entities)
    cont_count = 1

    # Follow the continuation URI until the service stops returning a continuation token.
    while continuation_token is not None and continuation_uri:
        response = requests.get(continuation_uri, headers=header)
        response.raise_for_status()
        response_obj = response.json()
        event_entities = response_obj.get("activityEventEntities") or []
        continuation_uri = response_obj.get("continuationUri")
        continuation_token = response_obj.get("continuationToken")

        activity_events.extend(event_entities)
        cont_count += 1

    print(f"Took {cont_count} tries to exhaust continuation token for {len(activity_events)} events.")

    # A day without events yields an empty frame: Spark cannot infer a schema from it
    # and there is no CreationTime column to partition on, so skip the write.
    if not activity_events:
        print(f"No activity events returned for {activityDate}; nothing to write.")
        start_date += delta
        continue

    df = pd.DataFrame(activity_events)

    # Cast object and float64 columns to strings before handing the frame to Spark
    # (presumably to avoid schema-inference problems on mixed or sparse columns).
    Object_cols = [name for name, col_type in df.dtypes.items() if col_type == "object"]
    df[Object_cols] = df[Object_cols].astype(str)

    float64_cols = [name for name, col_type in df.dtypes.items() if col_type == "float64"]
    df[float64_cols] = df[float64_cols].astype(str)

    sdf = spark.createDataFrame(df)

    if not notebookutils.fs.exists(targetPath):
        notebookutils.fs.mkdirs(targetPath)

    # Partition columns are derived from each event's CreationTime.
    sdf = sdf.withColumn('Year', year(col("CreationTime")))
    sdf = sdf.withColumn('Month', month(col("CreationTime")))
    sdf = sdf.withColumn('Day', dayofmonth(col("CreationTime")))
    sdf.write.mode("append").option("mergeSchema","true").format("parquet").partitionBy("Year","Month","Day").save(targetPath)
    print(f"Output files for {activityDate}")
    start_date += delta

### Use Metadata Scanning to build an inventory of all content in your Fabric/PBI tenant
https://learn.microsoft.com/en-us/fabric/governance/metadata-scanning-overview

In [None]:
# Authentication 

 
import json, requests, pandas as pd
from datetime import datetime
import time

# Service principal credentials; replace the placeholders with real values.
tenant = "<YOUR TENANT ID>"
client = "<YOUR CLIENT ID>"
client_secret = "<YOUR CLIENT SECRET>"

# Mount our bronze lakehouse to work with non-Spark APIs.
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_bronze_id = notebookutils.lakehouse.get("<YOUR LAKEHOUSE NAME>", workspace_id)['id']
base_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_bronze_id}"
mount_name = "/bronze"
notebookutils.fs.mount(base_path, mount_name)
mount_points = notebookutils.fs.mounts()

# Find the local path of the mount we just created. Fail with a clear message if it is
# missing instead of letting "file:" + None raise an unrelated TypeError.
mount_local_path = next((mp["localPath"] for mp in mount_points if mp["mountPoint"] == mount_name), None)
if mount_local_path is None:
    raise RuntimeError(f"Mount point {mount_name} was not found after mounting {base_path}")
local_path = "file:" + mount_local_path
 
try:
    from azure.identity import ClientSecretCredential
except Exception:
     !pip install azure.identity
     from azure.identity import ClientSecretCredential
 
# Request an access token for the Service Principal, scoped to the Power BI API
api = 'https://analysis.windows.net/powerbi/api/.default'
auth = ClientSecretCredential(
    tenant_id=tenant,
    client_id=client,
    client_secret=client_secret,
    authority='https://login.microsoftonline.com/',
)

# get_token returns a token object; only its .token value is kept.
access_token = auth.get_token(api).token

print('\nSuccessfully authenticated.')
# print(local_path)

In [None]:
# Scan the workspaces returned by the admin API in chunks and save each chunk's
# scan result as a JSON file in the mounted lakehouse.
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}

# Base URL for Power BI API calls
base_url = "https://api.powerbi.com/v1.0/myorg/admin"

# Get list of workspace IDs
workspaces_url = base_url + "/workspaces/modified?excludePersonalWorkspaces=False&excludeInActiveWorkspaces=False"
response = requests.get(workspaces_url, headers=headers)
# Surface authentication/permission problems here instead of failing on the payload shape.
response.raise_for_status()

workspace_ids = [workspace["id"] for workspace in response.json()]

# These values are the same for every chunk, so build them once.
scan_url = base_url + "/workspaces/getInfo?lineage=true&datasourceDetails=true&datasetSchema=true&datasetExpressions=true&getArtifactUsers=true"
targetPath = local_path + "/Files/workspaces/in"

# Ensure the target directory exists
if not notebookutils.fs.exists(targetPath):
    notebookutils.fs.mkdirs(targetPath)

# Process workspaces in chunks of 25
chunk_size = 25
for i in range(0, len(workspace_ids), chunk_size):
    chunk = workspace_ids[i:i + chunk_size]

    # Trigger a scan for these workspaces
    response = requests.post(scan_url, headers=headers, data=json.dumps({"workspaces": chunk}))

    if response.status_code != 202:
        # The scan was not accepted: report it and move on to the next chunk
        # rather than dropping the chunk without a trace.
        print(f"Scan request for chunk {i//chunk_size} was not accepted (HTTP {response.status_code}): {response.text}")
        continue

    # The scan ID is the last segment of the "location" response header.
    scan_id = response.headers["location"].split("/")[-1]

    # Poll until the scan succeeds
    status_url = base_url + "/workspaces/scanStatus/" + scan_id
    while True:
        status_response = requests.get(status_url, headers=headers)
        # An error response carries no "status" field and would otherwise poll forever.
        status_response.raise_for_status()
        status = status_response.json().get("status", "")

        if status == "Succeeded":
            break
        elif status == "Failed":
            raise Exception("Scan Failed")

        time.sleep(3)  # Wait for 3 seconds before polling again

    # Once scan succeeds, fetch scan results
    result_url = base_url + "/workspaces/scanResult/" + scan_id
    result_response = requests.get(result_url, headers=headers)
    result_response.raise_for_status()

    # The file name carries the chunk index and a timestamp.
    now = datetime.today().strftime('%Y-%m-%d-%H-%M-%S')
    targetFile = f"{targetPath}/scan_results_{i//chunk_size}_{now}.json"

    # Serialize the JSON data
    json_data = json.dumps(result_response.json())

    # Write the data to the Lakehouse
    notebookutils.fs.put(targetFile, json_data, overwrite=True)
    print(f'Successfully written to {targetFile}')

        # with open(f"{targetPath}/scan_results_{i//chunk_size}_{now}.json", "w") as f:
        #     json.dump(result_response.json(), f)
        #     print(f'Successfully written to {targetPath}/scan_results_{i//chunk_size}_{now}.json')