## Catalog migration script

### Description
This script is a utility to migrate a catalog from hive_metastore to Unity Catalog. It loops through all or the specified schemas in hive_metastore and migrates the following:
- external tables
- views

This script can run in DRY RUN mode, in which case it performs no migration and only reports the feasibility of the migration on a table-by-table basis.
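
As a rough illustration of the DRY RUN mode for tables: the migration code further down issues a `SYNC SCHEMA ... DRY RUN` statement per schema and reads the `status_code` of each returned row. The sketch below only builds the statement text. The helper name `build_sync_schema_statement` and the catalog and schema names are hypothetical and not part of the script.

```python
def build_sync_schema_statement(catalog_destination: str, schema: str, dry_run: bool = True) -> str:
    """Build the SYNC SCHEMA statement text (hypothetical helper, for illustration only)."""
    statement = (
        f"SYNC SCHEMA `{catalog_destination}`.`{schema}` "
        f"FROM `hive_metastore`.`{schema}`"
    )
    # DRY RUN makes SYNC report per-table feasibility without creating anything
    return f"{statement} DRY RUN" if dry_run else statement


# "main_catalog" and "sales" are placeholder names
print(build_sync_schema_statement("main_catalog", "sales"))
```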

### How to use
#### Parameters:
- Schemas to migrate (`schemas-to-migrate`): required; use 'all' to migrate every schema, or specify schemas separated by commas
- Migration scope (`migration-scope`): possible values 'all', 'external-tables', 'views'; the widget defaults to 'external-tables'
- UC Catalog Destination (`uc-catalog-destination`): catalog destination in Unity Catalog
- UC Schema Owner (`uc-schema-owner`): owner for the newly created UC schemas, tables & views; this needs to be an account level group / user
- DRY RUN (`dry-run`): True/False; if True, the migration is not performed and the result of the attempted migration is printed to the INFO log
- Table list for migration (`selective-table-upgrade-yaml`): YAML file which contains the table list to migrate
- Storage accounts:
    - Storage accounts for non PII external locations (`external-locations-non-pii`): comma-separated list of the storage accounts of all external locations for non PII data
    - Storage accounts for PII external locations (`external-locations-pii`): comma-separated list of the storage accounts of all external locations for PII data
- Azure Service Principal authentication details:
    - DB Secret Scope for SP credentials (`auth-secret-scope`): Databricks secrets scope where the Service Principal secrets are saved
    - SP Non PII Secret Value DB Secrets (`auth-client-secret-non-pii`): Databricks secrets key where the Service Principal secret for non PII data is saved
    - SP PII Secret Value DB Secrets (`auth-client-secret-pii`): Databricks secrets key where the Service Principal secret for PII data is saved
    - SP Non PII ID (`auth-client-id-non-pii`): Service Principal ID used to access non PII data
    - SP PII ID (`auth-client-id-pii`): Service Principal ID used to access PII data
    - SP Endpoint (`auth-client-endpoint`): Service Principal OAuth token endpoint
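
To make the comma-separated parameters concrete, here is a minimal sketch of how `schemas-to-migrate` could be combined with the `exclude-schemas` widget defined in the configuration cell. The function name `resolve_schemas` and the `all_schemas` argument (standing in for the result of listing schemas in hive_metastore) are assumptions for illustration; the script's own `_get_schemas` method may differ in details such as ordering.

```python
def resolve_schemas(schemas_to_migrate: str, exclude_schemas: str, all_schemas: list[str]) -> list[str]:
    """Turn the comma-separated widget values into the list of schemas to process."""
    def split_csv(value: str) -> list[str]:
        # Drop spaces and empty entries such as a trailing comma
        return [item for item in value.replace(" ", "").split(",") if item]

    schemas = list(all_schemas) if schemas_to_migrate == "all" else split_csv(schemas_to_migrate)
    excluded = set(split_csv(exclude_schemas))
    # Keep the original order while removing excluded schemas
    return [schema for schema in schemas if schema not in excluded]


print(resolve_schemas("sales, finance ,hr", "hr", all_schemas=[]))
# ['sales', 'finance']
```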

#### How to run:
- set up the parameters in the defined widgets and run all the cells
- note that this needs to run from a UC-compatible single user cluster
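
When filling in `selective-table-upgrade-yaml`, it may help to know the layout the migration code reads: entries under `resources` and then `tables`, each with the keys `database`, `name`, `target_schema`, `target_table` and `is_pii`. The sketch below uses a plain dict in the shape that `yaml.safe_load` would return for such a file. The entry key `orders`, all values, and the helper `table_name_pairs` are made-up placeholders, not part of the script.

```python
# Shape of the parsed selective-table YAML (what yaml.safe_load would return).
# The entry key "orders" and all values are made-up placeholders.
config = {
    "resources": {
        "tables": {
            "orders": {
                "database": "sales",
                "name": "orders_raw",
                "target_schema": "sales_uc",
                "target_table": "orders",
                "is_pii": False,
            }
        }
    }
}


def table_name_pairs(config: dict, catalog_destination: str) -> list[tuple[str, str]]:
    """Return (source, final destination) fully qualified names for each configured table."""
    pairs = []
    for entry in config["resources"]["tables"].values():
        source = f"`hive_metastore`.`{entry['database']}`.`{entry['name']}`"
        destination = f"`{catalog_destination}`.`{entry['target_schema']}`.`{entry['target_table']}`"
        pairs.append((source, destination))
    return pairs


print(table_name_pairs(config, "main_catalog"))
```

In the script itself, a table is first synced under its source name and then renamed when `target_table` differs from `name`, so the destination shown here is the name after that rename.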

### Configuration

In [0]:
pip install PyYAML

In [0]:
pip install pandas

In [0]:
dbutils.widgets.removeAll()
dbutils.widgets.text("selective-table-upgrade-yaml", "", "Selective Table Upgrade Config Path")
dbutils.widgets.text("uc-catalog-destination", "", "")
dbutils.widgets.text("uc-schema-owner", "", "")
dbutils.widgets.dropdown("dry-run",choices=["True","False"],defaultValue="True")
dbutils.widgets.text("schemas-to-migrate", "", "")
dbutils.widgets.dropdown("migration-scope",choices=["all","external-tables","views"],defaultValue="external-tables")
dbutils.widgets.text("exclude-schemas", "", "")
dbutils.widgets.text("external-locations-non-pii", "", "")
dbutils.widgets.text("external-locations-pii", "", "")
dbutils.widgets.text("auth-secret-scope", "", "")
dbutils.widgets.text("auth-client-id-non-pii", "", "")
dbutils.widgets.text("auth-client-id-pii", "", "")
dbutils.widgets.text("auth-client-secret-non-pii", "", "")
dbutils.widgets.text("auth-client-secret-pii", "", "")
dbutils.widgets.text("auth-client-endpoint", "", "")
dbutils.widgets.text("table-exclusion-yaml", "", "")
dbutils.widgets.text("selective-view-upgrade-yaml", "", "")

In [0]:
catalog_destination = dbutils.widgets.get("uc-catalog-destination")
owner_destination = dbutils.widgets.get("uc-schema-owner")
dry_run = dbutils.widgets.get("dry-run") == "True"
schemas_to_migrate = dbutils.widgets.get("schemas-to-migrate")
migration_scope = dbutils.widgets.get("migration-scope")
exclude_schemas = dbutils.widgets.get("exclude-schemas")
external_locations_non_pii = dbutils.widgets.get("external-locations-non-pii")
external_locations_pii = dbutils.widgets.get("external-locations-pii")
auth_secret_scope = dbutils.widgets.get("auth-secret-scope")
auth_client_id_non_pii = dbutils.widgets.get("auth-client-id-non-pii")
auth_client_id_pii = dbutils.widgets.get("auth-client-id-pii")
auth_client_secret_non_pii = dbutils.widgets.get("auth-client-secret-non-pii")
auth_client_secret_pii = dbutils.widgets.get("auth-client-secret-pii")
auth_client_endpoint = dbutils.widgets.get("auth-client-endpoint")
table_exclusion_yaml = dbutils.widgets.get("table-exclusion-yaml")
selected_table_list_yaml = dbutils.widgets.get("selective-table-upgrade-yaml")
# Only one of the two YAML inputs may be set
if table_exclusion_yaml and selected_table_list_yaml:
    raise ValueError("Provide either the selective upgrade YAML file or the table exclusion YAML file, not both")
# Use whichever YAML path was provided (empty string if neither)
yaml_configuration = table_exclusion_yaml or selected_table_list_yaml

change_view_path_yaml = dbutils.widgets.get("selective-view-upgrade-yaml")

# validate input parameters
if not catalog_destination:
    raise ValueError("UC Catalog Destination is required for the migration")
if not owner_destination:
    raise ValueError("UC Owner Destination is required for the migration")
if not schemas_to_migrate:
    raise ValueError("Specify which schemas to migrate, separated by commas. Use 'all' to migrate all schemas.")
if not external_locations_non_pii:
    raise ValueError("External Locations for Non PII data not specified.")
else:
    external_locations_non_pii_list = external_locations_non_pii.replace(' ','').split(',')
if not external_locations_pii:
    raise ValueError("External Locations for PII data not specified.")
else:
    external_locations_pii_list = external_locations_pii.replace(' ','').split(',')
if not (auth_secret_scope and auth_client_id_non_pii and auth_client_id_pii and auth_client_secret_non_pii and auth_client_secret_pii and auth_client_endpoint):
    raise ValueError("Azure SP credentials are required for the migration.")

## Authenticate with Service Principals for Non PII External Location
for external_locations_non_pii_item in external_locations_non_pii_list:
    spark.conf.set(f"fs.azure.account.auth.type.{external_locations_non_pii_item}.dfs.core.windows.net", "OAuth")
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{external_locations_non_pii_item}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set(f"fs.azure.account.oauth2.client.id.{external_locations_non_pii_item}.dfs.core.windows.net", auth_client_id_non_pii) # f19cea40-ebb7-4350-8be4-72056fa25585
    spark.conf.set(f"fs.azure.account.oauth2.client.secret.{external_locations_non_pii_item}.dfs.core.windows.net", dbutils.secrets.get(scope=auth_secret_scope, key=auth_client_secret_non_pii)) # {{secrets/Databricks-BIS-PP-EUW/Databricks-Spn-BusinessIntelligenceSystems-PreProd}}
    spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{external_locations_non_pii_item}.dfs.core.windows.net", auth_client_endpoint) #  https://login.microsoftonline.com/f009f285-5242-433a-9365-daa1edf145c3/oauth2/token

## Authenticate with Service Principals for PII External Location
for external_locations_pii_item in external_locations_pii_list:
    spark.conf.set(f"fs.azure.account.auth.type.{external_locations_pii_item}.dfs.core.windows.net", "OAuth")
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{external_locations_pii_item}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set(f"fs.azure.account.oauth2.client.id.{external_locations_pii_item}.dfs.core.windows.net", auth_client_id_pii)
    spark.conf.set(f"fs.azure.account.oauth2.client.secret.{external_locations_pii_item}.dfs.core.windows.net", dbutils.secrets.get(scope=auth_secret_scope, key=auth_client_secret_pii))
    spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{external_locations_pii_item}.dfs.core.windows.net", auth_client_endpoint)
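
The two authentication loops above set the same five ABFS OAuth keys per storage account and differ only in the credentials used. As a sketch of that pattern, the hypothetical helper `oauth_conf_for_account` below returns the settings as a plain dict, so they can be inspected without a Spark session. The storage account name and credential values are placeholders.

```python
def oauth_conf_for_account(storage_account: str, client_id: str, client_secret: str, endpoint: str) -> dict[str, str]:
    """Return the ABFS OAuth Spark settings for one storage account as a plain dict."""
    suffix = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}": endpoint,
    }


# Placeholder values; in the notebook the secret comes from dbutils.secrets.get(...)
conf = oauth_conf_for_account("examplestorage", "<client-id>", "<client-secret>", "<token-endpoint>")
for key, value in conf.items():
    print(key)  # in the notebook: spark.conf.set(key, value)
```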


### Logging

In [0]:
## Configure logging
import sys
import logging

# level = logging.DEBUG if dbutils.widgets.get("debug") == "True" else logging.INFO
level = logging.INFO
logging.basicConfig(stream=sys.stderr,level=level,format="%(asctime)s [%(name)s][%(levelname)s] %(message)s")
logging.getLogger("databricks.sdk").setLevel(level)
logging.getLogger("py4j.clientserver").setLevel(logging.WARNING)
logger = logging.getLogger()

### Migration code

In [0]:
import re
import yaml
import pyspark.sql.utils
from pyspark.sql.functions import concat_ws, col, lit
import pandas as pd
from pyspark.sql.types import StructType

## Configure logging
import sys
import logging

level = logging.INFO
logging.basicConfig(stream=sys.stderr,level=level,format="%(asctime)s [%(name)s][%(levelname)s] %(message)s")
logging.getLogger("databricks.sdk").setLevel(level) 
logging.getLogger("py4j.clientserver").setLevel(logging.WARNING)
logger = logging.getLogger()

class CatalogMigration:
    '''
        Contains a list of utility methods to migrate external tables and views from hive_metastore to Unity Catalog.
    '''

    def __init__(self, spark):
        self.spark = spark
        self.statements = []
        self.migrated_tables = []
        self.not_eligible_for_migration = []
        self.failed_migration_tables = []
        self.migrated_views = []
        self.failed_migration_views = []
        columns_empty = StructType([])
        self.exclusion_table_list = spark.createDataFrame([],columns_empty)
        self.migration_view_list = spark.createDataFrame([],columns_empty)

    def _load_migration_conf(self, yaml_configuration, root, leaf):
        '''
        Load a YAML configuration file into Spark DataFrames

        Parameters:
            yaml_configuration: YAML file path
            root: root key of the YAML file
            leaf: leaf key under the root, either "tables" or "views"

        Returns:
            tuple of (table list DataFrame, view list DataFrame, raw parsed YAML content)
        '''
        config = None
        # Nothing to load when no YAML path was supplied
        if not yaml_configuration:
            return self.exclusion_table_list, self.migration_view_list, config

        logger.info(f"yaml file {yaml_configuration}")
        with open(yaml_configuration, 'r') as file:
            config = yaml.safe_load(file)

        if config:
            try:
                pdf = pd.DataFrame.from_dict(config[root][leaf], orient="index")
                if leaf == "tables":
                    self.exclusion_table_list = self.spark.createDataFrame(pdf).dropDuplicates()
                elif leaf == "views":
                    self.migration_view_list = self.spark.createDataFrame(pdf).dropDuplicates()
            except Exception:
                logger.error(f"Cannot find any '{leaf}' entries under '{root}' in the given YAML file, or the file is not in the correct format")
                raise

        return self.exclusion_table_list, self.migration_view_list, config

    # -------------------------------
    #    Migrate VIEWS
    # -------------------------------
    def _view_definition_replace_table(self, view_definition: str):
        '''
            Replace references to source tables in the view definition with their migrated names

            Parameters:
                view_definition:  The existing view definition

            Returns:
                The new view definition as a string
        '''
        # yaml_configuration is the notebook-level variable set in the configuration cell
        if not yaml_configuration:
            logger.error("Cannot find external table list")
            raise ValueError("Cannot find external table list")
        tablelist, _, _ = self._load_migration_conf(yaml_configuration, "resources", "tables")
        # Collect source and destination names from the same rows so the pairs stay aligned
        rows = (tablelist
                .select(concat_ws(".", col("database"), col("name")).alias("full_table_name_source"),
                        concat_ws(".", col("target_schema"), col("target_table")).alias("full_table_name_destination"))
                .collect())
        for row in rows:
            view_definition = view_definition.replace(row["full_table_name_source"], row["full_table_name_destination"])
        return view_definition

    def _migrate_view(self, full_view_name_source, catalog_destination, view_owner_to, dry_run,is_selective=False,destination_view=""):
        '''
            Migrates a view from hive metastore to the catalog destination and sets the specified owner

            Parameters:
                full_view_name_source:  The full view name to be migrated
                catalog_destination:    The catalog destination where the view will be recreated.
                view_owner_to:          The owner of the newly created view.
                dry_run:                When running in dry run mode, the view statement is only printed to the log and the view will not be created. 
                                        Note this does not validate if there are any errors in the view DDL statement.
                is_selective:           Set by the caller when the view comes from the selective view YAML file.
                destination_view:       Optional full destination view name; when set, it is used as the new view name and
                                        table references in the view definition are replaced with their migrated names.
            
            Returns:
                True:                   If the view was successfully migrated or it was run in dry run mode
                False:                  If the view migration failed
        '''

        use_hive_catalog_statement = "USE CATALOG `hive_metastore`"
        self.spark.sql(use_hive_catalog_statement)
        try:
            # Get the DDL of the view
            view_definition_statement = f"DESCRIBE EXTENDED {full_view_name_source}"
            self.statements.append(view_definition_statement)
            view_definition_ddl = self.spark.sql(view_definition_statement).where("col_name = 'View Text'").collect()
            # Only one row expected here
            for row in view_definition_ddl:
                view_definition = row['data_type']
                # Replace hive_metastore if present with the catalog destination
                view_definition = re.sub(rf"(`?hive_metastore`?)", f"`{catalog_destination}`", view_definition)
                if not destination_view:
                    full_view_name_destination = full_view_name_source.replace("hive_metastore", catalog_destination)
                else:    
                    full_view_name_destination = destination_view
                    view_definition=self._view_definition_replace_table(view_definition)
                


                use_uc_catalog_statement = f"USE CATALOG `{catalog_destination}`" # Most of the views won't have hive_metastore in the DDL, so we need to switch to the catalog destination
                view_statement = f"CREATE OR REPLACE VIEW {full_view_name_destination} AS {view_definition}"
                alter_owner_statement = f"ALTER VIEW {full_view_name_destination} OWNER TO `{view_owner_to}`" # Set new owner
                
                
                if (dry_run):
                    logger.info(use_uc_catalog_statement)
                    logger.info(view_statement)
                    logger.info(alter_owner_statement)
                    logger.info(f"{use_hive_catalog_statement}\n")
                else:

                    self.statements.append(use_uc_catalog_statement)
                    self.spark.sql(use_uc_catalog_statement)
                        
                    self.statements.append(view_statement)
                    self.spark.sql(view_statement)
                        
                    self.statements.append(alter_owner_statement)
                    self.spark.sql(alter_owner_statement)
                        
                    self.migrated_views.append(full_view_name_source)
                    logger.info(f"Successfully migrated view: {full_view_name_source}")

                    # Revert back to using hive_metastore
                    self.statements.append(use_hive_catalog_statement)
                    self.spark.sql(use_hive_catalog_statement)

                return True
        except Exception as exception:
            logger.error(f"Failed migrating view: {full_view_name_source} due to: {str(exception)}")
            self.failed_migration_views.append(full_view_name_source)
            # Revert back to using hive_metastore
            if not dry_run:
                self.statements.append(use_hive_catalog_statement)
                self.spark.sql(use_hive_catalog_statement)
            return False

       
    
    def _migrate_views(self, views, catalog_destination, owner_destination, dry_run,destination_views):
        '''
            Migrates all given views from hive_metastore to the specified UC catalog

             Parameters:
                views:                  The list of full view names to be migrated
                catalog_destination:    The catalog destination where the views will be recreated.
                owner_destination:      The owner of the newly created views.
                dry_run:                When running in dry run mode, the view statement is only printed to the log and the view will not be created. 
                                        Note this does not validate if there are any errors in the view DDL statement.
                destination_views:      The list of full destination view names, paired with `views`; only used when the selective view YAML is set.
        '''
        if change_view_path_yaml:
            for view,destination_view in zip(views,destination_views):
                self._migrate_view(view,catalog_destination, owner_destination, dry_run,is_selective=True,destination_view=destination_view)
        else:
            for view in views:
                self._migrate_view(view, catalog_destination, owner_destination, dry_run)

    def _get_views_per_schema(self, schema_name):
        '''
            Get all views from the schema

            Parameters:
                schema_name:            Schema to get the views for

            Returns:
                list of complete view names (including catalog, schema, view name) existent in the hive_metastore specified schema
        '''
        show_views_statement = f"SHOW VIEWS FROM hive_metastore.{schema_name}"
        self.statements.append(show_views_statement)
        try:
            return list(self.spark.sql(show_views_statement)
                        .withColumn("full_view_name", concat_ws(".", lit('hive_metastore'), col("namespace"), col("viewName")))
                        .select("full_view_name")
                        .toPandas()['full_view_name'])
        except Exception as exception:
            logger.error(f"Cannot retrieve views for schema: {schema_name} due to: {str(exception)}")
            return list()
    
    def _get_views(self, schemas, catalog_destination):
        '''
            Returns the views to migrate and, for selective migration, their destination names

            Parameters:
                schemas:                List of schemas
                catalog_destination:    The catalog destination used to build the destination view names

            Returns:
                tuple of (list of complete source view names, list of complete destination view names);
                the destination list is empty unless the selective view YAML is set
        '''
        views = []
        destination_views = []
        try:
            if change_view_path_yaml:
                _, data, _ = self._load_migration_conf(change_view_path_yaml, "resources", "views")
                # Collect both names from the same rows so source and destination stay aligned
                rows = (data
                        .select(concat_ws(".", lit('hive_metastore'), col("source_schema"), col("source_view")).alias("full_view_name"),
                                concat_ws(".", lit(catalog_destination), col("target_schema"), col("target_view")).alias("full_destination_view_name"))
                        .collect())
                views.extend(row["full_view_name"] for row in rows)
                destination_views.extend(row["full_destination_view_name"] for row in rows)
                logger.info(f"Following views will be migrated {views} to {destination_views}")
            else:
                self.spark.sql("USE CATALOG `hive_metastore`")
                for schema_name in schemas:
                    views.extend(self._get_views_per_schema(schema_name))
        except Exception as exception:
            logger.error(f"Cannot retrieve views due to: {str(exception)}")
            # Return a pair so callers can always unpack the result
            return [], []
        return views, destination_views
    
    # -------------------------------
    #    Check exclude list
    # -------------------------------
    def _is_excluding(self, source_schema, source_table_name):
        '''
            Returns whether a table is in the exclusion list

            Parameters:
                source_schema:        Schema of the table to check
                source_table_name:    Name of the table to check
            Returns:
                True:                 If the table is in the list
                False:                If the table is not in the list or the list is empty
            
        '''
        df = self.exclusion_table_list
        logger.debug(f"Checking {source_schema}.{source_table_name} against the exclusion list")
        # The list has no `schema`/`table` columns when it is empty, so nothing is excluded
        if 'schema' not in df.columns or 'table' not in df.columns:
            return False
        return not (df.filter( (df['schema']  == source_schema) & (df['table']  == source_table_name) ).isEmpty())

    # -------------------------------
    #    Migrate TABLES
    # -------------------------------

    def _migrate_table(self, full_table_name_source, full_table_name_destination, schema_owner_to, dry_run,is_selective=False):
        '''
            Migrates an external table using SYNC and sets the specified owner

            Parameters:
                full_table_name_source:      The full table name to be migrated
                full_table_name_destination: The full table name in the catalog destination where the table will be synced.
                schema_owner_to:             The owner of the newly created table.
                dry_run:                     When running in dry run mode, the table is not migrated. For schema-level migration, SYNC in dry run mode is run at schema level prior to this method;
                                             for selective migration, SYNC TABLE in dry run mode is run here.
                is_selective:                True when the table comes from the selective YAML configuration.

            Returns:
                True:                   If the migration was successful or the method was running in dry run mode
                False:                  If the migration failed
        '''
        if (not dry_run):
            try:
                sync_table_statement = f"SYNC TABLE {full_table_name_destination} FROM {full_table_name_source} SET OWNER `{schema_owner_to}`"
                self.statements.append(sync_table_statement)
                result = self.spark.sql(sync_table_statement).collect()
                # only one row expected 
                for row in result:
                    match row['status_code']:
                        case 'SUCCESS':
                            logger.info(f"Successfully migrated table: {full_table_name_source} to {full_table_name_destination}")
                            self.migrated_tables.append(full_table_name_source)
                            return True
                        case _:
                            status = row['status_code']
                            description = row['description']
                            logger.error(f"Failed migrating table: {full_table_name_source} to {full_table_name_destination} due to: {status}: {description}")
                            self.failed_migration_tables.append(f"{full_table_name_source}: error {status}, {description}")
                            return False
            except Exception as exception:
                logger.error(f"Failed migrating table: {full_table_name_source} due to: {str(exception)}")
                self.failed_migration_tables.append(full_table_name_source)
                return False
            
        elif(dry_run and is_selective):
            try:
                logger.info(f"Migrating Table: {full_table_name_source}")
                sync_table_dry_run_statement = f"SYNC TABLE {full_table_name_destination} FROM {full_table_name_source} DRY RUN"
                self.statements.append(sync_table_dry_run_statement)
                upgrade_table_dry_run = self.spark.sql(sync_table_dry_run_statement).collect()

                if upgrade_table_dry_run:
                    display(upgrade_table_dry_run)
                
                # Inspect the dry run result for the table
                for row in upgrade_table_dry_run:
                    full_table_name_source = f"`hive_metastore`.`{row['source_schema']}`.`{row['source_name']}`"
                    full_table_name_destination = f"`{row['target_catalog']}`.`{row['target_schema']}`.`{row['target_name']}`"
                    status_code = row['status_code']
                    match status_code:
                        case 'DRY_RUN_SUCCESS':
                            logger.info(f"Dry run Successful on table: {full_table_name_source} to {full_table_name_destination}")
                            self.migrated_tables.append(full_table_name_source)
                        case 'VIEWS_NOT_SUPPORTED':
                            logger.debug("Views are migrated using method `_migrate_view`.")
                        case _:
                            reason_cannot_migrate = row['description']
                            self.not_eligible_for_migration.append((full_table_name_source, status_code, reason_cannot_migrate))        
            except Exception as exception:
                logger.error(f"Failed migrating table in dry_run mode {dry_run}: {full_table_name_source} due to: {str(exception)}")
                self.failed_migration_tables.append(full_table_name_source)
                return False   
        return True

    def _migrate_schema(self, schema_to_upgrade, catalog_destination, schema_owner_to, dry_run):
        '''
            Migrates a schema; note only external tables are supported using this method.

            Parameters:
                schema_to_upgrade:      The hive_metastore schema to migrate; the same name is used in the catalog destination.
                catalog_destination:    The catalog destination where the schema will be synced.
                schema_owner_to:        The owner of the newly created schema.
                dry_run:                When running in dry run mode, this migration is not done and the feasibility status is printed.
        '''
        try:
            logger.info(f"Migrating schema: {schema_to_upgrade}")
            sync_schema_dry_run_statement = f"SYNC SCHEMA `{catalog_destination}`.`{schema_to_upgrade}` FROM `hive_metastore`.`{schema_to_upgrade}` DRY RUN"
            self.statements.append(sync_schema_dry_run_statement)
            upgrade_schema_dry_run = self.spark.sql(sync_schema_dry_run_statement).collect()

            if (dry_run and upgrade_schema_dry_run):
                display(upgrade_schema_dry_run)
            
            # Get all tables in the schema
            for row in upgrade_schema_dry_run:
                full_table_name_source = f"`hive_metastore`.`{row['source_schema']}`.`{row['source_name']}`"
                full_table_name_destination = f"`{row['target_catalog']}`.`{row['target_schema']}`.`{row['target_name']}`"
                status_code = row['status_code']
                if not(self._is_excluding(source_schema=row['source_schema'],source_table_name=row['source_name'])):
                    match status_code:
                        case 'DRY_RUN_SUCCESS':
                            self._migrate_table(full_table_name_source, full_table_name_destination, schema_owner_to, dry_run)
                        case 'VIEWS_NOT_SUPPORTED':
                            logger.debug("Views are migrated using method `_migrate_view`.")
                        case _:
                            reason_cannot_migrate = row['description']
                            self.not_eligible_for_migration.append((full_table_name_source, status_code, reason_cannot_migrate))
                else:
                    logger.info(f"Excluding table {full_table_name_source}")           
        except Exception as exception:
            logger.error(f"Failed migrating schema in dry_run mode {dry_run}: {catalog_destination}.{schema_to_upgrade} due to: {str(exception)}")
    
    def _migrate_schema_selective(self, catalog_destination, schema_owner_to, dry_run, yaml_configuration):
        '''
            Migrates tables selectively; note only external tables are supported using this method, and it uses the YAML configuration for selective updates

            Parameters:
                catalog_destination:    The catalog destination where the schema will be synced.
                schema_owner_to:        The owner of the newly created schema.
                dry_run:                When running in dry run mode, this migration is not done and the feasibility status is printed.
                yaml_configuration:     The yaml configuration for the selective sync for the tables with customizations.
        '''
        try:
            logger.info(f"Migrating schemas & tables selectively")
            # Load the configuration file
        
            _,_,config = self._load_migration_conf(yaml_configuration,"resources","tables")

            # Loop through each table and run the SYNC upgrades
            for table in config['resources']['tables']:
                managed_table_obj = config['resources']['tables'][table]
                database_name = managed_table_obj['database']
                managed_table = managed_table_obj['name']
                target_schema = managed_table_obj['target_schema']
                target_table = managed_table_obj['target_table']
                is_pii = managed_table_obj['is_pii']
                full_table_name_source = f"`hive_metastore`.`{database_name}`.`{managed_table}`"
                full_table_name_destination = f"`{catalog_destination}`.`{target_schema}`.`{managed_table}`"
                migrated = self._migrate_table(full_table_name_source, full_table_name_destination, schema_owner_to, dry_run,is_selective=True)
                # Rename only when the table was actually synced under its source name
                if migrated and managed_table != target_table and not dry_run:
                    self.spark.sql(f"ALTER TABLE {full_table_name_destination} RENAME TO `{catalog_destination}`.`{target_schema}`.`{target_table}`")
                    logger.info(f"Successfully renamed table {full_table_name_destination} to `{catalog_destination}`.`{target_schema}`.`{target_table}`")


        except Exception as exception:
            logger.error(f"Failed migrating schemas selectively in dry_run mode {dry_run}: {catalog_destination} due to: {str(exception)}")

    def _migrate_schemas(self, schemas_to_upgrade, catalog_destination, schema_owner_to, dry_run, yaml_configuration):
        '''
            Migrates all specified schemas from hive_metastore to the specified UC catalog

            Parameters:
                schemas_to_upgrade:     List of schemas to upgrade
                catalog_destination:    The catalog destination where the schema will be synced.
                schema_owner_to:        The owner of the newly created schema.
                dry_run:                When running in dry run mode, this migration is not done and the feasibility status is printed.
                yaml_configuration:     Optional YAML configuration path; when set, tables are migrated selectively from it instead of per schema.
        '''
        if(len(yaml_configuration) > 0):
            self._migrate_schema_selective(catalog_destination, schema_owner_to, dry_run, yaml_configuration)
        else:
            for schema_name in schemas_to_upgrade:
                self._migrate_schema(schema_name, catalog_destination, schema_owner_to, dry_run)

    def _get_schemas(self, schemas_to_migrate, exclude_schemas):
        '''
            Gets the schemas to migrate

            Parameters:
                schemas_to_migrate:     Input parameter from the widgets: 'all', or a comma-separated list of schema names
                exclude_schemas:        Comma-separated list of schemas to leave out; empty string for none
            
            Returns:
                schemas from hive_metastore based on the input parameters
        '''
        if (schemas_to_migrate == "all"):
            # Get all schemas in the hive_metastore
            schemas = list(self.spark.sql("SHOW schemas IN hive_metastore").toPandas()['databaseName'])
        else:
            # Get all schemas separated by comma, trim white spaces
            schemas = schemas_to_migrate.replace(' ','').split(',')
        if (exclude_schemas != ''):
            exclude_schemas_list = exclude_schemas.replace(' ','').split(',')
            result = list (set(schemas) - set(exclude_schemas_list))
        else:
            result = schemas
        return result
    
    def migrate_catalog(self, migration_scope, schemas_to_migrate, catalog_destination, schema_owner_to, exclude_schemas, dry_run=True, yaml_configuration=""):
        '''
            Entry point for migrating the catalog. Only external tables and views are migrated.

            Parameters:
                migration_scope:        What will be migrated: external-tables, views, all (all includes external tables & views).
                schemas_to_migrate:     Which schemas will be migrated: a comma-separated list of schemas or 'all' (includes all schemas from hive_metastore).
                catalog_destination:    The catalog destination where the catalog will be synced. Note this needs to exist before running this script.
                schema_owner_to:        The owner of the newly created schema.
                exclude_schemas:        Comma-separated list of schemas to skip; an empty string excludes nothing.
                dry_run:                When running in dry run mode, the migration is not performed and the feasibility status of migrating tables is printed. Note views are not included in the dry run.
                yaml_configuration:     Optional configuration for selective table migration; defaults to an empty string.
        '''
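        empty_status, _, _ = self._load_migration_conf(yaml_configuration, "resources", "tables")
        # show() prints the rows itself and returns None, so it is called separately
        # instead of being interpolated into the log message
        logger.info("The following tables will be ignored:")
        empty_status.show(truncate=False)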

        schemas = self._get_schemas(schemas_to_migrate, exclude_schemas) 

        match migration_scope:
            case 'external-tables':
                self._migrate_schemas(schemas, catalog_destination, schema_owner_to, dry_run, yaml_configuration)
            case 'views':
                views, destination_views = self._get_views(schemas, catalog_destination)
                self._migrate_views(views, catalog_destination, schema_owner_to, dry_run, destination_views)
            case 'all':
                views, destination_views = self._get_views(schemas, catalog_destination)
                self._migrate_schemas(schemas, catalog_destination, schema_owner_to, dry_run, yaml_configuration)
                self._migrate_views(views, catalog_destination, schema_owner_to, dry_run, destination_views)
            case _:
                raise ValueError(f"Value '{migration_scope}' not recognized for migration scope. Please choose from: external-tables, views, all.")

    def printMigrationStats(self):
        '''
            Prints migration statistics.
        '''
        logger.info("MIGRATION RESULTS")
        logger.info("--------------------")
        logger.info(f"NOT ELIGIBLE FOR MIGRATION: {len(self.not_eligible_for_migration)}")
        if len(self.not_eligible_for_migration) > 0:
            all_not_eligible = self.not_eligible_for_migration

            # Entries flagged DBFS_ROOT_LOCATION whose first field does not contain 'default'
            dbfs_root_non_default = [x for x in all_not_eligible if x[1] == 'DBFS_ROOT_LOCATION' and "default" not in x[0]]
            logger.info(f"MANAGED TABLES not in 'default' schema: {len(dbfs_root_non_default)}")
            if len(dbfs_root_non_default) > 0:
                display(dbfs_root_non_default)

            # Entries flagged with any status other than DBFS_ROOT_LOCATION
            non_dbfs_root = [x for x in all_not_eligible if x[1] != 'DBFS_ROOT_LOCATION']
            logger.info(f"NON MANAGED TABLES: {len(non_dbfs_root)}")
            if len(non_dbfs_root) > 0:
                display(non_dbfs_root)
        logger.info(f"SUCCESSFULLY MIGRATED TABLES: {len(self.migrated_tables)}")
        logger.info(f"FAILED MIGRATION TABLES: {len(self.failed_migration_tables)}")
        for item in self.failed_migration_tables:
            logger.info(item)
        logger.info(f"SUCCESSFULLY MIGRATED VIEWS: {len(self.migrated_views)}")
        logger.info(f"FAILED MIGRATION VIEWS: {len(self.failed_migration_views)}")
        for item in self.failed_migration_views:
            logger.info(item)
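
The schema selection done by `_get_schemas` can be tried outside Databricks with a small standalone function. The sketch below is not part of the script: `select_schemas` and its `all_schemas` argument are hypothetical names, with `all_schemas` standing in for the result of `SHOW schemas IN hive_metastore`. Unlike the method above, this variant keeps the input order, drops empty entries, and only trims white space around each name; treat those choices as assumptions rather than the script's behaviour.

```python
def select_schemas(schemas_to_migrate, exclude_schemas, all_schemas):
    """Hypothetical standalone variant of the schema selection logic.

    all_schemas stands in for the schemas listed by hive_metastore.
    """
    if schemas_to_migrate == "all":
        candidates = list(all_schemas)
    else:
        # Split on commas, trim each name, and drop empty entries
        candidates = [s.strip() for s in schemas_to_migrate.split(",") if s.strip()]

    excluded = {s.strip() for s in exclude_schemas.split(",") if s.strip()}

    # Keep the first occurrence of each schema, in input order
    selected = []
    for schema in candidates:
        if schema not in excluded and schema not in selected:
            selected.append(schema)
    return selected


# Made-up schema names, for illustration only
hive_schemas = ["default", "sales", "finance", "tmp"]
print(select_schemas("all", "tmp, default", hive_schemas))
print(select_schemas("sales, finance ,sales", "", hive_schemas))
```

Both calls print `['sales', 'finance']`. Keeping a stable order makes repeated runs easier to compare, which the set-based difference in the method does not guarantee.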

### Run the script

In [0]:
migration = CatalogMigration(spark)
migration.migrate_catalog(migration_scope, schemas_to_migrate, catalog_destination, owner_destination, exclude_schemas, dry_run, yaml_configuration)

In [0]:
migration.printMigrationStats()
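
To see how the summary groups the tables that are not eligible for migration, the filtering used in `printMigrationStats` can be reproduced on plain Python data. This is a minimal sketch under assumptions: `split_not_eligible` is a hypothetical helper, each entry is assumed to be a `(table_name, status)` pair (the method only relies on indexes `0` and `1`), and the sample rows, including the `SOME_OTHER_REASON` status, are made up for illustration.

```python
def split_not_eligible(entries):
    """Split (table_name, status) pairs into the two groups the summary reports."""
    # Flagged as DBFS root location, and the name does not contain 'default'
    dbfs_root_non_default = [
        e for e in entries
        if e[1] == "DBFS_ROOT_LOCATION" and "default" not in e[0]
    ]
    # Flagged with any other status
    non_dbfs_root = [e for e in entries if e[1] != "DBFS_ROOT_LOCATION"]
    return dbfs_root_non_default, non_dbfs_root


# Made-up sample entries, for illustration only
sample = [
    ("hive_metastore.sales.orders", "DBFS_ROOT_LOCATION"),
    ("hive_metastore.default.scratch", "DBFS_ROOT_LOCATION"),
    ("hive_metastore.finance.ledger", "SOME_OTHER_REASON"),
]
managed, other = split_not_eligible(sample)
print(len(managed), len(other))
```

With this sample, each group holds one entry. The DBFS root table whose name contains `default` lands in neither group, so the two group counts can add up to less than the `NOT ELIGIBLE FOR MIGRATION` total.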