Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 350 additions & 0 deletions src/aws_durable_execution_sdk_python/lambda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,356 @@ class Operation:
callback_details: CallbackDetails | None = None
chained_invoke_details: ChainedInvokeDetails | None = None

def create_succeeded(
self, end_timestamp: datetime.datetime | None = None
) -> Operation:
"""Create a succeeded operation with end timestamp."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=OperationStatus.SUCCEEDED,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_failed(
self, end_timestamp: datetime.datetime | None = None
) -> Operation:
"""Create a failed operation with end timestamp."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=OperationStatus.FAILED,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_ready(self) -> Operation:
"""Create a ready operation."""
updated_step_details = None
if self.step_details:
updated_step_details = StepDetails(
attempt=self.step_details.attempt,
next_attempt_timestamp=None,
result=self.step_details.result,
error=self.step_details.error,
)
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=OperationStatus.READY,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=updated_step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_completed_retry(self) -> Operation:
"""Create an operation with completed retry (clears next_attempt_timestamp)."""
updated_step_details = None
if self.step_details:
updated_step_details = StepDetails(
attempt=self.step_details.attempt,
next_attempt_timestamp=None,
result=self.step_details.result,
error=self.step_details.error,
)
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=OperationStatus.READY,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=updated_step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_callback_result(
self, result: str | None, end_timestamp: datetime.datetime | None = None
) -> Operation:
"""Create a succeeded callback operation with result."""
updated_callback_details = None
if self.callback_details:
updated_callback_details = CallbackDetails(
callback_id=self.callback_details.callback_id,
result=result,
error=self.callback_details.error,
)
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=OperationStatus.SUCCEEDED,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=updated_callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_callback_failure(
self, error: ErrorObject, end_timestamp: datetime.datetime | None = None
) -> Operation:
"""Create a failed callback operation with error."""
updated_callback_details = None
if self.callback_details:
updated_callback_details = CallbackDetails(
callback_id=self.callback_details.callback_id,
result=self.callback_details.result,
error=error,
)
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=OperationStatus.FAILED,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=updated_callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_execution_end(
self, status: OperationStatus, end_timestamp: datetime.datetime | None = None
) -> Operation:
"""Create an ended execution operation."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_merged_from_previous(self, previous_operation: Operation) -> Operation:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this for?

"""Merge current operation with previous operation, preserving previous state."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id or previous_operation.parent_id,
name=self.name or previous_operation.name,
start_timestamp=previous_operation.start_timestamp,
end_timestamp=previous_operation.end_timestamp,
sub_type=self.sub_type or previous_operation.sub_type,
execution_details=previous_operation.execution_details,
context_details=previous_operation.context_details,
step_details=previous_operation.step_details,
wait_details=previous_operation.wait_details,
callback_details=previous_operation.callback_details,
chained_invoke_details=previous_operation.chained_invoke_details,
)

def create_with_start_timestamp(self, timestamp: datetime.datetime) -> Operation:
"""Create operation with updated start timestamp."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_end_timestamp(self, timestamp: datetime.datetime) -> Operation:
"""Create operation with updated end timestamp."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_execution_details(
self, execution_details: ExecutionDetails
) -> Operation:
"""Create operation with execution details."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_callback_details(
self, callback_details: CallbackDetails
) -> Operation:
"""Create operation with callback details."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_step_details(self, step_details: StepDetails) -> Operation:
"""Create operation with step details."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_wait_details(self, wait_details: WaitDetails) -> Operation:
"""Create operation with wait details."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_context_details(self, context_details: ContextDetails) -> Operation:
"""Create operation with context details."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=self.chained_invoke_details,
)

def create_with_chained_invoke_details(
self, chained_invoke_details: ChainedInvokeDetails
) -> Operation:
"""Create operation with chained invoke details."""
return Operation(
operation_id=self.operation_id,
operation_type=self.operation_type,
status=self.status,
parent_id=self.parent_id,
name=self.name,
start_timestamp=self.start_timestamp,
end_timestamp=self.end_timestamp,
sub_type=self.sub_type,
execution_details=self.execution_details,
context_details=self.context_details,
step_details=self.step_details,
wait_details=self.wait_details,
callback_details=self.callback_details,
chained_invoke_details=chained_invoke_details,
)

@classmethod
def from_dict(cls, data: MutableMapping[str, Any]) -> Operation:
"""Create an Operation instance from a dictionary with the original Smithy model field names.
Expand Down
Loading
Loading