# Lab 7: Performance - Splitting high cardinality columns

**Split Columns**
- Cold cache performance
- Model Size
- Warm cache performance


## 1. Install Semantic Link Labs Python Library

In [None]:
%pip install -q --disable-pip-version-check semantic-link-labs

## 2. Install Python Libraries

In [None]:
import sempy_labs as labs
from sempy import fabric
import sempy
import pandas
import time
import json

LakehouseName = "Performance"
SemanticModelName = f"{LakehouseName}_model"
Shortcut_LakehouseName = "HighCardinality"
Shortcut_WorkspaceName = "DL Labs - Source Data"

def myDeltaAnalyzer(table_name:str):
    # Run Delta Analyzer
    analyzer:dict = labs.delta_analyzer(lakehouse=LakehouseName, table_name=table_name,skip_cardinality=True)
    for key , value in analyzer.items():
        displayHTML(f"<H2>#### {key} ({table_name}) ####</H2>")
        display(value)

## 3. Create Lakehouse

In [None]:
lakehouses=labs.list_lakehouses()["Lakehouse Name"]
if LakehouseName in lakehouses.values:
    lakehouseId = notebookutils.lakehouse.getWithProperties(LakehouseName)["id"]
else:
    lakehouseId = fabric.create_lakehouse(LakehouseName)

workspaceId = notebookutils.lakehouse.getWithProperties(LakehouseName)["workspaceId"]
workspaceName = sempy.fabric.resolve_workspace_name(workspaceId)
print(f"WorkspaceId = {workspaceId}, LakehouseID = {lakehouseId}, Workspace Name = {workspaceName}")

## 4. Create Lakehouse Shortcuts

In [None]:
#1. Remove any existing shortcuts
for index, row in labs.lakehouse.list_shortcuts(lakehouse=LakehouseName).iterrows():
    labs.lakehouse.delete_shortcut(shortcut_name=row["Shortcut Name"],lakehouse=LakehouseName)
    print(f"Deleted shortcut {row['Shortcut Name']}")

time.sleep(3)

#2. Creates correct shortcuts
labs.lakehouse.create_shortcut_onelake(table_name="Dim_Attributes"          ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)
labs.lakehouse.create_shortcut_onelake(table_name="Dim_CustomerAttributes"  ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)
labs.lakehouse.create_shortcut_onelake(table_name="Dim_Date"                ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)
labs.lakehouse.create_shortcut_onelake(table_name="Dim_DeviceAttributes"    ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)
labs.lakehouse.create_shortcut_onelake(table_name="Fact_A_Level"            ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)
labs.lakehouse.create_shortcut_onelake(table_name="Fact_A_Level_Split_1"    ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)
labs.lakehouse.create_shortcut_onelake(table_name="Fact_A_Level_Split_2"    ,source_lakehouse=Shortcut_LakehouseName,source_workspace=Shortcut_WorkspaceName,destination_lakehouse=LakehouseName)

## 5. Trigger background job to sync Lakehouse tables

In [None]:
##https://medium.com/@sqltidy/delays-in-the-automatically-generated-schema-in-the-sql-analytics-endpoint-of-the-lakehouse-b01c7633035d

def triggerMetadataRefresh():
    client = fabric.FabricRestClient()
    response = client.get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouseId}")
    sqlendpoint = response.json()['properties']['sqlEndpointProperties']['id']

    # trigger sync
    uri = f"/v1.0/myorg/lhdatamarts/{sqlendpoint}"
    payload = {"commands":[{"$type":"MetadataRefreshExternalCommand"}]}
    response = client.post(uri,json= payload)
    batchId = response.json()['batchId']

    # Monitor Progress
    statusuri = f"/v1.0/myorg/lhdatamarts/{sqlendpoint}/batches/{batchId}"
    statusresponsedata = client.get(statusuri).json()
    progressState = statusresponsedata['progressState']
    print(progressState)
    while progressState != "success":
        statusuri = f"/v1.0/myorg/lhdatamarts/{sqlendpoint}/batches/{batchId}"
        statusresponsedata = client.get(statusuri).json()
        progressState = statusresponsedata['progressState']
        print(progressState)
        time.sleep(1)

    print('done')

## Show sample of data

In [None]:
from pyspark.sql.functions import col
path = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables/Fact_A_Level_Split_1"
delta_table = spark.read.load(path)

display(delta_table.limit(20).orderBy(col("Metric").desc()))

## How many unique values exist in the Metric column

In [None]:
# how many unique values in Metric column?  

print(f"{delta_table.select('Metric').distinct().count():,}")              #   44,257,013

print(f"{delta_table.select('Metric_x0').distinct().count():,}")           #       10,001     
print(f"{delta_table.select('Metric_x10000').distinct().count():,}")       #       10,001
print(f"{delta_table.select('Metric_x100000000').distinct().count():,}")   #        1,904


## 6. Run Delta Analyzer to look at size of Metric column in Lakehouse Delta Tables

The first block of code runs the full Delta Analyzer over three tables.  

Once the results are stored in a variable, we can run queries over the results without needing to run Delta Analyzer again

- **Fact_A_Level**          This is the base table with a high cardinally column
- **Fact_A_Level_Split_1**   Contains the original Metric column, plus the new split cols
- **Fact_A_Level_Split_2**   Contains only the new split cols

In [None]:
df1:dict[str, pandas.DataFrame] = labs.delta_analyzer(lakehouse=LakehouseName, workspace=workspaceName, table_name="Fact_A_Level"          ,skip_cardinality=True)
df2:dict[str, pandas.DataFrame] = labs.delta_analyzer(lakehouse=LakehouseName, workspace=workspaceName, table_name="Fact_A_Level_Split_1"  ,skip_cardinality=True)
df3:dict[str, pandas.DataFrame] = labs.delta_analyzer(lakehouse=LakehouseName, workspace=workspaceName, table_name="Fact_A_Level_Split_2"  ,skip_cardinality=True)

print("Done")

### Review Delta Analyzer results

In [None]:
for key , value in df1.items():
    print(key)

Review Summary statistics for all three Delta Tables

In [None]:
# display(df1["Summary"])
# display(df2["Summary"])
# display(df3["Summary"])

display(
    pandas.concat([
        df1["Summary"] ,
        df2["Summary"] ,
        df3["Summary"]
        ]).drop_duplicates().reset_index(drop=True)
    )

In [None]:
names_to_filter = ['Metric', 'Metric_x100000000', 'Metric_x10000','Metric_x0']

df4 = pandas.concat([
        df1["Columns"].loc[df1["Columns"]['Column Name'].isin(names_to_filter)] ,
        df2["Columns"].loc[df2["Columns"]['Column Name'].isin(names_to_filter)] ,
        df3["Columns"].loc[df3["Columns"]['Column Name'].isin(names_to_filter)]
        ]
        ).drop_duplicates().reset_index(drop=True)
display(df4)
# Create chart

## Create Custom Semantic Model from Lakehouse

In [None]:
#1. Generate list of ALL table names from lakehouse to add to Semantic Model
lakehouseTables:list = labs.lakehouse.get_lakehouse_tables(lakehouse=LakehouseName)["Table Name"]

#2. Remove any previously created semantic model of this name
for index, row in sempy.fabric.list_items().iterrows():
    if row["Type"] == "SemanticModel" and row["Display Name"] == SemanticModelName:
        sempy.fabric.delete_item(item_id=row["Id"],workspace=workspaceId)
        print(f"Deleted semantic model {row['Display Name']}")

#3 Create the semantic model
labs.directlake.generate_direct_lake_semantic_model(dataset=SemanticModelName,lakehouse_tables=lakehouseTables,workspace=workspaceName,lakehouse=lakehouseId,refresh=False,overwrite=True)

## Add Model Relationships

In [None]:
with labs.tom.connect_semantic_model(dataset=SemanticModelName, readonly=False) as tom:
    #1. Remove any existing relationships
    for r in tom.model.Relationships:
        tom.model.Relationships.Remove(r)

    #2. Creates correct relationships
    tom.add_relationship(from_table="Fact_A_Level"          , from_column="DateId"                , to_table="Dim_Date"                 , to_column="DateId"                , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level"          , from_column="AppAttributesKey"      , to_table="Dim_Attributes"           , to_column="AppAttributesKey"      , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level"          , from_column="CustomerAttributesKey" , to_table="Dim_CustomerAttributes"   , to_column="CustomerAttributesKey" , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level"          , from_column="DeviceAttributesKey"   , to_table="Dim_DeviceAttributes"     , to_column="DeviceAttributesKey"   , from_cardinality="many" , to_cardinality="one")

    tom.add_relationship(from_table="Fact_A_Level_Split_1"  , from_column="DateId"                , to_table="Dim_Date"                 , to_column="DateId"                , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level_Split_1"  , from_column="AppAttributesKey"      , to_table="Dim_Attributes"           , to_column="AppAttributesKey"      , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level_Split_1"  , from_column="CustomerAttributesKey" , to_table="Dim_CustomerAttributes"   , to_column="CustomerAttributesKey" , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level_Split_1"  , from_column="DeviceAttributesKey"   , to_table="Dim_DeviceAttributes"     , to_column="DeviceAttributesKey"   , from_cardinality="many" , to_cardinality="one")

    tom.add_relationship(from_table="Fact_A_Level_Split_2"  , from_column="DateId"                , to_table="Dim_Date"                 , to_column="DateId"                , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level_Split_2"  , from_column="AppAttributesKey"      , to_table="Dim_Attributes"           , to_column="AppAttributesKey"      , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level_Split_2"  , from_column="CustomerAttributesKey" , to_table="Dim_CustomerAttributes"   , to_column="CustomerAttributesKey" , from_cardinality="many" , to_cardinality="one")
    tom.add_relationship(from_table="Fact_A_Level_Split_2"  , from_column="DeviceAttributesKey"   , to_table="Dim_DeviceAttributes"     , to_column="DeviceAttributesKey"   , from_cardinality="many" , to_cardinality="one")

## Hide Columns in Fact Table

In [None]:
tom = labs.tom.TOMWrapper(dataset=SemanticModelName, workspace=workspaceName, readonly=False)

i:int=0
for t in tom.model.Tables:
    if t.Name in ["Fact_A_Level","Fact_A_Level_Split_1","Fact_A_Level_Split_2"]:

        for c in t.Columns:
            c.IsHidden=True

        bim = json.dumps(tom.get_bim()["model"]["tables"][i],indent=4)
        print(bim)
    i=i+1

## Add Model Measures

In [None]:
with labs.tom.connect_semantic_model(dataset=SemanticModelName, readonly=False) as tom:
    #1. Remove any existing measures
    for t in tom.model.Tables:
        for m in t.Measures:
            tom.remove_object(m)
            print(f"Measure {m.Name} removed")

    tom.add_measure(table_name="Fact_A_Level",measure_name="Sum of Metric",   expression="SUM(Fact_A_Level[Metric])")
    tom.add_measure(table_name="Fact_A_Level",measure_name="Sum of Metric 1", expression="""
    
    	VAR a = SUM(Fact_A_Level_Split_1[Metric_x100000000])
		VAR b = SUM(Fact_A_Level_Split_1[Metric_x10000])
		VAR c = SUM(Fact_A_Level_Split_1[Metric_x0])
		
		RETURN  ((a*1000000) + (b*100) + (c/100.0000)) 
            """)

    tom.add_measure(table_name="Fact_A_Level",measure_name="Sum of Metric 2", expression="""
    
    	VAR a = SUM(Fact_A_Level_Split_2[Metric_x100000000])
		VAR b = SUM(Fact_A_Level_Split_2[Metric_x10000])
		VAR c = SUM(Fact_A_Level_Split_2[Metric_x0])
		
		RETURN ((a*1000000) + (b*100) + (c/100.00000)) 
            """)

## 10. Mark DimDate as Date Table
This code block:

1.  Opens connection to semantic model.
2.  Marks DimDate table as Date Table

In [None]:
completedOK:bool=False
while not completedOK:
    try:
        with labs.tom.connect_semantic_model(dataset=SemanticModelName, readonly=False) as tom:
            tom.mark_as_date_table(table_name="Dim_Date",column_name="DateId")
            completedOK=True
    except:
        print('Error with date table... trying again.')
        sleep(3)

print('done')

## Reframe model to update changes

In [None]:
reframeOK:bool=False
while not reframeOK:
    try:
        result:pandas.DataFrame = labs.refresh_semantic_model(dataset=SemanticModelName)
        reframeOK=True
    except:
        print('Error with reframe... trying again.')
        triggerMetadataRefresh()
        sleep(3)

print('Custom Semantic Model reframe OK')

## Run DMV to check column details

In [None]:
def runDMV():
    df = sempy.fabric.evaluate_dax(
        dataset=SemanticModelName, 
        dax_string="""
        
        SELECT 
            MEASURE_GROUP_NAME AS [TABLE],
            ATTRIBUTE_NAME AS [COLUMN],
            DATATYPE ,
            DICTIONARY_SIZE 		    AS SIZE ,
            DICTIONARY_ISPAGEABLE 		AS PAGEABLE ,
            DICTIONARY_ISRESIDENT		AS RESIDENT ,
            DICTIONARY_TEMPERATURE		AS TEMPERATURE,
            DICTIONARY_LAST_ACCESSED	AS LASTACCESSED 
        FROM $SYSTEM.DISCOVER_STORAGE_TABLE_COLUMNS 
        ORDER BY 
            [DICTIONARY_TEMPERATURE] DESC
        
        """)
    display(df)

runDMV()

## Create function to run DAX query with a server timings trace

In [None]:
import warnings
from Microsoft.AnalysisServices.Tabular import TraceEventArgs
from typing import Dict, List, Optional, Callable

def runDMV():
    df = sempy.fabric.evaluate_dax(
        dataset=SemanticModelName, 
        dax_string="""
        
        SELECT 
            MEASURE_GROUP_NAME AS [TABLE],
            ATTRIBUTE_NAME AS [COLUMN],
            DATATYPE ,
            DICTIONARY_SIZE 		    AS SIZE ,
            DICTIONARY_ISPAGEABLE 		AS PAGEABLE ,
            DICTIONARY_ISRESIDENT		AS RESIDENT ,
            DICTIONARY_TEMPERATURE		AS TEMPERATURE,
            DICTIONARY_LAST_ACCESSED	AS LASTACCESSED 
        FROM $SYSTEM.DISCOVER_STORAGE_TABLE_COLUMNS 
        ORDER BY 
            [DICTIONARY_TEMPERATURE] DESC
        
        """)
    display(df)

def filter_func(e):
    retVal:bool=True
    if e.EventSubclass.ToString() == "VertiPaqScanInternal":
        retVal=False      
    #     #if e.EventSubClass.ToString() == "VertiPaqScanInternal":
    #     retVal=False
    return retVal

# define events to trace and their corresponding columns
def runQueryWithTrace (expr:str,workspaceName:str,SemanticModelName:str,Result:Optional[bool]=True,Trace:Optional[bool]=True,DMV:Optional[bool]=True) -> pandas.DataFrame:
    event_schema = fabric.Trace.get_default_query_trace_schema()
    event_schema.update({"ExecutionMetrics":["EventClass","TextData"]})
    del event_schema['VertiPaqSEQueryBegin']
    del event_schema['VertiPaqSEQueryCacheMatch']
    del event_schema['DirectQueryBegin']

    warnings.filterwarnings("ignore")

    WorkspaceName = workspaceName
    SemanticModelName = SemanticModelName

    with fabric.create_trace_connection(SemanticModelName,WorkspaceName) as trace_connection:
        # create trace on server with specified events
        with trace_connection.create_trace(
            event_schema=event_schema, 
            name="Simple Query Trace",
            filter_predicate=filter_func,
            stop_event="QueryEnd"
            ) as trace:

            trace.start()

            df=sempy.fabric.evaluate_dax(
                dataset=SemanticModelName, 
                dax_string=expr)

            if Result:
                displayHTML(f"<H2>####### DAX QUERY RESULT #######</H2>")
                display(df)

            # Wait 5 seconds for trace data to arrive
            time.sleep(5)

            # stop Trace and collect logs
            final_trace_logs = trace.stop()

    if Trace:
        displayHTML(f"<H2>####### SERVER TIMINGS #######</H2>")
        display(final_trace_logs)
    
    if DMV:
        displayHTML(f"<H2>####### SHOW DMV RESULTS #######</H2>")
        runDMV()

    return final_trace_logs



## Run some DAX Queries

## First run to test time to page columns in

### Running SUM() over original column

In [None]:
trace1 = runQueryWithTrace("""

    EVALUATE
        {[Sum of Metric]}

""",workspaceName,SemanticModelName,Result=False,DMV=False)

### Running SUM() over new column in model still with original column

In [None]:
trace2 = runQueryWithTrace("""

    EVALUATE
        {[Sum of Metric 1]}

""",workspaceName,SemanticModelName,Result=False,DMV=False)

In [None]:
trace3 = runQueryWithTrace("""

    EVALUATE
        {[Sum of Metric 2]}

""",workspaceName,SemanticModelName,Result=False,DMV=False)

In [None]:
display(trace1)
display(trace2)
display(trace3)
runDMV()

## Warm cache perf

Clear the DAX Query Cache and then run a query inside a Server Timings Trace

In [None]:
labs.clear_cache(SemanticModelName)

trace4 = runQueryWithTrace("""
    EVALUATE 
        SUMMARIZECOLUMNS(
            Dim_Date[DateId]

            , "SUM of Base" 	, [Sum of Metric]
            , "SUM of Metric 1" , [Sum of Metric 1]
            , "SUM of Metric 2" , [Sum of Metric 2]
            )
        ORDER BY [DateId]   

""",workspaceName,SemanticModelName,Result=False,DMV=False)

In [None]:
df=sempy.fabric.evaluate_dax(
    dataset=SemanticModelName, 
    dax_string="""
    
    evaluate tabletraits()
    
    """)
display(df)

In [None]:
runDMV()