In [1]:
%pip install ms-fabric-cli --quiet

Note: you may need to restart the kernel to use updated packages.


# Variables

## First Time Setup
If this is the first time you run this notebook, set the parameter `FIRST_RUN` to `True`; otherwise set it to `False`. When `FIRST_RUN` is `False`, Eventstream items are skipped during deployment.

In [2]:
# True on the first deployment; when False the deployment loop skips every
# item whose name contains ".Eventstream".
FIRST_RUN = True

## Capacity

In [3]:
# Set to False to skip the deployment items of type "Capacity".
INSTALL_CAPACITY_MODULE = True

# ID of the capacity you want to set up. You can change it manually later
# in the Eventstream, or add more capacities there.
capacity_id = "31998247-b1ac-4a64-b855-b58555596473"

## Audit and Inventory

In [4]:
# Set to False to skip the deployment items of type "PlatformMonitoring".
INSTALL_PLATFORM_MONITORING_MODULE = True

### For the Platform Monitoring Module you need to set up an Azure Key Vault
### that stores the credentials of a service principal (SPN) with Admin API
### access and Admin access to the gateways you want to list. The SPN needs to
### be in a Security Group that is allowed, in the tenant settings, to use the
### Admin and the regular Fabric APIs.
###
### IMPORTANT: Also add that Security Group to this workspace.

# Key Vault URI
key_vault_uri = "https://mrtacatkeyvault.vault.azure.net/"

# Name of the Key Vault secret that holds the tenant ID
key_vault_tenant_id = "tenant-id"

# Name of the Key Vault secret that holds the App ID of the service principal
key_vault_client_id = "fabric-admin-api-sp-id"

# Name of the Key Vault secret that holds the secret of the service principal
key_vault_client_secret = "fabric-admin-api-sp-secret"

## Gateway

In [5]:
# Specify if you want to install the Gateway Module.
# When False, the deployment loop skips items of type "Gateway".
INSTALL_GATEWAY_MODULE = True

## Source Git Repo

In [6]:
##### DO NOT CHANGE UNLESS SPECIFIED OTHERWISE ####
# GitHub repository and branch the source and config files are downloaded from.
repo_owner = "ecotte"
repo_name = "Fabric-Monitoring-RTI"
branch = "Capacity"
# Optional folder inside the repository that contains "src" and "config";
# empty when they sit at the repository root.
folder_prefix = ""
###################################################

# Process

## Load Libraries

In [7]:
import subprocess
import os
import json
from zipfile import ZipFile 
import shutil
import re
import requests
import zipfile
from io import BytesIO
import yaml
import sempy.fabric as fabric

## Download of source & config files
This part downloads all source and config files needed for the deployment into the built-in resources of the notebook.

In [8]:
def download_folder_as_zip(repo_owner, repo_name, output_zip, branch="main", folder_to_extract="src",  remove_folder_prefix = ""):
    """Download one folder of a GitHub repository and store it as a local zip file.

    The whole branch is fetched as a zipball from the GitHub API; only the
    entries located under ``folder_to_extract`` are copied into ``output_zip``.

    Args:
        repo_owner: Owner (user or organisation) of the GitHub repository.
        repo_name: Name of the GitHub repository.
        output_zip: Path of the zip file to create. Missing parent
            directories are created.
        branch: Branch (or other ref) to download.
        folder_to_extract: Folder inside the repository to keep, with or
            without a leading/trailing "/". An empty value keeps everything.
        remove_folder_prefix: Path segment to drop from the stored entry
            names (for example the folder that contains "src"). Ignored when
            empty or when an entry does not contain it.

    Raises:
        The HTTP error raised by ``response.raise_for_status()`` when the
        download fails.
    """
    # Construct the URL for the GitHub API to download the repository as a zip file
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/zipball/{branch}"

    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    # Normalise to "/folder/" so the match below stops at a path boundary and
    # "/src" no longer also selects sibling folders such as "/src-old".
    folder = folder_to_extract.strip("/")
    wanted_prefix = f"/{folder}/" if folder else "/"

    # Ensure the directory for the output zip file exists. dirname() is empty
    # for a bare file name, and os.makedirs("") would raise.
    output_dir = os.path.dirname(output_zip)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with zipfile.ZipFile(BytesIO(response.content)) as zipf:
        with zipfile.ZipFile(output_zip, 'w') as output_zipf:
            for file_info in zipf.infolist():
                # Replace the archive's leading path segment with "/" so the
                # entry path is relative to the repository root.
                repo_path = re.sub(r'^.*?/', '/', file_info.filename)
                if not (repo_path + "/").startswith(wanted_prefix):
                    continue

                parts = file_info.filename.split('/')
                # Use this function's own parameter (the original consulted the
                # notebook-level ``folder_prefix``) and tolerate entries that do
                # not contain the segment.
                if remove_folder_prefix != "" and remove_folder_prefix in parts:
                    parts.remove(remove_folder_prefix)

                # parts[0] is the archive's top-level folder; drop it.
                arcname = '/'.join(parts[1:])
                if not arcname:
                    continue
                output_zipf.writestr(arcname, zipf.read(file_info.filename))

def uncompress_zip_to_folder(zip_path, extract_to):
    """Unpack every member of ``zip_path`` into ``extract_to`` and delete the archive.

    ``extract_to`` is created when it does not exist yet. The zip file is only
    removed after the extraction finished without raising.
    """
    os.makedirs(extract_to, exist_ok=True)

    archive = zipfile.ZipFile(zip_path, 'r')
    try:
        archive.extractall(path=extract_to)
    finally:
        archive.close()

    # The archive is no longer needed once its content is on disk.
    os.remove(zip_path)

# The "src" folder stays zipped at ./builtin/src/src.zip; copy_to_tmp() reads
# individual items straight from that archive.
download_folder_as_zip(repo_owner, repo_name, output_zip = "./builtin/src/src.zip", branch = branch, folder_to_extract= f"{folder_prefix}/src", remove_folder_prefix = f"{folder_prefix}")
# The "config" folder is downloaded and then unpacked into ./builtin
# (uncompress_zip_to_folder deletes config.zip afterwards).
download_folder_as_zip(repo_owner, repo_name, output_zip = "./builtin/config/config.zip", branch = branch, folder_to_extract= f"{folder_prefix}/config" , remove_folder_prefix = f"{folder_prefix}")
uncompress_zip_to_folder(zip_path = "./builtin/config/config.zip", extract_to= "./builtin")

## Definition of deployment functions

In [9]:
def run_fab_command( command, capture_output: bool = False, silently_continue: bool = False, raw_output: bool = False):
    """Run a Fabric CLI command via ``fab -c`` and optionally return its output.

    Args:
        command: Command string handed to ``fab -c``.
        capture_output: Capture stdout/stderr instead of letting them print.
        silently_continue: When True, never raise, even if the command failed.
        raw_output: With ``capture_output``, return the
            ``subprocess.CompletedProcess`` instead of the stripped stdout.

    Returns:
        ``None`` when ``capture_output`` is False; otherwise the stripped
        stdout string, or the completed-process object when ``raw_output``
        is True.

    Raises:
        Exception: When ``silently_continue`` is False and the command exited
            with a non-zero code or wrote anything to (captured) stderr.
    """
    result = subprocess.run(["fab", "-c", command], capture_output=capture_output, text=True)

    # "!= 0" also catches negative return codes (process killed by a signal).
    # result.stderr is None when output is not captured, so it is falsy then.
    failed = result.returncode != 0 or result.stderr
    if not silently_continue and failed:
        # Report the actual streams; the original message printed the whole
        # result object under the "stderr" label.
        raise Exception(
            f"Error running fab command. exit_code: '{result.returncode}'; "
            f"stderr: '{result.stderr}'; stdout: '{result.stdout}'"
        )

    if not capture_output:
        return None
    return result if raw_output else result.stdout.strip()

def fab_get_id(name):
    """Return what ``fab get ... -q id`` prints for an item of the target workspace.

    ``name`` is the item path below ``/{trg_workspace_name}.Workspace/``.
    Failures are not raised (silently_continue=True); the stripped stdout is
    returned as-is.
    """
    command = f"get /{trg_workspace_name}.Workspace/{name} -q id"
    return run_fab_command(command, capture_output=True, silently_continue=True)

def fab_get_item(name):
    """Return the stripped stdout of ``fab get`` for an item of the target workspace.

    Failures are not raised (silently_continue=True).
    """
    command = f"get /{trg_workspace_name}.Workspace/{name}"
    return run_fab_command(command, capture_output=True, silently_continue=True)

def fab_get_display_name(name):
    """Return what ``fab get ... -q displayName`` prints for a target-workspace item.

    Failures are not raised (silently_continue=True).
    """
    command = f"get /{trg_workspace_name}.Workspace/{name} -q displayName"
    return run_fab_command(command, capture_output=True, silently_continue=True)

def fab_get_kusto_query_uri(name):
    """Return the second output line of ``fab get ... -q properties.queryServiceUri -f``.

    Raises IndexError when the command printed fewer than two lines.
    """
    output = run_fab_command(
        f"get /{trg_workspace_name}.Workspace/{name} -q properties.queryServiceUri -f",
        capture_output=True,
        silently_continue=True,
    )
    # NOTE(review): presumably the first line is a notice printed because of
    # "-f" and the URI follows on line two - confirm against the fab CLI.
    lines = output.split("\n")
    return lines[1]

def fab_get_kusto_ingest_uri(name):
    """Return the second output line of ``fab get ... -q properties.ingestionServiceUri -f``.

    Raises IndexError when the command printed fewer than two lines.
    """
    output = run_fab_command(
        f"get /{trg_workspace_name}.Workspace/{name} -q properties.ingestionServiceUri -f",
        capture_output=True,
        silently_continue=True,
    )
    # NOTE(review): presumably the first line is a notice printed because of
    # "-f" and the URI follows on line two - confirm against the fab CLI.
    lines = output.split("\n")
    return lines[1]

def fab_get_folders():
    """Return the folder list of the target workspace as reported by ``fab api``.

    The command output is parsed as JSON; the list is read from
    ``["text"]["value"]`` and defaults to ``[]`` when either key is missing.
    """
    raw = run_fab_command(f"api workspaces/{trg_workspace_id}/folders", capture_output=True, silently_continue=True)
    envelope = json.loads(raw)
    body = envelope.get('text', {})
    return body.get('value', [])

def fab_get_folder(folder_name):
    """Return the first workspace folder whose ``displayName`` equals ``folder_name``, else None."""
    matches = (
        candidate
        for candidate in fab_get_folders()
        if candidate.get('displayName') == folder_name
    )
    return next(matches, None)

def fab_assign_item_folder(name,folder):
    """Move an item of the target workspace into a folder, creating the folder if needed.

    Args:
        name: Item path below the workspace, as accepted by ``fab_get_id``.
        folder: Display name of the destination folder.

    Returns:
        The stripped stdout of the final ``fab api -X patch`` call.
    """
    target = fab_get_folder(folder)
    item_id = fab_get_id(name)

    if target is None:
        # No folder with that display name yet: create it and use the response body.
        create_body = json.dumps({"displayName": folder})
        created = run_fab_command(
            f"api -X post workspaces/{trg_workspace_id}/folders -i {create_body}",
            capture_output=True,
            silently_continue=True,
        )
        target = json.loads(created).get('text',{})

    patch_body = json.dumps({"folder": target.get('id')})
    return run_fab_command(
        f"api -X patch workspaces/{trg_workspace_id}/items/{item_id} -i {patch_body}",
        capture_output=True,
        silently_continue=True,
    )

def fab_add_schedule(name):
    """Create the configured job schedule for an item unless it already has one.

    The existing schedules are read with ``fab get ... -q schedules`` and
    parsed as JSON. When that list is empty, the schedule stored for ``name``
    in ``deployment_order`` is submitted with ``fab job run-sch`` and the
    command output is returned; otherwise a message describing the existing
    schedule is returned.
    """
    existing = run_fab_command(
        f"get /{trg_workspace_name}.Workspace/{name} -q schedules",
        capture_output=True,
        silently_continue=True,
    )

    if len(json.loads(existing)) != 0:
        return f"Job schedule for '{name}' already exists...\n* Job schedule {existing}"

    schedule = get_schedule_by_name(name)
    return run_fab_command(
        f"job run-sch /{trg_workspace_name}.Workspace/{name} -i {json.dumps(schedule)}",
        capture_output=True,
        silently_continue=True,
    )

def get_id_by_name(name):
    """Return the ``id`` of the first ``deployment_order`` entry named ``name``, else None."""
    return next(
        (entry.get("id") for entry in deployment_order if entry.get("name") == name),
        None,
    )

def get_schedule_by_name(name):
    """Return the ``schedule`` of the first ``deployment_order`` entry named ``name``, else None."""
    return next(
        (entry.get("schedule") for entry in deployment_order if entry.get("name") == name),
        None,
    )

def copy_to_tmp(name,child=None):
    """Extract one item's files from ./builtin/src/src.zip into a fresh ./builtin/tmp.

    Args:
        name: Item folder name below ``src/`` inside the archive.
        child: Optional child item; when given, only
            ``src/{name}/.children/{child}/`` is extracted.

    Returns:
        The path of the extracted folder below ./builtin/tmp (with a trailing "/").
    """
    child_path = f".children/{child}/" if child is not None else ""
    member_prefix = f'src/{name}/{child_path}'

    # Start from an empty tmp folder so files of a previous item never leak in.
    shutil.rmtree("./builtin/tmp",  ignore_errors=True)

    with ZipFile("./builtin/src/src.zip") as archive:
        wanted = [member for member in archive.namelist() if member.startswith(member_prefix)]
        archive.extractall('./builtin/tmp', members=wanted)

    return f"./builtin/tmp/{member_prefix}"

def replace_ids_in_folder(folder_path, mapping_table):
    """Apply every ``old_id`` -> ``new_id`` replacement to the text files below a folder.

    Only files ending in .py, .json, .pbir, .platform, .ipynb or .tmdl are
    processed, and files whose name ends with ``report.json`` are left alone.
    Mappings are applied in list order.

    Args:
        folder_path: Root folder that is walked recursively.
        mapping_table: List of dicts with the keys ``"old_id"`` and
            ``"new_id"``. Entries with a missing, ``None`` or empty
            ``old_id``, or with a ``None`` ``new_id``, are skipped.
    """
    text_suffixes = ('.py', '.json', '.pbir', '.platform', '.ipynb', '.tmdl')

    # str.replace("", x) would insert x between every character of the file,
    # and a None on either side raises TypeError - drop such mappings up front.
    replacements = [
        (mapping["old_id"], mapping["new_id"])
        for mapping in mapping_table
        if mapping.get("old_id") and mapping.get("new_id") is not None
    ]

    for root, _, files in os.walk(folder_path):
        for file_name in files:
            if not file_name.endswith(text_suffixes) or file_name.endswith('report.json'):
                continue

            file_path = os.path.join(root, file_name)
            with open(file_path, 'r', encoding='utf-8') as file:
                original = file.read()

            content = original
            for old_id, new_id in replacements:
                content = content.replace(old_id, new_id)

            # Rewrite the file only when a replacement actually happened.
            if content != original:
                with open(file_path, 'w', encoding='utf-8') as file:
                    file.write(content)

def deploy_item(name,child=None,it=None):
    """Import one item (or one child of an item) into the target workspace.

    Steps: extract the item's source files into ./builtin/tmp, apply all
    replacements currently in the notebook-level ``mapping_table``, import the
    folder with ``fab import`` and record new mappings for later items.

    Args:
        name: Name of the item folder in the source archive (e.g. "X.Eventhouse").
        child: Optional child item name; when given, the child's files are
            deployed under the child's own name.
        it: The ``deployment_order`` entry being deployed; its ``"id"`` is
            mapped to the ID of the imported item. When ``None``, no ID
            mapping is recorded.

    Side effects:
        Appends entries to the notebook-level ``mapping_table``.
    """
    print("")
    print("#############################################")
    print(f"Deploying {name}")

    # Copy and replace IDs in the item
    tmp_path = copy_to_tmp(name,child)

    # From here on the deployed item is the child, if one was requested.
    name = name if child is None else child

    cli_parameter = ''
    if ".Notebook" in name:
        cli_parameter = cli_parameter + " --format .py"

    replace_ids_in_folder(tmp_path, mapping_table)

    # Import errors are not raised here (silently_continue=True).
    run_fab_command(f"import  /{trg_workspace_name}.Workspace/{name} -i {tmp_path} -f {cli_parameter} ", silently_continue= True)
    new_id= fab_get_id(name)

    # Placeholders that can only be resolved once the item exists in the workspace.
    if ".KQLDatabase" in name:
        display_name = fab_get_display_name(name)
        mapping_table.append({"Description": "KQL DB Name" , "old_id": "{kusto_db_name}", "new_id": display_name })
    elif ".Eventhouse" in name:
        query_uri = fab_get_kusto_query_uri(name)
        mapping_table.append({"Description": "Kusto Query Uri" , "old_id": "{kusto_query_uri}", "new_id": query_uri })
        mapping_table.append({"Description": "Kusto Query Uri" , "old_id": '"clusterUri": ""', "new_id": f'"clusterUri": "{query_uri}"' })
        ingest_uri = fab_get_kusto_ingest_uri(name)
        mapping_table.append({"Description": "Kusto Ingest Uri" , "old_id": "{kusto_ingest_uri}", "new_id": ingest_uri })

    # ``it`` defaults to None; without this guard the call deploy_item(name)
    # crashed with a TypeError after the item had already been imported.
    if it is not None:
        mapping_table.append({ "old_id": it["id"], "new_id": new_id })

## CLI Login

In [10]:
# Set environment parameters for Fabric CLI
# `notebookutils` is not imported in this notebook; presumably the Fabric
# notebook runtime provides it as a built-in - TODO confirm.
token = notebookutils.credentials.getToken('pbi')
# The same token is exported under both variable names so the `fab`
# subprocess started by run_fab_command() inherits it.
os.environ['FAB_TOKEN'] = token
os.environ['FAB_TOKEN_ONELAKE'] = token

## Get current Workspace
This cell gets the current workspace to deploy automatically inside it

In [11]:
base_path = './builtin/'

# Deployment plan: the list of entries that the helper functions and the
# deployment loop read (keys used in this notebook: "name", "id", "type",
# "schedule", "children").
deploy_order_path = os.path.join(base_path, 'config/deployment_order.json')
with open(deploy_order_path, 'r') as file:
        deployment_order = json.load(file)

# Names of the plan entries that stand for the source workspace and capacity;
# they are excluded from deployment further down.
src_workspace_name = "Workspace.src"
src_workspace_id = get_id_by_name(src_workspace_name)

src_capacity_name = "FabricCapacity"

# List of {"old_id": ..., "new_id": ...} replacements applied to every item's
# files before import; deploy_item() appends to it as items are created.
mapping_table=[]

# The deployment target is the workspace this notebook runs in.
trg_workspace_id = fabric.get_notebook_workspace_id()
res = run_fab_command(f"api -X get workspaces/{trg_workspace_id}" , capture_output = True, silently_continue=True)
trg_workspace_name = json.loads(res)["text"]["displayName"]



print(f"Current workspace: {trg_workspace_name}")
print(f"Current workspace ID: {trg_workspace_id}")


# mapping_table.append({ "old_id": get_id_by_name(src_capacity_name), "new_id": capacity_id })
# Source workspace ID and the all-zero GUID both map to the target workspace.
mapping_table.append({"Description": "Workspace Id" , "old_id": get_id_by_name(src_workspace_name), "new_id": trg_workspace_id })
mapping_table.append({"Description": "Workspace Id" , "old_id": "00000000-0000-0000-0000-000000000000", "new_id": trg_workspace_id })
# "{...}" placeholders in the source files are filled with the Key Vault
# settings from the Variables section.
mapping_table.append({"Description": "KeyVault" , "old_id": "{key_vault_uri}", "new_id": key_vault_uri })
mapping_table.append({"Description": "KeyVault" , "old_id": "{key_vault_tenant_id}", "new_id": key_vault_tenant_id })
mapping_table.append({"Description": "KeyVault" , "old_id": "{key_vault_client_id}", "new_id": key_vault_client_id })
mapping_table.append({"Description": "KeyVault" , "old_id": "{key_vault_client_secret}", "new_id": key_vault_client_secret })

display(mapping_table)

Current workspace: [Dev] Microsoft Fabric Platform Monitoring
Current workspace ID: 9abe4198-c12a-4ea7-8442-81bf06eb9097


## Deployment Logic
This part iterates through all the items, gets the respective source code, replaces all IDs dynamically and deploys the new item

In [12]:
# Plan entries that describe the source environment and are never deployed.
exclude = [src_workspace_name,src_capacity_name]

# Items are deployed in plan order; each deployment extends mapping_table, so
# later items can reference the IDs of items created earlier.
for it in deployment_order:

    new_id = None
    
    name = it["name"]
    type = it["type"]

    # Skip the modules that were switched off in the Variables section.
    if not INSTALL_CAPACITY_MODULE and type == "Capacity":
        continue
    elif not INSTALL_GATEWAY_MODULE and type == "Gateway":
        continue
    elif not INSTALL_PLATFORM_MONITORING_MODULE and type == "PlatformMonitoring":
        continue

    if name in exclude:
        continue

    # Eventstreams are only deployed on the first run.
    if not FIRST_RUN and ".Eventstream" in name:
        continue

    deploy_item(name,None,it)

    # Children are stored below the parent's ".children" folder in the archive.
    for child in it.get("children",[]):
        child_name = child["name"]
        deploy_item(name,child_name,child)   



#############################################
Deploying Platform and Audit DB.Eventhouse
! An item with the same name exists
Importing (update) './builtin/tmp/src/Platform and Audit DB.Eventhouse/' → '/[Dev] Microsoft Fabric Platform Monitoring.Workspace/Platform and Audit DB.Eventhouse'...
* 'Platform and Audit DB.Eventhouse' imported

#############################################
Deploying Platform and Audit DB.KQLDatabase
! An item with the same name exists
Importing (update) './builtin/tmp/src/Platform and Audit DB.KQLDatabase/' → '/[Dev] Microsoft Fabric Platform Monitoring.Workspace/Platform and Audit DB.KQLDatabase'...
* 'Platform and Audit DB.KQLDatabase' imported

#############################################
Deploying CapacityUtilizationEvents.Eventstream
! An item with the same name exists
Importing (update) './builtin/tmp/src/CapacityUtilizationEvents.Eventstream/' → '/[Dev] Microsoft Fabric Platform Monitoring.Workspace/CapacityUtilizationEvents.Eventstream'...
* 'Capacit