Skip to content

Commit

Permalink
Fix the validation of links in archive import (#3354)
Browse files Browse the repository at this point in the history
The import logic imports the links after all the nodes have been
imported. The link validation, however, was incorrect. It included link
types only implicitly and `CALL` links were not even considered. Instead
of the convoluted nested try-except blocks, the `validate_link` utility
from `aiida.orm.utils.links` is now used. This might not be the most
efficient way, since it goes through the front-end ORM, but at least we
know it is correct.

Unfortunately, for Django, the validation cannot be done through the
front-end infrastructure because the query builder will go through
SQLAlchemy, but the nodes that have been added during the import will
not be present in it, as they are managed by the Django transaction.
The only solution then is to perform the link validation manually.
  • Loading branch information
CasperWA authored and sphuber committed Oct 3, 2019
1 parent cd91628 commit b2492e2
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 124 deletions.
85 changes: 85 additions & 0 deletions aiida/backends/tests/tools/importexport/orm/test_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,88 @@ def test_dangling_link_to_existing_db_node(self, temp_dir):
msg='There should be a single StructureData, instead {} has been found'.format(builder.count())
)
self.assertEqual(builder.all()[0][0], struct_uuid)

@with_temp_dir
def test_multiple_post_return_links(self, temp_dir):  # pylint: disable=too-many-locals
    """Check extra RETURN links can be added to existing Nodes, when label is not unique.

    A Data node is given both a CREATE link (from a CalculationNode) and a RETURN link
    (from a WorkflowNode) using the *same* link label, which is legal because the
    (link type, label) pairs differ. Two archives are exported: one with only the data
    provenance (no RETURN link) and one with the full provenance. Importing the first
    and then the second must end up with both links present and identical to the state
    before export.
    """
    data = orm.Int(1).store()
    calc = orm.CalculationNode().store()
    work = orm.WorkflowNode().store()
    link_label = 'output_data'

    # Same label on purpose: CREATE and RETURN links may share a label.
    data.add_incoming(calc, LinkType.CREATE, link_label)
    data.add_incoming(work, LinkType.RETURN, link_label)

    calc.seal()
    work.seal()

    data_uuid = data.uuid
    calc_uuid = calc.uuid
    work_uuid = work.uuid
    before_links = get_all_node_links()

    data_provenance = os.path.join(temp_dir, 'data.aiida')
    all_provenance = os.path.join(temp_dir, 'all.aiida')

    # `return_backward=False` drops the RETURN link from the archive,
    # `return_backward=True` keeps the full logical provenance.
    export([data], outfile=data_provenance, silent=True, return_backward=False)
    export([data], outfile=all_provenance, silent=True, return_backward=True)

    self.reset_database()

    # import data provenance
    import_data(data_provenance, silent=True)

    no_of_work = orm.QueryBuilder().append(orm.WorkflowNode).count()
    self.assertEqual(
        no_of_work, 0, msg='{} WorkflowNode(s) was/were found, however, none should be present'.format(no_of_work)
    )

    nodes = orm.QueryBuilder().append(orm.Node, project='uuid')
    self.assertEqual(
        nodes.count(),
        2,
        # Report the actual node count, not the (stale) workflow count.
        msg='{} Node(s) was/were found, however, exactly two should be present'.format(nodes.count())
    )
    for node in nodes.iterall():
        self.assertIn(node[0], [data_uuid, calc_uuid])

    links = get_all_node_links()
    self.assertEqual(
        len(links),
        1,
        msg='Only a single Link (from Calc. to Data) is expected, '
        'instead {} were found (in, out, label, type): {}'.format(len(links), links)
    )
    for from_uuid, to_uuid, found_label, found_type in links:
        self.assertEqual(from_uuid, calc_uuid)
        self.assertEqual(to_uuid, data_uuid)
        self.assertEqual(found_label, link_label)
        self.assertEqual(found_type, LinkType.CREATE.value)

    # import data+logic provenance
    import_data(all_provenance, silent=True)

    no_of_work = orm.QueryBuilder().append(orm.WorkflowNode).count()
    self.assertEqual(
        no_of_work,
        1,
        msg='{} WorkflowNode(s) was/were found, however, exactly one should be present'.format(no_of_work)
    )

    nodes = orm.QueryBuilder().append(orm.Node, project='uuid')
    self.assertEqual(
        nodes.count(),
        3,
        # Report the actual node count, not the (stale) workflow count.
        msg='{} Node(s) was/were found, however, exactly three should be present'.format(nodes.count())
    )
    for node in nodes.iterall():
        self.assertIn(node[0], [data_uuid, calc_uuid, work_uuid])

    links = get_all_node_links()
    self.assertEqual(
        len(links),
        2,
        msg='Exactly two Links are expected, instead {} were found '
        '(in, out, label, type): {}'.format(len(links), links)
    )
    # The second import must restore exactly the pre-export link set.
    self.assertListEqual(sorted(links), sorted(before_links))
12 changes: 6 additions & 6 deletions aiida/orm/utils/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,17 @@ def validate_link(source, target, link_type, link_label):
# or stored, in which case, the new proposed link is a duplicate and thus illegal
duplicate_link_triple = link_triple_exists(source, target, link_type, link_label)

# If the outdegree is `unique` there cannot already be any other incoming links of that type
# If the outdegree is `unique` there cannot already be any other outgoing link of that type
if outdegree == 'unique' and source.get_outgoing(link_type=link_type, only_uuid=True).all():
raise ValueError('node<{}> already has an outgoing {} link'.format(source.uuid, link_type))

# If the outdegree is `unique_pair` than the link labels for outgoing links of this type should be unique
# If the outdegree is `unique_pair`, then the link labels for outgoing links of this type should be unique
elif outdegree == 'unique_pair' and source.get_outgoing(
link_type=link_type, only_uuid=True, link_label_filter=link_label).all():
raise ValueError('node<{}> already has an outgoing {} link with label "{}"'.format(
target.uuid, link_type, link_label))
source.uuid, link_type, link_label))

# If the outdegree is `unique_triple` than the link triples of link type, link label and target should be unique
# If the outdegree is `unique_triple`, then the link triples of link type, link label and target should be unique
elif outdegree == 'unique_triple' and duplicate_link_triple:
raise ValueError('node<{}> already has an outgoing {} link with label "{}" from node<{}>'.format(
source.uuid, link_type, link_label, target.uuid))
Expand All @@ -176,13 +176,13 @@ def validate_link(source, target, link_type, link_label):
if indegree == 'unique' and target.get_incoming(link_type=link_type, only_uuid=True).all():
raise ValueError('node<{}> already has an incoming {} link'.format(target.uuid, link_type))

# If the indegree is `unique_pair` than the link labels for incoming links of this type should be unique
# If the indegree is `unique_pair`, then the link labels for incoming links of this type should be unique
elif indegree == 'unique_pair' and target.get_incoming(
link_type=link_type, link_label_filter=link_label, only_uuid=True).all():
raise ValueError('node<{}> already has an incoming {} link with label "{}"'.format(
target.uuid, link_type, link_label))

# If the indegree is `unique_triple` than the link triples of link type, link label and source should be unique
# If the indegree is `unique_triple`, then the link triples of link type, link label and source should be unique
elif indegree == 'unique_triple' and duplicate_link_triple:
raise ValueError('node<{}> already has an incoming {} link with label "{}" from node<{}>'.format(
target.uuid, link_type, link_label, source.uuid))
Expand Down
161 changes: 109 additions & 52 deletions aiida/tools/importexport/dbimport/backends/django/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import six

from aiida.common import timezone, json
from aiida.common.links import LinkType
from aiida.common.folders import SandboxFolder, RepositoryFolder
from aiida.common.links import LinkType, validate_link_label
from aiida.common.utils import grouper, get_object_from_string
from aiida.orm.utils.repository import Repository
from aiida.orm import QueryBuilder, Node, Group
Expand Down Expand Up @@ -490,7 +490,7 @@ def import_data_dj(
just_saved = {str(key): value for key, value in just_saved_queryset}

# Now I have the PKs, print the info
# Moreover, set the foreign_ids_reverse_mappings
# Moreover, add newly created Nodes to foreign_ids_reverse_mappings
for unique_id, new_pk in just_saved.items():
import_entry_pk = import_new_entry_pks[unique_id]
foreign_ids_reverse_mappings[model_name][unique_id] = new_pk
Expand All @@ -503,73 +503,130 @@ def import_data_dj(

if not silent:
print('STORING NODE LINKS...')
## TODO: check that we are not creating input links of an already
## existing node...
import_links = data['links_uuid']
links_to_store = []

# Needed for fast checks of existing links
# Needed, since QueryBuilder does not yet work for recently saved Nodes
existing_links_raw = models.DbLink.objects.all().values_list('input', 'output', 'label', 'type')
existing_links_labels = {(l[0], l[1]): l[2] for l in existing_links_raw}
existing_input_links = {(l[1], l[2]): l[0] for l in existing_links_raw}
existing_links = {(l[0], l[1], l[2], l[3]) for l in existing_links_raw}
existing_outgoing_unique = {(l[0], l[3]) for l in existing_links_raw}
existing_outgoing_unique_pair = {(l[0], l[2], l[3]) for l in existing_links_raw}
existing_incoming_unique = {(l[1], l[3]) for l in existing_links_raw}
existing_incoming_unique_pair = {(l[1], l[2], l[3]) for l in existing_links_raw}

calculation_node_types = 'process.calculation.'
workflow_node_types = 'process.workflow.'
data_node_types = 'data.'

link_mapping = {
LinkType.CALL_CALC: (workflow_node_types, calculation_node_types, 'unique_triple', 'unique'),
LinkType.CALL_WORK: (workflow_node_types, workflow_node_types, 'unique_triple', 'unique'),
LinkType.CREATE: (calculation_node_types, data_node_types, 'unique_pair', 'unique'),
LinkType.INPUT_CALC: (data_node_types, calculation_node_types, 'unique_triple', 'unique_pair'),
LinkType.INPUT_WORK: (data_node_types, workflow_node_types, 'unique_triple', 'unique_pair'),
LinkType.RETURN: (workflow_node_types, data_node_types, 'unique_pair', 'unique_triple'),
}

# ~ print(foreign_ids_reverse_mappings)
dbnode_reverse_mappings = foreign_ids_reverse_mappings[NODE_ENTITY_NAME]
for link in import_links:
# Check for dangling Links within the, supposed, self-consistent archive
try:
in_id = dbnode_reverse_mappings[link['input']]
out_id = dbnode_reverse_mappings[link['output']]
in_id = foreign_ids_reverse_mappings[NODE_ENTITY_NAME][link['input']]
out_id = foreign_ids_reverse_mappings[NODE_ENTITY_NAME][link['output']]
except KeyError:
if ignore_unknown_nodes:
continue
else:
raise exceptions.ImportValidationError(
'Trying to create a link with one or both unknown nodes, stopping (in_uuid={}, '
'out_uuid={}, label={})'.format(link['input'], link['output'], link['label'])
'out_uuid={}, label={}, type={})'.format(
link['input'], link['output'], link['label'], link['type']
)
)

# Check if link already exists, skip if it does
# This is equivalent to an existing triple link (i.e. unique_triple from below)
if (in_id, out_id, link['label'], link['type']) in existing_links:
continue

# Since backend specific Links (DbLink) are not validated upon creation, we will now validate them.
try:
existing_label = existing_links_labels[in_id, out_id]
if existing_label != link['label']:
raise exceptions.ImportValidationError(
'Trying to rename an existing link name, stopping (in={}, out={}, old_label={}, '
'new_label={})'.format(in_id, out_id, existing_label, link['label'])
validate_link_label(link['label'])
except ValueError as why:
raise exceptions.ImportValidationError('Error during Link label validation: {}'.format(why))

source = models.DbNode.objects.get(id=in_id)
target = models.DbNode.objects.get(id=out_id)

if source.uuid == target.uuid:
raise exceptions.ImportValidationError('Cannot add a link to oneself')

link_type = LinkType(link['type'])
type_source, type_target, outdegree, indegree = link_mapping[link_type]

# Check if source Node is a valid type
if not source.node_type.startswith(type_source):
raise exceptions.ImportValidationError(
'Cannot add a {} link from {} to {}'.format(link_type, source.node_type, target.node_type)
)

# Check if target Node is a valid type
if not target.node_type.startswith(type_target):
raise exceptions.ImportValidationError(
'Cannot add a {} link from {} to {}'.format(link_type, source.node_type, target.node_type)
)

# If the outdegree is `unique` there cannot already be any other outgoing link of that type,
# i.e., the source Node may not have a LinkType of current LinkType, going out, existing already.
if outdegree == 'unique' and (in_id, link['type']) in existing_outgoing_unique:
raise exceptions.ImportValidationError(
'Node<{}> already has an outgoing {} link'.format(source.uuid, link_type)
)

# If the outdegree is `unique_pair`,
# then the link labels for outgoing links of this type should be unique,
# i.e., the source Node may not have a LinkType of current LinkType, going out,
# that also has the current Link label, existing already.
elif outdegree == 'unique_pair' and \
(in_id, link['label'], link['type']) in existing_outgoing_unique_pair:
raise exceptions.ImportValidationError(
'Node<{}> already has an incoming {} link with label "{}"'.format(
target.uuid, link_type, link['label']
)
# Do nothing, the link is already in place and has
# the correct name
except KeyError:
try:
# We try to get the existing input of the link that
# points to "out" and has label link['label'].
# If there is no existing_input, it means that the
# link doesn't exist and it has to be created. If
# it exists, then the only case that we can have more
# than one links with the same name entering a node
# is the case of the RETURN links of workflows/
# workchains. If it is not this case, then it is
# an error.
existing_input = existing_input_links[out_id, link['label']]

if link['type'] != LinkType.RETURN:
raise exceptions.ImportValidationError(
'There exists already an input link to node with UUID {} with label {} but it does not'
' come from the expected input with UUID {} but from a node with UUID {}.'.format(
link['output'], link['label'], link['input'], existing_input
)
)
except KeyError:
# New link
links_to_store.append(
models.DbLink(
input_id=in_id,
output_id=out_id,
label=link['label'],
type=LinkType(link['type']).value
)
)

# If the indegree is `unique` there cannot already be any other incoming links of that type,
# i.e., the target Node may not have a LinkType of current LinkType, coming in, existing already.
if indegree == 'unique' and (out_id, link['type']) in existing_incoming_unique:
raise exceptions.ImportValidationError(
'Node<{}> already has an incoming {} link'.format(target.uuid, link_type)
)

# If the indegree is `unique_pair`,
# then the link labels for incoming links of this type should be unique,
# i.e., the target Node may not have a LinkType of current LinkType, coming in
# that also has the current Link label, existing already.
elif indegree == 'unique_pair' and \
(out_id, link['label'], link['type']) in existing_incoming_unique_pair:
raise exceptions.ImportValidationError(
'Node<{}> already has an incoming {} link with label "{}"'.format(
target.uuid, link_type, link['label']
)
if 'Link' not in ret_dict:
ret_dict['Link'] = {'new': []}
ret_dict['Link']['new'].append((in_id, out_id))
)

# New link
links_to_store.append(
models.DbLink(input_id=in_id, output_id=out_id, label=link['label'], type=link['type'])
)
if 'Link' not in ret_dict:
ret_dict['Link'] = {'new': []}
ret_dict['Link']['new'].append((in_id, out_id))

# Add new Link to sets of existing Links 'input PK', 'output PK', 'label', 'type'
existing_links.add((in_id, out_id, link['label'], link['type']))
existing_outgoing_unique.add((in_id, link['type']))
existing_outgoing_unique_pair.add((in_id, link['label'], link['type']))
existing_incoming_unique.add((out_id, link['type']))
existing_incoming_unique_pair.add((out_id, link['label'], link['type']))

# Store new links
if links_to_store:
Expand All @@ -587,7 +644,7 @@ def import_data_dj(
for groupuuid, groupnodes in import_groups.items():
# TODO: cache these to avoid too many queries
group_ = models.DbGroup.objects.get(uuid=groupuuid)
nodes_to_store = [dbnode_reverse_mappings[node_uuid] for node_uuid in groupnodes]
nodes_to_store = [foreign_ids_reverse_mappings[NODE_ENTITY_NAME][node_uuid] for node_uuid in groupnodes]
if nodes_to_store:
group_.dbnodes.add(*nodes_to_store)

Expand Down
Loading

0 comments on commit b2492e2

Please sign in to comment.