diff --git a/examples/src/wait_for_callback/wait_for_callback.py b/examples/src/wait_for_callback/wait_for_callback.py index 0f72c19..4cfdd77 100644 --- a/examples/src/wait_for_callback/wait_for_callback.py +++ b/examples/src/wait_for_callback/wait_for_callback.py @@ -3,6 +3,7 @@ from aws_durable_execution_sdk_python.config import WaitForCallbackConfig from aws_durable_execution_sdk_python.context import DurableContext from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration def external_system_call(_callback_id: str) -> None: @@ -13,7 +14,9 @@ def external_system_call(_callback_id: str) -> None: @durable_execution def handler(_event: Any, context: DurableContext) -> str: - config = WaitForCallbackConfig(timeout_seconds=120, heartbeat_timeout_seconds=60) + config = WaitForCallbackConfig( + timeout=Duration.from_seconds(120), heartbeat_timeout=Duration.from_seconds(60) + ) result = context.wait_for_callback( external_system_call, name="external_call", config=config diff --git a/examples/test/conftest.py b/examples/test/conftest.py index 1f329d4..5339c0c 100644 --- a/examples/test/conftest.py +++ b/examples/test/conftest.py @@ -105,6 +105,36 @@ def run( """Execute the durable function and return results.""" return self._runner.run(input=input, timeout=timeout) + def run_async( + self, + input: str | None = None, # noqa: A002 + timeout: int = 60, + ) -> str: + return self._runner.run_async(input=input, timeout=timeout) + + def send_callback_success(self, callback_id: str) -> None: + self._runner.send_callback_success(callback_id=callback_id) + + def send_callback_failure(self, callback_id: str) -> None: + self._runner.send_callback_failure(callback_id=callback_id) + + def send_callback_heartbeat(self, callback_id: str) -> None: + self._runner.send_callback_heartbeat(callback_id=callback_id) + + def wait_for_result( + self, execution_arn: str, timeout: int = 60 + ) -> DurableFunctionTestResult: + return self._runner.wait_for_result( + execution_arn=execution_arn, timeout=timeout + ) + + def wait_for_callback( + self, execution_arn: str, name: str | None = None, timeout: int = 60 + ) -> str: + return self._runner.wait_for_callback( + execution_arn=execution_arn, name=name, timeout=timeout + ) + @property def mode(self) -> str: """Get the runner mode (local or cloud).""" diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index cb88999..866f50d 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -18,6 +18,7 @@ import aws_durable_execution_sdk_python import boto3 # type: ignore +from botocore.exceptions import ClientError # type: ignore from aws_durable_execution_sdk_python.execution import ( InvocationStatus, durable_execution, @@ -75,6 +76,8 @@ from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.web.server import WebServiceConfig + from aws_durable_execution_sdk_python_testing.model import Event + logger = logging.getLogger(__name__) @@ -792,9 +795,9 @@ def run( msg = f"Failed to invoke Lambda function {self.function_name}: {e}" raise DurableFunctionsTestError(msg) from e - # Check HTTP status code (200 for RequestResponse, 202 for Event, 204 for DryRun) + # Check HTTP status code, 200 for RequestResponse status_code = response.get("StatusCode") - if status_code not in (200, 202, 204): + if status_code != 200: error_payload = response["Payload"].read().decode("utf-8") msg = f"Lambda invocation failed with status {status_code}: {error_payload}" raise DurableFunctionsTestError(msg) @@ -819,17 +822,126 @@ def run( ) raise DurableFunctionsTestError(msg) - # Poll for completion - execution_response = self._wait_for_completion(execution_arn, timeout) + return self.wait_for_result(execution_arn=execution_arn, timeout=timeout) - # Get execution history - history_response = self._get_execution_history(execution_arn) + def run_async( + self, + input: str | None = None, # noqa: A002 + timeout: int = 60, + ) -> str: + """Execute function on AWS Lambda asynchronously""" + logger.info( + "Invoking Lambda function: %s (timeout: %ds)", self.function_name, timeout + ) + payload = json.dumps(input) + try: + response = self.lambda_client.invoke( + FunctionName=self.function_name, + InvocationType="Event", + Payload=payload, + ) + except Exception as e: + msg = f"Failed to invoke Lambda function {self.function_name}: {e}" + raise DurableFunctionsTestError(msg) from e - # Build test result from execution history - return DurableFunctionTestResult.from_execution_history( - execution_response, history_response + # Check HTTP status code, 202 for Event + status_code = response.get("StatusCode") + if status_code != 202: + error_payload = response["Payload"].read().decode("utf-8") + msg = f"Lambda invocation failed with status {status_code}: {error_payload}" + raise DurableFunctionsTestError(msg) + + return response.get("DurableExecutionArn") + + def _get_callback_id_from_events( + self, events: list[Event], name: str | None = None + ) -> str | None: + """ + Get callback ID from execution history for callbacks that haven't completed. + + Args: + execution_arn: The ARN of the execution to query. + name: Optional callback name to search for. If not provided, returns the latest callback. + + Returns: + The callback ID string for a non-completed callback, or None if not found. + + Raises: + DurableFunctionsTestError: If the named callback has already succeeded/failed/timed out. + """ + callback_started_events = [ + event for event in events if event.event_type == "CallbackStarted" + ] + + if not callback_started_events: + return None + + completed_callback_ids = { + event.event_id + for event in events + if event.event_type + in ["CallbackSucceeded", "CallbackFailed", "CallbackTimedOut"] + } + + if name is not None: + for event in callback_started_events: + if event.name == name: + callback_id = event.event_id + if callback_id in completed_callback_ids: + raise DurableFunctionsTestError( + f"Callback {name} has already completed (succeeded/failed/timed out)" + ) + return ( + event.callback_started_details.callback_id + if event.callback_started_details + else None + ) + return None + + # If name is not provided, find the latest non-completed callback event + active_callbacks = [ + event + for event in callback_started_events + if event.event_id not in completed_callback_ids + ] + + if not active_callbacks: + return None + + latest_event = active_callbacks[-1] + return ( + latest_event.callback_started_details.callback_id + if latest_event.callback_started_details + else None ) + def send_callback_success(self, callback_id: str) -> None: + try: + self.lambda_client.send_durable_execution_callback_success( + CallbackId=callback_id + ) + except Exception as e: + msg = f"Failed to send callback success for {self.function_name}, callback_id {callback_id}: {e}" + raise DurableFunctionsTestError(msg) from e + + def send_callback_failure(self, callback_id: str) -> None: + try: + self.lambda_client.send_durable_execution_callback_failure( + CallbackId=callback_id + ) + except Exception as e: + msg = f"Failed to send callback failure for {self.function_name}, callback_id {callback_id}: {e}" + raise DurableFunctionsTestError(msg) from e + + def send_callback_heartbeat(self, callback_id: str) -> None: + try: + self.lambda_client.send_durable_execution_callback_heartbeat( + CallbackId=callback_id + ) + except Exception as e: + msg = f"Failed to send callback heartbeat for {self.function_name}, callback_id {callback_id}: {e}" + raise DurableFunctionsTestError(msg) from e + def _wait_for_completion( self, execution_arn: str, timeout: int ) -> GetDurableExecutionResponse: @@ -886,7 +998,81 @@ def _wait_for_completion( ) raise TimeoutError(msg) - def _get_execution_history( + def wait_for_result( + self, execution_arn: str, timeout: int = 60 + ) -> DurableFunctionTestResult: + # Poll for completion + execution_response = self._wait_for_completion(execution_arn, timeout) + + try: + history_response = self._fetch_execution_history(execution_arn) + except Exception as e: + msg = f"Failed to fetch execution history: {e}" + raise DurableFunctionsTestError(msg) from e + + # Build test result from execution history + return DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + def wait_for_callback( + self, execution_arn: str, name: str | None = None, timeout: int = 60 + ) -> str: + """ + Wait for and retrieve a callback ID from a Step Functions execution. + + Polls the execution history at regular intervals until a callback ID is found + or the timeout is reached. + + Args: + execution_arn: Execution Arn + name: Specific callback name, default to None + timeout: Maximum time in seconds to wait for callback. Defaults to 60. + + Returns: + str: The callback ID/token retrieved from the execution history + + Raises: + TimeoutError: If callback is not found within the specified timeout period + DurableFunctionsTestError: If there's an error fetching execution history + (excluding retryable errors) + """ + start_time = time.time() + + while time.time() - start_time < timeout: + try: + history_response = self._fetch_execution_history(execution_arn) + callback_id = self._get_callback_id_from_events( + events=history_response.events, name=name + ) + if callback_id: + return callback_id + except ClientError as e: + error_code = e.response["Error"]["Code"] + # retryable error, the execution may not start yet in async invoke situation + if error_code in ["ResourceNotFoundException"]: + pass + else: + msg = f"Failed to fetch execution history: {e}" + raise DurableFunctionsTestError(msg) from e + except DurableFunctionsTestError as e: + raise e + except Exception as e: + msg = f"Failed to fetch execution history: {e}" + raise DurableFunctionsTestError(msg) from e + + # Wait before next poll + time.sleep(self.poll_interval) + + # Timeout reached + elapsed = time.time() - start_time + msg = ( + f"Callback did not available within {timeout}s " + f"(elapsed: {elapsed:.1f}s." + ) + raise TimeoutError(msg) + + def _fetch_execution_history( self, execution_arn: str ) -> GetDurableExecutionHistoryResponse: """Retrieve execution history from Lambda service. @@ -898,19 +1084,13 @@ def _get_execution_history( GetDurableExecutionHistoryResponse with typed Event objects Raises: - DurableFunctionsTestError: If history retrieval fails + ClientError: If lambda client encounter error """ - try: - history_dict = self.lambda_client.get_durable_execution_history( - DurableExecutionArn=execution_arn, - IncludeExecutionData=True, - ) - history_response = GetDurableExecutionHistoryResponse.from_dict( - history_dict - ) - except Exception as e: - msg = f"Failed to get execution history: {e}" - raise DurableFunctionsTestError(msg) from e + history_dict = self.lambda_client.get_durable_execution_history( + DurableExecutionArn=execution_arn, + IncludeExecutionData=True, + ) + history_response = GetDurableExecutionHistoryResponse.from_dict(history_dict) logger.info("Retrieved %d events from history", len(history_response.events)) diff --git a/tests/runner_test.py b/tests/runner_test.py index c2a6962..f34a76b 100644 --- a/tests/runner_test.py +++ b/tests/runner_test.py @@ -1617,3 +1617,499 @@ def test_cloud_runner_wait_for_completion_aborted_status(mock_boto3): result = runner._wait_for_completion("test-arn", timeout=10) assert result.status == "ABORTED" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_async_success(mock_boto3): + """Test DurableFunctionCloudTestRunner.run_async with successful invocation.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.invoke.return_value = { + "StatusCode": 202, + "Payload": Mock(read=lambda: b'{"result": "success"}'), + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + execution_arn = runner.run_async(input="test-input") + + assert ( + execution_arn + == "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1" + ) + mock_client.invoke.assert_called_once_with( + FunctionName="test-function", + InvocationType="Event", + Payload='"test-input"', + ) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_async_with_400(mock_boto3): + """Test DurableFunctionCloudTestRunner.run_async with successful invocation.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.invoke.return_value = { + "StatusCode": 400, + "Payload": Mock(read=lambda: b'{"result": "failed"}'), + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Lambda invocation failed with status 400" + ): + runner.run_async(input="test-input") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_async_failure(mock_boto3): + """Test DurableFunctionCloudTestRunner.run_async with invocation failure.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.invoke.side_effect = Exception("Async invoke failed") + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Failed to invoke Lambda function" + ): + runner.run_async(input="test-input") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_send_callback_success(mock_boto3): + """Test DurableFunctionCloudTestRunner.send_callback_success.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + runner.send_callback_success("callback-123") + + mock_client.send_durable_execution_callback_success.assert_called_once_with( + CallbackId="callback-123" + ) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_send_callback_failure(mock_boto3): + """Test DurableFunctionCloudTestRunner.send_callback_failure.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + runner.send_callback_failure("callback-123") + + mock_client.send_durable_execution_callback_failure.assert_called_once_with( + CallbackId="callback-123" + ) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_send_callback_heartbeat(mock_boto3): + """Test DurableFunctionCloudTestRunner.send_callback_heartbeat.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + runner.send_callback_heartbeat("callback-123") + + mock_client.send_durable_execution_callback_heartbeat.assert_called_once_with( + CallbackId="callback-123" + ) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_send_callback_error(mock_boto3): + """Test DurableFunctionCloudTestRunner callback methods with API errors.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.send_durable_execution_callback_success.side_effect = Exception( + "API error" + ) + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Failed to send callback success" + ): + runner.send_callback_success("callback-123") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_success(mock_boto3): + """Test DurableFunctionCloudTestRunner.wait_for_callback success.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.return_value = { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + } + ] + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + callback_id = runner.wait_for_callback("test-arn", name="test-callback", timeout=10) + + assert callback_id == "callback-123" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_none(mock_boto3): + """Test DurableFunctionCloudTestRunner.wait_for_callback none.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.return_value = { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + } + ] + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + + with pytest.raises(TimeoutError, match="Callback did not available within"): + runner.wait_for_callback("test-arn", name="test-callback1", timeout=2) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_success_without_name(mock_boto3): + """Test DurableFunctionCloudTestRunner.wait_for_callback success.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.return_value = { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + } + ] + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + callback_id = runner.wait_for_callback("test-arn") + + assert callback_id == "callback-123" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_all_done_without_name(mock_boto3): + """Test DurableFunctionCloudTestRunner.wait_for_callback all_done_without_name.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.return_value = { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + }, + { + "EventType": "CallbackSucceeded", + "EventTimestamp": "2023-01-01T00:05:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + }, + ] + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + with pytest.raises(TimeoutError, match="Callback did not available within"): + runner.wait_for_callback("test-arn", timeout=2) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +@patch("aws_durable_execution_sdk_python_testing.runner.time") +def test_cloud_runner_wait_for_callback_timeout(mock_time, mock_boto3): + """Test DurableFunctionCloudTestRunner.wait_for_callback timeout.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_time.time.side_effect = [0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0] + + mock_client.get_durable_execution_history.return_value = {"Events": []} + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + + with pytest.raises(TimeoutError, match="Callback did not available within"): + runner.wait_for_callback("test-arn", timeout=2) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_already_completed(mock_boto3): + """Test DurableFunctionCloudTestRunner.wait_for_callback already completed.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.return_value = { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + }, + { + "EventType": "CallbackSucceeded", + "EventTimestamp": "2023-01-01T00:05:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + }, + ] + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + + with pytest.raises( + DurableFunctionsTestError, match="Callback test-callback has already completed" + ): + runner.wait_for_callback("test-arn", "test-callback", timeout=2) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_client_error_retryable(mock_boto3): + """Test wait_for_callback with retryable ClientError.""" + from botocore.exceptions import ClientError # type: ignore + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First call raises ResourceNotFoundException, second succeeds + mock_client.get_durable_execution_history.side_effect = [ + ClientError( + error_response={"Error": {"Code": "ResourceNotFoundException"}}, + operation_name="GetDurableExecutionHistory", + ), + { + "Events": [ + { + "EventType": "CallbackStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "callback-event-1", + "Name": "test-callback", + "CallbackStartedDetails": {"CallbackId": "callback-123"}, + } + ] + }, + ] + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + callback_id = runner.wait_for_callback("test-arn", name="test-callback", timeout=10) + + assert callback_id == "callback-123" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_client_error_non_retryable( + mock_boto3, +): + """Test wait_for_callback with non-retryable ClientError.""" + from botocore.exceptions import ClientError + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.side_effect = ClientError( + error_response={"Error": {"Code": "AccessDeniedException"}}, + operation_name="GetDurableExecutionHistory", + ) + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Failed to fetch execution history" + ): + runner.wait_for_callback("test-arn", timeout=10) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_callback_generic_exception(mock_boto3): + """Test wait_for_callback with generic Exception.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution_history.side_effect = Exception("Network error") + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Failed to fetch execution history" + ): + runner.wait_for_callback("test-arn", timeout=10) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_result_fetch_history_exception(mock_boto3): + """Test wait_for_result with exception in _fetch_execution_history.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # Mock successful _wait_for_completion + mock_execution_response = Mock() + mock_execution_response.status = "SUCCEEDED" + + # Mock _fetch_execution_history to raise exception + runner = DurableFunctionCloudTestRunner(function_name="test-function") + runner._wait_for_completion = Mock(return_value=mock_execution_response) + runner._fetch_execution_history = Mock( + side_effect=Exception("History fetch failed") + ) + + with pytest.raises( + DurableFunctionsTestError, + match="Failed to fetch execution history: History fetch failed", + ): + runner.wait_for_result("test-arn", timeout=60) + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_result_success(mock_boto3): + """Test wait_for_result successful execution.""" + from aws_durable_execution_sdk_python.execution import InvocationStatus + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # Mock successful responses + mock_execution_response = Mock() + mock_execution_response.status = "SUCCEEDED" + mock_history_response = Mock() + mock_history_response.events = [] + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + runner._wait_for_completion = Mock(return_value=mock_execution_response) + runner._fetch_execution_history = Mock(return_value=mock_history_response) + + # Mock the from_execution_history method + with patch( + "aws_durable_execution_sdk_python_testing.runner.DurableFunctionTestResult.from_execution_history" + ) as mock_from_history: + mock_result = Mock() + mock_result.status = InvocationStatus.SUCCEEDED + mock_from_history.return_value = mock_result + + result = runner.wait_for_result("test-arn", timeout=60) + + assert result.status == InvocationStatus.SUCCEEDED + mock_from_history.assert_called_once_with( + mock_execution_response, mock_history_response + ) diff --git a/tests/stores/filesystem_store_test.py b/tests/stores/filesystem_store_test.py index 0467959..6b613c8 100644 --- a/tests/stores/filesystem_store_test.py +++ b/tests/stores/filesystem_store_test.py @@ -12,8 +12,11 @@ from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput from aws_durable_execution_sdk_python_testing.stores.filesystem import ( FileSystemExecutionStore, + datetime_object_hook, ) +from datetime import datetime, timezone + @pytest.fixture def temp_storage_dir(): @@ -264,3 +267,16 @@ def test_filesystem_execution_store_thread_safety_basic(store, sample_execution) store.save(sample_execution) loaded = store.load(sample_execution.durable_execution_arn) assert loaded.durable_execution_arn == sample_execution.durable_execution_arn + + +def test_datetime_object_hook_converts_timestamp_fields(): + """Test conversion of timestamp fields to datetime objects.""" + timestamp = 1672531200.0 # 2023-01-01 00:00:00 UTC + obj = { + "start_timestamp": timestamp, + } + + result = datetime_object_hook(obj) + + expected_datetime = datetime.fromtimestamp(timestamp, tz=timezone.utc) + assert result["start_timestamp"] == expected_datetime