Skip to content

Commit

Permalink
bug fixes and improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
acolombi committed Jul 24, 2019
1 parent 2877f40 commit b6e646f
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 28 deletions.
7 changes: 5 additions & 2 deletions config_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ def initialize(file_like = None):
if "desired_result" in _config:
raise ValueError("desired_result is a key in the old config spec. Check the README.md and example-config.json for the latest configuration parameters.")

DependencyBreak = collections.namedtuple('DependencyBreak', ['fk_table', 'target_table'])
def get_dependency_breaks():
DependencyBreak = collections.namedtuple('DependencyBreak', ['fk_table', 'target_table'])
return set([DependencyBreak(b['fk_table'], b['target_table']) for b in _config['dependency_breaks']])

def get_preserve_fk_opportunistically():
    """Return the dependency breaks whose FK values should be preserved
    opportunistically (i.e. kept rather than nulled when possible).

    Reads ``_config['dependency_breaks']`` and keeps entries whose
    ``perserve_fk_opportunistically`` flag is truthy.

    NOTE(review): the config key is spelled "perserve" (sic) in the data;
    user configs must use that exact misspelling — confirm whether this is
    intentional before renaming.
    """
    # Set comprehension instead of set([listcomp]); b.get(key) is equivalent
    # to the original `key in b and b[key]` truthiness check.
    return {
        DependencyBreak(b['fk_table'], b['target_table'])
        for b in _config['dependency_breaks']
        if b.get('perserve_fk_opportunistically')
    }

def get_initial_targets():
    # The "initial_targets" entries from the loaded JSON config — the tables
    # that seed the subsetting process.  Raises KeyError if the key is absent
    # (presumably required by the config spec — confirm against README).
    return _config['initial_targets']

Expand Down Expand Up @@ -48,7 +51,7 @@ def get_fk_augmentation():
return list(map(__convert_tonic_format, _config['fk_augmentation']))

def get_upstream_filters():
return {f["table"] : f["condition"] for f in _config["upstream_filters"]}
return _config["upstream_filters"]

def __convert_tonic_format(obj):
if "fk_schema" in obj:
Expand Down
4 changes: 2 additions & 2 deletions db_connect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import config_reader
import psycopg2, mysql.connector
import os, pathlib, re, urllib, subprocess, os.path, json, getpass, time, sys
import os, pathlib, re, urllib, subprocess, os.path, json, getpass, time, sys, datetime

class DbConnect:

Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(self, cursor):
def execute(self, query):
start_time = time.time()
if config_reader.verbose_logging():
print('Beginning query:\n\t{}'.format(query))
print('Beginning query @ {}:\n\t{}'.format(str(datetime.datetime.now()), query))
sys.stdout.flush()
retval = self.inner_cursor.execute(query)
if config_reader.verbose_logging():
Expand Down
4 changes: 4 additions & 0 deletions example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
{
"table": "public.an_upstream_table",
"condition": "timestamp > '01-01-2001'"
},
{
"column": "condition_applied_to_any_table_with_this_column",
"condition": "condition_applied_to_any_table_with_this_column > 42"
}
],
"excluded_tables": [
Expand Down
12 changes: 4 additions & 8 deletions mysql_database_helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os, uuid, csv
import config_reader
from pathlib import Path
from subset_utils import columns_joined, columns_tupled, quoter, schema_name, table_name, fully_qualified_table
from subset_utils import columns_joined, columns_tupled, quoter, schema_name, table_name, fully_qualified_table, redact_relationships

system_schemas_str = ','.join(['\'' + schema + '\'' for schema in ['information_schema', 'performance_schema', 'sys', 'mysql', 'innodb','tmp']])
temp_db = 'tonic_subset_temp_db_398dhjr23'
Expand Down Expand Up @@ -77,14 +77,10 @@ def copy_to_temp_table(conn, query, target_table, pk_columns = None):
def source_db_temp_table(target_table):
    """Return the fully-qualified name of the staging table for
    *target_table* inside the scratch database ``temp_db``.

    The schema and table parts of *target_table* are flattened into a single
    ``<schema>_<table>`` name so they coexist in one temp database.
    """
    return '{}.{}_{}'.format(temp_db, schema_name(target_table), table_name(target_table))

def get_referencing_tables(table_name, tables, conn):
return [r for r in __get_redacted_fk_relationships(tables, conn) if r['target_table']==table_name]

def __get_redacted_fk_relationships(tables, conn):
def get_redacted_table_references(table_name, tables, conn):
relationships = get_unredacted_fk_relationships(tables, conn)
breaks = config_reader.get_dependency_breaks()
relationships = [r for r in relationships if (r['fk_table'], r['target_table']) not in breaks]
return relationships
redacted = redact_relationships(relationships)
return [r for r in redacted if r['target_table']==table_name]

def get_unredacted_fk_relationships(tables, conn):
cur = conn.cursor()
Expand Down
12 changes: 4 additions & 8 deletions psql_database_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import config_reader
from pathlib import Path
from psycopg2.extras import execute_values, register_default_json, register_default_jsonb
from subset_utils import columns_joined, columns_tupled, schema_name, table_name, fully_qualified_table
from subset_utils import columns_joined, columns_tupled, schema_name, table_name, fully_qualified_table, redact_relationships

register_default_json(loads=lambda x: str(x))
register_default_jsonb(loads=lambda x: str(x))
Expand Down Expand Up @@ -75,14 +75,10 @@ def copy_to_temp_table(conn, query, target_table, pk_columns = None):
cur.execute('INSERT INTO ' + temp_table + ' ' + query)
conn.commit()

def get_referencing_tables(table_name, tables, conn):
return [r for r in __get_redacted_fk_relationships(tables, conn) if r['target_table']==table_name]

def __get_redacted_fk_relationships(tables, conn):
def get_redacted_table_references(table_name, tables, conn):
relationships = get_unredacted_fk_relationships(tables, conn)
breaks = config_reader.get_dependency_breaks()
relationships = [r for r in relationships if (r['fk_table'], r['target_table']) not in breaks]
return relationships
redacted = redact_relationships(relationships)
return [r for r in redacted if r['target_table']==table_name]

def get_unredacted_fk_relationships(tables, conn):
cur = conn.cursor()
Expand Down
15 changes: 8 additions & 7 deletions subset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from topo_orderer import get_topological_order_by_tables
from subset_utils import UnionFind, table_name, find, compute_disconnected_tables, compute_downstream_tables, compute_upstream_tables, columns_joined, columns_tupled, columns_to_copy, quoter, fully_qualified_table, print_progress, print_progress_complete, mysql_db_name_hack
from subset_utils import UnionFind, schema_name, table_name, find, compute_disconnected_tables, compute_downstream_tables, compute_upstream_tables, columns_joined, columns_tupled, columns_to_copy, quoter, fully_qualified_table, print_progress, print_progress_complete, mysql_db_name_hack, upstream_filter_match, redact_relationships
import database_helper
import config_reader
import shutil, os, uuid, time, itertools
Expand Down Expand Up @@ -122,7 +122,8 @@ def __subset_direct(self, target, relationships):

def __subset_upstream(self, target, processed_tables, relationships):

relevant_key_constraints = list(filter(lambda r: r['target_table'] in processed_tables and r['fk_table'] == target, relationships))
redacted_relationships = redact_relationships(relationships)
relevant_key_constraints = list(filter(lambda r: r['target_table'] in processed_tables and r['fk_table'] == target, redacted_relationships))
# this table isn't referenced by anything we've already processed, so let's leave it empty
# OR
# table was already added, this only happens if the upstream table was also a direct target
Expand All @@ -139,10 +140,10 @@ def __subset_upstream(self, target, processed_tables, relationships):
self.__db_helper.copy_rows(self.__source_conn, self.__destination_conn, query, temp_target_name)

# filter it down in the target database
clauses = map(lambda kc: '{} IN (SELECT {} FROM {})'.format(columns_tupled(kc['fk_columns']), columns_joined(kc['target_columns']), fully_qualified_table(mysql_db_name_hack(kc['target_table'], self.__destination_conn))), relevant_key_constraints)
clauses = list(clauses)
if target in config_reader.get_upstream_filters():
clauses.append(config_reader.get_upstream_filters()[target])
table_columns = self.__db_helper.get_table_columns(table_name(target), schema_name(target), self.__source_conn)
clauses = ['{} IN (SELECT {} FROM {})'.format(columns_tupled(kc['fk_columns']), columns_joined(kc['target_columns']), fully_qualified_table(kc['target_table'])) for kc in relevant_key_constraints]
clauses.extend(upstream_filter_match(target, table_columns))

select_query = 'SELECT * FROM {} WHERE TRUE AND {}'.format(quoter(temp_target_name), ' AND '.join(clauses))
insert_query = 'INSERT INTO {} {}'.format(fully_qualified_table(mysql_db_name_hack(target, self.__destination_conn)), select_query)
self.__db_helper.run_query(insert_query, self.__destination_conn)
Expand All @@ -164,7 +165,7 @@ def __get_passthrough_tables(self):
# database. And we take those b_ids and run `select * from table b where id in (those list of ids)` then insert
# that result set into table b of the destination database
def subset_downstream(self, table, relationships):
referencing_tables = self.__db_helper.get_referencing_tables(table, self.__all_tables, self.__source_conn)
referencing_tables = self.__db_helper.get_redacted_table_references(table, self.__all_tables, self.__source_conn)

if len(referencing_tables) > 0:
pk_columns = referencing_tables[0]['target_columns']
Expand Down
17 changes: 16 additions & 1 deletion subset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
# to the downstream dependency that breaks the cycle
def columns_to_copy(table, relationships, conn):
target_breaks = set()
opportunists = config_reader.get_preserve_fk_opportunistically()
for dep_break in config_reader.get_dependency_breaks():
if dep_break.fk_table == table:
if dep_break.fk_table == table and dep_break not in opportunists:
target_breaks.add(dep_break.target_table)

columns_to_null = set()
Expand All @@ -19,6 +20,20 @@ def columns_to_copy(table, relationships, conn):
columns = database_helper.get_specific_helper().get_table_columns(table_name(table), schema_name(table), conn)
return ','.join(['{}.{}'.format(quoter(table_name(table)), quoter(c)) if c not in columns_to_null else 'NULL as {}'.format(quoter(c)) for c in columns])

def upstream_filter_match(target, table_columns):
    """Collect the configured upstream filter conditions that apply to *target*.

    An upstream-filter entry matches when its ``"table"`` value equals
    *target*, or when its ``"column"`` value names a column in
    *table_columns*.  An entry carrying both keys can match twice, in which
    case its condition is appended twice — preserved from the original
    behavior (the duplicate is harmless in an AND-joined WHERE clause).

    Returns a list of SQL condition strings (possibly empty).
    """
    retval = []
    # Loop variable renamed from `filter` — the original shadowed the builtin.
    for flt in config_reader.get_upstream_filters():
        if "table" in flt and target == flt["table"]:
            retval.append(flt["condition"])
        if "column" in flt and flt["column"] in table_columns:
            retval.append(flt["condition"])
    return retval

def redact_relationships(relationships):
    """Filter out FK relationships that the config marks as dependency breaks.

    A relationship is dropped when its (fk_table, target_table) pair appears
    in ``config_reader.get_dependency_breaks()``; all others are returned in
    their original order.
    """
    breaks = config_reader.get_dependency_breaks()
    kept = []
    for rel in relationships:
        if (rel['fk_table'], rel['target_table']) not in breaks:
            kept.append(rel)
    return kept

def find(f, seq):
"""Return first item in sequence where f(item) == True."""
Expand Down

0 comments on commit b6e646f

Please sign in to comment.