### Microsoft Fabric Healthcare Data Solutions Git Integration Helper


[Healthcare Data Solutions (HDS)](https://learn.microsoft.com/en-us/industry/healthcare/healthcare-data-solutions/overview) in [Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/get-started/microsoft-fabric-overview) supports version control through [Application Lifecycle Management (ALM)](https://learn.microsoft.com/en-us/fabric/cicd/git-integration/intro-to-git-integration?tabs=azure-devops).

This notebook helps users who use ALM to pull in an HDS item that has been checked into source control. It assumes that you have already committed your changes from one workspace and synchronized them to another workspace. If you are unfamiliar with this process, follow the instructions [here](https://learn.microsoft.com/en-us/fabric/cicd/git-integration/git-get-started?tabs=azure-devops%2CAzure%2Ccommit-to-git).

To complete the ALM integration process, provide the required parameters below and then run the remaining cells in the notebook.

### Parameters

In the cell below, provide the following parameters to start the migration process. These are the only inputs the migration requires.

___source_workspace_id___: The ID of the source workspace.<br>
___source_lakehouse_id___: The ID of the administration lakehouse in the source workspace.<br>
___dest_workspace_id___: The ID of the destination workspace.<br>
___dest_lakehouse_id___: The ID of the administration lakehouse in the destination workspace.<br>
___solution_name___: The name of the HDS solution item, which is looked up by this name in both workspaces. The default is `healthcare1`.
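Fabric workspace and lakehouse IDs are GUIDs, so a quick format check before running the remaining cells can catch an empty or mistyped value early. The sketch below is hypothetical: `is_guid` and `validate_parameters` are not part of HDS or Fabric, and they only check the string format, not whether the workspace or lakehouse actually exists.

```python
import uuid


def is_guid(value) -> bool:
    """Return True if value parses as a GUID (the format of Fabric workspace and item IDs)."""
    try:
        uuid.UUID(value)
        return True
    except (ValueError, AttributeError, TypeError):
        return False


def validate_parameters(params: dict) -> list:
    """Hypothetical helper: return the names of ID parameters that are not valid GUIDs."""
    id_keys = ["source_workspace_id", "source_lakehouse_id", "dest_workspace_id", "dest_lakehouse_id"]
    return [key for key in id_keys if not is_guid(params.get(key, ""))]


# Example: the destination lakehouse ID was left empty (IDs are placeholders)
params = {
    "source_workspace_id": "00000000-0000-0000-0000-000000000001",
    "source_lakehouse_id": "00000000-0000-0000-0000-000000000002",
    "dest_workspace_id": "00000000-0000-0000-0000-000000000003",
    "dest_lakehouse_id": "",
}
print(validate_parameters(params))  # ['dest_lakehouse_id']
```

In the notebook itself, the parameters are plain globals, so `validate_parameters(globals())` would run the same check after the parameter cell.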

In [None]:
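source_workspace_id = ""       # ID of the source workspace
source_lakehouse_id = ""       # ID of the administration lakehouse in the source workspace
dest_workspace_id = ""         # ID of the destination workspace
dest_lakehouse_id = ""         # ID of the administration lakehouse in the destination workspace
solution_name = "healthcare1"  # name of the HDS solution item (default: healthcare1)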

#### Helper Methods

Run the following cell to register the helper methods used to migrate Healthcare Data Solutions assets to a target workspace.
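The helpers in the next cell rely on one idea: an item synchronized through Git keeps its display name but has a different ID in each workspace, so every source ID found in a parameter value is swapped for the ID of the destination item with the same display name. The sketch below illustrates that matching in plain Python. `remap_ids` is a hypothetical name, the item dictionaries only mimic the `id`, `displayName`, and `type` fields the helpers read, and the IDs are made up.

```python
def remap_ids(value: str, source_items: dict, dest_items_by_name: dict):
    """Replace source item IDs in value with the IDs of same-named destination items.

    source_items: {item_id: item} for the source workspace
    dest_items_by_name: {displayName: item} for the destination workspace
    Returns the updated string and a {displayName: new_id} map of what was replaced.
    """
    replacements = {}
    for source_id, item in source_items.items():
        name = item["displayName"]
        if source_id in value and name in dest_items_by_name:
            new_id = dest_items_by_name[name]["id"]
            value = value.replace(source_id, new_id)
            replacements[name] = new_id
    return value, replacements


# Illustrative data only; real items come from the workspace items endpoint
source_items = {
    "aaaa-1111": {"id": "aaaa-1111", "displayName": "healthcare1_msft_bronze", "type": "Lakehouse"},
}
dest_items_by_name = {
    "healthcare1_msft_bronze": {"id": "bbbb-2222", "displayName": "healthcare1_msft_bronze", "type": "Lakehouse"},
}

path = "abfss://ws@onelake.dfs.fabric.microsoft.com/aaaa-1111/Files"
new_path, replaced = remap_ids(path, source_items, dest_items_by_name)
print(new_path)   # abfss://ws@onelake.dfs.fabric.microsoft.com/bbbb-2222/Files
print(replaced)   # {'healthcare1_msft_bronze': 'bbbb-2222'}
```

Because matching is by display name, an item renamed in one workspace is silently skipped, both here and in the helpers.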

In [None]:
from sempy.fabric import FabricRestClient
import json

def get_deployment_parameters_config(source_workspace_id: str, source_lakehouse_id: str):
    deployment_parameters_configuration_source_path = "Files/system-configurations/deploymentParametersConfiguration.json"

    source_lakehouse_data = get_lakehouse(source_workspace_id, source_lakehouse_id)
    source_dfs_domain = get_lakehouse_dfs_domain(source_lakehouse_data)
    print(source_dfs_domain)
    configuration_path = f"abfss://{source_workspace_id}@{source_dfs_domain}/{source_lakehouse_id}/{deployment_parameters_configuration_source_path}"
    print(configuration_path)
    df = spark.read.option("multiline", "true").json(configuration_path)
    deployment_parameters_config_json = df.collect()[0].asDict(recursive=True)
    return deployment_parameters_config_json

def get_workspace_artifacts_by_id(workspace_id: str):
    fabric_client = FabricRestClient()
    artifacts = fabric_client.get(f"/v1/workspaces/{workspace_id}/items").json()['value']
    source_workspace_artifacts = { artifact["id"]: artifact for artifact in artifacts }
    return source_workspace_artifacts

def get_workspace_lakehouses(workspace_id: str):
    fabric_client = FabricRestClient()
    lakehouses = fabric_client.get(f"/v1/workspaces/{workspace_id}/items?type=Lakehouse").json()['value']
    lakehouses_dict = { lakehouse["displayName"]: lakehouse for lakehouse in lakehouses }
    return lakehouses_dict

def get_workspace_healthcare_data_solution_by_name(workspace_id: str, solution_name: str|None = "healthcare1"):
    fabric_client = FabricRestClient()
    artifacts = fabric_client.get(f"/v1/workspaces/{workspace_id}/items?type=Healthcaredatasolution").json()['value']
    for artifact in artifacts:
        if artifact["displayName"] == solution_name:
            return artifact
    return None

def get_workspace_details(workspace_id: str):
    fabric_client = FabricRestClient()
    workspace_details = fabric_client.get(f"/v1/workspaces/{workspace_id}").json()
    return workspace_details

def get_updated_global_activity_parameters(source_workspace_id, source_lakehouse_id, dest_workspace_id, dest_lakehouse_id, solution_name="healthcare1"):
    
    deployment_parameters_config = get_deployment_parameters_config(source_workspace_id, source_lakehouse_id)
    global_parameters = deployment_parameters_config["activitiesGlobalParameters"]

    source_workspace_artifacts = get_workspace_artifacts_by_id(source_workspace_id)
    dest_workspace_artifacts = get_workspace_artifacts_by_id(dest_workspace_id)

    source_workspace_lakehouses = { v["displayName"]: v for v in source_workspace_artifacts.values() if str(v['type']).lower() == "lakehouse" }
    dest_workspace_lakehouses = { v["displayName"]: v for v in dest_workspace_artifacts.values() if str(v['type']).lower() == "lakehouse" }
    dest_workspace_artifacts_by_name = { v["displayName"]: v for v in dest_workspace_artifacts.values() }

    updated_global_parameters = {}
    replacements = []
    for pk, pv in global_parameters.items():

        updated_global_parameters[pk] = pv
        # A key of the form "<name>_lakehouse_id" maps to the lakehouse "<solution_name>_msft_<name>"
        if "_lakehouse_id" in pk:
            lakehouse_name = pk.split("_lakehouse_id")[0]
            target_lakehouse = f"{solution_name}_msft_{lakehouse_name.lower()}"
            if target_lakehouse in source_workspace_lakehouses and target_lakehouse in dest_workspace_lakehouses:
                updated_global_parameters[pk] = dest_workspace_lakehouses[target_lakehouse]["id"]

        # Replace any source artifact ids in the value with the id of the same-named destination artifact
        for artifact_id, source_artifact in source_workspace_artifacts.items():
            if artifact_id in str(pv):
                source_artifact_name = source_artifact["displayName"]
                if source_artifact_name in dest_workspace_artifacts_by_name:
                    replacement_id = dest_workspace_artifacts_by_name[source_artifact_name]["id"]
                    replacements.append([source_artifact_name, replacement_id])
                    updated_global_parameters[pk] = str(updated_global_parameters[pk]).replace(artifact_id, replacement_id)

    return updated_global_parameters, replacements

def get_updated_activties_configuration(source_workspace_id, source_lakehouse_id, dest_workspace_id, dest_lakehouse_id):
    
    deployment_parameters_config = get_deployment_parameters_config(source_workspace_id, source_lakehouse_id)
    activities_json = deployment_parameters_config['activities']

    source_workspace_artifacts = get_workspace_artifacts_by_id(source_workspace_id)
    dest_workspace_artifacts = get_workspace_artifacts_by_id(dest_workspace_id)

    dest_workspace_notebooks = {v["displayName"]: v["id"] for v in dest_workspace_artifacts.values() if str(v['type']).lower() == "notebook" }
    dest_workspace_artifacts_by_name = {v["displayName"]: v for v in dest_workspace_artifacts.values() }

    configuration_activities = {}
    for activity in activities_json.keys():
        configuration_activities[activities_json[activity]["name"]] = activities_json[activity]

    updated_activities = {}
    replacements = []
    for config_activity_name, config_activity in configuration_activities.items():

        # If the activity (notebook) name is found in the destination workspace
        if config_activity_name in dest_workspace_notebooks:

            new_activity_id = dest_workspace_notebooks[config_activity_name]
            config_parameters = config_activity["parameters"]
            updated_parameters = {}

            # For each parameter in the activity parameters section
            for pk, pv in config_parameters.items():
                updated_parameters[pk] = pv

                if pk not in ["checkpoint_path", "schema_dir_path"]:

                    # Replace the source workspace id with the destination workspace id if found
                    if source_workspace_id in str(pv):
                        updated_parameters[pk] = str(updated_parameters[pk]).replace(source_workspace_id, dest_workspace_id)

                    # Replace any source artifact ids with the id of the same-named destination artifact
                    for artifact_id, source_artifact in source_workspace_artifacts.items():
                        if artifact_id in str(pv):
                            source_artifact_name = source_artifact["displayName"]
                            if source_artifact_name in dest_workspace_artifacts_by_name:
                                replacement_id = dest_workspace_artifacts_by_name[source_artifact_name]["id"]
                                replacements.append([source_artifact_name, replacement_id])
                                updated_parameters[pk] = str(updated_parameters[pk]).replace(artifact_id, replacement_id)
            
            updated_activities[new_activity_id] = {
                "name": config_activity_name,
                "parameters": updated_parameters
            }

    return updated_activities, replacements

def consolidate_replacements(global_param_replacements, activity_replacements):
    all_replacements = {}
    for r in activity_replacements:
        if r[0] not in all_replacements:
            all_replacements[r[0]] = r[1]
    
    for r in global_param_replacements:
        if r[0] not in all_replacements:
            all_replacements[r[0]] = r[1]
    
    return all_replacements

def get_updated_deployment_parameters_configuration(source_workspace_id, source_lakehouse_id, dest_workspace_id, dest_lakehouse_id, solution_name="healthcare1"):
    updated_global_parameters, global_param_replacements = get_updated_global_activity_parameters(source_workspace_id, source_lakehouse_id, dest_workspace_id, dest_lakehouse_id, solution_name)
    updated_activities, activity_replacements = get_updated_activties_configuration(source_workspace_id, source_lakehouse_id, dest_workspace_id, dest_lakehouse_id)

    updated_deployment_parameters_configuration = {
        "activitiesGlobalParameters": updated_global_parameters,
        "activities": updated_activities
    }
    
    return updated_deployment_parameters_configuration, consolidate_replacements(global_param_replacements, activity_replacements)

def get_deployment_parameters_destination_path(dest_workspace_id, dest_lakehouse_id):
    destination_lakehouse_data = get_lakehouse(dest_workspace_id, dest_lakehouse_id)
    destination_lakehouse_dfs_domain = get_lakehouse_dfs_domain(destination_lakehouse_data)

    return f"abfss://{dest_workspace_id}@{destination_lakehouse_dfs_domain}/{dest_lakehouse_id}/Files/system-configurations/deploymentParametersConfiguration.json"

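def get_lakehouse_dfs_domain(lakehouse_data: dict):
    # oneLakeFilesPath is expected to look like "https://<dfs-domain>/<workspace-id>/<lakehouse-id>/Files";
    # return only the host part (the OneLake DFS domain)
    onelake_files_path = lakehouse_data["properties"]["oneLakeFilesPath"]
    env = onelake_files_path.split("//")[1].split("/")[0]
    return env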

def get_lakehouse(workspace_id: str, lakehouse_id: str):
    fabric_client = FabricRestClient()
    return fabric_client.get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()

def copy_system_data(source_workspace_id, source_admin_lakehouse_id, dest_workspace_id, dest_admin_lakehouse_id, solution_name):

    try:
        internal_relative_path = "/system-configurations/HDS/_internal"
        libraries_relative_path = "/deployment-assets/libraries"

        dest_admin_lakehouse_data = get_lakehouse(dest_workspace_id, dest_admin_lakehouse_id)

        lakehouse_dfs = get_lakehouse_dfs_domain(dest_admin_lakehouse_data)

        dest_admin_lakehouse_id = dest_admin_lakehouse_data["id"]

        source_file_path = f"abfss://{source_workspace_id}@{lakehouse_dfs}/{source_admin_lakehouse_id}/Files"
        dest_files_path = f"abfss://{dest_workspace_id}@{lakehouse_dfs}/{dest_admin_lakehouse_id}/Files"

        if mssparkutils.fs.exists(f"{source_file_path}{internal_relative_path}") and mssparkutils.fs.exists(dest_files_path):
            print("Copying internal resources...")
            mssparkutils.fs.cp(f"{source_file_path}{internal_relative_path}", f"{dest_files_path}{internal_relative_path}", recurse=True)
        else:
            print("Internal data not found in administrative lakehouse or destination does not exist")
        
        if mssparkutils.fs.exists(f"{source_file_path}{libraries_relative_path}") and mssparkutils.fs.exists(dest_files_path):
            print("Copying libraries...")
            mssparkutils.fs.cp(f"{source_file_path}{libraries_relative_path}", f"{dest_files_path}{libraries_relative_path}", recurse=True)
        else:
            print("HDS Libraries not found in administrative lakehouse or destination does not exist")
    except Exception as ex:
        print(f"Exception occurred when copying system data: {ex}")

def copy_data_assets(source_workspace_id, dest_workspace_id, solution_name):

    sample_data_relative_path = "/SampleData"
    reference_data_relative_path = "/ReferenceData"
    source_workspace_lakehouses = get_workspace_lakehouses(source_workspace_id)
    dest_workspace_lakehouses = get_workspace_lakehouses(dest_workspace_id)
    bronze_lakehouse_name = f"{solution_name}_msft_bronze"

    if bronze_lakehouse_name in source_workspace_lakehouses and bronze_lakehouse_name in dest_workspace_lakehouses:

        source_bronze_lakehouse_id = source_workspace_lakehouses[bronze_lakehouse_name]["id"]
        dest_bronze_lakehouse_id = dest_workspace_lakehouses[bronze_lakehouse_name]["id"]
        source_lakehouse = get_lakehouse(source_workspace_id, source_bronze_lakehouse_id)
        dest_lakehouse = get_lakehouse(dest_workspace_id, dest_bronze_lakehouse_id)

        source_lakehouse_dfs = get_lakehouse_dfs_domain(source_lakehouse)
        dest_lakehouse_dfs = get_lakehouse_dfs_domain(dest_lakehouse)

        source_file_path = f"abfss://{source_workspace_id}@{source_lakehouse_dfs}/{source_bronze_lakehouse_id}/Files"
        dest_files_path = f"abfss://{dest_workspace_id}@{dest_lakehouse_dfs}/{dest_bronze_lakehouse_id}/Files"

        try:
            if mssparkutils.fs.exists(f"{source_file_path}{sample_data_relative_path}") and mssparkutils.fs.exists(dest_files_path):
                print("Copying sample data...")
                mssparkutils.fs.cp(f"{source_file_path}{sample_data_relative_path}", f"{dest_files_path}{sample_data_relative_path}", recurse=True)
            else:
                print(f"Sample data not found in {bronze_lakehouse_name} or destination does not exist")
        except Exception as ex:
            print(f"Exception occurred while copying sample data, the source or destination folder might not exist: {ex}")

        try:
            if mssparkutils.fs.exists(f"{source_file_path}{reference_data_relative_path}") and mssparkutils.fs.exists(dest_files_path):
                print("Copying reference data...")
                mssparkutils.fs.cp(f"{source_file_path}{reference_data_relative_path}", f"{dest_files_path}{reference_data_relative_path}", recurse=True)
            else:
                print(f"Reference data not found in {bronze_lakehouse_name} or destination does not exist")
        except Exception as ex:
            print(f"Exception occurred while copying reference data, the source or destination folder might not exist: {ex}")

    else:
        print(f"{bronze_lakehouse_name} not found in either the source or target workspace. Sample data and reference data were not copied.")

def copy_workload_system_data(source_workspace_id, dest_workspace_id, solution_name, dest_admin_lakehouse_id):

    try:
        internal_relative_path = "/DMHConfiguration"
        libraries_relative_path = "/deployment-assets/libraries"

        dest_admin_lakehouse_data = get_lakehouse(dest_workspace_id, dest_admin_lakehouse_id)
        lakehouse_dfs = get_lakehouse_dfs_domain(dest_admin_lakehouse_data)

        source_solution_id = get_workspace_healthcare_data_solution_by_name(source_workspace_id, solution_name)["id"]
        dest_solution_id = get_workspace_healthcare_data_solution_by_name(dest_workspace_id, solution_name)["id"]

        source_files_path = f"abfss://{source_workspace_id}@{lakehouse_dfs}/{source_solution_id}"
        dest_files_path = f"abfss://{dest_workspace_id}@{lakehouse_dfs}/{dest_solution_id}"

        try:
            if mssparkutils.fs.exists(f"{source_files_path}{internal_relative_path}") and mssparkutils.fs.exists(f"{dest_files_path}{internal_relative_path}"):
                print("Copying internal resources...")
                mssparkutils.fs.cp(f"{source_files_path}{internal_relative_path}", f"{dest_files_path}{internal_relative_path}", recurse=True)
            else:
                print("Internal data not found in the source solution item, or the destination folder does not exist")
        except Exception as ex:
            print(f"Exception occurred while copying internal sources, the source or destination folder might not exist: {ex}")
        
        try:
            if mssparkutils.fs.exists(f"{source_files_path}{libraries_relative_path}") and mssparkutils.fs.exists(dest_files_path):
                print("Copying libraries...")
                mssparkutils.fs.cp(f"{source_files_path}{libraries_relative_path}", f"{dest_files_path}{libraries_relative_path}", recurse=True)
            else:
                print("HDS libraries not found in the source solution item or destination does not exist")
        except Exception as ex:
            print(f"Exception occurred while copying libraries, the source or destination folder might not exist: {ex}")

    except Exception as ex:
        print(f"Exception occurred when copying workload system data: {ex}")

#### Copy Deployment Parameters Configuration

Run the following cell to preview the updated deployment parameters configuration before it is saved to the destination workspace.
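Besides swapping embedded item IDs, the preview treats global parameters whose key contains `_lakehouse_id` specially: the prefix before `_lakehouse_id` is lowercased and resolved to the lakehouse named `<solution_name>_msft_<prefix>`, as in `get_updated_global_activity_parameters`. The sketch below isolates that naming rule. `resolve_lakehouse_parameter` and the parameter keys in the example are hypothetical, and the lakehouse maps use placeholder IDs.

```python
def resolve_lakehouse_parameter(key, value, solution_name, source_lakehouses, dest_lakehouses):
    """Return the destination lakehouse ID for a '<name>_lakehouse_id' key, else the original value.

    source_lakehouses / dest_lakehouses: {displayName: {"id": ...}} for each workspace.
    """
    if "_lakehouse_id" not in key:
        return value
    prefix = key.split("_lakehouse_id")[0]
    lakehouse_name = f"{solution_name}_msft_{prefix.lower()}"
    # Only remap when the lakehouse exists in both workspaces, mirroring the helper
    if lakehouse_name in source_lakehouses and lakehouse_name in dest_lakehouses:
        return dest_lakehouses[lakehouse_name]["id"]
    return value


source = {"healthcare1_msft_silver": {"id": "src-silver"}}
dest = {"healthcare1_msft_silver": {"id": "dst-silver"}}
print(resolve_lakehouse_parameter("Silver_lakehouse_id", "src-silver", "healthcare1", source, dest))  # dst-silver
print(resolve_lakehouse_parameter("max_rows", "100", "healthcare1", source, dest))  # 100
```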


In [None]:
updated_config, replacements = get_updated_deployment_parameters_configuration(source_workspace_id, source_lakehouse_id, dest_workspace_id, dest_lakehouse_id, solution_name)

print("Here are the artifacts that were updated with their destination ids:\n")

for replaced_artifact, replaced_artifact_id in replacements.items():
    print(f"{replaced_artifact}: {replaced_artifact_id}")

print("\n")
print("This is the updated deployment parameters configuration for the destination workspace:\n")
print(json.dumps(updated_config, indent=2))

#### Save the deployment parameters to the destination workspace

After reviewing the updated deployment parameters configuration, run the following cell to save it to the destination workspace. Note that this overwrites the existing `deploymentParametersConfiguration.json` file if one already exists in that location.
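The destination path is built from the lakehouse's `oneLakeFilesPath` property: the helpers take the host part of that URL as the DFS domain and compose an `abfss://<workspace-id>@<domain>/<lakehouse-id>/Files/...` URI. The sketch below reproduces that composition with `urllib.parse` instead of string splitting. The sample `oneLakeFilesPath` value and the IDs are assumptions for illustration, and `dfs_domain` / `abfss_path` are hypothetical names.

```python
from urllib.parse import urlparse


def dfs_domain(onelake_files_path: str) -> str:
    """Return the host of a oneLakeFilesPath URL (the OneLake DFS endpoint)."""
    return urlparse(onelake_files_path).netloc


def abfss_path(workspace_id: str, lakehouse_id: str, domain: str, relative_path: str) -> str:
    """Compose an ABFS URI under the lakehouse Files folder, as the notebook helpers do."""
    return f"abfss://{workspace_id}@{domain}/{lakehouse_id}/Files/{relative_path.lstrip('/')}"


# Assumed shape of the property returned for a lakehouse (placeholder IDs)
files_path = "https://onelake.dfs.fabric.microsoft.com/ws-123/lh-456/Files"
domain = dfs_domain(files_path)
print(abfss_path("ws-123", "lh-456", domain, "system-configurations/deploymentParametersConfiguration.json"))
# abfss://ws-123@onelake.dfs.fabric.microsoft.com/lh-456/Files/system-configurations/deploymentParametersConfiguration.json
```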

In [None]:
deployment_parameters_destination_path = get_deployment_parameters_destination_path(dest_workspace_id, dest_lakehouse_id)
mssparkutils.fs.put(file=deployment_parameters_destination_path, content=json.dumps(updated_config), overwrite=True)

#### Copy System Data
Execute this cell to copy important system metadata and libraries to the destination workspace.
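Each copy step in the helpers follows the same guard: copy a folder recursively only if it exists in the source and the destination root exists; otherwise print a message and move on. The sketch below shows that pattern against the local file system, with `pathlib` and `shutil.copytree` standing in for `mssparkutils.fs.exists` and `mssparkutils.fs.cp(..., recurse=True)`. `copy_if_present` is a hypothetical name, and `copytree` may differ from OneLake copies in details such as overwrite behavior.

```python
import shutil
import tempfile
from pathlib import Path


def copy_if_present(source_root: Path, dest_root: Path, relative_path: str) -> bool:
    """Copy source_root/relative_path to dest_root/relative_path if the guard conditions hold."""
    source = source_root / relative_path
    target = dest_root / relative_path
    if source.exists() and dest_root.exists():
        # dirs_exist_ok=True lets a re-run merge into an existing folder (Python 3.8+)
        shutil.copytree(source, target, dirs_exist_ok=True)
        return True
    print(f"{relative_path} not found in source or destination does not exist")
    return False


with tempfile.TemporaryDirectory() as tmp:
    src, dst = Path(tmp, "src"), Path(tmp, "dst")
    (src / "deployment-assets" / "libraries").mkdir(parents=True)
    (src / "deployment-assets" / "libraries" / "lib.txt").write_text("placeholder")
    dst.mkdir()
    print(copy_if_present(src, dst, "deployment-assets/libraries"))  # True
    print(copy_if_present(src, dst, "DMHConfiguration"))             # prints a message, then False
```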

In [None]:
# Signature: copy_workload_system_data(source_workspace_id, dest_workspace_id, solution_name, dest_admin_lakehouse_id)
copy_workload_system_data(source_workspace_id, dest_workspace_id, solution_name, dest_lakehouse_id)

#### Copy Data Assets
Execute this cell to copy sample and reference data to the destination workspace.
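Sample and reference data are copied only when a lakehouse named `<solution_name>_msft_bronze` exists in both workspaces; the `SampleData` and `ReferenceData` folders under its `Files` area are then copied one at a time. The sketch below computes that copy plan without touching storage. `plan_data_asset_copies` is hypothetical, and the lakehouse dictionaries mimic the `{displayName: item}` maps returned by `get_workspace_lakehouses`, with placeholder IDs.

```python
def plan_data_asset_copies(solution_name, source_lakehouses, dest_lakehouses,
                           folders=("SampleData", "ReferenceData")):
    """Return (source_lakehouse_id, dest_lakehouse_id, folder) tuples, or [] if the bronze lakehouse is missing."""
    bronze = f"{solution_name}_msft_bronze"
    if bronze not in source_lakehouses or bronze not in dest_lakehouses:
        return []
    source_id = source_lakehouses[bronze]["id"]
    dest_id = dest_lakehouses[bronze]["id"]
    return [(source_id, dest_id, folder) for folder in folders]


source = {"healthcare1_msft_bronze": {"id": "src-bronze"}}
dest = {"healthcare1_msft_bronze": {"id": "dst-bronze"}}
print(plan_data_asset_copies("healthcare1", source, dest))
# [('src-bronze', 'dst-bronze', 'SampleData'), ('src-bronze', 'dst-bronze', 'ReferenceData')]
print(plan_data_asset_copies("healthcare2", source, dest))  # []
```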

In [None]:
copy_data_assets(source_workspace_id, dest_workspace_id, solution_name)