Skip to content

Commit

Permalink
Merge f75c92f into 675a0e7
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemansteve committed Apr 19, 2018
2 parents 675a0e7 + f75c92f commit 8aa8a30
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 21 deletions.
28 changes: 26 additions & 2 deletions adsmp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import absolute_import, unicode_literals
from . import exceptions
from .models import Records, ChangeLog, IdentifierMapping
from adsmsg import OrcidClaims, DenormalizedRecord, FulltextUpdate, MetricsRecord, NonBibRecord, NonBibRecordList, MetricsRecordList
from adsmsg import OrcidClaims, DenormalizedRecord, FulltextUpdate, MetricsRecord, NonBibRecord, NonBibRecordList, MetricsRecordList, AugmentAffiliationResponseRecord, AugmentAffiliationResponseRecordList
from adsmsg.msg import Msg
from adsputils import ADSCelery, create_engine, sessionmaker, scoped_session, contextmanager
from sqlalchemy.orm import load_only as _load_only
Expand Down Expand Up @@ -108,6 +108,27 @@ def update_storage(self, bibcode, type, payload):
oldval = 'not-stored'
r.metrics = payload
r.metrics_updated = now
elif type == 'augment':
# r.augments holds a dict, only key currently supported is affiliations
# use sequence entry to determine location in affiliation array
db_augments = r.augments
if db_augments is None or len(db_augments) is 0:
db_augments = '{}'
db_augments = json.loads(db_augments)
affil_augments = db_augments.get('affiliations', [])
j = json.loads(payload)
# sequence count starts at 1, not 0
index = int(j['sequence'].split('/')[0]) - 1
if len(affil_augments) < index + 1:
# here if db array is not long enough to hold new value
# so we extend it by appending placeholder '-'
affil_augments = affil_augments + [u'-'] * (index - len(affil_augments) + 1)
affil_augments[index] = j['affiliation']
db_augments['affiliations'] = affil_augments
oldval = 'not-stored'
r.augments = json.dumps(db_augments)
r.augments_updated = now

else:
raise Exception('Unknown type: %s' % type)
session.add(ChangeLog(key=bibcode, type=type, oldvalue=oldval))
Expand Down Expand Up @@ -222,7 +243,10 @@ def get_msg_type(self, msg):
return 'metrics'
elif isinstance(msg, MetricsRecordList):
return 'metrics_records'

elif isinstance(msg, AugmentAffiliationResponseRecord):
return 'augment'
elif isinstance(msg, AugmentAffiliationResponseRecordList):
return 'augment_records'

else:
raise exceptions.IgnorableException('Unkwnown type {0} submitted for update'.format(repr(msg)))
Expand Down
14 changes: 10 additions & 4 deletions adsmp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ class Records(Base):
nonbib_data = Column(Text)
fulltext = Column(Text)
metrics = Column(Text)
# holds a dict of augments to be merged
# currently only supported key is 'affiliations'
# with the value an array holding affiliation strings and '-' placeholders
augments = Column(Text)

# when data is received we set the updated timestamp
bib_data_updated = Column(UTCDateTime, default=None)
orcid_claims_updated = Column(UTCDateTime, default=None)
nonbib_data_updated = Column(UTCDateTime, default=None)
fulltext_updated = Column(UTCDateTime, default=None)
metrics_updated = Column(UTCDateTime, default=None)
augments_updated = Column(UTCDateTime, default=None)

created = Column(UTCDateTime, default=get_date)
updated = Column(UTCDateTime, default=get_date)
Expand All @@ -74,11 +80,11 @@ class Records(Base):
status = Column(Enum('solr-failed', 'metrics-failed', 'links-failed', 'success', name='status'))

_date_fields = ['created', 'updated', 'processed', # dates
'bib_data_updated', 'orcid_claims_updated', 'nonbib_data_updated',
'fulltext_updated', 'metrics_updated', 'datalinks_processed',
'solr_processed', 'metrics_processed']
'bib_data_updated', 'orcid_claims_updated', 'nonbib_data_updated',
'fulltext_updated', 'metrics_updated', 'augments_updated',
'datalinks_processed', 'solr_processed', 'metrics_processed']
_text_fields = ['id', 'bibcode', 'status', 'solr_checksum', 'metrics_checksum', 'datalinks_checksum']
_json_fields = ['bib_data', 'orcid_claims', 'nonbib_data', 'metrics', 'fulltext']
_json_fields = ['bib_data', 'orcid_claims', 'nonbib_data', 'metrics', 'fulltext', 'augments']

def toJSON(self, for_solr=False, load_only=None):
if for_solr:
Expand Down
44 changes: 32 additions & 12 deletions adsmp/solr_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
from adsputils import setup_logging, date2solrstamp
import time

from collections import OrderedDict

logger = setup_logging('solr_updater')

Expand Down Expand Up @@ -68,6 +68,26 @@ def extract_data_pipeline(data, solrdoc):
citation_count_norm=data.get('citation_count_norm', 1)
)


def extract_augments_pipeline(db_augments, solrdoc):
    """Merge any augmented affiliation values into the solr 'aff' field.

    :param db_augments: value of Records.augments: a dict (or its JSON
        string form, or None/empty).  The only supported key is
        'affiliations', an array of affiliation strings where '-' marks
        positions that were not augmented.
    :param solrdoc: the solr document built so far; only 'aff' is read.
    :return: dict with the merged 'aff' list, merged into the output doc
        by the caller.
    """
    # copy so the caller's solrdoc['aff'] is never mutated in place;
    # tolerate a missing 'aff' field by starting from an empty list
    solr_aff = list(solrdoc.get('aff') or [])
    # the column may arrive as None, '', a JSON string, or an already
    # decoded dict; normalize to a dict (was `len(...) is 0` — an int
    # identity comparison — and py2-only `basestring` before)
    if not isinstance(db_augments, dict):
        db_augments = json.loads(db_augments or '{}')
    affil_augments = db_augments.get('affiliations', [])
    # loop over augmented affils and overwrite current values
    for index, affil in enumerate(affil_augments):
        if len(affil) > 1:
            # here if we have something larger than the default '-' placeholder
            if len(solr_aff) < index + 1:
                # extend array to make room for this entry
                solr_aff.extend([u'-'] * (index + 1 - len(solr_aff)))
            solr_aff[index] = affil  # overwrite
    return {'aff': solr_aff}


def extract_fulltext(data, solrdoc):
out = {}
for x,f in (('body', 'body'), ('acknowledgements', 'ack')):
Expand Down Expand Up @@ -176,16 +196,16 @@ def get_timestamps(db_record, out):
out['update_timestamp'] = date2solrstamp(last_update)
return out

DB_COLUMN_DESTINATIONS = {
'bib_data': '',
'orcid_claims': get_orcid_claims,
'nonbib_data': extract_data_pipeline,
'metrics': extract_metrics_pipeline,
'id': 'id',
'fulltext': extract_fulltext,
'#timestamps': get_timestamps, # use 'id' to be always called

}
# Maps Records columns to how each contributes to the outgoing solr doc.
# Callable values are invoked as fn(db_record, out) and their returned dict
# is merged into the output; string values presumably name the destination
# solr field ('' meaning merge as-is) — TODO confirm against
# transform_json_record.  Deliberately an OrderedDict so iteration order is
# fixed and 'augments' runs last, letting it overwrite field values produced
# by the earlier pipelines.
DB_COLUMN_DESTINATIONS = OrderedDict([
    ('bib_data', ''),
    ('orcid_claims', get_orcid_claims),
    ('nonbib_data', extract_data_pipeline),
    ('metrics', extract_metrics_pipeline),
    ('id', 'id'),
    ('fulltext', extract_fulltext),
    ('#timestamps', get_timestamps), # use 'id' to be always called
    ('augments', extract_augments_pipeline) # over-writes existing field values
])


def delete_by_bibcodes(bibcodes, urls):
Expand Down Expand Up @@ -272,7 +292,7 @@ def transform_json_record(db_record):
if callable(target):
x = target(db_record, out) # in the interest of speed, don't create copy of out
if x:
out.update(x)
out.update(x)

return out

Expand Down
6 changes: 6 additions & 0 deletions adsmp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ def task_update_record(msg):
bibcodes.append(m.bibcode)
record = app.update_storage(m.bibcode, 'metrics', m.toJSON(including_default_value_fields=True))
logger.debug('Saved record from list: %s', record)
elif type == 'augment_records':
for m in msg.affiliation_responses:
m = Msg(m, None, None)
bibcodes.append(m.bibcode)
record = app.update_storage(m.bibcode, 'augment', m.toJSON(including_default_value_fields=True))
logger.debug('Saved record from list: %s', record)
else:
# here when record has a single bibcode
bibcodes.append(msg.bibcode)
Expand Down
46 changes: 46 additions & 0 deletions adsmp/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,52 @@ def test_update_records(self):
with self.app.session_scope() as session:
r = session.query(models.ChangeLog).filter_by(key='bibcode:abc').first()
self.assertTrue(r.key, 'abc')

# verify affilation augments can arrive in any order
self.app.update_storage('bib_augment_test', 'augment',
{'affiliation': 'CfA', 'sequence': '1/2'})
with self.app.session_scope() as session:
r = session.query(models.Records).filter_by(bibcode='bib_augment_test').first()
j = r.toJSON()
self.assertEquals(j['augments'], {'affiliations' : ['CfA']})
t = j['augments_updated']
self.assertTrue(now < t)

self.app.update_storage('bib_augment_test', 'augment',
{'affiliation': 'Tufts', 'sequence': '2/2'})
with self.app.session_scope() as session:
r = session.query(models.Records).filter_by(bibcode='bib_augment_test').first()
j = r.toJSON()
self.assertEquals(j['augments'], {'affiliations': ['CfA', 'Tufts']})
t = j['augments_updated']
self.assertTrue(now < t)

self.app.update_storage('bib_augment_test2', 'augment',
{'affiliation': 'Tufts', 'sequence': '2/2'})
with self.app.session_scope() as session:
r = session.query(models.Records).filter_by(bibcode='bib_augment_test2').first()
j = r.toJSON()
self.assertEquals(j['augments'], {'affiliations': ['-', 'Tufts']})
t = j['augments_updated']
self.assertTrue(now < t)

self.app.update_storage('bib_augment_test2', 'augment',
{'affiliation': 'CfA', 'sequence': '1/2'})
with self.app.session_scope() as session:
r = session.query(models.Records).filter_by(bibcode='bib_augment_test2').first()
j = r.toJSON()
self.assertEquals(j['augments'], {'affiliations': ['CfA', 'Tufts']})
t = j['augments_updated']
self.assertTrue(now < t)

self.app.update_storage('bib_augment_test3', 'augment',
{'affiliation': 'CfA', 'sequence': '4/4'})
with self.app.session_scope() as session:
r = session.query(models.Records).filter_by(bibcode='bib_augment_test3').first()
j = r.toJSON()
self.assertEquals(j['augments'], {'affiliations': ['-', '-', '-', 'CfA']})
t = j['augments_updated']
self.assertTrue(now < t)


def test_rename_bibcode(self):
Expand Down
7 changes: 5 additions & 2 deletions adsmp/tests/test_solr_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_solr_transformer(self):
u'links_data': u'', ### TODO(rca): superconfusing string, but fortunately we are getting ridd of it
u'orcid_pub': [u'-', u'-', u'-', u'-'],
u'page': [u'283'],
u'property': [u'OPENACCESS', u'ADS_OPENACCESS', u'ARTICLE', u'NOT REFEREED'],
#u'property': [u'OPENACCESS', u'ADS_OPENACCESS', u'ARTICLE', u'NOT REFEREED'],
u'pub': u'Astronomical Data Analysis Software and Systems XII',
u'pub_raw': u'Astronomical Data Analysis Software and Systems XII ASP Conference Series, Vol. 295, 2003 H. E. Payne, R. I. Jedrzejewski, and R. N. Hook, eds., p.283',
u'pubdate': u'2003-00-00',
Expand Down Expand Up @@ -121,6 +121,7 @@ def test_solr_transformer(self):
u'bibcode': u'2007JAP...101d4501Z',
u'boost': 0.1899999976158142,
u'data': [u'MAST:3', u'SIMBAD:1'],
u'property': [u'OPENACCESS', u'ADS_OPENACCESS', u'ARTICLE', u'NOT REFEREED'],
u'downloads': [0,
0,
0,
Expand Down Expand Up @@ -169,11 +170,13 @@ def test_solr_transformer(self):
u'grants': [u'2419335 g', u'3111723 g*'],
u'citation_count_norm': .2,
})
self.app.update_storage('bibcode', 'augment',
{'sequence': '2/4', 'affiliation': 'CfA'})

rec = self.app.get_record('bibcode')
self.assertDictContainsSubset({u'abstract': u'abstract text',
u'ack': u'aaa',
u'aff': [u'-', u'-', u'-', u'-'],
u'aff': [u'-', u'CfA', u'-', u'-'],
u'alternate_bibcode': [u'2003adass..12..283B'],
u'author': [u'Blecksmith, E.', u'Paltani, S.', u'Rots, A.', u'Winkelman, S.'],
u'author_count': 4,
Expand Down
21 changes: 20 additions & 1 deletion adsmp/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from adsmp import app, tasks
from adsmp.models import Base, Records
from adsputils import get_date
from adsmsg import DenormalizedRecord, FulltextUpdate, NonBibRecord, NonBibRecordList, MetricsRecord, MetricsRecordList
from adsmsg import DenormalizedRecord, FulltextUpdate, NonBibRecord, NonBibRecordList, MetricsRecord, MetricsRecordList, AugmentAffiliationResponseRecord, AugmentAffiliationResponseRecordList
from adsmsg.orcid_claims import OrcidClaims

import mock
Expand Down Expand Up @@ -104,6 +104,25 @@ def test_task_update_record_nonbib_list(self):
self.assertFalse(next_task.called)


def test_task_update_record_augments(self):
    """a single augment message is stored and does not trigger indexing"""
    msg = AugmentAffiliationResponseRecord(bibcode='2015ApJ...815..133S',
                                           author='me',
                                           affiliation='CfA',
                                           sequence='1/1')
    with patch('adsmp.tasks.task_index_records.delay') as next_task:
        tasks.task_update_record(msg)
        stored = self.app.get_record(bibcode='2015ApJ...815..133S')
        self.assertEquals(stored['augments'], {'affiliations': ['CfA']})
        self.assertFalse(next_task.called)

def test_task_update_record_augments_list(self):
    """a list of augment messages is stored and does not trigger indexing

    The previous version of this test was a copy-paste of the nonbib-list
    test: it built NonBibRecordList/NonBibRecord and never exercised the
    'augment_records' path in task_update_record at all.  Build the actual
    augment list message instead.
    """
    with patch('adsmp.tasks.task_index_records.delay') as next_task:
        recs = AugmentAffiliationResponseRecordList()
        augment_data = {'bibcode': '2003ASPC..295..361M', 'author': 'me',
                        'affiliation': 'CfA', 'sequence': '1/1'}
        augment_data2 = {'bibcode': '3003ASPC..295..361Z', 'author': 'you',
                         'affiliation': 'Tufts', 'sequence': '1/1'}
        rec = AugmentAffiliationResponseRecord(**augment_data)
        rec2 = AugmentAffiliationResponseRecord(**augment_data2)
        # task_update_record iterates msg.affiliation_responses for this type
        recs.affiliation_responses.extend([rec._data, rec2._data])
        tasks.task_update_record(recs)
        self.assertEquals(self.app.get_record(bibcode='2003ASPC..295..361M')['augments'],
                          {'affiliations': ['CfA']})
        self.assertEquals(self.app.get_record(bibcode='3003ASPC..295..361Z')['augments'],
                          {'affiliations': ['Tufts']})
        self.assertFalse(next_task.called)

def test_task_update_record_metrics(self):
with patch('adsmp.tasks.task_index_records.delay') as next_task:
self.assertFalse(next_task.called)
Expand Down
39 changes: 39 additions & 0 deletions alembic/versions/0155d2dff74e_add_augments_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""add augments field
Revision ID: 0155d2dff74e
Revises: dec29f80e7d1
Create Date: 2018-04-16 13:45:25.646731
"""

# revision identifiers, used by Alembic.
revision = '0155d2dff74e'
down_revision = 'dec29f80e7d1'

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Add the augments and augments_updated columns to the records table."""
    new_columns = [sa.Column('augments', sa.Text),
                   sa.Column('augments_updated', sa.TIMESTAMP)]
    context = op.get_context()
    if 'sqlite' in context.connection.engine.name:
        # sqlite doesn't have ALTER command, so use a batch operation
        with op.batch_alter_table("records") as batch_op:
            for column in new_columns:
                batch_op.add_column(column)
    else:
        for column in new_columns:
            op.add_column('records', column)


def downgrade():
    """Remove the augments and augments_updated columns from the records table."""
    column_names = ('augments', 'augments_updated')
    context = op.get_context()
    if 'sqlite' in context.connection.engine.name:
        # sqlite doesn't have ALTER command, so use a batch operation
        with op.batch_alter_table("records") as batch_op:
            for name in column_names:
                batch_op.drop_column(name)
    else:
        for name in column_names:
            op.drop_column('records', name)


0 comments on commit 8aa8a30

Please sign in to comment.