Skip to content

Commit

Permalink
set batch checkpoint date to correct date
Browse files Browse the repository at this point in the history
The current code naively sets the checkpoint to the
datetime that the batch was downloaded as opposed to the
'since date' in the last item of the batch.

This means that any interruptions in the export
would not correctly restart from where they left off
but would instead only export data that was updated
since the last run.
  • Loading branch information
snopoke committed Dec 8, 2017
1 parent 5e77928 commit ae01d1d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
11 changes: 6 additions & 5 deletions commcare_export/commcare_hq_client.py
Expand Up @@ -108,7 +108,6 @@ def iterate_resource(resource=resource, params=params):
last_batch_ids = set()

while more_to_fetch:
fetch_start = datetime.utcnow()
batch = self.get(resource, params)
total_count = int(batch['meta']['total_count']) if batch['meta']['total_count'] else 'unknown'
logger.debug('Received %s-%s of %s',
Expand All @@ -131,19 +130,21 @@ def iterate_resource(resource=resource, params=params):
else:
more_to_fetch = False

self.checkpoint(fetch_start)
self.checkpoint(paginator, batch)

return RepeatableIterator(iterate_resource)

def set_checkpoint_manager(self, manager, **checkpoint_kwargs):
self._checkpoint_manager = manager
self._checkpoint_kwargs = checkpoint_kwargs

def checkpoint(self, checkpoint_time):
if self._checkpoint_manager:
def checkpoint(self, paginator, batch):
from commcare_export.commcare_minilinq import DatePaginator
if self._checkpoint_manager and isinstance(paginator, DatePaginator):
since_date = paginator.get_since_date(batch)
kwargs = deepcopy(self._checkpoint_kwargs)
kwargs.update({
'checkpoint_time': checkpoint_time
'checkpoint_time': since_date
})
with self._checkpoint_manager:
self._checkpoint_manager.set_checkpoint(**kwargs)
Expand Down
10 changes: 6 additions & 4 deletions commcare_export/commcare_minilinq.py
Expand Up @@ -176,18 +176,20 @@ def next_page_params_since(self, since=None):
return params

def next_page_params_from_batch(self, batch):
since_date = self.get_since_date(batch)
if since_date:
return self.next_page_params_since(since_date)

def get_since_date(self, batch):
try:
last_obj = batch['objects'][-1]
except IndexError:
return

since = last_obj and last_obj.get(self.since_field)
if since:
since_date = None
for fmt in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ'):
try:
since_date = datetime.strptime(since, fmt)
return datetime.strptime(since, fmt)
except ValueError:
pass
if since_date:
return self.next_page_params_since(since_date)

0 comments on commit ae01d1d

Please sign in to comment.