In [27]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from functools import reduce
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import to_timestamp, from_utc_timestamp

# Notebook-level flag. No cell visible in this notebook reads it; presumably it toggles
# cleanup of previously written tables — TODO confirm or remove.
remove_tables=False

In [28]:
from azure.storage.blob import BlobClient, BlobServiceClient
from azure.identity import ClientSecretCredential

def read_from_file(blob_name):
    """Download one blob from the configured storage container and return its content.

    Connection details come from the ``STORAGE_ACCOUNT_CONNECTION_STRING`` and
    ``STORAGE_ACCOUNT_CONTAINER_NAME`` environment variables.

    Args:
        blob_name: Name (path) of the blob inside the container.

    Returns:
        Whatever ``download_blob().readall()`` returns, or ``None`` when the download raises.

    Raises:
        ValueError: If either environment variable is unset or empty.
    """
    # Imported locally because none of the notebook's import cells import `os`.
    import os

    connection_string = os.getenv("STORAGE_ACCOUNT_CONNECTION_STRING")
    container_name = os.getenv("STORAGE_ACCOUNT_CONTAINER_NAME")
    if not connection_string or not container_name:
        # Fail with a clear message instead of an opaque SDK error on a None argument.
        raise ValueError(
            "STORAGE_ACCOUNT_CONNECTION_STRING and STORAGE_ACCOUNT_CONTAINER_NAME must be set"
        )

    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    try:
        return blob_client.download_blob().readall()
    except Exception as e:
        # Best-effort read: every download failure (not only a missing blob) is
        # reported with this message and mapped to None.
        print(f"Blob not found: {e}")
        return None

In [29]:
def list_blobs_in_path(path):
    """Return the names of all blobs in the configured container whose name starts with `path`.

    The client is built inside the function from the ``STORAGE_ACCOUNT_CONNECTION_STRING``
    and ``STORAGE_ACCOUNT_CONTAINER_NAME`` environment variables, so the function does not
    depend on globals left behind by other cells.

    Args:
        path: Blob-name prefix to list, e.g. ``"stage/catalog/scans/2025/07/15/"``.

    Returns:
        A list of blob names (strings); empty when nothing matches the prefix.

    Raises:
        ValueError: If either environment variable is unset or empty.
    """
    # Imported locally because none of the notebook's import cells import `os`.
    import os

    connection_string = os.getenv("STORAGE_ACCOUNT_CONNECTION_STRING")
    container_name = os.getenv("STORAGE_ACCOUNT_CONTAINER_NAME")
    if not connection_string or not container_name:
        raise ValueError(
            "STORAGE_ACCOUNT_CONNECTION_STRING and STORAGE_ACCOUNT_CONTAINER_NAME must be set"
        )

    container_client = BlobServiceClient.from_connection_string(
        connection_string
    ).get_container_client(container_name)
    return [blob.name for blob in container_client.list_blobs(name_starts_with=path)]

# Example usage: list the scan-result blobs stored under one day's folder.
scan_day_prefix = "stage/catalog/scans/2025/07/15/"
blobs_in_path = list_blobs_in_path(scan_day_prefix)
print(blobs_in_path)



['stage/catalog/scans/2025/07/15/20250715_00001.scanResults.json']


In [31]:
# Download the first blob returned by the listing above.
if not blobs_in_path:
    # Clear failure instead of an IndexError on an empty listing.
    raise ValueError("No blobs were listed; nothing to download")

blobs = read_from_file(blobs_in_path[0])
if blobs is None:
    # read_from_file signals a failed download by returning None; stop here rather
    # than failing later when the content is decoded.
    raise RuntimeError(f"Could not download blob: {blobs_in_path[0]}")


In [45]:
import io
import json
import pandas as pd
from pyspark.sql import SparkSession

# Decode the downloaded blob content and parse it as JSON.
json_str = blobs.decode("utf-8")
json_obj = json.loads(json_str)

# Reduce the payload to a list of workspace records.
# A dict payload carries them under the "workspaces" key; a list payload is taken to be
# the records themselves (NOTE(review): confirm that a bare list really is a list of
# workspaces). Both branches now produce `final_df`, so the rename below never hits an
# undefined name.
if isinstance(json_obj, list):
    workspace_records = json_obj
else:
    workspace_records = json_obj["workspaces"]

# One json_normalize call over all records replaces the per-record normalize + concat loop.
final_df = pd.json_normalize(workspace_records).rename(columns={"id": "workspaceId"})
display(final_df.head(2))


Unnamed: 0,workspaceId,name,description,type,state,isOnDedicatedCapacity,capacityId,defaultDatasetStorageFormat,reports,dashboards,...,Reflex,Notebook,warehouses,SQLAnalyticsEndpoint,MLExperiment,MLModel,Eventstream,KQLDashboard,KQLDatabase,Eventhouse
0,a13a9f8a-0469-4efb-96c5-0be69ba5e83b,fill-in-Something-New,Admin workspace for admin reports,Workspace,Active,True,98E82059-755F-4163-AC49-B21C5E9243D3,Unknown,[],[],...,,,,,,,,,,
1,cb79cb87-3582-4e2c-bca3-fbd68f68d2da,fill-in-FabricMonitor,,Workspace,Active,True,98E82059-755F-4163-AC49-B21C5E9243D3,Small,"[{'reportType': 'PowerBIReport', 'id': '0bde31...",[],...,[{'id': '1b5f317a-637f-49f2-97a1-fda1996a2b2a'...,[{'id': 'e9d24c5b-c4bf-4642-8722-99e63727ed45'...,[{'id': '14f359f5-7d2d-42e0-9299-e74646bc7539'...,[{'id': 'd7d5d78b-18da-439b-a8a4-231c791f381f'...,,,,,,


In [None]:
import inspect


# Helper for flattening a nested column (for example 'datasets', which holds JSON/list data)
# into a new DataFrame.
# Pass the name of the column you want to flatten together with the parent id column
# that should be carried along.
def flatten_json_column(df, json_col, parent_id_col):
    """
    Expand a list-of-dicts column into one row per dict, tagged with the parent id.

    Returns an empty DataFrame when either column is missing from `df` or when no
    non-null element remains after exploding `json_col`.
    """
    available = df.columns
    if json_col not in available or parent_id_col not in available:
        return pd.DataFrame()

    pairs = df.reset_index(drop=True)[[json_col, parent_id_col]]
    rows = pairs.explode(json_col)
    # Empty lists and missing values explode to nulls; they carry no child record.
    rows = rows.loc[rows[json_col].notna()]
    if rows.empty:
        return pd.DataFrame()

    flat = pd.json_normalize(rows[json_col])
    # Positional assignment: `rows` and `flat` hold the same records in the same order.
    flat[parent_id_col] = rows[parent_id_col].values
    return flat

# # Example usage:
# # normalized = flatten_json_column(final_df, json_col, 'workspaceId')
# # display(normalized.head(2))
# # Explode the column if it's a list of dicts
# if json_col in final_df.columns:
#     final_df = final_df.reset_index(drop=True)
#     exploded = final_df[[json_col, 'workspaceId']].explode(json_col)
#     exploded = exploded[exploded[json_col].notna()]
#     if not exploded.empty:
#         normalized = pd.json_normalize(exploded[json_col])
#         normalized['workspaceId'] = exploded['workspaceId'].values
#         display(normalized.head(2))


In [50]:
# Flatten workspaces -> datasets first: `datasets` must exist before its 'tables'
# column can be flattened (it was previously only defined in a commented-out line).
datasets = flatten_json_column(final_df, 'datasets', 'workspaceId')
tables = flatten_json_column(datasets, 'tables', 'id')
display(tables.head(2))

Unnamed: 0,name,isHidden,storageMode,columns,measures,source,id
0,tenant,False,DirectLake,"[{'name': 'canSpecifySecurityGroups', 'dataTyp...",[],"[{'expression': 'tenant', 'schemaName': 'monit...",6e083520-f9d6-47b5-b289-3bc95739d7f4
1,activity,False,DirectLake,"[{'name': 'Activity', 'dataType': 'String', 'i...",[],"[{'expression': 'activity', 'schemaName': 'dbo'}]",6e083520-f9d6-47b5-b289-3bc95739d7f4


In [53]:
import sempy.fabric as fabric
import pandas as pd

# List datasets in your workspace
df_datasets = fabric.list_datasets()
display(df_datasets)

# Replace with your dataset name
dataset_name = "fill-in-Microsoft Fabric Capacity Metrics"

# List tables in the dataset
df_tables = fabric.list_tables(dataset_name, include_columns=True)
display(df_tables)

# read_table returns a pandas-based FabricDataFrame, which has no Spark-style `.show()`;
# preview the first rows with `.head()` instead.
df = fabric.read_table(dataset=dataset_name, table="MetricsByItemandOperationandDay")
display(df.head(5))


log_telemetry: sempy.relationships
log_telemetry: sempy.dependencies


{'func': 'sempy.fabric._flat.list_datasets', 'total_seconds': 0.143}
Traceback (most recent call last):
  File "/home/brandon/miniconda3/envs/nbks/lib/python3.12/site-packages/sempy/_utils/_log.py", line 371, in log_decorator_wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/brandon/miniconda3/envs/nbks/lib/python3.12/site-packages/sempy/fabric/_flat.py", line 203, in list_datasets
    return _get_or_create_workspace_client(workspace).get_datasets(mode, additional_xmla_properties)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/brandon/miniconda3/envs/nbks/lib/python3.12/site-packages/sempy/fabric/_cache.py", line 32, in _get_or_create_workspace_client
    client = WorkspaceClient(workspace)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/brandon/miniconda3/envs/nbks/lib/python3.12/site-packages/sempy/fabric/_client/_workspace_client.py", line 62, in __init__
    _init_analysis_services()
  File "/home/brandon/minicond

log_telemetry: sempy.fabric
log_telemetry: sempy


RuntimeError: Can not determine dotnet root

In [None]:
# Read every scan-result file and remember which file each row came from.
tmp = (
    spark.read.option("multiline", "true")
    .json("Files/stage/catalog/scans/*/*/*/*.json")
    .withColumn("file_Name", input_file_name())
)

# The glob layout is .../<year>/<month>/<day>/<file>.json. Index the path segments from
# the END (element_at accepts negative positions) so the date parts do not depend on how
# many segments the storage URI has before "Files".
scan_path_parts = split(col("file_Name"), "/")
tmp = (
    tmp.withColumn("ts_year", element_at(scan_path_parts, -4))
    .withColumn("ts_month", element_at(scan_path_parts, -3))
    .withColumn("ts_day", element_at(scan_path_parts, -2))
)


In [None]:
# One row per workspace: explode the `workspaces` array of each scan file.
# `datasourceInstances` is dropped by NAME, which is a no-op when a scan file set does
# not contain that column (a Column reference would raise instead).
df_workspaces = (
    tmp.withColumn("workspaces", explode(col("workspaces")))
    .drop("datasourceInstances")
)

In [None]:
def extract_value(string):
    """Return the text between the first ':' and the first '(' of `string`, stripped.

    For a schema tree line such as ``" |-- name: string (nullable = true)"`` this is the
    type name (``"string"``).
    """
    colon_at = string.find(':')
    paren_at = string.find('(')
    return string[colon_at + 1:paren_at].strip()

def extract_name(string):
    """Return the field name from one line of a schema tree string.

    Example: ``" |    |-- id: string (nullable = true)"`` -> ``"id"``.

    The name is the text between the ``--`` marker and the first ``:``, stripped of
    whitespace. When the line has no ``--`` marker the text before the first ``:`` is used.
    Lines without a ``:`` are not field lines and do not yield a meaningful name.
    """
    marker = string.find('--')
    # Skip BOTH dashes of the marker; an offset of +1 left a stray '-' in the result.
    start = marker + 2 if marker != -1 else 0
    end = string.find(':')
    return string[start:end].strip()

In [None]:
def get_meta(string):
    """Return ``{field_name: type_text}`` parsed from one schema tree line.

    The type is the stripped text between the first ':' and the first '('. The field name
    is the text between the '--' marker and the first ':', reduced to whatever follows its
    last '-' and stripped.
    """
    type_text = string[string.find(':') + 1:string.find('(')].strip()

    raw_name = string[string.find('--') + 1:string.find(':')]
    field_name = raw_name.split("-")[-1].strip()

    return {field_name: type_text}

In [None]:
def build_df(df):
    """Summarise the first two nesting levels of a DataFrame's schema.

    The schema is read from its printed tree form (``df._jdf.schema().treeString()``) and
    every line is classified by how many '|' separated parts it has: 2 parts is a top-level
    column, 3 parts is a field one level below.

    Returns:
        A tuple ``(arrays, fields, meta)``:
        * arrays - names of array-typed fields found one level below the top;
        * fields - names of fields on either level whose type is neither struct nor array;
        * meta   - mapping of each name in `fields` to its type text.
    """
    schema_lines = df._jdf.schema().treeString().split("\n")

    arrays = []
    fields = []
    meta = {}

    for line in schema_lines:
        depth = len(line.split(sep="|"))
        typ = extract_value(line)
        name = extract_name(line).split("-")[-1].strip()

        # Only the top level (2 parts) and the level below it (3 parts) are inspected.
        if depth not in (2, 3):
            continue

        if typ not in ('struct', 'array'):
            fields.append(name)
            meta[name] = get_meta(line)[name]
        elif depth == 3 and typ == 'array':
            # Arrays are only collected one level below the top; structs are skipped.
            arrays.append(name)

    return (arrays, fields, meta)



In [None]:
from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    """Return the dotted paths of all leaf fields in `schema`.

    Struct fields are descended into recursively. An array field is treated as its element
    type, so an array of structs contributes the paths of the struct's fields. `prefix`,
    when given, is prepended to every path with a '.' separator.
    """
    paths = []
    for field in schema.fields:
        path = f"{prefix}.{field.name}" if prefix else field.name

        dtype = field.dataType
        inner = dtype.elementType if isinstance(dtype, ArrayType) else dtype

        if isinstance(inner, StructType):
            paths.extend(flatten(inner, prefix=path))
        else:
            paths.append(path)

    return paths


In [None]:
def find_duplicates(lst):
    """Map every value occurring more than once in `lst` to the list of its positions.

    Values that occur only once are omitted; keys keep first-occurrence order.
    """
    positions = {}
    for index, value in enumerate(lst):
        positions.setdefault(value, []).append(index)
    return {value: where for value, where in positions.items() if len(where) > 1}


In [None]:
from pyspark.sql.types import ( 
    StringType, BooleanType, IntegerType, FloatType, DateType, LongType, TimestampType, BinaryType, DoubleType 
) 
  

# Lookup from a lower-case type name to an instance of the matching PySpark data type.
dataTypes = {
    "string": StringType(),
    "integer": IntegerType(),
    "long": LongType(),
    "float": FloatType(),
    "double": DoubleType(),
    "boolean": BooleanType(),
    "date": DateType(),
    "timestamp": TimestampType(),
    "binary": BinaryType(),
}

In [None]:
# Sub-folder under Files/ that the flattening cell writes its parquet output to.
test = "bronze/catalog"
# Sub-folder under Files/ that the parquet-to-delta cell reads from.
# NOTE(review): differs from `test`, and no cell visible here copies bronze to Silver — confirm.
nxt = "Silver/catalog"
# When False, the per-item / sub-item parquet writes guarded by this flag are skipped.
save_df = True

In [None]:
# Names of the array-typed fields of the `workspaces` struct (reports, datasets, ...).
# They are read from the typed schema rather than by parsing the printed schema tree via
# the private `_jdf` handle, and only the `workspaces` struct is inspected because that is
# the only struct the flattening loop reads these names from.
# `ArrayType` comes from the import that accompanies the `flatten` helper.
workspaces_struct = df_workspaces.schema["workspaces"].dataType
subitems = [
    field.name
    for field in workspaces_struct.fields
    if isinstance(field.dataType, ArrayType)
]

In [None]:
# One row per dataset: explode the datasets array of every workspace, then discard the
# workspace struct itself.
datasets = (
    df_workspaces
    .withColumn("datasets", explode(df_workspaces["workspaces"]["datasets"]))
    .drop("workspaces")
)

In [None]:
import random
import pandas as pd 


def _rename_duplicate_columns(frame, prefix):
    """Return `frame` with repeated column names disambiguated by position.

    The first occurrence of a name is kept; each later occurrence becomes
    ``f"{prefix}_{name}"``. ``DataFrame.toDF`` assigns names positionally, so this works
    even though duplicated names cannot be addressed individually by name, and it avoids
    collecting the data to pandas and back.
    """
    seen = set()
    renamed = []
    for column in frame.columns:
        renamed.append(f"{prefix}_{column}" if column in seen else column)
        seen.add(column)
    return frame.toDF(*renamed)


# ---------------------------------------------------------------------------
# Workspaces: promote the scalar fields of the `workspaces` struct to columns.
# ---------------------------------------------------------------------------
workspace = df_workspaces
workspace_fields = build_df(workspace)[1]

for field in workspace_fields:
    if field not in ["file_Name", "ts_year", "ts_month", "ts_day"]:
        workspace = workspace.withColumn(field, workspace['workspaces'][field])

workspace = workspace.drop("workspaces")
# NOTE(review): this write ignores `save_df` and appends, while the per-item write below
# overwrites — confirm both are intended.
workspace.write.format("parquet").mode("append").save(f"Files/{test}/workspaces")

# ---------------------------------------------------------------------------
# Workspace items (one folder per array field of the workspace), their nested
# arrays (sub items) and the arrays nested inside those (sub-sub items).
# ---------------------------------------------------------------------------
for item in subitems:
    print(f"Working on item: {item}")

    # One row per element of the item array; the element struct is kept in column "dp".
    dp = (
        df_workspaces
        .withColumn("dp", explode(df_workspaces["workspaces"][item]))
        .withColumn("workspaceId", df_workspaces["workspaces"]["id"])
        .drop("workspaces")
    )

    item_arrays, item_fields, item_types = build_df(dp)

    # Promote the scalar fields of the element struct to top-level columns.
    for field in item_fields:
        if field not in dp.columns:
            dp = dp.withColumn(field, dp['dp'][field].cast(item_types[field]))

    # Dataflows are keyed by `objectid`, every other item by `id`. Compared with equality:
    # the former substring test (`item in "dataflows"`) would also match partial names.
    source_key = "objectid" if item == "dataflows" else "id"
    key_col = f"{item}{source_key}"
    dp = dp.withColumnRenamed(source_key, key_col)
    dp = dp.withColumn(
        "UID",
        concat(col(key_col), lit("-"), col("ts_year"), col("ts_month"), col("ts_day")),
    )

    if save_df:
        print(f"saving...Files/{test}/{item}")
        dp.drop("dp").write.format("parquet").mode("overwrite").save(f"Files/{test}/{item}")

    # Build one DataFrame per array nested inside the item.
    dataframes = dict()
    for array in item_arrays:
        print(f"what array are we using:{array}")

        child = (
            dp.withColumn(array, explode(dp["dp"][array]))
            .select(key_col, "ts_year", "ts_month", "ts_day", array)
            .withColumn(
                f"{item}_UID",
                concat(col(key_col), lit("-"), col("ts_year"), col("ts_month"), col("ts_day")),
            )
        )
        if array in ["roles", "tables"]:
            # Roles and tables additionally get their own UID built from the element name.
            child = child.withColumn(
                f"{array}_UID", concat(col(f"{item}_UID"), lit("-"), child[array]["name"])
            )
        dataframes[array] = child

        # Arrays nested one level further down (sub-sub items).
        nested_arrays = build_df(child)[0]
        print("what is the extra", nested_arrays)

        for array2 in nested_arrays:
            tt = child.withColumn(array2, explode(child[array][array2]))
            tt = tt.withColumn(f"{array}_UID", concat(f"{item}_UID", lit("-"), tt[array]["name"]))
            tt = tt.drop(array)
            tt = tt.select(flatten(tt.schema))

            # Some child fields share a name with a parent column; rename the later
            # occurrences before writing.
            if find_duplicates(tt.columns):
                tt = _rename_duplicate_columns(tt, array2)

            if save_df:
                target = f"Files/{test}/{item}/{array}/{array2}"
                print(f"saving...sub sub-items...{target}")
                # "append" creates the folder when it does not exist yet, so no existence
                # probe is needed. The old probe assigned its result to `item`, which
                # corrupted every path built afterwards, and the write ran once per
                # renamed duplicate instead of once per frame.
                tt.write.format("parquet").mode("append").save(target)

    # Promote the scalar fields of every sub item and persist it.
    for key, child in dataframes.items():
        print(f"what is the key {key}")
        for field in build_df(child)[1]:
            if field not in child.columns:
                child = child.withColumn(field, child[key][field])

        child = child.drop(key)
        if save_df:
            print(f"saving...sub items...Files/{test}/{item}/{key}")
            child.write.format("parquet").mode("append").save(f"Files/{test}/{item}/{key}")

In [None]:
def _is_parquet_artifact(entry_name):
    """True for entries that are skipped: the _SUCCESS marker and names ending in 'parquet'."""
    return "_SUCCESS" in entry_name or entry_name.endswith("parquet")


def _parquet_folder_to_delta(folder, table):
    """Load the parquet data under `folder` and overwrite the delta table at `table`."""
    (
        spark.read.format("parquet").load(folder)
        .write.format("delta").mode("overwrite").option("overwriteSchema", "true")
        .save(table)
    )


# Walk three folder levels below Files/{nxt}. Every folder is converted to a delta table
# named after its path components; a failed conversion is reported and the walk continues.
for top in mssparkutils.fs.ls(f"Files/{nxt}"):
    if _is_parquet_artifact(top.name):
        continue
    try:
        _parquet_folder_to_delta(f"Files/{nxt}/{top.name}", f"Tables/catalog_{top.name}")
    except Exception as top_error:
        print(f"{top.name} Part 1 threw an error {top_error} for {top.name}")

    for mid in mssparkutils.fs.ls(f"Files/{nxt}/{top.name}"):
        if _is_parquet_artifact(mid.name):
            continue
        try:
            _parquet_folder_to_delta(
                f"Files/{nxt}/{top.name}/{mid.name}",
                f"Tables/catalog_{top.name}_{mid.name}",
            )
        except Exception as mid_error:
            print(f"{mid.name} threw an error {mid_error}")

        for leaf in mssparkutils.fs.ls(f"Files/{nxt}/{top.name}/{mid.name}"):
            if _is_parquet_artifact(leaf.name):
                continue
            try:
                _parquet_folder_to_delta(
                    f"Files/{nxt}/{top.name}/{mid.name}/{leaf.name}",
                    f"Tables/catalog_{top.name}_{mid.name}_{leaf.name}",
                )
            except Exception as leaf_error:
                print(f"{leaf.name} threw an error {leaf_error}")



In [None]:
# Read every dataset-refresh file and remember which file each row came from.
dr = (
    spark.read.option("multiline", "true")
    .json("Files/stage/datasetrefresh/*/*/*/*.json")
    .withColumn("file_Name", input_file_name())
)

# The glob layout is .../<year>/<month>/<day>/<file>.json. Index the path segments from
# the END (element_at accepts negative positions) so the date parts do not depend on how
# many segments the storage URI has before "Files".
refresh_path_parts = split(col("file_Name"), "/")
dr = (
    dr.withColumn("ts_year", element_at(refresh_path_parts, -4))
    .withColumn("ts_month", element_at(refresh_path_parts, -3))
    .withColumn("ts_day", element_at(refresh_path_parts, -2))
)
    

In [None]:
# One row per refresh attempt of every dataset refresh.
r = (
    dr.select("dataset_id", "refreshAttempts", "ts_year", "ts_month", "ts_day")
    .withColumn("refreshAttemptsA", explode(col("refreshAttempts")))
)

# Promote the attempt's fields to columns and add the dataset/day key.
attempt = col("refreshAttemptsA")
r = (
    r.withColumn("startTime", attempt["startTime"])
    .withColumn("endTime", attempt["endTime"])
    .withColumn("attemptId", attempt["attemptId"])
    .withColumn("type", attempt["type"])
    .withColumn("serviceExceptionJson", attempt["serviceExceptionJson"])
    .withColumn(
        "dataset_UID",
        concat(col("dataset_id"), lit("-"), col("ts_year"), col("ts_month"), col("ts_day")),
    )
    .drop("refreshAttempts", "refreshAttemptsA")
)

# Parse the exception payload as JSON. Splitting the text on ',' and ':' kept the
# surrounding quotes and cut descriptions at their first comma or colon.
# NOTE(review): assumes the payload keys are "errorCode" and "errorDescription", matching
# the column names used here — confirm against a sample payload.
r = (
    r.withColumn("errorCode", get_json_object(col("serviceExceptionJson"), "$.errorCode"))
    .withColumn("errorDescription", get_json_object(col("serviceExceptionJson"), "$.errorDescription"))
    .drop("serviceExceptionJson")
)

cols = ["startTime", "endTime"]

for c in cols:
    r = r.withColumn(f"{c}_UTC", from_utc_timestamp(c, "UTC"))


r.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("Tables/datasetRefresh_attempts")

In [None]:
from pyspark.sql.types import StringType

# Add the dataset/day key and discard the columns that are not part of this table.
dr = (
    dr.withColumn(
        "dataset_UID",
        concat(col("dataset_id"), lit("-"), col("ts_year"), col("ts_month"), col("ts_day")),
    )
    .drop("refreshAttempts", "serviceExceptionJson")
)

# Derive a <name>_UTC column from each of the two time columns.
cols = ["startTime", "endTime"]
for c in cols:
    dr = dr.withColumn(f"{c}_UTC", from_utc_timestamp(c, "UTC"))

# startDate is the yyyyMMdd form of the start timestamp; startTime is replaced by its
# 12-hour clock rendering.
dr = (
    dr.withColumn("startDate", date_format(col("startTime_UTC"), "yyyyMMdd"))
    .withColumn("startTime", date_format(col("startTime_UTC"), "h:mm:ss a"))
)

(
    dr.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/datasetRefresh")
)
