# Information Retrieval

## 검색엔진 준비 - Elasticsearch




In [1]:
# 엘라스틱서치의 데몬 인스턴스 만들기
import os
import json
from elasticsearch import Elasticsearch, helpers
from subprocess import Popen, PIPE, STDOUT

es_server = Popen(['elasticsearch-8.8.0/bin/elasticsearch'],
                  stdout=PIPE, stderr=STDOUT,
                  preexec_fn=lambda: os.setuid(1)  # as daemon
                 )

# 인스턴스를 로드하는 데 약간의 시간이 걸림
import time
time.sleep(60)

In [2]:
# # 데몬이 구동되었는지 확인 (세개의 daemon process가 있어야 함)
# !ps -ef | grep elasticsearch

In [None]:
import os
import json
from elasticsearch import Elasticsearch, helpers

es_username = 'elastic'

# 위 명령 실행 결과의 마지막 부분인 PASSWORD elastic 값으로 교체 필요
es_password = 'LMq-dSVZ=+7e_KsHpI5n'

# Elasticsearch client 생성
es = Elasticsearch(['https://localhost:9200'], basic_auth=(es_username, es_password), ca_certs="./elasticsearch-8.8.0/config/certs/http_ca.crt")

# Elasticsearch client 정보 확인
print(es.info())

In [None]:
from sentence_transformers import SentenceTransformer

# Sentence Transformer 모델 초기화 (한국어 임베딩 생성 가능한 어떤 모델도 가능)
model = SentenceTransformer("monologg/koelectra-base-v3-discriminator")


# SetntenceTransformer를 이용하여 임베딩 생성
def get_embedding(sentences):
    return model.encode(sentences)


# 주어진 문서의 리스트에서 배치 단위로 임베딩 생성
def get_embeddings_in_batches(docs, batch_size=100):
    batch_embeddings = []
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        contents = [doc["content"] for doc in batch]
        embeddings = get_embedding(contents)
        batch_embeddings.extend(embeddings)
        print(f'batch {i}')
    return batch_embeddings


# 새로운 index 생성
def create_es_index(index, settings, mappings):
    # 인덱스가 이미 존재하는지 확인
    if es.indices.exists(index=index):
        # 인덱스가 이미 존재하면 설정을 새로운 것으로 갱신하기 위해 삭제
        es.indices.delete(index=index)
    # 지정된 설정으로 새로운 인덱스 생성
    es.indices.create(index=index, settings=settings, mappings=mappings)


# 지정된 인덱스 삭제
def delete_es_index(index):
    es.indices.delete(index=index)


# Elasticsearch 헬퍼 함수를 사용하여 대량 인덱싱 수행
def bulk_add(index, docs):
    # 대량 인덱싱 작업을 준비
    actions = [
        {
            '_index': index,
            '_source': doc
        }
        for doc in docs
    ]
    return helpers.bulk(es, actions)


# 역색인을 이용한 검색
def sparse_retrieve(query_str, size):
    query = {
        "match": {
            "content": {
                "query": query_str
            }
        }
    }
    return es.search(index="test", query=query, size=size, sort="_score")


# Vector 유사도를 이용한 검색
def dense_retrieve(query_str, size):
    # 벡터 유사도 검색에 사용할 쿼리 임베딩 가져오기
    query_embedding = get_embedding([query_str])[0]

    # KNN을 사용한 벡터 유사성 검색을 위한 매개변수 설정
    knn = {
        "field": "embeddings",
        "query_vector": query_embedding.tolist(),
        "k": size,
        "num_candidates": 100
    }

    # 지정된 인덱스에서 벡터 유사도 검색 수행
    return es.search(index="test", knn=knn)


In [5]:
# 색인을 위한 setting 설정
settings = {
    "analysis": {
        "analyzer": {
            "nori": {
                "type": "custom",
                "tokenizer": "nori_tokenizer",
                "decompound_mode": "mixed",
                "filter": ["nori_posfilter"]
            }
        },
        "filter": {
            "nori_posfilter": {
                "type": "nori_part_of_speech",
                # 어미, 조사, 구분자, 줄임표, 지정사, 보조 용언 등
                "stoptags": ["E", "J", "SC", "SE", "SF", "VCN", "VCP", "VX"]
            }
        }
    }
}

# 색인을 위한 mapping 설정 (역색인 필드, 임베딩 필드 모두 설정)
mappings = {
    "properties": {
        "content": {"type": "text", "analyzer": "nori"},
        "embeddings": {
            "type": "dense_vector",
            "dims": 768,
            "index": True,
            "similarity": "l2_norm"
        }
    }
}


In [6]:
# settings, mappings 설정된 내용으로 'test' 인덱스 생성
create_es_index("test", settings, mappings)

In [None]:
# 문서의 content 필드에 대한 임베딩 생성
index_docs = []
with open("./data/documents.jsonl") as f:
    docs = [json.loads(line) for line in f]
embeddings = get_embeddings_in_batches(docs)

# 생성한 임베딩을 색인할 필드로 추가
for doc, embedding in zip(docs, embeddings):
    doc["embeddings"] = embedding.tolist()
    index_docs.append(doc)

# 'test' 인덱스에 대량 문서 추가
ret = bulk_add("test", index_docs)

# 색인이 잘 되었는지 확인 (색인된 총 문서수가 출력되어야 함)
print(ret)


In [8]:
# 검색엔진에 색인이 잘 되었는지 테스트하기 위한 질의
test_query = "금성이 다른 행성들보다 밝게 보이는 이유는 무엇인가요?"

In [None]:
# 역색인을 사용하는 검색 예제
search_result_retrieve = sparse_retrieve(test_query, 3)

# 결과 출력 테스트
for rst in search_result_retrieve['hits']['hits']:
    print('score:', rst['_score'], 'source:', rst['_source']["content"])

In [None]:
# Vector 유사도 사용한 검색 예제
search_result_retrieve = dense_retrieve(test_query, 3)

# 결과 출력 테스트
for rst in search_result_retrieve['hits']['hits']:
    print('score:', rst['_score'], 'source:', rst['_source']["content"])

## RAG 구현

준비된 검색엔진과 LLM을 활용하셔 대화형 RAG 구현

In [None]:
import transformers
import torch

model_id = "rtzr/ko-gemma-2-9b-it"

pipeline = transformers.pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device_map="auto",
)

In [12]:
def request_prompt(message, max_new_tokens=100):
    with torch.no_grad():  # cuda 메모리 확보하기 위해 
        try:
            prompt = pipeline.tokenizer.apply_chat_template(
                message, 
                tokenize=False, 
                add_generation_prompt=True
            )

            terminators = [
                pipeline.tokenizer.eos_token_id,
                pipeline.tokenizer.convert_tokens_to_ids("<end_of_turn>")
            ]

            outputs = pipeline(
                prompt,
                max_new_tokens=max_new_tokens,
                eos_token_id=terminators,
                do_sample=False,
            )
            return outputs[0]["generated_text"][len(prompt):]
        except Exception as e:
            print(e)
            return ""

In [13]:
import re

def extract_standalone_query(text):
    # query 추출 (query: 뒤에 있는 문자열 추출)
    query_match = re.search(r'"query"\s*:\s*"([^"]*)"', text)
    query = query_match.group(1) if query_match else ""

    # # keywords 추출 (keywords: 뒤에 있는 리스트 추출)
    # keywords_match = re.search(r'"keywords"\s*:\s*(\[[^\]]*\])', text)
    # try:
    #     keywords = json.loads(keywords_match.group(1)) if keywords_match else []
    # except json.JSONDecodeError:
    #     keywords = []

    # # standalone_query 생성
    # if query and keywords:
    #     standalone_query = f"{query}. {', '.join(keywords)}"
    # elif query:
    #     standalone_query = query
    # elif keywords:
    #     standalone_query = ', '.join(keywords)
    # else:
    #     standalone_query = ""

    return query

def extract_topk(text):
    topk_match = re.search(r'"topk"\s*:\s*(\[[^\]]*\])', text)
    try:
        topk = json.loads(topk_match.group(1)) if topk_match else []
    except json.JSONDecodeError:
        topk = []
    return topk

def extract_answer(text):
    answer_match = re.search(r'"answer"\s*:\s*"([^"]+)"', text)
    answer = answer_match.group(1) if answer_match else ""
    return answer

In [None]:
# RAG 구현에 필요한 질의 분석 및 검색 이외의 일반 질의 대응을 위한 LLM 프롬프트
persona_for_query = """당신은 과학 기술에 특화된 상식 RAG 시스템입니다.
사용자 대화 이력을 참조해서 요약 query를 명사구로 생성 후, JSON 포맷으로 반환해 주세요.
응답의 상세한 조건은 다음을 따르세요:
1. **입력 정보**: 사용자 대화 이력
2. **출력 형식**: JSON 포맷으로 다음 정보를 포함해 주세요:
   - "query": 문맥과 검색 의도가 정확히 드러나는 명사구 요약 질의문(한글, 1문장). 사용자 대화 이력이 일상 대화라면 공백.
   - "answer": 사용자 대화 이력이 일상 대화라면 쓰레드 종료 응답, 아니라면 공백.
3. **예시**:
   - 과학이나 기술 상식 출력 예시: {"query":"카를로 로벨리의 루프 양자 중력 이론이 현대 물리학에 미친 영향"}
   - 일반대화 출력 예시: {"answer":"저는 과학 지식이나 일반 상식에 대한 답변이 가능합니다."}
4. **주의사항**
   - 응답은 반드시 예시처럼 JSON Body 만 출력하세요.
   - 당신에 관한 대화는 쿼리를 생성할 필요없는 일상 대화입니다. 바로 응답을 작성하세요.
"""
print(persona_for_query)

In [None]:
persona_for_answer = """당신은 과학 기술에 특화된 상식 RAG 시스템입니다.
query에 관한 답변을 작성하기 위해 참고할 content의 id를 관련도 높은 순으로 3개 반환해주세요.
응답의 상세한 조건은 다음을 따르세요:
1. **입력 정보**: query와 참고 문서
   - query: 질의
   - references : 답변 작성에 참조할 문서(id, content)
   - 입력 예시: {
   "query":"뭉게구름 발생 시 강수량 예측",
   "references":[
      {
         "id":0,
         "content":"건강한 아침 식사를 하는 사람은..."
      },
      {
         "id":1,
         "content":"적운은 하층 대기에서 주로 맑은 날씨에 나타나는..."
      },
      {
         "id":2,
         "content":"지진은 지각판의 움직임으로 인해 발생하는..."
      },
      {
         "id":3,
         "content":"적운은 보통 비와는 관련이 없지만, 특정 조건에서는..."
      }
   ]}
2. **출력 예시**: [3,1]
3. **주의사항**
   - 응답은 반드시 예시처럼 문서 id 배열만 출력하세요.
"""
print(persona_for_answer)

In [16]:
def generate_input_json(query, refs):
    references = []
    for ref in refs:
        # "id"와 "content"를 추출
        id_part = ref.split(", ")[0].split(":")[1].strip()
        content_part = ref.split(", content:")[1].strip()

        # 딕셔너리 형태로 저장
        references.append({
            "id": id_part,
            "content": content_part
        })

    # JSON 형식으로 결과 반환
    formatted_result = {
        "query": query.strip(),
        "references": references
    }
    
    # JSON 문자열로 변환하여 반환
    return json.dumps(formatted_result, ensure_ascii=False)

In [17]:
def fill_topk(topk, topk_original):
    # topk에서 제거된 항목을 추적
    removed_items = [item for item in topk if item not in topk_original]
    
    # topk에 있는 항목 중 topk-original에 없는 항목을 제거
    topk = [item for item in topk if item in topk_original]
    
    # topk 개수에 따른 분포 확인
    original_topk_count = len(topk)
    topk_distribution_before = f"topk {original_topk_count}개: {topk}"

    # topk의 항목이 3개 미만일 경우, 이미 있는 topk 항목을 제외한 topk-original의 항목으로 채우기
    if len(topk) < 3:
        remaining_items = [item for item in topk_original if item not in topk]
        topk += remaining_items[:3 - len(topk)]

    # 채우기 후의 결과 확인
    topk_distribution_after = f"topk {len(topk)}개: {topk}"

    print(f"총 id 오류 : {len(removed_items)}개, {removed_items}")
    print(f"topk 채우기 전: {topk_distribution_before}")
    print(f"topk 채우기 후: {topk_distribution_after}")

    return topk

def generate_answer(messages, standalone_query=None, max_k=3):
    response = {
        "standalone_query": "", 
        "references": [], 
        "topk": [],
        "topk-original": [],
        "answer": "", 
        }
    
    # 쿼리 생성
    if not standalone_query:
        msg = [{"role": "system", "content": persona_for_query}] + messages
        result = request_prompt(msg)
        # print(f"plain result : {result}")

        answer = extract_answer(result)
        # print(f"answer: {answer}")
        if answer: # 일반 대화
            response["answer"] = answer
            return response
        
        standalone_query = extract_standalone_query(result)
    print(f"standalone_query : {standalone_query}")

    # 문서 검색
    response["standalone_query"] = standalone_query
    search_result = sparse_retrieve(standalone_query, max_k)

    refs = []
    for i,rst in enumerate(search_result['hits']['hits']):
        docid = rst["_source"]["docid"]
        content = rst["_source"]["content"]
        refs.append(f"id:{i}, content:{content}") # llm에 리랭킹 요청 시 uuid 할루시네이션 이슈로 숫자 처리

        response["topk-original"].append(docid)
        response["references"].append({"score": rst["_score"], "content": content})

    # 답변 생성 및 re-ranking
    input_json = generate_input_json(standalone_query, refs)
    print(f"input json : {input_json}")

    msg = [{"role": "system", "content": persona_for_answer}, {"role": "user", "content": input_json}]
    # print(f"prompt : {msg}")

    result = request_prompt(msg, 16)
    print(f"plain result : {result}")

    # response["answer"] = extract_answer(result)
    # response["topk"] = extract_topk(result)

    topk = [int(num) for num in re.findall(r'\d+', result)]
    topk = [search_result['hits']['hits'][index]["_source"]["docid"] for index in topk]
    response["topk"] = fill_topk(topk, response["topk-original"])

    return response

In [18]:
from datetime import datetime
import time
from zoneinfo import ZoneInfo
import gc

def eval_rag(eval_filename, use_pre_query=False, max_k=3):
    current_time = datetime.fromtimestamp(time.time(), tz=ZoneInfo("Asia/Seoul")).strftime("%m%d-%H%M")
    output_filename = f"output/{current_time}_topk{max_k}.csv"

    with open(eval_filename) as f, open(output_filename, "w") as of:
        idx = 0
        for line in f:
            # if idx > 5:
            #   break
            
            j = json.loads(line)
            standalone_query = j["standalone_query"] if use_pre_query else None
            response = generate_answer(j["msg"], standalone_query, max_k)

            output = {
                "eval_id": j["eval_id"], 
                "standalone_query": response["standalone_query"],
                "answer": response["answer"],  
                "references": response["references"],
                "topk": response["topk"],
                "topk-original": response["topk-original"]
                }
            print(f'{json.dumps(output,indent=4, ensure_ascii=False)}\n')

            of.write(f'{json.dumps(output, ensure_ascii=False)}\n')
            idx += 1

            del response
            del output
            gc.collect()
            torch.cuda.empty_cache()

In [None]:
# 평가 데이터에 대해서 결과 생성 - 파일 포맷은 jsonl이지만 파일명은 csv 사용
eval_file = "./data/eval.jsonl"
eval_rag(eval_file, max_k=3)
# eval_rag(eval_file, max_k=4)
# eval_rag(eval_file, max_k=5)
# eval_rag(eval_file, max_k=6)
# eval_rag(eval_file, max_k=7)
# eval_rag(eval_file, max_k=8)
# eval_rag(eval_file, max_k=9)
# eval_rag(eval_file, max_k=10)