# Automated stress testing for Power BI embedded reports

In [1]:
import csv
import threading
import time
import requests
import msal
import random
import os
from datetime import timedelta
from datetime import datetime
from azure.identity import DefaultAzureCredential
from azure.monitor.query import MetricsQueryClient
from azure.core.exceptions import HttpResponseError
from azure.identity import ClientSecretCredential
from azure.monitor.query import MetricsQueryClient
from flask import Flask, render_template_string, abort
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options

At present you cannot directly apply a filter to an embedded paginated report, so we first open the Power BI report and then navigate to the page that hosts the paginated report. To get to that page we need the page name, which can be found in the URL when you go to that page: it appears right after the report ID and either starts with "ReportSection" or is just a string of random characters after the "/".

In [None]:
# Apply your data extraction logic here. The result must be stored in a list named REPORTS,
# where each entry is a dict with the following keys:


# report_name: name_of_the_report
# report_id: id_of_the_report
# dataset_id: dataset_id
# report_id_p: id_of_the_embedded_paginated_report
# page_name: name_of_the_page_you_want_to_navigate_to
# filter: any_filter_value_you_need_to_apply


In [None]:
# Echo every configured report, one "key: value" line per field, so the
# extracted data can be checked by eye before the load test starts.
for report in REPORTS:
    for field in report:
        print(f"{field}: {report[field]}")
    print(30 * "-")  # divider between consecutive reports

In [7]:
# --- Flask app setup ---
# Single Flask application; the routes defined further down are registered on it.
app = Flask(__name__)

# --- CONFIGURATION ---
# Credentials of the app registration used to call Power BI. They can be found
# in Azure Entra > App registrations (Client ID and Client secret). If you do
# not have an app registration yet, you need to create one first.
CLIENT_ID = "Enter_Client_ID"
CLIENT_SECRET = "Enter_Client_Secret"
TENANT_ID = "Enter_Tenan_ID"

# Workspace (group) that contains the reports and datasets.
WORKSPACE_ID = "Enter_Workspace_ID"

# Effective identity placed in the embed-token request.
USERNAME = "Enter_Username"
# If no role is set up, remove it from the token generation request.
ROLES = ["Enter_your_role"]

# Authority and scope used when acquiring the access token.
AUTHORITY_URL = "https://login.microsoftonline.com/" + TENANT_ID
SCOPE = ["https://analysis.windows.net/powerbi/api/.default"]

# These three values are only required if you want to check the load on the
# server, i.e. query the capacity's CPU metric.
subscription_id = "Enter_Subscription_ID"
resource_group = "Enter_Resource_group"
capacity_name = "Enter_Capacity name"


In [None]:

# ------------------------------------------------------
# Capacity load monitoring.
# This cell sets up the Azure Monitor client used to read the CPU metric of
# the Power BI capacity. If you are NOT setting up load monitoring you can
# remove it -- but note that get_cpu_data_points() relies on the `client` and
# `resource_uri` names defined here.

# Authenticate with the app registration's client secret.
credential = ClientSecretCredential(
    client_secret=CLIENT_SECRET,
    client_id=CLIENT_ID,
    tenant_id=TENANT_ID,
)

client = MetricsQueryClient(credential)

# Azure resource ID of the Power BI capacity.
resource_uri = (
    f"/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group}"
    "/providers/Microsoft.PowerBIDedicated"
    f"/capacities/{capacity_name}"
)

# Query the CPU metric once for the last minute.
# Metric names mentioned by the original author: qpu_metric, cpu_metric,
# cpu_workload_metric.
metrics_response = client.query_resource(
    resource_uri=resource_uri,
    metric_names=["cpu_metric"],
    aggregations=["Average"],
    timespan=timedelta(minutes=1),
)

# To display the CPU usage data returned above, uncomment:
# for metric in metrics_response.metrics:
#     print(f"Metric: {metric.name}")
#     for ts in metric.timeseries:
#         for data in ts.data:
#             print(f"Time: {data.timestamp}, Avg CPU: {data.average}")

# ------------------------------------------------------


In [9]:
# Jinja template for the page that embeds one Power BI report.
#
# Template variables:
#   report_name - shown in the <title> and heading (HTML context, autoescaped)
#   report_id   - ID of the report to embed
#   embed_url   - embed URL returned by the Power BI REST API
#   embed_token - embed token returned by GenerateToken
#   page_name   - name of the report page to switch to once the report loads
#   filter      - optional filter value; only used when the `filters` line in
#                 embedConfig is uncommented
#
# Values placed inside the <script> block go through the `tojson` filter so
# they become valid JavaScript literals. Plain HTML autoescaping would, for
# example, turn every "&" of the embed URL into "&amp;" inside the script.
# The filter placeholder was previously an empty expression ("{{ }}"
# with nothing inside), which Jinja rejects when compiling the template.
HTML_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
    <title>Power BI Embedded Report {{ report_name }}</title>
    <script src="https://cdn.jsdelivr.net/npm/powerbi-client@2.19.1/dist/powerbi.min.js"></script>
</head>
<body>
    <h2>Embedded Power BI Report {{ report_name }}</h2>
    <div id="reportContainer" style="height: 700px; width: 100%;"></div>

    <script>
        const models = window['powerbi-client'].models;

        // Define the filter 
        // provide the details here if you need apply the filter below
        
        const filters =  [
            {
                $schema: "http://powerbi.com/product/schema#basic",
                target: {
                    table: "",
                    column: ""
                },
                operator: "In",
                values: [{{ filter | default('') | tojson }}]
               
            }
        ];

        const embedConfig = {
            type: 'report',
            id: {{ report_id | tojson }},
            embedUrl: {{ embed_url | tojson }},
            accessToken: {{ embed_token | tojson }},
            tokenType: models.TokenType.Embed,
            settings: {
                panes: {
                    filters: { visible: false },
                    pageNavigation: { visible: true }
                }
            },
            // filters: filters // uncomment to Add filters 
        };

        const reportContainer = document.getElementById('reportContainer');
        const report = powerbi.embed(reportContainer, embedConfig);

        report.on("loaded", function () {
            report.getPages().then(function (pages) {
                const targetPageName = {{ page_name | tojson }};
                const targetPage = pages.find(p => p.name === targetPageName);
                if (targetPage) {
                    report.setPage(targetPage.name).catch(function(error) {
                        console.error("Error setting page:", error);
                    });
                } else {
                    console.warn("Target page not found:", targetPageName);
                }
            }).catch(function(error) {
                console.error("Error retrieving pages:", error);
            });
        });
    </script>
</body>
</html>

"""


In [10]:

def run_flask():
    """Run the Flask app on port 5000 with debug mode switched off.

    Used as the target of the background thread started in the main block.
    """
    app.run(port=5000, debug=False)

# Function to get access token from Power BI
def get_access_token():
    """Acquire an access token for the configured app registration.

    Uses the MSAL client-credentials flow with CLIENT_ID, CLIENT_SECRET,
    AUTHORITY_URL and SCOPE.

    Returns:
        A ``(token, error)`` pair. On success ``token`` is the access token
        string and ``error`` is ``None``. On failure ``token`` is ``None``
        and ``error`` is the full result returned by MSAL.
    """
    confidential_client = msal.ConfidentialClientApplication(
        CLIENT_ID,
        client_credential=CLIENT_SECRET,
        authority=AUTHORITY_URL,
    )
    result = confidential_client.acquire_token_for_client(scopes=SCOPE)

    if "access_token" in result:
        return result["access_token"], None
    return None, result

In [11]:

# Returns the list of metric data points (timestamp + average CPU value).
def get_cpu_data_points():
    """Query the capacity's ``cpu_metric`` and return its usable data points.

    Relies on the module-level ``client`` and ``resource_uri``.

    Returns:
        A flat list of the data points found in every time series of every
        returned metric, keeping only those whose ``average`` is not ``None``.
    """
    response = client.query_resource(
        resource_uri=resource_uri,
        metric_names=["cpu_metric"],
        aggregations=["Average"],
        # How far into the past to look; timedelta also accepts hours, days, etc.
        timespan=timedelta(minutes=1),
    )

    usable_points = []
    for metric in response.metrics:
        for series in metric.timeseries:
            for point in series.data:
                # Skip samples that carry no average value.
                if point.average is not None:
                    usable_points.append(point)
    return usable_points

In [12]:


# Flask route for index (listing available reports)
@app.route("/")
def index():
    """Return a small HTML page with one link per entry in REPORTS.

    Links point at ``/report/<index>`` (zero-based) and are labelled
    "Report 1", "Report 2", ... (one-based).
    """
    list_items = []
    for position in range(len(REPORTS)):
        label_number = position + 1
        list_items.append(
            f'<li><a href="/report/{position}">Report {label_number}</a></li>'
        )
    joined_items = "".join(list_items)
    return f"<h1>Available Reports</h1><ul>{joined_items}</ul>"

# Timeout in seconds for each Power BI REST call, so a stalled request cannot
# hang the page (and with it the load test) indefinitely.
_POWER_BI_TIMEOUT = 30


def _build_embed_token_request(report_id, report_id_p, dataset_id):
    """Build the JSON body for the Power BI GenerateToken request.

    The paginated report is only listed when ``report_id_p`` is set, so a
    report without a paginated companion does not send a null report ID.
    If you have no role assigned, refer to the Power BI token generation
    documentation and adjust the ``identities`` part.
    """
    reports = [{"id": report_id, "groupId": WORKSPACE_ID}]
    if report_id_p:
        reports.append({"id": report_id_p, "groupId": WORKSPACE_ID})

    return {
        "reports": reports,
        "datasets": [
            {"id": dataset_id, "groupId": WORKSPACE_ID, "xmlaPermissions": "ReadOnly"}
        ],
        "identities": [{
            "username": USERNAME,
            "roles": ROLES,
            "datasets": [dataset_id],
        }],
        "accessLevel": "View",
    }


def _log_report_opened(report_name, report_id_p):
    """Append one line to Load_Testing.txt recording the open and CPU usage.

    The line holds the current local time, the report name, the paginated
    report ID and every CPU data point returned by get_cpu_data_points().
    The original author notes that the usage metric lags roughly two minutes
    behind, which is why each data point carries its own timestamp.
    """
    data_points = get_cpu_data_points()
    if data_points:
        usage_summary = " | ".join(
            f"(CPU Time: {dp.timestamp}, Avg CPU: {dp.average})"
            for dp in data_points
        )
    else:
        usage_summary = "No valid data points"

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_line = (
        f"[{timestamp}], Paginated Report opened: {report_name}, "
        f"Report ID: {report_id_p}, "
        f"Usage: {usage_summary}\n"
    )
    # Explicit encoding so report names with non-ASCII characters can be logged.
    with open("Load_Testing.txt", "a", encoding="utf-8") as f:
        f.write(log_line)


# Flask route for embedding a report
@app.route("/report/<int:report_index>")
def embed_report(report_index):
    """Serve the embed page for ``REPORTS[report_index]``.

    Steps: acquire an access token, fetch the report's embed URL, generate an
    embed token covering the report (and its paginated report, if any) and
    the dataset, log the event with current CPU usage, then render
    HTML_TEMPLATE.

    Args:
        report_index: Zero-based position of the report in REPORTS.

    Returns:
        The rendered HTML page, or a plain-text error message when any step
        fails. An out-of-range index produces a 404 response.
    """
    # Validate before entering the try block: abort() raises an HTTP exception
    # that the catch-all handler below would otherwise swallow.
    if not 0 <= report_index < len(REPORTS):
        abort(404, description="Report not found")

    try:
        report_info = REPORTS[report_index]
        report_id = report_info["report_id"]
        report_id_p = report_info.get("report_id_p")
        page_name = report_info.get("page_name")
        dataset_id = report_info.get("dataset_id")
        report_filter = report_info.get("filter")
        report_name = report_info.get("report_name")

        if not dataset_id:
            # If dataset_id is missing, return a user-friendly error message
            return f"Error: No dataset found for report {report_id}."

        access_token, error = get_access_token()
        if access_token is None:
            return f"Error acquiring access token: {error}"

        headers = {"Authorization": f"Bearer {access_token}"}

        report_url = (
            f"https://api.powerbi.com/v1.0/myorg/groups/{WORKSPACE_ID}/reports/{report_id}"
        )
        report_response = requests.get(
            report_url, headers=headers, timeout=_POWER_BI_TIMEOUT
        )

        if report_response.status_code != 200:
            print(f"Failed to get report. Status code: {report_response.status_code}")
            print(f"Error response: {report_response.text}")
            return f"Failed to retrieve report: {report_response.text}"

        embed_url = report_response.json().get("embedUrl")
        if not embed_url:
            print(f"Error: embed_url is missing for report {report_id}")
            return "Error: embed URL not found for the report."

        token_body = _build_embed_token_request(report_id, report_id_p, dataset_id)
        print(token_body)

        token_response = requests.post(
            "https://api.powerbi.com/v1.0/myorg/GenerateToken",
            headers={**headers, "Content-Type": "application/json"},
            json=token_body,
            timeout=_POWER_BI_TIMEOUT,
        )

        if token_response.status_code != 200:
            print(f"Failed to generate embed token. Status code: {token_response.status_code}")
            print(f"Error response: {token_response.text}")
            return f"Failed to generate embed token: {token_response.text}"

        embed_token = token_response.json().get("token")
        if not embed_token:
            print(f"Error: embed_token is missing for report {report_id}")
            return "Error: embed token not found."

        print("page Name = ", page_name)
        print("Report Name = ", report_name)

        # Record which report was opened, when, and the CPU usage at the time.
        _log_report_opened(report_name, report_id_p)

        return render_template_string(
            HTML_TEMPLATE,
            report_id=report_id,
            embed_url=embed_url,
            embed_token=embed_token,
            page_name=page_name,
            report_name=report_name,
            # Only consumed by the template if it references `filter`.
            filter=report_filter,
        )

    except Exception as e:
        # Top-level boundary for the route: report any unexpected failure
        # (network errors, timeouts, bad JSON, ...) instead of a bare 500.
        print(f"Error: {e}")
        return f"An unexpected error occurred: {e}"

In [13]:


# --- Selenium setup and run ---
def run_selenium(
    num_reports,
    delay_time=10.0,
    brave_path=r"C:\Users\<YourUsername>\AppData\Local\BraveSoftware\Brave-Browser\Application",
    chromedriver_path=r"C:\chromedriver\chromedriver-win64\chromedriver.exe",
):
    """Drive a browser through the locally served report pages.

    Starts a Chromium-based browser through chromedriver and navigates it to
    ``http://localhost:5000/report/<i>`` for ``i`` in ``0 .. num_reports - 1``,
    pausing ``delay_time`` seconds after each navigation. The pages are loaded
    one after another in the same browser window; this code opens no new tabs.

    Args:
        num_reports: Number of report pages to visit.
        delay_time: Seconds to wait after loading each page.
        brave_path: Path of the browser executable; replace the placeholder
            default with your own browser .exe path.
        chromedriver_path: Path of the chromedriver executable.

    The browser is intentionally left open when the function returns, so the
    last report stays visible; the driver is not quit here.
    """
    options = Options()
    options.binary_location = brave_path
    options.add_argument("--disable-popup-blocking")
    options.add_argument("--disable-web-security")

    service = Service(executable_path=chromedriver_path)
    driver = webdriver.Chrome(service=service, options=options)

    for i in range(num_reports):
        driver.get(f"http://localhost:5000/report/{i}")

        # Optional: adapt the delay to the current capacity load. Uncomment to
        # slow down while the CPU is busy and speed up again once it recovers.
        #
        # data_points = get_cpu_data_points()
        # if data_points:
        #     latest = max(data_points, key=lambda d: d.timestamp)
        #     print(f"Latest Metric Time: {latest.timestamp}, Avg CPU: {latest.average}")
        #     if latest.average > 80.0 and delay_time < 40.0:
        #         delay_time += 5.0
        #     elif latest.average < 75.0 and delay_time > 10.0:
        #         delay_time -= 0.5
        # else:
        #     print("No data available")

        time.sleep(delay_time)

    # To close the browser after the run, optionally wait a little and quit:
    # time.sleep(15)
    # driver.quit()




In [None]:
# Function to check if the server is up
def check_flask_server(url="http://localhost:5000/", poll_interval=5, max_wait=None):
    """Poll the Flask server until it answers with HTTP 200.

    Args:
        url: Address to probe.
        poll_interval: Seconds to sleep between attempts.
        max_wait: Optional overall limit in seconds. ``None`` (the default)
            keeps polling forever, as before.

    Returns:
        ``True`` once the server responds with status 200; ``False`` if
        ``max_wait`` elapsed first. At least one attempt is always made.
    """
    deadline = None if max_wait is None else time.monotonic() + max_wait

    while True:
        try:
            # Per-request timeout: without it a connection that is accepted
            # but never answered would block this loop indefinitely.
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                print("Flask server is up and running!")
                return True
        except requests.exceptions.RequestException:
            # Server not reachable yet (still starting); retry after the pause.
            pass

        if deadline is not None and time.monotonic() >= deadline:
            print(f"Flask server did not respond within {max_wait} seconds.")
            return False

        time.sleep(poll_interval)  # Wait before retrying

# Main execution block
if __name__ == "__main__":
    # Serve the Flask app from a background daemon thread so this thread is
    # free to drive the browser.
    flask_thread = threading.Thread(daemon=True, target=run_flask)
    flask_thread.start()

    print("Waiting for Flask server to start...")

    # Block here until the server answers requests.
    check_flask_server()

    # Visit the page of every configured report in the browser.
    run_selenium(num_reports=len(REPORTS))