In [0]:
from functions.utility import create_table_if_not_exists

def migrate_history(schema, spark):
    tables = spark.sql(f'SHOW TABLES IN {schema}').filter('isTemporary = false')
    table_names = [row.tableName for row in tables.collect()]
    file_map = {}
    trans_map = {}
    for name in table_names:
        if name.endswith('_file_version_history'):
            base = name[:-len('_file_version_history')]
            file_map[base] = name
        elif name.endswith('_transaction_history'):
            base = name[:-len('_transaction_history')]
            trans_map[base] = name
    for base in sorted(set(file_map) & set(trans_map)):
        file_tbl = f'{schema}.{file_map[base]}'
        trans_tbl = f'{schema}.{trans_map[base]}'
        new_tbl = f'{schema}.{base}_file_ingestion_history'
        print(f'Merging {file_tbl} and {trans_tbl} into {new_tbl}')
        try:
            file_df = spark.table(file_tbl).selectExpr('version', 'explode(file_path) as file_path')
            trans_df = spark.table(trans_tbl)
            join_col = 'version' if 'version' in trans_df.columns else 'primary_key'
            joined = file_df.join(trans_df, join_col).select('file_path', *trans_df.columns)
            create_table_if_not_exists(joined, new_tbl, spark)
            joined.createOrReplaceTempView('df')
            spark.sql(f'''
merge into {new_tbl} as t
using df as s
  on t.file_path = s.file_path and t.version = s.version
when matched then update set *
when not matched then insert *''')
            spark.sql(f'drop table if exists {file_tbl}')
            spark.sql(f'drop table if exists {trans_tbl}')
            print(f'Successfully migrated {base}')
        except Exception as e:
            print(f'Failed to migrate {base}: {e}')

migrate_history('edsm.history', spark)
