I hope support Agents for Amazon Bedrock #317
Comments
You want to combine both the Bedrock Agent handler and the actual API in one Lambda function. That will be difficult to do with this tool. I would suggest having two Lambda functions: 1) one that implements the Bedrock Agent handler, and 2) FastAPI with the web adapter to provide the actual API service. The Bedrock Agent function would invoke the second API server using the request data from Bedrock. Hope this makes sense. |
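A minimal sketch of the first function in this two-Lambda setup, assuming the second function's API is reachable over HTTP(S). `API_BASE_URL`, `build_api_request`, and `build_agent_response` are hypothetical names chosen for illustration, and every agent parameter is assumed to map to a query parameter.

```python
import json
import urllib.error
import urllib.parse
import urllib.request

# Hypothetical base URL of the second (FastAPI) function; replace with your own.
API_BASE_URL = "https://example.com"


def build_api_request(event: dict) -> urllib.request.Request:
    """Translate a Bedrock Agent event into an HTTP request for the API."""
    # Assumption: all agent parameters are sent as query parameters.
    query = {p["name"]: p["value"] for p in event.get("parameters", [])}
    url = API_BASE_URL + event["apiPath"]
    if query:
        url += "?" + urllib.parse.urlencode(query)

    # Use the first content type in the request body, if there is one.
    content = event.get("requestBody", {}).get("content", {})
    body = None
    headers = {}
    for content_type, value in content.items():
        props = {p["name"]: p["value"] for p in value.get("properties", [])}
        body = json.dumps(props).encode("utf-8")
        headers["Content-Type"] = content_type
        break

    return urllib.request.Request(
        url, data=body, headers=headers, method=event["httpMethod"]
    )


def build_agent_response(event: dict, status_code: int, body_text: str,
                         content_type: str = "application/json") -> dict:
    """Wrap the API result in the response shape the agent expects."""
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event["actionGroup"],
            "apiPath": event["apiPath"],
            "httpMethod": event["httpMethod"],
            "httpStatusCode": status_code,
            "responseBody": {content_type: {"body": body_text}},
        },
    }


def handler(event, context):
    """Lambda entry point: forward the agent call and wrap the result."""
    request = build_api_request(event)
    try:
        with urllib.request.urlopen(request) as resp:
            status, text = resp.status, resp.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # Non-2xx responses are still reported back to the agent.
        status, text = err.code, err.read().decode("utf-8")
    return build_agent_response(event, status, text)
```

Keeping the translation in two pure functions makes it possible to unit test the mapping without calling the API.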
@moritalous another option is to use Powertools for AWS Lambda. Watch this issue for the progress. |
Yeah, separate Lambdas is the way to go with it. If you are worried about having to decouple the codebase for each Lambda, you may use It would be nice if some form of bundler existed for Python, but I have not seen any. |
Thank you for your reply. Upon exploring alternative approaches, I found that Mangum allows the creation of custom adapters, and it suited my needs. I have detailed the process in a blog post, so if you're interested, please take a look. (I apologize for it being in Japanese) Qiita - Running FastAPI on AWS Lambda with Mangum |
Cool. Also FastAPI by default has an API |
@moritalous this is an interesting approach. We might support it if #216 got implemented. |
I am very excited about the release of the pass-through feature. I immediately tried using it with Bedrock Agent, but it did not work well. Could you please advise if I am using it incorrectly? I created an API using FastAPI.
from typing import Any

from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI()


@app.post("/events", include_in_schema=False)
def events(request: dict[str, Any] | None = None):
    print("/events")
    print(request)
    return {}


class list_regions_request(BaseModel):
    opt_status: str = Field(
        description="A list of Region statuses (Enabling, Enabled, Disabling, Disabled, Enabled_by_default) to use to filter the list of Regions for a given account."
    )


@app.post(
    "/list_regions",
    description="""Lists all the Regions for a given account and their respective opt-in statuses. """,
)
def list_regions(request: list_regions_request):
    print("call list_regions")
    print(request)
    import boto3

    client = boto3.client("account")
    return client.list_regions(RegionOptStatusContains=request.opt_status)


@app.post("/", include_in_schema=False)
def root(request: dict[str, Any] | None = None):
    print("call root")
    print(request)
    pass


@app.get("/check", include_in_schema=False)
def check():
    print("call check")
    return {}

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  fastapi response streaming

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 60

Resources:
  FastAPIFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: agents/
      Handler: run.sh
      Runtime: python3.12
      MemorySize: 256
      Environment:
        Variables:
          AWS_LAMBDA_EXEC_WRAPPER: /opt/bootstrap
          AWS_LWA_PORT: "8000"
          AWS_LWA_READINESS_CHECK_PATH: /check
          AWS_LWA_PASS_THROUGH_PATH: /events
      Layers:
        - !Sub arn:aws:lambda:${AWS::Region}:753240598075:layer:LambdaAdapterLayerX86:19

Result of sam local invoke. (I use this event.json.)
I got similar results when I deployed it to AWS and called it from Bedrock Agent.
The request is not routed to /events; it appears to be routed to / instead. Is there a setting to enable or disable the pass-through function? Thank you for reading to the end. I would appreciate any advice you could give me. |
Thanks for reporting this. I can reproduce the issue. I suspect the lambda-http crate deserializes the Bedrock Agent event into some other event type instead of passing it through. I'm working on this. |
@moritalous You can try this test build before the changes make it into a new release. It's on Dockerhub: |
Thank you for your response. I would like to report that it worked as expected!!
from enum import Enum
import json
import os
from typing import Any

import requests
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field

app = FastAPI()

PORT = os.environ.get("AWS_LWA_PORT", "8080")


@app.post("/events", include_in_schema=False)
def events(request: dict[str, Any] | None = None):
    print(request)
    if request is None:
        return {}

    actionGroup = request["actionGroup"]
    httpMethod = request["httpMethod"]
    apiPath = request["apiPath"]

    # Collect the agent-supplied parameters as query parameters
    params = {}
    parameters = request.get("parameters", [])
    for item in parameters:
        params[item["name"]] = item["value"]

    # Collect the request body properties under the first content type
    data = {}
    requestBody: dict = request.get("requestBody", {})
    content: dict = requestBody.get("content", {})
    # Default value so content_type is defined even when there is no request body
    content_type = next(iter(content), "application/json")
    content_value = content.get(content_type, {})
    properties = content_value.get("properties", [])
    for item in properties:
        data[item["name"]] = item["value"]

    # Forward the call to the matching FastAPI route on the same server
    api_response = requests.request(
        httpMethod,
        f"http://localhost:{PORT}{apiPath}",
        headers={"Content-Type": content_type},
        params=params,
        data=json.dumps(data) if len(data) > 0 else None,
    )

    # Wrap the API result in the response format expected by Bedrock Agent
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": actionGroup,
            "apiPath": apiPath,
            "httpMethod": httpMethod,
            "httpStatusCode": api_response.status_code,
            "responseBody": {content_type: {"body": api_response.text}},
        },
    }


class status(str, Enum):
    ENABLED = "ENABLED"
    ENABLING = "ENABLING"
    DISABLING = "DISABLING"
    DISABLED = "DISABLED"
    ENABLED_BY_DEFAULT = "ENABLED_BY_DEFAULT"


class list_regions_request(BaseModel):
    opt_status: status = Field(
        description="A list of Region statuses (ENABLING, ENABLED, DISABLING, DISABLED, ENABLED_BY_DEFAULT) to use to filter the list of Regions for a given account."
    )


@app.post(
    "/list_regions",
    description="""Lists all the Regions for a given account and their respective opt-in statuses. """,
)
def list_regions(request: list_regions_request):
    import boto3

    client = boto3.client("account")
    return client.list_regions(RegionOptStatusContains=[request.opt_status])


@app.get("/check", include_in_schema=False)
def check():
    print("call check")
    return {}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(PORT))

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  fastapi response streaming

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 60

Resources:
  FastAPIFunction:
    Type: AWS::Serverless::Function
    Properties:
      PackageType: Image
      MemorySize: 256
      Environment:
        Variables:
          AWS_LAMBDA_EXEC_WRAPPER: /opt/bootstrap
          AWS_LWA_PORT: "8000"
          AWS_LWA_READINESS_CHECK_PATH: /check
          AWS_LWA_PASS_THROUGH_PATH: /events
      Policies: arn:aws:iam::aws:policy/AWSAccountManagementReadOnlyAccess
      # Layers:
      #   - !Sub arn:aws:lambda:${AWS::Region}:753240598075:layer:LambdaAdapterLayerX86:19
    Metadata:
      Dockerfile: Dockerfile
      DockerContext: ./agents
      DockerTag: v1

I'll try to blog about it once it's released! Thank you very much for adding this feature!!! |
@moritalous Great to see it works! I would suggest creating a class-based custom middleware that intercepts requests to the '/events' route and does the transformation there. This avoids the second request to FastAPI. You could make that middleware generic and publish it as a package on PyPI, so that everyone can use it. :) |
@moritalous |
Here is an example of the middleware. It needs some more work to handle parameters and body.

from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import JSONResponse
from fastapi import Response
import json


class BedrockAgentMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # pass through any non-events requests
        if request.url.path != "/events":
            return await call_next(request)

        # convert the request body to json object
        req_body = await request.body()
        req_body = json.loads(req_body)
        print(req_body)

        # rewrite the target route using the path and method from the agent event
        request.scope["path"] = req_body["apiPath"]
        request.scope["method"] = req_body["httpMethod"]

        # Pass the request to be processed by the rest of the application
        response = await call_next(request)

        if isinstance(response, Response) and hasattr(response, 'body'):
            res_body = response.body
        elif hasattr(response, 'body_iterator'):
            res_body = b''
            async for chunk in response.body_iterator:
                res_body += chunk
            response.body_iterator = self.recreate_iterator(res_body)
        else:
            # fall back to an empty body so the decode below cannot fail
            res_body = b''

        # Now you have the body, you can do whatever you want with it
        print(res_body)

        res_status_code = response.status_code
        res_content_type = response.headers["content-type"]

        response = JSONResponse(content = {
            "messageVersion": "1.0",
            "response": {
                "actionGroup": req_body["actionGroup"],
                "apiPath": req_body["apiPath"],
                "httpMethod": req_body["httpMethod"],
                "httpStatusCode": res_status_code,
                "responseBody": {
                    res_content_type: {
                        "body": res_body.decode('utf-8')
                    }
                },
                "sessionAttributes": req_body["sessionAttributes"],
                "promptSessionAttributes": req_body["promptSessionAttributes"]
            }
        })
        print(response)
        return response

    @staticmethod
    async def recreate_iterator(body):
        yield body

The main.py is quite simple.

from fastapi import FastAPI
from pydantic import BaseModel
from bedrock_agent.middleware import BedrockAgentMiddleware
import boto3

app = FastAPI(
    description="This agent allows you to query the number of S3 buckets in your AWS account.",
)
app.openapi_version = "3.0.2"
app.add_middleware(BedrockAgentMiddleware)

s3 = boto3.resource('s3')


class S3BucketCountResponse(BaseModel):
    count: int


@app.get("/s3_bucket_count")
async def get_s3_bucket_count() -> S3BucketCountResponse:
    """
    This method returns the number of S3 buckets in your AWS account.

    Return:
        S3BucketCountResponse: A json object containing the number of S3 buckets in your AWS account.
    """
    count = len(list(s3.buckets.all()))
    return S3BucketCountResponse(count=count)

Here is the Dockerfile.

FROM public.ecr.aws/docker/library/python:3.12-slim
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.1 /lambda-adapter /opt/extensions/lambda-adapter
ENV PORT=8000 AWS_LWA_READINESS_CHECK_PROTOCOL=tcp
WORKDIR /var/task
COPY requirements.txt ./
RUN python -m pip install -r requirements.txt
COPY . .
CMD exec uvicorn --port=$PORT main:app |
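The middleware comment above notes that parameters and body still need handling. One way this could be approached is sketched below as a standalone helper; `bedrock_event_to_http_parts` is a hypothetical name, and the sketch assumes every entry in `parameters` should become a query parameter and that only the first content type in `requestBody` is used.

```python
import json
from typing import Optional, Tuple
from urllib.parse import urlencode


def bedrock_event_to_http_parts(event: dict) -> Tuple[str, bytes, Optional[str]]:
    """Return (query_string, body_bytes, content_type) for a Bedrock Agent event."""
    # Assumption: all parameters are treated as query parameters.
    query_string = urlencode(
        {p["name"]: p["value"] for p in event.get("parameters", [])}
    )

    # Take the first content type listed in the request body, if any.
    content = event.get("requestBody", {}).get("content", {})
    content_type = next(iter(content), None)
    if content_type is None:
        return query_string, b"", None

    # Flatten the name/value property list into a JSON object.
    properties = content[content_type].get("properties", [])
    body = json.dumps({p["name"]: p["value"] for p in properties}).encode("utf-8")
    return query_string, body, content_type
```

In an ASGI scope the query string is stored as bytes under the `query_string` key, so the first element would need `.encode()` before being assigned there. Replacing the request body inside a Starlette middleware takes additional work that this sketch does not cover.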
I created an example and opened a PR. Please review it. Is it okay to turn the middleware code into a library and publish it as a package on PyPI? |
Thanks for the PR. I left a few comments there. Yes, please go ahead and package the middleware into a library and publish it to PyPI. The middleware needs a few improvements to be a generic library.
|
The middleware package has been released! https://pypi.org/project/lwa-fastapi-middleware-bedrock-agent/ Thank you very much for making Lambda Web Adapter compatible with Bedrock Agent. |
Awesome! |
I planned to create Agents for Amazon Bedrock using FastAPI and aws-lambda-web-adapter.
Agents for Amazon Bedrock requires an OpenAPI schema to be prepared.
The development experience would be improved if FastAPI could be used in combination with aws-lambda-web-adapter to develop Agents for Amazon Bedrock.
However, the Lambda input event from Amazon Bedrock differs from the event format that API Gateway sends.
Lambda input event from Amazon Bedrock
Event format from API Gateway
It would be great if this were supported.
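To illustrate the difference described above, here is a rough sketch of the two event shapes as Python dictionaries. Both are abbreviated and all values are placeholders; `is_bedrock_agent_event` is a hypothetical helper, not part of the adapter.

```python
# Abbreviated Bedrock Agent action-group event (placeholder values).
BEDROCK_AGENT_EVENT = {
    "messageVersion": "1.0",
    "agent": {"name": "my-agent", "id": "AGENTID", "alias": "ALIAS", "version": "1"},
    "inputText": "List the enabled regions",
    "sessionId": "session-1",
    "actionGroup": "my-action-group",
    "apiPath": "/list_regions",
    "httpMethod": "POST",
    "parameters": [],
    "requestBody": {
        "content": {
            "application/json": {
                "properties": [
                    {"name": "opt_status", "type": "string", "value": "ENABLED"}
                ]
            }
        }
    },
    "sessionAttributes": {},
    "promptSessionAttributes": {},
}

# Abbreviated API Gateway HTTP API event, payload format 2.0 (placeholder values).
API_GATEWAY_V2_EVENT = {
    "version": "2.0",
    "routeKey": "POST /list_regions",
    "rawPath": "/list_regions",
    "rawQueryString": "",
    "headers": {"content-type": "application/json"},
    "requestContext": {"http": {"method": "POST", "path": "/list_regions"}},
    "body": '{"opt_status": "ENABLED"}',
    "isBase64Encoded": False,
}


def is_bedrock_agent_event(event: dict) -> bool:
    """Heuristic check: Bedrock Agent events carry these top-level keys."""
    return all(key in event for key in ("messageVersion", "actionGroup", "apiPath"))
```

The Bedrock event carries the HTTP method, path, and body as structured fields rather than as a raw HTTP request, which is why an adapter that expects API Gateway events cannot route it without a translation step.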