In [1]:
# Reference https://fabric.guru/programmatically-creating-managing-lakehouses-in-fabric

import requests
import pandas as pd
from requests.exceptions import HTTPError

def get_lakehouse_list_api():

    '''
    Sandeep Pawar  |   fabric.guru

    List the Lakehouse items returned by the Fabric admin REST API.

    Calls GET /v1/admin/items?type=Lakehouse with a bearer token obtained from
    mssparkutils, and keeps following the 'continuationUri' returned with each
    page so the result is not limited to the first page.

    Returns:
        pandas.DataFrame with one row per item (nested fields flattened with
        '_' as separator). If the request fails or a response has no
        'itemEntities' key, the error is printed and None is returned.
    '''
    base_url = "https://api.fabric.microsoft.com/v1/admin/items?type=Lakehouse"
    token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com/")
    headers = {"Authorization": f"Bearer {token}"}

    try:
        item_entities = []
        next_url = base_url

        while next_url:
            # A timeout keeps the notebook from hanging forever on a stalled connection.
            response = requests.get(next_url, headers=headers, timeout=60)
            response.raise_for_status()

            data = response.json()

            # Check if 'itemEntities' exists, may not exist for all items types
            if 'itemEntities' not in data:
                raise KeyError("'itemEntities' key not found in the response data")

            item_entities.extend(data['itemEntities'])

            # Absent/empty on the last page, which ends the loop.
            next_url = data.get('continuationUri')

        lakehouses = pd.json_normalize(item_entities, sep='_')
        return lakehouses

    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
    except Exception as err:
        print(f"An error occurred: {err}")

df_Lakehouses = get_lakehouse_list_api()

# get_lakehouse_list_api() prints the error and returns None when the API call
# fails. Stop here with a clear message instead of letting a later cell fail
# with an AttributeError on df_Lakehouses.copy().
if df_Lakehouses is None:
    raise RuntimeError(
        "Could not retrieve the Lakehouse list from the Fabric admin API; "
        "see the error printed above."
    )
#display(df_Lakehouses)


StatementMeta(, 8d7fd7aa-c188-42f3-9836-b01257d95e1b, 3, Finished, Available, Finished)

In [2]:
# Code generated by Data Wrangler for pandas DataFrame
# Get Lakehouse ID, Lakehouse Name and Workspace ID
# Exclude Dataflows Staging

def clean_data(df_Lakehouses):
    '''
    Drop the Dataflows staging entries from the lakehouse listing.

    Rows whose 'name' contains the literal text "DataflowsStaging" are removed;
    rows with a missing name are kept. A filtered DataFrame is returned and the
    frame passed in is not modified.
    '''
    # Plain substring match (regex disabled); missing names count as "not staging".
    is_staging = df_Lakehouses['name'].str.contains("DataflowsStaging", regex=False, na=False)
    return df_Lakehouses.loc[~is_staging]

# Filter a copy of the API listing; df_Lakehouses itself is left exactly as it
# was returned by get_lakehouse_list_api().
df_Lakehouses_clean = clean_data(df_Lakehouses.copy())
#df_Lakehouses_clean.head()

StatementMeta(, 8d7fd7aa-c188-42f3-9836-b01257d95e1b, 4, Finished, Available, Finished)

In [4]:
# Warehouses
# Reference https://fabric.guru/programmatically-creating-managing-lakehouses-in-fabric

import requests
import pandas as pd
from requests.exceptions import HTTPError

def get_warehouse_list_api():

    '''
    Sandeep Pawar  |   fabric.guru

    List the Warehouse items returned by the Fabric admin REST API.

    Calls GET /v1/admin/items?type=Warehouse with a bearer token obtained from
    mssparkutils, and keeps following the 'continuationUri' returned with each
    page so the result is not limited to the first page.

    Returns:
        pandas.DataFrame with one row per item (nested fields flattened with
        '_' as separator). If the request fails or a response has no
        'itemEntities' key, the error is printed and None is returned.
    '''
    base_url = "https://api.fabric.microsoft.com/v1/admin/items?type=Warehouse"
    token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com/")
    headers = {"Authorization": f"Bearer {token}"}

    try:
        item_entities = []
        next_url = base_url

        while next_url:
            # A timeout keeps the notebook from hanging forever on a stalled connection.
            response = requests.get(next_url, headers=headers, timeout=60)
            response.raise_for_status()

            data = response.json()

            # Check if 'itemEntities' exists, may not exist for all items types
            if 'itemEntities' not in data:
                raise KeyError("'itemEntities' key not found in the response data")

            item_entities.extend(data['itemEntities'])

            # Absent/empty on the last page, which ends the loop.
            next_url = data.get('continuationUri')

        warehouses = pd.json_normalize(item_entities, sep='_')
        return warehouses

    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
    except Exception as err:
        print(f"An error occurred: {err}")

df_Warehouses = get_warehouse_list_api()

# get_warehouse_list_api() prints the error and returns None when the API call
# fails. Stop here with a clear message instead of letting a later cell fail
# with an AttributeError on df_Warehouses.copy().
if df_Warehouses is None:
    raise RuntimeError(
        "Could not retrieve the Warehouse list from the Fabric admin API; "
        "see the error printed above."
    )
#display(df_Warehouses)

StatementMeta(, 2a870d78-2948-4a5b-bc59-32c45802ca9e, 6, Finished, Available, Finished)

In [5]:
# Code generated by Data Wrangler for pandas DataFrame for Warehouses
# Get Warehouse ID, Warehouse Name and Workspace ID
# Exclude Dataflows Staging

def clean_data(df_Warehouses):
    '''
    Drop the Dataflows staging entries from the warehouse listing.

    Rows whose 'name' contains the literal text "DataflowsStaging" are removed;
    rows with a missing name are kept. A filtered DataFrame is returned and the
    frame passed in is not modified.
    '''
    # Plain substring match (regex disabled); missing names count as "not staging".
    is_staging = df_Warehouses['name'].str.contains("DataflowsStaging", regex=False, na=False)
    return df_Warehouses.loc[~is_staging]

# Filter a copy of the API listing; df_Warehouses itself is left exactly as it
# was returned by get_warehouse_list_api().
df_Warehouses_clean = clean_data(df_Warehouses.copy())
#df_Warehouses_clean.head()

StatementMeta(, 2a870d78-2948-4a5b-bc59-32c45802ca9e, 7, Finished, Available, Finished)

In [6]:
import requests
from azure.identity import ClientSecretCredential, AuthenticationRequiredError

def get_access_token(app_id, client_secret, directory_id):
    '''
    Get an Azure Storage access token for the service principal kept in Key Vault.

    NOTE: app_id, client_secret and directory_id are kept in the signature for
    backward compatibility but are NOT used. The tenant id, application id and
    client secret are always read from Azure Key Vault through mssparkutils.

    Args:
        app_id: ignored.
        client_secret: ignored.
        directory_id: ignored.

    Returns:
        (token, credential): the token string issued for the
        "https://storage.azure.com/.default" scope, and the
        ClientSecretCredential that produced it.

    Raises:
        Any exception raised while reading the secrets or requesting the token
        is reported with print() and then re-raised unchanged.
    '''
    try:
        # Create the ClientSecretCredential using the credentials stored in Key Vault.
        # TODO(author): link the blog post that explains how to enable the App
        # Registration with the valid permissions.

        #########################################################################################
        # Read secrets from Azure Key Vault
        #########################################################################################
        ## This is the name of my Azure Key Vault
        key_vault = "https://domain-keyvault.vault.azure.net/"
        ## I have stored my tenant id as one of the secrets to make it easier to use when needed
        tenant = mssparkutils.credentials.getSecret(key_vault , "tenantid")
        ## This is my application Id for my service principal account
        client = mssparkutils.credentials.getSecret(key_vault , "pbi-applicationid")
        ## This is my Client Secret for my service principal account
        clientsecret = mssparkutils.credentials.getSecret(key_vault , "powerbi-clientsecret")

        # TODO(author): show in the permissions where Azure Storage was selected.
        credential = ClientSecretCredential(
            client_id=client,
            client_secret=clientsecret,
            tenant_id=tenant,
        )

        # Use the credential to get the access token
        token = credential.get_token("https://storage.azure.com/.default").token

        return token, credential

    except AuthenticationRequiredError:
        print("Authentication failed. Please check your credentials.")
        # Bare raise keeps the original traceback intact.
        raise

    except Exception as e:
        print("An error occurred while getting the access token:")
        print(str(e))
        raise
    
# NOTE: the three arguments are placeholders. get_access_token() never reads its
# parameters; it loads the tenant id, application id and secret from Key Vault.
access_token, credential = get_access_token("appId", "secret", "tenantId")


def check_connection_with_onelake(access_token, workspace_id="WORKSPACE_GUID", lakehouse_id="LAKEHOUSE_GUID", timeout=30):
    '''
    Smoke-test access to a lakehouse's Files folder on the OneLake DFS endpoint.

    Sends one GET request and prints the outcome; nothing is returned.

    Args:
        access_token: bearer token sent in the Authorization header.
        workspace_id: workspace GUID placed in the URL. The default is the
            placeholder text that used to be hard-coded in the URL.
        lakehouse_id: lakehouse GUID placed in the URL (placeholder default).
        timeout: seconds to wait for the server before giving up, so a stalled
            connection cannot hang the notebook.
    '''
    base_url = f"https://onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}/Files/"
    token_headers = {
        "Authorization": "Bearer " + access_token
    }

    try:
        response = requests.get(base_url, headers=token_headers, timeout=timeout)

        if response.status_code == 200:
            print("Connection with OneLake is successful.")
        else:
            print("Failed to connect with OneLake. Status code:", response.status_code)
            print(response.content)

    # Timeouts are RequestException subclasses, so they are reported here too.
    except requests.exceptions.RequestException as e:
        print("An error occurred while checking the connection:", str(e))

# Smoke test using the 'access_token' returned by get_access_token() earlier in
# this cell; the result is only printed.
check_connection_with_onelake(access_token)

StatementMeta(, 2a870d78-2948-4a5b-bc59-32c45802ca9e, 8, Finished, Available, Finished)

Connection with OneLake is successful.


In [10]:
# Loop Through Data for Files
import datetime
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import col, year, month, quarter
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import lit
import requests
import json
from pandas import json_normalize
from pyspark.sql import types as T 
from pyspark.sql import SparkSession 

# Create a Spark session
# Bind the session first so that every use of `spark` below goes through the
# handle assigned in this cell.
spark = SparkSession.builder.appName("AddColumnExample").getOrCreate()
sparkdf = spark.createDataFrame(df_Lakehouses_clean)
data_collect = sparkdf.collect()

# Loop through each lakehouse and append its Files listing to the inventory table.
for row in data_collect:
    # Variables Below
    var_lakehouseId = row["id"]
    var_lakehouseName = row["name"]
    var_workspaceId = row["workspaceId"]

    try:
        # Get Files Details
        get_url = 'https://onelake.dfs.fabric.microsoft.com/'+ var_workspaceId +'/'+ var_lakehouseId +'/Files?recursive=True&resource=filesystem'
        # display(get_url)
        resp = requests.get(
            get_url,
            headers={"authorization": f"bearer {mssparkutils.credentials.getToken('storage')}"},
            timeout=120,
        )
        # Surface HTTP failures (401/403/404...) instead of trying to parse an error body.
        resp.raise_for_status()
        mydata = resp.json()

        # Nothing to record when the listing is empty or has no 'paths' entry.
        paths = mydata.get('paths', [])
        if not paths:
            continue

        # Convert to Data Frame
        pandas_df = json_normalize(paths)

        # Reference for DateTime Formats https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
        pandas_df['lastModified'] = pd.to_datetime(pandas_df.lastModified, format = "%a, %d %b %Y %H:%M:%S %Z" )

        # Convert to Spark Data Frame
        df = spark.createDataFrame(pandas_df)

        # Additional columns are added
        # NOTE(review): creationTime / 1e9 treats the value as epoch nanoseconds -
        # confirm against the unit the listing API actually returns.
        df = (
            df.withColumn("LakehouseId", lit(var_lakehouseId))
            .withColumn("LakehouseName", lit(var_lakehouseName))
            .withColumn("WorkspaceId", lit(var_workspaceId))
            .withColumn('contentLength', df['contentLength'].cast(T.LongType()))
            .withColumn('lastModified', df['lastModified'].cast(T.TimestampType()))
            .withColumn("creationTime", from_unixtime(df.creationTime / 1e9).cast(T.TimestampType()))
            .withColumn("CurrentDateTime", current_timestamp())
            .withColumn('Year', year(current_timestamp()))
            .withColumn('Month', month(current_timestamp()))
            .withColumn('Day', dayofmonth(current_timestamp()))
        )

        # Writing to Lake House
        df.write.mode("append").format("delta").partitionBy("Year","Month","Day").save("Tables/Onelake_Storage_Files")

    except Exception as e:
        # Report the failure and carry on with the next lakehouse. The previous
        # `Error: {e}` line was a bare annotation and printed nothing.
        print(f"Error processing Files for lakehouse '{var_lakehouseName}' ({var_lakehouseId}): {e}")

StatementMeta(, 2a870d78-2948-4a5b-bc59-32c45802ca9e, 12, Finished, Available, Finished)

In [11]:
# Loop Through Data for Tables
import datetime
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import col, year, month, quarter
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import lit
import requests
import json
from pandas import json_normalize
from pyspark.sql import types as T 
from pyspark.sql import SparkSession 

# Create a Spark session
# Bind the session first so that every use of `spark` below goes through the
# handle assigned in this cell.
spark = SparkSession.builder.appName("AddColumnExample").getOrCreate()
sparkdf = spark.createDataFrame(df_Lakehouses_clean)
data_collect = sparkdf.collect()

# Loop through each lakehouse and append its Tables listing to the inventory table.
for row in data_collect:
    var_lakehouseId = row["id"]
    var_lakehouseName = row["name"]
    var_workspaceId = row["workspaceId"]

    try:
        # Reference https://www.reddit.com/r/MicrosoftFabric/comments/1bnaz0a/api_to_fetch_list_of_files_in_a_onelake_folder/
        get_url = 'https://onelake.dfs.fabric.microsoft.com/'+ var_workspaceId +'/'+ var_lakehouseId +'/Tables?recursive=True&resource=filesystem'
        # display(get_url)
        resp = requests.get(
            get_url,
            headers={"authorization": f"bearer {mssparkutils.credentials.getToken('storage')}"},
            timeout=120,
        )
        # Surface HTTP failures (401/403/404...) instead of trying to parse an error body.
        resp.raise_for_status()
        mydata = resp.json()

        # Nothing to record when the listing is empty or has no 'paths' entry.
        paths = mydata.get('paths', [])
        if not paths:
            continue

        # Convert to Data Frame
        pandas_df = json_normalize(paths)

        # Reference for DateTime Formats https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
        pandas_df['lastModified'] = pd.to_datetime(pandas_df.lastModified, format = "%a, %d %b %Y %H:%M:%S %Z" )

        # Convert to Spark Data Frame
        df = spark.createDataFrame(pandas_df)

        # Additional columns are added
        # NOTE(review): creationTime / 1e9 treats the value as epoch nanoseconds -
        # confirm against the unit the listing API actually returns.
        df = (
            df.withColumn("LakehouseId", lit(var_lakehouseId))
            .withColumn("LakehouseName", lit(var_lakehouseName))
            .withColumn("WorkspaceId", lit(var_workspaceId))
            .withColumn('contentLength', df['contentLength'].cast(T.LongType()))
            .withColumn('lastModified', df['lastModified'].cast(T.TimestampType()))
            .withColumn("creationTime", from_unixtime(df.creationTime / 1e9).cast(T.TimestampType()))
            .withColumn("CurrentDateTime", current_timestamp())
            .withColumn('Year', year(current_timestamp()))
            .withColumn('Month', month(current_timestamp()))
            .withColumn('Day', dayofmonth(current_timestamp()))
        )

        # Writing to Lake House
        df.write.mode("append").format("delta").partitionBy("Year","Month","Day").save("Tables/Onelake_Storage_Tables")

    except Exception as e:
        # Report the failure and carry on with the next lakehouse. The previous
        # `Error: {e}` line was a bare annotation and printed nothing.
        print(f"Error processing Tables for lakehouse '{var_lakehouseName}' ({var_lakehouseId}): {e}")

StatementMeta(, 2a870d78-2948-4a5b-bc59-32c45802ca9e, 13, Finished, Available, Finished)

In [12]:
# Loop Through Data for Warehouses
import datetime
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import col, year, month, quarter
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import lit
import requests
import json
from pandas import json_normalize
from pyspark.sql import types as T 
from pyspark.sql import SparkSession 

# Create a Spark session
# Bind the session first so that every use of `spark` below goes through the
# handle assigned in this cell.
spark = SparkSession.builder.appName("AddColumnExample").getOrCreate()
sparkdf = spark.createDataFrame(df_Warehouses_clean)
data_collect = sparkdf.collect()

# Loop through each warehouse and append its Tables listing to the inventory table.
for row in data_collect:
    # These rows come from df_Warehouses_clean. The "lakehouse" variable names and
    # the Lakehouse* output columns are kept so the table schema is unchanged.
    var_lakehouseId = row["id"]
    var_lakehouseName = row["name"]
    var_workspaceId = row["workspaceId"]

    try:
        # Reference https://www.reddit.com/r/MicrosoftFabric/comments/1bnaz0a/api_to_fetch_list_of_files_in_a_onelake_folder/
        get_url = 'https://onelake.dfs.fabric.microsoft.com/'+ var_workspaceId +'/'+ var_lakehouseId +'/Tables?recursive=True&resource=filesystem'
        # display(get_url)
        resp = requests.get(
            get_url,
            headers={"authorization": f"bearer {mssparkutils.credentials.getToken('storage')}"},
            timeout=120,
        )
        # Surface HTTP failures (401/403/404...) instead of trying to parse an error body.
        resp.raise_for_status()
        mydata = resp.json()

        # Nothing to record when the listing is empty or has no 'paths' entry.
        paths = mydata.get('paths', [])
        if not paths:
            continue

        # Convert to Data Frame
        pandas_df = json_normalize(paths)

        # Reference for DateTime Formats https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
        pandas_df['lastModified'] = pd.to_datetime(pandas_df.lastModified, format = "%a, %d %b %Y %H:%M:%S %Z" )

        # Convert to Spark Data Frame
        df = spark.createDataFrame(pandas_df)

        # Additional columns are added
        # NOTE(review): creationTime / 1e9 treats the value as epoch nanoseconds -
        # confirm against the unit the listing API actually returns.
        df = (
            df.withColumn("LakehouseId", lit(var_lakehouseId))
            .withColumn("LakehouseName", lit(var_lakehouseName))
            .withColumn("WorkspaceId", lit(var_workspaceId))
            .withColumn('contentLength', df['contentLength'].cast(T.LongType()))
            .withColumn('lastModified', df['lastModified'].cast(T.TimestampType()))
            .withColumn("creationTime", from_unixtime(df.creationTime / 1e9).cast(T.TimestampType()))
            .withColumn("CurrentDateTime", current_timestamp())
            .withColumn('Year', year(current_timestamp()))
            .withColumn('Month', month(current_timestamp()))
            .withColumn('Day', dayofmonth(current_timestamp()))
        )

        # Writing to Lake House
        df.write.mode("append").format("delta").partitionBy("Year","Month","Day").save("Tables/Onelake_Storage_Warehouses")

    except Exception as e:
        # Report the failure and carry on with the next warehouse. The previous
        # `Error: {e}` line was a bare annotation and printed nothing.
        print(f"Error processing Tables for warehouse '{var_lakehouseName}' ({var_lakehouseId}): {e}")

StatementMeta(, 2a870d78-2948-4a5b-bc59-32c45802ca9e, 14, Finished, Available, Finished)

In [None]:
# Downloading Workspace Details
import json, requests, pandas as pd 
import datetime 
from datetime import datetime,date,timedelta 

######################################################################################### 
# Read secretes from Azure Key Vault
#########################################################################################
## This is the name of my Azure Key Vault 
key_vault = "https://domain-keyvault.vault.azure.net/"
## I have stored my tenant id as one of the secrets to make it easier to use when needed 
tenant = mssparkutils.credentials.getSecret(key_vault , "tenantid") 
## This is my application Id for my service principal account 
client = mssparkutils.credentials.getSecret(key_vault , "pbi-applicationid") 
## This is my Client Secret for my service principal account 
client_secret = mssparkutils.credentials.getSecret(key_vault , "powerbi-clientsecret") 

######################################################################################### 
# Authentication - Replace string variables with your relevant values 
#########################################################################################  

import json, requests, pandas as pd 
import datetime  

try: 
    from azure.identity import ClientSecretCredential 
except Exception:
     !pip install azure.identity 
     from azure.identity import ClientSecretCredential 

# Generates the access token for the Service Principal 
api = 'https://analysis.windows.net/powerbi/api/.default' 
auth = ClientSecretCredential(authority = 'https://login.microsoftonline.com/', 
               tenant_id = tenant, 
               client_id = client, 
               client_secret = client_secret) 
access_token = auth.get_token(api)
access_token = access_token.token 

## This is where I store my header with the Access Token, because this is required when authenticating 
## to the Power BI Admin APIs 
header = {'Authorization': f'Bearer {access_token}'}  

print('\nSuccessfully authenticated.')

header = {'Authorization': f'Bearer {access_token}'} 

## Below is an example where I am querying the datasets Admin API and I need the access token in the
## headers 
base_url = 'https://api.powerbi.com/v1.0/myorg/admin/' 
refreshables_url = "groups?%24top=5000"  
workspace_response = requests.get(base_url + refreshables_url, headers=header)

# Get the response into a DataFrame
data_workspaces = workspace_response.json()['value']

df_workspaces = pd.DataFrame(data_workspaces)
# print(df_app_users)

# #Create SparkDataFrame
spark_workspaces = spark.createDataFrame(df_workspaces)

# Select specific columns
selected_spark_workspaces = spark_workspaces.select('id', 'isReadOnly','isOnDedicatedCapacity','description','type','state','name','capacityId','defaultDatasetStorageFormat','dataflowStorageId','pipelineId')

# Rename columns
selected_spark_workspaces = selected_spark_workspaces.withColumnRenamed("id", "WorkspaceId")
selected_spark_workspaces = selected_spark_workspaces.withColumnRenamed("name", "WorkspaceName")

#display(spark_workspaces)

# Write to Lakehouse Table
selected_spark_workspaces.write.mode("overwrite").option("mergeSchema", "true").format("delta").save(f"Tables/WorkspaceDetails")