This repository has been archived by the owner on Dec 15, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add script for exporting parts of the changes db
Summary: Exports rows from a table together with all related ancestor and child rows. You can import the exported data using `psql changes -f <export_file>`. The following (incomplete) tree illustrates what gets exported when you export rows from the build table: repository rows | project rows | author | \ | change rows \ | \ <build rows> \ / \ / job rows / \ log source rows job phase rows | | log chunk rows job step rows | | ... ... Test Plan: Tested locally. Reviewers: kylec Reviewed By: kylec Subscribers: vishal, reviews-dfbgrowth, christoffer, ar, cf Projects: #changes Differential Revision: https://tails.corp.dropbox.com/D81421
- Loading branch information
Showing
1 changed file
with
354 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,354 @@ | ||
#!/usr/bin/env python | ||
from __future__ import absolute_import | ||
|
||
import argparse | ||
from collections import defaultdict | ||
from itertools import chain | ||
import warnings | ||
|
||
from psycopg2.extensions import adapt as sqlescape | ||
|
||
from sqlalchemy import exc as sa_exc, tuple_ | ||
from sqlalchemy import MetaData | ||
from sqlalchemy.engine import reflection | ||
from sqlalchemy.sql.compiler import SQLCompiler | ||
|
||
from changes.config import create_app, db | ||
|
||
# In cases where a table has an implicit foreign key constraint, | ||
# i.e. defined in its model using sqlalchemy.orm.relationship, but not | ||
# explicitly as a foreign key constraint in the database, we'll add that | ||
# constraint to FOREIGN_KEY_HINTS to make the exporter traverse the | ||
# relationship. | ||
# Ideally, we would add the foreign key constraint to the database, and | ||
# remove the corresponding hint. | ||
FOREIGN_KEY_HINTS = { | ||
# This hints the exporter about an implicit foreign key constraint from | ||
# (source.repository_id, source.revision_sha) to | ||
# (revision.repository_id, sha). | ||
'source': [ | ||
{ | ||
'referred_table': 'revision', | ||
'referred_columns': ['repository_id', 'sha'], | ||
'constrained_columns': ['repository_id', 'revision_sha'], | ||
} | ||
] | ||
} | ||
|
||
class SqlExporter(object): | ||
""" | ||
SqlExporter creates sql insert statements for a set of rows from some | ||
table together with all related rows. | ||
Usage: | ||
exporter = SqlExporter(db.engine) | ||
builds_table = exporter.get_table('build') | ||
builds_pks = [ | ||
('402438bd-03d8-4a09-a494-1a2ba7d299b7', ), | ||
('f710d41d-d62a-44ac-b3c0-8b87423233ca', ), | ||
] | ||
statements = exporter.related_rows(build_table, build_pks) | ||
""" | ||
|
||
def __init__(self, engine): | ||
self.engine = engine | ||
self.inspector = reflection.Inspector.from_engine(engine) | ||
self.meta = MetaData() | ||
with warnings.catch_warnings(): | ||
warnings.simplefilter("ignore", category=sa_exc.SAWarning) | ||
self.meta.reflect(bind=engine) | ||
|
||
self.index = self._build_relationship_index() | ||
self.already_exported = defaultdict(set) | ||
|
||
def _build_relationship_index(self): | ||
""" | ||
Returns a dictionary of tables, with a list of foreign keys, dependent | ||
keys, and the primary key for each one. I.e.: | ||
{ | ||
<build table>: { | ||
"primary_key": [list of primary key columns], | ||
"foreign_keys": [list of foreign keys], | ||
"dependent_keys": [list of dependent keys], | ||
}, | ||
<jobs table>: {...}, | ||
... | ||
} | ||
""" | ||
def resolve_columns(table, columns): | ||
return [getattr(table.c, column) for column in columns] | ||
|
||
index = defaultdict(lambda: defaultdict(list)) | ||
for table in self.meta.sorted_tables: | ||
# Save the primary key, | ||
index[table]['primary_key'] = resolve_columns( | ||
table, self.inspector.get_pk_constraint( | ||
table.name)['constrained_columns']) | ||
|
||
# and the foreign keys for this table, | ||
foreign_keys = [] | ||
for fk in chain( | ||
self.inspector.get_foreign_keys(table.name), | ||
FOREIGN_KEY_HINTS.get(table.name, []), | ||
): | ||
foreign_table = self.meta.tables[fk['referred_table']] | ||
foreign_columns = resolve_columns( | ||
foreign_table, fk['referred_columns']) | ||
dependent_columns = resolve_columns( | ||
table, fk['constrained_columns']) | ||
foreign_keys.append({ | ||
'foreign_table': foreign_table, | ||
'foreign_columns': foreign_columns, | ||
'dependent_columns': dependent_columns, | ||
}) | ||
index[table]['foreign_keys'] = foreign_keys | ||
|
||
# and add this table as a dependent to each table on which it | ||
# depends. | ||
for fk in foreign_keys: | ||
foreign_table = fk['foreign_table'] | ||
index[foreign_table]['dependent_keys'].append({ | ||
'dependent_table': table, | ||
'dependent_columns': fk['dependent_columns'], | ||
'foreign_columns': fk['foreign_columns'], | ||
}) | ||
return index | ||
|
||
def _compile_statement(self, statement): | ||
""" | ||
Returns raw sql for the given sqlalchemy statement. | ||
""" | ||
dialect = self.engine.dialect | ||
compiler = SQLCompiler(dialect, statement) | ||
compiler.compile() | ||
encoding = dialect.encoding | ||
params = {} | ||
for key, value in compiler.params.iteritems(): | ||
if value is None: | ||
params[key] = 'null' | ||
else: | ||
if isinstance(value, unicode): | ||
value = value.encode(encoding) | ||
params[key] = sqlescape(value) | ||
sql = compiler.string.encode(encoding) | ||
interpolated_sql = sql % params | ||
return interpolated_sql.decode(encoding) | ||
|
||
def _record_pks(self, table, pks): | ||
""" | ||
Given a table and pks, returns those pks that still haven't been | ||
exported from that table, and records them as exported. In other words, | ||
calling _record_pks a second time with the same arguments returns an | ||
empty set. | ||
""" | ||
new_pks = set(pks) - self.already_exported[table] | ||
self.already_exported[table] |= new_pks | ||
return new_pks | ||
|
||
def dependent_keys(self, table): | ||
""" | ||
Returns a list of (dependent_table, dependent_columns, | ||
foreign_columns) tuples, where each tuple represents a foreign key | ||
from dependent_table to table. | ||
dependent_table: The table that defines the foreign key. | ||
dependent_columns: The columns constrained by the foreign key. | ||
foreign_columns: The columns referenced by the foreign key. | ||
""" | ||
return [ | ||
( | ||
c['dependent_table'], | ||
c['dependent_columns'], | ||
c['foreign_columns'] | ||
) for c in self.index[table]['dependent_keys'] | ||
] | ||
|
||
def foreign_keys(self, table): | ||
""" | ||
Returns a list of (foreign_table, foreign_columns, dependent_columns) | ||
tuples, where each tuple is a foreign key from table to foreign_table. | ||
foreign_table: The table referenced by the foreign key. | ||
foreign_columns: The columns referenced by the foreign key. | ||
dependent_columns: The columns constrained by the foreign key. | ||
""" | ||
return [ | ||
( | ||
c['foreign_table'], | ||
c['foreign_columns'], | ||
c['dependent_columns'] | ||
) for c in self.index[table]['foreign_keys'] | ||
] | ||
|
||
def primary_key(self, table): | ||
""" | ||
Returns the primary key for the given table. | ||
""" | ||
return self.index[table]['primary_key'] | ||
|
||
def get_table(self, table_name): | ||
""" | ||
Returns a sqlalchemy table object for the given table name. | ||
""" | ||
return self.meta.tables.get(table_name) | ||
|
||
def rows(self, table, pks): | ||
""" | ||
Returns sql insert statements for the rows in table identified by pks. | ||
""" | ||
rows = db.session.query(table).filter( | ||
tuple_(*self.primary_key(table)).in_(pks), | ||
).all() | ||
|
||
statements = [] | ||
for row in rows: | ||
statement = table.insert(row) | ||
insert_sql = self._compile_statement(statement) | ||
statements.append(u'{};'.format(insert_sql)) | ||
|
||
return statements | ||
|
||
def foreign_rows(self, table, pks): | ||
""" | ||
Recursively exports all foreign rows for the rows identified by table | ||
and pks. Foreign rows are rows referenced by the table's foreign keys. | ||
Returns a list of all exported rows ordered by their foreign | ||
key dependency so that each row is preceded by all rows on which it | ||
depends. | ||
""" | ||
statements = [] | ||
for foreign_table, _, dependent_columns in self.foreign_keys(table): | ||
# Get pks for all rows in the foreign table that are referenced | ||
# by the rows identified by table and pks. | ||
foreign_pks = db.session.query( | ||
*dependent_columns | ||
).filter( | ||
tuple_(*self.primary_key(table)).in_(pks) | ||
).all() | ||
|
||
# Mark all new foreign rows as exported. | ||
new_pks = self._record_pks(foreign_table, foreign_pks) | ||
|
||
if new_pks: | ||
# Export all new foreign rows, | ||
statements[:0] = self.rows(foreign_table, new_pks) | ||
|
||
# and their foreign rows recursively. | ||
statements[:0] = self.foreign_rows(foreign_table, new_pks) | ||
|
||
return statements | ||
|
||
def dependent_rows(self, table, pks): | ||
""" | ||
Recursively exports all dependent rows for the rows identified by | ||
table and pks. Dependent rows refer to table and pks with foreign keys. | ||
Returns a list of all exported rows ordered by their foreign key | ||
dependency so that each row is preceded by all rows on which it | ||
depends. | ||
""" | ||
statements = [] | ||
for dependent_table, dependent_columns, _ in self.dependent_keys(table): | ||
# Get pks for all rows in dependent_table that reference the rows | ||
# identified by table and pks. | ||
dependent_pks = db.session.query( | ||
*self.primary_key(dependent_table) | ||
).filter( | ||
tuple_(*dependent_columns).in_(pks) | ||
).all() | ||
|
||
# Export those rows, and all their related rows. | ||
statements.extend(self.related_rows(dependent_table, dependent_pks)) | ||
|
||
return statements | ||
|
||
def related_rows(self, table, pks): | ||
""" | ||
Exports all rows identified by table and pks, and recursively their | ||
foreign and dependent rows. Foreign rows are rows referenced by the | ||
table's foreign keys. Dependent rows refer to table and pks with | ||
foreign keys. | ||
Returns a list of all exported rows ordered by their foreign key | ||
dependency so that each row is preceded by all rows on which it | ||
depends. | ||
""" | ||
new_pks = self._record_pks(table, pks) | ||
statements = [] | ||
|
||
if new_pks: | ||
statements.extend(self.foreign_rows(table, new_pks)) | ||
statements.extend(self.rows(table, new_pks)) | ||
statements.extend(self.dependent_rows(table, new_pks)) | ||
|
||
return statements | ||
|
||
|
||
def validate_table(table_name): | ||
global exporter | ||
|
||
table = exporter.get_table(table_name) | ||
if table is None: | ||
raise argparse.ArgumentTypeError("The table '{}' does not exist.".format(table_name)) | ||
return table | ||
|
||
parser = argparse.ArgumentParser( | ||
formatter_class=argparse.RawTextHelpFormatter, description=""" | ||
Export rows from the given table together with all related rows. | ||
You can import the exported data using `psql changes -f <export_file>`. | ||
The following (incomplete) tree illustrates what gets exported when you export | ||
rows from the build table: | ||
repository rows | ||
| | ||
project rows | ||
| | ||
author | | ||
\ | | ||
change rows \ | | ||
\ <build rows> | ||
\ / | ||
\ / | ||
job rows | ||
/ \\ | ||
log source rows job phase rows | ||
| | | ||
log chunk rows job step rows | ||
| | | ||
... ... | ||
Example: | ||
# Export two builds and dump the result to export.sql: | ||
bin/export -t build -pks 402438bd-03d8-4a09-a494-1a2ba7d299b7 f710d41d-d62a-44ac-b3c0-8b87423233ca > export.sql | ||
# Import exported file: | ||
psql changes -f export.sql | ||
""") | ||
|
||
parser.add_argument( | ||
'-t', '--table', | ||
dest='table', | ||
metavar='TABLE', | ||
help='Table name.', | ||
required=True, | ||
type=validate_table, | ||
) | ||
parser.add_argument( | ||
'-pks', '--pks', | ||
dest='pks', | ||
metavar='ID', | ||
help='List of pks in TABLE.', | ||
nargs='+', | ||
required=True | ||
) | ||
|
||
app = create_app() | ||
app_context = app.app_context() | ||
app_context.push() | ||
|
||
exporter = SqlExporter(db.engine) | ||
args = parser.parse_args() | ||
pks = [(pk, ) for pk in args.pks] | ||
statements = exporter.related_rows(args.table, pks) | ||
for statement in statements: | ||
print statement.encode('utf8') |