# Amazon Bedrock Knowledge base

This notebook covers the steps to create an Amazon Bedrock knowledge base. With Knowledge Bases for Amazon Bedrock, you can give FMs and agents contextual information from your company’s private data sources for Retrieval Augmented Generation (RAG) to deliver more relevant, accurate, and customized responses.

In this notebook, we will create an Amazon Bedrock knowledge base and ingest documents. Amazon OpenSearch Serverless will be used as the vector database, indexing the policy files from an S3 bucket.

To run this notebook, the assumed role needs permissions to:
* Create IAM role and policies
* Access Bedrock
* Access S3
* Create OpenSearch serverless collection, create index and ingest documents

This notebook is a fork of Bedrock ImmersionDay notebook here https://github.com/aws-samples/amazon-bedrock-workshop/blob/main/07_Agents/insurance_claims_agent/with_kb/create_and_invoke_agent_with_kb.ipynb. 

## Pre-requisites & Install dependencies

In [None]:
# Show the running Python version; LangChain (optional in this notebook) requires Python 3.8 or newer
import sys
sys.version

In [None]:
!pip install opensearch-py
!pip install requests-aws4auth
!pip install -U boto3
!pip install -U botocore
!pip install -U awscli
!pip install langchain --upgrade
!pip install langchain-community --upgrade

## Restart Kernel

In [None]:
# Shut the current kernel down so the packages installed above are loaded on the next start.
import IPython
app = IPython.Application.instance()
# NOTE(review): the True argument is presumably the "restart" flag (this section is titled "Restart Kernel") — confirm
app.kernel.do_shutdown(True)  

## Import dependencies

In [None]:
import sys

# Fail fast with a readable message when the kernel is older than Python 3.8.
# An explicit raise is used instead of `assert` so the check also runs when
# Python is started with -O (which strips assert statements).
if sys.version_info < (3, 8):
    raise RuntimeError(
        f"Python 3.8 or newer is required, found {sys.version.split()[0]}"
    )

In [1]:
import sagemaker
import boto3
import json
import random
import time
import zipfile
from io import BytesIO
import uuid
import os
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# One boto3 client per AWS service used in this notebook, created in the
# same order as the names on the left-hand side.
iam, s3, lambda_c, oss, sts = (
    boto3.client(service_name)
    for service_name in ('iam', 's3', 'lambda', 'opensearchserverless', 'sts')
)

In [3]:
# Plain boto3 session; reused later to build the bedrock-runtime client
session = boto3.Session()
# SageMaker session supplies the region and the default S3 bucket used by this notebook
sagemaker_session = sagemaker.Session()
studio_region = sagemaker_session.boto_region_name 
caller_identity = sagemaker_session.get_caller_identity_arn()
# AWS account number, used below to build resource names and ARNs
account_id = sts.get_caller_identity()["Account"]

In [6]:
# Embedding model the knowledge base is allowed to invoke
embedding_model_arn = f'arn:aws:bedrock:{studio_region}::foundation-model/amazon.titan-embed-text-v1'
# Region/account suffix appended to the resource names below
suffix = f"{studio_region}-{account_id}"
kb_bedrock_allow_policy_name = f"ica-kb-bedrock-allow-{suffix}"
kb_aoss_allow_policy_name = f"ica-kb-aoss-allow-{suffix}"
kb_s3_allow_policy_name = f"ica-kb-s3-allow-{suffix}"
kb_collection_name = f'ica-kbc-{suffix}'
# Local folder name and S3 key prefix of the knowledge base documents
kb_bucket_prefix = 'kb_documents'
# Constant role name: it has no placeholders, so a plain string replaces the f-string.
# NOTE(review): unlike the policy names, this name carries no region suffix — confirm
# that is intended if the notebook is run in more than one region of the same account.
kb_role_name = 'AmazonBedrockExecutionRoleForKnowledgeBase_icakb'
default_bucket = sagemaker_session.default_bucket()
default_bucket_arn = f"arn:aws:s3:::{default_bucket}"
kb_name = f'insurance-claims-kb-{suffix}'

## Create IAM Roles and attach policies for OpenSearch service

We will create IAM policies that permit the Knowledge Base to:
* invoke the Bedrock Titan Embedding foundation model
* access Amazon OpenSearch Serverless
* read the policy files in the S3 bucket

In [None]:
# IAM policy that lets the knowledge base role invoke the embedding model.
invoke_embedding_model_statement = dict(
    Sid="AmazonBedrockAgentBedrockFoundationModelPolicy",
    Effect="Allow",
    Action="bedrock:InvokeModel",
    Resource=[embedding_model_arn],
)
bedrock_kb_allow_fm_model_policy_statement = {
    "Version": "2012-10-17",
    "Statement": [invoke_embedding_model_statement],
}

# IAM expects the policy document as a JSON string.
kb_bedrock_policy_json = json.dumps(bedrock_kb_allow_fm_model_policy_statement)

kb_bedrock_policy = iam.create_policy(
    PolicyDocument=kb_bedrock_policy_json,
    PolicyName=kb_bedrock_allow_policy_name,
)

In [None]:
# IAM policy that grants the knowledge base role API access to every
# OpenSearch Serverless collection of this account in the current region.
all_collections_arn = f"arn:aws:aoss:{studio_region}:{account_id}:collection/*"
bedrock_kb_allow_aoss_policy_statement = {
    "Version": "2012-10-17",
    "Statement": [
        dict(
            Effect="Allow",
            Action="aoss:APIAccessAll",
            Resource=[all_collections_arn],
        )
    ],
}

# IAM expects the policy document as a JSON string.
kb_aoss_policy_json = json.dumps(bedrock_kb_allow_aoss_policy_statement)

kb_aoss_policy = iam.create_policy(
    PolicyDocument=kb_aoss_policy_json,
    PolicyName=kb_aoss_allow_policy_name,
)



In [None]:
# IAM policy that lets the knowledge base role list the default bucket and read
# its objects, restricted to resources owned by this account.
bucket_object_arns = [
    f"arn:aws:s3:::{default_bucket}/*",
    f"arn:aws:s3:::{default_bucket}",
]
same_account_condition = {
    "StringEquals": {
        "aws:ResourceAccount": str(account_id)
    }
}
kb_s3_allow_policy_statement = {
    "Version": "2012-10-17",
    "Statement": [
        dict(
            Sid="AllowKBAccessDocuments",
            Effect="Allow",
            Action=["s3:GetObject", "s3:ListBucket"],
            Resource=bucket_object_arns,
            Condition=same_account_condition,
        )
    ],
}

# IAM expects the policy document as a JSON string.
kb_s3_json = json.dumps(kb_s3_allow_policy_statement)
kb_s3_policy = iam.create_policy(
    PolicyDocument=kb_s3_json,
    PolicyName=kb_s3_allow_policy_name,
)

In [None]:
# Create the IAM role that the Bedrock service assumes for the knowledge base,
# then attach the three policies created in the previous cells.
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [{
          "Effect": "Allow",
          "Principal": {
            "Service": "bedrock.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
    }]
}

assume_role_policy_document_json = json.dumps(assume_role_policy_document)
kb_role = iam.create_role(
    RoleName=kb_role_name,
    AssumeRolePolicyDocument=assume_role_policy_document_json
)

# Poll IAM until the new role is visible instead of sleeping for a fixed time.
iam.get_waiter('role_exists').wait(RoleName=kb_role_name)

# Attach the Bedrock, OpenSearch Serverless and S3 policies, in that order.
for created_policy in (kb_bedrock_policy, kb_aoss_policy, kb_s3_policy):
    iam.attach_role_policy(
        RoleName=kb_role_name,
        PolicyArn=created_policy['Policy']['Arn']
    )

In [None]:
# ARN of the knowledge base role, read from the create_role response
kb_role_arn = kb_role["Role"]["Arn"]
kb_role_arn

## Upload Knowledge Base files to S3

In [None]:
# Copy every .docx file from the local documents folder into the default
# bucket; the S3 key is the same "<folder>/<file name>" path as the local file.
docx_names = [name for name in os.listdir(kb_bucket_prefix) if name.endswith(".docx")]
for docx_name in docx_names:
    relative_path = kb_bucket_prefix + '/' + docx_name
    s3.upload_file(relative_path, default_bucket, relative_path)


## Create a Vector database

In this section we will create a vector database with OpenSearch Serverless. Amazon OpenSearch Serverless is a serverless option in Amazon OpenSearch Service. As a developer, you can use OpenSearch Serverless to run petabyte-scale workloads without configuring, managing, and scaling OpenSearch clusters. You get the same interactive millisecond response times as OpenSearch Service with the simplicity of a serverless environment. Pay only for what you use by automatically scaling resources to provide the right amount of capacity for your application—without impacting data ingestion.

In [None]:
# Name of the OpenSearch Serverless collection (same expression as in the naming cell earlier in the notebook)
kb_collection_name = f'ica-kbc-{suffix}'

In [None]:
# Encryption security policy for the collection; it must name the collection
# and here selects an AWS-owned key.
encryption_rule = {
    "ResourceType": "collection",
    "Resource": [f"collection/{kb_collection_name}"],
}
security_policy_json = {
    "Rules": [encryption_rule],
    "AWSOwnedKey": True,
}
security_policy = oss.create_security_policy(
    type='encryption',
    name=kb_collection_name,
    description='security policy of aoss collection',
    policy=json.dumps(security_policy_json),
)

In [None]:
# Network security policy: the same collection resource is listed once for the
# dashboard and once for the collection endpoint, both reachable from public networks.
network_policy_json = [
    {
        "Rules": [
            {
                "Resource": [f"collection/{kb_collection_name}"],
                "ResourceType": resource_type,
            }
            for resource_type in ("dashboard", "collection")
        ],
        "AllowFromPublic": True,
    }
]

network_policy = oss.create_security_policy(
    type='network',
    name=kb_collection_name,
    description='network policy of aoss collection',
    policy=json.dumps(network_policy_json),
)


In [None]:
# ARN of the identity running this notebook; added as a principal of the
# data access policy in the next cell
response = sts.get_caller_identity()
current_role = response['Arn']
current_role

In [None]:
# Data access policy: collection-level and index-level permissions granted to
# the knowledge base role, Admin assumed-role sessions and the current identity.
collection_permissions = [
    "aoss:DescribeCollectionItems",
    "aoss:CreateCollectionItems",
    "aoss:UpdateCollectionItems",
    "aoss:DeleteCollectionItems",
]
index_permissions = [
    "aoss:CreateIndex",
    "aoss:DeleteIndex",
    "aoss:UpdateIndex",
    "aoss:DescribeIndex",
    "aoss:ReadDocument",
    "aoss:WriteDocument",
]
allowed_principals = [
    kb_role_arn,
    f"arn:aws:sts::{account_id}:assumed-role/Admin/*",
    current_role,
]

data_policy_json = [
    {
        "Rules": [
            {
                "Resource": [f"collection/{kb_collection_name}"],
                "Permission": collection_permissions,
                "ResourceType": "collection",
            },
            {
                "Resource": [f"index/{kb_collection_name}/*"],
                "Permission": index_permissions,
                "ResourceType": "index",
            },
        ],
        "Principal": allowed_principals,
        "Description": "",
    }
]

data_policy = oss.create_access_policy(
    type='data',
    name=kb_collection_name,
    description='data access policy for aoss collection',
    policy=json.dumps(data_policy_json),
)


In [None]:
# Request a vector-search collection with standby replicas disabled; the
# response is kept because later cells read the collection ARN and id from it
opensearch_collection_response = oss.create_collection(
    description='OpenSearch collection for Amazon Bedrock Knowledge Base',
    name=kb_collection_name,
    standbyReplicas='DISABLED',
    type='VECTORSEARCH'
)
opensearch_collection_response

In [None]:
# Collection ARN, used later in the knowledge base storage configuration
collection_arn = opensearch_collection_response["createCollectionDetail"]["arn"]
collection_arn

In [None]:
# Wait for the collection to leave the CREATING state, then make sure it
# actually became ACTIVE before reading its endpoint.
response = oss.batch_get_collection(names=[kb_collection_name])
if not response['collectionDetails']:
    raise RuntimeError(f"Collection {kb_collection_name} was not found")

# Periodically check collection status
while response['collectionDetails'][0]['status'] == 'CREATING':
    print('Creating collection...')
    time.sleep(30)
    response = oss.batch_get_collection(names=[kb_collection_name])

collection_detail = response['collectionDetails'][0]
if collection_detail['status'] != 'ACTIVE':
    # Any other status (for example FAILED) has no usable endpoint, so stop
    # here instead of reporting success.
    raise RuntimeError(
        f"Collection {kb_collection_name} ended in status "
        f"{collection_detail['status']}: {collection_detail}"
    )

print('\nCollection successfully created:')
print(response["collectionDetails"])
# Extract the collection endpoint and strip the scheme; the OpenSearch client
# built later takes a bare host name.
host = collection_detail['collectionEndpoint']
final_host = host.replace("https://", "")
final_host

## Creating OpenSearch Index

In [5]:
# Index name and field names shared by the OpenSearch index mapping and the
# knowledge base storage configuration
kb_vector_index_name = "bedrock-knowledge-base-index"
kb_metadataField = 'bedrock-knowledge-base-metadata'
kb_textField = 'bedrock-knowledge-base-text'
kb_vectorField = 'bedrock-knowledge-base-vector'

In [None]:
# Sign OpenSearch Serverless requests (service name 'aoss') with the
# credentials of the current boto3 session
credentials = boto3.Session().get_credentials()
service = 'aoss'
awsauth = AWS4Auth(
    credentials.access_key, 
    credentials.secret_key,
    studio_region, 
    service, 
    session_token=credentials.token
)

# Build the OpenSearch client
open_search_client = OpenSearch(
    hosts=[{'host': final_host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# It can take up to a minute for data access rules to be enforced
time.sleep(45)
# Index settings with k-NN enabled; the field mappings are filled in by the next cell
index_body = {
    "settings": {
        "index.knn": True,
        "number_of_shards": 1,
        "knn.algo_param.ef_search": 512,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {}
    }
}



In [None]:
# Vector field: 1536-dimensional HNSW k-NN vectors compared with cosine similarity.
# NOTE(review): 1536 presumably matches the output size of the Titan embedding
# model configured earlier — confirm if the embedding model is changed.
index_body["mappings"]["properties"][kb_vectorField] = {
    "type": "knn_vector",
    "dimension": 1536,
    "method": {
        "name": "hnsw",
        "engine": "nmslib",
        "space_type": "cosinesimil",
        "parameters": {
            "ef_construction": 512, 
            "m": 16
        },
    },
}

# Chunk text and chunk metadata are both stored as text fields.
index_body["mappings"]["properties"][kb_textField] = {
    "type": "text"
}

index_body["mappings"]["properties"][kb_metadataField] = {
    "type": "text"
}

# Create the index only when it does not exist yet. The index name is passed
# by keyword so the calls do not depend on the positional signature of the
# installed (unpinned) opensearch-py release.
if not open_search_client.indices.exists(index=kb_vector_index_name):
    response = open_search_client.indices.create(index=kb_vector_index_name, body=index_body)
    print('\nCreating index:')
    print(response)
else:
    print(f'Index {kb_vector_index_name} already exists')

In [None]:
# Storage settings handed to create_knowledge_base: the collection, the index
# name and which index field holds metadata, text and vector data
storage_configuration = {
    'opensearchServerlessConfiguration': {
        'collectionArn': collection_arn, 
        'fieldMapping': {
            'metadataField': kb_metadataField,
            'textField': kb_textField,
            'vectorField': kb_vectorField
        },
        'vectorIndexName': kb_vector_index_name
    },
    'type': 'OPENSEARCH_SERVERLESS'
}

## Create Bedrock Knowledge base

In [None]:
bedrock_agent = boto3.client('bedrock-agent')

# Ensure the index is created and available before the knowledge base refers to it.
time.sleep(45)

# Creating the knowledge base
try:
    kb_obj = bedrock_agent.create_knowledge_base(
        name=kb_name, 
        description='KB that contains information about documents requirements for insurance claims',
        roleArn=kb_role_arn,
        knowledgeBaseConfiguration={
            'type': 'VECTOR',
            'vectorKnowledgeBaseConfiguration': {
                'embeddingModelArn': embedding_model_arn
            }
        },
        storageConfiguration=storage_configuration
    )
except Exception as e:
    # Report the failure, then re-raise: the following cells need kb_obj, and
    # continuing without it would only surface as an unrelated NameError.
    print(f"Error occurred: {e}")
    raise
else:
    # Print the response
    print(kb_obj)

In [None]:
# Id and ARN of the new knowledge base, read from the create_knowledge_base response
knowledge_base_id = kb_obj["knowledgeBase"]["knowledgeBaseId"]
knowledge_base_arn = kb_obj["knowledgeBase"]["knowledgeBaseArn"]

## Create a data source to attach to created Knowledge Base

Let's create a data source for our Knowledge Base. Then we will ingest our data and convert it into embeddings.


In [None]:
# Define the S3 configuration for your data source: the default bucket,
# limited to objects under the knowledge base documents prefix
s3_configuration = {
    'bucketArn': default_bucket_arn,
    'inclusionPrefixes': [kb_bucket_prefix]  
}

# Define the data source configuration
data_source_configuration = {
    's3Configuration': s3_configuration,
    'type': 'S3'
}

# Fixed-size chunking settings passed to create_data_source in the next cell
chunking_strategy_configuration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

In [None]:
data_source_name = f'insurance-claims-kb-docs-{suffix}'

# Ensure that the KB is created and available before attaching a data source.
time.sleep(45)

# Create the data source
try:
    data_source_response = bedrock_agent.create_data_source(
        knowledgeBaseId=knowledge_base_id,
        name=data_source_name,
        description='DataSource for the insurance claim documents requirements',
        dataSourceConfiguration=data_source_configuration,
        vectorIngestionConfiguration = {
            "chunkingConfiguration": chunking_strategy_configuration
        }
    )
except Exception as e:
    # Report the failure, then re-raise: the ingestion cells need
    # data_source_response, and continuing without it would only surface as
    # an unrelated NameError.
    print(f"Error occurred: {e}")
    raise
else:
    # Print the response
    print(data_source_response)

## Start the Ingestion process

Once the Knowledge Base and Data Source are created, we can start the ingestion job. During the ingestion job, the Knowledge Base fetches the documents in the data source, pre-processes them to extract text, chunks the text based on the chunking size provided, creates an embedding for each chunk, and then writes the embeddings to the vector database, in this case Amazon OpenSearch Serverless.

In [None]:
# Start an ingestion job for the data source created above
data_source_id = data_source_response["dataSource"]["dataSourceId"]
start_job_response = bedrock_agent.start_ingestion_job(
    knowledgeBaseId=knowledge_base_id, 
    dataSourceId=data_source_id
)

In [None]:
# Id of the ingestion job, needed to look the job up afterwards
ingestion_job_id= start_job_response["ingestionJob"]["ingestionJobId"]

In [None]:
# Poll the ingestion job until it is no longer starting, running or stopping,
# so the query cells below do not run against a knowledge base that is still
# being filled. The final job description is printed as before.
while True:
    ingestion_job_response = bedrock_agent.get_ingestion_job(
        ingestionJobId=ingestion_job_id,
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id
    )
    ingestion_status = ingestion_job_response["ingestionJob"]["status"]
    if ingestion_status not in ("STARTING", "IN_PROGRESS", "STOPPING"):
        break
    print(f"Ingestion job status: {ingestion_status}")
    time.sleep(10)
print(ingestion_job_response)

## Get Knowledge base details
Let's retrieve the details of the knowledge base created in the previous step.

In [None]:
bedrock_agent = boto3.client('bedrock-agent')

# Look the knowledge base up by name. nextToken is followed so a match on a
# later result page is not missed, and the loop variable no longer overwrites
# the kb_obj response kept from create_knowledge_base.
found_kb_id = None
list_kb_kwargs = {}
while found_kb_id is None:
    list_kb_response = bedrock_agent.list_knowledge_bases(**list_kb_kwargs)
    for kb_summary in list_kb_response["knowledgeBaseSummaries"]:
        if kb_summary["name"] == kb_name:
            found_kb_id = kb_summary["knowledgeBaseId"]
            break
    next_token = list_kb_response.get("nextToken")
    if not next_token:
        break
    list_kb_kwargs["nextToken"] = next_token

if found_kb_id is None:
    # Fail with a clear message instead of silently keeping a stale id (or
    # hitting a NameError) when no knowledge base has this name.
    raise LookupError(f"No knowledge base named {kb_name!r} was found")

knowledge_base_id = found_kb_id
print(f"Knowledgebase name:{kb_name} ID:{knowledge_base_id}")

In [None]:
# Fetch the knowledge base by id and read its ARN from the response
get_kb_response = bedrock_agent.get_knowledge_base(knowledgeBaseId=knowledge_base_id)
knowledge_base_arn = get_kb_response["knowledgeBase"]["knowledgeBaseArn"]
print(f"Knowledgebase ARN: {knowledge_base_arn}")

## Query Knowledge base
Run a sample query with the Bedrock agent runtime client and get a response. We will call two APIs in this section:

* retrieve - Queries a knowledge base and retrieves information from it.
* retrieve_and_generate - Queries a knowledge base and generates responses based on the retrieved results. The response cites up to five sources but only selects the ones that are relevant to the query.

In [15]:
# Runtime client used for the retrieve and retrieve_and_generate calls below
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

In [16]:
# Sample question; also reused by the later retrieve_and_generate and LangChain cells
query = "what are vehicle registration requirements?"
# Query the knowledge base, asking for 5 results
retrieve_response = bedrock_agent_runtime.retrieve(
    knowledgeBaseId=knowledge_base_id,
    retrievalQuery={
        'text': query
    },
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            'numberOfResults': 5
        }
    }
)

In [None]:
# Display the retrieval results of the previous call
retrieve_response["retrievalResults"]

In [None]:
# Foundation model that generates the answer from the retrieved passages
model_arn = f'arn:aws:bedrock:{studio_region}::foundation-model/anthropic.claude-v2'

# Retrieve from the knowledge base and generate a response to the same sample query
retrieve_and_generate_response = bedrock_agent_runtime.retrieve_and_generate(
    input={
        'text': query
    },
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': knowledge_base_id,
            'modelArn': model_arn
        }
    }
)
retrieve_and_generate_response

## RAG with Bedrock Knowledgebase and LangChain
In the steps below, we will integrate the Bedrock Knowledge Base retriever with LangChain and invoke a Bedrock model.


In [19]:
# bedrock-runtime client handed to the LangChain Bedrock LLM wrapper below
bedrock = session.client("bedrock-runtime", region_name=studio_region)

In [20]:
from langchain_community.llms.bedrock import Bedrock

# Create the Anthropic Claude LLM wrapper: at most 200 sampled tokens, temperature 0
# NOTE(review): this cell uses anthropic.claude-v1 while the retrieve_and_generate
# cell uses anthropic.claude-v2 — confirm which model id is intended.
model_args= {'max_tokens_to_sample':200,'temperature':0}
llm = Bedrock(model_id="anthropic.claude-v1", client=bedrock, model_kwargs=model_args)

In [21]:
from langchain_community.retrievers.bedrock import AmazonKnowledgeBasesRetriever
# LangChain retriever backed by the knowledge base, configured for 4 results per query
kb_retriever = AmazonKnowledgeBasesRetriever(
        knowledge_base_id=knowledge_base_id,
        retrieval_config={"vectorSearchConfiguration": {"numberOfResults": 4}}
)

In [None]:
# Fetch the documents relevant to the sample query. `invoke` is the retriever
# entry point that replaces the deprecated `get_relevant_documents`, and it
# matches the `qa.invoke(...)` call used later in this notebook.
docs = kb_retriever.invoke(query)
print(docs)

In [23]:
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# Prompt with two placeholders: {context} for the retrieved passages and
# {question} for the user query
prompt_template = """Human: Use the following pieces of context to provide a concise answer to the question at the end. 

{context}

Question: {question}
Assistant:"""
PROMPT = PromptTemplate(
    template=prompt_template, input_variables=["context", "question"]
)

In [24]:
# "stuff" RetrievalQA chain combining the Claude LLM, the knowledge base
# retriever and the custom prompt; source documents are returned with the answer
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=kb_retriever,
    return_source_documents=True,
    chain_type_kwargs={"prompt": PROMPT}
)

# Run the chain on the sample query and print only the generated answer
response = qa.invoke({'query':query})
print(response['result'])

 Vehicle registration requires:

- Proof of vehicle ownership (VehicleRegistration)
- Driver's license (DriverLicense)
- Accident report (AccidentReport) if there was an accident 
- Accident images (AccidentImages) if there was an accident


## Cleanup (Optional)
We will perform clean-up of the objects created above
* Remove knowledge base
* Remove OpenSearch components
* Empty documents from S3 bucket

In [None]:
# Delete every object stored under the knowledge base prefix; the bucket
# itself is kept. A paginator is used because a single list call returns only
# the first page of keys.
s3_paginator = s3.get_paginator('list_objects_v2')
for page in s3_paginator.paginate(Bucket=default_bucket, Prefix=kb_bucket_prefix):
    # 'Contents' is absent when a page has no matching objects.
    for obj in page.get('Contents', []):
        s3.delete_object(Bucket=default_bucket, Key=obj['Key'])

In [None]:
# IAM teardown: detach each policy created by this notebook from every role
# it is attached to, delete the policy, then delete the knowledge base role.
for created_policy in (kb_bedrock_policy, kb_aoss_policy, kb_s3_policy):
    policy_arn = created_policy['Policy']['Arn']
    attached = iam.list_entities_for_policy(PolicyArn=policy_arn, EntityFilter='Role')

    for attached_role in attached['PolicyRoles']:
        iam.detach_role_policy(RoleName=attached_role['RoleName'], PolicyArn=policy_arn)

    iam.delete_policy(PolicyArn=policy_arn)

# Role deletion is best effort: a failure is printed and the notebook continues.
for role_name in (kb_role_name,):
    try:
        iam.delete_role(RoleName=role_name)
    except Exception as e:
        print(e)
        print("couldn't delete role", role_name)

In [None]:
# Remove the collection and its three policies. Each deletion is attempted
# independently (best effort) so one failure no longer skips the ones after it.
oss_cleanup_steps = [
    (
        "collection",
        lambda: oss.delete_collection(
            id=opensearch_collection_response["createCollectionDetail"]["id"]
        ),
    ),
    (
        "data access policy",
        lambda: oss.delete_access_policy(name=kb_collection_name, type='data'),
    ),
    (
        "network policy",
        lambda: oss.delete_security_policy(name=kb_collection_name, type='network'),
    ),
    (
        "encryption policy",
        lambda: oss.delete_security_policy(name=kb_collection_name, type='encryption'),
    ),
]

for step_label, run_step in oss_cleanup_steps:
    try:
        run_step()
    except Exception as e:
        print(f"couldn't delete {step_label}: {e}")

In [None]:
# Best-effort deletion of the knowledge base; a failure is printed, not raised.
# NOTE(review): this runs after the IAM role and the collection were already
# removed by the previous cells — confirm the order is intended.
try:
    bedrock_agent.delete_knowledge_base(
        knowledgeBaseId=knowledge_base_id
    )
except Exception as e:
    print(e)