In [117]:
from pyspark.sql import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *
import sempy.fabric as fabric
from datetime import date, datetime as dt
import ast
import notebookutils
from functools import reduce
from random import randint
from time import sleep
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
import json
import time
import re
import pandas as pd
from sempy.fabric import FabricDataFrame
from sempy.dependencies import plot_dependency_metadata
import pytz as tz 



# Configuration
# Parquet calendar handling: the three settings below select the LEGACY rebase mode for
# INT96 timestamps (on write and on read) and for date/timestamp values (on read).
# NOTE(review): per the Spark SQL configuration docs, LEGACY rebases values between the
# Proleptic Gregorian calendar and the legacy hybrid (Julian + Gregorian) calendar instead of
# failing on ancient, calendar-ambiguous values - confirm this is what the source data needs.
# `spark` is the session object provided by the notebook runtime (not defined in this file).
spark.conf.set('spark.sql.parquet.int96RebaseModeInWrite','LEGACY')
spark.conf.set('spark.sql.parquet.int96RebaseModeInRead','LEGACY')
spark.conf.set('spark.sql.parquet.datetimeRebaseModeInRead','LEGACY')


StatementMeta(, 1e9d8c98-b606-47a6-aab6-28b1e5d4c192, 119, Finished, Available, Finished)

In [None]:
# file path function
def udf_GetFilePath (workspace, lakehouse, table):
    """Build the path of a table inside a Fabric lakehouse.

    Args:
        workspace: Workspace name used in the ``name eq '<workspace>'`` filter.
        lakehouse: Name of the lakehouse to look up in that workspace.
        table: Table name appended under the lakehouse's ``Tables`` folder.

    Returns:
        str: ``<abfsPath>/Tables/<table>``, where ``abfsPath`` comes from the
        ``properties`` of the lakehouse returned by ``notebookutils.lakehouse.get``.
    """
    # look the workspace up by name and take the 'Id' entry at label 0 of the result
    matchingWorkspaces = fabric.list_workspaces(f"name eq '{workspace}'")
    workspaceId = matchingWorkspaces['Id'][0]

    # the lakehouse metadata exposes its root path under properties.abfsPath
    lakehouseInfo = notebookutils.lakehouse.get(name=lakehouse, workspaceId=workspaceId)
    lakehouseRoot = lakehouseInfo['properties']['abfsPath']

    return f'{lakehouseRoot}/Tables/{table}'

In [1]:
# metadata functions



def udf_AddHashKeyColumn (df, naturalKeyColumnList) :
    """Append an ``ETLHashKey`` column hashing every non-natural-key column.

    Args:
        df: DataFrame to extend.
        naturalKeyColumnList: Column names to leave out of the hash.

    Returns:
        ``df`` with all original columns plus ``ETLHashKey``: the SHA-2 (256 bit)
        hash of the remaining columns, each cast to string (nulls replaced by the
        literal 'NA') and joined with a pipe separator.
    """
    def _hashInput(columnName):
        # nulls become the string 'NA'; every other value is cast to string
        value = col(columnName)
        return when(value.isNotNull(), value.cast(StringType())).otherwise(lit('NA'))

    # every column of the dataframe takes part in the hash except the natural key(s)
    hashInputs = []
    for columnName in df.columns:
        if columnName in naturalKeyColumnList:
            continue
        hashInputs.append(_hashInput(columnName))

    # concatenate with a pipe separator and hash the result
    rowHash = sha2(concat_ws("|", *hashInputs), 256).alias("ETLHashKey")

    return df.select("*", rowHash)
    
def udf_AddSCD2Columns (df) :
    """Append the slowly-changing-dimension (Type 2) tracking columns.

    Args:
        df: DataFrame to extend.

    Returns:
        ``df`` with all original columns plus ``StartDateTime`` (1900-01-01),
        ``EndDateTime`` (9999-12-31), both cast to timestamp, and ``IsCurrent``
        set to true.
    """
    # open-ended validity window and the current-row flag
    validFrom = lit("1900-01-01").cast("timestamp").alias("StartDateTime")
    validTo = lit("9999-12-31").cast("timestamp").alias("EndDateTime")
    currentFlag = lit(True).alias("IsCurrent")

    return df.select("*", validFrom, validTo, currentFlag)

def udf_AddModifiedDateColumn (df) :
    """Append an ``ETLModifiedDateTime`` column holding the current timestamp.

    Args:
        df: DataFrame to extend.

    Returns:
        ``df`` with all original columns plus ``ETLModifiedDateTime``.
    """
    modifiedAt = current_timestamp().alias("ETLModifiedDateTime")

    return df.select("*", modifiedAt)

def udf_AddPrimaryKey (df, targetPath, primaryKeyColumnName="ID") :
    """Prepend a surrogate key column that continues after the target's largest key.

    Args:
        df: DataFrame whose rows need new keys.
        targetPath: Path of the Delta table the rows will be loaded into; if it
            exists, its current maximum key is used as the starting offset.
        primaryKeyColumnName: Name of the key column (default ``"ID"``).

    Returns:
        ``df`` with ``primaryKeyColumnName`` as the first column, valued
        ``<max existing key> + row_number()``, followed by all original columns.
    """
    # default the max existing ID to 0 (in the event of an initial load)
    maxID = 0

    # find the largest ID if the table exists
    if notebookutils.fs.exists(targetPath):
        existingMax = (
            spark.read.format("delta").load( targetPath )
            .agg({primaryKeyColumnName:"max"})
            .collect()[0][0]
        )
        # max() over an empty table (or an all-null key column) comes back as None;
        # keep the 0 default in that case so the new keys are still numbered from 1
        if existingMax is not None:
            maxID = existingMax

    # the window has no partitioning and a constant ordering, so the numbering order
    # among the new rows is arbitrary
    newKey = maxID + row_number().over(Window.orderBy(lit(None)))

    return df.select(newKey.alias(primaryKeyColumnName), "*")
                    

StatementMeta(, , -1, SessionStarting, , SessionStarting)

In [121]:
def udf_LoadTableInitial (df, targetPath) :
    """Write ``df`` to ``targetPath`` as a Delta table, replacing data and schema.

    Args:
        df: DataFrame to persist.
        targetPath: Destination path of the Delta table.

    Returns:
        None.
    """
    # overwrite whatever is at the target, in Delta format
    deltaWriter = df.write.format( 'delta' ).mode( 'overwrite' )

    # name-based column mapping, and allow the overwrite to replace the schema
    deltaWriter = deltaWriter.option('delta.columnMapping.mode', 'name')
    deltaWriter = deltaWriter.option( 'overwriteSchema', 'True' )

    deltaWriter.save( targetPath )

StatementMeta(, 1e9d8c98-b606-47a6-aab6-28b1e5d4c192, 123, Finished, Available, Finished)

In [None]:
def udf_UpsertDimension(df, dimensionType, targetPath, naturalKeyColumnList, primaryKeyColumnName="ID", flagSoftDeletes=False):
    """Upsert a staged dimension DataFrame into the Delta table at ``targetPath``.

    If ``targetPath`` does not exist yet, the staged rows get a hash key, the
    SCD2 columns (Type 2 only), a modified timestamp and a surrogate key, and are
    written as a new table. Otherwise the staged rows are compared to the target
    by natural key and ``ETLHashKey`` and applied with a Delta merge:

    * Type 1: rows whose hash changed overwrite every column except the primary
      key; rows with a new natural key are inserted.
    * Type 2: the current target row of a changed key is expired (``IsCurrent``
      false, ``EndDateTime`` set to the current timestamp) and a new current row
      starting at the current timestamp is inserted; rows with a new natural key
      are inserted as well.

    Args:
        df: Staged source rows.
        dimensionType: 1 (overwrite in place) or 2 (keep history).
        targetPath: Path of the target Delta table.
        naturalKeyColumnList: Column names that identify a dimension member;
            used as join keys and excluded from the hash.
        primaryKeyColumnName: Name of the surrogate key column (default ``"ID"``).
        flagSoftDeletes: When true, an ``IsDeleted`` column is added: false for
            staged rows, true for natural keys found in the target (current rows
            only for Type 2) but missing from ``df``.

    Returns:
        dict: ``startTime`` and ``stopTime`` (stringified datetimes) and
        ``details`` (a sentence with updated / inserted / staged row counts).

    Raises:
        ValueError: If ``dimensionType`` is neither 1 nor 2.
    """
    # fail fast on unsupported types: the merge below only knows how to build its
    # update clause for type 1 and type 2, so anything else would otherwise load
    # a table on the first run and break on every later run
    if dimensionType not in (1, 2):
        raise ValueError(f"dimensionType must be 1 or 2, got {dimensionType!r}")

    # collect stats for logging
    startTime = dt.now()
    staging_rows_added = df.count()

    # default some strings that we'll need in logic further down to operate for a type 1 dimension. We'll overwrite them for Type 2 SCDs next if applicable.
    dfTargetWhereClause = "1=1"
    mergeCondition = "1=1 AND "

    # create filter strings to limit to current rows if we are loading a type 2 dimension
    if dimensionType==2:
        dfTargetWhereClause = "IsCurrent = 1"
        # only staged rows without a primary key (the "expire" rows built below) may match a current target row
        mergeCondition =  f"source.`{primaryKeyColumnName}` IS NULL AND target.IsCurrent = 1 AND "
        # for the merge statement below; "updates" in a type 2 will not update the entire record but rather expire a record
        updateSetClause = {"EndDateTime": "source.EndDateTime",
        "IsCurrent": "source.IsCurrent",
        "ETLModifiedDateTime": "source.ETLModifiedDateTime"}

    # for soft deletes, deleted flag will be treated like a normal field and included in the hash
    if flagSoftDeletes:

        # if records exist in the source DF, they are not deleted
        df = df.withColumn("IsDeleted",lit(False))

        # if this is not the initial load, pull in target data and join to source data to add deleted records which are no longer in the source
        if notebookutils.fs.exists(targetPath):
            dfTarget = (
            spark.read.format("delta").load( targetPath )
            .select(*naturalKeyColumnList)
            .where(dfTargetWhereClause)
            )

            # natural keys present in the target but absent from the source
            deletes = (
                dfTarget
                .alias("target")
                .join(df.alias("source"), naturalKeyColumnList, "left_anti")
                .withColumn("IsDeleted",lit(True))
            )

            # deleted rows only carry the natural key and the flag; every other column is filled with null
            df = df.unionByName(deletes,allowMissingColumns=True)

    # first, take the source dataset and add a hash key
    df = udf_AddHashKeyColumn( df, naturalKeyColumnList )

    # if we are loading a type 2 dimension, add start/end/is current columns
    if dimensionType==2:
        df = udf_AddSCD2Columns( df )

    # add ETL Modified Date
    df = udf_AddModifiedDateColumn( df )


    # check if the target path exists; if not, load the data for the first time
    if notebookutils.fs.exists(targetPath):


        targetDelta = DeltaTable.forPath( spark, targetPath )

        # natural key, hash and primary key of the target rows to compare against
        dfTarget = (
        spark.read.format("delta").load( targetPath )
        .select(*naturalKeyColumnList,col("ETLHashKey").alias("targetHashKey"),col(primaryKeyColumnName).alias("targetID"))
        .where(dfTargetWhereClause)
        )

        # limit source dataset to rows which exist in the target but whose hash differs
        updates = (
            df
            .alias("source")
            .join(dfTarget.alias("target"), naturalKeyColumnList, "inner")
            .where("source.ETLHashKey != target.targetHashKey")
        )

        # if this is a type 2 dimension, updates result in expiring the current row
        if dimensionType==2:
            expires = (
                updates
                .withColumn("IsCurrent",lit(False))
                .withColumn("EndDateTime",current_timestamp())
            )

        # source rows whose natural key has no counterpart in the target
        inserts = (
            df
            .alias("source")
            .join(dfTarget.alias("target"), naturalKeyColumnList, "left")
            .where("target.targetHashKey IS NULL")
        )

        #if this is a type 2 dimension, inserts are really the new record for updates and brand new dimension records altogether
        if dimensionType==2:
            inserts = (
                inserts
                .union(updates.withColumn("StartDateTime",current_timestamp()))
            )
            updates = expires

        # only rows that will be inserted receive a new surrogate key
        inserts = udf_AddPrimaryKey(inserts, targetPath, primaryKeyColumnName)

        mergeCondition += " AND ".join([f"target.`{c}` = source.`{c}`" for c in naturalKeyColumnList])

        insertsCount = inserts.count()
        updatesCount = updates.count()

        # one staging set for the merge: inserts carry their new key, updates carry a null key.
        # union() is positional, so the null key is placed first to line up with the inserts.
        stageChanges = (
            inserts
            .union(
                updates
                .select(
                    lit(None).cast(IntegerType()).alias(primaryKeyColumnName)
                    ,"*"
                )
            )
            .drop("targetHashKey","targetID")
        )


        if dimensionType == 1:
            # we created this clause for type 2 dimensions above, but for type 1 we'll want to update every column except for the primary key
            updateSetClause = {f"`{c}`": f"source.`{c}`" for c in stageChanges.columns if c != primaryKeyColumnName}

        # perform updates and inserts using merge statement
        (targetDelta.alias("target").merge(
            source = stageChanges.alias("source"),
            condition = mergeCondition
        )
        .whenMatchedUpdate(set=updateSetClause)
        .whenNotMatchedInsertAll().execute()
        )
        print('Upsert complete')

    else:

        df = udf_AddPrimaryKey(df, targetPath, primaryKeyColumnName)
        udf_LoadTableInitial(df, targetPath)
        insertsCount = df.count()
        updatesCount = 0
        print('Initial load complete')


    # Return output to caller for logging
    stopTime = dt.now()
    details = f'{updatesCount} records updated, {insertsCount} records inserted from {staging_rows_added} staging rows'
    returnVal = {
        'startTime': str( startTime ),
        'stopTime': str( stopTime ),
        'details': str( details)
    }

    return returnVal

In [None]:
# Function to sync SQL endpoint with the Lakehouse
def udf_SyncSqlEndpoint(workspace, lakehouse, pollIntervalSeconds=1, timeoutSeconds=None):
    """Trigger a metadata refresh of a lakehouse's SQL endpoint and wait for it.

    Args:
        workspace: Workspace name used to look up the workspace Id.
        lakehouse: Name of the lakehouse whose SQL endpoint is refreshed.
        pollIntervalSeconds: Seconds to sleep between status polls (default 1).
        timeoutSeconds: Maximum seconds to keep polling while the batch reports
            'inProgress'; ``None`` (default) polls without limit.

    Returns:
        list[str]: Log lines. One line per table on success; otherwise a single
        line describing the failed request, the failed / unexpected / timed-out
        sync, or the exception that was caught. Exceptions are not re-raised.
    """
    logs = []
    try:
        # Fetch SQL endpoint properties
        workspaceID = fabric.list_workspaces( f"name eq '{workspace}'")['Id'][0]
        lakehouseID = notebookutils.lakehouse.get( name = lakehouse, workspaceId = workspaceID )['id']

        client = fabric.FabricRestClient()
        lakehouse_info = client.get(f"/v1/workspaces/{workspaceID}/lakehouses/{lakehouseID}").json()
        sql_endpoint_id = lakehouse_info['properties']['sqlEndpointProperties']['id']

        # Set URI for the API call
        uri = f"/v1.0/myorg/lhdatamarts/{sql_endpoint_id}"
        payload = {"commands": [{"$type": "MetadataRefreshExternalCommand"}]}

        # Call REST API to initiate the sync
        response = client.post(uri, json=payload)
        if response.status_code != 200:
            logs.append(f"Error initiating sync: {response.status_code} - {response.text}")
            # hand the error back to the caller instead of dropping it
            return logs

        # keep the initial response as the latest known status so the branches below
        # always have something to report, even if no poll is needed
        # NOTE(review): whether the initial response already carries per-table details
        # is not visible from this file - a missing key is logged by the generic handler
        status_data = json.loads(response.text)

        batch_id = status_data["batchId"]
        progress_state = status_data["progressState"]

        # URL for checking the sync status
        status_uri = f"/v1.0/myorg/lhdatamarts/{sql_endpoint_id}/batches/{batch_id}"

        # Polling until the sync leaves the 'inProgress' state (or the optional timeout elapses)
        deadline = None if timeoutSeconds is None else time.monotonic() + timeoutSeconds
        while progress_state == 'inProgress':
            if deadline is not None and time.monotonic() >= deadline:
                logs.append(f"Sync timed out after {timeoutSeconds} seconds; batch {batch_id} is still in progress")
                return logs
            time.sleep(pollIntervalSeconds)  # Polling interval
            status_data = client.get(status_uri).json()
            progress_state = status_data["progressState"]

        # Check if the sync completed successfully
        if progress_state == 'success':
            table_details = [
                {
                    'tableName': table['tableName'],
                    'lastSuccessfulUpdate': table.get('lastSuccessfulUpdate', 'N/A'),
                    'tableSyncState': table['tableSyncState'],
                    'sqlSyncState': table['sqlSyncState'],
                    'warningMessages': table['warningMessages']
                }
                for table in status_data['operationInformation'][0]['progressDetail']['tablesSyncStatus']
            ]

            # Print and log one summary line per table
            for detail in table_details:
                summary = (f"Table: {detail['tableName']}   Last Update: {detail['lastSuccessfulUpdate']}  "
                           f"Table Sync State: {detail['tableSyncState']}  SQL Sync State: {detail['sqlSyncState']}   "
                           f"Table Warnings: {detail['warningMessages']}")
                print(summary)
                logs.append(summary)

        # Handle failure
        elif progress_state == 'failure':
            logs.append(f"Sync failed: {status_data}")

        # Any other terminal state is reported rather than silently ignored
        else:
            logs.append(f"Sync ended in unexpected state '{progress_state}': {status_data}")

    except FabricHTTPException as fe:
        logs.append(f"Fabric HTTP Exception: {fe}")
    except WorkspaceNotFoundException as we:
        logs.append(f"Workspace not found: {we}")
    except Exception as e:
        logs.append(f"An unexpected error occurred: {e}")
    return logs

