Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion datashare-python/datashare_python/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
"path to a worker config YAML file,"
" if not provided will load worker configuration from env variables"
)
_START_WORKER_SKIP_CONFIG_CLS_DISCOVERY = (
"skip config class discovery (useful to run a workflow worker from different app)"
)

_WORKER_QUEUE_HELP = "worker task queue"
_TEMPORAL_NAMESPACE_HELP = "worker temporal namespace"

Expand Down Expand Up @@ -105,9 +109,19 @@ async def start(
str | None,
typer.Option("--temporal-namespace", "-ns", help=_TEMPORAL_NAMESPACE_HELP),
] = None,
*,
skip_config_discovery: Annotated[
bool,
typer.Option(
"--skip-config-discovery", help=_START_WORKER_SKIP_CONFIG_CLS_DISCOVERY
),
] = False,
) -> None:
registered_wfs, registered_acts, registered_deps, worker_config_cls = discover(
workflows, act_names=activities, deps_name=dependencies
workflows,
act_names=activities,
deps_name=dependencies,
skip_config=skip_config_discovery,
)
if config_path is not None:
with config_path.open() as f:
Expand Down
11 changes: 9 additions & 2 deletions datashare-python/datashare_python/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@


def discover(
wf_names: list[str] | None, *, act_names: list[str] | None, deps_name: str | None
wf_names: list[str] | None,
*,
act_names: list[str] | None,
deps_name: str | None,
skip_config: bool = False,
) -> _Discovery:
discovered = ""
wfs = None
Expand Down Expand Up @@ -80,7 +84,10 @@ def discover(
f"- {n_deps} dependenc{'ies' if n_deps > 1 else 'y'}:"
f" {', '.join(deps_names)}"
)
worker_config_cls = discover_worker_config_cls()
if not skip_config:
worker_config_cls = discover_worker_config_cls()
else:
worker_config_cls = WorkerConfig
discovered += f"- worker config class: {worker_config_cls}"
logger.info("discovered:\n%s", discovered)
return wfs, acts, deps, worker_config_cls
Expand Down
42 changes: 42 additions & 0 deletions datashare-python/tests/cli/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,45 @@ async def test_start_workers(
- 1 activity: create-translation-batches
- 3 dependencies: set_worker_config, set_loggers, set_es_client"""
assert expected in result.stderr


async def test_start_workers_skipping_config_discovery(
typer_asyncio_patch, # noqa: ANN001, ARG001
monkeypatch: MonkeyPatch,
capsys: CaptureFixture[str],
) -> None:
# Given
runner = CliRunner()
monkeypatch.setattr(DatashareWorker, "__aenter__", _mock_worker__aenter__)
monkeypatch.setattr(DatashareWorker, "__aexit__", _mock_worker__aexit__)
monkeypatch.setattr(DatashareWorker, "is_done", _mock_worker_is_done)
with capsys.disabled():
# When
result = runner.invoke(
cli_app,
[
"worker",
"start",
"--queue",
"cpu",
"--skip-config-discovery",
"--activities",
"ping",
"--activities",
"create-translation-batches",
"--workflows",
"ping",
"--dependencies",
"base",
"--temporal-address",
"localhost:7233",
],
catch_exceptions=False,
)
# Then
assert result.exit_code == 0
expected = """discovered:
- 1 workflow: ping
- 1 activity: create-translation-batches
- 3 dependencies: set_worker_config, set_loggers, set_es_client"""
assert expected in result.stderr
11 changes: 11 additions & 0 deletions datashare-python/tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import datashare_python
import pytest
from _pytest.monkeypatch import MonkeyPatch
from datashare_python.config import WorkerConfig
from datashare_python.discovery import (
discover,
discover_activities,
discover_dependencies,
discover_worker_config_cls,
Expand Down Expand Up @@ -55,6 +57,15 @@ def test_discover_activities(names: list[str], expected_activities: set[str]) ->
assert activities == expected_activities


def test_discover_skip_config_discover() -> None:
# When
_, _, _, workflow_cls = discover(
["ping"], act_names=None, deps_name=None, skip_config=True
)
# Then
assert workflow_cls == WorkerConfig


def test_discover_dependencies() -> None:
# When
name = "base"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set -e

uv run --no-sync datashare-python worker start \
--skip-config-discovery \
--queue datashare.workflows \
--workflow asr.transcription \
--workflow translation
Loading