# Power BI Inventory
## This notebook fetches the Power BI inventory using the scanner API and writes it to JSON files for the current day. 
## If the notebook is run multiple times in a day it will overwrite the files for the day

## Import necessary libraries

In [1]:

import msal
import requests
import json
import pandas as pd
from datetime import date, timedelta, time

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 3, Finished, Available)

## Set Client ID and Secret for Service Principal

In [2]:

client_id = "XXXXXXXX-1234-1234-1234-XXXXXXXXXXXX"
client_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
authority_url = "https://login.microsoftonline.com/northinsights.com"
scope = ["https://analysis.windows.net/powerbi/api/.default"]

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 4, Finished, Available)

## Set the date to today

In [14]:

import datetime
activityDate = str(datetime.datetime.now().strftime("%Y%m%d"))
activityYear = str(datetime.datetime.now().strftime("%Y"))
activityMonth = str(datetime.datetime.now().strftime("%Y%m"))
print(activityMonth)

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 17, Finished, Available)

202306


## Use MSAL to fetch a token for authentication

In [5]:

app = msal.ConfidentialClientApplication(client_id, authority=authority_url, client_credential=client_secret)
result = app.acquire_token_for_client(scopes=scope)

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 7, Finished, Available)

## Set URL for the Power BI REST API to get modified workspaces

In [6]:

url = "https://api.powerbi.com/v1.0/myorg/admin/workspaces/modified"

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 8, Finished, Available)

## Fetch a token and call the modified workspaces API

In [7]:
#Use MSAL to grab token
app = msal.ConfidentialClientApplication(client_id, authority=authority_url, client_credential=client_secret)
result = app.acquire_token_for_client(scopes=scope)
#Get changed workspaces
if 'access_token' in result:
    access_token = result['access_token']
    header = {'Content-Type':'application/json', 'Authorization':f'Bearer {access_token}'}
    api_call = requests.get(url=url, headers=header)
    print(json.loads(api_call.text))

df1 = spark.createDataFrame(json.loads(api_call.text))
df1.createOrReplaceTempView("modifiedworkspaces")
display(df1)

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 9, Finished, Available)

[{'id': '5312ff4d-15a8-4612-9032-283067db1e61'}, {'id': '634fb70c-6742-46da-b7bb-e9a2c9f4bb79'}, {'id': '975f6b3f-82a8-4bc3-b82f-feb2197ca733'}, {'id': 'e7f20fca-488f-4cf1-871a-ceb0f827bb8f'}, {'id': '95b2f755-3564-4197-978e-4a3585c0cd48'}, {'id': '8ae543ea-8eb5-4858-9a13-304f241a61cd'}, {'id': '8e5ca781-3717-401c-a035-57e3c8703ea3'}, {'id': 'f7139c40-5396-4949-b6ec-39df661030c9'}, {'id': '74ebe6ab-4be1-44aa-b068-fc07f0fd6f66'}, {'id': 'e68752d4-0921-461d-a81a-6b83a74fe39f'}, {'id': 'bacfa0c9-2bfa-40a0-96e5-545337336f28'}, {'id': 'f4c099f6-627c-4778-9542-3e3077ce6219'}, {'id': 'e0d9e1f0-cbb1-4d8f-b4fc-03277313eef7'}, {'id': '6cc997d0-9510-4011-8cae-08b803fdab91'}, {'id': 'ce907cc9-3e85-4f0d-af79-1bebe95b7aee'}, {'id': '1b453b96-1ce0-416f-b0a7-bf1618aacfe2'}, {'id': 'ddb98fb2-be71-4718-8b1d-23c9cc0adf98'}, {'id': 'e0ea4180-76e2-481b-9733-a993c0cb22b5'}, {'id': 'a34d713e-30fa-4f1f-aa52-717a85ee0f75'}, {'id': '9f921bb1-9716-44e8-be4a-89e008125f24'}, {'id': 'cdb0b366-a42b-485d-8057-36c622b

SynapseWidget(Synapse.DataFrame, 84c8bf40-5191-4b8f-be10-e05f1a85520a)

## Use SQL to create a bucket of 100 workspaces each

In [8]:
%%sql
-- Take the list of modified workspaces and create batches of 100 workspaces
create or replace temporary view WorkspaceBatches as
WITH WorkspaceRanked AS 
(
    SELECT ROW_NUMBER() OVER ( ORDER BY id) AS Rank
    , id
    FROM modifiedworkspaces
),
WorkspaceBatched AS
(
    SELECT ROW_NUMBER() OVER (PARTITION BY(Rank % 100) ORDER BY Rank ASC) AS Batch
    , id
    FROM WorkspaceRanked
)

SELECT replace(concat('{"workspaces": ["' , array_join(array_sort(collect_set(id)),","),'"]}'),',','","')  AS BodyExpression, Batch
FROM WorkspaceBatched
GROUP BY Batch;


-- Check if workspaces are correctly batched
SELECT * FROM WorkspaceBatches


StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 1 rows and 2 fields>

## Call the scanner API to get workspace information
### If more than 100 workspaces then loop through the batches of 100 each
### When each batch is done write the data to the Files section of the lakehouse

In [9]:
from pyspark.sql.functions import col,lit, concat
from pyspark.sql.types import StructType, StructField, StringType
import time
#URL for getting the inventory of workspaces and items
post_WorkspaceInfoUrl = "https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo?lineage=True&datasourceDetails=True&getArtifactUsers=True&datasetSchema=True&datasetExpressions=True"
#Select the batches from the temp view
dfbatches = spark.sql("SELECT CAST(Batch AS VARCHAR(2)) AS Batch, BodyExpression FROM WorkspaceBatches")
#Create a collection to loop through
rows_looped = dfbatches.select("Batch", "BodyExpression").collect()
# looping through each batch
for rows in rows_looped:
    #Initiates the fetch of inventory
    post_WorkspaceInfo = requests.post(url=post_WorkspaceInfoUrl, headers=header, data = rows.BodyExpression)
    #Create a schema and fetch the ID of the inventory fetch run
    schema = StructType([
    StructField("id", StringType()),
    StructField("createdDateTime", StringType()),
    StructField("status", StringType())
    ])
    dfId = spark.createDataFrame([json.loads(post_WorkspaceInfo.text)], schema)
    #Fetches the status of the inventory run
    statusUrl = dfId.select(concat(lit("https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanStatus/"),col("id"))).first()[0]
    apiStatus = requests.get(url=statusUrl, headers=header)
    workspaceapistatus = apiStatus.json()['status']
    print(workspaceapistatus)
    #While the status is not Succeeded then continue to fetch the status every 10 seconds. When the status is Succeeded continue
    while workspaceapistatus != "Succeeded":
        time.sleep(10)
        apiStatus = requests.get(url=statusUrl, headers=header)
        workspaceapistatus = apiStatus.json()['status']
        print(workspaceapistatus)
    #Fetch the ready inventory run results
    get_WorkspaceResultsUrl = dfId.select(concat(lit("https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanResult/"),col("id"))).first()[0]
    apiResults = requests.get(url=get_WorkspaceResultsUrl, headers=header)
    #Write the results as JSON to the datalake
    json_rdd = spark.sparkContext.parallelize([apiResults.text])
    raw_df = spark.read.json(json_rdd)
    display(raw_df)
    tableName = "Files/pbi_artifact_inventory/year=" + activityYear + "/month=" + activityMonth + "/day=" + activityDate + "/powerbi_workspaceinfo_batch_" + rows.Batch + "_" + activityDate
    raw_df.write.format('json').option('inferSchema','true').mode('overwrite').save(tableName + '.json')

StatementMeta(, 8ab0ac44-b1c3-4480-b4a5-4e437ce37c20, 12, Finished, Available)

Running
Succeeded
