Skip to content

Commit

Permalink
[test-api-update] execution_tests/dynamic_tests (#12427)
Browse files Browse the repository at this point in the history
### Summary & Motivation

Convert solid/pipeline uses in `execution_tests/dynamic_tests` to op/job

### How I Tested These Changes

BK
  • Loading branch information
smackesey committed Feb 17, 2023
1 parent 79f9ecf commit e1d3579
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def dynamic_echo(_, nums):


@job
def dynamic_pipeline():
def dynamic_job():
numbers = emit(num_range())
dynamic = numbers.map(lambda num: multiply_by_two(multiply_inputs(num, emit_ten())))
n = multiply_by_two.alias("double_total")(sum_numbers(dynamic.collect()))
Expand Down Expand Up @@ -132,7 +132,7 @@ def _run_configs():
def test_map(run_config):
with instance_for_test() as instance:
with execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=run_config,
) as result:
Expand Down Expand Up @@ -163,7 +163,7 @@ def test_map(run_config):
def test_map_empty(run_config):
with instance_for_test() as instance:
with execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=merge_dicts({"ops": {"num_range": {"config": {"range": 0}}}}, run_config),
) as result:
Expand All @@ -178,7 +178,7 @@ def test_map_empty(run_config):
def test_map_selection(run_config):
with instance_for_test() as instance:
with execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=merge_dicts({"ops": {"emit": {"inputs": {"num": 2}}}}, run_config),
op_selection=["emit*", "emit_ten"],
Expand Down Expand Up @@ -235,7 +235,7 @@ def test_tags():
emit.name: {"result": ["0", "1", "2"]},
},
)
plan = create_execution_plan(dynamic_pipeline, known_state=known_state)
plan = create_execution_plan(dynamic_job, known_state=known_state)

assert plan.get_step_by_key(emit.name).tags == {"first": "1"}

Expand All @@ -249,14 +249,14 @@ def test_tags():
def test_full_reexecute():
with instance_for_test() as instance:
result_1 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=_in_proc_cfg(),
)
assert result_1.success

result_2 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=_in_proc_cfg(),
reexecution_options=ReexecutionOptions(
Expand All @@ -273,14 +273,14 @@ def test_full_reexecute():
def test_partial_reexecute(run_config):
with instance_for_test() as instance:
result_1 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=run_config,
)
assert result_1.success

result_2 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=run_config,
reexecution_options=ReexecutionOptions(
Expand All @@ -291,7 +291,7 @@ def test_partial_reexecute(run_config):
assert result_2.success

result_3 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=run_config,
reexecution_options=ReexecutionOptions(
Expand Down Expand Up @@ -328,14 +328,14 @@ def test_fan_out_in_out_in(run_config):
def test_select_dynamic_step_and_downstream():
with instance_for_test() as instance:
result_1 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=_in_proc_cfg(),
)
assert result_1.success

result_2 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=_in_proc_cfg(),
reexecution_options=ReexecutionOptions(
Expand All @@ -346,7 +346,7 @@ def test_select_dynamic_step_and_downstream():
assert result_2.success

with execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
run_config=_in_proc_cfg(),
instance=instance,
reexecution_options=ReexecutionOptions(
Expand All @@ -366,7 +366,7 @@ def test_select_dynamic_step_and_downstream():
assert result_3.output_for_node("double_total") == 120

result_4 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
reexecution_options=ReexecutionOptions(
parent_run_id=result_1.run_id,
Expand All @@ -382,7 +382,7 @@ def test_select_dynamic_step_and_downstream():
assert "multiply_by_two[0]" not in keys_4

result_5 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
reexecution_options=ReexecutionOptions(
parent_run_id=result_1.run_id,
Expand All @@ -401,7 +401,7 @@ def test_select_dynamic_step_and_downstream():
def test_bad_step_selection():
with instance_for_test() as instance:
result_1 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=_in_proc_cfg(),
)
Expand All @@ -411,7 +411,7 @@ def test_bad_step_selection():
# both the dynamic outputting step key and something resolved by it in the previous run
with pytest.raises(DagsterExecutionStepNotFoundError):
execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
reexecution_options=ReexecutionOptions(
parent_run_id=result_1.run_id,
Expand Down Expand Up @@ -492,7 +492,7 @@ def test_select_dynamic_step_with_non_static_mapping():
def test_map_fail(run_config):
with instance_for_test() as instance:
result = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=merge_dicts({"ops": {"emit": {"config": {"fail": True}}}}, run_config),
raise_on_error=False,
Expand All @@ -507,7 +507,7 @@ def test_map_fail(run_config):
def test_map_reexecute_after_fail(run_config):
with instance_for_test() as instance:
result_1 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=merge_dicts(
run_config,
Expand All @@ -518,7 +518,7 @@ def test_map_reexecute_after_fail(run_config):
assert not result_1.success

result_2 = execute_job(
reconstructable(dynamic_pipeline),
reconstructable(dynamic_job),
instance=instance,
run_config=run_config,
reexecution_options=ReexecutionOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def destructure():
assert result.output_for_node("echo_a") == [1]
assert result.output_for_node("echo_b") == [2, 3]

# all fanned in inputs skipped -> solid skips
# all fanned in inputs skipped -> op skips
assert DagsterEventType.STEP_SKIPPED in [
event.event_type for event in result.all_events if event.step_key == "echo_c"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


@op(out=DynamicOut())
def dynamic_solid():
def dynamic_op():
yield DynamicOutput(1, mapping_key="1")
yield DynamicOutput(2, mapping_key="2")

Expand All @@ -26,13 +26,13 @@ def add(x, y):
def test_fan_in():
with pytest.raises(
DagsterInvalidDefinitionError,
match='Problematic dependency on dynamic output "dynamic_solid:result"',
match='Problematic dependency on dynamic output "dynamic_op:result"',
):

@job
def _should_fail():
numbers = []
dynamic_solid().map(numbers.append)
dynamic_op().map(numbers.append)
echo(numbers)


Expand All @@ -45,9 +45,9 @@ def test_multi_direct():
@job
def _should_fail():
def _add(x):
dynamic_solid().map(lambda y: add(x, y))
dynamic_op().map(lambda y: add(x, y))

dynamic_solid().map(_add)
dynamic_op().map(_add)


def test_multi_indirect():
Expand All @@ -59,9 +59,9 @@ def test_multi_indirect():
@job
def _should_fail():
def _add(x):
dynamic_solid().map(lambda y: add(x, y))
dynamic_op().map(lambda y: add(x, y))

dynamic_solid().map(lambda z: _add(echo(z)))
dynamic_op().map(lambda z: _add(echo(z)))


def test_multi_composite_out():
Expand All @@ -72,61 +72,61 @@ def test_multi_composite_out():

@graph
def composed_echo():
return dynamic_solid().map(echo)
return dynamic_op().map(echo)

@job
def _should_fail():
def _complex(item):
composed_echo().map(lambda y: add(y, item))

dynamic_solid().map(_complex)
dynamic_op().map(_complex)


def test_multi_composite_in():
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
'cannot be downstream of dynamic output "dynamic_solid:result" since input "a" maps to'
'cannot be downstream of dynamic output "dynamic_op:result" since input "a" maps to'
" a node that is already downstream of another dynamic output"
),
):

@graph
def composed_add(a):
dynamic_solid().map(lambda b: add(a, b))
dynamic_op().map(lambda b: add(a, b))

@job
def _should_fail():
dynamic_solid().map(lambda x: composed_add(echo(x)))
dynamic_op().map(lambda x: composed_add(echo(x)))


def test_multi_composite_in_2():
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
'cannot be downstream of dynamic output "dynamic_solid:result" since input "a" maps to'
'cannot be downstream of dynamic output "dynamic_op:result" since input "a" maps to'
" a node that is already downstream of another dynamic output"
),
):

@graph
def composed_add(a):
dynamic_solid().map(lambda b: add(a, b))
dynamic_op().map(lambda b: add(a, b))

@graph
def indirect(a):
composed_add(a)

@job
def _should_fail():
dynamic_solid().map(lambda x: indirect(echo(x)))
dynamic_op().map(lambda x: indirect(echo(x)))


def test_multi_composite_in_3():
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
'cannot be downstream of dynamic output "dynamic_solid:result" since input "a" maps to'
'cannot be downstream of dynamic output "dynamic_op:result" since input "a" maps to'
" a node that is already downstream of another dynamic output"
),
):
Expand All @@ -137,14 +137,14 @@ def composed(a):

@job
def _should_fail():
dynamic_solid().map(composed)
dynamic_op().map(composed)


def test_multi_composite_in_4():
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
'cannot be downstream of dynamic output "dynamic_solid:result" since input "a" maps to'
'cannot be downstream of dynamic output "dynamic_op:result" since input "a" maps to'
" a node that is already downstream of another dynamic output"
),
):
Expand All @@ -159,7 +159,7 @@ def indirect(a):

@job
def _should_fail():
dynamic_solid().map(indirect)
dynamic_op().map(indirect)


def test_direct_dep():
Expand All @@ -173,7 +173,7 @@ def _is_fine_1():
def _add(item):
dynamic_add(item)

dynamic_solid().map(_add)
dynamic_op().map(_add)

with pytest.raises(
DagsterInvalidDefinitionError,
Expand All @@ -185,11 +185,11 @@ def _should_fail():
def _add_echo(item):
dynamic_add(item).map(echo)

dynamic_solid().map(_add_echo)
dynamic_op().map(_add_echo)

@job
def _is_fine_2():
dynamic_solid().map(dynamic_add)
dynamic_op().map(dynamic_add)

with pytest.raises(
DagsterInvalidDefinitionError,
Expand All @@ -198,7 +198,7 @@ def _is_fine_2():

@job
def _should_fail():
echo(dynamic_solid().map(dynamic_add).collect())
echo(dynamic_op().map(dynamic_add).collect())


def test_collect_and_dep():
Expand All @@ -209,7 +209,7 @@ def test_collect_and_dep():

@job
def _bad():
x = dynamic_solid()
x = dynamic_op()
x.map(lambda y: add(y, x.collect()))

with pytest.raises(
Expand All @@ -219,5 +219,5 @@ def _bad():

@job
def _bad_other():
x = dynamic_solid()
x = dynamic_op()
x.map(lambda y: add(x.collect(), y))

0 comments on commit e1d3579

Please sign in to comment.