In [None]:
import os
import zipfile
import xml.etree.ElementTree as ET

In [None]:
# Snowflake connection settings that the migration step writes into the workbook XML.
# Replace every placeholder before running the cells below.
ACCOUNT_NAME = '<ACCOUNT_NAME>'  # prefix of the "<account>.snowflakecomputing.com" server/caption
USER_NAME = '<USER_NAME>'  # written to the connection's "username" attribute
DB_NAME = '<DB_NAME>'  # written to the connection's "dbname" attribute
WAREHOUSE = '<WAREHOUSE>'  # written to the connection's "warehouse" attribute
SCHEMA = '<SCHEMA>'  # written to the connection's "schema" attribute
ROLE_NAME = '<ROLE_NAME>'  # written to the connection's "service" attribute

In [None]:
# Location of the packaged workbook to migrate.
packaged_workbook_path = os.path.abspath('My Workbook.twbx')
# Split the file name with os.path.splitext: str.strip() removes a *set of
# characters* from both ends, so it would mangle names such as "test.twbx".
workbook_stem, file_ext = os.path.splitext(os.path.basename(packaged_workbook_path))
# Directory the archive is extracted into, named after the workbook.
extract_dir = os.path.abspath(f'snowflake_migration_{workbook_stem}')
packaged_workbook_path, extract_dir

First, unpack your packaged (*.twbx) workbook by renaming and unzipping the file:

In [2]:
def unpack(packaged_workbook_path, extract_dir):
    """Rename a packaged workbook to ``.zip`` and extract it into ``extract_dir``.

    Args:
        packaged_workbook_path: Path to the packaged workbook (e.g. ``*.twbx``).
        extract_dir: Directory the archive contents are extracted into.

    Returns:
        Path inside ``extract_dir`` where the unpacked workbook is expected:
        the packaged file's base name with the trailing ``x`` dropped from its
        extension (``.twbx`` -> ``.twb``). The archive contents are not
        inspected to confirm that file exists.

    Raises:
        FileNotFoundError: If ``packaged_workbook_path`` does not exist.
    """
    packaged_workbook_path = os.path.abspath(packaged_workbook_path)

    if not os.path.exists(packaged_workbook_path):
        raise FileNotFoundError(f'File {packaged_workbook_path} does not exist')

    # splitext only touches the final extension; str.replace() would also
    # rewrite a matching substring inside a directory name.
    path_stem, file_ext = os.path.splitext(packaged_workbook_path)
    zip_file_path = f'{path_stem}.zip'

    # NOTE: the source file is renamed on disk, so it no longer exists under
    # its original name once this function has run.
    os.rename(packaged_workbook_path, zip_file_path)
    print(f'Renamed {packaged_workbook_path} to {zip_file_path}')

    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

    print(f'Unzipped {zip_file_path} to {extract_dir}')

    # Drop only a trailing "x" from the extension (".twbx" -> ".twb").
    file_ext_wb = file_ext[:-1] if file_ext.endswith('x') else file_ext
    workbook_name = os.path.basename(path_stem) + file_ext_wb

    return os.path.join(extract_dir, workbook_name)

In [None]:
# Unpack the workbook configured above (the notebook defines no `file_path` variable).
file_path_wb = unpack(packaged_workbook_path, extract_dir)

Then we parse the XML tree of the unpacked workbook to find the named-connection and relation elements, and replace the relevant Redshift settings with their Snowflake equivalents:

In [None]:
def migrate_to_snowflake(workbook_book_path):
    """Rewrite a workbook's Redshift connection settings to target Snowflake.

    Parses the workbook XML and, for every ``datasource`` element two levels
    below the root:

    * each entry under ``named-connections`` gets the Snowflake server as its
      caption and has ``redshift`` replaced by ``snowflake`` in its name, and
      each of its child elements receives the Snowflake connection attributes
      built from the module-level settings (``ACCOUNT_NAME``, ``USER_NAME``,
      ``DB_NAME``, ``WAREHOUSE``, ``SCHEMA``, ``ROLE_NAME``);
    * each ``relation`` carrying a ``connection`` attribute has ``redshift``
      replaced by ``snowflake`` in that attribute.

    The state before each change and after it (prefixed ``>>``) is printed.

    Args:
        workbook_book_path: Path to the unpacked workbook XML file.

    Returns:
        The modified ``ElementTree``; nothing is written to disk.
    """
    tree = ET.parse(workbook_book_path)
    root = tree.getroot()
    server = f'{ACCOUNT_NAME}.snowflakecomputing.com'

    # '*/datasource': datasource elements that are grandchildren of the root.
    for ds in root.findall('*/datasource'):

        named_connections = ds.find('*/named-connections')
        # find() returns None for datasources without a named-connections
        # section; those have no connections to rewrite.
        if named_connections is None:
            named_connections = ()

        for nm in named_connections:

            print(nm.tag, nm.get('name'), nm.get('caption'))
            nm.set('caption', server)
            name = nm.get('name')
            if name is not None:
                nm.set('name', name.replace('redshift', 'snowflake'))
            print('>>', nm.tag, nm.get('name'), nm.get('caption'))

            # Iterate the element directly: Element.getchildren() was removed
            # in Python 3.9.
            # NOTE(review): every child connection is rewritten, whatever its
            # current class - confirm the workbook only holds Redshift sources.
            for cn in nm:
                print('\t', cn.tag, cn.get('class'), cn.get('server'))
                cn.set('class', 'snowflake')
                cn.set('schema', f'{SCHEMA}')
                cn.set('dbname', f'{DB_NAME}')
                cn.set('server', server)
                cn.set('service', f'{ROLE_NAME}')
                cn.set('username', f'{USER_NAME}')
                cn.set('warehouse', f'{WAREHOUSE}')
                cn.set('port', '')
                print('\t>>', cn.tag, cn.get('class'), cn.get('server'))
                print('\n')

        for rel in ds.iter('relation'):
            connection = rel.get('connection')
            if connection is not None:
                print(rel.tag, connection)
                rel.set('connection', connection.replace('redshift', 'snowflake'))
                print('>>', rel.tag, rel.get('connection'))
                print('\n')

    return tree

In [None]:
# Build the migrated XML tree in memory; nothing is written to disk in this step.
tree = migrate_to_snowflake(file_path_wb)

Lastly, save the updated XML to a new file:

In [None]:
def save_migrated_workbook(tree, file_path_wb):
    """Write the migrated workbook next to the original with a ``_converted`` suffix.

    ``book.twb`` is saved as ``book_converted.twb``. An existing target file is
    never overwritten; a message is printed instead.

    Args:
        tree: The ``ElementTree`` to serialize.
        file_path_wb: Path of the unpacked workbook the tree was parsed from.
    """
    # splitext only touches the final extension; str.replace() on the whole
    # path would also rewrite a directory name containing the extension.
    path_stem, file_ext_wb = os.path.splitext(file_path_wb)
    new_path = f'{path_stem}_converted{file_ext_wb}'

    if os.path.exists(new_path):
        print(f'{new_path} already exists!')
        return

    # Write UTF-8 with an XML declaration; ElementTree's default is us-ascii
    # with no declaration.
    tree.write(new_path, encoding='utf-8', xml_declaration=True)
    print(f'Migrated {file_path_wb} to {new_path}')

In [None]:
# Write the migrated tree to a "_converted" copy alongside the unpacked workbook.
save_migrated_workbook(tree, file_path_wb)