# Day 3 - Advanced RAG with LangGraph
# 고급 RAG 체인 구현

이 실습에서는 LangGraph를 사용하여 고급 RAG 시스템을 구현합니다.
Day 1에서 파인튜닝한 모델을 활용하여 더욱 정교한 검색-생성 파이프라인을 구축합니다.

## 핵심 개념
- 다단계 검색 전략 (Multi-step Retrieval)
- 검색 결과 재순위화 (Reranking)
- 컨텍스트 압축 및 요약
- 동적 검색 전략 선택

In [None]:
!pip install langchain langgraph chromadb sentence-transformers rank-bm25 transformers torch datasets

In [None]:
import os
import json
from typing import List, Dict, Any, Optional, TypedDict
from dataclasses import dataclass

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import chromadb
from sentence_transformers import SentenceTransformer, CrossEncoder
from rank_bm25 import BM25Okapi

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor

print("라이브러리 import 완료")

## 1. Day 1 파인튜닝 모델 로드

먼저 Day 1에서 파인튜닝한 EXAONE 모델을 로드합니다.

In [None]:
class Day1FinetunedLLM:
    def __init__(self, model_name="ryanu/my-exaone-raft-model"):
        self.model_name = model_name
        self.base_model = "LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct"
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        print(f"모델 로딩 중: {model_name}")
        print(f"사용 디바이스: {self.device}")
        
        # 토크나이저 로드
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.base_model,
            trust_remote_code=True
        )
        
        # 베이스 모델 로드
        self.model = AutoModelForCausalLM.from_pretrained(
            self.base_model,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        
        # 파인튜닝된 어댑터 로드
        try:
            self.model = PeftModel.from_pretrained(self.model, model_name)
            print("✅ 파인튜닝 모델 로드 완료")
        except Exception as e:
            print(f"⚠️ 파인튜닝 모델 로드 실패, 베이스 모델 사용: {e}")
    
    def generate(self, prompt: str, max_length: int = 512, temperature: float = 0.7) -> str:
        """텍스트 생성"""
        inputs = self.tokenizer(
            prompt, 
            return_tensors="pt", 
            truncation=True, 
            max_length=2048
        ).to(self.device)
        
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=inputs['input_ids'].shape[1] + max_length,
                temperature=temperature,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )
        
        # 입력 부분 제거하고 생성된 텍스트만 반환
        generated_text = self.tokenizer.decode(
            outputs[0][inputs['input_ids'].shape[1]:], 
            skip_special_tokens=True
        )
        
        return generated_text.strip()

# 모델 초기화
llm = Day1FinetunedLLM()

## 2. 고급 검색 시스템 구성

여러 검색 전략을 결합한 하이브리드 검색 시스템을 구축합니다.

In [None]:
class AdvancedRetriever:
    def __init__(self):
        # 벡터 검색을 위한 임베딩 모델
        self.embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        
        # 재순위화를 위한 크로스 인코더
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        
        # ChromaDB 클라이언트
        self.chroma_client = chromadb.Client()
        self.collection_name = "advanced_rag_docs"
        
        # 컬렉션 초기화
        try:
            self.collection = self.chroma_client.get_collection(self.collection_name)
        except:
            self.collection = self.chroma_client.create_collection(
                name=self.collection_name,
                metadata={"hnsw:space": "cosine"}
            )
        
        # BM25 검색을 위한 변수들
        self.bm25 = None
        self.documents = []
        self.doc_metadata = []
    
    def add_documents(self, documents: List[Document]):
        """문서 추가 및 인덱싱"""
        print(f"문서 {len(documents)}개 인덱싱 중...")
        
        # 텍스트와 메타데이터 분리
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        
        # 벡터 임베딩 생성
        embeddings = self.embedding_model.encode(texts)
        
        # ChromaDB에 저장
        ids = [f"doc_{i}" for i in range(len(texts))]
        self.collection.add(
            embeddings=embeddings.tolist(),
            documents=texts,
            metadatas=metadatas,
            ids=ids
        )
        
        # BM25 인덱스 구축
        self.documents = texts
        self.doc_metadata = metadatas
        tokenized_docs = [doc.lower().split() for doc in texts]
        self.bm25 = BM25Okapi(tokenized_docs)
        
        print("✅ 문서 인덱싱 완료")
    
    def semantic_search(self, query: str, k: int = 10) -> List[Dict]:
        """의미적 벡터 검색"""
        query_embedding = self.embedding_model.encode([query])
        
        results = self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=k
        )
        
        return [
            {
                "content": doc,
                "score": 1 - distance,  # 유사도로 변환
                "metadata": meta,
                "method": "semantic"
            }
            for doc, distance, meta in zip(
                results['documents'][0],
                results['distances'][0], 
                results['metadatas'][0]
            )
        ]
    
    def keyword_search(self, query: str, k: int = 10) -> List[Dict]:
        """키워드 기반 BM25 검색"""
        if self.bm25 is None:
            return []
        
        query_tokens = query.lower().split()
        scores = self.bm25.get_scores(query_tokens)
        
        # 상위 k개 결과 선택
        top_indices = scores.argsort()[-k:][::-1]
        
        return [
            {
                "content": self.documents[idx],
                "score": float(scores[idx]),
                "metadata": self.doc_metadata[idx],
                "method": "keyword"
            }
            for idx in top_indices if scores[idx] > 0
        ]
    
    def hybrid_search(self, query: str, k: int = 10, alpha: float = 0.7) -> List[Dict]:
        """하이브리드 검색 (의미적 + 키워드)"""
        # 각각의 검색 결과 가져오기
        semantic_results = self.semantic_search(query, k * 2)
        keyword_results = self.keyword_search(query, k * 2)
        
        # 점수 정규화
        if semantic_results:
            max_semantic = max(r['score'] for r in semantic_results)
            for r in semantic_results:
                r['norm_score'] = r['score'] / max_semantic if max_semantic > 0 else 0
        
        if keyword_results:
            max_keyword = max(r['score'] for r in keyword_results)
            for r in keyword_results:
                r['norm_score'] = r['score'] / max_keyword if max_keyword > 0 else 0
        
        # 결과 합치기 및 재점수화
        combined_results = {}
        
        for result in semantic_results:
            content = result['content']
            if content not in combined_results:
                combined_results[content] = result.copy()
                combined_results[content]['hybrid_score'] = alpha * result['norm_score']
                combined_results[content]['methods'] = ['semantic']
            else:
                combined_results[content]['hybrid_score'] += alpha * result['norm_score']
                combined_results[content]['methods'].append('semantic')
        
        for result in keyword_results:
            content = result['content']
            if content not in combined_results:
                combined_results[content] = result.copy()
                combined_results[content]['hybrid_score'] = (1 - alpha) * result['norm_score']
                combined_results[content]['methods'] = ['keyword']
            else:
                combined_results[content]['hybrid_score'] += (1 - alpha) * result['norm_score']
                if 'keyword' not in combined_results[content]['methods']:
                    combined_results[content]['methods'].append('keyword')
        
        # 점수 기준 정렬 후 상위 k개 반환
        sorted_results = sorted(
            combined_results.values(),
            key=lambda x: x['hybrid_score'],
            reverse=True
        )[:k]
        
        return sorted_results
    
    def rerank_results(self, query: str, results: List[Dict], top_k: int = 5) -> List[Dict]:
        """크로스 인코더를 사용한 재순위화"""
        if not results:
            return []
        
        # 쿼리-문서 쌍 생성
        pairs = [[query, result['content']] for result in results]
        
        # 재순위화 점수 계산
        rerank_scores = self.reranker.predict(pairs)
        
        # 결과에 재순위화 점수 추가
        for i, result in enumerate(results):
            result['rerank_score'] = float(rerank_scores[i])
        
        # 재순위화 점수 기준으로 정렬
        reranked_results = sorted(
            results,
            key=lambda x: x['rerank_score'],
            reverse=True
        )[:top_k]
        
        return reranked_results

# 검색 시스템 초기화
retriever = AdvancedRetriever()
print("✅ 고급 검색 시스템 초기화 완료")

## 3. 샘플 문서 생성 및 인덱싱

In [None]:
# 샘플 문서 생성
sample_docs = [
    Document(
        page_content="머신러닝은 인공지능의 하위 분야로, 컴퓨터가 명시적으로 프로그래밍되지 않고도 학습할 수 있게 하는 기술입니다. 지도 학습, 비지도 학습, 강화 학습의 세 가지 주요 유형이 있습니다.",
        metadata={"topic": "machine_learning", "difficulty": "beginner"}
    ),
    Document(
        page_content="딥러닝은 인공 신경망을 사용하는 머신러닝의 특별한 형태입니다. 다층 퍼셉트론, 합성곱 신경망(CNN), 순환 신경망(RNN), 트랜스포머 등 다양한 아키텍처가 있습니다.",
        metadata={"topic": "deep_learning", "difficulty": "intermediate"}
    ),
    Document(
        page_content="자연어 처리(NLP)는 컴퓨터가 인간의 언어를 이해하고 생성할 수 있게 하는 AI 분야입니다. 토크나이제이션, 형태소 분석, 구문 분석, 의미 분석 등의 단계를 포함합니다.",
        metadata={"topic": "nlp", "difficulty": "intermediate"}
    ),
    Document(
        page_content="트랜스포머는 어텐션 메커니즘만을 사용하는 신경망 아키텍처입니다. BERT, GPT, T5 등 많은 현대 언어 모델의 기초가 되었으며, '어텐션이 전부다'라는 논문에서 처음 소개되었습니다.",
        metadata={"topic": "transformers", "difficulty": "advanced"}
    ),
    Document(
        page_content="파인튜닝은 사전 훈련된 모델을 특정 작업에 맞게 추가로 훈련시키는 기법입니다. 전체 파라미터를 업데이트하는 풀 파인튜닝과 일부만 업데이트하는 LoRA, 어댑터 등의 효율적 방법이 있습니다.",
        metadata={"topic": "fine_tuning", "difficulty": "advanced"}
    ),
    Document(
        page_content="RAG(Retrieval-Augmented Generation)는 외부 지식을 검색하여 생성 모델에 제공하는 기법입니다. 벡터 데이터베이스를 통한 의미적 검색과 언어 모델의 생성 능력을 결합합니다.",
        metadata={"topic": "rag", "difficulty": "advanced"}
    ),
    Document(
        page_content="LangChain은 언어 모델을 활용한 애플리케이션 개발을 위한 프레임워크입니다. 체인, 에이전트, 메모리 등의 개념을 통해 복잡한 AI 워크플로를 구성할 수 있습니다.",
        metadata={"topic": "langchain", "difficulty": "intermediate"}
    ),
    Document(
        page_content="벡터 데이터베이스는 고차원 벡터를 효율적으로 저장하고 검색하는 데이터베이스입니다. Chroma, Pinecone, Weaviate, FAISS 등이 대표적이며, 의미적 검색에 필수적입니다.",
        metadata={"topic": "vector_database", "difficulty": "intermediate"}
    )
]

# 문서 인덱싱
retriever.add_documents(sample_docs)

## 4. LangGraph 상태 정의

In [None]:
class RAGState(TypedDict):
    """RAG 워크플로 상태"""
    query: str                    # 사용자 질문
    search_strategy: str          # 검색 전략 (semantic, keyword, hybrid)
    raw_results: List[Dict]       # 초기 검색 결과
    reranked_results: List[Dict]  # 재순위화된 결과
    context: str                  # 최종 컨텍스트
    answer: str                   # 생성된 답변
    confidence: float             # 신뢰도 점수
    need_refinement: bool         # 개선 필요 여부
    iteration: int                # 반복 횟수

print("✅ 상태 클래스 정의 완료")

## 5. 워크플로 노드 함수들

In [None]:
def analyze_query(state: RAGState) -> RAGState:
    """질문 분석 및 검색 전략 결정"""
    query = state["query"]
    
    # 간단한 휴리스틱으로 검색 전략 결정
    if any(word in query.lower() for word in ["개념", "정의", "설명", "무엇", "what", "define"]):
        strategy = "semantic"  # 개념적 질문은 의미적 검색
    elif any(word in query.lower() for word in ["방법", "어떻게", "how", "단계", "절차"]):
        strategy = "hybrid"    # 방법론적 질문은 하이브리드 검색
    else:
        strategy = "hybrid"    # 기본은 하이브리드
    
    print(f"📊 질문 분석: '{query}' -> 전략: {strategy}")
    
    return {
        **state,
        "search_strategy": strategy,
        "iteration": state.get("iteration", 0)
    }

def retrieve_documents(state: RAGState) -> RAGState:
    """문서 검색"""
    query = state["query"]
    strategy = state["search_strategy"]
    
    print(f"🔍 문서 검색 중: {strategy} 전략 사용")
    
    if strategy == "semantic":
        results = retriever.semantic_search(query, k=15)
    elif strategy == "keyword":
        results = retriever.keyword_search(query, k=15)
    else:  # hybrid
        results = retriever.hybrid_search(query, k=15)
    
    print(f"📋 검색 결과: {len(results)}개 문서")
    
    return {
        **state,
        "raw_results": results
    }

def rerank_results(state: RAGState) -> RAGState:
    """검색 결과 재순위화"""
    query = state["query"]
    raw_results = state["raw_results"]
    
    print("📊 검색 결과 재순위화 중...")
    
    reranked = retriever.rerank_results(query, raw_results, top_k=8)
    
    print(f"✅ 재순위화 완료: 상위 {len(reranked)}개 선택")
    
    return {
        **state,
        "reranked_results": reranked
    }

def prepare_context(state: RAGState) -> RAGState:
    """컨텍스트 준비 및 압축"""
    results = state["reranked_results"]
    
    # 컨텍스트 구성
    context_parts = []
    for i, result in enumerate(results[:5], 1):  # 상위 5개만 사용
        context_parts.append(
            f"[참고자료 {i}] (관련도: {result.get('rerank_score', 0):.3f})\n"
            f"{result['content']}\n"
        )
    
    context = "\n".join(context_parts)
    
    print(f"📝 컨텍스트 준비 완료: {len(context)} 글자")
    
    return {
        **state,
        "context": context
    }

def generate_answer(state: RAGState) -> RAGState:
    """답변 생성"""
    query = state["query"]
    context = state["context"]
    
    print("🤖 답변 생성 중...")
    
    # 프롬프트 구성
    prompt = f"""다음 참고자료를 바탕으로 질문에 답변해주세요.

참고자료:
{context}

질문: {query}

답변:
- 참고자료의 내용을 바탕으로 정확하고 구체적으로 답변해주세요
- 참고자료에 없는 내용은 추측하지 마세요
- 답변 근거를 명시해주세요

답변:"""
    
    # 답변 생성
    answer = llm.generate(prompt, max_length=400, temperature=0.3)
    
    # 간단한 신뢰도 계산 (실제로는 더 정교한 방법 사용)
    avg_rerank_score = sum(r.get('rerank_score', 0) for r in state["reranked_results"][:3]) / min(3, len(state["reranked_results"]))
    confidence = min(avg_rerank_score * 2, 1.0)  # 정규화
    
    print(f"✅ 답변 생성 완료 (신뢰도: {confidence:.3f})")
    
    return {
        **state,
        "answer": answer,
        "confidence": confidence
    }

def evaluate_answer(state: RAGState) -> RAGState:
    """답변 품질 평가"""
    confidence = state["confidence"]
    iteration = state["iteration"]
    
    # 개선 필요 여부 판단
    need_refinement = (
        confidence < 0.6 and  # 신뢰도가 낮고
        iteration < 2 and     # 반복 횟수가 적고
        len(state["answer"]) < 50  # 답변이 너무 짧음
    )
    
    print(f"📊 답변 평가: 신뢰도={confidence:.3f}, 반복={iteration}, 개선필요={need_refinement}")
    
    return {
        **state,
        "need_refinement": need_refinement
    }

def refine_query(state: RAGState) -> RAGState:
    """질문 개선 및 재검색"""
    original_query = state["query"]
    
    # 질문 확장 (실제로는 더 정교한 방법 사용)
    expanded_query = f"{original_query} 자세한 설명 예시"
    
    print(f"🔄 질문 개선: '{original_query}' -> '{expanded_query}'")
    
    return {
        **state,
        "query": expanded_query,
        "iteration": state["iteration"] + 1,
        "search_strategy": "hybrid"  # 재검색은 하이브리드로
    }

def should_refine(state: RAGState) -> str:
    """개선 여부 결정"""
    if state["need_refinement"]:
        return "refine"
    else:
        return "end"

print("✅ 워크플로 노드 함수 정의 완료")

## 6. LangGraph 워크플로 구성

In [None]:
# 그래프 생성
workflow = StateGraph(RAGState)

# 노드 추가
workflow.add_node("analyze", analyze_query)
workflow.add_node("retrieve", retrieve_documents)
workflow.add_node("rerank", rerank_results)
workflow.add_node("context", prepare_context)
workflow.add_node("generate", generate_answer)
workflow.add_node("evaluate", evaluate_answer)
workflow.add_node("refine", refine_query)

# 엣지 연결
workflow.set_entry_point("analyze")
workflow.add_edge("analyze", "retrieve")
workflow.add_edge("retrieve", "rerank")
workflow.add_edge("rerank", "context")
workflow.add_edge("context", "generate")
workflow.add_edge("generate", "evaluate")

# 조건부 엣지
workflow.add_conditional_edges(
    "evaluate",
    should_refine,
    {
        "refine": "refine",
        "end": END
    }
)

# 개선 후 다시 분석으로
workflow.add_edge("refine", "analyze")

# 앱 컴파일
app = workflow.compile()

print("✅ LangGraph 워크플로 구성 완료")

## 7. 고급 RAG 시스템 테스트

In [None]:
def run_advanced_rag(question: str):
    """고급 RAG 시스템 실행"""
    print(f"\n🚀 고급 RAG 시스템 시작")
    print(f"❓ 질문: {question}")
    print("=" * 80)
    
    # 초기 상태
    initial_state = {
        "query": question,
        "iteration": 0
    }
    
    # 워크플로 실행
    final_state = app.invoke(initial_state)
    
    print("\n📊 최종 결과")
    print("=" * 80)
    print(f"🎯 답변: {final_state['answer']}")
    print(f"📊 신뢰도: {final_state['confidence']:.3f}")
    print(f"🔄 반복 횟수: {final_state['iteration']}")
    print(f"🔍 최종 검색 전략: {final_state['search_strategy']}")
    
    # 상위 검색 결과 표시
    print("\n🔍 주요 참고자료:")
    for i, result in enumerate(final_state['reranked_results'][:3], 1):
        print(f"\n[{i}] 점수: {result.get('rerank_score', 0):.3f}")
        print(f"내용: {result['content'][:100]}...")
        print(f"방법: {result.get('methods', [result.get('method', 'unknown')])}")
    
    return final_state

# 테스트 질문들
test_questions = [
    "머신러닝이 무엇인가요?",
    "파인튜닝을 어떻게 하나요?",
    "RAG 시스템의 장점은?"
]

# 첫 번째 질문으로 테스트
result = run_advanced_rag(test_questions[0])

## 8. 다양한 질문으로 성능 비교

In [None]:
# 모든 테스트 질문 실행
results = []

for i, question in enumerate(test_questions, 1):
    print(f"\n\n🧪 테스트 {i}/{len(test_questions)}")
    result = run_advanced_rag(question)
    results.append({
        "question": question,
        "answer": result["answer"],
        "confidence": result["confidence"],
        "iterations": result["iteration"],
        "strategy": result["search_strategy"]
    })

# 결과 요약
print("\n\n📊 전체 테스트 결과 요약")
print("=" * 100)
avg_confidence = sum(r["confidence"] for r in results) / len(results)
avg_iterations = sum(r["iterations"] for r in results) / len(results)

print(f"평균 신뢰도: {avg_confidence:.3f}")
print(f"평균 반복 횟수: {avg_iterations:.1f}")

for i, result in enumerate(results, 1):
    print(f"\n[{i}] {result['question']}")
    print(f"    신뢰도: {result['confidence']:.3f}, 반복: {result['iterations']}, 전략: {result['strategy']}")
    print(f"    답변: {result['answer'][:100]}...")

## 9. 워크플로 시각화 및 분석

In [None]:
def analyze_workflow_performance():
    """워크플로 성능 분석"""
    print("🔬 고급 RAG 워크플로 성능 분석")
    print("=" * 50)
    
    # 검색 전략별 성능
    strategies = [r["strategy"] for r in results]
    strategy_counts = {}
    for strategy in strategies:
        strategy_counts[strategy] = strategy_counts.get(strategy, 0) + 1
    
    print("검색 전략 사용 빈도:")
    for strategy, count in strategy_counts.items():
        print(f"  {strategy}: {count}회")
    
    # 신뢰도 분포
    confidences = [r["confidence"] for r in results]
    high_conf = sum(1 for c in confidences if c >= 0.7)
    medium_conf = sum(1 for c in confidences if 0.4 <= c < 0.7)
    low_conf = sum(1 for c in confidences if c < 0.4)
    
    print("\n신뢰도 분포:")
    print(f"  높음 (≥0.7): {high_conf}개")
    print(f"  보통 (0.4~0.7): {medium_conf}개")
    print(f"  낮음 (<0.4): {low_conf}개")
    
    # 개선 사항 제안
    print("\n💡 개선 제안:")
    if avg_confidence < 0.6:
        print("  - 더 많은 문서를 추가하여 검색 품질 향상")
        print("  - 더 정교한 재순위화 모델 사용")
    if avg_iterations > 0.5:
        print("  - 초기 질문 분석 로직 개선")
        print("  - 더 나은 검색 전략 선택 알고리즘")
    
    print("\n🎯 워크플로 장점:")
    print("  - 다단계 검색으로 정확도 향상")
    print("  - 동적 전략 선택으로 적응성 확보")
    print("  - 재순위화를 통한 관련성 개선")
    print("  - 반복 개선을 통한 품질 향상")

analyze_workflow_performance()

## 10. 실습 과제

### 과제 1: 새로운 검색 전략 추가
- BM25와 TF-IDF를 결합한 새로운 검색 전략을 구현해보세요
- 질문 유형에 따라 더 세분화된 전략 선택 로직을 만들어보세요

### 과제 2: 컨텍스트 압축 개선
- 검색된 문서에서 중요한 문장만 추출하는 요약 기능을 추가해보세요
- 중복 정보를 제거하는 로직을 구현해보세요

### 과제 3: 답변 품질 평가 고도화
- 더 정교한 신뢰도 계산 방법을 구현해보세요
- 답변의 완전성과 정확성을 평가하는 메트릭을 추가해보세요

### 과제 4: 실제 문서로 확장
- PDF나 웹 문서를 로드하여 더 큰 문서 컬렉션을 구축해보세요
- 청킹 전략을 개선하여 더 나은 검색 성능을 달성해보세요

In [None]:
# 과제 해결을 위한 시작점
print("🎓 고급 RAG 실습 완료!")
print("\n다음 단계:")
print("1. 04-text2sql.ipynb - Text-to-SQL 시스템")
print("2. 05-mcp-integration.ipynb - MCP 통합")
print("3. 06-gradio-ui.ipynb - UI 래핑")

print("\n💡 핵심 학습 내용:")
print("- 하이브리드 검색 전략 (의미적 + 키워드)")
print("- 크로스 인코더를 통한 재순위화")
print("- LangGraph를 활용한 복잡한 워크플로 구성")
print("- 동적 품질 평가 및 반복 개선")
print("- Day 1 파인튜닝 모델의 실전 활용")