# Module 2 - Create Knowledge Base and Ingest Documents

----

This notebook provides sample code with step by step instructions for building an Amazon Bedrock Knowledge Base.

----

### Contents

1. [item 1](#item-1)
1. [item 2](#item-2)
1. [item 3](#item-3)
1. [item 4](#item-4)

----

### Background

Foundation models (FMs) are powerful generative AI models trained on vast amounts of general-purpose data. However, many real-world applications require these models to generate responses grounded in domain-specific or proprietary information. Retrieval Augmented Generation (RAG) is a technique that enhances generative AI responses by retrieving relevant information from external data sources at query time.

Bedrock Knowledge Bases (BKBs) provide a fully managed capability to implement RAG-based solutions. By integrating your own data — such as documents, manuals, or knowledge articles — into a knowledge base, you can improve the accuracy, relevance, and usefulness of model-generated responses. When a user submits a query, Amazon Bedrock Knowledge Bases search across the ingested data, retrieve the most relevant content, and pass this information to the foundation model to generate a more informed response.

This notebook demonstrates how to create an empty Amazon OpenSearch Serverless (AOSS) index, build an Amazon Bedrock Knowledge Base, and ingest documents into it for retrieval-augmented generation.

### Pre-requisites
This notebook requires permissions to:
- create and delete **Amazon IAM** roles
- create, update and delete **Amazon S3** buckets
- access **Amazon Bedrock**
- access to **Amazon OpenSearch Serverless**

If running on **SageMaker Studio**, you should add the following managed policies to your role:
- `IAMFullAccess`
- `AWSLambda_FullAccess`
- `AmazonS3FullAccess`
- `AmazonBedrockFullAccess`
- Custom policy for Amazon OpenSearch Serverless such as:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}
```
<div class="alert alert-block alert-info">
<b>Note:</b> Please make sure to enable `Anthropic Claude 3 Sonnet` and `Anthropic Claude 3 Haiku` model access in Amazon Bedrock Console, as the notebook will use Anthropic Claude 3 Sonnet and Claude 3 Haiku models for testing the knowledge base once its created.
</div>


## 1. Setup

### 1.1 Install and import the required libraries


In [None]:
%pip install --force-reinstall -q -r ./requirements.txt

In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
#import warnings
#warnings.filterwarnings('ignore')

In [1]:
# Standard library imports
import os
import sys
import json
import time
import pprint
import random

# Third-party imports
import boto3
from botocore.exceptions import ClientError
from retrying import retry

# Local imports
import utility

# Setup pretty printing
pp = pprint.PrettyPrinter(indent=2)

# Print SDK versions
print(f"Python version: {sys.version.split()[0]}")
print(f"Boto3 SDK version: {boto3.__version__}")

Python version: 3.11.11
Boto3 SDK version: 1.36.23


### 1.2 Initial setup for clients and global variables

In [2]:
# Create boto3 session and set AWS region
boto_session = boto3.Session()
aws_region = boto_session.region_name

# Create boto3 clients for AOSS, Bedrock, and S3 services
aoss_client = boto3.client('opensearchserverless')
bedrock_agent_client = boto3.client('bedrock-agent')
s3_client = boto3.client('s3')

# Define names for AOSS, Bedrock, and S3 resources
resource_suffix = random.randrange(100, 999)
s3_bucket_name = f"bedrock-kb-{aws_region}-{resource_suffix}"
aoss_collection_name = f"bedrock-kb-collection-{resource_suffix}" # was: vector_store_name
aoss_index_name = f"bedrock-kb-index-{resource_suffix}"           # was: index_name

# Some temporary local paths
local_data_dir = 'data'

# Print configurations
print(f"AWS Region: {aws_region}")
print(f"S3 Bucket: {s3_bucket_name}")
print(f"AOSS Collection Name: {aoss_collection_name}")

AWS Region: us-east-1
S3 Bucket: bedrock-kb-us-east-1-439
AOSS Collection Name: bedrock-kb-collection-439


### 1.3 Store some variables for later reuse

In [3]:
%store s3_bucket_name

Stored 's3_bucket_name' (str)


## 2. Create an S3 data source

Amazon Bedrock Knowledge Bases can connect to a variety of data sources for downstream RAG applications. Supported data sources include Amazon S3, Confluence, Microsoft SharePoint, Salesforce, Web Crawler, and custom data sources.

In this workshop, we will use Amazon S3 to store unstructured data — specifically, PDF files containing Amazon Shareholder Letters from different years. This S3 bucket will serve as the source of documents for our Knowledge Base. During the ingestion process, Bedrock will parse these documents, convert them into vector embeddings using an embedding model, and store them in a vector database for efficient retrieval during queries.

### 2.1 Create an S3 bucket, if needed

In [4]:
# Check if bucket exists, and if not create S3 bucket for KB data source

try:
    s3_client.head_bucket(Bucket=s3_bucket_name)
    print(f"Bucket '{s3_bucket_name}' already exists..")
except ClientError as e:
    print(f"Creating bucket: '{s3_bucket_name}'..")
    if aws_region == 'us-east-1':
        s3_client.create_bucket(Bucket=s3_bucket_name)
    else:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region}
        )

Creating bucket 'bedrock-kb-us-east-1-439'..


### 2.2 Download data and upload to S3

In [5]:
from urllib.request import urlretrieve

# URLs of shareholder letters to download
urls = [
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]

# Corresponding local file names
filenames = [
    'AMZN-2022-Shareholder-Letter.pdf',
    'AMZN-2021-Shareholder-Letter.pdf',
    'AMZN-2020-Shareholder-Letter.pdf',
    'AMZN-2019-Shareholder-Letter.pdf'
]

# Create local staging directory if it doesn't exist
os.makedirs(local_data_dir, exist_ok=True)

# Download each file and print confirmation
for url, filename in zip(urls, filenames):
    file_path = os.path.join(local_data_dir, filename)
    urlretrieve(url, file_path)
    print(f"Downloaded: '{filename}' to '{local_data_dir}'..")

Downloaded: 'AMZN-2022-Shareholder-Letter.pdf' to 'data'..
Downloaded: 'AMZN-2021-Shareholder-Letter.pdf' to 'data'..
Downloaded: 'AMZN-2020-Shareholder-Letter.pdf' to 'data'..
Downloaded: 'AMZN-2019-Shareholder-Letter.pdf' to 'data'..


In [6]:
for root, _, files in os.walk(local_data_dir):
    for file in files:
        full_path = os.path.join(root, file)
        s3_client.upload_file(full_path, s3_bucket_name, file)
        print(f"Uploaded: '{file}' to 's3://{s3_bucket_name}'..")

Uploaded: AMZN-2022-Shareholder-Letter.pdf to 's3://bedrock-kb-us-east-1-439'..
Uploaded: AMZN-2021-Shareholder-Letter.pdf to 's3://bedrock-kb-us-east-1-439'..
Uploaded: AMZN-2020-Shareholder-Letter.pdf to 's3://bedrock-kb-us-east-1-439'..
Uploaded: AMZN-2019-Shareholder-Letter.pdf to 's3://bedrock-kb-us-east-1-439'..


## 3 Create AOSS Vector Index and Configure BKB Access Permissions

In this section, we’ll create a vector index using Amazon OpenSearch Serverless (AOSS) and configure the necessary access permissions for the Bedrock Knowledge Base (BKB) that we’ll set up later. AOSS provides a fully managed, serverless solution for running vector search workloads at billion-vector scale. It automatically handles resource scaling and eliminates the need for cluster management, while delivering low-latency, millisecond response times with pay-per-use pricing.

While this example uses AOSS, it’s worth noting that Bedrock Knowledge Bases also supports other popular vector stores, including Amazon Aurora PostgreSQL with pgvector, Pinecone, Redis Enterprise Cloud, and MongoDB, among others

### 3.1 Create IAM Role with Necessary Permissions for Bedrock Knowledge Base

Let's first create an IAM role with all the necessary policies and permissions to allow BKB to execute operations, such as invoking Bedrock FMs and reading data from an S3 bucket. We will use a helper function for this.

In [7]:
bedrock_kb_execution_role = utility.create_bedrock_execution_role(bucket_name=s3_bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

print(f"Created KB execution role with ARN: '{bedrock_kb_execution_role_arn}'..")

Created KB execution role with ARN: 'arn:aws:iam::242711262407:role/AmazonBedrockExecutionRoleForKnowledgeBase_347'..


### 3.2 Create AOSS Policies and Vector Collection

Next we need to create and attach three key policies for securing and managing access to the AOSS collection: an encryption policy, a network access policy, and a data access policy. These policies ensure proper encryption, network security, and the necessary permissions for creating, reading, updating, and deleting collection items and indexes. This step is essential for configuring the OpenSearch collection to interact with BKB securely and efficiently. We will use another helper function for this.

⚠️ **Note:** _in order to keep setup overhead at mininum, in this example we **allow public internet access** to the OpenSearch Serverless collection resource. However, for production environments we strongly suggest to leverage private connection between your VPC and Amazon OpenSearch Serverless resources via an VPC endpoint, as described [here](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-vpc.html)._

In [8]:
# Create AOSS policies for the new vector collection
encryption_policy, network_policy, access_policy = utility.create_policies_in_oss(
    vector_store_name=aoss_collection_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)

print(f"Created encryption policy with name '{encryption_policy['securityPolicyDetail']['name']}'..")
print(f"Created network policy with name '{network_policy['securityPolicyDetail']['name']}'..")
print(f"Created access policy with name '{access_policy['accessPolicyDetail']['name']}'..")

Created encryption policy with name 'bedrock-sample-rag-sp-347'..
Created network policy with name 'bedrock-sample-rag-np-347'..
Created access policy with name 'bedrock-sample-rag-ap-347'..


In [9]:
# Request to create AOSS collection
aoss_collection = aoss_client.create_collection(name=aoss_collection_name, type='VECTORSEARCH')

# Wait until collection becomes active
print("Waiting until AOSS collection becomes active: ", end='')
while True:
    aoss_response = aoss_client.list_collections(collectionFilters={'name': aoss_collection_name})
    aoss_status = aoss_response['collectionSummaries'][0]['status']
    if aoss_status in ('ACTIVE', 'FAILED'):
        print(" done.")
        break
    print('█', end='', flush=True)
    time.sleep(5)

print("An AOSS collection created:", json.dumps(aoss_response['collectionSummaries'], indent=2))

Waiting until AOSS collection becomes active: █████ done.
An AOSS collection created: [
  {
    "id": "rujdjpbw60nqh9sd61e1",
    "name": "bedrock-kb-collection-439",
    "status": "ACTIVE",
    "arn": "arn:aws:aoss:us-east-1:242711262407:collection/rujdjpbw60nqh9sd61e1"
  }
]


Store AOSS-related resouces for access in other notebooks later:

In [10]:
%store encryption_policy network_policy access_policy aoss_collection

Stored 'encryption_policy' (dict)
Stored 'network_policy' (dict)
Stored 'access_policy' (dict)
Stored 'aoss_collection' (dict)


### 3.2 Grant BKB Access to AOSS Data

In this step, we create a data access policy that grants BKB the necessary permissions to read from our AOSS collections. We then attach this policy to the Bedrock execution role we created earlier, allowing BKB to securely access AOSS data when generating responses. We will be using helper function once again.

In [11]:
# create opensearch serverless access policy and attach it to Bedrock execution role
try:
    aoss_policy_arn = utility.create_oss_policy_attach_bedrock_execution_role(
        collection_id=aoss_collection['createCollectionDetail']['id'],
        bedrock_kb_execution_role=bedrock_kb_execution_role)
    
    print("Waiting 60 sec for data access rules to be enforced: ", end='', flush=True)
    for _ in range(12):  # 12 * 5 sec = 60 sec
        print('█', end='', flush=True)
        time.sleep(5)
    print(" done.")
except Exception as e:
    print("Policy already exists! \n", e)

print("Created and attached policy with ARN:", aoss_policy_arn)

Created and attached policy with ARN: arn:aws:iam::242711262407:policy/AmazonBedrockOSSPolicyForKnowledgeBase_347
███████ for data access rules to be enforced: 

KeyboardInterrupt: 

### 3.3 Create an AOSS vector index

In [None]:
# Create the vector index in Opensearch serverless, with the knn_vector field index mapping, specifying the dimension size, name and engine.
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError


credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, 'aoss')

index_name = f"bedrock-sample-index-{suffix}"
body_json = {
   "settings": {
      "index.knn": "true",
       "number_of_shards": 1,
       "knn.algo_param.ef_search": 512,
       "number_of_replicas": 0,
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",
            "dimension": 1536,
             "method": {
                 "name": "hnsw",
                 "engine": "faiss",
                 "space_type": "l2"
             },
         },
         "text": {
            "type": "text"
         },
         "text-metadata": {
            "type": "text"         }
      }
   }
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)


In [None]:
# Create index
try:
    response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
    print('\nCreating index:')
    pp.pprint(response)

    # index creation can take up to a minute
    interactive_sleep(60)
except RequestError as e:
    # you can delete the index if its already exists
    # oss_client.indices.delete(index=index_name)
    print(f'Error while trying to create the index, with error {e.error}\nyou may unmark the delete above to delete, and recreate the index')
    

#### Upload data to S3 Bucket data source

## Create Knowledge Base
Steps:
- initialize Open search serverless configuration which will include collection ARN, index name, vector field, text field and metadata field.
- initialize chunking strategy, based on which KB will split the documents into pieces of size equal to the chunk size mentioned in the `chunkingStrategyConfiguration`.
- initialize the s3 configuration, which will be used to create the data source object later.
- initialize the Titan embeddings model ARN, as this will be used to create the embeddings for each of the text chunks.

In [None]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

# Ingest strategy - How to ingest data from the data source
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

# The data source to ingest documents from, into the OpenSearch serverless knowledge base index
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes":["*.*"] # you can use this if you want to create a KB using data within s3 prefixes.
}

# The embedding model used by Bedrock to embed ingested documents, and realtime prompts
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Amazon shareholder letter knowledge base."
roleArn = bedrock_kb_execution_role_arn


Provide the above configurations as input to the `create_knowledge_base` method, which will create the Knowledge base.

In [None]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]

In [None]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")

In [None]:
pp.pprint(kb)

In [None]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

Next we need to create a data source, which will be associated with the knowledge base created above. Once the data source is ready, we can then start to ingest the documents.

In [None]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

In [None]:
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

### Start ingestion job
Once the KB and data source is created, we can start the ingestion job.
During the ingestion job, KB will fetch the documents in the data source, pre-process it to extract text, chunk it based on the chunking size provided, create embeddings of each chunk and then write it to the vector database, in this case OSS.

In [None]:
# Start an ingestion job
interactive_sleep(30)
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

In [None]:
job = start_job_response["ingestionJob"]
pp.pprint(job)

In [None]:
# Get job 
while(job['status']!='COMPLETE' ):
    get_job_response = bedrock_agent_client.get_ingestion_job(
      knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
  )
    job = get_job_response["ingestionJob"]
    
    interactive_sleep(30)

pp.pprint(job)

In [None]:
# Print the knowledge base Id in bedrock, that corresponds to the Opensearch index in the collection we created before, we will use it for the invocation later
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)

In [None]:
# keep the kb_id for invocation later in the invoke request
%store kb_id

## Test the knowledge base
### Note: If you plan to run any following notebooks, you can skip this section
### Using RetrieveAndGenerate API
Behind the scenes, RetrieveAndGenerate API converts queries into embeddings, searches the knowledge base, and then augments the foundation model prompt with the search results as context information and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manage short-term memory of the conversation to provide more contextual results.

The output of the RetrieveAndGenerate API includes the generated response, source attribution as well as the retrieved text chunks.

In [None]:
# try out KB using RetrieveAndGenerate API
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name=region_name)
# Lets see how different Anthropic Claude 3 models responds to the input text we provide
claude_model_ids = [ ["Claude 3 Sonnet", "anthropic.claude-3-sonnet-20240229-v1:0"], ["Claude 3 Haiku", "anthropic.claude-3-haiku-20240307-v1:0"]]

In [None]:
def ask_bedrock_llm_with_knowledge_base(query: str, model_arn: str, kb_id: str) -> str:
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={
            'text': query
        },
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': model_arn
            }
        },
    )

    return response

In [None]:
query = "What is Amazon's doing in the field of generative AI?"

for model_id in claude_model_ids:
    model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id[1]}'
    response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
    generated_text = response['output']['text']
    citations = response["citations"]
    contexts = []
    for citation in citations:
        retrievedReferences = citation["retrievedReferences"]
        for reference in retrievedReferences:
            contexts.append(reference["content"]["text"])
    print(f"---------- Generated using {model_id[0]}:")
    pp.pprint(generated_text )
    print(f'---------- The citations for the response generated by {model_id[0]}:')
    pp.pprint(contexts)
    print()

### Retrieve API
Retrieve API converts user queries into embeddings, searches the knowledge base, and returns the relevant results, giving you more control to build custom workﬂows on top of the semantic search results. The output of the Retrieve API includes the the retrieved text chunks, the location type and URI of the source data, as well as the relevance scores of the retrievals.

In [None]:
# retrieve api for fetching only the relevant context.
relevant_documents = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb_id,
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            'numberOfResults': 3 # will fetch top 3 documents which matches closely with the query.
        }
    }
)

In [None]:
pp.pprint(relevant_documents["retrievalResults"])

<div class="alert alert-block alert-warning">
<b>Next steps:</b> Proceed to the next labs to learn how to use Bedrock Knowledge bases. Remember to CLEAN_UP at the end of your session.
</div>