In [2]:
from pyspark.sql import SparkSession
import json
import pandas as pd
import requests
import concurrent.futures
import time
from pyspark.sql.functions import col
import msal


StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 4, Finished, Available, Finished)

In [None]:

# Azure AD credentials of the client application (replace with your values).
# NOTE(review): keep real secrets out of the notebook source — consider loading them from a secret store.
TENANT_ID = ""
CLIENT_ID = ""
CLIENT_SECRET = ""

# Azure AD authority and scope.
# Disabled alternative that targets the Fabric API instead of Power BI (kept for reference):
# AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
# SCOPE = ["https://api.fabric.microsoft.com/.default"]  # Fabric API Scope
# TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"

# Active configuration: authority and scope used by get_access_token for the Power BI API.
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]  # Power BI scope
# NOTE(review): TOKEN_URL is not referenced by the code visible in this notebook — confirm it is still needed.
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token"



# Global variables for token caching
ACCESS_TOKEN = None  # Last token returned by Azure AD; None until the first successful fetch
EXPIRATION_TIME = 0  # Epoch seconds (compared with time.time()) after which the cached token is refetched

def get_access_token():
    """Return a Power BI API access token, reusing the cached one while it is valid.

    The token and its refresh deadline are kept in the module-level globals
    ACCESS_TOKEN and EXPIRATION_TIME. A new token is requested through MSAL
    (client-credentials flow) when no token is cached or the deadline has passed.

    Returns:
        The access token string.

    Raises:
        Exception: if the MSAL response does not contain an "access_token".
    """
    global ACCESS_TOKEN, EXPIRATION_TIME

    # Fast path: a token is cached and its refresh deadline has not been reached.
    cache_is_fresh = bool(ACCESS_TOKEN) and time.time() < EXPIRATION_TIME
    if cache_is_fresh:
        return ACCESS_TOKEN

    # Build the MSAL confidential client and request a token for the configured scope.
    client_app = msal.ConfidentialClientApplication(
        CLIENT_ID,
        authority=AUTHORITY,
        client_credential=CLIENT_SECRET,
    )
    result = client_app.acquire_token_for_client(scopes=SCOPE)

    # Guard clause: anything without an access token is treated as a failure.
    if "access_token" not in result:
        raise Exception(f"Failed to get token: {result}")

    ACCESS_TOKEN = result["access_token"]
    # Schedule the refresh 5 minutes before the reported expiry (3600 s assumed when absent).
    EXPIRATION_TIME = time.time() + result.get("expires_in", 3600) - 300
    return ACCESS_TOKEN




StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 5, Finished, Available, Finished)

In [None]:
# Read the workspaces Delta table from OneLake
workspacess_df = spark.read.load(
    "abfss://d3120490-76ae-4ef4-a440-2bd65732ccdc@onelake.dfs.fabric.microsoft.com/fecab367-5d3a-41c1-8037-7801192932ba/Tables/fabric_workpsaces",
    format="delta",
)

StatementMeta(, , -1, SessionStarting, , SessionStarting)

In [5]:
# Keep only the workspaces that belong to the target capacity
filtered_workspaces_df = workspacess_df.where(
    col("capacityId") == "76F4499E-05FF-44A9-8C2F-323EE14EA1A7"
)
# Collect the matching workspace ids to the driver as a plain Python list
workspaceIds_list = [
    workspace_row["id"]
    for workspace_row in filtered_workspaces_df.select("id").collect()
]

StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 7, Finished, Available, Finished)

In [6]:
# Read the datasets Delta table from OneLake
datasets_df = spark.read.load(
    "abfss://d3120490-76ae-4ef4-a440-2bd65732ccdc@onelake.dfs.fabric.microsoft.com/fecab367-5d3a-41c1-8037-7801192932ba/Tables/datasets",
    format="delta",
)

StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 8, Finished, Available, Finished)

In [7]:
# Keep only the datasets that live in one of the selected workspaces
filtered_datasets_df = datasets_df.where(
    col("workspaceId").isin(workspaceIds_list)
)
# Collect the matching dataset ids to the driver as a plain Python list
datasetIds_list = [
    dataset_row["id"]
    for dataset_row in filtered_datasets_df.select("id").collect()
]

StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 9, Finished, Available, Finished)

In [14]:
#get_dataset_refresh_histroy
# Result columns other than "datasetId", in the order they appear in the output frame.
_REFRESH_HISTORY_FIELDS = (
    "requestId", "id", "refreshType", "status", "refreshAttempts",
    "startTime", "endTime", "DataStartTime", "DataEndTime",
    "QueryStartTime", "QueryEndTime",
)


def _placeholder_refresh_frame(leading_values):
    """Return a one-row frame with `leading_values` followed by None for every history field."""
    return pd.DataFrame([{**leading_values, **dict.fromkeys(_REFRESH_HISTORY_FIELDS)}])


def _attempt_time(attempts, attempt_type, field):
    """Return `field` of the first refresh attempt whose "type" is `attempt_type`, or None."""
    return next((att.get(field) for att in attempts if att.get("type") == attempt_type), None)


def get_dataset_refresh_histroy(workspaceId, datasetId, timeout=60):
    """Fetch dataset refresh history from Power BI API.

    Args:
        workspaceId: Id of the workspace (group) that owns the dataset.
        datasetId: Id of the dataset whose refresh history is requested.
        timeout: Seconds to wait for the HTTP response before requests raises
            a timeout error, so a stalled connection cannot block forever.

    Returns:
        A ``(history_df, errors)`` tuple.

        * ``history_df`` has one row per refresh entry. When the call does not
          return HTTP 200, or the response has no (or an empty) "value", it is
          a single placeholder row carrying only ``datasetId``.
        * ``errors`` is a list that is empty on success and otherwise holds one
          placeholder frame that additionally carries the HTTP ``statusCode``.

    Raises:
        Whatever ``get_access_token`` or ``requests.get`` raise (including the
        timeout error); they are not caught here.
    """
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}"}
    errors = []
    url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes"
    response = requests.get(url, headers=headers, timeout=timeout)

    # The body is parsed only on HTTP 200; a missing or empty "value" is reported
    # exactly like a failed call (placeholder row plus an error frame).
    entries = response.json().get("value") if response.status_code == 200 else None
    if not entries:
        errors.append(_placeholder_refresh_frame(
            {"datasetId": datasetId, "statusCode": response.status_code}))
        return (_placeholder_refresh_frame({"datasetId": datasetId}), errors)

    records = []
    for entry in entries:
        # "refreshAttempts" can be absent or null; treat both as "no attempts".
        attempts = entry.get("refreshAttempts") or []
        records.append({
            "datasetId": datasetId,
            "requestId": entry.get("requestId"),
            "id": entry.get("id"),
            "refreshType": entry.get("refreshType"),
            "status": entry.get("status"),
            "refreshAttempts": len(attempts),
            "startTime": entry.get("startTime"),
            "endTime": entry.get("endTime"),
            "DataStartTime": _attempt_time(attempts, "Data", "startTime"),
            "DataEndTime": _attempt_time(attempts, "Data", "endTime"),
            "QueryStartTime": _attempt_time(attempts, "Query", "startTime"),
            "QueryEndTime": _attempt_time(attempts, "Query", "endTime"),
        })

    return (pd.DataFrame(records), errors)


StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 16, Finished, Available, Finished)

In [15]:
all_temp_dfs = []  # Non-empty refresh-history frames, one per dataset
all_error_dfs = []  # Error frames reported by get_dataset_refresh_histroy (previously discarded)

# Stream the filtered datasets to the driver and query the API once per dataset.
for row in filtered_datasets_df.toLocalIterator():
    temp_df, errors = get_dataset_refresh_histroy(row["workspaceId"], row["id"])
    # Keep the per-dataset error frames so failed or empty lookups stay inspectable.
    all_error_dfs.extend(errors)

    if not temp_df.empty:  # Append only if temp_df is not empty
        all_temp_dfs.append(temp_df)
    else:
        print(f"No data for: workspaceId={row['workspaceId']}, id={row['id']}")

if all_error_dfs:
    print(f"{len(all_error_dfs)} dataset(s) returned no refresh history; inspect all_error_dfs for status codes.")


StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 17, Finished, Available, Finished)

In [17]:
# Combine the per-dataset frames into a single pandas frame
datasets_refresh_history_df = pd.concat(all_temp_dfs, ignore_index=True)
# Creating Spark DataFrame
spark_datasets_refresh_history_df = spark.createDataFrame(datasets_refresh_history_df)

# Distinct, non-null requestIds already stored in the datasets_refresh_history table
existing_df = (
    spark.read.format("delta")
    .load("abfss://d3120490-76ae-4ef4-a440-2bd65732ccdc@onelake.dfs.fabric.microsoft.com/fecab367-5d3a-41c1-8037-7801192932ba/Tables/datasets_refresh_history")
    .select("requestId")
    .dropna(subset=["requestId"])  # Remove null requestId
    .dropDuplicates(["requestId"])  # Remove duplicates
)

# Keep only the fetched rows whose requestId is not stored yet.
# NOTE(review): rows with a null requestId never match in the join, so they are
# always treated as new — confirm this is intended.
new_spark_datasets_refresh_history_df = spark_datasets_refresh_history_df.join(existing_df, "requestId", "left_anti")

# Count once, before writing: each count() re-runs the anti-join, and a count taken
# after the append would be evaluated against a table that has already changed.
new_record_count = new_spark_datasets_refresh_history_df.count()

if new_record_count > 0:
    # NOTE(review): the read above uses an abfss path while this write uses a table
    # name — presumably both resolve to the same table; verify the default lakehouse.
    new_spark_datasets_refresh_history_df.write.format("delta").mode("append").saveAsTable("datasets_refresh_history")
    print(f"Appended {new_record_count} new records to datasets_refresh_history.")
else:
    print("No new records found. Skipping append.")

StatementMeta(, 1b46249c-5f8b-4775-8df1-a14b161ed0af, 19, Finished, Available, Finished)

Appended 46 new records to datasets_refresh_history.


In [None]:
# Location of the refresh-history Delta table in OneLake
history_table_path = "abfss://d3120490-76ae-4ef4-a440-2bd65732ccdc@onelake.dfs.fabric.microsoft.com/fecab367-5d3a-41c1-8037-7801192932ba/Tables/datasets_refresh_history"

# Read the current contents of the table
df = spark.read.load(history_table_path, format="delta")

# Drop rows that are identical across every column
df_deduplicated = df.distinct()

# Replace the table contents with the deduplicated rows
df_deduplicated.write.save(history_table_path, format="delta", mode="overwrite")

print("Duplicate rows removed, and table updated.")


StatementMeta(, f56a0605-a805-41d4-92bd-b17518eb2276, 30, Finished, Available, Finished)

Duplicate rows removed, and table updated.
