In [None]:
#Import required libraries

%pip install semantic-link
%load_ext sempy
import pandas as pd
import sempy.fabric as fabric
from datetime import date, datetime, timedelta
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampNTZType

spark.conf.set("sprk.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
spark.conf.set("spark.sql.catalog.pbi", "com.microsoft.azure.synapse.ml.powerbi.PowerBICatalog")

In [None]:
#Define required parameters
dataset = 'Fabric Capacity Metrics' #Default name of the Semantic Model
talbe_name = 'Capacities' #Table Name to get all Capacities
today = datetime.today() #Get Today's date
timepoint_start = today - timedelta(days=1) #Get yesterday's date on which we will extract all details
path_bronze = 'Files/01 Bronze/'
path_silver = 'Files/02 Silver/'
path_gold = 'Files/03 Gold/'
dataframe_schema = StructType([
                StructField("BillingType", StringType(), nullable=True),
                StructField("Status", StringType(), nullable=True),
                StructField("OperationStartTime", TimestampNTZType(), nullable=True),
                StructField("OperationEndTime", TimestampNTZType(), nullable=True),
                StructField("User", StringType(), nullable=True),
                StructField("Operation", StringType(), nullable=True),
                StructField("OperationID", StringType(), nullable=True),
                StructField("WorkspaceName", StringType(), nullable=True),
                StructField("Item", StringType(), nullable=True),
                StructField("ItemName", StringType(), nullable=True),
                StructField("TimepointCUs", FloatType(), nullable=True),
                StructField("DurationInS", IntegerType(), nullable=True),
                StructField("TotalCUInS", FloatType(), nullable=True),
                StructField("Throttling", IntegerType(), nullable=True),
                StructField("PercentageOfBaseCapacity", FloatType(), nullable=True),
                StructField("capacityId", StringType(), nullable=True),
                StructField("Timepoint", TimestampNTZType(), nullable=True),
                StructField("OperationType", StringType(), nullable=True)
            ])

In [None]:
#Get all capacities
df_capacities = spark.sql("""
    SELECT c.capacityId

    FROM pbi.`""" + dataset + """`.Capacities c
""")

In [None]:
#For testing purpose filtering it down to only one capacity
capacity_id = '...'
df_capacities = df_capacities[df_capacities['capacityId'] == capacity_id]

display(df_capacities)

In [None]:
#Create a function to get Background Operations of the Metrics App

def generate_dax_background_operation(date_today, capacity_id):
    """
    Generate the DAX statement which is used to get the background operations of the Metrics App for a given Capacity and day. 
    
    Arguments required: 
        date_today (datetime) - Date on which the background operation should be extracted
        capacity_id (string) - Capacity ID on which the background operation should be extracted

    Returns:
        DAX Statement (string)

    """

    #timepoint_start = date_today.replace(hour=0, minute=0, second=0, microsecond=0) #Set timepoint to the beginning of the day
    timepoint_start = date_today.replace(day=24, month=4, year=2024, hour=9, minute=00, second=00, microsecond=00) #Use this timepoint to get a specific one - used for testing purpose
    timepoint_next = timepoint_start
    i = 0 #Initialising iteration count to check if all timepoints (2880 in total for a day) has been covered

    #while timepoint_next.day == timepoint_start.day: #As long as the day of the next timepoint is the same as start timepiont, loop will continue and add 30seconds at the end
    while timepoint_next <= datetime.strptime('24.04.2024 09:02:00', "%d.%m.%Y %H:%M:%S"): #Use this filter to get some specific timepoints only - used for testing purpose
        current_year = str(timepoint_next.year)
        current_month = str(timepoint_next.month)
        current_day = str(timepoint_next.day)
        starting_hour = str(timepoint_next.hour)
        starting_minutes = str(timepoint_next.minute)
        starting_seconds = str(timepoint_next.second)

        dax_background_operation = '''
                    DEFINE
                        MPARAMETER 'CapacityID' = "''' + capacity_id + '''"
                        MPARAMETER 'TimePoint' = (DATE(''' + current_year + ''', ''' + current_month + ''', ''' + current_day + ''') + TIME(''' + starting_hour + ''', ''' + starting_minutes + ''', ''' + starting_seconds + '''))

                        VAR varFilter_Capacity = TREATAS({"''' + capacity_id + '''"}, 'Capacities'[capacityId])	
                        VAR varFilter_TimePoint = 
                            TREATAS(
                                {(DATE(''' + current_year + ''', ''' + current_month + ''', ''' + current_day + ''') + TIME(''' + starting_hour + ''', ''' + starting_minutes + ''', ''' + starting_seconds + '''))},
                                'TimePoints'[TimePoint]
                            )		
                        VAR varTable_Details =
                            SUMMARIZECOLUMNS(
                                'TimePointBackgroundDetail'[OperationStartTime],
                                'TimePointBackgroundDetail'[OperationEndTime],
                                'TimePointBackgroundDetail'[Status],
                                'TimePointBackgroundDetail'[Operation],
                                'TimePointBackgroundDetail'[User],
                                'TimePointBackgroundDetail'[OperationId],
                                'TimePointBackgroundDetail'[Billing type],
                                'Items'[WorkspaceName],
                                'Items'[ItemKind],
                                'Items'[ItemName],
                                
                                varFilter_Capacity,
                                varFilter_TimePoint,
                                
                                "Timepoint CU (s)", SUM('TimePointBackgroundDetail'[Timepoint CU (s)]),
                                "Duration (s)", SUM('TimePointBackgroundDetail'[Duration (s)]),
                                "Total CU (s)", CALCULATE(SUM('TimePointBackgroundDetail'[Total CU (s)])),
                                "Throttling", CALCULATE(SUM('TimePointBackgroundDetail'[Throttling (s)])),
                                "% of Base Capacity", CALCULATE(SUM('TimePointBackgroundDetail'[% of Base Capacity]))
                            )
                                
                    EVALUATE  SELECTCOLUMNS(
                        varTable_Details,
                        "BillingType", [Billing type],
                        "Status", [Status],
                        "OperationStartTime", [OperationStartTime],
                        "OperationEndTime", [OperationEndTime],
                        "User", [User],
                        "Operation", [Operation],
                        "OperationID", [OperationId],
                        "WorkspaceName", [WorkspaceName],
                        "Item", [ItemKind],
                        "ItemName", [ItemName],
                        "TimepointCUs", [Timepoint CU (s)],
                        "DurationInS", [Duration (s)],
                        "TotalCUInS", [Total CU (s)],
                        "Throttling", [Throttling],
                        "PercentageOfBaseCapacity", [% of Base Capacity]	
                    )'''
        
        df_dax_result = fabric.evaluate_dax(
            dataset,
            dax_background_operation
            )

        if not df_dax_result.empty:
        
            df_dax_result['capacityId'] = capacity_id
            df_dax_result['Timepoint'] = timepoint_next
            df_dax_result['OperationType'] = 'background'

            #Set path and file name
            subfolder = str(timepoint_next.date()) + '/'
            file_name = timepoint_next.strftime("%H-%M-%S")

            #Convert Fabric DataFrames into Spark DataFrames
            df_dax_result_spark = spark.createDataFrame(df_dax_result, schema=dataframe_schema)

            #Save DataFrames to OneLake
            df_dax_result_spark.write.mode("overwrite").format("parquet").save(path_bronze + 'Background Operation/' + subfolder + file_name)
            
        #Don't change timepoint intervals, as 30sec intervals are given
        timepoint_next = timepoint_next + timedelta(seconds = 30)

        i = i + 1

    return i

In [None]:
#Create a function to get Interactive Operations of the Metrics App

def generate_dax_interactive_operation(date_today, capacity_id):
    """
    Generate the DAX statement which is used to get the interactive operations of the Metrics App for a given Capacity and day. 
    
    Arguments required: 
        date_today (datetime) - Date on which the interactive operation should be extracted
        capacity_id (string) - Capacity ID on which the interactive operation should be extracted

    Returns:
        DAX Statement (Pandas DataFrame)

    """

    #timepoint_start = date_today.replace(hour=0, minute=0, second=0, microsecond=0) #Set timepoint to the beginning of the day
    timepoint_start = date_today.replace(day=24, month=4, year=2024, hour=9, minute=00, second=00, microsecond=00) #Use this timepoint to get a specific one - used for testing purpose
    timepoint_next = timepoint_start
    i = 0 #Initialising iteration count to check if all timepoints (2880 in total for a day) has been covered

    #while timepoint_next.day == timepoint_start.day: #As long as the day of the next timepoint is the same as start timepoint, loop will continue and add 30seconds at the end
    while timepoint_next <= datetime.strptime('24.04.2024 09:02:00', "%d.%m.%Y %H:%M:%S"): #Use this filter to get some specific timepoints only - used for testing purpose
        current_year = str(timepoint_next.year)
        current_month = str(timepoint_next.month)
        current_day = str(timepoint_next.day)
        starting_hour = str(timepoint_next.hour)
        starting_minutes = str(timepoint_next.minute)
        starting_seconds = str(timepoint_next.second)

        dax_interactive_operation = '''
                    DEFINE
                        MPARAMETER 'CapacityID' = "''' + capacity_id + '''"
                        MPARAMETER 'TimePoint' = (DATE(''' + current_year + ''', ''' + current_month + ''', ''' + current_day + ''') + TIME(''' + starting_hour + ''', ''' + starting_minutes + ''', ''' + starting_seconds + '''))

                        VAR varFilter_Capacity = TREATAS({"''' + capacity_id + '''"}, 'Capacities'[capacityId])	
                        VAR varFilter_TimePoint = 
                            TREATAS(
                                {(DATE(''' + current_year + ''', ''' + current_month + ''', ''' + current_day + ''') + TIME(''' + starting_hour + ''', ''' + starting_minutes + ''', ''' + starting_seconds + '''))},
                                'TimePoints'[TimePoint]
                            )		
                        VAR varTable_Details =
                            SUMMARIZECOLUMNS(
                                'TimePointInteractiveDetail'[OperationStartTime],
                                'TimePointInteractiveDetail'[OperationEndTime],
                                'TimePointInteractiveDetail'[Status],
                                'TimePointInteractiveDetail'[Operation],
                                'TimePointInteractiveDetail'[User],
                                'TimePointInteractiveDetail'[OperationId],
                                'TimePointInteractiveDetail'[Billing type],
                                'Items'[WorkspaceName],
                                'Items'[ItemKind],
                                'Items'[ItemName],
                                
                                varFilter_Capacity,
                                varFilter_TimePoint,
                                
                                "Timepoint CU (s)", SUM('TimePointInteractiveDetail'[Timepoint CU (s)]),
                                "Duration (s)", SUM('TimePointInteractiveDetail'[Duration (s)]),
                                "Total CU (s)", CALCULATE(SUM('TimePointInteractiveDetail'[Total CU (s)])),
                                "Throttling", CALCULATE(SUM('TimePointInteractiveDetail'[Throttling (s)])),
                                "% of Base Capacity", CALCULATE(SUM('TimePointInteractiveDetail'[% of Base Capacity]))
                            )
                                
                    EVALUATE  SELECTCOLUMNS(
                        varTable_Details,
                        "BillingType", [Billing type],
                        "Status", [Status],
                        "OperationStartTime", [OperationStartTime],
                        "OperationEndTime", [OperationEndTime],
                        "User", [User],
                        "Operation", [Operation],
                        "OperationID", [OperationId],
                        "WorkspaceName", [WorkspaceName],
                        "Item", [ItemKind],
                        "ItemName", [ItemName],
                        "TimepointCUs", [Timepoint CU (s)],
                        "DurationInS", [Duration (s)],
                        "TotalCUInS", [Total CU (s)],
                        "Throttling", [Throttling],
                        "PercentageOfBaseCapacity", [% of Base Capacity]	
                    )'''
        
        df_dax_result = fabric.evaluate_dax(
            dataset,
            dax_interactive_operation
            )
        
        if not df_dax_result.empty:

            df_dax_result['capacityId'] = capacity_id
            df_dax_result['Timepoint'] = timepoint_next
            df_dax_result['OperationType'] = 'interactive'
            
            #Set path and file name
            subfolder = str(timepoint_next.date()) + '/'
            file_name = timepoint_next.strftime("%H-%M-%S")

            #Convert Fabric DataFrames into Spark DataFrames
            df_dax_result_spark = spark.createDataFrame(df_dax_result, schema=dataframe_schema)

            #Save DataFrames to OneLake
            df_dax_result_spark.write.mode("overwrite").format("parquet").save(path_bronze + 'Interactive Operation/' + subfolder + file_name)

        #print(i, timepoint_next, "interactive")
        
        #Don't change timepoint intervals, as 30sec intervals are given
        timepoint_next = timepoint_next + timedelta(seconds = 30)

        i = i + 1

    return i

In [None]:
#Get for each capacity background and interactive operations
for row in df_capacities.toLocalIterator():
    capacity_id = row['capacityId']
    
    i_background = generate_dax_background_operation(timepoint_start, capacity_id)
    i_interactive = generate_dax_interactive_operation(timepoint_start, capacity_id)

In [None]:
#Set Subfolder and file name
subfolder = str(timepoint_start.date()) + '/'
file_name = str(timepoint_start.date())

#Read folder
df_background_bronze = spark.read.parquet(path_bronze + 'Background Operation/' + subfolder + '/*')
df_interactive_bronze = spark.read.parquet(path_bronze + 'Interactive Operation/' + subfolder + '/*')

#Save DataFrames to OneLake Silver layer
df_background_bronze.write.mode("overwrite").format("parquet").save(path_silver + 'Background Operation/' + file_name)
df_interactive_bronze.write.mode("overwrite").format("parquet").save(path_silver + 'Interactive Operation/' + file_name)

In [None]:
#Read folder from Silver layer
df_background_silver = spark.read.parquet(path_silver + 'Background Operation/' + file_name)
df_interactive_silver = spark.read.parquet(path_silver + 'Interactive Operation/' + file_name)

#Combine background and interactive operations into one DataFrame
df_all_operations = df_background_silver.unionByName(df_interactive_silver)

#Save DataFrame into Gold Layer of OneLake
df_all_operations.write.mode("overwrite").format("delta").save(path_gold + file_name)

In [None]:
df_all_operations_gold = spark.read.parquet(path_gold + file_name)
df_all_operations_gold.write.mode("append").format("delta").save('Tables/AllOperations')