##### Branch out to new workspace notebook

This notebook mimics the [branch out to new workspace functionality](https://blog.fabric.microsoft.com/en-us/blog/introducing-new-branching-capabilities-in-fabric-git-integration) in the Fabric UI.

In addition to this:
<ul>
<li>Default lakehouses are updated to the corresponding "local" lakehouse</li>
<li>Shortcuts are created in each "local" lakehouse, pointing to the tables (or shortcuts) in the source lakehouse</li>
<li>Sets lakehouse connections for semantic models to "local" lakehouse</li> 
<li>Rebinds reports to "local" semantic models</li></ul>

Requirements:
<ul>
<li>Source workspace ("source") needs to be connected to an Azure DevOps Git repository</li>
<li>Target workspace ("local") will be created on the same capacity as the source workspace</li>
<li>An Azure DevOps personal access token (PAT), stored as an Azure Key Vault secret, is required for creating the new branch</li>
<li>Requires Semantic Link Labs, which is installed by the pip install cell below.</li></ul>

Limitations of current script:

<ul>
<li>Recreates items using git. Please see <a href=https://learn.microsoft.com/en-us/fabric/cicd/git-integration/intro-to-git-integration?tabs=azure-devops#supported-items> git supported items </a></li>
<li>Does not recreate workspace roles, direct shares or lakehouse data access roles</li>
<li>Only updates connections of Direct Lake semantic models which use a lakehouse. Warehouses could easily be supported as well using Semantic Link Labs; see <a href="https://semantic-link-labs.readthedocs.io/en/stable/sempy_labs.directlake.html#sempy_labs.directlake.update_direct_lake_model_connection">this link</a>.
</li></ul>


##### Set parameters
Before running this notebook, ensure these parameters are set correctly. If necessary, they can be passed in via a Data Factory pipeline.

In [None]:
branch_name = '' # feature branch to create in Azure DevOps, eg 'DEV_Custom_Feature_06_branch'
project_name='' # Azure DevOps project used in the REST URLs, eg 'Fabric_CICD_Option3'
repo_name='' # Azure DevOps Git repository used in the REST URLs, eg 'Workspace_CICD_Option3'
main_branch = '' # existing branch whose tip the feature branch is created from, eg 'main'
branch_to_new_ws = '' # name of the workspace to create, eg 'Workspace_' + branch_name
FABRIC_API_URL = '' #eg "https://api.fabric.microsoft.com/v1"
ADO_API_URL = '' # base URL for the Azure DevOps REST calls, eg "https://dev.azure.com/MCAPS553100"

# Key Vault name and the name of the secret whose value is the Azure DevOps PAT
nameOfKeyVault = '' #eg 'azdosc'
secret_name = '' #eg 'azdopat'

# fnmatch-style patterns selecting which source table folders get shortcuts (["*"] matches everything)
# see https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py
PATTERN_MATCH = ["*"]


##### Install semantic link labs to support advanced functionality

https://semantic-link-labs.readthedocs.io/en/latest/index.html
https://github.com/microsoft/semantic-link-labs/blob/main/README.md



In [None]:
%pip -q install semantic-link-labs

##### Library imports and fabric rest client setup

https://learn.microsoft.com/en-us/python/api/semantic-link-sempy/sempy.fabric.fabricrestclient

In [None]:
import pandas as pd
import datetime
import re,json, fnmatch,os
import requests, base64
import sempy
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,current_timestamp,lit
import sempy_labs as labs
from sempy_labs import migration, directlake
from sempy_labs import lakehouse as lake
from sempy_labs import report as rep
from sempy_labs.tom import connect_semantic_model

# Shared Fabric REST client used by the cells below
client = fabric.FabricRestClient()

# Workspace ID and name are read from the runtime context of the session running this notebook
_runtime_context = notebookutils.runtime.context
thisWsId = _runtime_context['currentWorkspaceId']
thisWsName = _runtime_context['currentWorkspaceName']

##### Get Workspace Git Connection Details

In [None]:
# Read the git provider details of the workspace this notebook runs in.
# They are reused later as the basis of the new workspace's git connection.
url = "/v1/workspaces/" + thisWsId + "/git/connection"
try:
    print("Retreiving git details for current workspace...")
    response = client.get(url)
    gitProviderDetailsJSON = response.json()['gitProviderDetails']
except Exception as error:
    # Always report the underlying cause, then re-raise: later cells cannot
    # work without gitProviderDetailsJSON, so continuing would only fail there.
    errmsg = "Couldn't get git connection status for current workspace. Error: " + str(error)
    print(errmsg)
    raise



##### Create new AzDO branch based on main

In [None]:
def get_branch_object_id(project_name, repo_name, branch_name, token):
    """Return the object ID that an Azure DevOps branch currently points to.

    Args:
        project_name: Azure DevOps project containing the repository.
        repo_name: Git repository name.
        branch_name: Branch name without the ``refs/heads/`` prefix.
        token: Base64-encoded credential placed in the ``Basic`` Authorization header.

    Returns:
        The ``objectId`` of the ref named ``refs/heads/<branch_name>``, or ``None``
        when the request fails, the response is not JSON, or no such ref is returned.
    """
    ref_name = f"refs/heads/{branch_name}"
    try:
        print(f"Retriving ID of main branch {branch_name} to be cloned ")
        headers = {'Authorization': f'Basic {token}',
                    'Content-Type': 'application/json'
                    }
        response = requests.get(f"{ADO_API_URL}/{project_name}/_apis/git/repositories/{repo_name}/refs/heads/{branch_name}?api-version=7.1", headers=headers)
        response.raise_for_status()
        refs = response.json().get("value", [])
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body (e.g. an HTML sign-in page)
        print(f"Error getting branch object ID: {e}")
        return None

    # More than one ref may come back; only accept the exact branch requested
    # instead of blindly taking the first entry.
    for ref in refs:
        if ref.get("name") == ref_name:
            return ref.get("objectId")

    print(f"Error getting branch object ID: branch {branch_name} not found in {repo_name}")
    return None

def encode_pat(pat):
    """Return the base64 text form of ``pat`` (encoded and decoded as UTF-8)."""
    raw_bytes = pat.encode('utf-8')
    return base64.b64encode(raw_bytes).decode('utf-8')

# Read the Azure DevOps PAT from Key Vault using a token issued to the notebook session.
access_token =notebookutils.credentials.getToken('keyvault')
url = f'https://{nameOfKeyVault}.vault.azure.net/secrets/{secret_name}?api-version=7.3'
headers = {
    'Authorization': f'Bearer {access_token}',
    'Content-Type': 'application/json'
}

response = requests.get(url, headers=headers)
if response.status_code != 200:
    # Without the PAT nothing below can work, so stop here instead of hitting
    # an undefined pat_token further down.
    raise RuntimeError(f"Failed to get secret: {response.status_code} - {response.text}")

# Basic auth with an empty user name: base64(":<PAT>")
pat_token = encode_pat(':'+response.json()['value'])

# Resolve the commit the new branch should start from; never post a null object ID.
main_object_id = get_branch_object_id(project_name, repo_name, main_branch, pat_token)
if main_object_id is None:
    raise RuntimeError(f"Could not resolve branch {main_branch} in repository {repo_name}; feature branch not created")

try:
    print(f"Creating feature branch {branch_name} based on {main_branch} in progress")
    headers = {"Authorization": f"Basic {pat_token}", "Content-Type": "application/json"}
    data =  [
                {
            "name":f"refs/heads/{branch_name}",
            # an all-zero old object ID asks for the ref to be created
            "oldObjectId": "0000000000000000000000000000000000000000",
            "newObjectId": main_object_id
            }
        ]
    response = requests.post(f"{ADO_API_URL}/{project_name}/_apis/git/repositories/{repo_name}/refs?api-version=7.1", headers=headers, json=data)
    response.raise_for_status()
    # The refs update reports success per ref in the body, so an HTTP 200 alone
    # does not prove the branch was created.
    rejected = [item for item in response.json().get("value", []) if item.get("success") is False]
    if rejected:
        print(f"Error creating Azure DevOps branch: update rejected with status {rejected[0].get('updateStatus')}")
    else:
        print(f"Feature branch {branch_name} created")
except (requests.exceptions.RequestException, ValueError) as e:
    print(f"Error creating Azure DevOps branch: {e}")


##### Create new feature branch workspace

In [None]:

# Defined up front so the error message below can always be built,
# even when the capacity lookup itself is what failed.
current_capacity_id = None
try:
    # get current capacity ID
    response = client.get(f"v1/workspaces/{thisWsId}")
    current_capacity_id = response.json()['capacityId']
    # create new workspace on the same capacity as the current one
    print("Creating workspace: " + branch_to_new_ws + " in capacity "+ current_capacity_id +"...")
    new_workspace_id = fabric.create_workspace(branch_to_new_ws,current_capacity_id)
    print("Created workspace with ID: " + new_workspace_id)
except Exception as error:
    errmsg = f"Failed to recreate workspace {branch_to_new_ws} with capacity ID ({current_capacity_id}) due to: {error}"
    print(errmsg)
    # Every later cell needs new_workspace_id, so do not carry on without it.
    raise


##### Change the Git connection details to use new branch

In [None]:
# Wrap the source workspace's provider details in the request envelope and show them as-is
gitpayloadstr = json.dumps({"gitProviderDetails": gitProviderDetailsJSON})
print(f"Before: {gitpayloadstr}")

# Parse the JSON text back into a fresh dict, then point it at the feature branch
gitpayload = json.loads(gitpayloadstr)
gitpayload["gitProviderDetails"].update(branchName=branch_name)

print(f"After: {json.dumps(gitpayload)}")

##### Connect new feature branch workspace to git and sync

In [None]:

url = "v1/workspaces/" +  new_workspace_id + "/git/connect"

# Step 1: connect the new workspace to the feature branch
try:
    print('Attempting to connect workspace '+ branch_to_new_ws)
    response = client.post(url,json= gitpayload)
    print(str(response.status_code) + response.text)
    # An error status must not count as a successful connection
    response.raise_for_status()
    success = True

except Exception as error:
    errmsg =  "Couldn't connect git to workspace " + branch_to_new_ws + ". Error: "+str(error)
    print(str(errmsg))
    success = False

# Step 2: if connection successful then initialise it and pull the branch content
if (success):
    url = "/v1/workspaces/" + new_workspace_id + "/git/initializeConnection"
    payload = {"initializationStrategy":"PreferRemote"}
    try:
        print('Attempting to initialize git connection for workspace '+ branch_to_new_ws)
        response = client.post(url,json= payload)
        response.raise_for_status()
        # Tolerate a response without a commit hash instead of raising KeyError
        commithash = response.json().get('remoteCommitHash') or ''
        if commithash!='':
            print('Successfully initialized. Updating with commithash '+commithash)
            url = "/v1/workspaces/" + new_workspace_id + "/git/updateFromGit"
            # Built as a dict so the commit hash never has to be spliced into JSON text
            payload = {
                "remoteCommitHash": commithash,
                "conflictResolution": {
                    "conflictResolutionType": "Workspace",
                    "conflictResolutionPolicy": "PreferWorkspace",
                },
                "options": {"allowOverrideItems": True},
            }
            response = client.post(url,json= payload)
            print(str(response.status_code))
            response.raise_for_status()
        else:
            print('Successfully initialized. No remote commit hash returned, nothing to update from.')
    except Exception as error:
        errmsg =  "Couldn't initialize git for workspace " +branch_to_new_ws + ". Error: "+str(error)
        print(str(errmsg))

In [None]:
import time

# Fixed pause to give the git items time to sync before the next cells read them.
# NOTE(review): a fixed delay may be too short for large workspaces - tune as needed.
GIT_SYNC_WAIT_SECONDS = 30
print(f"Waiting {GIT_SYNC_WAIT_SECONDS} seconds for git items to sync...")
time.sleep(GIT_SYNC_WAIT_SECONDS)

##### Update default lakehouses for notebooks and create shortcuts

Update notebook dependencies:
https://github.com/PowerBiDevCamp/FabConWorkshopSweden/blob/main/DemoFiles/GitUpdateWorkspace/updateWorkspaceDependencies_v1.ipynb

Shortcut creator:
https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py 

In [None]:

# Extract workspace_id, item_id and path from a onelake URI
def extract_onelake_https_uri_components(uri):
    """Split an ``abfss://`` OneLake URI into its three parts.

    Returns:
        ``(workspace_id, item_id, path)`` where ``workspace_id`` is the text
        before ``@``, ``item_id`` is the first path segment after the host and
        ``path`` is everything after it; ``(None, None, None)`` when the URI
        does not have that shape.
    """
    found = re.search(r"abfss://([^@]+)@[^/]+/([^/]+)/(.*)", uri)
    if found is None:
        return None, None, None
    return found.groups()


def is_valid_onelake_uri(uri: str) -> bool:
    """Return True when ``uri`` contains ``abfss://`` and splits into workspace, item and path."""
    components = extract_onelake_https_uri_components(uri)
    return "abfss://" in uri and all(part is not None for part in components)


def get_last_path_segment(uri: str):
    """Return the text after the last ``/`` in ``uri`` (the whole string when there is no ``/``)."""
    _head, _sep, last_segment = uri.rpartition("/")
    return last_segment


def is_delta_table(uri: str):
    """Report whether a ``_delta_log`` entry exists directly under ``uri``."""
    return mssparkutils.fs.exists(os.path.join(uri, "_delta_log"))


def get_onelake_shorcut(workspace_id: str, item_id: str, path: str, name: str):
    """Fetch one shortcut of an item through the REST client and return the decoded JSON body."""
    endpoint = f"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}"
    return client.get(endpoint).json()


def is_folder_matching_pattern(path: str, folder_name: str, patterns: []):
    """Decide whether a folder is selected by ``patterns``.

    A folder whose name appears literally in ``patterns`` is accepted outright.
    A folder that only matches one of the patterns via ``fnmatch`` is accepted
    when ``path`` is a delta table. Anything else is rejected.
    """
    # Literal name listed: accept without looking at the folder content
    if folder_name in patterns:
        return True

    # Wildcard hit: the folder additionally has to be a delta table
    if any(fnmatch.fnmatch(folder_name, candidate) for candidate in patterns):
        return is_delta_table(path)

    return False


def get_matching_delta_tables_uris(uri: str, patterns: []) -> []:
    """Collect the paths of the directories directly under ``uri`` that are selected by ``patterns``.

    Returns a set, so each path appears at most once.
    """
    return {
        entry.path
        for entry in mssparkutils.fs.ls(uri)
        # only directories are candidates; the pattern check decides the rest
        if entry.isDir and is_folder_matching_pattern(entry.path, entry.name, patterns)
    }


def create_onelake_shorcut(source_uri: str, dest_uri: str):
    """Create a OneLake shortcut under ``dest_uri`` that targets ``source_uri``.

    The shortcut is named after the last path segment of ``source_uri``.

    Args:
        source_uri: ``abfss://`` URI of the shortcut target.
        dest_uri: ``abfss://`` URI of the folder that will contain the shortcut.

    Returns:
        The shortcut as returned by ``get_onelake_shorcut``, or ``None`` when
        either URI cannot be parsed, the destination already exists, or the
        create request fails.
    """
    src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(
        source_uri
    )

    dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(
        dest_uri
    )

    # Refuse to build a request out of None components
    if src_workspace_id is None or dest_workspace_id is None:
        print(f"Cannot create shortcut, invalid URI: {source_uri} -> {dest_uri}")
        return None

    name = get_last_path_segment(source_uri)
    dest_uri_joined = os.path.join(dest_uri, name)

    # If the destination path already exists, return without creating shortcut
    if mssparkutils.fs.exists(dest_uri_joined):
        print(f"Destination already exists: {dest_uri_joined}")
        return None

    request_body = {
        "name": name,
        "path": dest_path,
        "target": {
            "oneLake": {
                "itemId": src_item_id,
                "path": src_path,
                "workspaceId": src_workspace_id,
            }
        },
    }

    shortcut_uri = f"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts"
    print(f"Creating shortcut: {shortcut_uri}/{name}..")
    try:
        response = client.post(shortcut_uri, json=request_body)
    except FabricHTTPException as e:
        print(e)
        return None

    # A failed create may come back as an error response rather than an
    # exception; do not go on to look up a shortcut that was never created.
    status_code = getattr(response, "status_code", None)
    if status_code is not None and status_code >= 400:
        print(f"Failed to create shortcut {name}: {status_code} - {response.text}")
        return None

    return get_onelake_shorcut(dest_workspace_id, dest_item_id, dest_path, name)
   

# Validate the shortcut patterns once, before any notebook is modified.
if PATTERN_MATCH is None or len(PATTERN_MATCH) == 0:
    raise TypeError("Argument 'PATTERN_MATCH' should be a valid list of patterns or ['*'] to match everything")

# Lakehouse names whose shortcuts were already handled in this run, so notebooks
# sharing a default lakehouse do not repeat the same listing and create calls.
processed_lakehouses = set()

for notebook in notebookutils.notebook.list(workspaceId=new_workspace_id):
    if notebook.displayName == 'Create Feature Branch':
        continue

    # Get the current notebook definition
    notebook_def = notebookutils.notebook.getDefinition(notebook.displayName,workspaceId=new_workspace_id)
    json_payload = json.loads(notebook_def)

    # Notebooks without metadata, dependencies or a default lakehouse are left untouched
    dependencies = (json_payload.get('metadata') or {}).get('dependencies') or {}
    current_lakehouse = dependencies.get('lakehouse') or {}
    lakehouse_name = current_lakehouse.get('default_lakehouse_name')
    if not lakehouse_name:
        continue

    # Remove all attached lakehouses, then re-attach the same-named lakehouse of the new workspace
    json_payload['metadata']['dependencies']['lakehouse'] = {}
    notebookutils.notebook.updateDefinition(
        name = notebook.displayName,
        content = json.dumps(json_payload),
        defaultLakehouse = lakehouse_name,
        defaultLakehouseWorkspace = new_workspace_id,
        workspaceId = new_workspace_id
    )
    print(f"Updated notebook {notebook.displayName} with new default lakehouse: {lakehouse_name} in {new_workspace_id}")

    if lakehouse_name in processed_lakehouses:
        continue
    processed_lakehouses.add(lakehouse_name)

    # Resolve the same-named lakehouse in the source and in the new workspace
    source_lh_id = notebookutils.lakehouse.getWithProperties(name=lakehouse_name, workspaceId=thisWsId)['id']
    target_lh_id = notebookutils.lakehouse.getWithProperties(name=lakehouse_name, workspaceId=new_workspace_id)['id']

    SOURCE_URI = f"abfss://{thisWsId}@onelake.dfs.fabric.microsoft.com/{source_lh_id}/Tables"
    DEST_URI = f"abfss://{new_workspace_id}@onelake.dfs.fabric.microsoft.com/{target_lh_id}/Tables"

    # Collect created shortcuts
    result = []

    # If either URI's are invalid, skip shortcut creation for this lakehouse
    if not is_valid_onelake_uri(SOURCE_URI) or not is_valid_onelake_uri(DEST_URI):
        print(
            "invalid URI's provided. URI's should be in the form: abfss://<workspace-id>@onelake.dfs.fabric.microsoft.com/<item-id>/<path>"
        )
    else:
        # Remove any trailing '/' from uri's
        source_uri_addr = SOURCE_URI.rstrip("/")
        dest_uri_addr = DEST_URI.rstrip("/")

        dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(
            dest_uri_addr
        )

        # If we are not shortcutting to a managed table folder or
        # the source uri is a delta table, just shortcut it 1-1.
        if not dest_path.startswith("Tables") or is_delta_table(source_uri_addr):
            shortcut_sources = [source_uri_addr]
        else:
            # If source is not a delta table, and destination is managed table folder:
            # shortcut every matching table folder found under the source
            shortcut_sources = get_matching_delta_tables_uris(source_uri_addr, PATTERN_MATCH)

        for shortcut_source in shortcut_sources:
            shortcut = create_onelake_shorcut(shortcut_source, dest_uri_addr)
            if shortcut is not None:
                result.append(shortcut)
    print(result)


###### Update direct lake model lakehouse connection

https://semantic-link-labs.readthedocs.io/en/stable/sempy_labs.directlake.html#sempy_labs.directlake.update_direct_lake_model_lakehouse_connection
    

In [None]:

df_datasets = fabric.list_datasets(branch_to_new_ws)

# Resolve the workspace ID once instead of once per semantic model
new_ws_resolved_id = fabric.resolve_workspace_id(branch_to_new_ws)

# Iterate over each dataset in the dataframe
for index, row in df_datasets.iterrows():
    dataset_name = row['Dataset Name']

    # Default semantic models are skipped
    if labs.is_default_semantic_model(dataset_name, new_ws_resolved_id):
        continue

    # Only lakehouse-backed Direct Lake models can be re-pointed here; anything else
    # (no source, or a non-lakehouse source) would pass an unusable lakehouse name on.
    source_type, source_name = labs.directlake.get_direct_lake_source(dataset_name, workspace= branch_to_new_ws)[:2]
    if str(source_type).lower() != 'lakehouse' or not source_name:
        print('Skipping semantic model ' + dataset_name + ': no lakehouse Direct Lake source found')
        continue

    print('Updating semantic model connection ' + dataset_name)
    labs.directlake.update_direct_lake_model_lakehouse_connection(dataset=dataset_name, workspace= branch_to_new_ws,lakehouse =source_name, lakehouse_workspace=branch_to_new_ws)
    labs.refresh_semantic_model(dataset=dataset_name, workspace= branch_to_new_ws)



##### Rebind reports in new branch workspace

https://semantic-link-labs.readthedocs.io/en/latest/sempy_labs.report.html#sempy_labs.report.report_rebind

In [None]:
df_reports = fabric.list_reports(workspace=branch_to_new_ws)

# List the semantic models once (not once per report) and index their names by ID
df_datasets = fabric.list_datasets(workspace=branch_to_new_ws)
dataset_names_by_id = dict(zip(df_datasets['Dataset ID'], df_datasets['Dataset Name']))

for index, row in df_reports.iterrows():
    dataset_name = dataset_names_by_id.get(row['Dataset Id'])
    if dataset_name is None:
        # The report's semantic model is not in this workspace, so there is nothing local to rebind to
        print(f"Skipping report {row['Name']}: semantic model {row['Dataset Id']} not found in {branch_to_new_ws}")
        continue
    print(dataset_name)
    labs.report.report_rebind(report=row['Name'],dataset=dataset_name, report_workspace=branch_to_new_ws, dataset_workspace=branch_to_new_ws)


###### Commit changes made above to Git

In [None]:
# Commit all workspace changes made by the cells above to the connected branch
commit_endpoint = f"v1/workspaces/{new_workspace_id}/git/commitToGit"
json_body = dict(mode="All", comment="Update datasets connections. Rebind reports.")

resp = client.post(commit_endpoint, json=json_body)
print(resp)