In [None]:
#installing semlink lib in fabric notebook
%pip install semantic-link

In [None]:
#imports
from notebookutils import mssparkutils
from datetime import datetime
import pandas as pd
from pyspark.sql.functions import when,isnull
import pandas as pd
import json
import sempy.fabric as fabric 
from functools import lru_cache
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, StringType
from notebookutils import mssparkutils  
from pyspark.sql import functions as F 

In [None]:
# --- Configuration -------------------------------------------------------------
# NOTE(review): the storage account key is hardcoded in the notebook; prefer
# fetching it from a secret store (e.g. Azure Key Vault) instead.
spark.conf.set("fs.azure.account.key.xx.dfs.core.windows.net", "xx")

# Only delta-log files modified after this timestamp are processed.
# Advance it every time the script runs.
offset = datetime(2024, 1, 1, 9, 0, 0)

pathDeltaLog = "abfss://xx@xx.dfs.core.windows.net/deltaTablePartitioned1/_delta_log/"
dataset = "xx"    # semantic model that is inspected and refreshed below
workspace = "xx"  # workspace hosting that semantic model

In [None]:
# Placeholder from the original notebook; not referenced by the visible cells.
listOfPartsToAdd = ()

emptyRDD = spark.sparkContext.emptyRDD()

# Subset of the Delta transaction-log "add"/"remove" action schema that this
# notebook needs. It seeds an empty DataFrame so that the "add" and "remove"
# columns always exist, even when no new commit file is found or the new
# commits contain neither action.
partition_values_type = StructType([StructField("cca2", StringType())])
tags_type = StructType([
    StructField("INSERTION_TIME", StringType()),
    StructField("MAX_INSERTION_TIME", StringType()),
    StructField("MIN_INSERTION_TIME", StringType()),
    StructField("OPTIMIZE_TARGET_SIZE", StringType())
])

main_schema = StructType([
    StructField("dataChange", BooleanType()),
    StructField("modificationTime", LongType()),
    StructField("partitionValues", partition_values_type),
    StructField("path", StringType()),
    StructField("size", LongType()),
    StructField("stats", StringType()),
    StructField("tags", tags_type)
])

main_main_schema = StructType([StructField("add", main_schema), StructField("remove", main_schema)])

# Create the initial empty Spark DataFrame.
dfJsons = spark.createDataFrame(emptyRDD, main_main_schema)

# Collect the delta-log commit files modified after `offset` (update `offset`
# every time the script runs). Matching the ".json" suffix - instead of a
# substring test - skips files that merely contain ".json" in their name
# (e.g. names ending in ".crc" or ".tmp"). Log-compaction files
# ("*.compacted.json") are skipped explicitly.
# NOTE(review): fromtimestamp() yields driver-local time and `offset` is naive;
# both are assumed to be in the same time zone - confirm.
new_log_files = []
for fileX in mssparkutils.fs.ls(pathDeltaLog):
    modified_at = datetime.fromtimestamp(fileX.modifyTime / 1000)
    if modified_at > offset and fileX.name.endswith(".json") and "compacted" not in fileX.name:
        print("File: " + fileX.name + " ModidiedTimestamp: " + str(modified_at))
        new_log_files.append(pathDeltaLog + fileX.name)

# Read all new commits in a single pass instead of chaining one union per file,
# which grows the query plan with every commit. With no new files there is
# nothing to read, so the empty seed DataFrame is kept as-is.
if new_log_files:
    dfJsons = dfJsons.unionByName(spark.read.json(new_log_files), allowMissingColumns=True)

# Consider partitions that had either additions or removals of files. Each log
# line carries a single action, so the partition value is taken from "add" when
# it is present and from "remove" otherwise.
PartsToLoad = dfJsons.select("add", "remove").filter("add is not null or remove is not null")
PartsToLoad = PartsToLoad.withColumn(
    "partitionToRefresh",
    when(isnull(F.col("add")), F.col("remove.partitionValues.cca2"))
    .otherwise(F.col("add.partitionValues.cca2")),
)

# Distinct changed partition values, as a pandas DataFrame. Null values are
# dropped: they cannot be matched to a named semantic-model partition and would
# otherwise surface downstream as a partition literally named "None".
PartsToLoad1 = (
    PartsToLoad.select("partitionToRefresh")
    .filter(F.col("partitionToRefresh").isNotNull())
    .distinct()
    .toPandas()
)

In [None]:
import pandas as pd
import json
import sempy.fabric as fabric 
from functools import lru_cache

# Important: refresh the TOM cache so the cells below read the latest version of
# the semantic model. If the semantic model is changed while this notebook is in
# use, run this again to refresh the cache.
fabric.refresh_tom_cache(workspace)

#credit to Sandeep Pawar: https://fabric.guru/refreshing-individual-tables-and-partitions-with-semantic-link
def get_partition_refreshes(dataset, workspace):
    """List every partition defined in a semantic model.

    The model definition is fetched as TMSL JSON via ``fabric.get_tmsl`` and the
    ``model.tables[*].partitions[*]`` entries are flattened into rows.

    Args:
        dataset: Semantic model identifier passed through to ``fabric.get_tmsl``.
        workspace: Workspace identifier passed through to ``fabric.get_tmsl``.

    Returns:
        pandas.DataFrame with columns ``table_name`` and ``partition_name``,
        one row per partition.
    """
    model_definition = json.loads(
        fabric.get_tmsl(dataset=dataset, workspace=workspace)
    )

    # One record per partition (fields prefixed with "partition_"); the owning
    # table's name and the model name are carried along as metadata columns.
    flattened = pd.json_normalize(
        model_definition,
        record_path=["model", "tables", "partitions"],
        meta=[["model", "name"], ["model", "tables", "name"]],
        errors="ignore",
        record_prefix="partition_",
    )

    renamed = flattened.rename(columns={"model.tables.name": "table_name"})
    return renamed[["table_name", "partition_name"]]

# Existing partitions of the semantic model: one row per (table_name, partition_name).
df = get_partition_refreshes(dataset=dataset, workspace=workspace)
display(df)

In [None]:
#Left join - partitions that exist in source (delta table) and NOT in target (power bi semantic model).
# Rows with no matching partition_name in the model come back with a null table_name.
partsToAddViaTMSL = PartsToLoad1.merge(df, left_on="partitionToRefresh", right_on="partition_name", how="left")
# The boolean-mask filter keeps the original index labels (e.g. [2, 5]);
# reset them to 0..n-1 so positional loops such as
# `.loc[i]` for `i in range(len(partsToAddViaTMSL1))` address the intended rows.
partsToAddViaTMSL1 = partsToAddViaTMSL[partsToAddViaTMSL["table_name"].isnull()].reset_index(drop=True)
partsToAddViaTMSL1

In [None]:
#OPTIONAL - Creating missing partitions programmatically.
# For every partition value that exists in the source Delta table but not in the
# semantic model, send a TMSL "create" command that adds a partition whose M query
# reads the Delta table filtered to that single cca2 value.

# Target of the TMSL "create" command (same values as before, now named).
# NOTE(review): TARGET_DATABASE presumably should match `dataset` - confirm.
TARGET_DATABASE = "partitionedDelta"
TARGET_TABLE = "Query1"  # replace by your table name

# M query for a new partition; "{part}" is replaced by the escaped partition value.
M_EXPRESSION_TEMPLATE = (
    'let\n'
    ' Source = let\n'
    ' Source = AzureStorage.DataLake("https://xx.dfs.core.windows.net/xx/deltaTablePartitioned1",[HierarchicalNavigation = true])\n'
    ',ToDelta = DeltaLake.Table(Source),\n'
    '#"Filtered Rows" = Table.SelectRows(ToDelta, each [cca2] = "{part}")\n'
    'in\n'
    ' #"Filtered Rows"\n'
    ' in\n'
    ' Source'
)

# Iterate over the values themselves: the frame comes from a boolean-mask filter,
# so its index labels are not guaranteed to be 0..n-1 and `.loc[i]` with a
# positional counter could raise KeyError or read the wrong row.
for partition_value in partsToAddViaTMSL1["partitionToRefresh"]:
    print(partition_value)
    # Escape for an M text literal: '"' is written as '""', and "#(" would
    # otherwise start an M escape sequence ("#(#)" is a literal '#').
    m_text_value = str(partition_value).replace("#(", "#(#)(").replace('"', '""')
    data = {
        "create": {
            "parentObject": {
                "database": TARGET_DATABASE,
                "table": TARGET_TABLE,
            },
            "partition": {
                # The new partition is named after the source partition value.
                "name": str(partition_value),
                "source": {
                    "type": "m",
                    "expression": M_EXPRESSION_TEMPLATE.format(part=m_text_value),
                },
            },
        }
    }
    print(data)
    fabric.execute_tmsl(workspace=workspace, refresh_tom_cache=True, script=data)

In [None]:
# Inner join: re-read the model's partitions (so partitions created by the
# optional cell above are presumably visible) and keep only those whose name
# matches a partition value that changed in the Delta table.
# Note: a partition name that occurs in several tables yields one row per table.
df = get_partition_refreshes(dataset=dataset, workspace=workspace)
dfFinal = pd.merge(
    df,
    PartsToLoad1,
    how="inner",
    left_on="partition_name",
    right_on="partitionToRefresh",
)
display(dfFinal)

In [None]:
# Payload for fabric.refresh_dataset(objects=...): one
# {"table": <table name>, "partition": <partition name>} entry per matched row.
objects_to_refresh = [
    {"table": table_name, "partition": partition_value}
    for table_name, partition_value in zip(dfFinal["table_name"], dfFinal["partitionToRefresh"])
]

objects_to_refresh

In [None]:
# Refresh only the partitions collected above. When no partition changed, the
# refresh call is skipped: there is nothing to do, and an empty `objects` list
# may be interpreted as "refresh the whole model" (TODO confirm sempy behavior).
fabric.refresh_tom_cache(workspace)
if objects_to_refresh:
    fabric.refresh_dataset(
        workspace=workspace,
        dataset=dataset,
        objects=objects_to_refresh,
        verbose=1,
        refresh_type="full"
    )
else:
    print("No changed partitions found - skipping semantic model refresh.")

# List the refresh requests
fabric.list_refresh_requests(dataset=dataset, workspace=workspace)
