In [None]:
# Notebook / job parameters as (widget name, default value, display label).
# Widgets are created in exactly this order.
_widget_specs = (
    ('catalog', 'main', '01 Catalog'),
    ('schema', 'chris_koester', '02 Schema'),
    ('src_table', 'partitioned_queries_src', '03 Source Table'),
    ('tgt_table', 'partitioned_queries_tgt', '04 Target Table'),
)
for _widget_name, _widget_default, _widget_label in _widget_specs:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

In [None]:
import math

from jdbc_bulk_ingest import main

In [None]:
# Resolve the widget values into the names used by every later cell.
catalog, schema, src_table, tgt_table = (
    dbutils.widgets.get(widget_name)
    for widget_name in ('catalog', 'schema', 'src_table', 'tgt_table')
)

# Make the chosen catalog current and ensure the working schema exists in it.
spark.sql(f'use catalog {catalog}')
spark.sql(f'create schema if not exists {schema}')

In [None]:
# Delta is used for this demo notebook.
# Ingestion from a federated table / JDBC is the intended use case.
src_fqn = f'{catalog}.{schema}.{src_table}'

# Get lower and upper bound values of the partition column (one scan for both).
bounds_row = spark.sql(f'select min(customer_id), max(customer_id) from {src_fqn}').collect()[0]
lower_bound, upper_bound = bounds_row[0], bounds_row[1]
print(f'lower_bound: {lower_bound}')
print(f'upper_bound: {upper_bound}')

# min/max yield NULL for an empty table (or an all-NULL column); partition
# ranges cannot be derived from that, so fail loudly instead of passing None on.
if lower_bound is None or upper_bound is None:
    raise ValueError(
        f'Source table {src_fqn} has no non-NULL customer_id values; cannot compute partition bounds.'
    )

# Calculate the number of partitions so each holds at most TARGET_PARTITION_MIB.
# `desc detail` / sizeInBytes is Delta-specific: use a function and calculation
# that is appropriate for the source database system.
TARGET_PARTITION_MIB = 100
MIN_PARTITIONS = 2  # force minimum parallelism for tables smaller than one partition
table_size_in_bytes = spark.sql(f'desc detail {src_fqn}').collect()[0]['sizeInBytes']

# Round up (so no partition exceeds the target) and keep the result an int:
# the value printed is now exactly the value passed downstream.
numPartitions = max(
    MIN_PARTITIONS,
    math.ceil(table_size_in_bytes / (TARGET_PARTITION_MIB * 1024 * 1024)),
)
print(f'Number of partitions: {numPartitions}')

In [None]:
# Split the source table into customer_id ranges; each entry is presumably a
# dict carrying at least a 'src_query' SELECT (TODO confirm against jdbc_bulk_ingest).
partition_queries = main.generate_partition_queries(catalog, schema, src_table, 'customer_id', lower_bound, upper_bound, numPartitions)

# Insert into the table named by the tgt_table widget (previously hard-coded to
# 'partitioned_queries_tgt', which silently ignored a non-default target).
tgt_fqn = f'{catalog}.{schema}.{tgt_table}'
for partition_query in partition_queries:
    partition_query['full_query'] = f"insert into {tgt_fqn} {partition_query['src_query']}"
    print(partition_query['full_query'])

# Assign query list to job task value. The list can be iterated in a subsequent task.
dbutils.jobs.taskValues.set(key = "partition_queries", value = partition_queries)

In [None]:
# Target table columns as (name, Spark SQL type), in table order.
# `create or replace` recreates the table on every run of this cell.
_tgt_columns = [
    ('customer_id', 'BIGINT'),
    ('name', 'STRING'),
    ('alias', 'STRING'),
    ('payment_instrument_type', 'STRING'),
    ('payment_instrument', 'STRING'),
    ('email', 'STRING'),
    ('email2', 'STRING'),
    ('ip_address', 'STRING'),
    ('md5_payment_instrument', 'STRING'),
    ('customer_notes', 'STRING'),
    ('created_ts', 'TIMESTAMP'),
    ('modified_ts', 'TIMESTAMP'),
    ('memo', 'STRING'),
]
_tgt_column_ddl = ',\n'.join(f'  {col_name} {col_type}' for col_name, col_type in _tgt_columns)
create_tgt_qry = f"create or replace table {catalog}.{schema}.{tgt_table} (\n{_tgt_column_ddl})"

spark.sql(create_tgt_qry)