This repository has been archived by the owner on Feb 27, 2021. It is now read-only.

BASE dump import and optimizations for speed #760

Open · wants to merge 12 commits into master
125 changes: 94 additions & 31 deletions backend/oai.py
@@ -20,8 +20,8 @@


import logging

from datetime import datetime
import os
import bz2

from backend.papersource import PaperSource

@@ -33,16 +33,19 @@
from oaipmh.metadata import MetadataRegistry
from oaipmh.metadata import oai_dc_reader
from papers.models import Paper
from papers.models import OaiRecord
from backend.translators import OAIDCTranslator
from backend.translators import BASEDCTranslator
from backend.oaireader import base_dc_reader
from backend.utils import with_speed_report
from backend.utils import group_by_batches

logger = logging.getLogger('dissemin.' + __name__)

class OaiPaperSource(PaperSource): # TODO: this should not inherit from PaperSource
"""
-    A paper source that fetches records from the OAI-PMH proxy
-    (typically: proaixy).
+    A paper source that fetches records from an OAI-PMH source
+    (typically: BASE).

It uses the ListRecords verb to fetch records from the OAI-PMH
source. Each record is then converted to a :class:`BarePaper`
@@ -111,6 +114,43 @@ def ingest(self, from_date=None, metadataPrefix='oai_dc',
records = self.client.listRecords(**args)
self.process_records(records, metadataPrefix)

def load_base_dump(self, directory_path):
"""
Given a path to a directory representing an un-tar'ed BASE dump,
read all the bz2'ed files in it and save them to the database.

:param directory_path: the path to the directory where the BASE dump was un-tar'ed
"""
metadata_prefix = 'base_dc'
generator = self.read_base_dump(directory_path, metadata_prefix)
self.process_records(generator, metadata_prefix, max_lookahead=10000)
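
A minimal usage sketch (hedged: `source` stands for an OaiPaperSource already wired with a BASEDCTranslator under the base_dc prefix, as exercised in backend/tests/test_oai.py below; the path is a placeholder):

    import os

    # One call walks the un-tar'ed dump directory and ingests every *.bz2
    # file; re-running it is cheap, since unchanged records are skipped
    # (see process_records below).
    dump_dir = os.path.expanduser('~/base_dump')  # hypothetical location
    source.load_base_dump(dump_dir)
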

def read_base_dump(self, directory_path, metadata_prefix):
"""
Given a path to a directory representing an un-tar'ed BASE dump,
read all the bz2'ed files in it as a generator of OAI records.

:param directory_path: the path to the directory where the BASE dump was un-tar'ed
:param metadata_prefix: the metadata prefix to use when reading the records
"""
filenames = os.listdir(directory_path)
namespaces = self.client.getNamespaces()
metadata_registry = self.client.getMetadataRegistry()
for filename in filenames:
if not filename.endswith('.bz2'):
continue
file_path = os.path.join(directory_path, filename)
with bz2.open(file_path, 'r') as f:
payload = f.read()
tree = self.client.parse(payload)
records, _ = self.client.buildRecords(
metadata_prefix, namespaces,
metadata_registry, tree)
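# Dropping the raw lxml elements below presumably lets the large parsed
# XML trees be garbage-collected while records stream through the
# ParallelGenerator.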
for header, metadata, about in records:
header._element = None
metadata._element = None
yield (header, metadata, about)

def create_paper_by_identifier(self, identifier, metadataPrefix):
"""
Queries the OAI-PMH proxy for a single paper.
@@ -138,17 +178,24 @@ def listRecords_or_empty(self, source, *args, **kwargs):
except NoRecordsMatchError:
return []

-    def process_record(self, header, metadata, format):
+    def translate_record(self, header, metadata, metadata_format):
"""
-        Saves the record given by the header and metadata (as returned by
-        pyoai) into a Paper, or None if anything failed.
+        Translates a record to a BarePaper. Returns None
+        if an invalid format is given, or the record has incomplete metadata.
"""
-        translator = self.translators.get(format)
+        translator = self.translators.get(metadata_format)
if translator is None:
logger.warning("Unknown metadata format %s, skipping", metadata_format)
return
return translator.translate(header, metadata)

def process_record(self, header, metadata, metadata_format):
"""
Saves the record given by the header and metadata (as returned by
pyoai) into a Paper, or None if anything failed.
"""
paper = self.translate_record(header, metadata, metadata_format)

-        paper = translator.translate(header, metadata)
if paper is not None:
try:
with transaction.atomic():
@@ -157,33 +204,49 @@ def process_record(self, header, metadata, format):
except ValueError:
logger.exception("Ignoring invalid paper with header %s" % header.identifier())

-    def process_records(self, listRecords, format):
+    def process_records(self, listRecords, metadata_format, max_lookahead=1000):
"""
-        Save as :class:`Paper` all the records contained in this list
+        Save as :class:`Paper` all the records contained in this list.
Records are represented as pairs of OaiHeader and OaiRecord, as returned
by pyoai's ListRecords
"""
# check that we have at least one translator, otherwise
# it's not really worth trying…
if not self.translators:
raise ValueError("No OAI translators have been set up: " +
"We cannot save any record.")

-        last_report = datetime.now()
-        processed_since_report = 0
-
-        with ParallelGenerator(listRecords, max_lookahead=1000) as g:
-            for record in g:
-                header = record[0]
-                metadata = record[1]._map
-
-                self.process_record(header, metadata, format)
-
-                # rate reporting
-                processed_since_report += 1
-                if processed_since_report >= 1000:
-                    td = datetime.now() - last_report
-                    rate = 'infty'
-                    if td.seconds:
-                        rate = str(processed_since_report / td.seconds)
-                    logger.info("current rate: %s records/s" % rate)
-                    processed_since_report = 0
-                    last_report = datetime.now()
+        with ParallelGenerator(listRecords, max_lookahead=max_lookahead) as g:
+            for record_group in group_by_batches(with_speed_report(g, name='OAI papers')):
+                oai_ids_to_last_updated = {
+                    record[0].identifier(): record[0].datestamp() for record in record_group
+                }
+
+                # Fetch the last modified date of all these records in the DB
+                last_modified_in_db = {
+                    pk: last_modified
+                    for pk, last_modified in OaiRecord.objects.filter(
+                        identifier__in=oai_ids_to_last_updated.keys()
+                    ).values_list('identifier', 'last_update')
+                }
+
+                # Deduce the list of papers to update
+                ids_to_update = {
+                    pk
+                    for pk, last_updated_in_base in oai_ids_to_last_updated.items()
+                    if pk not in last_modified_in_db or last_modified_in_db[pk].date() <= last_updated_in_base.date()
+                }
+
+                bare_papers = [self.translate_record(record[0], record[1]._map, metadata_format)
+                               for record in record_group
+                               if record[0].identifier() in ids_to_update]
+
+                for paper in bare_papers:
+                    if paper is not None:
+                        try:
+                            with transaction.atomic():
+                                Paper.from_bare(paper)
+                        except ValueError:
+                            logger.exception("Ignoring invalid paper with identifier %s", paper.oairecords[0].identifier)
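
The update check above boils down to a per-record timestamp comparison at date granularity; as a standalone sketch with simplified names:

    from datetime import datetime

    def needs_update(base_datestamp, db_last_update):
        """A record is (re)processed unless the DB copy is strictly newer."""
        if db_last_update is None:  # identifier never seen before
            return True
        return db_last_update.date() <= base_datestamp.date()

    # A record whose DB copy was refreshed yesterday is processed again today:
    assert needs_update(datetime(2020, 5, 2), datetime(2020, 5, 1))
    # One refreshed in the DB after the dump's datestamp is skipped:
    assert not needs_update(datetime(2020, 4, 30), datetime(2020, 5, 1))

Note the <=: a record whose dump datestamp falls on the same day as the DB's last_update is still reprocessed, trading a little redundant work for not missing same-day changes.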


41 changes: 27 additions & 14 deletions backend/pubtype_translations.py
@@ -124,20 +124,33 @@
'info:eu-repo/semantics/patent': 'other',
'info:eu-repo/semantics/other': 'other',
# BASE
-    'typenorm:0000': 'other', # text
-    'typenorm:0001': 'journal-article',
-    'typenorm:0002': 'book',
-    'typenorm:0003': 'proceedings-article',
-    'typenorm:0004': 'thesis',
-    'typenorm:0005': 'other', # review
-    'typenorm:0101': 'other', # audio
-    'typenorm:0102': 'other', # video
-    'typenorm:0103': 'poster', # image
-    'typenorm:0104': 'poster', # map
-    'typenorm:0105': 'other', # software
-    'typenorm:0106': 'dataset',
-    'typenorm:0107': 'other', # sheet music
-    'typenorm:9999': 'other', # unknown
+    # https://www.base-search.net/about/download/base_interface.pdf
+    'base:1': 'other', # Text
+    'base:11': 'book', # Book
+    'base:111': 'book-chapter', # Book part
+    'base:12': 'journal-issue', # Journal/Newspaper
+    'base:121': 'journal-article', # Article contribution to journal/newspaper
+    'base:122': 'other', # Other non-article part of journal/newspaper
+    'base:13': 'proceedings-article', # Conference object
+    'base:14': 'report', # Report
+    'base:15': 'other', # Review
+    'base:16': 'other', # Course material
+    'base:17': 'other', # Lecture
+    'base:18': 'thesis', # Thesis
+    'base:181': 'thesis', # Bachelor thesis
+    'base:182': 'thesis', # Master thesis
+    'base:183': 'thesis', # Doctoral and postdoctoral thesis
+    'base:19': 'other', # Manuscript
+    'base:1A': 'other', # Patent
+    'base:2': 'other', # Musical notation
+    'base:3': 'other', # Map
+    'base:4': 'other', # Audio
+    'base:5': 'other', # Image/Video
+    'base:51': 'other', # Still image
+    'base:52': 'other', # Moving image/Video
+    'base:6': 'other', # Software
+    'base:7': 'dataset', # Dataset
+    'base:F': 'other', # Other/Unknown Material
# RG
'inProceedings': 'proceedings-article',
}
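
In the translator (backend/translators.py below), a normalized BASE doctype arrives as a bare code, gets prefixed with base:, and is looked up in this table. A self-contained sketch, with the dict name and the fallback to 'other' being assumptions for illustration:

    PUB_TYPE_TRANSLATIONS = {  # excerpt of the mapping above
        'base:121': 'journal-article',
        'base:18': 'thesis',
        'base:F': 'other',
    }

    def normalize_doctype(typenorm):
        # Fall back to 'other' for codes missing from the table.
        return PUB_TYPE_TRANSLATIONS.get('base:{}'.format(typenorm), 'other')

    assert normalize_doctype('121') == 'journal-article'
    assert normalize_doctype('42') == 'other'
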
Binary file not shown.
Binary file not shown.
29 changes: 26 additions & 3 deletions backend/tests/test_oai.py
@@ -69,6 +69,29 @@ def delete(self, identifier):
p.delete()
except OaiRecord.DoesNotExist:
pass

def test_read_dump(self):
"""
Reads a small example dump from BASE
"""
records = self.base_oai.read_base_dump(os.path.join(self.testdir, 'data/example_base_dump'), 'base_dc')
# Fetch the records
records = list(records)
titles = [record.getField('title') for _, record, _ in records]
self.assertEqual(len(records), 20)
self.assertIn(['Modularizing the Elimination of r=0 in Kleene Algebra'], titles)

def test_load_dump(self):
"""
Loads up a dump in the DB.
"""
self.base_oai.load_base_dump(os.path.join(self.testdir, 'data/example_base_dump'))
# Loading it a second time should be very quick
self.base_oai.load_base_dump(os.path.join(self.testdir, 'data/example_base_dump'))

paper = Paper.objects.get(title='Modularizing the Elimination of r=0 in Kleene Algebra')
self.assertEqual(paper.pdf_url, 'http://dx.doi.org/10.2168/lmcs-1(3:5)2005')


def test_create_no_match(self):
"""
@@ -280,9 +303,9 @@ def test_base_doctype(self):
mappings = {
'ftunivsavoie:oai:HAL:hal-01062241v1': 'proceedings-article',
'ftunivsavoie:oai:HAL:hal-01062339v1': 'book-chapter',
-            'ftunivmacedonia:oai:dspace.lib.uom.gr:2159/6227': 'other',
-            'ftartxiker:oai:HAL:hal-00845819v1': 'journal-article',
-            'ftdatacite:oai:oai.datacite.org:402223': 'dataset',
+            'ftunivmacedonia:oai:dspace.lib.uom.gr:2159/6227': 'preprint',
+            'ftartxiker:oai:HAL:hal-00845819v1': 'preprint',
+            'ftdatacite:oai:oai.datacite.org:402223': 'preprint',
}

for ident, typ in list(mappings.items()):
13 changes: 10 additions & 3 deletions backend/tests/test_utils.py
@@ -1,22 +1,29 @@
from datetime import timedelta
from time import sleep

from backend.utils import group_by_batches
from backend.utils import report_speed
from backend.utils import utf8_truncate
from backend.utils import with_speed_report


def test_group_by_batches():
assert list(group_by_batches(range(10), batch_size=4)) == [[0,1,2,3],[4,5,6,7],[8,9]]


def test_report_speed():
assert list(with_speed_report(range(10))) == list(range(10))
assert list(with_speed_report([])) == []

@report_speed(name='my_generator', report_delay=timedelta(seconds=0.1))
def second_generator(limit):
for elem in range(limit):
sleep(0.01)
yield elem

assert list(second_generator(20)) == list(range(20))


class TestUtf8Truncate:
"""
Tests truncation by utf-8 length
@@ -26,4 +33,4 @@ def test_utf8_truncate_ascii(self):
assert utf8_truncate('x' * 1026) == 'x' * 1024

def test_utf8_truncate_unicode(self):
assert utf8_truncate('–' * 1024) == '–' * 341
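
These expectations pin down the semantics: truncate to at most 1024 UTF-8 bytes without splitting a code point ('–' encodes to three bytes, and 341 * 3 = 1023). A possible implementation consistent with the tests, though not necessarily the one in backend/utils.py:

    def utf8_truncate(value, max_bytes=1024):
        # Cut the UTF-8 encoding at max_bytes, then drop any trailing
        # partial code point left over by the byte-level slice.
        return value.encode('utf-8')[:max_bytes].decode('utf-8', errors='ignore')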
8 changes: 4 additions & 4 deletions backend/translators.py
@@ -41,7 +41,7 @@ class OaiTranslator(object):
A translator takes a metadata record from the OAI-PMH
proxy and converts it to a :class:`BarePaper`.
"""

def __init__(self, oaisource):
"""
Inits the translator to create OaiRecords linked to the
@@ -147,7 +147,7 @@ def translate(self, header, metadata):
self.add_oai_record(header, metadata, paper)
return paper
except ValueError as e:
-            logger.warning("OAI record "+header.identifier()+" skipped:\n", e, exc_info=True)
+            logger.warning("OAI record %s skipped:\n%s", header.identifier(), e, exc_info=True)
paper.update_availability()

def add_oai_record(self, header, metadata, paper):
Expand All @@ -170,7 +170,7 @@ def add_oai_record(self, header, metadata, paper):
keywords = ' | '.join(metadata['subject'])
contributors = ' '.join(metadata['contributor'])[:4096]

-        typenorms = ['typenorm:'+tn for tn in metadata.get('typenorm', [])]
+        typenorms = ['base:{}'.format(tn) for tn in metadata.get('typenorm', [])]
pubtype_list = metadata.get('type', []) + typenorms
pubtype = None
for raw_pubtype in pubtype_list:
@@ -207,4 +207,4 @@ class BASEDCTranslator(OAIDCTranslator):
"""

def format(self):
return 'base_dc'
18 changes: 16 additions & 2 deletions backend/utils.py
@@ -67,7 +67,7 @@ def inner(*args, **kwargs):
def request_retry(url, **kwargs):
"""
Retries a request, with throttling and exponential back-off.

:param url: the URL to fetch
:param data: the GET parameters
:param headers: the HTTP headers
@@ -135,7 +135,7 @@ def with_speed_report(generator, name=None, report_delay=timedelta(seconds=10)):
"""
Periodically reports the speed at which we are enumerating the items
of a generator.

:param name: a name to use in the reports (eg "papers from Crossref API")
:param report_delay: print a report every so often
"""
@@ -165,3 +165,17 @@ def wrapped_generator(*args, **kwargs):
return with_speed_report(func(*args, **kwargs), name=logging_name, report_delay=report_delay)
return wrapped_generator
return decorator


def group_by_batches(generator, batch_size=100):
"""
Given a generator, returns a generator of groups of at most batch_size elements.
"""
current_batch = []
for item in generator:
current_batch.append(item)
if len(current_batch) == batch_size:
yield current_batch
current_batch = []
if current_batch:
yield current_batch
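
Together with with_speed_report, this is the consumption pattern the new process_records relies on; condensed:

    from backend.utils import group_by_batches, with_speed_report

    def consume(records):
        # Log throughput periodically while handling records in batches
        # of 100 (the default batch_size above).
        for batch in group_by_batches(with_speed_report(records, name='OAI papers')):
            print(len(batch))  # stand-in for the per-batch database work

Batching is what lets process_records issue one OaiRecord lookup per hundred records instead of one query per record.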