# Chapter 6: OSTEP RAG 통합 (검색→프롬프트→생성)

이 노트북은 Chapter 2–4에서 생성한 산출물(청크/임베딩/FAISS 인덱스)과 Chapter 5 스타일의 프롬프트/생성 헬퍼를 재사용하여, 검색→프롬프트→생성까지 단일 RAG 파이프라인을 시연합니다.

## 📚 학습 목표
- 기존 산출물 로드만으로 RAG 통합 파이프라인 구성
- FAISS HNSW 인덱스를 이용한 Top-K 검색(Top-K=10)
- Ollama `llama3.1:8b` 모델로 구조화 프롬프트 기반 응답 생성

## 📋 실습 구성
1) 설정/하이퍼파라미터 정의(경로/모델/검색 파라미터)
2) 산출물 로드(청크 JSON, 인덱스/메타)
3) 임베딩 모델 로드(`all-MiniLM-L6-v2`)
4) 검색 함수 정의(retrieve)
5) 프롬프트/생성(헬퍼 함수)
6) 통합 함수(rag_answer)
7) 데모 실행(질문→답변+출처)


In [None]:
# ========================================
# 1️⃣ 설정 / 하이퍼파라미터 / 의존성 임포트
#   - 경로/모델/검색 파라미터는 상단 변수에서 통일 관리
# ========================================
import os
import json
import faiss
import numpy as np
import requests
import torch
from pathlib import Path
from typing import List, Dict, Any

# Reproducibility: seeds NumPy's global RNG (the only RNG seeded in this cell).
SEED = 42
np.random.seed(SEED)

# Device used for the query-embedding model.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Artifact paths (Chapter 4 outputs).
# The project root defaults to the original location, but can be overridden
# with the OSTEP_RAG_ROOT environment variable so the notebook runs on other
# machines without editing this cell. Paths stay plain strings because they
# are passed directly to faiss.read_index() and open().
DATA_ROOT = os.environ.get("OSTEP_RAG_ROOT", "/home/kbs1102/workspace/OSTEP_RAG").rstrip("/")
INDEX_FILE = f"{DATA_ROOT}/data/index/ostep_hnsw.index"
INDEX_META_FILE = f"{DATA_ROOT}/data/index/ostep_hnsw_metadata.json"
CHUNK_FILE = f"{DATA_ROOT}/data/chunk/ostep_tok400_ov20.json"

# Retrieval hyperparameters
TOP_K = 10
EF_SEARCH = 64           # HNSW efSearch (search quality / speed trade-off)
MIN_SCORE = 0.0          # hits scoring below this are dropped; at 0.0 only negative scores are filtered

# Embedding model (used for queries only)
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
NORMALIZE = True         # normalize query embeddings so scores can be read as cosine similarity

# LLM (Ollama) hyperparameters
OLLAMA_HOST = "http://localhost:11434"
LLM_MODEL = "llama3.1:8b"
TEMPERATURE = 0.2
TOP_P = 0.9
MAX_TOKENS = 512
REPEAT_PENALTY = 1.1

# Echo the effective configuration so a run's output records what was used.
print(f"🔧 Device: {DEVICE}")
print(f"📁 Index: {INDEX_FILE}")
print(f"📁 Chunks: {CHUNK_FILE}")
print(f"🔎 TOP_K={TOP_K}, efSearch={EF_SEARCH}")
print(f"🧠 Embed: {EMBED_MODEL}")
print(f"🤖 LLM: {LLM_MODEL} @ {OLLAMA_HOST}")


---
## 2️⃣ 산출물 로드

Chapter 2–4에서 생성한 청크 JSON과 FAISS 인덱스/메타 정보를 로드합니다.


In [None]:
# Load the chunk records (JSON) produced by the earlier chapters.
with open(CHUNK_FILE, 'r', encoding='utf-8') as f:
    CHUNKS = json.load(f)
print(f"✓ Chunks loaded: {len(CHUNKS)} entries")

# Load the FAISS index and its metadata file.
INDEX = faiss.read_index(INDEX_FILE)
print(f"✓ Index loaded: {INDEX.ntotal} vectors")

# retrieve() uses the ids returned by the index as positions into CHUNKS, so a
# size mismatch means the two artifacts do not belong together. Warn now rather
# than hit an IndexError (or silently show the wrong chunk) at query time.
if INDEX.ntotal != len(CHUNKS):
    print(
        f"[Warning] Index holds {INDEX.ntotal} vectors but {len(CHUNKS)} chunks were loaded; "
        "search hits may map to the wrong chunks."
    )

with open(INDEX_META_FILE, 'r', encoding='utf-8') as f:
    INDEX_META = json.load(f)
print("Index metadata:")
for k, v in INDEX_META.items():
    print(f"  - {k}: {v}")

# Apply the configured efSearch to the loaded index.
faiss.ParameterSpace().set_index_parameter(INDEX, "efSearch", EF_SEARCH)
print(f"efSearch set to {EF_SEARCH}")

# Print one sample record; the text field is truncated to keep output short.
print("\nSample chunk:")
if CHUNKS:
    sample = CHUNKS[0]
    for key, value in sample.items():
        if key == 'text':
            print(f"  {key}: {value[:100]}...")
        else:
            print(f"  {key}: {value}")
else:
    # An empty chunk file would otherwise raise IndexError on CHUNKS[0].
    print("  (no chunks loaded)")


---
## 3️⃣ 임베딩 모델 로드 (쿼리 전용)

`sentence-transformers/all-MiniLM-L6-v2`를 로드하여 쿼리를 임베딩합니다. 검색 점수는 정규화된 벡터 간의 L2 거리(FAISS L2 인덱스는 제곱 L2 거리를 반환)로부터 코사인 유사도(= 1 - distance/2)로 변환합니다.


In [None]:
from sentence_transformers import SentenceTransformer

# Instantiate the query encoder on the configured device, reporting before and after.
print(f"Loading model: {EMBED_MODEL} on {DEVICE}")
embed_model = SentenceTransformer(EMBED_MODEL, device=DEVICE)
print("✓ Embed model ready")


---
## 4️⃣ 검색 함수 정의

쿼리 → 임베딩 → FAISS HNSW 검색 → 점수 변환 → `min_score` 필터링을 거쳐 최대 K개의 컨텍스트를 반환합니다.


In [None]:
from dataclasses import dataclass

@dataclass
class RetrievedChunk:
    """A single search hit produced by ``retrieve``."""

    # Id returned by the index search; also the position of this hit in CHUNKS.
    index: int
    # Similarity score computed from the search distance by _distance_to_similarity.
    score: float
    # The chunk record taken from CHUNKS; other code in this file reads its
    # "text", "chapter_title", "chapter_id" and "chunk_id" keys.
    chunk: Dict[str, Any]

def _distance_to_similarity(distances: np.ndarray) -> np.ndarray:
    # 정규화된 벡터의 L2 거리 → 유사도 근사: 1 - d/2
    return 1.0 - (distances / 2.0)

def retrieve(query: str, k: int = TOP_K, ef_search: int = EF_SEARCH, min_score: float = MIN_SCORE) -> List[RetrievedChunk]:
    """Return up to ``k`` chunks from CHUNKS that best match ``query``.

    Args:
        query: Natural-language query text to embed and search for.
        k: Number of neighbours requested from the index.
        ef_search: efSearch value applied to the index before searching.
        min_score: Hits whose similarity score is below this are dropped.

    Returns:
        Hits in the order returned by the index search, each carrying the
        chunk position, its similarity score and the chunk record. May hold
        fewer than ``k`` items when hits are padded or filtered out.
    """
    # Applied on every call so a per-query ef_search override takes effect.
    faiss.ParameterSpace().set_index_parameter(INDEX, "efSearch", ef_search)
    q_emb = embed_model.encode([query], convert_to_numpy=True, normalize_embeddings=NORMALIZE).astype('float32')
    D, I = INDEX.search(q_emb, k)
    scores = _distance_to_similarity(D[0])

    results: List[RetrievedChunk] = []
    for idx, sc in zip(I[0], scores):
        # Negative ids are placeholders for "no neighbour found"; skip them
        # together with hits below the score threshold.
        if idx < 0 or sc < min_score:
            continue
        # Convert NumPy scalars to plain Python values so the dataclass fields
        # match their annotations and the hit stays JSON-serializable.
        pos = int(idx)
        results.append(RetrievedChunk(index=pos, score=float(sc), chunk=CHUNKS[pos]))
    return results


---
## 5️⃣ 프롬프트/생성 헬퍼 함수

Chapter 5 스타일의 `ollama_generate()`와 `build_structured_prompt()`를 정의합니다.


In [None]:
def ollama_generate(prompt: str) -> str:
    """Send ``prompt`` to the Ollama ``/api/generate`` endpoint and return the reply text.

    The request is non-streaming and uses the module-level model and sampling
    settings (LLM_MODEL, TEMPERATURE, TOP_P, MAX_TOKENS, REPEAT_PENALTY).

    Returns:
        The "response" field of the JSON reply, or an empty string when that
        field is absent. Failures are not raised: HTTP/connection errors,
        undecodable JSON and non-object payloads are all reported as a string
        starting with "[Error] " so notebook cells keep running.
    """
    payload = {
        "model": LLM_MODEL,
        "prompt": prompt,
        "stream": False,
        "options": {
            "temperature": TEMPERATURE,
            "top_p": TOP_P,
            "num_predict": MAX_TOKENS,
            "repeat_penalty": REPEAT_PENALTY,
        }
    }
    try:
        r = requests.post(f"{OLLAMA_HOST}/api/generate", json=payload, timeout=600)
        r.raise_for_status()
        # Decode the body once; a malformed body raises a ValueError subclass.
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        return f"[Error] {e}"

    # Only a JSON object can carry the "response" field.
    if not isinstance(data, dict):
        return f"[Error] Unexpected response type from Ollama: {type(data).__name__}"
    return data.get("response", "")


def build_structured_prompt(
    role: str,
    goal: str,
    constraints: str,
    format_spec: str,
    context: List[str],
    question: str,
) -> str:
    """Assemble the structured prompt text sent to the LLM.

    The prompt consists of six labelled sections (Role, Goal, Constraints,
    Format, Context, Question), each written as ``<Label>:`` followed by its
    content on the next line, separated by blank lines, and ends with a fixed
    closing instruction. Context passages are numbered from 1 as
    ``[Context N]`` and separated by blank lines.

    Args:
        role: Persona text for the model.
        goal: What the answer should achieve.
        constraints: Rules the answer must follow.
        format_spec: Description of the expected output format.
        context: Passages to ground the answer on, in display order.
        question: The user question.

    Returns:
        The complete prompt as a single string.
    """
    numbered = [f"[Context {pos}]\n{passage}" for pos, passage in enumerate(context, start=1)]
    sections = (
        ("Role", role),
        ("Goal", goal),
        ("Constraints", constraints),
        ("Format", format_spec),
        ("Context", "\n\n".join(numbered)),
        ("Question", question),
    )
    closing = "Please answer the question considering the role, goal, constraints, format, and context provided above."

    rendered = [f"{label}:\n{content}" for label, content in sections]
    rendered.append(closing)
    return "\n\n".join(rendered)


---
## 6️⃣ 통합 RAG 함수

검색 → 컨텍스트 선택(상위 3개) → 구조화 프롬프트 생성 → Ollama 호출 → 응답/출처 반환.


In [None]:
def _format_context_from_results(results: List[RetrievedChunk], max_chars: int = 600) -> List[str]:
    formatted = []
    for r in results:
        text = r.chunk.get("text", "")
        snippet = text[:max_chars]
        title = r.chunk.get("chapter_title") or r.chunk.get("chapter_id") or ""
        prefix = f"[Chapter: {title}]\nScore: {r.score:.4f}\n"
        formatted.append(prefix + snippet)
    return formatted

# Prompt building blocks passed by rag_answer() to build_structured_prompt().
# These strings are sent to the model verbatim.

# Persona the model is asked to adopt.
ROLE = """You are an expert tutor specializing in operating system concepts.
You explain concepts clearly and concisely so students can understand easily,
and you use concrete examples to illustrate concepts."""

# What a good answer should accomplish.
GOAL = """Provide evidence-based answers to questions based on the provided context.
Find and cite relevant information from the context in your answers,
and explicitly state when information is not available in the context."""

# Grounding and citation rules, including the exact fallback sentence to use
# when the context does not contain the answer.
CONSTRAINTS = """1. Your answer must be based on the provided context.
2. If you can find relevant information in the context, you must include 1-3 citations.
3. If relevant information is not available in the context, you must explicitly state 'The information is not available in the provided context.'
4. Any speculative or uncertain content must be clearly marked."""

# Requested JSON output shape.
# NOTE(review): the example cites integer "chunk_id" values while
# build_structured_prompt labels passages "[Context N]" -- confirm the model
# is expected to cite by that context number.
FORMAT_SPEC = """Please respond in the following JSON format:

{
  "answer": "Answer to the question (1-2 paragraphs)",
  "citations": [
    {"chunk_id": 1, "quote": "quoted text"},
    {"chunk_id": 2, "quote": "quoted text"}
  ],
  "summary": "Summary (1-2 sentences)"
}"""

def rag_answer(question: str, top_k: int = TOP_K, context_k: int = 3) -> Dict[str, Any]:
    """Answer ``question`` with retrieval-augmented generation.

    Retrieves candidate chunks, builds a structured prompt from the best
    ``context_k`` of them, asks the LLM, and reports the chunks that were
    actually placed in the prompt as sources.

    Args:
        question: The user question.
        top_k: Number of neighbours requested from the retriever.
        context_k: How many of the retrieved hits (best first) are included
            in the prompt and listed as sources. Defaults to 3 to keep the
            prompt compact; values below 0 are treated as 0.

    Returns:
        A dict with keys "question", "answer" (the raw LLM output string),
        "top_k", and "sources" (one dict per context hit with "score",
        "chapter_title", "chunk_id" and an 80-character text "preview").
    """
    # 1) Retrieve
    results = retrieve(question, k=top_k, ef_search=EF_SEARCH, min_score=MIN_SCORE)

    # 2) Build prompt. Slice once so the prompt context and the reported
    #    sources always refer to the same hits.
    context_hits = results[:max(context_k, 0)]
    prompt = build_structured_prompt(
        role=ROLE,
        goal=GOAL,
        constraints=CONSTRAINTS,
        format_spec=FORMAT_SPEC,
        context=_format_context_from_results(context_hits),
        question=question,
    )

    # 3) Generate
    answer = ollama_generate(prompt)

    # 4) Sources
    sources = []
    for r in context_hits:
        text = r.chunk.get("text")
        preview = (text[:80] + "...") if text else ""
        sources.append({
            "score": round(r.score, 4),
            "chapter_title": r.chunk.get("chapter_title"),
            "chunk_id": r.chunk.get("chunk_id"),
            "preview": preview,
        })

    return {
        "question": question,
        "answer": answer,
        "top_k": top_k,
        "sources": sources,
    }


---
## 7️⃣ 데모 실행: 질문 → 답변 + 출처

`rag_answer()`를 호출해 통합 동작을 확인합니다. Ollama 서버가 실행 중이어야 합니다.


In [None]:
# Run one end-to-end query and show the answer followed by its sources.
question = "How does the operating system handle memory virtualization?"
result = rag_answer(question)

# One print call with a newline separator emits the same lines as printing
# each item separately.
print(
    "="*80,
    "Question:",
    result["question"],
    "="*80,
    "Answer:",
    result["answer"],
    "\n[Sources]",
    sep="\n",
)
for rank, src in enumerate(result["sources"], start=1):
    score = src['score']
    chapter = src.get('chapter_title')
    preview = src.get('preview')
    print(f"{rank}. ({score:.4f}) [{chapter}] {preview}")


---
## 8️⃣ (선택) 간단 검색 적합성 확인

`test_queries.json`에서 일부 질의를 불러와 검색 상위 결과의 챕터 타이틀을 미리보기로 출력합니다.


In [None]:
# Location of the optional test-query file. The project root defaults to the
# original location and can be overridden with the OSTEP_RAG_ROOT environment
# variable.
TEST_QUERIES = (
    os.environ.get("OSTEP_RAG_ROOT", "/home/kbs1102/workspace/OSTEP_RAG").rstrip("/")
    + "/data/documents/test_queries.json"
)

# Keep the try body limited to loading the file, so that only a missing query
# file is reported as "not found"; the retrieval preview runs in the else branch.
try:
    with open(TEST_QUERIES, 'r', encoding='utf-8') as f:
        test_qs = json.load(f)
except FileNotFoundError:
    print("[Info] test_queries.json not found. Skipping optional check.")
else:
    print(f"Loaded {len(test_qs)} test queries. Showing first 3...")
    # NOTE(review): assumes the file is a JSON list of query strings -- confirm.
    for q in test_qs[:3]:
        rs = retrieve(q, k=5)
        print("\nQ:", q)
        for i, r in enumerate(rs, 1):
            title = r.chunk.get('chapter_title')
            print(f"  {i}. {title} (score={r.score:.4f})")


---
## 9️⃣ 인터랙티브 Q&A (실시간 질의)

사용자로부터 입력을 받아 `rag_answer()`로 답변을 생성합니다. 종료하려면 `exit` 또는 `quit`를 입력하세요.


In [None]:
# Interactive loop: read a question, answer it with rag_answer(), print the
# answer and its sources, and repeat until the user types "exit" or "quit".
try:
    while True:
        user_q = input("Enter your question (type 'exit' to quit): ").strip()
        if not user_q:
            print("[Info] Empty input. Try again.")
            continue
        if user_q.lower() in {"exit", "quit"}:
            print("[Info] Bye.")
            break
        result = rag_answer(user_q)
        print("="*80)
        print("Question:")
        print(result["question"])
        print("="*80)
        print("Answer:")
        print(result["answer"])
        print("\n[Sources]")
        for i, s in enumerate(result.get("sources", []), 1):
            print(f"{i}. ({s['score']:.4f}) [{s.get('chapter_title')}] {s.get('preview')}")
        print("\n")
except KeyboardInterrupt:
    print("\n[Info] Interrupted by user.")
except EOFError:
    # input() raises EOFError when stdin is closed or exhausted (for example
    # Ctrl-D or a non-interactive run); end the session cleanly instead of
    # surfacing a traceback.
    print("\n[Info] Input stream closed. Bye.")
