
Commit 1ddecf5
Merge branch 'main' into fix/ignore-cache-upload-failures
urimandujano committed Feb 5, 2024
2 parents e1d76fb + 25369e0
Showing 4 changed files with 72 additions and 112 deletions.
10 changes: 4 additions & 6 deletions .github/workflows/python-tests.yaml
@@ -45,15 +45,15 @@ concurrency:
 
 jobs:
   run-tests:
+    runs-on:
+      group: oss-larger-runners
     name: python:${{ matrix.python-version }}, ${{ matrix.database }}, ${{ matrix.pytest-options }}
     strategy:
       matrix:
         database:
           - "postgres:13"
           - "postgres:14"
           - "sqlite"
-        os:
-          - "oss-test-runner"
         python-version:
           - "3.8"
           - "3.9"
@@ -81,7 +81,6 @@ jobs:
 
       fail-fast: false
 
-    runs-on: ${{ matrix.os }}
     timeout-minutes: 45
 
     steps:
@@ -238,14 +237,14 @@ jobs:
   # Run a smaller subset of tests with pydantic v1 installed, the
   # Python versions we support, and only on sqlite + postgres 14
   run-tests-pydantic-v1:
+    runs-on:
+      group: oss-larger-runners
     name: pydantic v1, python:${{ matrix.python-version }}, ${{ matrix.database }}, ${{ matrix.pytest-options }}
     strategy:
       matrix:
         database:
           - "postgres:14"
           - "sqlite"
-        os:
-          - "oss-test-runner"
         python-version:
           - "3.8"
           - "3.9"
@@ -268,7 +267,6 @@
 
       fail-fast: false
 
-    runs-on: ${{ matrix.os }}
     timeout-minutes: 45
 
     steps:
4 changes: 2 additions & 2 deletions tests/fixtures/docker.py
@@ -2,6 +2,7 @@
 from contextlib import contextmanager
 from typing import Generator
 
+import docker.errors as docker_errors
 import pytest
 from typer.testing import CliRunner

@@ -68,11 +69,10 @@ def cleanup_all_new_docker_objects(docker: DockerClient, worker_id: str):
         for image in docker.images.list(filters=filters):
             for tag in image.tags:
                 docker.images.remove(tag, force=True)
-    except docker.errors.NotFound:
+    except docker_errors.NotFound:
         logger.warning("Failed to clean up Docker objects")
 
 
-@pytest.mark.timeout(120)
 @pytest.fixture(scope="session")
 def prefect_base_image(pytestconfig: "pytest.Config", docker: DockerClient):
     """Ensure that the prefect dev image is available and up-to-date"""
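Why the alias matters: inside `cleanup_all_new_docker_objects` the fixture parameter named `docker` shadows the `docker` package, so the old `except docker.errors.NotFound:` presumably looked up `.errors` on the `DockerClient` instance and raised `AttributeError` while the original exception was being handled. Below is a minimal sketch of the failure mode, assuming the Docker SDK for Python is installed; `FakeClient` and `cleanup` are illustrative names, not from the codebase. (The dropped `@pytest.mark.timeout(120)` was inert anyway: pytest ignores marks applied to fixtures.)

```python
import docker.errors as docker_errors  # aliased before any local name can shadow `docker`


class FakeClient:
    """Stand-in for docker.DockerClient, which likewise has no `errors` attribute."""


def cleanup(docker: FakeClient) -> None:
    # The parameter `docker` shadows the package, so the except clause
    # evaluates `.errors` on the client instance -> AttributeError.
    try:
        raise docker_errors.NotFound("object already removed")
    except docker.errors.NotFound:
        pass


def cleanup_fixed(docker: FakeClient) -> None:
    try:
        raise docker_errors.NotFound("object already removed")
    except docker_errors.NotFound:  # resolved through the module alias
        pass


cleanup_fixed(FakeClient())  # handles NotFound as intended
cleanup(FakeClient())        # AttributeError: 'FakeClient' object has no attribute 'errors'
```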
42 changes: 11 additions & 31 deletions tests/test_engine.py
@@ -215,53 +215,34 @@ async def pausing_flow():
         assert min(sleep_intervals) <= 20  # Okay if this is zero
         assert max(sleep_intervals) == 100
 
-    @pytest.mark.flaky
     async def test_first_polling_is_smaller_than_the_timeout(self, monkeypatch):
-        sleeper = AsyncMock(side_effect=[None, None, None, None, None])
+        sleeper = AsyncMock(side_effect=[None])
         monkeypatch.setattr("prefect.engine.anyio.sleep", sleeper)
 
-        @task
-        async def doesnt_pause():
-            return 42
-
         @flow(task_runner=SequentialTaskRunner())
         async def pausing_flow():
-            x = await doesnt_pause.submit()
             await pause_flow_run(timeout=4, poll_interval=5)
-            y = await doesnt_pause.submit()
-            await doesnt_pause(wait_for=[x])
-            await doesnt_pause(wait_for=[y])
-            await doesnt_pause(wait_for=[x, y])
 
         with pytest.raises(StopAsyncIteration):
             # the sleeper mock will exhaust its side effects after 6 calls
             await pausing_flow()
 
         # When pausing a flow run and the poll_interval is greater than the
         # timeout, the first sleep interval should be half of the timeout.
         sleep_intervals = [c.args[0] for c in sleeper.await_args_list]
-        assert sleep_intervals[0] == 2
-        assert sleep_intervals[1:] == [5, 5, 5, 5, 5]
+        assert sleep_intervals[0] == 4 / 2
 
-    @pytest.mark.flaky(max_runs=4)
     async def test_paused_flows_block_execution_in_sync_flows(self, prefect_client):
-        @task
-        def foo():
-            return 42
+        completed = False
 
         @flow(task_runner=SequentialTaskRunner())
         def pausing_flow():
-            x = foo.submit()
-            y = foo.submit()
+            nonlocal completed
             pause_flow_run(timeout=0.1)
-            foo(wait_for=[x])
-            foo(wait_for=[y])
-            foo(wait_for=[x, y])
+            completed = True
 
-        flow_run_state = pausing_flow(return_state=True)
-        flow_run_id = flow_run_state.state_details.flow_run_id
-        task_runs = await prefect_client.read_task_runs(
-            flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]})
-        )
-        assert len(task_runs) == 2, "only two tasks should have completed"
+        pausing_flow(return_state=True)
+        assert not completed
 
     async def test_paused_flows_block_execution_in_async_flows(self, prefect_client):
         @task
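The rewritten polling test is now deterministic: `AsyncMock(side_effect=[None])` lets the patched `anyio.sleep` succeed exactly once and raise `StopAsyncIteration` on the next call, so precisely one sleep interval is recorded before the flow is cut off. The behavior under test is the clamping rule the comment describes: when `poll_interval` exceeds the pause `timeout`, the first sleep is halved so a poll still lands inside the pause window. A sketch of that rule with a hypothetical helper, not Prefect's actual engine code:

```python
def first_sleep_interval(timeout: float, poll_interval: float) -> float:
    """Clamp the first poll so it fits inside the pause window (illustrative only).

    With timeout=4 and poll_interval=5, sleeping the full interval would
    wake up after the pause had already expired; halving the timeout
    guarantees at least one check before then.
    """
    return min(poll_interval, timeout / 2)


assert first_sleep_interval(timeout=4, poll_interval=5) == 4 / 2  # clamped, as the test asserts
assert first_sleep_interval(timeout=100, poll_interval=5) == 5    # short intervals pass through
```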
@@ -1506,7 +1487,6 @@ def flaky_function():
         assert await state.result() == 1
 
     @pytest.mark.parametrize("jitter_factor", [0.1, 1, 10, 100])
-    @pytest.mark.flaky(max_runs=3)
     async def test_waits_jittery_sleeps(
         self,
         mock_anyio_sleep,
@@ -1554,10 +1534,10 @@ async def flaky_function():
             log_prints=False,
         )
 
         assert mock_anyio_sleep.await_count == 10
         assert mock.call_count == 10 + 1  # 1 run + 10 retries
         sleeps = [c.args[0] for c in mock_anyio_sleep.await_args_list]
         assert statistics.variance(sleeps) > 0
-        assert max(sleeps) < 100 * (1 + jitter_factor)
+        assert max(sleeps) <= 100 * (1 + jitter_factor)
 
         # Check for a proper final result
         assert await state.result() == 1
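Loosening `<` to `<=` fixes flakiness at the boundary rather than in the engine: when the jitter multiplier is drawn from a closed interval, the sampled maximum can land exactly on `base * (1 + jitter_factor)`, so a strict bound can fail spuriously. A sketch of the bound, assuming uniformly sampled jitter (Prefect's actual sampling may differ):

```python
import random


def jittered_delay(base: float, jitter_factor: float, rng: random.Random) -> float:
    # random.uniform may include its upper endpoint, so the result can equal
    # base * (1 + jitter_factor) exactly -- the inclusive `<=` is the safe bound.
    return base * rng.uniform(1.0, 1.0 + jitter_factor)


rng = random.Random(0)
sleeps = [jittered_delay(100, jitter_factor=10, rng=rng) for _ in range(10)]
assert max(sleeps) <= 100 * (1 + 10)           # inclusive bound always holds
assert len({round(s, 3) for s in sleeps}) > 1  # jitter actually varies the delays
```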
