Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions dataflow/pipeline/Pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,42 @@ def _build_operator_nodes_graph(self):
# all keys in the first storage will be the initial keys for validation
self.accumulated_keys.append(copy.deepcopy(iter_storage_keys))

error_msg = []
# build graph of all operators and keys from all states
for op_node in self.op_nodes_list:
# check if accumulated_keys have the input keys of this operator
# print(op_node, op_node.input_keys, op_node.output_keys)
for input_key in op_node.input_keys:
if input_key not in self.accumulated_keys[-1]:
error_msg = (
f"Input key '{input_key}' in `{op_node.op_name}` does not match any output keys from previous operators "
f"or any dataset keys. Please check parameter "
f"'{op_node.input_key_nodes[input_key].key_para_name}' in the `run()` of the operator "
f"'{op_node.op_name}' (class '{op_node.op_obj.__class__.__name__}')."
error_msg.append(
{
"input_key": input_key,
"op_name": op_node.op_name,
"class_name": op_node.op_obj.__class__.__name__,
"key_para_name": op_node.input_key_nodes[input_key].key_para_name
}
)
self.logger.warning(error_msg)
raise KeyError(error_msg)

# add output keys to accumulated keys
for output_key in op_node.output_keys:
if output_key not in iter_storage_keys:
iter_storage_keys.append(output_key)
self.accumulated_keys.append(copy.deepcopy(iter_storage_keys))
if len(error_msg) != 0:
# final_error_str = "KeyError in following Operators during pipeline.compile():"
details = "\n".join(
f"- Input key '{e['input_key']}' in `{e['op_name']}` "
f"(class <{e['class_name']}>) does not match any output keys "
f"from previous operators or dataset keys. "
f"Check parameter '{e['key_para_name']}' in the `{e['op_name']}.run()`."
for e in error_msg
)
msg = f"Key Matching Error in following Operators during pipeline.compile():\n{details}"
self.logger.warning(msg)
raise KeyError(msg)

self.final_keys = copy.deepcopy(iter_storage_keys)


for i, keys in enumerate(self.accumulated_keys):
# print(i, keys)
Expand Down
Loading