diff --git a/src/aws_durable_execution_sdk_python/lambda_service.py b/src/aws_durable_execution_sdk_python/lambda_service.py index d73ecda..3e092f9 100644 --- a/src/aws_durable_execution_sdk_python/lambda_service.py +++ b/src/aws_durable_execution_sdk_python/lambda_service.py @@ -708,6 +708,356 @@ class Operation: callback_details: CallbackDetails | None = None chained_invoke_details: ChainedInvokeDetails | None = None + def create_succeeded( + self, end_timestamp: datetime.datetime | None = None + ) -> Operation: + """Create a succeeded operation with end timestamp.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=OperationStatus.SUCCEEDED, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_failed( + self, end_timestamp: datetime.datetime | None = None + ) -> Operation: + """Create a failed operation with end timestamp.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=OperationStatus.FAILED, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_ready(self) -> Operation: + """Create a ready operation.""" + updated_step_details = None + if self.step_details: + updated_step_details = StepDetails( + attempt=self.step_details.attempt, + next_attempt_timestamp=None, + result=self.step_details.result, + error=self.step_details.error, + ) + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=OperationStatus.READY, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=updated_step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_completed_retry(self) -> Operation: + """Create an operation with completed retry (clears next_attempt_timestamp).""" + updated_step_details = None + if self.step_details: + updated_step_details = StepDetails( + attempt=self.step_details.attempt, + next_attempt_timestamp=None, + result=self.step_details.result, + error=self.step_details.error, + ) + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=OperationStatus.READY, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=updated_step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_callback_result( + self, result: str | None, end_timestamp: datetime.datetime | None = None + ) -> Operation: + """Create a succeeded callback operation with result.""" + updated_callback_details = None + if self.callback_details: + updated_callback_details = CallbackDetails( + callback_id=self.callback_details.callback_id, + result=result, + error=self.callback_details.error, + ) + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=OperationStatus.SUCCEEDED, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=updated_callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_callback_failure( + self, error: ErrorObject, end_timestamp: datetime.datetime | None = None + ) -> Operation: + """Create a failed callback operation with error.""" + updated_callback_details = None + if self.callback_details: + updated_callback_details = CallbackDetails( + callback_id=self.callback_details.callback_id, + result=self.callback_details.result, + error=error, + ) + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=OperationStatus.FAILED, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=updated_callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_execution_end( + self, status: OperationStatus, end_timestamp: datetime.datetime | None = None + ) -> Operation: + """Create an ended execution operation.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_merged_from_previous(self, previous_operation: Operation) -> Operation: + """Merge current operation with previous operation, preserving previous state.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id or previous_operation.parent_id, + name=self.name or previous_operation.name, + start_timestamp=previous_operation.start_timestamp, + end_timestamp=previous_operation.end_timestamp, + sub_type=self.sub_type or previous_operation.sub_type, + execution_details=previous_operation.execution_details, + context_details=previous_operation.context_details, + step_details=previous_operation.step_details, + wait_details=previous_operation.wait_details, + callback_details=previous_operation.callback_details, + chained_invoke_details=previous_operation.chained_invoke_details, + ) + + def create_with_start_timestamp(self, timestamp: datetime.datetime) -> Operation: + """Create operation with updated start timestamp.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_end_timestamp(self, timestamp: datetime.datetime) -> Operation: + """Create operation with updated end timestamp.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_execution_details( + self, execution_details: ExecutionDetails + ) -> Operation: + """Create operation with execution details.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_callback_details( + self, callback_details: CallbackDetails + ) -> Operation: + """Create operation with callback details.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_step_details(self, step_details: StepDetails) -> Operation: + """Create operation with step details.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_wait_details(self, wait_details: WaitDetails) -> Operation: + """Create operation with wait details.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_context_details(self, context_details: ContextDetails) -> Operation: + """Create operation with context details.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=self.chained_invoke_details, + ) + + def create_with_chained_invoke_details( + self, chained_invoke_details: ChainedInvokeDetails + ) -> Operation: + """Create operation with chained invoke details.""" + return Operation( + operation_id=self.operation_id, + operation_type=self.operation_type, + status=self.status, + parent_id=self.parent_id, + name=self.name, + start_timestamp=self.start_timestamp, + end_timestamp=self.end_timestamp, + sub_type=self.sub_type, + execution_details=self.execution_details, + context_details=self.context_details, + step_details=self.step_details, + wait_details=self.wait_details, + callback_details=self.callback_details, + chained_invoke_details=chained_invoke_details, + ) + @classmethod def from_dict(cls, data: MutableMapping[str, Any]) -> Operation: """Create an Operation instance from a dictionary with the original Smithy model field names. diff --git a/tests/lambda_service_test.py b/tests/lambda_service_test.py index 21ef45e..179ed4a 100644 --- a/tests/lambda_service_test.py +++ b/tests/lambda_service_test.py @@ -2088,3 +2088,306 @@ def test_lambda_client_checkpoint_with_non_none_client_token(): call_args = mock_client.checkpoint_durable_execution.call_args[1] assert call_args["ClientToken"] == "client_token_123" assert result.checkpoint_token == "new_token" # noqa: S105 + + +def test_operation_create_succeeded(): + """Test Operation.create_succeeded factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.STARTED, + name="test_step", + ) + + end_time = datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC) + updated = original.create_succeeded(end_timestamp=end_time) + + assert updated.operation_id == "op1" + assert updated.status == OperationStatus.SUCCEEDED + assert updated.end_timestamp == end_time + assert updated.name == "test_step" + + +def test_operation_create_failed(): + """Test Operation.create_failed factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.STARTED, + ) + + end_time = datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC) + updated = original.create_failed(end_timestamp=end_time) + + assert updated.status == OperationStatus.FAILED + assert updated.end_timestamp == end_time + + +def test_operation_create_ready(): + """Test Operation.create_ready factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + step_details=StepDetails( + attempt=1, + result="test_result", + next_attempt_timestamp=datetime.datetime(2023, 1, 1, tzinfo=datetime.UTC), + ), + ) + + updated = original.create_ready() + + assert updated.status == OperationStatus.READY + assert updated.step_details.attempt == 1 + assert updated.step_details.result == "test_result" + assert updated.step_details.next_attempt_timestamp is None # Cleared + + +def test_operation_create_callback_result(): + """Test Operation.create_callback_result factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.CALLBACK, + status=OperationStatus.STARTED, + callback_details=CallbackDetails(callback_id="cb123"), + ) + + end_time = datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC) + updated = original.create_callback_result(result="success", end_timestamp=end_time) + + assert updated.status == OperationStatus.SUCCEEDED + assert updated.callback_details.result == "success" + assert updated.callback_details.callback_id == "cb123" # Preserved + assert updated.end_timestamp == end_time + + +def test_operation_create_callback_failure(): + """Test Operation.create_callback_failure factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.CALLBACK, + status=OperationStatus.STARTED, + callback_details=CallbackDetails(callback_id="cb123"), + ) + + error = ErrorObject.from_message("callback failed") + updated = original.create_callback_failure(error=error) + + assert updated.status == OperationStatus.FAILED + assert updated.callback_details.error.message == "callback failed" + assert updated.callback_details.callback_id == "cb123" # Preserved from original + + +def test_operation_create_execution_end(): + """Test Operation.create_execution_end factory method.""" + original = Operation( + operation_id="exec1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + ) + + end_time = datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC) + updated = original.create_execution_end( + status=OperationStatus.SUCCEEDED, end_timestamp=end_time + ) + + assert updated.status == OperationStatus.SUCCEEDED + assert updated.end_timestamp == end_time + + +def test_operation_create_merged_from_previous(): + """Test Operation.create_merged_from_previous factory method.""" + previous = Operation( + operation_id="prev1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + parent_id="parent1", + name="previous_step", + start_timestamp=datetime.datetime(2023, 1, 1, tzinfo=datetime.UTC), + end_timestamp=datetime.datetime(2023, 1, 2, tzinfo=datetime.UTC), + sub_type=OperationSubType.STEP, + step_details=StepDetails(attempt=1, result="prev_result"), + ) + + current = Operation( + operation_id="curr1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + name="current_step", + ) + + merged = current.create_merged_from_previous(previous) + + assert merged.operation_id == "curr1" # Current operation ID preserved + assert merged.status == OperationStatus.PENDING # Current status preserved + assert merged.name == "current_step" # Current name takes precedence + assert merged.parent_id == "parent1" # Previous parent_id used + assert ( + merged.start_timestamp == previous.start_timestamp + ) # Previous timestamps preserved + assert merged.end_timestamp == previous.end_timestamp + assert merged.sub_type == OperationSubType.STEP # Previous sub_type used + assert merged.step_details == previous.step_details # Previous details preserved + + +def test_operation_create_with_start_timestamp(): + """Test Operation.create_with_start_timestamp factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.STARTED, + ) + + new_timestamp = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + updated = original.create_with_start_timestamp(new_timestamp) + + assert updated.start_timestamp == new_timestamp + assert updated.operation_id == "op1" + assert updated.status == OperationStatus.STARTED + + +def test_operation_create_with_end_timestamp(): + """Test Operation.create_with_end_timestamp factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + ) + + new_timestamp = datetime.datetime(2023, 1, 1, 11, 0, 0, tzinfo=datetime.UTC) + updated = original.create_with_end_timestamp(new_timestamp) + + assert updated.end_timestamp == new_timestamp + assert updated.operation_id == "op1" + assert updated.status == OperationStatus.SUCCEEDED + + +def test_operation_create_with_execution_details(): + """Test Operation.create_with_execution_details factory method.""" + original = Operation( + operation_id="exec1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + ) + + execution_details = ExecutionDetails(input_payload="test_input") + updated = original.create_with_execution_details(execution_details) + + assert updated.execution_details == execution_details + assert updated.execution_details.input_payload == "test_input" + assert updated.operation_id == "exec1" + + +def test_operation_create_with_callback_details(): + """Test Operation.create_with_callback_details factory method.""" + original = Operation( + operation_id="cb1", + operation_type=OperationType.CALLBACK, + status=OperationStatus.STARTED, + ) + + callback_details = CallbackDetails(callback_id="cb123", result="success") + updated = original.create_with_callback_details(callback_details) + + assert updated.callback_details == callback_details + assert updated.callback_details.callback_id == "cb123" + assert updated.callback_details.result == "success" + + +def test_operation_create_with_step_details(): + """Test Operation.create_with_step_details factory method.""" + original = Operation( + operation_id="step1", + operation_type=OperationType.STEP, + status=OperationStatus.READY, + ) + + step_details = StepDetails(attempt=2, result="step_result") + updated = original.create_with_step_details(step_details) + + assert updated.step_details == step_details + assert updated.step_details.attempt == 2 + assert updated.step_details.result == "step_result" + + +def test_operation_create_with_wait_details(): + """Test Operation.create_with_wait_details factory method.""" + original = Operation( + operation_id="wait1", + operation_type=OperationType.WAIT, + status=OperationStatus.STARTED, + ) + + scheduled_time = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + wait_details = WaitDetails(scheduled_end_timestamp=scheduled_time) + updated = original.create_with_wait_details(wait_details) + + assert updated.wait_details == wait_details + assert updated.wait_details.scheduled_end_timestamp == scheduled_time + + +def test_operation_create_with_context_details(): + """Test Operation.create_with_context_details factory method.""" + original = Operation( + operation_id="ctx1", + operation_type=OperationType.CONTEXT, + status=OperationStatus.SUCCEEDED, + ) + + context_details = ContextDetails(result="context_result", replay_children=True) + updated = original.create_with_context_details(context_details) + + assert updated.context_details == context_details + assert updated.context_details.result == "context_result" + assert updated.context_details.replay_children is True + + +def test_operation_create_with_chained_invoke_details(): + """Test Operation.create_with_chained_invoke_details factory method.""" + original = Operation( + operation_id="invoke1", + operation_type=OperationType.CHAINED_INVOKE, + status=OperationStatus.SUCCEEDED, + ) + + invoke_details = ChainedInvokeDetails(result="invoke_result") + updated = original.create_with_chained_invoke_details(invoke_details) + + assert updated.chained_invoke_details == invoke_details + assert updated.chained_invoke_details.result == "invoke_result" + + +def test_operation_create_callback_failure_without_existing_callback_details(): + """Test Operation.create_callback_failure when no existing callback_details.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.CALLBACK, + status=OperationStatus.STARTED, + callback_details=None, + ) + + error = ErrorObject.from_message("callback failed") + updated = original.create_callback_failure(error=error) + + assert updated.status == OperationStatus.FAILED + assert updated.callback_details is None # No existing details to update + + +def test_operation_create_completed_retry(): + """Test Operation.create_completed_retry factory method.""" + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + step_details=StepDetails( + attempt=2, + next_attempt_timestamp=datetime.datetime(2023, 1, 1, tzinfo=datetime.UTC), + ), + ) + + updated = original.create_completed_retry() + + assert updated.status == OperationStatus.READY + assert updated.step_details.attempt == 2 # Preserved + assert updated.step_details.next_attempt_timestamp is None # Cleared