# 1단계: 환경 설정

In [None]:
!pip install -qU langchain langchain-community langchain-text-splitters voyageai pymongo ipywidgets boto3

In [None]:
import os
import boto3
import voyageai
from pymongo import MongoClient

# MongoDB 연결 문자열을 넣습니다.
MONGODB_URI = ""
# Voyage AI API Key를 넣습니다.
VOYAGE_API_KEY = ""

MONGODB_URI = ""
VOYAGE_API_KEY = ""
BEDROCK_API_KEY= ""
# 2. 런타임 클라이언트 초기화
os.environ["AWS_BEARER_TOKEN_BEDROCK"] = BEDROCK_API_KEY
bedrock_runtime = boto3.client(
    service_name="bedrock-runtime",
    region_name="ap-northeast-2"
)

# 데이터베이스 이름
DB_NAME = "mdb_ai_workshop"

# MongoDB Python 클라이언트를 초기화합니다.
mongodb_client = MongoClient(MONGODB_URI)
# 서버 연결 상태를 확인합니다.
mongodb_client.admin.command("ping")

# Voyage AI 클라이언트를 초기화합니다.
vo = voyageai.Client(api_key=VOYAGE_API_KEY)

 ## 유틸리티 함수 정의

In [None]:
from pymongo.collection import Collection
from pymongo.errors import OperationFailure
from typing import List, Dict, Optional
from tqdm import tqdm
from typing import Dict
import time

SLEEP_TIMER = 3

def create_index(collection: Collection, index_name: str, model: Dict) -> None:
    try:
        print(f"Creating the {index_name} index")
        collection.create_search_index(model=model)
    except OperationFailure:
        print(f"{index_name} index already exists, recreating...")
        try:
            print(f"Dropping {index_name} index")
            collection.drop_search_index(name=index_name)

            # Poll for index deletion to complete
            while True:
                indexes = list(collection.list_search_indexes())
                index_exists = any(idx.get("name") == index_name for idx in indexes)
                if not index_exists:
                    print(f"{index_name} index deletion complete")
                    break
                print(f"Waiting for {index_name} index deletion to complete...")
                time.sleep(SLEEP_TIMER)

            print(f"Creating new {index_name} index")
            collection.create_search_index(model=model)
            print(f"Successfully recreated the {index_name} index")
        except Exception as e:
            raise Exception(f"Error during index recreation: {str(e)}")


def check_index_ready(collection: Collection, index_name: str) -> None:
    while True:
        indexes = list(collection.list_search_indexes())
        matching_indexes = [idx for idx in indexes if idx.get("name") == index_name]

        if not matching_indexes:
            print(f"{index_name} index not found")
            time.sleep(SLEEP_TIMER)
            continue

        index = matching_indexes[0]
        status = index["status"]
        if status == "READY":
            print(f"{index_name} index status: READY")
            print(f"{index_name} index definition: {index['latestDefinition']}")
            break

        print(f"{index_name} index status: {status}")
        time.sleep(SLEEP_TIMER)

# 2단계: 데이터셋 로드하기

In [None]:
import json

with open("assets/datas/mongodb_docs.json", "r") as data_file:
    json_data = data_file.read()

docs = json.loads(json_data)

In [None]:
# 데이터셋에 포함된 문서의 개수를 확인합니다.
len(docs)

In [None]:
# 문서 구조를 파악하기 위해 하나를 미리 봅니다.
docs[0]

# 3단계: 데이터 청킹(Chunking) 및 임베딩(Embedding)


In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing import Dict, List
from tqdm import tqdm

#### 데이터 청킹시 고려해야할 부분

In [None]:
# 데이터 청킹시 고려해야할 부분들입니다.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=200,
    chunk_overlap=0, # 비교를 위해 오버랩을 0으로 둡니다. (오버랩이 있어도 문제는 발생합니다)
    separators=["\n\n", "\n", " ", "", "#", "##", "###"]
)

standard_chunks = []

for doc in docs:
    # 문서의 본문(body)을 가져옵니다.
    original_text = doc.get("body", "")
    
    # 텍스트 분할 수행
    splits = text_splitter.split_text(original_text)
    
    for i, split_text in enumerate(splits):
        standard_chunks.append({
            "source_title": doc.get("title"),
            "chunk_index": i,
            "text": split_text
        })

# 결과 확인 (처음 5개만 출력)
for chunk in standard_chunks[:5]:
    print(f"--- Chunk {chunk['chunk_index']} ({chunk['source_title']}) ---")
    print(chunk['text'])
    print("\n")

https://api.python.langchain.com/en/latest/character/langchain_text_splitters.character.RecursiveCharacterTextSplitter.html

In [None]:
# LangChain의 `RecursiveCharacterTextSplitter`를 사용하여 먼저 `separators` 목록을 기준으로 텍스트를 분할합니다.
# `model_name` 매개변수는 토큰화에 사용할 인코더를 지정합니다. 여기서는 GPT-4의 인코더를 사용합니다.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    model_name="gpt-4", 
    separators=["\n\n", "\n", " ", "", "#", "##", "###"], 
    chunk_size=200, 
    chunk_overlap=0
)

def get_chunks(doc: Dict, text_field: str) -> List[Dict]:
    # `doc`에서 청킹할 필드를 추출합니다.
    text = doc[text_field]
    # 위에서 정의한 `text_splitter` 객체의 `split_text` 메서드를 사용하여 `text`를 분할합니다.
    chunks = text_splitter.split_text(text)
    return chunks

https://docs.voyageai.com/docs/contextualized-chunk-embeddings#approach-2-contextualized-chunk-embeddings

In [None]:
def get_embeddings(content: List[str], input_type: str) -> List[float] | List[List[float]]:
    # Voyage AI API의 `contextualized_embed` 메서드를 사용하여 각 청크의 임베딩을 생성합니다.
    # inputs: 리스트로 감싼 `content`
    # model: `voyage-context-3`
    # input_type: `input_type` 인자값
    embds_obj = vo.contextualized_embed(inputs=[content], model="voyage-context-3", input_type=input_type)
    if input_type == "document":
        embeddings = [emb for r in embds_obj.results for emb in r.embeddings]
    if input_type == "query":
        embeddings = embds_obj.results[0].embeddings[0]
    return embeddings

In [None]:
embedded_docs = []
# 2단계의 `docs`를 순회합니다.
for doc in tqdm(docs):
    # `get_chunks` 함수를 사용하여 각 문서의 "body" 필드를 청킹합니다.
    chunks = get_chunks(doc, "body")
    # 모든 `chunks`를 `get_embeddings` 함수에 전달하여 각 청크의 임베딩을 생성합니다.
    # RAG를 위한 "문서"를 임베딩하므로 `input_type`은 "document"로 설정해야 합니다.
    chunk_embeddings = get_embeddings(chunks, "document")
    # 각 청크에 대해 원본 메타데이터를 가진 새로운 문서를 생성합니다.
    # `body`를 청크 내용으로 교체하고 `embedding` 필드를 추가합니다.
    for chunk, embedding in zip(chunks, chunk_embeddings):
        # 원본 문서를 복사하여 새 문서를 생성합니다.
        chunk_doc = doc.copy()
        # `chunk_doc`의 `body` 필드를 청크 내용으로 교체합니다.
        chunk_doc["body"] = chunk
        # 이 청크의 임베딩 값을 담은 `embedding` 필드를 `chunk_doc`에 추가합니다.
        chunk_doc["embedding"] = embedding
        # `chunk_doc`을 `embedded_docs` 리스트에 추가합니다.
        embedded_docs.append(chunk_doc)

In [None]:
# `embedded_docs`의 길이가 2단계의 `docs` 길이보다 큰 것을 확인하세요.
# 이는 `docs`의 각 문서가 여러 개의 청크로 분할되었기 때문입니다.
len(embedded_docs)

In [None]:
# 청킹된 문서의 구조를 파악하기 위해 하나를 미리 봅니다.
# 원본 문서와 구조가 비슷해 보이지만, `body` 필드에 더 작은 텍스트 조각이 들어있습니다.
# 또한 각 문서에 `embedding` 필드가 추가되었습니다.
embedded_docs[0]

#### chunk_size를 같게 했는데 두번째에 더 긴 결과물이 나온 이유
```python
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=200 # 글자 수 기준
```

```python
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    model_name="gpt-4",
    chunk_size=200 # 토큰 수 기준 (GPT-4)
```

# 4단계: MongoDB로 데이터 입력(Ingest)하기


In [None]:
COLLECTION_NAME = "knowledge_base"
VECTOR_SEARCH_INDEX_NAME = f"{DB_NAME}_rag"

collection = mongodb_client[DB_NAME][COLLECTION_NAME]
collection.delete_many({})

https://pymongo.readthedocs.io/en/stable/examples/bulk.html#bulk-insert


In [None]:
# `embedded_docs`를 위에서 정의한 `collection`에 일괄 입력(bulk insert)합니다. -- 한 줄의 코드로 실행하세요.
collection.insert_many(embedded_docs)

print(f"{collection.count_documents({})}개의 문서가 {COLLECTION_NAME} 컬렉션에 입력되었습니다.")

# 5단계: 벡터 검색 인덱스 생성

In [None]:
# 벡터 인덱스 정의를 생성합니다. 다음 항목들을 지정하세요:
# path: 임베딩 필드의 경로
# numDimensions: 임베딩 차원 수 (사용된 임베딩 모델에 따라 다름)
# similarity: 유사도 측정 방식 (cosine, euclidean, dotProduct 중 하나)
model = {
    "name": VECTOR_SEARCH_INDEX_NAME,
    "type": "vectorSearch",
    "definition": {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1024,
                "similarity": "cosine"
            },
            {"type": "filter", "path": "metadata.contentType"},
            {"type": "filter", "path": "updated"}
        ]
    }
}

In [None]:
# `create_index` 함수를 사용하여 `collection` 컬렉션에 위 정의대로 벡터 검색 인덱스를 생성합니다.
create_index(collection, VECTOR_SEARCH_INDEX_NAME, model)

# 진행하기 전에 `check_index_ready` 함수를 사용하여 인덱스가 생성되었고 'READY' 상태인지 확인합니다.
check_index_ready(collection, VECTOR_SEARCH_INDEX_NAME)

# 6단계: 데이터에 대해 벡터 검색 수행하기


### 벡터 검색 함수 정의

https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#ann-examples ("Basic Example" 참고)


In [None]:
# 벡터 검색을 사용하여 사용자 쿼리와 관련된 문서를 검색하는 함수를 정의합니다.
from typing import List, Dict, Optional

def vector_search(user_query: str, content_type: Optional[str] = None, updated: Optional[str] = None) -> List[Dict]:
    query_embedding = get_embeddings([user_query], "query")

    # 2. 동적 필터 조건 생성
    filter_conditions = []

    if content_type:
        filter_conditions.append({"metadata.contentType": content_type})
    
    if updated:
        filter_conditions.append({"updated": {"$gte": updated}})

    # 3. $vectorSearch 스테이지 기본 구성
    vector_search_stage = {
        "index": VECTOR_SEARCH_INDEX_NAME,
        "path": "embedding",
        "queryVector": query_embedding,
        "numCandidates": 150,
        "limit": 20
    }

    if len(filter_conditions) > 1:
        vector_search_stage["filter"] = {"$and": filter_conditions}
    elif len(filter_conditions) == 1:
        vector_search_stage["filter"] = filter_conditions[0]
    
    pipeline = [
        {
            "$vectorSearch": vector_search_stage
        },
        {
            "$project": {
                "_id": 0,
                "body": 1,
                "metadata.productName": 1, 
                "metadata.contentType": 1,
                "updated": 1,
                "score": {"$meta": "vectorSearchScore"}
            }
        }
    ]

    # 실행 및 결과 반환
    results = collection.aggregate(pipeline)
    return list(results)

# 7단계: RAG 애플리케이션 구축하기


### 채팅 프롬프트 생성 함수 정의

In [None]:
# RAG 애플리케이션을 위한 사용자 프롬프트를 생성하는 함수를 정의합니다.
def create_prompt(user_query: str) -> str:
    # `vector_search` 함수를 사용하여 `user_query`와 관련된 문서를 검색합니다.
    vector_search_result = vector_search(user_query)
    # 검색된 문서들을 하나의 문자열로 결합합니다. 각 문서는 두 개의 줄 바꿈("\n\n")으로 구분합니다.
    context = "\n\n".join([doc.get('body') for doc in vector_search_result])
    # 질문과 답변에 필요한 관련 컨텍스트로 구성된 프롬프트를 만듭니다.
    prompt = f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}\n\nQuestion:{user_query}"
    return prompt

### 사용자 쿼리에 답변하는 함수 정의

In [None]:
# 사용자 쿼리에 답변하는 함수를 정의합니다.
def generate_answer(user_query: str) -> None:
    # 위의 `create_prompt` 함수를 사용하여 채팅 프롬프트를 생성합니다.
    prompt = create_prompt(user_query)
    # 채팅 메시지를 AI 모델 프록시로 전송하여 LLM 응답을 받습니다.

    model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"
    
    payload = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "temperature": 0, 
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt}]
            }
        ]
    }
    try:
        # 3. 모델 호출
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(payload)
        )

        # 4. 응답 파싱
        result = json.loads(response.get("body").read())
        answer = result["content"][0]["text"]
        print(answer)

    except Exception as e:
        print(f"Error invoking Bedrock: {e}")

    return answer

### RAG 애플리케이션 쿼리하기


In [None]:
generate_answer("What are some best practices for data backups in MongoDB?")

In [None]:
# 이 단계에서는 LLM이 대화 기록을 기억하지 못한다는 점을 주목하세요.
generate_answer("What did I just ask you?")

# 검색 결과 재순위화(Re-rank)


https://docs.voyageai.com/docs/reranker#python-api (예제 참고)

In [None]:
# 함수에 재순위화(re-ranking) 단계를 추가합니다.
def create_prompt_with_rerank(user_query: str) -> str:
    context = vector_search(user_query)
    documents = [d.get("body") for d in context]

    # Voyage AI API의 `rerank` 메서드를 사용하여 `documents`를 재순위화합니다.
    reranked_documents = vo.rerank(user_query, documents, model="rerank-2.5", top_k=3)
    
    # 재순위화된 문서들을 하나의 문자열로 결합합니다. 각 문서는 두 개의 줄 바꿈("\n\n")으로 구분합니다.
    context = "\n\n".join([d.document for d in reranked_documents.results])
    
    # 질문과 답변에 필요한 관련 컨텍스트로 구성된 프롬프트를 만듭니다.
    prompt = f"Answer the question based only on the following context. If the context is empty, say I DON'T KNOW\n\nContext:\n{context}\n\nQuestion:{user_query}"
    return prompt

def generate_answer_with_rerank(user_query: str) -> None:
    # 위의 `create_prompt` 함수를 사용하여 채팅 프롬프트를 생성합니다.
    prompt = create_prompt_with_rerank(user_query)
    # 채팅 메시지를 AI 모델 프록시로 전송하여 LLM 응답을 받습니다.

    model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"
    
    payload = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "temperature": 0, 
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt}]
            }
        ]
    }
    try:
        # 3. 모델 호출
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(payload)
        )

        # 4. 응답 파싱
        result = json.loads(response.get("body").read())
        answer = result["content"][0]["text"]
        print(answer)

    except Exception as e:
        print(f"Error invoking Bedrock: {e}")

    return answer

In [None]:
generate_answer("My CPU steal metric is high. What should I do?")

In [None]:
generate_answer_with_rerank("My CPU steal metric is high. What should I do?")