
Commit

Merge pull request #58 from dimagi/sk/checkpoint-table
refactor checkpoint management
millerdev committed Aug 14, 2017
2 parents 1d78cfd + a359e8f commit 9e43688
Showing 5 changed files with 237 additions and 64 deletions.
85 changes: 85 additions & 0 deletions commcare_export/checkpoint.py
@@ -0,0 +1,85 @@
import datetime
import logging
import uuid

from commcare_export.writers import SqlMixin

logger = logging.getLogger(__name__)


class CheckpointManager(SqlMixin):
table_name = 'commcare_export_runs'

def set_checkpoint(self, query, query_md5, checkpoint_time=None, run_complete=False):
logger.info('Setting checkpoint')
checkpoint_time = checkpoint_time or datetime.datetime.utcnow()
self._insert_checkpoint(
id=uuid.uuid4().hex,
query_file_name=query,
query_file_md5=query_md5,
time_of_run=checkpoint_time.isoformat(),
final=run_complete
)
if run_complete:
self._cleanup(query_md5)

def create_checkpoint_table(self):
columns = self._get_checkpoint_table_columns()
self._migrate_checkpoint_table(columns)

def _insert_checkpoint(self, **row):
table = self.table(self.table_name)
insert = table.insert().values(**row)
self.connection.execute(insert)

def _cleanup(self, query_md5):
sql = """
DELETE FROM {}
WHERE final = :final
AND query_file_md5 = :md5
""".format(self.table_name)
self.connection.execute(
self.sqlalchemy.sql.text(sql),
final=False,
md5=query_md5,
)

def _migrate_checkpoint_table(self, columns):
ctx = self.alembic.migration.MigrationContext.configure(self.connection)
op = self.alembic.operations.Operations(ctx)
reflect = False
        if self.table_name not in self.metadata.tables:
op.create_table(self.table_name, *columns)
reflect = True
else:
existing_columns = {c.name: c for c in self.table(self.table_name).columns}
for column in columns:
if column.name not in existing_columns:
op.add_column(self.table_name, column)
reflect = True
if reflect:
self.metadata.clear()
self.metadata.reflect()

def _get_checkpoint_table_columns(self):
return [
self.get_id_column(),
self.sqlalchemy.Column('query_file_name',
self.sqlalchemy.Unicode(self.MAX_VARCHAR_LEN, collation=self.collation)),
self.sqlalchemy.Column('query_file_md5', self.sqlalchemy.Unicode(32, collation=self.collation)),
self.sqlalchemy.Column('time_of_run', self.sqlalchemy.Unicode(32, collation=self.collation)),
self.sqlalchemy.Column('final', self.sqlalchemy.Boolean()),
]

def get_time_of_last_run(self, query_file_md5):
if 'commcare_export_runs' in self.metadata.tables:
sql = """
SELECT time_of_run FROM commcare_export_runs
WHERE query_file_md5 = :query_file_md5 ORDER BY time_of_run DESC
"""
cursor = self.connection.execute(
self.sqlalchemy.sql.text(sql),
query_file_md5=query_file_md5
)
for row in cursor:
return row[0]
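
For orientation, here is a minimal sketch of how the new manager is driven (the SQLite URL and query file name are assumptions for illustration only; the real wiring is in cli.py below, and sqlalchemy plus alembic must be installed):

import hashlib

import sqlalchemy

from commcare_export.checkpoint import CheckpointManager

engine = sqlalchemy.create_engine('sqlite:///export.db')  # assumed URL

with open('my_query.xlsx', 'rb') as f:  # assumed query file
    query_md5 = hashlib.md5(f.read()).hexdigest()

manager = CheckpointManager(engine.connect(), collation=None)
with manager:  # each `with` block forks a short-lived connection (see SqlMixin)
    manager.create_checkpoint_table()
    last_run = manager.get_time_of_last_run(query_md5)

# ... run the export ...

with manager:
    manager.set_checkpoint('my_query.xlsx', query_md5, run_complete=True)

Two details worth noting: time_of_run is stored as ISO-8601 text, so the ORDER BY time_of_run DESC in get_time_of_last_run sorts chronologically; and _cleanup scopes its DELETE to the current query_file_md5, whereas the removed SqlTableWriter.set_checkpoint (see writers.py below) deleted every non-final row regardless of query.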
33 changes: 15 additions & 18 deletions commcare_export/cli.py
@@ -12,6 +12,8 @@
import sqlalchemy
import io
from datetime import datetime

from commcare_export.checkpoint import CheckpointManager
from six.moves import input

import dateutil.parser
@@ -132,7 +134,7 @@ def main_with_args(args):
version = args.api_version)

api_client = api_client.authenticated(username=args.username, password=args.password, mode=args.auth_mode)

checkpoint_manager = None
if args.output_format == 'xlsx':
writer = writers.Excel2007TableWriter(args.output)
elif args.output_format == 'xls':
@@ -155,22 +157,17 @@ def main_with_args(args):
is_mysql = 'mysql' in args.output
collation = 'utf8_bin' if is_mysql else None
writer = writers.SqlTableWriter(engine.connect(), args.strict_types, collation=collation)
checkpoint_manager = CheckpointManager(engine.connect(), collation=collation)
with checkpoint_manager:
checkpoint_manager.create_checkpoint_table()
api_client.set_checkpoint_manager(checkpoint_manager, query=args.query, query_md5=query_file_md5)

if not args.since and not args.start_over and os.path.exists(args.query):
connection = sqlalchemy.create_engine(args.output)

# Grab the current list of tables to see if we have already run & written to it
metadata = sqlalchemy.MetaData()
metadata.bind = connection
metadata.reflect()

if 'commcare_export_runs' in metadata.tables:
cursor = connection.execute(sqlalchemy.sql.text('SELECT time_of_run FROM commcare_export_runs WHERE query_file_md5 = :query_file_md5 ORDER BY time_of_run DESC'), query_file_md5=query_file_md5)
for row in cursor:
args.since = row[0]
logger.debug('Last successful run was %s', args.since)
break
cursor.close()
with checkpoint_manager:
args.since = checkpoint_manager.get_time_of_last_run(query_file_md5)

if args.since:
logger.debug('Last successful run was %s', args.since)
else:
            logger.warning('No successful runs found, and --since not specified: will import ALL data')

@@ -184,7 +181,6 @@ def main_with_args(args):
# Assume that if any tables were emitted, that is the idea, otherwise print the output
if len(list(env.emitted_tables())) > 0:
with writer:
api_client.set_checkpointer(writer, query=args.query, query_md5=query_file_md5)
for table in env.emitted_tables():
logger.debug('Writing %s', table['name'])
if table['name'] != table['name'].lower():
@@ -194,8 +190,9 @@
)
writer.write_table(table)

if os.path.exists(args.query):
writer.set_checkpoint(args.query, query_file_md5, run_start, True)
if checkpoint_manager and os.path.exists(args.query):
with checkpoint_manager:
checkpoint_manager.set_checkpoint(args.query, query_file_md5, run_start, True)

if args.output_format == 'json':
print(json.dumps(writer.tables, indent=4, default=RepeatableIterator.to_jvalue))
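Pulled out of the hunks above, the manager's lifecycle in the CLI reads as a straight line (a condensed restatement assuming the SQL output path; the guards from the diff, such as the checkpoint_manager None check, are elided here):

checkpoint_manager = CheckpointManager(engine.connect(), collation=collation)
with checkpoint_manager:                 # create/migrate commcare_export_runs
    checkpoint_manager.create_checkpoint_table()
api_client.set_checkpoint_manager(checkpoint_manager,
                                  query=args.query, query_md5=query_file_md5)

with checkpoint_manager:                 # find the resume point, if any
    args.since = checkpoint_manager.get_time_of_last_run(query_file_md5)

# ... writer.write_table(table) for each emitted table ...

with checkpoint_manager:                 # final checkpoint, run_complete=True
    checkpoint_manager.set_checkpoint(args.query, query_file_md5, run_start, True)

Note the design choice: checkpoint_manager starts as None and is only constructed on the SQL output path, so the final set_checkpoint is guarded by the None check rather than being a silent no-op on the writer as before.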
11 changes: 6 additions & 5 deletions commcare_export/commcare_hq_client.py
@@ -36,7 +36,7 @@ def __init__(self, url, project, version=LATEST_KNOWN_VERSION, session=None, aut
self.project = project
self.__session = session
self.__auth = auth
self._checkpointer = None
self._checkpoint_manager = None
self._checkpoint_kwargs = {}

@property
@@ -135,17 +135,18 @@ def iterate_resource(resource=resource, params=params):

return RepeatableIterator(iterate_resource)

def set_checkpointer(self, writer, **checkpoint_kwargs):
self._checkpointer = writer
def set_checkpoint_manager(self, manager, **checkpoint_kwargs):
self._checkpoint_manager = manager
self._checkpoint_kwargs = checkpoint_kwargs

def checkpoint(self, checkpoint_time):
if self._checkpointer:
if self._checkpoint_manager:
kwargs = deepcopy(self._checkpoint_kwargs)
kwargs.update({
'checkpoint_time': checkpoint_time
})
self._checkpointer.set_checkpoint(**kwargs)
with self._checkpoint_manager:
self._checkpoint_manager.set_checkpoint(**kwargs)

class MockCommCareHqClient(object):
"""
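For illustration, the renamed hook from the client's side (the URL, project name, and driver code here are hypothetical; only set_checkpoint_manager and checkpoint come from this diff):

import datetime

client = CommCareHqClient('https://www.commcarehq.org', 'my-project')
client.set_checkpoint_manager(manager, query='my_query.xlsx',
                              query_md5=query_md5)

# During a long paginated fetch the client can periodically call:
client.checkpoint(datetime.datetime.utcnow())
# ...which deep-copies the stored kwargs, adds checkpoint_time, and calls
# manager.set_checkpoint(**kwargs) inside a fresh `with manager:` block,
# so every checkpoint gets its own short-lived connection.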
74 changes: 35 additions & 39 deletions commcare_export/writers.py
@@ -1,17 +1,13 @@
import re
import sys
import uuid
import zipfile
import csv
import json
import datetime
import logging
import sys
import zipfile
from itertools import chain

import six
from six import StringIO, u

from itertools import chain
import datetime

logger = logging.getLogger(__name__)

MAX_COLUMN_SIZE = 2000
@@ -54,9 +50,6 @@ def write_table(self, table):
"{'name': str, 'headings': [str], 'rows': [[str]]} -> ()"
raise NotImplementedError()

def set_checkpoint(self, query, query_md5, checkpoint_time=None, run_complete=False):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
pass

@@ -177,16 +170,17 @@ def write_table(self, table):
for row in table['rows']:
self.output_stream.write('|%s|\n' % '|'.join(ensure_text(val) for val in row))

class SqlTableWriter(TableWriter):

class SqlMixin(object):
"""
Write tables to a database specified by URL
(TODO) with "upsert" based on primary key.
"""

MIN_VARCHAR_LEN=32 # Since SQLite does not actually support ALTER COLUMN type, let's maximize the chance that we do not have to write workarounds by starting medium
MAX_VARCHAR_LEN=255 # Arbitrary point at which we switch to TEXT; for postgres VARCHAR == TEXT anyhow and for Sqlite it doesn't matter either
MIN_VARCHAR_LEN = 32 # Since SQLite does not actually support ALTER COLUMN type, let's maximize the chance that we do not have to write workarounds by starting medium
MAX_VARCHAR_LEN = 255 # Arbitrary point at which we switch to TEXT; for postgres VARCHAR == TEXT anyhow and for Sqlite it doesn't matter either

def __init__(self, connection, strict_types=False, collation=None):
def __init__(self, connection, collation=None):
try:
import sqlalchemy
import alembic
@@ -198,12 +192,11 @@ def __init__(self, connection, strict_types=False, collation=None):
"command: pip install sqlalchemy alembic")

self.base_connection = connection
self.strict_types = strict_types
self.collation = collation

def __enter__(self):
self.connection = self.base_connection.connect() # "forks" the SqlAlchemy connection
return self # TODO: fork the writer so this can be called many times
self.connection = self.base_connection.connect() # "forks" the SqlAlchemy connection
return self # TODO: fork the writer so this can be called many times

def __exit__(self, exc_type, exc_val, exc_tb):
self.connection.close()
@@ -220,13 +213,31 @@ def metadata(self):
self._metadata.reflect()
return self._metadata

def table(self, table_name):
return self.sqlalchemy.Table(table_name, self.metadata, autoload=True, autoload_with=self.connection)

def get_id_column(self):
return self.sqlalchemy.Column(
'id',
self.sqlalchemy.Unicode(self.MAX_VARCHAR_LEN),
primary_key=True
)


class SqlTableWriter(SqlMixin, TableWriter):
"""
Write tables to a database specified by URL
(TODO) with "upsert" based on primary key.
"""

def __init__(self, connection, strict_types=False, collation=None):
super(SqlTableWriter, self).__init__(connection, collation=collation)
self.strict_types = strict_types

@property
def is_sqllite(self):
return 'sqlite' in self.connection.engine.driver

def table(self, table_name):
return self.sqlalchemy.Table(table_name, self.metadata, autoload=True, autoload_with=self.connection)

def best_type_for(self, val):
if not self.is_sqllite:
if isinstance(val, bool):
@@ -292,13 +303,6 @@ def least_upper_bound(self, source_type, dest_type):
# FIXME: Don't be so silly
return self.sqlalchemy.UnicodeText(collation=self.collation)

def get_id_column(self):
return self.sqlalchemy.Column(
'id',
self.sqlalchemy.Unicode(self.MAX_VARCHAR_LEN),
primary_key=True
)

def make_table_compatible(self, table_name, row_dict):
ctx = self.alembic.migration.MigrationContext.configure(self.connection)
op = self.alembic.operations.Operations(ctx)
@@ -379,6 +383,9 @@ def upsert(self, table, row_dict):
self.connection.execute(update)

def write_table(self, table):
"""
:param table: dict of {'name': 'name', 'headings', [...], 'rows': [[...], [...]]
"""
table_name = table['name']
headings = table['headings']

Expand All @@ -391,14 +398,3 @@ def write_table(self, table):
self.upsert(self.table(table_name), row_dict)

        if logger.getEffectiveLevel() == logging.DEBUG: sys.stderr.write('\n')

def set_checkpoint(self, query, query_md5, checkpoint_time=None, run_complete=False):
logger.info('Setting checkpoint')
checkpoint_time = checkpoint_time or datetime.datetime.utcnow()
self.write_table({
'name': 'commcare_export_runs',
'headings': ['id', 'query_file_name', 'query_file_md5', 'time_of_run', 'final'],
'rows': [[uuid.uuid4().hex, query, query_md5, checkpoint_time.isoformat(), run_complete]]
})
if run_complete:
self.connection.execute(self.sqlalchemy.sql.text('DELETE FROM commcare_export_runs WHERE final = :final'), final=False)
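
In outline, the writers.py refactor looks like this (a schematic restatement of the diff, with class bodies elided):

class TableWriter(object):
    pass  # existing base class, unchanged apart from losing set_checkpoint

class SqlMixin(object):
    pass  # new: __enter__/__exit__, metadata reflection, table(), get_id_column()

class SqlTableWriter(SqlMixin, TableWriter):
    pass  # keeps only the typing, migration, and upsert logic

class CheckpointManager(SqlMixin):
    pass  # lives in checkpoint.py; owns the commcare_export_runs table

The writer no longer knows anything about checkpoints: both TableWriter.set_checkpoint and SqlTableWriter.set_checkpoint are removed, and the now-unused uuid import goes with them.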
