# Knowledge Bases for Amazon Bedrock - End-to-end example

This notebook provides sample code for building an empty OpenSearch Serverless (OSS) index and an Amazon Bedrock knowledge base, and for ingesting documents into the index.


#### Notebook Walkthrough

This notebook builds a data pipeline that ingests documents (typically stored in Amazon S3) into a knowledge base, i.e. a vector database such as Amazon OpenSearch Serverless (AOSS), so that they are available for lookup when a question is received.

- Load the documents into the knowledge base by connecting your S3 bucket (data source).
- Ingestion: the knowledge base splits the documents into smaller chunks (based on the selected chunking strategy), generates embeddings, and stores them in the associated vector store.
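The ingestion flow (chunk, embed, store) can be pictured with a self-contained toy sketch. It does not call Bedrock: `toy_embed` is a hypothetical stand-in for an embedding model, chunking splits on whitespace (Bedrock counts model tokens), and `InMemoryVectorStore` is a hypothetical stand-in for the OpenSearch index. The record keys `vector`, `text` and `text-metadata` mirror the field names used for the index later in this notebook.

```python
import hashlib
import math
from dataclasses import dataclass, field


def toy_embed(text: str, dim: int = 16) -> list:
    """Hypothetical toy embedding: hash words into `dim` buckets, then L2-normalise."""
    vec = [0.0] * dim
    for word in text.lower().split():
        bucket = int(hashlib.md5(word.encode("utf-8")).hexdigest(), 16) % dim
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0  # avoid dividing by zero for empty text
    return [v / norm for v in vec]


def split_words(text: str, size: int) -> list:
    """Naive chunking: consecutive groups of `size` whitespace-separated words."""
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


@dataclass
class InMemoryVectorStore:
    """Stand-in for the vector index; record keys mirror the index field names."""
    records: list = field(default_factory=list)

    def add(self, vector: list, text: str, metadata: dict) -> None:
        self.records.append({"vector": vector, "text": text, "text-metadata": metadata})


def ingest(documents: dict, store: InMemoryVectorStore, chunk_words: int = 50) -> int:
    """Chunk each document, embed each chunk, and store it; returns the record count."""
    for source, text in documents.items():
        for chunk in split_words(text, chunk_words):
            store.add(toy_embed(chunk), chunk, {"source": source})
    return len(store.records)


store = InMemoryVectorStore()
print(ingest({"a.pdf": "one two three four five", "b.pdf": "six seven"}, store, chunk_words=2))  # prints 4
```

A managed knowledge base replaces each of these pieces (tokenizer-aware chunking, a real embedding model, a real vector index), which is the work the ingestion job below does for you.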

![data_ingestion.png](./images/data_ingestion.png)


#### Steps: 
- Create an Amazon Bedrock Knowledge Base execution role with the policies needed to read data from S3 and write embeddings into OSS.
- Create an empty OpenSearch Serverless index.
- Download documents.
- Create an Amazon Bedrock knowledge base.
- Create a data source within the knowledge base that connects to Amazon S3.
- Start an ingestion job using the KB APIs, which reads data from S3, chunks it, converts the chunks into embeddings using the Amazon Titan Embeddings model, and stores these embeddings in AOSS, all without having to build, deploy, and manage the data pipeline yourself.

Once the data is available in the Bedrock knowledge base, a question answering application can be built using the Knowledge Base APIs provided by Amazon Bedrock, as shown in the following notebooks in the same folder:
- [1_managed-rag-kb-retrieve-generate-api.ipynb](./1\_managed-rag-kb-retrieve-generate-api.ipynb)
- [2_customized-rag-retrieve-api-claude-v2.ipynb](./2\_customized-rag-retrieve-api-claude-v2.ipynb)
- [3_customized-rag-retrieve-api-langchain-claude-v2.ipynb](./3\_customized-rag-retrieve-api-langchain-claude-v2.ipynb)


#### Pre-requisites
This notebook requires permissions to:
- create and delete AWS IAM roles
- create, update and delete Amazon S3 buckets
- access Amazon Bedrock
- access Amazon OpenSearch Serverless

If running on SageMaker Studio, you should add the following managed policies to your role:
- IAMFullAccess
- AWSLambda_FullAccess
- AmazonS3FullAccess
- AmazonBedrockFullAccess
- Custom policy for Amazon OpenSearch Serverless such as:
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}
```
<div class="alert alert-block alert-info">
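<b>Note:</b> Please make sure to enable model access in the Amazon Bedrock console for the Amazon Titan Embeddings model (used during ingestion) and for the text models used to test the knowledge base once it's created, such as `Anthropic Claude 3 Sonnet` and `Anthropic Claude 3 Haiku`. The test cell below lists Meta Llama 3.1 8B Instruct and Amazon Titan Text Lite in `claude_model_ids`; enable access to whichever models you list there.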
</div>


## Setup
Before running the rest of this notebook, run the cells below to install the necessary libraries (if they are not already installed) and connect to Amazon Bedrock.

In [1]:
# %pip install -U opensearch-py==2.3.1
# # %pip install -U boto3==1.33.2
# %pip install -U retrying==1.3.4

Collecting opensearch-py==2.3.1
  Downloading opensearch_py-2.3.1-py2.py3-none-any.whl (327 kB)
Installing collected packages: opensearch-py
Successfully installed opensearch-py-2.3.1
Note: you may need to restart the kernel to use updated packages.
Collecting retrying==1.3.4
  Downloading retrying-1.3.4-py3-none-any.whl (11 kB)
Installing collected packages: retrying
Successfully installed retrying-1.3.4
Note: you may need to restart the kernel to use updated packages.


In [1]:
# Restart the kernel so newly installed packages are picked up (this script works in the classic Jupyter Notebook UI)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
import json
import os
import boto3
from botocore.exceptions import ClientError
import pprint
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss, interactive_sleep
import random
from retrying import retry


In [4]:
# Random suffix (200-899) appended to resource names to avoid name collisions
suffix = random.randrange(200, 900)

# STS (Security Token Service) client, used to look up the current AWS account
sts_client = boto3.client('sts')

# boto3 session used to create service clients
boto3_session = boto3.session.Session()

# AWS Region used throughout this notebook
region_name = "us-west-2"
# region_name = boto3_session.region_name # alternatively, use the default Region from your AWS configuration

# Bedrock Agent client, used to create and manage knowledge bases
# Created in the Region set above ('us-west-2')
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)

# SigV4 service name for Amazon OpenSearch Serverless, used later to sign requests
service = 'aoss'

# S3 client, pinned to the same Region as the other clients
s3_client = boto3.client('s3', region_name=region_name)

# Look up the current account ID via STS
account_id = sts_client.get_caller_identity()["Account"]

# Suffix made of the Region and account ID, to keep the bucket name unique
s3_suffix = f"{region_name}-{account_id}"

# S3 bucket name in the form 'bedrock-kb-<region>-<account-id>'
# Replace with your own bucket name if needed
bucket_name = f'bedrock-kb-{s3_suffix}'

# PrettyPrinter with an indent of 2 for readable output
pp = pprint.PrettyPrinter(indent=2)

In [5]:
print(boto3_session.region_name)

us-west-2


In [5]:
# Check whether the S3 bucket exists; if not, create it to serve as the knowledge base data source
try:
    # head_bucket succeeds only if the bucket exists and we can access it
    s3_client.head_bucket(Bucket=bucket_name)
    print(f'Bucket {bucket_name} Exists')
except ClientError as e:
    # A 404 means the bucket does not exist; any other error (e.g. 403 Forbidden) is re-raised
    if e.response['Error']['Code'] not in ('404', 'NoSuchBucket'):
        raise
    print(f'Creating bucket {bucket_name}')
    # us-east-1 is S3's default Region and must not be given a LocationConstraint
    if region_name == "us-east-1":
        s3bucket = s3_client.create_bucket(Bucket=bucket_name)
    else:
        # Every other Region requires a LocationConstraint matching the Region
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region_name}
        )


Bucket bedrock-kb-us-west-2-590183949634 Exists
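The Region branch in the bucket-creation cell can be factored into a small pure function, which makes the us-east-1 special case easy to test without touching AWS. `create_bucket_kwargs` is a hypothetical helper name, not part of boto3; it only builds the keyword arguments that the cell passes to `s3_client.create_bucket`.

```python
def create_bucket_kwargs(bucket_name: str, region_name: str) -> dict:
    """Build keyword arguments for boto3's S3 `create_bucket` call.

    us-east-1 is S3's default Region and is called without a LocationConstraint;
    every other Region needs one that names the Region.
    """
    kwargs = {"Bucket": bucket_name}
    if region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs


# Possible usage (requires AWS credentials):
# s3_client.create_bucket(**create_bucket_kwargs(bucket_name, region_name))
print(create_bucket_kwargs("my-bucket", "us-west-2"))
```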


In [6]:
%store bucket_name

Stored 'bucket_name' (str)


## Create a vector store - OpenSearch Serverless index

### Step 1 - Create OSS policies and collection
First, we need to create a vector store. In this section we will use *Amazon OpenSearch Serverless*.

Amazon OpenSearch Serverless is a serverless option in Amazon OpenSearch Service. As a developer, you can use OpenSearch Serverless to run petabyte-scale workloads without configuring, managing, and scaling OpenSearch clusters. You get the same interactive millisecond response times as OpenSearch Service with the simplicity of a serverless environment. You pay only for what you use, and resources scale automatically to provide the right amount of capacity for your application without impacting data ingestion.

In [7]:
import boto3
import time

# Use the random suffix to make the vector store (collection) and index names unique
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"

# OpenSearch Serverless client, used to manage collections and policies
aoss_client = boto3_session.client('opensearchserverless', region_name=region_name)

# Create the execution role that Bedrock assumes for the knowledge base (helper from utility.py)
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)

# ARN of the execution role, needed in later steps
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

In [8]:
# Create the encryption, network and data access policies for the collection in OpenSearch Serverless
encryption_policy, network_policy, access_policy = create_policies_in_oss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn
)

# Create a vector search collection named `vector_store_name` in OpenSearch Serverless
collection = aoss_client.create_collection(name=vector_store_name, type='VECTORSEARCH')


In [9]:
pp.pprint(collection)

{ 'ResponseMetadata': { 'HTTPHeaders': { 'connection': 'keep-alive',
                                         'content-length': '314',
                                         'content-type': 'application/x-amz-json-1.0',
                                         'date': 'Thu, 03 Oct 2024 12:37:39 '
                                                 'GMT',
                                         'x-amzn-requestid': '99a3360a-e86d-4f8c-a221-d9ddf7d1c8ad'},
                        'HTTPStatusCode': 200,
                        'RequestId': '99a3360a-e86d-4f8c-a221-d9ddf7d1c8ad',
                        'RetryAttempts': 0},
  'createCollectionDetail': { 'arn': 'arn:aws:aoss:us-west-2:590183949634:collection/51eh6pduvy17hrmiwb2g',
                              'createdDate': 1727959059116,
                              'id': '51eh6pduvy17hrmiwb2g',
                              'kmsKeyArn': 'auto',
                              'lastModifiedDate': 1727959059116,
                             

In [10]:
# Persist the encryption, network and data access policies and the collection details with %store for later use
%store encryption_policy network_policy access_policy collection

Stored 'encryption_policy' (dict)
Stored 'network_policy' (dict)
Stored 'access_policy' (dict)
Stored 'collection' (dict)


In [11]:
# Build the host name of the OpenSearch Serverless collection endpoint
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

51eh6pduvy17hrmiwb2g.us-west-2.aoss.amazonaws.com


In [12]:
# Wait for the collection to be created
# Creating a collection can take several minutes
response = aoss_client.batch_get_collection(names=[vector_store_name])

# Poll while the status is still CREATING (it should become ACTIVE)
while (response['collectionDetails'][0]['status']) == 'CREATING':
    print('Creating collection...')
    interactive_sleep(30)  # check the collection status every 30 seconds
    response = aoss_client.batch_get_collection(names=[vector_store_name])

# The collection is ready; print its details
print('\nCollection successfully created:')
pp.pprint(response["collectionDetails"])

Creating collection...
Creating collection...........
Creating collection...........
Creating collection...........
Creating collection...........
Creating collection...........
..............................
Collection successfully created:
[ { 'arn': 'arn:aws:aoss:us-west-2:590183949634:collection/51eh6pduvy17hrmiwb2g',
    'collectionEndpoint': 'https://51eh6pduvy17hrmiwb2g.us-west-2.aoss.amazonaws.com',
    'createdDate': 1727959059116,
    'dashboardEndpoint': 'https://51eh6pduvy17hrmiwb2g.us-west-2.aoss.amazonaws.com/_dashboards',
    'id': '51eh6pduvy17hrmiwb2g',
    'kmsKeyArn': 'auto',
    'lastModifiedDate': 1727959295572,
    'name': 'bedrock-sample-rag-568',
    'standbyReplicas': 'ENABLED',
    'status': 'ACTIVE',
    'type': 'VECTORSEARCH'}]
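The polling loop above stops only once the status leaves `CREATING`, and it has no upper bound on how long it waits. A bounded, reusable polling helper could look like the sketch below; `wait_for_status` is a hypothetical name, and the 30-second interval and 40-attempt cap are assumptions, not service limits. The `sleep` parameter is injectable so the helper can be tested without actually waiting.

```python
import time
from typing import Callable, Iterable


def wait_for_status(fetch_status: Callable[[], str],
                    pending: Iterable[str],
                    interval_s: float = 30.0,
                    max_attempts: int = 40,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Call `fetch_status` until it returns a status outside `pending`.

    Returns the first non-pending status (which may be a failure state such as
    FAILED, so callers should check it) or raises TimeoutError.
    """
    pending = set(pending)
    status = None
    for _ in range(max_attempts):
        status = fetch_status()
        if status not in pending:
            return status
        sleep(interval_s)
    raise TimeoutError(f"status still {status!r} after {max_attempts} attempts")


# Possible usage with the objects defined above (requires AWS credentials):
# final_status = wait_for_status(
#     lambda: aoss_client.batch_get_collection(names=[vector_store_name])["collectionDetails"][0]["status"],
#     pending={"CREATING"},
# )
```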


In [13]:
# Create an OpenSearch Serverless access policy and attach it to the Bedrock execution role
try:
    create_oss_policy_attach_bedrock_execution_role(
        collection_id=collection_id,
        bedrock_kb_execution_role=bedrock_kb_execution_role
    )
    # Data access rules can take about a minute to take effect, so wait 60 seconds
    interactive_sleep(60)
except Exception as e:
    # Most likely the policy already exists; print the exception for details
    print("Policy already exists")
    pp.pprint(e)

Opensearch serverless arn:  arn:aws:iam::590183949634:policy/AmazonBedrockOSSPolicyForKnowledgeBase_511
............................................................

### Step 2 - Create vector index

In [14]:
# Create a vector index in OpenSearch Serverless with a knn_vector field mapping (dimension, method name and engine)
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError
credentials = boto3.Session().get_credentials()  # AWS credentials from the default provider chain
awsauth = AWSV4SignerAuth(credentials, region_name, service)  # SigV4 request signing for the 'aoss' service

# Index name, made unique with the suffix (this replaces the index_name set earlier and is used from here on)
index_name = f"bedrock-sample-index-{suffix}"

# Index settings (k-NN parameters) and field mappings
body_json = {
   "settings": {
      "index.knn": "true",  # enable k-NN search on this index
      "number_of_shards": 1,  # a single shard
      "knn.algo_param.ef_search": 512,  # HNSW candidate list size at query time; trades search speed for accuracy
      "number_of_replicas": 0,  # no replicas, to save resources
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",  # vector field used for k-NN search
            "dimension": 1536,  # must match the embedding size of amazon.titan-embed-text-v1
            "method": {
               "name": "hnsw",  # HNSW approximate nearest-neighbor graph
               "engine": "faiss",  # FAISS engine
               "space_type": "l2"  # Euclidean (L2) distance
            },
         },
         "text": {
            "type": "text"  # chunk text
         },
         "text-metadata": {
            "type": "text"  # chunk metadata
         }
      }
   }
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],  # collection endpoint over HTTPS
    http_auth=awsauth,  # AWS SigV4 authentication
    use_ssl=True,  # use TLS
    verify_certs=True,  # verify TLS certificates
    connection_class=RequestsHttpConnection,
    timeout=300  # request timeout in seconds
)


In [15]:
# Create the index
try:
    response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
    print('\nCreating index:')
    pp.pprint(response)  # print the create-index response

    # Index creation can take up to a minute
    interactive_sleep(60)
except RequestError as e:
    # If the index already exists, uncomment the next line to delete it, then re-run this cell
    # oss_client.indices.delete(index=index_name)
    print(f'Error while trying to create the index, with error {e.error}\nyou may uncomment the delete above to delete and recreate the index')



Creating index:
{ 'acknowledged': True,
  'index': 'bedrock-sample-index-568',
  'shards_acknowledged': True}
............................................................
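The index is configured for approximate search (`hnsw` on the `faiss` engine) with `space_type` set to `l2`. To make that distance concrete, the sketch below does exact brute-force search instead of HNSW: it returns the neighbours an ideal approximate search aims for, at O(n) cost per query. The `l2_score` helper assumes the score conversion that the OpenSearch k-NN documentation gives for `l2` (1 / (1 + squared distance)); check it against the OpenSearch version you run.

```python
import math


def l2_distance(a: list, b: list) -> float:
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def exact_knn(query: list, vectors: list, k: int = 3) -> list:
    """Exact (brute-force) k-nearest neighbours as (position, distance) pairs, closest first."""
    scored = [(i, l2_distance(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]


def l2_score(a: list, b: list) -> float:
    """Similarity score 1 / (1 + squared L2 distance); assumed to match OpenSearch's l2 scoring."""
    squared = sum((x - y) ** 2 for x, y in zip(a, b))
    return 1.0 / (1.0 + squared)


vectors = [[0.0, 0.0], [3.0, 4.0], [1.0, 1.0]]
print(exact_knn([0.0, 0.0], vectors, k=2))
```

HNSW exists because this exhaustive scan does not scale to millions of 1536-dimensional vectors; `ef_search` controls how far the approximate search explores before settling on an answer.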

## Download data to ingest into our knowledge base

In [19]:
import os

data_root = "./data/"
if not os.path.exists(data_root):
    os.makedirs(data_root)

In [20]:
# Download the dataset (Amazon shareholder letters) into data_root
from urllib.request import urlretrieve
urls = [
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]


filenames = [
    'AMZN-2022-Shareholder-Letter.pdf',
    'AMZN-2021-Shareholder-Letter.pdf',
    'AMZN-2020-Shareholder-Letter.pdf',
    'AMZN-2019-Shareholder-Letter.pdf'
]

data_root = "./data/"

# Download each file and save it under data_root
for idx, url in enumerate(urls):
    file_path = os.path.join(data_root, filenames[idx])
    print(f"Attempting to download to: {file_path}")
    urlretrieve(url, file_path)


Attempting to download to: ./data/AMZN-2022-Shareholder-Letter.pdf
Attempting to download to: ./data/AMZN-2021-Shareholder-Letter.pdf
Attempting to download to: ./data/AMZN-2020-Shareholder-Letter.pdf
Attempting to download to: ./data/AMZN-2019-Shareholder-Letter.pdf


#### Upload data to S3 Bucket data source

In [21]:
# Upload the downloaded files to the S3 bucket that serves as the knowledge base data source
s3_client = boto3.client("s3", region_name=region_name)
def uploadDirectory(path, bucket_name):
    for root, dirs, files in os.walk(path):
        for file in files:
            s3_client.upload_file(os.path.join(root, file), bucket_name, file)

uploadDirectory(data_root, bucket_name)
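`uploadDirectory` uses only the file name as the S3 key, so files with the same name in different subfolders would overwrite each other. A hedged alternative keeps each file's relative path in the key; `s3_keys_for_directory` is a hypothetical helper that only computes the keys, leaving the `upload_file` call as in the cell above.

```python
import os


def s3_keys_for_directory(path: str, prefix: str = "") -> list:
    """Return sorted (local_path, s3_key) pairs; keys keep each file's path relative to `path`."""
    pairs = []
    for root, _dirs, files in os.walk(path):
        for name in files:
            local_path = os.path.join(root, name)
            # S3 keys use "/" regardless of the local OS path separator
            relative = os.path.relpath(local_path, path).replace(os.sep, "/")
            pairs.append((local_path, prefix + relative))
    return sorted(pairs)


# Possible usage (requires AWS credentials and the s3_client defined above):
# for local_path, key in s3_keys_for_directory(data_root):
#     s3_client.upload_file(local_path, bucket_name, key)
```

A non-empty `prefix` also pairs naturally with the optional `inclusionPrefixes` setting of the S3 data source configuration.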


## Create Knowledge Base
Steps:
- Initialize the OpenSearch Serverless configuration, which includes the collection ARN, index name, vector field, text field, and metadata field.
- Initialize the chunking strategy, which determines how the KB splits documents into chunks of at most the size given in `chunkingStrategyConfiguration`.
- Initialize the S3 configuration, which will be used later to create the data source object.
- Initialize the Titan embeddings model ARN, which will be used to create embeddings for each text chunk.
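Fixed-size chunking with overlap can be sketched as a sliding window. This is a minimal illustration, assuming list items stand in for tokens and that the overlap is rounded down to whole tokens; Bedrock's own tokenizer and boundary handling may differ. With `maxTokens=512` and `overlapPercentage=20`, consecutive chunks would share about 102 tokens, so each new chunk starts about 410 tokens after the previous one. `fixed_size_chunks` is a hypothetical helper name.

```python
def fixed_size_chunks(tokens: list, max_tokens: int = 512, overlap_percentage: int = 20) -> list:
    """Split `tokens` into windows of at most `max_tokens`, overlapping by a percentage."""
    overlap = max_tokens * overlap_percentage // 100  # rounded down to whole tokens (assumption)
    stride = max_tokens - overlap
    if stride <= 0:
        raise ValueError("overlap_percentage must be below 100")
    chunks = []
    for start in range(0, len(tokens), stride):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # this window already reaches the end of the document
    return chunks


words = [f"w{i}" for i in range(1000)]
chunks = fixed_size_chunks(words, max_tokens=512, overlap_percentage=20)
print([len(c) for c in chunks])  # prints [512, 512, 180]
```

The overlap means a sentence cut at one chunk boundary still appears whole in the neighbouring chunk, at the cost of storing and embedding some text twice.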

In [22]:
# OpenSearch Serverless configuration
opensearchServerlessConfiguration = {
    "collectionArn": collection["createCollectionDetail"]['arn'],
    "vectorIndexName": index_name,
    "fieldMapping": {
        "vectorField": "vector",  # name of the vector field
        "textField": "text",  # name of the text field
        "metadataField": "text-metadata"  # name of the metadata field
    }
}

# Chunking strategy: how documents from the data source are split into chunks
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,  # maximum number of tokens per chunk
        "overlapPercentage": 20  # overlap between consecutive chunks, to preserve context across boundaries
    }
}

# S3 data source configuration for ingesting documents into the OpenSearch Serverless index
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",  # ARN of the S3 bucket
    # "inclusionPrefixes":["*.*"] # optionally restrict ingestion to objects under specific S3 prefixes
}

# Embedding model used to embed the documents and, at query time, the incoming questions
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

# Knowledge base name and description
name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Amazon shareholder letter knowledge base."
roleArn = bedrock_kb_execution_role_arn


Provide the above configurations as input to the `create_knowledge_base` method, which will create the Knowledge base.

In [23]:
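# Create the knowledge base
from retrying import retry

# Retry up to 7 attempts, waiting a random 1-2 seconds (1000-2000 ms) between attempts
@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=7)
def create_knowledge_base_func():
    # Call the Bedrock Agent API to create the knowledge base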
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name=name,
        description=description,
        roleArn=roleArn,
        knowledgeBaseConfiguration={
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn  # ARN of the embedding model
            }
        },
        storageConfiguration={
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration": opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]
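The `@retry(...)` decorator from the `retrying` package re-runs `create_knowledge_base_func` whenever it raises, which presumably helps while the new IAM role and OpenSearch access policy are still propagating. Its parameters read as: up to 7 attempts, with a random wait of 1000-2000 ms between them. For readers who prefer not to depend on `retrying`, a rough standard-library equivalent is sketched below; `retry_random_wait` is a hypothetical name, and like `retrying`'s default it retries on every exception and re-raises the last one.

```python
import functools
import random
import time


def retry_random_wait(max_attempts=7, wait_min_ms=1000, wait_max_ms=2000, sleep=time.sleep):
    """Retry the wrapped call on any exception, sleeping a random interval between attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the original exception
                    sleep(random.uniform(wait_min_ms, wait_max_ms) / 1000.0)
        return wrapper
    return decorator


# Possible usage, mirroring the cell above:
# @retry_random_wait(max_attempts=7, wait_min_ms=1000, wait_max_ms=2000)
# def create_knowledge_base_func(): ...
```

Retrying on every exception is convenient here but also retries permanent errors such as invalid parameters; filtering on specific `ClientError` codes would be a stricter design.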

In [24]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")

In [25]:
pp.pprint(kb)

{ 'createdAt': datetime.datetime(2024, 10, 3, 13, 26, 28, 134836, tzinfo=tzutc()),
  'description': 'Amazon shareholder letter knowledge base.',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-west-2:590183949634:knowledge-base/UPMFSI0RZI',
  'knowledgeBaseConfiguration': { 'type': 'VECTOR',
                                  'vectorKnowledgeBaseConfiguration': { 'embeddingModelArn': 'arn:aws:bedrock:us-west-2::foundation-model/amazon.titan-embed-text-v1'}},
  'knowledgeBaseId': 'UPMFSI0RZI',
  'name': 'bedrock-sample-knowledge-base-568',
  'roleArn': 'arn:aws:iam::590183949634:role/AmazonBedrockExecutionRoleForKnowledgeBase_511',
  'status': 'CREATING',
  'storageConfiguration': { 'opensearchServerlessConfiguration': { 'collectionArn': 'arn:aws:aoss:us-west-2:590183949634:collection/51eh6pduvy17hrmiwb2g',
                                                                   'fieldMapping': { 'metadataField': 'text-metadata',
                                                                      

In [26]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])
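The knowledge base returned by `create_knowledge_base` was still in `CREATING` status, and the `get_knowledge_base` call above fetches it only once. A small polling sketch follows, assuming the response shape `{"knowledgeBase": {"status": ...}}` seen in the outputs above and treating any final status other than `ACTIVE` as a failure; `wait_for_knowledge_base` is a hypothetical helper, and the 10-second interval and 30-attempt cap are arbitrary choices.

```python
import time


def wait_for_knowledge_base(client, knowledge_base_id, interval_s=10, max_attempts=30, sleep=time.sleep):
    """Poll `get_knowledge_base` until the knowledge base leaves CREATING; return its status."""
    status = None
    for _ in range(max_attempts):
        status = client.get_knowledge_base(knowledgeBaseId=knowledge_base_id)["knowledgeBase"]["status"]
        if status != "CREATING":
            break
        sleep(interval_s)
    if status != "ACTIVE":
        raise RuntimeError(f"knowledge base {knowledge_base_id} ended in status {status}")
    return status


# Possible usage (requires AWS credentials):
# wait_for_knowledge_base(bedrock_agent_client, kb['knowledgeBaseId'])
```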

Next we need to create a data source, which will be associated with the knowledge base created above. Once the data source is ready, we can then start to ingest the documents.

In [27]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

{ 'createdAt': datetime.datetime(2024, 10, 3, 13, 30, 22, 957705, tzinfo=tzutc()),
  'dataDeletionPolicy': 'DELETE',
  'dataSourceConfiguration': { 's3Configuration': { 'bucketArn': 'arn:aws:s3:::bedrock-kb-us-west-2-590183949634'},
                               'type': 'S3'},
  'dataSourceId': '4DDMCZ5L6V',
  'description': 'Amazon shareholder letter knowledge base.',
  'knowledgeBaseId': 'UPMFSI0RZI',
  'name': 'bedrock-sample-knowledge-base-568',
  'status': 'AVAILABLE',
  'updatedAt': datetime.datetime(2024, 10, 3, 13, 30, 22, 957705, tzinfo=tzutc()),
  'vectorIngestionConfiguration': { 'chunkingConfiguration': { 'chunkingStrategy': 'FIXED_SIZE',
                                                               'fixedSizeChunkingConfiguration': { 'maxTokens': 512,
                                                                                                   'overlapPercentage': 20}}}}


In [28]:
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

{'ResponseMetadata': {'RequestId': '8733fce9-462a-4276-a6ae-0861f5b44383',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 03 Oct 2024 13:30:28 GMT',
   'content-type': 'application/json',
   'content-length': '603',
   'connection': 'keep-alive',
   'x-amzn-requestid': '8733fce9-462a-4276-a6ae-0861f5b44383',
   'x-amz-apigw-id': 'fE1iPFwVvHcEWkA=',
   'x-amzn-trace-id': 'Root=1-66fe9c74-7752c0085ce66ec544f69365'},
  'RetryAttempts': 0},
 'dataSource': {'createdAt': datetime.datetime(2024, 10, 3, 13, 30, 22, 957705, tzinfo=tzutc()),
  'dataDeletionPolicy': 'DELETE',
  'dataSourceConfiguration': {'s3Configuration': {'bucketArn': 'arn:aws:s3:::bedrock-kb-us-west-2-590183949634'},
   'type': 'S3'},
  'dataSourceId': '4DDMCZ5L6V',
  'description': 'Amazon shareholder letter knowledge base.',
  'knowledgeBaseId': 'UPMFSI0RZI',
  'name': 'bedrock-sample-knowledge-base-568',
  'status': 'AVAILABLE',
  'updatedAt': datetime.datetime(2024, 10, 3, 13, 30, 22, 957705, tzinfo=tzutc()),
  

### Start ingestion job
Once the KB and data source are created, we can start the ingestion job.
During the ingestion job, the KB fetches the documents from the data source, pre-processes them to extract text, chunks the text based on the configured chunk size, creates embeddings for each chunk, and writes them to the vector database, in this case OSS.

In [29]:
# Start an ingestion job
interactive_sleep(30)
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

..............................

In [30]:
job = start_job_response["ingestionJob"]
pp.pprint(job)

{ 'dataSourceId': '4DDMCZ5L6V',
  'ingestionJobId': 'DMLWQ5IDUV',
  'knowledgeBaseId': 'UPMFSI0RZI',
  'startedAt': datetime.datetime(2024, 10, 3, 13, 31, 55, 489639, tzinfo=tzutc()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 0,
                  'numberOfDocumentsScanned': 0,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 0,
                  'numberOfModifiedDocumentsIndexed': 0,
                  'numberOfNewDocumentsIndexed': 0},
  'status': 'STARTING',
  'updatedAt': datetime.datetime(2024, 10, 3, 13, 31, 55, 489639, tzinfo=tzutc())}


In [31]:
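# Poll the ingestion job until it reaches a terminal state
# (checking only for 'COMPLETE' would loop forever if the job FAILED or was STOPPED)
while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
    interactive_sleep(30)
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb['knowledgeBaseId'],
        dataSourceId=ds["dataSourceId"],
        ingestionJobId=job["ingestionJobId"]
    )
    job = get_job_response["ingestionJob"]

pp.pprint(job)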

{ 'dataSourceId': '4DDMCZ5L6V',
  'ingestionJobId': 'DMLWQ5IDUV',
  'knowledgeBaseId': 'UPMFSI0RZI',
  'startedAt': datetime.datetime(2024, 10, 3, 13, 31, 55, 489639, tzinfo=tzutc()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 0,
                  'numberOfDocumentsScanned': 4,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 0,
                  'numberOfModifiedDocumentsIndexed': 0,
                  'numberOfNewDocumentsIndexed': 4},
  'status': 'COMPLETE',
  'updatedAt': datetime.datetime(2024, 10, 3, 13, 32, 11, 288177, tzinfo=tzutc())}
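The `statistics` block of a finished job is easier to check at a glance when condensed into one line. A minimal sketch, using the statistic keys from the output above; `summarize_ingestion` is a hypothetical helper, and it assumes failed jobs may carry a `failureReasons` list, which it appends only when present.

```python
def summarize_ingestion(job: dict) -> str:
    """One-line summary of an ingestion job's status and document counts."""
    stats = job.get("statistics", {})
    scanned = stats.get("numberOfDocumentsScanned", 0)
    indexed = (stats.get("numberOfNewDocumentsIndexed", 0)
               + stats.get("numberOfModifiedDocumentsIndexed", 0))
    failed = stats.get("numberOfDocumentsFailed", 0)
    summary = f"{job.get('status')}: scanned={scanned}, indexed={indexed}, failed={failed}"
    reasons = job.get("failureReasons") or []
    if reasons:
        summary += "; reasons: " + " | ".join(reasons)
    return summary


# Values copied from the completed job shown above
completed = {"status": "COMPLETE",
             "statistics": {"numberOfDocumentsScanned": 4, "numberOfNewDocumentsIndexed": 4,
                            "numberOfModifiedDocumentsIndexed": 0, "numberOfDocumentsFailed": 0}}
print(summarize_ingestion(completed))  # prints COMPLETE: scanned=4, indexed=4, failed=0
```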


In [32]:
# Print the knowledge base ID in Bedrock, which corresponds to the OpenSearch index in the collection created earlier; we will use it for invocation later
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)

'UPMFSI0RZI'


In [33]:
# keep the kb_id for invocation later in the invoke request
%store kb_id

Stored 'kb_id' (str)


## Test the knowledge base
### Note: If you plan to run the following notebooks, you can skip this section
### Using RetrieveAndGenerate API
Behind the scenes, RetrieveAndGenerate API converts queries into embeddings, searches the knowledge base, and then augments the foundation model prompt with the search results as context information and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manage short-term memory of the conversation to provide more contextual results.

The output of the RetrieveAndGenerate API includes the generated response, source attribution as well as the retrieved text chunks.
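The nested response can be flattened into an answer plus a list of sources. The sketch assumes the `output.text`, `citations[].retrievedReferences[].content.text` and `location.s3Location.uri` fields used by the cells below; `extract_answer_and_sources` is a hypothetical helper. The response also carries a `sessionId`; passing it back as the `sessionId` argument of the next `retrieve_and_generate` call is how a follow-up question continues the same conversation.

```python
def extract_answer_and_sources(response: dict) -> tuple:
    """Return the generated answer and a list of {'text', 'uri'} dicts for its citations."""
    answer = response["output"]["text"]
    sources = []
    for citation in response.get("citations", []):
        for ref in citation.get("retrievedReferences", []):
            location = ref.get("location", {})
            sources.append({
                "text": ref["content"]["text"],
                # S3 data sources report the object URI under location.s3Location.uri
                "uri": location.get("s3Location", {}).get("uri"),
            })
    return answer, sources
```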

In [37]:
# Try out the KB using the RetrieveAndGenerate API
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name=region_name)
# Compare how different models respond to the same input (despite the variable name, the list is not limited to Claude models)
claude_model_ids = [ ["llama3-1-8b", "meta.llama3-1-8b-instruct-v1:0"], ["titan-text-lite", "amazon.titan-text-lite-v1"]]

In [35]:
def ask_bedrock_llm_with_knowledge_base(query: str, model_arn: str, kb_id: str) -> dict:
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={
            'text': query
        },
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': model_arn
            }
        },
    )

    return response

In [39]:
query = "What is Amazon doing in the field of generative AI?"

for model_id in claude_model_ids:
    model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id[1]}'
    print(f"Trying model ARN: {model_arn}")
    try:
        response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
        print("Success!")
    except Exception as e:
        # Skip this model; otherwise the code below would reuse a stale (or undefined) response
        print(f"Failed with error: {str(e)}")
        continue

    generated_text = response['output']['text']
    citations = response["citations"]
    contexts = []
    for citation in citations:
        retrievedReferences = citation["retrievedReferences"]
        for reference in retrievedReferences:
            contexts.append(reference["content"]["text"])
    print(f"---------- Generated using {model_id[0]}:")
    pp.pprint(generated_text )
    print(f'---------- The citations for the response generated by {model_id[0]}:')
    pp.pprint(contexts)
    print()

Trying model ARN: arn:aws:bedrock:us-west-2::foundation-model/meta.llama3-1-8b-instruct-v1:0
Success!
---------- Generated using llama3-1-8b:
('Amazon is working on its own Large Language Models (LLMs) and has been '
 'investing substantially in these models across all of its consumer, seller, '
 'brand, and creator experiences. The company believes that LLMs and '
 'Generative AI will transform and improve virtually every customer '
 'experience. Amazon is also democratizing this technology so companies of all '
 'sizes can leverage Generative AI through its AWS platform, which offers the '
 'most price-performant machine learning chips in Trainium and Inferentia. '
 'This allows companies to choose from various LLMs and build applications '
 'with all of the AWS security, privacy, and other features that customers are '
 'accustomed to using.')
---------- The citations for the response generated by llama3-1-8b:
[ 'Imagine what they’ll be able to do with reliable connectivity, from pe

### Retrieve API
The Retrieve API converts user queries into embeddings, searches the knowledge base, and returns the relevant results, giving you more control to build custom workflows on top of the semantic search results. The output of the Retrieve API includes the retrieved text chunks, the location type and URI of the source data, and the relevance scores of the retrievals.
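One way to use those scores is to drop weak matches before building a custom prompt. A minimal sketch, assuming each item in `retrievalResults` has `score`, `content.text` and `location.s3Location.uri` fields; `top_chunks` is a hypothetical helper, and any `min_score` threshold is an assumption to tune, since the score scale depends on the vector store and distance metric.

```python
def top_chunks(retrieval_results: list, min_score: float = 0.0) -> list:
    """Return (score, uri, text) tuples at or above `min_score`, highest score first."""
    rows = []
    for result in retrieval_results:
        score = result.get("score", 0.0)
        if score < min_score:
            continue  # discard weak matches
        uri = result.get("location", {}).get("s3Location", {}).get("uri", "")
        rows.append((score, uri, result["content"]["text"]))
    return sorted(rows, key=lambda row: row[0], reverse=True)


# Possible usage after the retrieve call below:
# for score, uri, text in top_chunks(relevant_documents["retrievalResults"], min_score=0.5):
#     print(round(score, 3), uri, text[:80])
```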

In [40]:
# retrieve api for fetching only the relevant context.
relevant_documents = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb_id,
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            'numberOfResults': 3 # fetch the top 3 chunks that most closely match the query
        }
    }
)

In [41]:
pp.pprint(relevant_documents["retrievalResults"])

[ { 'content': { 'text': 'Amazon has been using machine learning extensively '
                         'for 25 years, employing it in everything from '
                         'personalized ecommerce recommendations, to '
                         'fulfillment center pick paths, to drones for Prime '
                         'Air, to Alexa, to the many machine learning services '
                         'AWS offers (where AWS has the broadest machine '
                         'learning functionality and customer base of any '
                         'cloud provider). More recently, a newer form of '
                         'machine learning, called Generative AI, has burst '
                         'onto the scene and promises to significantly '
                         'accelerate machine learning adoption. Generative AI '
                         'is based on very Large Language Models (trained on '
                         'up to hundreds of billions of parameters, and '
     

<div class="alert alert-block alert-warning">
<b>Next steps:</b> Proceed to the next labs to learn how to use Bedrock Knowledge bases. Remember to CLEAN_UP at the end of your session.
</div>