diff --git a/dataflow/pipeline/Pipeline.py b/dataflow/pipeline/Pipeline.py
index 7c24f029..cf73cc68 100644
--- a/dataflow/pipeline/Pipeline.py
+++ b/dataflow/pipeline/Pipeline.py
@@ -99,26 +99,42 @@ def _build_operator_nodes_graph(self):
         # all keys in the first storage will be the initial keys for validation
         self.accumulated_keys.append(copy.deepcopy(iter_storage_keys))
 
+        error_msg = []
         # build graph of all operators and keys from all states
         for op_node in self.op_nodes_list:
             # check if accumulated_keys have the input keys of this operator
             # print(op_node, op_node.input_keys, op_node.output_keys)
             for input_key in op_node.input_keys:
                 if input_key not in self.accumulated_keys[-1]:
-                    error_msg = (
-                        f"Input key '{input_key}' in `{op_node.op_name}` does not match any output keys from previous operators "
-                        f"or any dataset keys. Please check parameter "
-                        f"'{op_node.input_key_nodes[input_key].key_para_name}' in the `run()` of the operator "
-                        f"'{op_node.op_name}' (class '{op_node.op_obj.__class__.__name__}')."
+                    error_msg.append(
+                        {
+                            "input_key": input_key,
+                            "op_name": op_node.op_name,
+                            "class_name": op_node.op_obj.__class__.__name__,
+                            "key_para_name": op_node.input_key_nodes[input_key].key_para_name
+                        }
                     )
-                    self.logger.warning(error_msg)
-                    raise KeyError(error_msg)
+
             # add output keys to accumulated keys
             for output_key in op_node.output_keys:
                 if output_key not in iter_storage_keys:
                     iter_storage_keys.append(output_key)
             self.accumulated_keys.append(copy.deepcopy(iter_storage_keys))
 
+        if len(error_msg) != 0:
+            # final_error_str = "KeyError in following Operators during pipeline.compile():"
+            details = "\n".join(
+                f"- Input key '{e['input_key']}' in `{e['op_name']}` "
+                f"(class <{e['class_name']}>) does not match any output keys "
+                f"from previous operators or dataset keys. "
+                f"Check parameter '{e['key_para_name']}' in the `{e['op_name']}.run()`."
+                for e in error_msg
+            )
+            msg = f"Key Matching Error in following Operators during pipeline.compile():\n{details}"
+            self.logger.warning(msg)
+            raise KeyError(msg)
+
         self.final_keys = copy.deepcopy(iter_storage_keys)
         for i, keys in enumerate(self.accumulated_keys):
             # print(i, keys)
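
For reference, below is a minimal standalone sketch (not part of the patch) of the same collect-then-raise pattern: walk the operators in order, track every key seen so far, record all unmatched input keys, and raise a single KeyError listing every offender. The `OpSpec` dataclass, `check_keys` helper, and sample operator names are hypothetical illustrations, not DataFlow's actual API.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class OpSpec:
    """Hypothetical stand-in for an operator node: a name plus its I/O keys."""
    name: str
    input_keys: List[str]
    output_keys: List[str] = field(default_factory=list)


def check_keys(dataset_keys: List[str], ops: List[OpSpec]) -> List[str]:
    """Validate key wiring across a chain of operators.

    Instead of raising on the first unmatched input key, collect all
    mismatches and raise one KeyError that lists every offender,
    mirroring the behaviour introduced in _build_operator_nodes_graph().
    """
    available = list(dataset_keys)   # keys visible to the next operator
    errors = []
    for op in ops:
        for key in op.input_keys:
            if key not in available:
                errors.append(f"- Input key '{key}' in `{op.name}` has no matching upstream key.")
        for key in op.output_keys:   # outputs become available downstream
            if key not in available:
                available.append(key)
    if errors:
        raise KeyError("Key matching errors during compile:\n" + "\n".join(errors))
    return available


if __name__ == "__main__":
    ops = [
        OpSpec("cleaner", input_keys=["raw_text"], output_keys=["clean_text"]),
        OpSpec("scorer", input_keys=["clean_txt"], output_keys=["score"]),  # typo: 'clean_txt'
        OpSpec("filter", input_keys=["scor"]),                              # typo: 'scor'
    ]
    try:
        check_keys(["raw_text"], ops)
    except KeyError as e:
        print(e.args[0])  # both bad keys are reported in a single message
```

With the old behaviour, only the `cleaner` → `scorer` mismatch would surface and the `filter` typo would only appear on the next compile; accumulating the errors reports both in one pass.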