Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions .github/workflows/deploy-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ jobs:
run: hatch run examples:build

- name: Deploy Lambda function - ${{ matrix.example.name }}
id: deploy
env:
AWS_ACCOUNT_ID: ${{ secrets.AWS_ACCOUNT_ID }}
LAMBDA_ENDPOINT: ${{ secrets.LAMBDA_ENDPOINT_BETA }}
Expand All @@ -88,13 +89,35 @@ jobs:
echo "Deploying ${{ matrix.example.name }} as $FUNCTION_NAME"
hatch run examples:deploy "${{ matrix.example.name }}" --function-name "$FUNCTION_NAME"

# $LATEST is also a qualified version
QUALIFIED_FUNCTION_NAME="$FUNCTION_NAME:\$LATEST"
# $LATEST is also a qualified version
QUALIFIED_FUNCTION_NAME="${FUNCTION_NAME}:\$LATEST"

# Store both names for later steps
echo "FUNCTION_NAME=$FUNCTION_NAME" >> $GITHUB_ENV
echo "QUALIFIED_FUNCTION_NAME=$QUALIFIED_FUNCTION_NAME" >> $GITHUB_ENV
echo "VERSION=$VERSION" >> $GITHUB_ENV
echo "DEPLOYED_FUNCTION_NAME=$FUNCTION_NAME" >> $GITHUB_OUTPUT
echo "QUALIFIED_FUNCTION_NAME=$QUALIFIED_FUNCTION_NAME" >> $GITHUB_OUTPUT

- name: Run Integration Tests - ${{ matrix.example.name }}
env:
AWS_REGION: ${{ env.AWS_REGION }}
LAMBDA_ENDPOINT: ${{ secrets.LAMBDA_ENDPOINT_BETA }}
QUALIFIED_FUNCTION_NAME: ${{ env.QUALIFIED_FUNCTION_NAME }}
LAMBDA_FUNCTION_TEST_NAME: ${{ matrix.example.name }}
run: |
echo "Running integration tests for ${{ matrix.example.name }}"
echo "Function name: ${{ steps.deploy.outputs.DEPLOYED_FUNCTION_NAME }}"
echo "Qualified function name: ${QUALIFIED_FUNCTION_NAME}"
echo "AWS Region: ${AWS_REGION}"
echo "Lambda Endpoint: ${LAMBDA_ENDPOINT}"

# Convert example name to test name: "Hello World" -> "test_hello_world"
TEST_NAME="test_$(echo "${{ matrix.example.name }}" | tr '[:upper:]' '[:lower:]' | tr ' ' '_')"
echo "Test name: ${TEST_NAME}"

# Run integration tests
hatch run test:examples-integration

- name: Invoke Lambda function - ${{ matrix.example.name }}
env:
Expand Down
44 changes: 44 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,50 @@
"ExecutionTimeout": 300
},
"path": "./src/map_operations.py"
},
{
"name": "Block Example",
"description": "Nested child contexts demonstrating block operations",
"handler": "block_example.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/block_example.py"
},
{
"name": "Logger Example",
"description": "Demonstrating logger usage and enrichment in DurableContext",
"handler": "logger_example.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/logger_example.py"
},
{
"name": "Steps with Retry",
"description": "Multiple steps with retry logic in a polling pattern",
"handler": "steps_with_retry.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/steps_with_retry.py"
},
{
"name": "Wait for Condition",
"description": "Polling pattern that waits for a condition to be met",
"handler": "wait_for_condition.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/wait_for_condition.py"
}
]
}
46 changes: 46 additions & 0 deletions examples/src/block_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Example demonstrating nested child contexts (blocks)."""

from typing import Any

from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_with_child_context,
)
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_with_child_context
def nested_block(ctx: DurableContext) -> str:
    """Innermost block: pauses briefly inside its own child context."""
    # Durable wait recorded against this child context's history.
    ctx.wait(seconds=1)
    return "nested block result"


@durable_with_child_context
def parent_block(ctx: DurableContext) -> dict[str, str]:
    """Run a step and a further-nested block, returning both results."""
    # A step executed inside this child context.
    step_value: str = ctx.step(
        lambda _: "nested step result",
        name="nested_step",
    )

    # A block nested one level deeper, with its own child context.
    block_value: str = ctx.run_in_child_context(nested_block())

    return {
        "nestedStep": step_value,
        "nestedBlock": block_value,
    }


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, str]:
    """Handler demonstrating nested child contexts."""
    # The parent block itself runs a step and a further-nested block;
    # its result dict is returned to the caller unchanged.
    return context.run_in_child_context(parent_block(), name="parent_block")
50 changes: 50 additions & 0 deletions examples/src/logger_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Example demonstrating logger usage in DurableContext."""

from typing import Any

from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_with_child_context,
)
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_with_child_context
def child_workflow(ctx: DurableContext) -> str:
    """Child workflow with its own logging context."""
    # The child-context logger is populated with this child context's ID.
    ctx.logger.info("Running in child context")

    # A step inside the child context gets a nested step ID.
    processed: str = ctx.step(
        lambda _: "child-processed",
        name="child_step",
    )
    ctx.logger.info("Child workflow completed", extra={"result": processed})

    return processed


@durable_execution
def handler(event: Any, context: DurableContext) -> str:
    """Handler demonstrating logger usage."""
    # The top-level context logger carries no step_id field.
    context.logger.info("Starting workflow", extra={"eventId": event.get("id")})

    # A step's logger is enriched with the step ID and attempt number.
    first: str = context.step(
        lambda _: "processed",
        name="process_data",
    )
    context.logger.info("Step 1 completed", extra={"result": first})

    # Child contexts inherit the parent's logger and carry their own step ID.
    second: str = context.run_in_child_context(child_workflow(), name="child_workflow")
    context.logger.info(
        "Workflow completed", extra={"result1": first, "result2": second}
    )

    return f"{first}-{second}"
8 changes: 5 additions & 3 deletions examples/src/map_operations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Example demonstrating map-like operations for processing collections durably."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
Expand All @@ -9,8 +11,8 @@ def square(x: int) -> int:


@durable_execution
def handler(_event: Any, context: DurableContext) -> str:
# Process a list of items using map-like operations
def handler(_event: Any, context: DurableContext) -> list[int]:
"""Process a list of items using map-like operations."""
items = [1, 2, 3, 4, 5]

# Process each item as a separate durable step
Expand All @@ -19,4 +21,4 @@ def handler(_event: Any, context: DurableContext) -> str:
result = context.step(lambda _, x=item: square(x), name=f"square_{i}")
results.append(result)

return f"Squared results: {results}"
return results
8 changes: 5 additions & 3 deletions examples/src/parallel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
"""Example demonstrating parallel-like operations for concurrent execution."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[str]:
    """Run three durable steps and collect their completion messages.

    Returns:
        The three step results, in declaration order.
    """
    task1 = context.step(lambda _: "Task 1 complete", name="task1")
    task2 = context.step(lambda _: "Task 2 complete", name="task2")
    task3 = context.step(lambda _: "Task 3 complete", name="task3")

    # NOTE(review): the steps are invoked sequentially in this code; whether
    # the SDK schedules them concurrently is not visible here — confirm before
    # describing this example as parallel execution.
    return [task1, task2, task3]
4 changes: 3 additions & 1 deletion examples/src/step_with_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@


@durable_step
def unreliable_operation(_step_context: StepContext) -> str:
def unreliable_operation(
_step_context: StepContext,
) -> str:
failure_threshold = 0.5
if random() > failure_threshold: # noqa: S311
msg = "Random error occurred"
Expand Down
73 changes: 73 additions & 0 deletions examples/src/steps_with_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Example demonstrating multiple steps with retry logic."""

from random import random
from typing import Any

from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)


def simulated_get_item(name: str) -> dict[str, Any] | None:
"""Simulate getting an item that may fail randomly."""
# Fail 50% of the time
if random() < 0.5: # noqa: S311
msg = "Random failure"
raise RuntimeError(msg)

# Simulate finding item after some attempts
if random() > 0.3: # noqa: S311
return {"id": name, "data": "item data"}

return None
Comment on lines +15 to +26
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we introduce randomness like this, we should seed the random so that we don't get transient failures.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So either seed the RNG, or mock the response and know the sequence in advance. I prefer testing a large sequence of random values across multiple seeds.



@durable_execution
def handler(event: Any, context: DurableContext) -> dict[str, Any]:
    """Handler demonstrating polling with retry logic."""
    item_name = event.get("name", "test-item")

    # Each step retries up to 5 times on RuntimeError.
    step_config = StepConfig(
        create_retry_strategy(
            RetryStrategyConfig(
                max_attempts=5,
                retryable_error_types=[RuntimeError],
            )
        )
    )

    max_polls = 5
    found: dict[str, Any] | None = None
    polls_done = 0

    try:
        for attempt in range(1, max_polls + 1):
            polls_done = attempt

            # Attempt to fetch the item; retries are handled by the step config.
            response = context.step(
                lambda _, n=item_name: simulated_get_item(n),
                name=f"get_item_poll_{attempt}",
                config=step_config,
            )

            if response:
                found = response
                break

            # Pause 1 second before the next poll.
            context.wait(seconds=1)
    except RuntimeError as e:
        # All retries for a step were exhausted.
        return {"error": "DDB Retries Exhausted", "message": str(e)}

    if not found:
        return {"error": "Item Not Found"}

    return {"success": True, "item": found, "pollsRequired": polls_done}
33 changes: 33 additions & 0 deletions examples/src/wait_for_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Example demonstrating wait-for-condition pattern."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> int:
    """Handler demonstrating wait-for-condition pattern."""
    counter = 0
    max_attempts = 5

    for attempt in range(1, max_attempts + 1):
        # Durable step that bumps the counter; the current value is bound
        # as a lambda default so replay sees the same input.
        counter = context.step(
            lambda _, current=counter: current + 1,
            name=f"increment_state_{attempt}",
        )

        # Stop once the condition is satisfied.
        if counter >= 3:
            break

        # Otherwise pause before polling again.
        context.wait(seconds=1)

    return counter
Loading