## RAG with Bedrock Agents

RAG architecture have proven effective at leveraging knowledge bases to enhance foundation model outputs. However, for more complex queries that require reasoning over diverse information sources, a single monolithic RAG model can face limitations around relevance, latency, and coherence. Multi-agent architectures offer a powerful way to overcome these limitations by factoring RAG into specialized components. By dividing responsibilities like query understanding, retrieval, ranking, and language modeling across dedicated agents, each component can focus on its core capability using tailored models and data sources. This improves the overall relevance of retrieved information and generated responses.

In this lab, we are going to extend the Technical document assistant from previous Naive RAG lab and build an agent that can generate API flow diagram, create unit testing code, on top of original knowledge retrieval capabilities. Behind the scenes, the agent uses foundation model from Amazon Bedrock to orchestrate a set of tools and APIs as illustrated in the diagram below:

![Agent](../static/advance-agent-rag.png)


## Pre-req
You must run the [workshop_setup.ipynb](../lab00-setup/workshop_setup.ipynb) notebook in `lab00-setup` before starting this lab.

In [None]:
import warnings
warnings.warn("Warning: if you did not run lab00-setup, please go back and run the lab00 notebook") 

## Load the parameters

In [None]:
print("load the data parameters....\n")
# bucket and parameter stored from Initial setup lab01
%store -r bucket
%store -r prefix
%store -r data_dir
%store -r yml_dir
%store -r uml_dir

## check all 5 values are printed and do not fail
print(bucket)
print(prefix)
print(yml_dir)
print(uml_dir)
print(data_dir)

print("\nload the vector db parameters....\n")

# vector parameters stored from Initial setup lab02
%store -r vector_store_name
%store -r vector_collection_arn
%store -r vector_collection_id
%store -r vector_host
%store -r bedrock_kb_execution_role_arn

## check all 4 values are printed and do not fail
print(vector_store_name)
print(vector_collection_arn)
print(vector_collection_id)
print(vector_host)
print(bedrock_kb_execution_role_arn)

print("\nload lambda parameters....\n")

%store -r lambda_arn
%store -r lambda_function_name

print(lambda_arn)
print(lambda_function_name)

## Setup

In [None]:
import boto3
from botocore.config import Config
import time
import random
import pprint as pp
import uuid
import json
from retrying import retry
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# auth for opensearch
boto3_config = Config(
        connect_timeout=1, read_timeout=300,
        retries={'max_attempts': 1})

boto3_session = boto3.Session()
region_name = boto3_session.region_name
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()["Account"]
credentials = boto3_session.get_credentials()

# opensearch service
service = 'aoss'
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)

suffix = random.randrange(200, 900)

bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)

## Create a vector store - OpenSearch Serverless index

For this lab, we will use *Amazon OpenSearch serverless.*

Amazon OpenSearch Serverless is a serverless option in Amazon OpenSearch Service. As a developer, you can use OpenSearch Serverless to run petabyte-scale workloads without configuring, managing, and scaling OpenSearch clusters. You get the same interactive millisecond response times as OpenSearch Service with the simplicity of a serverless environment. Pay only for what you use by automatically scaling resources to provide the right amount of capacity for your application—without impacting data ingestion.

In [None]:
aoss_client = boto3_session.client('opensearchserverless')

### Create the schema for vector index

In [None]:
index_name = f"bedrock-sample-index-{suffix}"
body_json = {
   "settings": {
      "index.knn": "true"
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",
            "dimension": 1024,
            "method": {
                "name": "hnsw",
                "space_type": "innerproduct",
                "engine": "faiss",
                "parameters": {
                  "ef_construction": 256,
                  "m": 48
                }
             }
         },
         "text": {
            "type": "text"
         },
         "text-metadata": {
            "type": "text"         
         }
      }
   }
}
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': vector_host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

In [None]:
# Create index
response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
print('\nCreating index:')
print(response)

## Create Knowledge Base
Steps:
- initialize Open search serverless configuration which will include collection ARN, index name, vector field, text field and metadata field.
- initialize chunking strategy, based on which KB will split the documents into pieces of size equal to the chunk size mentioned in the `chunkingStrategyConfiguration`.
- initialize the s3 configuration, which will be used to create the data source object later.
- initialize the Titan embeddings model ARN, as this will be used to create the embeddings for each of the text chunks.

In [None]:
opensearchServerlessConfiguration = {
            "collectionArn": vector_collection_arn,
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

chunkingStrategyConfiguration = {
    "chunkingStrategy": "NONE",
}

s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket}",
    "inclusionPrefixes":[f"{prefix}/{yml_dir.replace(data_dir+'/', '')}/"] # you can use this if you want to create a KB using data within s3 prefixes.
}

embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v2:0"

kb_name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Swagger OpenAPI knowledge base."

Provide the above configurations as input to the `create_knowledge_base` method, which will create the Knowledge base.

In [None]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = kb_name,
        description = description,
        roleArn = bedrock_kb_execution_role_arn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]

In [None]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")

Next we need to create a data source, which will be associated with the knowledge base created above. Once the data source is ready, we can then start to ingest the documents.

In [None]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

In [None]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = kb_name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
# # It can take up to a minute for data access rules to be enforced
time.sleep(20)
pp.pprint(ds)

### Start ingestion job
Once the KB and data source is created, we can start the ingestion job.
During the ingestion job, KB will fetch the documents in the data source, pre-process it to extract text, chunk it based on the chunking size provided, create embeddings of each chunk and then write it to the vector database, in this case OSS.

In [None]:
# Start an ingestion job
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

In [None]:
job = start_job_response["ingestionJob"]
pp.pprint(job)

In [None]:
# Get job 
while(job['status']!='COMPLETE' ):
  get_job_response = bedrock_agent_client.get_ingestion_job(
      knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
  )
  job = get_job_response["ingestionJob"]
pp.pprint(job)
time.sleep(80)

In [None]:
kb_id = kb["knowledgeBaseId"]
%store kb_id
pp.pprint(kb_id)

## Test the knowledge base
### Using RetrieveAndGenerate API
Behind the scenes, RetrieveAndGenerate API converts queries into embeddings, searches the knowledge base, and then augments the foundation model prompt with the search results as context information and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manage short-term memory of the conversation to provide more contextual results.

The output of the RetrieveAndGenerate API includes the generated response, source attribution as well as the retrieved text chunks.

In [None]:
# try out KB using RetrieveAndGenerate API
bedrock_agent_runtime_client = boto3_session.client("bedrock-agent-runtime", 
                                                    config=boto3_config)
model_id = "anthropic.claude-3-sonnet-20240229-v1:0" 
model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id}'

In [None]:
from IPython.display import Markdown, display

query = "How do I add a new pet to petstore? Can you generate a test code in python?"
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        'text': query
    },
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': kb_id,
            'modelArn': model_arn
        }
    },
)

generated_text = response['output']['text']

display(Markdown(generated_text))

## Create the Agent
Once the needed IAM role is created, we can use the bedrock agent client to create a new agent. To do so we use the create_agent api from boto3. It requires an agent name, underline foundation model and instruction. You can also provide an agent description. Note that the agent created is not yet prepared. We will focus on preparing the agent and then using it to invoke actions and use other APIs

In [None]:
agent_name = f'swagger-api-agent-{suffix}'

agent_instruction = """
You are an agent designed to support users with coding questions related to Swagger APIs. You have access to the Swagger
documentation in a Knowledge Base and you can answer questions from this document or use document as referecne to generate
code or uml flow diagrams. Only answer questions based on the documentation and reply with "There is no information about your 
question in the Documentation at the moment, sorry! Do you want to ask another question?"  If the answer to the question 
is not available in the documentation
"""

agent_action_group_description = """
Actions for generating test code and uml diagrams
"""

In [None]:
response = bedrock_agent_client.create_agent(
    agentName=agent_name,
    agentResourceRoleArn=bedrock_kb_execution_role_arn,
    description="This Agent supports Swagger API Developers to answer question, generate code, or create uml flow diagrams",
    idleSessionTTLInSeconds=1800,
    foundationModel=model_id,
    instruction=agent_instruction,
)

agent_id = response['agent']['agentId']
agent_id

## Create the Agent Action Group

We will now create an agent action group that uses the lambda function created before. The create_agent_action_group function provides this functionality. We will use DRAFT as the agent version since we haven't yet created an agent version or alias. To inform the agent about the action group functionalities, we will provide an action group description containing the functionalities of the action group.

In this example, we will provide the Action Group functionality using a functionSchema.

To define the functions using a function schema, you need to provide the name, description and parameters for each function.

In [None]:
agent_functions = [
    {
        'name': 'get_uml_diagram',
        'description': 'Generate a UML flow diagram referencing the OpenAPI API specification. the function will return summary of the diagram and S3 location of the image.',
        'parameters': {
            "yml_body": {
                "description": "openapi standard yml file of the swagger api",
                "required": True,
                "type": "string"
            }
        }
    },
    {
        'name': 'get_unit_test_code',
        'description': 'Generate functional testing code referencing the OpenAPI API specification. the function returns the code snippet.',
        'parameters': {
            "yml_body": {
                "description": "openapi standard yml file of the swagger api",
                "required": True,
                "type": "string"
            },
            "user_query": {
                "description": "question from user or code user wants to generate",
                "required": True,
                "type": "string"
            }
        }
    },
]

We now use the function schema to create the agent action group using the create_agent_action_group API

In [None]:
# Pause to make sure agent is created
time.sleep(40)

# Now, we can configure and create an action group here:
agent_action_group_response = bedrock_agent_client.create_agent_action_group(
    agentId=agent_id,
    agentVersion='DRAFT',
    actionGroupExecutor={
        'lambda': lambda_arn
    },
    actionGroupName="CodeAndDiagramActionGroup",
    functionSchema={
        'functions': agent_functions
    },
    description=agent_action_group_description
)

## Allow the Agent to invoke the Action Group Lambda
Before using the action group, we need to allow the agent to invoke the lambda function associated with the action group. This is done via resource-based policy. Let's add the resource-based policy to the lambda function created

In [None]:
# Create allow to invoke permission on lambda
lambda_client = boto3.client('lambda')
response = lambda_client.add_permission(
    FunctionName=lambda_function_name,
    StatementId='allow_bedrock',
    Action='lambda:InvokeFunction',
    Principal='bedrock.amazonaws.com',
    SourceArn=f"arn:aws:bedrock:{region_name}:{account_id}:agent/{agent_id}",
)

## Associate the Knowledge Base to the agent
Now we have created the Agent we can go ahead and associate the Knowledge Base we created earlier.

In [None]:
agent_kb_description = bedrock_agent_client.associate_agent_knowledge_base(
    agentId=agent_id,
    agentVersion='DRAFT',
    description=f'Use the information in the {kb_name} knowledge base to retrieve OpenAPI Specification in YML format',
    knowledgeBaseId=kb_id,
    knowledgeBaseState='ENABLED'
)

## Prepare the Agent and create an alias
Let's create a DRAFT version of the agent that can be used for internal testing.

In [None]:
agent_prepare = bedrock_agent_client.prepare_agent(agentId=agent_id)
agent_prepare
time.sleep(30)

In [None]:
# Pause to make sure agent is prepared
agent_alias_name = f"swagger-docs-action-alias-{suffix}5"

agent_alias = bedrock_agent_client.create_agent_alias(
    agentId=agent_id,
    agentAliasName=agent_alias_name
)
agent_alias_id = agent_alias["agentAlias"]["agentAliasId"]

# Pause to make sure agent alias is ready
time.sleep(60)

## Test the Agent
Now that we've created the agent, let's use the bedrock-agent-runtime client to invoke this agent and perform some tasks. You can invoke your agent with the invoke_agent API

In [None]:
def invokeAgent(query, session_id, enable_trace=False, session_state=dict()):
    end_session:bool = False
    
    # invoke the agent API
    agentResponse = bedrock_agent_runtime_client.invoke_agent(
        inputText=query,
        agentId=agent_id,
        agentAliasId=agent_alias_id, 
        sessionId=session_id,
        enableTrace=enable_trace, 
        endSession= end_session,
        sessionState=session_state
    )
    
    if enable_trace:
        pp.pprint(agentResponse)
    
    event_stream = agentResponse['completion']
    try:
        for event in event_stream:        
            if 'chunk' in event:
                data = event['chunk']['bytes']
                if enable_trace:
                    pp.pprint(f"Final answer ->\n{data.decode('utf8')}")
                agent_answer = data.decode('utf8')
                end_event_received = True
                return agent_answer
                # End event indicates that the request finished successfully
            elif 'trace' in event:
                if enable_trace:
                    pp.pprint(json.dumps(event['trace'], indent=2))
            else:
                raise Exception("unexpected event.", event)
    except Exception as e:
        raise Exception("unexpected event.", e)

Invoke Agent to query Knowledge Base
Let's now use our support `invokeAgent` function to query our Knowledge Base with the Agent

## Generating UML Sequential Diagrams

We will generate PlantUML Code for our API and then use PlantWeb to generate the UML Sequential diagram.

In [None]:
%%time
import uuid
session_id:str = str(uuid.uuid1())
query = "can you generate UML diagram for the bookstore swagger api?" #"How do I add a new pet using the petstore api? Can you generate a test code in python?" #
response = invokeAgent(query, session_id, enable_trace=False)
display(Markdown(response))

## Generating Python Code for a sample API. 

In [None]:
%%time
import uuid
session_id:str = str(uuid.uuid1())
query = "How do I add a new pet using the petstore api? Can you generate a test code in python?"
response = invokeAgent(query, session_id, enable_trace=False)
display(Markdown(response))