# <span style="color:red">!! &nbsp; Important Information &nbsp; !!</span>

<span style="color:red">This code is provided as is with no guarantee of accuracy and no support. Be sure to test and validate all results in your environment.</span>

# Notebook Parameters

**Capacity Metrics App**
* capacity_metrics_workspace
    * The name of the workspace that hosts the Capacity Metrics app.
* capacity_metrics_dataset
    * The name of the dataset used by the Capacity Metrics app.

**Workspace**
* capacity_id
    * The ID of the capacity that hosted the workspace where the operations were run.
* date_of_operations
    * The date the operations were run.
* operation_id_list
    * The list of operation IDs for which you want to collect the capacity unit usage.
    * This should be a comma-separated list, with each operation ID enclosed in double quotes.
    * For example: "AAAAAAAA-BBBB-CCCC-DDDD-1234567890AB", "EEEEEEEE-FFFF-GGGG-HHHH-1234567890AB"


In [None]:
# --- Capacity Metrics App ---------------------------------------------------
# Workspace that hosts the Capacity Metrics app.
capacity_metrics_workspace = 'Fabric Capacity Metrics'
# Dataset used by the Capacity Metrics app.
capacity_metrics_dataset = 'Fabric Capacity Metrics'

# --- Workspace --------------------------------------------------------------
# Capacity that hosted the workspace where the operations were run.
capacity_id = 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
# Date the operations were run, formatted as YYYY-MM-DD.
date_of_operations = 'YYYY-MM-DD'
# Comma-separated operation IDs, each enclosed in double quotes.
operation_id_list = '''
"AAAAAAAA-BBBB-CCCC-DDDD-1234567890AB", "EEEEEEEE-FFFF-GGGG-HHHH-1234567890AB"
'''

# Gather data from Capacity Metrics
Proceed with caution if making modifications to the code below.
<hr>

In [None]:
import sempy.fabric         as fabric
from datetime               import datetime, timedelta
from pyspark.sql.functions  import col, lit, sum, min, max
from pyspark.sql.types      import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType

In [None]:
def get_capacity_metrics_usage(time_point, operation_id_list):
	"""Return the capacity usage recorded for the given operations at one time point.

	Builds a DAX query against the 'TimePointBackgroundDetail' table, filtered
	to the requested operation IDs and to the notebook-level ``capacity_id``,
	and evaluates it with ``fabric.evaluate_dax`` against the notebook-level
	``capacity_metrics_dataset`` / ``capacity_metrics_workspace``. Also relies
	on the notebook-provided ``spark`` session.

	Args:
		time_point: Object exposing ``year``, ``month``, ``day``, ``hour``,
			``minute`` and ``second`` (e.g. a ``datetime``); it is passed to
			the query as the 'TimePoint' parameter and stamped on every
			returned row.
		operation_id_list: String of comma-separated, double-quoted operation
			IDs. It is interpolated verbatim into a DAX table constructor.

	Returns:
		A Spark DataFrame with the columns TimePoint, WorkspaceName, ItemKind,
		ItemName, StartTime, EndTime, OperationId, Sum_CUs and Sum_Duration.
		When the query returns no rows, an empty DataFrame with that same
		schema is returned.
	"""

	schema = StructType([
	StructField("TimePoint", 		StringType(), 		True),
	StructField("WorkspaceName", 	StringType(), 		True),
	StructField("ItemKind", 		StringType(), 		True),
	StructField("ItemName", 		StringType(), 		True),
	StructField("StartTime", 		TimestampType(), 	True),
	StructField("EndTime", 			TimestampType(), 	True),
	StructField("OperationId", 		StringType(), 		True),
	StructField("Sum_CUs", 			DoubleType(), 		True),
	StructField("Sum_Duration", 	IntegerType(), 		True)
	])

	# Empty frame that fixes the output column names and serves as the "no rows" result.
	df_empty = spark.createDataFrame([], schema)

	dax_command = f"""
	DEFINE
		
		MPARAMETER 'CapacityID' 	= "{capacity_id}"
		MPARAMETER 'TimePoint' 		= (DATE({time_point.year}, {time_point.month}, {time_point.day}) + TIME({time_point.hour}, {time_point.minute}, {time_point.second}))

		VAR __Var_CapacityId	= {{"{capacity_id}"}}
		VAR __Var_OperationId	= {{{operation_id_list}}}

		VAR __Filter_OperationId 	= TREATAS(__Var_OperationId, 'TimePointBackgroundDetail'[OperationId])
		VAR __Filter_CapacityId 	= TREATAS(__Var_CapacityId, 'Capacities'[capacityId])

		VAR __DS0Core = 
			SUMMARIZECOLUMNS(
				'Items'[WorkspaceName],
				'Items'[ItemKind],
				'Items'[ItemName],
				'TimePointBackgroundDetail'[OperationStartTime],
				'TimePointBackgroundDetail'[OperationEndTime],
				'TimePointBackgroundDetail'[OperationId],
				__Filter_OperationId,
				__Filter_CapacityId,
				"Sum_CUs", CALCULATE(SUM('TimePointBackgroundDetail'[Total CU (s)])),
				"Sum_Duration", CALCULATE(SUM('TimePointBackgroundDetail'[Duration (s)]))
			)

	EVALUATE
		__DS0Core
	"""

	df_dax = fabric.evaluate_dax(dax_string = dax_command, dataset = capacity_metrics_dataset, workspace = capacity_metrics_workspace)

	# Test for rows directly. The previous `df_dax.count()[0]` looked up the
	# non-null count of the first column by integer position on a Series that
	# is labelled by column name, which is deprecated in pandas and also
	# treated a result whose first column is entirely null as empty.
	if df_dax.empty:
		return df_empty

	df_result = (
		spark.createDataFrame(df_dax)
		.withColumn("TimePoint", lit(time_point))
		.select(
			col("TimePoint"),
			col("Items[WorkspaceName]").alias("WorkspaceName"),
			col("Items[ItemKind]").alias("ItemKind"),
			col("Items[ItemName]").alias("ItemName"),
			col("TimePointBackgroundDetail[OperationStartTime]").alias("StartTime"),
			col("TimePointBackgroundDetail[OperationEndTime]").alias("EndTime"),
			col("TimePointBackgroundDetail[OperationId]").alias("OperationId"),
			col("[Sum_CUs]").cast(DoubleType()).alias("Sum_CUs"),
			col("[Sum_Duration]").cast(IntegerType()).alias("Sum_Duration"),
		)
	)

	# Union onto the empty frame so the result carries the declared column names.
	return df_empty.unionAll(df_result)

In [None]:
# Get the capacity usage at 3PM on the date provided and at 3AM on the following day.
# This ensures that background operations are captured no matter what time they
# were run, as they are smoothed over a 24 hour time period.
# Next, filter that down to the distinct records. This is necessary because a
# record may show up in both the "today" and "tomorrow" datasets depending on
# the time it was run.
# Next, aggregate the records into a single record per operation. This is
# necessary because some operations have two entries, one under the executing
# user and one under the user "System".
# Finally, display the dataframe with the clean, aggregated dataset.

time_point = datetime.strptime(f'{date_of_operations} 15:00:00', '%Y-%m-%d %H:%M:%S')

df_today    = get_capacity_metrics_usage(time_point, operation_id_list)
df_tomorrow = get_capacity_metrics_usage(time_point + timedelta(hours = 12), operation_id_list)
df_all_days = df_today.unionAll(df_tomorrow)

# TimePoint is deliberately left out so the same record returned by both
# queries collapses to a single row.
df_distinct = df_all_days.select(
    'WorkspaceName', 'ItemKind', 'ItemName',
    'StartTime', 'EndTime', 'OperationId',
    'Sum_CUs', 'Sum_Duration',
).distinct()

# min / max / sum here are the pyspark.sql.functions aggregates imported at the
# top of the notebook, not the Python builtins.
df_final = df_distinct.groupBy('WorkspaceName', 'ItemKind', 'ItemName', 'OperationId').agg(
    min("StartTime").alias("StartTime"),
    max("EndTime").alias("EndTime"),
    sum("Sum_CUs").alias("Sum_CUs"),
    # Aliased as Sum_Duration (previously "SumDuration") to match Sum_CUs and
    # the column name used everywhere upstream.
    sum("Sum_Duration").alias("Sum_Duration"),
)

display(df_final)