Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@ hatch run examples:list
hatch run examples:deploy "Hello World"
```

### Generate Event Files for Testing
```bash
# Generate expected events JSON file for a function
# This runs the function locally and captures execution events

# Basic usage - hello_world example
hatch run examples:generate-events \
--function-module hello_world \
--function-name handler \
--input '"test input"' \
--output examples/events/hello_world_events.json

# Available options:
# --function-module: Python module path (required)
# --function-name: Function name within module (required)
# --input: JSON string input for the function (optional)
# --output: Output path for events JSON file (required)
# --timeout: Execution timeout in seconds (default: 60)
# --verbose: Enable detailed logging

# Use generated events in your tests with the event assertion helper:
# assert_events('events/hello_world_events.json', result.events)
```

### Other CLI Commands
```bash
# Invoke deployed function
Expand Down
113 changes: 113 additions & 0 deletions examples/events/hello_world_events.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"events": [
{
"event_type": "ExecutionStarted",
"event_timestamp": "2025-12-11T00:32:13.887857+00:00",
"event_id": 1,
"operation_id": "inv-12345678-1234-1234-1234-123456789012",
"name": "execution-name",
"execution_started_details": {
"input": {
"truncated": true
},
"execution_timeout": 60
}
},
{
"event_type": "StepStarted",
"event_timestamp": "2025-12-11T00:32:13.994326+00:00",
"sub_type": "Step",
"event_id": 2,
"operation_id": "1ced8f5be2db23a6513eba4d819c73806424748a7bc6fa0d792cc1c7d1775a97",
"name": "step_1",
"step_started_details": {}
},
{
"event_type": "StepSucceeded",
"event_timestamp": "2025-12-11T00:32:13.994354+00:00",
"sub_type": "Step",
"event_id": 3,
"operation_id": "1ced8f5be2db23a6513eba4d819c73806424748a7bc6fa0d792cc1c7d1775a97",
"name": "step_1",
"step_succeeded_details": {
"result": {
"truncated": true
},
"retry_details": {
"current_attempt": 1,
"next_attempt_delay_seconds": 0
}
}
},
{
"event_type": "WaitStarted",
"event_timestamp": "2025-12-11T00:32:14.099840+00:00",
"sub_type": "Wait",
"event_id": 4,
"operation_id": "c5faca15ac2f93578b39ef4b6bbb871bdedce4ddd584fd31f0bb66fade3947e6",
"wait_started_details": {
"duration": 10,
"scheduled_end_timestamp": "2025-12-11T00:32:24.099828+00:00"
}
},
{
"event_type": "InvocationCompleted",
"event_timestamp": "2025-12-11T00:32:14.205118+00:00",
"event_id": 5
},
{
"event_type": "WaitSucceeded",
"event_timestamp": "2025-12-11T00:32:24.206724+00:00",
"sub_type": "Wait",
"event_id": 6,
"operation_id": "c5faca15ac2f93578b39ef4b6bbb871bdedce4ddd584fd31f0bb66fade3947e6",
"wait_succeeded_details": {
"duration": 10
}
},
{
"event_type": "StepStarted",
"event_timestamp": "2025-12-11T00:32:24.310890+00:00",
"sub_type": "Step",
"event_id": 7,
"operation_id": "6f760b9e9eac89f07ab0223b0f4acb04d1e355d893a1b86a83f4d4b405adee99",
"name": "step_2",
"step_started_details": {}
},
{
"event_type": "StepSucceeded",
"event_timestamp": "2025-12-11T00:32:24.310917+00:00",
"sub_type": "Step",
"event_id": 8,
"operation_id": "6f760b9e9eac89f07ab0223b0f4acb04d1e355d893a1b86a83f4d4b405adee99",
"name": "step_2",
"step_succeeded_details": {
"result": {
"truncated": true
},
"retry_details": {
"current_attempt": 1,
"next_attempt_delay_seconds": 0
}
}
},
{
"event_type": "InvocationCompleted",
"event_timestamp": "2025-12-11T00:32:24.413013+00:00",
"event_id": 9
},
{
"event_type": "ExecutionSucceeded",
"event_timestamp": "2025-12-11T00:32:24.413238+00:00",
"event_id": 10,
"operation_id": "inv-12345678-1234-1234-1234-123456789012",
"name": "execution-name",
"execution_succeeded_details": {
"result": {
"payload": "{\"statusCode\": 200, \"body\": \"Hello from Durable Lambda! (status: 200)\"}",
"truncated": true
}
}
}
]
}
236 changes: 236 additions & 0 deletions examples/scripts/cli_event_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""CLI tool for generating event assertion files from durable function executions.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nice loading events either via code or via file like so:

Events.load("<<string>>")
Events.load_from_file(path="")

events = new Events()
event = Event.create_execution_started()
events.add(event_one)


This tool runs durable functions locally and captures their execution events
to generate JSON files that can be used for event-based test assertions.

Usage:
python examples/cli_event_generator.py \
--function-module examples.src.hello_world \
--function-name handler \
--input '{"test": "data"}' \
--output examples/events/hello_world_events.json
"""

import argparse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could build this into [project.scripts] dex-local-runner = "aws_durable_execution_sdk_python_testing.cli:main"?

i.e so you can do something like

localrunner invoke --function-name whatever
localrunner get-durable-execution-history --durable-execution-arn whatever-arn

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly

localrunner get-durable-execution-history --durable-execution-arn whatever-arn --output myfile.json

import importlib
import json
import logging
import sys
from pathlib import Path
from typing import Any

# Add src directories to Python path
examples_dir = Path(__file__).parent
src_dir = examples_dir / "src"
main_src_dir = examples_dir.parent / "src"

for path in [str(src_dir), str(main_src_dir)]:
if path not in sys.path:
sys.path.insert(0, path)

from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner


logger = logging.getLogger(__name__)


def setup_logging(verbose: bool = False) -> None:
"""Configure logging for the CLI tool."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(levelname)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)


def import_function(module_name: str, function_name: str) -> Any:
"""Import a function from a module dynamically.

Args:
module_name: Python module path (e.g., 'examples.src.hello_world')
function_name: Function name within the module (e.g., 'handler')

Returns:
The imported function

Raises:
ImportError: If module or function cannot be imported
"""
try:
module = importlib.import_module(module_name)
return getattr(module, function_name)
except ImportError as e:
raise ImportError(f"Failed to import module '{module_name}': {e}") from e
except AttributeError as e:
raise ImportError(
f"Function '{function_name}' not found in module '{module_name}': {e}"
) from e


def serialize_event(event: Any) -> dict:
"""Serialize an Event object to a JSON-serializable dictionary.

Args:
event: Event object to serialize

Returns:
Dictionary representation of the event
"""
# Convert the event to a dictionary, handling datetime objects
event_dict = {}

for field_name, field_value in event.__dict__.items():
if field_value is None:
continue

if hasattr(field_value, "isoformat"): # datetime objects
event_dict[field_name] = field_value.isoformat()
elif hasattr(field_value, "__dict__"): # nested objects
event_dict[field_name] = serialize_nested_object(field_value)
else:
event_dict[field_name] = field_value

return event_dict


def serialize_nested_object(obj: Any) -> dict:
"""Serialize nested objects recursively."""
if obj is None:
return None

result = {}
for field_name, field_value in obj.__dict__.items():
if field_value is None:
continue

if hasattr(field_value, "isoformat"): # datetime objects
result[field_name] = field_value.isoformat()
elif hasattr(field_value, "__dict__"): # nested objects
result[field_name] = serialize_nested_object(field_value)
else:
result[field_name] = field_value

return result


def generate_events_file(
function_module: str,
function_name: str,
input_data: str | None,
output_path: Path,
timeout: int = 60,
) -> None:
"""Generate events file by running the durable function locally.

Args:
function_module: Python module containing the function
function_name: Name of the durable function
input_data: JSON string input for the function
output_path: Path where to save the events JSON file
timeout: Execution timeout in seconds
"""
logger.info(f"Importing function {function_name} from {function_module}")
handler = import_function(function_module, function_name)

logger.info("Running durable function locally...")
with DurableFunctionTestRunner(handler=handler) as runner:
result = runner.run(input=input_data, timeout=timeout)

logger.info(f"Execution completed with status: {result.status}")
logger.info(f"Captured {len(result.events)} events")

# Serialize events to JSON-compatible format
events_data = {"events": [serialize_event(event) for event in result.events]}

# Ensure output directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)

# Write events to JSON file
with open(output_path, "w", encoding="utf-8") as f:
json.dump(events_data, f, indent=2, ensure_ascii=False)

logger.info(f"Events saved to: {output_path}")


def main() -> None:
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
description="Generate event assertion files from durable function executions",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Generate events for hello_world example
python examples/cli_event_generator.py \\
--function-module hello_world \\
--function-name handler \\
--input '"test input"' \\
--output examples/events/hello_world_events.json

# Generate events for a function with complex input
python examples/cli_event_generator.py \\
--function-module step.step_with_retry \\
--function-name handler \\
--input '{"retries": 3, "data": "test"}' \\
--output examples/events/step_with_retry_events.json
""",
)

parser.add_argument(
"--function-module",
required=True,
help="Python module containing the durable function (e.g., 'hello_world' or 'step.step_with_retry')",
)

parser.add_argument(
"--function-name",
required=True,
help="Name of the durable function within the module (e.g., 'handler')",
)

parser.add_argument(
"--input", help="JSON string input for the function (default: None)"
)

parser.add_argument(
"--output",
type=Path,
required=True,
help="Output path for the events JSON file",
)

parser.add_argument(
"--timeout",
type=int,
default=60,
help="Execution timeout in seconds (default: 60)",
)

parser.add_argument(
"--verbose", "-v", action="store_true", help="Enable verbose logging"
)

args = parser.parse_args()

setup_logging(args.verbose)

try:
generate_events_file(
function_module=args.function_module,
function_name=args.function_name,
input_data=args.input,
output_path=args.output,
timeout=args.timeout,
)
logger.info("Event generation completed successfully!")

except Exception as e:
logger.error(f"Event generation failed: {e}")
if args.verbose:
logger.exception("Full traceback:")
sys.exit(1)


if __name__ == "__main__":
main()
Loading
Loading