Merge pull request #42 from spacemansteve/master
#39 data links support
spacemansteve committed Feb 22, 2018
2 parents 90ccf20 + b0b232f commit 7ef45c4
Showing 6 changed files with 69 additions and 22 deletions.
4 changes: 3 additions & 1 deletion adsmp/app.py
@@ -32,7 +32,7 @@ def __init__(self, app_name, *args, **kwargs):
MetricsBase.metadata.bind = self._metrics_engine
self._metrics_table = Table('metrics', MetricsBase.metadata)
register_after_fork(self._metrics_engine, self._metrics_engine.dispose)

self._metrics_table_insert = self._metrics_table.insert() \
.values({
'an_refereed_citations': bindparam('an_refereed_citations', required=False),
@@ -307,6 +307,8 @@ def mark_processed(self, bibcodes, type=None, status = None):
column = 'solr_processed'
elif type == 'metrics':
column = 'metrics_processed'
elif type == 'links':
column = 'datalinks_processed'
else:
column = 'processed'

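For orientation, a minimal usage sketch of the new branch (hedged: the call mirrors how tasks.py invokes it later in this commit; the bibcode is illustrative):

    # 'links' now maps to the new datalinks_processed column
    app.mark_processed(['2018arXiv180100000A'], type='links', status='success')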
7 changes: 4 additions & 3 deletions adsmp/models.py
@@ -65,11 +65,12 @@ class Records(Base):

solr_processed = Column(UTCDateTime, default=None)
metrics_processed = Column(UTCDateTime, default=None)
status = Column(Enum('solr-failed', 'metrics-failed', 'success', name='status'))
datalinks_processed = Column(UTCDateTime, default=None)
status = Column(Enum('solr-failed', 'metrics-failed', 'links-failed', 'success', name='status'))

_date_fields = ['created', 'updated', 'processed', # dates
'bib_data_updated', 'orcid_claims_updated', 'nonbib_data_updated',
'fulltext_updated', 'metrics_updated',
'fulltext_updated', 'metrics_updated', 'datalinks_processed',
'solr_processed', 'metrics_processed']
_text_fields = ['id', 'bibcode', 'status']
_json_fields = ['bib_data', 'orcid_claims', 'nonbib_data', 'metrics', 'fulltext']
@@ -172,4 +173,4 @@ def toJSON(self):
reference_num = self.reference_num,
rn_citations = self.rn_citations,
rn_citation_data = self.rn_citation_data,
modtime = self.modtime and get_date(self.modtime).isoformat() or None)
modtime = self.modtime and get_date(self.modtime).isoformat() or None)
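The new column and enum value can be queried like any other; a hedged sketch (assumes the session_scope() helper the pipeline app inherits from adsputils, which is not part of this diff):

    from adsmp.models import Records

    # records whose datalinks push failed, per the new 'links-failed' status
    with app.session_scope() as session:
        failed = session.query(Records).filter(Records.status == 'links-failed').count()
        unsent = session.query(Records).filter(Records.datalinks_processed == None).count()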
29 changes: 25 additions & 4 deletions adsmp/tasks.py
@@ -5,6 +5,8 @@
from adsmp import solr_updater
from kombu import Queue
import math
import requests
import json
from adsmsg import MetricsRecord, NonBibRecord
from adsmsg.msg import Msg

@@ -78,7 +80,7 @@ def task_update_record(msg):


@app.task(queue='index-records')
def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=True, commit=False):
def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=True, update_links=True, commit=False):
"""
This task is (normally) called by the cronjob task
(that one, quite obviously, is in turn started by cron)
@@ -107,14 +109,17 @@ def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=T
"""

if not (update_solr or update_metrics):
if not (update_solr or update_metrics or update_links):
raise Exception('Hmmm, I dont think I let you do NOTHING, sorry!')

logger.debug('Running index-records for: %s', bibcodes)
batch = []
batch_insert = []
batch_update = []
recs_to_process = set()
links_data = []
links_bibcodes = []
links_url = app.conf.get('LINKS_RESOLVER_UPDATE_URL')

#check if we have complete record
for bibcode in bibcodes:
@@ -170,6 +175,15 @@ def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=T
batch_update.append(m)
else:
batch_insert.append(m)

if update_links and 'nonbib' in r and links_url:
nb = json.loads(r['nonbib'])
if 'data_links_rows' in nb:
# send json version of DataLinksRow to update endpoint on links resolver
# need to optimize and not send one record at a time
tmp = {'bibcode': bibcode, 'data_links_rows': nb['data_links_rows']}
links_data.append(tmp)
links_bibcodes.append(bibcode)
else:
# if forced and we have at least the bib data, index it
if force is True:
@@ -216,8 +230,15 @@ def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=T
raise exception # will trigger retry
else:
app.mark_processed(recs_to_process, type=None, status='success')



if len(links_data):
r = requests.put(links_url, data=json.dumps(links_data))
if r.status_code == 200:
logger.info('sent %s datalinks to %s including %s', len(links_data), links_url, links_data[0])
app.mark_processed(links_bibcodes, type='links', status='success')
else:
logger.error('error sending links to %s, error = %s, sent data = %s', links_url, r.text, links_data)
app.mark_processed(links_bibcodes, type=None, status='links-failed')

@app.task(queue='delete-records')
def task_delete_documents(bibcode):
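For reference, a self-contained sketch of the PUT that task_index_records now issues (the URL mirrors the new config value and the bibcode is illustrative; the payload shape matches the data_links_rows assembly above):

    import json
    import requests

    links_url = 'http://localhost:8080/update'  # LINKS_RESOLVER_UPDATE_URL
    links_data = [{'bibcode': '2018arXiv180100000A', 'data_links_rows': []}]
    r = requests.put(links_url, data=json.dumps(links_data))
    if r.status_code != 200:
        print 'links resolver update failed: %s' % r.text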
23 changes: 19 additions & 4 deletions adsmp/tests/test_tasks.py
@@ -2,7 +2,7 @@
import os
import json

from mock import patch
from mock import patch, Mock
import unittest
from adsmp import app, tasks
from adsmp.models import Base
@@ -21,7 +21,8 @@ def setUp(self):
'SQLALCHEMY_URL': 'sqlite:///',
'SQLALCHEMY_ECHO': False,
'SOLR_URLS': ['http://foo.bar.com/solr/v1'],
'METRICS_SQLALCHEMY_URL': None
'METRICS_SQLALCHEMY_URL': None,
'LINKS_RESOLVER_UPDATE_URL': 'http://localhost:8080/update'
})
tasks.app = self.app # monkey-patch the app object
Base.metadata.bind = self.app._session.get_bind()
@@ -212,13 +213,27 @@ def test_task_update_solr(self):


def test_task_index_records(self):
self.assertRaises(Exception, lambda : tasks.task_index_records(['foo', 'bar'], update_solr=False, update_metrics=False))
self.assertRaises(Exception, lambda : tasks.task_index_records(['foo', 'bar'], update_solr=False, update_metrics=False, update_links=False))

with patch.object(tasks.logger, 'error', return_value=None) as logger:
tasks.task_index_records(['non-existent'])
logger.assert_called_with(u"The bibcode %s doesn't exist!", 'non-existent')



def test_task_index_links(self):
"""verify data is sent to links microservice update endpoint"""
r = Mock()
r.status_code = 200
with patch.object(self.app, 'get_record', return_value={'bibcode': 'linkstest',
'nonbib': '{"data_links_rows": "baz"}',
'bib_data_updated': get_date(),
'nonbib_data_updated': get_date(),
'processed': get_date('2025')}), \
patch('requests.put', return_value=r) as p:
tasks.task_index_records(['linkstest'], update_solr=False, update_metrics=False, update_links=True, force=True)
p.assert_called_with('http://localhost:8080/update', data=json.dumps([{'bibcode': 'linkstest', 'data_links_rows': 'baz'}]))


if __name__ == '__main__':
unittest.main()
6 changes: 5 additions & 1 deletion config.py
@@ -19,4 +19,8 @@

SOLR_URLS = ['http://localhost:9983/solr/collection1/update']
SOLR_URL_NEW = 'http://localhost:9983/solr/collection1/query'
SOLR_URL_OLD = 'http://localhost:9984/solr/collection1/query'
SOLR_URL_OLD = 'http://localhost:9984/solr/collection1/query'

# url for the update endpoint of the links resolver microservice
# new links data is sent to this url, the microservice updates its datastore
LINKS_RESOLVER_UPDATE_URL = 'http://localhost:8080/update'
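This key is read at task time, so its name must match what tasks.py looks up; when it is unset the links push is silently skipped:

    # from adsmp/tasks.py above; a missing value leaves links_url as None
    links_url = app.conf.get('LINKS_RESOLVER_UPDATE_URL')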
22 changes: 13 additions & 9 deletions run.py
@@ -59,7 +59,7 @@ def diagnostics(bibcodes):
print '# of %s' % x, session.query(Records).filter(getattr(Records, x) != None).count()

print 'sending test bibcodes to the queue for reindexing'
tasks.task_index_records.delay(bibcodes, force=True, update_solr=True, update_metrics=True)
tasks.task_index_records.delay(bibcodes, force=True, update_solr=True, update_metrics=True, update_links=True)



@@ -73,7 +73,7 @@ def print_kvs():


def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True, update_metrics=True,
force_processing=False):
update_links=True, force_processing=False):
"""
Initiates routing of the records (everything that was updated)
since point in time T.
@@ -142,7 +142,7 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
batch.append(rec.bibcode)
else:
batch.append(rec.bibcode)
tasks.task_index_records.delay(batch, force=force_indexing, update_solr=update_solr, update_metrics=update_metrics)
tasks.task_index_records.delay(batch, force=force_indexing, update_solr=update_solr, update_metrics=update_metrics, update_links=update_links)
batch = []
last_bibcode = rec.bibcode

@@ -216,11 +216,11 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
nargs='?',
dest='reindex',
action='store',
const = 'sm',
default='sm',
const = 'sml',
default='sml',
help='Send all updated documents to SOLR/Postgres (you can combine with --since).' +
'Default is to update both solr and metrics. You can choose what to update.' +
'(s = update solr, m = update metrics)')
'(s = update solr, m = update metrics, l = update link resolver)')

parser.add_argument('-e',
'--batch_size',
@@ -284,6 +284,7 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
elif args.reindex:
update_solr = 's' in args.reindex.lower()
update_metrics = 'm' in args.reindex.lower()
update_links = 'l' in args.reindex.lower()
if args.filename:
print 'sending bibcodes from file to the queue for reindexing'
bibs = []
@@ -294,14 +295,17 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
bibs.append(bibcode)
if len(bibs) >= 100:
tasks.task_index_records.delay(bibs, force=True,
update_solr=update_solr, update_metrics=update_metrics)
update_solr=update_solr, update_metrics=update_metrics,
update_links = update_links)
bibs = []
if len(bibs) > 0:
tasks.task_index_records.delay(bibs, force=True,
update_solr=update_solr, update_metrics=update_metrics)
update_solr=update_solr, update_metrics=update_metrics,
update_links = update_links)
bibs = []
else:
print 'sending bibcode since date to the queue for reindexing'
reindex(since=args.since, batch_size=args.batch_size, force_indexing=args.force_indexing,
update_solr=update_solr, update_metrics=update_metrics, force_processing=args.force_processing)
update_solr=update_solr, update_metrics=update_metrics,
update_links = update_links, force_processing=args.force_processing)
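With the new flag value, a typical invocation (date illustrative; flags per the argparse definitions above) is:

    python run.py --reindex sl --since 2018-02-01

which queues updated records for solr and the link resolver while skipping metrics.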
