From adb578b812c3e7d23fb86f7f3a773fbadd8d9b3a Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 7 Oct 2025 14:40:03 -0700 Subject: [PATCH 1/5] Add WorkflowDefinition Signed-off-by: Tim Li --- cadence/worker/_decision_task_handler.py | 4 +- cadence/worker/_registry.py | 12 +- cadence/workflow.py | 139 +++++++++++++++++- .../worker/test_decision_task_handler.py | 34 +++-- .../test_decision_task_handler_integration.py | 6 +- tests/cadence/worker/test_registry.py | 13 +- .../worker/test_task_handler_integration.py | 91 +++++++----- 7 files changed, 239 insertions(+), 60 deletions(-) diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index d35ee66..be8d44c 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -76,7 +76,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - ) try: - workflow_func = self._registry.get_workflow(workflow_type_name) + workflow_definition = self._registry.get_workflow(workflow_type_name) except KeyError: logger.error( "Workflow type not found in registry", @@ -105,7 +105,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - workflow_engine = WorkflowEngine( info=workflow_info, client=self._client, - workflow_func=workflow_func + workflow_func=workflow_definition.fn ) self._workflow_engines[cache_key] = workflow_engine diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index d60521d..c9b5326 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -9,6 +9,7 @@ import logging from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T +from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions logger = logging.getLogger(__name__) @@ -28,7 +29,7 @@ class Registry: def __init__(self) -> None: """Initialize the registry.""" - self._workflows: Dict[str, Callable] = {} + self._workflows: Dict[str, WorkflowDefinition] = {} self._activities: Dict[str, ActivityDefinition] = {} self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping @@ -60,7 +61,10 @@ def decorator(f: Callable) -> Callable: if workflow_name in self._workflows: raise KeyError(f"Workflow '{workflow_name}' is already registered") - self._workflows[workflow_name] = f + # Create WorkflowDefinition with type information + workflow_opts = WorkflowDefinitionOptions(name=workflow_name) + workflow_def = WorkflowDefinition.wrap(f, workflow_opts) + self._workflows[workflow_name] = workflow_def # Register alias if provided alias = options.get('alias') @@ -135,7 +139,7 @@ def _register_activity(self, defn: ActivityDefinition) -> None: self._activities[defn.name] = defn - def get_workflow(self, name: str) -> Callable: + def get_workflow(self, name: str) -> WorkflowDefinition: """ Get a registered workflow by name. @@ -143,7 +147,7 @@ def get_workflow(self, name: str) -> Callable: name: Name or alias of the workflow Returns: - The workflow function + The workflow definition with type information Raises: KeyError: If workflow is not found diff --git a/cadence/workflow.py b/cadence/workflow.py index 51b968f..50692d7 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -2,10 +2,147 @@ from contextlib import contextmanager from contextvars import ContextVar from dataclasses import dataclass -from typing import Iterator +from functools import update_wrapper +from inspect import signature, Parameter +from typing import Iterator, Callable, TypeVar, ParamSpec, Generic, TypedDict, Unpack, overload, get_type_hints, Type, Any from cadence.client import Client + +@dataclass(frozen=True) +class WorkflowParameter: + """Parameter information for a workflow function.""" + name: str + type_hint: Type | None + default_value: Any | None + + +class WorkflowDefinitionOptions(TypedDict, total=False): + """Options for defining a workflow.""" + name: str + + +P = ParamSpec('P') +T = TypeVar('T') + + +class WorkflowDefinition(Generic[P, T]): + """ + Definition of a workflow function with metadata. + + Similar to ActivityDefinition but for workflows. + Provides type safety and metadata for workflow functions. + """ + + def __init__(self, wrapped: Callable[P, T], name: str, params: list[WorkflowParameter]): + self._wrapped = wrapped + self._name = name + self._params = params + update_wrapper(self, wrapped) + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: + return self._wrapped(*args, **kwargs) + + @property + def name(self) -> str: + """Get the workflow name.""" + return self._name + + @property + def params(self) -> list[WorkflowParameter]: + """Get the workflow parameters.""" + return self._params + + @property + def fn(self) -> Callable[P, T]: + """Get the underlying workflow function.""" + return self._wrapped + + @classmethod + def wrap(cls, fn: Callable[P, T], opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition[P, T]': + """ + Wrap a function as a WorkflowDefinition. + + Args: + fn: The workflow function to wrap + opts: Options for the workflow definition + + Returns: + A WorkflowDefinition instance + """ + name = fn.__qualname__ + if "name" in opts and opts["name"]: + name = opts["name"] + + params = _get_workflow_params(fn) + return cls(fn, name, params) + + +WorkflowDecorator = Callable[[Callable[P, T]], WorkflowDefinition[P, T]] + + +@overload +def defn(fn: Callable[P, T]) -> WorkflowDefinition[P, T]: + ... + + +@overload +def defn(**kwargs: Unpack[WorkflowDefinitionOptions]) -> WorkflowDecorator: + ... + + +def defn(fn: Callable[P, T] | None = None, **kwargs: Unpack[WorkflowDefinitionOptions]) -> WorkflowDecorator | WorkflowDefinition[P, T]: + """ + Decorator to define a workflow function. + + Usage: + @defn + def my_workflow(input_data: str) -> str: + return f"processed: {input_data}" + + @defn(name="custom_workflow_name") + def my_other_workflow(input_data: str) -> str: + return f"custom: {input_data}" + + Args: + fn: The workflow function (when used without parentheses) + **kwargs: Workflow definition options + + Returns: + Either a WorkflowDefinition (direct decoration) or a decorator function + """ + opts = WorkflowDefinitionOptions(**kwargs) + + def decorator(inner_fn: Callable[P, T]) -> WorkflowDefinition[P, T]: + return WorkflowDefinition.wrap(inner_fn, opts) + + if fn is not None: + return decorator(fn) + + return decorator + + +def _get_workflow_params(fn: Callable) -> list[WorkflowParameter]: + """Extract parameter information from a workflow function.""" + args = signature(fn).parameters + hints = get_type_hints(fn) + result = [] + for name, param in args.items(): + # Filter out self parameter + if param.name == "self": + continue + default = None + if param.default != Parameter.empty: + default = param.default + if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD): + type_hint = hints.get(name, None) + result.append(WorkflowParameter(name, type_hint, default)) + else: + raise ValueError(f"Parameters must be positional. {name} is {param.kind}, and not valid") + + return result + + @dataclass class WorkflowInfo: workflow_type: str diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index cd2b210..928a6eb 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -82,9 +82,14 @@ def test_initialization(self, mock_client, mock_registry): @pytest.mark.asyncio async def test_handle_task_implementation_success(self, handler, sample_decision_task, mock_registry): """Test successful decision task handling.""" - # Mock workflow function - mock_workflow_func = Mock() - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) @@ -142,9 +147,14 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp @pytest.mark.asyncio async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry): """Test that decision task handler caches workflow engines for same workflow execution.""" - # Mock workflow function - mock_workflow_func = Mock() - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) @@ -324,18 +334,20 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis async def test_workflow_engine_creation_with_workflow_info(self, handler, sample_decision_task, mock_registry): """Test that WorkflowEngine is created with correct WorkflowInfo.""" mock_workflow_func = Mock() - mock_registry.get_workflow.return_value = mock_workflow_func - + mock_workflow_definition = Mock() + mock_workflow_definition.fn = mock_workflow_func + mock_registry.get_workflow.return_value = mock_workflow_definition + mock_engine = Mock(spec=WorkflowEngine) mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) - + with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_workflow_engine_class: with patch('cadence.worker._decision_task_handler.WorkflowInfo') as mock_workflow_info_class: await handler._handle_task_implementation(sample_decision_task) - + # Verify WorkflowInfo was created with correct parameters (called once for engine) assert mock_workflow_info_class.call_count == 1 for call in mock_workflow_info_class.call_args_list: @@ -345,7 +357,7 @@ async def test_workflow_engine_creation_with_workflow_info(self, handler, sample 'workflow_id': "test_workflow_id", 'workflow_run_id': "test_run_id" } - + # Verify WorkflowEngine was created with correct parameters mock_workflow_engine_class.assert_called_once() call_args = mock_workflow_engine_class.call_args diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index b513a14..ef7e0c0 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -35,12 +35,12 @@ def mock_client(self): def registry(self): """Create a registry with a test workflow.""" reg = Registry() - - @reg.workflow + + @reg.workflow(name="test_workflow") def test_workflow(input_data): """Simple test workflow that returns the input.""" return f"processed: {input_data}" - + return reg @pytest.fixture diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 4a8973b..1b776c6 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -30,16 +30,21 @@ def test_basic_registration_and_retrieval(self, registration_type): @reg.workflow def test_func(): return "test" - - func = reg.get_workflow("test_func") + + # Registry stores WorkflowDefinition internally + func_def = reg.get_workflow(test_func.__name__) + # WorkflowDefinition can be called directly + assert func_def() == "test" + # Verify it's actually a WorkflowDefinition + from cadence.workflow import WorkflowDefinition + assert isinstance(func_def, WorkflowDefinition) else: @reg.activity def test_func(): return "test" func = reg.get_activity(test_func.name) - - assert func() == "test" + assert func() == "test" def test_direct_call_behavior(self): reg = Registry() diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 8e6aef9..5c76f03 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -61,11 +61,14 @@ def sample_decision_task(self): @pytest.mark.asyncio async def test_full_task_handling_flow_success(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow from base handler through decision handler.""" - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) @@ -86,11 +89,14 @@ def mock_workflow_func(input_data): @pytest.mark.asyncio async def test_full_task_handling_flow_with_error(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow when an error occurs.""" - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine to raise an error mock_engine = Mock(spec=WorkflowEngine) @@ -110,11 +116,14 @@ def mock_workflow_func(input_data): @pytest.mark.asyncio async def test_context_activation_integration(self, handler, sample_decision_task, mock_registry): """Test that context activation works correctly in the integration.""" - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) @@ -144,11 +153,14 @@ def track_context_activation(): @pytest.mark.asyncio async def test_multiple_workflow_executions(self, handler, mock_registry): """Test handling multiple workflow executions creates new engines for each.""" - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Create multiple decision tasks for different workflows task1 = Mock(spec=PollForDecisionTaskResponse) @@ -194,11 +206,14 @@ def mock_workflow_func(input_data): @pytest.mark.asyncio async def test_workflow_engine_creation_integration(self, handler, sample_decision_task, mock_registry): """Test workflow engine creation integration.""" - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) @@ -218,11 +233,14 @@ def mock_workflow_func(input_data): @pytest.mark.asyncio async def test_error_handling_with_context_cleanup(self, handler, sample_decision_task, mock_registry): """Test that context cleanup happens even when errors occur.""" - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine to raise an error mock_engine = Mock(spec=WorkflowEngine) @@ -255,11 +273,14 @@ async def test_concurrent_task_handling(self, handler, mock_registry): """Test handling multiple tasks concurrently.""" import asyncio - # Mock workflow function - def mock_workflow_func(input_data): - return f"processed: {input_data}" - - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + def mock_workflow_func(): + return "test_result" + + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Create multiple tasks tasks = [] From 60b14440b556ff57968fbc2e9c762f4f469c9c57 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 8 Oct 2025 15:32:26 -0700 Subject: [PATCH 2/5] Redefine the workflow type to class instead of callable Signed-off-by: Tim Li --- cadence/__init__.py | 4 + cadence/worker/_registry.py | 58 +++++----- cadence/workflow.py | 145 ++++++++++--------------- tests/cadence/worker/test_registry.py | 147 +++++++++++++++++--------- 4 files changed, 184 insertions(+), 170 deletions(-) diff --git a/cadence/__init__.py b/cadence/__init__.py index 175f01b..abf4bbd 100644 --- a/cadence/__init__.py +++ b/cadence/__init__.py @@ -6,9 +6,13 @@ # Import main client functionality from .client import Client +from .worker import Registry +from .workflow import workflow __version__ = "0.1.0" __all__ = [ "Client", + "Registry", + "workflow", ] diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index c9b5326..abbfbbf 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -7,7 +7,7 @@ """ import logging -from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload +from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions @@ -35,50 +35,52 @@ def __init__(self) -> None: def workflow( self, - func: Optional[Callable] = None, + cls: Optional[Type] = None, **kwargs: Unpack[RegisterWorkflowOptions] - ) -> Callable: + ) -> Type: """ - Register a workflow function. - + Register a workflow class. + This method can be used as a decorator or called directly. - + Only supports class-based workflows. + Args: - func: The workflow function to register + cls: The workflow class to register **kwargs: Options for registration (name, alias) - + Returns: - The decorated function or the function itself - + The decorated class + Raises: KeyError: If workflow name already exists + ValueError: If class workflow is invalid """ options = RegisterWorkflowOptions(**kwargs) - - def decorator(f: Callable) -> Callable: - workflow_name = options.get('name') or f.__name__ - + + def decorator(target: Type) -> Type: + workflow_name = options.get('name') or target.__name__ + if workflow_name in self._workflows: raise KeyError(f"Workflow '{workflow_name}' is already registered") - + # Create WorkflowDefinition with type information workflow_opts = WorkflowDefinitionOptions(name=workflow_name) - workflow_def = WorkflowDefinition.wrap(f, workflow_opts) + workflow_def = WorkflowDefinition.wrap(target, workflow_opts) self._workflows[workflow_name] = workflow_def - + # Register alias if provided alias = options.get('alias') if alias: if alias in self._workflow_aliases: raise KeyError(f"Workflow alias '{alias}' is already registered") self._workflow_aliases[alias] = workflow_name - + logger.info(f"Registered workflow '{workflow_name}'") - return f - - if func is None: + return target + + if cls is None: return decorator - return decorator(func) + return decorator(cls) @overload def activity(self, func: Callable[P, T]) -> ActivityDefinition[P, T]: @@ -142,22 +144,22 @@ def _register_activity(self, defn: ActivityDefinition) -> None: def get_workflow(self, name: str) -> WorkflowDefinition: """ Get a registered workflow by name. - + Args: name: Name or alias of the workflow - + Returns: - The workflow definition with type information - + The workflow definition + Raises: KeyError: If workflow is not found """ # Check if it's an alias actual_name = self._workflow_aliases.get(name, name) - + if actual_name not in self._workflows: raise KeyError(f"Workflow '{name}' not found in registry") - + return self._workflows[actual_name] def get_activity(self, name: str) -> ActivityDefinition: diff --git a/cadence/workflow.py b/cadence/workflow.py index 50692d7..332f13c 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -2,19 +2,12 @@ from contextlib import contextmanager from contextvars import ContextVar from dataclasses import dataclass -from functools import update_wrapper -from inspect import signature, Parameter -from typing import Iterator, Callable, TypeVar, ParamSpec, Generic, TypedDict, Unpack, overload, get_type_hints, Type, Any +from typing import Iterator, Callable, TypeVar, TypedDict, Type +from functools import wraps from cadence.client import Client - -@dataclass(frozen=True) -class WorkflowParameter: - """Parameter information for a workflow function.""" - name: str - type_hint: Type | None - default_value: Any | None +T = TypeVar('T') class WorkflowDefinitionOptions(TypedDict, total=False): @@ -22,26 +15,17 @@ class WorkflowDefinitionOptions(TypedDict, total=False): name: str -P = ParamSpec('P') -T = TypeVar('T') - - -class WorkflowDefinition(Generic[P, T]): +class WorkflowDefinition: """ - Definition of a workflow function with metadata. + Definition of a workflow class with metadata. - Similar to ActivityDefinition but for workflows. - Provides type safety and metadata for workflow functions. + Similar to ActivityDefinition but for workflow classes. + Provides type safety and metadata for workflow classes. """ - def __init__(self, wrapped: Callable[P, T], name: str, params: list[WorkflowParameter]): - self._wrapped = wrapped + def __init__(self, cls: Type, name: str): + self._cls = cls self._name = name - self._params = params - update_wrapper(self, wrapped) - - def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: - return self._wrapped(*args, **kwargs) @property def name(self) -> str: @@ -49,98 +33,75 @@ def name(self) -> str: return self._name @property - def params(self) -> list[WorkflowParameter]: - """Get the workflow parameters.""" - return self._params - - @property - def fn(self) -> Callable[P, T]: - """Get the underlying workflow function.""" - return self._wrapped + def cls(self) -> Type: + """Get the workflow class.""" + return self._cls - @classmethod - def wrap(cls, fn: Callable[P, T], opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition[P, T]': + @staticmethod + def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition': """ - Wrap a function as a WorkflowDefinition. + Wrap a class as a WorkflowDefinition. Args: - fn: The workflow function to wrap + cls: The workflow class to wrap opts: Options for the workflow definition Returns: A WorkflowDefinition instance + + Raises: + ValueError: If no run method is found or multiple run methods exist """ - name = fn.__qualname__ + name = cls.__name__ if "name" in opts and opts["name"]: name = opts["name"] - params = _get_workflow_params(fn) - return cls(fn, name, params) + # Validate that the class has exactly one run method + run_method_count = 0 + for attr_name in dir(cls): + if attr_name.startswith('_'): + continue + attr = getattr(cls, attr_name) + if not callable(attr): + continue -WorkflowDecorator = Callable[[Callable[P, T]], WorkflowDefinition[P, T]] + # Check for workflow run method + if hasattr(attr, '_workflow_run'): + run_method_count += 1 + if run_method_count == 0: + raise ValueError(f"No @workflow.run method found in class {cls.__name__}") + elif run_method_count > 1: + raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}") -@overload -def defn(fn: Callable[P, T]) -> WorkflowDefinition[P, T]: - ... + return WorkflowDefinition(cls, name) -@overload -def defn(**kwargs: Unpack[WorkflowDefinitionOptions]) -> WorkflowDecorator: - ... +def run(func: Callable[..., T]) -> Callable[..., T]: + """ + Decorator to mark a method as the main workflow run method. + Args: + func: The method to mark as the workflow run method -def defn(fn: Callable[P, T] | None = None, **kwargs: Unpack[WorkflowDefinitionOptions]) -> WorkflowDecorator | WorkflowDefinition[P, T]: + Returns: + The decorated method with workflow run metadata """ - Decorator to define a workflow function. + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) - Usage: - @defn - def my_workflow(input_data: str) -> str: - return f"processed: {input_data}" + # Attach metadata to the function + wrapper._workflow_run = True # type: ignore + return wrapper - @defn(name="custom_workflow_name") - def my_other_workflow(input_data: str) -> str: - return f"custom: {input_data}" - Args: - fn: The workflow function (when used without parentheses) - **kwargs: Workflow definition options +# Create a simple namespace object for the workflow decorators +class _WorkflowNamespace: + run = staticmethod(run) - Returns: - Either a WorkflowDefinition (direct decoration) or a decorator function - """ - opts = WorkflowDefinitionOptions(**kwargs) - - def decorator(inner_fn: Callable[P, T]) -> WorkflowDefinition[P, T]: - return WorkflowDefinition.wrap(inner_fn, opts) - - if fn is not None: - return decorator(fn) - - return decorator - - -def _get_workflow_params(fn: Callable) -> list[WorkflowParameter]: - """Extract parameter information from a workflow function.""" - args = signature(fn).parameters - hints = get_type_hints(fn) - result = [] - for name, param in args.items(): - # Filter out self parameter - if param.name == "self": - continue - default = None - if param.default != Parameter.empty: - default = param.default - if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD): - type_hint = hints.get(name, None) - result.append(WorkflowParameter(name, type_hint, default)) - else: - raise ValueError(f"Parameters must be positional. {name} is {param.kind}, and not valid") - - return result +workflow = _WorkflowNamespace() @dataclass diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 1b776c6..53c16f0 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -7,6 +7,7 @@ from cadence import activity from cadence.worker import Registry +from cadence.workflow import workflow, WorkflowDefinition from tests.cadence import common_activities @@ -21,30 +22,33 @@ def test_basic_registry_creation(self): with pytest.raises(KeyError): reg.get_activity("nonexistent") - @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) - def test_basic_registration_and_retrieval(self, registration_type): - """Test basic registration and retrieval for both workflows and activities.""" + def test_basic_workflow_registration_and_retrieval(self): + """Test basic registration and retrieval for class-based workflows.""" reg = Registry() - - if registration_type == "workflow": - @reg.workflow - def test_func(): - return "test" - # Registry stores WorkflowDefinition internally - func_def = reg.get_workflow(test_func.__name__) - # WorkflowDefinition can be called directly - assert func_def() == "test" - # Verify it's actually a WorkflowDefinition - from cadence.workflow import WorkflowDefinition - assert isinstance(func_def, WorkflowDefinition) - else: - @reg.activity - def test_func(): + @reg.workflow + class TestWorkflow: + @workflow.run + async def run(self): return "test" - - func = reg.get_activity(test_func.name) - assert func() == "test" + + # Registry stores WorkflowDefinition internally + workflow_def = reg.get_workflow("TestWorkflow") + # Verify it's actually a WorkflowDefinition + assert isinstance(workflow_def, WorkflowDefinition) + assert workflow_def.name == "TestWorkflow" + assert workflow_def.cls == TestWorkflow + + def test_basic_activity_registration_and_retrieval(self): + """Test basic registration and retrieval for activities.""" + reg = Registry() + + @reg.activity + def test_func(): + return "test" + + func = reg.get_activity(test_func.name) + assert func() == "test" def test_direct_call_behavior(self): reg = Registry() @@ -58,41 +62,47 @@ def test_func(): assert func() == "direct_call" - @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) - def test_not_found_error(self, registration_type): - """Test KeyError is raised when function not found.""" + def test_workflow_not_found_error(self): + """Test KeyError is raised when workflow not found.""" reg = Registry() - - if registration_type == "workflow": - with pytest.raises(KeyError): - reg.get_workflow("nonexistent") - else: - with pytest.raises(KeyError): - reg.get_activity("nonexistent") - - @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) - def test_duplicate_registration_error(self, registration_type): - """Test KeyError is raised for duplicate registrations.""" + with pytest.raises(KeyError): + reg.get_workflow("nonexistent") + + def test_activity_not_found_error(self): + """Test KeyError is raised when activity not found.""" reg = Registry() - - if registration_type == "workflow": - @reg.workflow - def test_func(): + with pytest.raises(KeyError): + reg.get_activity("nonexistent") + + def test_duplicate_workflow_registration_error(self): + """Test KeyError is raised for duplicate workflow registrations.""" + reg = Registry() + + @reg.workflow(name="duplicate_test") + class TestWorkflow: + @workflow.run + async def run(self): return "test" - - with pytest.raises(KeyError): - @reg.workflow - def test_func(): + + with pytest.raises(KeyError): + @reg.workflow(name="duplicate_test") + class TestWorkflow2: + @workflow.run + async def run(self): return "duplicate" - else: + + def test_duplicate_activity_registration_error(self): + """Test KeyError is raised for duplicate activity registrations.""" + reg = Registry() + + @reg.activity(name="test_func") + def test_func(): + return "test" + + with pytest.raises(KeyError): @reg.activity(name="test_func") def test_func(): - return "test" - - with pytest.raises(KeyError): - @reg.activity(name="test_func") - def test_func(): - return "duplicate" + return "duplicate" def test_register_activities_instance(self): reg = Registry() @@ -155,3 +165,40 @@ def test_of(self): assert result.get_activity("simple_fn") is not None assert result.get_activity("echo") is not None assert result.get_activity("async_fn") is not None + + def test_class_workflow_validation_errors(self): + """Test validation errors for class-based workflows.""" + reg = Registry() + + # Test missing run method + with pytest.raises(ValueError, match="No @workflow.run method found"): + @reg.workflow + class MissingRunWorkflow: + def some_method(self): + pass + + # Test duplicate run methods + with pytest.raises(ValueError, match="Multiple @workflow.run methods found"): + @reg.workflow + class DuplicateRunWorkflow: + @workflow.run + async def run1(self): + pass + + @workflow.run + async def run2(self): + pass + + def test_class_workflow_with_custom_name(self): + """Test class-based workflow with custom name.""" + reg = Registry() + + @reg.workflow(name="custom_workflow_name") + class CustomWorkflow: + @workflow.run + async def run(self, input: str) -> str: + return f"processed: {input}" + + workflow_def = reg.get_workflow("custom_workflow_name") + assert workflow_def.name == "custom_workflow_name" + assert workflow_def.cls == CustomWorkflow From dae76e66179ce78f42060303a15e336bed8c0032 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 8 Oct 2025 15:36:30 -0700 Subject: [PATCH 3/5] lint Signed-off-by: Tim Li --- cadence/worker/_registry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index abbfbbf..816caad 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -7,7 +7,7 @@ """ import logging -from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type +from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type, Union from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions @@ -37,7 +37,7 @@ def workflow( self, cls: Optional[Type] = None, **kwargs: Unpack[RegisterWorkflowOptions] - ) -> Type: + ) -> Union[Type, Callable[[Type], Type]]: """ Register a workflow class. From 6bf973bf779dbe85e070a5db6f9d575f299d757b Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 9 Oct 2025 10:07:57 -0700 Subject: [PATCH 4/5] another bunch of minor changes Signed-off-by: Tim Li --- cadence/_internal/workflow/workflow_engine.py | 24 +++--- cadence/worker/_decision_task_handler.py | 6 +- cadence/workflow.py | 12 ++- .../test_workflow_engine_integration.py | 61 ++++++++++----- .../worker/test_decision_task_handler.py | 53 +++++++++---- .../test_decision_task_handler_integration.py | 9 ++- .../test_decision_worker_integration.py | 19 +++-- .../worker/test_task_handler_integration.py | 77 ++++++++++++------- 8 files changed, 172 insertions(+), 89 deletions(-) diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 2456cc1..7eff5c2 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -20,9 +20,12 @@ class DecisionResult: decisions: list[Decision] class WorkflowEngine: - def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None): + def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None): self._context = Context(client, info) - self._workflow_func = workflow_func + self._workflow_definition = workflow_definition + self._workflow_instance = None + if workflow_definition: + self._workflow_instance = workflow_definition.cls() self._decision_manager = DecisionManager() self._decisions_helper = DecisionsHelper(self._decision_manager) self._is_workflow_complete = False @@ -250,19 +253,17 @@ def _fallback_process_workflow_history(self, history) -> None: async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None: """ Execute the workflow function to generate new decisions. - + This blocks until the workflow schedules an activity or completes. - + Args: decision_task: The decision task containing workflow context """ try: - # Execute the workflow function - # The workflow function should block until it schedules an activity - workflow_func = self._workflow_func - if workflow_func is None: + # Execute the workflow function from the workflow instance + if self._workflow_definition is None or self._workflow_instance is None: logger.warning( - "No workflow function available", + "No workflow definition or instance available", extra={ "workflow_type": self._context.info().workflow_type, "workflow_id": self._context.info().workflow_id, @@ -271,6 +272,9 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes ) return + # Get the workflow run method from the instance + workflow_func = self._workflow_definition.get_run_method(self._workflow_instance) + # Extract workflow input from history workflow_input = await self._extract_workflow_input(decision_task) @@ -290,7 +294,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes "completion_type": "success" } ) - + except Exception as e: logger.error( "Error executing workflow function", diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index be8d44c..62f0edb 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -103,9 +103,9 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - workflow_engine = self._workflow_engines.get(cache_key) if workflow_engine is None: workflow_engine = WorkflowEngine( - info=workflow_info, - client=self._client, - workflow_func=workflow_definition.fn + info=workflow_info, + client=self._client, + workflow_definition=workflow_definition ) self._workflow_engines[cache_key] = workflow_engine diff --git a/cadence/workflow.py b/cadence/workflow.py index 332f13c..22fd866 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -2,7 +2,7 @@ from contextlib import contextmanager from contextvars import ContextVar from dataclasses import dataclass -from typing import Iterator, Callable, TypeVar, TypedDict, Type +from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any from functools import wraps from cadence.client import Client @@ -37,6 +37,16 @@ def cls(self) -> Type: """Get the workflow class.""" return self._cls + def get_run_method(self, instance: Any) -> Callable: + """Get the workflow run method from an instance of the workflow class.""" + for attr_name in dir(instance): + if attr_name.startswith('_'): + continue + attr = getattr(instance, attr_name) + if callable(attr) and hasattr(attr, '_workflow_run'): + return cast(Callable, attr) + raise ValueError(f"No @workflow.run method found in class {self._cls.__name__}") + @staticmethod def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition': """ diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index cb1f449..ecf7c13 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -36,19 +36,25 @@ def workflow_info(self): ) @pytest.fixture - def mock_workflow_func(self): - """Create a mock workflow function.""" - def workflow_func(input_data): - return f"processed: {input_data}" - return workflow_func + def mock_workflow_definition(self): + """Create a mock workflow definition.""" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class TestWorkflow: + @workflow.run + def weird_name(self, input_data): + return f"processed: {input_data}" + + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + return WorkflowDefinition.wrap(TestWorkflow, workflow_opts) @pytest.fixture - def workflow_engine(self, mock_client, workflow_info, mock_workflow_func): + def workflow_engine(self, mock_client, workflow_info, mock_workflow_definition): """Create a WorkflowEngine instance.""" return WorkflowEngine( info=workflow_info, client=mock_client, - workflow_func=mock_workflow_func + workflow_definition=mock_workflow_definition ) def create_mock_decision_task(self, workflow_id="test-workflow", run_id="test-run", workflow_type="test_workflow"): @@ -211,10 +217,13 @@ async def test_extract_workflow_input_deserialization_error(self, workflow_engin def test_execute_workflow_function_sync(self, workflow_engine): """Test synchronous workflow function execution.""" input_data = "test-input" - + + # Get the workflow function from the instance + workflow_func = workflow_engine._workflow_definition.get_run_method(workflow_engine._workflow_instance) + # Execute the workflow function - result = workflow_engine._execute_workflow_function_once(workflow_engine._workflow_func, input_data) - + result = workflow_engine._execute_workflow_function_once(workflow_func, input_data) + # Verify the result assert result == "processed: test-input" @@ -239,20 +248,21 @@ def test_execute_workflow_function_none(self, workflow_engine): with pytest.raises(TypeError, match="'NoneType' object is not callable"): workflow_engine._execute_workflow_function_once(None, input_data) - def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_func): + def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_definition): """Test WorkflowEngine initialization.""" assert workflow_engine._context is not None - assert workflow_engine._workflow_func == mock_workflow_func + assert workflow_engine._workflow_definition == mock_workflow_definition + assert workflow_engine._workflow_instance is not None assert workflow_engine._decision_manager is not None assert workflow_engine._is_workflow_complete is False @pytest.mark.asyncio - async def test_workflow_engine_without_workflow_func(self, mock_client, workflow_info): - """Test WorkflowEngine without workflow function.""" + async def test_workflow_engine_without_workflow_definition(self, mock_client, workflow_info): + """Test WorkflowEngine without workflow definition.""" engine = WorkflowEngine( info=workflow_info, client=mock_client, - workflow_func=None + workflow_definition=None ) decision_task = self.create_mock_decision_task() @@ -269,12 +279,21 @@ async def test_workflow_engine_without_workflow_func(self, mock_client, workflow async def test_workflow_engine_workflow_completion(self, workflow_engine, mock_client): """Test workflow completion detection.""" decision_task = self.create_mock_decision_task() - - # Mock workflow function to return a result (indicating completion) - def completing_workflow_func(input_data): - return "workflow-completed" - - workflow_engine._workflow_func = completing_workflow_func + + # Create a workflow definition that returns a result (indicating completion) + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class CompletingWorkflow: + @workflow.run + def run(self, input_data): + return "workflow-completed" + + workflow_opts = WorkflowDefinitionOptions(name="completing_workflow") + completing_definition = WorkflowDefinition.wrap(CompletingWorkflow, workflow_opts) + + # Replace the workflow definition and instance + workflow_engine._workflow_definition = completing_definition + workflow_engine._workflow_instance = completing_definition.cls() with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=[]): # Process the decision diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 928a6eb..55b1e1f 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -83,12 +83,15 @@ def test_initialization(self, mock_client, mock_registry): async def test_handle_task_implementation_success(self, handler, sample_decision_task, mock_registry): """Test successful decision task handling.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine @@ -148,12 +151,15 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry): """Test that decision task handler caches workflow engines for same workflow execution.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine @@ -182,9 +188,17 @@ def mock_workflow_func(): @pytest.mark.asyncio async def test_handle_task_implementation_different_executions_get_separate_engines(self, handler, mock_registry): """Test that different workflow executions get separate engines.""" - # Mock workflow function - mock_workflow_func = Mock() - mock_registry.get_workflow.return_value = mock_workflow_func + # Create actual workflow definition + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" + + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition # Create two different decision tasks task1 = Mock(spec=PollForDecisionTaskResponse) @@ -333,10 +347,17 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis @pytest.mark.asyncio async def test_workflow_engine_creation_with_workflow_info(self, handler, sample_decision_task, mock_registry): """Test that WorkflowEngine is created with correct WorkflowInfo.""" - mock_workflow_func = Mock() - mock_workflow_definition = Mock() - mock_workflow_definition.fn = mock_workflow_func - mock_registry.get_workflow.return_value = mock_workflow_definition + # Create actual workflow definition + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" + + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) + mock_registry.get_workflow.return_value = workflow_definition mock_engine = Mock(spec=WorkflowEngine) mock_engine._is_workflow_complete = False # Add missing attribute @@ -363,4 +384,4 @@ async def test_workflow_engine_creation_with_workflow_info(self, handler, sample call_args = mock_workflow_engine_class.call_args assert call_args[1]['info'] is not None assert call_args[1]['client'] == handler._client - assert call_args[1]['workflow_func'] == mock_workflow_func + assert call_args[1]['workflow_definition'] == workflow_definition diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index ef7e0c0..0327485 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -13,6 +13,7 @@ from cadence.api.v1.decision_pb2 import Decision from cadence.worker._decision_task_handler import DecisionTaskHandler from cadence.worker._registry import Registry +from cadence.workflow import workflow from cadence.client import Client @@ -37,9 +38,11 @@ def registry(self): reg = Registry() @reg.workflow(name="test_workflow") - def test_workflow(input_data): - """Simple test workflow that returns the input.""" - return f"processed: {input_data}" + class TestWorkflow: + @workflow.run + async def run(self, input_data): + """Simple test workflow that returns the input.""" + return f"processed: {input_data}" return reg diff --git a/tests/cadence/worker/test_decision_worker_integration.py b/tests/cadence/worker/test_decision_worker_integration.py index 85c55d2..712f312 100644 --- a/tests/cadence/worker/test_decision_worker_integration.py +++ b/tests/cadence/worker/test_decision_worker_integration.py @@ -11,6 +11,7 @@ from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes from cadence.worker._decision import DecisionWorker from cadence.worker._registry import Registry +from cadence.workflow import workflow from cadence.client import Client @@ -34,12 +35,14 @@ def mock_client(self): def registry(self): """Create a registry with a test workflow.""" reg = Registry() - + @reg.workflow - def test_workflow(input_data): - """Simple test workflow that returns the input.""" - return f"processed: {input_data}" - + class TestWorkflow: + @workflow.run + async def run(self, input_data): + """Simple test workflow that returns the input.""" + return f"processed: {input_data}" + return reg @pytest.fixture @@ -236,8 +239,10 @@ async def test_decision_worker_with_different_workflow_types(self, decision_work """Test decision worker with different workflow types.""" # Add another workflow to the registry @registry.workflow - def another_workflow(input_data): - return f"another-processed: {input_data}" + class AnotherWorkflow: + @workflow.run + async def run(self, input_data): + return f"another-processed: {input_data}" # Create decision tasks for different workflow types task1 = self.create_mock_decision_task(workflow_type="test_workflow") diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 5c76f03..9d52c66 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -62,12 +62,15 @@ def sample_decision_task(self): async def test_full_task_handling_flow_success(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow from base handler through decision handler.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine @@ -90,12 +93,15 @@ def mock_workflow_func(): async def test_full_task_handling_flow_with_error(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow when an error occurs.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine to raise an error @@ -117,12 +123,15 @@ def mock_workflow_func(): async def test_context_activation_integration(self, handler, sample_decision_task, mock_registry): """Test that context activation works correctly in the integration.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine @@ -154,12 +163,15 @@ def track_context_activation(): async def test_multiple_workflow_executions(self, handler, mock_registry): """Test handling multiple workflow executions creates new engines for each.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Create multiple decision tasks for different workflows @@ -207,12 +219,15 @@ def mock_workflow_func(): async def test_workflow_engine_creation_integration(self, handler, sample_decision_task, mock_registry): """Test workflow engine creation integration.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine @@ -234,12 +249,15 @@ def mock_workflow_func(): async def test_error_handling_with_context_cleanup(self, handler, sample_decision_task, mock_registry): """Test that context cleanup happens even when errors occur.""" # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Mock workflow engine to raise an error @@ -274,12 +292,15 @@ async def test_concurrent_task_handling(self, handler, mock_registry): import asyncio # Create actual workflow definition - def mock_workflow_func(): - return "test_result" + from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + + class MockWorkflow: + @workflow.run + async def run(self): + return "test_result" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions workflow_opts = WorkflowDefinitionOptions(name="test_workflow") - workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts) + workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts) mock_registry.get_workflow.return_value = workflow_definition # Create multiple tasks From ca8fcd6adbd42f5d9897894f7a71bb10cf427d27 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 23 Oct 2025 12:17:05 -0700 Subject: [PATCH 5/5] respond to comments Signed-off-by: Tim Li --- cadence/__init__.py | 2 +- cadence/_internal/workflow/workflow_engine.py | 22 +----- cadence/worker/_registry.py | 18 ++--- cadence/workflow.py | 75 +++++++++++-------- .../test_workflow_engine_integration.py | 26 +++---- .../worker/test_decision_task_handler.py | 10 +-- .../test_decision_task_handler_integration.py | 2 +- .../test_decision_worker_integration.py | 2 +- tests/cadence/worker/test_registry.py | 3 +- .../worker/test_task_handler_integration.py | 16 +--- 10 files changed, 78 insertions(+), 98 deletions(-) diff --git a/cadence/__init__.py b/cadence/__init__.py index abf4bbd..c1c2a17 100644 --- a/cadence/__init__.py +++ b/cadence/__init__.py @@ -7,7 +7,7 @@ # Import main client functionality from .client import Client from .worker import Registry -from .workflow import workflow +from . import workflow __version__ = "0.1.0" diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 7eff5c2..67606b7 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -279,7 +279,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes workflow_input = await self._extract_workflow_input(decision_task) # Execute workflow function - result = self._execute_workflow_function_once(workflow_func, workflow_input) + result = await self._execute_workflow_function_once(workflow_func, workflow_input) # Check if workflow is complete if result is not None: @@ -341,7 +341,7 @@ async def _extract_workflow_input(self, decision_task: PollForDecisionTaskRespon logger.warning("No WorkflowExecutionStarted event found in history") return None - def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any: + async def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any: """ Execute the workflow function once (not during replay). @@ -355,23 +355,9 @@ def _execute_workflow_function_once(self, workflow_func: Callable, workflow_inpu logger.debug(f"Executing workflow function with input: {workflow_input}") result = workflow_func(workflow_input) - # If the workflow function is async, we need to handle it properly + # If the workflow function is async, await it properly if asyncio.iscoroutine(result): - # For now, use asyncio.run for async workflow functions - # TODO: Implement proper deterministic event loop for workflow execution - try: - result = asyncio.run(result) - except RuntimeError: - # If we're already in an event loop, create a new task - loop = asyncio.get_event_loop() - if loop.is_running(): - # We can't use asyncio.run inside a running loop - # For now, just get the result (this may not be deterministic) - logger.warning("Async workflow function called within running event loop - may not be deterministic") - # This is a workaround - in a real implementation, we'd need proper task scheduling - result = None - else: - result = loop.run_until_complete(result) + result = await result return result diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index 816caad..f45c42a 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -7,12 +7,15 @@ """ import logging -from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type, Union +from typing import Callable, Dict, Optional, Unpack, TypedDict, overload, Type, Union, TypeVar from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions logger = logging.getLogger(__name__) +# TypeVar for workflow class types +W = TypeVar('W') + class RegisterWorkflowOptions(TypedDict, total=False): """Options for registering a workflow.""" @@ -35,9 +38,9 @@ def __init__(self) -> None: def workflow( self, - cls: Optional[Type] = None, + cls: Optional[Type[W]] = None, **kwargs: Unpack[RegisterWorkflowOptions] - ) -> Union[Type, Callable[[Type], Type]]: + ) -> Union[Type[W], Callable[[Type[W]], Type[W]]]: """ Register a workflow class. @@ -57,7 +60,7 @@ def workflow( """ options = RegisterWorkflowOptions(**kwargs) - def decorator(target: Type) -> Type: + def decorator(target: Type[W]) -> Type[W]: workflow_name = options.get('name') or target.__name__ if workflow_name in self._workflows: @@ -194,7 +197,7 @@ def of(*args: 'Registry') -> 'Registry': return result -def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]: +def _find_activity_definitions(instance: object) -> list[ActivityDefinition]: attr_to_def = {} for t in instance.__class__.__mro__: for attr in dir(t): @@ -206,10 +209,7 @@ def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition] raise ValueError(f"'{attr}' was overridden with a duplicate activity definition") attr_to_def[attr] = value - # Create new definitions, copying the attributes from the declaring type but using the function - # from the specific object. This allows for the decorator to be applied to the base class and the - # function to be overridden - result = [] + result: list[ActivityDefinition] = [] for attr, definition in attr_to_def.items(): result.append(ActivityDefinition(getattr(instance, attr), definition.name, definition.strategy, definition.params)) diff --git a/cadence/workflow.py b/cadence/workflow.py index 22fd866..14cabec 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -2,12 +2,12 @@ from contextlib import contextmanager from contextvars import ContextVar from dataclasses import dataclass -from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any -from functools import wraps +from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any, Optional, Union +import inspect from cadence.client import Client -T = TypeVar('T') +T = TypeVar('T', bound=Callable[..., Any]) class WorkflowDefinitionOptions(TypedDict, total=False): @@ -23,9 +23,10 @@ class WorkflowDefinition: Provides type safety and metadata for workflow classes. """ - def __init__(self, cls: Type, name: str): + def __init__(self, cls: Type, name: str, run_method_name: str): self._cls = cls self._name = name + self._run_method_name = run_method_name @property def name(self) -> str: @@ -39,13 +40,7 @@ def cls(self) -> Type: def get_run_method(self, instance: Any) -> Callable: """Get the workflow run method from an instance of the workflow class.""" - for attr_name in dir(instance): - if attr_name.startswith('_'): - continue - attr = getattr(instance, attr_name) - if callable(attr) and hasattr(attr, '_workflow_run'): - return cast(Callable, attr) - raise ValueError(f"No @workflow.run method found in class {self._cls.__name__}") + return cast(Callable, getattr(instance, self._run_method_name)) @staticmethod def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition': @@ -66,8 +61,8 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition': if "name" in opts and opts["name"]: name = opts["name"] - # Validate that the class has exactly one run method - run_method_count = 0 + # Validate that the class has exactly one run method and find it + run_method_name = None for attr_name in dir(cls): if attr_name.startswith('_'): continue @@ -78,40 +73,54 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition': # Check for workflow run method if hasattr(attr, '_workflow_run'): - run_method_count += 1 + if run_method_name is not None: + raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}") + run_method_name = attr_name - if run_method_count == 0: + if run_method_name is None: raise ValueError(f"No @workflow.run method found in class {cls.__name__}") - elif run_method_count > 1: - raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}") - return WorkflowDefinition(cls, name) + return WorkflowDefinition(cls, name, run_method_name) -def run(func: Callable[..., T]) -> Callable[..., T]: +def run(func: Optional[T] = None) -> Union[T, Callable[[T], T]]: """ Decorator to mark a method as the main workflow run method. + Can be used with or without parentheses: + @workflow.run + async def my_workflow(self): + ... + + @workflow.run() + async def my_workflow(self): + ... + Args: func: The method to mark as the workflow run method Returns: The decorated method with workflow run metadata + + Raises: + ValueError: If the function is not async """ - @wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - # Attach metadata to the function - wrapper._workflow_run = True # type: ignore - return wrapper - - -# Create a simple namespace object for the workflow decorators -class _WorkflowNamespace: - run = staticmethod(run) - -workflow = _WorkflowNamespace() + def decorator(f: T) -> T: + # Validate that the function is async + if not inspect.iscoroutinefunction(f): + raise ValueError(f"Workflow run method '{f.__name__}' must be async") + + # Attach metadata to the function + f._workflow_run = True # type: ignore + return f + + # Support both @workflow.run and @workflow.run() + if func is None: + # Called with parentheses: @workflow.run() + return decorator + else: + # Called without parentheses: @workflow.run + return decorator(func) @dataclass diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index ecf7c13..3805f56 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -9,7 +9,8 @@ from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult -from cadence.workflow import WorkflowInfo +from cadence import workflow +from cadence.workflow import WorkflowInfo, WorkflowDefinition, WorkflowDefinitionOptions from cadence.client import Client @@ -38,11 +39,9 @@ def workflow_info(self): @pytest.fixture def mock_workflow_definition(self): """Create a mock workflow definition.""" - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class TestWorkflow: @workflow.run - def weird_name(self, input_data): + async def weird_name(self, input_data): return f"processed: {input_data}" workflow_opts = WorkflowDefinitionOptions(name="test_workflow") @@ -214,7 +213,8 @@ async def test_extract_workflow_input_deserialization_error(self, workflow_engin # Verify no input was extracted due to error assert input_data is None - def test_execute_workflow_function_sync(self, workflow_engine): + @pytest.mark.asyncio + async def test_execute_workflow_function_sync(self, workflow_engine): """Test synchronous workflow function execution.""" input_data = "test-input" @@ -222,12 +222,13 @@ def test_execute_workflow_function_sync(self, workflow_engine): workflow_func = workflow_engine._workflow_definition.get_run_method(workflow_engine._workflow_instance) # Execute the workflow function - result = workflow_engine._execute_workflow_function_once(workflow_func, input_data) + result = await workflow_engine._execute_workflow_function_once(workflow_func, input_data) # Verify the result assert result == "processed: test-input" - def test_execute_workflow_function_async(self, workflow_engine): + @pytest.mark.asyncio + async def test_execute_workflow_function_async(self, workflow_engine): """Test asynchronous workflow function execution.""" async def async_workflow_func(input_data): return f"async-processed: {input_data}" @@ -235,18 +236,19 @@ async def async_workflow_func(input_data): input_data = "test-input" # Execute the async workflow function - result = workflow_engine._execute_workflow_function_once(async_workflow_func, input_data) + result = await workflow_engine._execute_workflow_function_once(async_workflow_func, input_data) # Verify the result assert result == "async-processed: test-input" - def test_execute_workflow_function_none(self, workflow_engine): + @pytest.mark.asyncio + async def test_execute_workflow_function_none(self, workflow_engine): """Test workflow function execution with None function.""" input_data = "test-input" # Execute with None workflow function - should raise TypeError with pytest.raises(TypeError, match="'NoneType' object is not callable"): - workflow_engine._execute_workflow_function_once(None, input_data) + await workflow_engine._execute_workflow_function_once(None, input_data) def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_definition): """Test WorkflowEngine initialization.""" @@ -281,11 +283,9 @@ async def test_workflow_engine_workflow_completion(self, workflow_engine, mock_c decision_task = self.create_mock_decision_task() # Create a workflow definition that returns a result (indicating completion) - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class CompletingWorkflow: @workflow.run - def run(self, input_data): + async def run(self, input_data): return "workflow-completed" workflow_opts = WorkflowDefinitionOptions(name="completing_workflow") diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 55b1e1f..da2e79a 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -17,6 +17,8 @@ from cadence.worker._decision_task_handler import DecisionTaskHandler from cadence.worker._registry import Registry from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult +from cadence import workflow +from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions class TestDecisionTaskHandler: @@ -83,8 +85,6 @@ def test_initialization(self, mock_client, mock_registry): async def test_handle_task_implementation_success(self, handler, sample_decision_task, mock_registry): """Test successful decision task handling.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -151,8 +151,6 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry): """Test that decision task handler caches workflow engines for same workflow execution.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -189,8 +187,6 @@ async def run(self): async def test_handle_task_implementation_different_executions_get_separate_engines(self, handler, mock_registry): """Test that different workflow executions get separate engines.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -348,8 +344,6 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis async def test_workflow_engine_creation_with_workflow_info(self, handler, sample_decision_task, mock_registry): """Test that WorkflowEngine is created with correct WorkflowInfo.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index 0327485..fc65f0e 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -13,7 +13,7 @@ from cadence.api.v1.decision_pb2 import Decision from cadence.worker._decision_task_handler import DecisionTaskHandler from cadence.worker._registry import Registry -from cadence.workflow import workflow +from cadence import workflow from cadence.client import Client diff --git a/tests/cadence/worker/test_decision_worker_integration.py b/tests/cadence/worker/test_decision_worker_integration.py index 712f312..18e970e 100644 --- a/tests/cadence/worker/test_decision_worker_integration.py +++ b/tests/cadence/worker/test_decision_worker_integration.py @@ -11,7 +11,7 @@ from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes from cadence.worker._decision import DecisionWorker from cadence.worker._registry import Registry -from cadence.workflow import workflow +from cadence import workflow from cadence.client import Client diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 53c16f0..bf6721e 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -6,8 +6,9 @@ import pytest from cadence import activity +from cadence import workflow from cadence.worker import Registry -from cadence.workflow import workflow, WorkflowDefinition +from cadence.workflow import WorkflowDefinition from tests.cadence import common_activities diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 9d52c66..daa36bb 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -12,6 +12,8 @@ from cadence.worker._decision_task_handler import DecisionTaskHandler from cadence.worker._registry import Registry from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult +from cadence import workflow +from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions class TestTaskHandlerIntegration: @@ -62,7 +64,7 @@ def sample_decision_task(self): async def test_full_task_handling_flow_success(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow from base handler through decision handler.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow + class MockWorkflow: @workflow.run @@ -93,8 +95,6 @@ async def run(self): async def test_full_task_handling_flow_with_error(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow when an error occurs.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -123,8 +123,6 @@ async def run(self): async def test_context_activation_integration(self, handler, sample_decision_task, mock_registry): """Test that context activation works correctly in the integration.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -163,8 +161,6 @@ def track_context_activation(): async def test_multiple_workflow_executions(self, handler, mock_registry): """Test handling multiple workflow executions creates new engines for each.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -219,8 +215,6 @@ async def run(self): async def test_workflow_engine_creation_integration(self, handler, sample_decision_task, mock_registry): """Test workflow engine creation integration.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -249,8 +243,6 @@ async def run(self): async def test_error_handling_with_context_cleanup(self, handler, sample_decision_task, mock_registry): """Test that context cleanup happens even when errors occur.""" # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self): @@ -292,8 +284,6 @@ async def test_concurrent_task_handling(self, handler, mock_registry): import asyncio # Create actual workflow definition - from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow - class MockWorkflow: @workflow.run async def run(self):