In [None]:
# PARAMETERS
# Provide the names of the two workspaces and the two lakehouses to compare.

workspace_name_1 = ''       # <----- FILL
lakehouse_name_1 = ''       # <----- FILL

workspace_name_2 = ''       # <----- FILL
lakehouse_name_2 = ''       # <----- FILL

config_param_comparison_mode = "structure"                  # <----- FILL
# Which result dataframe(s) should be displayed?
# ### OPTIONS: rowscols (default)    | structure | full
#   rowscols  - table_name, rows count per lakehouse, ROWS_MATCH, cols count per lakehouse, COLS_MATCH
#   structure - table_name, column_name, column type per lakehouse, TYPE_MATCH
#   full      - both dataframes (rowscols first, then structure)
# Any other value behaves like rowscols when the result is displayed.

config_param_shortcuts = "included"                         # <----- FILL
# Should shortcuts be scanned together with the tables?
# ### OPTIONS: included (default)    | excluded  | only
#   included - every name returned by the lakehouse tables listing
#   excluded - names from the tables listing minus the names from the shortcuts listing
#   only     - only the names from the shortcuts listing

config_param_results_match = "all"                          # <----- FILL
# Return all records, or only the records that do not match (on rows count, cols count or column type)?
# ### OPTIONS: all (default)         | nomatchonly

config_param_item_filter = "e%"                             # <----- FILL
# Scan all table/shortcut names, or only the specified ones?
# ### OPTIONS: empty (default)       | custom table name, e.g. charges ; multiple names separated with ,
#   % is a wildcard (translated to the regular expression .*); regular expressions work as well.
#   Each pattern must match the whole name, and matching is case-sensitive.

config_param_progress_log = "enabled"                       # <----- FILL
# Show the progress log while the tables are processed?
# ### OPTIONS: enabled (default)     | disabled

In [None]:
import sempy.fabric as fabric
import json
import re
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from pyspark.sql import Row, DataFrame
from pyspark.sql.functions import col, lower, array_except, lit, split
from pyspark.sql import types

# Function selecting which item names to scan, depending on the shortcuts setting
def select_array(array1, array2, param):
    """Return the names to process for the given shortcuts mode.

    Args:
        array1: Names of all lakehouse items (tables, shortcuts included).
        array2: Names of the shortcuts.
        param: ``"included"`` returns ``array1`` unchanged, ``"only"`` returns
            ``array2`` unchanged, ``"excluded"`` returns the items of
            ``array1`` that are not in ``array2``. Any other value behaves
            like ``"included"``.

    Returns:
        The selected list of names. For ``"excluded"`` the result is a new,
        de-duplicated list that keeps the order of ``array1``, so the
        processing order (and the progress log) is the same on every run.
    """
    if param == "only":
        return array2

    if param == "excluded":
        shortcut_names = set(array2)
        # dict.fromkeys drops duplicates while keeping first-seen order,
        # unlike a plain set difference whose iteration order is arbitrary.
        return [name for name in dict.fromkeys(array1) if name not in shortcut_names]

    # "included" and any unrecognized value: scan everything.
    return array1

# Function filtering lakehouse items
def filter_lakehouse_tables(lakehouse_items_names, config_param_item_filter):
    """Keep only the item names that match the configured filter.

    Args:
        lakehouse_items_names: List of table/shortcut names to filter.
        config_param_item_filter: Comma-separated patterns. ``%`` is treated
            as a SQL-like wildcard and translated to the regular expression
            ``.*``; everything else is interpreted as a regular expression.
            ``None``, an empty string or a whitespace-only string disables
            filtering.

    Returns:
        ``lakehouse_items_names`` itself when filtering is disabled, otherwise
        a new list with the names fully matched (case-sensitively) by at
        least one pattern, in their original order.

    Raises:
        re.error: If one of the patterns is not a valid regular expression.
    """
    # No filter configured: return all names untouched.
    if config_param_item_filter is None or not config_param_item_filter.strip():
        return lakehouse_items_names

    # Compile every pattern once instead of once per table name; blank tokens
    # (e.g. from a trailing comma) are dropped because they cannot match a name.
    patterns = [
        re.compile(token.strip().replace("%", ".*"))
        for token in config_param_item_filter.split(",")
        if token.strip()
    ]

    return [
        name
        for name in lakehouse_items_names
        if any(pattern.fullmatch(name) for pattern in patterns)
    ]


# Create a function to get LH delta tables stats
def get_lh_table_data_stats(workspace_name: str, lakehouse_name: str):
    """Collect row/column counts and column types for the Delta tables of a lakehouse.

    The workspace and lakehouse are resolved by display name through the
    Fabric REST API, the lakehouse tables (and shortcuts) are listed, and each
    selected table is opened as a Delta table via its OneLake path.

    Besides its arguments, the function reads these notebook-level settings:
    ``config_param_shortcuts``, ``config_param_item_filter``,
    ``config_param_comparison_mode`` and ``config_param_progress_log``.

    Args:
        workspace_name: Display name of the workspace.
        lakehouse_name: Display name of the lakehouse inside that workspace.

    Returns:
        A tuple ``(df_rowscols, df_structure)`` of Spark DataFrames:
        ``df_rowscols`` has the columns workspace_name, lakehouse_name,
        table_name, rows_cnt, cols_cnt (filled unless the comparison mode is
        "structure"); ``df_structure`` has name_col ("<table>.<column>") and
        type_col (filled unless the comparison mode is "rowscols").

    Raises:
        ValueError: If the workspace or the lakehouse cannot be found by name.
        AnalysisException: If a table cannot be opened for a reason other
            than not being a Delta table.
    """
    # Imported locally: the except clause below needs this name and it is not
    # part of the file-level imports.
    from pyspark.sql.utils import AnalysisException

    # Define the schema for rowscols.
    # rows_cnt is a long: DataFrame.count() can exceed the 32-bit integer range.
    schema_df_rowscols = StructType([
        StructField("workspace_name", StringType(), True),
        StructField("lakehouse_name", StringType(), True),
        StructField("table_name", StringType(), True),
        StructField("rows_cnt", types.LongType(), True),
        StructField("cols_cnt", IntegerType(), True)
    ])

    # Define the schema for structure
    schema_df_structure = StructType([
        StructField("name_col", StringType(), True),
        StructField("type_col", StringType(), True)
    ])

    client = fabric.FabricRestClient()

    # NOTE(review): only the first response page of each listing below is read;
    # if these endpoints paginate, long listings would be truncated - confirm
    # against the Fabric REST API documentation.

    # Find workspace id
    workspaces = client.get(f"/v1/workspaces").json().get("value", [])
    selected_workspace_id = next(
        (ws["id"] for ws in workspaces if ws.get("displayName") == workspace_name),
        None,
    )
    if selected_workspace_id is None:
        raise ValueError(f"Workspace '{workspace_name}' was not found")

    # Find lakehouse id among the items of the workspace
    workspace_items_path = "v1/workspaces/" + selected_workspace_id + "/items"
    workspace_items = client.get(workspace_items_path).json().get("value", [])
    lakehouse_id = next(
        (
            item["id"]
            for item in workspace_items
            if item.get("displayName") == lakehouse_name and item.get("type") == "Lakehouse"
        ),
        None,
    )
    if lakehouse_id is None:
        raise ValueError(
            f"Lakehouse '{lakehouse_name}' was not found in workspace '{workspace_name}'"
        )

    # Extract a list of all tables in lakehouse
    lakehouse_items_path = "v1/workspaces/" + selected_workspace_id + "/lakehouses/" + lakehouse_id + "/tables"
    lakehouse_items_raw = client.get(lakehouse_items_path).json()
    lakehouse_tables_names = [item['name'] for item in lakehouse_items_raw['data']]

    # Extract a list of all shortcuts in lakehouse
    shortcuts_list = "v1/workspaces/" + selected_workspace_id + "/items/" + lakehouse_id + "/shortcuts"
    shortcuts_list_raw = client.get(shortcuts_list).json()
    shortcuts_list_names = [item['name'] for item in shortcuts_list_raw['value']]

    # Apply config_param_shortcuts value
    lakehouse_tables_names = select_array(lakehouse_tables_names, shortcuts_list_names, config_param_shortcuts)

    # Apply filtering on tables names
    lakehouse_tables_names = filter_lakehouse_tables(lakehouse_tables_names, config_param_item_filter)

    item_count = len(lakehouse_tables_names)
    show_progress = config_param_progress_log == "enabled"

    if show_progress:
        display(f"{workspace_name} - {lakehouse_name} lakehouse")

    # Rows are gathered in plain Python lists and turned into DataFrames once
    # at the end, instead of unioning a one-row DataFrame per table.
    rowscols_rows = []
    structure_rows = []

    for position, table_item in enumerate(lakehouse_tables_names, start=1):

        # Define delta table path
        delta_table_path = (f"abfss://{selected_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_item}")

        # Open the Delta table remotely; items that are not Delta tables are skipped
        try:
            delta_table = DeltaTable.forPath(spark, delta_table_path)
        except AnalysisException as e:
            if "DELTA_MISSING_DELTA_TABLE" not in str(e):
                raise  # Re-raise other exceptions
            display(f"Skipping non-Delta table: {delta_table_path}")
            continue

        table_df = delta_table.toDF()

        if config_param_comparison_mode != "structure":
            # Count rows and columns
            rowscols_rows.append(
                (workspace_name, lakehouse_name, table_item, table_df.count(), len(table_df.columns))
            )

        if config_param_comparison_mode != "rowscols":
            # One entry per column, keyed as "<table>.<column>"
            structure_rows.extend(
                (f"{table_item}.{field.name}", field.dataType.simpleString())
                for field in table_df.schema.fields
            )

        if show_progress:
            display(f"({position}/{item_count}) {table_item} processed")

    if show_progress:
        display("---")

    df_rowscols = spark.createDataFrame(rowscols_rows, schema_df_rowscols)
    df_structure = spark.createDataFrame(structure_rows, schema_df_structure)
    return df_rowscols, df_structure

# Collect the statistics of both lakehouses
ws1_result, ws1_result_structures = get_lh_table_data_stats(workspace_name_1, lakehouse_name_1)
ws2_result, ws2_result_structures = get_lh_table_data_stats(workspace_name_2, lakehouse_name_2)


def _quote_col(column_name):
    """Backtick-quote a column name so that dots in it are not parsed as nested field access."""
    return "`" + column_name.replace("`", "``") + "`"


# Result column names carry the workspace and lakehouse names of each side
suffix_1 = f"{workspace_name_1}__{lakehouse_name_1}"
suffix_2 = f"{workspace_name_2}__{lakehouse_name_2}"
rows_col_1, rows_col_2 = f"ROWS__{suffix_1}", f"ROWS__{suffix_2}"
cols_col_1, cols_col_2 = f"COLS__{suffix_1}", f"COLS__{suffix_2}"
type_col_1, type_col_2 = f"TYPE__{suffix_1}", f"TYPE__{suffix_2}"

# Format the partial results: lower-case the join keys (so names differing
# only by case still join) and give the measures side-specific names
ws1_result = (
    ws1_result.withColumn("table_name", lower(col("table_name")))
    .withColumnRenamed("rows_cnt", rows_col_1)
    .withColumnRenamed("cols_cnt", cols_col_1)
    .drop("workspace_name", "lakehouse_name")
)
ws2_result = (
    ws2_result.withColumn("table_name", lower(col("table_name")))
    .withColumnRenamed("rows_cnt", rows_col_2)
    .withColumnRenamed("cols_cnt", cols_col_2)
    .drop("workspace_name", "lakehouse_name")
)

ws1_result_structures = (
    ws1_result_structures.withColumn("name_col", lower(col("name_col")))
    .withColumnRenamed("type_col", type_col_1)
)
ws2_result_structures = (
    ws2_result_structures.withColumn("name_col", lower(col("name_col")))
    .withColumnRenamed("type_col", type_col_2)
)

# Join (merge) the results; the outer join keeps items present on one side only
df_result_rowscols = ws1_result.join(ws2_result, on="table_name", how="outer")
df_result_structure = ws1_result_structures.join(ws2_result_structures, on="name_col", how="outer")

# Create match columns. A value missing on either side makes the comparison
# null, which falls into otherwise(False).
df_result_rowscols = df_result_rowscols.withColumn(
    "ROWS_MATCH",
    F.when(F.col(_quote_col(rows_col_1)) == F.col(_quote_col(rows_col_2)), True).otherwise(False)
).withColumn(
    "COLS_MATCH",
    F.when(F.col(_quote_col(cols_col_1)) == F.col(_quote_col(cols_col_2)), True).otherwise(False)
)

df_result_structure = df_result_structure.withColumn(
    "TYPE_MATCH",
    F.when(F.col(_quote_col(type_col_1)) == F.col(_quote_col(type_col_2)), True).otherwise(False)
)

# Split name_col ("<table>.<column>") into two new columns and drop the original.
# limit=2 splits on the first dot only, so a column name that itself contains
# a dot is kept whole.
name_parts = split(col("name_col"), "\\.", 2)
df_result_structure = (
    df_result_structure.withColumn("table_name", name_parts.getItem(0))
    .withColumn("column_name", name_parts.getItem(1))
    .drop("name_col")
)

# Put the result columns in their final order
df_result_rowscols = df_result_rowscols.select(
    "table_name",
    _quote_col(rows_col_1),
    _quote_col(rows_col_2),
    "ROWS_MATCH",
    _quote_col(cols_col_1),
    _quote_col(cols_col_2),
    "COLS_MATCH",
)
df_result_structure = df_result_structure.select(
    "table_name",
    "column_name",
    _quote_col(type_col_1),
    _quote_col(type_col_2),
    "TYPE_MATCH",
)

# Apply filtering on results match
if config_param_results_match == "nomatchonly":
    df_result_rowscols = df_result_rowscols.filter((col("ROWS_MATCH") == False) | (col("COLS_MATCH") == False))
    df_result_structure = df_result_structure.filter(col("TYPE_MATCH") == False)

# Display result: "structure" shows only the structure dataframe, "full" shows
# both, and every other value (including "rowscols") shows rows/cols only
if config_param_comparison_mode != "structure":
    display(df_result_rowscols)
if config_param_comparison_mode in ("structure", "full"):
    display(df_result_structure)
