## boto3로 기존 OpenSearch Serverless Collection을 사용하여 Knowledge Base 생성하기

이 노트북에서는 boto3 클라이언트를 직접 사용하여 Knowledge Base를 생성합니다.

#### 특징:
- BedrockKnowledgeBase 클래스를 사용하지 않음
- 기존에 생성된 OpenSearch Serverless Collection 재사용
- 원하는 인덱스 이름 직접 지정
- 기존 S3 버킷을 Data Source로 사용

#### 사전 준비 사항:
- 기존 OpenSearch Serverless Collection ID
- Collection의 엔드포인트 URL
- 데이터가 업로드된 S3 버킷
- KB용 IAM 역할 (S3 및 OSS 접근 권한 필요)

## 1. 필요한 라이브러리 불러오기

In [None]:
%pip install --upgrade pip --quiet
%pip install -r ../requirements.txt --no-deps --quiet
%pip install -r ../requirements.txt --upgrade --quiet

In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
import time
import boto3
import pprint
import json

pp = pprint.PrettyPrinter(indent=2)

In [None]:
# Clients
sts_client = boto3.client('sts')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

print(f"Region: {region}")
print(f"Account ID: {account_id}")

## 2. 설정값 입력

기존 OpenSearch Serverless Collection 정보와 S3 버킷 정보를 입력합니다.

In [None]:
# 타임스탬프 생성
timestamp = int(time.time())
suffix = str(timestamp)[-7:]

# Knowledge Base 설정
kb_name = f"custom-kb-{suffix}"
kb_description = "Knowledge Base created with boto3 using existing OSS Collection"

# 기존 OpenSearch Serverless Collection 정보
existing_collection_id = "YOUR_COLLECTION_ID"  # 예: "abcdefghijklmnop"
existing_collection_arn = f"arn:aws:aoss:{region}:{account_id}:collection/{existing_collection_id}"

# 원하는 인덱스 이름 (새로 생성될 인덱스)
index_name = f"bedrock-kb-index-{suffix}"

# S3 버킷 (데이터가 이미 업로드되어 있어야 함)
s3_bucket_name = "YOUR_BUCKET_NAME"  # 예: "my-data-bucket"

# 임베딩 모델
embedding_model_arn = f"arn:aws:bedrock:{region}::foundation-model/amazon.titan-embed-text-v2:0"

print(f"KB Name: {kb_name}")
print(f"Collection ARN: {existing_collection_arn}")
print(f"Index Name: {index_name}")
print(f"S3 Bucket: {s3_bucket_name}")

## 3. IAM 역할 생성

Knowledge Base가 S3와 OpenSearch Serverless에 접근할 수 있도록 IAM 역할을 생성합니다.

In [None]:
# IAM 역할 이름
kb_role_name = f"AmazonBedrockExecutionRoleForKB_{suffix}"

# Trust Policy (Bedrock이 이 역할을 assume할 수 있도록)
assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": account_id
                },
                "ArnLike": {
                    "aws:SourceArn": f"arn:aws:bedrock:{region}:{account_id}:knowledge-base/*"
                }
            }
        }
    ]
}

# IAM 역할 생성
try:
    create_role_response = iam_client.create_role(
        RoleName=kb_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy),
        Description="Role for Bedrock Knowledge Base to access S3 and OpenSearch Serverless"
    )
    kb_role_arn = create_role_response['Role']['Arn']
    print(f"✓ IAM 역할 생성 완료: {kb_role_arn}")
except iam_client.exceptions.EntityAlreadyExistsException:
    kb_role_arn = f"arn:aws:iam::{account_id}:role/{kb_role_name}"
    print(f"IAM 역할이 이미 존재합니다: {kb_role_arn}")

In [None]:
# S3 접근 정책
s3_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                f"arn:aws:s3:::{s3_bucket_name}",
                f"arn:aws:s3:::{s3_bucket_name}/*"
            ]
        }
    ]
}

# OpenSearch Serverless 접근 정책
oss_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "aoss:APIAccessAll"
            ],
            "Resource": existing_collection_arn
        }
    ]
}

# Bedrock 모델 접근 정책
bedrock_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel"
            ],
            "Resource": embedding_model_arn
        }
    ]
}

# 정책 연결
policy_name_s3 = f"KB_S3_Policy_{suffix}"
policy_name_oss = f"KB_OSS_Policy_{suffix}"
policy_name_bedrock = f"KB_Bedrock_Policy_{suffix}"

try:
    iam_client.put_role_policy(
        RoleName=kb_role_name,
        PolicyName=policy_name_s3,
        PolicyDocument=json.dumps(s3_policy)
    )
    print(f"✓ S3 정책 연결 완료")
    
    iam_client.put_role_policy(
        RoleName=kb_role_name,
        PolicyName=policy_name_oss,
        PolicyDocument=json.dumps(oss_policy)
    )
    print(f"✓ OpenSearch Serverless 정책 연결 완료")
    
    iam_client.put_role_policy(
        RoleName=kb_role_name,
        PolicyName=policy_name_bedrock,
        PolicyDocument=json.dumps(bedrock_policy)
    )
    print(f"✓ Bedrock 정책 연결 완료")
    
    # IAM 역할이 전파될 때까지 대기
    print("\nIAM 역할 전파 대기 중...")
    time.sleep(10)
    
except Exception as e:
    print(f"정책 연결 중 오류: {e}")

## 4. Knowledge Base 생성

boto3를 사용하여 기존 OpenSearch Serverless Collection을 사용하는 Knowledge Base를 생성합니다.

In [None]:
# Knowledge Base 구성
storage_configuration = {
    "type": "OPENSEARCH_SERVERLESS",
    "opensearchServerlessConfiguration": {
        "collectionArn": existing_collection_arn,
        "vectorIndexName": index_name,
        "fieldMapping": {
            "vectorField": "bedrock-knowledge-base-default-vector",
            "textField": "AMAZON_BEDROCK_TEXT_CHUNK",
            "metadataField": "AMAZON_BEDROCK_METADATA"
        }
    }
}

# Knowledge Base 생성
print(f"Knowledge Base 생성 중: {kb_name}")
print(f"Collection: {existing_collection_arn}")
print(f"Index: {index_name}\n")

create_kb_response = bedrock_agent_client.create_knowledge_base(
    name=kb_name,
    description=kb_description,
    roleArn=kb_role_arn,
    knowledgeBaseConfiguration={
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {
            "embeddingModelArn": embedding_model_arn
        }
    },
    storageConfiguration=storage_configuration
)

kb = create_kb_response['knowledgeBase']
kb_id = kb['knowledgeBaseId']

print("✓ Knowledge Base 생성 완료!")
print(f"\nKB ID: {kb_id}")
print(f"KB ARN: {kb['knowledgeBaseArn']}")
print(f"Status: {kb['status']}")

In [None]:
# KB가 ACTIVE 상태가 될 때까지 대기
print("\nKnowledge Base 활성화 대기 중...")

while True:
    get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
    status = get_kb_response['knowledgeBase']['status']
    print(f"Current status: {status}")
    
    if status == "ACTIVE":
        print("\n✓ Knowledge Base가 활성화되었습니다!")
        break
    elif status == "FAILED":
        print("\n✗ Knowledge Base 생성에 실패했습니다.")
        break
    
    time.sleep(5)

## 5. Data Source 추가

S3 버킷을 Data Source로 추가합니다.

In [None]:
# Data Source 이름
data_source_name = f"s3-datasource-{suffix}"

# Data Source 구성
data_source_configuration = {
    "type": "S3",
    "s3Configuration": {
        "bucketArn": f"arn:aws:s3:::{s3_bucket_name}"
    }
}

# 청킹 전략 구성
vector_ingestion_configuration = {
    "chunkingConfiguration": {
        "chunkingStrategy": "FIXED_SIZE",
        "fixedSizeChunkingConfiguration": {
            "maxTokens": 300,
            "overlapPercentage": 20
        }
    }
}

print(f"Data Source 생성 중: {data_source_name}")

create_ds_response = bedrock_agent_client.create_data_source(
    name=data_source_name,
    description="S3 data source",
    knowledgeBaseId=kb_id,
    dataSourceConfiguration=data_source_configuration,
    vectorIngestionConfiguration=vector_ingestion_configuration
)

data_source = create_ds_response["dataSource"]
data_source_id = data_source['dataSourceId']

print("\n✓ Data Source 생성 완료!")
print(f"Data Source ID: {data_source_id}")
print(f"Status: {data_source['status']}")

## 6. Ingestion 작업 시작

S3 버킷의 데이터를 OpenSearch Serverless 인덱스에 적재합니다.

In [None]:
# Ingestion 작업 시작
print(f"Ingestion 작업 시작...")

start_job_response = bedrock_agent_client.start_ingestion_job(
    knowledgeBaseId=kb_id,
    dataSourceId=data_source_id
)

job = start_job_response["ingestionJob"]
job_id = job['ingestionJobId']

print(f"\nIngestion Job ID: {job_id}")
print(f"Status: {job['status']}")

In [None]:
# Ingestion 작업 완료 대기
print("\nIngestion 작업 모니터링 중...")

while True:
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=data_source_id,
        ingestionJobId=job_id
    )
    
    job_status = get_job_response["ingestionJob"]
    status = job_status['status']
    
    print(f"Status: {status}")
    
    if 'statistics' in job_status:
        stats = job_status['statistics']
        print(f"  Documents: {stats.get('numberOfDocumentsScanned', 0)} scanned, "
              f"{stats.get('numberOfDocumentsIndexed', 0)} indexed")
    
    if status in ["COMPLETE", "FAILED"]:
        break
    
    time.sleep(10)

if status == "COMPLETE":
    print("\n✓ Ingestion 작업이 완료되었습니다!")
    if 'statistics' in job_status:
        print(f"\n최종 통계:")
        pp.pprint(job_status['statistics'])
else:
    print("\n✗ Ingestion 작업이 실패했습니다.")
    if 'failureReasons' in job_status:
        print("실패 이유:")
        pp.pprint(job_status['failureReasons'])

## 7. OpenSearch Serverless 인덱스 확인

생성된 인덱스를 OpenSearch 클라이언트로 확인합니다.

In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# OSS 엔드포인트 (Collection ID로부터 생성)
oss_host = f"{existing_collection_id}.{region}.aoss.amazonaws.com"

# OSS 클라이언트 생성
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region, 'aoss')

oss_client = OpenSearch(
    hosts=[{'host': oss_host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

# 인덱스 정보 확인
try:
    index_info = oss_client.indices.get(index=index_name)
    print(f"✓ 인덱스 '{index_name}' 확인 완료\n")
    print("인덱스 설정:")
    pp.pprint(index_info[index_name]['settings'])
    
    # 문서 수 확인
    count = oss_client.count(index=index_name)
    print(f"\n인덱스의 문서 수: {count['count']}")
    
except Exception as e:
    print(f"인덱스 확인 중 오류: {e}")

## 8. Knowledge Base 테스트

생성된 Knowledge Base를 테스트합니다.

In [None]:
query = "AWS의 주요 서비스들에 대해 설명해주세요."
foundation_model = "us.amazon.nova-pro-v1:0"

print(f"Query: {query}\n")

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": f"arn:aws:bedrock:{region}:{account_id}:inference-profile/{foundation_model}",
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults": 5
                } 
            }
        }
    }
)

print("응답:")
print(response['output']['text'])

In [None]:
# 인용 정보 확인
def citations_print(response_ret):
    for num, chunk in enumerate(response_ret, 1):
        print(f'Chunk {num}: ', chunk['content']['text'][:200], '...', end='\n'*2)
        print(f'Chunk {num} Location: ', chunk['location'], end='\n'*2)

citations = response['citations'][0]['retrievedReferences']
print(f"\n사용된 청크 수: {len(citations)}\n")
citations_print(citations)

## 9. 생성된 리소스 정보 요약

In [None]:
print("="*80)
print("생성된 리소스 정보")
print("="*80)
print(f"\nKnowledge Base:")
print(f"  Name: {kb_name}")
print(f"  ID: {kb_id}")
print(f"  ARN: {kb['knowledgeBaseArn']}")
print(f"\nIAM Role:")
print(f"  Name: {kb_role_name}")
print(f"  ARN: {kb_role_arn}")
print(f"\nOpenSearch Serverless:")
print(f"  Collection ID: {existing_collection_id}")
print(f"  Collection ARN: {existing_collection_arn}")
print(f"  Index Name: {index_name}")
print(f"  Host: {oss_host}")
print(f"\nData Source:")
print(f"  Name: {data_source_name}")
print(f"  ID: {data_source_id}")
print(f"  S3 Bucket: {s3_bucket_name}")
print("="*80)

## 10. 정리 (선택사항)

테스트가 완료되면 리소스를 정리합니다.

In [None]:
# 주석을 해제하여 리소스 삭제

# # 1. Data Source 삭제
# print("Data Source 삭제 중...")
# bedrock_agent_client.delete_data_source(
#     knowledgeBaseId=kb_id,
#     dataSourceId=data_source_id
# )
# print("✓ Data Source 삭제 완료")

# # 2. Knowledge Base 삭제
# print("Knowledge Base 삭제 중...")
# bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb_id)
# print("✓ Knowledge Base 삭제 완료")

# # 3. IAM 정책 삭제
# print("IAM 정책 삭제 중...")
# iam_client.delete_role_policy(RoleName=kb_role_name, PolicyName=policy_name_s3)
# iam_client.delete_role_policy(RoleName=kb_role_name, PolicyName=policy_name_oss)
# iam_client.delete_role_policy(RoleName=kb_role_name, PolicyName=policy_name_bedrock)
# print("✓ IAM 정책 삭제 완료")

# # 4. IAM 역할 삭제
# print("IAM 역할 삭제 중...")
# iam_client.delete_role(RoleName=kb_role_name)
# print("✓ IAM 역할 삭제 완료")

# print("\n모든 리소스가 삭제되었습니다.")
# print("참고: OpenSearch Serverless Collection과 인덱스는 삭제되지 않았습니다.")

## 요약

이 노트북에서는 boto3를 직접 사용하여 Knowledge Base를 생성했습니다.

### 주요 단계:
1. **IAM 역할 생성** - S3, OSS, Bedrock 접근 권한 설정
2. **Knowledge Base 생성** - 기존 OSS Collection과 원하는 인덱스 이름 지정
3. **Data Source 추가** - 기존 S3 버킷 연결
4. **Ingestion 실행** - 데이터를 지정된 인덱스에 적재
5. **테스트** - retrieve_and_generate로 검증

### BedrockKnowledgeBase 클래스와의 차이:
- BedrockKnowledgeBase: 자동으로 새 Collection 생성 또는 기존 Collection 재사용 (제어 제한적)
- boto3 직접 사용: 모든 설정을 직접 제어 가능 (Collection, 인덱스 이름, IAM 역할 등)

### 장점:
- 기존 OSS Collection 재사용으로 비용 절감
- 원하는 인덱스 이름 직접 지정 가능
- 세밀한 IAM 권한 제어
- 여러 KB가 동일한 Collection을 공유하면서 독립적인 인덱스 사용 가능