Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ def __init__(self, info: WorkflowInfo, workflow_definition=None):
self._context = Context(info, self._decisions_helper, self._decision_manager)
self._is_workflow_complete = False

async def process_decision(
def process_decision(
self, decision_task: PollForDecisionTaskResponse
) -> DecisionResult:
return asyncio.run(self._process_decision(decision_task))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be replaced with running inside the DeterministicEventLoop


async def _process_decision(
self, decision_task: PollForDecisionTaskResponse
) -> DecisionResult:
"""
Expand Down
6 changes: 5 additions & 1 deletion cadence/worker/_decision.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from cadence.api.v1.service_worker_pb2 import (
Expand All @@ -24,8 +25,11 @@ def __init__(
permits = asyncio.Semaphore(
options["max_concurrent_decision_task_execution_size"]
)
executor = ThreadPoolExecutor(
max_workers=options["max_concurrent_decision_task_execution_size"]
)
self._decision_handler = DecisionTaskHandler(
client, task_list, registry, **options
client, task_list, registry, executor=executor, **options
)
self._poller = Poller[PollForDecisionTaskResponse](
options["decision_task_pollers"], permits, self._poll, self._execute
Expand Down
10 changes: 8 additions & 2 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
from typing import Dict, Tuple
from typing import Dict, Optional, Tuple

from cadence._internal.workflow.history_event_iterator import iterate_history_events
from cadence.api.v1.common_pb2 import Payload
Expand Down Expand Up @@ -33,6 +35,7 @@ def __init__(
task_list: str,
registry: Registry,
identity: str = "unknown",
executor: Optional[ThreadPoolExecutor] = None,
**options,
):
"""
Expand All @@ -50,6 +53,7 @@ def __init__(
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
self._cache_lock = threading.RLock()
self._executor = executor

async def _handle_task_implementation(
self, task: PollForDecisionTaskResponse
Expand Down Expand Up @@ -131,7 +135,9 @@ async def _handle_task_implementation(
)
self._workflow_engines[cache_key] = workflow_engine

decision_result = await workflow_engine.process_decision(task)
decision_result = await asyncio.get_running_loop().run_in_executor(
self._executor, workflow_engine.process_decision, task
)

# Clean up completed workflows from cache to prevent memory leaks
if workflow_engine._is_workflow_complete:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ def create_mock_decision_task(

return decision_task

@pytest.mark.asyncio
async def test_process_decision_success(
def test_process_decision_success(
self, workflow_engine, mock_client, decision_task
):
"""Test successful decision processing."""
Expand All @@ -119,14 +118,13 @@ async def test_process_decision_success(
return_value=[Mock()],
):
# Process the decision
result = await workflow_engine.process_decision(decision_task)
result = workflow_engine.process_decision(decision_task)

# Verify the result
assert isinstance(result, DecisionResult)
assert len(result.decisions) == 1

@pytest.mark.asyncio
async def test_process_decision_with_history(
def test_process_decision_with_history(
self, workflow_engine, mock_client, decision_task
):
"""Test decision processing with history events."""
Expand All @@ -141,13 +139,12 @@ async def test_process_decision_with_history(
return_value=[],
):
# Process the decision
await workflow_engine.process_decision(decision_task)
workflow_engine.process_decision(decision_task)

# Verify history events were processed
mock_handle.assert_called()

@pytest.mark.asyncio
async def test_process_decision_workflow_complete(
def test_process_decision_workflow_complete(
self, workflow_engine, mock_client, decision_task
):
"""Test decision processing when workflow is already complete."""
Expand All @@ -160,14 +157,13 @@ async def test_process_decision_workflow_complete(
return_value=[],
):
# Process the decision
result = await workflow_engine.process_decision(decision_task)
result = workflow_engine.process_decision(decision_task)

# Verify the result
assert isinstance(result, DecisionResult)
assert len(result.decisions) == 0

@pytest.mark.asyncio
async def test_process_decision_error_handling(
def test_process_decision_error_handling(
self, workflow_engine, mock_client, decision_task
):
"""Test decision processing error handling."""
Expand All @@ -179,7 +175,7 @@ async def test_process_decision_error_handling(
side_effect=Exception("Test error"),
):
# Process the decision
result = await workflow_engine.process_decision(decision_task)
result = workflow_engine.process_decision(decision_task)

# Verify error handling - should return empty decisions
assert isinstance(result, DecisionResult)
Expand Down Expand Up @@ -314,8 +310,7 @@ def test_workflow_engine_initialization(
assert workflow_engine._decision_manager is not None
assert workflow_engine._is_workflow_complete is False

@pytest.mark.asyncio
async def test_workflow_engine_without_workflow_definition(
def test_workflow_engine_without_workflow_definition(
self, mock_client: Client, workflow_info, decision_task
):
"""Test WorkflowEngine without workflow definition."""
Expand All @@ -328,14 +323,13 @@ async def test_workflow_engine_without_workflow_definition(
engine._decision_manager, "collect_pending_decisions", return_value=[]
):
# Process the decision
result = await engine.process_decision(decision_task)
result = engine.process_decision(decision_task)

# Verify the result
assert isinstance(result, DecisionResult)
assert len(result.decisions) == 0

@pytest.mark.asyncio
async def test_workflow_engine_workflow_completion(
def test_workflow_engine_workflow_completion(
self, workflow_engine, mock_client, decision_task
):
"""Test workflow completion detection."""
Expand All @@ -361,7 +355,7 @@ async def run(self, input_data):
return_value=[],
):
# Process the decision
await workflow_engine.process_decision(decision_task)
workflow_engine.process_decision(decision_task)

# Verify workflow is marked as complete
assert workflow_engine._is_workflow_complete is True
Expand All @@ -371,8 +365,7 @@ def test_close_event_loop(self, workflow_engine):
# This should not raise an exception
workflow_engine._close_event_loop()

@pytest.mark.asyncio
async def test_process_decision_with_query_results(
def test_process_decision_with_query_results(
self, workflow_engine, mock_client, decision_task
):
"""Test decision processing with query results."""
Expand All @@ -386,7 +379,7 @@ async def test_process_decision_with_query_results(
return_value=mock_decisions,
):
# Process the decision
result = await workflow_engine.process_decision(decision_task)
result = workflow_engine.process_decision(decision_task)

# Verify the result
assert isinstance(result, DecisionResult)
Expand Down
8 changes: 4 additions & 4 deletions tests/cadence/worker/test_decision_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = [Decision()]
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down Expand Up @@ -178,7 +178,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down Expand Up @@ -245,7 +245,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down Expand Up @@ -428,7 +428,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down
10 changes: 4 additions & 6 deletions tests/cadence/worker/test_decision_task_handler_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def create_mock_decision_task(

@pytest.mark.asyncio
async def test_handle_decision_task_success(
self, decision_task_handler, mock_client
self, decision_task_handler: DecisionTaskHandler, mock_client
):
"""Test successful decision task handling."""
# Create a mock decision task
Expand All @@ -110,7 +110,7 @@ async def test_handle_decision_task_success(
mock_engine = Mock()
# Create a proper Decision object
decision = Decision()
mock_engine.process_decision = AsyncMock(
mock_engine.process_decision = Mock(
return_value=Mock(
decisions=[decision], # Proper Decision object
)
Expand Down Expand Up @@ -182,7 +182,7 @@ async def test_workflow_engine_creation_each_task(
"cadence.worker._decision_task_handler.WorkflowEngine"
) as mock_engine_class:
mock_engine = Mock()
mock_engine.process_decision = AsyncMock(
mock_engine.process_decision = Mock(
return_value=Mock(
decisions=[],
)
Expand Down Expand Up @@ -211,9 +211,7 @@ async def test_decision_task_failure_handling(
"cadence.worker._decision_task_handler.WorkflowEngine"
) as mock_engine_class:
mock_engine = Mock()
mock_engine.process_decision = AsyncMock(
side_effect=Exception("Test error")
)
mock_engine.process_decision = Mock(side_effect=Exception("Test error"))
mock_engine_class.return_value = mock_engine

# Handle the decision task - this should catch the exception
Expand Down
14 changes: 7 additions & 7 deletions tests/cadence/worker/test_task_handler_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down Expand Up @@ -117,7 +117,7 @@ async def run(self):
# Mock workflow engine to raise an error
mock_engine = Mock(spec=WorkflowEngine)
mock_engine._is_workflow_complete = False # Add missing attribute
mock_engine.process_decision = AsyncMock(
mock_engine.process_decision = Mock(
side_effect=RuntimeError("Workflow processing failed")
)

Expand Down Expand Up @@ -157,7 +157,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

# Track if context is activated
context_activated = False
Expand Down Expand Up @@ -229,7 +229,7 @@ async def run(self):
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []

mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down Expand Up @@ -266,7 +266,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down Expand Up @@ -298,7 +298,7 @@ async def run(self):
# Mock workflow engine to raise an error
mock_engine = Mock(spec=WorkflowEngine)
mock_engine._is_workflow_complete = False # Add missing attribute
mock_engine.process_decision = AsyncMock(
mock_engine.process_decision = Mock(
side_effect=RuntimeError("Workflow processing failed")
)

Expand Down Expand Up @@ -366,7 +366,7 @@ async def run(self):
mock_engine._is_workflow_complete = False # Add missing attribute
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
mock_engine.process_decision = Mock(return_value=mock_decision_result)

with patch(
"cadence.worker._decision_task_handler.WorkflowEngine",
Expand Down