Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Fix Pipeline.run() running components with only defaults in the wrong order #7426

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,26 @@ def run(self, word: str):
if not there_are_only_lazy_variadics:
continue

# Components that have defaults for all their inputs must be treated exactly the same way as we treat
# lazy variadic components: if there are only components with defaults, we can run them.
# If we don't do this, the order of execution of the Pipeline's Components will be affected, because we
# enqueue the Components in `to_run` at the start using the order in which they were added to the Pipeline.
# If a Component A with defaults is added before a Component B that has no defaults, but the Pipeline
# logic requires A to be executed after B, A could instead run before B if we didn't do this check.
has_only_defaults = all(
not socket.is_mandatory for socket in comp.__haystack_input__._sockets_dict.values() # type: ignore
)
if has_only_defaults:
there_are_only_components_with_defaults = True
for other_name, other_comp in waiting_for_input:
if name == other_name:
continue
there_are_only_components_with_defaults &= all(
not s.is_mandatory for s in other_comp.__haystack_input__._sockets_dict.values() # type: ignore
)
if not there_are_only_components_with_defaults:
continue

# Find the first component that has all the inputs it needs to run
has_enough_inputs = True
for input_socket in comp.__haystack_input__._sockets_dict.values(): # type: ignore
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
Fixes `Pipeline.run()` logic so Components that have all their inputs with a default are run in the correct order.
This happened because, when running the Pipeline, we internally gather the list of Components to run
in the order they were added during creation of the Pipeline.
This caused some Components to run before they received all their inputs.
51 changes: 51 additions & 0 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,54 @@ def run(self, word: str, intermediate: Optional[str] = None):
pipeline.add_component("hello", hello)
pipeline.connect("hello.intermediate", "hello.intermediate")
assert [("hello", hello)] == list(pipeline.walk())


def test_correct_execution_order_of_components_with_only_defaults(spying_tracer):
    """
    Verify that a Component whose inputs all have defaults still runs after the Component feeding it.

    `Pipeline.run()` enqueues the Components in its internal `to_run` data structure at the start, using the
    order in which they were added with `Pipeline.add_component()`.
    If a Component A with defaults is added before a Component B that has no defaults, but the Pipeline
    logic requires A to be executed after B, A could instead run before B.

    This test checks the recorded tracing spans and the final prompt to confirm the execution order is correct.
    """
    docs = [Document(content="Rome is the capital of Italy"), Document(content="Paris is the capital of France")]
    doc_store = InMemoryDocumentStore()
    doc_store.write_documents(docs)
    template = (
        "Given the following information, answer the question.\n"
        "Context:\n"
        "{% for document in documents %}"
        " {{ document.content }}\n"
        "{% endfor %}"
        "Question: {{ query }}"
    )

    pipe = Pipeline()

    # The order of these additions is what the test exercises: the prompt builder is
    # added BEFORE the retriever it depends on. Do not reorder them.
    pipe.add_component("prompt_builder", PromptBuilder(template=template))
    pipe.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
    pipe.connect("retriever", "prompt_builder.documents")

    query = "What is the capital of France?"
    res = pipe.run({"prompt_builder": {"query": query}, "retriever": {"query": query}})

    # One span for the pipeline run, then one per component in execution order:
    # the retriever must come before the prompt builder.
    assert len(spying_tracer.spans) == 3
    assert spying_tracer.spans[0].operation_name == "haystack.pipeline.run"
    assert spying_tracer.spans[1].operation_name == "haystack.component.run"
    assert spying_tracer.spans[1].tags["haystack.component.name"] == "retriever"
    assert spying_tracer.spans[2].operation_name == "haystack.component.run"
    assert spying_tracer.spans[2].tags["haystack.component.name"] == "prompt_builder"

    # The prompt must contain the retrieved documents, which is only possible if the
    # prompt builder ran after the retriever produced them.
    assert res == {
        "prompt_builder": {
            "prompt": "Given the following information, answer the question.\n"
            "Context:\n"
            " Paris is the capital of France\n"
            " Rome is the capital of Italy\n"
            "Question: What is the capital of France?"
        }
    }