Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Migrate Pipeline.run() tests with run arguments #7777

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions test/core/pipeline/features/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Tuple, List, Dict, Any
from typing import Tuple, List, Dict, Any, Set

from pytest_bdd import when, then, parsers

from haystack import Pipeline


PipelineData = Tuple[Pipeline, List[Dict[str, Any]], List[Dict[str, Any]], List[List[str]]]
PipelineData = Tuple[Pipeline, List[Tuple[Dict[str, Any], Set[str]]], List[Dict[str, Any]], List[List[str]]]
PipelineResult = Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[List[str]], List[List[str]]]
shadeMe marked this conversation as resolved.
Show resolved Hide resolved


Expand All @@ -15,7 +15,7 @@ def run_pipeline(pipeline_data: PipelineData, spying_tracer):
Attempts to run a pipeline with the given inputs.
`pipeline_data` is a tuple that must contain:
* A Pipeline instance
* The Pipeline inputs
* The Pipeline inputs, and optionally <include_outputs_from> components
* The expected outputs

Optionally it can contain:
Expand All @@ -39,7 +39,10 @@ def run_pipeline(pipeline_data: PipelineData, spying_tracer):

for i in inputs:
try:
res = pipeline.run(i)
if isinstance(i, tuple):
res = pipeline.run(data=i[0], include_outputs_from=i[1])
else:
res = pipeline.run(i)
run_order = [
span.tags["haystack.component.name"]
for span in spying_tracer.spans
Expand Down
3 changes: 3 additions & 0 deletions test/core/pipeline/features/pipeline_run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ Feature: Pipeline running
| that has a component that sends one of its outputs to itself |
| that has multiple branches that merge into a component with a single variadic input |
| that has multiple branches of different lengths that merge into a component with a single variadic input |
| that is linear and returns intermediate outputs |
| that has a loop and returns intermediate outputs from it |
| that is linear and returns intermediate outputs from multiple sockets |

Scenario Outline: Running a bad Pipeline
Given a pipeline <kind>
Expand Down
111 changes: 111 additions & 0 deletions test/core/pipeline/features/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,3 +878,114 @@ def pipeline_that_has_multiple_branches_of_different_lengths_that_merge_into_a_c
{"fourth_addition": {"result": 12}},
["first_addition", "second_addition", "third_addition", "sum", "fourth_addition"],
)


@given("a pipeline that is linear and returns intermediate outputs", target_fixture="pipeline_data")
def pipeline_that_is_linear_and_returns_intermediate_outputs():
pipeline = Pipeline()
pipeline.add_component("first_addition", AddFixedValue(add=2))
pipeline.add_component("second_addition", AddFixedValue())
pipeline.add_component("double", Double())
pipeline.connect("first_addition", "double")
pipeline.connect("double", "second_addition")

return (
pipeline,
[
({"first_addition": {"value": 1}}, {"first_addition", "second_addition", "double"}),
({"first_addition": {"value": 1}}, {"double"}),
],
[
{"second_addition": {"result": 7}, "first_addition": {"result": 3}, "double": {"value": 6}},
{"second_addition": {"result": 7}, "double": {"value": 6}},
],
[["first_addition", "double", "second_addition"], ["first_addition", "double", "second_addition"]],
)


@given("a pipeline that has a loop and returns intermediate outputs from it", target_fixture="pipeline_data")
def pipeline_that_has_a_loop_and_returns_intermediate_outputs_from_it():
pipeline = Pipeline(max_loops_allowed=10)
pipeline.add_component("add_one", AddFixedValue(add=1))
pipeline.add_component("multiplexer", Multiplexer(type_=int))
pipeline.add_component("below_10", Threshold(threshold=10))
pipeline.add_component("below_5", Threshold(threshold=5))
pipeline.add_component("add_three", AddFixedValue(add=3))
pipeline.add_component("accumulator", Accumulate())
pipeline.add_component("add_two", AddFixedValue(add=2))

pipeline.connect("add_one.result", "multiplexer")
pipeline.connect("multiplexer.value", "below_10.value")
pipeline.connect("below_10.below", "accumulator.value")
pipeline.connect("accumulator.value", "below_5.value")
pipeline.connect("below_5.above", "add_three.value")
pipeline.connect("below_5.below", "multiplexer")
pipeline.connect("add_three.result", "multiplexer")
pipeline.connect("below_10.above", "add_two.value")

return (
pipeline,
(
{"add_one": {"value": 3}},
{"add_two", "add_one", "multiplexer", "below_10", "accumulator", "below_5", "add_three"},
),
{
"add_two": {"result": 13},
"add_one": {"result": 4},
"multiplexer": {"value": 11},
"below_10": {"above": 11},
"accumulator": {"value": 8},
"below_5": {"above": 8},
"add_three": {"result": 11},
},
[
"add_one",
"multiplexer",
"below_10",
"accumulator",
"below_5",
"multiplexer",
"below_10",
"accumulator",
"below_5",
"add_three",
"multiplexer",
"below_10",
"add_two",
],
)


@given(
"a pipeline that is linear and returns intermediate outputs from multiple sockets", target_fixture="pipeline_data"
)
def pipeline_that_is_linear_and_returns_intermediate_outputs_from_multiple_sockets():
@component
class DoubleWithOriginal:
"""
Doubles the input value and returns the original value as well.
"""

@component.output_types(value=int, original=int)
def run(self, value: int):
return {"value": value * 2, "original": value}

pipeline = Pipeline()
pipeline.add_component("first_addition", AddFixedValue(add=2))
pipeline.add_component("second_addition", AddFixedValue())
pipeline.add_component("double", DoubleWithOriginal())
pipeline.connect("first_addition", "double")
pipeline.connect("double.value", "second_addition")

return (
pipeline,
[
({"first_addition": {"value": 1}}, {"first_addition", "second_addition", "double"}),
({"first_addition": {"value": 1}}, {"double"}),
],
[
{"second_addition": {"result": 7}, "first_addition": {"result": 3}, "double": {"value": 6, "original": 3}},
{"second_addition": {"result": 7}, "double": {"value": 6, "original": 3}},
],
[["first_addition", "double", "second_addition"], ["first_addition", "double", "second_addition"]],
)
97 changes: 0 additions & 97 deletions test/core/pipeline/test_intermediate_outputs.py

This file was deleted.