# Set variables

Please set the variables required to run the script and to clone the relevant tables to the backup storage account.

In [0]:
# Script mode - 'sync' or 'failover'.
# 'sync' clones each table from `<db>` into `<db>_clone`; 'failover' clones it back
# towards the database name without the `_clone` part.
script_mode = 'sync'

# Name of the backup storage account where files will be saved.
# Used both for the Spark credential setting and for the `abfss://` target path.
storage_account_name = '<adls_storage_account_name>'

# Name of the container (inside the storage account above) where the cloned
# Delta tables' files will be stored
container_name = '<container_name>'

# List of databases to clone tables from.
# When left empty, every database whose name does not end with `_clone` is scanned.
databases_to_clone = [] # List of strings (database names)

# List of single tables to clone.
# Entries are compared against the table names reported by `SHOW TABLES`.
tables_to_clone = [] # List of strings; restricts cloning to these tables

# Name of the Databricks secret scope that holds the storage credential
secret_scope = 'default'

# Key of the secret (within `secret_scope`) whose value is used to authenticate against ADLS
adls_conn_str = 'ADLS-CONN-STR'

# Run the script

### Init connection to ADLS

In [0]:
# Register the storage credential with Spark so that `abfss://` paths on the backup
# storage account can be read and written.
#
# Default secret scope for all AIF DBR workspaces is called `default`. If you are using
# a different scope, please change the `secret_scope` variable above. The same applies to
# the connection string stored in secrets: its default name is `ADLS-CONN-STR` and it can
# be edited via `adls_conn_str` in the variables section above.
#
# NOTE(review): the secret value is written to `fs.azure.account.key.<account>...`, which
# presumably expects the storage account access key rather than a full connection
# string - confirm what the secret actually contains.

spark.conf.set(
  f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
  dbutils.secrets.get(scope=secret_scope, key=adls_conn_str)
)

### Define functions

In [0]:
from pyspark.sql import functions as F


def get_tables_to_clone(databases_to_clone: list = None, tables_to_clone: list = None) -> list:
    """
    Return the fully qualified names of the tables that should be cloned.

    Tables are discovered with `SHOW TABLES` in each selected database; temporary
    tables/views are always skipped. An optional table filter narrows the result.

    Args:
        databases_to_clone (list): Database names to scan. If empty or not provided,
            every database returned by `show databases` is scanned, except those
            whose name ends with `_clone`.
        tables_to_clone (list): Optional filter. Each entry may be a bare table name
            ('table1') or a qualified name ('db1.table1'). If empty or not provided,
            all non-temporary tables of the scanned databases are returned.

    Returns:
        list: Table names as strings in the format 'databaseName.tableName'.

    Example:
        >>> databases_to_clone = ['db1', 'db2']
        >>> get_tables_to_clone(databases_to_clone)
        ['db1.table1', 'db1.table2', 'db2.table1', 'db2.table2']
    """
    if databases_to_clone:
        databases = databases_to_clone
    else:
        databases = [
            db.databaseName
            for db in spark.sql('show databases').collect()
            if not db.databaseName.endswith('_clone')
        ]

    tables = []
    for db in databases:
        df = spark.sql(f"SHOW TABLES IN {db}")
        qualified_name = F.concat(df.database, F.lit("."), df.tableName)

        keep = ~df.isTemporary
        # Only restrict by name when a filter was actually supplied: `isin([])` matches
        # nothing, which would silently drop every table when no filter is given.
        if tables_to_clone:
            keep = keep & (
                df.tableName.isin(tables_to_clone)
                | qualified_name.isin(tables_to_clone)
            )

        rows = df.filter(keep).select(qualified_name.alias('combined')).collect()
        tables.extend(row['combined'] for row in rows)

    return tables


def clone_tables(adls_path: str, tables_to_clone: list, script_mode: str) -> None:
    """
    Deep-clone Delta tables into a target database stored under `adls_path`.

    For every table the target database is created if it does not exist, then the
    table is (re)created there with `CREATE OR REPLACE TABLE ... DEEP CLONE`.
    A failure on one table is reported and does not stop the remaining tables;
    a summary of failed tables is printed at the end.

    Args:
        adls_path (str): Base path where the cloned tables will be stored. The table
            location is built as `<adls_path><target_db>.<table>`, so the path is
            expected to end with a `/`.
        tables_to_clone (list): Tables to clone, each a string in the format
            'databaseName.tableName'.
        script_mode (str): 'sync' clones `<db>.<table>` into `<db>_clone.<table>`;
            'failover' clones `<db>_clone.<table>` back into `<db>.<table>`.

    Returns:
        None

    Raises:
        ValueError: If an argument has the wrong type, or `script_mode` is neither
            'sync' nor 'failover'.
    """
    if not isinstance(adls_path, str) or not isinstance(tables_to_clone, list) or not isinstance(script_mode, str):
        raise ValueError("Invalid input parameters")
    # Reject unknown modes explicitly: treating a typo such as 'synch' as failover
    # would overwrite the original tables.
    if script_mode not in ('sync', 'failover'):
        raise ValueError(f"Invalid script_mode `{script_mode}`: expected 'sync' or 'failover'")

    if not tables_to_clone:
        print("No tables to clone.")
        return

    clone_suffix = "_clone"
    failed_tables = []

    for table in tables_to_clone:
        try:
            db_name, table_name = table.split(".")

            if script_mode == 'sync':
                db_name_to_create = f"{db_name}{clone_suffix}"
            elif db_name.endswith(clone_suffix) and len(db_name) > len(clone_suffix):
                # Strip only the trailing suffix, not the first `_clone` found in the name.
                db_name_to_create = db_name[:-len(clone_suffix)]
            else:
                # Without the suffix the target would equal the source, i.e. the table
                # would be replaced by a clone of itself.
                raise ValueError(
                    f"database `{db_name}` does not end with `{clone_suffix}`, "
                    "cannot determine the original database for failover"
                )

            table_path = f"{adls_path}{db_name_to_create}.{table_name}"

            print(f"Creating database `{db_name_to_create}`...")
            spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name_to_create}")

            print(f"Creating table `{db_name_to_create}.{table_name}` based on `{db_name}.{table_name}`...")
            spark.sql(
                f"""
                CREATE OR REPLACE TABLE {db_name_to_create}.{table_name}
                DEEP CLONE {db_name}.{table_name}
                LOCATION '{table_path}';
                """
            )
        except Exception as e:
            # Best effort: report the failure and carry on with the remaining tables.
            print(f"Error while cloning table `{table}`: {str(e)}")
            failed_tables.append(table)
            continue

    if failed_tables:
        print(
            f"Cloning finished with errors: {len(failed_tables)} of "
            f"{len(tables_to_clone)} table(s) failed: {failed_tables}"
        )
    else:
        print("All tables cloned successfully.")

### Clone tables

In [0]:
# Root URI of the backup container in the storage account.
# The trailing slash matters: `clone_tables` appends `<database>.<table>` directly to this path.
adls_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

# Resolve the list of tables to clone as 'database.table' names.
# NOTE(review): this rebinds `tables_to_clone` from the user-supplied filter to the resolved
# list; re-run the variables cell before re-running this cell so the filter is reset.
tables_to_clone = get_tables_to_clone(databases_to_clone=databases_to_clone, tables_to_clone=tables_to_clone)

# Execute the cloning process for every resolved table
clone_tables(adls_path=adls_path, tables_to_clone=tables_to_clone, script_mode=script_mode)


Creating database `delta_base_2_clone`...
Creating table `delta_base_2_clone.table_x` based on `delta_base_2.table_x`...
All tables cloned successfully.
