# RAG(LlamaIndex)

In [None]:
### 필요한 함수 임폴트
import os
import json
import unicodedata

from dotenv import load_dotenv
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core.llms import ChatMessage, MessageRole
from llama_index.core import Settings
from llama_index.core import VectorStoreIndex, Document
from llama_index.core import SimpleDirectoryReader
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import QueryEngineTool, ToolMetadata

In [None]:
### SentenceSplitter

text_splitter = SentenceSplitter(
    chunk_size=200,
    chunk_overlap=50
)

### embedding

# 임베딩 모델 생성
embed_model = HuggingFaceEmbedding(
    model_name = 'paraphrase-multilingual-MiniLM-L12-v2',
    device='cpu'
)

### llm

# google api key 불러오기
google_api_key = os.getenv("GOOGLE_API_KEY")
google_api_key = ''


# llm 생성
llm = GoogleGenAI(
    model='gemini-2.5-pro',
    request_timeout=120.0,
    temperature=0.0,
    api_key=google_api_key
)

### settings

# llm, embedding, text_splitter 모델 설정
Settings.llm = llm
Settings.embed_model = embed_model
Settings.text_splitter = text_splitter

### index

import os
import unicodedata

# 파일 경로가 존재하는지 확인합니다.
if not os.path.exists(file_path):
    raise ValueError(f"지정된 파일 '{file_path}'을(를) 찾을 수 없습니다.")

# 파일 경로를 리스트에 담아줍니다.
files = ["/content/drive/MyDrive/임베딩/GCP gemini API 활용/성분 효능.txt"]

# 이제 'files' 리스트에는 지정된 txt 파일의 경로가 하나만 들어 있습니다.
# 이 리스트를 사용하여 문서를 로드하고 RAG를 구축하면 됩니다.
print(f"로드할 파일: {files}")

# 문서 로딩
txt_documents = SimpleDirectoryReader(input_files=files).load_data()

# document 자료 구조 --> 벡터 인덱스 생성: {'Node':임베딩 벡터,...}
'''
# VectorStoreIndex.from_documents(documents) 실행
# 내부적으로 문서 분할 -> 임베딩 -> 저장이 실행
'''

txt_index = VectorStoreIndex.from_documents(txt_documents)

### query_engine

# query_engine 생성
txt_engine = txt_index.as_query_engine(similarity_top_k=10, include_metadata=True)

### queryenginetool

## 도구 생성

# pdf_tool 생성
txt_tool = QueryEngineTool(
        query_engine=txt_engine,
        metadata=ToolMetadata(
            name='txt_tool',
            description='건강기능식품에 들어간 원료의 효능을 알려주는 도구입니다.'
        )
    )

### agent

# 시스템 프롬프트 정의
react_system_prompt = f"""
당신은 건강기능식품 AI 비서입니다. txt_tool을 반드시 활용하세요.
"""

# ReActAgent 생성
healthcare_agent = ReActAgent(
    tools=[txt_tool],
    llm=Settings.llm,
    system_prompt=react_system_prompt,
    verbose=True
)


response = await healthcare_agent.run(user_msg=f"""당신은 소비자에게 건강기능식품 구매 시 꼭 알아야 할 중요한 정보를 전달해주는 전문가입니다.
아래에 두가지 제품이 제시되어 있습니다. {product_result}에 있는 '상세정보'와 txt_tool에 있는 원료의 효능 텍스트를 이용하여 두가지 제품에 대한 효능을 요약하세요.
효능의 근거를 찾기 위해 txt_tool을 사용하여 관련 txt 문서를 참조해야 합니다.
제품 정보에는 고유번호는 적지 마세요.
부정 리뷰와 중립 리뷰 내용을 기반으로 제품을 구매하기 전에 사용자가 '주의해야 할 점'을 세 가지로 요약하세요.
각 주의할 점은 짧은 문장으로 정리하고, '~가 있으니 주의하세요.'와 같은 형식으로 작성하세요.
근거를 찾을 수 없는 경우에는 '출처 없음'이라고 작성하세요.

두가지 제품 : {product_result}

출력은 반드시 아래 JSON 형식으로 작성해야 합니다:
[
    {{
      "제품명": "...",
      "제품의 효능": "...",
      "제품 정보": "...",
      "근거": "..." - txt_tool을 참조하세요,
      "긍정 리뷰": "...",
      "주의할 점": "...",
      "요약": "..."
    }},
    ...
  ]""")
print(str(response))

# RAG(LangChain)

In [None]:
import os
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from langchain.llms import HuggingFacePipeline
from langchain.prompts import ChatPromptTemplate
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.prompts import PromptTemplate
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [None]:
import pandas as pd
file_path4 = '/content/drive/MyDrive/임베딩/GCP gemini API 활용/reviews_chunked_embeddings4.csv'

df = pd.read_csv(file_path4)

In [None]:
import ast
df['긍정리뷰_임베딩_int'] = df['긍정리뷰_임베딩'].apply(ast.literal_eval)

In [None]:
condition_zinc = (df.loc[:,'카테고리']=='아연')
df_zinc = df.loc[condition_zinc,:]

condition_vitamin = (df.loc[:,'카테고리']=='비타민')
df_vitamin = df.loc[condition_vitamin,:]

condition_probiotics = (df.loc[:,'카테고리']=='유산균')
df_probiotics = df.loc[condition_probiotics,:]

condition_omega3 = (df.loc[:,'카테고리']=='오메가3')
df_omega3 = df.loc[condition_omega3,:]

condition_protein = (df.loc[:,'카테고리']=='단백질')
df_protein = df.loc[condition_protein,:]

In [None]:
import glob

pdf_folder = "/content/drive/MyDrive/RAG/PDF/*.pdf"
pdf_files = glob.glob(pdf_folder)

In [None]:
%%time
# 청크 분할기
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size = 500,
    chunk_overlap = 50,
)

# PDF문서 로드 & 청크 분할
all_docs = []

for pdf_file in tqdm(pdf_files, desc = "PDF Load :"):
    loader = PyMuPDFLoader(pdf_file)
    docs = loader.load()
    all_docs.extend(docs)

all_chunks = text_splitter.split_documents(all_docs)

print(f"총 문서 수: {len(all_chunks)}")

In [None]:
docs = []
for i, c in enumerate(all_chunks):
    docs.append({
        "id": f"doc-{i}",
        "title": c.metadata.get("source", ""),  # PDF 파일명
        "page": c.metadata.get("page", -1),    # 페이지 번호
        "text": c.page_content                 # 실제 텍스트
    })

In [None]:
import os, json, re, textwrap
from typing import List, Dict, Any, Tuple

from sentence_transformers import SentenceTransformer, CrossEncoder
from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# ==================================================
# 0. 데이터/카테고리 선택
# ==================================================
category = input("원하는 건강기능식품 종류를 입력해주세요.\n\n 건강기능식품 종류: 아연, 비타민, 유산균, 오메가3, 단백질 中 \t")

# 각 카테고리별 데이터프레임이 있다고 가정
if category == '아연':
    df = df_zinc
elif category == '비타민':
    df = df_vitamin
elif category == '유산균':
    df = df_probiotics
elif category == '오메가3':
    df = df_omega3
elif category == '단백질':
    df = df_protein
else:
    raise ValueError("지원하지 않는 카테고리입니다.")

df = df.reset_index(drop=True)

# ==================================================
# 1. 리뷰 기반 추천 (임베딩 + 코사인 유사도)
# ==================================================
embed_model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')

def search_product_from_reviews(query: str, df: pd.DataFrame, top_n: int = 2):
    query_embedding = embed_model.encode([query])
    embeddings_matrix = np.array(df['긍정리뷰_임베딩_int'].tolist())
    similarities = cosine_similarity(query_embedding, embeddings_matrix)[0]

    top_indices = similarities.argsort()[-top_n:][::-1]
    unique_results = []
    seen_ids = set()

    for idx in top_indices:
        product_id = df.iloc[idx]['고유번호']
        if product_id not in seen_ids:
            seen_ids.add(product_id)
            unique_results.append((idx, similarities[idx]))
        if len(unique_results) == top_n:
            break

    return unique_results

def product_result(unique_results, df: pd.DataFrame):
    outputs = []
    for idx, score in unique_results:
        row = df.loc[idx]
        outputs.append(f"제품명: {row['제품명']}")
    return outputs

# ==================================================
# 2. 문서 기반 검색 (BM25 + Dense + RRF + Reranker)
# ==================================================
docs: List[Dict[str, Any]] = []  # TODO: 실제 문서 리스트 로드

EMBEDDING_MODEL = "BAAI/bge-m3"
RERANKER_MODEL = "BAAI/bge-reranker-v2-m3"
emb_model_doc = SentenceTransformer(EMBEDDING_MODEL)

def simple_tokenize_ko(text: str) -> List[str]:
    text = re.sub(r"[^0-9A-Za-z가-힣%·\.\-\s]", " ", text)
    return [t for t in text.split() if t]

bm25_corpus_tokens = [simple_tokenize_ko(d["text"]) for d in docs]
bm25 = BM25Okapi(bm25_corpus_tokens) if docs else None

try:
    import faiss
    use_faiss = True
except Exception:
    faiss = None
    use_faiss = False

if docs:
    doc_embeddings = emb_model_doc.encode([d["text"] for d in docs], batch_size=64, convert_to_numpy=True, show_progress_bar=True)
    if use_faiss:
        dim = doc_embeddings.shape[1]
        index = faiss.IndexFlatIP(dim)
        norms = np.linalg.norm(doc_embeddings, axis=1, keepdims=True) + 1e-12
        normed = doc_embeddings / norms
        index.add(normed.astype('float32'))
    else:
        index = None
else:
    doc_embeddings = np.zeros((0, 384), dtype='float32')
    index = None

def dense_search(query: str, top_k=40) -> List[Tuple[int, float]]:
    if not docs:
        return []
    q = emb_model_doc.encode([query], convert_to_numpy=True)[0]
    q = q / (np.linalg.norm(q) + 1e-12)
    if index is not None and use_faiss:
        D, I = index.search(q[np.newaxis, :].astype('float32'), top_k)
        return [(int(i), float(d)) for i, d in zip(I[0], D[0])]
    sims = (doc_embeddings @ q) / (np.linalg.norm(doc_embeddings, axis=1) + 1e-12)
    top_idx = np.argsort(-sims)[:top_k]
    return [(int(i), float(sims[i])) for i in top_idx]

def sparse_search(query: str, top_k=80) -> List[Tuple[int, float]]:
    if not docs or bm25 is None:
        return []
    tokens = simple_tokenize_ko(query)
    scores = bm25.get_scores(tokens)
    top_idx = np.argsort(-scores)[:top_k]
    return [(int(i), float(scores[i])) for i in top_idx]

def rrf_fuse(dense: List[Tuple[int, float]], sparse: List[Tuple[int, float]], k: int = 60, top_k: int = 50) -> List[int]:
    ranks: Dict[int, float] = {}
    for lst in [dense, sparse]:
        for rank, (idx, _) in enumerate(lst):
            ranks[idx] = ranks.get(idx, 0.0) + 1.0 / (k + rank + 1)
    fused = sorted(ranks.items(), key=lambda x: -x[1])[:top_k]
    return [idx for idx, _ in fused]

try:
    reranker = CrossEncoder(RERANKER_MODEL)
except Exception:
    reranker = None

def rerank(query: str, candidate_ids: List[int], top_k=10) -> List[int]:
    if not candidate_ids:
        return []
    pairs = [[query, docs[i]["text"]] for i in candidate_ids]
    if reranker:
        scores = reranker.predict(pairs)
    else:
        qset = set(simple_tokenize_ko(query))
        scores = [len(qset & set(simple_tokenize_ko(docs[i]["text"]))) for i in candidate_ids]
    order = np.argsort(-np.array(scores))[:top_k]
    return [candidate_ids[i] for i in order]

def build_context(query: str, max_chars: int = 2000, top_k_dense=40, top_k_sparse=40, top_k_final=8) -> Tuple[str, List[Dict[str, Any]]]:
    dense = dense_search(query, top_k=top_k_dense)
    sparse = sparse_search(query, top_k=top_k_sparse)
    fused_ids = rrf_fuse(dense, sparse, k=60, top_k=50)
    final_ids = rerank(query, fused_ids, top_k=top_k_final)

    selected = []
    total = 0
    for i in final_ids:
        d = docs[i]
        snippet = d["text"][:800]
        selected.append({"id": d["id"], "title": d.get("title", ""), "page": d.get("page", -1), "text": snippet})
        total += len(snippet)
        if total >= max_chars:
            break

    ctx_blocks = []
    for s in selected:
        header = f"[문서:{s['id']}] 제목:{s['title']} | 페이지:{s['page']}"
        ctx_blocks.append(header + "\n" + s["text"].strip())
    context_text = "\n\n".join(ctx_blocks)
    return context_text, selected

# ==================================================
# 3. LLM 프롬프트 & 파서
# ==================================================
SCHEMA_JSON = {
  "type": "object",
  "properties": {
    "사용자 입력": {"type": "string"},
    "추천 카테고리": {"type": "string"},
    "추천 제품": {"type": "string"},
    "전문가의 의견": {"type": "string"},
    "근거": {"type": "array", "items": {"type": "string"}},
    "참고 문서": {"type": "array"}
  },
  "required": ["사용자 입력", "추천 제품", "전문가의 의견"]
}

FEWSHOT = """
{
  "사용자 입력": "20대에게 추천하는 비타민",
  "추천 카테고리": "비타민",
  "추천 제품": "종합 비타민",
  "전문가의 의견": "20대는 균형 잡힌 영양 섭취가 필요하므로 종합 비타민이 적합합니다.",
  "근거": ["리뷰 기반 유사도 검색 결과", "제공 문서 참조"]
}

</예시>
위는 참고용입니다. 출력 시 예시 내용을 반복하지 말고, 실제 사용자 입력에 맞는 답변만 작성하세요.

엄격한 규칙:
- 반드시 위의 JSON 키만 사용. 추가 키/주석/머리말 금지.
- "전문가의 의견"은 제공된 문맥에서만 인용/요약. 모르면 "정보 부족"이라고 명시.
- 제공된 문맥 밖 정보를 추정하거나 발명 금지.
"""

SYSTEM_INSTRUCTIONS = f"""
당신은 영양학 전문가입니다.
주어진 리뷰 기반 추천 + 문서 기반 검색을 활용하여 JSON을 생성하세요.
절대 문맥 밖 지식으로 확장하지 마세요. 불충분하면 "정보 부족"을 사용하세요
JSON 스키마:
{json.dumps(SCHEMA_JSON, ensure_ascii=False)}
"""

MODEL_NAME = os.environ.get("HF_LLM", "skt/A.X-4.0-Light")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, padding_side="left")
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
    device_map="auto"
)
textgen = pipeline("text-generation", model=model, tokenizer=tokenizer)

def robust_parse_json(text: str) -> Dict[str, Any]:
    m = re.search(r"\{[\s\S]*\}", text)
    if m:
        cand = m.group(0)
        try:
            return json.loads(cand)
        except Exception:
            pass
    return {"사용자 입력": "", "추천 제품": "", "전문가의 의견": "", "근거": []}

# ==================================================
# 4. 최종 질의 처리
# ==================================================
def answer_query(query: str, category: str, max_new_tokens: int = 256, temperature: float = 0.2) -> Dict[str, Any]:
    # 리뷰 기반 추천
    review_results = search_product_from_reviews(query, df)
    products = product_result(review_results, df)

    # 문서 기반 검색
    context_text, selected = build_context(query)

    prompt = f"""
    [시스템]
    {SYSTEM_INSTRUCTIONS}

    [문맥]
    {context_text}

    [리뷰 기반 추천]
    {products}

    [사용자 입력]
    {query}

    [카테고리]
    {category}

    [지시]
    {FEWSHOT}
    """

    out = textgen(
        prompt,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        temperature=temperature,
        repetition_penalty=1.1,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id,
        return_full_text=False
    )[0]["generated_text"]

    data = robust_parse_json(out)
    if not data.get("추천 카테고리"):
        data["추천 카테고리"] = category
    if not data.get("참고 문서") and selected:
        data["참고 문서"] = [{"title": s["title"], "page": s["page"], "문서ID": s["id"]} for s in selected[:3]]
    return data

# ==================================================
# 5. 출력 렌더링
# ==================================================
def render_ko(data: Dict[str, Any]) -> str:
    parts = [
        f"사용자 입력 : {data.get('사용자 입력','')}",
        f"추천 카테고리 : {data.get('추천 카테고리','')}",
        f"추천 제품 : {data.get('추천 제품','')}",
        f"전문가의 의견 : {data.get('전문가의 의견','')}"
    ]
    if data.get("근거"):
        parts.append("근거 : " + "; ".join(map(str, data["근거"])))
    return "\n".join(parts)

# ==================================================
# 6. 실행부
# ==================================================
if __name__ == "__main__":
    query = input("사용자 질문: ")
    result = answer_query(query, category)
    print(render_ko(result))
