Skip to content

Commit

Permalink
Fix: Fix Pipeline.run() running components with only defaults in the …
Browse files Browse the repository at this point in the history
…wrong order (#7426)

* Fix Pipeline.run() running components with only defaults in the wrong order

* Add release notes
  • Loading branch information
silvanocerza committed Apr 8, 2024
1 parent d038080 commit 6533483
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
20 changes: 20 additions & 0 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,26 @@ def run(self, word: str):
if not there_are_only_lazy_variadics:
continue

# Components that have defaults for all their inputs must be treated the same identical way as we treat
# lazy variadic components. If there are only components with defaults we can run them.
# If we don't do this the order of execution of the Pipeline's Components will be affected cause we
# enqueue the Components in `to_run` at the start using the order they are added in the Pipeline.
# If a Component A with defaults is added before a Component B that has no defaults, but in the Pipeline
# logic A must be executed after B it could run instead before if we don't do this check.
has_only_defaults = all(
not socket.is_mandatory for socket in comp.__haystack_input__._sockets_dict.values() # type: ignore
)
if has_only_defaults:
there_are_only_components_with_defaults = True
for other_name, other_comp in waiting_for_input:
if name == other_name:
continue
there_are_only_components_with_defaults &= all(
not s.is_mandatory for s in other_comp.__haystack_input__._sockets_dict.values() # type: ignore
)
if not there_are_only_components_with_defaults:
continue

# Find the first component that has all the inputs it needs to run
has_enough_inputs = True
for input_socket in comp.__haystack_input__._sockets_dict.values(): # type: ignore
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
Fixes `Pipeline.run()` logic so Components that have all their inputs with a default are run in the correct order.
This happened we gather a list of Components to run internally when running the Pipeline in the order they are
added during creation of the Pipeline.
This caused some Components to run before they received all their inputs.
51 changes: 51 additions & 0 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,3 +754,54 @@ def run(self, word: str, intermediate: Optional[str] = None):
pipeline.add_component("hello", hello)
pipeline.connect("hello.intermediate", "hello.intermediate")
assert [("hello", hello)] == list(pipeline.walk())


def test_correct_execution_order_of_components_with_only_defaults(spying_tracer):
"""
We enqueue the Components in internal `to_run` data structure at the start of `Pipeline.run()` using the order
they are added in the Pipeline with `Pipeline.add_component()`.
If a Component A with defaults is added before a Component B that has no defaults, but in the Pipeline
logic A must be executed after B it could run instead before.
This test verifies that the order of execution is correct.
"""
docs = [Document(content="Rome is the capital of Italy"), Document(content="Paris is the capital of France")]
doc_store = InMemoryDocumentStore()
doc_store.write_documents(docs)
template = (
"Given the following information, answer the question.\n"
"Context:\n"
"{% for document in documents %}"
" {{ document.content }}\n"
"{% endfor %}"
"Question: {{ query }}"
)

pipe = Pipeline()

# The order of this addition is important for the test
# Do not edit them.
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
pipe.connect("retriever", "prompt_builder.documents")

query = "What is the capital of France?"
res = pipe.run({"prompt_builder": {"query": query}, "retriever": {"query": query}})

assert len(spying_tracer.spans) == 3
assert spying_tracer.spans[0].operation_name == "haystack.pipeline.run"
assert spying_tracer.spans[1].operation_name == "haystack.component.run"
assert spying_tracer.spans[1].tags["haystack.component.name"] == "retriever"
assert spying_tracer.spans[2].operation_name == "haystack.component.run"
assert spying_tracer.spans[2].tags["haystack.component.name"] == "prompt_builder"

print(res["prompt_builder"]["prompt"])
assert res == {
"prompt_builder": {
"prompt": "Given the following information, answer the question.\n"
"Context:\n"
" Paris is the capital of France\n"
" Rome is the capital of Italy\n"
"Question: What is the capital of France?"
}
}

0 comments on commit 6533483

Please sign in to comment.