# Notebook to sync tables from the source to the destination Unity Catalog (UC) metastore
---
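This notebook creates a Delta Sharing share in the source workspace, mounts it as a catalog in the destination (secondary) workspace, and then deep clones the source tables into the destination. The data to copy is defined in the `cats_to_copy` dict in the variables section.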

Source Workspace: URL of the source workspace

Destination Workspace: URL of the destination workspace



In [0]:
# Text widgets for the two workspace URLs; both default to an empty string.
for _widget_name, _widget_label in (("source_workspace", "Source Workspace"),
                                    ("destination_workspace", "Destination Workspace")):
    dbutils.widgets.text(_widget_name, "", _widget_label)
del _widget_name, _widget_label

In [0]:
# Read the workspace URLs entered in the widgets.
source_workspace = dbutils.widgets.get("source_workspace").strip()
destination_workspace = dbutils.widgets.get("destination_workspace").strip()

# Fail fast on bad input: both SDK clients are built from these values. An empty host
# presumably lets the SDK fall back to ambient configuration, silently targeting the
# wrong workspace.
if not source_workspace or not destination_workspace:
    raise ValueError("Both the 'source_workspace' and 'destination_workspace' widgets must be set to workspace URLs.")
if source_workspace.rstrip("/").lower() == destination_workspace.rstrip("/").lower():
    raise ValueError("'source_workspace' and 'destination_workspace' must point to different workspaces.")

In [0]:
# Service-principal credentials (client id, client secret, tenant id), read in that
# order from the Key Vault-backed secret scope 'kvbacked'.
clientid, clientsecret, tenantid = (
    dbutils.secrets.get('kvbacked', secret_key)
    for secret_key in ('clientid', 'clientsecret', 'tenantid')
)


In [0]:
# Variables
# cats_to_copy: a list of dicts keyed by catalog name; each catalog entry lists the
# schemas configured for it under "schemas".
cats_to_copy = [
    {
        "catadb360dev": {
            "schemas": [
                {"schema": "schemaadb360dev"},
                {"schema": "silverdb"},
                {"schema": "golddb"},
            ]
        }
    }
]

# Placeholder; the warehouse cell below assigns the destination SQL warehouse.
warehouse = None


In [0]:
# get the catalogs to work on: every catalog key of every dict in `cats_to_copy`,
# in configuration order, each listed once (previously only the first key of each
# dict was taken, silently dropping any others).
catalog_names = list(dict.fromkeys(name for cat in cats_to_copy for name in cat))

In [0]:
# For one specific catalog, collect the schema names configured in `cats_to_copy`.
# Note: the main loop below reassigns `catalog_name`.
catalog_name = "catadb360dev"
schema_names = []
for cat_cfg in cats_to_copy:
    if catalog_name not in cat_cfg:
        continue
    schema_names.extend(entry["schema"] for entry in cat_cfg[catalog_name]["schemas"])

In [0]:
# imports
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors.platform import ResourceAlreadyExists, BadRequest
from databricks.sdk.service import catalog
from databricks.sdk.service.catalog import Privilege, PermissionsChange
from databricks.sdk.service.sharing import (AuthenticationType, SharedDataObjectUpdate,
                                            SharedDataObjectUpdateAction, SharedDataObject,
                                            SharedDataObjectDataObjectType, SharedDataObjectStatus)
from databricks.sdk.service.sql import (Disposition, StatementState,
                                        CreateWarehouseRequestWarehouseType, ExecuteStatementRequestOnWaitTimeout)
import time


In [0]:
# SDK clients for the source and destination workspaces, both authenticated as the
# same Azure service principal.
_sp_credentials = {
    "azure_client_id": clientid,
    "azure_client_secret": clientsecret,
    "azure_tenant_id": tenantid,
}
sourceWs = WorkspaceClient(host=source_workspace, **_sp_credentials)
destWs = WorkspaceClient(host=destination_workspace, **_sp_credentials)
del _sp_credentials

In [0]:
# Create (or reuse) the Delta Sharing recipient in the source workspace that identifies
# the destination metastore.
# metastores.summary() describes the metastore assigned to the destination workspace
# itself. metastores.list() returns the metastores available to the caller, so its
# first element is not necessarily the destination's.
dest_metastore_id = destWs.metastores.summary().global_metastore_id

try:
    recipient = sourceWs.recipients.create(
        name="dr_recipient",
        authentication_type=AuthenticationType.DATABRICKS,
        data_recipient_global_metastore_id=dest_metastore_id
    )
    print(f"recipient created for metastore: {dest_metastore_id}")
except (ResourceAlreadyExists, BadRequest):
    recipient = sourceWs.recipients.get(name="dr_recipient")
    print(f"recipient already exists for metastore: {dest_metastore_id}")
    # An existing recipient is reused as-is; warn if it was set up for another metastore.
    if recipient.data_recipient_global_metastore_id != dest_metastore_id:
        print(f"WARNING: recipient dr_recipient targets metastore "
              f"{recipient.data_recipient_global_metastore_id}, not {dest_metastore_id}")


In [0]:
# get all the tables in the metastore of the workspace running this notebook.
# Only the columns the main loop reads are selected, so its distinct()/collect()
# work on four narrow columns instead of every information_schema column.
system_info_df = spark.sql(
    "SELECT table_catalog, table_schema, table_name, table_type "
    "FROM system.information_schema.tables"
)

In [0]:
# get remote provider name: the provider in the destination workspace whose global
# metastore id matches the metastore this notebook runs against.
local_metastore_id = spark.sql("SELECT current_metastore()").collect()[0]["current_metastore()"]
remote_provider_name = next(
    (p.name for p in destWs.providers.list()
     if p.data_provider_global_metastore_id == local_metastore_id),
    None,
)
if remote_provider_name is None:
    # Fail fast: the main loop needs the provider to create the shared catalog, and
    # printing here only deferred the failure to a NameError later.
    raise RuntimeError("Provider could not be found in target workspace; please check that it was created.")
print(f'found remote provider: {remote_provider_name}')

In [0]:
# Create the SQL warehouse in the destination workspace that runs the DEEP CLONE
# statements, or reuse an existing warehouse with the same name.
try:
    warehouse = destWs.warehouses.create(
        name='dr_warehouse',
        cluster_size="2X-Small",
        max_num_clusters=1,
        auto_stop_mins=10,
        warehouse_type=CreateWarehouseRequestWarehouseType("PRO"),
        enable_serverless_compute=True,
    )
    print('warehouse dr_warehouse successfully created in destination workspace')
except (ResourceAlreadyExists, BadRequest):
    # A duplicate name presumably surfaces as one of these errors. If no warehouse with
    # this name exists, the failure had another cause, so re-raise the original error.
    warehouse = next((wh for wh in destWs.warehouses.list() if wh.name == 'dr_warehouse'), None)
    if warehouse is None:
        raise
    print("Warehouse dr_warehouse already exists in destination workspace; reusing it.")

In [0]:
# define clone_table function
def _quote_identifier(name: str) -> str:
    """Return `name` as a backtick-quoted SQL identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def clone_a_table(wsc: WorkspaceClient, source_catalog: str, dest_catalog: str, schema: str, table_name: str, whid: str,
                  poll_interval_s=5, timeout_s=None):
    """Deep clone one table from `source_catalog` into `dest_catalog` on a SQL warehouse.

    Runs ``CREATE OR REPLACE TABLE <dest_catalog>.<schema>.<table_name> DEEP CLONE
    <source_catalog>.<schema>.<table_name>`` on warehouse `whid` through the statement
    execution API of `wsc`, then polls until the statement leaves PENDING/RUNNING.

    Args:
        wsc: client of the workspace that executes the statement.
        source_catalog: catalog to clone from.
        dest_catalog: catalog to clone into (same schema and table name).
        schema: schema name, shared by source and destination.
        table_name: table name, shared by source and destination.
        whid: id of the SQL warehouse that runs the statement.
        poll_interval_s: seconds to sleep between status polls.
        timeout_s: optional limit in seconds on polling. When exceeded, the statement is
            cancelled (best effort) and a FAIL result is returned. ``None`` waits indefinitely.

    Returns:
        dict with keys ``catalog`` (dest_catalog), ``schema``, ``table_name`` and
        ``status``, which is ``"SUCCESS"`` or ``"FAIL: <reason>"``. Errors are reported
        in ``status`` rather than raised.
    """
    print(f'cloning table {source_catalog}.{schema}.{table_name}..to {dest_catalog}.')

    def _result(status):
        # Uniform result record for every outcome.
        return {"catalog": dest_catalog,
                "schema": schema,
                "table_name": table_name,
                "status": status}

    try:
        # Quote each identifier so names with special characters or reserved words parse.
        target = ".".join(_quote_identifier(part) for part in (dest_catalog, schema, table_name))
        source = ".".join(_quote_identifier(part) for part in (source_catalog, schema, table_name))
        sqlstmt = f'CREATE OR REPLACE TABLE {target} DEEP CLONE {source}'

        # CONTINUE: the statement keeps running asynchronously after the initial wait,
        # so long clones are polled below instead of being cancelled.
        resp = wsc.statement_execution.execute_statement(
            warehouse_id=whid,
            on_wait_timeout=ExecuteStatementRequestOnWaitTimeout("CONTINUE"),
            disposition=Disposition("EXTERNAL_LINKS"),
            statement=sqlstmt
        )

        deadline = None if timeout_s is None else time.monotonic() + timeout_s
        while resp.status.state in {StatementState.PENDING, StatementState.RUNNING}:
            if deadline is not None and time.monotonic() >= deadline:
                try:
                    wsc.statement_execution.cancel_execution(resp.statement_id)
                except Exception as cancel_err:
                    print(f'could not cancel statement {resp.statement_id}: {cancel_err}')
                return _result(f"FAIL: timed out after {timeout_s}s (statement {resp.statement_id})")
            # Sleep before polling so a finished statement is not followed by a useless wait.
            time.sleep(poll_interval_s)
            resp = wsc.statement_execution.get_statement(resp.statement_id)

        if resp.status.state == StatementState.SUCCEEDED:
            return _result("SUCCESS")

        # `error` may be absent for terminal states other than FAILED (e.g. CANCELED),
        # so fall back to reporting the state instead of raising AttributeError.
        error = resp.status.error
        if error is not None and error.message:
            reason = error.message
        else:
            reason = f"statement ended in state {resp.status.state}"
        return _result(f"FAIL: {reason}")

    except Exception as e:
        return _result(f"FAIL: {e}")


In [0]:
# main loop for catalogs
# For each configured catalog:
#   1. share its schemas from the source metastore,
#   2. mount the share as a catalog in the destination workspace,
#   3. deep clone every table into the destination catalog of the same name.
# NOTE(review): the destination catalog and its schemas are not created here;
# presumably they must already exist in the destination metastore. Confirm.
clone_results = []
for catalog_name in catalog_names:
    print(f'working on catalog {catalog_name}')

    # Schemas configured for this catalog in `cats_to_copy`; an empty set means "all schemas".
    configured_schemas = {
        entry["schema"]
        for cat in cats_to_copy if catalog_name in cat
        for entry in cat[catalog_name].get("schemas", [])
    }

    # filter tables dataframe: this catalog's tables, excluding views and information_schema
    catalog_tables = system_info_df.filter((system_info_df.table_catalog == catalog_name) &
                                           (system_info_df.table_schema != "information_schema") &
                                           (system_info_df.table_type != "VIEW")).distinct().collect()
    filtered_tables = [row for row in catalog_tables
                       if not configured_schemas or row['table_schema'] in configured_schemas]

    unique_schemas = sorted({row['table_schema'] for row in filtered_tables})
    missing_schemas = sorted(configured_schemas.difference(unique_schemas))
    if missing_schemas:
        print(f'configured schemas without tables in {catalog_name}: {missing_schemas}')
    if not filtered_tables:
        print(f'no tables to clone in catalog {catalog_name}; skipping')
        continue

    share_name = f"{catalog_name}_share"
    shared_catalog_name = f"{catalog_name}_share"

    print(f'creating share for catalog {catalog_name}')
    try:
        share_name = sourceWs.shares.create(name=share_name).name
        print(f'share created: {share_name}')
    except (ResourceAlreadyExists, BadRequest):
        print(f'share already exists: {share_name}')

    try:
        sourceWs.shares.update_permissions(
            share_name,
            changes=[PermissionsChange(add=[Privilege.SELECT], principal=recipient.name)]
        )
        print(f'updated share permissions for {share_name}')
    except BadRequest:
        print(f'could not update permissions for share {share_name}')

    # build update object with every schema to copy from the current catalog
    updates = [
        SharedDataObjectUpdate(action=SharedDataObjectUpdateAction.ADD,
                               data_object=SharedDataObject(name=f"{catalog_name}.{schema}",
                                                            data_object_type=SharedDataObjectDataObjectType.SCHEMA,
                                                            status=SharedDataObjectStatus.ACTIVE))
        for schema in unique_schemas]

    # update the share (best effort: failures are reported and the run continues)
    try:
        sourceWs.shares.update(share_name, updates=updates)
        print(f'Updated share {share_name} successfully')
    except Exception as e:
        print(f"Error updating share {share_name}: {e}")

    # create the shared catalog in the target workspace
    try:
        destWs.catalogs.create(name=shared_catalog_name, provider_name=remote_provider_name, share_name=share_name)
        print(f"Created shared catalog {shared_catalog_name} successfully.")
    except (ResourceAlreadyExists, BadRequest):
        print(f"Shared catalog {shared_catalog_name} already exists. Skipping creation.")

    # clone each table from the shared catalog into the destination catalog
    for row in filtered_tables:
        result = clone_a_table(destWs, shared_catalog_name, catalog_name,
                               row["table_schema"], row["table_name"], warehouse.id)
        print(result)
        clone_results.append(result)

failed_clones = [r for r in clone_results if r["status"] != "SUCCESS"]
print('finished clone')
print(f'{len(clone_results) - len(failed_clones)} table(s) cloned, {len(failed_clones)} failed')
