Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raise helpful error when task_worker runs against ephemeral server #13848

Merged
merged 5 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/prefect/client/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from prefect._internal.schemas.bases import IDBaseModel
from prefect.logging import get_logger
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL
from prefect.settings import PREFECT_API_KEY

logger = get_logger(__name__)

Expand All @@ -23,10 +23,11 @@ def __init__(
path: str,
keys: List[str],
client_id: Optional[str] = None,
base_url: Optional[str] = None,
):
self.model = model
self.client_id = client_id
base_url = PREFECT_API_URL.value().replace("http", "ws", 1)
base_url = base_url.replace("http", "ws", 1)
self.subscription_url = f"{base_url}{path}"

self.keys = keys
Expand Down
7 changes: 7 additions & 0 deletions src/prefect/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ async def stop(self):
raise StopTaskWorker

async def _subscribe_to_task_scheduling(self):
base_url = PREFECT_API_URL.value()
if base_url is None:
raise ValueError(
"`PREFECT_API_URL` must be set to use the task worker. "
"Task workers are not compatible with the ephemeral API."
)
logger.info(
f"Subscribing to tasks: {' | '.join(t.task_key.split('.')[-1] for t in self.tasks)}"
)
Expand All @@ -147,6 +153,7 @@ async def _subscribe_to_task_scheduling(self):
path="/task_runs/subscriptions/scheduled",
keys=[task.task_key for task in self.tasks],
client_id=self._client_id,
base_url=base_url,
):
if self._limiter:
await self._limiter.acquire_on_behalf_of(task_run.id)
Expand Down
9 changes: 9 additions & 0 deletions tests/test_task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
from prefect.exceptions import MissingResult
from prefect.filesystems import LocalFileSystem
from prefect.futures import PrefectDistributedFuture
from prefect.settings import PREFECT_API_URL, temporary_settings
from prefect.states import Running
from prefect.task_worker import TaskWorker, serve
from prefect.tasks import task_input_hash

pytestmark = pytest.mark.usefixtures("use_hosted_api_server")
Copy link
Contributor Author

@jakekaplan jakekaplan Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should always use a hosted server for these tests since task_workers should not run against the ephemeral api?



@pytest.fixture(autouse=True)
async def clear_cached_filesystems():
Expand Down Expand Up @@ -87,6 +90,12 @@ def mock_subscription(monkeypatch):
return mock_subscription


async def test_task_worker_does_not_run_against_ephemeral_api():
with pytest.raises(ValueError):
with temporary_settings({PREFECT_API_URL: None}):
await TaskWorker(...)._subscribe_to_task_scheduling()


async def test_task_worker_basic_context_management():
async with TaskWorker(...) as task_worker:
assert task_worker.started is True
Expand Down
Loading