# 2. Amazon Bedrock Knowledge Bases - GraphRAG 생성
- Amazon SageMaker AI - Notebook - JupyterLab 환경에서 테스트 되었습니다.
- Kernel : conda_python3

## 1. Setup

### 필요한 라이브러리 설치

In [None]:
# 필요한 라이브러리 설치 (설치 후 커널 재부팅 필요)
!pip install boto3==1.37.2 --force
!pip install -q awscli --upgrade

### 라이브러리 임포트 및 세션 설정

In [None]:
# 필요한 라이브러리 임포트
import boto3
import json
import time
import os
import uuid
from datetime import datetime
import pandas as pd
from tqdm.notebook import tqdm

# AWS 리전 설정
region = boto3.session.Session().region_name
print(f"현재 AWS 리전: {region}")

# 세션 및 클라이언트 설정
session = boto3.session.Session(region_name=region)
bedrock = session.client('bedrock')
bedrock_runtime = session.client('bedrock-runtime')
bedrock_agent = session.client('bedrock-agent')
s3 = session.client('s3')
iam = session.client('iam')


In [None]:
print(boto3.__version__)

### Workshop을 위한 S3 버킷 생성

In [None]:
# S3 버킷 설정
bucket_name = f"bedrock-kb-workshop-{str(uuid.uuid4())[:8]}"
prefix = "graphrag-workshop"

# 버킷이 존재하지 않으면 생성
try:
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region} if region != 'us-east-1' else {}
    )
    print(f"버킷 생성 완료: {bucket_name}")
except Exception as e:
    print(f"버킷 생성 중 오류 발생: {e}")
    
# S3 경로 설정
s3_data_path = f"s3://{bucket_name}/{prefix}/data"
print(f"데이터 저장 경로: {s3_data_path}")


### KB - GraphRAG 생성을 위한 IAM Role 생성

In [None]:
# Knowledge Base를 위한 IAM 역할 생성
role_name = f"AmazonBedrockKBExecutionRole-{str(uuid.uuid4())[:8]}"

# 신뢰 정책 정의
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

# IAM 역할 생성
try:
    role_response = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="Amazon Bedrock Knowledge Base Execution Role"
    )
    
    # 필요한 정책 연결
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess"
    )
    
    # 역할 ARN 저장
    role_arn = role_response['Role']['Arn']
    print(f"IAM 역할 생성 완료: {role_arn}")
    
    # 역할이 전파될 시간 대기
    print("IAM 역할 반영을 위해 10초 대기...")
    time.sleep(10)
    
except Exception as e:
    print(f"IAM 역할 생성 중 오류 발생: {e}")


### S3 버킷에 테스트 용 데이터 (10-q) 업로드

In [None]:
# ../data/10-q/ 디렉토리의 PDF 파일들을 S3에 업로드
import os
import glob
from tqdm.notebook import tqdm

# PDF 파일 경로 설정
pdf_directory = "../data/10-q/"

# 디렉토리 존재 확인
if not os.path.exists(pdf_directory):
    print(f"경고: {pdf_directory} 디렉토리가 존재하지 않습니다.")
    # 디렉토리 생성 (선택적)
    os.makedirs(pdf_directory, exist_ok=True)
    print(f"{pdf_directory} 디렉토리를 생성했습니다. 이 디렉토리에 PDF 파일을 넣어주세요.")
else:
    # PDF 파일 목록 가져오기
    pdf_files = glob.glob(os.path.join(pdf_directory, "*.pdf"))
    
    if not pdf_files:
        print(f"경고: {pdf_directory} 디렉토리에 PDF 파일이 없습니다.")
    else:
        print(f"{len(pdf_files)}개의 PDF 파일을 찾았습니다. S3에 업로드를 시작합니다...")
        
        # 각 PDF 파일을 S3에 업로드
        for pdf_file in tqdm(pdf_files, desc="PDF 파일 업로드 중"):
            file_name = os.path.basename(pdf_file)
            s3_key = f"{prefix}/data/{file_name}"
            
            try:
                s3.upload_file(
                    Filename=pdf_file,
                    Bucket=bucket_name,
                    Key=s3_key
                )
            except Exception as e:
                print(f"파일 '{file_name}' 업로드 중 오류 발생: {e}")
        
        print(f"모든 PDF 파일이 s3://{bucket_name}/{prefix}/data/ 경로에 업로드되었습니다.")


## 2. KB - GraphRAG 생성 <== 현재 boto3가 GraphRAG 생성을 지원하지 않는것 같음.. 에러.. 확인중

In [None]:
# Knowledge Base 생성
kb_name = f"bedrock-graphrag-workshop-{str(uuid.uuid4())[:8]}"

# Knowledge Base 생성 요청
try:
    kb_response = bedrock_agent.create_knowledge_base(
        name=kb_name,
        roleArn=role_arn,
        knowledgeBaseConfiguration={
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": f"arn:aws:bedrock:{region}::foundation-model/amazon.titan-embed-text-v2:0",
                "embeddingModelConfiguration": {
                    "bedrockEmbeddingModelConfiguration": {
                        "dimensions": 1024
                    }
                }
            }
        },
        storageConfiguration={
            "type": "NEPTUNE_ANALYTICS",
            "neptuneAnalyticsConfiguration": {
                "graphIdentifier": "auto-create"
            }
        },
        description="Amazon Bedrock GraphRAG Knowledge Base (Preview)"
    )

    
    kb_id = kb_response['knowledgeBase']['knowledgeBaseId']
    print(f"Knowledge Base 생성 요청 완료: {kb_id}")
    
    # Knowledge Base가 생성된 후 데이터 소스 추가
    data_source_id = f"ds-{str(uuid.uuid4())[:8]}"
    data_source_response = bedrock_agent.create_data_source(
        knowledgeBaseId=kb_id,
        dataSourceConfiguration={
            "s3Configuration": {
                "bucketArn": f"arn:aws:s3:::{bucket_name}",
                "inclusionPrefixes": [f"{prefix}/data/"]
            },
            "type": "S3"
        },
        name="GraphRAG-DataSource",
        description="Data source for GraphRAG workshop"
    )
    
    print(f"데이터 소스 생성 완료: {data_source_response['dataSource']['dataSourceId']}")
    
except Exception as e:
    print(f"Knowledge Base 생성 중 오류 발생: {e}")


### KB 생성 완료 대기

In [None]:
# Knowledge Base 생성 완료 대기
def wait_for_kb_status(kb_id, target_status="ACTIVE", max_attempts=30):
    print(f"Knowledge Base 상태 확인 중...")
    attempts = 0
    
    while attempts < max_attempts:
        try:
            response = bedrock_agent.get_knowledge_base(
                knowledgeBaseId=kb_id
            )
            
            current_status = response['knowledgeBase']['status']
            print(f"현재 상태: {current_status}")
            
            if current_status == target_status:
                print(f"Knowledge Base가 {target_status} 상태가 되었습니다.")
                return True
            
            if current_status == "FAILED":
                print("Knowledge Base 생성 실패")
                return False
                
            # 30초 대기 후 다시 확인
            print("30초 후 다시 확인합니다...")
            time.sleep(30)
            attempts += 1
            
        except Exception as e:
            print(f"상태 확인 중 오류 발생: {e}")
            return False
    
    print(f"최대 시도 횟수({max_attempts})에 도달했습니다.")
    return False

# KB 생성 완료 대기
kb_ready = wait_for_kb_status(kb_id)
if not kb_ready:
    print("Knowledge Base 생성이 완료되지 않았습니다. 다음 단계로 진행하기 전에 AWS 콘솔에서 상태를 확인하세요.")


## 3. Data Sync

In [None]:
# Knowledge Base 데이터 동기화
try:
    sync_response = bedrock_agent.start_ingestion_job(
        knowledgeBaseId=kb_id,
        dataSourceId=data_source_config["dataSourceId"]
    )
    
    ingestion_job_id = sync_response['ingestionJob']['ingestionJobId']
    print(f"데이터 동기화 작업 시작: {ingestion_job_id}")
    
except Exception as e:
    print(f"데이터 동기화 시작 중 오류 발생: {e}")


### 데이터 동기화 완료 대기

In [None]:
# 데이터 동기화 완료 대기
def wait_for_ingestion_job(kb_id, job_id, max_attempts=20):
    print(f"데이터 동기화 상태 확인 중...")
    attempts = 0
    
    while attempts < max_attempts:
        try:
            response = bedrock_agent.get_ingestion_job(
                knowledgeBaseId=kb_id,
                ingestionJobId=job_id
            )
            
            current_status = response['ingestionJob']['status']
            print(f"현재 상태: {current_status}")
            
            if current_status == "COMPLETE":
                print("데이터 동기화가 완료되었습니다.")
                return True
            
            if current_status == "FAILED":
                print("데이터 동기화 실패")
                return False
                
            # 15초 대기 후 다시 확인
            print("15초 후 다시 확인합니다...")
            time.sleep(15)
            attempts += 1
            
        except Exception as e:
            print(f"상태 확인 중 오류 발생: {e}")
            return False
    
    print(f"최대 시도 횟수({max_attempts})에 도달했습니다.")
    return False

# 데이터 동기화 완료 대기
sync_completed = wait_for_ingestion_job(kb_id, ingestion_job_id)
if not sync_completed:
    print("데이터 동기화가 완료되지 않았습니다. 다음 단계로 진행하기 전에 AWS 콘솔에서 상태를 확인하세요.")


---

## 4. KB - GraphRAG 쿼리 실행
- Claude 3.5 Sonnet 사용

In [None]:
# Knowledge Base 쿼리 함수 정의
def query_knowledge_base(query_text, model_id="anthropic.claude-3-sonnet-20240229-v1:0", max_tokens=1000):
    try:
        # 검색 요청 구성
        retrieve_response = bedrock_agent.retrieve(
            knowledgeBaseId=kb_id,
            retrievalQuery={
                'text': query_text
            },
            numberOfResults=3
        )
        
        # 검색 결과 확인
        retrieved_results = retrieve_response.get('retrievalResults', [])
        
        if not retrieved_results:
            print("검색 결과가 없습니다.")
            return None
        
        # 검색 결과를 컨텍스트로 사용
        context = ""
        for i, result in enumerate(retrieved_results):
            content = result['content']['text']
            source = result.get('location', {}).get('s3Location', {}).get('uri', '알 수 없는 소스')
            score = result.get('score', 0)
            
            context += f"\n\n참고 문서 {i+1} (관련성 점수: {score}):\n{content}\n"
            print(f"검색 결과 {i+1}: 관련성 점수 {score}")
        
        # Claude에 질의 구성
        prompt = f"""
사용자의 질문에 답변해 주세요. 다음 참고 문서의 정보를 활용하세요:

{context}

사용자 질문: {query_text}

답변:
"""
        
        # Claude에 질의 요청
        response = bedrock_runtime.converse(
            modelId=model_id,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "text": prompt
                        }
                    ]
                }
            ],
            maxTokens=max_tokens
        )
        
        # 응답 추출 및 반환
        answer = response['messages'][0]['content'][0]['text']
        return answer
        
    except Exception as e:
        print(f"쿼리 실행 중 오류 발생: {e}")
        return None


### 사용자 쿼리 테스트

In [None]:
# 쿼리 예제 실행
test_queries = [
    "Amazon의 총 순매출은 시간이 지남에 따라 어떻게 변화했나요?",
    "Amazon의 운영 비용 증가가 각 사업 부문의 수익성과 전반적인 재무 성과에 어떤 영향을 미쳤으며, 이는 다른 재무 지표들과 어떤 연관성을 보이나요?"
]

for query in test_queries:
    print(f"\n질문: {query}")
    answer = query_knowledge_base(query)
    if answer:
        print(f"\n답변:\n{answer}")
    print("-" * 80)


---

## 5. Clean-up

In [None]:
# 워크샵에서 생성된 리소스 ID/이름 설정

#kb_id = "여기에 Knowledge Base ID 입력"
#bucket_name = "여기에 S3 버킷 이름 입력"
#role_name = "여기에 IAM 역할 이름 입력"


# AWS 클라이언트 초기화
import boto3
region = boto3.session.Session().region_name
bedrock_agent = boto3.client('bedrock-agent', region_name=region)
s3 = boto3.client('s3', region_name=region)
iam = boto3.client('iam', region_name=region)

print(f"리소스 정리를 위한 변수가 설정되었습니다: region: {region}")

In [None]:
# Knowledge Base 삭제
try:
    print(f"Knowledge Base 삭제 중: {kb_id}")
    response = bedrock_agent.delete_knowledge_base(knowledgeBaseId=kb_id)
    print(f"Knowledge Base 삭제 요청이 성공적으로 제출되었습니다.")
    print("상태: 백그라운드에서 삭제 진행 중...")
except Exception as e:
    print(f"Knowledge Base 삭제 중 오류 발생: {e}")

In [None]:
# S3 버킷의 모든 객체 삭제 후 버킷 삭제
import time

try:
    # 모든 객체 삭제
    print(f"S3 버킷 내 모든 객체 삭제 중: {bucket_name}")

    # 모든 객체 나열 및 삭제
    paginator = s3.get_paginator('list_objects_v2')
    object_count = 0

    for page in paginator.paginate(Bucket=bucket_name):
        if 'Contents' in page:
            objects = [{'Key': obj['Key']} for obj in page['Contents']]
            s3.delete_objects(Bucket=bucket_name, Delete={'Objects': objects})
            object_count += len(objects)

    print(f"{object_count}개 객체 삭제 완료")

    # 버전 관리된 객체 삭제 (필요시)
    try:
        paginator = s3.get_paginator('list_object_versions')
        version_count = 0

        for page in paginator.paginate(Bucket=bucket_name):
            delete_list = []

            # 버전 삭제
            if 'Versions' in page:
                delete_list.extend([{'Key': obj['Key'], 'VersionId': obj['VersionId']} for obj in page['Versions']])

            # 삭제 마커 제거
            if 'DeleteMarkers' in page:
                delete_list.extend([{'Key': obj['Key'], 'VersionId': obj['VersionId']} for obj in page['DeleteMarkers']])

            if delete_list:
                s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_list})
                version_count += len(delete_list)

        if version_count > 0:
            print(f"{version_count}개 객체 버전/삭제 마커 제거 완료")
    except Exception as e:
        print(f"버전 제거 중 오류(무시 가능): {e}")

    # 잠시 대기 (모든 객체가 삭제되기를 기다림)
    time.sleep(3)

    # 버킷 삭제
    print(f"S3 버킷 삭제 중: {bucket_name}")
    s3.delete_bucket(Bucket=bucket_name)
    print(f"S3 버킷 삭제 완료: {bucket_name}")

except Exception as e:
    print(f"S3 버킷 삭제 중 오류 발생: {e}")

In [None]:
# IAM 역할 삭제 (연결된 정책 먼저 분리)
try:
    # 모든 관리형 정책 분리
    attached_policies = iam.list_attached_role_policies(RoleName=role_name)

    for policy in attached_policies.get('AttachedPolicies', []):
        policy_arn = policy['PolicyArn']
        print(f"정책 분리 중: {policy_arn}")
        iam.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        print(f"정책 분리 완료: {policy_arn}")

    # 인라인 정책 삭제 (필요한 경우)
    inline_policies = iam.list_role_policies(RoleName=role_name)

    for policy_name in inline_policies.get('PolicyNames', []):
        print(f"인라인 정책 삭제 중: {policy_name}")
        iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)

    # 역할 삭제
    print(f"IAM 역할 삭제 중: {role_name}")
    iam.delete_role(RoleName=role_name)
    print(f"IAM 역할 삭제 완료: {role_name}")

except Exception as e:
    print(f"IAM 역할 삭제 중 오류 발생: {e}")