diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index ef9f0f0..09f5ab5 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,21 +1,24 @@ import logging from dataclasses import dataclass +import traceback from typing import List from cadence._internal.workflow.context import Context from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator from cadence._internal.workflow.statemachine.decision_manager import DecisionManager from cadence._internal.workflow.workflow_intance import WorkflowInstance -from cadence.api.v1.common_pb2 import Payload +from cadence.api.v1.common_pb2 import Failure, Payload from cadence.api.v1.decision_pb2 import ( CompleteWorkflowExecutionDecisionAttributes, Decision, + FailWorkflowExecutionDecisionAttributes, ) from cadence.api.v1.history_pb2 import ( HistoryEvent, WorkflowExecutionStartedEventAttributes, ) from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse +from cadence.error import WorkflowFailure from cadence.workflow import WorkflowDefinition, WorkflowInfo logger = logging.getLogger(__name__) @@ -75,9 +78,21 @@ def process_decision( decisions = self._decision_manager.collect_pending_decisions() # complete workflow if it is done - try: - if self._workflow_instance.is_done(): + if self._workflow_instance.is_done(): + try: result = self._workflow_instance.get_result() + except WorkflowFailure as e: + decisions.append( + Decision( + fail_workflow_execution_decision_attributes=FailWorkflowExecutionDecisionAttributes( + failure=_failure_from_workflow_failure(e) + ) + ) + ) + # TODO: handle cancellation error + except Exception: + raise + else: decisions.append( Decision( complete_workflow_execution_decision_attributes=CompleteWorkflowExecutionDecisionAttributes( @@ -85,13 +100,7 @@ def process_decision( ) ) ) - return DecisionResult(decisions=decisions) - - except 
def _failure_from_workflow_failure(e: WorkflowFailure) -> Failure:
    """Convert a ``WorkflowFailure`` into a protobuf ``Failure``.

    The failure describes the exception chained onto ``e`` with
    ``raise ... from`` (``e.__cause__``). When no cause is attached, ``e``
    itself is described, so the reason is never reported as ``NoneType``.

    Args:
        e: The workflow failure to convert.

    Returns:
        A ``Failure`` whose ``reason`` is the exception class name and whose
        ``details`` holds the UTF-8 encoded message and formatted traceback.
    """
    # Fall back to the wrapper itself when it was raised without a cause.
    error = e.__cause__ if e.__cause__ is not None else e

    # Use the three-argument form: passing only the exception instance is
    # accepted on Python 3.10+ only.
    stacktrace = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )

    details = f"message: {error!s}\nstacktrace: {stacktrace}"

    return Failure(
        reason=type(error).__name__,
        details=details.encode("utf-8"),
    )
async def test_workflow_failure(helper: CadenceHelper):
    """Start ``FailureWorkflow`` and check the failure stored in its close event.

    The last history event must carry a failure whose reason is
    ``MockedFailure`` and whose details contain the raising source line.
    """
    async with helper.worker(reg) as worker:
        execution = await worker.client.start_workflow(
            "FailureWorkflow",
            "hello world",
            task_list=worker.task_list,
            execution_start_to_close_timeout=timedelta(seconds=10),
        )

        # Request only the close event, with wait_for_new_event set.
        history_request = GetWorkflowExecutionHistoryRequest(
            domain=DOMAIN_NAME,
            workflow_execution=execution,
            wait_for_new_event=True,
            history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
            skip_archival=True,
        )
        response: GetWorkflowExecutionHistoryResponse = (
            await worker.client.workflow_stub.GetWorkflowExecutionHistory(
                history_request
            )
        )

        close_event = response.history.events[-1]
        failure = close_event.workflow_execution_failed_event_attributes.failure

        assert failure.reason == "MockedFailure"

        assert (
            """raise MockedFailure("mocked workflow failure")"""
            in failure.details.decode()
        )