From 9ee9b4511eab98e9687006358a021b99688be9c6 Mon Sep 17 00:00:00 2001
From: Andrew Colombi
Date: Tue, 6 Aug 2019 17:32:20 -0700
Subject: [PATCH] add post subset sql

---
 README.md           |  4 +++-
 config_reader.py    |  3 +++
 direct_subset.py    | 19 ++++++++++++++++---
 example-config.json |  3 ++-
 subset.py           |  7 +------
 subset_utils.py     |  7 +------
 6 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 7b16d29..9bf02cf 100644
--- a/README.md
+++ b/README.md
@@ -64,10 +64,12 @@ Below we describe the use of all configuration parameters, but the best place to
 `fk_augmentation`: Additional foreign keys that, while not represented as constraints in the database, are logically present in the data. Foreign keys listed in `fk_augmentation` are unioned with the foreign keys provided by constraints in the database. `fk_augmentation` is useful when there are foreign keys existing in the data, but not represented in the database. The value is a JSON array of JSON objects. See `example-config.json` for details.
 
-`dependency_breaks`: An array containg a JSON object with *"fk_table"* and *"target_table"* fields of table relationships to be ignored in order to break cycles
+`dependency_breaks`: An array containing JSON objects with *"fk_table"* and *"target_table"* fields of table relationships to be ignored in order to break cycles
 
 `keep_disconnected_tables`: If `true` tables that the subset target(s) don't reach, when following foreign keys, will be copied 100% over. If it's `false` then their schema will be copied but the table contents will be empty. Put more mathematically, the tables and foreign keys create a graph (where tables are nodes and foreign keys are directed edges) disconnected tables are the tables in components that don't contain any targets. This setting decides how to import those tables.
 
+`post_subset_sql`: An array of SQL commands that will be issued on the destination database after subsetting is complete. Useful for performing additional ad hoc tasks after subsetting.
+
 # Running
 
 Almost all the configuration is in the `config.json` file, so running is as simple as
 
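As an aside (not part of the patch): a `config.json` using the new option could look like the sketch below. The table and column names are hypothetical; any SQL the destination database accepts is allowed, and the commands run in array order.

```json
{
    "post_subset_sql": [
        "UPDATE app_user SET email = 'redacted@example.com'",
        "DELETE FROM audit_log WHERE created_at < '2019-01-01'"
    ]
}
```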
diff --git a/config_reader.py b/config_reader.py
index 286e6e2..2f07bdc 100644
--- a/config_reader.py
+++ b/config_reader.py
@@ -53,6 +53,9 @@ def get_fk_augmentation():
 def get_upstream_filters():
     return _config["upstream_filters"]
 
+def get_post_subset_sql():
+    return _config["post_subset_sql"] if "post_subset_sql" in _config else []
+
 def __convert_tonic_format(obj):
     if "fk_schema" in obj:
         return {
diff --git a/direct_subset.py b/direct_subset.py
index 63feb2b..89f180a 100644
--- a/direct_subset.py
+++ b/direct_subset.py
@@ -1,9 +1,12 @@
 import uuid, sys
 import config_reader, result_tabulator
+import time
 from subset import Subset
+from fast_subset import FastSubset
 from psql_database_creator import PsqlDatabaseCreator
 from mysql_database_creator import MySqlDatabaseCreator
 from db_connect import DbConnect
+from subset_utils import print_progress
 import database_helper
 
 def db_creator(db_type, source, dest):
@@ -29,12 +32,15 @@ def db_creator(db_type, source, dest):
 database.teardown()
 database.create()
 
-# Get list of tables to operate on
-all_tables = database_helper.get_specific_helper().list_all_tables(source_dbc)
+db_helper = database_helper.get_specific_helper()
+all_tables = db_helper.list_all_tables(source_dbc)
 all_tables = [x for x in all_tables if x not in config_reader.get_excluded_tables()]
 
-subsetter = Subset(source_dbc, destination_dbc, all_tables)
+if "--fast" in sys.argv:
+    subsetter = FastSubset(source_dbc, destination_dbc, all_tables)
+else:
+    subsetter = Subset(source_dbc, destination_dbc, all_tables)
 
 try:
     subsetter.prep_temp_dbs()
@@ -43,6 +49,13 @@
     if "--no-constraints" not in sys.argv:
         database.add_constraints()
 
+    print("Beginning post subset SQL calls")
+    start_time = time.time()
+    for idx, sql in enumerate(config_reader.get_post_subset_sql()):
+        print_progress(sql, idx+1, len(config_reader.get_post_subset_sql()))
+        db_helper.run_query(sql, destination_dbc.get_db_connection())
+    print("Completed post subset SQL calls in {}s".format(time.time()-start_time))
+
     result_tabulator.tabulate(source_dbc, destination_dbc, all_tables)
 finally:
     subsetter.unprep_temp_dbs()
diff --git a/example-config.json b/example-config.json
index bc1a3b7..15241d2 100644
--- a/example-config.json
+++ b/example-config.json
@@ -50,5 +50,6 @@
       "target_table": "public.user",
       "target_columns": ["id"]
     }
-  ]
+  ],
+  "post_subset_sql": ["UPDATE a_table SET a_column = 'value'"]
 }
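To see the shape of the new hook outside the diff, here is a minimal, self-contained sketch of the loop added to `direct_subset.py`. The function name and parameters are illustrative only; the patch itself inlines this logic and calls `db_helper.run_query` with the destination connection.

```python
import time

def run_post_subset_sql(statements, run_query, connection):
    """Run each post-subset statement, in order, against the destination.

    `statements` stands in for config_reader.get_post_subset_sql() and
    `run_query` for the database helper's query runner (hypothetical names).
    """
    start_time = time.time()
    for idx, sql in enumerate(statements):
        # Same progress format that subset_utils.print_progress emits after this patch
        print('Processing {} of {}: {}'.format(idx + 1, len(statements), sql))
        run_query(sql, connection)
    print('Completed post subset SQL calls in {}s'.format(time.time() - start_time))
```

Because the statements run sequentially and without a surrounding try/except, a later statement can rely on the effects of an earlier one, and a failing statement aborts the remaining ones.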
diff --git a/subset.py b/subset.py
index 057c914..61712d7 100644
--- a/subset.py
+++ b/subset.py
@@ -1,5 +1,5 @@
 from topo_orderer import get_topological_order_by_tables
-from subset_utils import UnionFind, schema_name, table_name, find, compute_disconnected_tables, compute_downstream_tables, compute_upstream_tables, columns_joined, columns_tupled, columns_to_copy, quoter, fully_qualified_table, print_progress, print_progress_complete, mysql_db_name_hack, upstream_filter_match, redact_relationships
+from subset_utils import UnionFind, schema_name, table_name, find, compute_disconnected_tables, compute_downstream_tables, compute_upstream_tables, columns_joined, columns_tupled, columns_to_copy, quoter, fully_qualified_table, print_progress, mysql_db_name_hack, upstream_filter_match, redact_relationships
 import database_helper
 import config_reader
 import shutil, os, uuid, time, itertools
@@ -53,7 +53,6 @@ def run_middle_out(self):
             print_progress(target, idx+1, len(config_reader.get_initial_targets()))
             self.__subset_direct(target, relationships)
             processed_tables.add(target['table'])
-        print_progress_complete(len(config_reader.get_initial_targets()))
         print('Direct target tables completed in {}s'.format(time.time()-start_time))
 
         # greedily grab rows with foreign keys to rows in the target strata
@@ -65,7 +64,6 @@ def run_middle_out(self):
             data_added = self.__subset_upstream(t, processed_tables, relationships)
             if data_added:
                 processed_tables.add(t)
-        print_progress_complete(len(upstream_tables))
         print('Greedy subsettings completed in {}s'.format(time.time()-start_time))
 
         # process pass-through tables, you need this before subset_downstream, so you can get all required downstream rows
@@ -75,7 +73,6 @@ def run_middle_out(self):
             print_progress(t, idx+1, len(passthrough_tables))
             q = 'SELECT * FROM {}'.format(fully_qualified_table(t))
             self.__db_helper.copy_rows(self.__source_conn, self.__destination_conn, q, mysql_db_name_hack(t, self.__destination_conn))
-        print_progress_complete(len(passthrough_tables))
         print('Pass-through completed in {}s'.format(time.time()-start_time))
 
         # use subset_downstream to get all supporting rows according to existing needs
@@ -85,7 +82,6 @@ def run_middle_out(self):
         for idx, t in enumerate(downstream_tables):
             print_progress(t, idx+1, len(downstream_tables))
             self.subset_downstream(t, relationships)
-        print_progress_complete(len(downstream_tables))
         print('Downstream subsetting completed in {}s'.format(time.time()-start_time))
 
         if config_reader.keep_disconnected_tables():
@@ -96,7 +92,6 @@ def run_middle_out(self):
                 print_progress(t, idx+1, len(disconnected_tables))
                 q = 'SELECT * FROM {}'.format(fully_qualified_table(t))
                 self.__db_helper.copy_rows(self.__source_conn, self.__destination_conn, q, mysql_db_name_hack(t, self.__destination_conn))
-            print_progress_complete(len(disconnected_tables))
             print('Disconnected tables completed in {}s'.format(time.time()-start_time))
 
     def prep_temp_dbs(self):
diff --git a/subset_utils.py b/subset_utils.py
index 087fba3..abdabdf 100644
--- a/subset_utils.py
+++ b/subset_utils.py
@@ -93,12 +93,7 @@ def quoter(id):
     return q + id + q
 
 def print_progress(target, idx, count):
-    end = '\n' if config_reader.verbose_logging() else ''
-    print('\x1b[2K\rProcessing {} of {}: {}'.format(idx, count, target), end=end)
-
-def print_progress_complete(count):
-    if count > 0:
-        print('')
+    print('Processing {} of {}: {}'.format(idx, count, target))
 
 class UnionFind:
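One behavioral detail worth noting: `get_post_subset_sql` returns an empty list when the key is absent, so existing configs keep working unchanged and the post-subset loop becomes a no-op. A standalone sketch of the equivalent logic, taking the config dict as a parameter instead of reading the module-level `_config`:

```python
def get_post_subset_sql(config):
    # dict.get with a default is equivalent to the patch's membership test
    return config.get("post_subset_sql", [])

assert get_post_subset_sql({}) == []
assert get_post_subset_sql({"post_subset_sql": ["ANALYZE"]}) == ["ANALYZE"]
```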