In [None]:
# Q5 다운로드
# 1. 구글 드라이브 마운트 (이미 되어 있다면 생략 가능)
from google.colab import drive
drive.mount('/content/drive')

import os

# 2. 경로 설정 및 폴더 생성
SAVE_DIR = "/content/drive/MyDrive/수능 풀이/models"
os.makedirs(SAVE_DIR, exist_ok=True)

# 3. 제공해주신 다이렉트 링크 반영
MODEL_URL = "https://huggingface.co/unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF/resolve/main/Qwen3-30B-A3B-Instruct-2507-UD-Q5_K_XL.gguf?download=true"
FILE_NAME = "Qwen3-30B-A3B-Instruct-2507-UD-Q5_K_XL.gguf"
TARGET_PATH = os.path.join(SAVE_DIR, FILE_NAME)

print(f"🚀 Qwen3-30B 최신 모델 다운로드 시작...")
print(f"📍 저장 위치: {TARGET_PATH}")

# -c: 이어받기 지원
# -O: 저장될 파일 경로와 이름 강제 지정
!wget -c "{MODEL_URL}" -O "{TARGET_PATH}"

print(f"\n✅ 다운로드 완료! 이제 이 경로를 서버 실행 시 MODEL_PATH로 사용하세요.")

In [1]:
# 1. 랭체인 패밀리 강제 업데이트 및 설치
import os

print("🔄 1. 구글 드라이브 마운트 확인 (필수 데이터 접근용)")
from google.colab import drive
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')
else:
    print("✅ 이미 마운트 되어 있습니다.")


!CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python[server] \
    --force-reinstall \
    --no-cache-dir

print("\n 3. 나머지 RAG 필수 라이브러리 일괄 설치")
# 요청하신 리스트 + 의존성 패키지들
!pip install -U \
    langchain \
    langchain-community \
    langchain-core \
    chromadb \
    langchain-text-splitters \
    langchain-openai \
    langgraph \
    langfuse \
    sentence-transformers \
    jq \
    langchain-huggingface \
    kiwipiepy \
    bm25s


# 2. 설치가 끝나면, Colab 상단 메뉴의 [런타임] -> [세션 다시 시작]을 꼭 눌러주세요!
# (라이브러리 꼬임 방지)

🔄 1. 구글 드라이브 마운트 확인 (필수 데이터 접근용)
✅ 이미 마운트 되어 있습니다.
Collecting llama-cpp-python[server]
  Downloading llama_cpp_python-0.3.16.tar.gz (50.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.7/50.7 MB[0m [31m265.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Installing backend dependencies ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting typing-extensions>=4.5.0 (from llama-cpp-python[server])
  Downloading typing_extensions-4.15.0-py3-none-any.whl.metadata (3.3 kB)
Collecting numpy>=1.20.0 (from llama-cpp-python[server])
  Downloading numpy-2.4.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (6.6 kB)
Collecting diskcache>=5.6.1 (from llama-cpp-python[server])
  Downloading diskcache-5.6.3-py3-none-any.whl.metadata (20 kB)
Collecting jinja2>=2.11.3 (from llama-cpp-python[server])
  Downloading jinja2

In [1]:
import llama_cpp
print(f"Llama version: {llama_cpp.__version__}")
# GPU 레이어 오프로드 지원 여부 확인 (True가 나와야 함)
# (간접적인 확인 방법입니다. 실행 로그에 BLAS = 1 이 떠야 진짜입니다.)

Llama version: 0.3.16


In [2]:
import subprocess
import time
import requests
import os

# 모델 경로 (아까 다운로드 받은 경로 확인 필수)
MODEL_PATH = "/content/drive/MyDrive/수능 풀이/models/Qwen3-30B-A3B-Instruct-2507-UD-Q5_K_XL.gguf"

if not os.path.exists(MODEL_PATH):
    print(f"❌ 모델 파일이 없습니다! 경로를 확인하세요: {MODEL_PATH}")
else:
    # 서버 실행 (GPU 풀가동)
    cmd = [
        "python", "-m", "llama_cpp.server",
        "--model", MODEL_PATH,
        "--n_gpu_layers", "-1",
        "--n_ctx", "12288",
        "--type_k", "8",
        "--type_v", "8",
        "--host", "0.0.0.0",
        "--port", "8000",
        "--flash_attn", "true"
    ]

    print(f"🚀 llama.cpp 서버 시작 중... (모델: {os.path.basename(MODEL_PATH)})")
    server_process = subprocess.Popen(cmd, stdout=open("server_log.txt", "w"), stderr=subprocess.STDOUT)

    # Health Check
    print("⏳ 서버 부팅 대기 중...", end="")
    while True:
        try:
            requests.get("http://localhost:8000/v1/models")
            print("\n✅ 서버 준비 완료!")
            break
        except:
            print(".", end="")
            time.sleep(2)

🚀 llama.cpp 서버 시작 중... (모델: Qwen3-30B-A3B-Instruct-2507-UD-Q5_K_XL.gguf)
⏳ 서버 부팅 대기 중......................................................................................................................
✅ 서버 준비 완료!


kiwi 토크나이저 및 sparse 리트리버 생성

In [3]:
# kiwi 토크나이저

import re
from typing import List, Any, Optional, Callable
from kiwipiepy import Kiwi

# Kiwi 초기화 및 기본 설정
kiwi = Kiwi()
tag_include = ['NNG', 'NNP', 'NNB', 'NR', 'VV', 'VA', 'MM', 'XR', 'SW', 'SL', 'SH', 'SN', 'SB']

def _fallback_tokenize(text: str) -> List[str]:
    """Kiwi 분석 실패 시 일반 공백 및 문자 기반 토큰화"""
    return re.findall(r'\b\w+\b', text, re.UNICODE)

def tokenize_kiwi(
    text: str,
    kiwi: Kiwi,
    tag_include: List[str],
    text_type: str, # "corpus" 또는 "query"
    top_n: int = 3,
    score_threshold: float = 1.2,
) -> List[str]:
    try:
        if text_type == "corpus":
            # 색인 시: 본문 길이에 따라 후보군 조절
            analyzed = kiwi.analyze(text, top_n=top_n + len(text) // 200)
            if not analyzed: return _fallback_tokenize(text)

            num_candi = 1
            # 1위 점수 대비 임계치 이내의 후보들을 모두 토큰화에 포함 (재현율 향상)
            while (num_candi < len(analyzed) and
                   analyzed[num_candi][1] > score_threshold * analyzed[0][1]):
                num_candi += 1

        elif text_type == "query":
            # 검색 시: 고정된 상위 후보 사용
            analyzed = kiwi.analyze(text, top_n=top_n)
            if not analyzed: return _fallback_tokenize(text)
            num_candi = min(3, len(analyzed))

        # 후보군에서 토큰 추출 및 태그 필터링
        all_tokenized = [
            f"{t.form}/{t.tag}"
            for nc in range(num_candi)
            for t in analyzed[nc][0]
            if t.tag in tag_include
        ]

        # 중복 제거 후 반환
        unique_tokens = list(set(all_tokenized))
        return unique_tokens if unique_tokens else _fallback_tokenize(text)

    except Exception:
        return _fallback_tokenize(text)

In [4]:
from typing import List, Any, Callable
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
from pydantic import Field, PrivateAttr

# 불안 요소인 CallbackManager 관련 임포트를 아예 하지 않습니다.

class LangChainKiwiBM25Retriever(BaseRetriever):
    k: int = Field(default=5)

    # Pydantic 필드 검증 제외
    _docs: List[Document] = PrivateAttr()
    _bm25: Any = PrivateAttr()
    _corpus_tokenizer: Callable = PrivateAttr()
    _query_tokenizer: Callable = PrivateAttr()

    def __init__(
        self,
        documents: List[Document],
        k: int,
        corpus_tokenizer: Callable,
        query_tokenizer: Callable,
        **kwargs: Any
    ):
        # 부모 클래스 초기화 시에도 불필요한 설정 배제
        super().__init__(**kwargs)
        self.k = k
        self._docs = documents
        self._corpus_tokenizer = corpus_tokenizer
        self._query_tokenizer = query_tokenizer

        # 1. 문서 토큰화 및 BM25 인덱싱
        # (실제 실행 시 메모리 주의: 문서를 넘겨받는 시점에 작동함)
        corpus_tokens = [self._corpus_tokenizer(doc.page_content) for doc in documents]

        import bm25s # 필요할 때만 임포트하는 방식 (메모리 절약)
        self._bm25 = bm25s.BM25()
        self._bm25.index(corpus_tokens)

    # run_manager 인자는 랭체인 규격상 필요하지만, 타입을 Any로 뭉개버립니다.
    def _get_relevant_documents(
        self, query: str, *, run_manager: Any = None
    ) -> List[Document]:

        # 2. 쿼리 토큰화 및 검색
        tokenized_query = [self._query_tokenizer(query)]
        results, scores = self._bm25.retrieve(tokenized_query, k=self.k)

        # 3. 결과 반환
        retrieved_docs = []
        for idx, score in zip(results[0], scores[0]):
            if score > 0:
                # 원본 문서의 사본을 만들어 메타데이터가 오염되지 않게 함
                doc = self._docs[idx]
                doc.metadata["sparse_score"] = float(score)
                retrieved_docs.append(doc)

        return retrieved_docs

In [5]:
from typing import List, Any
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document

# 1. 앙상블 리트리버 정의 (콜백 핸들러/매니저 완전 배제)
class CustomWeightedEnsembleRetriever(BaseRetriever):
    sparse_retriever: Any  # 타입을 Any로 해서 임포트 꼬임을 방지합니다.
    dense_retriever: Any
    weights: List[float] = [0.7, 0.3]
    top_k: int = 4

    # 랭체인 규격상 run_manager 인자는 있어야 하지만, 타입 힌트 없이 Any로 처리합니다.
    def _get_relevant_documents(self, query: str, *, run_manager: Any = None) -> List[Document]:

        # [Step 1] Sparse 검색 (Kiwi-BM25s)
        # .invoke()는 랭체인의 가장 표준적인 실행 방식입니다.
        sparse_docs = self.sparse_retriever.invoke(query)

        # [Step 2] Dense 검색 (BGE-m3-ko)
        dense_docs = self.dense_retriever.invoke(query)

        # [Step 3] RRF(Reciprocal Rank Fusion) 앙상블 계산
        # 공식: score = sum( weight / (60 + rank) )
        k_constant = 60
        all_docs = {}

        # Sparse 결과 처리
        for rank, doc in enumerate(sparse_docs):
            score = self.weights[0] / (k_constant + rank + 1)
            all_docs[doc.page_content] = {"doc": doc, "score": score}

        # Dense 결과 처리 및 합산
        for rank, doc in enumerate(dense_docs):
            score = self.weights[1] / (k_constant + rank + 1)
            if doc.page_content in all_docs:
                all_docs[doc.page_content]["score"] += score
            else:
                all_docs[doc.page_content] = {"doc": doc, "score": score}

        # [Step 4] 최종 정렬 및 상위 k개 반환
        sorted_docs = sorted(all_docs.values(), key=lambda x: x["score"], reverse=True)
        return [item["doc"] for item in sorted_docs[:self.top_k]]


In [6]:
from langchain_core.documents import Document
from typing import List

def get_hybrid_results(
    query: str,
    sparse_retriever: BaseRetriever,
    dense_retriever: BaseRetriever,
    sparse_weight: float = 0.7,
    dense_weight: float = 0.3,
    top_k: int = 5
) -> List[Document]:
    """
    Sparse와 Dense 검색 결과를 RRF 방식으로 합치는 커스텀 앙상블 함수
    """
    # 1. 각 리트리버에서 독립적으로 검색 수행
    sparse_docs = sparse_retriever.invoke(query)
    dense_docs = dense_retriever.invoke(query)

    # 2. RRF 점수 계산용 딕셔너리
    # 상수 k는 보통 60으로 설정하여 하위 랭킹의 영향을 완화합니다.
    k_constant = 60
    doc_scores = {}

    # Sparse 결과 처리 (가중치 적용)
    for rank, doc in enumerate(sparse_docs):
        # 문서의 내용이나 ID를 키로 사용
        content_hash = doc.page_content
        score = sparse_weight / (k_constant + rank + 1)
        doc_scores[content_hash] = {"doc": doc, "score": score}

    # Dense 결과 처리 (가중치 적용 및 합산)
    for rank, doc in enumerate(dense_docs):
        content_hash = doc.page_content
        score = dense_weight / (k_constant + rank + 1)
        if content_hash in doc_scores:
            doc_scores[content_hash]["score"] += score
        else:
            doc_scores[content_hash] = {"doc": doc, "score": score}

    # 3. 통합 점수 기준 정렬 및 상위 k개 추출
    sorted_items = sorted(doc_scores.values(), key=lambda x: x["score"], reverse=True)
    final_docs = [item["doc"] for item in sorted_items[:top_k]]

    return final_docs

In [7]:
import glob
import json
import os
import gc
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 1. 데이터 로드 함수
def load_history_json(file_path: str) -> List[Document]:
    documents = []
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
        for item in data:
            page_content = f"제목: {item.get('제목', '')}\n한자명: {item.get('한자명', '')}\n정의: {item.get('[정의]', '')}\n내용: {item.get('[내용]', '')}"
            metadata = {"title": item.get('제목', ''), "source": os.path.basename(file_path)}
            documents.append(Document(page_content=page_content, metadata=metadata))
    return documents

# 2. 실행
print("📚 1단계: 데이터 로드 및 청킹 시작...")
DATA_PATH = "/content/drive/MyDrive/수능 풀이/history_final_dataset/*.json"
all_raw_docs = []
for file in glob.glob(DATA_PATH):
    all_raw_docs.extend(load_history_json(file))

text_splitter = RecursiveCharacterTextSplitter(separators=["\n\n", "\n", ". ", " "], chunk_size=1000, chunk_overlap=200)
splits = []
for doc in all_raw_docs:
    chunks = text_splitter.split_text(doc.page_content)
    title_prefix = f"문서제목: {doc.metadata['title']}\n"
    for i, chunk in enumerate(chunks):
        splits.append(Document(page_content=title_prefix + chunk if i > 0 else chunk, metadata={**doc.metadata, "chunk_id": i}))

print(f"✅ {len(splits)}개의 청크 생성 완료.")

# 메모리 정리
del all_raw_docs
gc.collect()

📚 1단계: 데이터 로드 및 청킹 시작...
✅ 1246개의 청크 생성 완료.


0

In [8]:
# Dense 임베딩
import torch
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

# 1. 임베딩 모델 로드
embedding_model = HuggingFaceEmbeddings(
    model_name="dragonkue/BGE-m3-ko",
    model_kwargs={'device': 'cuda'}
)

# 2. 벡터 DB 생성 및 로컬 저장 (persist)
# persist_directory를 지정하면 나중에 다시 계산 안 해도 됩니다.
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embedding_model,
    collection_name="history_dense_db"
)
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

print("✅ Dense 리트리버 준비 완료")

# [중요] 임베딩 모델을 메모리에서 제거하여 30B 모델에게 GPU 자원 반환
del embedding_model
gc.collect()
torch.cuda.empty_cache()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/201 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/54.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/698 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/964 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/297 [00:00<?, ?B/s]

✅ Dense 리트리버 준비 완료


In [9]:
# sparse 임베딩

from functools import partial

# 1. 토크나이저 설정 (기존에 선언한 kiwi와 tag_include 활용)
corpus_tk = partial(tokenize_kiwi, kiwi=kiwi, tag_include=tag_include, text_type="corpus", top_n=5)
query_tk = partial(tokenize_kiwi, kiwi=kiwi, tag_include=tag_include, text_type="query", top_n=2)

# 2. BM25 리트리버 구축
sparse_retriever = LangChainKiwiBM25Retriever(
    documents=splits,
    k=4,
    corpus_tokenizer=corpus_tk,
    query_tokenizer=query_tk
)

print("✅ Sparse 리트리버 준비 완료.")

# 이제 더 이상 splits 리스트가 필요 없으므로 삭제하여 시스템 RAM 확보
del splits
gc.collect()

DEBUG:bm25s:Building index from tokens


BM25S Create Vocab:   0%|          | 0/1246 [00:00<?, ?it/s]

BM25S Convert tokens to indices:   0%|          | 0/1246 [00:00<?, ?it/s]

BM25S Count Tokens:   0%|          | 0/1246 [00:00<?, ?it/s]

BM25S Compute Scores:   0%|          | 0/1246 [00:00<?, ?it/s]

✅ Sparse 리트리버 준비 완료.


2620

In [10]:
print("🎉 4단계: 하이브리드 리트리버 최종 합체!")

# 1. 커스텀 앙상블 적용 (불안 요소인 콜백 핸들러 제거 버전)
hybrid_retriever = CustomWeightedEnsembleRetriever(
    sparse_retriever=sparse_retriever,
    dense_retriever=dense_retriever,
    weights=[0.7, 0.3],
    top_k=4
)

# 2. 최종 테스트
test_query = "임진왜란 당시의 의병 활동에 대해 알려줘"
docs = hybrid_retriever.invoke(test_query)

print(f"\n🔎 테스트 쿼리: {test_query}")
for i, doc in enumerate(docs):
    print(f"[{i+1}] {doc.metadata['title']} (Score: {doc.metadata.get('sparse_score', 'N/A')})")
    print(f"{doc.page_content[:100]}...")
    print("-" * 30)

🎉 4단계: 하이브리드 리트리버 최종 합체!


BM25S Retrieve:   0%|          | 0/1 [00:00<?, ?it/s]


🔎 테스트 쿼리: 임진왜란 당시의 의병 활동에 대해 알려줘
[1] 의병 (Score: 4.087352275848389)
문서제목: 의병
의병부대에는 양인(良人)과 천인(賤人)은 물론 양반 출신의 전직 관료와 낮은 신분의 인물들이 함께 소속되어 있었다. 따라서 의병 활동을 통해 전투나 전쟁에 참여할 ...
------------------------------
[2] 의병 (Score: 3.7012414932250977)
제목: 의병
한자명: 義兵
정의: 국가에 외적 등이 침입했을 때 나라를 지키기 위해 백성들 스스로가 조직해서 결성했던 민간군.
내용: 의병은 외적의 침입이 발생했을 때, 국가의 명...
------------------------------
[3] 독립의군부 (Score: 3.4014644622802734)
제목: 독립의군부
한자명: 獨立義軍府
정의: 1910년대 국내에서 조직된 의병 운동 계열의 비밀 항일 단체.
내용: 독립의군부는 전 낙안 군수
임병찬(林炳瓚)
이 고종(高宗, 재위...
------------------------------
[4] 남한 대토벌 작전 (Score: 3.847276449203491)
제목: 남한 대토벌 작전
한자명: 南韓大討伐作戰
정의: 1909년 9월부터 약 2개월간 일본군이 대규모 병력을 동원하여 의병 부대와 한국인을 학살한 작전.
내용: 1907년 정미의...
------------------------------


In [None]:
# from langchain_community.vectorstores import Chroma
# # (임베딩 모델은 로드해야 함 - DB 해석용)
# from langchain_huggingface import HuggingFaceEmbeddings

# # 1. 임베딩 모델 준비
# embedding_model = HuggingFaceEmbeddings(
#     model_name="jhgan/ko-sbert-multitask",
#     model_kwargs={'device': 'cuda'}
# )

# # 2. 저장된 DB 불러오기 (documents 로드 불필요!)
# DB_PATH = "/content/drive/MyDrive/수능 풀이/chroma_db"

# vectorstore = Chroma(
#     persist_directory=DB_PATH,
#     embedding_function=embedding_model,
#     collection_name="korean_history"
# )

# retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# print("✅ 저장된 DB를 성공적으로 불러왔습니다!")

In [20]:
from langfuse import Langfuse
import os

# 👇 여기에 본인의 키를 '정확하게' 붙여넣으세요! (공백 주의)
os.environ["LANGFUSE_SECRET_KEY"] = ""
os.environ["LANGFUSE_PUBLIC_KEY"] = ""
os.environ["LANGFUSE_HOST"] = ""



try:
    # 핸들러 말고, '본체(Client)'를 직접 소환합니다.
    langfuse = Langfuse()

    # 정석 인증 체크 함수
    if langfuse.auth_check():
        print("🎉 [최종 확인] 인증 성공! 연결 완벽합니다.")

    else:
        print("⚠️ [인증 실패] 키 값이나 호스트 주소(US/EU)를 다시 봐주세요.")

except Exception as e:
    print(f"🔥 에러: {e}")

🎉 [최종 확인] 인증 성공! 연결 완벽합니다.


In [12]:
langfuse_handler = None
try:
    # ✅ 정답 경로: langfuse.langchain
    from langfuse.langchain import CallbackHandler

    # 인증 체크 함수(auth_check) 호출하지 않고 바로 생성 (에러 방지)
    langfuse_handler = CallbackHandler()
    print("✅ LangFuse 핸들러 연결 성공! (로그 적재 중)")

except ImportError:
    print("⚠️ LangFuse 패키지 경로 에러: 로그 없이 진행합니다.")
except Exception as e:
    print(f"⚠️ LangFuse 연결 실패 ({e}): 로그 없이 진행합니다.")

✅ LangFuse 핸들러 연결 성공! (로그 적재 중)


In [23]:
from langchain_openai import ChatOpenAI

# LLM 재연결 (타임아웃 추가)
base_llm = ChatOpenAI(
    base_url="http://localhost:8000/v1",
    api_key="EMPTY",
    model=MODEL_PATH,
    streaming=False,
    max_tokens=2048,
    request_timeout=600, # 🔥 120초(2분) 지나면 에러 뱉고 다음 문제로 넘어감
    max_retries=1        # 재시도 1회만
)

# 파라미터 다시 바인딩
llm_with_params = base_llm.bind(
    temperature=0.7, # 문제 풀이용
    top_p=0.90,
    extra_body={
        "top_k": 25,
        "repeat_penalty": 1.1
    }
)

# (이후 LangGraph 노드 정의 및 app.compile() 코드는 그대로 사용)
print("✅ LLM 클라이언트 업데이트 완료 (Timeout 적용)")

✅ LLM 클라이언트 업데이트 완료 (Timeout 적용)


In [24]:
import torch

# 현재 할당된 메모리와 예약된(캐시된) 메모리 확인
print(f"현재 할당량: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
print(f"현재 예약량: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

# 아주 상세한 요약 보고서 출력
print(torch.cuda.memory_summary(device=None, abbreviated=False))

현재 할당량: 2.12 GB
현재 예약량: 2.14 GB
|                  PyTorch CUDA memory summary, device ID 0                 |
|---------------------------------------------------------------------------|
|            CUDA OOMs: 0            |        cudaMalloc retries: 0         |
|        Metric         | Cur Usage  | Peak Usage | Tot Alloc  | Tot Freed  |
|---------------------------------------------------------------------------|
| Allocated memory      |   2175 MiB |   3292 MiB |   1027 GiB |   1025 GiB |
|       from large pool |   2172 MiB |   3289 MiB |   1027 GiB |   1025 GiB |
|       from small pool |      2 MiB |      3 MiB |      0 GiB |      0 GiB |
|---------------------------------------------------------------------------|
| Active memory         |   2175 MiB |   3292 MiB |   1027 GiB |   1025 GiB |
|       from large pool |   2172 MiB |   3289 MiB |   1027 GiB |   1025 GiB |
|       from small pool |      2 MiB |      3 MiB |      0 GiB |      0 GiB |
|-------------------------------

In [27]:
class MCQState(TypedDict):
    id: str
    paragraph: str
    question: str
    choices: List[str]
    is_history: bool       # [추가] 한국사 문제 여부
    strategy: str         # [추가] INFERENCE vs GENERAL
    summary: str          # [추가] 지문 요약본
    optimized_query: str  # [추가] 생성된 키워드 (10개 이내)
    retrieved_context: str
    full_response: str
    final_answer: str

# =========================================================
# Node 1: Classifier (한국사 판별기)
# =========================================================
def classifier_node(state: MCQState):
    print(f"🧐 [Classifier] 한국사 문제 여부 판별 중...")
    prompt = ChatPromptTemplate.from_template(
        "<|im_start|>system\n주어진 문제가 '한국사(Korean History)'와 관련되었는지 판단하여 YES 또는 NO로만 답하세요.<|im_end|>\n"
        "<|im_start|>user\n지문: {paragraph}\n질문: {question}\n판별:<|im_end|>\n<|im_start|>assistant\n"
    )
    chain = prompt | llm_with_params | StrOutputParser()
    res = chain.invoke({"paragraph": state['paragraph'], "question": state['question']}).strip().upper()

    return {"is_history": "YES" in res}

In [28]:
# =========================================================
# Node 2: Retriever & Query Gen (전략/쿼리/요약)
# =========================================================
def retrieve_node(state: MCQState):
    if not state.get('is_history'):
        return {"retrieved_context": "한국사 문제가 아니므로 검색을 생략합니다."}

    # Phase 1: INFERENCE vs GENERAL 분류
    router_prompt = ChatPromptTemplate.from_template(
        "<|im_start|>system\n문제를 분류하세요: \n"
        "- **INFERENCE**: (가), '이 왕', '이 단체' 등 주어가 생략되어 추론이 필요한 경우\n"
        "- **GENERAL**: 대상이 명확한 사실 확인 문제\n"
        "단어 하나만 출력하세요.<|im_end|>\n"
        "<|im_start|>user\n지문: {paragraph}\n질문: {question}\n분류:<|im_end|>\n<|im_start|>assistant\n"
    )
    strategy = (router_prompt | llm_with_params | StrOutputParser()).invoke(state).strip()

    # Phase 2: 요약 및 10대 키워드 생성 (병렬 처리 권장이나 여기선 순차 구현)
    gen_prompt = ChatPromptTemplate.from_template(
        "<|im_start|>system\n당신은 역사 전문가입니다. 다음 지침을 따르세요:\n"
        "1. 지문을 2문장 이내로 핵심 요약하세요.\n"
        "2. 검색을 위한 핵심 키워드를 '콤마'로 구분하여 10개 이내로 뽑으세요.\n"
        "형식: 요약: [내용] / 키워드: [키워드들]<|im_end|>\n"
        "<|im_start|>user\n지문: {paragraph}\n질문: {question}\n결과:<|im_end|>\n<|im_start|>assistant\n"
    )
    gen_res = (gen_prompt | llm_with_params | StrOutputParser()).invoke(state)

    summary = gen_res.split("요약:")[1].split("/ 키워드:")[0].strip()
    keywords = gen_res.split("키워드:")[1].strip()

    print(f"   🚦 [전략]: {strategy} | ✨ [키워드]: {keywords}")

    # Phase 3: Dual Search (키워드 쿼리 + 지문 요약)
    # 아까 만든 hybrid_retriever.invoke() 사용
    docs_query = hybrid_retriever.invoke(keywords)
    docs_summary = hybrid_retriever.invoke(summary)

    # 중복 제거 및 컨텍스트 조립
    combined = {d.page_content: d for d in (docs_query + docs_summary)}.values()
    para_context = "\n".join([f"- {d.page_content}" for d in list(combined)[:6]])

    return {
        "strategy": strategy,
        "summary": summary,
        "optimized_query": keywords,
        "retrieved_context": f"전략: {strategy}\n요약: {summary}\n참고자료:\n{para_context}"
    }

In [29]:
# =========================================================
# 5. 그래프 조립
# =========================================================
workflow = StateGraph(MCQState)

workflow.add_node("classify", classifier_node)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("solve", solver_node) # 기존 solver_node 유지
workflow.add_node("parse", parser_node)   # 기존 parser_node 유지

workflow.set_entry_point("classify")

# 조건부 분기: 한국사일 때만 리트리버 가동
def decide_next_node(state: MCQState):
    if state["is_history"]:
        return "retrieve"
    return END # 또는 "solve" (검색 없이 풀기)

workflow.add_conditional_edges(
    "classify",
    decide_next_node,
    {
        "retrieve": "retrieve",
        END: END
    }
)

workflow.add_edge("retrieve", "solve")
workflow.add_edge("solve", "parse")
workflow.add_edge("parse", END)

app = workflow.compile()

In [30]:
import ast
import pandas as pd
from tqdm import tqdm

# 데이터 로드 (73개 테스트)
df_train = pd.read_csv("/content/drive/MyDrive/수능 풀이/data/train.csv")
test_data = df_train.iloc[:73]

results = []
print("🔥 최종 RAG 모델 평가 시작...")

for idx, row in tqdm(test_data.iterrows(), total=len(test_data)):
    try:
        # 데이터 전처리
        problem = ast.literal_eval(row['problems']) if isinstance(row['problems'], str) else row['problems']

        inputs = {
            "id": row.get('id', f"q-{idx}"),
            "paragraph": row['paragraph'],
            "question": problem['question'],
            "choices": problem['choices']
        }

        # LangFuse 콜백 설정
        config = {"callbacks": [langfuse_handler]} if 'langfuse_handler' in globals() else {}

        # 그래프 실행
        output = app.invoke(inputs, config=config)

        # 결과 저장
        results.append({
            "id": inputs['id'],
            "question": inputs['question'],
            "predicted_answer": output['final_answer'],
            "real_answer": str(problem.get('answer')),
            "is_correct": str(output['final_answer']) == str(problem.get('answer')),
            "retrieved_context": output['retrieved_context'], # 검색된 내용 저장
            "full_response": output['full_response']
        })

    except Exception as e:
        print(f"❌ Error at {idx}: {e}")

# 결과 저장 및 출력
final_df = pd.DataFrame(results)
final_df.to_csv("/content/drive/MyDrive/수능 풀이/rag_results.csv", index=False, encoding='utf-8-sig')

acc = final_df['is_correct'].mean() * 100
print(f"\n🏆 최종 정답률: {acc:.2f}%")

🔥 최종 RAG 모델 평가 시작...


  0%|          | 0/73 [00:00<?, ?it/s]

🧐 [Classifier] 한국사 문제 여부 판별 중...
   🚦 [전략]: INFERENCE | ✨ [키워드]: 정체, 전중, 상복 절차, 장자, 3년상복, 송준길, 현종실록, 붕당, 정치 이론, 문인 중심


BM25S Retrieve:   0%|          | 0/1 [00:00<?, ?it/s]

BM25S Retrieve:   0%|          | 0/1 [00:00<?, ?it/s]

ERROR:opentelemetry.exporter.otlp.proto.http.trace_exporter:Failed to export span batch code: 401, reason: {"message":"Invalid credentials. Confirm that you've configured the correct host."}
  0%|          | 0/73 [00:12<?, ?it/s]


KeyboardInterrupt: 

In [None]:
from google.colab import runtime
import time

print("✅ 모든 작업이 완료되었습니다!")
print("💾 데이터 저장이 잘 되었는지 마지막으로 확인하세요.")
print("👋 10초 뒤에 런타임을 종료하여 크레딧을 절약합니다...")

# 10초 카운트다운 (혹시라도 멈추고 싶으면 이때 중지 누르세요)
for i in range(10, 0, -1):
    print(f"{i}초...", end="\r")
    time.sleep(1)

print("\n🔌 런타임을 종료합니다. 수고하셨습니다!")
runtime.unassign()

✅ 모든 작업이 완료되었습니다!
💾 데이터 저장이 잘 되었는지 마지막으로 확인하세요.
👋 10초 뒤에 런타임을 종료하여 크레딧을 절약합니다...

🔌 런타임을 종료합니다. 수고하셨습니다!
