# OpenSearch 신경망(Neural) 플러그인을 이용한 고급 RAG
이 실습에서는 PDF 문서를 처리하는 고급 RAG 아키텍처를 살펴볼 것입니다. 먼저 PDF에서 텍스트를 추출한 다음, 재귀적 문자 청킹(Recursive Character Chunking) 기법을 사용하여 청크로 나눕니다. OpenSearch 신경망 플러그인을 사용하여 데이터 수집 시 이러한 청크를 벡터로 변환합니다. 데이터가 수집되면 신경망 플러그인을 사용하여 의미론적으로 데이터를 검색합니다. 반환된 결과를 사용하여 답변을 생성하기 위한 프롬프트를 엔지니어링합니다.

우리가 사용할 PDF는 Amazon이 발행한 2023년 연간 보고서입니다. 이 문서에는 재무 데이터, 성과, 회사가 직면한 위험 및 미래에 대한 지침이 포함되어 있습니다. 우리는 이 문서에서 질문에 답할 수 있는 재무 분석가 보조 봇 역할을 할 것입니다.

먼저 적절한 라이브러리를 로드하는 것부터 시작하겠습니다.

## 1. 사전 요구 사항 설치 및 변수 초기화

In [None]:
!pip install langchain langchain_community pypdf langchain_experimental --quiet
!pip install -qU langchain-text-splitters
!pip install --upgrade --quiet  boto3
!pip install pdfminer.six --quiet
!pip install opensearch-py --quiet
!pip install "unstructured[all-docs]" --quiet
!pip install pdf2image --quiet
!pip install -qU langchain-aws --quiet
!pip install alive-progress --quiet
!pip install opensearch-py-ml --quiet
!pip install requests_aws4auth --quiet

In [None]:
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.chat_models import BedrockChat
from langchain.chains import ConversationalRetrievalChain

from langchain_community.embeddings import BedrockEmbeddings
from langchain.indexes import VectorstoreIndexCreator
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

import boto3
import os
import time
import json
import pandas as pd
from tqdm import tqdm
import sagemaker
from opensearchpy import OpenSearch, RequestsHttpConnection
from sagemaker import get_execution_role
import random 
import string
import s3fs
from urllib.parse import urlparse
from IPython.display import display, HTML
from alive_progress import alive_bar
from opensearch_py_ml.ml_commons import MLCommonClient
from requests_aws4auth import AWS4Auth
import requests 
import os
import json
import pandas as pd
import numpy as np

### CloudFormation 스택 출력에서 변수 초기화

In [None]:
# Create a Boto3 session
session = boto3.Session()

# Get the account id
account_id = boto3.client('sts').get_caller_identity().get('Account')

# Get the current region
region = session.region_name

cfn = boto3.client('cloudformation')
bedrock_client = boto3.client('bedrock-runtime')

# Method to obtain output variables from Cloudformation stack. 
def get_cfn_outputs(stackname):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

## Setup variables to for the rest of the demo
cloudformation_stack_name = "genai-data-foundation-workshop"

outputs = get_cfn_outputs(cloudformation_stack_name)
aos_host = outputs['OpenSearchDomainEndpoint']
s3_bucket = outputs['s3BucketTraining']
bedrock_inf_iam_role = outputs['BedrockBatchInferenceRole']
bedrock_inf_iam_role_arn = outputs['BedrockBatchInferenceRoleArn']
sagemaker_notebook_url = outputs['SageMakerNotebookURL']

# We will just print all the variables so you can easily copy if needed.
outputs

## 2. PDF에서 텍스트 추출 및 청킹
문서를 청크로 나누는 가장 간단한 방법은 길이로 나누는 것이지만, 의미를 잃지 않도록 단락이나 줄을 함께 유지하는 것이 중요합니다. 우리는 LangChain 라이브러리의 재귀적 문자 텍스트 분할기(RecursiveCharacterTextSplitter)를 사용할 것입니다. 이 분할기는 데이터를 길이로 분할하면서도 가능한 한 줄과 단락을 함께 유지하는 방법을 제공합니다.

In [None]:
# this method would split the text into chunks by paragraph, line boundary and keeping chunk 
# size as close to 1000 characters, it will also overlap the text between chunks if it were to 
# split line or paragraph in the middle.

def recursive_character_chunking(text): 
    text_splitter = RecursiveCharacterTextSplitter( #create a text splitter
        separators=["\n\n", "\n", ".", " "], #split chunks at (1) paragraph, (2) line, (3) sentence, or (4) word, in that order
        chunk_size=1000, #divide into 1000-character chunks using the separators above
        chunk_overlap=200, #number of characters that can overlap with previous chunk
        length_function=len,
        is_separator_regex=True,
    )
    
    docs = text_splitter.create_documents(text)#From the loaded PDF
    
    return docs #return the index to be cached by the client app

### PDF 파일 로드 및 파싱
이제 LangChain 라이브러리의 PyPDFLoader를 사용하여 PDF를 로드하고 텍스트를 추출하겠습니다.

In [None]:
#LOAD A PDF DOCUMENT

from langchain.document_loaders import PyPDFLoader

loader = PyPDFLoader("Amazon-com-Inc-2023-Annual-Report.pdf")
documents = loader.load()

#print(documents)a

texts = ""

for document in documents:
    texts += document.page_content.replace(r'\\n', '\n')

이제 재귀적 문자 청킹(Recursive Character Chunking) 기법을 사용하여 로드된 PDF 텍스트를 청크로 나누겠습니다.

In [None]:
#LETS RECURISIVE CHUNK IT
docs = recursive_character_chunking([texts])

# the method prints chunks
def print_chunks(data):
    #Let's print the chunks -- notice the overlap between chunk 3 and 4
    i = 1
    for doc in data:
        print(f"---------START OF CHUNK {i}------")
        print(f"{doc.page_content}")
        print(f"---------END OF CHUNK {i}------\n\n")
        i+=1

#Let's print first 5 chunks.
print_chunks(docs[:5])

## 3. OpenSearch 도메인과 연결 생성
다음으로, Python API를 사용하여 OpenSearch 도메인과의 연결을 설정하겠습니다.

#### 참고: 
이 실습 중 어느 시점에서든 **_The security token included in the request is expired._** 라는 실패 메시지가 표시되면, 이 셀을 다시 실행하여 해결할 수 있습니다. 이 셀은 실습의 나머지 부분에 필요한 보안 자격 증명을 새로 고칩니다.

In [None]:
kms = boto3.client('secretsmanager')
aos_credentials = json.loads(kms.get_secret_value(SecretId=outputs['DBSecret'])['SecretString'])

#credentials = boto3.Session().get_credentials()
#auth = AWSV4SignerAuth(credentials, region)
auth = (aos_credentials['username'], aos_credentials['password'])

aos_client = OpenSearch(
    hosts = [{'host': aos_host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)
ml_client = MLCommonClient(aos_client)
host = f'https://{aos_host}/'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
headers = {"Content-Type": "application/json"}



In [None]:
#initializing some variables that we will use later.

connector_id = ""
model_id = ""

## 4. Amazon Bedrock Titan Text Embedding v2에 대한 ML 커넥터(ML Connector) 생성 및 배포

OpenSearch에서 ML 커넥터를 구성하기 위해 Amazon SageMaker Notebook IAM 역할을 사용할 것입니다. 이 IAM 역할은 BedrockInference IAM 역할을 OpenSearch에 전달할 수 있는 권한을 가지고 있습니다. 그러면 OpenSearch는 BedrockInference IAM 역할을 사용하여 Bedrock 모델을 호출할 수 있게 됩니다.

다음 셀은 SageMaker Notebook IAM 역할을 사용하여 ML 커넥터를 생성합니다. 이 셀은 Amazon Bedrock Titan Text embedding v2 모델과 함께 OpenSearch 원격 ML 커넥터를 생성합니다. 다음 셀은 ML 커넥터 구성을 정의합니다.

#### 중요한 전제 조건
실습 지침 섹션의 단계에 따라 SageMaker 노트북 역할을 OpenSearch의 `ml_full_access` 역할에 매핑해야 합니다. 그렇지 않은 경우, 실습 지침을 참조하여 **권한 설정 > OpenSearch 권한** 섹션을 완료하세요. 이 작업을 수행하지 않으면 다음 셀에서 오류가 발생합니다.

In [None]:
import boto3
import requests 
from requests_aws4auth import AWS4Auth
import json


if not connector_id:
    # Register repository
    path = '_plugins/_ml/connectors/_create'
    url = host + path

    payload = {
        "name": "Amazon Bedrock Connector: embedding",
        "description": "The connector to bedrock Titan text embedding model",
        "version": 1,
        "protocol": "aws_sigv4",
        "credential": {
          "roleArn": f"arn:aws:iam::{account_id}:role/{bedrock_inf_iam_role}"
       },
       "parameters": {
        "region": region,
        "service_name": "bedrock",
           ## USING AMAZON BEDROCK TITAN EMBED TEXT MODEL
        "model": "amazon.titan-embed-text-v2:0"
       },
       "actions": [
        {
          "action_type": "predict",
          "method": "POST",
          "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",
          "headers": {
            "content-type": "application/json",
            "x-amz-content-sha256": "required"
          },
         "request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
         "pre_process_function": "connector.pre_process.bedrock.embedding",
         "post_process_function": "connector.post_process.bedrock.embedding"}
       ]
    }

    r = requests.post(url, auth=awsauth, json=payload, headers=headers)
    print(r.status_code)
    if r.status_code == 403:
        print("Permission Error: Please make sure you have mapped the NB role to ml_full_access role in OpenSearch dashboard. Follow permission section in lab instructions.")
        print(r.text)
    else:
        connector_id = json.loads(r.text)["connector_id"]
        print(r.text)
else:
    print(f"Connector already exists - {connector_id}")
    
connector_id

ML 커넥터가 정의되면, 모델을 등록하고 배포해야 합니다. 다음 두 셀은 각각 모델 연결을 등록하고 배포합니다.

In [None]:
# Register the model
if not model_id:
    path = '_plugins/_ml/models/_register'
    url = 'https://'+aos_host + '/' + path
    payload = { "name": "Bedrock Titan embeddings model",
    "function_name": "remote",
    "description": "Bedrock Titan text embeddings model",
    "connector_id": connector_id}
    r = requests.post(url, auth=awsauth, json=payload, headers=headers)
    model_id = json.loads(r.text)["model_id"]
else:
    print("skipping model registration - model already exists")
print("Model registered under model_id: "+model_id)


마지막으로 원격 추론을 가능하게 하는 모델을 배포할 것입니다. 이 모델은 텍스트를 임베딩으로 변환하는 데 사용할 모델입니다.

In [None]:
# Deploy the model
path = '_plugins/_ml/models/'+model_id+'/_deploy'
url = 'https://'+aos_host + '/' + path
r = requests.post(url, auth=awsauth, headers=headers)
deploy_status = json.loads(r.text)["status"]
print("Deployment status of the model, "+model_id+" : "+deploy_status)

### 배포된 모델이 제대로 작동하는지 확인하기 위한 테스트 임베딩 생성

OpenSearch ML Commons 플러그인 Python 클라이언트의 메서드인 `ml_client.generate_embedding`을 사용하여 임베딩 모델을 호출하고 임베딩을 생성해 보겠습니다.

**사전 요구 사항**: 실습 지침의 _**권한 설정 > Amazon Bedrock 권한**_ 섹션을 따라 Amazon Bedrock 모델 액세스를 설정했는지 확인하세요. 이 작업을 수행하지 않았다면, 다음 셀을 실행할 때 인증 예외가 발생할 수 있습니다.

In [None]:
#Testing model working

input_sentences = ["What an easy way to create embeddings"]
embedding_output = ml_client.generate_embedding(f"{model_id}", input_sentences)
embed = embedding_output['inference_results'][0]['output'][0]['data']
print(embed[:5])

## 5. 수집 파이프라인(Ingest Pipeline) 생성
Amazon Bedrock Titan Text 임베딩 모델을 호출하여 텍스트 청크 레코드의 `doc_chunk_text` 필드를 벡터 임베딩으로 변환하는 수집 파이프라인을 만들어 보겠습니다. 수집 파이프라인은 OpenSearch의 기능으로, 데이터 수집 시 수행할 특정 작업을 정의할 수 있습니다. 정적 필드 추가, 기존 필드 수정, 또는 원격 모델을 호출하여 추론을 얻고 추론 출력을 인덱싱된 레코드/문서와 함께 저장하는 등의 간단한 처리를 수행할 수 있습니다. 우리의 경우 추론 출력은 벡터 임베딩입니다.

다음 수집 파이프라인은 우리의 원격 모델을 호출하여 청크 텍스트 `doc_chunk_text` 필드를 벡터로 변환하고 이를 `doc_chunk_embedding`이라는 필드에 저장합니다.

In [None]:
path =  "/_ingest/pipeline/amazon-report-ingest-pipeline"
url = f"{aos_host}{path}"

payload = {
  "description": "An Index of Amazon annual report",
  "processors": [
    {
      "text_embedding": {
        "model_id": f"{model_id}",
        "field_map": {
          "doc_chunk_text": "doc_chunk_embedding"
        }
      }
    }
  ]
}

aos_client.ingest.put_pipeline(id="amazon-report-ingest-pipeline", body=payload)



## 6. Amazon OpenSearch Service에 인덱스 생성
이제 PDF 문서 지식 베이스를 위한 인덱스를 정의하겠습니다. 우리는 두 개의 필드를 정의할 것입니다. 첫 번째는 텍스트 청크이고 두 번째는 벡터 임베딩입니다. 벡터 임베딩을 위해 `FAISS` 엔진과 `HNSW`를 알고리즘/방법으로 사용할 것입니다. 다른 매개변수에 대해서는 적절한 기본값을 사용하겠습니다. 위에서 생성한 파이프라인을 인덱스로 수집되는 데이터의 기본 파이프라인으로 제공한다는 점에 주목하세요. 이렇게 하면 문서가 인덱스에 수집되기 전에 파이프라인을 거치게 됩니다.

인덱스를 생성하기 위해, 먼저 JSON으로 인덱스를 정의한 다음, 앞서 초기화한 aos_client 연결을 사용하여 OpenSearch에 인덱스를 생성합니다.

In [None]:
##DEFINE INDEX JSON
knn_index = {
    "settings": {
        "index.knn": True,
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "default_pipeline": "amazon-report-ingest-pipeline", 
        "analysis": {
          "analyzer": {
            "default": {
              "type": "standard",
              "stopwords": "_english_"
            }
          }
        }
    },
    "mappings": {
        "properties": {
             "doc_chunk_text": {
                "type": "text",
                "store": True
            },
           "doc_chunk_embedding": {
               "type": "knn_vector",
               "dimension": 1024,
               "method": {
                   "name": "hnsw",
                   "space_type": "l2",
                   "engine": "faiss",
                   "parameters": {
                       "ef_construction": 256,
                       "m": 48
                   }
               }
           }
        }
    }
}


위의 인덱스 정의를 사용하여 이제 Amazon OpenSearch Service에 인덱스를 생성해야 합니다. 이 셀을 실행하면 이전에 이 노트북을 실행한 적이 있는 경우 인덱스를 다시 생성합니다.

In [None]:
index_name = "amazon_report_knowledge_base"

try:
    aos_client.indices.delete(index=index_name)
    print("Recreating index '" + index_name + "' on cluster.")
    aos_client.indices.create(index=index_name,body=knn_index,ignore=400)
except:
    print("Index '" + index_name + "' not found. Creating index on cluster.")
    aos_client.indices.create(index=index_name,body=knn_index,ignore=400)


생성된 인덱스 정보를 확인해 보겠습니다.

In [None]:
aos_client.indices.get(index=index_name)

## 7. 인덱스에 원시 데이터 로드
다음으로, 방금 생성한 인덱스에 청크로 나눈 텍스트 데이터와 그 임베딩을 로드하겠습니다. 우리의 임베딩을 `doc_chunk_embedding` 필드에 저장할 것이며, 이는 나중에 KNN 검색에 사용될 것입니다.

In [None]:
cnt = 0
batch = 0
action = json.dumps({ "index": { "_index": index_name } })
body_ = ''


with alive_bar(len(docs), force_tty = True) as bar:
    for doc in docs:

        payload={
           "doc_chunk_text": doc.page_content
        }
        body_ = body_ + action + "\n" + json.dumps(payload) + "\n"
        cnt = cnt+1
        
        if(cnt == 100):
            
            response = aos_client.bulk(
                                index = index_name,
                                 body = body_)
            

            cnt = 0
            batch = batch +1
            body_ = ''
        
        bar()
print("Total Bulk batches completed: "+str(batch))

로드를 검증하기 위해, 인덱스에 있는 문서 수를 조회해 보겠습니다. 인덱스에는 약 400개의 문서가 있어야 합니다.

In [None]:
res = aos_client.search(index=index_name, body={"query": {"match_all": {}}})
print("Records found: %d." % res['hits']['total']['value'])

## 8. "의미론적 검색(Semantic Search)"으로 벡터 검색

이제 사용자의 질문과 의미론적으로 일치하는 청크를 찾기 위한 검색 쿼리를 실행하는 헬퍼 함수를 정의할 수 있습니다. `retrieve_opensearch_with_semantic_search`는 신경망 쿼리를 사용하며, 이는 쿼리 텍스트와 모델 ID를 받아 쿼리 텍스트를 임베딩으로 변환한 후 근사 이웃 검색(즉, 의미론적 검색)을 실행합니다. OpenSearch가 반환할 결과의 개수를 지정하기 위해 K를 매개변수로 전달합니다.

In [None]:
def retrieve_opensearch_with_semantic_search(phrase, n=3):
    osquery={
        "_source": {
            "exclude": [ "doc_chunk_embedding" ]
        },
        
      "size": n,
      "query": {
        "neural": {
          "doc_chunk_embedding": {
            "query_text": f"{phrase}",
            "model_id": f"{model_id}",
            "k": n
          }
        }
      }    
    }

    res = aos_client.search(index=index_name, 
                           body=osquery,
                           stored_fields=["doc_chunk_text"],
                           explain = False)
    top_result = res['hits']['hits']
    
    results = []
    
    for entry in top_result:
        result = {
            "doc_chunk_text":entry['_source']['doc_chunk_text']
           
        }
        results.append(result)
    
    return results


### 샘플 질문으로 의미론적 검색을 사용하여 유사한 레코드 가져오기

우리의 청크 데이터에서 답변할 수 있다고 생각되는 질문을 할 것입니다. OpenSearch는 질문과 일치하는 상위 결과를 반환할 것입니다. 반환된 청크들이 질문의 주제에 대해 이야기하고 있음을 알 수 있을 것입니다.

In [None]:
question_on_docs = "What was the operating income difference between 2022 and 2023?" # "2022년과 2023년 사이의 영업 이익 차이는 얼마였습니까?"
example_request = retrieve_opensearch_with_semantic_search(question_on_docs)
print(json.dumps(example_request, indent=4))

## 9. Amazon Bedrock - Anthropic Claude Sonnet 모델 호출 방법 준비
이제 사용자의 질문에 답변하기 위해 LLM을 호출하는 함수를 정의하겠습니다. LLM은 PDF가 생성되기 전에 일반적인 목적의 데이터로 훈련되었을 가능성이 높기 때문에, PDF 파일에 숨겨진 지식을 가지고 있지 않을 수 있습니다. 답변할 수 있더라도 귀하의 비즈니스가 선호하는 답변이 아닐 수 있습니다. 따라서 답변은 우리가 프롬프트에 전달한 데이터를 참조해야 합니다.

이 함수를 정의한 후, LLM이 PDF 청크 데이터에서 어떻게 질문에 답변하는지 확인하기 위해 호출해 보겠습니다.

In [None]:
def query_llm_endpoint_with_json_payload(encoded_json):

    # Create a Bedrock Runtime client
    bedrock_client = boto3.client('bedrock-runtime')
    # Set the model ID for Claude 3 Sonnet
    model_id = 'anthropic.claude-3-sonnet-20240229-v1:0'
    accept = 'application/json'
    content_type = 'application/json'


    try:
        # Invoke the model with the native request payload
        response = bedrock_client.invoke_model(
            modelId=model_id,
            body=str.encode(str(encoded_json)),
            accept = accept,
            contentType=content_type
        )

        # Decode the response body
        response_body = json.loads(response.get('body').read())
        return response_body
    except Exception as e:
        print(f"Error: {e}")
        return none

def query_llm(system, user_question):

    # Prepare the model's payload
    payload = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 10000,
        "system": system,
        "messages": [
            {
              "role": "user",
              "content": [
                {
                  "type": "text",
                  "text": f"{user_question}"
                }
              ]
            }
          ]
        })
    


    query_response = query_llm_endpoint_with_json_payload(payload)

    return query_response['content'][0]['text']


## 10. 검색 증강 생성 (Retrieval Augmented Generation)

### OpenSearch 검색 결과를 사용하여 LLM용 프롬프트 생성 (RAG)

우리는 Anthropic Sonnet 모델을 원샷 프롬프팅 기법과 함께 사용할 것입니다. 프롬프트 내의 모델 지시사항 안에서, 프롬프트 끝에 OpenSearch에서 검색한 청크를 제공하여 모델이 사용자 질문에 답변할 때 참조하도록 할 것입니다.

모델에 쿼리하기 전에, 아래의 `generate_rag_based_system_prompt` 함수를 사용하여 사용자 프롬프트를 구성합니다. 이 함수는 입력 문자열을 받아 OpenSearch 클러스터에서 일치하는 청크를 검색한 다음, LLM을 위한 사용자 프롬프트를 작성합니다.

시스템 프롬프트는 LLM이 수행할 역할을 정의합니다.

사용자 프롬프트는 LLM 모델이 사용자의 질문에 답변하기 위해 사용하는 지시사항과 컨텍스트 정보를 포함합니다.

프롬프트는 다음 형식을 따릅니다:

**시스템 프롬프트:**

```
당신은 재무 문서에서 제공된 텍스트를 분석하고 사용자 질문에 답변하는 재무 보고서 분석 봇입니다.
```

**사용자 프롬프트**
```
재무 보고서 분석가로서 제공된 DOCS_DATA에서 질문에 대한 답변을 해주세요. DOCS_DATA에서 답변을 찾을 수 없다면, "주어진 정보로는 이 질문에 답변할 수 없습니다"라고 말해주세요. DOCS_DATA에서 답변을 얻었다면 그 사실을 언급할 필요는 없습니다.

다음은 DOCS_DATA이며, 이후에 답변할 사용자의 질문이 제시될 것입니다.

DOCS_DATA: {retrieved_documents}

사용자 질문: <사용자 질문>
```

우리는 이 모듈에서 수행한 모든 작업을 결합하는 `query_llm_with_rag` 메서드를 정의합니다. 이 메서드는 다음과 같은 작업을 수행합니다:
- 관련 청크를 찾기 위해 의미론적 검색으로 OpenSearch 인덱스를 검색합니다.
- 검색 결과로부터 LLM 프롬프트를 생성합니다.
- RAG를 사용하여 LLM에 쿼리하여 응답을 얻습니다.

In [None]:
def query_llm_with_rag(user_question, n=3):
    retrieved_documents = retrieve_opensearch_with_semantic_search(user_question,n)
    system_prompt= "You are a Financial report analysis bot analyzes provided text from financial documents and answers user questions"
    user_prompt = (
        f"As a financial report analyst please answer the question from the provided DOCS_DATA. If you cannot find answer from the DOCS_DATA, please say I'm sorry I cannot answer this question from given information. You do not have to mention that you got answer from DOCS_DATA if you got answer from DOCS_DATA.\n"
        f"Following is the DOCS_DATA after which you will be given the user's question to answer\n"
        f"DOCS_DATA: {retrieved_documents} \n"
        f"User's Question: {user_question} \n"        
    )
    response = query_llm(system_prompt, user_prompt)
    return response



### RAG를 사용하여 질문에 답변하기
PDF 텍스트 데이터에서 답변할 수 있는 질문을 해봅시다. 연간 보고서에서 답변할 수 있는 다른 재무 또는 일반적인 회사 성과 관련 질문으로 변경해 보시기를 권장합니다. 이 간단한 아키텍처가 연간 재무 보고서와 같은 복잡한 문서를 분석하는 데 정말 좋은 방법을 제공한다는 것을 알 수 있을 것입니다.

In [None]:
question_on_docs ="What was the revenue for amazon advertisement in 2023?" # "2023년 Amazon 광고 수익은 얼마였습니까?"
recommendation = query_llm_with_rag(question_on_docs)
print(recommendation)

print(f"\n\ndocuments retrieved for above recommendations were \n\n{json.dumps(retrieve_opensearch_with_semantic_search(question_on_docs), indent=4)}")

## 11. 하이브리드 검색 - 고급 RAG
다음 섹션에서는 특정 상황에서 하이브리드 검색이 어떻게 도움이 될 수 있는지 살펴보겠습니다.

Amazon 보고서에서 **지식 베이스(Knowledge Base)** 는 실시간 쿼리에 답변하기 위한 RAG 또는 검색 증강 생성 기반 기술로 언급됩니다.

RAG를 위한 AWS 제품을 검색하는 예시로 다음 질문을 사용해 보겠습니다. 임베딩 모델이 RAG의 의미론적 의미를 알지 못할 수 있기 때문에, 사용자 쿼리의 다른 용어를 사용하여 청크의 정보와 일치시킬 것입니다. 이로 인해 LLM이 우리의 질문에 답변하지 못할 수 있습니다.

In [None]:
question_on_docs ="What is AWS offering for retrieval augmented generation?" # "AWS는 검색 증강 생성(retrieval augmented generation)을 위해 어떤 서비스를 제공하고 있습니까?"
recommendation = query_llm_with_rag(question_on_docs, n=3)
print(recommendation)

print(f"\n\ndocuments retrieved for above recommendations were \n\n{json.dumps(retrieve_opensearch_with_semantic_search(question_on_docs, n=3), indent=4)}")

### 이 문제를 어떻게 해결할 수 있을까요?
보시다시피 상위 3개의 청크에는 사용자의 질문에 답할 수 있는 정보가 포함되어 있지 않습니다. 이런 경우 벡터 검색은 비슷해 보이는 여러 청크를 찾을 수 있지만, 단순히 유사도 거리만 보고는 답변이 있는 청크를 상위로 반환하지 못할 수 있습니다. 우리는 의미론적 검색을 전통적인 키워드 검색과 결합하여 결과를 보완할 수 있습니다. 이를 **하이브리드 검색**이라고 합니다. 키워드와 의미론적 검색 모두의 결과가 다양한 정규화 및 결합 기법을 통해 병합되어 최종 결과를 생성합니다. 이제 **_phase_results_processors_** 결과 프로세서를 사용하는 검색 파이프라인을 정의해 보겠습니다. 이 프로세서는 2개의 서로 다른 쿼리를 받아 그 결과를 결합합니다. 우리의 경우 키워드 일치에 40%, 의미론적 일치에 60%의 가중치를 부여하고, **조화 평균** 기법을 사용하여 결과를 결합합니다.

우리의 쿼리는 이전과 같이 전체 텍스트 검색과 의미론적 검색이 될 것입니다. 우리는 검색 방법을 다시 작성하고 키워드 쿼리를 추가합니다. 이 과정을 `retrieve_opensearch_with_hybrid_search` 메서드로 정의합니다. "**hybrid**" 섹션 아래에 이 메서드가 2개의 쿼리 절을 가지고 있음을 주목하세요.

In [None]:
path =  "/_search/pipeline/hybrid-search-pipeline"
url = f"https://{aos_host}{path}"

payload = {
  "description": "Hybrid search over amazon financial report",
  "phase_results_processors":[
      {
          "normalization-processor":{
              "normalization": {
                  "technique": "l2"
              },
              "combination": {
                  "technique": "harmonic_mean",
                  "parameters": {
                      "weights": [
                          0.4, #first query i.e. keywords have 20% weightage
                          0.6  #first query i.e. semantic search have 80% weightage
                      ]
                  }
              }
          }
      }
    ]
}

r = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)



def retrieve_opensearch_with_hybrid_search(phrase, model_id=model_id, bedrock_client=bedrock_client, n=3 ):
    osquery={
            "_source": {
                "exclude": [ "doc_chunk_embedding" ]
            },
          "size": n,
          "query": {
            "hybrid": {
              "queries": [
                {
                  #First query clause that performs keyword search
                  "match": {
                    "doc_chunk_text": {
                      "query": f"{phrase}"
                    }
                  }
                },
                {
                  #Second query clause that performs semantic search
                  "neural": {
                    "doc_chunk_embedding": {
                      "query_text": f"{phrase}",
                      "model_id": f"{model_id}",
                      "k": n
                    }
                  }
                }
              ]
            }
          }    
        }

    res = aos_client.search(index=index_name, 
                           body=osquery,
                           search_pipeline="hybrid-search-pipeline",
                           stored_fields=["doc_chunk_text"],
                           explain = False)
    top_result = res['hits']['hits']
    
    results = []
    
    for entry in top_result:
        result = {
            "id":entry['_id'],
            "doc_chunk_text":entry['_source']['doc_chunk_text'],
            "_score":entry['_score']
           
        }
        results.append(result)
    
    return results


def hybrid_query_llm_with_rag(user_question):
    retrieved_documents = retrieve_opensearch_with_hybrid_search(user_question)
    system_prompt= "You are a Financial report analysis bot analyzes provided text from financial documents and answers user questions"
    user_prompt = (
        f"As a financial report analyst please answer the question from the provided DOCS_DATA. If you cannot find answer from the DOCS_DATA, please say I'm sorry I cannot answer this question from given information. You do not have to mention that you got answer from DOCS_DATA if you got answer from DOCS_DATA.\n"
        f"Following is the DOCS_DATA after which you will be given the user's question to answer\n"
        f"DOCS_DATA: {retrieved_documents} \n"
        f"User's Question: {user_question} \n"        
    )
    response = query_llm(system_prompt, user_prompt)
    return response

In [None]:
question_on_docs ="What is AWS offering for retrieval augmented generation?" # "AWS는 검색 증강 생성(retrieval augmented generation)을 위해 어떤 서비스를 제공하고 있습니까?"
recommendation = hybrid_query_llm_with_rag(question_on_docs)
print(recommendation)

print(f"\n\ndocuments retrieved for above recommendations were \n\n{json.dumps(retrieve_opensearch_with_hybrid_search(question_on_docs, model_id=model_id, bedrock_client=bedrock_client), indent=4)}")

### 하이브리드 검색은 인기 있지만...

고객들이 하이브리드 검색을 자주 사용하는 것을 보고 있습니다. 이는 단순히 의미론적 검색만으로는 충분하지 않은 경우에 유용합니다. 특히, 도메인 특화 용어(예: RAG)를 포함한 사용 사례에서는 더욱 그렇습니다. 이러한 용어는 임베딩 모델이 학습 데이터에서 충분히 접하지 못했거나, 회사의 분류 체계에 매우 특정되어 전통적인 의미와는 다르게 해석될 수 있습니다.

또한, 의미론적 검색에서 찾은 상위 3개의 결과에 답이 없는 경우일 수도 있습니다. 이번에는 **n=10**, 즉 상위 10개의 결과로 의미론적 검색을 다시 시도해 봅시다.

In [None]:
question_on_docs ="What is AWS offering for retrieval augmented generation?" # "AWS는 검색 증강 생성(retrieval augmented generation)을 위해 어떤 서비스를 제공하고 있습니까?"

#Setting n=10 to retrieve 10 results instead of 3
n = 10
recommendation = query_llm_with_rag(question_on_docs, n)
print(recommendation)

print(f"\n\ndocuments retrieved for above recommendations were \n\n{json.dumps(retrieve_opensearch_with_semantic_search(question_on_docs, n=n), indent=4)}")

보시다시피 모델이 질문에 조금 더 잘 답변할 수 있었습니다.

## 12. 교차 인코더(Cross-Encoder) 모델을 사용한 재순위화(Re-Ranking)

위의 문제는 RAG에 대해 언급하는 텍스트 청크가 상위 3개 결과 안에 들어오도록 결과를 재순위화하는 방법을 찾을 수 있다면 해결할 수 있습니다. 이는 쿼리와 결과 세트를 살펴보며 OpenSearch 결과를 재순위화하는 데 도움을 주는 교차 인코더 모델로 잠재적으로 달성할 수 있습니다.

교차 인코더 모델은 OpenSearch에서 지원되며, OpenSearch 서비스에서 이러한 모델 중 두 가지를 사용할 수 있습니다.

교차 인코더 모델은 쿼리와 검색 결과를 살펴보고, 쿼리에 답변할 가능성이 있는 결과가 더 높은 순위를 갖도록 재순위화합니다.

### 로컬 모델 호스팅을 가능하게 하는 클러스터 설정을 해보겠습니다

In [None]:
s = b"""
{"transient":{"plugins.ml_commons.only_run_on_ml_node": false}}
"""
aos_client.cluster.put_settings(body=s)

### 교차 인코더 모델 배포
OpenSearch의 ML commons 기능을 사용하여 교차 인코더 모델을 로드하겠습니다. 이 모델은 OpenSearch의 데이터 노드 내에서 호스팅됩니다. 이전에는 ML commons를 사용하여 Amazon Bedrock Titan을 호출하는 원격 추론 모델을 배포했습니다. 이번에는 데이터 노드 내에 교차 인코더 모델을 등록하고 배포할 것입니다.

In [None]:
model_group_id=""

path =  "/_plugins/_ml/model_groups/_register"
url = f"https://{aos_host}{path}"

payload = {
  "name": "local_model_group",
  "description": "A model group for local models"
}

r = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)
model_group_id = json.loads(r.text)["model_group_id"]

이제 모델을 등록하겠습니다...

In [None]:
path =  "/_plugins/_ml/models/_register"
url = f"https://{aos_host}{path}"

payload = {
  "name": "huggingface/cross-encoders/ms-marco-MiniLM-L-6-v2",
  "version": "1.0.2",
  "model_group_id": f"{model_group_id}",
  "model_format": "TORCH_SCRIPT"
}

r = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)
rr_model_task_id = json.loads(r.text)["task_id"]

rr_model_task_id

다음 셀은 모델 등록이 완료되었는지 확인합니다. **모델이 등록되었다는 메시지가 표시될 때까지 이 셀을 실행해 주세요.**

In [None]:
rr_model_id = ""

path =  f"/_plugins/_ml/tasks/{rr_model_task_id}"
url = f"https://{aos_host}{path}"

r = requests.get(url, auth=awsauth, headers=headers)
print(r.status_code)
print(r.text)
task_state = json.loads(r.text)["state"]
if task_state == "COMPLETED":
    rr_model_id = json.loads(r.text)["model_id"]
    print("TASK COMPLETED SUCCESSFULLY, PLEASE MOVE TO NEXT CELL")
else:
    print("TASK NOT COMPLETED, PLEASE RE-RUN THIS CELL")


다음 코드는 등록된 모델을 배포합니다.

In [None]:
path =  f"/_plugins/_ml/models/{rr_model_id}/_deploy"
url = f"https://{aos_host}{path}"

r = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)
rr_model_task_id = json.loads(r.text)["task_id"]



In [None]:
path =  f"/_plugins/_ml/tasks/{rr_model_task_id}"
url = f"https://{aos_host}{path}"

r = requests.get(url, auth=awsauth, headers=headers)
print(r.status_code)
print(r.text)
task_state = json.loads(r.text)["state"]
if task_state == "COMPLETED":
    print("MODEL DEPLOYMENT SUCCESSFUL, PLEASE MOVE TO THE NEXT CELL")
    rr_model_id = json.loads(r.text)["model_id"]
elif task_state == "FAILED":
    print("MODEL DEPLOYMENT FAILED, PLEASE INVESTIGATE THE ERROR")
else:
    print("MODEL DEPLOYMENT IN PROGRESS, PLEASE RE-RUN THE CELL")

### 재순위화 모델을 사용하는 검색 파이프라인(Search Pipeline) 생성
이제 방금 배포한 재순위화 모델에 검색 결과를 전달하는 검색 파이프라인을 배포하겠습니다. 이 모델은 교차 인코더가 결과 순위를 평가하는 데 사용할 필드의 이름을 필요로 합니다. 이 경우 우리는 `doc_chunk_text`를 필드로 제공합니다.

In [None]:
path =  "/_search/pipeline/rerank-search-pipeline"
url = f"https://{aos_host}{path}"

payload = {
  "description": "Pipeline for reranking with cross-encoder model",
    "response_processors": [
        {
            "rerank": {
                "ml_opensearch": {
                    "model_id": f"{rr_model_id}"
                },
                "context": {
                    "document_fields": ["doc_chunk_text"]
                }
            }
        }
    ]
}

print(payload)

r = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(r.status_code)
print(r.text)

의미론적 검색 메서드를 다시 작성해 보겠습니다. 재순위화(rerank) 절을 사용하고 우리의 쿼리를 쿼리 컨텍스트로 제공할 것입니다. 이 쿼리와 검색 파이프라인의 일부로 제공한 `doc_chunk_text`를 통해 교차 인코더는 재순위화를 수행하는 데 필요한 모든 것을 갖게 됩니다. 다음 두 가지 메서드를 정의하겠습니다:

`retrieve_opensearch_with_semantic_rerank_search` - 이 메서드는 의미론적 검색을 실행하고 재순위화 절을 사용하여 결과를 재순위화합니다. 검색을 실행할 때 `rerank-search-pipeline`을 검색 파이프라인으로 사용합니다.

`rerank_query_llm_with_rag` - 이 메서드는 위의 재순위화 검색 메서드를 호출하여 상위 청크를 검색한 다음, 이 상위 청크를 Claude Sonnet 3에 전달하여 사용자 질문에 답변합니다.

In [None]:
def retrieve_opensearch_with_semantic_rerank_search(phrase, n, k, model_id=model_id, bedrock_client=bedrock_client):
    osquery={
        "_source": {
            "exclude": [ "doc_chunk_embedding" ]
        },
        
      "size": k,
      "query": {
        "neural": {
          "doc_chunk_embedding": {
            "query_text": f"{phrase}",
            "model_id": f"{model_id}",
            "k": k
          }
        }
      },
     "ext":{
         "rerank": {
          "query_context": {
             "query_text": f"{phrase}"
        }
     }
        
    }
    }

    res = aos_client.search(index=index_name, 
                           body=osquery,
                           search_pipeline="rerank-search-pipeline",
                           stored_fields=["doc_chunk_text"],
                           explain = False)
    top_result = res['hits']['hits']

    results = []
    for entry in top_result:
        result = {
            "id":entry['_id'],
            "doc_chunk_text":entry['_source']['doc_chunk_text'],
            "_score":entry['_score']
           
        }
        results.append(result)

    return results[:n]


def rerank_query_llm_with_rag(user_question, n, k):
    retrieved_documents = retrieve_opensearch_with_semantic_rerank_search(user_question, n = n, k = k, model_id=model_id, bedrock_client=bedrock_client)
    system_prompt= "You are a Financial report analysis bot analyzes provided text from financial documents and answers user questions"
    user_prompt = (
        f"As a financial report analyst please answer the question from the provided DOCS_DATA. If you cannot find answer from the DOCS_DATA, please say I'm sorry I cannot answer this question from given information. You do not have to mention that you got answer from DOCS_DATA if you got answer from DOCS_DATA.\n"
        f"Following is the DOCS_DATA after which you will be given the user's question to answer\n"
        f"DOCS_DATA: {retrieved_documents} \n"
        f"User's Question: {user_question} \n"        
    )
    response = query_llm(system_prompt, user_prompt)
    return response

### 교차 인코더 검색 실행
다음 셀은 이전에 답변하지 못했던 동일한 질문에 대해 검색을 실행합니다. K = 10개의 항목을 가져올 것이며, 이는 교차 인코더 모델에 의해 재순위화될 것입니다. 상위 K개 중에서 상위 N개의 레코드를 LLM에 전달하여 사용자의 질문에 답변하게 할 것입니다.

In [None]:
question_on_docs ="What is AWS offering for retrieval augmented generation?" # "AWS는 검색 증강 생성(retrieval augmented generation)을 위해 어떤 서비스를 제공하고 있습니까?"

#keeping our results to top 3 items.
n = 3
recommendation = rerank_query_llm_with_rag(question_on_docs, n=3, k=10)
print(recommendation)

print(f"\n\ndocuments retrieved for above recommendations were \n\n{json.dumps(retrieve_opensearch_with_semantic_rerank_search(phrase=question_on_docs, n=3, k=10), indent=4)}")

# 결론
여러분은 Amazon 연간 재무 보고서를 PDF 파일 형식으로 로드하고 파싱하는 것으로 시작했습니다. 로드된 텍스트를 청크로 나누고 신경망 플러그인을 사용하여 직접 임베딩을 처리할 필요 없이 청크를 임베딩으로 변환했습니다. 그런 다음 의미론적 검색을 사용하여 이 데이터를 바탕으로 사용자의 금융 관련 질문에 답변했습니다. 강력한 키워드 검색과 새로운 의미론적 검색 기능을 결합하여 하이브리드 검색이 어떻게 검색 결과를 개선할 수 있는지 살펴보았습니다. 또한 재순위화가 RAG 파이프라인에서 중요한 개선 사항이 될 수 있음을 보여주었습니다.

이 실습을 완료하신 것을 축하드립니다. 이제 실습 지침 섹션으로 이동하시면 됩니다.