diff --git a/.gitignore b/.gitignore index b7c8535..b28ca30 100644 --- a/.gitignore +++ b/.gitignore @@ -227,3 +227,4 @@ local_settings.py .bedrock_agentcore.yaml .dockerignore Dockerfile +CLAUDE.md diff --git a/README.md b/README.md index 1f6bfad..68c7465 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,10 @@
+
+ + image + +
+

Bedrock AgentCore SDK

@@ -17,13 +23,19 @@

- Python SDK - ◆ Starter Toolkit + DocumentationSamplesDiscord + ◆ Boto3 Python SDK + ◆ Runtime Python SDK + ◆ Starter Toolkit +

+## Overview +Amazon Bedrock AgentCore enables you to deploy and operate highly effective agents securely, at scale using any framework and model. With Amazon Bedrock AgentCore, developers can accelerate AI agents into production with the scale, reliability, and security, critical to real-world deployment. AgentCore provides tools and capabilities to make agents more effective and capable, purpose-built infrastructure to securely scale agents, and controls to operate trustworthy agents. Amazon Bedrock AgentCore services are composable and work with popular open-source frameworks and any model, so you don’t have to choose between open-source flexibility and enterprise-grade security and reliability. + ## 🚀 From Local Development to Bedrock AgentCore ```python @@ -52,31 +64,14 @@ app.run() # Ready to run on Bedrock AgentCore - ✅ **Enterprise-grade platform** - Built-in auth, memory, observability, security - ✅ **Production-ready deployment** - Reliable, scalable, compliant hosting -## ⚠️ Preview Status - -Bedrock AgentCore SDK is currently in public preview. APIs may change as we refine the SDK. - -## 🛠️ Built for AI Developers - -**Real-time Health Monitoring** -```python -@app.async_task # Automatically tracks background work -async def process_documents(files): - # Long-running AI processing - return results - -@app.ping # Custom health status -def health_check(): - return "HEALTHY" if all_services_up() else "HEALTHY_BUSY" -``` - -**Enterprise Platform Services** -- 🧠 **Memory** - Persistent knowledge across sessions -- 🔗 **Gateway** - Transform APIs into MCP tools -- 💻 **Code Interpreter** - Secure sandboxed execution -- 🌐 **Browser** - Cloud-based web automation -- 📊 **Observability** - OpenTelemetry tracing -- 🔐 **Identity** - AWS & third-party auth +## Amazon Bedrock AgentCore services +- 🚀 **Runtime** - Secure and session isolated compute: **[Runtime Quick Start](https://aws.github.io/bedrock-agentcore-starter-toolkit/user-guide/runtime/quickstart.html)** +- 🧠 **Memory** - Persistent knowledge across sessions: **[Memory Quick Start](https://aws.github.io/bedrock-agentcore-starter-toolkit/user-guide/memory/quickstart.html)** +- 🔗 **Gateway** - Transform APIs into MCP tools: **[Gateway Quick Start](https://aws.github.io/bedrock-agentcore-starter-toolkit/user-guide/gateway/quickstart.html)** +- 💻 **Code Interpreter** - Secure sandboxed execution: **[Code Interpreter Quick Start](https://aws.github.io/bedrock-agentcore-starter-toolkit/user-guide/builtin-tools/quickstart-code-interpreter.html)** +- 🌐 **Browser** - Cloud-based web automation: **[Browser Quick Start](https://aws.github.io/bedrock-agentcore-starter-toolkit/user-guide/builtin-tools/quickstart-browser.html)** +- 📊 **Observability** - OpenTelemetry tracing: **[Observability Quick Start](https://aws.github.io/bedrock-agentcore-starter-toolkit/user-guide/observability/quickstart.html)** +- 🔐 **Identity** - AWS & third-party auth: ## 🏗️ Deployment @@ -84,6 +79,10 @@ def health_check(): **Production:** [AWS CDK](https://aws.amazon.com/cdk/) - coming soon. +## ⚠️ Preview Status + +Bedrock AgentCore SDK is currently in public preview. APIs may change as we refine the SDK. + ## 📝 License & Contributing - **License:** Apache 2.0 - see [LICENSE.txt](LICENSE.txt) diff --git a/src/bedrock_agentcore/runtime/app.py b/src/bedrock_agentcore/runtime/app.py index 25992d8..37217f3 100644 --- a/src/bedrock_agentcore/runtime/app.py +++ b/src/bedrock_agentcore/runtime/app.py @@ -16,10 +16,12 @@ from starlette.applications import Starlette from starlette.responses import JSONResponse, Response, StreamingResponse from starlette.routing import Route +from starlette.types import Lifespan from .context import BedrockAgentCoreContext, RequestContext from .models import ( ACCESS_TOKEN_HEADER, + REQUEST_ID_HEADER, SESSION_HEADER, TASK_ACTION_CLEAR_FORCED_STATUS, TASK_ACTION_FORCE_BUSY, @@ -30,31 +32,34 @@ ) from .utils import convert_complex_objects -# Request context for logging -request_id_context: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("request_id", default=None) - class RequestContextFormatter(logging.Formatter): - """Custom formatter that includes request ID in log messages.""" + """Formatter including request and session IDs.""" def format(self, record): - """Format log record with request ID context.""" - request_id = request_id_context.get() + """Format log record with request and session ID context.""" + request_id = BedrockAgentCoreContext.get_request_id() + session_id = BedrockAgentCoreContext.get_session_id() + + parts = [] if request_id: - record.request_id = f"[{request_id}] " - else: - record.request_id = "" + parts.append(f"[rid:{request_id}]") + if session_id: + parts.append(f"[sid:{session_id}]") + + record.request_id = " ".join(parts) + " " if parts else "" return super().format(record) class BedrockAgentCoreApp(Starlette): """Bedrock AgentCore application class that extends Starlette for AI agent deployment.""" - def __init__(self, debug: bool = False): + def __init__(self, debug: bool = False, lifespan: Optional[Lifespan] = None): """Initialize Bedrock AgentCore application. Args: debug: Enable debug actions for task management (default: False) + lifespan: Optional lifespan context manager for startup/shutdown """ self.handlers: Dict[str, Callable] = {} self._ping_handler: Optional[Callable] = None @@ -67,7 +72,7 @@ def __init__(self, debug: bool = False): Route("/invocations", self._handle_invocation, methods=["POST"]), Route("/ping", self._handle_ping, methods=["GET"]), ] - super().__init__(routes=routes) + super().__init__(routes=routes, lifespan=lifespan) self.debug = debug # Set after super().__init__ to avoid override self.logger = logging.getLogger("bedrock_agentcore.app") @@ -76,7 +81,7 @@ def __init__(self, debug: bool = False): formatter = RequestContextFormatter("%(asctime)s - %(name)s - %(levelname)s - %(request_id)s%(message)s") handler.setFormatter(formatter) self.logger.addHandler(handler) - self.logger.setLevel(logging.INFO) + self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO) def entrypoint(self, func: Callable) -> Callable: """Decorator to register a function as the main entrypoint. @@ -246,17 +251,25 @@ def complete_async_task(self, task_id: int) -> bool: return False def _build_request_context(self, request) -> RequestContext: - """Build request context and setup auth if present.""" + """Build request context and setup all context variables.""" try: - agent_identity_token = request.headers.get(ACCESS_TOKEN_HEADER) or request.headers.get( - ACCESS_TOKEN_HEADER.lower() - ) + headers = request.headers + request_id = headers.get(REQUEST_ID_HEADER) + if not request_id: + request_id = str(uuid.uuid4()) + + session_id = headers.get(SESSION_HEADER) + BedrockAgentCoreContext.set_request_context(request_id, session_id) + + agent_identity_token = headers.get(ACCESS_TOKEN_HEADER) if agent_identity_token: BedrockAgentCoreContext.set_workload_access_token(agent_identity_token) - session_id = request.headers.get(SESSION_HEADER) or request.headers.get(SESSION_HEADER.lower()) + return RequestContext(session_id=session_id) except Exception as e: self.logger.warning("Failed to build request context: %s: %s", type(e).__name__, e) + request_id = str(uuid.uuid4()) + BedrockAgentCoreContext.set_request_context(request_id, None) return RequestContext(session_id=None) def _takes_context(self, handler: Callable) -> bool: @@ -267,8 +280,8 @@ def _takes_context(self, handler: Callable) -> bool: return False async def _handle_invocation(self, request): - request_id = str(uuid.uuid4())[:8] - request_id_context.set(request_id) + request_context = self._build_request_context(request) + start_time = time.time() try: @@ -287,7 +300,6 @@ async def _handle_invocation(self, request): self.logger.error("No entrypoint defined") return JSONResponse({"error": "No entrypoint defined"}, status_code=500) - request_context = self._build_request_context(request) takes_context = self._takes_context(handler) handler_name = handler.__name__ if hasattr(handler, "__name__") else "unknown" @@ -341,7 +353,7 @@ def run(self, port: int = 8080, host: Optional[str] = None): host = "0.0.0.0" # nosec B104 - Docker needs this to expose the port else: host = "127.0.0.1" - uvicorn.run(self, host=host, port=port) + uvicorn.run(self, host=host, port=port, access_log=self.debug, log_level="info" if self.debug else "warning") async def _invoke_handler(self, handler, request_context, takes_context, payload): try: @@ -351,7 +363,8 @@ async def _invoke_handler(self, handler, request_context, takes_context, payload return await handler(*args) else: loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, handler, *args) + ctx = contextvars.copy_context() + return await loop.run_in_executor(None, ctx.run, handler, *args) except Exception as e: handler_name = getattr(handler, "__name__", "unknown") self.logger.error("Handler '%s' execution failed: %s: %s", handler_name, type(e).__name__, e) diff --git a/src/bedrock_agentcore/runtime/context.py b/src/bedrock_agentcore/runtime/context.py index 8409be2..640d43b 100644 --- a/src/bedrock_agentcore/runtime/context.py +++ b/src/bedrock_agentcore/runtime/context.py @@ -16,9 +16,11 @@ class RequestContext(BaseModel): class BedrockAgentCoreContext: - """Context manager for Bedrock AgentCore.""" + """Unified context manager for Bedrock AgentCore.""" - _workload_access_token: ContextVar[str] = ContextVar("workload_access_token") + _workload_access_token: ContextVar[Optional[str]] = ContextVar("workload_access_token") + _request_id: ContextVar[Optional[str]] = ContextVar("request_id") + _session_id: ContextVar[Optional[str]] = ContextVar("session_id") @classmethod def set_workload_access_token(cls, token: str): @@ -32,3 +34,25 @@ def get_workload_access_token(cls) -> Optional[str]: return cls._workload_access_token.get() except LookupError: return None + + @classmethod + def set_request_context(cls, request_id: str, session_id: Optional[str] = None): + """Set request-scoped identifiers.""" + cls._request_id.set(request_id) + cls._session_id.set(session_id) + + @classmethod + def get_request_id(cls) -> Optional[str]: + """Get current request ID.""" + try: + return cls._request_id.get() + except LookupError: + return None + + @classmethod + def get_session_id(cls) -> Optional[str]: + """Get current session ID.""" + try: + return cls._session_id.get() + except LookupError: + return None diff --git a/src/bedrock_agentcore/runtime/models.py b/src/bedrock_agentcore/runtime/models.py index a482d3b..4555de4 100644 --- a/src/bedrock_agentcore/runtime/models.py +++ b/src/bedrock_agentcore/runtime/models.py @@ -15,6 +15,7 @@ class PingStatus(str, Enum): # Header constants SESSION_HEADER = "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id" +REQUEST_ID_HEADER = "X-Amzn-Bedrock-AgentCore-Runtime-Request-Id" ACCESS_TOKEN_HEADER = "WorkloadAccessToken" # nosec # Task action constants diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py index 9412703..4e984ce 100644 --- a/tests/bedrock_agentcore/runtime/test_app.py +++ b/tests/bedrock_agentcore/runtime/test_app.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import json import os import threading @@ -192,7 +193,9 @@ def test_serve_in_docker(self, mock_uvicorn): bedrock_agentcore = BedrockAgentCoreApp() bedrock_agentcore.run(port=8080) - mock_uvicorn.assert_called_once_with(bedrock_agentcore, host="0.0.0.0", port=8080) + mock_uvicorn.assert_called_once_with( + bedrock_agentcore, host="0.0.0.0", port=8080, access_log=False, log_level="warning" + ) @patch("os.path.exists", return_value=True) @patch("uvicorn.run") @@ -201,7 +204,9 @@ def test_serve_with_dockerenv_file(self, mock_uvicorn, mock_exists): bedrock_agentcore = BedrockAgentCoreApp() bedrock_agentcore.run(port=8080) - mock_uvicorn.assert_called_once_with(bedrock_agentcore, host="0.0.0.0", port=8080) + mock_uvicorn.assert_called_once_with( + bedrock_agentcore, host="0.0.0.0", port=8080, access_log=False, log_level="warning" + ) @patch("uvicorn.run") def test_serve_localhost(self, mock_uvicorn): @@ -209,7 +214,9 @@ def test_serve_localhost(self, mock_uvicorn): bedrock_agentcore = BedrockAgentCoreApp() bedrock_agentcore.run(port=8080) - mock_uvicorn.assert_called_once_with(bedrock_agentcore, host="127.0.0.1", port=8080) + mock_uvicorn.assert_called_once_with( + bedrock_agentcore, host="127.0.0.1", port=8080, access_log=False, log_level="warning" + ) @patch("uvicorn.run") def test_serve_custom_host(self, mock_uvicorn): @@ -217,7 +224,9 @@ def test_serve_custom_host(self, mock_uvicorn): bedrock_agentcore = BedrockAgentCoreApp() bedrock_agentcore.run(port=8080, host="custom-host.example.com") - mock_uvicorn.assert_called_once_with(bedrock_agentcore, host="custom-host.example.com", port=8080) + mock_uvicorn.assert_called_once_with( + bedrock_agentcore, host="custom-host.example.com", port=8080, access_log=False, log_level="warning" + ) def test_entrypoint_serve_method(self): """Test that entrypoint decorator adds serve method that works.""" @@ -230,7 +239,129 @@ def handler(payload): # Test that the serve method exists and can be called with mocked uvicorn with patch("uvicorn.run") as mock_uvicorn: handler.run(port=9000, host="test-host") - mock_uvicorn.assert_called_once_with(bedrock_agentcore, host="test-host", port=9000) + mock_uvicorn.assert_called_once_with( + bedrock_agentcore, + host="test-host", + port=9000, + access_log=False, # Default production behavior + log_level="warning", + ) + + def test_debug_mode_uvicorn_config(self): + """Test that debug mode enables full uvicorn logging.""" + bedrock_agentcore = BedrockAgentCoreApp(debug=True) + + @bedrock_agentcore.entrypoint + def handler(payload): + return {"result": "success"} + + # Test that debug mode uses full uvicorn logging + with patch("uvicorn.run") as mock_uvicorn: + handler.run(port=9000, host="test-host") + mock_uvicorn.assert_called_once_with( + bedrock_agentcore, + host="test-host", + port=9000, + access_log=True, # Debug mode enables access logs + log_level="info", # Debug mode uses info level + ) + + def test_invocation_with_request_id_header(self): + """Test that request ID from header is used.""" + bedrock_agentcore = BedrockAgentCoreApp() + + @bedrock_agentcore.entrypoint + def handler(request): + return {"status": "ok", "data": request} + + client = TestClient(bedrock_agentcore) + headers = {"X-Amzn-Bedrock-AgentCore-Runtime-Request-Id": "custom-request-id"} + response = client.post("/invocations", json={"test": "data"}, headers=headers) + + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + def test_invocation_with_both_ids(self): + """Test with both request and session ID headers.""" + bedrock_agentcore = BedrockAgentCoreApp() + + @bedrock_agentcore.entrypoint + def handler(request, context): + return {"session_id": context.session_id, "data": request} + + client = TestClient(bedrock_agentcore) + headers = { + "X-Amzn-Bedrock-AgentCore-Runtime-Request-Id": "custom-request", + "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": "custom-session", + } + response = client.post("/invocations", json={"test": "data"}, headers=headers) + + assert response.status_code == 200 + assert response.json()["session_id"] == "custom-session" + + def test_headers_case_insensitive(self): + """Test that headers work with any case.""" + bedrock_agentcore = BedrockAgentCoreApp() + + @bedrock_agentcore.entrypoint + def handler(request, context): + return {"session_id": context.session_id} + + client = TestClient(bedrock_agentcore) + + # Test lowercase + headers = { + "x-amzn-bedrock-agentcore-request-id": "lower-request", + "x-amzn-bedrock-agentcore-runtime-session-id": "lower-session", + } + response = client.post("/invocations", json={}, headers=headers) + assert response.status_code == 200 + assert response.json()["session_id"] == "lower-session" + + # Test uppercase + headers = { + "X-AMZN-BEDROCK-AGENTCORE-REQUEST-ID": "UPPER-REQUEST", + "X-AMZN-BEDROCK-AGENTCORE-RUNTIME-SESSION-ID": "UPPER-SESSION", + } + response = client.post("/invocations", json={}, headers=headers) + assert response.status_code == 200 + assert response.json()["session_id"] == "UPPER-SESSION" + + def test_initialization_with_lifespan(self): + """Test that BedrockAgentCoreApp accepts lifespan parameter.""" + + @contextlib.asynccontextmanager + async def lifespan(app): + yield + + app = BedrockAgentCoreApp(lifespan=lifespan) + assert app is not None + + def test_lifespan_startup_and_shutdown(self): + """Test that lifespan startup and shutdown are called.""" + startup_called = False + shutdown_called = False + + @contextlib.asynccontextmanager + async def lifespan(app): + nonlocal startup_called, shutdown_called + startup_called = True + yield + shutdown_called = True + + app = BedrockAgentCoreApp(lifespan=lifespan) + + with TestClient(app): + assert startup_called is True + assert shutdown_called is True + + def test_initialization_without_lifespan(self): + """Test that BedrockAgentCoreApp still works without lifespan.""" + app = BedrockAgentCoreApp() # No lifespan parameter + + with TestClient(app) as client: + response = client.get("/ping") + assert response.status_code == 200 class TestConcurrentInvocations: @@ -1326,3 +1457,57 @@ def __str__(self): parsed = json.loads(result) assert parsed["error"] == "Serialization failed" assert parsed["original_type"] == "UnserializableObject" + + +class TestRequestContextFormatter: + """Test the RequestContextFormatter log formatting.""" + + def test_request_context_formatter_with_both_ids(self): + """Test formatter with both request and session IDs.""" + import logging + + from bedrock_agentcore.runtime.app import RequestContextFormatter + from bedrock_agentcore.runtime.context import BedrockAgentCoreContext + + formatter = RequestContextFormatter("%(request_id)s%(message)s") + + BedrockAgentCoreContext.set_request_context("req-123", "sess-456") + record = logging.LogRecord("test", logging.INFO, "", 1, "Test message", (), None) + formatted = formatter.format(record) + + assert "[rid:req-123] [sid:sess-456] Test message" == formatted + + def test_request_context_formatter_with_only_request_id(self): + """Test formatter with only request ID.""" + import logging + + from bedrock_agentcore.runtime.app import RequestContextFormatter + from bedrock_agentcore.runtime.context import BedrockAgentCoreContext + + formatter = RequestContextFormatter("%(request_id)s%(message)s") + + BedrockAgentCoreContext.set_request_context("req-789", None) + record = logging.LogRecord("test", logging.INFO, "", 1, "Test message", (), None) + formatted = formatter.format(record) + + assert "[rid:req-789] Test message" == formatted + assert "[sid:" not in formatted + + def test_request_context_formatter_with_no_ids(self): + """Test formatter with no IDs set.""" + import contextvars + import logging + + from bedrock_agentcore.runtime.app import RequestContextFormatter + + formatter = RequestContextFormatter("%(request_id)s%(message)s") + + # Run in fresh context to ensure no IDs are set + ctx = contextvars.Context() + + def format_in_new_context(): + record = logging.LogRecord("test", logging.INFO, "", 1, "Test message", (), None) + return formatter.format(record) + + formatted = ctx.run(format_in_new_context) + assert formatted == "Test message" diff --git a/tests/bedrock_agentcore/runtime/test_context.py b/tests/bedrock_agentcore/runtime/test_context.py index 280237e..b9e1589 100644 --- a/tests/bedrock_agentcore/runtime/test_context.py +++ b/tests/bedrock_agentcore/runtime/test_context.py @@ -28,3 +28,42 @@ def test_in_new_context(): result = ctx.run(test_in_new_context) assert result is None + + def test_set_and_get_request_context(self): + """Test setting and getting request and session IDs.""" + request_id = "test-request-123" + session_id = "test-session-456" + + BedrockAgentCoreContext.set_request_context(request_id, session_id) + + assert BedrockAgentCoreContext.get_request_id() == request_id + assert BedrockAgentCoreContext.get_session_id() == session_id + + def test_set_request_context_without_session(self): + """Test setting request context without session ID.""" + request_id = "test-request-789" + + BedrockAgentCoreContext.set_request_context(request_id, None) + + assert BedrockAgentCoreContext.get_request_id() == request_id + assert BedrockAgentCoreContext.get_session_id() is None + + def test_get_request_id_when_none_set(self): + """Test getting request ID when none is set.""" + ctx = contextvars.Context() + + def test_in_new_context(): + return BedrockAgentCoreContext.get_request_id() + + result = ctx.run(test_in_new_context) + assert result is None + + def test_get_session_id_when_none_set(self): + """Test getting session ID when none is set.""" + ctx = contextvars.Context() + + def test_in_new_context(): + return BedrockAgentCoreContext.get_session_id() + + result = ctx.run(test_in_new_context) + assert result is None diff --git a/tests_integ/agents/sample_agent.py b/tests_integ/agents/sample_agent.py index 8fe9580..7c52acc 100644 --- a/tests_integ/agents/sample_agent.py +++ b/tests_integ/agents/sample_agent.py @@ -7,10 +7,10 @@ @app.entrypoint async def invoke(payload): - print(payload) - print("Starting long invoke...") + app.logger.info("Received payload: %s", payload) + app.logger.info("Starting long invoke...") await asyncio.sleep(60) # 1 minute sleep - print("Finished long invoke") + app.logger.info("Finished long invoke") return {"message": "hello after 1 minute"} diff --git a/tests_integ/agents/streaming_agent.py b/tests_integ/agents/streaming_agent.py index 79dc249..33c3500 100644 --- a/tests_integ/agents/streaming_agent.py +++ b/tests_integ/agents/streaming_agent.py @@ -14,7 +14,7 @@ async def agent_invocation(payload): ) stream = agent.stream_async(user_message) async for event in stream: - print(event) + app.logger.info("Streaming event: %s", event) yield (event) diff --git a/tests_integ/async/async_status_example.py b/tests_integ/async/async_status_example.py index 233289e..18d39fa 100644 --- a/tests_integ/async/async_status_example.py +++ b/tests_integ/async/async_status_example.py @@ -22,17 +22,17 @@ @app.async_task async def background_data_processing(): """Simulate a long-running background task.""" - print("Starting background data processing...") + app.logger.info("Starting background data processing...") await asyncio.sleep(200) # Simulate work - print("Background data processing completed") + app.logger.info("Background data processing completed") @app.async_task async def database_cleanup(): """Simulate database cleanup task.""" - print("Starting database cleanup...") + app.logger.info("Starting database cleanup...") await asyncio.sleep(100) # Simulate work - print("Database cleanup completed") + app.logger.info("Database cleanup completed") # Main entrypoint @@ -83,21 +83,21 @@ async def handler(event): if __name__ == "__main__": # For local testing - print("Starting BedrockAgentCore app with async status functionality...") - print("Available endpoints:") - print(" GET /ping - Check current ping status") - print(" POST /invocations - Main handler") - print("") - print("Example debug action calls (debug=True is enabled):") - print(" {'_agent_core_app_action': 'ping_status'}") - print(" {'_agent_core_app_action': 'job_status'}") - print(" {'_agent_core_app_action': 'force_healthy'}") - print(" {'_agent_core_app_action': 'force_busy'}") - print(" {'_agent_core_app_action': 'clear_forced_status'}") - print("") - print("Example regular calls:") - print(" {'action': 'start_background_task'}") - print(" {'action': 'get_task_info'}") - print(" {'action': 'force_status', 'ping_status': 'HealthyBusy'}") + app.logger.info("Starting BedrockAgentCore app with async status functionality...") + app.logger.info("Available endpoints:") + app.logger.info(" GET /ping - Check current ping status") + app.logger.info(" POST /invocations - Main handler") + app.logger.info("") + app.logger.info("Example debug action calls (debug=True is enabled):") + app.logger.info(" {'_agent_core_app_action': 'ping_status'}") + app.logger.info(" {'_agent_core_app_action': 'job_status'}") + app.logger.info(" {'_agent_core_app_action': 'force_healthy'}") + app.logger.info(" {'_agent_core_app_action': 'force_busy'}") + app.logger.info(" {'_agent_core_app_action': 'clear_forced_status'}") + app.logger.info("") + app.logger.info("Example regular calls:") + app.logger.info(" {'action': 'start_background_task'}") + app.logger.info(" {'action': 'get_task_info'}") + app.logger.info(" {'action': 'force_status', 'ping_status': 'HealthyBusy'}") app.run() diff --git a/tests_integ/async/interactive_async_strands.py b/tests_integ/async/interactive_async_strands.py index 0c9f7f5..43025ae 100644 --- a/tests_integ/async/interactive_async_strands.py +++ b/tests_integ/async/interactive_async_strands.py @@ -168,9 +168,9 @@ def cleanup(self): time.sleep(300) if os.path.exists(self.result_file): os.remove(self.result_file) - print(f"[Processor {self.task_id}] Cleaned up result file") + logger.info("Processor %s: Cleaned up result file", self.task_id) except Exception as e: - print(f"[Processor {self.task_id}] Error during cleanup: {e}") + logger.error("Processor %s: Error during cleanup: %s", self.task_id, e) def run_data_processing(task_id: int, dataset_size: str, processing_type: str, duration_minutes: int, batch_size: int): @@ -192,7 +192,7 @@ def run_data_processing(task_id: int, dataset_size: str, processing_type: str, d # Break if we've exceeded our time limit (safety check) elapsed_minutes = (datetime.now() - processor.start_time).total_seconds() / 60 if elapsed_minutes > duration_minutes * 1.2: # 20% buffer - print(f"[Processor {task_id}] Time limit exceeded, completing processing") + logger.warning("Processor %s: Time limit exceeded, completing processing", task_id) break # Mark as completed @@ -216,7 +216,7 @@ def run_data_processing(task_id: int, dataset_size: str, processing_type: str, d finally: # Complete the async task success = app.complete_async_task(task_id) - print(f"[Processor {task_id}] Task completion: {'SUCCESS' if success else 'FAILED'}") + logger.info("Processor %s: Task completion: %s", task_id, "SUCCESS" if success else "FAILED") # Remove from active tasks active_tasks.pop(task_id, None) @@ -524,41 +524,41 @@ def agent_invocation(payload): if __name__ == "__main__": - print("🤖 Interactive Async Strands Demo") - print("=" * 60) - print("🎯 Long-Running Data Processing with Real-Time Progress") - print("📊 Features: 30-min processing, file-based progress, agent interactivity") - print("🔄 Task Tracking: Proper async task lifecycle management") - print() - print("🧪 Example Commands:") - print() - print("1️⃣ **Start Processing:**") - print("curl -X POST http://localhost:8080/invocations \\") - print(" -H 'Content-Type: application/json' \\") - print(' -d \'{"prompt": "Start processing a medium dataset for data analysis"}\'') - print() - print("2️⃣ **Check Progress (anytime during processing):**") - print("curl -X POST http://localhost:8080/invocations \\") - print(" -H 'Content-Type: application/json' \\") - print(' -d \'{"prompt": "What is the processing progress?"}\'') - print() - print("3️⃣ **Test Interactivity (while processing):**") - print("curl -X POST http://localhost:8080/invocations \\") - print(" -H 'Content-Type: application/json' \\") - print(' -d \'{"prompt": "Tell me about the weather while we wait"}\'') - print() - print("4️⃣ **Quick Test (2 minutes):**") - print("curl -X POST http://localhost:8080/invocations \\") - print(" -H 'Content-Type: application/json' \\") - print(' -d \'{"prompt": "Start a small dataset analysis for 2 minutes"}\'') - print() - print("📊 **Expected Flow:**") - print(" • Health: HEALTHY → BUSY → HEALTHY") - print(" • Files: Progress saved every second to JSON") - print(" • Agent: Always responsive and interactive") - print(" • Processing: Realistic multi-stage simulation") - print() - print("🚀 Starting server on http://localhost:8080") - print("=" * 60) + app.logger.info("🤖 Interactive Async Strands Demo") + app.logger.info("=" * 60) + app.logger.info("🎯 Long-Running Data Processing with Real-Time Progress") + app.logger.info("📊 Features: 30-min processing, file-based progress, agent interactivity") + app.logger.info("🔄 Task Tracking: Proper async task lifecycle management") + app.logger.info("") + app.logger.info("🧪 Example Commands:") + app.logger.info("") + app.logger.info("1️⃣ **Start Processing:**") + app.logger.info("curl -X POST http://localhost:8080/invocations \\") + app.logger.info(" -H 'Content-Type: application/json' \\") + app.logger.info(' -d \'{"prompt": "Start processing a medium dataset for data analysis"}\'') + app.logger.info("") + app.logger.info("2️⃣ **Check Progress (anytime during processing):**") + app.logger.info("curl -X POST http://localhost:8080/invocations \\") + app.logger.info(" -H 'Content-Type: application/json' \\") + app.logger.info(' -d \'{"prompt": "What is the processing progress?"}\'') + app.logger.info("") + app.logger.info("3️⃣ **Test Interactivity (while processing):**") + app.logger.info("curl -X POST http://localhost:8080/invocations \\") + app.logger.info(" -H 'Content-Type: application/json' \\") + app.logger.info(' -d \'{"prompt": "Tell me about the weather while we wait"}\'') + app.logger.info("") + app.logger.info("4️⃣ **Quick Test (2 minutes):**") + app.logger.info("curl -X POST http://localhost:8080/invocations \\") + app.logger.info(" -H 'Content-Type: application/json' \\") + app.logger.info(' -d \'{"prompt": "Start a small dataset analysis for 2 minutes"}\'') + app.logger.info("") + app.logger.info("📊 **Expected Flow:**") + app.logger.info(" • Health: HEALTHY → BUSY → HEALTHY") + app.logger.info(" • Files: Progress saved every second to JSON") + app.logger.info(" • Agent: Always responsive and interactive") + app.logger.info(" • Processing: Realistic multi-stage simulation") + app.logger.info("") + app.logger.info("🚀 Starting server on http://localhost:8080") + app.logger.info("=" * 60) app.run(port=8080)