# RAG 파이프라인 (Elasticsearch + Sentence‑BERT + Solar_API)
이 노트북은 전체 파이프라인 코드를 **모든 줄 주석**과 함께 보여줍니다.
각 섹션별로 Markdown 설명 → Code 셀 순서로 구성되어 있습니다.

## 📦 라이브러리 임포트 및 모델 초기화

In [16]:
import os
import json
import jsonlines
import requests
import traceback
import time

from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv

model = SentenceTransformer("snunlp/KR-SBERT-V40K-klueNLI-augSTS")     # 한국어 SBERT 모델 로드

### 오류 메시지 안나오게 설정

In [17]:
import urllib3

# InsecureRequestWarning 끄기
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

### .env 불러오기

In [18]:
load_dotenv()

OPENAI_API_KEY=os.getenv("OPENAI_API_KEY")
ES_PASSWORD=os.getenv("ES_PASSWORD")
SOLAR_API_KEY=os.getenv("SOLAR_API_KEY")
SOLAR_API_URL=os.getenv("SOLAR_API_URL")

print("OPENAI_API_KEY:", OPENAI_API_KEY)
print("ES_PASSWORD:", ES_PASSWORD)
print("SOLAR_API_KEY:", SOLAR_API_KEY)
print("SOLAR_API_URL:", SOLAR_API_URL)

OPENAI_API_KEY: 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
ES_PASSWORD: EoNb_y3k0ijlW0PlKxnt
SOLAR_API_KEY: up_kntyD3xQnR27odlyiYxbsL1c3HsfP
SOLAR_API_URL: https://api.upstage.ai/v1/solar/chat/completions


## 임베딩 함수
1. get_embedding(sentences) 함수
- sentences: 텍스트(또는 텍스트 리스트)가 들어오는 입력.
- model.encode(sentences): model 객체 (보통 Sentence-BERT 같은 임베딩 모델)을 이용해서 입력 문장들을 벡터로 변환하는 함수.
- 결과적으로 문장 하나당 하나의 벡터를 뽑아주는 역할.

2. get_embeddings_in_batches(docs, batch_size=100) 함수
- docs: 여러 개의 문서를 담은 리스트야. (문서 하나는 딕셔너리 형태, 예: {"content": "내용"})
- batch_size: 한 번에 처리할 문서 수. 기본값은 100개.
- 작동 방식:
    - 전체 문서를 batch_size만큼 나눠서,
    - 각 배치마다 문서의 "content"만 뽑아,
    - get_embedding()으로 임베딩을 계산하고,
    - 결과를 하나의 리스트로 이어붙인다.

매 배치마다 진행 상황을 출력해서, "몇 번째 문서까지 처리했는지" 확인

In [19]:
def get_embedding(sentences):               # 문장 리스트를 임베딩으로 변환
    return model.encode(sentences)           # 모델을 이용해 문장들을 임베딩 벡터로 변환


def get_embeddings_in_batches(docs, batch_size=100):  # 배치 단위 임베딩 생성
    batch_embeddings = []                              
    for i in range(0, len(docs), batch_size):          # 0부터 끝까지 batch_size씩 슬라이싱
        batch = docs[i:i + batch_size]                 # 현재 배치 추출
        contents = [doc["content"] for doc in batch]   # 문서 본문만 리스트로 추출
        embeddings = get_embedding(contents)           # 임베딩 계산
        batch_embeddings.extend(embeddings)            # 결과 누적
        print(f'batch {i}')                            # 진행 상황 출력
    return batch_embeddings

## ES 인덱스 관리
1. create_es_index(index, settings, mappings)
- index: 만들고 싶은 인덱스 이름 (예: "test")
- settings: 아까 만든 분석기 설정 (settings 변수)
- mappings: 아까 만든 필드 구조 (mappings 변수)

2. delete_es_index(index)
- index: 삭제하고 싶은 인덱스 이름.
- 이름에 해당하는 인덱스를 삭제

3. bulk_add(index, docs)
- index: 데이터를 집어넣을 인덱스 이름.
- docs: 업로드할 문서들 리스트. (doc는 보통 { "content": "텍스트", "embeddings": [...] } 형태)
- 작동 방식:
    - 문서 리스트를 Elasticsearch가 이해할 수 있는 bulk 작업용 액션 리스트로 바꾼다.
    - helpers.bulk를 사용해서 한 번에 대량 업로드한다.

In [20]:
def create_es_index(index, settings, mappings):        # 새 인덱스 생성 함수
    if es.indices.exists(index=index):                 # 이미 존재하면
        es.indices.delete(index=index)                 # 삭제 후 재생성
    es.indices.create(index=index,                    # 인덱스 생성
                      settings=settings,
                      mappings=mappings)

def delete_es_index(index):                           # 인덱스 삭제 래퍼
    es.indices.delete(index=index)

def bulk_add(index, docs):                            # 대량 색인을 위한 헬퍼
    actions = [{'_index': index, '_source': doc} for doc in docs]  # 액션 목록 생성
    return helpers.bulk(es, actions)                  # helpers.bulk 로 일괄 업로드

## 📚 ES 검색 함수 정리

📊 요약 표

| 함수 이름 | 검색 방식 | 특징 |
|:---|:---|:---|
| `sparse_retrieve` | BM25 (텍스트 기반 역색인 검색) | 정확한 단어 매칭에 강함 |
| `dense_retrieve` | 임베딩 기반 벡터 검색 | 의미 기반 검색에 강함 |
| `hybrid_retrieve` | sparse + dense 점수 병합 | 단어 + 의미 모두 고려하여 검색 |
### 1. `sparse_retrieve(query_str, size)`

- **query_str**: 검색할 키워드(텍스트)
- **size**: 가져올 문서 수
- **작동 방식**:
  - 입력된 `query_str`을 **BM25 방식**으로 `content` 필드에 매칭
  - **_score** 기준으로 관련성 높은 문서를 `size`개 반환
- **특징**:
  - **정확한 키워드 매칭**에 강함
  - 의미 기반 매칭은 약함

---

### 2. `dense_retrieve(query_str, size)`

- **query_str**: 검색할 키워드(텍스트)
- **size**: 가져올 문서 수
- **작동 방식**:
  - `query_str`을 **임베딩 벡터**로 변환
  - 저장된 문서 벡터들과 **거리(L2 norm)** 비교
  - 가장 유사도가 높은 문서 `size`개 반환
- **특징**:
  - **의미 기반 검색**(비슷한 뜻 찾기)에 강함
  - 정확한 키워드 매칭에는 약할 수 있음

---

### 3. `hybrid_retrieve(query_str, size)`

- **query_str**: 검색할 키워드(텍스트)
- **size**: 가져올 문서 수
- **작동 방식**:
  1. `sparse_retrieve`로 **BM25** 기반 검색 결과 가져오기
  2. `dense_retrieve`로 **임베딩** 기반 검색 결과 가져오기
  3. **문서 ID 기준 병합**
  4. **sparse score**와 **dense score**를 **가중 평균**하여 최종 점수 계산
  5. 최종 점수 기준으로 상위 `size`개 문서 선택
- **특징**:
  - **단어 기반 + 의미 기반**을 모두 반영
  - 단독 sparse나 dense보다 **더 강력하고 균형 잡힌 검색 결과** 제공

---



In [21]:
def sparse_retrieve(query_str, size):                 # 역색인(BM25) 검색
    query = {"match": {"content": {"query": query_str}}}
    return es.search(index="test",
                     query=query,
                     size=size,
                     sort="_score")

def dense_retrieve(query_str, size):                  # 벡터 KNN 검색
    query_embedding = get_embedding([query_str])[0]   # 쿼리 임베딩
    knn = {                                           # KNN 파라미터
        "field": "embeddings",
        "query_vector": query_embedding.tolist(),
        "k": size,
        "num_candidates": 100
    }
    return es.search(index="test", knn=knn)           # ES 8.x KNN 검색 호출


def hybrid_retrieve(query_str, size, sparse_weight=0.6, dense_weight=0.4):
    """
    sparse(BM25) + dense(KNN) 결과를 가중 평균하여 결합하는 hybrid retrieval 함수
    - sparse_weight: BM25 점수의 비중
    - dense_weight: dense 점수의 비중
    """

    # sparse(BM25) 검색 결과 가져오기
    sparse_result = sparse_retrieve(query_str, size)

    # dense(KNN) 검색 결과 가져오기
    dense_result = dense_retrieve(query_str, size)

    # 문서 ID를 기준으로 결과를 병합하기 위한 딕셔너리 초기화
    merged_hits = {}

    # sparse 검색 결과를 먼저 처리
    for hit in sparse_result["hits"]["hits"]:
        doc_id = hit["_id"]
        merged_hits[doc_id] = {
            "source": hit["_source"],
            "sparse_score": hit["_score"],
            "dense_score": 0.0
        }

    # dense 검색 결과를 처리
    for hit in dense_result["hits"]["hits"]:
        doc_id = hit["_id"]
        if doc_id in merged_hits:
            merged_hits[doc_id]["dense_score"] = hit["_score"]
        else:
            merged_hits[doc_id] = {
                "source": hit["_source"],
                "sparse_score": 0.0,
                "dense_score": hit["_score"]
            }

    # sparse와 dense 점수를 가중 평균 (비율 조정 가능)
    for doc_id in merged_hits:
        merged_hits[doc_id]["final_score"] = (
            sparse_weight * merged_hits[doc_id]["sparse_score"]
            + dense_weight * merged_hits[doc_id]["dense_score"]
        )

    # 최종 스코어 기준으로 정렬 (높은 점수 순)
    ranked_hits = sorted(merged_hits.items(), key=lambda x: x[1]["final_score"], reverse=True)

    # top size개만 선택
    top_hits = ranked_hits[:size]

    # Elasticsearch 결과처럼 포맷 정리
    results = []
    for doc_id, info in top_hits:
        results.append({
            "_id": doc_id,
            "_score": info["final_score"],
            "_source": info["source"]
        })

    return results


## ES 클라이언트 설정
### 서버 버전과 호환이 안됨. 코드 fix 필요

In [22]:
es_username = "elastic"                               # ES 기본 사용자
es_password = ES_PASSWORD           # ← 실제 비밀번호로 교체

es = Elasticsearch(
    "https://localhost:9200",
    basic_auth=(es_username, es_password),
    verify_certs=False,        # CA 무시(테스트용)
)

print(es.info())  # 이제 정상 출력 (예: 7.17.9)

{'name': 'instance-15246', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'ixlY2lorQmS5rzNDpvAYVQ', 'version': {'number': '8.8.0', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': 'c01029875a091076ed42cdb3a41c10b1a9a5a20f', 'build_date': '2023-05-23T17:16:07.179039820Z', 'build_snapshot': False, 'lucene_version': '9.6.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


  _transport = transport_class(


## 인덱스 설정 & 매핑
Elasticsearch에서 한국어 텍스트 + 임베딩 벡터를 저장할 수 있는 인덱스를 만들기 위한 설정.  

1. settings 부분
- nori 분석기는 Nori Tokenizer를 기반으로 함.
- 조사, 어미, 기호 같은 불필요한 품사 토큰을 제거
    - E(어미), J(조사), SC(구두점), SE(문장구분), SF(마침표), VCN(형용사), VCP(긍정지정사), VX(보조동사)
- 핵심 던어만 남김.

2. mappings 부분
- content: 일반 텍스트 필드, 검색할 때 nori 분석기로 전처리 해서 인덱싱.
- embeddings: 768 차원 벡터 필드를 저장하고, 벡터 검색을 할 수 있도록 설정.

In [23]:
settings = {                                          # 한글용 Nori 분석기 설정
    "analysis": {
        "analyzer": {
            "nori": {                               # 'nori'라는 사용자 정의 분석기를 만든다
                "type": "custom",                   # 직접 필터와 토크나이저를 설정하는 방식
                "tokenizer": "nori_tokenizer",      # 한국어용 기본 Nori 토크나이저 사용
                "decompound_mode": "mixed",         # 복합어는 분리도 하고 원형도 같이 보존(mixed)
                "filter": ["nori_posfilter", "synonym_filter"]        # 필터링 적용
            }
        },
        
        "normalizer": {
            "lowercase_normalizer": {
            "type": "custom",
            "filter": ["lowercase"]
            }
        },

        "filter": {
            "synonym_filter": {
                "type": "synonym",
                # "synonyms_path": "/usr/share/elasticsearch/config/synonym_filter.txt", # 파일 경로 # 헬륨, helium, HeLiUm
                "synonyms": ["운동, 체육, 스포츠"] # 헬륨, helium, HeLiUm
                # "updateable": True  #  (7.3 이상) 동적 업데이트
            },
            "nori_posfilter": {  # 사용자 정의 품사 필터
                "type": "nori_part_of_speech",  # 품사 기반 필터링을 수행
                "stoptags": ["E", "J", "SC", "SE", "SF", "VCN", "VCP", "VX"]  
                # 이 품사에 해당하는 토큰들은 제거한다
                # E(어미), J(조사), SC(구두점), SE(문장구분), SF(마침표), VCN(형용사), VCP(긍정지정사), VX(보조동사)
            }
        }
    }
}

mappings = {
    "properties": {
        "content": {"type": "text", "analyzer": "nori"},  # 'content' 필드는 위에서 정의한 nori 분석기로 분석
        "embeddings": {  
            "type": "dense_vector",  # 'embeddings' 필드는 밀집 벡터(dense vector)로 저장
            "dims": 768,  # 임베딩 벡터 차원 수는 768 (ex: BERT base 모델 output 차원)
            "index": True,  # 벡터를 검색(indexing) 가능하게 설정
            "similarity": "l2_norm"  # 벡터 유사도는 L2 노름(유클리드 거리) 기반으로 측정
        }
    }
}

create_es_index("test", settings, mappings)           # ‘test’ 인덱스 생성


## 문서 로드 & 색인

In [24]:
index_docs = []                                       # 색인용 문서 리스트
with jsonlines.open("./data/documents.jsonl") as reader:
    docs = list(reader)                # 각 줄이 dict로 바로 변환됨

embeddings = get_embeddings_in_batches(docs)          # 임베딩 배치 생성

for doc, emb in zip(docs, embeddings):                # 문서와 임베딩 병합
    doc["embeddings"] = emb.tolist()                  # numpy → list 변환
    index_docs.append(doc)

ret = bulk_add("test", index_docs)                    # ES에 대량 색인
print(ret)                                            # 색인 결과 출력

batch 0
batch 100
batch 200
batch 300
batch 400
batch 500
batch 600
batch 700
batch 800
batch 900
batch 1000
batch 1100
batch 1200
batch 1300
batch 1400
batch 1500
batch 1600
batch 1700
batch 1800
batch 1900
batch 2000
batch 2100
batch 2200
batch 2300
batch 2400
batch 2500
batch 2600
batch 2700
batch 2800
batch 2900
batch 3000
batch 3100
batch 3200
batch 3300
batch 3400
batch 3500
batch 3600
batch 3700
batch 3800
batch 3900
batch 4000
batch 4100
batch 4200
(4272, [])


## 검색 예시 실행 

In [25]:
test_query = "금성이 다른 행성들보다 밝게 보이는 이유는 무엇인가요?"  # 샘플 쿼리

search_result_retrieve = sparse_retrieve(test_query, 3)  # BM25 검색
for hit in search_result_retrieve['hits']['hits']:       # 결과 출력
    print('sparse score:', hit['_score'],
          'source:', hit['_source']["content"])

search_result_retrieve = dense_retrieve(test_query, 3)   # 벡터 검색
for hit in search_result_retrieve['hits']['hits']:
    print('dense score:', hit['_score'],
          'source:', hit['_source']["content"])
    
search_result_retrieve = hybrid_retrieve(test_query, 3)   # 하이브리드 검색
for hit in search_result_retrieve:
    print('hybrid score:', hit['_score'],
          'source:', hit['_source']["content"])

sparse score: 33.710434 source: 금성이 다른 행성들보다 더 밝게 보이는 이유는 지구 쪽으로 가장 많은 햇빛을 반사하기 때문입니다. 케빈은 맑은 밤에 하늘을 관찰하고 있습니다. 그는 맨눈으로 금성, 화성, 목성, 토성을 볼 수 있습니다. 금성은 햇빛을 많이 반사하기 때문에 다른 행성들보다 더 밝게 보입니다. 이는 금성의 표면이 반사율이 높기 때문입니다. 금성은 태양으로부터 받은 햇빛을 표면에 반사하여 지구에서 관찰하기 쉽게 만듭니다. 따라서 케빈은 맑은 밤에 금성을 더 밝게 볼 수 있습니다.
sparse score: 18.925915 source: 금성은 태양계의 두 번째로 가까운 행성입니다. 이 행성의 대략적인 나이는 7억 5천만 년으로 추정됩니다. 금성은 지구와 매우 비슷한 크기와 구성을 가지고 있으며, 약 90% 이상이 이산화탄소로 이루어져 있습니다. 이 행성은 매우 뜨거운 온도와 압력을 가지고 있어서 인간이 살 수 있는 환경이 아닙니다. 금성의 대기는 두꺼워서 태양의 열을 가두고 있어서 행성의 표면은 평균 온도가 약 450도로 매우 뜨거운 상태입니다. 또한, 금성은 자전 속도가 매우 빠르기 때문에 하루가 지구의 약 243일과 같습니다. 이러한 특징들로 인해 금성은 우리 태양계에서 가장 가혹한 환경을 가진 행성 중 하나로 알려져 있습니다.
sparse score: 18.749123 source: 메릴랜드 Space Grant Observatory는 볼티모어에 위치해 있습니다. 학생들은 이 망원경을 방문하여 별, 행성, 달에 대해 배웠습니다. 그들은 아래와 같은 정보를 기록했습니다. 첫째, 별 패턴은 그대로 유지되지만, 하늘에서의 위치는 변하는 것처럼 보입니다. 둘째, 태양, 행성, 달은 하늘에서 움직이는 것처럼 보입니다. 셋째, 켄타우루스자리의 프록시마 성은 우리 태양계에서 가장 가까운 별입니다. 넷째, 북극성은 소 북두칠성이라 불리는 별 패턴의 일부입니다. 그렇다면 태양이 매일 하늘을 가로질러 움직이는 것처럼 보이는 이유는 무엇일

### SOLAR CALL

In [None]:
def call_solar(messages, model="solar-1-mini-chat", temperature=0.0, top_p=0.9):
    headers = {
        "Authorization": f"Bearer {SOLAR_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "top_p": top_p,
        "n": 1
    }
    
    response = requests.post(SOLAR_API_URL, headers=headers, json=payload)

    if response.status_code == 200:
        return response.json()  # ✅ 여기! .text 말고 .json()으로!!
    else:
        print(f"Solar API Error {response.status_code}: {response.text}")
        raise Exception(f"Solar API 호출 실패 (status_code: {response.status_code})")


## RAG 핵심 로직
### open_ai 이용

In [None]:
import yaml

# persona.yaml 불러오기
with open("persona.yaml", "r", encoding="utf-8") as f:
    persona_config = yaml.safe_load(f)

# 변수로 꺼내 쓰기
persona_function_calling = persona_config["persona_function_calling"]
persona_qa = persona_config["persona_qa"]

llm_model = "solar-pro"                             # 사용할 LLM 이름


In [None]:
def answer_question(messages):  # 대화 기록(messages)을 입력 받아 답변(response)을 생성하는 함수
    response = {  # 반환할 결과를 저장할 빈 딕셔너리 초기화
        "standalone_query": "",  # 검색용 최종 쿼리 저장
        "topk": [],              # 검색된 문서들의 docid 리스트 저장
        "references": [],        # 검색된 문서들의 score와 내용 저장
        "answer": ""             # 최종 생성된 답변 텍스트 저장
    }

    # ✅ 1차 호출 준비 (질문이 과학 관련인지 판단하기 위한 system 메시지 추가)
    msg = [{"role": "system", "content": persona_function_calling}] + messages  # 시스템 페르소나 + 사용자 메시지 합치기
    try:
        result_json = call_solar(msg, model=llm_model, temperature=0.0)  # Solar API 호출 (결과를 JSON으로 받음)
    except Exception:
        traceback.print_exc()  # 오류 발생 시 에러 트레이스 출력
        response["answer"] = "답변을 생성하는 도중 오류가 발생했습니다."  # 오류 메시지 세팅
        return response  # 빈 결과 반환

    # ✅ 1차 호출 결과에서 생성된 assistant 메시지 가져오기
    message = result_json["choices"][0]["message"]  # Solar 결과에서 첫 번째 응답 메시지 꺼내기
    tool_calls = message.get("tool_calls", None)    # tool_calls (함수 호출 여부) 확인

    # ✅ 검색이 필요한 경우 (tool_calls 존재)
    if tool_calls:
        tool_call = tool_calls[0]  # 첫 번째 tool_call 사용
        args = json.loads(tool_call["function"]["arguments"])  # tool_call 안의 arguments를 dict로 변환
        query = args.get("standalone_query")  # standalone_query (검색 쿼리) 추출

        search_result = sparse_retrieve(query, 3)  # sparse(BM25) 검색 실행하여 관련 문서 3개 가져오기
        response["standalone_query"] = query  # 검색 쿼리 기록

        context = []  # 검색된 문서 본문 리스트 초기화
        for hit in search_result['hits']['hits']:  # 검색 결과 문서들을 순회
            context.append(hit["_source"]["content"])  # 문서 내용 추가
            response["topk"].append(hit["_source"]["docid"])  # 문서 docid 추가
            response["references"].append({  # 문서 score와 내용 추가
                "score": hit["_score"],
                "content": hit["_source"]["content"]
            })

        # ✅ 검색 결과를 assistant 메시지로 추가
        messages.append({
            "role": "assistant",
            "content": json.dumps(context)  # 검색 문서들을 JSON 문자열로 변환
        })

        # ✅ 2차 호출 준비 (검색 문서를 참고해서 최종 답변 생성)
        qa_msg = [{"role": "system", "content": persona_qa}] + messages  # 시스템 페르소나(답변 생성용) + 메시지 합치기

        try:
            qa_result_json = call_solar(qa_msg, model=llm_model, temperature=0.0)  # Solar API 다시 호출 (최종 답변용)
        except Exception:
            traceback.print_exc()  # 오류 발생 시 에러 트레이스 출력
            return response  # 빈 결과 반환

        response["answer"] = qa_result_json["choices"][0]["message"]["content"]  # 최종 생성된 답변을 response에 저장
    else:
        # ✅ 검색 없이 바로 답변하는 경우
        response["answer"] = message["content"]  # 1차 호출 결과를 바로 답변으로 사용

    return response  # 최종 생성된 response 반환


## 평가 루프 
### OPENAI

In [None]:
import requests


def eval_rag(eval_filename, output_filename):
    with jsonlines.open(eval_filename) as reader, open(output_filename, "w") as of:
        for idx, j in enumerate(reader):
            print(f'Test {idx}\nQuestion: {j["msg"]}')
            
            try:
                resp = answer_question(j["msg"])  # Solar API를 사용하는 answer_question 호출
            except Exception as e:
                print(f"Error during answering: {e}")
                resp = {"standalone_query": "", "topk": [], "references": [], "answer": "답변 생성 실패"}

            print(f'Answer: {resp["answer"]}\n')

            out = {
                "eval_id": j["eval_id"],
                "standalone_query": resp["standalone_query"],
                "topk": resp["topk"],
                "answer": resp["answer"],
                "references": resp["references"]
            }
            of.write(json.dumps(out, ensure_ascii=False) + "\n")

            time.sleep(0.5)  # ✅ 요청 간 딜레이 추가

eval_rag("./data/eval1.jsonl", "sample_submission2.csv")  # 위에서 정의한 eval_rag 함수를 호출하여 평가를 시작한다