From b2492e2869127d820499de8e6f8d421fbf17166b Mon Sep 17 00:00:00 2001 From: Casper Welzel Andersen <43357585+CasperWA@users.noreply.github.com> Date: Thu, 3 Oct 2019 13:12:44 +0200 Subject: [PATCH] Fix the validation of links in archive import (#3354) The import logic imports the links after all the nodes have been imported. The link validation, however, was incorrect. It included link types only implicitly and `CALL` links were not even considered. Instead of the convoluted nested try-except blocks, the `validate_link` utility from `aiida.orm.utils.links` is now used. This might not be the most efficient way, since it goes through the front-end ORM, but at least we know it is correct. Unfortunately, for Django, the validation cannot be done through the front-end infrastructure because the query builder will go through SqlAlchemy, but the nodes that have been added during the import will not be present in it, as they are managed by the Django transaction. The only solution then is to perform the link validation manually. 
--- .../tools/importexport/orm/test_links.py | 85 +++++++++ aiida/orm/utils/links.py | 12 +- .../dbimport/backends/django/__init__.py | 161 ++++++++++++------ .../dbimport/backends/sqla/__init__.py | 101 ++++------- 4 files changed, 235 insertions(+), 124 deletions(-) diff --git a/aiida/backends/tests/tools/importexport/orm/test_links.py b/aiida/backends/tests/tools/importexport/orm/test_links.py index 51cc859ce3..4e169bb3d2 100644 --- a/aiida/backends/tests/tools/importexport/orm/test_links.py +++ b/aiida/backends/tests/tools/importexport/orm/test_links.py @@ -675,3 +675,88 @@ def test_dangling_link_to_existing_db_node(self, temp_dir): msg='There should be a single StructureData, instead {} has been found'.format(builder.count()) ) self.assertEqual(builder.all()[0][0], struct_uuid) + + @with_temp_dir + def test_multiple_post_return_links(self, temp_dir): # pylint: disable=too-many-locals + """Check extra RETURN links can be added to existing Nodes, when label is not unique""" + data = orm.Int(1).store() + calc = orm.CalculationNode().store() + work = orm.WorkflowNode().store() + link_label = 'output_data' + + data.add_incoming(calc, LinkType.CREATE, link_label) + data.add_incoming(work, LinkType.RETURN, link_label) + + calc.seal() + work.seal() + + data_uuid = data.uuid + calc_uuid = calc.uuid + work_uuid = work.uuid + before_links = get_all_node_links() + + data_provenance = os.path.join(temp_dir, 'data.aiida') + all_provenance = os.path.join(temp_dir, 'all.aiida') + + export([data], outfile=data_provenance, silent=True, return_backward=False) + export([data], outfile=all_provenance, silent=True, return_backward=True) + + self.reset_database() + + # import data provenance + import_data(data_provenance, silent=True) + + no_of_work = orm.QueryBuilder().append(orm.WorkflowNode).count() + self.assertEqual( + no_of_work, 0, msg='{} WorkflowNode(s) was/were found, however, none should be present'.format(no_of_work) + ) + + nodes = orm.QueryBuilder().append(orm.Node, 
project='uuid') + self.assertEqual( + nodes.count(), + 2, + msg='{} Node(s) was/were found, however, exactly two should be present'.format(no_of_work) + ) + for node in nodes.iterall(): + self.assertIn(node[0], [data_uuid, calc_uuid]) + + links = get_all_node_links() + self.assertEqual( + len(links), + 1, + msg='Only a single Link (from Calc. to Data) is expected, ' + 'instead {} were found (in, out, label, type): {}'.format(len(links), links) + ) + for from_uuid, to_uuid, found_label, found_type in links: + self.assertEqual(from_uuid, calc_uuid) + self.assertEqual(to_uuid, data_uuid) + self.assertEqual(found_label, link_label) + self.assertEqual(found_type, LinkType.CREATE.value) + + # import data+logic provenance + import_data(all_provenance, silent=True) + + no_of_work = orm.QueryBuilder().append(orm.WorkflowNode).count() + self.assertEqual( + no_of_work, + 1, + msg='{} WorkflowNode(s) was/were found, however, exactly one should be present'.format(no_of_work) + ) + + nodes = orm.QueryBuilder().append(orm.Node, project='uuid') + self.assertEqual( + nodes.count(), + 3, + msg='{} Node(s) was/were found, however, exactly three should be present'.format(no_of_work) + ) + for node in nodes.iterall(): + self.assertIn(node[0], [data_uuid, calc_uuid, work_uuid]) + + links = get_all_node_links() + self.assertEqual( + len(links), + 2, + msg='Exactly two Links are expected, instead {} were found ' + '(in, out, label, type): {}'.format(len(links), links) + ) + self.assertListEqual(sorted(links), sorted(before_links)) diff --git a/aiida/orm/utils/links.py b/aiida/orm/utils/links.py index 52c3909626..1bc8143077 100644 --- a/aiida/orm/utils/links.py +++ b/aiida/orm/utils/links.py @@ -157,17 +157,17 @@ def validate_link(source, target, link_type, link_label): # or stored, in which case, the new proposed link is a duplicate and thus illegal duplicate_link_triple = link_triple_exists(source, target, link_type, link_label) - # If the outdegree is `unique` there cannot already be 
any other incoming links of that type + # If the outdegree is `unique` there cannot already be any other outgoing link of that type if outdegree == 'unique' and source.get_outgoing(link_type=link_type, only_uuid=True).all(): raise ValueError('node<{}> already has an outgoing {} link'.format(source.uuid, link_type)) - # If the outdegree is `unique_pair` than the link labels for outgoing links of this type should be unique + # If the outdegree is `unique_pair`, then the link labels for outgoing links of this type should be unique elif outdegree == 'unique_pair' and source.get_outgoing( link_type=link_type, only_uuid=True, link_label_filter=link_label).all(): raise ValueError('node<{}> already has an outgoing {} link with label "{}"'.format( - target.uuid, link_type, link_label)) + source.uuid, link_type, link_label)) - # If the outdegree is `unique_triple` than the link triples of link type, link label and target should be unique + # If the outdegree is `unique_triple`, then the link triples of link type, link label and target should be unique elif outdegree == 'unique_triple' and duplicate_link_triple: raise ValueError('node<{}> already has an outgoing {} link with label "{}" from node<{}>'.format( source.uuid, link_type, link_label, target.uuid)) @@ -176,13 +176,13 @@ def validate_link(source, target, link_type, link_label): if indegree == 'unique' and target.get_incoming(link_type=link_type, only_uuid=True).all(): raise ValueError('node<{}> already has an incoming {} link'.format(target.uuid, link_type)) - # If the indegree is `unique_pair` than the link labels for incoming links of this type should be unique + # If the indegree is `unique_pair`, then the link labels for incoming links of this type should be unique elif indegree == 'unique_pair' and target.get_incoming( link_type=link_type, link_label_filter=link_label, only_uuid=True).all(): raise ValueError('node<{}> already has an incoming {} link with label "{}"'.format( target.uuid, link_type, link_label)) - 
# If the indegree is `unique_triple` than the link triples of link type, link label and source should be unique + # If the indegree is `unique_triple`, then the link triples of link type, link label and source should be unique elif indegree == 'unique_triple' and duplicate_link_triple: raise ValueError('node<{}> already has an incoming {} link with label "{}" from node<{}>'.format( target.uuid, link_type, link_label, source.uuid)) diff --git a/aiida/tools/importexport/dbimport/backends/django/__init__.py b/aiida/tools/importexport/dbimport/backends/django/__init__.py index 26c3ec67b6..c35388bfa9 100644 --- a/aiida/tools/importexport/dbimport/backends/django/__init__.py +++ b/aiida/tools/importexport/dbimport/backends/django/__init__.py @@ -22,8 +22,8 @@ import six from aiida.common import timezone, json -from aiida.common.links import LinkType from aiida.common.folders import SandboxFolder, RepositoryFolder +from aiida.common.links import LinkType, validate_link_label from aiida.common.utils import grouper, get_object_from_string from aiida.orm.utils.repository import Repository from aiida.orm import QueryBuilder, Node, Group @@ -490,7 +490,7 @@ def import_data_dj( just_saved = {str(key): value for key, value in just_saved_queryset} # Now I have the PKs, print the info - # Moreover, set the foreign_ids_reverse_mappings + # Moreover, add newly created Nodes to foreign_ids_reverse_mappings for unique_id, new_pk in just_saved.items(): import_entry_pk = import_new_entry_pks[unique_id] foreign_ids_reverse_mappings[model_name][unique_id] = new_pk @@ -503,73 +503,130 @@ def import_data_dj( if not silent: print('STORING NODE LINKS...') - ## TODO: check that we are not creating input links of an already - ## existing node... 
import_links = data['links_uuid'] links_to_store = [] - # Needed for fast checks of existing links + # Needed, since QueryBuilder does not yet work for recently saved Nodes existing_links_raw = models.DbLink.objects.all().values_list('input', 'output', 'label', 'type') - existing_links_labels = {(l[0], l[1]): l[2] for l in existing_links_raw} - existing_input_links = {(l[1], l[2]): l[0] for l in existing_links_raw} + existing_links = {(l[0], l[1], l[2], l[3]) for l in existing_links_raw} + existing_outgoing_unique = {(l[0], l[3]) for l in existing_links_raw} + existing_outgoing_unique_pair = {(l[0], l[2], l[3]) for l in existing_links_raw} + existing_incoming_unique = {(l[1], l[3]) for l in existing_links_raw} + existing_incoming_unique_pair = {(l[1], l[2], l[3]) for l in existing_links_raw} + + calculation_node_types = 'process.calculation.' + workflow_node_types = 'process.workflow.' + data_node_types = 'data.' + + link_mapping = { + LinkType.CALL_CALC: (workflow_node_types, calculation_node_types, 'unique_triple', 'unique'), + LinkType.CALL_WORK: (workflow_node_types, workflow_node_types, 'unique_triple', 'unique'), + LinkType.CREATE: (calculation_node_types, data_node_types, 'unique_pair', 'unique'), + LinkType.INPUT_CALC: (data_node_types, calculation_node_types, 'unique_triple', 'unique_pair'), + LinkType.INPUT_WORK: (data_node_types, workflow_node_types, 'unique_triple', 'unique_pair'), + LinkType.RETURN: (workflow_node_types, data_node_types, 'unique_pair', 'unique_triple'), + } - # ~ print(foreign_ids_reverse_mappings) - dbnode_reverse_mappings = foreign_ids_reverse_mappings[NODE_ENTITY_NAME] for link in import_links: + # Check for dangling Links within the, supposed, self-consistent archive try: - in_id = dbnode_reverse_mappings[link['input']] - out_id = dbnode_reverse_mappings[link['output']] + in_id = foreign_ids_reverse_mappings[NODE_ENTITY_NAME][link['input']] + out_id = foreign_ids_reverse_mappings[NODE_ENTITY_NAME][link['output']] except KeyError: 
if ignore_unknown_nodes: continue else: raise exceptions.ImportValidationError( 'Trying to create a link with one or both unknown nodes, stopping (in_uuid={}, ' - 'out_uuid={}, label={})'.format(link['input'], link['output'], link['label']) + 'out_uuid={}, label={}, type={})'.format( + link['input'], link['output'], link['label'], link['type'] + ) ) + # Check if link already exists, skip if it does + # This is equivalent to an existing triple link (i.e. unique_triple from below) + if (in_id, out_id, link['label'], link['type']) in existing_links: + continue + + # Since backend specific Links (DbLink) are not validated upon creation, we will now validate them. try: - existing_label = existing_links_labels[in_id, out_id] - if existing_label != link['label']: - raise exceptions.ImportValidationError( - 'Trying to rename an existing link name, stopping (in={}, out={}, old_label={}, ' - 'new_label={})'.format(in_id, out_id, existing_label, link['label']) + validate_link_label(link['label']) + except ValueError as why: + raise exceptions.ImportValidationError('Error during Link label validation: {}'.format(why)) + + source = models.DbNode.objects.get(id=in_id) + target = models.DbNode.objects.get(id=out_id) + + if source.uuid == target.uuid: + raise exceptions.ImportValidationError('Cannot add a link to oneself') + + link_type = LinkType(link['type']) + type_source, type_target, outdegree, indegree = link_mapping[link_type] + + # Check if source Node is a valid type + if not source.node_type.startswith(type_source): + raise exceptions.ImportValidationError( + 'Cannot add a {} link from {} to {}'.format(link_type, source.node_type, target.node_type) + ) + + # Check if target Node is a valid type + if not target.node_type.startswith(type_target): + raise exceptions.ImportValidationError( + 'Cannot add a {} link from {} to {}'.format(link_type, source.node_type, target.node_type) + ) + + # If the outdegree is `unique` there cannot already be any other outgoing link of that 
type, + # i.e., the source Node may not have a LinkType of current LinkType, going out, existing already. + if outdegree == 'unique' and (in_id, link['type']) in existing_outgoing_unique: + raise exceptions.ImportValidationError( + 'Node<{}> already has an outgoing {} link'.format(source.uuid, link_type) + ) + + # If the outdegree is `unique_pair`, + # then the link labels for outgoing links of this type should be unique, + # i.e., the source Node may not have a LinkType of current LinkType, going out, + # that also has the current Link label, existing already. + elif outdegree == 'unique_pair' and \ + (in_id, link['label'], link['type']) in existing_outgoing_unique_pair: + raise exceptions.ImportValidationError( + 'Node<{}> already has an incoming {} link with label "{}"'.format( + target.uuid, link_type, link['label'] ) - # Do nothing, the link is already in place and has - # the correct name - except KeyError: - try: - # We try to get the existing input of the link that - # points to "out" and has label link['label']. - # If there is no existing_input, it means that the - # link doesn't exist and it has to be created. If - # it exists, then the only case that we can have more - # than one links with the same name entering a node - # is the case of the RETURN links of workflows/ - # workchains. If it is not this case, then it is - # an error. 
- existing_input = existing_input_links[out_id, link['label']] - - if link['type'] != LinkType.RETURN: - raise exceptions.ImportValidationError( - 'There exists already an input link to node with UUID {} with label {} but it does not' - ' come from the expected input with UUID {} but from a node with UUID {}.'.format( - link['output'], link['label'], link['input'], existing_input - ) - ) - except KeyError: - # New link - links_to_store.append( - models.DbLink( - input_id=in_id, - output_id=out_id, - label=link['label'], - type=LinkType(link['type']).value - ) + ) + + # If the indegree is `unique` there cannot already be any other incoming links of that type, + # i.e., the target Node may not have a LinkType of current LinkType, coming in, existing already. + if indegree == 'unique' and (out_id, link['type']) in existing_incoming_unique: + raise exceptions.ImportValidationError( + 'Node<{}> already has an incoming {} link'.format(target.uuid, link_type) + ) + + # If the indegree is `unique_pair`, + # then the link labels for incoming links of this type should be unique, + # i.e., the target Node may not have a LinkType of current LinkType, coming in + # that also has the current Link label, existing already. 
+ elif indegree == 'unique_pair' and \ + (out_id, link['label'], link['type']) in existing_incoming_unique_pair: + raise exceptions.ImportValidationError( + 'Node<{}> already has an incoming {} link with label "{}"'.format( + target.uuid, link_type, link['label'] ) - if 'Link' not in ret_dict: - ret_dict['Link'] = {'new': []} - ret_dict['Link']['new'].append((in_id, out_id)) + ) + + # New link + links_to_store.append( + models.DbLink(input_id=in_id, output_id=out_id, label=link['label'], type=link['type']) + ) + if 'Link' not in ret_dict: + ret_dict['Link'] = {'new': []} + ret_dict['Link']['new'].append((in_id, out_id)) + + # Add new Link to sets of existing Links 'input PK', 'output PK', 'label', 'type' + existing_links.add((in_id, out_id, link['label'], link['type'])) + existing_outgoing_unique.add((in_id, link['type'])) + existing_outgoing_unique_pair.add((in_id, link['label'], link['type'])) + existing_incoming_unique.add((out_id, link['type'])) + existing_incoming_unique_pair.add((out_id, link['label'], link['type'])) # Store new links if links_to_store: @@ -587,7 +644,7 @@ def import_data_dj( for groupuuid, groupnodes in import_groups.items(): # TODO: cache these to avoid too many queries group_ = models.DbGroup.objects.get(uuid=groupuuid) - nodes_to_store = [dbnode_reverse_mappings[node_uuid] for node_uuid in groupnodes] + nodes_to_store = [foreign_ids_reverse_mappings[NODE_ENTITY_NAME][node_uuid] for node_uuid in groupnodes] if nodes_to_store: group_.dbnodes.add(*nodes_to_store) diff --git a/aiida/tools/importexport/dbimport/backends/sqla/__init__.py b/aiida/tools/importexport/dbimport/backends/sqla/__init__.py index 4246782318..a42fbffc6a 100644 --- a/aiida/tools/importexport/dbimport/backends/sqla/__init__.py +++ b/aiida/tools/importexport/dbimport/backends/sqla/__init__.py @@ -25,8 +25,9 @@ from aiida.common.folders import SandboxFolder, RepositoryFolder from aiida.common.links import LinkType from aiida.common.utils import get_object_from_string +from 
aiida.orm import QueryBuilder, Node, Group, WorkflowNode, CalculationNode, Data +from aiida.orm.utils.links import link_triple_exists, validate_link from aiida.orm.utils.repository import Repository -from aiida.orm import QueryBuilder, Node, Group from aiida.tools.importexport.common import exceptions from aiida.tools.importexport.common.archive import extract_tree, extract_tar, extract_zip @@ -107,7 +108,7 @@ def import_data_sqla( :raises `~aiida.tools.importexport.common.exceptions.ImportUniquenessError`: if a new unique entity can not be created. """ - from aiida.backends.sqlalchemy.models.node import DbNode + from aiida.backends.sqlalchemy.models.node import DbNode, DbLink from aiida.backends.sqlalchemy.utils import flag_modified # This is the export version expected by this function @@ -568,7 +569,7 @@ def import_data_sqla( just_saved = dict() # Now I have the PKs, print the info - # Moreover, set the foreign_ids_reverse_mappings + # Moreover, add newly created Nodes to foreign_ids_reverse_mappings for unique_id, new_pk in just_saved.items(): from uuid import UUID if isinstance(unique_id, UUID): @@ -584,82 +585,48 @@ def import_data_sqla( if not silent: print('STORING NODE LINKS...') - ## TODO: check that we are not creating input links of an already - ## existing node... 
- import_links = data['links_uuid'] - links_to_store = [] - # Needed for fast checks of existing links - from aiida.backends.sqlalchemy.models.node import DbLink - existing_links_raw = session.query(DbLink.input_id, DbLink.output_id, DbLink.label).all() - existing_links_labels = {(l[0], l[1]): l[2] for l in existing_links_raw} - existing_input_links = {(l[1], l[2]): l[0] for l in existing_links_raw} + import_links = data['links_uuid'] - dbnode_reverse_mappings = foreign_ids_reverse_mappings[NODE_ENTITY_NAME] for link in import_links: + # Check for dangling Links within the, supposed, self-consistent archive try: - in_id = dbnode_reverse_mappings[link['input']] - out_id = dbnode_reverse_mappings[link['output']] + in_id = foreign_ids_reverse_mappings[NODE_ENTITY_NAME][link['input']] + out_id = foreign_ids_reverse_mappings[NODE_ENTITY_NAME][link['output']] except KeyError: if ignore_unknown_nodes: continue else: raise exceptions.ImportValidationError( 'Trying to create a link with one or both unknown nodes, stopping (in_uuid={}, ' - 'out_uuid={}, label={})'.format(link['input'], link['output'], link['label']) + 'out_uuid={}, label={}, type={})'.format( + link['input'], link['output'], link['label'], link['type'] + ) ) + # Since backend specific Links (DbLink) are not validated upon creation, we will now validate them. + source = QueryBuilder().append(Node, filters={'id': in_id}, project='*').first()[0] + target = QueryBuilder().append(Node, filters={'id': out_id}, project='*').first()[0] + link_type = LinkType(link['type']) + + # Check for existence of a triple link, i.e. unique triple. + # If it exists, then the link already exists, continue to next link, otherwise, validate link. 
+ if link_triple_exists(source, target, link_type, link['label']): + continue + try: - existing_label = existing_links_labels[in_id, out_id] - if existing_label != link['label']: - raise exceptions.ImportValidationError( - 'Trying to rename an existing link name, stopping (in={}, out={}, old_label={}, ' - 'new_label={})'.format(in_id, out_id, existing_label, link['label']) - ) - # Do nothing, the link is already in place and has - # the correct name - except KeyError: - try: - # We try to get the existing input of the link that - # points to "out" and has label link['label']. - # If there is no existing_input, it means that the - # link doesn't exist and it has to be created. If - # it exists, then the only case that we can have more - # than one links with the same name entering a node - # is the case of the RETURN links of workflows/ - # workchains. If it is not this case, then it is - # an error. - existing_input = existing_input_links[out_id, link['label']] - - if link['type'] != LinkType.RETURN: - raise exceptions.ImportValidationError( - 'There exists already an input link to node with UUID {} with label {} but it does not' - ' come from the expected input with UUID {} but from a node with UUID {}.'.format( - link['output'], link['label'], link['input'], existing_input - ) - ) - except KeyError: - # New link - links_to_store.append( - DbLink( - input_id=in_id, - output_id=out_id, - label=link['label'], - type=LinkType(link['type']).value - ) - ) - if 'Link' not in ret_dict: - ret_dict['Link'] = {'new': []} - ret_dict['Link']['new'].append((in_id, out_id)) + validate_link(source, target, link_type, link['label']) + except ValueError as why: + raise exceptions.ImportValidationError('Error occurred during Link validation: {}'.format(why)) - # Store new links - if links_to_store: - if not silent: - print(' ({} new links...)'.format(len(links_to_store))) - session.add_all(links_to_store) - else: - if not silent: - print(' (0 new links...)') + # New link + 
session.add(DbLink(input_id=in_id, output_id=out_id, label=link['label'], type=link['type'])) + if 'Link' not in ret_dict: + ret_dict['Link'] = {'new': []} + ret_dict['Link']['new'].append((in_id, out_id)) + + if not silent: + print(' ({} new links...)'.format(len(ret_dict.get('Link', {}).get('new', [])))) if not silent: print('STORING GROUP ELEMENTS...') @@ -668,7 +635,9 @@ def import_data_sqla( # # TODO: cache these to avoid too many queries qb_group = QueryBuilder().append(Group, filters={'uuid': {'==': groupuuid}}) group_ = qb_group.first()[0] - nodes_ids_to_add = [dbnode_reverse_mappings[node_uuid] for node_uuid in groupnodes] + nodes_ids_to_add = [ + foreign_ids_reverse_mappings[NODE_ENTITY_NAME][node_uuid] for node_uuid in groupnodes + ] qb_nodes = QueryBuilder().append(Node, filters={'id': {'in': nodes_ids_to_add}}) # Adding nodes to group avoiding the SQLA ORM to increase speed nodes_to_add = [n[0].backend_entity for n in qb_nodes.all()]