Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ __pycache__/

dist/

.idea
.idea

.kiro/
55 changes: 21 additions & 34 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,15 @@ readme = "README.md"
requires-python = ">=3.13"
license = "Apache-2.0"
keywords = []
authors = [
{ name = "yaythomas", email = "tgaigher@amazon.com" },
]
authors = [{ name = "yaythomas", email = "tgaigher@amazon.com" }]
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"boto3>=1.40.30"
]
dependencies = ["boto3>=1.40.30"]

[project.urls]
Documentation = "https://github.com/aws/aws-durable-execution-sdk-python#readme"
Expand All @@ -38,48 +34,31 @@ packages = ["src/aws_durable_execution_sdk_python"]
[tool.hatch.version]
path = "src/aws_durable_execution_sdk_python/__about__.py"

# [tool.hatch.envs.default]
# dependencies=["pytest"]

# [tool.hatch.envs.default.scripts]
# test="pytest"

[tool.hatch.envs.test]
dependencies = [
"coverage[toml]",
"pytest",
"pytest-cov",
]
dependencies = ["coverage[toml]", "pytest", "pytest-cov"]

[tool.hatch.envs.test.scripts]
cov="pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/aws_durable_execution_sdk_python --cov=tests --cov-fail-under=98"
cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/aws_durable_execution_sdk_python --cov-fail-under=98"

[tool.hatch.envs.types]
extra-dependencies = [
"mypy>=1.0.0",
"pytest"
]
extra-dependencies = ["mypy>=1.0.0", "pytest"]
[tool.hatch.envs.types.scripts]
check = "mypy --install-types --non-interactive {args:src/aws_durable_execution_sdk_python tests}"

[tool.coverage.run]
source_pkgs = ["aws_durable_execution_sdk_python", "tests"]
source_pkgs = ["aws_durable_execution_sdk_python"]
branch = true
parallel = true
omit = [
"src/aws_durable_execution_sdk_python/__about__.py",
]
omit = ["src/aws_durable_execution_sdk_python/__about__.py"]

[tool.coverage.paths]
aws_durable_execution_sdk_python = ["src/aws_durable_execution_sdk_python", "*/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python"]
tests = ["tests", "*/aws-durable-execution-sdk-python/tests"]
aws_durable_execution_sdk_python = [
"src/aws_durable_execution_sdk_python",
"*/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python",
]

[tool.coverage.report]
exclude_lines = [
"no cov",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]
exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"]

[tool.ruff]
line-length = 88
Expand All @@ -88,4 +67,12 @@ line-length = 88
preview = false

[tool.ruff.lint.per-file-ignores]
"tests/**" = ["ARG001", "ARG002", "ARG005", "S101", "PLR2004", "SIM117", "TRY301"]
"tests/**" = [
"ARG001",
"ARG002",
"ARG005",
"S101",
"PLR2004",
"SIM117",
"TRY301",
]
11 changes: 10 additions & 1 deletion src/aws_durable_execution_sdk_python/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from aws_durable_execution_sdk_python.retries import RetryDecision # noqa: TCH001

R = TypeVar("R")
P = TypeVar("P") # Payload type
R = TypeVar("R") # Result type
T = TypeVar("T")
U = TypeVar("U")

Expand Down Expand Up @@ -133,6 +134,14 @@ class MapConfig:
serdes: SerDes | None = None


@dataclass
class InvokeConfig(Generic[P, R]):
# retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
timeout_seconds: int = 0
Comment thread
yaythomas marked this conversation as resolved.
serdes_payload: SerDes[P] | None = None
serdes_result: SerDes[R] | None = None


@dataclass(frozen=True)
class CallbackConfig:
"""Configuration for callbacks."""
Expand Down
46 changes: 40 additions & 6 deletions src/aws_durable_execution_sdk_python/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BatchedInput,
CallbackConfig,
ChildConfig,
InvokeConfig,
MapConfig,
ParallelConfig,
StepConfig,
Expand All @@ -30,6 +31,7 @@
wait_for_callback_handler,
)
from aws_durable_execution_sdk_python.operation.child import child_handler
from aws_durable_execution_sdk_python.operation.invoke import invoke_handler
from aws_durable_execution_sdk_python.operation.map import map_handler
from aws_durable_execution_sdk_python.operation.parallel import parallel_handler
from aws_durable_execution_sdk_python.operation.step import step_handler
Expand All @@ -56,17 +58,19 @@

from aws_durable_execution_sdk_python.state import CheckpointedResult

R = TypeVar("R")
P = TypeVar("P") # Payload type
R = TypeVar("R") # Result type
T = TypeVar("T")
U = TypeVar("U")
P = ParamSpec("P")
Params = ParamSpec("Params")


logger = logging.getLogger(__name__)


def durable_step(
func: Callable[Concatenate[StepContext, P], T],
) -> Callable[P, Callable[[StepContext], T]]:
func: Callable[Concatenate[StepContext, Params], T],
) -> Callable[Params, Callable[[StepContext], T]]:
"""Wrap your callable into a named function that a Durable step can run."""

def wrapper(*args, **kwargs):
Expand All @@ -80,8 +84,8 @@ def function_with_arguments(context: StepContext):


def durable_with_child_context(
func: Callable[Concatenate[DurableContext, P], T],
) -> Callable[P, Callable[[DurableContext], T]]:
func: Callable[Concatenate[DurableContext, Params], T],
) -> Callable[Params, Callable[[DurableContext], T]]:
"""Wrap your callable into a Durable child context."""

def wrapper(*args, **kwargs):
Expand Down Expand Up @@ -291,6 +295,36 @@ def create_callback(
serdes=config.serdes,
)

def invoke(
self,
function_name: str,
payload: P,
name: str | None = None,
config: InvokeConfig[P, R] | None = None,
) -> R:
"""Invoke another Durable Function.

Args:
function_name: Name of the function to invoke
payload: Input payload to send to the function
name: Optional name for the operation
config: Optional configuration for the invoke operation

Returns:
The result of the invoked function
"""
return invoke_handler(
function_name=function_name,
payload=payload,
state=self.state,
operation_identifier=OperationIdentifier(
operation_id=self._create_step_id(),
parent_id=self._parent_id,
name=name,
),
config=config,
)

def map(
self,
inputs: Sequence[U],
Expand Down
19 changes: 19 additions & 0 deletions src/aws_durable_execution_sdk_python/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations

import time
from dataclasses import dataclass


Expand Down Expand Up @@ -77,6 +78,24 @@ def __init__(self, message: str, scheduled_timestamp: float):
super().__init__(message)
self.scheduled_timestamp = scheduled_timestamp

@classmethod
def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
"""Create a timed suspension with the delay calculated from now.

Args:
message: Descriptive message for the suspension
delay_seconds: Duration to suspend in seconds from current time

Returns:
TimedSuspendExecution: Instance with calculated resume time

Example:
>>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30)
>>> # Will suspend for 30 seconds from now
"""
resume_time = time.time() + delay_seconds
return cls(message, scheduled_timestamp=resume_time)


class OrderedLockError(DurableExecutionsError):
"""An error from OrderedLock.
Expand Down
33 changes: 26 additions & 7 deletions src/aws_durable_execution_sdk_python/lambda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class OperationSubType(Enum):
PARALLEL_BRANCH = "ParallelBranch"
WAIT_FOR_CALLBACK = "WaitForCallback"
WAIT_FOR_CONDITION = "WaitForCondition"
INVOKE = "Invoke"


@dataclass(frozen=True)
Expand Down Expand Up @@ -241,15 +242,11 @@ def to_dict(self) -> MutableMapping[str, Any]:
@dataclass(frozen=True)
class InvokeOptions:
function_name: str
function_qualifier: str | None = None
durable_execution_name: str | None = None
timeout_seconds: int = 0

def to_dict(self) -> MutableMapping[str, Any]:
result = {"FunctionName": self.function_name}
if self.function_qualifier:
result["FunctionQualifier"] = self.function_qualifier
if self.durable_execution_name:
result["DurableExecutionName"] = self.durable_execution_name
result: MutableMapping[str, Any] = {"FunctionName": self.function_name}
result["TimeoutSeconds"] = self.timeout_seconds
return result


Expand Down Expand Up @@ -471,6 +468,28 @@ def create_step_retry(

# endregion step

# region invoke
@classmethod
def create_invoke_start(
cls,
identifier: OperationIdentifier,
payload: str,
invoke_options: InvokeOptions,
) -> OperationUpdate:
"""Create an instance of OperationUpdate for type: INVOKE, action: START."""
return cls(
operation_id=identifier.operation_id,
parent_id=identifier.parent_id,
operation_type=OperationType.INVOKE,
sub_type=OperationSubType.INVOKE,
action=OperationAction.START,
name=identifier.name,
payload=payload,
invoke_options=invoke_options,
)

# endregion invoke

# region wait for condition
@classmethod
def create_wait_for_condition_start(
Expand Down
Loading