In [1]:
#Ensures that the latest version of semantic-link libraries are loaded
%pip install -U semantic-link

%pip install -U semantic-link-labs

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, 9, Finished, Available, Finished)

Collecting semantic-link
  Downloading semantic_link-0.13.0-py3-none-any.whl.metadata (15 kB)
Collecting semantic-link-sempy==0.13.0 (from semantic-link)
  Downloading semantic_link_sempy-0.13.0-py3-none-any.whl.metadata (15 kB)
Collecting semantic-link-functions-geopandas==0.13.0 (from semantic-link)
  Downloading semantic_link_functions_geopandas-0.13.0-py3-none-any.whl.metadata (2.1 kB)
Collecting semantic-link-functions-holidays==0.13.0 (from semantic-link)
  Downloading semantic_link_functions_holidays-0.13.0-py3-none-any.whl.metadata (2.0 kB)
Collecting semantic-link-functions-meteostat==0.13.0 (from semantic-link)
  Downloading semantic_link_functions_meteostat-0.13.0-py3-none-any.whl.metadata (2.2 kB)
Collecting semantic-link-functions-phonenumbers==0.13.0 (from semantic-link)
  Downloading semantic_link_functions_phonenumbers-0.13.0-py3-none-any.whl.metadata (2.0 kB)
Collecting semantic-link-functions-validators==0.13.0 (from semantic-link)
  Downloading semantic_link_function

In [2]:
#import necessary libraries
import sempy.fabric as fabric
import pandas as pd
import numpy as np
import re
from notebookutils import mssparkutils

import sempy_labs as labs
from sempy_labs import directlake
from sempy_labs.tom import connect_semantic_model

# set temporary config for date issue
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")

##########################################################################################################
##                    THIS IS THE SECTION THAT REQUIRES INITIAL CONFIGURATION BY THE USER
##########################################################################################################
########################################################################################
###   MANUAL CONFIG HERE
#######################################################################################
#Define lakehouse and schema names
workspace_name = "Analytics_Dev" #this is the workspace name where the notebook runs, and where the lakehouse lives
lakehouse_name = "lh_FabricManagement" #this is the name of the lakehouse, in the event that it needs to becreated by this notebook
lakehouse_description = "Fabric Management lakehouse"  # this is only used if the notebook creates a new lakehouse
semantic_model_name = "sem_ModelCatalog" ###NOTE: IF THIS MODEL ALREADY EXISTS, IT WILL BE OVERWRITTEN
schema_name = "modelcatalog" # NOTE, if you're using an existing non-schema-enabled lakehouse, this MUST be set to 'dbo'
model_type = "Import"  # enter DirectLake if a DirectLake semantic model is desired.  Otherwise, enter Import

# Define colors for table navigator page.  You don't need to adjust these, unless you want to rebrand the report
factcolor = '#A6B916'
dimcolor = '#C8B78A'
defaultcolor = '#A66999'

#Create list of workspaces and Models.  This allows users to limit which models get documented.  A user could also
#   tweak this notebook, to use sempy functions, and query all workspaces/models that he/she has access to, if that's preferred.
#THE ORDER OF THE WORKSPACES AND MODELS MUST BE THE SAME
#  ie the third report MUST be in the third workspace
data = {
    'Workspace': ['Analytics_Dev'],
    'Model': ['Financial Model']
}
###########################################################################################
## from this section onward, there shouldn't be any updating necessary
#######################################################################################################

workspace_id = fabric.get_workspace_id()

#######################################################
## Check for existing items
######################################################
## check if lakehouse exists.  If not, create a lakehouse (non schema-enabled)
try:
    lakehouse_object = mssparkutils.lakehouse.get(lakehouse_name)
    lakehouse_id=lakehouse_object.id
except Exception as e:
    try:
        fabric.create_lakehouse(display_name=lakehouse_name,description=lakehouse_description,workspace=workspace_name,enable_schema=True)
        lakehouse_object = mssparkutils.lakehouse.get(lakehouse_name)
        lakehouse_id=lakehouse_object.id
    except Exception as e:
        print('Unable to create lakehouse')
        print(e)


lakehouse_value = f'{workspace_name}.{lakehouse_name}.'
lakehouse_value_full = f'{workspace_name}.{lakehouse_name}.{schema_name}.'
#Create schema if not exists
schema_sql = f"CREATE SCHEMA IF NOT EXISTS {workspace_name}.{lakehouse_name}.{schema_name}"
spark.sql(schema_sql)
print("Schema exists, or was created")
#print(lakehouse_value)

####################################################################################################################


#DEFINE FUNCTIONS


#function to write the dataframe as a database
def writeTable(pd_df, tablename):
  if len(pd_df) != 0:
    query = f"drop table if exists {lakehouse_value_full}{tablename}"
    spark.sql(query)
    spark_df = spark.createDataFrame(pd_df)
    full_table_name = f'{lakehouse_value_full}{tablename}'
    #print(full_table_name)
    try:
        spark_df.write.format("delta").mode("overwrite").option("overwriteSchema","true").saveAsTable(full_table_name)
        print(f'{tablename} written to lakehouse')
    except Exception as e:
        print(f'Failed to write {tablename}')
  else:
    # if no data (ie no hierarchies exist) create a table so tahat the relationship code will work, and refreshes won't fail
    #   The page related to these will need to be hidden, in the event that one of the tables isnt used in any of your models
    strcreatequery = f'Create TABLE IF NOT EXISTS {lakehouse_value_full}{tablename} (semanticmodel_id INTEGER, {tablename}_name STRING)'
    spark.sql(strcreatequery)       

#function to get data from model
def getMetaData(daxquery,fieldname,indexfield):
    indexname = indexfield+'_id'
    #build out first row
    model_df = fabric.evaluate_dax(
        workspace=df.iloc[0,0],
        dataset=df.iloc[0,1],
        dax_string=daxquery
    )
    model_df['semanticmodel_id'] = df.iloc[0,3]
    model_df['SemanticModel'] = df.iloc[0,1]
    #cycle through remaining rows, if more than 1 row
    if len(df) > 1:
        for index,row in df.iloc[1:].iterrows():
            temp_df = fabric.evaluate_dax(
                workspace=df.iloc[index,0],
                dataset=df.iloc[index,1],
                dax_string=dax_query
            )
            temp_df['semanticmodel_id']=df.iloc[index,3]
            temp_df['SemanticModel'] = df.iloc[index,1]
            #print('temp df executed')
            #append most recent
            model_df = pd.concat([model_df,temp_df],ignore_index=True)

    model_df.columns = model_df.columns.str.replace('[', '')
    model_df.columns = model_df.columns.str.replace(']', '')
    model_df = model_df.rename(columns={'Name':fieldname})
    model_df[indexname] = model_df.index + 1
    return model_df


# Function to extract hashtags from a string
def extract_hashtags(description):
    if isinstance(description, str):  # Check if description is a string
        hashtags = re.findall(r'%%(\w+)', description)
        return hashtags
    else:
        return []




StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, 11, Finished, Available, Finished)

Schema exists, or was created


Process list of semantic models from the user-defined list. save as a table in the lakehouse

In [3]:
# take the provided workspaces/models and convert it to a dataframe.  Add the full path as an extra column.
df = pd.DataFrame(data)
df['WorkspacePath'] = f'powerbi://api.powerbi.com/v1.0/myorg/'+ df['Workspace']
df['semanticmodel_id'] = df.index + 1 # addsan auto increment index to  the table.
logActivity('SemanticModel','SemanticModel processing started')

writeTable(df,"SemanticModel")
print('SemanticModel processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, 12, Finished, Available, Finished)

NameError: name 'logActivity' is not defined

Process Model query.  Save Model data to the lakehouse

In [None]:
#Get Model Data
print('Model processing started')
dax_query = "EVALUATE INFO.MODEL()"
model_df = getMetaData(dax_query,'ModelName','model')
#display(model_df)

writeTable(model_df,"Model")
print('Model processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

Process Tables dax query to collect table data from all defined models

In [None]:
#get Table Data
print('Table processing started')
dax_query = """EVALUATE
VAR __tables =
    INFO.TABLES ()
VAR __storagemode =
    SELECTCOLUMNS ( INFO.VIEW.TABLES (), \"ID\", [ID], \"StorageMode\", [StorageMode] )
VAR __combined =
    NATURALLEFTOUTERJOIN ( __tables, __storagemode )
RETURN
    __combined

"""
table_df = getMetaData(dax_query,'TableName','table')

#add table type column for slicer. Cleanup column names and create combined key for joining
table_df['TableType']=table_df['Description'].apply(lambda x:extract_hashtags(x))
table_df['TableType']=table_df['TableType'].astype(str)
table_df['TableType'] = table_df['TableType'].str.removeprefix('[\'')
table_df['TableType'] = table_df['TableType'].str.replace('\']', '')
table_df['TableType'] = table_df['TableType'].str.replace(']', '')
table_df['TableType'] = table_df['TableType'].str.replace('[', '')
table_df['compoundkey'] = table_df['SemanticModel'] + table_df['ID'].astype(str)

#display(table_df)
writeTable(table_df,"PBITables")
lprint('Table processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

collect measure data from listed models and store in the lakehouse

In [None]:
#get measure Data
print('MeasureList processing started')
dax_query = "EVALUATE INFO.Measures()"
measure_df = getMetaData(dax_query,'MeasureName','measures')

#display(measure_df)

writeTable(measure_df,"MeasureList")
print('MeasureList processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)


get role data (if any) from all selected models, and store it in the lakehouse

In [None]:
#get role Data
print('Role processing started')
dax_query = "EVALUATE INFO.Roles()"
role_df = getMetaData(dax_query,'RoleName','roles')

#display(role_df)

writeTable(role_df,"PBIRoles")
print('Role processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

get data about any partitions.  This may only be important in regards to incremental refresh, but either way, the data is available in the documentation lakehouse

In [None]:
#get partition Data
print('Partition processing started')
dax_query = """
EVALUATE
VAR __partitions =
    INFO.PARTITIONS ()
VAR __refreshpolicies =
    SELECTCOLUMNS (
        INFO.REFRESHPOLICIES (),
        \"TableID\", [TableID],
        \"SourceExpression\", [SourceExpression]
    )
VAR __combined =
    NATURALLEFTOUTERJOIN ( __partitions, __refreshpolicies )
RETURN
    __combined
"""
partition_df = getMetaData(dax_query,'PartitionName','partitions')

#add extra columns for joins, and cleanup unused columns
partition_df['compoundkey'] = partition_df['SemanticModel'] + partition_df['TableID'].astype(str)
partition_df =pd.merge(partition_df,table_df[['compoundkey','TableName']],left_on='compoundkey',right_on='compoundkey',how='left')
partition_df['Source'] = np.where(partition_df['QueryDefinition'].isnull(),partition_df['SourceExpression'],partition_df['QueryDefinition'])
partition_df = partition_df.drop(['QueryDefinition','SourceExpression'],axis=1)
#display(partition_df)

writeTable(partition_df,"Partitions")
print('Partition processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

get expression data.  This includes data sources (as shown in TE) and any parameters that may be defined.

In [None]:
#get expression Data
print('Expression processing started')
dax_query = "EVALUATE INFO.Expressions()"
expression_df = getMetaData(dax_query,'ExpressionName','expression')

#display(expression_df)

writeTable(expression_df,"Expression")
print('Expression processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

get hierarchy data (if any)

In [None]:
#get hierarchy Data
print('Hierarchy processing started')
dax_query = "EVALUATE INFO.Hierarchies()"
hierarchy_df = getMetaData(dax_query,'HierarchyName','hierarchy')

#display(final_df)
#add colulmn for join with table
hierarchy_df['compoundkey'] = hierarchy_df['SemanticModel'] + hierarchy_df['TableID'].astype(str)

hierarchy_df =pd.merge(hierarchy_df,table_df[['compoundkey','TableName']],left_on='compoundkey',right_on='compoundkey',how='left')
#display(hierarchy_df)

writeTable(hierarchy_df,"Hierarchies")
lprint('Hierarchy processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

get column data from selected models

In [None]:
#get column Data
print('Column processing started')
dax_query = "EVALUATE INFO.Columns()"
columns_df = getMetaData(dax_query,'ColumnName','column')

#add compound key for table join.  add colulmn to determine calculated columns (used for slicer)
columns_df['compoundkey'] = columns_df['SemanticModel'] + columns_df['TableID'].astype(str)
columns_df =pd.merge(columns_df,table_df[['compoundkey','TableName']],left_on='compoundkey',right_on='compoundkey',how='left')
columns_df['ColumnName'] = columns_df['ExplicitName'].combine_first(columns_df['InferredName'])
del columns_df['ExplicitName']
del columns_df['InferredName']
columns_df['ColumnType'] = np.where(columns_df['Expression'].isnull(),'Standard','Calculated')
#display(columns_df)

writeTable(columns_df,"PBIColumns")
print('Column processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

get info on any calculation groups from selected models

In [None]:
#get calculation groups
print('Calculation Item processing started')
dax_query = "EVALUATE INFO.CalculationGroups()"
calcgroup_df = getMetaData(dax_query,'CalculationGroupName','calcgroup')
calcgroup_df['compoundkey'] = calcgroup_df['SemanticModel'] + calcgroup_df['TableID'].astype(str)
calcgroup_df['groupcompoundkey'] = calcgroup_df['SemanticModel'] + calcgroup_df['ID'].astype(str)
calcgroup_df =pd.merge(calcgroup_df,table_df[['compoundkey','TableName']],left_on='compoundkey',right_on='compoundkey',how='left')
calcgroup_df['CalculationGroupName'] = calcgroup_df['TableName']
#display(calcgroup_df)

#get calcitems
dax_query = "EVALUATE INFO.CalculationItems()"
items_df = getMetaData(dax_query,'CalculationItemName','calcitem')
items_df['compoundkey'] = items_df['SemanticModel'] + items_df['CalculationGroupID'].astype(str)
items_df = pd.merge(items_df,calcgroup_df[['groupcompoundkey','CalculationGroupName']],left_on='compoundkey',right_on='groupcompoundkey',how='left')
#display(items_df)

writeTable(items_df,"CalculationItems")
print('Calculation Item processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

get Relationship data from selected models.  this does not include relationships for this documentation model 

In [None]:
#get relationships
print('Relationship processing started')
dax_query = "EVALUATE INFO.Relationships()"
relationship_df = getMetaData(dax_query,'RelationshipName','relationship')
# clean up colulmns for to/from relationships.  
relationship_df['FromCompoundKey'] = relationship_df['SemanticModel'] + relationship_df['FromTableID'].astype(str)
relationship_df['ToCompoundKey'] = relationship_df['SemanticModel'] + relationship_df['ToTableID'].astype(str)
relationship_df =pd.merge(relationship_df,table_df[['compoundkey','TableName','TableType']],left_on='FromCompoundKey',right_on='compoundkey',how='left')
relationship_df = relationship_df.rename(columns={'TableName':'FromTableName','TableType':'FromTableType'})
relationship_df =pd.merge(relationship_df,table_df[['compoundkey','TableName','TableType']],left_on='ToCompoundKey',right_on='compoundkey',how='left')
relationship_df = relationship_df.rename(columns={'TableName':'ToTableName','TableType':'ToTableType'})
relationship_df['FromCardinality'] = relationship_df['FromCardinality'].astype(str)
relationship_df['FromCardinality'] = relationship_df['FromCardinality'].replace('2','Many')
relationship_df['ToCardinality'] = relationship_df['ToCardinality'].astype(str)
relationship_df['ToCardinality'] = relationship_df['ToCardinality'].replace('2','Many')
####################################################################################################################
#define parameters. you do not need to edit this section
FromCondition = [
    (relationship_df['FromTableType'] == 'Fact'),
    (relationship_df['FromTableType'] == 'Dimension')
]
ToCondition = [
    (relationship_df['ToTableType'] == 'Fact'),
    (relationship_df['ToTableType'] == 'Dimension')
]
values = [factcolor,dimcolor]
###########################################################################################
relationship_df['ToColor'] = np.select(ToCondition,values,default=defaultcolor)
relationship_df['FromColor'] = np.select(FromCondition,values,default=defaultcolor)
#display(relationship_df)

writeTable(relationship_df,"Relationships")
print('Relationship processing completed')

StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)

In [None]:
if model_type == "DirectLake":
    ##Check if semantic model exists
    
    # List all datasets in the workspace
    # If workspace_name is not provided, it defaults to the notebook's attached lakehouse workspace or the notebook's own workspace.
    all_datasets = fabric.list_datasets(workspace=workspace_name if workspace_name in locals() else None)

    # Check if the semantic model exists in the list
    if semantic_model_name in all_datasets['Dataset Name'].values:
        print(f"The semantic model '{semantic_model_name}' exists in the workspace.")
        print("THE SEMANTIC MODEL WILL BE REPLACED")
        
    else:
        print(f"The semantic model '{semantic_model_name}' does not exist in the workspace.")
        

    table_list = ["calculationitems","expression","hierarchies","measurelist","model","partitions","pbicolumns","pbiroles","pbitables","relationships","semanticmodel"]

    directlake.generate_direct_lake_semantic_model(dataset=semantic_model_name,lakehouse_tables=table_list,workspace=workspace_name,lakehouse=lakehouse_name,lakehouse_workspace=workspace_name,schema=schema_name,overwrite=True,refresh=True) 
    df_datasets = fabric.list_datasets()
    semantic_model_id = df_datasets[df_datasets['Dataset Name'] == semantic_model_name]['Dataset ID'].iloc[0]
    print(f"Created Semantic Model {semantic_model_name}")
    ##rename tables
    with fabric.connect_semantic_model(dataset=semantic_model_name, workspace=workspace_name, readonly=False) as tom:
        # Iterate through all tables in the model to find the target table
        for table in tom.model.Tables:
            if table.Name == 'expression':
                table.Name = 'Expressions'
            if table.Name == 'calculationitems':
                table.Name = 'Calculation Items'
            if table.Name == 'hierarchies':
                table.Name = 'Hierarchies'
            if table.Name == 'measurelist':
                table.Name = 'Measure List'
            if table.Name == 'model':
                table.Name = 'Models'
            if table.Name == 'partitions':
                table.Name = 'Partitions'
            if table.Name == 'pbicolumns':
                table.Name = 'Columns'
            if table.Name == 'pbiroles':
                table.Name = 'Roles'
            if table.Name == 'pbitables':
                table.Name = 'Tables'
            if table.Name == 'relationships':
                table.Name = 'Relationships'
            if table.Name == 'semanticmodel':
                table.Name = 'Semantic Models'
    
        print("Table rename completed")
        

       
    #create relationships for directlake model
    with labs.tom.connect_semantic_model(dataset=semantic_model_name,
                            workspace=workspace_name,
                            readonly=False) as model:    
        
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Models",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Tables",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Tables",
            to_column="compoundkey",
            from_table="Columns",
            from_column="compoundkey",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Calculation Items",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Expressions",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Measure List",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Hierarchies",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Partitions",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Roles",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_relationship(
            to_table="Semantic Models",
            to_column="semanticmodel_id",
            from_table="Relationships",
            from_column="semanticmodel_id",
            from_cardinality="Many",
            to_cardinality="One",
            cross_filtering_behavior="Automatic",
            is_active=True
        )
        model.add_table(name=f'_Measures',description=f'Table to hold any measures.%%Other',hidden=False)
        
        print("_Measures table created")
        model.add_measure(
            table_name=f'_Measures',
            measure_name=f'Model Name',
            expression=f"IF(COUNTROWS(VALUES('Semantic Models'[Model]))>1 , \"Multiple Models\",MAX('Semantic Models'[Model]))",
            description=f'Measure to display selected model. Otherwise displays [Multiple Models]',
            hidden=False
        )
    print("Relationships added")
    

    #final refresh
    #refresh the newly created dataset
    print("Dataset refresh started...")
    fabric.refresh_dataset(workspace=workspace_name, dataset=semantic_model_name)
    print("Dataset refresh completed")
    
else:
    print("Use the Import mode template and connect to the selected lakehouse for the report. ")
    
print('All activity completed!')


StatementMeta(, 26a3b08a-c323-41a2-a046-2dd18163f6e0, -1, Cancelled, , Cancelled)