In [2]:
# Install semantic-link-labs library quietly
%pip install semantic-link-labs --q

# Import required libraries
from pyspark.sql.functions import lit, col, expr, lpad, date_format, dayofweek, year, quarter, month, date_add, datediff, concat, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, DecimalType
from concurrent.futures import ThreadPoolExecutor, as_completed
from delta.tables import DeltaTable
import sempy.fabric as fabric
import sempy_labs as sl
import pandas as pd
import datetime
import time
import re

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 14, Finished, Available, Finished)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.



In [3]:
# Set functions for processing

def get_lakehouse_from_catalog(
        lakehouse_name,
        create_if_not_exist = True,
        description = "",
        workspace_id = ""
    ):
        """
        Checks if a Lakehouse exists in Microsoft Fabric and optionally creates it if it does not exist.
        """
        try:
            # Try to get the Lakehouse using the mssparkutils.lakehouse.get() method
            mssparkutils.lakehouse.get(name=lakehouse_name, workspaceId=workspace_id)
            print(f"Lakehouse '{lakehouse_name}' exists.")
            return True
        except Exception as e:
            # If the Lakehouse does not exist, we can choose to create it
            if "not found" in str(e).lower():
                if create_if_not_exist:
                    print(f"Creating Lakehouse '{lakehouse_name}'...")
                    try:
                        mssparkutils.lakehouse.create(name=lakehouse_name, description=description, workspaceId=workspace_id)
                        print(f"Lakehouse '{lakehouse_name}' created successfully.")
                        return True
                    except Exception as create_error:
                        print(f"Failed to create Lakehouse '{lakehouse_name}': {create_error}")
                        raise Exception(f"Failed to create Lakehouse '{lakehouse_name}': {create_error}")
                return False
            else:
                # Re-raise any unexpected exceptions
                print(f"Error occurred while retrieving Lakehouse '{lakehouse_name}': {e}")
                raise Exception(f"Error occurred while retrieving Lakehouse '{lakehouse_name}': {e}")

def poll_refresh_status(
        model, 
        refresh_id, 
        workspace, 
        timeout=120, 
        interval=5
    ):
    """
    Polls the status of a refresh operation until completion or failure.
    Returns the final execution details.
    """
    start_time = time.time()
    while True:
        # Check the status of the refresh operation
        details = fabric.get_refresh_execution_details(model, refresh_id, workspace)
        status = details.status

        print(f'Current status: {status}')

        # Exit loop if the operation has completed or failed
        if status in ['Completed', 'Failed']:
            print(f'Refresh operation finished with status: {status}')
            return details

        # Check for timeout
        if time.time() - start_time > timeout:
            print('Timeout reached, exiting status check.')
            return details

        # Wait briefly before the next status check
        time.sleep(interval)

def clean_and_lowercase(name):
    """
    Remove special characters from the name, replace them with underscores, trim leading and trailing underscores,
    and convert the result to lowercase.
    """
    # Replace special characters with underscores
    name_with_underscores = re.sub(r'\W+', '_', name)

    # Trim leading/trailing underscores and convert to lowercase
    cleaned_name = re.sub(r'^_+|_+$', '', name_with_underscores).lower()
    
    return cleaned_name

def parse_header(name):
    """
    Extract text within brackets, remove special characters, then convert to lowercase.
    If no brackets are found, clean the name directly.
    """
    # Search for text within brackets
    match = re.search(r'\[(.*?)\]', name)
    
    # Use found text or the whole name if no brackets are found
    return clean_and_lowercase(match.group(1)) if match else clean_and_lowercase(name)

def evaluate_and_write_table(table_name):
    """
    Evaluate a DAX query on the specified table, clean column names, convert the DataFrame to Spark format,
    and write it to parquet and delta formats.
    """
    # Evaluate table using DAX and the Fabric Capacity model
    df_table = fabric.evaluate_dax(metrics_app_model, f"EVALUATE '{table_name}'", metrics_app_workspace)
    
    # Clean up column names
    df_table.columns = [parse_header(col) for col in df_table.columns]
    
    # Clean table name for use in paths
    clean_table_name = f'stage_{clean_and_lowercase(table_name)}'
    
    # Define file storage directory
    table_directory = f'{lakehouse_file_path}/capacity_metrics/{clean_table_name}'
    
    # File path with timestamp
    file_path = f"{table_directory}/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    # Convert DataFrame to Spark format
    df_table = spark.createDataFrame(df_table)
    
    # Save DataFrame in parquet format
    df_table.write.format('parquet').save(file_path)
    
    # Save DataFrame in delta format
    df_table.write.format('delta').mode('overwrite').save(f'{lakehouse_table_path}/{clean_table_name}')

    # Create temp view
    df_table.createOrReplaceTempView(f'vw_{clean_table_name}')

def process_capacity_app_table(table_name):
    """
    Function used with threadpoolexecutor to capture data from semantic model tables in parallel.
    """
    try:
        evaluate_and_write_table(table_name)
        return f'{table_name} completed successfully.'
    except Exception as exc:
        return f'Error processing {table_name}: {exc}'

def upsert_to_table(config):
    """
    Implements a generic upsert pattern for loading data into a dimensional layer in a Delta table.
    Maintains audit fields for data insertion and updates.
    """
    table_name = config['table_name']
    lakehouse_table_path = config['lakehouse_table_path']
    df_source = config['source_dataframe']
    candidate_columns = config['candidate_columns']
    
    full_table_path = f'{lakehouse_table_path}/{table_name}'
    
    # Capture the current timestamp for audit fields
    current_ts = current_timestamp()

    try:
        # Set update columns
        source_columns = df_source.columns
        
        # Add audit columns
        df_source = df_source \
            .withColumn('insert_datetime', current_ts) \
            .withColumn('update_datetime', current_ts)
        
        # Check if the Delta table exists
        if not DeltaTable.isDeltaTable(spark, full_table_path):
            print(f"Table {table_name} does not exist. Creating it.")
            
            df_source.write.format('delta').save(full_table_path)
            
            print(f"{table_name} successfully created.")
            return

        # Set the Delta table reference
        target = DeltaTable.forPath(spark, full_table_path)
        
        # Dynamically set the merge condition using candidate_columns
        merge_condition = " AND ".join([f"target.`{col}` = source.`{col}`" for col in candidate_columns])

        # Dynamically set the update condition for columns where values differ
        update_condition = " OR ".join([f"target.`{col}` <> source.`{col}`" for col in source_columns])

        # Perform upsert operation
        merge_operation = target.alias('target').merge(
            source=df_source.alias('source'),
            condition=expr(merge_condition)
        ).whenMatchedUpdate(
            condition=expr(update_condition),
            set={
                col: f"source.`{col}`" for col in df_source.columns if col != 'insert_datetime'
            }
        ).whenNotMatchedInsertAll()

        # Execute merge
        merge_operation.execute()

        print(f"{table_name} upsert completed successfully.")
    except Exception as e:
        print(f'An error occurred in upsert_to_delta_table for table {table_name}: {e}')
        raise

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 16, Finished, Available, Finished)

In [4]:
# Set monitoring lakehouse name
monitoring_lakehouse = 'lh_capacity_monitoring'

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 17, Finished, Available, Finished)

In [6]:
# Check if lakehouse exists and create if not
get_lakehouse_from_catalog(
    lakehouse_name=monitoring_lakehouse
)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 19, Finished, Available, Finished)

Lakehouse 'lh_capacity_monitoring' exists.


True

In [7]:
# Retrieve lakehouse details
df_lakehouse_details = mssparkutils.lakehouse.get(
    name=monitoring_lakehouse
)

# Assign id values for lakehouse and workspace
lakehouse_id = df_lakehouse_details['id']
workspace_id = df_lakehouse_details['workspaceId']

# Build lakehouse paths for writes
lakehouse_table_path = f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables'
lakehouse_file_path = f'abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files'

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 20, Finished, Available, Finished)

#### Update with location and name of your Fabric Capacity Metrics app

```
# Define the location of the metrics application
metrics_app_model = 'Fabric Capacity Metrics'
metrics_app_workspace = 'Microsoft Fabric Capacity Metrics'
```

In [8]:
# Define the location of the metrics application
metrics_app_model = 'Fabric Capacity Metrics'
metrics_app_workspace = 'Microsoft Fabric Capacity Metrics'

# Ensure model exists
try:
    sl.list_semantic_model_objects(
        dataset=metrics_app_model,
        workspace=metrics_app_workspace
    )
except Exception as e:
    print(f"An error occurred while listing semantic model objects: {e}")
    raise

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 21, Finished, Available, Finished)

In [9]:
# Trigger refresh of the capacity metrics semantic model
refresh_id = fabric.refresh_dataset(
    dataset=metrics_app_model,
    workspace=metrics_app_workspace
)

# Poll for refresh completion and capture logs
execution_details = poll_refresh_status(metrics_app_model, refresh_id, metrics_app_workspace)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 22, Finished, Available, Finished)

Current status: Unknown
Current status: Unknown
Current status: Completed
Refresh operation finished with status: Completed


In [10]:
# List tables in the model and workspace
df_tables = fabric.list_tables(metrics_app_model, workspace=metrics_app_workspace)

# Extract table names
table_names = df_tables['Name'].tolist()

# Include only specific tables
include_tables = set([
    'MetricsByItemandOperationandHour',
    'StorageByWorkspacesandHour',
    'CUDetail',
    'MaxMemoryByItemAndHour',
    'Items'
])

# Filter table names to include only specified tables
filtered_table_names = [table_name for table_name in table_names if table_name in include_tables]

# Use ThreadPoolExecutor to process tables in parallel
results = []
with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    future_to_table = {executor.submit(process_capacity_app_table, table_name): table_name for table_name in filtered_table_names}
    for future in as_completed(future_to_table):
        table_name = future_to_table[future]
        try:
            result = future.result()
            results.append(result)
        except Exception as exc:
            results.append(f'Error processing {table_name}: {exc}')

# Print results
for result in results:
    print(result)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 23, Finished, Available, Finished)

MaxMemoryByItemAndHour completed successfully.
CUDetail completed successfully.
StorageByWorkspacesandHour completed successfully.
Items completed successfully.
MetricsByItemandOperationandHour completed successfully.


In [11]:
# Define the schema for the workspace DataFrame
schema = StructType([
    StructField('id', StringType(), True),
    StructField('is_read_only', BooleanType(), True),
    StructField('is_on_dedicated_capacity', BooleanType(), True),
    StructField('capacity_id', StringType(), True),
    StructField('default_dataset_storage_format', StringType(), True),
    StructField('type', StringType(), True),
    StructField('name', StringType(), True),
])

try:
    # Fetch workspace details from the fabric service
    df_stage = fabric.list_workspaces()
    
    # Convert the fetched data into a Spark DataFrame using the defined schema
    df_ws_stage_spark = spark.createDataFrame(df_stage, schema=schema)
    
    # Define the table name for storing workspace data
    table_name = 'stage_workspaces'

    # Define file storage directory
    file_directory = f'{lakehouse_file_path}/capacity_metrics/{table_name}'
    
    # File path with timestamp
    file_path = f"{file_directory}/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    # Save DataFrame in parquet format
    df_ws_stage_spark.write.format('parquet').save(file_path)

    # Write the DataFrame to a Delta table with an overwrite mode
    df_ws_stage_spark.write.format('delta').mode('overwrite').save(f'{lakehouse_table_path}/{table_name}')
    print(f'{table_name} staged successfully.')

    # Create temp view
    df_ws_stage_spark.createOrReplaceTempView(f'vw_{table_name}')

except Exception as e:
    print(f'An unexpected error occurred while capturing workspace details: {e}')


StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 24, Finished, Available, Finished)

stage_workspaces staged successfully.


In [12]:
# Define the schema for the capacity DataFrame
schema = StructType([
    StructField('id', StringType(), True),
    StructField('display_name', StringType(), True),
    StructField('sku', StringType(), True),
    StructField('region', StringType(), True),
    StructField('state', StringType(), True),
])

try:
    # Fetch capacity details from the fabric service
    df_stage = fabric.list_capacities()
    
    # Convert the raw data to a Spark DataFrame using the predefined schema
    df_stage_spark = spark.createDataFrame(df_stage, schema=schema)
    
    # Define the table name for storing capacity data
    table_name = 'stage_capacities'

    # Define file storage directory
    file_directory = f'{lakehouse_file_path}/capacity_metrics/{table_name}'
    
    # File path with timestamp
    file_path = f"{file_directory}/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    # Save DataFrame in parquet format
    df_stage_spark.write.format('parquet').save(file_path)

    # Write the DataFrame to a Delta table with an overwrite mode
    df_stage_spark.write.format('delta').mode('overwrite').save(f'{lakehouse_table_path}/{table_name}')
    print(f'{table_name} staged successfully.')

    # Create temp view
    df_stage_spark.createOrReplaceTempView(f'vw_{table_name}')

except Exception as e:
    print(f'An unexpected error occurred while capturing capacity details: {e}')


StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 25, Finished, Available, Finished)

stage_capacities staged successfully.


In [13]:
# Define the schema for the item DataFrame
schema = StructType([
    StructField('id', StringType(), True),
    StructField('display_name', StringType(), True),
    StructField('description', StringType(), True),
    StructField('type', StringType(), True),
    StructField('workspace_id', StringType(), True),
])

try:
    # Filter workspaces on dedicated capacity and collect their IDs
    df_workspaces = df_ws_stage_spark.filter(df_ws_stage_spark['is_on_dedicated_capacity'] == 1).select('id').collect()
    
    # Initialize an empty DataFrame to aggregate items from each workspace
    df_items_all = None

    # Aggregate items from each workspace with dedicated capacity
    for ws in df_workspaces:
        ws_id = ws['id']
        df_items = spark.createDataFrame(fabric.list_items(workspace=ws_id), schema=schema)
        
        # Union the current items DataFrame with the aggregated items DataFrame
        if df_items_all is None:
            df_items_all = df_items
        else:
            df_items_all = df_items_all.union(df_items)

    # Define the table name for storing item data
    table_name = 'stage_items_sl'

    # Define file storage directory
    file_directory = f'{lakehouse_file_path}/capacity_metrics/{table_name}'
    
    # File path with timestamp
    file_path = f"{file_directory}/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    # Save DataFrame in parquet format
    df_items_all.write.format('parquet').save(file_path)

    # Write the DataFrame to a Delta table with an overwrite mode
    df_items_all.write.format('delta').mode('overwrite').save(f'{lakehouse_table_path}/{table_name}')
    print(f'{table_name} staged successfully.')

    # Create temp view
    df_items_all.createOrReplaceTempView(f'vw_{table_name}')

except Exception as e:
    print(f'An unexpected error occurred while capturing item details: {e}')

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 26, Finished, Available, Finished)

stage_items_sl staged successfully.


In [14]:
try:
    # Set table name
    table_name = 'stage_capacity_cost'

    # Set github file path
    url_github = 'https://raw.githubusercontent.com/Lucid-Will/Lucid-Capacity-Monitoring/main/supporting_data_files/capacity_cost_by_region/capacity_cost_by_region.csv'
    
    # Load capacity cost file
    pd_stage = pd.read_csv(url_github)
    df_stage_spark = spark.createDataFrame(pd_stage)
    
    # Define file storage directory
    file_directory = f'{lakehouse_file_path}/capacity_metrics/{table_name}'
    
    # File path with timestamp
    file_path = f"{file_directory}/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    # Save DataFrame in parquet format
    df_stage_spark.write.format('parquet').save(file_path)

    # Write the DataFrame to a Delta table with an overwrite mode
    df_stage_spark.write.format('delta').mode('overwrite').save(f'{lakehouse_table_path}/{table_name}')
    print(f'{table_name} staged successfully.')

    # Create temp view
    df_stage_spark.createOrReplaceTempView(f'vw_{table_name}')

except Exception as e:
    print(f'An unexpected error occurred while capturing capacity details: {e}')


StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 27, Finished, Available, Finished)

stage_capacity_cost staged successfully.


In [15]:
try:
    # Set table name
    table_name = 'stage_storage_cost'

    # Set GitHub file path
    url_github = 'https://raw.githubusercontent.com/Lucid-Will/Lucid-Capacity-Monitoring/main/supporting_data_files/onelake_storage_by_region/onelake_storage_by_region.csv'
    
    # Load storage cost file
    pd_stage = pd.read_csv(url_github)

    # Create Spark DataFrame with the defined schema
    df_stage_spark = spark.createDataFrame(pd_stage)
    
    # Define file storage directory
    file_directory = f'{lakehouse_file_path}/capacity_metrics/{table_name}'
    
    # File path with timestamp
    file_path = f"{file_directory}/{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

    # Save DataFrame in parquet format
    df_stage_spark.write.format('parquet').save(file_path)

    # Write the DataFrame to a Delta table with an overwrite mode
    df_stage_spark.write.format('delta').mode('overwrite').save(f'{lakehouse_table_path}/{table_name}')
    print(f'{table_name} staged successfully.')

    # Create temp view
    df_stage_spark.createOrReplaceTempView(f'vw_{table_name}')

except Exception as e:
    print(f'An unexpected error occurred while capturing storage details: {e}')


StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 28, Finished, Available, Finished)

stage_storage_cost staged successfully.


In [16]:
try:
    # Assign table name
    table_name = 'stage_date'

    # Check if the date table already exists
    if DeltaTable.isDeltaTable(spark, f'{lakehouse_table_path}/{table_name}'):
        print(f'Table {table_name} already exists. Skipping creation.')
        df_date = spark.read.format("delta").load(f'{lakehouse_table_path}/{table_name}')
    else:
        print(f'Table {table_name} does not exist. Creating it.')

        # Define your date range
        start_date = '1901-01-01'
        end_date = '2100-12-31'

        # Create a DataFrame with the date range difference in days
        date_diff = spark.createDataFrame([(start_date, end_date)], ['start', 'end'])
        date_diff = date_diff.withColumn('diff', datediff(col('end'), col('start'))).collect()[0]['diff']

        # Generate a DataFrame with the sequence of dates
        date_range_diff = spark.range(0, date_diff + 1).withColumnRenamed('id', 'day_id')
        start_date_df = spark.createDataFrame([(start_date,)], ['start_date'])

        # Cast 'day_id' to integer and create a sequence of dates by adding it to 'start_date'
        df_date = start_date_df.crossJoin(date_range_diff) \
            .select(date_add(col('start_date'), col('day_id').cast('int')).alias('date'))

        # Enrich the DataFrame with date attributes
        df_date = df_date.select(
            date_format(col('date'), 'yyyyMMdd').cast('int').alias('calendar_date_key'),
            col('date').alias('calendar_date'),
            dayofweek(col('date')).cast('int').alias('calendar_weekday_number'),
            date_format(col('date'), 'EEEE').alias('calendar_weekday'),
            lpad(month(col('date')).cast('string'), 2, '0').alias('calendar_month_number'),
            date_format(col('date'), 'MMMM').alias('calendar_month'),
            concat(year(col('date')), lpad(month(col('date')).cast('string'), 2, '0')).alias('calendar_year_month_number'),
            date_format(col('date'), 'MMMM yyyy').alias('calendar_month_year'),
            quarter(col('date')).cast('string').alias('calendar_quarter_number'),
            concat(lit('Q'), quarter(col('date')).cast('string')).alias('calendar_quarter'),
            year(col('date')).alias('calendar_year_number')
        ).distinct()

        # Write to delta table
        df_date.write.format('delta').save(f'{lakehouse_table_path}/{table_name}')
        print(f'Table {table_name} created.')

    # Create temp view
    df_date.createOrReplaceTempView(f'vw_{table_name}')

except Exception as e:
    print(f'An unexpected error occurred while capturing date details: {e}')


StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 29, Finished, Available, Finished)

Table stage_date already exists. Skipping creation.


In [17]:
# Assign measure table name
table_name = 'measure_table'

try:
    # Check if the measure table already exists
    if DeltaTable.isDeltaTable(spark, f'{lakehouse_table_path}/{table_name}'):
        print(f'Table {table_name} already exists. Skipping creation.')
    else:
        # Create measure table shell since it does not exist
        print(f'Table {table_name} does not exist. Creating it.')

        df_measure = spark.sql("""
            SELECT 1 AS Value
        """)

        # Write the DataFrame to a delta table
        df_measure.write.format('delta').save(f'{lakehouse_table_path}/{table_name}')

        print(f'Table {table_name} created.')

except Exception as e:
    print(f'An unexpected error occurred while creating {table_name}: {e}')

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 30, Finished, Available, Finished)

Table measure_table already exists. Skipping creation.


In [20]:
# Stage and load workspace
df_workspace = spark.sql("""
    SELECT UPPER(id)    workspace_id,
           name         workspace_name,
           capacity_id
    FROM   vw_stage_workspaces
    WHERE  capacity_id IS NOT NULL
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'dim_workspace',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_workspace,
    'candidate_columns': ['workspace_id']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 33, Finished, Available, Finished)

dim_workspace upsert completed successfully.


In [21]:
# Stage and load capacity
df_capacity = spark.sql("""
    WITH storage AS (
        SELECT region, cost
        FROM   vw_stage_storage_cost
        WHERE  storage_type = 'OneLake storage'
    ),
    bcdr AS (
        SELECT region, cost
        FROM   vw_stage_storage_cost
        WHERE  storage_type = 'OneLake BCDR storage'
    ),
    cache AS (
        SELECT region, cost
        FROM   vw_stage_storage_cost
        WHERE  storage_type = 'OneLake cache'
    )
    SELECT DISTINCT 
           UPPER(a.id)         capacity_id,
           a.display_name      capacity_name,
           a.sku,
           a.region,
           b.capacity_unit,
           b.pay_go_hour,
           b.reservation_hour,
           c.cost              onelake_storage_cost,
           d.cost              onelake_bcdr_storage_cost,
           e.cost              onelake_cache_cost,
           a.state
    FROM   vw_stage_capacities a
    LEFT JOIN vw_stage_capacity_cost b
        ON a.region = b.region
       AND a.sku = b.sku
    LEFT JOIN storage c
        ON a.region = c.region
    LEFT JOIN bcdr d
        ON a.region = d.region
    LEFT JOIN cache e
        ON a.region = e.region
    WHERE  capacity_unit IS NOT NULL
""")

# Define configuration for upsert operation
config = {
    'table_name': 'dim_capacity',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_capacity,
    'candidate_columns': ['capacity_id']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 34, Finished, Available, Finished)

dim_capacity upsert completed successfully.


In [22]:
# Stage and load item
df_item = spark.sql("""
	SELECT UPPER(itemid)            item_id,
           UPPER(workspaceid)       workspace_id,
           itemname                 item_name,
           itemkind                 item_type
    FROM vw_stage_items
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'dim_item',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_item,
    'candidate_columns': ['item_id']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 35, Finished, Available, Finished)

dim_item upsert completed successfully.


In [23]:
# Stage and load date
df_date = spark.sql("""
    SELECT DISTINCT d.*
    FROM   vw_stage_date d
    INNER JOIN vw_stage_metricsbyitemandoperationandhour m 
        ON d.calendar_date = m.date
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'dim_date',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_date,
    'candidate_columns': ['calendar_date_key']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 36, Finished, Available, Finished)

dim_date upsert completed successfully.


In [24]:
# Stage and load operation
df_operation = spark.sql("""
    SELECT DISTINCT
           abs(hash(operationname)) operation_id,
           operationname            operation_name
    FROM vw_stage_metricsbyitemandoperationandhour
    UNION
    SELECT DISTINCT
           abs(hash(operationname)) operation_id,
           operationname            operation_name
    FROM vw_stage_storagebyworkspacesandhour
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'dim_operation',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_operation,
    'candidate_columns': ['operation_id']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 37, Finished, Available, Finished)

dim_operation upsert completed successfully.


In [25]:
# Stage and load billing type
df_billing_type = spark.sql("""
    SELECT DISTINCT
           abs(hash(billing_type)) billing_type_id,
           billing_type
    FROM vw_stage_storagebyworkspacesandhour
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'dim_billing_type',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_billing_type,
    'candidate_columns': ['billing_type_id']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 38, Finished, Available, Finished)

dim_billing_type upsert completed successfully.


In [26]:
# Read capacity
spark.read.format("delta").load(f'{lakehouse_table_path}/dim_capacity') \
    .createOrReplaceTempView("vw_dim_capacity")

# Stage and load workspace storage
df_storage = spark.sql("""
    SELECT UPPER(workspaceid)        workspace_id,
           UPPER(premiumcapacityid)  capacity_id,
           CAST(date AS DATE)        date,
           datetime                  date_time,
           abs(hash(billing_type))   billing_type_id,
           abs(hash(operationname))  operation_id,
           workloadkind              workload_kind,
           storagetype               storage_type,
           staticstorageingb         static_storage_gb,
           CASE WHEN storagetype = 'OneLake Standard Storage'
                THEN staticstorageingb * c.onelake_storage_cost
           END                       static_storage_standard_cost,
           utilization_gb            utilization_gb
    FROM vw_stage_storagebyworkspacesandhour s
    LEFT JOIN vw_dim_capacity c
           ON UPPER(s.premiumcapacityid) = c.capacity_id
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'fact_workspace_storage',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_storage,
    'candidate_columns': ['workspace_id', 'date_time', 'capacity_id', 'operation_id', 'billing_type_id', 'storage_type']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 39, Finished, Available, Finished)

fact_workspace_storage upsert completed successfully.


In [27]:
# Stage and load capacity metrics
df_metrics = spark.sql("""
    SELECT UPPER(premiumcapacityid)         capacity_id,
           UPPER(m.workspaceid)             workspace_id,
           UPPER(m.itemid)                  item_id,
           CAST(date AS DATE)               date,
           datetime                         date_time,
           abs(hash(operationname))         operation_id,
           sum_cu                           capacity_unit_seconds_consumed,
           sum_cu / 3600                    capacity_units_consumed,
           (sum_cu / 3600) * (c.pay_go_hour / c.capacity_unit)          activity_cost_pay_go,
           (sum_cu / 3600) * (c.reservation_hour / c.capacity_unit)     activity_cost_reservation,
           sum_duration                     activity_duration,
           count_operations                 total_operations,
           count_users                      users,
           avg_durationms                   average_activity_duration,
           throttling_min                   minutes_throttled,
           count_failure_operations         failed_operations,
           count_rejected_operations        rejected_operations,
           count_successful_operations      successful_operations,
           count_inprogress_operations      in_progress_operations,
           count_cancelled_operations       cancelled_operations,
           count_invalid_operations         invalid_operations
    FROM vw_stage_metricsbyitemandoperationandhour m
    LEFT JOIN vw_dim_capacity c
           ON UPPER(m.premiumcapacityid) = c.capacity_id
    WHERE sum_cu != 0
""")

# Define configuration for the upsert operation
config = {
    'table_name': 'fact_capacity_metrics',
    'lakehouse_table_path': lakehouse_table_path,
    'source_dataframe': df_metrics,
    'candidate_columns': ['date_time', 'operation_id', 'item_id', 'workspace_id', 'capacity_id']
}

# Execute upsert to the dimensional table
upsert_to_table(config)

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 40, Finished, Available, Finished)

fact_capacity_metrics upsert completed successfully.


In [28]:
# Monitoring semantic model
semantic_model = 'Capacity Monitoring'

try:
    # Retrieve the list of datasets
    df_datasets = fabric.list_datasets()
    dataset_names = df_datasets['Dataset Name'].tolist()

    # Check if the semantic model exists
    if semantic_model in dataset_names:
        print(f'Semantic model {semantic_model} already exists. Skipping creation.')
    else:
        import requests
        import json

        # Define .bim file URL
        bim_url = 'https://raw.githubusercontent.com/Lucid-Will/Lucid-Capacity-Monitoring/main/semantic_model_bim/lucid_capacity_monitor.bim'

        # Download and load BIM file as JSON
        response = requests.get(bim_url)
        bim_json = response.json()

        # Deploy new semantic model using the BIM JSON
        sl.create_semantic_model_from_bim(semantic_model, bim_json)
        print(f'Semantic model {semantic_model} created.')

        # Attach semantic model to monitoring lakehouse
        sl.directlake.update_direct_lake_model_lakehouse_connection(dataset=semantic_model, workspace=None, lakehouse=monitoring_lakehouse) 

except Exception as e:
    print(f'An unexpected error occurred while processing the semantic model: {e}')

StatementMeta(, 947d3672-1b33-406b-87d9-354da4504cd3, 41, Finished, Available, Finished)

🟢 The 'Capacity Monitoring' semantic model has been created within the 'Lucid Control Demo' workspace.
Semantic model Capacity Monitoring created.
An unexpected error occurred while processing the semantic model: Attribute does not exist
