From 743db136c0c9c40226ff55653d79c6d57f359af9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cle=CC=81ment=20Doumouro?= Date: Wed, 29 Apr 2026 10:02:19 +0200 Subject: [PATCH] feature(asr-worker): implement activity heartbeat --- datashare-python/datashare_python/logging_.py | 56 ++++--- datashare-python/datashare_python/utils.py | 154 ++++++++++-------- datashare-python/datashare_python/worker.py | 12 +- datashare-python/pyproject.toml | 2 +- worker-template/uv.dist.lock | 54 +++--- worker-template/uv.lock | 54 +++--- worker-template/worker_template/activities.py | 18 +- worker-template/worker_template/workflows.py | 19 ++- workers/asr-worker/asr_worker/activities.py | 30 ++-- workers/asr-worker/asr_worker/workflows.py | 2 +- workers/asr-worker/tests/test_activities.py | 4 +- workers/asr-worker/tests/test_workflows.py | 3 + 12 files changed, 225 insertions(+), 183 deletions(-) diff --git a/datashare-python/datashare_python/logging_.py b/datashare-python/datashare_python/logging_.py index 18aef39..32fc219 100644 --- a/datashare-python/datashare_python/logging_.py +++ b/datashare-python/datashare_python/logging_.py @@ -2,11 +2,7 @@ import sys from copy import copy -from icij_common.logging_utils import ( - DATE_FMT, - STREAM_HANDLER_FMT, - STREAM_HANDLER_FMT_WITH_WORKER_ID, -) +from icij_common.logging_utils import DATE_FMT, STREAM_HANDLER_FMT from pythonjsonlogger.core import RESERVED_ATTRS, BaseJsonFormatter from pythonjsonlogger.orjson import OrjsonFormatter from temporalio import activity, workflow @@ -26,6 +22,11 @@ ) +_STREAM_HANDLER_FMT_WITH_WORKER_ID = ( + "[%(levelname)s][%(asctime)s.%(msecs)03d][%(worker_id)s][%(name)s]: %(message)s" +) + + def setup_worker_loggers( loggers: dict[str, LogLevel], *, worker_id: str | None, in_json: bool ) -> None: @@ -35,35 +36,18 @@ def setup_worker_loggers( logger = logging.getLogger(logger_name) logger.setLevel(level) logger.handlers = [] - for handler in _get_worker_handlers(level, worker_id, in_json=in_json): + for handler in _get_worker_handlers(level, worker_filter, in_json=in_json): logger.addHandler(handler) - logger.addFilter(worker_filter) - - -def _get_worker_handlers( - level: int, worker_id: str | None, *, in_json: bool -) -> list[logging.Handler]: - stream_handler = logging.StreamHandler(sys.stderr) - if in_json: - fmt = _json_formatter(datefmt=DATE_FMT) - else: - if worker_id is not None: - fmt = STREAM_HANDLER_FMT_WITH_WORKER_ID - else: - fmt = STREAM_HANDLER_FMT - fmt = logging.Formatter(fmt, DATE_FMT) - stream_handler.setFormatter(fmt) - stream_handler.setLevel(level) - return [stream_handler] class WorkerFilter(logging.Filter): - def __init__(self, worker_id: str) -> None: + def __init__(self, worker_id: str | None) -> None: super().__init__() - self._worker_id = worker_id + self.worker_id = worker_id def filter(self, record: logging.LogRecord) -> bool: - record.worker_id = self._worker_id + if self.worker_id is not None: + record.worker_id = self.worker_id if workflow.in_workflow(): wf_info = workflow.info() for attr in _WF_LOGGED_ATTRS: @@ -79,6 +63,24 @@ def filter(self, record: logging.LogRecord) -> bool: return True +def _get_worker_handlers( + level: int, worker_filter: WorkerFilter, *, in_json: bool +) -> list[logging.Handler]: + stream_handler = logging.StreamHandler(sys.stderr) + if in_json: + fmt = _json_formatter(datefmt=DATE_FMT) + else: + if worker_filter.worker_id is not None: + fmt = _STREAM_HANDLER_FMT_WITH_WORKER_ID + else: + fmt = STREAM_HANDLER_FMT + fmt = logging.Formatter(fmt, DATE_FMT) + stream_handler.setFormatter(fmt) + stream_handler.setLevel(level) + stream_handler.addFilter(worker_filter) + return [stream_handler] + + def _json_formatter(datefmt: str) -> BaseJsonFormatter: fmt = OrjsonFormatter( # let's keep logging as fast as possible _LOGGED_ATTRIBUTES, datefmt=datefmt diff --git a/datashare-python/datashare_python/utils.py b/datashare-python/datashare_python/utils.py index 751fb59..e827c74 100644 --- a/datashare-python/datashare_python/utils.py +++ b/datashare-python/datashare_python/utils.py @@ -1,12 +1,10 @@ import asyncio +import contextlib +import contextvars import inspect import json -import logging -import sys -from collections.abc import ( - Callable, - Coroutine, -) +import threading +from collections.abc import Awaitable, Callable, Coroutine from copy import deepcopy from dataclasses import dataclass from datetime import timedelta @@ -20,15 +18,6 @@ import nest_asyncio import temporalio -from icij_common.logging_utils import ( - DATE_FMT, - STREAM_HANDLER_FMT, - STREAM_HANDLER_FMT_WITH_WORKER_ID, - WorkerIdFilter, -) -from icij_common.pydantic_utils import get_field_default_value -from pydantic.fields import FieldInfo -from pythonjsonlogger.json import JsonFormatter from temporalio import activity, workflow from temporalio.client import Client, WorkflowHandle from temporalio.common import RetryPolicy, SearchAttributeKey @@ -123,6 +112,7 @@ async def execute_activity( *, args: list | None = None, start_to_close_timeout: timedelta | None = None, + heartbeat_timeout: timedelta = timedelta(minutes=1), retry_policy: temporalio.common.RetryPolicy | None = None, ) -> Any: if args is None: @@ -135,6 +125,7 @@ async def execute_activity( start_to_close_timeout=start_to_close_timeout, task_queue=task_queue, retry_policy=retry_policy, + heartbeat_timeout=heartbeat_timeout, ) @@ -150,6 +141,8 @@ async def progress_handler( activity_id=activity_id, run_id=run_id, progress=progress, weight=weight ) await handle.signal("update_progress", signal) + with contextlib.suppress(RuntimeError, asyncio.TimeoutError): + activity.heartbeat() def get_activity_progress_handler_async( @@ -229,6 +222,74 @@ def wrapper(self: ActivityWithProgress, *args: P.args) -> T: return decorator +def with_async_heartbeat( + activity_fn: Callable[P, Awaitable[T]], n_missed_before_timeout: int +) -> Callable[P, Awaitable[T]]: + # Copied from + # https://github.com/temporalio/samples-python/blob/main/custom_decorator/activity_utils.py + @wraps(activity_fn) + async def wrapper(*args, **kwargs) -> T: + heartbeat_timeout = activity.info().heartbeat_timeout + heartbeat_task = None + if heartbeat_timeout: + period = heartbeat_timeout.total_seconds() / n_missed_before_timeout + heartbeat_task = asyncio.create_task(_async_heartbeat_every(period)) + try: + activity.heartbeat() + return await activity_fn(*args, **kwargs) + finally: + if heartbeat_task: + heartbeat_task.cancel() + await asyncio.wait([heartbeat_task]) + + return wrapper + + +async def _async_heartbeat_every(period: float, *details: Any) -> None: + with contextlib.suppress(RuntimeError, asyncio.TimeoutError): + activity.heartbeat(*details) + while True: + await asyncio.sleep(period) + with contextlib.suppress(RuntimeError, asyncio.TimeoutError): + activity.heartbeat(*details) + + +def with_sync_heartbeat( + activity_fn: Callable[P, T], n_missed_before_timeout: int +) -> Callable[P, T]: + @wraps(activity_fn) + def wrapper(*args, **kwargs) -> T: + heartbeat_timeout = activity.info().heartbeat_timeout + heartbeat_thread, stop_event = None, None + if heartbeat_timeout: + period = heartbeat_timeout.total_seconds() / n_missed_before_timeout + ctx = contextvars.copy_context() + run_args = (_sync_heartbeat_every, period, threading.Event()) + heartbeat_thread, stop_event = ( + threading.Thread(target=ctx.run, args=run_args), + run_args[-1], + ) + heartbeat_thread.start() + try: + return activity_fn(*args, **kwargs) + finally: + if heartbeat_thread: + stop_event.set() + heartbeat_thread.join() + + return wrapper + + +def _sync_heartbeat_every( + period: float, stop_event: threading.Event, *details: Any +) -> None: + with contextlib.suppress(RuntimeError, asyncio.TimeoutError): + activity.heartbeat(*details) + while not stop_event.wait(period): + with contextlib.suppress(RuntimeError, asyncio.TimeoutError): + activity.heartbeat(*details) + + def positional_args_only(activity_fn: Callable[P, T]) -> Callable[P, T]: sig = inspect.signature(activity_fn) @@ -336,6 +397,7 @@ def activity_defn( name: str, progress_weight: float = 1.0, retriables: set[type[Exception]] = None, + n_missed_heartbeats_before_timeout: int = 5, ) -> Callable[[Callable[P, T]], Callable[P, T]]: def decorator(activity_fn: Callable[P, T]) -> Callable[P, T]: # TODO: some of these could probably be reimplemented more elegantly using @@ -344,6 +406,15 @@ def decorator(activity_fn: Callable[P, T]) -> Callable[P, T]: activity_fn = with_retriables(retriables)(activity_fn) if supports_progress(activity_fn): activity_fn = with_progress(progress_weight)(activity_fn) + is_async = asyncio.iscoroutinefunction(activity_fn) + if is_async: + activity_fn = with_async_heartbeat( + activity_fn, n_missed_heartbeats_before_timeout + ) + else: + activity_fn = with_sync_heartbeat( + activity_fn, n_missed_heartbeats_before_timeout + ) activity_fn = activity.defn(activity_fn, name=name) return activity_fn @@ -382,59 +453,6 @@ async def _scaled(p: float) -> None: return _scaled -class LogWithWorkerIDMixin: - def setup_loggers(self, worker_id: str | None = None) -> None: - # Ugly work around the Pydantic V1 limitations... - all_loggers = self.loggers - if isinstance(all_loggers, FieldInfo): - all_loggers = get_field_default_value(all_loggers) - all_loggers.append(__name__) - loggers = sorted(set(all_loggers)) - log_level = self.log_level - if isinstance(log_level, FieldInfo): - log_level = get_field_default_value(log_level) - force_warning = getattr(self, "force_warning_loggers", []) - if isinstance(force_warning, FieldInfo): - force_warning = get_field_default_value(force_warning) - force_warning = set(force_warning) - worker_id_filter = None - if worker_id is not None: - worker_id_filter = WorkerIdFilter(worker_id) - handlers = self._handlers(worker_id_filter, log_level) - for logger_ in loggers: - logger_ = logging.getLogger(logger_) # noqa: PLW2901 - level = getattr(logging, log_level) - if logger_.name in force_warning: - level = max(logging.WARNING, level) - logger_.setLevel(level) - logger_.handlers = [] - for handler in handlers: - logger_.addHandler(handler) - - def _handlers( - self, worker_id_filter: logging.Filter | None, log_level: int - ) -> list[logging.Handler]: - stream_handler = logging.StreamHandler(sys.stderr) - if worker_id_filter is not None: - fmt = STREAM_HANDLER_FMT_WITH_WORKER_ID - else: - fmt = STREAM_HANDLER_FMT - log_in_json = getattr(self, "log_in_json", False) - if isinstance(log_in_json, FieldInfo): - log_in_json = get_field_default_value(log_in_json) - if log_in_json: - fmt = JsonFormatter(fmt, DATE_FMT) - else: - fmt = logging.Formatter(fmt, DATE_FMT) - stream_handler.setFormatter(fmt) - handlers = [stream_handler] - for handler in handlers: - if worker_id_filter is not None: - handler.addFilter(worker_id_filter) - handler.setLevel(log_level) - return handlers - - def safe_dir(doc_id: str) -> Path: if len(doc_id) < 4: raise ValueError(f"expected doc_id to be at least 4, found {doc_id}") diff --git a/datashare-python/datashare_python/worker.py b/datashare-python/datashare_python/worker.py index 92fb17e..6931415 100644 --- a/datashare-python/datashare_python/worker.py +++ b/datashare-python/datashare_python/worker.py @@ -11,7 +11,12 @@ from copy import copy from typing import Any -from temporalio.worker import PollerBehaviorSimpleMaximum, Worker +from temporalio.worker import ( + PollerBehaviorSimpleMaximum, + UnsandboxedWorkflowRunner, + Worker, +) +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner from .config import WorkerConfig from .dependencies import with_dependencies @@ -62,6 +67,7 @@ def datashare_worker( # Scale horizontally be default for activities, each worker processes one activity # at a time max_concurrent_io_activities: int = 10, + sandboxed: bool = True, ) -> DatashareWorker: if workflows is None: workflows = [] @@ -86,6 +92,7 @@ def datashare_worker( if workflows: logger.warning(_SEPARATE_IO_AND_CPU_WORKERS) interceptors = [TraceContextInterceptor()] + wf_runner = SandboxedWorkflowRunner() if sandboxed else UnsandboxedWorkflowRunner() return DatashareWorker( client, interceptors=interceptors, @@ -101,6 +108,7 @@ def datashare_worker( # Workflow tasks are assumed to be very lightweight and fast we can reserve # several of them workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(5), + workflow_runner=wf_runner, ) @@ -144,6 +152,7 @@ async def worker_context( event_loop: AbstractEventLoop, task_queue: str, dependencies: list[ContextManagerFactory] | None = None, + sandboxed: bool = True, ) -> AsyncGenerator[DatashareWorker, None]: discovered = [] if activities is not None: @@ -185,6 +194,7 @@ async def worker_context( activities=acts, task_queue=task_queue, max_concurrent_io_activities=worker_config.max_concurrent_io_activities, + sandboxed=sandboxed, ) async with worker: yield worker diff --git a/datashare-python/pyproject.toml b/datashare-python/pyproject.toml index 1ffc380..9bd619e 100644 --- a/datashare-python/pyproject.toml +++ b/datashare-python/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "datashare-python" version = "0.7.2" -description = "Manage Pythoœn tasks and local resources in Datashare" +description = "Manage Python tasks and local resources in Datashare" authors = [ { name = "Clément Doumouro", email = "cdoumouro@icij.org" }, { name = "Clément Doumouro", email = "clement.doumouro@gmail.com" }, diff --git a/worker-template/uv.dist.lock b/worker-template/uv.dist.lock index 8006b6b..a872b91 100644 --- a/worker-template/uv.dist.lock +++ b/worker-template/uv.dist.lock @@ -2044,12 +2044,12 @@ dependencies = [ { name = "typing-extensions", marker = "sys_platform == 'darwin'" }, ] wheels = [ - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp311-none-macosx_11_0_arm64.whl", hash = "sha256:a52952a8c90a422c14627ea99b9826b7557203b46b4d0772d3ca5c7699692425" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:287242dd1f830846098b5eca847f817aa5c6015ea57ab4c1287809efea7b77eb" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:8924d10d36eac8fe0652a060a03fc2ae52980841850b9a1a2ddb0f27a4f181cd" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-none-macosx_11_0_arm64.whl", hash = "sha256:bcee64ae7aa65876ceeae6dcaebe75109485b213528c74939602208a20706e3f" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:defadbeb055cfcf5def58f70937145aecbd7a4bc295238ded1d0e85ae2cf0e1d" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:886f84b181f766f53265ba0a1d503011e60f53fff9d569563ef94f24160e1072" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp311-none-macosx_11_0_arm64.whl", hash = "sha256:a52952a8c90a422c14627ea99b9826b7557203b46b4d0772d3ca5c7699692425", upload-time = "2026-01-23T15:12:43Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:287242dd1f830846098b5eca847f817aa5c6015ea57ab4c1287809efea7b77eb", upload-time = "2026-01-23T15:12:43Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:8924d10d36eac8fe0652a060a03fc2ae52980841850b9a1a2ddb0f27a4f181cd", upload-time = "2026-01-23T15:12:45Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-none-macosx_11_0_arm64.whl", hash = "sha256:bcee64ae7aa65876ceeae6dcaebe75109485b213528c74939602208a20706e3f", upload-time = "2026-01-23T15:12:48Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:defadbeb055cfcf5def58f70937145aecbd7a4bc295238ded1d0e85ae2cf0e1d", upload-time = "2026-01-23T15:12:48Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:886f84b181f766f53265ba0a1d503011e60f53fff9d569563ef94f24160e1072", upload-time = "2026-01-23T15:12:50Z" }, ] [[package]] @@ -2070,27 +2070,27 @@ dependencies = [ { name = "typing-extensions", marker = "sys_platform != 'darwin'" }, ] wheels = [ - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:0e611cfb16724e62252b67d31073bc5c490cb83e92ecdc1192762535e0e44487" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:3de2adb9b4443dc9210ef1f1b16da3647ace53553166d6360bbbd7edd6f16e4d" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_amd64.whl", hash = "sha256:69b3785d28be5a9c56ab525788ec5000349ec59132a74b7d5e954b905015b992" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_arm64.whl", hash = "sha256:15b4ae6fe371d96bffb8e1e9af62164797db20a0dc1337345781659cfd0b8bb1" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:3bf9b442a51a2948e41216a76d7ab00f0694cfcaaa51b6f9bcab57b7f89843e6" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7417d8c565f219d3455654cb431c6d892a3eb40246055e14d645422de13b9ea1" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_amd64.whl", hash = "sha256:a4e06b4f441675d26b462123c8a83e77c55f1ec8ebc081203be2db1ea8054add" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_arm64.whl", hash = "sha256:1abe31f14b560c1f062699e966cb08ef5b67518a1cfac2d8547a3dbcd8387b06" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:3e532e553b37ee859205a9b2d1c7977fd6922f53bbb1b9bfdd5bdc00d1a60ed4" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:39b3dff6d8fba240ae0d1bede4ca11c2531ae3b47329206512d99e17907ff74b" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_amd64.whl", hash = "sha256:404a7ab2fffaf2ca069e662f331eb46313692b2f1630df2720094284f390ccef" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_arm64.whl", hash = "sha256:161decbff26a33f13cb5ba6d2c8f458bbf56193bcc32ecc70be6dd4c7a3ee79d" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:01b1884f724977a20c7da2f640f1c7b37f4a2c117a7f4a6c1c0424d14cb86322" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:031a597147fa81b1e6d79ccf1ad3ccc7fafa27941d6cf26ff5caaa384fb20e92" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-win_amd64.whl", hash = "sha256:e586ab1363e3f86aa4cc133b7fdcf98deb1d2c13d43a7a6e5a6a18e9c5364893" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:65010ab4aacce6c9a1ddfc935f986c003ca8638ded04348fd326c3e74346237c" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:88adf5157db5da1d54b1c9fe4a6c1d20ceef00e75d854e206a87dbf69e3037dc" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-win_amd64.whl", hash = "sha256:f60e2565f261542efac07e25208fb3fc55c6fe82314a5a9cbee971edb5f27713" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:3ac2b8df2c55430e836dcda31940d47f1f5f94b8731057b6f20300ebea394dd9" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:5b688445f928f13563b7418b17c57e97bf955ab559cf73cd8f2b961f8572dbb3" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-win_amd64.whl", hash = "sha256:cf9c3e50b595721ca6b488bdcc326e0f1af73ed28b9b66eff504a96649bb5c96" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:0e611cfb16724e62252b67d31073bc5c490cb83e92ecdc1192762535e0e44487", upload-time = "2026-01-23T15:12:54Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:3de2adb9b4443dc9210ef1f1b16da3647ace53553166d6360bbbd7edd6f16e4d", upload-time = "2026-01-23T15:12:58Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_amd64.whl", hash = "sha256:69b3785d28be5a9c56ab525788ec5000349ec59132a74b7d5e954b905015b992", upload-time = "2026-01-23T15:12:59Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_arm64.whl", hash = "sha256:15b4ae6fe371d96bffb8e1e9af62164797db20a0dc1337345781659cfd0b8bb1", upload-time = "2026-01-23T15:13:00Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:3bf9b442a51a2948e41216a76d7ab00f0694cfcaaa51b6f9bcab57b7f89843e6", upload-time = "2026-01-23T15:13:02Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7417d8c565f219d3455654cb431c6d892a3eb40246055e14d645422de13b9ea1", upload-time = "2026-01-23T15:13:04Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_amd64.whl", hash = "sha256:a4e06b4f441675d26b462123c8a83e77c55f1ec8ebc081203be2db1ea8054add", upload-time = "2026-01-23T15:13:06Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_arm64.whl", hash = "sha256:1abe31f14b560c1f062699e966cb08ef5b67518a1cfac2d8547a3dbcd8387b06", upload-time = "2026-01-23T15:13:08Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:3e532e553b37ee859205a9b2d1c7977fd6922f53bbb1b9bfdd5bdc00d1a60ed4", upload-time = "2026-01-23T15:13:09Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:39b3dff6d8fba240ae0d1bede4ca11c2531ae3b47329206512d99e17907ff74b", upload-time = "2026-01-23T15:13:12Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_amd64.whl", hash = "sha256:404a7ab2fffaf2ca069e662f331eb46313692b2f1630df2720094284f390ccef", upload-time = "2026-01-23T15:13:13Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_arm64.whl", hash = "sha256:161decbff26a33f13cb5ba6d2c8f458bbf56193bcc32ecc70be6dd4c7a3ee79d", upload-time = "2026-01-23T15:13:14Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:01b1884f724977a20c7da2f640f1c7b37f4a2c117a7f4a6c1c0424d14cb86322", upload-time = "2026-01-23T15:13:16Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:031a597147fa81b1e6d79ccf1ad3ccc7fafa27941d6cf26ff5caaa384fb20e92", upload-time = "2026-01-23T15:13:19Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-win_amd64.whl", hash = "sha256:e586ab1363e3f86aa4cc133b7fdcf98deb1d2c13d43a7a6e5a6a18e9c5364893", upload-time = "2026-01-23T15:13:20Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:65010ab4aacce6c9a1ddfc935f986c003ca8638ded04348fd326c3e74346237c", upload-time = "2026-01-23T15:13:22Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:88adf5157db5da1d54b1c9fe4a6c1d20ceef00e75d854e206a87dbf69e3037dc", upload-time = "2026-01-23T15:13:23Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-win_amd64.whl", hash = "sha256:f60e2565f261542efac07e25208fb3fc55c6fe82314a5a9cbee971edb5f27713", upload-time = "2026-01-23T15:13:26Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:3ac2b8df2c55430e836dcda31940d47f1f5f94b8731057b6f20300ebea394dd9", upload-time = "2026-01-23T15:13:28Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:5b688445f928f13563b7418b17c57e97bf955ab559cf73cd8f2b961f8572dbb3", upload-time = "2026-01-23T15:13:29Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-win_amd64.whl", hash = "sha256:cf9c3e50b595721ca6b488bdcc326e0f1af73ed28b9b66eff504a96649bb5c96", upload-time = "2026-01-23T15:13:32Z" }, ] [[package]] diff --git a/worker-template/uv.lock b/worker-template/uv.lock index 3d2f114..eff0187 100644 --- a/worker-template/uv.lock +++ b/worker-template/uv.lock @@ -2070,12 +2070,12 @@ dependencies = [ { name = "typing-extensions", marker = "sys_platform == 'darwin'" }, ] wheels = [ - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp311-none-macosx_11_0_arm64.whl", hash = "sha256:a52952a8c90a422c14627ea99b9826b7557203b46b4d0772d3ca5c7699692425" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:287242dd1f830846098b5eca847f817aa5c6015ea57ab4c1287809efea7b77eb" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:8924d10d36eac8fe0652a060a03fc2ae52980841850b9a1a2ddb0f27a4f181cd" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-none-macosx_11_0_arm64.whl", hash = "sha256:bcee64ae7aa65876ceeae6dcaebe75109485b213528c74939602208a20706e3f" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:defadbeb055cfcf5def58f70937145aecbd7a4bc295238ded1d0e85ae2cf0e1d" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:886f84b181f766f53265ba0a1d503011e60f53fff9d569563ef94f24160e1072" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp311-none-macosx_11_0_arm64.whl", hash = "sha256:a52952a8c90a422c14627ea99b9826b7557203b46b4d0772d3ca5c7699692425", upload-time = "2026-01-23T15:12:43Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:287242dd1f830846098b5eca847f817aa5c6015ea57ab4c1287809efea7b77eb", upload-time = "2026-01-23T15:12:43Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:8924d10d36eac8fe0652a060a03fc2ae52980841850b9a1a2ddb0f27a4f181cd", upload-time = "2026-01-23T15:12:45Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp313-none-macosx_11_0_arm64.whl", hash = "sha256:bcee64ae7aa65876ceeae6dcaebe75109485b213528c74939602208a20706e3f", upload-time = "2026-01-23T15:12:48Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:defadbeb055cfcf5def58f70937145aecbd7a4bc295238ded1d0e85ae2cf0e1d", upload-time = "2026-01-23T15:12:48Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:886f84b181f766f53265ba0a1d503011e60f53fff9d569563ef94f24160e1072", upload-time = "2026-01-23T15:12:50Z" }, ] [[package]] @@ -2096,27 +2096,27 @@ dependencies = [ { name = "typing-extensions", marker = "sys_platform != 'darwin'" }, ] wheels = [ - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:0e611cfb16724e62252b67d31073bc5c490cb83e92ecdc1192762535e0e44487" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:3de2adb9b4443dc9210ef1f1b16da3647ace53553166d6360bbbd7edd6f16e4d" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_amd64.whl", hash = "sha256:69b3785d28be5a9c56ab525788ec5000349ec59132a74b7d5e954b905015b992" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_arm64.whl", hash = "sha256:15b4ae6fe371d96bffb8e1e9af62164797db20a0dc1337345781659cfd0b8bb1" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:3bf9b442a51a2948e41216a76d7ab00f0694cfcaaa51b6f9bcab57b7f89843e6" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7417d8c565f219d3455654cb431c6d892a3eb40246055e14d645422de13b9ea1" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_amd64.whl", hash = "sha256:a4e06b4f441675d26b462123c8a83e77c55f1ec8ebc081203be2db1ea8054add" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_arm64.whl", hash = "sha256:1abe31f14b560c1f062699e966cb08ef5b67518a1cfac2d8547a3dbcd8387b06" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:3e532e553b37ee859205a9b2d1c7977fd6922f53bbb1b9bfdd5bdc00d1a60ed4" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:39b3dff6d8fba240ae0d1bede4ca11c2531ae3b47329206512d99e17907ff74b" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_amd64.whl", hash = "sha256:404a7ab2fffaf2ca069e662f331eb46313692b2f1630df2720094284f390ccef" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_arm64.whl", hash = "sha256:161decbff26a33f13cb5ba6d2c8f458bbf56193bcc32ecc70be6dd4c7a3ee79d" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:01b1884f724977a20c7da2f640f1c7b37f4a2c117a7f4a6c1c0424d14cb86322" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:031a597147fa81b1e6d79ccf1ad3ccc7fafa27941d6cf26ff5caaa384fb20e92" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-win_amd64.whl", hash = "sha256:e586ab1363e3f86aa4cc133b7fdcf98deb1d2c13d43a7a6e5a6a18e9c5364893" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:65010ab4aacce6c9a1ddfc935f986c003ca8638ded04348fd326c3e74346237c" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:88adf5157db5da1d54b1c9fe4a6c1d20ceef00e75d854e206a87dbf69e3037dc" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-win_amd64.whl", hash = "sha256:f60e2565f261542efac07e25208fb3fc55c6fe82314a5a9cbee971edb5f27713" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:3ac2b8df2c55430e836dcda31940d47f1f5f94b8731057b6f20300ebea394dd9" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:5b688445f928f13563b7418b17c57e97bf955ab559cf73cd8f2b961f8572dbb3" }, - { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-win_amd64.whl", hash = "sha256:cf9c3e50b595721ca6b488bdcc326e0f1af73ed28b9b66eff504a96649bb5c96" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:0e611cfb16724e62252b67d31073bc5c490cb83e92ecdc1192762535e0e44487", upload-time = "2026-01-23T15:12:54Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:3de2adb9b4443dc9210ef1f1b16da3647ace53553166d6360bbbd7edd6f16e4d", upload-time = "2026-01-23T15:12:58Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_amd64.whl", hash = "sha256:69b3785d28be5a9c56ab525788ec5000349ec59132a74b7d5e954b905015b992", upload-time = "2026-01-23T15:12:59Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp311-cp311-win_arm64.whl", hash = "sha256:15b4ae6fe371d96bffb8e1e9af62164797db20a0dc1337345781659cfd0b8bb1", upload-time = "2026-01-23T15:13:00Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:3bf9b442a51a2948e41216a76d7ab00f0694cfcaaa51b6f9bcab57b7f89843e6", upload-time = "2026-01-23T15:13:02Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7417d8c565f219d3455654cb431c6d892a3eb40246055e14d645422de13b9ea1", upload-time = "2026-01-23T15:13:04Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_amd64.whl", hash = "sha256:a4e06b4f441675d26b462123c8a83e77c55f1ec8ebc081203be2db1ea8054add", upload-time = "2026-01-23T15:13:06Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp312-cp312-win_arm64.whl", hash = "sha256:1abe31f14b560c1f062699e966cb08ef5b67518a1cfac2d8547a3dbcd8387b06", upload-time = "2026-01-23T15:13:08Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:3e532e553b37ee859205a9b2d1c7977fd6922f53bbb1b9bfdd5bdc00d1a60ed4", upload-time = "2026-01-23T15:13:09Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:39b3dff6d8fba240ae0d1bede4ca11c2531ae3b47329206512d99e17907ff74b", upload-time = "2026-01-23T15:13:12Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_amd64.whl", hash = "sha256:404a7ab2fffaf2ca069e662f331eb46313692b2f1630df2720094284f390ccef", upload-time = "2026-01-23T15:13:13Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313-win_arm64.whl", hash = "sha256:161decbff26a33f13cb5ba6d2c8f458bbf56193bcc32ecc70be6dd4c7a3ee79d", upload-time = "2026-01-23T15:13:14Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:01b1884f724977a20c7da2f640f1c7b37f4a2c117a7f4a6c1c0424d14cb86322", upload-time = "2026-01-23T15:13:16Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:031a597147fa81b1e6d79ccf1ad3ccc7fafa27941d6cf26ff5caaa384fb20e92", upload-time = "2026-01-23T15:13:19Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp313-cp313t-win_amd64.whl", hash = "sha256:e586ab1363e3f86aa4cc133b7fdcf98deb1d2c13d43a7a6e5a6a18e9c5364893", upload-time = "2026-01-23T15:13:20Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:65010ab4aacce6c9a1ddfc935f986c003ca8638ded04348fd326c3e74346237c", upload-time = "2026-01-23T15:13:22Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:88adf5157db5da1d54b1c9fe4a6c1d20ceef00e75d854e206a87dbf69e3037dc", upload-time = "2026-01-23T15:13:23Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314-win_amd64.whl", hash = "sha256:f60e2565f261542efac07e25208fb3fc55c6fe82314a5a9cbee971edb5f27713", upload-time = "2026-01-23T15:13:26Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:3ac2b8df2c55430e836dcda31940d47f1f5f94b8731057b6f20300ebea394dd9", upload-time = "2026-01-23T15:13:28Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:5b688445f928f13563b7418b17c57e97bf955ab559cf73cd8f2b961f8572dbb3", upload-time = "2026-01-23T15:13:29Z" }, + { url = "https://download-r2.pytorch.org/whl/cpu/torch-2.9.1%2Bcpu-cp314-cp314t-win_amd64.whl", hash = "sha256:cf9c3e50b595721ca6b488bdcc326e0f1af73ed28b9b66eff504a96649bb5c96", upload-time = "2026-01-23T15:13:32Z" }, ] [[package]] diff --git a/worker-template/worker_template/activities.py b/worker-template/worker_template/activities.py index d5a09b6..f49ea9a 100644 --- a/worker-template/worker_template/activities.py +++ b/worker-template/worker_template/activities.py @@ -83,7 +83,7 @@ async def create_classification_batches( class TranslateDocs(ActivityWithProgress): @activity_defn(name="translate-docs") - def translate_docs( + async def translate_docs( self, docs: list[str], target_language: str, @@ -93,15 +93,13 @@ def translate_docs( progress: ProgressRateHandler | None = None, ) -> int: es_client = lifespan_es_client() - return self._event_loop.run_until_complete( - translate_docs( - docs, - target_language=target_language, - project=project, - es_client=es_client, - config=config, - progress=progress, - ) + return await translate_docs( + docs, + target_language=target_language, + project=project, + es_client=es_client, + config=config, + progress=progress, ) diff --git a/worker-template/worker_template/workflows.py b/worker-template/worker_template/workflows.py index 49c446f..6abc60d 100644 --- a/worker-template/worker_template/workflows.py +++ b/worker-template/worker_template/workflows.py @@ -5,9 +5,7 @@ from temporalio import workflow with workflow.unsafe.imports_passed_through(): - from datashare_python.utils import ( - WorkflowWithProgress, - ) + from datashare_python.utils import WorkflowWithProgress, execute_activity from .activities import ( ClassifyDocs, @@ -35,11 +33,13 @@ async def run(self, args: TranslateAndClassifyArgs) -> TranslateAndClassifyRespo args.config.translation.batch_size, ] # Create translation batches - translation_batches = await workflow.execute_activity( + heartbeat_timeout = timedelta(seconds=10) + translation_batches = await execute_activity( CreateTranslationBatches.create_translation_batches, args=translation_batch_args, task_queue=TaskQueues.IO, start_to_close_timeout=timedelta(hours=1), + heartbeat_timeout=heartbeat_timeout, ) # Translate translation_args = [ @@ -47,11 +47,12 @@ async def run(self, args: TranslateAndClassifyArgs) -> TranslateAndClassifyRespo for b in translation_batches ] translations_activities = [ - workflow.execute_activity( + execute_activity( TranslateDocs.translate_docs, args=args, task_queue=TaskQueues.TRANSLATE_GPU, start_to_close_timeout=timedelta(hours=1), + heartbeat_timeout=heartbeat_timeout, ) for args in translation_args ] @@ -63,11 +64,12 @@ async def run(self, args: TranslateAndClassifyArgs) -> TranslateAndClassifyRespo args.language, args.config.classification, ] - clf_batches = await workflow.execute_activity( + clf_batches = await execute_activity( CreateClassificationBatches.create_classification_batches, args=clf_batch_args, task_queue=TaskQueues.IO, start_to_close_timeout=timedelta(days=1), + heartbeat_timeout=heartbeat_timeout, ) # Classify clf_args = [ @@ -75,11 +77,12 @@ async def run(self, args: TranslateAndClassifyArgs) -> TranslateAndClassifyRespo for b in clf_batches ] clf_activities = [ - workflow.execute_activity( + execute_activity( ClassifyDocs.classify_docs, args=args, task_queue=TaskQueues.CLASSIFY_GPU, start_to_close_timeout=timedelta(days=1), + heartbeat_timeout=heartbeat_timeout, ) for args in clf_args ] @@ -94,7 +97,7 @@ async def run(self, args: TranslateAndClassifyArgs) -> TranslateAndClassifyRespo class PingWorkflow(WorkflowWithProgress): @workflow.run async def run(self, arg: dict) -> str: # noqa: ARG002 - return await workflow.execute_activity( + return await execute_activity( Pong.pong, task_queue=TaskQueues.IO, start_to_close_timeout=timedelta(hours=1), diff --git a/workers/asr-worker/asr_worker/activities.py b/workers/asr-worker/asr_worker/activities.py index 1cd6d33..97c656e 100644 --- a/workers/asr-worker/asr_worker/activities.py +++ b/workers/asr-worker/asr_worker/activities.py @@ -1,3 +1,4 @@ +import asyncio import contextlib import logging import os @@ -123,7 +124,7 @@ def preprocess( return batches @activity_defn(name=RUN_INFERENCE_ACTIVITY, progress_weight=_INFERENCE_WEIGHT) - def infer( + async def infer( self, preprocessed_inputs: list[Path], project: str, @@ -149,14 +150,14 @@ def infer( "model loaded, starting inference on %s audio chunks !", len(preprocessed_inputs), ) - paths = infer_act( + inference_res = infer_act( inference_runner, preprocessed_inputs, output_dir=output_dir, - event_loop=self._event_loop, progress=progress, ) - return [p.relative_to(workdir) for p in paths] + inference_res = [p.relative_to(workdir) async for p in inference_res] + return inference_res @activity_defn(name=POSTPROCESS_ACTIVITY, progress_weight=_BASE_WEIGHT) def postprocess( @@ -245,13 +246,13 @@ def preprocess_act( return list(_preprocess(preprocessor, audios, output_dir)) -def infer_act( +async def infer_act( inference_runner: InferenceRunner, preprocessed_inputs: list[Path], output_dir: Path, event_loop: AbstractEventLoop | None = None, progress: RawProgressHandler | None = None, -) -> list[Path]: +) -> AsyncIterable[Path]: # Audios paths in the input are relative to the batch file directory inputs = ( [ @@ -263,9 +264,11 @@ def infer_act( audio_paths, inputs = tee(inputs) audio_paths = (i.metadata.preprocessed_file_path for b in audio_paths for i in b) # TODO: implement caching - paths = [] + inference_results = await asyncio.to_thread( + _transcribe_as_list, inference_runner, list(inputs) + ) for res_i, (path, asr_res) in enumerate( - zip(audio_paths, inference_runner.process(inputs), strict=True) + zip(audio_paths, inference_results, strict=True) ): filename = f"{debuggable_name(path.name)}-transcript.json" transcript_path = output_dir / safe_dir(filename) / filename @@ -274,10 +277,15 @@ def infer_act( "run inference for %s, writing result to %s", path, transcript_path ) transcript_path.write_text(asr_res.model_dump_json()) - paths.append(transcript_path) + yield transcript_path if progress is not None and event_loop is not None: - event_loop.run_until_complete(progress(res_i)) - return paths + await progress(res_i) + + +def _transcribe_as_list( + inference_runner: InferenceRunner, inputs: Iterable[list[PreprocessedInput]] +) -> list[ASRResult]: + return list(inference_runner.process(inputs)) def postprocess_act( diff --git a/workers/asr-worker/asr_worker/workflows.py b/workers/asr-worker/asr_worker/workflows.py index 43449ae..3de53bd 100644 --- a/workers/asr-worker/asr_worker/workflows.py +++ b/workers/asr-worker/asr_worker/workflows.py @@ -36,7 +36,7 @@ async def run(self, args: ASRArgs) -> ASRResponse: doc_query = has_id(args.docs) if isinstance(args.docs, list) else args.docs search_args = [args.project, doc_query, batch_size] logger.info("searching files to process...") - batch_paths = await workflow.execute_activity( + batch_paths = await execute_activity( ASRActivities.search_audio_paths, args=search_args, start_to_close_timeout=timedelta(seconds=TEN_MINUTES), diff --git a/workers/asr-worker/tests/test_activities.py b/workers/asr-worker/tests/test_activities.py index b826637..1683731 100644 --- a/workers/asr-worker/tests/test_activities.py +++ b/workers/asr-worker/tests/test_activities.py @@ -229,7 +229,7 @@ def test_preprocess_act(test_worker_config: ASRWorkerConfig, tmpdir: Path) -> No assert written_batches == expected_batches -def test_infer_act(tmpdir: Path) -> None: +async def test_infer_act(tmpdir: Path) -> None: # Given inference_runner = MockInferenceRunner() workdir = Path(tmpdir) / "workdir" @@ -252,7 +252,7 @@ def test_infer_act(tmpdir: Path) -> None: # Then asr_results = [ ASRResult.model_validate_json((output_dir / p).read_text()) - for p in asr_result_paths + async for p in asr_result_paths ] assert asr_results == INFERENCE_RESULTS diff --git a/workers/asr-worker/tests/test_workflows.py b/workers/asr-worker/tests/test_workflows.py index 0259018..f901905 100644 --- a/workers/asr-worker/tests/test_workflows.py +++ b/workers/asr-worker/tests/test_workflows.py @@ -59,6 +59,7 @@ async def io_bound_worker( workflows=[ASRWorkflow], activities=[activities.search_audio_paths], dependencies=dependencies, + sandboxed=False, ) async with worker_ctx: yield @@ -83,6 +84,7 @@ async def cpu_bound_worker( task_queue=task_queue, activities=[activities.preprocess, activities.postprocess], dependencies=dependencies, + sandboxed=False, ) async with worker_ctx: yield @@ -107,6 +109,7 @@ async def gpu_inference_worker( task_queue=task_queue, activities=[activities.infer], dependencies=dependencies, + sandboxed=False, ) async with worker_ctx: yield