From a61b1d49b4113672e3e373250ca0a007940b010d Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Mon, 5 Feb 2024 16:13:06 -0500 Subject: [PATCH 1/4] Allowing tests to use all runners in our group (#11870) --- .github/workflows/python-tests.yaml | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/workflows/python-tests.yaml b/.github/workflows/python-tests.yaml index 37458910db10..c0dab0849a38 100644 --- a/.github/workflows/python-tests.yaml +++ b/.github/workflows/python-tests.yaml @@ -45,6 +45,8 @@ concurrency: jobs: run-tests: + runs-on: + group: oss-larger-runners name: python:${{ matrix.python-version }}, ${{ matrix.database }}, ${{ matrix.pytest-options }} strategy: matrix: @@ -52,8 +54,6 @@ jobs: - "postgres:13" - "postgres:14" - "sqlite" - os: - - "oss-test-runner" python-version: - "3.8" - "3.9" @@ -81,7 +81,6 @@ jobs: fail-fast: false - runs-on: ${{ matrix.os }} timeout-minutes: 45 steps: @@ -238,14 +237,14 @@ jobs: # Run a smaller subset of tests with pydantic v1 installed, the # Python versions we support, and only on sqlite + postgres 14 run-tests-pydantic-v1: + runs-on: + group: oss-larger-runners name: pydantic v1, python:${{ matrix.python-version }}, ${{ matrix.database }}, ${{ matrix.pytest-options }} strategy: matrix: database: - "postgres:14" - "sqlite" - os: - - "oss-test-runner" python-version: - "3.8" - "3.9" @@ -268,7 +267,6 @@ jobs: fail-fast: false - runs-on: ${{ matrix.os }} timeout-minutes: 45 steps: From 4e3e3942c99a62f3cf4690bebe233195b4f1f2cb Mon Sep 17 00:00:00 2001 From: urimandujano Date: Mon, 5 Feb 2024 15:14:10 -0600 Subject: [PATCH 2/4] Use errors from docker.errors (#11857) Co-authored-by: Chris Guidry --- tests/fixtures/docker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/fixtures/docker.py b/tests/fixtures/docker.py index d805c11d6753..53d0f622348c 100644 --- a/tests/fixtures/docker.py +++ b/tests/fixtures/docker.py @@ -2,6 +2,7 @@ from contextlib import contextmanager from typing import Generator +import docker.errors as docker_errors import pytest from typer.testing import CliRunner @@ -68,7 +69,7 @@ def cleanup_all_new_docker_objects(docker: DockerClient, worker_id: str): for image in docker.images.list(filters=filters): for tag in image.tags: docker.images.remove(tag, force=True) - except docker.errors.NotFound: + except docker_errors.NotFound: logger.warning("Failed to clean up Docker objects") From 9e8b79fa7cf2a9d73563d4f82e8989684785b722 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Mon, 5 Feb 2024 16:14:17 -0500 Subject: [PATCH 3/4] Making test_flows.py deterministic (#11858) --- tests/fixtures/docker.py | 1 - tests/test_flows.py | 128 +++++++++++++++++---------------------- 2 files changed, 55 insertions(+), 74 deletions(-) diff --git a/tests/fixtures/docker.py b/tests/fixtures/docker.py index 53d0f622348c..2da8a81e717c 100644 --- a/tests/fixtures/docker.py +++ b/tests/fixtures/docker.py @@ -73,7 +73,6 @@ def cleanup_all_new_docker_objects(docker: DockerClient, worker_id: str): logger.warning("Failed to clean up Docker objects") -@pytest.mark.timeout(120) @pytest.fixture(scope="session") def prefect_base_image(pytestconfig: "pytest.Config", docker: DockerClient): """Ensure that the prefect dev image is available and up-to-date""" diff --git a/tests/test_flows.py b/tests/test_flows.py index 407b594bc929..948f17045171 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -74,6 +74,9 @@ from prefect.utilities.collections import flatdict_to_dict from 
prefect.utilities.hashing import file_hash +# Give an ample amount of sleep time in order to test flow timeouts +SLEEP_TIME = 10 + @flow def test_flow(): @@ -1096,7 +1099,7 @@ class TestFlowTimeouts: def test_flows_fail_with_timeout(self): @flow(timeout_seconds=0.1) def my_flow(): - time.sleep(1) + time.sleep(SLEEP_TIME) state = my_flow._run() assert state.is_failed() @@ -1108,7 +1111,7 @@ def my_flow(): async def test_async_flows_fail_with_timeout(self): @flow(timeout_seconds=0.1) async def my_flow(): - await anyio.sleep(1) + await anyio.sleep(SLEEP_TIME) state = await my_flow._run() assert state.is_failed() @@ -1118,7 +1121,7 @@ async def my_flow(): assert "exceeded timeout of 0.1 seconds" in state.message def test_timeout_only_applies_if_exceeded(self): - @flow(timeout_seconds=1) + @flow(timeout_seconds=10) def my_flow(): time.sleep(0.1) @@ -1139,113 +1142,97 @@ def my_flow(): @pytest.mark.timeout(method="thread") # alarm-based pytest-timeout will interfere def test_timeout_does_not_wait_for_completion_for_sync_flows(self, tmp_path): - canary_file = tmp_path / "canary" + completed = False @flow(timeout_seconds=0.1) def my_flow(): - time.sleep(3) - canary_file.touch() + time.sleep(SLEEP_TIME) + nonlocal completed + completed = True state = my_flow(return_state=True) assert state.is_failed() assert "exceeded timeout of 0.1 seconds" in state.message - - assert not canary_file.exists() + assert not completed def test_timeout_stops_execution_at_next_task_for_sync_flows(self, tmp_path): """ Sync flow runs tasks will fail after a timeout which will cause the flow to exit """ - canary_file = tmp_path / "canary" - task_canary_file = tmp_path / "task_canary" + completed = False + task_completed = False @task def my_task(): - task_canary_file.touch() + nonlocal task_completed + task_completed = True @flow(timeout_seconds=0.1) def my_flow(): - time.sleep(0.25) + time.sleep(SLEEP_TIME) my_task() - canary_file.touch() # Should not run + nonlocal completed + completed = True state = my_flow._run() assert state.is_failed() assert "exceeded timeout of 0.1 seconds" in state.message - # Wait in case the flow is just sleeping - time.sleep(0.5) - - assert not canary_file.exists() - assert not task_canary_file.exists() + assert not completed + assert not task_completed async def test_timeout_stops_execution_after_await_for_async_flows(self, tmp_path): """ Async flow runs can be cancelled after a timeout """ - canary_file = tmp_path / "canary" - sleep_time = 5 + completed = False @flow(timeout_seconds=0.1) async def my_flow(): # Sleep in intervals to give more chances for interrupt - for _ in range(sleep_time * 10): + for _ in range(100): await anyio.sleep(0.1) - canary_file.touch() # Should not run + nonlocal completed + completed = True - t0 = anyio.current_time() state = await my_flow._run() - t1 = anyio.current_time() assert state.is_failed() assert "exceeded timeout of 0.1 seconds" in state.message - - # Wait in case the flow is just sleeping - await anyio.sleep(sleep_time) - - assert not canary_file.exists() - assert ( - t1 - t0 < sleep_time - ), f"The engine returns without waiting; took {t1-t0}s" + assert not completed async def test_timeout_stops_execution_in_async_subflows(self, tmp_path): """ Async flow runs can be cancelled after a timeout """ - canary_file = tmp_path / "canary" - sleep_time = 5 + completed = False @flow(timeout_seconds=0.1) async def my_subflow(): # Sleep in intervals to give more chances for interrupt - for _ in range(sleep_time * 10): + for _ in range(SLEEP_TIME * 10): await 
anyio.sleep(0.1) - canary_file.touch() # Should not run + nonlocal completed + completed = True @flow async def my_flow(): - t0 = anyio.current_time() subflow_state = await my_subflow._run() - t1 = anyio.current_time() - return t1 - t0, subflow_state + return None, subflow_state state = await my_flow._run() - runtime, subflow_state = await state.result() + (_, subflow_state) = await state.result() assert "exceeded timeout of 0.1 seconds" in subflow_state.message - - assert not canary_file.exists() - assert ( - runtime < sleep_time - ), f"The engine returns without waiting; took {runtime}s" + assert not completed async def test_timeout_stops_execution_in_sync_subflows(self, tmp_path): """ Sync flow runs can be cancelled after a timeout once a task is called """ - canary_file = tmp_path / "canary" + completed = False @task def timeout_noticing_task(): @@ -1256,25 +1243,20 @@ def my_subflow(): time.sleep(0.5) timeout_noticing_task() time.sleep(10) - canary_file.touch() # Should not run + nonlocal completed + completed = True @flow def my_flow(): - t0 = time.perf_counter() subflow_state = my_subflow._run() - t1 = time.perf_counter() - return t1 - t0, subflow_state + return None, subflow_state state = my_flow._run() - runtime, subflow_state = await state.result() + (_, subflow_state) = await state.result() assert "exceeded timeout of 0.1 seconds" in subflow_state.message - # Wait in case the flow is just sleeping and will still create the canary - time.sleep(1) - - assert not canary_file.exists() - assert runtime < 5, f"The engine returns without waiting; took {runtime}s" + assert not completed async def test_subflow_timeout_waits_until_execution_starts(self, tmp_path): """ @@ -1282,11 +1264,12 @@ async def test_subflow_timeout_waits_until_execution_starts(self, tmp_path): Fixes: https://github.com/PrefectHQ/prefect/issues/7903. """ - canary_file = tmp_path / "canary" + completed = False @flow(timeout_seconds=1) async def downstream_flow(): - canary_file.touch() + nonlocal completed + completed = True @task async def sleep_task(n): @@ -1297,15 +1280,12 @@ async def my_flow(): upstream_sleepers = await sleep_task.map([0.5, 1.0]) await downstream_flow(wait_for=upstream_sleepers) - t0 = anyio.current_time() state = await my_flow._run() - t1 = anyio.current_time() assert state.is_completed() # Validate the sleep tasks have ran - assert t1 - t0 >= 1 - assert canary_file.exists() # Validate subflow has ran + assert completed class ParameterTestModel(pydantic.BaseModel): @@ -1650,10 +1630,10 @@ def my_flow(): my_flow() logs = await prefect_client.read_logs() - error_log = [log.message for log in logs if log.level == 40].pop() - assert "Traceback" in error_log - assert "NameError" in error_log, "Should reference the exception type" - assert "x + y" in error_log, "Should reference the line of code" + error_logs = "\n".join([log.message for log in logs if log.level == 40]) + assert "Traceback" in error_logs + assert "NameError" in error_logs, "Should reference the exception type" + assert "x + y" in error_logs, "Should reference the line of code" async def test_raised_exceptions_include_tracebacks(self, prefect_client): @flow @@ -1664,13 +1644,15 @@ def my_flow(): my_flow() logs = await prefect_client.read_logs() - error_log = [ - log.message - for log in logs - if log.level == 40 and "Encountered exception" in log.message - ].pop() - assert "Traceback" in error_log - assert "ValueError: Hello!" 
in error_log, "References the exception" + error_logs = "\n".join( + [ + log.message + for log in logs + if log.level == 40 and "Encountered exception" in log.message + ] + ) + assert "Traceback" in error_logs + assert "ValueError: Hello!" in error_logs, "References the exception" async def test_opt_out_logs_are_not_sent_to_api(self, prefect_client): @flow From 25369e07aaa3840e73a11499f82d39f46c70e784 Mon Sep 17 00:00:00 2001 From: Chris Pickett Date: Mon, 5 Feb 2024 16:14:38 -0500 Subject: [PATCH 4/4] De-flaky `tests/test_engine.py` (#11862) Co-authored-by: Chris Guidry --- tests/test_engine.py | 42 +++++++++++------------------------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/tests/test_engine.py b/tests/test_engine.py index ac1a42a8f87e..79bbba8aaae9 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -215,53 +215,34 @@ async def pausing_flow(): assert min(sleep_intervals) <= 20 # Okay if this is zero assert max(sleep_intervals) == 100 - @pytest.mark.flaky async def test_first_polling_is_smaller_than_the_timeout(self, monkeypatch): - sleeper = AsyncMock(side_effect=[None, None, None, None, None]) + sleeper = AsyncMock(side_effect=[None]) monkeypatch.setattr("prefect.engine.anyio.sleep", sleeper) - @task - async def doesnt_pause(): - return 42 - @flow(task_runner=SequentialTaskRunner()) async def pausing_flow(): - x = await doesnt_pause.submit() await pause_flow_run(timeout=4, poll_interval=5) - y = await doesnt_pause.submit() - await doesnt_pause(wait_for=[x]) - await doesnt_pause(wait_for=[y]) - await doesnt_pause(wait_for=[x, y]) with pytest.raises(StopAsyncIteration): # the sleeper mock will exhaust its side effects after 6 calls await pausing_flow() + # When pausing a flow run and the poll_interval is greater than the + # timeout, the first sleep interval should be half of the timeout. sleep_intervals = [c.args[0] for c in sleeper.await_args_list] - assert sleep_intervals[0] == 2 - assert sleep_intervals[1:] == [5, 5, 5, 5, 5] + assert sleep_intervals[0] == 4 / 2 - @pytest.mark.flaky(max_runs=4) async def test_paused_flows_block_execution_in_sync_flows(self, prefect_client): - @task - def foo(): - return 42 + completed = False @flow(task_runner=SequentialTaskRunner()) def pausing_flow(): - x = foo.submit() - y = foo.submit() + nonlocal completed pause_flow_run(timeout=0.1) - foo(wait_for=[x]) - foo(wait_for=[y]) - foo(wait_for=[x, y]) + completed = True - flow_run_state = pausing_flow(return_state=True) - flow_run_id = flow_run_state.state_details.flow_run_id - task_runs = await prefect_client.read_task_runs( - flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}) - ) - assert len(task_runs) == 2, "only two tasks should have completed" + pausing_flow(return_state=True) + assert not completed async def test_paused_flows_block_execution_in_async_flows(self, prefect_client): @task @@ -1506,7 +1487,6 @@ def flaky_function(): assert await state.result() == 1 @pytest.mark.parametrize("jitter_factor", [0.1, 1, 10, 100]) - @pytest.mark.flaky(max_runs=3) async def test_waits_jittery_sleeps( self, mock_anyio_sleep, @@ -1554,10 +1534,10 @@ async def flaky_function(): log_prints=False, ) - assert mock_anyio_sleep.await_count == 10 + assert mock.call_count == 10 + 1 # 1 run + 10 retries sleeps = [c.args[0] for c in mock_anyio_sleep.await_args_list] assert statistics.variance(sleeps) > 0 - assert max(sleeps) < 100 * (1 + jitter_factor) + assert max(sleeps) <= 100 * (1 + jitter_factor) # Check for a proper final result assert await state.result() == 1