Skip to content

Commit

Permalink
Add utility function to move Components in queues
Browse files Browse the repository at this point in the history
  • Loading branch information
silvanocerza committed Jun 20, 2024
1 parent c1fe733 commit 648dd44
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
21 changes: 21 additions & 0 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,24 @@ def run(self, word: str):
inner[k] = v

return final_outputs


def _enqueue_component(
component_pair: Tuple[str, Component],
to_run: List[Tuple[str, Component]],
waiting_for_input: List[Tuple[str, Component]],
):
"""
Append a Component in the queue of Components to run if not already in it.
Remove it from the waiting list if it's there.
:param component_pair: Tuple of Component name and instance
:param to_run: Queue of Components to run
:param waiting_for_input: Queue of Components waiting for input
"""
if component_pair in waiting_for_input:
waiting_for_input.remove(component_pair)

if component_pair not in to_run:
to_run.append(component_pair)
37 changes: 37 additions & 0 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from haystack.core.component.types import InputSocket, OutputSocket, Variadic
from haystack.core.errors import PipelineConnectError, PipelineDrawingError, PipelineError
from haystack.core.pipeline import Pipeline, PredefinedPipeline
from haystack.core.pipeline.pipeline import _enqueue_component
from haystack.core.serialization import DeserializationCallbacks
from haystack.testing.factory import component_class
from haystack.testing.sample_components import AddFixedValue, Double, Greet
Expand Down Expand Up @@ -1249,3 +1250,39 @@ def test__is_stuck_in_a_loop(self):

waiting_for_input = [("document_builder", document_joiner), ("document_joiner", document_joiner)]
assert not pipe._is_stuck_in_a_loop(waiting_for_input)

def test__enqueue_component(self):
document_builder = component_class(
"DocumentBuilder", input_types={"text": str}, output_types={"doc": Document}
)()
document_joiner = component_class("DocumentJoiner", input_types={"docs": Variadic[Document]})()

to_run = []
waiting_for_input = []
_enqueue_component(("document_builder", document_builder), to_run, waiting_for_input)
assert to_run == [("document_builder", document_builder)]
assert waiting_for_input == []

to_run = [("document_builder", document_builder)]
waiting_for_input = []
_enqueue_component(("document_builder", document_builder), to_run, waiting_for_input)
assert to_run == [("document_builder", document_builder)]
assert waiting_for_input == []

to_run = []
waiting_for_input = [("document_builder", document_builder)]
_enqueue_component(("document_builder", document_builder), to_run, waiting_for_input)
assert to_run == [("document_builder", document_builder)]
assert waiting_for_input == []

to_run = []
waiting_for_input = [("document_joiner", document_joiner)]
_enqueue_component(("document_builder", document_builder), to_run, waiting_for_input)
assert to_run == [("document_builder", document_builder)]
assert waiting_for_input == [("document_joiner", document_joiner)]

to_run = [("document_joiner", document_joiner)]
waiting_for_input = []
_enqueue_component(("document_builder", document_builder), to_run, waiting_for_input)
assert to_run == [("document_joiner", document_joiner), ("document_builder", document_builder)]
assert waiting_for_input == []

0 comments on commit 648dd44

Please sign in to comment.