This repository has been archived by the owner on Feb 27, 2021. It is now read-only.

BASE dump import and optimizations for speed #760

Open
wants to merge 12 commits into master
123 changes: 92 additions & 31 deletions backend/oai.py
@@ -20,8 +20,8 @@


import logging

from datetime import datetime
import os
import bz2

from backend.papersource import PaperSource

@@ -33,16 +33,19 @@
from oaipmh.metadata import MetadataRegistry
from oaipmh.metadata import oai_dc_reader
from papers.models import Paper
from papers.models import OaiRecord
from backend.translators import OAIDCTranslator
from backend.translators import BASEDCTranslator
from backend.oaireader import base_dc_reader
from backend.utils import with_speed_report
from backend.utils import group_by_batches

logger = logging.getLogger('dissemin.' + __name__)

class OaiPaperSource(PaperSource):  # TODO: this should not inherit from PaperSource
    """
    A paper source that fetches records from an OAI-PMH source
    (typically: BASE).

    It uses the ListRecords verb to fetch records from the OAI-PMH
    source. Each record is then converted to a :class:`BarePaper`
@@ -111,6 +114,43 @@ def ingest(self, from_date=None, metadataPrefix='oai_dc',
        records = self.client.listRecords(**args)
        self.process_records(records, metadataPrefix)

    def load_base_dump(self, directory_path):
"""
Given a path to a directory, representing an un-tar'ed BASE dump,
read all the bz2'ed files in it and save them to the database.

:param directory_path: the path to the directory where the BASE dump was un-tar'ed
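
        Example (sketch; ``source`` is a configured OaiPaperSource and the
        path is a placeholder)::

            source.load_base_dump('/data/base_dump_untarred')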
"""
        metadata_prefix = 'base_dc'
        generator = self.read_base_dump(directory_path, metadata_prefix)
        self.process_records(generator, metadata_prefix, max_lookahead=10000)

    def read_base_dump(self, directory_path, metadata_prefix):
        """
        Given a path to a directory containing an un-tar'ed BASE dump,
        read all the bz2'ed files in it as a generator of OAI records.

        :param directory_path: the path to the directory where the BASE dump was un-tar'ed
        :param metadata_prefix: the metadata prefix used to parse the records
        """
        filenames = os.listdir(directory_path)
        namespaces = self.client.getNamespaces()
        metadata_registry = self.client.getMetadataRegistry()
        for filename in filenames:
            if not filename.endswith('.bz2'):
                continue
            file_path = os.path.join(directory_path, filename)
            with bz2.open(file_path, 'r') as f:
                payload = f.read()
                tree = self.client.parse(payload)
                records, _ = self.client.buildRecords(
                    metadata_prefix, namespaces,
                    metadata_registry, tree)
            for header, metadata, about in records:
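                # Drop the reference to the parsed XML element so the
                # tree can be garbage-collected as records are streamed.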
                header._element = None
                metadata._element = None
                yield (header, metadata, about)

    def create_paper_by_identifier(self, identifier, metadataPrefix):
        """
        Queries the OAI-PMH proxy for a single paper.
@@ -138,17 +178,24 @@ def listRecords_or_empty(self, source, *args, **kwargs):
        except NoRecordsMatchError:
            return []

    def translate_record(self, header, metadata, metadata_format):
        """
        Translates a record to a BarePaper. Returns None
        if an invalid format is given, or the record has incomplete metadata.
        """
        translator = self.translators.get(metadata_format)
        if translator is None:
            logger.warning("Unknown metadata format %s, skipping", metadata_format)
            return
        return translator.translate(header, metadata)

    def process_record(self, header, metadata, metadata_format):
        """
        Saves the record given by the header and metadata (as returned by
        pyoai) into a Paper, or None if anything failed.
        """
        paper = self.translate_record(header, metadata, metadata_format)
        if paper is not None:
            try:
                with transaction.atomic():
@@ -157,33 +204,47 @@ def process_record(self, header, metadata, format):
            except ValueError:
                logger.exception("Ignoring invalid paper with header %s" % header.identifier())

    def process_records(self, listRecords, metadata_format, max_lookahead=1000):
        """
        Save as :class:`Paper` all the records contained in this list.
        Records are represented as pairs of OaiHeader and OaiRecord, as returned
        by pyoai's ListRecords.
        """
        # check that we have at least one translator, otherwise
        # it's not really worth trying…
        if not self.translators:
            raise ValueError("No OAI translators have been set up: " +
                             "we cannot save any record.")

        with ParallelGenerator(listRecords, max_lookahead=max_lookahead) as g:
            for record_group in group_by_batches(with_speed_report(g, name='OAI papers')):
                oai_ids_to_last_updated = {
                    record[0].identifier(): record[0].datestamp() for record in record_group
                }

                # Fetch the last modified date of all these records in the DB
                last_modified_in_db = {
                    id: last_modified
                    for id, last_modified in
                    OaiRecord.objects.filter(identifier__in=oai_ids_to_last_updated.keys()).values_list('identifier', 'last_update')
                }

                # Deduce the list of papers to update
                ids_to_update = set()
                for id, last_updated_in_base in oai_ids_to_last_updated.items():
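                    # '<=' (not '<') also refreshes records whose stored
                    # last_update falls on the same day as the BASE datestamp,
                    # trading some duplicate work for not missing same-day edits.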
                    if id not in last_modified_in_db or last_modified_in_db[id].date() <= last_updated_in_base.date():
                        ids_to_update.add(id)

                bare_papers = [self.translate_record(record[0], record[1]._map, metadata_format)
                               for record in record_group
                               if record[0].identifier() in ids_to_update]

                for paper in bare_papers:
                    if paper is not None:
                        try:
                            with transaction.atomic():
                                Paper.from_bare(paper)
                        except ValueError:
                            logger.exception("Ignoring invalid paper with identifier " + paper.oairecords[0].identifier)
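
The speed-up comes from composing with_speed_report and group_by_batches around pyoai's record generator, so the database is queried once per batch instead of once per record. A minimal sketch of that pipeline shape, runnable inside the project (fake_records is made up here as a stand-in for pyoai's ListRecords output):

    from backend.utils import group_by_batches, with_speed_report

    def fake_records(n):
        # Stand-in for the (header, metadata, about) triples yielded by pyoai.
        for i in range(n):
            yield ('header-%d' % i, 'metadata-%d' % i, None)

    for batch in group_by_batches(with_speed_report(fake_records(250), name='demo'), batch_size=100):
        # process_records performs one OaiRecord lookup per batch at this
        # point, instead of one lookup per record.
        print(len(batch))  # prints 100, 100, 50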


Binary file not shown.
Binary file not shown.
23 changes: 23 additions & 0 deletions backend/tests/test_oai.py
@@ -65,6 +65,29 @@ def delete(self, identifier):
            p.delete()
        except OaiRecord.DoesNotExist:
            pass

    def test_read_dump(self):
        """
        Reads a small example dump from BASE.
        """
        records = self.base_oai.read_base_dump(os.path.join(self.testdir, 'data/example_base_dump'), 'base_dc')
        # Consume the generator
        records = list(records)
        titles = [record.getField('title') for _, record, _ in records]
        self.assertEqual(len(records), 20)
        self.assertIn(['Modularizing the Elimination of r=0 in Kleene Algebra'], titles)

    def test_load_dump(self):
        """
        Loads a dump into the DB.
        """
        self.base_oai.load_base_dump(os.path.join(self.testdir, 'data/example_base_dump'))
        # Loading it a second time should be very quick
        self.base_oai.load_base_dump(os.path.join(self.testdir, 'data/example_base_dump'))

        paper = Paper.objects.get(title='Modularizing the Elimination of r=0 in Kleene Algebra')
        self.assertEqual(paper.pdf_url, 'http://dx.doi.org/10.2168/lmcs-1(3:5)2005')


    def test_create_no_match(self):
        """
6 changes: 5 additions & 1 deletion backend/tests/test_utils.py
@@ -2,6 +2,7 @@

from backend.utils import report_speed
from backend.utils import with_speed_report
from backend.utils import group_by_batches
from time import sleep
from datetime import timedelta

@@ -16,4 +17,7 @@ def second_generator(limit):
            yield elem

    assert list(second_generator(20)) == list(range(20))



def test_group_by_batches():
    assert list(group_by_batches(range(10), batch_size=4)) == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
2 changes: 1 addition & 1 deletion backend/translators.py
@@ -147,7 +147,7 @@ def translate(self, header, metadata):
            self.add_oai_record(header, metadata, paper)
            return paper
        except ValueError as e:
            logger.warning("OAI record " + header.identifier() + " skipped:\n%s", e, exc_info=True)
            paper.update_availability()

    def add_oai_record(self, header, metadata, paper):
14 changes: 14 additions & 0 deletions backend/utils.py
@@ -151,3 +151,17 @@ def wrapped_generator(*args, **kwargs):
            return with_speed_report(func(*args, **kwargs), name=logging_name, report_delay=report_delay)
        return wrapped_generator
    return decorator


def group_by_batches(generator, batch_size=100):
    """
    Given a generator, returns a generator of groups of at most batch_size elements.
    """
    current_batch = []
    for item in generator:
        current_batch.append(item)
        if len(current_batch) == batch_size:
            yield current_batch
            current_batch = []
    if len(current_batch) > 0:
        yield current_batch
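
For reference, the batching semantics in doctest form (the trailing partial batch is kept rather than dropped):

    >>> list(group_by_batches(range(5), batch_size=2))
    [[0, 1], [2, 3], [4]]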