**Helper notebook**

In [None]:
%run nb_helper

**Define a logging dataframe**

In [None]:
# one row per logged step; the final cell appends these rows to staging.notebook_logging_cicd
dfLogging = pd.DataFrame(
    columns=[
        'LoadId', 'NotebookId', 'NotebookName', 'WorkspaceId', 'CellId',
        'Timestamp', 'ElapsedTime', 'Message', 'ErrorMessage',
    ]
)

# identifiers of this notebook run, stamped on every logging row
vContext = mssparkutils.runtime.context
(vNotebookId, vLogNotebookName, vWorkspaceId) = (
    vContext["currentNotebookId"],
    vContext["currentNotebookName"],
    vContext["currentWorkspaceId"],  # workspace where the notebook is running, not the source/target workspaces
)

**Parameters: convert this cell to code when debugging the notebook interactively; otherwise keep it commented out, as the parameters are passed from the DevOps pipelines**

In [None]:

# Notebook parameters: the DevOps pipeline passes the real values; the literals below only serve interactive debugging.
# access token for the SQL endpoints, passed in by the pipeline
pSqlToken = ""
# id of the workspace whose warehouse tables are the reference (new and changed columns are read from here)
pSourceWorkspaceId = ""
# id of the workspace whose warehouse tables are altered to match the source
pTargetWorkspaceId = ""
# "yes": fetch the current user's token in the Access Token cell and print errors/progress messages
pDebugMode = "yes"

**Resolve the source and target workspace names from their ids**

In [None]:
# display names of both workspaces, needed by the labs.list_warehouses calls in the next cell
vSourceWorkspaceName, vTargetWorkspaceName = (
    fabric.resolve_workspace_name(workspace_id)
    for workspace_id in (pSourceWorkspaceId, pTargetWorkspaceId)
)

**List source and target warehouses**

In [None]:
# warehouses of each workspace, one row per warehouse
df_source_warehouses, df_target_warehouses = (
    labs.list_warehouses(workspace=workspace_name)
    for workspace_name in (vSourceWorkspaceName, vTargetWorkspaceName)
)

**Verify that both the source and the target workspaces contain at least one warehouse --> if either of them has no warehouse, exit the notebook**

In [None]:
# nothing to pre-update unless both workspaces have at least one warehouse
if df_target_warehouses.empty or df_source_warehouses.empty:
    # the workspace names must be inside {...} to be interpolated by the f-string
    vMessage = f"workspace <{vSourceWorkspaceName}> or workspace <{vTargetWorkspaceName}> have 0 warehouse. pre-update is not required"
    print(vMessage)

    # Display an exit message
    display(Markdown("### ✅ Notebook execution stopped successfully!"))

    # Exit without error; vMessage is handed to notebook.exit as the exit value
    mssparkutils.notebook.exit(vMessage)

**Source and target SQL analytics endpoints**

In [None]:
# The rest of the notebook connects to every warehouse of a workspace through one endpoint,
# i.e. it assumes all warehouses of a workspace share the same SQL endpoint; the first row's value is used.
# .iloc is positional, so this does not depend on list_warehouses returning a default 0-based index.
vSourceSqlEndpoint = df_source_warehouses['Connection Info'].iloc[0]
vTargetSqlEndpoint = df_target_warehouses['Connection Info'].iloc[0]

**Access Token**

In [None]:
vScope = "https://analysis.windows.net/powerbi/api"

# debug runs authenticate as the current user; pipeline runs use the token passed in as a parameter
vSqlAccessToken = mssparkutils.credentials.getToken(vScope) if pDebugMode == "yes" else pSqlToken

**SQL statement to get the tables and their column definitions**

In [None]:
# Column inventory of every base table of the connected warehouse, one row per column.
# COLUMN_DEFINITION is the T-SQL type spelling (e.g. [varchar](50), [decimal](18,2), [datetime2](6)):
# it is compared between source and target and reused in the CAST expressions of the pre-update CTAS.
# The join also matches on the schema, so tables sharing a name across schemas are not mixed up,
# and the CASE has an ELSE, so types without a length/precision argument never yield a NULL definition.
vSqlStatement = """
SELECT 
	 DB_NAME() as [DATABASE_NAME]
	,c.TABLE_SCHEMA
	,c.TABLE_NAME
	,c.ORDINAL_POSITION
	,c.COLUMN_NAME
	, 
	'[' + c.DATA_TYPE + ']'
	+ 
	CASE  
		WHEN c.DATA_TYPE IN ('varchar', 'nvarchar', 'nchar', 'varbinary', 'char', 'binary') 
		THEN 
			'(' 
			+ 
			CASE c.CHARACTER_MAXIMUM_LENGTH 
				WHEN -1 THEN 'max'
				ELSE CAST(c.CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10))
			END 
			+ ')'
		WHEN c.DATA_TYPE IN ('numeric', 'decimal') THEN '(' + CAST(c.NUMERIC_PRECISION AS VARCHAR(10)) + ',' + CAST(c.NUMERIC_SCALE AS VARCHAR(10)) + ')'
		WHEN c.DATA_TYPE IN ('datetime2', 'time', 'datetimeoffset') THEN '(' + CAST(c.DATETIME_PRECISION AS VARCHAR(10)) + ')'
		-- every other type (int, bit, date, float, uniqueidentifier, ...) takes no argument
		ELSE ''
	END 
	AS COLUMN_DEFINITION
FROM 
	INFORMATION_SCHEMA.COLUMNS c
	INNER JOIN INFORMATION_SCHEMA.TABLES t 
		ON c.TABLE_SCHEMA = t.TABLE_SCHEMA
		AND c.TABLE_NAME = t.TABLE_NAME
		AND t.TABLE_TYPE = 'BASE TABLE'
WHERE	
	c.TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA','queryinsights','sys')
ORDER BY 
	c.TABLE_SCHEMA
	,c.TABLE_NAME
	,c.ORDINAL_POSITION
"""

**Functions**

In [None]:
# compare source and target dataframes, identify new and modified columns
def compare_dataframes(source_dataframe, target_dataframe, key_columns):
    """Identify the source rows that are new or modified compared with the target.

    Parameters
    ----------
    source_dataframe, target_dataframe : pd.DataFrame
        Inventories with the same column list (same names, same order).
    key_columns : list of str
        Columns that identify one row across both inventories.

    Returns
    -------
    tuple of (pd.DataFrame, pd.DataFrame)
        - source rows whose key is absent from the target (key columns first);
        - source rows whose key exists in both but where any non-key value differs
          (key columns first), or a column-less empty DataFrame when there are none.
        Two missing values in the same cell count as equal.

    Raises
    ------
    AssertionError
        If the two DataFrames do not have the same columns.
    """
    # Ensure both DataFrames have the same columns
    assert list(target_dataframe.columns) == list(source_dataframe.columns), "DataFrames must have the same columns"

    # exact duplicate rows carry no information and would make the key index non-unique
    source_indexed = source_dataframe.drop_duplicates().set_index(key_columns)
    target_indexed = target_dataframe.drop_duplicates().set_index(key_columns)

    # columns in source but not in target --> added to source
    in_target = source_indexed.index.isin(target_indexed.index)
    df_columns_only_in_source = source_indexed.loc[~in_target].reset_index()

    # columns in common: align the target rows on the source keys and compare cell by cell,
    # treating NaN/None on both sides as equal (same semantics as DataFrame.equals)
    common_source = source_indexed.loc[in_target]
    common_target = target_indexed.reindex(common_source.index)
    same_value = (common_source == common_target) | (common_source.isna() & common_target.isna())
    changed = ~same_value.all(axis=1)

    df_columns_with_type_change = common_source.loc[changed].reset_index() if changed.any() else pd.DataFrame()

    return df_columns_only_in_source, df_columns_with_type_change

In [None]:
# create the alchemy engine
def create_sqlalchemy_engine(connection_string : str, token=None):
    """Create a SQLAlchemy engine whose pyodbc connections authenticate with an access token.

    Parameters
    ----------
    connection_string : str
        ODBC connection string (driver, server, database) without credentials.
    token : str, optional
        Access token to authenticate with. Defaults to the notebook-level
        ``vSqlAccessToken`` set in the "Access Token" cell (the current user's
        token in debug mode, ``pSqlToken`` otherwise).

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine whose connections are created by ``pyodbc.connect`` with the token attached.
    """
    if token is None:
        # use the token selected for this run, so debug mode does not send the empty pSqlToken
        token = vSqlAccessToken

    # ODBC pre-connect attribute carrying the access token
    SQL_COPT_SS_ACCESS_TOKEN = 1256

    # the driver expects the token as UTF-16-LE bytes prefixed by their length as a 4-byte int
    token_bytes = token.encode("UTF-16-LE")
    token_struct = struct.pack("=i", len(token_bytes)) + token_bytes

    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: bytearray(token_struct)}),
    )

**Get the definition of warehouse(s) tables in the source workspace**

In [None]:
# Column inventory of every warehouse of the source workspace (one row per table column).
# All-or-nothing: the frame is only filled once every warehouse has been read, so a failure never
# leaves a partial inventory that later cells would take for the complete source schema.
df_source_warehouses_columns = pd.DataFrame()
try:
    source_column_frames = []

    for index, row in df_source_warehouses.iterrows():

        # get the current warehouse
        vWarehouseName = row['Warehouse Name']

        # define the connection string for the alchemy engine
        vConnectionString = f"Driver={{ODBC Driver 18 for SQL Server}};Server={vSourceSqlEndpoint};Database={vWarehouseName};"

        # create the sql engine; disposed afterwards so its pooled connection is closed
        sql_engine = create_sqlalchemy_engine(vConnectionString)
        try:
            with sql_engine.connect() as sql_connection:
                # get the definition of the tables
                source_column_frames.append(pd.read_sql(vSqlStatement, sql_connection))
        finally:
            sql_engine.dispose()

    # concatenate once instead of growing the frame inside the loop
    if source_column_frames:
        df_source_warehouses_columns = pd.concat(source_column_frames, ignore_index=True)

    # logging
    vMessage = f"succeeded"
    dfLogging.loc[len(dfLogging.index)] = [None, vNotebookId, vLogNotebookName, vWorkspaceId, 'identify source warehouses tables definition', datetime.now(), None, vMessage, '']
except Exception as e:
    vMessage = f"failed"
    dfLogging.loc[len(dfLogging.index)] = [None, vNotebookId, vLogNotebookName, vWorkspaceId, 'identify source warehouses tables definition', datetime.now(), None, vMessage, str(e)]
    if pDebugMode == "yes":
        print(str(e))


**Get the definition of warehouse(s) tables in the target workspace**

In [None]:
# Column inventory of every warehouse of the target workspace (one row per table column).
# All-or-nothing: a partial target inventory is dangerous, because the tables of an unread warehouse
# would look absent from the target and be treated as entirely new columns.
df_target_warehouses_columns = pd.DataFrame()
try:
    target_column_frames = []

    for index, row in df_target_warehouses.iterrows():

        # get the current warehouse
        vWarehouseName = row['Warehouse Name']

        # define the connection string for the alchemy engine
        vConnectionString = f"Driver={{ODBC Driver 18 for SQL Server}};Server={vTargetSqlEndpoint};Database={vWarehouseName};"

        # create the sql engine; disposed afterwards so its pooled connection is closed
        sql_engine = create_sqlalchemy_engine(vConnectionString)
        try:
            with sql_engine.connect() as sql_connection:
                # get the definition of the tables
                target_column_frames.append(pd.read_sql(vSqlStatement, sql_connection))
        finally:
            sql_engine.dispose()

    # concatenate once instead of growing the frame inside the loop
    if target_column_frames:
        df_target_warehouses_columns = pd.concat(target_column_frames, ignore_index=True)

    # logging
    vMessage = f"succeeded"
    dfLogging.loc[len(dfLogging.index)] = [None, vNotebookId, vLogNotebookName, vWorkspaceId, 'identify target warehouses tables definition', datetime.now(), None, vMessage, '']
except Exception as e:
    vMessage = f"failed"
    dfLogging.loc[len(dfLogging.index)] = [None, vNotebookId, vLogNotebookName, vWorkspaceId, 'identify target warehouses tables definition', datetime.now(), None, vMessage, str(e)]
    if pDebugMode == "yes":
        print(str(e))


**Build the SQL statements that align the changed target tables with the source**

In [None]:
# key columns for the merge and comparison: one warehouse column, and the subset identifying its table
key_columns = ['DATABASE_NAME', 'TABLE_SCHEMA', 'TABLE_NAME', 'COLUMN_NAME']
table_key_columns = ['DATABASE_NAME', 'TABLE_SCHEMA', 'TABLE_NAME']


def _quote_identifier(names):
    """Bracket-quote a Series of T-SQL identifiers, doubling any embedded ']'."""
    return "[" + names.str.replace("]", "]]", regex=False) + "]"


# source and target comparison
df_columns_only_in_source, df_columns_with_type_changed = compare_dataframes(df_source_warehouses_columns, df_target_warehouses_columns, key_columns)

# union of new and changed columns; empty frames are skipped because compare_dataframes may return a column-less DataFrame
changed_column_frames = [df for df in (df_columns_only_in_source, df_columns_with_type_changed) if not df.empty]
df_compare_dataframes_unioned = pd.concat(changed_column_frames, ignore_index=True).drop_duplicates() if changed_column_frames else pd.DataFrame()

if not df_compare_dataframes_unioned.empty:
    # anti-join: source columns that are neither new nor changed are selected as they are
    df_columns_in_common_not_changed = (
        df_source_warehouses_columns
        .merge(df_compare_dataframes_unioned, on=list(df_source_warehouses_columns.columns), how='left', indicator=True)
        .query('_merge == "left_only"')
        .drop('_merge', axis=1)
    )
    df_columns_in_common_not_changed = df_columns_in_common_not_changed.assign(
        SelectColumnStatement=_quote_identifier(df_columns_in_common_not_changed['COLUMN_NAME'])
    )

# select expression per column: new columns become typed NULLs, changed columns are cast to the source type
if not df_columns_only_in_source.empty:
    df_columns_only_in_source = df_columns_only_in_source.assign(
        SelectColumnStatement="CAST(NULL AS " + df_columns_only_in_source['COLUMN_DEFINITION'] + ") AS " + _quote_identifier(df_columns_only_in_source['COLUMN_NAME'])
    )
else:
    df_columns_only_in_source = pd.DataFrame()

if not df_columns_with_type_changed.empty:
    quoted_changed_columns = _quote_identifier(df_columns_with_type_changed['COLUMN_NAME'])
    df_columns_with_type_changed = df_columns_with_type_changed.assign(
        SelectColumnStatement="CAST(" + quoted_changed_columns + " AS " + df_columns_with_type_changed['COLUMN_DEFINITION'] + ") AS " + quoted_changed_columns
    )
else:
    df_columns_with_type_changed = pd.DataFrame()


# sources tables that changed
if not df_compare_dataframes_unioned.empty:

    # distinct list of tables with at least one new or changed column ...
    df_changed_tables = df_compare_dataframes_unioned[table_key_columns].drop_duplicates()
    # ... restricted to tables that already exist in the target: the CTAS reads FROM the target table,
    # so a table that exists only in the source cannot (and need not) be pre-updated
    df_target_tables = df_target_warehouses_columns[table_key_columns].drop_duplicates()
    df_changed_tables = df_changed_tables.merge(df_target_tables, on=table_key_columns, how='inner')

    # one comma-separated select list per table, in the source column order
    statement_frames = [df for df in (df_columns_only_in_source, df_columns_with_type_changed, df_columns_in_common_not_changed) if not df.empty]
    df_sql_statements = pd.concat(statement_frames).drop_duplicates().sort_values(by=['DATABASE_NAME', 'TABLE_SCHEMA', 'TABLE_NAME', 'ORDINAL_POSITION'])
    df_sql_statements_grouped = df_sql_statements.groupby(table_key_columns)['SelectColumnStatement'].agg(','.join).reset_index()

    # bracket-quoted names of each table and of its temporary backup copy
    vQuotedSchema = _quote_identifier(df_sql_statements_grouped["TABLE_SCHEMA"])
    vQuotedTable = vQuotedSchema + "." + _quote_identifier(df_sql_statements_grouped["TABLE_NAME"])
    vQuotedBackupTable = vQuotedSchema + "." + _quote_identifier(df_sql_statements_grouped["TABLE_NAME"] + "_backup")

    # statements are stored without a trailing ';' (the execution cell appends it)
    df_sql_statements_grouped["DropBackupTableStatement"] = "DROP TABLE IF EXISTS " + vQuotedBackupTable
    df_sql_statements_grouped["CtasStatement"] = "CREATE TABLE " + vQuotedBackupTable + " AS SELECT " + df_sql_statements_grouped["SelectColumnStatement"] + " FROM " + vQuotedTable
    df_sql_statements_grouped["DropTableStatement"] = "DROP TABLE IF EXISTS " + vQuotedTable
    # sp_rename arguments are string literals, so embedded quotes are doubled; the new name stays a bare one-part name
    df_sql_statements_grouped["RenamingTableStatement"] = (
        "EXEC sp_rename '" + vQuotedBackupTable.str.replace("'", "''", regex=False)
        + "', '" + df_sql_statements_grouped["TABLE_NAME"].str.replace("'", "''", regex=False) + "'"
    )

else:
    df_changed_tables = pd.DataFrame()




**Run the SQL statements against the target SQL endpoint**

In [None]:

# statement columns of df_sql_statements_grouped, in the order they must run for each changed table
vStatementColumns = ['DropBackupTableStatement', 'CtasStatement', 'DropTableStatement', 'RenamingTableStatement']

try:

    # warehouses that exist in the target workspace; a warehouse found only in the source has nothing to pre-update
    vTargetWarehouseNames = set(df_target_warehouses['Warehouse Name'])

    if df_changed_tables.empty:
        vMessage = f"no change detected in existings warehouses"
        if pDebugMode == "yes":
            print(vMessage)
    else:
        # iterate through the source warehouses
        for index, row in df_source_warehouses.iterrows():

            # get the current warehouse
            vWarehouseName = row['Warehouse Name']

            # filter the changed tables on the current warehouse
            df_changed_tables_in_scope = df_changed_tables[df_changed_tables['DATABASE_NAME'] == vWarehouseName]

            if df_changed_tables_in_scope.empty:
                vMessage = f"no change detected in warehouse <{vWarehouseName}>"
                if pDebugMode == "yes":
                    print(vMessage)
                continue

            if vWarehouseName not in vTargetWarehouseNames:
                vMessage = f"warehouse <{vWarehouseName}> does not exist in the target workspace. pre-update is not required"
                if pDebugMode == "yes":
                    print(vMessage)
                continue

            # define the connection string for the alchemy engine
            vConnectionString = f"Driver={{ODBC Driver 18 for SQL Server}};Server={vTargetSqlEndpoint};Database={vWarehouseName};"

            # one raw DBAPI connection per warehouse; its statements are committed together at the end
            sql_engine = create_sqlalchemy_engine(vConnectionString)
            connection = sql_engine.raw_connection()
            try:
                cursor = connection.cursor()

                # iterate over tables that require an update
                for index_table, row_table in df_changed_tables_in_scope.iterrows():
                    vChangedSchema = row_table['TABLE_SCHEMA']
                    vChangedTable = row_table['TABLE_NAME']

                    # filter the sql statements on the current warehouse, schema and table
                    df_sql_statements_in_scope = df_sql_statements_grouped[
                        (df_sql_statements_grouped['DATABASE_NAME'] == vWarehouseName)
                        & (df_sql_statements_grouped['TABLE_SCHEMA'] == vChangedSchema)
                        & (df_sql_statements_grouped['TABLE_NAME'] == vChangedTable)
                    ]

                    # positional access: the filtered frame keeps its original row labels,
                    # so label 0 only exists for the first table of df_sql_statements_grouped
                    sr_statements = df_sql_statements_in_scope.iloc[0]

                    # execute each statement in order, terminated by exactly one ';'
                    for vStatementColumn in vStatementColumns:
                        vStatement = sr_statements[vStatementColumn].rstrip(';') + ';'
                        print(f"running statement: {vStatement}")
                        cursor.execute(vStatement)

                # commit
                connection.commit()
            finally:
                # release the connection and the engine even when a statement fails
                connection.close()
                sql_engine.dispose()

    # logging
    vMessage = f"succeeded"
    dfLogging.loc[len(dfLogging.index)] = [None, vNotebookId, vLogNotebookName, vWorkspaceId, 'updating target warehouses tables definition', datetime.now(), None, vMessage, '']

except Exception as e:
    vMessage = f"failed"
    dfLogging.loc[len(dfLogging.index)] = [None, vNotebookId, vLogNotebookName, vWorkspaceId, 'updating target warehouses tables definition', datetime.now(), None, vMessage, str(e)]
    if pDebugMode == "yes":
        print(str(e))


**Logging**

In [None]:
try:
    # give every logging column an explicit dtype before the conversion to Spark
    dfLogging = dfLogging.astype({
            "LoadId": "string",
            "NotebookId": "string",
            "NotebookName": "string",
            "WorkspaceId": "string",
            "CellId": "string",
            "Timestamp": "datetime64[ns]",
            "ElapsedTime": "string",
            "Message": "string",
            "ErrorMessage" : "string"
        })

    # save panda dataframe to a spark dataframe
    sparkDF_Logging = spark.createDataFrame(dfLogging)

    # append to the lakehouse table, letting new columns extend its schema
    sparkDF_Logging.write.mode("append").format("delta").option("mergeSchema", "true").saveAsTable("staging.notebook_logging_cicd")

except Exception as e:
    vMessage = "saving logs to the lakehouse failed"
    # always surface this failure: the logs of this run are not stored anywhere else
    print(vMessage)
    if pDebugMode == "yes":
        print(str(e))