95 changes: 95 additions & 0 deletions early-stopping-sf-bedrock/README.md
@@ -0,0 +1,95 @@
# Parallel Execution with Early Stopping using AWS Step Functions and Amazon Bedrock

The **Parallel Agent Early Stopping** pattern uses AWS Step Functions and Amazon Bedrock to run multiple AI agents simultaneously on the same problem, with different approaches, and automatically terminates unnecessary processes once a high-confidence solution is discovered. The workflow coordinates Worker Agents that either retrieve information from the AWS Documentation MCP Server or generate responses using Amazon Bedrock models, while an Evaluation Agent continuously assesses confidence levels and triggers early stopping when a predetermined threshold is met. This design optimizes both performance and cost through parallel exploration, intelligent termination, and resource optimization techniques including agent tiering, token optimization, and Lambda memory tuning.

Learn more about this workflow at Step Functions workflows collection: https://serverlessland.com/workflows/early-stopping-sf-bedrock

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/step-functions-workflows-collection
```
2. Change directory to the pattern directory:
```
cd step-functions-workflows-collection/early-stopping-sf-bedrock
```
3. From the command line, use AWS SAM to deploy the AWS resources for the workflow as specified in the template.yaml file:
```
sam deploy --guided
```
4. During the prompts:
    * Enter a stack name
    * Enter the desired AWS Region
    * Allow SAM CLI to create IAM roles with the required permissions.

    After you have run `sam deploy --guided` once and saved the arguments to a configuration file (samconfig.toml), you can run `sam deploy` in the future to reuse these defaults.

5. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing.

## How it works

1. A user submits a query (e.g., "What is CloudFront?"), and AWS Step Functions runs 4 agent workflows in parallel. Each agent uses a different specialized approach (service-specific, architecture patterns, cost optimization, and general overview).

2. Each agent independently retrieves information from the AWS Documentation MCP Server (Context Provider). If relevant information isn't available, the agent falls back to an Amazon Bedrock model to generate a response. This example uses Claude 3.5 Haiku for its speed and cost-effectiveness.

3. As results arrive, an Evaluation Agent assesses each response's confidence level. When any agent produces a result exceeding the confidence threshold (typically 0.95), Step Functions automatically terminates the other executing workflows to optimize resources and costs.

4. If no single agent achieves high confidence, a Synthesis Agent combines insights from multiple agents into a comprehensive answer. The final result is returned to the user with processing metadata.
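
The four steps above can be sketched locally with `asyncio`. This is only an analogy under stated assumptions, not the Step Functions implementation: the agent names mirror the approaches listed in step 1, while the latencies, confidence scores, the `run_agent` and `explore` helpers, and the `"early_stopped"` status string are all hypothetical. The sketch treats a score that meets or exceeds the threshold as a winner, and its "synthesis" simply concatenates answers in confidence order.

```python
import asyncio

CONFIDENCE_THRESHOLD = 0.95  # value quoted in step 3

# Hypothetical agents: (approach, simulated latency in seconds, simulated confidence).
AGENTS = [
    ("service_specific", 0.01, 0.97),
    ("architecture_patterns", 0.20, 0.80),
    ("cost_optimization", 0.20, 0.70),
    ("general_overview", 0.20, 0.60),
]


async def run_agent(name, delay, confidence, query):
    """Stand-in for a worker workflow; a real worker would call MCP or Bedrock."""
    await asyncio.sleep(delay)
    return {"agent": name, "confidence": confidence, "answer": f"[{name}] {query}"}


async def explore(query, agents=AGENTS, threshold=CONFIDENCE_THRESHOLD):
    """Run all agents in parallel and stop as soon as one clears the threshold."""
    tasks = [asyncio.create_task(run_agent(n, d, c, query)) for n, d, c in agents]
    results = []
    for finished in asyncio.as_completed(tasks):
        result = await finished
        results.append(result)
        if result["confidence"] >= threshold:
            # Early stop: cancel every worker that is still running.
            pending = [t for t in tasks if not t.done()]
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
            return {
                "status": "early_stopped",  # hypothetical status name
                "answer": result["answer"],
                "cancelled": len(pending),
            }
    # No agent cleared the threshold: combine every answer, best first.
    ranked = sorted(results, key=lambda r: r["confidence"], reverse=True)
    return {
        "status": "synthesized",
        "answer": " ".join(r["answer"] for r in ranked),
        "cancelled": 0,
    }


if __name__ == "__main__":
    print(asyncio.run(explore("What is CloudFront?")))
```

Cancelling the pending tasks plays the role that aborting the remaining worker executions plays in the deployed workflow: the saving comes from work that never finishes, so it is largest when one agent is both fast and confident.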


## Image

![image](./resources/MainWorkflow.png)
![image](./resources/WorkerWorkflows.png)
![image](./resources/WorkerExecutionStatus.png)

## Testing

1. Log in to the AWS Step Functions Console
- In the Step Functions dashboard, locate and click on the "MainStateMachine" that was created during deployment

2. Start a New Execution
- Click the "Start execution" button
- In the input field, enter a test query in JSON format:
```json
{"query": "What is CloudFront?"}
```
- Optionally, provide a name for your execution in the "Name" field
- Click "Start execution" to begin the workflow

3. Monitor the Execution
- The console will display a visual representation of your workflow execution
- The main workflow starts 4 worker agents in parallel using a distributed map
- In the worker executions, the agent that produced the accepted result shows as succeeded and the others show as aborted
- Review the main workflow's output in the "Execution output" tab to see the synthesized result

4. Verify Results
- Check that the output includes a "status" field (typically "synthesized")
- Review the "processing_details" section to confirm early stopping functionality
- Verify that responses include attribution to AWS Documentation
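
The console steps above can also be scripted. The following is a minimal sketch, assuming a client that behaves like `boto3.client("stepfunctions")` and the state machine ARN taken from the SAM deployment outputs; the `run_query` helper and its polling parameters are hypothetical and not part of this repository. The client is passed in as an argument so the function itself depends only on the standard library.

```python
import json
import time


def run_query(sfn_client, state_machine_arn, query, poll_seconds=2.0, max_polls=150):
    """Start one execution and poll until it leaves the RUNNING state.

    sfn_client is expected to behave like boto3.client("stepfunctions").
    """
    started = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps({"query": query}),  # same input shape as the console test
    )
    execution_arn = started["executionArn"]
    for _ in range(max_polls):
        described = sfn_client.describe_execution(executionArn=execution_arn)
        if described["status"] != "RUNNING":
            break
        time.sleep(poll_seconds)
    else:
        raise TimeoutError(f"Execution still running: {execution_arn}")
    if described["status"] != "SUCCEEDED":
        raise RuntimeError(f"Execution ended with status {described['status']}")
    # The execution output is returned as a JSON string.
    return json.loads(described["output"])
```

With boto3 installed and credentials configured, a call would look like `run_query(boto3.client("stepfunctions"), "<MainStateMachine ARN>", "What is CloudFront?")`, and the returned dictionary can then be checked for the "status" and "processing_details" fields described above.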


## Cleanup

1. Delete the stack
```bash
aws cloudformation delete-stack --stack-name STACK_NAME
```
2. Confirm the stack has been deleted
```bash
aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus"
```
----
Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
76 changes: 76 additions & 0 deletions early-stopping-sf-bedrock/example-workflow.json
@@ -0,0 +1,76 @@
{
"title": "Smart Exploration with Early Stopping workflow",
"description": "Runs multiple worker workflows to answer AWS questions, and terminates the rest once a high-confidence answer is received.",
"language": "Python",
"simplicity": "2 - Pattern",
"usecase": "",
"type": "Standard",
"diagram":"/resources/MainWorkflow.png",
"videoId": "",
"level": "200",
"framework": "SAM",
"services": ["bedrock","stepfunctions", "eventbridge","lambda"],
"introBox": {
"headline": "How it works",
"text": [
"A user submits a query (e.g., What is CloudFront?), and AWS Step Functions runs 4 agent workflows in parallel. Each agent uses a different specialized approach (service-specific, architecture patterns, cost optimization, and general overview). The first response with high confidence terminates the remaining workflows."
]
},
"testing": {
"headline": "Testing",
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"headline": "Cleanup",
"text": [
"1. Delete the stack: <code>sam delete</code>."
]
},
"deploy": {
"text": [
"sam deploy --guided"
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/step-functions-workflows-collection/tree/main/early-stopping-sf-bedrock/",
"templateDir":"early-stopping-sf-bedrock",
"templateFile": "template.yaml",
"ASL": "statemachine/main.asl.json"
},
"payloads": [
{
"headline": "",
"payloadURL": ""
}
]
},
"resources": {
"headline": "Additional resources",
"bullets": [
{
"text": "The AWS Step Functions Workshop",
"link": "https://catalog.workshops.aws/stepfunctions/en-US"
}
]
},
"authors": [
{
"name": "Dave Horne",
"image": "https://d2908q01vomqb2.cloudfront.net/9e6a55b6b4563e652a23be9d623ca5055c356940/2025/04/07/me.jpeg",
"bio": "Dave is a senior Solutions Architect supporting Federal System Integrators at AWS. He is based in Washington, DC, and has 15 years of experience building, modernizing, and integrating systems for public sector customers. Outside of work, Dave enjoys playing with his kids, hiking, and watching Penn State football.",
"linkedin": "davidjhorne",
"twitter": ""
},
{
"name": "Satya Vedamtam",
"image": "",
"bio": "Satya is a senior Solutions Architect supporting US Federal customers at AWS. He is passionate about serverless technologies, and every day is an opportunity to solve customer challenges.",
"linkedin": "svedamtam",
"twitter": ""
}
]
}

138 changes: 138 additions & 0 deletions early-stopping-sf-bedrock/functions/awsdocmcpserver.py
@@ -0,0 +1,138 @@
import json
import subprocess
import sys
import os
# import tempfile
import asyncio
import shutil
import glob


# Initialize MCP server at module level (runs once per container)
print("Initializing MCP server...")

# Install packages to persistent location
temp_dir = "/tmp/mcp_packages"
os.makedirs(temp_dir, exist_ok=True)

# Only install if not already installed
if not os.path.exists(os.path.join(temp_dir, "awslabs")):
    print(f"Installing packages to {temp_dir}")

    # Clear any existing SSL environment variables
    ssl_env_vars = ["SSL_CERT_FILE", "REQUESTS_CA_BUNDLE", "CURL_CA_BUNDLE"]
    for var in ssl_env_vars:
        if var in os.environ:
            del os.environ[var]

    # Install packages
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "awslabs.aws-documentation-mcp-server",
        "certifi",
        "--target", temp_dir,
        "--trusted-host", "pypi.org",
        "--trusted-host", "pypi.python.org",
        "--trusted-host", "files.pythonhosted.org"
    ])

# Add to path
sys.path.insert(0, temp_dir)

# Set environment variables
os.environ["FASTMCP_LOG_LEVEL"] = "ERROR"
os.environ["AWS_DOCUMENTATION_PARTITION"] = "AWS"

# Find and set SSL certificates
cert_pattern = os.path.join(temp_dir, "**/cacert.pem")
cert_files = glob.glob(cert_pattern, recursive=True)

if not cert_files:
    # Fallback: try to find any .pem file
    pem_pattern = os.path.join(temp_dir, "**/*.pem")
    cert_files = glob.glob(pem_pattern, recursive=True)

if cert_files:
    # Use the first certificate file found
    cert_path = cert_files[0]
    print(f"Found certificate file: {cert_path}")

    # Copy to persistent location
    persistent_cert_path = "/tmp/cacert.pem"
    shutil.copy2(cert_path, persistent_cert_path)

    # Set SSL environment variables
    os.environ["SSL_CERT_FILE"] = persistent_cert_path
    os.environ["REQUESTS_CA_BUNDLE"] = persistent_cert_path
    os.environ["CURL_CA_BUNDLE"] = persistent_cert_path
    print(f"SSL certificates set to: {persistent_cert_path}")
else:
    print("Warning: No certificate file found")

print("MCP server initialization complete")

class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, '__dict__'):
            return obj.__dict__
        elif hasattr(obj, 'model_dump'):
            return obj.model_dump()
        elif hasattr(obj, 'dict'):
            return obj.dict()
        return super().default(obj)

async def async_handler(event, context):
    try:
        # Process the request
        action = event.get('action')

        if action == 'search':
            search_phrase = event.get('search_phrase', '')
            limit = event.get('limit', 5)

            print(f"Searching for: {search_phrase}")

            # Import and use MCP server
            from awslabs.aws_documentation_mcp_server import server_aws

            class MockContext:
                pass

            mock_ctx = MockContext()
            result = await server_aws.search_documentation(mock_ctx, search_phrase=search_phrase, limit=limit)

            # Convert to JSON
            result_json = json.loads(json.dumps(result, cls=CustomJSONEncoder))

            print("Search completed successfully")
            return {
                "output": {
                    "kind": "text",
                    "content": json.dumps(result_json)
                }
            }

        else:
            return {"error": f"Unsupported action: {action}"}

    except Exception as e:
        import traceback
        error_msg = f"Error: {str(e)}\nTraceback: {traceback.format_exc()}"
        print(error_msg)
        return {"error": error_msg}

def handler(event, context):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(async_handler(event, context))
            return result
        finally:
            loop.close()
    except Exception as e:
        import traceback
        return {
            "error": str(e),
            "traceback": traceback.format_exc()
        }
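
For reviewers reading the handler above, the request and response shapes it implies can be sketched from the caller's side. The two helpers below, `build_search_event` and `parse_search_response`, are hypothetical and not part of this change; they only mirror the keys the handler reads (`action`, `search_phrase`, `limit`) and returns (`output.kind`, `output.content`, or `error`).

```python
import json


def build_search_event(search_phrase, limit=5):
    """Build the event shape the handler reads: action, search_phrase, limit."""
    return {"action": "search", "search_phrase": search_phrase, "limit": limit}


def parse_search_response(response):
    """Return the decoded search results, or raise if the handler reported an error."""
    if "error" in response:
        raise RuntimeError(response["error"])
    output = response["output"]
    if output.get("kind") != "text":
        raise ValueError(f"Unexpected output kind: {output.get('kind')!r}")
    # 'content' is a JSON string rather than a nested object, so decode it again.
    return json.loads(output["content"])
```

Because the handler reports failures as an `error` key in a normal return value rather than by raising, the Lambda invocation itself succeeds and a caller that wants failures to surface has to check for that key explicitly.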