## Near-Real-Time Ingestion with the Document Level API (DLA): End-to-End Example

With the Document Level API (DLA), customers can ingest, update, or delete data in Amazon Bedrock Knowledge Bases efficiently and cost-effectively with a single API call, without running periodic full syncs against the data source or a full sync after every change.

For more details on DLA, see the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/kb-direct-ingestion-add.html).
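
As a rough illustration of the per-document delete path mentioned above, the sketch below builds the `documentIdentifiers` list that the boto3 `delete_knowledge_base_documents` call expects. The helper name `build_delete_identifiers` and the sample IDs are hypothetical, and the field names should be checked against the API reference before use.

```python
from typing import Dict, List


def build_delete_identifiers(data_source_type: str, ids: List[str]) -> List[Dict]:
    """Build document identifiers for a DLA delete request (illustrative helper).

    For 'S3' data sources each entry in `ids` is an S3 URI; for 'CUSTOM'
    data sources each entry is the custom document ID used at ingestion time.
    """
    if data_source_type == "S3":
        return [{"dataSourceType": "S3", "s3": {"uri": uri}} for uri in ids]
    if data_source_type == "CUSTOM":
        return [{"dataSourceType": "CUSTOM", "custom": {"id": doc_id}} for doc_id in ids]
    raise ValueError(f"Unsupported data source type: {data_source_type}")


# Hypothetical document IDs, for illustration only.
identifiers = build_delete_identifiers("CUSTOM", ["doc-001", "doc-002"])
print(identifiers)

# With a real client the list would be passed along these lines (not run here):
# bedrock_agent_client.delete_knowledge_base_documents(
#     knowledgeBaseId=kb_id, dataSourceId=ds_id, documentIdentifiers=identifiers
# )
```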

#### Prerequisites:

- You must have already created an Amazon Bedrock Knowledge Base by running [01_create_ingest_documents_test_kb_multi_ds.ipynb](/knowledge-bases/01-rag-concepts/01_create_ingest_documents_test_kb_multi_ds.ipynb).
- Note the KB ID.

#### Test the Knowledge Base:
- Ingest documents into the Knowledge Base with DLA.
- Query the Knowledge Base to verify the information.

<div class="alert alert-block alert-info">
<b>Note:</b> In the Amazon Bedrock console, enable access to the following models: `amazon.nova-micro-v1:0`, `Anthropic Claude 3 Sonnet`, `amazon.titan-text-express-v1`, `anthropic.claude-3-haiku-20240307-v1:0`, and `Titan Text Embeddings V2`.
<br> -------------------------------------------------------------------------------------------------------------------------------------------------------   </br>
Run the notebook cells one at a time; do not use the "Run All Cells" option.
</div>

### 0 - Setup
Before running the remaining cells, run the cells below to install the required libraries and connect to Bedrock.

You can ignore any pip dependency errors shown while the libraries install.

In [None]:
%pip install --upgrade pip --quiet
%pip install -r ../requirements.txt --no-deps --quiet
%pip install -r ../requirements.txt --upgrade --quiet

In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
import boto3
print(boto3.__version__)

In [4]:
%load_ext autoreload
%autoreload 2

In [5]:
import warnings
warnings.filterwarnings('ignore')

In [6]:
import os
import sys
import time
import boto3
import logging
import pprint
import json
import uuid

# Set the path to import module
from pathlib import Path
current_path = Path().resolve()
current_path = current_path.parent
if str(current_path) not in sys.path:
    sys.path.append(str(current_path))
# Print sys.path to verify
# print(sys.path)

from utils.knowledge_base_operators import create_document_config, ingest_documents_dla

In [None]:
#Clients
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session()
region =  session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime') 
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id

### Ingest Documents Directly into the Knowledge Base with the Document Level API (INLINE)

To ingest documents directly into the Knowledge Base, specify the Knowledge Base ID and the data source in an [IngestKnowledgeBaseDocuments](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_agent_IngestKnowledgeBaseDocuments.html) request.

In [None]:
%store -r kb_id
# kb_id = "<<knowledge_base_id>>" # Replace with your knowledge base id here.

In [None]:
ds_id_list = bedrock_agent_client.list_data_sources( knowledgeBaseId=kb_id, maxResults=100)['dataSourceSummaries']
ds_id = ds_id_list[0]['dataSourceId']

kb_id, ds_id

Currently, DLA is available only when the Knowledge Base is connected to one of the following data source types:

- Amazon S3
- Custom

Different ingest patterns apply depending on the configuration. For details, see the [API documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent/client/ingest_knowledge_base_documents.html).
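
One way to guard against an unsupported data source before calling DLA is to inspect the type reported by `get_data_source`. The following is a minimal sketch: the helper name `dla_source_type` and the hand-written sample response are assumptions for illustration, and the response is trimmed to the single field being read.

```python
# Data source types for which DLA is available, per the list above.
SUPPORTED_DLA_TYPES = {"S3", "CUSTOM"}


def dla_source_type(get_data_source_response: dict) -> str:
    """Return the data source type if DLA supports it, otherwise raise ValueError."""
    ds_type = get_data_source_response["dataSource"]["dataSourceConfiguration"]["type"]
    if ds_type not in SUPPORTED_DLA_TYPES:
        raise ValueError(f"DLA is not available for data source type {ds_type!r}")
    return ds_type


# Hand-written stand-in for a get_data_source response (only the field used here).
sample_response = {"dataSource": {"dataSourceConfiguration": {"type": "S3"}}}
print(dla_source_type(sample_response))
```

With a live client, the input would come from `bedrock_agent_client.get_data_source(knowledgeBaseId=kb_id, dataSourceId=ds_id)`.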

In [None]:
# You can choose between different Ingest patterns (based on Data source type i.e. s3 or Custom) using DLA

print("Different DLA ingest pattern:")
# For S3 Data source type
print("1. Data Source Type: S3 - Metadata: INLINE")
print("2. Data Source Type: S3 - Metadata: S3_Location")

# For CUSTOM Data source type
print("3. Data Source Type: CUSTOM -  Document source type: INLINE -  content type: TEXT - Metadata: INLINE")
print("4. Data Source Type: CUSTOM -  Document source type: INLINE -  content type: TEXT - Metadata: S3_Location")
print("5. Data Source Type: CUSTOM -  Document source type: INLINE -  content type: BYTE - Metadata: INLINE")
print("6. Data Source Type: CUSTOM -  Document source type: INLINE -  content type: BYTE - Metadata: S3_Location")
print("7. Data Source Type: CUSTOM -  Document source type: S3_LOCATION - Metadata: INLINE")
print("8. Data Source Type: CUSTOM -  Document source type: S3_LOCATION - Metadata: S3_Location")


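To use DLA, you must define knowledgeBaseId, dataSourceId, and documents (a list of objects describing the documents to add).

- This notebook uses a custom function, `create_document_config`, to build the document list for the chosen ingest pattern. It takes the following arguments:

    - data_source_type: 'CUSTOM' or 'S3'.
    - document_id: the custom document ID.
    - s3_uri: the S3 URI, for an S3 data source.
    - inline_content: the inline content configuration, for a custom data source.
    - metadata: metadata information, either a list of inline attributes or an S3 location.

This notebook implements only patterns 1, 2, 3, and 4, but you can extend it to patterns 5, 6, 7, and 8 as needed.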
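
To make the shape of one `documents` entry concrete, here is a hypothetical stand-in for such a builder. It is not the repository's `create_document_config`; the name `sketch_document_config` is invented for this sketch, it handles only TEXT inline content, and the request field names follow the `ingest_knowledge_base_documents` reference as best understood, so verify them before relying on it.

```python
from typing import Optional


def sketch_document_config(
    data_source_type: str,
    document_id: Optional[str] = None,
    s3_uri: Optional[str] = None,
    inline_content: Optional[dict] = None,
    metadata: Optional[dict] = None,
) -> dict:
    """Assemble one entry of the `documents` list (illustrative only)."""
    if data_source_type == "S3":
        content = {"dataSourceType": "S3", "s3": {"s3Location": {"uri": s3_uri}}}
    elif data_source_type == "CUSTOM":
        custom = {"customDocumentIdentifier": {"id": document_id}}
        if inline_content is not None:
            # This sketch covers TEXT inline content only (patterns 3 and 4).
            if inline_content.get("type") != "TEXT":
                raise ValueError("Only TEXT inline content is handled in this sketch")
            custom["sourceType"] = "IN_LINE"
            custom["inlineContent"] = {
                "type": "TEXT",
                "textContent": {"data": inline_content["data"]},
            }
        else:
            custom["sourceType"] = "S3_LOCATION"
            custom["s3Location"] = {"uri": s3_uri}
        content = {"dataSourceType": "CUSTOM", "custom": custom}
    else:
        raise ValueError(f"Unsupported data source type: {data_source_type}")

    document = {"content": content}
    if metadata is not None:
        if "inlineAttributes" in metadata:
            # Inline metadata: a list of key/value attributes.
            document["metadata"] = {"type": "IN_LINE_ATTRIBUTE", **metadata}
        else:
            # Otherwise treat the dict as an S3 location (uri, bucketOwnerAccountId).
            document["metadata"] = {"type": "S3_LOCATION", "s3Location": metadata}
    return document


# Pattern 3 with placeholder values: CUSTOM source, inline TEXT, inline metadata.
example = sketch_document_config(
    data_source_type="CUSTOM",
    document_id="example-doc-id",
    inline_content={"type": "TEXT", "data": "This is sample document content"},
    metadata={"inlineAttributes": [
        {"key": "company", "value": {"type": "STRING", "stringValue": "octank"}}
    ]},
)
print(example["content"]["custom"]["sourceType"], example["metadata"]["type"])
```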

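<div class="alert alert-block alert-info">
<b>Note:</b> When using DLA, the dataSourceType specified for each document must match the type of the data source you target. If they do not match, ingestion fails with an error. 
<ul> - If the KB data source is S3, also set the data source type to S3 when calling the DLA API.</ul>
<ul> - If the KB data source is CUSTOM, set the data source type to CUSTOM.</ul>
</div>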

In [10]:
# Provide below information based on your ingest pattern
#---------------------------------------------------------------------------------------
# FOR INGEST PATTERN CHOICE = 1, i.e. Data Source Type: S3 - Metadata: INLINE
# **************************************************************************************
# S3 URI of the data to be ingested
document_s3_uri = 's3://semantic-kb-9194117/octank_financial_10K.pdf'

# INLINE Metadata details
metadata_1 = {'key': 'company', 'value': { 'stringValue': 'octank', 'type': 'STRING'}}
metadata_2 = {'key': 'document', 'value': { 'stringValue': '10k', 'type': 'STRING'}}
metadata_list =[metadata_1, metadata_2]

inline_metadata ={'inlineAttributes':metadata_list}

# Create document configuration for this ingest pattern
s3_doc_inline_metadata = create_document_config(
    data_source_type='S3',
    s3_uri=document_s3_uri,
    metadata= inline_metadata
)

# #---------------------------------------------------------------------------------------
# # FOR INGEST PATTERN CHOICE = 2, i.e. Data Source Type: S3 - Metadata: S3_Location
# # **************************************************************************************
# document_s3_uri = '<Insert S3 URI here>' 

# # if your metadata is stored at S3_location
# metadata_s3_uri = '<Insert S3 URI here>' 
# metadata_s3_accountid = '<Insert S3 URI accountid here>' 
# s3_metadata = {'uri': metadata_s3_uri, 'bucketOwnerAccountId': metadata_s3_accountid }

# s3_doc_s3_metadata = create_document_config(
#             data_source_type='S3',
#             s3_uri=document_s3_uri,
#             metadata= s3_metadata
#         )


## ---------------------------------------------------------------------------------------
## FOR INGEST PATTERN CHOICE = 3, i.e. Data Source Type: CUSTOM -  Document source type: INLINE -  content type: TEXT - Metadata: INLINE
## **************************************************************************************

## Example :  USE DLA to ingest a custom document with TEXT inline content and inline metadata

# document_content = '''This is sample document content'''
# document_id = '<insert document id here>'

# # if your Metadata is INLINE
# metadata_1 = {'key': 'company', 'value': { 'stringValue': 'octank', 'type': 'STRING'}}
# metadata_2 = {'key': 'document', 'value': { 'stringValue': '10k', 'type': 'STRING'}}
# metadata_list =[metadata_1, metadata_2]

# inline_metadata ={'inlineAttributes': metadata_list}

# custom_inline_text_inline_metadata = create_document_config(
#     data_source_type='CUSTOM',
#     document_id=document_id,
#     inline_content={
#         'type': 'TEXT',
#         'data': document_content
#     },
#     metadata= inline_metadata
# )

##---------------------------------------------------------------------------------------
## FOR INGEST PATTERN CHOICE = 4, i.e. Data Source Type: CUSTOM -  Document source type: INLINE -  content type: TEXT - Metadata: S3_Location
## **************************************************************************************

## Example : USE DLA to ingest a custom document with TEXT inline content and S3 metadata

# document_content = '''This is sample document content'''
# document_id = '<insert document id here>'

# # if your metadata is stored at S3_location
# metadata_s3_uri = '<Insert S3 URI here>' 
# metadata_s3_accountid = '<Insert S3 URI accountid here>' 
# s3_metadata = {'uri': metadata_s3_uri, 'bucketOwnerAccountId': metadata_s3_accountid }

# custom_inline_text_s3_metadata = create_document_config(
#     data_source_type='CUSTOM',
#     document_id=document_id,
#     inline_content={
#         'type': 'TEXT',
#         'data': document_content
#     },
#     metadata=s3_metadata
#)


Once the document list is configured, call `ingest_documents_dla` (another custom function) to ingest the documents into the Knowledge Base. This function calls the [ingest_knowledge_base_documents](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent/client/ingest_knowledge_base_documents.html) API.

- The function takes the following arguments:

    - knowledge_base_id: the Knowledge Base ID.
    - data_source_id: the data source ID.
    - documents: the list of document configurations to ingest.
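
For illustration, a wrapper of this kind might check each document's `dataSourceType` before forwarding the request. The sketch below is not the repository's `ingest_documents_dla`: the names `sketch_ingest_documents` and `FakeAgentClient` are invented here, and the fake client returns a made-up response so the example runs offline.

```python
def sketch_ingest_documents(client, knowledge_base_id, data_source_id, documents, expected_type):
    """Check dataSourceType on every document, then call the ingest API."""
    for i, doc in enumerate(documents):
        actual = doc["content"]["dataSourceType"]
        if actual != expected_type:
            raise ValueError(
                f"documents[{i}] has dataSourceType {actual!r}, expected {expected_type!r}"
            )
    return client.ingest_knowledge_base_documents(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
        documents=documents,
    )


class FakeAgentClient:
    """Offline stand-in for boto3.client('bedrock-agent'); the response is invented."""

    def ingest_knowledge_base_documents(self, **kwargs):
        self.last_call = kwargs
        return {"documentDetails": [{"status": "STARTING"} for _ in kwargs["documents"]]}


fake_client = FakeAgentClient()
docs = [{"content": {"dataSourceType": "S3",
                     "s3": {"s3Location": {"uri": "s3://example-bucket/report.pdf"}}}}]
result = sketch_ingest_documents(fake_client, "KB_ID_PLACEHOLDER", "DS_ID_PLACEHOLDER", docs, "S3")
print(result)
```

Failing fast on a type mismatch keeps the error local instead of waiting for the service to reject the request.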

In [None]:
# Ingest the documents using DLA
response = ingest_documents_dla(
    knowledge_base_id=kb_id,
    data_source_id=ds_id,
    documents=[ s3_doc_inline_metadata] # Based on the ingest pattern, this can be changed to [s3_doc_s3_metadata], [custom_inline_text_inline_metadata] or [custom_inline_text_s3_metadata]
)

print(response)


Check the status of the documents ingested with DLA.

In [12]:
## To fetch the status of documents
# response = bedrock_agent_client.list_knowledge_base_documents(
#     dataSourceId=ds_id,
#     knowledgeBaseId=kb_id,
# )
# print(response)
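
Because ingestion is asynchronous, a status check is often wrapped in a polling loop. The following is a minimal sketch under stated assumptions: the helper `wait_for_documents`, the set of in-progress status names, and the fake client are all illustrative, and only the first page of `list_knowledge_base_documents` results is examined.

```python
import time
from collections import Counter

# Assumption: these status values mean a document is still being processed.
IN_PROGRESS_STATUSES = {"PENDING", "STARTING", "IN_PROGRESS"}


def wait_for_documents(client, knowledge_base_id, data_source_id,
                       max_polls=10, delay_seconds=5, sleep=time.sleep):
    """Poll document statuses until none are in progress; return status counts."""
    for _ in range(max_polls):
        response = client.list_knowledge_base_documents(
            knowledgeBaseId=knowledge_base_id,
            dataSourceId=data_source_id,
        )
        counts = Counter(d["status"] for d in response.get("documentDetails", []))
        if not any(status in IN_PROGRESS_STATUSES for status in counts):
            return dict(counts)
        sleep(delay_seconds)
    raise TimeoutError("Documents were still in progress after the last poll")


class FakeStatusClient:
    """Offline stand-in that replays a fixed sequence of invented responses."""

    def __init__(self, responses):
        self._responses = iter(responses)

    def list_knowledge_base_documents(self, **kwargs):
        return next(self._responses)


fake_status_client = FakeStatusClient([
    {"documentDetails": [{"status": "IN_PROGRESS"}]},
    {"documentDetails": [{"status": "INDEXED"}]},
])
status_counts = wait_for_documents(
    fake_status_client, "KB_ID_PLACEHOLDER", "DS_ID_PLACEHOLDER", sleep=lambda s: None
)
print(status_counts)
```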

### 2.2 Test the Knowledge Base
Now that the Knowledge Base is ready, you can test it with the [**retrieve**](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/retrieve.html) and [**retrieve_and_generate**](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/retrieve_and_generate.html) functions.

#### Test the Knowledge Base with the Retrieve and Generate API

First, try the Knowledge Base with the retrieve and generate API. With this API, Bedrock retrieves the relevant references from the Knowledge Base and generates the final answer with a Bedrock foundation model.

query = `Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.`

The correct answer to this query (per the ground truth QA pairs) is:
```
The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow.
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.
```

In [13]:
query = "Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019?"

In [None]:
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

In [15]:
import boto3
import time

def delete_all_knowledge_bases():
    try:
        # Create the Bedrock Agent client (the runtime client is not needed for deletion)
        bedrock_agent = boto3.client('bedrock-agent')

        # List all knowledge bases
        kb_response = bedrock_agent.list_knowledge_bases()
        knowledge_bases = kb_response.get('knowledgeBaseSummaries', [])

        if not knowledge_bases:
            print("No knowledge bases found.")
            return

        # Iterate through each knowledge base
        for kb in knowledge_bases:
            kb_id = kb['knowledgeBaseId']
            print(f"Processing Knowledge Base: {kb_id}")

            # List data sources for the knowledge base
            data_sources = bedrock_agent.list_data_sources(
                knowledgeBaseId=kb_id
            )

            # Delete each data source
            for ds in data_sources.get('dataSourceSummaries', []):
                ds_id = ds['dataSourceId']
                print(f"Deleting Data Source: {ds_id}")
                
                try:
                    bedrock_agent.delete_data_source(
                        knowledgeBaseId=kb_id,
                        dataSourceId=ds_id
                    )
                    print(f"Data Source {ds_id} deleted successfully")
                except Exception as e:
                    print(f"Error deleting Data Source {ds_id}: {str(e)}")

            # Delete the knowledge base
            try:
                print(f"Deleting Knowledge Base: {kb_id}")
                bedrock_agent.delete_knowledge_base(
                    knowledgeBaseId=kb_id
                )
                print(f"Knowledge Base {kb_id} deleted successfully")
            except Exception as e:
                print(f"Error deleting Knowledge Base {kb_id}: {str(e)}")

            # Wait for a few seconds between deletions
            time.sleep(2)

        print("Deletion process completed")

    except Exception as e:
        print(f"An error occurred: {str(e)}")

In [16]:
import boto3
import time
from botocore.exceptions import ClientError

def delete_all_opensearch_serverless():
    # Create OpenSearch Serverless client
    client = boto3.client('opensearchserverless')
    
    def delete_security_policies():
        print("\nDeleting security policies...")
        for policy_type in ['encryption', 'network']:
            try:
                # Get policies with correct response key
                response = client.list_security_policies(type=policy_type)
                policies = response.get('securityPolicySummaries', [])  # Summaries are returned under this key
                
                if not policies:
                    print(f"No {policy_type} policies found.")
                    continue

                for policy in policies:
                    try:
                        print(f"Deleting {policy_type} policy: {policy['name']}")
                        client.delete_security_policy(
                            name=policy['name'],
                            type=policy_type
                        )
                        print(f"Successfully deleted {policy_type} policy: {policy['name']}")
                        # Add small delay to avoid throttling
                        time.sleep(1)
                    except ClientError as e:
                        print(f"Error deleting {policy_type} policy {policy['name']}: {str(e)}")
            except ClientError as e:
                print(f"Error listing {policy_type} policies: {str(e)}")

    def delete_access_policies():
        print("\nDeleting access policies...")
        # ListAccessPolicies accepts only the 'data' policy type
        for policy_type in ['data']:
            try:
                response = client.list_access_policies(type=policy_type)
                policies = response.get('accessPolicySummaries', [])  # Summaries are returned under this key
                
                if not policies:
                    print(f"No {policy_type} access policies found.")
                    continue

                for policy in policies:
                    try:
                        print(f"Deleting {policy_type} access policy: {policy['name']}")
                        client.delete_access_policy(
                            name=policy['name'],
                            type=policy_type
                        )
                        print(f"Successfully deleted {policy_type} access policy: {policy['name']}")
                        # Add small delay to avoid throttling
                        time.sleep(1)
                    except ClientError as e:
                        print(f"Error deleting {policy_type} access policy {policy['name']}: {str(e)}")
            except ClientError as e:
                print(f"Error listing {policy_type} access policies: {str(e)}")

    def delete_collections():
        print("\nDeleting collections...")
        try:
            response = client.list_collections()
            collections = response.get('collectionSummaries', [])

            if not collections:
                print("No collections found.")
                return

            for collection in collections:
                collection_name = collection['name']
                collection_id = collection['id']  # Use the actual ID provided by AWS
                try:
                    print(f"Deleting collection: {collection_name} (ID: {collection_id})")
                    if collection['status'] == 'ACTIVE':
                        client.delete_collection(id=collection_id)
                        print(f"Deletion initiated for collection: {collection_name}")
                        
                        # Wait for collection deletion
                        while True:
                            try:
                                status_response = client.batch_get_collection(ids=[collection_id])
                                status_collections = status_response.get('collectionDetails', [])
                                
                                if not status_collections:
                                    print(f"Collection {collection_name} deleted successfully")
                                    break
                                    
                                status = status_collections[0]['status']
                                print(f"Waiting for collection {collection_name} to be deleted... Current status: {status}")
                                
                                if status == 'DELETED':
                                    print(f"Collection {collection_name} deleted successfully")
                                    break
                                
                                time.sleep(30)
                                
                            except ClientError as e:
                                if 'ResourceNotFoundException' in str(e):
                                    print(f"Collection {collection_name} deleted successfully")
                                    break
                                raise e
                except ClientError as e:
                    print(f"Error deleting collection {collection_name}: {str(e)}")
                    
        except ClientError as e:
            print(f"Error listing collections: {str(e)}")

    try:
        print("Starting OpenSearch Serverless cleanup...")
        
        # Delete in order: collections first, then policies
        delete_collections()
        delete_access_policies()
        delete_security_policies()
        
        print("\nCleanup completed successfully!")
        
    except Exception as e:
        print(f"An error occurred during cleanup: {str(e)}")




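The Retrieve and Generate API returns the final answer directly, so the output printed above does not show the individual source chunks behind it. Next, use the Retrieve API to inspect the source information in the Knowledge Base.

#### Test the Knowledge Base with the Retrieve API
If you need more control, use the Retrieve API to fetch the chunks that best match the query directly. In this setup, you can configure the number of results to return and control the final answer with your own application logic. The API returns the matching content, its S3 location, the similarity score, and the chunk metadata.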
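
One form of this control is restricting retrieval by metadata, such as the `company` attribute attached at ingestion. The sketch below builds a `retrievalConfiguration` with equality filters; the helper name `build_retrieval_config` is hypothetical, and the filter syntax should be confirmed against the Retrieve API reference.

```python
def build_retrieval_config(number_of_results: int, equals_filters: dict) -> dict:
    """Build a retrievalConfiguration with optional metadata equality filters."""
    vector_config = {"numberOfResults": number_of_results}
    conditions = [
        {"equals": {"key": key, "value": value}}
        for key, value in equals_filters.items()
    ]
    if len(conditions) == 1:
        # A single condition is passed directly as the filter.
        vector_config["filter"] = conditions[0]
    elif len(conditions) > 1:
        # Several conditions are combined so that all of them must match.
        vector_config["filter"] = {"andAll": conditions}
    return {"vectorSearchConfiguration": vector_config}


retrieval_config = build_retrieval_config(5, {"company": "octank"})
print(retrieval_config)
```

The resulting dictionary would be passed as the `retrievalConfiguration` argument of `retrieve`.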

In [None]:
response_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id, 
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults":5,
        } 
    },
    retrievalQuery={
        "text": "How many new positions were opened across Amazon's fulfillment and delivery network?"
    }
)

def response_print(retrieve_resp):
    # Structure: 'retrievalResults' is a list of chunks, each with content, location, score, and metadata
    for num,chunk in enumerate(retrieve_resp['retrievalResults'],1):
        print(f'Chunk {num}: ',chunk['content']['text'],end='\n'*2)
        print(f'Chunk {num} Location: ',chunk['location'],end='\n'*2)
        print(f'Chunk {num} Score: ',chunk['score'],end='\n'*2)
        print(f'Chunk {num} Metadata: ',chunk['metadata'],end='\n'*2)

response_print(response_ret)
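
As an example of controlling the final answer with application logic, the retrieved chunks can be filtered by score and joined into a context string for a prompt. This is a minimal sketch: `build_context`, its thresholds, and the sample response are invented for illustration and only mimic the fields of a Retrieve response used here.

```python
def build_context(retrieve_response: dict, min_score: float = 0.0, max_chunks: int = 3) -> str:
    """Keep the highest-scoring chunks above a threshold and join them as numbered context."""
    results = [
        r for r in retrieve_response.get("retrievalResults", [])
        if r.get("score", 0.0) >= min_score
    ]
    # Highest similarity first.
    results.sort(key=lambda r: r.get("score", 0.0), reverse=True)
    parts = [
        f"[{idx}] {r['content']['text']}"
        for idx, r in enumerate(results[:max_chunks], 1)
    ]
    return "\n".join(parts)


# Hand-written stand-in for a retrieve response (scores and text are made up).
sample_retrieve_response = {
    "retrievalResults": [
        {"content": {"text": "chunk A"}, "score": 0.42},
        {"content": {"text": "chunk B"}, "score": 0.81},
        {"content": {"text": "chunk C"}, "score": 0.10},
    ]
}
context = build_context(sample_retrieve_response, min_score=0.4, max_chunks=2)
print(context)
```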

<div class="alert alert-block alert-warning">
<b>Note:</b> To avoid incurring costs, be sure to delete the KB, the OSS index, and the related IAM roles and policies.
</div>