diff --git a/early-stopping-sf-bedrock/README.md b/early-stopping-sf-bedrock/README.md
new file mode 100644
index 00000000..69d2d4c1
--- /dev/null
+++ b/early-stopping-sf-bedrock/README.md
@@ -0,0 +1,95 @@
+# Parallel Execution with Early Stopping pattern using AWS Step Functions and Amazon Bedrock
+
+The **Parallel Agent Early Stopping** pattern uses AWS Step Functions and Amazon Bedrock to run multiple AI agents on the same problem simultaneously, each with a different approach, and to automatically terminate the remaining processes once a high-confidence solution is discovered. The workflow coordinates Worker Agents that either retrieve information from the AWS Documentation MCP Server or generate responses using Amazon Bedrock models, while an Evaluation Agent continuously assesses confidence levels and triggers early stopping when a predetermined threshold is met. This design optimizes both performance and cost through parallel exploration, intelligent termination, and resource optimization techniques including agent tiering, token optimization, and Lambda memory tuning.
+
+Learn more about this workflow in the Step Functions workflows collection: https://serverlessland.com/workflows/early-stopping-sf-bedrock
+
+Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
+
+## Requirements
+
+* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
+* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
+* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
+
+## Deployment Instructions
+
+1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
+    ```
+    git clone https://github.com/aws-samples/step-functions-workflows-collection
+    ```
+2. Change directory to the pattern directory:
+    ```
+    cd step-functions-workflows-collection/early-stopping-sf-bedrock
+    ```
+3. From the command line, use AWS SAM to deploy the AWS resources for the workflow as specified in the template.yaml file:
+    ```
+    sam deploy --guided
+    ```
+4. During the prompts:
+    * Enter a stack name
+    * Enter the desired AWS Region
+    * Allow SAM CLI to create IAM roles with the required permissions.
+
+    Once you have run `sam deploy --guided` once and saved the arguments to a configuration file (samconfig.toml), you can use `sam deploy` in the future to reuse these defaults.
+
+5. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing.
+
+## How it works
+
+1. A user submits a query (e.g. "What is CloudFront?"), and AWS Step Functions orchestrates 4 parallel agent workflows simultaneously. Each agent uses a different specialized approach (service-specific, architecture patterns, cost optimization, and general overview).
+
+2. Each agent independently retrieves information from the AWS Documentation MCP Server (Context Provider). If relevant information isn't available, agents fall back to Amazon Bedrock models to generate responses. Claude 3.5 Haiku is used in this example for its speed and cost-effectiveness.
+
+3. As results arrive, an Evaluation Agent assesses each response's confidence level. When any agent produces a result exceeding the confidence threshold (0.8 by default in this example), Step Functions automatically terminates the other executing workflows to optimize resources and costs.
+
+4. If no single agent achieves high confidence, a Synthesis Agent combines insights from multiple agents into a comprehensive answer. The final result is returned to the user with processing metadata. An illustrative output shape is shown below.
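+
+For illustration, a representative final output looks roughly like the following (values are illustrative; the exact fields are assembled in the `Format Final Output` state of statemachine/main.asl.json):
+
+```json
+{
+  "early_termination": true,
+  "original_query": "What is CloudFront?",
+  "winning_approach": "general",
+  "confidence_score": 0.95,
+  "result": "Based on AWS Documentation (General Overview): ...",
+  "synthesized_result": "Amazon CloudFront is a content delivery network (CDN) ...",
+  "agent_id": 1
+}
+```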
+
+## Image
+
+![image](./resources/MainWorkflow.png)
+![image](./resources/WorkerWorkflows.png)
+![image](./resources/WorkerExecutionStatus.png)
+
+## Testing
+
+1. Log in to the AWS Step Functions Console
+   - In the Step Functions dashboard, locate and click on the "MainStateMachine" that was created during deployment
+
+2. Start a New Execution
+   - Click the "Start execution" button
+   - In the input field, enter a test query in JSON format:
+   ```json
+   {"query": "What is CloudFront?"}
+   ```
+   - Optionally, provide a name for your execution in the "Name" field
+   - Click "Start execution" to begin the workflow
+
+3. Monitor the Execution
+   - The console will display a visual representation of your workflow execution
+   - The main workflow starts a worker execution that runs the 4 worker agents in parallel using a Map state
+   - The worker execution shows the successful agent, with the remaining agents aborted
+   - Review the main workflow's output in the "Execution output" tab to see the synthesized result
+
+4. Verify Results
+   - Check that the output includes a "synthesized_result" field and the "early_termination" flag
+   - Review the "confidence_score" and "winning_approach" fields to confirm the early stopping functionality
+   - Verify that responses include attribution to AWS Documentation
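+
+5. (Optional) Test from the AWS CLI
+   - A minimal sketch of the same test without the console, assuming the `MainStateMachineArn` stack output defined in template.yaml:
+   ```bash
+   STATE_MACHINE_ARN=$(aws cloudformation describe-stacks --stack-name STACK_NAME \
+     --query "Stacks[0].Outputs[?OutputKey=='MainStateMachineArn'].OutputValue" --output text)
+   aws stepfunctions start-execution \
+     --state-machine-arn "$STATE_MACHINE_ARN" \
+     --input '{"query": "What is CloudFront?"}'
+   ```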
+
+## Cleanup
+
+1. Delete the stack
+   ```bash
+   aws cloudformation delete-stack --stack-name STACK_NAME
+   ```
+2. Confirm the stack has been deleted
+   ```bash
+   aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus"
+   ```
+----
+Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: MIT-0
diff --git a/early-stopping-sf-bedrock/example-workflow.json b/early-stopping-sf-bedrock/example-workflow.json
new file mode 100644
index 00000000..e28058d1
--- /dev/null
+++ b/early-stopping-sf-bedrock/example-workflow.json
@@ -0,0 +1,76 @@
+{
+    "title": "Smart Exploration with Early Stopping workflow",
+    "description": "Runs multiple worker workflows to answer AWS questions, and terminates the remaining workflows once a high-confidence answer is received.",
+    "language": "Python",
+    "simplicity": "2 - Pattern",
+    "usecase": "",
+    "type": "Standard",
+    "diagram": "/resources/MainWorkflow.png",
+    "videoId": "",
+    "level": "200",
+    "framework": "SAM",
+    "services": ["bedrock", "stepfunctions", "eventbridge", "lambda"],
+    "introBox": {
+        "headline": "How it works",
+        "text": [
+            "A user submits a query (e.g. \"What is CloudFront?\"), and AWS Step Functions orchestrates 4 parallel agent workflows simultaneously. Each agent uses a different specialized approach (service-specific, architecture patterns, cost optimization, and general overview). The first high-confidence response is used to terminate the rest of the workflows."
+        ]
+    },
+    "testing": {
+        "headline": "Testing",
+        "text": [
+            "See the GitHub repo for detailed testing instructions."
+        ]
+    },
+    "cleanup": {
+        "headline": "Cleanup",
+        "text": [
+            "1. Delete the stack: sam delete."
+        ]
+    },
+    "deploy": {
+        "text": [
+            "sam deploy --guided"
+        ]
+    },
+    "gitHub": {
+        "template": {
+            "repoURL": "https://github.com/aws-samples/step-functions-workflows-collection/tree/main/early-stopping-sf-bedrock/",
+            "templateDir": "early-stopping-sf-bedrock",
+            "templateFile": "template.yaml",
+            "ASL": "statemachine/main.asl.json"
+        },
+        "payloads": [
+            {
+                "headline": "",
+                "payloadURL": ""
+            }
+        ]
+    },
+    "resources": {
+        "headline": "Additional resources",
+        "bullets": [
+            {
+                "text": "The AWS Step Functions Workshop",
+                "link": "https://catalog.workshops.aws/stepfunctions/en-US"
+            }
+        ]
+    },
+    "authors": [
+        {
+            "name": "Dave Horne",
+            "image": "https://d2908q01vomqb2.cloudfront.net/9e6a55b6b4563e652a23be9d623ca5055c356940/2025/04/07/me.jpeg",
+            "bio": "Dave is a senior Solutions Architect supporting Federal System Integrators at AWS. He is based in Washington, DC, and has 15 years of experience building, modernizing, and integrating systems for public sector customers. Outside of work, Dave enjoys playing with his kids, hiking, and watching Penn State football.",
+            "linkedin": "davidjhorne",
+            "twitter": ""
+        },
+        {
+            "name": "Satya Vedamtam",
+            "image": "",
+            "bio": "Satya is a senior Solutions Architect supporting US Federal customers at AWS. He is passionate about serverless technologies, and every day is an opportunity to solve customer challenges.",
+            "linkedin": "svedamtam",
+            "twitter": ""
+        }
+    ]
+}
\ No newline at end of file
diff --git a/early-stopping-sf-bedrock/functions/awsdocmcpserver.py b/early-stopping-sf-bedrock/functions/awsdocmcpserver.py
new file mode 100644
index 00000000..ca8de6ff
--- /dev/null
+++ b/early-stopping-sf-bedrock/functions/awsdocmcpserver.py
@@ -0,0 +1,138 @@
+import json
+import subprocess
+import sys
+import os
+import asyncio
+import shutil
+import glob
+
+
+# Initialize MCP server at module level (runs once per container)
+print("Initializing MCP server...")
+
+# Install packages to persistent location
+temp_dir = "/tmp/mcp_packages"
+os.makedirs(temp_dir, exist_ok=True)
+
+# Only install if not already installed
+if not os.path.exists(os.path.join(temp_dir, "awslabs")):
+    print(f"Installing packages to {temp_dir}")
+
+    # Clear any existing SSL environment variables
+    ssl_env_vars = ["SSL_CERT_FILE", "REQUESTS_CA_BUNDLE", "CURL_CA_BUNDLE"]
+    for var in ssl_env_vars:
+        if var in os.environ:
+            del os.environ[var]
+
+    # Install packages
+    subprocess.check_call([
+        sys.executable, "-m", "pip", "install",
+        "awslabs.aws-documentation-mcp-server",
+        "certifi",
+        "--target", temp_dir,
+        "--trusted-host", "pypi.org",
+        "--trusted-host", "pypi.python.org",
+        "--trusted-host", "files.pythonhosted.org"
+    ])
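+
+# Note: installing dependencies at import time keeps this sample self-contained,
+# but it adds cold-start latency and requires outbound network access from the
+# Lambda function. A production build could instead bake these packages into a
+# Lambda layer or container image and skip the runtime pip install.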
+
+# Add to path
+sys.path.insert(0, temp_dir)
+
+# Set environment variables
+os.environ["FASTMCP_LOG_LEVEL"] = "ERROR"
+os.environ["AWS_DOCUMENTATION_PARTITION"] = "AWS"
+
+# Find and set SSL certificates
+cert_pattern = os.path.join(temp_dir, "**/cacert.pem")
+cert_files = glob.glob(cert_pattern, recursive=True)
+
+if not cert_files:
+    # Fallback: try to find any .pem file
+    pem_pattern = os.path.join(temp_dir, "**/*.pem")
+    cert_files = glob.glob(pem_pattern, recursive=True)
+
+if cert_files:
+    # Use the first certificate file found
+    cert_path = cert_files[0]
+    print(f"Found certificate file: {cert_path}")
+
+    # Copy to persistent location
+    persistent_cert_path = "/tmp/cacert.pem"
+    shutil.copy2(cert_path, persistent_cert_path)
+
+    # Set SSL environment variables
+    os.environ["SSL_CERT_FILE"] = persistent_cert_path
+    os.environ["REQUESTS_CA_BUNDLE"] = persistent_cert_path
+    os.environ["CURL_CA_BUNDLE"] = persistent_cert_path
+    print(f"SSL certificates set to: {persistent_cert_path}")
+else:
+    print("Warning: No certificate file found")
+
+print("MCP server initialization complete")
+
+class CustomJSONEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if hasattr(obj, '__dict__'):
+            return obj.__dict__
+        elif hasattr(obj, 'model_dump'):
+            return obj.model_dump()
+        elif hasattr(obj, 'dict'):
+            return obj.dict()
+        return super().default(obj)
+
+async def async_handler(event, context):
+    try:
+        # Process the request
+        action = event.get('action')
+
+        if action == 'search':
+            search_phrase = event.get('search_phrase', '')
+            limit = event.get('limit', 5)
+
+            print(f"Searching for: {search_phrase}")
+
+            # Import and use MCP server
+            from awslabs.aws_documentation_mcp_server import server_aws
+
+            class MockContext:
+                pass
+
+            mock_ctx = MockContext()
+            result = await server_aws.search_documentation(mock_ctx, search_phrase=search_phrase, limit=limit)
+
+            # Convert to JSON
+            result_json = json.loads(json.dumps(result, cls=CustomJSONEncoder))
+
+            print("Search completed successfully")
+            return {
+                "output": {
+                    "kind": "text",
+                    "content": json.dumps(result_json)
+                }
+            }
+
+        else:
+            return {"error": f"Unsupported action: {action}"}
+
+    except Exception as e:
+        import traceback
+        error_msg = f"Error: {str(e)}\nTraceback: {traceback.format_exc()}"
+        print(error_msg)
+        return {"error": error_msg}
+
+def handler(event, context):
+    try:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            result = loop.run_until_complete(async_handler(event, context))
+            return result
+        finally:
+            loop.close()
+    except Exception as e:
+        import traceback
+        return {
+            "error": str(e),
+            "traceback": traceback.format_exc()
+        }
diff --git a/early-stopping-sf-bedrock/functions/evaluationfn.py b/early-stopping-sf-bedrock/functions/evaluationfn.py
new file mode 100644
index 00000000..67bb668d
--- /dev/null
+++ b/early-stopping-sf-bedrock/functions/evaluationfn.py
@@ -0,0 +1,103 @@
+import json
+import os
+import boto3
+
+def lambda_handler(event, context):
+    # Extract the confidence threshold from environment variables
+    confidence_threshold = float(os.environ.get('CONFIDENCE_THRESHOLD', '0.8'))
+
+    # Get the result from the worker agent
+    agent_result = event
+    confidence_score = agent_result.get('confidence_score', 0)
+
+    # Additional AWS-specific evaluation criteria
+    aws_service_completeness = evaluate_service_completeness(
+        agent_result.get('query', ''),
+        agent_result.get('result', ''),
+        agent_result.get('services_referenced', [])
+    )
+
+    has_implementation_steps = 'steps' in agent_result.get('result', '').lower() or \
+                               'step 1' in agent_result.get('result', '').lower()
+
+    # Adjust confidence score based on AWS-specific criteria
+    adjusted_score = confidence_score
+    if aws_service_completeness > 0.8:
+        adjusted_score += 0.1
+    if has_implementation_steps:
+        adjusted_score += 0.05
+
+    adjusted_score = min(adjusted_score, 1.0)  # Cap at 1.0
+
+    # Determine if the confidence threshold is met
+    threshold_met = adjusted_score >= confidence_threshold
+
+    # If the threshold is met, attempt to stop other executions. The Lambda
+    # function name used here is only a placeholder, not a real execution ARN;
+    # in production, extract and stop the parent Map execution instead.
+    if threshold_met:
+        execution_arn = os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'unknown')
+        stop_other_executions(execution_arn)
+
+    return {
+        'result': agent_result.get('result', ''),
+        'confidence_score': adjusted_score,
+        'original_confidence': confidence_score,
+        'threshold_met': threshold_met,
+        'agent_id': agent_result.get('agent_id', 'unknown'),
+        'aws_service_completeness': aws_service_completeness,
+        'has_implementation_steps': has_implementation_steps
+    }
+
+def evaluate_service_completeness(query, result, services_referenced):
+    """Evaluate if the result references appropriate AWS services for the query"""
+    # Simple heuristic - could be enhanced with ML-based evaluation
+    compute_keywords = ['compute', 'server', 'instance', 'container', 'serverless']
+    storage_keywords = ['storage', 'file', 'object', 'backup']
+    database_keywords = ['database', 'table', 'query', 'record', 'data store']
+    network_keywords = ['network', 'vpc', 'subnet', 'traffic', 'routing']
+
+    expected_services = []
+
+    # Determine expected service types based on query
+    query_lower = query.lower()
+    if any(keyword in query_lower for keyword in compute_keywords):
+        expected_services.extend(['EC2', 'Lambda', 'ECS', 'EKS', 'Fargate'])
+    if any(keyword in query_lower for keyword in storage_keywords):
+        expected_services.extend(['S3', 'EBS', 'EFS', 'FSx', 'Storage Gateway'])
+    if any(keyword in query_lower for keyword in database_keywords):
+        expected_services.extend(['RDS', 'DynamoDB', 'Aurora', 'DocumentDB', 'Neptune'])
+    if any(keyword in query_lower for keyword in network_keywords):
+        expected_services.extend(['VPC', 'Route 53', 'CloudFront', 'API Gateway'])
+
+    # If no specific service type is identified, return medium score
+    if not expected_services:
+        return 0.7
+
+    # Calculate overlap between expected and referenced services
+    service_overlap = len(set([s.upper() for s in services_referenced]) &
+                          set([s.upper() for s in expected_services]))
+
+    if service_overlap == 0:
+        return 0.5
+
+    # Calculate completeness score
+    return min(1.0, service_overlap / (len(expected_services) * 0.7))
+
+def stop_other_executions(execution_arn):
+    """Stop other parallel executions when threshold is met"""
+    # In a real implementation, you would:
+    # 1. Extract the execution ARN from the incoming event or context
+    # 2. Use the Step Functions API to stop the sibling Map state executions
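+    #
+    # In this sample, the authoritative early stop is performed by the main
+    # state machine's "Clean Up Workers" state once a worker reports success;
+    # the call below is illustrative. A fuller, hypothetical variant could
+    # enumerate and stop the running sibling executions:
+    #
+    #     sfn = boto3.client('stepfunctions')
+    #     running = sfn.list_executions(stateMachineArn=worker_sm_arn,
+    #                                   statusFilter='RUNNING')['executions']
+    #     for ex in running:
+    #         sfn.stop_execution(executionArn=ex['executionArn'],
+    #                            cause='Confidence threshold met by another agent')
+    #
+    # where worker_sm_arn would need to be supplied to this function.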
+ """ + + # Invoke Bedrock for synthesis + response = bedrock_runtime.invoke_model( + modelId=os.environ.get('MODEL_ID', 'us.anthropic.claude-3-5-haiku-20241022-v1:0'), + body=json.dumps({ + "anthropic_version": "bedrock-2023-05-31", + "max_tokens": 3000, + "messages": [ + { + "role": "user", + "content": synthesis_prompt + } + ], + "temperature": 0.3, + "top_p": 0.9 + }) + ) + + # Parse response + response_body = json.loads(response['body'].read().decode('utf-8')) + synthesized_result = response_body.get('content', [{}])[0].get('text', '') + + # Extract AWS services mentioned in the synthesized result + services_mentioned = extract_aws_services(synthesized_result) + if not services_mentioned: + services_mentioned = list(all_services) + + return { + 'synthesized_result': synthesized_result, + 'source_count': len(agent_results), + 'services_mentioned': services_mentioned, + 'original_query': original_query + } + except Exception as e: + print(f"Error in lambda_handler: {str(e)}") + print(traceback.format_exc()) + # Fallback response if anything fails + return { + 'synthesized_result': f"I encountered an error while processing your request about {original_query if 'original_query' in locals() else 'your query'}. Please try again.", + 'source_count': 0, + 'services_mentioned': [], + 'original_query': original_query if 'original_query' in locals() else "Unknown query" + } + +def extract_aws_services(text): + """Extract mentioned AWS services from the response""" + aws_services = [ + "EC2", "S3", "Lambda", "DynamoDB", "RDS", "Aurora", "ECS", "EKS", + "SQS", "SNS", "API Gateway", "CloudFormation", "CloudFront", "Route 53", + "VPC", "IAM", "CloudWatch", "Step Functions", "EventBridge", "Cognito", + "Kinesis", "Glue", "Athena", "Redshift", "EMR", "SageMaker", "Bedrock" + ] + + found_services = [] + for service in aws_services: + if service in text or service.lower() in text.lower(): + found_services.append(service) + + return found_services diff --git a/early-stopping-sf-bedrock/functions/workeragent.py b/early-stopping-sf-bedrock/functions/workeragent.py new file mode 100644 index 00000000..d4d9b14a --- /dev/null +++ b/early-stopping-sf-bedrock/functions/workeragent.py @@ -0,0 +1,237 @@ +import json +import os +import boto3 +import time +import traceback + +# Initialize Bedrock client +bedrock_runtime = boto3.client('bedrock-runtime') + +def lambda_handler(event, context): + # Extract the AWS question + query = event['query'] + agent_id = event.get('agent_id', 'default') + approach = event.get('approach', 'general') + + print(f"Processing query: {query} with approach: {approach}") + + # First, try to get answer from MCP server with specialized search + mcp_info = query_mcp_server(query, approach) + + if mcp_info: + # Use MCP server response directly but format based on approach + print("Using MCP server response directly") + + # Format the MCP response based on agent's approach + if approach == 'service_specific': + result = f"Based on AWS Documentation (Service Implementation Focus):\n\n{mcp_info}" + elif approach == 'architecture_patterns': + result = f"Based on AWS Documentation (Architecture & Design Focus):\n\n{mcp_info}" + elif approach == 'cost_optimization': + result = f"Based on AWS Documentation (Cost & Pricing Focus):\n\n{mcp_info}" + else: + result = f"Based on AWS Documentation (General Overview):\n\n{mcp_info}" + + confidence_score = 0.95 # High confidence for official documentation + + return { + 'agent_id': agent_id, + 'approach': approach, + 'query': query, + 'result': result, + 
'confidence_score': confidence_score, + 'processing_time': time.time() - context.get_remaining_time_in_millis()/1000, + 'services_referenced': extract_aws_services(result), + 'source': 'mcp_server', + 'lambda_execution_arn': context.aws_request_id + } + + # Fallback to Bedrock if MCP server doesn't have the information + print("MCP server didn't provide results, falling back to Bedrock") + + # Create a customized prompt based on the agent's assigned approach + if approach == 'service_specific': + prompt = f"""You are an AWS service specialist. + Provide a step-by-step guide on how to accomplish this task on AWS: + {query} + + Focus on the specific AWS services needed, their configurations, and how they interact. + Include console instructions and any relevant CLI commands or CloudFormation snippets. + Include confidence score from 0-1 at the end of your response. + """ + elif approach == 'architecture_patterns': + prompt = f"""You are an AWS solutions architect. + Provide an architectural solution for this AWS implementation question: + {query} + + Focus on best practice architectural patterns, service selection rationale, and design considerations. + Include a logical architecture description and service interactions. + Include confidence score from 0-1 at the end of your response. + """ + elif approach == 'cost_optimization': + prompt = f"""You are an AWS cost optimization specialist. + For the following AWS implementation question: + {query} + + Provide a cost-effective approach, pricing considerations, and optimization strategies. + Include relevant pricing models, reserved capacity options, and cost comparison of alternatives. + Include confidence score from 0-1 at the end of your response. + """ + else: + prompt = f"""You are an AWS technical expert. + Provide a comprehensive answer to the following AWS implementation question: + {query} + + Include relevant AWS services, implementation steps, and best practices. + Include confidence score from 0-1 at the end of your response. 
+ """ + + try: + # Invoke Bedrock model + model_id = os.environ.get('MODEL_ID', 'us.anthropic.claude-3-5-haiku-20241022-v1:0') + print(f"Invoking Bedrock model: {model_id}") + + response = bedrock_runtime.invoke_model( + modelId=model_id, + body=json.dumps({ + "anthropic_version": "bedrock-2023-05-31", + "max_tokens": 2000, + "messages": [ + { + "role": "user", + "content": prompt + } + ], + "temperature": 0.2, + "top_p": 0.9 + }) + ) + + # Parse response + response_body = json.loads(response['body'].read().decode('utf-8')) + result = response_body.get('content', [{}])[0].get('text', '') + + # Extract confidence score + try: + if 'confidence score:' in result.lower(): + confidence_parts = result.lower().split('confidence score:') + confidence_score = float(confidence_parts[1].strip()) + else: + import re + matches = re.findall(r'confidence(?:\s*(?:score|level|rating)?)?(?:\s*[:=]\s*)?([0-9]*\.?[0-9]+)', + result.lower()) + confidence_score = float(matches[-1]) if matches else 0.7 + except Exception: + confidence_score = 0.7 + + return { + 'agent_id': agent_id, + 'approach': approach, + 'query': query, + 'result': result, + 'confidence_score': confidence_score, + 'processing_time': time.time() - context.get_remaining_time_in_millis()/1000, + 'services_referenced': extract_aws_services(result), + 'source': 'bedrock', + 'lambda_execution_arn': context.aws_request_id + } + + except Exception as e: + print(f"Error invoking Bedrock: {str(e)}") + return { + 'agent_id': agent_id, + 'approach': approach, + 'query': query, + 'result': f"Error processing query: {query}. Please try again.", + 'confidence_score': 0.1, + 'processing_time': time.time() - context.get_remaining_time_in_millis()/1000, + 'services_referenced': [], + 'source': 'error', + 'lambda_execution_arn': context.aws_request_id + } + +def query_mcp_server(query, approach): + """Use the AWS Documentation MCP Server Lambda with approach-specific search strategies""" + try: + print(f"Querying MCP server for: {query} with approach: {approach}") + + # Create approach-specific search phrases + if approach == 'service_specific': + # Focus on implementation, configuration, setup + search_phrase = f"{query} implementation configuration setup guide" + elif approach == 'architecture_patterns': + # Focus on architecture, design, patterns, best practices + search_phrase = f"{query} architecture design patterns best practices" + elif approach == 'cost_optimization': + # Focus on pricing, cost, billing, optimization + search_phrase = f"{query} pricing cost optimization billing" + else: + # General search + search_phrase = query + + print(f"Using specialized search phrase: {search_phrase}") + + # Create a Lambda client + lambda_client = boto3.client('lambda') + + # Create the MCP request with specialized search + mcp_request = { + "action": "search", + "search_phrase": search_phrase, + "limit": 5 + } + + # Invoke the MCP server Lambda + response = lambda_client.invoke( + FunctionName=os.environ.get('MCP_FUNCTION_NAME', 'AWSMCPServerFunction'), + InvocationType='RequestResponse', + Payload=json.dumps(mcp_request) + ) + + # Parse the response + payload = json.loads(response['Payload'].read().decode('utf-8')) + print(f"MCP server response received for {approach} approach") + + # Check for errors + if 'error' in payload: + print(f"Error from MCP server: {payload['error']}") + return "" + + # Process the search results + if 'output' in payload and 'content' in payload['output']: + content = json.loads(payload['output']['content']) + + # Extract the top 
+        # Extract confidence score
+        try:
+            if 'confidence score:' in result.lower():
+                confidence_parts = result.lower().split('confidence score:')
+                confidence_score = float(confidence_parts[1].strip())
+            else:
+                import re
+                matches = re.findall(r'confidence(?:\s*(?:score|level|rating)?)?(?:\s*[:=]\s*)?([0-9]*\.?[0-9]+)',
+                                     result.lower())
+                confidence_score = float(matches[-1]) if matches else 0.7
+        except Exception:
+            confidence_score = 0.7
+
+        return {
+            'agent_id': agent_id,
+            'approach': approach,
+            'query': query,
+            'result': result,
+            'confidence_score': confidence_score,
+            'processing_time': time.time() - start_time,
+            'services_referenced': extract_aws_services(result),
+            'source': 'bedrock',
+            'lambda_execution_arn': context.aws_request_id
+        }
+
+    except Exception as e:
+        print(f"Error invoking Bedrock: {str(e)}")
+        return {
+            'agent_id': agent_id,
+            'approach': approach,
+            'query': query,
+            'result': f"Error processing query: {query}. Please try again.",
+            'confidence_score': 0.1,
+            'processing_time': time.time() - start_time,
+            'services_referenced': [],
+            'source': 'error',
+            'lambda_execution_arn': context.aws_request_id
+        }
+
+def query_mcp_server(query, approach):
+    """Use the AWS Documentation MCP Server Lambda with approach-specific search strategies"""
+    try:
+        print(f"Querying MCP server for: {query} with approach: {approach}")
+
+        # Create approach-specific search phrases
+        if approach == 'service_specific':
+            # Focus on implementation, configuration, setup
+            search_phrase = f"{query} implementation configuration setup guide"
+        elif approach == 'architecture_patterns':
+            # Focus on architecture, design, patterns, best practices
+            search_phrase = f"{query} architecture design patterns best practices"
+        elif approach == 'cost_optimization':
+            # Focus on pricing, cost, billing, optimization
+            search_phrase = f"{query} pricing cost optimization billing"
+        else:
+            # General search
+            search_phrase = query
+
+        print(f"Using specialized search phrase: {search_phrase}")
+
+        # Create a Lambda client
+        lambda_client = boto3.client('lambda')
+
+        # Create the MCP request with specialized search
+        mcp_request = {
+            "action": "search",
+            "search_phrase": search_phrase,
+            "limit": 5
+        }
+
+        # Invoke the MCP server Lambda
+        response = lambda_client.invoke(
+            FunctionName=os.environ.get('MCP_FUNCTION_NAME', 'AWSMCPServerFunction'),
+            InvocationType='RequestResponse',
+            Payload=json.dumps(mcp_request)
+        )
+
+        # Parse the response
+        payload = json.loads(response['Payload'].read().decode('utf-8'))
+        print(f"MCP server response received for {approach} approach")
+
+        # Check for errors
+        if 'error' in payload:
+            print(f"Error from MCP server: {payload['error']}")
+            return ""
+
+        # Process the search results
+        if 'output' in payload and 'content' in payload['output']:
+            content = json.loads(payload['output']['content'])
+
+            # Extract the top search results
+            results = []
+            for item in content:
+                if isinstance(item, dict) and 'url' in item and 'title' in item and 'context' in item:
+                    results.append(f"# {item['title']}\n\n{item['context']}\n\nSource: {item['url']}")
+
+            # If we have results, return them
+            if results:
+                print(f"Found {len(results)} MCP results for {approach} approach")
+                return "\n\n".join(results)
+
+        print(f"No MCP content found for {approach} approach")
+        return ""
+
+    except Exception as e:
+        print(f"Error querying MCP server: {str(e)}")
+        print(traceback.format_exc())
+        return ""
+
+def extract_aws_services(text):
+    """Extract mentioned AWS services from the response"""
+    aws_services = [
+        "EC2", "S3", "Lambda", "DynamoDB", "RDS", "Aurora", "ECS", "EKS",
+        "SQS", "SNS", "API Gateway", "CloudFormation", "CloudFront", "Route 53",
+        "VPC", "IAM", "CloudWatch", "Step Functions", "EventBridge", "Cognito",
+        "Kinesis", "Glue", "Athena", "Redshift", "EMR", "SageMaker"
+    ]
+
+    found_services = []
+    for service in aws_services:
+        if service in text or service.lower() in text.lower():
+            found_services.append(service)
+ "Call Worker Agent": { + "Comment": "Invokes WorkerAgent to query AWS MCP server", + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload": { + "query.$": "$.query", + "agent_id.$": "$.workerId", + "approach.$": "$.approach" + }, + "FunctionName": "${WorkerAgentFunction}" + }, + "ResultPath": "$.agentResult", + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Next": "Worker Error" + } + ], + "Next": "Evaluate Result" + }, + "Worker Error": { + "Comment": "Handles worker agent errors and ends workflow", + "Type": "Fail", + "Cause": "Worker agent execution failed", + "Error": "WorkerAgentError" + }, + "Evaluate Result": { + "Comment": "Evaluates if the agent result meets confidence threshold", + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload.$": "$.agentResult.Payload", + "FunctionName": "${EvaluationFunction}" + }, + "ResultPath": "$.evaluation", + "Next": "Check Confidence Threshold" + }, + "Check Confidence Threshold": { + "Comment": "Checks if confidence threshold is met", + "Type": "Choice", + "Choices": [ + { + "Variable": "$.evaluation.Payload.threshold_met", + "BooleanEquals": true, + "Next": "Report Success" + } + ], + "Default": "Worker Error" + }, + "Report Success": { + "Comment": "Reports back to main state machine with generated number using task token", + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess", + "Parameters": { + "TaskToken.$": "$.taskToken", + "Output": { + "result.$": "$.evaluation.Payload.result", + "confidence_score.$": "$.evaluation.Payload.confidence_score", + "agent_id.$": "$.evaluation.Payload.agent_id", + "workerId.$": "$.workerId", + "executionArn.$": "$$.Execution.Id", + "executionName.$": "$.executionName", + "query.$": "$.query", + "approach.$": "$.approach" + } + }, + "End": true + } + } + }, + "End": true + } + } +} \ No newline at end of file diff --git a/early-stopping-sf-bedrock/template.yaml b/early-stopping-sf-bedrock/template.yaml new file mode 100644 index 00000000..2f25eb0f --- /dev/null +++ b/early-stopping-sf-bedrock/template.yaml @@ -0,0 +1,206 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: SAM Template for Worker Job Processing + +Resources: + # AWS MCP Server Lambda Function (First Deployment) + AWSMCPServerFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/ + Handler: awsdocmcpserver.handler + Runtime: python3.14 + Timeout: 300 + MemorySize: 512 + Environment: + Variables: + FASTMCP_LOG_LEVEL: ERROR + AWS_DOCUMENTATION_PARTITION: AWS + + # IAM Role for invoking MCP Server Lambda + MCPServerInvokeRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub ${AWS::StackName}-MCPServerInvokeRole + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + - states.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: InvokeMCPServerPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !GetAtt AWSMCPServerFunction.Arn + + # SQS Queue + JobQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: JobQueue + VisibilityTimeout: 180 # 3 minutes, adjust as needed + + # EventBridge Pipe + SQSToStateMachinePipe: + Type: AWS::Pipes::Pipe + DependsOn: + - SQSToStateMachinePipeRole + - MainStateMachine + Properties: + Name: pipe-sqs-to-sfn + RoleArn: !GetAtt SQSToStateMachinePipeRole.Arn + Source: 
!GetAtt JobQueue.Arn + Target: !Ref MainStateMachine + SourceParameters: + SqsQueueParameters: + BatchSize: 1 + TargetParameters: + StepFunctionStateMachineParameters: + InvocationType: FIRE_AND_FORGET + + # IAM Role for EventBridge Pipe + SQSToStateMachinePipeRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: pipes.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: SQSAccess + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sqs:ReceiveMessage + - sqs:DeleteMessage + - sqs:GetQueueAttributes + Resource: !GetAtt JobQueue.Arn + - PolicyName: StepFunctionAccess + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: states:StartExecution + Resource: !Sub arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${AWS::StackName}-MainStateMachine + + WorkerAgentFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/ + Handler: workeragent.lambda_handler + Runtime: python3.14 + Timeout: 60 + MemorySize: 256 + Environment: + Variables: + MODEL_ID: us.anthropic.claude-3-5-haiku-20241022-v1:0 + MCP_FUNCTION_NAME: !Ref AWSMCPServerFunction + Policies: + - Statement: + - Effect: Allow + Action: + - bedrock:InvokeModel + - lambda:InvokeFunction + Resource: + - "arn:aws:bedrock:*::foundation-model/us.anthropic.claude-3-5-haiku-20241022-v1:0" + - !GetAtt AWSMCPServerFunction.Arn + + EvaluationFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/ + Handler: evaluationfn.lambda_handler + Runtime: python3.14 + Timeout: 30 + MemorySize: 128 + Environment: + Variables: + CONFIDENCE_THRESHOLD: "0.8" + + SynthesisFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/ + Handler: synthesis.lambda_handler + Runtime: python3.14 + Timeout: 60 + MemorySize: 256 + Environment: + Variables: + MODEL_ID: us.anthropic.claude-3-5-haiku-20241022-v1:0 + Policies: + - Statement: + - Effect: Allow + Action: + - bedrock:InvokeModel + Resource: "arn:aws:bedrock:*::foundation-model/us.anthropic.claude-3-5-haiku-20241022-v1:0" + + # Step Functions + MainStateMachine: + Type: AWS::Serverless::StateMachine + Properties: + DefinitionSubstitutions: + WorkerStateMachine: !GetAtt WorkerStateMachine.Arn + SynthesisFunction: !GetAtt SynthesisFunction.Arn + Policies: + - LambdaInvokePolicy: + FunctionName: '*' + - Statement: + - Effect: Allow + Action: + - states:StartExecution + - states:StopExecution + - states:DescribeExecution + Resource: + - !Sub arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:WorkerStateMachine-* + - !Sub arn:aws:states:${AWS::Region}:${AWS::AccountId}:execution:WorkerStateMachine-* + DefinitionUri: statemachine/main.asl.json + + WorkerStateMachine: + Type: AWS::Serverless::StateMachine + Properties: + DefinitionSubstitutions: + WorkerAgentFunction: !GetAtt WorkerAgentFunction.Arn + EvaluationFunction: !GetAtt EvaluationFunction.Arn + Policies: + - LambdaInvokePolicy: + FunctionName: '*' + - Statement: + - Effect: Allow + Action: + - states:SendTaskSuccess + - states:SendTaskFailure + - states:SendTaskHeartbeat + Resource: '*' + DefinitionUri: statemachine/worker.asl.json + +Outputs: + AWSMCPServerFunctionArn: + Description: ARN of the AWS MCP Server Lambda Function + Value: !GetAtt AWSMCPServerFunction.Arn + Export: + Name: !Sub ${AWS::StackName}-MCPServerArn + MCPServerInvokeRoleArn: + Description: ARN of the role to invoke MCP Server Lambda + 
Value: !GetAtt MCPServerInvokeRole.Arn + Export: + Name: !Sub ${AWS::StackName}-MCPServerInvokeRoleArn + JobQueueUrl: + Description: URL of the Job Queue + Value: !Ref JobQueue + MainStateMachineArn: + Description: ARN of the Main State Machine + Value: !Ref MainStateMachine \ No newline at end of file
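+
+# To exercise the SQS -> EventBridge Pipe -> Step Functions path as well, a
+# message can be sent to the queue (illustrative command; JobQueueUrl is the
+# stack output above):
+#   aws sqs send-message --queue-url <JobQueueUrl> --message-body '{"query": "What is CloudFront?"}'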