Skip to content

Commit

Permalink
Added a flag to deactivate checking of the updated < processed timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
romanchyla committed Dec 6, 2017
1 parent 1eaef70 commit d884cf6
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def print_kvs():
print kv.key, kv.value


def reindex(since=None, batch_size=None, force=False, update_solr=True, update_metrics=True):
def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True, update_metrics=True,
force_processing=False):
"""
Initiates routing of the records (everything that was updated)
since point in time T.
"""
if force:
if force_indexing:
key = 'last.reindex.forced'
else:
key = 'last.reindex.normal'
Expand Down Expand Up @@ -123,7 +124,7 @@ def reindex(since=None, batch_size=None, force=False, update_solr=True, update_m
processed = get_date(rec.processed)
updated = get_date(rec.updated)

if processed > updated:
if not force_processing and processed > updated:
continue # skip records that were already processed

sent += 1
Expand All @@ -136,17 +137,17 @@ def reindex(since=None, batch_size=None, force=False, update_solr=True, update_m
batch.append(rec.bibcode)
else:
batch.append(rec.bibcode)
tasks.task_index_records.delay(batch, force=force, update_solr=update_solr, update_metrics=update_metrics)
tasks.task_index_records.delay(batch, force=force_indexing, update_solr=update_solr, update_metrics=update_metrics)
batch = []
last_bibcode = rec.bibcode

if len(batch) > 0:
tasks.task_index_records.delay(batch, force=force, update_solr=update_solr, update_metrics=update_metrics,
commit=force)
elif force and last_bibcode:
tasks.task_index_records.delay(batch, force=force_indexing, update_solr=update_solr, update_metrics=update_metrics,
commit=force_indexing)
elif force_indexing and last_bibcode:
# issue one extra call with the commit
tasks.task_index_records.delay([last_bibcode], force=force, update_solr=update_solr, update_metrics=update_metrics,
commit=force)
tasks.task_index_records.delay([last_bibcode], force=force_indexing, update_solr=update_solr, update_metrics=update_metrics,
commit=force_indexing)

logger.info('Done processing %s records', sent)
except Exception, e:
Expand Down Expand Up @@ -178,12 +179,19 @@ def reindex(since=None, batch_size=None, force=False, update_solr=True, update_m
help='List of bibcodes separated by spaces')

parser.add_argument('-f',
'--force',
dest='force',
'--force_indexing',
dest='force_indexing',
action='store_true',
default=False,
help='Forces indexing of documents as soon as we receive them.')

parser.add_argument('-o',
'--force_processing',
dest='force_processing',
action='store_true',
default=False,
help='Submits records for processing even if they dont have any new updates (use this to rebuild index).')

parser.add_argument('-s',
'--since',
dest='since',
Expand Down Expand Up @@ -271,5 +279,5 @@ def reindex(since=None, batch_size=None, force=False, update_solr=True, update_m
elif args.reindex:
update_solr = 's' in args.reindex.lower()
update_metrics = 'm' in args.reindex.lower()
reindex(since=args.since, batch_size=args.batch_size, force=args.force,
reindex(since=args.since, batch_size=args.batch_size, force=args.force_indexing,
update_solr=update_solr, update_metrics=update_metrics)

0 comments on commit d884cf6

Please sign in to comment.