diff --git a/.gitignore b/.gitignore index 4e9594481f..a5eb4b798f 100644 --- a/.gitignore +++ b/.gitignore @@ -64,3 +64,6 @@ target/ # Jetbrains editor project folder .idea/ + +# SIPStore +archive/ diff --git a/cernopendata/config.py b/cernopendata/config.py index dc9c4700f6..63e9a66e08 100644 --- a/cernopendata/config.py +++ b/cernopendata/config.py @@ -435,3 +435,5 @@ **params ) ] + +SIPSTORE_CHECKSUM_ALGORITHM = 'adler32' diff --git a/cernopendata/modules/fixtures/cli.py b/cernopendata/modules/fixtures/cli.py index 2e41a7670a..cf0914dc0a 100644 --- a/cernopendata/modules/fixtures/cli.py +++ b/cernopendata/modules/fixtures/cli.py @@ -37,428 +37,526 @@ from invenio_pidstore.models import PersistentIdentifier from invenio_records_files.api import Record from invenio_records_files.models import RecordsBuckets +from invenio_sipstore.models import SIPMetadataType from sqlalchemy.orm.attributes import flag_modified from cernopendata.modules.records.minters.docid import \ - cernopendata_docid_minter + cernopendata_docid_minter from cernopendata.modules.records.minters.recid import \ - cernopendata_recid_minter + cernopendata_recid_minter + +from .sip_utils import ( + handle_sipstore_record_file_index, + handle_sipstore_record_file, + sip_record, +) def get_jsons_from_dir(dir): - """Get JSON files inside a dir.""" - res = [] - for root, dirs, files in os.walk(dir): - for file in files: - if file.endswith(".json"): - res.append(os.path.join(root, file)) - return res - - -def handle_record_files(data, bucket, files, skip_files): - """Handles record files.""" - for file in files: - if skip_files: - break - assert 'uri' in file - assert 'size' in file - assert 'checksum' in file - - try: - f = FileInstance.create() - filename = file.get("uri").split('/')[-1:][0] - f.set_uri(file.get("uri"), file.get( - "size"), file.get("checksum")) - obj = ObjectVersion.create( - bucket, - filename, - _file_id=f.id - ) - - file.update({ - 'bucket': str(obj.bucket_id), - 'checksum': obj.file.checksum, - 'key': obj.key, - 'version_id': str(obj.version_id), - }) - - except Exception as e: - click.echo( - 'Recid {0} file {1} could not be loaded due ' - 'to {2}.'.format(data.get('recid'), filename, - str(e))) - continue - - -def create_record(schema, data, files, skip_files): - """Creates a new record.""" - id = uuid.uuid4() - cernopendata_recid_minter(id, data) - data['$schema'] = schema - record = Record.create(data, id_=id) - if not skip_files: - bucket = Bucket.create() - handle_record_files(data, bucket, files, skip_files) - RecordsBuckets.create( - record=record.model, bucket=bucket) - - return record - - -def update_record(pid, schema, data, files, skip_files): - """Updates the given record.""" - record = Record.get_record(pid.object_uuid) - with db.session.begin_nested(): - if record.files and not skip_files: - bucket_id = record.files.bucket - bucket = Bucket.get(bucket_id.id) - for o in ObjectVersion.get_by_bucket(bucket).all(): - o.remove() - o.file.delete() - RecordsBuckets.query.filter_by( - record=record.model, - bucket=bucket - ).delete() - bucket_id.remove() - db.session.commit() - record.update(data) - if not skip_files: - bucket = Bucket.create() - handle_record_files(data, bucket, files, skip_files) - RecordsBuckets.create( - record=record.model, bucket=bucket) - return record + """Get JSON files inside a dir.""" + res = [] + for root, dirs, files in os.walk(dir): + for file in files: + if file.endswith(".json"): + res.append(os.path.join(root, file)) + return res + + +def handle_record_files(data, 
bucket, files, skip_files, skip_sips):
+    """Handles record files."""
+    sip_files = []
+    for file in files:
+        if skip_files:
+            break
+        assert 'uri' in file
+        assert 'size' in file
+        assert 'checksum' in file
+
+        try:
+            f = FileInstance.create()
+            filename = file.get("uri").split('/')[-1:][0]
+            f.set_uri(file.get("uri"), file.get(
+                "size"), file.get("checksum"))
+            obj = ObjectVersion.create(
+                bucket,
+                filename,
+                _file_id=f.id
+            )
+
+            file.update({
+                'bucket': str(obj.bucket_id),
+                'checksum': obj.file.checksum,
+                'key': obj.key,
+                'version_id': str(obj.version_id),
+            })
+
+        except Exception as e:
+            click.echo(
+                'Recid {0} file {1} could not be loaded due '
+                'to {2}.'.format(data.get('recid'), filename,
+                                 str(e)))
+            continue
+
+        if not skip_sips:
+            if file.get("type", None) == "index.json":
+                sip_files += handle_sipstore_record_file_index(f)
+
+    return sip_files
+
+
+def handle_sip_files(files, skip_files, skip_sips):
+    """Collect SIP file entries for files already attached to a record."""
+    sip_files = []
+    for file in files:
+        if skip_files:
+            break
+        assert 'uri' in file
+        assert 'size' in file
+        assert 'checksum' in file
+        f = FileInstance.get_by_uri(file.get("uri"))
+
+        if f and not skip_sips:
+            if file.get("type", None) == "index.json":
+                sip_files += handle_sipstore_record_file_index(f)
+
+    return sip_files
+
+
+def create_record(schema, data, files, skip_files, skip_sips):
+    """Creates a new record."""
+    id = uuid.uuid4()
+    pid = cernopendata_recid_minter(id, data)
+
+    data['$schema'] = schema
+    record = Record.create(data, id_=id)
+    # Initialise here so the return below does not raise a NameError
+    # when file loading is skipped.
+    sip_files_content = []
+    if not skip_files:
+        bucket = Bucket.create()
+        sip_files_content = handle_record_files(
+            data, bucket, files, skip_files, skip_sips)
+
+        RecordsBuckets.create(
+            record=record.model, bucket=bucket)
+
+    return pid, record, sip_files_content
+
+
+def update_record(pid, schema, data, files, skip_files, skip_sips):
+    """Updates the given record."""
+    record = Record.get_record(pid.object_uuid)
+    # with db.session.begin_nested():
+    #     if record.files and not skip_files:
+    #         bucket_id = record.files.bucket
+    #         bucket = Bucket.get(bucket_id.id)
+    #         for o in ObjectVersion.get_by_bucket(bucket).all():
+    #             o.remove()
+    #             o.file.delete()
+    #         RecordsBuckets.query.filter_by(
+    #             record=record.model,
+    #             bucket=bucket
+    #         ).delete()
+    #         bucket_id.remove()
+    # db.session.commit()
+
+    record.update(data)
+    sip_files_content = []
+    if not skip_files:
+        sip_files_content = handle_sip_files(
+            files,
+            skip_files,
+            skip_sips
+        )
+    # bucket = Bucket.create()
+    # sip_files_content = handle_record_files(
+    #     data, bucket, files, skip_files, skip_sips)
+    # RecordsBuckets.create(
+    #     record=record.model, bucket=bucket)
+    return record, sip_files_content


def create_doc(data, schema):
-    """Creates a new doc record."""
-    from invenio_records import Record
-    id = uuid.uuid4()
-    cernopendata_docid_minter(id, data)
-    data['$schema'] = schema
-    record = Record.create(data, id_=id)
-    return record
+    """Creates a new doc record."""
+    from invenio_records import Record
+    id = uuid.uuid4()
+    cernopendata_docid_minter(id, data)
+    data['$schema'] = schema
+    record = Record.create(data, id_=id)
+    return record


def update_doc(pid, data):
-    """Updates the given doc record."""
-    from invenio_records import Record
-    record = Record.get_record(pid.object_uuid)
-    record.update(data)
-    return record
+    """Updates the given doc record."""
+    from invenio_records import Record
+    record = Record.get_record(pid.object_uuid)
+    record.update(data)
+    return record


@click.group(chain=True)
def fixtures():
-    """Automate site bootstrap process 
and testing.""" + """Automate site bootstrap process and testing.""" @fixtures.command() @click.option('--skip-files', is_flag=True, default=False, - help='Skip loading of files') + help='Skip loading of files') +@click.option('--skip-sips', is_flag=True, default=False, + help='Skip create/update of SIPs') @click.option('files', '--file', '-f', multiple=True, - type=click.Path(exists=True), - help='Path to the file(s) to be loaded. If not provided, all' - 'files will be loaded') + type=click.Path(exists=True), + help='Path to the file(s) to be loaded. If not provided, all' + 'files will be loaded') @click.option('--profile', is_flag=True, - help='Output profiling information.') + help='Output profiling information.') @click.option('--mode', required=True, type=click.Choice( - ['insert', 'replace', 'insert-or-replace'])) + ['insert', 'replace', 'insert-or-replace'])) @with_appcontext -def records(skip_files, files, profile, mode): - """Load all records.""" - if profile: - import cProfile - import pstats - import StringIO - pr = cProfile.Profile() - pr.enable() - - indexer = RecordIndexer() - schema = current_app.extensions['invenio-jsonschemas'].path_to_url( - 'records/record-v1.0.0.json' - ) - data = pkg_resources.resource_filename('cernopendata', - 'modules/fixtures/data/records') - action = None - - if files: - record_json = files - else: - record_json = glob.glob(os.path.join(data, '*.json')) - - for filename in record_json: - # name = filename.split('/')[-1] - # if name.startswith('opera'): - # click.echo('Skipping opera records ...') - # continue - click.echo('Loading records from {0} ...'.format(filename)) - with open(filename, 'rb') as source: - for data in json.load(source): - - if not data: - click.echo('IGNORING a possibly broken or corrupted ' - 'record entry in file {0} ...'.format(filename)) - continue - - files = data.get('files', []) - - if mode == 'insert-or-replace': - try: - pid = PersistentIdentifier.get('recid', data['recid']) - if pid: - record = update_record( - pid, schema, data, files, skip_files) - action = 'updated' - except PIDDoesNotExistError: - record = create_record(schema, data, files, skip_files) - action = 'inserted' - elif mode == 'insert': - try: - pid = PersistentIdentifier.get('recid', data['recid']) - if pid: - click.echo( - 'Record recid {} exists already;' - ' cannot insert it. 
'.format( - data.get('recid')), err=True) - return - except PIDDoesNotExistError: - record = create_record(schema, data, files, skip_files) - action = 'inserted' - else: - try: - pid = PersistentIdentifier.get('recid', data['recid']) - except PIDDoesNotExistError: - click.echo( - 'Record recid {} does not exist; ' - 'cannot replace it.'.format( - data.get('recid')), err=True) - return - record = update_record( - pid, schema, data, files, skip_files) - action = 'updated' - - if not skip_files: - record.files.flush() - record.commit() - db.session.commit() - click.echo( - 'Record recid {0} {1}.'.format( - data.get('recid'), action)) - indexer.index(record) - db.session.expunge_all() - - if profile: - pr.disable() - s = StringIO.StringIO() - sortby = 'cumulative' - ps = pstats.Stats(pr, stream=s).sort_stats(sortby) - ps.print_stats() - print(s.getvalue()) +def records(skip_files, skip_sips, files, profile, mode): + """Load all records.""" + + if profile: + import cProfile + import pstats + import StringIO + pr = cProfile.Profile() + pr.enable() + + indexer = RecordIndexer() + schema = current_app.extensions['invenio-jsonschemas'].path_to_url( + 'records/record-v1.0.0.json' + ) + data = pkg_resources.resource_filename('cernopendata', + 'modules/fixtures/data/records') + action = None + + if files: + record_json = files + else: + record_json = glob.glob(os.path.join(data, '*.json')) + + for filename in record_json: + # name = filename.split('/')[-1] + # if name.startswith('opera'): + # click.echo('Skipping opera records ...') + # continue + + click.echo('Loading records from {0} ...'.format(filename)) + with open(filename, 'rb') as source: + for data in json.load(source): + if not data: + click.echo('IGNORING a possibly broken or corrupted ' + 'record entry in file {0} ...'.format(filename)) + continue + + files = data.get('files', []) + + pid = None + if mode == 'insert-or-replace': + try: + pid = PersistentIdentifier.get('recid', data['recid']) + if pid: + record, sip_files_content = update_record( + pid, schema, data, files, + skip_files, skip_sips) + action = 'updated' + except PIDDoesNotExistError: + pid, record, sip_files_content = create_record( + schema, data, files, skip_files, skip_sips) + action = 'inserted' + elif mode == 'insert': + try: + pid = PersistentIdentifier.get('recid', data['recid']) + if pid: + click.echo( + 'Record recid {} exists already;' + ' cannot insert it. 
'.format( + data.get('recid')), err=True) + return + except PIDDoesNotExistError: + pid, record, sip_files_content = create_record( + schema, data, files, skip_files, skip_sips) + action = 'inserted' + else: + try: + pid = PersistentIdentifier.get('recid', data['recid']) + except PIDDoesNotExistError: + click.echo( + 'Record recid {} does not exist; ' + 'cannot replace it.'.format( + data.get('recid')), err=True) + return + record, sip_files_content = update_record( + pid, schema, data, files, skip_files, skip_sips) + action = 'updated' + + if not skip_files: + record.files.flush() + record.commit() + + if not skip_sips: + sip_record(pid, record, sip_files_content, action) + + db.session.commit() + click.echo( + 'Record recid {0} {1}.'.format( + data.get('recid'), action)) + indexer.index(record) + db.session.expunge_all() + + if profile: + pr.disable() + s = StringIO.StringIO() + sortby = 'cumulative' + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + print(s.getvalue()) @fixtures.command() @with_appcontext def glossary(): - """Load glossary term records.""" - from invenio_db import db - from invenio_records import Record - from invenio_indexer.api import RecordIndexer - from cernopendata.modules.records.minters.termid import \ - cernopendata_termid_minter - - indexer = RecordIndexer() - schema = current_app.extensions['invenio-jsonschemas'].path_to_url( - 'records/glossary-term-v1.0.0.json' - ) - data = pkg_resources.resource_filename('cernopendata', - 'modules/fixtures/data') - glossary_terms_json = glob.glob(os.path.join(data, 'terms', '*.json')) - - for filename in glossary_terms_json: - - click.echo('Loading glossary terms from {0} ...'.format(filename)) - - with open(filename, 'rb') as source: - for data in json.load(source): - if "collections" not in data and \ - not isinstance( - data.get("collections", None), basestring): - data["collections"] = [] - data["collections"].append({"primary": "Terms"}) - id = uuid.uuid4() - cernopendata_termid_minter(id, data) - data['$schema'] = schema - record = Record.create(data, id_=id) - db.session.commit() - indexer.index(record) - db.session.expunge_all() + """Load glossary term records.""" + from invenio_db import db + from invenio_records import Record + from invenio_indexer.api import RecordIndexer + from cernopendata.modules.records.minters.termid import \ + cernopendata_termid_minter + + indexer = RecordIndexer() + schema = current_app.extensions['invenio-jsonschemas'].path_to_url( + 'records/glossary-term-v1.0.0.json' + ) + data = pkg_resources.resource_filename('cernopendata', + 'modules/fixtures/data') + glossary_terms_json = glob.glob(os.path.join(data, 'terms', '*.json')) + + for filename in glossary_terms_json: + + click.echo('Loading glossary terms from {0} ...'.format(filename)) + + with open(filename, 'rb') as source: + for data in json.load(source): + if "collections" not in data and \ + not isinstance( + data.get("collections", None), basestring): + data["collections"] = [] + data["collections"].append({"primary": "Terms"}) + id = uuid.uuid4() + cernopendata_termid_minter(id, data) + data['$schema'] = schema + record = Record.create(data, id_=id) + db.session.commit() + indexer.index(record) + db.session.expunge_all() @fixtures.command() @click.option('files', '--file', '-f', multiple=True, - type=click.Path(exists=True), - help='Path to the file(s) to be loaded. If not provided, all' - 'files will be loaded') + type=click.Path(exists=True), + help='Path to the file(s) to be loaded. 
If not provided, all' + 'files will be loaded') @click.option('--mode', required=True, type=click.Choice( - ['insert', 'replace', 'insert-or-replace'])) + ['insert', 'replace', 'insert-or-replace'])) @with_appcontext def docs(files, mode): - """Load demo article records.""" - from slugify import slugify - - indexer = RecordIndexer() - schema = current_app.extensions['invenio-jsonschemas'].path_to_url( - 'records/docs-v1.0.0.json' - ) - data = pkg_resources.resource_filename('cernopendata', - 'modules/fixtures/data/docs') - - if files: - articles_json = files - else: - articles_json = get_jsons_from_dir(data) - - for filename in articles_json: - # name = filename.split('/')[-1] - # if name.startswith('opera'): - # click.echo('Skipping opera records ...') - # continue - - click.echo('Loading docs from {0} ...'.format(filename)) - with open(filename, 'rb') as source: - for data in json.load(source): - - # Replace body with responding content - assert data["body"]["content"] - content_filename = os.path.join( - *( - ["/", ] + - filename.split('/')[:-1] + - [data["body"]["content"], ] - ) - ) - - with open(content_filename) as body_field: - data["body"]["content"] = body_field.read() - if "collections" not in data and \ - not isinstance( - data.get("collections", None), basestring): - data["collections"] = [] - if mode == 'insert-or-replace': - try: - pid = PersistentIdentifier.get( - 'docid', str(slugify( - data.get('slug', data['title'])))) - if pid: - record = update_doc(pid, data) - action = 'updated' - except PIDDoesNotExistError: - record = create_doc(data, schema) - action = 'inserted' - elif mode == 'insert': - try: - pid = PersistentIdentifier.get( - 'docid', str(slugify( - data.get('slug', data['title'])))) - if pid: - click.echo( - 'Record docid {} exists already;' - ' cannot insert it. 
'.format( - str(slugify( - data.get('slug', data['title'])))), - err=True) - return - except PIDDoesNotExistError: - record = create_doc(data, schema) - action = 'inserted' - else: - try: - pid = PersistentIdentifier.get( - 'docid', str(slugify( - data.get('slug', data['title'])))) - except PIDDoesNotExistError: - click.echo( - 'Record docid {} does not exist; ' - 'cannot replace it.'.format( - str(slugify( - data.get('slug', data['title'])))), - err=True) - return - record = update_doc(pid, data) - action = 'updated' - record.commit() - db.session.commit() - click.echo( - ' Record docid {0} {1}.'.format( - str(slugify(data.get( - 'slug', data['title']))), action)) - indexer.index(record) - db.session.expunge_all() + """Load demo article records.""" + from slugify import slugify + + indexer = RecordIndexer() + schema = current_app.extensions['invenio-jsonschemas'].path_to_url( + 'records/docs-v1.0.0.json' + ) + data = pkg_resources.resource_filename('cernopendata', + 'modules/fixtures/data/docs') + + if files: + articles_json = files + else: + articles_json = get_jsons_from_dir(data) + + for filename in articles_json: + # name = filename.split('/')[-1] + # if name.startswith('opera'): + # click.echo('Skipping opera records ...') + # continue + + click.echo('Loading docs from {0} ...'.format(filename)) + with open(filename, 'rb') as source: + for data in json.load(source): + + # Replace body with responding content + assert data["body"]["content"] + content_filename = os.path.join( + *( + ["/", ] + + filename.split('/')[:-1] + + [data["body"]["content"], ] + ) + ) + + with open(content_filename) as body_field: + data["body"]["content"] = body_field.read() + if "collections" not in data and \ + not isinstance( + data.get("collections", None), basestring): + data["collections"] = [] + if mode == 'insert-or-replace': + try: + pid = PersistentIdentifier.get( + 'docid', str(slugify( + data.get('slug', data['title'])))) + if pid: + record = update_doc(pid, data) + action = 'updated' + except PIDDoesNotExistError: + record = create_doc(data, schema) + action = 'inserted' + elif mode == 'insert': + try: + pid = PersistentIdentifier.get( + 'docid', str(slugify( + data.get('slug', data['title'])))) + if pid: + click.echo( + 'Record docid {} exists already;' + ' cannot insert it. 
'.format(
+                                str(slugify(
+                                    data.get('slug', data['title'])))),
+                            err=True)
+                        return
+                except PIDDoesNotExistError:
+                    record = create_doc(data, schema)
+                    action = 'inserted'
+            else:
+                try:
+                    pid = PersistentIdentifier.get(
+                        'docid', str(slugify(
+                            data.get('slug', data['title']))))
+                except PIDDoesNotExistError:
+                    click.echo(
+                        'Record docid {} does not exist; '
+                        'cannot replace it.'.format(
+                            str(slugify(
+                                data.get('slug', data['title'])))),
+                        err=True)
+                    return
+                record = update_doc(pid, data)
+                action = 'updated'
+            record.commit()
+            db.session.commit()
+            click.echo(
+                ' Record docid {0} {1}.'.format(
+                    str(slugify(data.get(
+                        'slug', data['title']))), action))
+            indexer.index(record)
+    db.session.expunge_all()


@fixtures.command()
@with_appcontext
def pids():
-    """Fetch and register PIDs."""
-    from invenio_db import db
-    from invenio_oaiserver.fetchers import onaiid_fetcher
-    from invenio_oaiserver.minters import oaiid_minter
-    from invenio_pidstore.errors import PIDDoesNotExistError
-    from invenio_pidstore.models import PIDStatus, PersistentIdentifier
-    from invenio_pidstore.fetchers import recid_fetcher
-    from invenio_records.models import RecordMetadata
-
-    recids = [r.id for r in RecordMetadata.query.all()]
-    db.session.expunge_all()
-
-    with click.progressbar(recids) as bar:
-        for record_id in bar:
-            record = RecordMetadata.query.get(record_id)
-            try:
-                pid = recid_fetcher(record.id, record.json)
-                found = PersistentIdentifier.get(
-                    pid_type=pid.pid_type,
-                    pid_value=pid.pid_value,
-                    pid_provider=pid.provider.pid_provider
-                )
-                click.echo('Found {0}.'.format(found))
-            except PIDDoesNotExistError:
-                db.session.add(
-                    PersistentIdentifier.create(
-                        pid.pid_type, pid.pid_value,
-                        object_type='rec', object_uuid=record.id,
-                        status=PIDStatus.REGISTERED
-                    )
-                )
-            except KeyError:
-                click.echo('Skiped: {0}'.format(record.id))
-                continue
-
-            pid_value = record.json.get('_oai', {}).get('id')
-            if pid_value is None:
-                assert 'control_number' in record.json
-                pid_value = current_app.config.get(
-                    'OAISERVER_ID_PREFIX'
-                ) + str(record.json['control_number'])
-
-            record.json.setdefault('_oai', {})
-            record.json['_oai']['id'] = pid.pid_value
-
-            pid = oaiid_fetcher(record.id, record.json)
-            try:
-                found = PersistentIdentifier.get(
-                    pid_type=pid.pid_type,
-                    pid_value=pid.pid_value,
-                    pid_provider=pid.provider.pid_provider
-                )
-                click.echo('Found {0}.'.format(found))
-            except PIDDoesNotExistError:
-                pid = oaiid_minter(record.id, record.json)
-                db.session.add(pid)
-
-            flag_modified(record, 'json')
-            assert record.json['_oai']['id']
-            db.session.add(record)
-            db.session.commit()
-            db.session.expunge_all()
+    """Fetch and register PIDs."""
+    from invenio_db import db
+    from invenio_oaiserver.fetchers import oaiid_fetcher
+    from invenio_oaiserver.minters import oaiid_minter
+    from invenio_pidstore.errors import PIDDoesNotExistError
+    from invenio_pidstore.models import PIDStatus, PersistentIdentifier
+    from invenio_pidstore.fetchers import recid_fetcher
+    from invenio_records.models import RecordMetadata
+
+    recids = [r.id for r in RecordMetadata.query.all()]
+    db.session.expunge_all()
+
+    with click.progressbar(recids) as bar:
+        for record_id in bar:
+            record = RecordMetadata.query.get(record_id)
+            try:
+                pid = recid_fetcher(record.id, record.json)
+                found = PersistentIdentifier.get(
+                    pid_type=pid.pid_type,
+                    pid_value=pid.pid_value,
+                    pid_provider=pid.provider.pid_provider
+                )
+                click.echo('Found {0}.'.format(found))
+            except PIDDoesNotExistError:
+                db.session.add(
+                    PersistentIdentifier.create(
+                        pid.pid_type, pid.pid_value,
+                        object_type='rec', object_uuid=record.id,
+                        status=PIDStatus.REGISTERED
+                    )
+                )
+            except KeyError:
+                click.echo('Skipped: {0}'.format(record.id))
+                continue
+
+            pid_value = record.json.get('_oai', {}).get('id')
+            if pid_value is None:
+                assert 'control_number' in record.json
+                pid_value = current_app.config.get(
+                    'OAISERVER_ID_PREFIX'
+                ) + str(record.json['control_number'])
+
+            record.json.setdefault('_oai', {})
+            # Store the computed OAI identifier on the record.
+            record.json['_oai']['id'] = pid_value
+
+            pid = oaiid_fetcher(record.id, record.json)
+            try:
+                found = PersistentIdentifier.get(
+                    pid_type=pid.pid_type,
+                    pid_value=pid.pid_value,
+                    pid_provider=pid.provider.pid_provider
+                )
+                click.echo('Found {0}.'.format(found))
+            except PIDDoesNotExistError:
+                pid = oaiid_minter(record.id, record.json)
+                db.session.add(pid)
+
+            flag_modified(record, 'json')
+            assert record.json['_oai']['id']
+            db.session.add(record)
+            db.session.commit()
+            db.session.expunge_all()
+
+
+@fixtures.command()
+@with_appcontext
+def sipmetadata():
+    """Load SIP metadata types."""
+    data = [
+        {
+            "title": "CERN Open Data Record JSON",
+            "name": "record-json",
+            "format": "json",
+            "schema": current_app.extensions['invenio-jsonschemas']
+            .path_to_url('records/record-v1.0.0.json')
+        },
+        {
+            "title": "CERN Open Data Docs JSON",
+            "name": "docs-json",
+            "format": "json",
+            "schema": current_app.extensions['invenio-jsonschemas']
+            .path_to_url('records/docs-v1.0.0.json')
+        },
+        {
+            "title": "CERN Open Data Glossary JSON",
+            "name": "glossary-json",
+            "format": "json",
+            "schema": current_app.extensions['invenio-jsonschemas']
+            .path_to_url('records/glossary-term-v1.0.0.json')
+        },
+        {
+            "title": "BagIt Archiver metadata",
+            "name": "bagit",
+            "format": "json",
+            "schema": current_app.extensions['invenio-jsonschemas']
+            .path_to_url('sipstore/bagit-v1.0.0.json')
+        }
+    ]
+
+    click.secho('Loading SIP metadata types...', fg='blue')
+    with click.progressbar(data) as types:
+        with db.session.begin_nested():
+            for type in types:
+                db.session.add(SIPMetadataType(**type))
+    db.session.commit()
+    click.secho('SIP metadata types loaded!', fg='green')
diff --git a/cernopendata/modules/fixtures/data/example_records/test_sip_record_1.json b/cernopendata/modules/fixtures/data/example_records/test_sip_record_1.json
new file mode 100644
index 0000000000..55be59c401
--- /dev/null
+++ b/cernopendata/modules/fixtures/data/example_records/test_sip_record_1.json
@@ -0,0 +1,83 @@
+[
+  {
+    "abstract": {
+      "description": "Sample event set from /BTag/Run2011A-12Oct2013-v1/AOD primary dataset in json format readable from the browser-based 3d event display"
+    },
+    "accelerator": "CERN-LHC",
+    "authors": [
+      {
+        "name": "McCauley, Thomas"
+      }
+    ],
+    "collections": [
+      "CMS-Derived-Datasets"
+    ],
+    "collision_information": {
+      "energy": "7TeV",
+      "type": "pp"
+    },
+    "date_created": [
+      "2011"
+    ],
+    "date_published": "2016",
+    "distribution": {
+      "formats": [
+        "ig"
+      ],
+      "number_events": 25
+    },
+    "doi": "10.7483/OPENDATA.CMS.BK3T.NKC6",
+    "experiment": "CMS",
+    "files": [
+      {
+        "checksum": "sha1:a100d93d3b27def7954fa5827e9717ab4ec407cc",
+        "size": 2411180,
+        "uri": "root://eospublic.cern.ch//eos/opendata/cms/Run2011A/BTag/IG/12Oct2013-v1/BTag.ig"
+      }
+    ],
+    "methodology": {
+      "description": "These files contain the objects to be displayed with the online event display. No event selection, apart accepting only the validated runs, is applied. 
The software to produce these files is available in:", + "links": [ + { + "recid": "550" + } + ] + }, + "note": { + "description": "No selection or quality criteria have been applied on the individual physics objects, apart from accepting only the validated runs" + }, + "publisher": "CERN Open Data Portal", + "recid": "614", + "relations": [ + { + "doi": "10.7483/OPENDATA.CMS.N372.QF6S", + "recid": "15", + "title": "/BTag/Run2011A-12Oct2013-v1/AOD", + "type": "isChildOf" + } + ], + "run_period": [ + "Run2011A" + ], + "system_details": { + "global_tag": "FT_53_LV5_AN1::All", + "release": "CMSSW_5_3_30" + }, + "title": "Event display file derived from /BTag/Run2011A-12Oct2013-v1/AOD", + "type": { + "primary": "Dataset", + "secondary": [ + "Derived" + ] + }, + "usage": { + "description": "The data can be accessed from the file menu of the online event display", + "links": [ + { + "description": "Explore and visualise events", + "url": "/visualise/events/CMS" + } + ] + } + } +] \ No newline at end of file diff --git a/cernopendata/modules/fixtures/data/example_records/test_sip_record_1_with_meta_changes.json b/cernopendata/modules/fixtures/data/example_records/test_sip_record_1_with_meta_changes.json new file mode 100644 index 0000000000..6a9aa3cda3 --- /dev/null +++ b/cernopendata/modules/fixtures/data/example_records/test_sip_record_1_with_meta_changes.json @@ -0,0 +1,83 @@ +[ + { + "abstract": { + "description": "Sample event set from /BTag/Run2011A-12Oct2013-v1/AOD primary dataset in json format readable from the browser-based 3d event display" + }, + "accelerator": "CERN-LHC", + "authors": [ + { + "name": "McCauley, Thomas" + } + ], + "collections": [ + "CMS-Derived-Datasets" + ], + "collision_information": { + "energy": "7TeV", + "type": "pp" + }, + "date_created": [ + "2011" + ], + "date_published": "2016", + "distribution": { + "formats": [ + "ig" + ], + "number_events": 25 + }, + "doi": "10.7483/OPENDATA.CMS.BK3T.NKC6", + "experiment": "CMS", + "files": [ + { + "checksum": "sha1:a100d93d3b27def7954fa5827e9717ab4ec407cc", + "size": 2411180, + "uri": "root://eospublic.cern.ch//eos/opendata/cms/Run2011A/BTag/IG/12Oct2013-v1/BTag.ig" + } + ], + "methodology": { + "description": "These files contain the objects to be displayed with the online event display. No event selection, apart accepting only the validated runs, is applied. 
The software to produce these files is available in:", + "links": [ + { + "recid": "550" + } + ] + }, + "note": { + "description": "No selection or quality criteria have been applied on the individual physics objects, apart from accepting only the validated runs" + }, + "publisher": "CERN Open Data Portal", + "recid": "614", + "relations": [ + { + "doi": "10.7483/OPENDATA.CMS.N372.QF6S", + "recid": "15", + "title": "/BTag/Run2011A-12Oct2013-v1/AOD", + "type": "isChildOf" + } + ], + "run_period": [ + "Run2011A" + ], + "system_details": { + "global_tag": "FT_53_LV5_AN1::All", + "release": "CMSSW_5_3_30" + }, + "title": "Test Event display file derived from /BTag/Run2011A-12Oct2013-v1/AOD", + "type": { + "primary": "Dataset", + "secondary": [ + "Derived" + ] + }, + "usage": { + "description": "The data can be accessed from the file menu of the online event display", + "links": [ + { + "description": "Explore and visualise events", + "url": "/visualise/events/CMS" + } + ] + } + } +] \ No newline at end of file diff --git a/cernopendata/modules/fixtures/data/example_records/test_sip_record_no_files.json b/cernopendata/modules/fixtures/data/example_records/test_sip_record_no_files.json new file mode 100644 index 0000000000..e54b531f79 --- /dev/null +++ b/cernopendata/modules/fixtures/data/example_records/test_sip_record_no_files.json @@ -0,0 +1,76 @@ +[ + { + "abstract": { + "description": "Sample event set from /BTag/Run2011A-12Oct2013-v1/AOD primary dataset in json format readable from the browser-based 3d event display" + }, + "accelerator": "CERN-LHC", + "authors": [ + { + "name": "McCauley, Thomas" + } + ], + "collections": [ + "CMS-Derived-Datasets" + ], + "collision_information": { + "energy": "7TeV", + "type": "pp" + }, + "date_created": [ + "2011" + ], + "date_published": "2016", + "distribution": { + "formats": [ + "ig" + ], + "number_events": 25 + }, + "doi": "10.7483/OPENDATA.CMS.BK3T.NKC6", + "experiment": "CMS", + "methodology": { + "description": "These files contain the objects to be displayed with the online event display. No event selection, apart accepting only the validated runs, is applied. 
The software to produce these files is available in:", + "links": [ + { + "recid": "550" + } + ] + }, + "note": { + "description": "No selection or quality criteria have been applied on the individual physics objects, apart from accepting only the validated runs" + }, + "publisher": "CERN Open Data Portal", + "recid": "614", + "relations": [ + { + "doi": "10.7483/OPENDATA.CMS.N372.QF6S", + "recid": "15", + "title": "/BTag/Run2011A-12Oct2013-v1/AOD", + "type": "isChildOf" + } + ], + "run_period": [ + "Run2011A" + ], + "system_details": { + "global_tag": "FT_53_LV5_AN1::All", + "release": "CMSSW_5_3_30" + }, + "title": "Event display file derived from /BTag/Run2011A-12Oct2013-v1/AOD", + "type": { + "primary": "Dataset", + "secondary": [ + "Derived" + ] + }, + "usage": { + "description": "The data can be accessed from the file menu of the online event display", + "links": [ + { + "description": "Explore and visualise events", + "url": "/visualise/events/CMS" + } + ] + } + } +] \ No newline at end of file diff --git a/cernopendata/modules/fixtures/data/example_records/test_sip_record_with_index_files.json b/cernopendata/modules/fixtures/data/example_records/test_sip_record_with_index_files.json new file mode 100644 index 0000000000..55f7aae433 --- /dev/null +++ b/cernopendata/modules/fixtures/data/example_records/test_sip_record_with_index_files.json @@ -0,0 +1,72 @@ +[ + { + "abstract": { + "description": "Proton-Proton ESD data sample at the collision energy of 7 TeV from RunB of 2010. Run period from run number 117222." + }, + "accelerator": "CERN-LHC", + "collaboration": { + "name": "ALICE Collaboration" + }, + "collections": [ + "ALICE-Reconstructed-Data" + ], + "collision_information": { + "energy": "7TeV", + "type": "pp" + }, + "date_created": [ + "2010" + ], + "date_published": "2016", + "distribution": { + "formats": [ + "root" + ], + "number_events": 8968, + "number_files": 2, + "size": 484934275 + }, + "doi": "10.7483/OPENDATA.ALICE.LP3F.G4KR", + "experiment": "ALICE", + "files": [ + { + "checksum": "adler32:f93d713e", + "size": 414, + "type": "index.json", + "uri": "root://eospublic.cern.ch//eos/opendata/alice/2010/LHC10b/000117222/ESD/file-indexes/ALICE_LHC10b_pp_ESD_117222_file_index.json" + }, + { + "checksum": "adler32:ed8736e4", + "size": 178, + "type": "index.txt", + "uri": "root://eospublic.cern.ch//eos/opendata/alice/2010/LHC10b/000117222/ESD/file-indexes/ALICE_LHC10b_pp_ESD_117222_file_index.txt" + } + ], + "license": { + "attribution": "CC0" + }, + "publisher": "CERN Open Data Portal", + "recid": "1100", + "title": "LHC2010b_pp_ESD_117222 (new)", + "title_additional": "Proton-Proton data sample at the collision energy of 7 TeV from run number 117222", + "type": { + "primary": "Dataset", + "secondary": [ + "Collision" + ] + }, + "usage": { + "description": "You can access and analyse these data through the ALICE Virtual Machine. Please follow the instructions for getting started and setting up the Virtual Machine:", + "links": [ + { + "description": "Getting started with ALICE data", + "url": "/getting-started/ALICE" + }, + { + "description": "How to install the ALICE Virtual Machine", + "url": "/VM/ALICE" + } + ] + } + } +] \ No newline at end of file diff --git a/cernopendata/modules/fixtures/sip_utils.py b/cernopendata/modules/fixtures/sip_utils.py new file mode 100644 index 0000000000..229963e2ff --- /dev/null +++ b/cernopendata/modules/fixtures/sip_utils.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# +# This file is part of CERN Open Data Portal. 
+# Copyright (C) 2017, 2018 CERN.
+#
+# CERN Open Data Portal is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# CERN Open Data Portal is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Invenio; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+"""SIP utils for CERN Open Data Portal."""
+
+from __future__ import absolute_import, print_function
+
+import json
+
+from invenio_db import db
+from invenio_sipstore.models import SIP as SIPModel, \
+    RecordSIP as RecordSIPModel
+from invenio_sipstore.api import RecordSIP, SIP as SIPApi
+
+from cernopendata.modules.sipstore.archivers.bagit_archiver import \
+    CODBagItArchiver
+
+
+class ArchivingError(Exception):
+    """Represents an error that can occur while archiving a SIP."""
+
+
+def handle_sipstore_record_file_index(f):
+    """Read a file index and build SIP file entries for its contents."""
+    # [TOFIX] Fetch the real file from EOS; needs a rebase of the local
+    # invenio-files-rest fork.
+    json_str = f.storage(create_dir=False).open().read()
+    json_index = json.loads(json_str)
+
+    content = []
+
+    for _file in json_index:
+        content += handle_sipstore_record_file(_file)
+
+    return content
+
+
+def handle_sipstore_record_file(file, filename=None):
+    """Build the SIP file entries for a single index entry."""
+    return [handle_sipstore_record_file_json(file, filename)]
+    # return handle_sipstore_record_file_txt(file, filename)
+
+
+def handle_sipstore_record_file_txt(file, filename=None):
+    """Render a single index entry as a plain-text manifest line."""
+    _filename = filename or file.get("filename", None)
+    content = "{} - {} - {} - {} - {}\n".format(
+        file.get("uri", None),
+        file.get("size", None),
+        "data/{}".format(_filename),
+        file.get("checksum", None),
+        _filename
+    )
+
+    return content
+
+
+def handle_sipstore_record_file_json(file, filename=None):
+    """Render a single index entry as a SIP file dictionary."""
+    _filename = filename or file.get("filename", None)
+    return dict(
+        uri=file.get("uri", None),
+        size=file.get("size", None),
+        checksum=file.get("checksum", None),
+        filepath="data/{}".format(_filename),
+        filename=_filename,
+        # _fetch=True
+    )
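+
+
+# Illustration (assumed values, not real fixture data): each entry that
+# handle_sipstore_record_file_json() adds to ``files_content``, and that
+# CODBagItArchiver consumes below, has the shape:
+#
+#     {'uri': 'root://eospublic.cern.ch//eos/opendata/.../0001.root',
+#      'size': 242367820,
+#      'checksum': 'adler32:b52f2e4a',
+#      'filepath': 'data/0001.root',
+#      'filename': '0001.root'}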
+
+
+def sip_record(recid, record, files_content, inserted_or_updated):
+    """Create a SIP for the record and archive it as a BagIt bag."""
+    sip_patch_of = None
+
+    if inserted_or_updated == "updated":
+        sip_recid = recid
+
+        sip_patch_of = (
+            db.session.query(SIPModel)
+            .join(RecordSIPModel, RecordSIPModel.sip_id == SIPModel.id)
+            .filter(RecordSIPModel.pid_id == sip_recid.id)
+            .order_by(SIPModel.created.desc())
+            .first()
+        )
+
+    recordsip = RecordSIP.create(
+        recid,
+        record, archivable=True,
+        create_sip_files=True,
+        # sip_metadata_type='record-json',
+        user_id=None,
+        agent={}
+        # agent=sip_agent
+    )
+
+    archiver = CODBagItArchiver(
+        recordsip.sip, include_all_previous=False,
+        patch_of=sip_patch_of, files_content=files_content
+    )
+
+    archiver.save_bagit_metadata()
+
+    sip = (
+        RecordSIPModel.query
+        .filter_by(pid_id=recid.id)
+        .order_by(RecordSIPModel.created.desc())
+        .first().sip
+    )
+
+    # archive_sip.delay(str(sip.id))
+    archive_sip(str(sip.id))
+
+
+# @shared_task(ignore_result=True, max_retries=6,
+#              default_retry_delay=4 * 60 * 60)
+def archive_sip(sip_uuid):
+    """Send the SIP for archiving.
+
+    Retries every 4 hours, six times, which should work for up to 24 hours
+    archiving system downtime.
+
+    :param sip_uuid: UUID of the SIP for archiving.
+    :type sip_uuid: str
+    """
+    try:
+        sip = SIPApi(SIPModel.query.get(sip_uuid))
+        archiver = CODBagItArchiver(sip)
+        bagmeta = archiver.get_bagit_metadata(sip)
+        if bagmeta is None:
+            raise ArchivingError(
+                'Bagit metadata does not exist for SIP: {0}.'.format(sip.id))
+        if sip.archived:
+            raise ArchivingError(
+                'SIP was already archived {0}.'.format(sip.id))
+        archiver.write_all_files()
+        sip.archived = True
+        db.session.commit()
+    except Exception as exc:
+        # On ArchivingError (see above), do not retry, but re-raise.
+        # Note: retry() only works once the @shared_task decorator above
+        # is re-enabled.
+        if not isinstance(exc, ArchivingError):
+            archive_sip.retry(exc=exc)
+        raise
diff --git a/cernopendata/modules/sipstore/__init__.py b/cernopendata/modules/sipstore/__init__.py
new file mode 100644
index 0000000000..56c503b5c9
--- /dev/null
+++ b/cernopendata/modules/sipstore/__init__.py
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -*-
+
+"""CERN Open Data SIPStore."""
+
+from __future__ import absolute_import, print_function
diff --git a/cernopendata/modules/sipstore/archivers/__init__.py b/cernopendata/modules/sipstore/archivers/__init__.py
new file mode 100644
index 0000000000..56c503b5c9
--- /dev/null
+++ b/cernopendata/modules/sipstore/archivers/__init__.py
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -*-
+
+"""CERN Open Data SIPStore."""
+
+from __future__ import absolute_import, print_function
diff --git a/cernopendata/modules/sipstore/archivers/bagit_archiver.py b/cernopendata/modules/sipstore/archivers/bagit_archiver.py
new file mode 100644
index 0000000000..ae872a1bc8
--- /dev/null
+++ b/cernopendata/modules/sipstore/archivers/bagit_archiver.py
@@ -0,0 +1,115 @@
+import os
+import re
+import zlib
+
+from invenio_files_rest.models import FileInstance
+from invenio_sipstore.archivers import BagItArchiver
+from werkzeug.utils import secure_filename
+
+
+class CODBagItArchiver(BagItArchiver):
+    """BagIt archiver for CERN Open Data SIPs."""
+
+    def __init__(self, sip, data_dir='data/files',
+                 metadata_dir='data/metadata', extra_dir='', patch_of=None,
+                 include_all_previous=False, tags=None,
+                 filenames_mapping_file='data/filenames.txt',
+                 files_content=None):
+        """Initialize the archiver.
+
+        :param files_content: file entries collected from index files that
+            should be listed in the bag next to the SIP's own files.
+        """
+        # Default to an empty list so that _get_data_files() can always
+        # iterate over it (archive_sip constructs the archiver without
+        # passing files_content).
+        self.files_content = files_content or []
+
+        super(CODBagItArchiver, self).__init__(
+            sip, data_dir=data_dir, metadata_dir=metadata_dir,
+            extra_dir=extra_dir, patch_of=patch_of,
+            include_all_previous=include_all_previous, tags=tags,
+            filenames_mapping_file=filenames_mapping_file)
+
+    def _calculate_checksum(self, content):
+        """Calculate the checksum of the given content."""
+        ad32 = hex(zlib.adler32(content, 1) & 0xffffffff)[2:]
+
+        return "{}:{}".format(self.checksum_algorithm, str(ad32))
+
+    def _get_checksum(self, checksum, checksum_algorithm=None):
+        """Return the checksum if the type is the expected."""
+        _checksum_algorithm = checksum_algorithm if checksum_algorithm \
+            else self.checksum_algorithm
+
+        checksum = checksum.split(':')
+        if checksum[0] != _checksum_algorithm or len(checksum) != 2:
+            raise AttributeError('Checksum format is not correct.')
+        else:
+            return checksum[1]
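+
+    # Example (assumed input, for illustration): with
+    # SIPSTORE_CHECKSUM_ALGORITHM = 'adler32' configured in config.py,
+    # _calculate_checksum(b'data') computes zlib.adler32(b'data', 1)
+    # == 67109275 (0x400019b) and returns 'adler32:400019b';
+    # _get_checksum('adler32:400019b') then yields '400019b'.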
+
+    def _get_data_files(self):
+        """Get the file information for all the data files.
+
+        The structure is defined by the JSON Schema
+        ``sipstore/file-v1.0.0.json``.
+
+        :return: list of dict containing file information.
+        """
+        files = []
+        for f in self.sip.files:
+            files.append(self._generate_sipfile_info(f))
+
+        for f in self.files_content:
+            filename = secure_filename(f.get("filename"))
+
+            filepath = re.sub(
+                'root://eospublic.cern.ch//eos/opendata/', '',
+                f.get("uri"))
+            filepath = os.path.join(self.data_dir, filepath)
+
+            files.append(
+                dict(
+                    checksum=f.get("checksum"),
+                    size=f.get("size"),
+                    filepath=filepath,
+                    fullpath=f.get("uri"),
+                    sipfilepath=filename,
+                    filename=f.get("filename"),
+                    fetched=True,
+                )
+            )
+
+        return files
+
+    def _write_sipfile(self, fileinfo=None, sipfile=None):
+        """Write a SIP file to disk.
+
+        **Requires** either `fileinfo` or `sipfile` to be passed.
+
+        Pass either `fileinfo` with the file information (the 'file_uuid'
+        key is required) or `sipfile`, the
+        :py:data:`~invenio_sipstore.models.SIPFile` instance, in which case
+        the relevant file information will be generated on the spot.
+
+        :param fileinfo: File information on the SIPFile that is to be
+            written.
+        :type fileinfo: dict
+        :param sipfile: SIP file to be written.
+        :type sipfile: ``invenio_sipstore.models.SIPFile``
+        """
+        assert fileinfo or sipfile
+        if not fileinfo:
+            fileinfo = self._generate_sipfile_info(sipfile)
+        if sipfile:
+            fi = sipfile.file
+        else:
+            fi = FileInstance.query.get(fileinfo['file_uuid'])
+        sf = self.storage_factory(fileurl=fileinfo['fullpath'],
+                                  size=fileinfo['size'],
+                                  modified=fi.updated)
+        return sf.copy(fi.storage(create_dir=False))
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 569ff77811..4215954ebf 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -64,6 +64,7 @@ services:
    volumes:
      - ./cernopendata:/code/cernopendata
      - ./scripts:/code/scripts
+      - ./archive:/code/var/archive
    volumes_from:
      - static
    links:
diff --git a/requirements-dev.txt b/requirements-dev.txt
index c5fe49407d..652f28f862 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,2 +1 @@
wdb
-ipdb
diff --git a/requirements-production-local-forks.txt b/requirements-production-local-forks.txt
index 898febf177..78ff3e9639 100644
--- a/requirements-production-local-forks.txt
+++ b/requirements-production-local-forks.txt
@@ -1,3 +1,4 @@
git+https://github.com/lepture/mistune-contrib.git#egg=mistune-contrib
-git+https://github.com/pamfilos/invenio-files-rest.git@eos-storage#egg=invenio-files-rest
+git+https://github.com/pamfilos/invenio-files-rest.git@eos-storage-latest#egg=invenio-files-rest
git+https://github.com/pamfilos/invenio-xrootd.git@eos-storage#egg=invenio-xrootd
+git+https://github.com/pamfilos/invenio-sipstore.git@ad32#egg=invenio-sipstore
diff --git a/requirements-production.txt b/requirements-production.txt
index 0f213695cf..9104be21bc 100644
--- a/requirements-production.txt
+++ b/requirements-production.txt
@@ -33,7 +33,7 @@ Flask-Alembic==2.0.1
Flask-Assets==0.12
Flask-BabelEx==0.9.3
Flask-Breadcrumbs==0.4.0
-Flask-CeleryExt==0.3.0
+Flask-CeleryExt==0.3.1
Flask-Collect==1.2.2
Flask-Cors==3.0.3
Flask-Login==0.4.0
@@ -53,18 +53,18 @@ invenio-assets==1.0.0b7
invenio-base==1.0.0a16
invenio-celery==1.0.0b3
invenio-config==1.0.0b3
-invenio-db==1.0.0b8
-invenio-i18n==1.0.0b4
+invenio-db==1.0.0
+invenio-i18n==1.0.0
invenio-indexer==1.0.0a10
invenio-jsonschemas==1.0.0a5
-invenio-pidstore==1.0.0b2
+invenio-pidstore==1.0.0
invenio-previewer==1.0.0a11
invenio-query-parser==0.6.0
invenio-records==1.0.0b2
invenio-records-files==1.0.0a10
invenio-records-rest==1.0.0b1
invenio-records-ui==1.0.0b1
-invenio-rest==1.0.0b2
+invenio-rest==1.0.0 invenio-search==1.0.0a11 invenio-search-ui==1.0.0a9 invenio-theme==1.0.0b2 @@ -106,6 +106,7 @@ pluggy==0.5.2 poyo==0.4.1 prompt-toolkit==1.0.15 psycopg2==2.7.3.2 +psycopg2-binary==2.7.4 ptyprocess==0.5.2 py==1.4.34 pydocstyle==2.1.1 @@ -152,3 +153,4 @@ Werkzeug==0.14.1 whichcraft==0.4.1 WTForms==2.1 xrootdpyfs==0.1.5 + diff --git a/scripts/populate-instance.sh b/scripts/populate-instance.sh index 80fb137163..c60be5b2c6 100755 --- a/scripts/populate-instance.sh +++ b/scripts/populate-instance.sh @@ -31,6 +31,9 @@ cernopendata index init sleep 20 cernopendata files location local var/data --default +cernopendata files location archive var/archive + +cernopendata fixtures sipmetadata cernopendata fixtures glossary diff --git a/setup.py b/setup.py index 91f1b87c9a..9d65da7a82 100644 --- a/setup.py +++ b/setup.py @@ -83,7 +83,7 @@ 'invenio-db[versioning,postgresql]>=1.0.0b3', 'invenio-i18n>=1.0.0b1', 'invenio-indexer>=1.0.0a1', - 'invenio-jsonschemas==1.0.0a5', + # 'invenio-jsonschemas==1.0.0a5', 'invenio-logging==1.0.0b3', 'invenio-pidstore>=1.0.0b1', 'invenio-previewer>=1.0.0a11', @@ -93,6 +93,7 @@ 'invenio-records==1.0.0b2', 'invenio-search-ui>=1.0.0a2', 'invenio-search>=1.0.0a9', + 'invenio-sipstore>=1.0.0a7', 'invenio-theme==1.0.0b2', 'invenio-xrootd>=1.0.0a4', 'mistune>=0.7.4',
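
# Example usage (a sketch, following scripts/populate-instance.sh; paths
# refer to the fixture files added by this patch):
#
#   cernopendata files location archive var/archive
#   cernopendata fixtures sipmetadata
#   cernopendata fixtures records --mode insert \
#       -f cernopendata/modules/fixtures/data/example_records/test_sip_record_1.json
#   cernopendata fixtures records --mode replace \
#       -f cernopendata/modules/fixtures/data/example_records/test_sip_record_1_with_meta_changes.json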