# Knowledge Bases for Amazon Bedrock - End to end example

This notebook provides sample code for creating an empty Amazon OpenSearch Serverless (AOSS) index, creating an Amazon Bedrock Knowledge Base, and ingesting documents into the index.


#### Notebook Walkthrough

We will create a data pipeline that ingests documents (typically stored in Amazon S3) into a Knowledge Base with Amazon OpenSearch Serverless (AOSS) as the vector store, so the data is available for lookup when a question is received.

- Load the documents into the Knowledge Base by connecting your S3 bucket (data source). 
- Ingestion - Knowledge Base will split the documents into smaller chunks (based on the strategy selected), generate embeddings, and store them in the associated vector store.

![data_ingestion.png](./images/data_ingestion.png)


#### Steps: 
- Create Amazon Bedrock Knowledge Base execution role with necessary policies for accessing data from S3 and writing embeddings into AOSS.
- Create an empty OpenSearch serverless index.
- Download documents
- Create Amazon Bedrock Knowledge Base
- Create a data source within Knowledge Base which will connect to Amazon S3
- Start an ingestion job using the Knowledge Base APIs, which will read data from S3, chunk it, convert the chunks into embeddings using the Amazon Titan Embeddings model, and then store these embeddings in the OpenSearch index, all without having to build, deploy, and manage the data pipeline yourself.

Once the data is available in the Bedrock Knowledge Base, a question answering application can be built with the Amazon Bedrock Knowledge Base APIs by following the notebooks in the same folder:
- [02_managed-rag-kb-retrieve-generate-api.ipynb](02_managed-rag-kb-retrieve-generate-api.ipynb)
- [03_strands-rag-retrieve-api-nova.ipynb](03_strands-rag-retrieve-api-nova.ipynb)


In [1]:
# restart kernel
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [2]:
import json
import os
import boto3
from botocore.exceptions import ClientError
import pprint
from utility import create_bedrock_execution_role, create_aoss_policy_attach_bedrock_execution_role, create_policies_in_aoss, interactive_sleep_for
import random
from retrying import retry

import sys
sys.path.append('../')
from util.tagging import standard_tags, standard_tags_kv, standard_tags_kv_lc
from util.model_selector import create_text_model_selector

In [3]:
suffix = random.randrange(200, 900)
region_name = os.environ.get("AWS_REGION", "us-east-1")
bedrock_agent_client = boto3.client('bedrock-agent')
sts_client = boto3.client('sts')
s3_client = boto3.client('s3')
service = 'aoss'
account_id = sts_client.get_caller_identity()["Account"]
s3_suffix = f"{region_name}-{account_id}"
bucket_name = f'bedrock-kb-{s3_suffix}-{suffix}'
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
pp = pprint.PrettyPrinter(indent=2)

In [4]:
# Check if the bucket exists, and if not, create the S3 bucket for the Knowledge Base data source
try:
    s3_client.head_bucket(Bucket=bucket_name)
    print(f'Bucket {bucket_name} Exists')
except ClientError as e:
    # head_bucket reports a missing bucket with error code '404'; re-raise anything else (e.g. '403')
    if e.response['Error']['Code'] != '404':
        raise
    print(f'Creating bucket {bucket_name}')
    if region_name == "us-east-1":
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name
        )
    else:
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region_name}
        )
    s3_client.put_bucket_tagging(
        Bucket=bucket_name,
        Tagging={
            'TagSet': standard_tags_kv
        }
    )

Creating bucket bedrock-kb-us-east-1-522040354751-252
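
The two `create_bucket` branches above differ only in whether a `LocationConstraint` is sent. As a minimal sketch, the same decision can be isolated in a small pure function. `create_bucket_kwargs` is a hypothetical helper name, not part of boto3 or of this notebook's `utility` module.

```python
def create_bucket_kwargs(bucket_name, region_name):
    """Build keyword arguments for `s3_client.create_bucket` (hypothetical helper)."""
    kwargs = {"Bucket": bucket_name}
    # As in the cell above, no LocationConstraint is sent for us-east-1.
    if region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs


print(create_bucket_kwargs("example-bucket", "us-east-1"))
print(create_bucket_kwargs("example-bucket", "eu-west-1"))
```

With this helper, the call would become `s3_client.create_bucket(**create_bucket_kwargs(bucket_name, region_name))`.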


In [5]:
%store bucket_name

Stored 'bucket_name' (str)


## Create a vector store - Amazon OpenSearch Serverless (AOSS) index

### Step 1 - Create AOSS policies and collection
First of all we have to create a vector store. In this section we will use *Amazon OpenSearch Serverless.*

Amazon OpenSearch Serverless is a serverless option in Amazon OpenSearch Service. As a developer, you can use OpenSearch Serverless to run petabyte-scale workloads without configuring, managing, and scaling OpenSearch clusters. You get the same interactive millisecond response times as OpenSearch Service with the simplicity of a serverless environment. Pay only for what you use by automatically scaling resources to provide the right amount of capacity for your application—without impacting data ingestion.

In [6]:
aoss_client = boto3.client('opensearchserverless')
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

Create an Amazon OpenSearch Serverless collection for the vector store. Note that creating the collection can take several minutes. You can use the Amazon OpenSearch Serverless console to monitor progress.

In [7]:
# create security, network and data access policies within AOSS
encryption_policy, network_policy, access_policy = create_policies_in_aoss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)
collection = aoss_client.create_collection(name=vector_store_name,type='VECTORSEARCH', tags=standard_tags_kv_lc)

In [8]:
pp.pprint(collection)

{ 'ResponseMetadata': { 'HTTPHeaders': { 'connection': 'keep-alive',
                                         'content-length': '314',
                                         'content-type': 'application/x-amz-json-1.0',
                                         'date': 'Tue, 23 Sep 2025 13:21:16 '
                                                 'GMT',
                                         'x-amzn-requestid': '234b920e-6e78-4ce2-8c46-c1f3d827b4e3'},
                        'HTTPStatusCode': 200,
                        'RequestId': '234b920e-6e78-4ce2-8c46-c1f3d827b4e3',
                        'RetryAttempts': 0},
  'createCollectionDetail': { 'arn': 'arn:aws:aoss:us-east-1:522040354751:collection/fdweovp7otbzp95vocfa',
                              'createdDate': 1758633676099,
                              'id': 'fdweovp7otbzp95vocfa',
                              'kmsKeyArn': 'auto',
                              'lastModifiedDate': 1758633676099,
                             

In [9]:
%store encryption_policy network_policy access_policy collection

Stored 'encryption_policy' (dict)
Stored 'network_policy' (dict)
Stored 'access_policy' (dict)
Stored 'collection' (dict)


In [10]:
# Get the OpenSearch serverless collection URL
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

fdweovp7otbzp95vocfa.us-east-1.aoss.amazonaws.com


In [11]:
# Wait for collection creation
# This can take a couple of minutes to finish
def collection_created():    
    response = aoss_client.batch_get_collection(names=[vector_store_name])
    return response['collectionDetails'][0]['status'] != 'CREATING'

interactive_sleep_for(collection_created)

Waiting for condition `collection_created` to be met. Checking every 10s:

❌.........❌.........❌.........❌.........❌.........
❌.........❌.........❌.........❌.........❌.........
❌.........❌.........❌.........❌.........❌.........
❌.........❌.........❌.........❌.........❌.........
❌.........✅
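
`interactive_sleep_for` comes from the local `utility` module, and its implementation is not shown in this notebook. The sketch below is one possible shape for such a polling loop, with an added timeout so the wait cannot run forever. `wait_until`, its parameters, and `FakeAossClient` are hypothetical names used only for illustration; the fake client stands in for `aoss_client` so the example runs without AWS access.

```python
import time


def wait_until(condition, interval_s=10, timeout_s=600,
               sleep=time.sleep, clock=time.monotonic):
    """Poll `condition` until it returns True or `timeout_s` elapses."""
    deadline = clock() + timeout_s
    while True:
        if condition():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


class FakeAossClient:
    """Stand-in for the AOSS client: reports CREATING twice, then ACTIVE."""

    def __init__(self):
        self._statuses = ["CREATING", "CREATING", "ACTIVE"]

    def batch_get_collection(self, names):
        # Consume queued statuses, then keep returning the last one.
        if len(self._statuses) > 1:
            status = self._statuses.pop(0)
        else:
            status = self._statuses[0]
        return {"collectionDetails": [{"name": names[0], "status": status}]}


client = FakeAossClient()


def collection_active():
    details = client.batch_get_collection(names=["example-collection"])["collectionDetails"]
    return bool(details) and details[0]["status"] == "ACTIVE"


ready = wait_until(collection_active, interval_s=0, timeout_s=5)
print(ready)
```

Checking for `ACTIVE` rather than "not `CREATING`" means a collection that ends up in a failed state is not mistaken for a ready one; the loop then simply times out and returns `False`.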

In [12]:
# create opensearch serverless access policy and attach it to Bedrock execution role
try:
    create_aoss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                    bedrock_kb_execution_role=bedrock_kb_execution_role)
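except Exception as e:
    # Any failure ends up here, not only "already exists", so print the actual error
    print("Could not create or attach the policy (it may already exist):")
    pp.pprint(e)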

Opensearch serverless arn:  arn:aws:iam::522040354751:policy/AmazonBedrockOSSPolicyForKnowledgeBase_485


In [13]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, AuthorizationException, RequestError, AuthenticationException
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, service)

# Build the OpenSearch client
oss_api_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

### Step 2 - Create vector index

We will create the vector index in Amazon OpenSearch Serverless, with the `knn_vector` type, specifying the dimension size, name, and engine.
Read the [OpenSearch documentation on k-NN vector](https://docs.opensearch.org/latest/field-types/supported-field-types/knn-vector/) for more details.


In [14]:
body_json = {
    "settings": {
        "index.knn": "true",
        "number_of_shards": 1,
        "knn.algo_param.ef_search": 512,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "l2"
                },
            },
            "text": {
                "type": "text"
            },
            "text-metadata": {
                "type": "text"
            }
        }
    }
}
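
The `dimension` in the mapping has to match the length of the vectors produced by the embedding model, so it can help to build the body from a parameter instead of hard-coding it. The following is a minimal sketch: `build_knn_index_body` is a hypothetical helper, and the value 1024 is assumed to be the output size of the Titan embedding model used later in this notebook (worth verifying against the model documentation if you change models).

```python
def build_knn_index_body(dimension, vector_field="vector",
                         text_field="text", metadata_field="text-metadata"):
    """Return an index body like the one above, parameterized by dimension and field names."""
    if dimension <= 0:
        raise ValueError("dimension must be a positive integer")
    return {
        "settings": {
            "index.knn": "true",
            "number_of_shards": 1,
            "knn.algo_param.ef_search": 512,
            "number_of_replicas": 0,
        },
        "mappings": {
            "properties": {
                vector_field: {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "method": {"name": "hnsw", "engine": "faiss", "space_type": "l2"},
                },
                text_field: {"type": "text"},
                metadata_field: {"type": "text"},
            }
        },
    }


example_body = build_knn_index_body(1024)
print(example_body["mappings"]["properties"]["vector"]["dimension"])
```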

In [15]:
# we need to retry as it can take a minute to propagate the security policies to AOSS
@retry(retry_on_exception=lambda e: isinstance(e, (AuthenticationException, AuthorizationException)),
       wait_fixed=5000,
       stop_max_delay=60*1000)
def create_index():
    # Create index
    try:
        response = oss_api_client.indices.create(index=index_name, body=json.dumps(body_json))
        print('\nCreating index:')
        pp.pprint(response)
    except RequestError as e:
        if e.error == 'resource_already_exists_exception':
            # oss_api_client.indices.delete(index=index_name)
            print("Index already exists. To recreate it, uncomment the delete line above and rerun this cell.")
        else:
            raise

create_index()


Creating index:
{ 'acknowledged': True,
  'index': 'bedrock-sample-rag-index-252',
  'shards_acknowledged': True}
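
The `@retry` decorator above retries only on authentication or authorization errors, waits a fixed 5 seconds between attempts, and gives up after about 60 seconds. For readers unfamiliar with the `retrying` package, the sketch below approximates that behavior with the standard library only. `retry_on`, `PolicyNotPropagated`, and `create_index_stub` are hypothetical names; the stub fails twice to imitate a policy that has not propagated yet.

```python
import functools
import time


def retry_on(exceptions, wait_s=5.0, max_delay_s=60.0,
             sleep=time.sleep, clock=time.monotonic):
    """Retry the wrapped function on `exceptions`, with a fixed wait and a total time budget."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = clock()
            while True:
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    # Give up once the time budget is spent; other exception types propagate at once.
                    if clock() - start >= max_delay_s:
                        raise
                    sleep(wait_s)
        return wrapper
    return decorator


class PolicyNotPropagated(Exception):
    """Hypothetical stand-in for an authorization error raised by the OpenSearch client."""


attempts = []


@retry_on(PolicyNotPropagated, wait_s=0, max_delay_s=5)
def create_index_stub():
    attempts.append(1)
    if len(attempts) < 3:
        raise PolicyNotPropagated("access policy not active yet")
    return {"acknowledged": True}


result = create_index_stub()
print(result, "after", len(attempts), "attempts")
```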


## Download data to ingest into our Knowledge Base

In [16]:
# Download and prepare dataset
!mkdir -p ./data

from urllib.request import urlretrieve
urls = [
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]

filenames = [
    'AMZN-2022-Shareholder-Letter.pdf',
    'AMZN-2021-Shareholder-Letter.pdf',
    'AMZN-2020-Shareholder-Letter.pdf',
    'AMZN-2019-Shareholder-Letter.pdf'
]

data_root = "./data/"

for idx, url in enumerate(urls):
    file_path = data_root + filenames[idx]
    urlretrieve(url, file_path)
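
The loop above downloads every file on each run. A possible variation, sketched below, pairs URLs and file names with `zip` and skips files that are already present. `download_all` and its `fetch` parameter are hypothetical; `fetch` defaults to `urlretrieve`, and the demonstration swaps in a fake fetcher so it runs offline.

```python
import os
import tempfile
from urllib.request import urlretrieve


def download_all(urls, filenames, data_root, fetch=urlretrieve):
    """Download each URL to data_root/filename, skipping files that already exist."""
    if len(urls) != len(filenames):
        raise ValueError("urls and filenames must have the same length")
    os.makedirs(data_root, exist_ok=True)
    downloaded = []
    for url, filename in zip(urls, filenames):
        file_path = os.path.join(data_root, filename)
        if not os.path.exists(file_path):
            fetch(url, file_path)
            downloaded.append(file_path)
    return downloaded


# Offline demonstration with a fake fetcher, so no network access is needed.
def fake_fetch(url, file_path):
    with open(file_path, "wb") as handle:
        handle.write(url.encode("utf-8"))


with tempfile.TemporaryDirectory() as tmp_dir:
    first_run = download_all(["https://example.com/a.pdf"], ["a.pdf"], tmp_dir, fetch=fake_fetch)
    second_run = download_all(["https://example.com/a.pdf"], ["a.pdf"], tmp_dir, fetch=fake_fetch)
    first_names = [os.path.basename(path) for path in first_run]

print(first_names, second_run)
```

In the notebook this would be called as `download_all(urls, filenames, data_root)`.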


#### Upload data to S3 Bucket data source

In [17]:
# Upload data to S3, into the bucket that will be configured as the data source of the Knowledge Base
s3_client = boto3.client(service_name="s3", region_name=region_name)
def uploadDirectory(path, bucket_name):
    for root, dirs, files in os.walk(path):
        for file in files:
            # The object key is the bare file name, so any sub-directory structure is flattened
            s3_client.upload_file(os.path.join(root, file), bucket_name, file)

uploadDirectory(data_root, bucket_name)
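
Because the upload uses only the file name as the object key, two files with the same name in different sub-directories would overwrite each other in the bucket. That does not matter for the flat `./data` folder used here, but if your data is nested, a sketch like the following derives keys from the relative path instead. `iter_upload_keys` is a hypothetical helper, and the demonstration uses a temporary directory rather than S3.

```python
import os
import tempfile


def iter_upload_keys(path):
    """Yield (local_path, s3_key) pairs, using the path relative to `path` as the key."""
    for root, _dirs, files in os.walk(path):
        for file in files:
            local_path = os.path.join(root, file)
            # S3 keys use forward slashes regardless of the local OS separator.
            key = os.path.relpath(local_path, path).replace(os.sep, "/")
            yield local_path, key


with tempfile.TemporaryDirectory() as tmp_dir:
    for sub in ("2021", "2022"):
        os.makedirs(os.path.join(tmp_dir, sub))
        with open(os.path.join(tmp_dir, sub, "letter.pdf"), "wb") as handle:
            handle.write(b"%PDF")
    keys = sorted(key for _local, key in iter_upload_keys(tmp_dir))

print(keys)
```

The upload loop would then read `for local_path, key in iter_upload_keys(data_root): s3_client.upload_file(local_path, bucket_name, key)`.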

## Create Knowledge Base
Steps:
- Initialize the OpenSearch Serverless configuration, which includes the collection ARN, index name, vector field, text field, and metadata field.
- Initialize the chunking strategy, based on which Knowledge Base will split the documents into chunks no larger than the size given in the `chunkingStrategyConfiguration`.
- Initialize the S3 configuration, which will be used to create the data source object later.
- Initialize the Titan Embed Text model ARN, as this will be used to create the embeddings for each of the text chunks.
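
To make the fixed-size strategy more concrete, here is a minimal sketch of chunking with overlap. It is only an illustration under simplifying assumptions: the Knowledge Base performs chunking internally with its own tokenizer, whereas this sketch treats list items as tokens. `fixed_size_chunks` is a hypothetical function; its two parameters mirror `maxTokens` and `overlapPercentage`.

```python
def fixed_size_chunks(tokens, max_tokens=512, overlap_percentage=20):
    """Split `tokens` into chunks of at most `max_tokens`, where adjacent chunks share an overlap."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")
    overlap = max_tokens * overlap_percentage // 100
    step = max_tokens - overlap  # how far the window advances each time
    chunks = []
    start = 0
    while start < len(tokens):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # the last window already reached the end
        start += step
    return chunks


tokens = [f"t{i}" for i in range(1000)]
chunks = fixed_size_chunks(tokens, max_tokens=512, overlap_percentage=20)
print([len(chunk) for chunk in chunks])
```

With 512 tokens and 20% overlap, each window starts 410 tokens after the previous one, so neighboring chunks share 102 tokens. The overlap helps keep a sentence that straddles a boundary fully present in at least one chunk.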

In [18]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

# Ingest strategy - How to ingest data from the data source
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

# The data source to ingest documents from, into the OpenSearch serverless Knowledge Base index
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes":["*.*"] # you can use this if you want to create a Knowledge Base using data within S3 prefixes.
}

# The embedding model used by Bedrock to embed ingested documents and real-time queries
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v2:0"

name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Amazon shareholder letter Knowledge Base."
roleArn = bedrock_kb_execution_role_arn
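
The names in `fieldMapping` must refer to fields that exist in the index created in Step 2. A quick local check before calling the API can catch typos early. The sketch below is an illustration only: `check_field_mapping` is a hypothetical helper, and the dictionaries are small examples shaped like `body_json` and the `fieldMapping` above.

```python
def check_field_mapping(index_body, field_mapping):
    """Return a list of problems; an empty list means the mapping matches the index."""
    properties = index_body.get("mappings", {}).get("properties", {})
    problems = []
    for role, field_name in field_mapping.items():
        if field_name not in properties:
            problems.append(f"{role} refers to missing index field '{field_name}'")
    vector_name = field_mapping.get("vectorField")
    if vector_name in properties and properties[vector_name].get("type") != "knn_vector":
        problems.append(f"vectorField '{vector_name}' is not of type knn_vector")
    return problems


example_index = {
    "mappings": {
        "properties": {
            "vector": {"type": "knn_vector", "dimension": 1024},
            "text": {"type": "text"},
            "text-metadata": {"type": "text"},
        }
    }
}
good_mapping = {"vectorField": "vector", "textField": "text", "metadataField": "text-metadata"}
bad_mapping = {"vectorField": "vector", "textField": "text", "metadataField": "metadata"}

print(check_field_mapping(example_index, good_mapping))
print(check_field_mapping(example_index, bad_mapping))
```

In the notebook, the equivalent call would be `check_field_mapping(body_json, opensearchServerlessConfiguration["fieldMapping"])`.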


Provide the above configurations as input to the `create_knowledge_base` method, which will create the Knowledge Base.

In [19]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        },
        tags=standard_tags
    )
    return create_kb_response["knowledgeBase"]

In [20]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")

In [21]:
pp.pprint(kb)

{ 'createdAt': datetime.datetime(2025, 9, 23, 13, 24, 56, 35098, tzinfo=tzlocal()),
  'description': 'Amazon shareholder letter Knowledge Base.',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-east-1:522040354751:knowledge-base/V8YH0IAHKO',
  'knowledgeBaseConfiguration': { 'type': 'VECTOR',
                                  'vectorKnowledgeBaseConfiguration': { 'embeddingModelArn': 'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v2:0'}},
  'knowledgeBaseId': 'V8YH0IAHKO',
  'name': 'bedrock-sample-knowledge-base-252',
  'roleArn': 'arn:aws:iam::522040354751:role/AmazonBedrockExecutionRoleForKnowledgeBase_485',
  'status': 'CREATING',
  'storageConfiguration': { 'opensearchServerlessConfiguration': { 'collectionArn': 'arn:aws:aoss:us-east-1:522040354751:collection/fdweovp7otbzp95vocfa',
                                                                   'fieldMapping': { 'metadataField': 'text-metadata',
                                                                   

In [22]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

Next we need to create a data source, which will be associated with the Knowledge Base created above. Once the data source is ready, we can then start to ingest the documents.

In [23]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

{ 'createdAt': datetime.datetime(2025, 9, 23, 13, 25, 0, 578284, tzinfo=tzlocal()),
  'dataDeletionPolicy': 'DELETE',
  'dataSourceConfiguration': { 's3Configuration': { 'bucketArn': 'arn:aws:s3:::bedrock-kb-us-east-1-522040354751-252'},
                               'type': 'S3'},
  'dataSourceId': 'IG6OZRJBBY',
  'description': 'Amazon shareholder letter Knowledge Base.',
  'knowledgeBaseId': 'V8YH0IAHKO',
  'name': 'bedrock-sample-knowledge-base-252',
  'status': 'AVAILABLE',
  'updatedAt': datetime.datetime(2025, 9, 23, 13, 25, 0, 578284, tzinfo=tzlocal()),
  'vectorIngestionConfiguration': { 'chunkingConfiguration': { 'chunkingStrategy': 'FIXED_SIZE',
                                                               'fixedSizeChunkingConfiguration': { 'maxTokens': 512,
                                                                                                   'overlapPercentage': 20}}}}


In [24]:
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

{'ResponseMetadata': {'RequestId': 'c02cdfd5-f010-476e-8362-904f31f0c29a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Tue, 23 Sep 2025 13:25:00 GMT',
   'content-type': 'application/json',
   'content-length': '607',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'c02cdfd5-f010-476e-8362-904f31f0c29a',
   'x-amz-apigw-id': 'RW3jCGm4oAMEKTA=',
   'x-amzn-trace-id': 'Root=1-68d29fac-4436959d2f7fdd5f75f0fcf1'},
  'RetryAttempts': 0},
 'dataSource': {'knowledgeBaseId': 'V8YH0IAHKO',
  'dataSourceId': 'IG6OZRJBBY',
  'name': 'bedrock-sample-knowledge-base-252',
  'status': 'AVAILABLE',
  'description': 'Amazon shareholder letter Knowledge Base.',
  'dataSourceConfiguration': {'type': 'S3',
   's3Configuration': {'bucketArn': 'arn:aws:s3:::bedrock-kb-us-east-1-522040354751-252'}},
  'vectorIngestionConfiguration': {'chunkingConfiguration': {'chunkingStrategy': 'FIXED_SIZE',
    'fixedSizeChunkingConfiguration': {'maxTokens': 512,
     'overlapPercentage': 20}}},
  'dataDeleti

### Start ingestion job
Once the Knowledge Base and data source are created, we can start the ingestion job.
During the ingestion job, Knowledge Base will fetch the documents in the data source, pre-process them to extract text, chunk the text based on the chunk size provided, create an embedding for each chunk, and then write the embeddings to the vector database, in this case AOSS.

In [25]:
# we need to retry as it can take a minute for the previous settings to take effect
@retry(retry_on_exception=lambda e: isinstance(e, (bedrock_agent_client.exceptions.ValidationException, bedrock_agent_client.exceptions.ConflictException)),
       wait_fixed=5000,
       stop_max_delay=60*1000)
def start_job():
    return bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

start_job_response = start_job()

In [26]:
job = start_job_response["ingestionJob"]
pp.pprint(job)

{ 'dataSourceId': 'IG6OZRJBBY',
  'ingestionJobId': '6NZLTC1YMX',
  'knowledgeBaseId': 'V8YH0IAHKO',
  'startedAt': datetime.datetime(2025, 9, 23, 13, 25, 1, 737535, tzinfo=tzlocal()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 0,
                  'numberOfDocumentsScanned': 0,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 0,
                  'numberOfModifiedDocumentsIndexed': 0,
                  'numberOfNewDocumentsIndexed': 0},
  'status': 'STARTING',
  'updatedAt': datetime.datetime(2025, 9, 23, 13, 25, 1, 737535, tzinfo=tzlocal())}


In [27]:
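# Get job
def job_completed():
    global job
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
    )
    job = get_job_response["ingestionJob"]
    # Stop waiting if the job failed, instead of polling indefinitely
    if job['status'] == 'FAILED':
        raise RuntimeError(f"Ingestion job failed: {job.get('failureReasons')}")
    return job['status'] == 'COMPLETE'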

interactive_sleep_for(job_completed)

Waiting for condition `job_completed` to be met. Checking every 10s:

❌.........❌.........✅
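
Once the job has finished, the `statistics` block of the job is worth a look, since a job can complete while some documents failed to index. The sketch below condenses it into a short summary. `summarize_ingestion` is a hypothetical helper, and the numbers in `example_job` are made up for illustration; they are not the output of this notebook run.

```python
def summarize_ingestion(job):
    """Condense an ingestion job dict into the counts that matter most."""
    stats = job.get("statistics", {})
    indexed = (stats.get("numberOfNewDocumentsIndexed", 0)
               + stats.get("numberOfModifiedDocumentsIndexed", 0))
    return {
        "status": job.get("status"),
        "scanned": stats.get("numberOfDocumentsScanned", 0),
        "indexed": indexed,
        "failed": stats.get("numberOfDocumentsFailed", 0),
        "failure_reasons": job.get("failureReasons", []),
    }


# Illustrative values only (not taken from a real run).
example_job = {
    "status": "COMPLETE",
    "statistics": {
        "numberOfDocumentsScanned": 4,
        "numberOfNewDocumentsIndexed": 3,
        "numberOfModifiedDocumentsIndexed": 0,
        "numberOfDocumentsFailed": 1,
    },
}

summary = summarize_ingestion(example_job)
print(summary)
```

In the notebook, `summarize_ingestion(job)` can be called after the waiting loop, since `job` then holds the latest job state.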

In [28]:
# Print the Knowledge Base ID in Bedrock, which corresponds to the OpenSearch index in the collection we created earlier; we will use it for invocation later
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)

'V8YH0IAHKO'


In [29]:
# keep the kb_id for invocation later in the invoke request
%store kb_id

Stored 'kb_id' (str)


Proceed with the next notebook to retrieve contexts from your Knowledge Base.