Merge pull request #47 from romanchyla/checksum
Checksum
romanchyla committed Mar 14, 2018
2 parents 79820ca + 0382060 commit 08de5d1
Showing 5 changed files with 258 additions and 68 deletions.
144 changes: 144 additions & 0 deletions adsmp/app.py
@@ -14,6 +14,9 @@
from adsputils import serializer
from sqlalchemy import exc
from multiprocessing.util import register_after_fork
import zlib
import requests
from copy import deepcopy

class ADSMasterPipelineCelery(ADSCelery):

@@ -483,3 +486,144 @@ def metrics_delete_by_bibcode(self, bibcode):
session.delete(r)
session.commit()
return True

def checksum(self, data, ignore_keys=('mtime', 'ctime', 'update_timestamp')):
"""
Compute a checksum of the passed-in data. Preferably you give us a
dictionary: we clean it up, remove the 'ignore_keys', and sort the
remaining keys, then compute a CRC on the string version. You can also
pass a string, in which case we simply return its checksum.
@param data: string or dict
@param ignore_keys: tuple of patterns; if a pattern is found (anywhere)
in a key name, that key-value pair is ignored
@return: checksum (hex string)
"""
assert isinstance(ignore_keys, tuple)

if isinstance(data, basestring):
return hex(zlib.crc32(unicode(data)) & 0xffffffff)
else:
data = deepcopy(data)
# remove all the modification timestamps
for k,v in data.items():
for x in ignore_keys:
if x in k:
del data[k]
break
return hex(zlib.crc32(json.dumps(data, sort_keys=True)) & 0xffffffff)
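
The helper is key-order-insensitive and skips volatile keys, so two dicts that differ only in modification timestamps produce the same value. A minimal sketch of the expected behaviour, assuming app is an ADSMasterPipelineCelery instance (the bibcode and the hex value in the comment are illustrative):

    # 'mtime' matches one of the default ignore_keys patterns, and the
    # remaining keys are serialized with sort_keys=True, so both dicts
    # yield the same CRC32-based hex string (e.g. '0x1a2b3c4d').
    rec_a = {'bibcode': '2018ApJ...000..001X', 'title': 'Test', 'mtime': '2018-03-14'}
    rec_b = {'title': 'Test', 'mtime': '2018-01-01', 'bibcode': '2018ApJ...000..001X'}
    assert app.checksum(rec_a) == app.checksum(rec_b)

    # Strings are hashed as-is, without any key filtering.
    print(app.checksum('some serialized payload'))
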


def update_remote_targets(self, solr=None, metrics=None, links=None,
commit_solr=False):
"""Updates remote databases/solr
@param batch: list solr documents (already formatted)
@param metrics: tuple with two lists, the first is a list
of metric dicts to be inserted; the second is a list
of metric dicts to be updated
@param links_data: list of dicts to send to the remote
resolver links service
@return:
(set of processed bibcodes, set of bibcodes that failed)
@warning: this call has a side-effect, we are emptying the passed
in lists of data (to free up memory)
"""


if metrics and not (isinstance(metrics, tuple) and len(metrics) == 2):
raise Exception('Wrong data type passed in for metrics')

batch = solr or []
batch_insert, batch_update = metrics[0], metrics[1]
links_data = links or []


links_url = self.conf.get('LINKS_RESOLVER_UPDATE_URL')
recs_to_process = set()
failed_bibcodes = []
crcs = {}

# take note of all the recs that should be updated
# NOTE: we are assuming that set(solr) == set(metrics) == set(links)
for x in (batch, batch_insert, batch_update, links_data):
if x:
for y in x:
if 'bibcode' in y:
recs_to_process.add(y['bibcode'])
else:
raise Exception('Every record must contain bibcode! Offender: %s' % y)

def update_crc(type, data, faileds):
while len(data): # we are freeing memory as well
x = data.pop()
b = x['bibcode']
if b in faileds:
continue
crcs.setdefault(b, {})
crcs[b][type + '_checksum'] = self.checksum(x)

if len(batch):
failed_bibcodes = self.reindex(batch, self.conf.get('SOLR_URLS'), commit=commit_solr)
failed_bibcodes = set(failed_bibcodes)
update_crc('solr', batch, failed_bibcodes)

if failed_bibcodes and len(failed_bibcodes):
self.logger.warn('SOLR failed to update some bibcodes: %s', failed_bibcodes)

# when solr_urls > 1, some of the servers may have successfully indexed
# but here we are refusing to pass data to metrics db; this seems the
# right choice because there is only one metrics db (but if we had many,
# then we could differentiate)

batch_insert = filter(lambda x: x['bibcode'] not in failed_bibcodes, batch_insert)
batch_update = filter(lambda x: x['bibcode'] not in failed_bibcodes, batch_update)
links_data = filter(lambda x: x['bibcode'] not in failed_bibcodes, links_data)

recs_to_process = recs_to_process - failed_bibcodes
if len(failed_bibcodes):
self.mark_processed(failed_bibcodes, type=None, status='solr-failed')

metrics_exception = None
if len(batch_insert) or len(batch_update):
metrics_done, metrics_exception = self.update_metrics_db(batch_insert, batch_update)

metrics_failed = recs_to_process - set(metrics_done)
if len(metrics_failed):
self.mark_processed(metrics_failed, type=None, status='metrics-failed')
update_crc('metrics', batch_insert, metrics_failed)
update_crc('metrics', batch_update, metrics_failed)


if len(links_data):
r = requests.put(links_url, data = links_data)
if r.status_code == 200:
self.logger.info('sent %s datalinks to %s including %s', len(links_data), links_url, links_data[0])
update_crc('datalinks', links_data, set())
else:
self.logger.error('error sending links to %s, error = %s', links_url, r.text)
self.mark_processed([x['bibcode'] for x in links_data], type=None, status='links-failed')
recs_to_process = recs_to_process - set([x['bibcode'] for x in links_data])

self.mark_processed(recs_to_process, type=None, status='success')
if len(crcs):
self._update_checksums(crcs)

if metrics_exception:
raise metrics_exception # will trigger retry
if len(recs_to_process) == 0:
raise Exception("Miserable, me, complete failure")

def _update_checksums(self, crcs):
"""Somewhat brain-damaged way of updating checksums
one-by-one for each bibcode we have; but at least
it touches all checksums for a rec in one go.
"""
with self.session_scope() as session:
for bibcode, vals in crcs.items():
r = session.query(Records).filter_by(bibcode=bibcode).first()
if r is None:
raise Exception('whaay?! Cannot update crc, bibcode does not exist for: %s', bibcode)
for k,crc in vals.items():
setattr(r, k, crc)
session.commit()
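
The crcs mapping handed to _update_checksums groups every checksum computed for a bibcode, keyed as '<target>_checksum', so each Records row is touched once. Shape of the expected input (all values illustrative):

    crcs = {
        '2018ApJ...000..001X': {
            'solr_checksum': '0x1a2b3c4d',
            'metrics_checksum': '0x5e6f7a8b',
            'datalinks_checksum': '0x9c0d1e2f',
        },
    }
    # Updates every checksum column recorded for that bibcode and commits.
    app._update_checksums(crcs)
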
7 changes: 6 additions & 1 deletion adsmp/models.py
@@ -66,13 +66,18 @@ class Records(Base):
solr_processed = Column(UTCDateTime, default=None)
metrics_processed = Column(UTCDateTime, default=None)
datalinks_processed = Column(UTCDateTime, default=None)

solr_checksum = Column(String(10), default=None)
metrics_checksum = Column(String(10), default=None)
datalinks_checksum = Column(String(10), default=None)

status = Column(Enum('solr-failed', 'metrics-failed', 'links-failed', 'success', name='status'))

_date_fields = ['created', 'updated', 'processed', # dates
'bib_data_updated', 'orcid_claims_updated', 'nonbib_data_updated',
'fulltext_updated', 'metrics_updated', 'datalinks_processed',
'solr_processed', 'metrics_processed']
_text_fields = ['id', 'bibcode', 'status']
_text_fields = ['id', 'bibcode', 'status', 'solr_checksum', 'metrics_checksum', 'datalinks_checksum']
_json_fields = ['bib_data', 'orcid_claims', 'nonbib_data', 'metrics', 'fulltext']

def toJSON(self, for_solr=False, load_only=None):
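
The three new String(10) columns hold the hex CRC last sent to each target, which is what lets the pipeline decide whether a re-export is needed. A hypothetical comparison, assuming Records is imported from adsmp.models and solr_doc is a freshly built SOLR payload:

    with app.session_scope() as session:
        r = session.query(Records).filter_by(bibcode='2018ApJ...000..001X').first()
        if r is not None and r.solr_checksum == app.checksum(solr_doc):
            # payload unchanged since the last export; skip reindexing
            pass
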
4 changes: 2 additions & 2 deletions adsmp/solr_updater.py
@@ -238,8 +238,8 @@ def update_solr(json_records, solr_urls, ignore_errors=False, commit=False):
else:
raise Exception('Error posting data to SOLR: %s (err code: %s, err message: %s)' % (url, r.status_code, r.text))
return out



def transform_json_record(db_record):
out = {'bibcode': db_record['bibcode']}
74 changes: 17 additions & 57 deletions adsmp/tasks.py
@@ -109,17 +109,18 @@ def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=T
"""

if isinstance(bibcodes, basestring):
bibcodes = [bibcodes]

if not (update_solr or update_metrics or update_links):
raise Exception('Hmmm, I dont think I let you do NOTHING, sorry!')

logger.debug('Running index-records for: %s', bibcodes)
batch = []
batch_insert = []
batch_update = []
recs_to_process = set()
links_data = []
links_bibcodes = []
links_url = app.conf.get('LINKS_RESOLVER_UPDATE_URL')


#check if we have complete record
for bibcode in bibcodes:
@@ -161,29 +162,31 @@ def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=T
if update_solr:
d = solr_updater.transform_json_record(r)
logger.debug('Built SOLR: %s', d)
batch.append(d)
recs_to_process.add(bibcode)

if r.get('solr_checksum', None) != app.checksum(d):
batch.append(d)
else:
logger.debug('Checksum identical, skipping solr update for: %s', bibcode)

# get data for metrics
if update_metrics:
m = r.get('metrics', None)
if m:
if m and r.get('metrics_checksum', None) != app.checksum(m):
m['bibcode'] = bibcode
logger.debug('Got metrics: %s', m)
recs_to_process.add(bibcode)
if r.get('processed'):
batch_update.append(m)
else:
batch_insert.append(m)
else:
logger.debug('Checksum identical, skipping metrics update for: %s', bibcode)

if update_links and 'nonbib' in r and links_url:
if update_links and 'nonbib' in r:
nb = json.loads(r['nonbib'])
if 'data_links_rows' in nb:
if 'data_links_rows' in nb and r.get('links_checksum', None) != app.checksum(nb['data_links_rows']):
# send json version of DataLinksRow to update endpoint on links resolver
# need to optimize and not send one record at a time
tmp = {'bibcode': bibcode, 'data_links_rows': nb['data_links_rows']}
links_data.append(tmp)
links_bibcodes.append(bibcode)
else:
# if forced and we have at least the bib data, index it
if force is True:
@@ -192,53 +195,10 @@
logger.debug('%s not ready for indexing yet (metadata=%s, orcid=%s, nonbib=%s, fulltext=%s, metrics=%s)' % \
(bibcode, bib_data_updated, orcid_claims_updated, nonbib_data_updated, fulltext_updated, \
metrics_updated))


failed_bibcodes = []
if len(batch):
failed_bibcodes = app.reindex(batch, app.conf.get('SOLR_URLS'), commit=commit)

if failed_bibcodes and len(failed_bibcodes):
logger.warn('Some bibcodes failed: %s', failed_bibcodes)
failed_bibcodes = set(failed_bibcodes)

# when solr_urls > 1, some of the servers may have successfully indexed
# but here we are refusing to pass data to metrics db; this seems the
# right choice because there is only one metrics db (but if we had many,
# then we could differentiate)

batch_insert = filter(lambda x: x['bibcode'] not in failed_bibcodes, batch_insert)
batch_update = filter(lambda x: x['bibcode'] not in failed_bibcodes, batch_update)

recs_to_process = recs_to_process - failed_bibcodes
if len(failed_bibcodes):
app.mark_processed(failed_bibcodes, type=None, status='solr-failed')

if batch or batch_insert or batch_update or links_data:
app.update_remote_targets(solr=batch, metrics=(batch_insert, batch_update), links=links_data, commit_solr=commit)


if len(batch_insert) or len(batch_update):
metrics_done, exception = app.update_metrics_db(batch_insert, batch_update)

metrics_failed = recs_to_process - set(metrics_done)
if len(metrics_failed):
app.mark_processed(metrics_failed, type=None, status='metrics-failed')

# mark all successful documents as done
app.mark_processed(metrics_done, type=None, status='success')

if exception:
raise exception # will trigger retry
else:
app.mark_processed(recs_to_process, type=None, status='success')

if len(links_data):
r = requests.put(links_url, data = links_data)
if r.status_code == 200:
logger.info('send %s datalinks to %s including %s', len(links_data), links_url, links_data[0])
app.mark_processed(links_bibcodes, type='links', status='success')
else:
logger.error('error sending links to %s, error = %s, sent data = %s ', links_url, r.text, tmp)
app.mark_processed(links_bibcodes, type=None, status='links-failed')
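
The net effect of the tasks.py changes is that each target (SOLR, metrics, links) is refreshed only when its stored checksum differs from the checksum of the freshly built payload. A hypothetical invocation, assuming task_index_records is registered as a celery task and the keyword names follow the (truncated) signature above:

    task_index_records.delay(['2018ApJ...000..001X'],
                             force=False,
                             update_solr=True,
                             update_metrics=True)
    # Records whose solr/metrics/links checksums are unchanged are skipped
    # entirely; everything else is pushed via app.update_remote_targets().
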


@app.task(queue='delete-records')
def task_delete_documents(bibcode):
