# Bedrock Knowledge Base via Your Own OpenSearch Database
If you have any questions, please feel free to contact Hao Huang (tonyhh@amazon.com, GAIIC) or Dong Xiaoqun (xiaoqunn@amazon.com, GenAI SSA).

- Step 0. Prerequisites
- Step 1. Authentication
- Step 2. Build Amazon OpenSearch Serverless Vector Database
- Step 3. Insert Knowledge to AOS
- Step 4. Create Bedrock Knowledge Base
    * Step 4.1. Create Bedrock Knowledge Base Role
    * Step 4.2. Update collection policy
- Step 5. Test knowledge bases
    * Step 5.1. Test knowledge bases retrieve
    * Step 5.2. Test knowledge bases retrieve and generate
    * Step 5.3. Associate to an agent and test
- Step 6. Delete resources

### Step 0. Prerequisites

In [None]:
!pip install opensearch-py
!pip install requests-aws4auth
!pip install boto3

### Step 1. Authentication

In [None]:
import os
import json
import boto3
import botocore
import logging
import pprint
import time

# from uuid import uuid4
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from utils import (
    createEncryptionPolicy,
    createNetworkPolicy,
    createCollection,
    waitForCollectionCreation,
    short_uuid,
    createAccessPolicy,
    updateAccessPolicy,
    progress_bar
)

# Configure notebook-wide logging: each record carries a timestamp, the process
# id, the source file/line and the level.
logging.basicConfig(
    format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    level=logging.INFO,
    encoding="utf-8"
)
logger = logging.getLogger(__name__)
account_id = "" # input your account id
role_arn = "" # Can be found in the IAM console, e.g. arn:aws:iam::{account_id}:user/demo-test
service = 'aoss'
region = 'us-east-1'
credentials = boto3.Session().get_credentials()
aws_access_key_id = credentials.access_key # you can modify to your access_key 
aws_secret_access_key = credentials.secret_key  # you can modify to your secret_key 
# Session token of the resolved credentials. It is None for long-lived IAM user
# keys and set for temporary credentials (STS / assumed role / SSO).
# Set this to None if you hard-code long-lived keys above.
aws_session_token = credentials.token
# SigV4 signer used for requests to the OpenSearch Serverless endpoint.
# The session token must be forwarded, otherwise requests signed with
# temporary credentials are rejected.
awsauth = AWS4Auth(
    aws_access_key_id,
    aws_secret_access_key,
    region,
    service,
    session_token=aws_session_token,
)
# NOTE: `id` shadows the builtin, but the name is used by the cells that follow
# as the short unique suffix for resource names, so it is kept as is.
id = short_uuid()


### Step 2. Build Amazon OpenSearch Serverless Vector Database


In [None]:
# Create the OpenSearch Serverless control-plane client and derive the index and
# collection names from the short unique suffix `id`.
# NOTE(review): the original note said the name must start with
# "bedrock-knowledge-" (policies set for "bedrock-knowledge-demo-"), while the
# names below use the prefix "bedrock-kb-demo-". Confirm which prefix the policy
# helpers in utils expect, or adapt those helpers to your collection name.
client = boto3.client('opensearchserverless', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
Index_Name = f"bedrock-kb-demo-{id}" 
Collection_Name = f"bedrock-kb-demo-{id}"  

# Create an encryption policy; the returned name is needed again at teardown.
encryption_policy_name = createEncryptionPolicy(client, Collection_Name, id)

# Create a network policy; the returned name is needed again at teardown.
network_polic_name = createNetworkPolicy(client, Collection_Name, id)
# presumably pauses for about 5 seconds while showing progress — confirm in utils
progress_bar(5)

In [None]:

# Create the collection (data) access policy.
# The bare triple-quoted string below is only a note for the reader; it has no
# effect at runtime.
"""
Notice: need add your own role/user arn in here, e.g.
"Principal":[ <br>
    "arn:aws:iam::{your-account-id}:user/XXX",  # credential role arn
    "arn:aws:iam::{your-account-id}:role/Admin"  # console
]
"""

# Both returned values are reused later: the policy name when the policy is
# updated and deleted, the policy version when it is updated.
access_policy_name, policy_version = createAccessPolicy(client, Index_Name, Collection_Name, role_arn, account_id, id)

In [None]:
# Create the collection
createCollection(client, Collection_Name)
# Wait until the collection has been created completely. The helper returns the
# collection endpoint host, its ARN and its id, all of which later cells use.
final_host, collectionarn, collection_id = waitForCollectionCreation(client, Collection_Name)
# presumably pauses briefly while showing progress — confirm in utils
progress_bar(2)

In [None]:
# Create the vector index in the collection

# Vector size of the knn_vector field.
# presumably matches the output size of the embedding model used when inserting
# documents (amazon.titan-embed-text-v1) — confirm before changing either one
dimensions = 1536

# NOTE: `client` is rebound here from the boto3 control-plane client to an
# OpenSearch data-plane client that signs requests with `awsauth`.
client = OpenSearch(
        hosts=[{'host': final_host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=300
    )

# The field names used in the mapping (vector, text chunk, metadata) are the
# same ones passed as `fieldMapping` when the knowledge base is created.
client.indices.create(
    Index_Name,
    body={
            "settings":{
                "index":{
                "number_of_shards" : 1,
                "number_of_replicas" : 0,
                "knn": "true",
                "knn.algo_param.ef_search": 32
                }
            },
            "mappings":{
                "properties": {
                    # Embedding vector searched with HNSW / cosine similarity.
                    "bedrock-knowledge-base-default-vector": {
                        "type": "knn_vector", 
                        "dimension": dimensions,
                        "method": {
                            "engine": "nmslib",
                            "space_type": "cosinesimil",
                            "name": "hnsw",
                            "parameters": {}
                        }
                    },
                    # Metadata is stored but not indexed for search.
                    "AMAZON_BEDROCK_METADATA": {
                        "type": "text",
                        "index": False
                    },
                    # The text chunk associated with the vector.
                    "AMAZON_BEDROCK_TEXT_CHUNK": {
                        "type": "text"
                    },
                    "id": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }

                }
            }
        }
    }
)

### Step 3. Insert Knowledge to AOS

In [None]:
# Upload the knowledge base file to S3

# The bucket name carries the short unique suffix `id`.
bucket_name = f"invoice-agent-demo-kb-{id}"
s3_client = boto3.client("s3", region)


file_name = "piaozone2.faq"
file_path = "../conf/"
# NOTE(review): create_bucket is called without a CreateBucketConfiguration;
# this is expected to work for us-east-1 only — confirm before changing `region`.
response = s3_client.create_bucket(Bucket=bucket_name)
# The value bound to `upload_file` is not used anywhere else in this notebook.
upload_file = s3_client.upload_file(os.path.join(file_path, file_name), bucket_name, file_name)
# s3_path is stored as the "source" in each document's metadata; s3_arn is the
# resource of the S3 read policy created for the knowledge base role.
s3_path = f"s3://{bucket_name}/{file_name}"
s3_arn = f"arn:aws:s3:::{bucket_name}/{file_name}"
print(s3_arn, s3_path)

In [None]:
def create_vector_embedding_with_bedrock(text, s3_path, docs, embedding_modelId='amazon.titan-embed-text-v1'):
    """Embed `text` with a Bedrock model and build one index document.

    Args:
        text: The text sent to the embedding model as ``inputText``.
        s3_path: Source location recorded in the document metadata.
        docs: The text chunk stored next to the vector.
        embedding_modelId: Bedrock model id used to compute the embedding.

    Returns:
        A dict holding the metadata JSON string, the text chunk, the embedding
        taken from the model response, and a constant ``"id"`` of ``"0"``.
    """
    # A new runtime client is created on every call, using the notebook-level
    # region and access keys.
    runtime = boto3.client(
        service_name='bedrock-runtime',
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    json_mime = 'application/json'
    raw_response = runtime.invoke_model(
        body=json.dumps({"inputText": text}),
        modelId=embedding_modelId,
        accept=json_mime,
        contentType=json_mime,
    )
    payload = json.loads(raw_response.get('body').read())
    vector = payload['embedding']
    return {
        "AMAZON_BEDROCK_METADATA": '{{"source":"{}"}}'.format(s3_path),
        "AMAZON_BEDROCK_TEXT_CHUNK": docs,
        "bedrock-knowledge-base-default-vector": vector,
        "id": "0",
    }


# Embed the text of a local file and insert it into AOS.
# You can modify these functions to adapt them to your document format.
def _index_qa_pair(question, answer, index_name, s3_path):
    """Embed one question/answer pair, index both documents, return the last response."""
    docs = "Answer: {}".format(answer)
    # Both documents carry the answer as their text chunk; only the embedded
    # text differs (question vs. answer).
    insert_body_q = create_vector_embedding_with_bedrock(question, s3_path, docs)
    insert_body_a = create_vector_embedding_with_bedrock(answer, s3_path, docs)

    client.index(index=index_name, body=insert_body_q)
    return client.index(index=index_name, body=insert_body_a)


def WriteToAos(file_path, index_name, s3_path, max_lines=10):
    """Parse a Question/Answer file and index each pair into OpenSearch.

    A line starting with "Question" sets the current question, a line starting
    with "Answer" sets the current answer, and a line starting with "====" ends
    the pair and indexes it. Only the text after the last "Question:" /
    "Answer:" on the line is kept, including the trailing newline. Any other
    line is ignored. A pair still pending when reading stops is indexed too.

    Uses the module-level `client`, which must be the OpenSearch client when
    this function is called.

    Args:
        file_path: Path of the UTF-8 text file to read.
        index_name: Name of the index that receives the documents.
        s3_path: Source location stored in each document's metadata.
        max_lines: Reading stops once this many lines have been consumed.
            Defaults to 10, the limit that used to be hard-coded; pass None
            to read the whole file.
    """
    Q = ""
    A = ""
    # Explicit encoding so the result does not depend on the platform default.
    with open(file_path, "r", encoding="utf-8") as rf:
        for idx, line in enumerate(rf):
            if max_lines is not None and idx >= max_lines:
                break
            if line.startswith("Question"):
                Q = line.split("Question:")[-1]
            elif line.startswith("Answer"):
                A = line.split("Answer:")[-1]
            elif line.startswith("===="):
                response = _index_qa_pair(Q, A, index_name, s3_path)
                print(f'Document added: {idx}')
                print(response)
                Q = ""
                A = ""

    # Flush a complete pair that was not followed by a "====" delimiter.
    if Q != "" and A != "":
        response = _index_qa_pair(Q, A, index_name, s3_path)
        print('\nDocument added:')
        print(response)

In [None]:
# Index the local FAQ file. The path is spelled out here and points to the same
# file that was uploaded to S3 (file_path + file_name).
WriteToAos("../conf/piaozone2.faq", Index_Name, s3_path)

### Step 4. Create Bedrock Knowledge Base

#### Step 4.1. Create Bedrock Knowledge Base Role

In [None]:
from utils import create_role, create_policy, attach_policy

# Create the Bedrock Knowledge Base role.
# The role name must start with "AmazonBedrockExecutionRoleForKnowledgeBase_".
iam = boto3.resource("iam", region_name=region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
bedrock_knowledge_base_role_name = "AmazonBedrockExecutionRoleForKnowledgeBase_demo2"
# The role is created for the "bedrock.amazonaws.com" service.
bedrock_knowledge_base_role = create_role(
    iam,
    bedrock_knowledge_base_role_name,
    ["bedrock.amazonaws.com"]
)
print(bedrock_knowledge_base_role.arn)

In [None]:
# Create the three policies the knowledge base role needs:
# S3 read access, Bedrock model invocation, and AOSS API access.
# NOTE(review): s3_arn is the ARN of the uploaded object; s3:ListBucket normally
# applies to the bucket ARN — confirm this policy grants what is intended.
s3_file_policy = create_policy(
    iam,
    f"invoice-kb-s3-demo-{id}-policy",
    "Policy for IAM demonstration.",
    ["s3:GetObject","s3:ListBucket"],
    s3_arn
)

# Allows invoking the embedding model. The ARN hard-codes us-east-1, so it must
# be adjusted if `region` is changed.
bedrock_kb_invoke_demo_policy = create_policy(
    iam,
    f"invoice-kb-invoke-demo-{id}-policy",
    "Policy for knowleadge Base demonstration.",
    "bedrock:InvokeModel",
    "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1"
)


# Allows API access to the collection created earlier.
bedrock_kb_os_demo_policy = create_policy(
    iam,
    f"invoice-kb-aos-demo-{id}-policy",
    "Policy for knowleadge Base demonstration.",
    "aoss:APIAccessAll",
    collectionarn 
)

In [None]:
# Attach the S3, model-invoke and AOSS policies, in that order, to the
# Bedrock Knowledge Base role.
for _kb_policy in (s3_file_policy, bedrock_kb_invoke_demo_policy, bedrock_kb_os_demo_policy):
    attach_policy(iam, bedrock_knowledge_base_role_name, _kb_policy.arn)

#### Step 4.2. Update collection policy

In [None]:
# Update the collection access policy for the knowledge base role.
# `client` is rebound to the OpenSearch Serverless control-plane client here
# (it was the OpenSearch data-plane client while indexing).
client = boto3.client('opensearchserverless', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)


# Passes the policy name and version obtained when the access policy was
# created, plus the name of the knowledge base role.
updateAccessPolicy(
    client, Index_Name, Collection_Name, role_arn, account_id,
    access_policy_name, bedrock_knowledge_base_role_name, policy_version
    )

# presumably pauses for about 5 seconds while showing progress — confirm in utils
progress_bar(5)

In [None]:
# Now, create the Bedrock Knowledge Base

# `client` is rebound to the "bedrock-agent" control-plane client.
client = boto3.client("bedrock-agent", region_name=region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
knowledge_base_name = 'invoice-kb-demo1'
# Build the embedding model ARN from `region` so it always matches the region
# the client talks to (identical to the former literal for us-east-1).
embedding_model_arn = f"arn:aws:bedrock:{region}::foundation-model/amazon.titan-embed-text-v1"
response = client.create_knowledge_base(
    name=knowledge_base_name,
    description='invocie demo notebook-test',
    roleArn=bedrock_knowledge_base_role.arn,
    knowledgeBaseConfiguration={
        'type': 'VECTOR',
        'vectorKnowledgeBaseConfiguration': {
            'embeddingModelArn': embedding_model_arn
        }
    },
    storageConfiguration={
        'type': 'OPENSEARCH_SERVERLESS',
        'opensearchServerlessConfiguration': {
            'collectionArn': collectionarn,
            'vectorIndexName': Index_Name,
            # These field names must match the index mapping created earlier.
            'fieldMapping': {
                'vectorField': 'bedrock-knowledge-base-default-vector',
                'textField': 'AMAZON_BEDROCK_TEXT_CHUNK',
                'metadataField': 'AMAZON_BEDROCK_METADATA'
            }
        }
    },
)

In [None]:
# Pull the id and ARN of the new knowledge base out of the create response and
# show both; later cells use them for retrieval, the agent policy and teardown.
_kb_info = response["knowledgeBase"]
knowledge_base_id = _kb_info["knowledgeBaseId"]
knowledge_base_arn = _kb_info["knowledgeBaseArn"]
for _kb_value in (knowledge_base_id, knowledge_base_arn):
    print(_kb_value)

### Step 5. Test knowledge bases

#### Step 5.1. Test knowledge bases retrieve

In [None]:
# Query the knowledge base through the "bedrock-agent-runtime" client and
# print the raw retrieval response.
knowledgebases_client = boto3.client("bedrock-agent-runtime", region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
response = knowledgebases_client.retrieve(
    knowledgeBaseId=knowledge_base_id,
    retrievalQuery={
        'text': '使用单位线上申请是否一定和线下资料申请时使用单位 保持一致?如果想添加使用单位后续如何添加申请 ?'
    },
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            # Only one result is requested.
            'numberOfResults': 1 
        }
    },
)
pprint.pprint(response)

#### Step 5.2. Test knowledge bases retrieve and generate

In [None]:
# Ask the same question again, this time letting Bedrock retrieve from the
# knowledge base and generate an answer, then print the raw response.
response = knowledgebases_client.retrieve_and_generate(
    input={
        'text': '使用单位线上申请是否一定和线下资料申请时使用单位 保持一致?如果想添加使用单位后续如何添加申请 ?'
    },
    
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': knowledge_base_id,
            # The generation model ARN hard-codes us-east-1; adjust it if
            # `region` is changed.
            'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2'
        }
    },
)
pprint.pprint(response)

#### Step 5.3. Associate to an agent and test

In [None]:
# Run this cell only if the Bedrock agent role has not yet been granted
# permission to retrieve from the knowledge base.

# Name of an agent role that is not created in this notebook.
# presumably it already exists in the account — confirm before running
agent_role_name = "AmazonBedrockExecutionRoleForAgents_demo2"
# Policy allowing bedrock:Retrieve on the knowledge base created above.
kb_retrive_policy = create_policy(
    iam,
    f"invoice-agent-kb-demo-{id}",
    "Policy for agent kb retreive.",
    ["bedrock:Retrieve"],
    [knowledge_base_arn] 
)

attach_policy(
    iam,
    agent_role_name,
    kb_retrive_policy.arn 
)

In [None]:
# Associate the knowledge base with an agent

# Hard-coded id of an agent that is not created in this notebook; replace it
# with the id of your own agent.
agent_id = "XHAO4M9ZJK"
# assumes `client` is still the "bedrock-agent" client created for the
# knowledge base — confirm if cells were run out of order
response = client.associate_agent_knowledge_base(
    agentId=agent_id,
    agentVersion='DRAFT',
    description='Use this knowledge base whenever question relate to invoice or issurance.',
    knowledgeBaseId=knowledge_base_id
)
pprint.pprint(response)

In [None]:
# Prepare the agent, create an alias for it, then switch `client` to the
# runtime service so the agent can be invoked.
agent_alias_name = "demo_agent_kb_test"
response = client.prepare_agent(agentId=agent_id)

# Wait a fixed 15 seconds for the preparation to finish.
# NOTE(review): the agent status is not checked; a slower preparation would not
# be detected here.
time.sleep(15)

agent_alias_description = "add kb version"
agent_alias = client.create_agent_alias(
    agentId=agent_id,
    agentAliasName=agent_alias_name,
    description=agent_alias_description
)
agent_alias_id = agent_alias['agentAlias']['agentAliasId']
# `client` is rebound to the "bedrock-agent-runtime" client, which is the one
# used to invoke the agent.
client = boto3.client("bedrock-agent-runtime", region_name=region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

def invoke(question: str, sessionid: str, agent_id: str, agent_alias_id: str, enable_trace=False):
    """Send one question to the Bedrock agent and return its complete answer.

    Uses the module-level `client`, which must be the "bedrock-agent-runtime"
    client when this function is called.

    Args:
        question: Text sent to the agent as ``inputText``.
        sessionid: Session identifier; reuse it to continue a conversation.
        agent_id: Id of the agent to invoke.
        agent_alias_id: Id of the agent alias to invoke.
        enable_trace: Passed through as ``enableTrace``; trace events are logged.

    Returns:
        The answer text assembled from every chunk event of the response
        stream, or an empty string when the stream contains no chunk.

    Raises:
        Exception: With the message "unexpected event." when an event is
            neither a chunk nor a trace, or when reading the stream fails.
            The original error is chained as the cause.
    """
    response = client.invoke_agent(
        inputText=question,
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=sessionid,
        enableTrace=enable_trace,
    )
    # Collect raw bytes and decode once at the end: an answer may arrive in
    # several chunks, and a multi-byte UTF-8 character could be split across
    # two of them.
    answer_parts = []
    try:
        for event in response['completion']:
            if 'chunk' in event:
                answer_parts.append(event['chunk']['bytes'])
            elif 'trace' in event:
                logger.info(json.dumps(event['trace'], indent=2))
            else:
                raise Exception("unexpected event.", event)
        final_answer = b"".join(answer_parts).decode('utf8')
    except Exception as e:
        raise Exception("unexpected event.", e) from e
    if answer_parts:
        print(f"Final answer ->\n{final_answer}")
    return final_answer

In [None]:
import uuid
# One session id shared by the questions below, so they are sent to the agent
# as part of the same session.
sessionid = str(uuid.uuid1())
# Passed to invoke() as enable_trace, which logs the agent's trace events.
enable_trace:bool = True

In [None]:
# First turn: a greeting ("hello").
question = "你好"
answer = invoke(question, sessionid, agent_id, agent_alias_id, enable_trace)

In [None]:
# Second turn in the same session: roughly "I want to issue an invoice".
question = "我想开发票"
answer = invoke(question, sessionid, agent_id, agent_alias_id, enable_trace)

In [None]:
# Third turn in the same session: an invoice-knowledge question.
# presumably meant to be answered from the associated knowledge base — confirm
# with the trace output
question = "我想查询开发票相关知识，直连单位是指全集团的年收入吗？"
answer = invoke(question, sessionid, agent_id, agent_alias_id, enable_trace)

### Step 6. Delete resource

In [None]:
from utils import teardown


# Delete the knowledge base
client = boto3.client("bedrock-agent", region_name=region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
response = client.delete_knowledge_base(
    knowledgeBaseId=knowledge_base_id
)
print(response)

# Delete the collection
client = boto3.client('opensearchserverless', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
# To delete a collection from an earlier session, set its id by hand, e.g.:
# collection_id = "bsf93rdvnbxo4gqmr0hc"
response = client.delete_collection(
    id=collection_id
)

# Delete the S3 bucket: it is emptied first, then deleted.
# NOTE: this resource is created without the explicit region and access keys
# that the other clients in this notebook receive.
s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket(bucket_name)
bucket.objects.delete()
bucket.delete()
print(f"Emptied and deleted bucket {bucket.name}.\n")

# Delete the collection policies: data access, encryption and network.
client = boto3.client('opensearchserverless', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
# client = boto3.client('opensearchserverless', region)
response = client.delete_access_policy(
    name=access_policy_name,
    type='data'
)
response = client.delete_security_policy(
    name=encryption_policy_name,
    type='encryption'
)
response = client.delete_security_policy(
    name=network_polic_name,
    type='network'
)

# Delete the knowledge base role.
# presumably teardown also removes the policies attached to it — confirm in utils
teardown(iam, [bedrock_knowledge_base_role])