Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import logging
from dataclasses import dataclass
import traceback
from typing import List

from cadence._internal.workflow.context import Context
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
from cadence._internal.workflow.workflow_intance import WorkflowInstance
from cadence.api.v1.common_pb2 import Payload
from cadence.api.v1.common_pb2 import Failure, Payload
from cadence.api.v1.decision_pb2 import (
CompleteWorkflowExecutionDecisionAttributes,
Decision,
FailWorkflowExecutionDecisionAttributes,
)
from cadence.api.v1.history_pb2 import (
HistoryEvent,
WorkflowExecutionStartedEventAttributes,
)
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.error import WorkflowFailure
from cadence.workflow import WorkflowDefinition, WorkflowInfo

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,23 +78,29 @@ def process_decision(
decisions = self._decision_manager.collect_pending_decisions()

# complete workflow if it is done
try:
if self._workflow_instance.is_done():
if self._workflow_instance.is_done():
try:
result = self._workflow_instance.get_result()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe use the exception() method instead of try/catch to simplify the flow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've thought about this. The exception() actually would raise CancelledError rather than just return it. So there isn't any simplification for this. I still want to keep the WorkflowInstance's API easier: one way to get the result or throw an exception.

except WorkflowFailure as e:
decisions.append(
Decision(
fail_workflow_execution_decision_attributes=FailWorkflowExecutionDecisionAttributes(
failure=_failure_from_workflow_failure(e)
)
)
)
# TODO: handle cancellation error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# TODO: handle cancellation error
# TODO: handle cancellation error
# TODO: handle unknown error, fail decision task and try again instead of breaking the engine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually for other exceptions, they should be handled all together in the outter most layer WorkflowWorker.

except Exception:
raise
else:
decisions.append(
Decision(
complete_workflow_execution_decision_attributes=CompleteWorkflowExecutionDecisionAttributes(
result=result
)
)
)
return DecisionResult(decisions=decisions)

except Exception:
# TODO: handle CancellationError
# TODO: handle WorkflowError
# TODO: handle unknown error, fail decision task and try again instead of breaking the engine
raise
return DecisionResult(decisions=decisions)

except Exception as e:
# Log decision task failure with full context (matches Java ReplayDecisionTaskHandler)
Expand Down Expand Up @@ -221,3 +230,16 @@ def _extract_workflow_input(
return started_attrs.input

raise ValueError("No WorkflowExecutionStarted event found in history")


def _failure_from_workflow_failure(e: WorkflowFailure) -> Failure:
cause = e.__cause__

stacktrace = "".join(traceback.format_exception(cause))

details = f"message: {str(cause)}\nstacktrace: {stacktrace}"

return Failure(
reason=type(cause).__name__,
details=details.encode("utf-8"),
)
10 changes: 8 additions & 2 deletions cadence/_internal/workflow/workflow_intance.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from asyncio import Task
from asyncio import CancelledError, InvalidStateError, Task
from typing import Any, Optional
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
from cadence.api.v1.common_pb2 import Payload
from cadence.data_converter import DataConverter
from cadence.error import WorkflowFailure
from cadence.workflow import WorkflowDefinition


Expand Down Expand Up @@ -33,6 +34,11 @@ def is_done(self) -> bool:
def get_result(self) -> Payload:
if self._task is None:
raise RuntimeError("Workflow is not started yet")
result = self._task.result()
try:
result = self._task.result()
except (CancelledError, InvalidStateError) as e:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to wrap cancelled error? There are different scenarios to iterate on this.

raise e
except Exception as e:
raise WorkflowFailure(f"Workflow failed: {e}") from e
# TODO: handle result with multiple outputs
return self._data_converter.to_data([result])
5 changes: 5 additions & 0 deletions cadence/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ def __init__(self, message: str) -> None:
super().__init__(message)


class WorkflowFailure(Exception):
def __init__(self, message: str) -> None:
super().__init__(message)


class CadenceRpcError(Exception):
def __init__(self, message: str, code: grpc.StatusCode, *args):
super().__init__(message, code, *args)
Expand Down
14 changes: 13 additions & 1 deletion tests/integration_tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,19 @@ services:
services-network:
aliases:
- cadence
cadence-web:
image: ubercadence/web:latest
environment:
- "CADENCE_GRPC_PEERS=cadence:7833"
ports:
- "8088:8088"
depends_on:
- cadence
networks:
services-network:
aliases:
- cadence-web
networks:
services-network:
name: services-network
driver: bridge
driver: bridge
45 changes: 45 additions & 0 deletions tests/integration_tests/workflow/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ async def echo(self, message: str) -> str:
return message


class MockedFailure(Exception):
pass


@reg.workflow()
class FailureWorkflow:
@workflow.run
async def failure(self, message: str) -> str:
raise MockedFailure("mocked workflow failure")


async def test_simple_workflow(helper: CadenceHelper):
async with helper.worker(reg) as worker:
execution = await worker.client.start_workflow(
Expand Down Expand Up @@ -50,6 +61,40 @@ async def test_simple_workflow(helper: CadenceHelper):
)


async def test_workflow_failure(helper: CadenceHelper):
async with helper.worker(reg) as worker:
execution = await worker.client.start_workflow(
"FailureWorkflow",
"hello world",
task_list=worker.task_list,
execution_start_to_close_timeout=timedelta(seconds=10),
)

response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
GetWorkflowExecutionHistoryRequest(
domain=DOMAIN_NAME,
workflow_execution=execution,
wait_for_new_event=True,
history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
skip_archival=True,
)
)

assert (
"MockedFailure"
== response.history.events[
-1
].workflow_execution_failed_event_attributes.failure.reason
)

assert (
"""raise MockedFailure("mocked workflow failure")"""
in response.history.events[
-1
].workflow_execution_failed_event_attributes.failure.details.decode()
)


@pytest.mark.skip(reason="Incorrect WorkflowType")
async def test_workflow_fn(helper: CadenceHelper):
async with helper.worker(reg) as worker:
Expand Down