In [None]:
# Notebook parameters. The first argument of each widget is its name (the key
# read back via dbutils.widgets.get); the last argument is a display-only label.
dbutils.widgets.dropdown(
    "src_type",
    "sqlserver",
    ["sqlserver", "postgresql", "redshift", "synapse", "delta"],
    "01 Source Type",
)
dbutils.widgets.text("src_catalog", "", "02 Source Catalog")
dbutils.widgets.text("src_schema", "", "03 Source Schema")
dbutils.widgets.text("src_table", "", "04 Source Table")
dbutils.widgets.text("root_dir", "", "05 Root directory for project files")
dbutils.widgets.text("tgt_ddl_file_path", "", "06 Create target DDL file path")
# Optional: an empty value is treated as "no JDBC config file". In this notebook
# it is only passed to the PostgreSQL table-size helper.
dbutils.widgets.text("jdbc_config_file", "", "07 JDBC config file (optional)")
dbutils.widgets.text("partition_col", "", "08 Source Partition Column")
dbutils.widgets.text("partition_size_mb", "", "09 Partition Size MB")
dbutils.widgets.text("tgt_catalog", "", "10 Target Catalog")
dbutils.widgets.text("tgt_schema", "", "11 Target Schema")
dbutils.widgets.text("tgt_table", "", "12 Target Table")

In [None]:
from lakefed_ingest.main import *

In [None]:
# Read the notebook/job parameters from the widgets.
src_type = dbutils.widgets.get('src_type')
src_catalog = dbutils.widgets.get('src_catalog')
src_schema = dbutils.widgets.get('src_schema')
src_table = dbutils.widgets.get('src_table')
root_dir = dbutils.widgets.get('root_dir')
file_path = dbutils.widgets.get('tgt_ddl_file_path')
# Optional parameter: an empty widget value means "not provided".
jdbc_config_file = dbutils.widgets.get('jdbc_config_file') or None
partition_col = dbutils.widgets.get('partition_col')

# partition_size_mb is later used as a divisor, so it must be a positive integer.
# Fail fast with a readable message instead of a bare int() parse error or a
# ZeroDivisionError further down. Still raises ValueError, as int('') did before.
_partition_size_mb_raw = dbutils.widgets.get('partition_size_mb')
try:
    partition_size_mb = int(_partition_size_mb_raw)
except ValueError:
    raise ValueError(
        f'partition_size_mb must be a positive integer, got: {_partition_size_mb_raw!r}'
    ) from None
if partition_size_mb <= 0:
    raise ValueError(f'partition_size_mb must be a positive integer, got: {partition_size_mb}')

tgt_catalog = dbutils.widgets.get('tgt_catalog')
tgt_schema = dbutils.widgets.get('tgt_schema')
tgt_table = dbutils.widgets.get('tgt_table')

In [None]:
# Look up the source table's size with the helper that matches src_type.
# Only the PostgreSQL helper also receives root_dir and jdbc_config_file.
# Start from 0 so that a helper that raises leaves 0 rather than a stale value.
table_size_mb = 0

table_size_readers = {
    'sqlserver': lambda: get_table_size_sqlserver(src_catalog, src_schema, src_table),
    'postgresql': lambda: get_table_size_postgresql(
        src_catalog, src_schema, src_table, root_dir, jdbc_config_file
    ),
    'redshift': lambda: get_table_size_redshift(src_catalog, src_schema, src_table),
    'synapse': lambda: get_table_size_synapse(src_catalog, src_schema, src_table),
    'delta': lambda: get_table_size_delta(src_catalog, src_schema, src_table),
}

if src_type not in table_size_readers:
    raise ValueError(f'Unsupported src_type: {src_type}')

table_size_mb = table_size_readers[src_type]()

print(f'Table size MB: {table_size_mb}')

In [None]:
# Lower/upper bounds of partition_col in the source table. They are used to
# split the column range into partitions.
lower_bound, upper_bound = get_partition_boundaries(src_catalog, src_schema, src_table, partition_col)

# NOTE(review): presumably the helper yields None bounds for an empty table or an
# all-NULL partition column — confirm. Fail here with context rather than later.
if lower_bound is None or upper_bound is None:
    raise ValueError(
        f'Could not determine bounds for {src_catalog}.{src_schema}.{src_table}.{partition_col}: '
        f'lower={lower_bound}, upper={upper_bound}'
    )

# Label order matches the value order (the previous message said "Upper and lower"
# but printed lower first).
print(f'Lower and upper bound: {lower_bound}, {upper_bound}')

In [None]:
import math

# Calculate the number of partitions so that each one holds roughly at most
# partition_size_mb of data. This is approximate: it assumes data is spread
# evenly over the partition column range. Rounding up (instead of truncating
# with int()) avoids partitions larger than requested, e.g. 299 MB at
# 100 MB/partition gives 3 partitions rather than 2 of ~150 MB each.
MIN_PARTITIONS = 2

if partition_size_mb <= 0:
    # Guard the division below; a non-positive size has no meaningful partition count.
    raise ValueError(f'partition_size_mb must be positive, got: {partition_size_mb}')

num_partitions = max(math.ceil(table_size_mb / partition_size_mb), MIN_PARTITIONS)

print(f'Number of partitions: {num_partitions}')

In [None]:
# Name of the helper table holding one row per partition of the source range.
# TODO set partitions table suffix at job level
partitions_tbl = f'{tgt_catalog}.{tgt_schema}.{tgt_table}_partitions'

# Split [lower_bound, upper_bound] of partition_col into num_partitions pieces
# and turn the result into a DataFrame.
partition_df = get_partition_df(
    get_partition_list(partition_col, lower_bound, upper_bound, num_partitions),
    num_partitions,
)

# Replace any existing partitions table, including its schema.
(
    partition_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(partitions_tbl)
)

In [None]:
# Collect the distinct batch_id values from the partitions table, sort them
# ascending, and publish them as the "batch_list" task value that the lvl1 job
# iterates over.
batch_list_qry = f"""\
    select array_agg(distinct batch_id) as batch_list
    from {partitions_tbl}
"""

# A global aggregate returns exactly one row; its single column is the array.
batch_row = spark.sql(batch_list_qry).first()
batch_list = batch_row[0]
batch_list.sort()

print(f'batch_list: {batch_list}')
dbutils.jobs.taskValues.set(key="batch_list", value=batch_list)

In [None]:
# Build the DDL for the target table, clustered on the partition column, using
# the DDL file at root_dir/file_path (as interpreted by get_sql_ddl), then run it.
sql_ddl = get_sql_ddl(
    root_dir=root_dir,
    file_path=file_path,
    catalog=tgt_catalog,
    schema=tgt_schema,
    table=tgt_table,
    cluster_col=partition_col,
)

print(f'Create target table DDL statement:\n{sql_ddl}')

spark.sql(sql_ddl)