Commit

add post subset sql
acolombi committed Aug 7, 2019
1 parent 24f8aa2 commit 9ee9b45
Showing 6 changed files with 26 additions and 17 deletions.
README.md (4 changes: 3 additions & 1 deletion)

@@ -64,10 +64,12 @@ Below we describe the use of all configuration parameters, but the best place to

`fk_augmentation`: Additional foreign keys that, while not represented as constraints in the database, are logically present in the data. Foreign keys listed in `fk_augmentation` are unioned with the foreign keys provided by constraints in the database, making it useful when foreign keys exist in the data but are not declared in the schema. The value is a JSON array of JSON objects. See `example-config.json` for details.
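
For illustration, a single augmentation entry might look like the following sketch. The *"fk_table"* and *"target_table"* field names are the ones used elsewhere in this config, while `"fk_columns"` and the table names here are assumed for the example, so treat `example-config.json` as authoritative:

```json
{
  "fk_augmentation": [
    {
      "fk_table": "public.order",
      "fk_columns": ["user_id"],
      "target_table": "public.user",
      "target_columns": ["id"]
    }
  ]
}
```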

-`dependency_breaks`: An array containg a JSON object with *"fk_table"* and *"target_table"* fields of table relationships to be ignored in order to break cycles
+`dependency_breaks`: An array containing JSON objects, each with *"fk_table"* and *"target_table"* fields, naming table relationships to ignore in order to break dependency cycles
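
For example, to break a hypothetical cycle between two made-up tables:

```json
{
  "dependency_breaks": [
    { "fk_table": "public.employee", "target_table": "public.department" }
  ]
}
```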

`keep_disconnected_tables`: If `true`, tables that the subset target(s) cannot reach by following foreign keys are copied over in full. If `false`, their schema is copied but the tables are left empty. Put more mathematically: the tables and foreign keys form a graph (tables are nodes, foreign keys are directed edges), and the disconnected tables are those in components that contain no targets. This setting decides how to import those tables.
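
For example, if the only subset target lives in `public.user` and a hypothetical `public.audit_log` table has no foreign-key path connecting it to that target, `public.audit_log` falls in a component with no targets: with `true` it is copied over in full, with `false` only its empty schema is created.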

+`post_subset_sql`: An array of SQL commands that will be issued on the destination database after subsetting is complete. Useful for performing additional ad hoc tasks after subsetting.
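
The first statement below is the one shipped in `example-config.json`; the second is a made-up placeholder showing that multiple statements can be listed, and that they run in array order:

```json
{
  "post_subset_sql": [
    "UPDATE a_table SET a_column = 'value'",
    "DELETE FROM another_table WHERE stale = true"
  ]
}
```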

# Running

Almost all the configuration is in the `config.json` file, so running is as simple as
config_reader.py (3 changes: 3 additions & 0 deletions)

@@ -53,6 +53,9 @@ def get_fk_augmentation():
def get_upstream_filters():
    return _config["upstream_filters"]

+def get_post_subset_sql():
+    return _config["post_subset_sql"] if "post_subset_sql" in _config else []

def __convert_tonic_format(obj):
    if "fk_schema" in obj:
        return {
direct_subset.py (19 changes: 16 additions & 3 deletions)

@@ -1,9 +1,12 @@
import uuid, sys
import config_reader, result_tabulator
+import time
from subset import Subset
+from fast_subset import FastSubset
from psql_database_creator import PsqlDatabaseCreator
from mysql_database_creator import MySqlDatabaseCreator
from db_connect import DbConnect
+from subset_utils import print_progress
import database_helper

def db_creator(db_type, source, dest):
@@ -29,12 +32,15 @@ def db_creator(db_type, source, dest):
database.teardown()
database.create()

# Get list of tables to operate on
-all_tables = database_helper.get_specific_helper().list_all_tables(source_dbc)
+db_helper = database_helper.get_specific_helper()
+all_tables = db_helper.list_all_tables(source_dbc)
all_tables = [x for x in all_tables if x not in config_reader.get_excluded_tables()]

-subsetter = Subset(source_dbc, destination_dbc, all_tables)
+if "--fast" in sys.argv:
+    subsetter = FastSubset(source_dbc, destination_dbc, all_tables)
+else:
+    subsetter = Subset(source_dbc, destination_dbc, all_tables)

try:
    subsetter.prep_temp_dbs()
@@ -43,6 +49,13 @@ def db_creator(db_type, source, dest):
if "--no-constraints" not in sys.argv:
database.add_constraints()

print("Beginning post subset SQL calls")
start_time = time.time()
for idx, sql in enumerate(config_reader.get_post_subset_sql()):
print_progress(sql, idx+1, len(config_reader.get_post_subset_sql()))
db_helper.run_query(sql, destination_dbc.get_db_connection())
print("Completed post subset SQL calls in {}s".format(time.time()-start_time))

result_tabulator.tabulate(source_dbc, destination_dbc, all_tables)
finally:
subsetter.unprep_temp_dbs()
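
Taken together with the import changes above, this wires the new option into the run. Assuming the script is invoked directly (its entry point is not shown in this diff), a run exercising both new code paths might look like `python direct_subset.py --fast`, after which each `post_subset_sql` statement is executed against the destination in order, once constraints have been restored.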
example-config.json (3 changes: 2 additions & 1 deletion)

@@ -50,5 +50,6 @@
"target_table": "public.user",
"target_columns": ["id"]
}
]
],
"post_subset_sql": ["UPDATE a_table SET a_column = 'value'"]
}
subset.py (7 changes: 1 addition & 6 deletions)

@@ -1,5 +1,5 @@
from topo_orderer import get_topological_order_by_tables
-from subset_utils import UnionFind, schema_name, table_name, find, compute_disconnected_tables, compute_downstream_tables, compute_upstream_tables, columns_joined, columns_tupled, columns_to_copy, quoter, fully_qualified_table, print_progress, print_progress_complete, mysql_db_name_hack, upstream_filter_match, redact_relationships
+from subset_utils import UnionFind, schema_name, table_name, find, compute_disconnected_tables, compute_downstream_tables, compute_upstream_tables, columns_joined, columns_tupled, columns_to_copy, quoter, fully_qualified_table, print_progress, mysql_db_name_hack, upstream_filter_match, redact_relationships
import database_helper
import config_reader
import shutil, os, uuid, time, itertools
@@ -53,7 +53,6 @@ def run_middle_out(self):
            print_progress(target, idx+1, len(config_reader.get_initial_targets()))
            self.__subset_direct(target, relationships)
            processed_tables.add(target['table'])
-        print_progress_complete(len(config_reader.get_initial_targets()))
        print('Direct target tables completed in {}s'.format(time.time()-start_time))

        # greedily grab rows with foreign keys to rows in the target strata
@@ -65,7 +64,6 @@ def run_middle_out(self):
            data_added = self.__subset_upstream(t, processed_tables, relationships)
            if data_added:
                processed_tables.add(t)
-        print_progress_complete(len(upstream_tables))
        print('Greedy subsettings completed in {}s'.format(time.time()-start_time))

        # process pass-through tables, you need this before subset_downstream, so you can get all required downstream rows
@@ -75,7 +73,6 @@ def run_middle_out(self):
            print_progress(t, idx+1, len(passthrough_tables))
            q = 'SELECT * FROM {}'.format(fully_qualified_table(t))
            self.__db_helper.copy_rows(self.__source_conn, self.__destination_conn, q, mysql_db_name_hack(t, self.__destination_conn))
-        print_progress_complete(len(passthrough_tables))
        print('Pass-through completed in {}s'.format(time.time()-start_time))

        # use subset_downstream to get all supporting rows according to existing needs
@@ -85,7 +82,6 @@ def run_middle_out(self):
        for idx, t in enumerate(downstream_tables):
            print_progress(t, idx+1, len(downstream_tables))
            self.subset_downstream(t, relationships)
-        print_progress_complete(len(downstream_tables))
        print('Downstream subsetting completed in {}s'.format(time.time()-start_time))

        if config_reader.keep_disconnected_tables():
@@ -96,7 +92,6 @@ def run_middle_out(self):
                print_progress(t, idx+1, len(disconnected_tables))
                q = 'SELECT * FROM {}'.format(fully_qualified_table(t))
                self.__db_helper.copy_rows(self.__source_conn, self.__destination_conn, q, mysql_db_name_hack(t, self.__destination_conn))
-            print_progress_complete(len(disconnected_tables))
            print('Disconnected tables completed in {}s'.format(time.time()-start_time))

    def prep_temp_dbs(self):
subset_utils.py (7 changes: 1 addition & 6 deletions)

@@ -93,12 +93,7 @@ def quoter(id):
    return q + id + q

def print_progress(target, idx, count):
-    end = '\n' if config_reader.verbose_logging() else ''
-    print('\x1b[2K\rProcessing {} of {}: {}'.format(idx, count, target), end=end)
+    print('Processing {} of {}: {}'.format(idx, count, target))

-def print_progress_complete(count):
-    if count > 0:
-        print('')

class UnionFind:

