In [1]:
import requests
import time
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import DataFrame

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 3, Finished, Available, Finished)

In [2]:
# Resource (token audience) passed to mssparkutils.credentials.getToken for Power BI REST calls.
pbi_resource = "https://analysis.windows.net/powerbi/api"
# Base URL of the Power BI REST API (v1.0, "myorg" scope). It already ends with '/',
# so paths should be appended without a leading slash.
pbi_Uri = 'https://api.powerbi.com/v1.0/myorg/'

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 4, Finished, Available, Finished)

In [3]:
def get_token():
    """Return an access token for the Power BI API resource ``pbi_resource``.

    Delegates to ``mssparkutils.credentials.getToken``.
    """
    credentials = mssparkutils.credentials
    return credentials.getToken(pbi_resource)

def get_powerbiAPIclusterURI():
    """Build the beta ``groups`` URL on the host named in a datasets response.

    Calls ``GET {pbi_Uri}datasets`` and takes the host from the response's
    ``@odata.context`` URL (assumed to look like ``https://<host>/...``).

    Returns:
        str: ``https://<host>/beta/myorg/groups``.

    Raises:
        requests.HTTPError: if the datasets call fails. Without this check the
            failure would surface as an opaque ``KeyError`` on ``@odata.context``.
    """
    fullurl = pbi_Uri + 'datasets'
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    # Bounded wait so a hung connection cannot stall the notebook indefinitely.
    response = requests.get(fullurl, headers=headers, timeout=60)
    response.raise_for_status()
    unaltered = response.json()['@odata.context']
    # 'https://<host>/...'.split('/') -> ['https:', '', '<host>', ...]
    host = unaltered.split('/')[2]
    return f'https://{host}/beta/myorg/groups'


# Resolved once per session; get_WorkspaceUsageMetricsId builds its URLs from it.
clusteredURI = get_powerbiAPIclusterURI()

def get_AccessibleWorkspaces():
    """List the groups of ``type eq 'Workspace'`` visible to the current token.

    Returns:
        list[dict]: the ``value`` array of the ``GET groups`` response.

    Raises:
        requests.HTTPError: if the API call fails. Without this check the failure
            would surface as a ``KeyError`` on ``'value'``.
    """
    # pbi_Uri already ends with '/'; a leading slash here would produce 'myorg//groups'.
    fullUrl = pbi_Uri + "groups?$filter=type eq 'Workspace'"
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    response = requests.get(fullUrl, headers=headers, timeout=60)
    response.raise_for_status()
    return response.json()['value']

def get_WorkspaceUsageMetricsId(wsId, max_attempts=4, wait_seconds=30):
    """Return ``models[0].dbName`` from the workspace's usageMetricsReportV2 endpoint.

    Calls ``{clusteredURI}/{wsId}/usageMetricsReportV2``. Callers store the result
    as the workspace's ``DatasetId``. Any failure (network error, non-JSON body,
    missing ``models``/``dbName``) is retried.

    Args:
        wsId: workspace id.
        max_attempts: number of tries before giving up (default 4).
        wait_seconds: pause between tries, in seconds (default 30).

    Returns:
        str | None: the ``dbName``, or None once every attempt has failed.
    """
    fullurl = f'{clusteredURI}/{wsId}/usageMetricsReportV2'
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(fullurl, headers=headers, timeout=60)
            return response.json()['models'][0]['dbName']
        except Exception as e:
            # `Exception` rather than a bare `except:` so Ctrl-C / SystemExit still stop the notebook.
            last_error = e
            if attempt < max_attempts:
                time.sleep(wait_seconds)  # no wait after the final attempt
    print(f"No usage metrics model id for workspace {wsId} after {max_attempts} attempts: {last_error!r}")
    return None

def post_ExecuteQuery(wsId, dsId, daxQ):
    """Run a DAX query against a dataset and return the first result table.

    POSTs to ``{pbi_Uri}groups/{wsId}/datasets/{dsId}/executeQueries`` with
    ``includeNulls`` enabled. Request failures are retried up to 4 times with a 30 s pause.
    4xx responses other than 429 are not retried, because resending the identical
    request (same query, same token) cannot succeed.

    Args:
        wsId: workspace (group) id.
        dsId: dataset id.
        daxQ: DAX query text.

    Returns:
        dict | None: ``results[0].tables[0]`` of the response (callers read its
        ``rows`` list), or None if the response is malformed or every attempt fails.
    """
    # pbi_Uri already ends with '/'; avoid producing 'myorg//groups'.
    fullurl = f'{pbi_Uri}groups/{wsId}/datasets/{dsId}/executeQueries'
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    content = {"queries": [{"query": daxQ}], "serializerSettings": {"includeNulls": True}}

    max_attempts = 4
    attempt = 0
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.post(fullurl, json=content, headers=headers, timeout=120)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed on attempt {attempt}: {e}")
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            if status is not None and 400 <= status < 500 and status != 429:
                # Client error (e.g. invalid DAX, no access): retrying will not help.
                break
            if attempt < max_attempts:
                time.sleep(30)
            continue

        # Covers missing keys and empty 'results'/'tables' lists (the latter used to raise IndexError).
        try:
            return data['results'][0]['tables'][0]
        except (KeyError, IndexError, TypeError):
            print(f"Unexpected response structure: {data}")
            return None

    print(f"Failed to execute query after {attempt} attempts.")
    return None

def refresh_execute(wsId, dsId):
    """Submit a full, transactional refresh request for a dataset.

    POSTs to ``{pbi_Uri}groups/{wsId}/datasets/{dsId}/refreshes`` and prints the
    response. It only submits the request; it does not wait for the refresh to finish.

    Args:
        wsId: workspace (group) id.
        dsId: dataset id.

    Returns:
        requests.Response: the API response, so callers can inspect the status.
    """
    # pbi_Uri already ends with '/'; avoid producing 'myorg//groups'.
    fullurl = f'{pbi_Uri}groups/{wsId}/datasets/{dsId}/refreshes'
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    content = {
        "type": "Full",
        "commitMode": "transactional",
        "maxParallelism": 2,
        "retryCount": 2,
    }
    response = requests.post(fullurl, json=content, headers=headers, timeout=120)
    print(f"Refresh status for WsId {wsId} DsId {dsId} is {response}.")
    return response

def replace_ColumnNames(frame, replacingName):
    """Strip the DAX table prefix and square brackets from every column name.

    DAX query results name columns ``Table[Column]``; with ``replacingName='Table'``
    each such column becomes ``Column``. Names without the prefix only lose brackets.

    Args:
        frame: a Spark DataFrame (anything with ``.columns`` and ``.toDF(*names)``).
        replacingName: the table-name prefix to remove, e.g. ``'Report views'``.

    Returns:
        ``frame.toDF(...)`` with the cleaned column names.
    """
    def _clean(name):
        # Remove the table name only as a leading prefix. str.replace would also strip it
        # from inside the column part, e.g. 'Users[UsersCount]' -> 'Count'.
        if name.startswith(replacingName):
            name = name[len(replacingName):]
        return name.replace('[', '').replace(']', '')

    return frame.toDF(*(_clean(c) for c in frame.columns))

def extract_DataFrame(response_list, preCreatedSchema):
    """Flatten executeQueries result tables into a single Spark DataFrame.

    Args:
        response_list: list of result-table dicts (as returned by post_ExecuteQuery),
            each holding a ``rows`` list.
        preCreatedSchema: StructType applied to the flattened rows, or None/empty to
            let Spark infer the schema.

    Returns:
        DataFrame: one row per entry of every ``rows`` list, plus an ``insert_date``
        column holding the current date.
    """
    # Local alias: a notebook cell may rebind the global name `current_date` to a Column,
    # which would make a bare `current_date()` call fail here.
    from pyspark.sql import functions as F

    rl = spark.createDataFrame(response_list)
    # Explode every table's `rows` array and bring the row objects back to the driver.
    rl = rl.select(F.explode(rl.rows)).select("col").rdd.flatMap(lambda x: x).collect()
    # Decide on the parameter; reading a global `schema` would tie this helper to its callers.
    if not preCreatedSchema:
        rl = spark.createDataFrame(rl)
    else:
        rl = spark.createDataFrame(rl, preCreatedSchema)
    return rl.withColumn("insert_date", F.current_date())

def merge_workspace_data(existing_df: DataFrame, new_df: DataFrame) -> DataFrame:
    """Combine existing and new workspace rows, keeping one row per ``id``.

    Columns are matched by name, not by position. The existing table and the new
    frame can order their columns differently or carry different optional fields
    (e.g. if the workspace JSON keys differ between runs). A positional ``union``
    would then fail or silently shift values into the wrong columns. Columns missing
    on either side are filled with nulls.

    Args:
        existing_df: rows already stored in the table.
        new_df: newly discovered workspace rows.

    Returns:
        DataFrame: the merged rows with duplicate ``id`` values dropped.
    """
    merged_df = existing_df.unionByName(new_df, allowMissingColumns=True)
    # "id" is assumed to uniquely identify a workspace; which duplicate survives is not guaranteed.
    return merged_df.dropDuplicates(["id"])

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 5, Finished, Available, Finished)

In [4]:
# Does the UsageMetricsDatasets table already exist in the catalog?
# When the catalog is non-empty, tblsInLK becomes a Spark DataFrame because a later cell filters it by name.
tblsInLK = spark.catalog.listTables()
tblsReady = len(tblsInLK)
if tblsReady:
    tblsInLK = spark.createDataFrame(pd.DataFrame(tblsInLK))
    doesTableUMDExists = tblsInLK.filter(col('name') == 'UsageMetricsDatasets').count()
else:
    doesTableUMDExists = 0

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 6, Finished, Available, Finished)

In [5]:
# Workspaces (groups filtered to type 'Workspace') visible to the notebook's token, as a list of dicts.
wsList = get_AccessibleWorkspaces()

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 7, Finished, Available, Finished)

In [6]:
# Create or extend UsageMetricsDatasets: one row per workspace whose usage-metrics model id
# could be fetched, with that id in DatasetId and ExtractionState initialised to 0.
spark.conf.set("spark.sql.caseSensitive", "true")


def _with_usage_metrics_ids(workspaces, known_ids=frozenset()):
    """Return copies of the workspaces not in known_ids that have a DatasetId, with DatasetId set."""
    enriched = []
    for ws in workspaces:
        if ws['id'] in known_ids:
            continue
        dataset_id = get_WorkspaceUsageMetricsId(ws['id'])
        if dataset_id is not None:
            # Copy rather than mutate wsList, so re-running this cell starts from clean input.
            enriched.append({**ws, 'DatasetId': dataset_id})
    return enriched


def _to_umd_frame(rows):
    """Build a Spark DataFrame from enriched workspace dicts, adding insert_date and ExtractionState=0."""
    return (
        spark.createDataFrame(rows)
        .withColumn("insert_date", lit(current_date()))
        .withColumn("ExtractionState", lit(0))
    )


if doesTableUMDExists == 0:
    wsList_with_DId = _with_usage_metrics_ids(wsList)
    if not wsList_with_DId:
        # spark.createDataFrame([]) would fail with an obscure schema-inference error instead.
        raise ValueError("No accessible workspace returned a usage metrics model id; "
                         "'UsageMetricsDatasets' was not created.")
    wsList_df = _to_umd_frame(wsList_with_DId)
    wsList_df.write.format("delta").mode("overwrite").saveAsTable("UsageMetricsDatasets")
    print("Table 'UsageMetricsDatasets' created successfully with DatasetId column.")
else:
    existing_UMD = spark.table("UsageMetricsDatasets")
    # A set gives constant-time membership checks instead of scanning a list per workspace.
    existing_workspace_ids = set(existing_UMD.select("id").rdd.flatMap(lambda x: x).collect())
    # Only workspaces not yet in the table trigger a (slow) usage-metrics lookup.
    new_UMD = _with_usage_metrics_ids(wsList, existing_workspace_ids)

    if new_UMD:
        new_UMD_df = _to_umd_frame(new_UMD)
        merged_df = merge_workspace_data(existing_UMD, new_UMD_df)
        merged_df.write.format("delta").mode("overwrite").saveAsTable("UsageMetricsDatasets")
        print("Table 'UsageMetricsDatasets' updated with new workspace data and DatasetId.")
    else:
        print("No new workspaces to update.")

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 8, Finished, Available, Finished)

Table 'UsageMetricsDatasets' updated with new workspace data and DatasetId.


In [7]:
# Workspaces to query: rows that have both a workspace id and a DatasetId, as plain dicts.
updated_UMD = [
    row.asDict()
    for row in (
        spark.table("UsageMetricsDatasets")
        .filter(col("id").isNotNull() & col("DatasetId").isNotNull())
        .select("id", "DatasetId", "ExtractionState")
        .collect()
    )
]

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 9, Finished, Available, Finished)

In [None]:
# Request a refresh of every usage-metrics dataset before it is queried.
# refresh_execute only submits the request; it does not wait for the refresh to finish.
for ws in updated_UMD:
    wsId, dsId = ws['id'], ws['DatasetId']
    response = refresh_execute(wsId, dsId)
    time.sleep(0.5)

In [9]:
# Tables of the Usage Metrics model that are re-extracted in full and overwritten on every run.
# Each entry lists the columns to read; all are typed as strings.
BASIC_TABLE_COLUMNS = {
    'Reports': ['IsUsageMetricsReport', 'OrganizationId', 'ReportGuid', 'ReportName', 'WorkspaceId'],
    'Users': ['UniqueUser', 'UserGuid', 'UserId', 'UserKey'],
    'Report pages': ['ReportId', 'SectionId', 'SectionName', 'WorkspaceId'],
    'Workspace views': ['ConsumptionMethod', 'DistributionMethod', 'ReportId', 'UniqueUser',
                        'UserId', 'UserKey', 'Views'],
}

basicListOfTables = list(BASIC_TABLE_COLUMNS)
for bsTbl in basicListOfTables:
    # Keep this variable named `schema`: extract_DataFrame may read the global instead of its argument.
    schema = StructType(
        [StructField(f"{bsTbl}[{name}]", StringType(), True) for name in BASIC_TABLE_COLUMNS[bsTbl]]
    )

    response_list = []
    for ws in updated_UMD:
        response = post_ExecuteQuery(ws['id'], ws['DatasetId'], f"EVALUATE '{bsTbl}'")
        # post_ExecuteQuery returns None on failure; skip it instead of crashing on len(None).
        if response and response.get('rows'):
            response_list.append(response)
        time.sleep(0.5)

    updatedText = bsTbl.replace("'", "").title().replace(" ", "")
    if not response_list:
        # spark.createDataFrame([]) cannot infer a schema; leave the existing table untouched.
        print(f"No rows returned for '{bsTbl}'; Tables/{updatedText} left unchanged.")
        continue

    df = replace_ColumnNames(extract_DataFrame(response_list, schema), bsTbl)
    if bsTbl == 'Users':
        # Collapse identical user rows (presumably the same user appearing in several workspaces' models).
        df = df.distinct()
    df.write.mode("overwrite").format("delta").save(f"Tables/{updatedText}")

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 11, Finished, Available, Finished)

In [10]:
# Tables of the Usage Metrics model that are loaded incrementally by [Date].
# Each entry lists the columns to read, in write order. All are read as strings;
# [Date] is converted to a timestamp before writing.
INCREMENTAL_TABLE_COLUMNS = {
    "Report page views": [
        "UserKey", "UserId", "TenantId", "Timestamp", "AppGuid", "SectionId", "AppName", "Date",
        "DeviceBrowserVersion", "ReportId", "SessionSource", "OriginalReportId", "WorkspaceId",
        "Client", "OriginalWorkspaceId", "DeviceOSVersion",
    ],
    "Report load times": [
        "DeviceOSVersion", "AppGuid", "Timestamp", "UserId", "loadTime", "Client",
        "DeviceBrowserVersion", "OriginalGroupId", "TenantId", "EndTime", "SessionSource",
        "AppName", "Browser", "OriginalReportId", "GroupId", "StartTime", "Date",
        "LocationCity", "Country",
    ],
    "Report views": [
        "AppName", "CapacityId", "CapacityName", "ConsumptionMethod", "CreationTime", "DatasetName",
        "Date", "DistributionMethod", "OriginalConsumptionMethod", "ReportId", "ReportName",
        "ReportType", "UserAgent", "UserId", "UserKey",
    ],
}

basicListOfTables = list(INCREMENTAL_TABLE_COLUMNS)
md = ""
dax = ""
for bsTbl in basicListOfTables:
    updatedText = bsTbl.replace("'", "").title().replace(" ", "")
    # tblsInLK / tblsReady reflect the catalog as it was when the table-listing cell ran.
    doesTableExists = 0 if tblsReady == 0 else tblsInLK.filter(col('name') == updatedText).count()
    # Keep this variable named `schema`: extract_DataFrame may read the global instead of its argument.
    schema = StructType(
        [StructField(f"{bsTbl}[{name}]", StringType(), True) for name in INCREMENTAL_TABLE_COLUMNS[bsTbl]]
    )

    # The query depends only on the table, not the workspace, so build it once per table.
    tableLastDate = None
    if doesTableExists:
        tableLastDate = spark.sql(f"SELECT MAX(Date) AS date FROM {updatedText}").collect()[0]['date']
    if tableLastDate is None:
        # Full load (table missing or empty). Stop before today, like the incremental query does.
        # The next run only loads dates strictly after MAX(Date), so a partial "today" loaded
        # here would never be completed.
        dax = f"EVALUATE FILTER('{bsTbl}', [Date] < TODAY())"
        md = "overwrite"
    else:
        # Incremental load: dates after the newest stored date, up to (not including) today.
        # NOTE(review): MAX(Date) is global across workspaces; a model that lagged behind others may miss days.
        fromDate = tableLastDate.strftime('DATE(%Y,%m,%d)')
        dax = f"DEFINE VAR _td = TODAY() VAR _from = {fromDate} EVALUATE FILTER('{bsTbl}',[Date]>_from && [Date]<_td)"
        md = "append"

    response_list = []
    for ws in updated_UMD:
        response = post_ExecuteQuery(ws['id'], ws['DatasetId'], dax)
        if response and response.get('rows'):
            response_list.append(response)
        time.sleep(0.5)

    if response_list:
        exports = extract_DataFrame(response_list, schema)
        # [Date] is read as a string; store it as a timestamp so MAX(Date).strftime works next run.
        exports = exports.withColumn(f"{bsTbl}[Date]", to_timestamp(col(f"{bsTbl}[Date]")))
        exports = replace_ColumnNames(exports, bsTbl)
        exports.write.mode(md).format("delta").save(f"Tables/{updatedText}")
    elif doesTableExists:
        # No new rows: re-stamp the existing rows' insert_date with today's date.
        # Deliberately NOT named `current_date`: rebinding that global would shadow the
        # pyspark function that extract_DataFrame calls for the next table.
        table_df = spark.read.format("delta").load(f"Tables/{updatedText}")
        today_col = to_date(lit(datetime.now().strftime('%Y-%m-%d')), 'yyyy-MM-dd')
        updated_table_df = table_df.withColumn("insert_date", today_col)
        updated_table_df.write.format("delta").mode("overwrite").save(f"Tables/{updatedText}")
    else:
        print(f"No rows returned for '{bsTbl}' and Tables/{updatedText} does not exist yet; nothing written.")


StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 12, Finished, Available, Finished)

In [11]:
# # Only run this cell if a report's old ReportId must be replaced with its new id (same report, new id).
# df = spark.read.format("delta").load("")

# updated_df = df.withColumn(
#     "ReportId",
#     when(col("ReportId") == "", "")
#     .otherwise(col("ReportId"))
# )
# updated_df.write.format("delta").mode("overwrite").save("")

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 13, Finished, Available, Finished)

In [12]:
# # Only run this cell if you have just reset a usage metrics semantic model.
# df = spark.read.format("delta").load("")

# # Filter out the row to delete
# filtered_df = df.filter(col("id") != "12d07675-c0f1-461c-a09c-d6d3e351f39c")

# # Overwrite the table with the updated DataFrame
# filtered_df.write.format("delta").mode("overwrite").save("")

StatementMeta(, a9003428-e1d1-472e-af1a-f546f0f96fa0, 14, Finished, Available, Finished)