In [82]:
# from dotenv import load_dotenv

# load_dotenv()

In [83]:
# from langchain_teddynote import logging

# # 프로젝트 이름을 입력합니다.
# logging.langsmith("posco_RAG")

# 청크 만들기

In [84]:
# === Markdown 헤딩 기반 → 토큰 기반 2단계 청킹(간단판) ===
# - tiktoken 있으면 실제 토큰 기준, 없으면 공백 단위 토큰으로 대체
# - MINIMAL_FIELDS 플래그로 저장 필드 최소/확장 선택

import re, os, json, uuid
from pathlib import Path
from typing import List, Dict, Any

# (선택) tiktoken 사용: 없으면 자동 폴백
try:
    import tiktoken
    _ENC = tiktoken.get_encoding("cl100k_base")
    def count_tokens(text: str) -> int:
        return len(_ENC.encode(text))
    def split_by_tokens(text: str, max_tokens: int, overlap: int) -> List[str]:
        ids = _ENC.encode(text)
        out, start = [], 0
        while start < len(ids):
            end = min(start + max_tokens, len(ids))
            out.append(_ENC.decode(ids[start:end]).strip())
            if end >= len(ids):
                break
            start = max(0, end - overlap)
        return [x for x in out if x]
    TOKENIZER_NAME = "tiktoken(cl100k_base)"
except Exception:
    _ENC = None
    def count_tokens(text: str) -> int:
        return len(text.split())
    def split_by_tokens(text: str, max_tokens: int, overlap: int) -> List[str]:
        toks = text.split()
        out, start = [], 0
        while start < len(toks):
            end = min(start + max_tokens, len(toks))
            out.append(" ".join(toks[start:end]).strip())
            if end >= len(toks):
                break
            start = max(0, end - overlap)
        return [x for x in out if x]
    TOKENIZER_NAME = "whitespace(fallback)"

def split_sections_by_heading(md_text: str, heading_level_count: int) -> List[Dict[str, Any]]:
    """헤딩(예: '##'면 2) 기준으로 1차 분할"""
    pattern = re.compile(rf"^({'#' * heading_level_count})\s+(.+)$", re.MULTILINE)
    hits = [(m.start(), m.group(0)) for m in pattern.finditer(md_text)]
    if not hits:
        lines = md_text.splitlines()
        return [{
            "title": "",
            "content": md_text.strip(),
            "start_idx": 0,
            "end_idx": len(md_text),
            "start_line": 1,
            "end_line": len(lines)
        }]

    # 라인 번호 계산용
    line_starts, pos = [], 0
    for line in md_text.splitlines(True):
        line_starts.append(pos); pos += len(line)
    def char_to_lineno(i: int) -> int:
        ln = 1
        for idx, s in enumerate(line_starts):
            if s > i: break
            ln = idx + 1
        return ln

    hits.append((len(md_text), ""))  # 끝 경계
    sections = []
    for i in range(len(hits)-1):
        start_idx, _ = hits[i]
        end_idx, _ = hits[i+1]
        block = md_text[start_idx:end_idx].strip()
        lines = block.splitlines()
        title = re.sub(r"^#+\s*", "", lines[0]).strip() if lines else ""
        content = "\n".join(lines[1:]).strip()
        sections.append({
            "title": title,
            "content": content,
            "start_idx": start_idx,
            "end_idx": end_idx,
            "start_line": char_to_lineno(start_idx),
            "end_line": char_to_lineno(end_idx),
        })
    return sections

def chunk_one_md(md_path: Path, heading_level: int, max_tokens: int, overlap_tokens: int,
                 minimal_fields: bool = True) -> List[Dict[str, Any]]:
    """단일 MD 파일을 청킹"""
    text = md_path.read_text(encoding="utf-8", errors="ignore")
    sections = split_sections_by_heading(text, heading_level)
    chunks, gidx = [], 0
    for sec_idx, sec in enumerate(sections):
        parts = split_by_tokens(sec["content"], max_tokens, overlap_tokens) if sec["content"] else [""]
        for part_idx, part in enumerate(parts):
            if minimal_fields:
                row = {
                    "id": str(uuid.uuid4()),
                    "index": gidx,
                    "source_file": md_path.name,
                    "title": sec["title"],
                    "content": part,
                }
            else:
                row = {
                    "id": str(uuid.uuid4()),
                    "index": gidx,
                    "section_index": sec_idx,
                    "part_index": part_idx,
                    "source_path": str(md_path.resolve()),
                    "source_file": md_path.name,
                    "title": sec["title"],
                    "content": part,
                    "n_tokens": count_tokens(part),
                    "start_line": sec["start_line"],
                    "end_line": sec["end_line"],
                    "tokenizer": TOKENIZER_NAME,
                    "max_tokens": max_tokens,
                    "overlap_tokens": overlap_tokens,
                    "heading_level": heading_level,
                }
            chunks.append(row)
            gidx += 1
    return chunks

def chunk_md_folder(input_dir: str, output_dir: str,
                    heading_level: int = 2, max_tokens: int = 800, overlap_tokens: int = 100,
                    minimal_fields: bool = True) -> Path:
    """폴더 내 *.md 모두 처리하고 index.jsonl로 합칩니다."""
    in_dir, out_dir = Path(input_dir), Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    all_rows = []
    for md_file in sorted(in_dir.glob("*.md")):
        rows = chunk_one_md(md_file, heading_level, max_tokens, overlap_tokens, minimal_fields)
        # 파일별 저장(선택): 원하면 주석 해제해서 파일 단위도 저장하세요.
        # with (out_dir / f"{md_file.stem}_chunks.json").open("w", encoding="utf-8") as f:
        #     json.dump(rows, f, ensure_ascii=False, indent=2)
        # with (out_dir / f"{md_file.stem}_chunks.jsonl").open("w", encoding="utf-8") as f:
        #     for r in rows: f.write(json.dumps(r, ensure_ascii=False) + "\n")
        all_rows.extend(rows)

    # 전체 합본(이걸로 RAG 임베딩하면 편함)
    idx_path = out_dir / "index.jsonl"
    with idx_path.open("w", encoding="utf-8") as f:
        for r in all_rows:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    return idx_path


In [85]:
# PROJECT_ROOT = Path.cwd().parent

# === 여기만 바꾸면 됩니다 ===
INPUT_DIR = "../data_new/data/md_rag_files2"            # <- 당신의 MD 폴더 경로
OUTPUT_DIR = "../data_new/data/json_rag_files2"         # <- 청크 jsonl 저장 폴더
HEADING_LEVEL = 1                  # '##' 기준이면 2, '#'이면 1, '###'이면 3
MAX_TOKENS = 800                   # 청크 최대 토큰
OVERLAP_TOKENS = 100               # 청크 간 겹침
MINIMAL_FIELDS = True              # True=최소필드만 저장 / False=풍부한 메타 포함

idx_path = chunk_md_folder(
    input_dir=INPUT_DIR,
    output_dir=OUTPUT_DIR,
    heading_level=HEADING_LEVEL,
    max_tokens=MAX_TOKENS,
    overlap_tokens=OVERLAP_TOKENS,
    minimal_fields=MINIMAL_FIELDS
)

print("✅ Done. Saved:", idx_path)
# 간단 미리보기
from itertools import islice
print("\n--- Preview (first 3 lines) ---")
with open(idx_path, "r", encoding="utf-8") as f:
    for line in islice(f, 3):
        print(line.rstrip()[:300])


✅ Done. Saved: ../data_new/data/json_rag_files2/index.jsonl

--- Preview (first 3 lines) ---
{"id": "e93ffb8d-4f88-4dda-ba16-7aadbd771914", "index": 0, "source_file": "AFM - a330f_afm_ENG.md", "title": "ENG FIRE (IN FLIGHT)", "content": "Ident.: EMER-26-00005711.0001001 / 26 NOV 09 APPROVED Criteria: A330 LAND ASAP Shut down affected engine. Push relevant FIRE pushbutton. Turn off affected 
{"id": "038e956b-8430-4079-afe6-e062d89f2d37", "index": 1, "source_file": "AFM - a330f_afm_ENG.md", "title": "ENG FIRE (ON GROUND)", "content": "Ident.: EMER-26-00005712.0005001 / 16 APR 10 APPROVED Criteria: 330-200F Set all thrust levers to idle. When aircraft stopped : Set parking brake to ON. N
{"id": "e67e19ae-413b-4606-b65e-b1f6cae03318", "index": 2, "source_file": "AFM - a330f_afm_ENG.md", "title": "APU FIRE", "content": "Ident.: EMER-26-00005713.0001001 / 26 NOV 09 APPROVED Criteria: A330 LAND ASAP Press APU FlRE pushbutton. Discharge agent after 10 s. Shut down APU."}


-----

# 벡터 DB 생성

In [86]:
from pathlib import Path

# PROJECT_ROOT = Path(__file__).resolve().parent.parent
PROJECT_ROOT = Path.cwd().parent

# === 여기를 당신 환경에 맞게 수정 ===
INDEX_JSONL = PROJECT_ROOT / "data_new" / "data" / "json_rag_files2" / "index.jsonl"  # 위에서 만든 청크 합본 jsonl
OUT_DIR     = PROJECT_ROOT / "data_new" / "new_vector_DB" / "faiss_e5_large_v2"       # 두 개 인덱스를 저장할 루트 폴더

# 결과 폴더(모델별 하위 폴더)가 생성됩니다:
# - {OUT_DIR}/faiss_bge_m3/
# - {OUT_DIR}/faiss_e5_large_v2/
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)


In [87]:
import json

# index.jsonl -> 리스트(dict)
rows = []
with open(INDEX_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        rows.append(json.loads(line))

# 최소 필수 컬럼: id, title, content, source_file (당신 jsonl 구조는 이미 OK)
len(rows), rows[0].keys()


(22, dict_keys(['id', 'index', 'source_file', 'title', 'content']))

In [None]:
import os, faiss, numpy as np, json
import pickle
from tqdm import tqdm
from sentence_transformers import SentenceTransformer

def prepare_doc_text(doc, model_tag: str):
    """
    임베딩 모델에 맞는 문서 텍스트 전처리.
    - e5 계열: 'passage: ' 프리픽스 권장
    - bge 계열: 그냥 내용 그대로도 되지만, 통일감 위해 'passage: ' 사용해도 무방
    """
    text = doc.get("content", "") or ""
    title = doc.get("title", "") or ""
    # 제목 + 본문 합치기(검색 품질↑)
    base = (title + "\n" + text).strip() if title else text.strip()
    if model_tag == "e5":
        return "passage: " + base
    else:
        return "passage: " + base

def prepare_query_text(query: str, model_tag: str):
    """
    질의용 텍스트 전처리.
    - e5: 'query: ' 프리픽스
    - bge-m3: 그대로도 OK, 통일 위해 'query: ' 사용
    """
    if model_tag == "e5":
        return "query: " + query.strip()
    else:
        return "query: " + query.strip()

def build_faiss_index(rows, model_name: str, out_dir: str, batch_size: int = 64, normalize: bool = True):
    """
    rows: index.jsonl 로드 결과(list of dict)
    model_name: "BAAI/bge-m3" or "intfloat/e5-large-v2"
    out_dir: 인덱스 저장 폴더
    """
    os.makedirs(out_dir, exist_ok=True)

    # 모델 태그 판별(전처리 다르게)
    model_tag = "e5" if "e5" in model_name.lower() else "bge"

    # 모델 로드
    model = SentenceTransformer(model_name)

    # 임베딩 생성
    emb_list = []
    meta_list = []

    buf = []
    for r in rows:
        buf.append(prepare_doc_text(r, model_tag))
        meta_list.append({
            "id": r.get("id"),
            "title": r.get("title"),
            "source_file": r.get("source_file"),
            # 원하면 여기서 더(라인 범위 등) 붙여도 됩니다.
        })
        # 배치 인코딩
        if len(buf) >= batch_size:
            emb = model.encode(buf, normalize_embeddings=normalize, show_progress_bar=False)
            emb_list.append(emb)
            buf = []
    if buf:
        emb = model.encode(buf, normalize_embeddings=normalize, show_progress_bar=False)
        emb_list.append(emb)

    embs = np.vstack(emb_list).astype("float32")
    dim  = embs.shape[1]

    # 코사인 유사도 = 내적(IP) + L2 정규화 → IndexFlatIP
    index = faiss.IndexFlatIP(dim)
    index.add(embs)

    # 저장: faiss + 메타 + 설정
    faiss.write_index(index, os.path.join(out_dir, "index.faiss"))
    with open(os.path.join(out_dir, "meta.jsonl"), "w", encoding="utf-8") as f:
        for m in meta_list:
            f.write(json.dumps(m, ensure_ascii=False) + "\n")
    with open(os.path.join(out_dir, "config.json"), "w", encoding="utf-8") as f:
        json.dump({"model_name": model_name, "normalize": normalize, "dim": dim}, f, ensure_ascii=False, indent=2)
    with open(os.path.join(out_dir, "index.pkl"), "wb") as f:
        pickle.dump(meta_list, f)

    print(f"✅ Saved FAISS index to: {out_dir}")
    print(f"   - vectors: {len(meta_list)}, dim: {dim}, model: {model_name}, normalize: {normalize}")

def load_faiss_index(out_dir: str):
    index = faiss.read_index(os.path.join(out_dir, "index.faiss"))
    meta = []
    with open(os.path.join(out_dir, "meta.jsonl"), "r", encoding="utf-8") as f:
        for line in f:
            meta.append(json.loads(line))
    with open(os.path.join(out_dir, "config.json"), "r", encoding="utf-8") as f:
        cfg = json.load(f)
    model = SentenceTransformer(cfg["model_name"])
    return index, meta, cfg, model

def search(query: str, top_k: int, index, meta, cfg, model):
    model_tag = "e5" if "e5" in cfg["model_name"].lower() else "bge"
    qtext = prepare_query_text(query, model_tag)
    q = model.encode([qtext], normalize_embeddings=cfg.get("normalize", True), show_progress_bar=False).astype("float32")
    scores, idxs = index.search(q, top_k)
    idxs = idxs[0].tolist()
    scores = scores[0].tolist()
    results = []
    for rank, (i, sc) in enumerate(zip(idxs, scores), start=1):
        if i == -1: 
            continue
        m = meta[i]
        results.append({"rank": rank, "score": float(sc), **m})
    return results


In [89]:


bge_dir = str(Path(OUT_DIR) / "faiss_bge_m3")
# print(bge_dir)
e5_dir  = str(Path(OUT_DIR) / "faiss_e5_large_v2")
# print(e5_dir)

# build_faiss_index(rows, model_name="BAAI/bge-m3",         out_dir=bge_dir, batch_size=32, normalize=True)
build_faiss_index(rows, model_name="intfloat/e5-large-v2", out_dir=e5_dir,  batch_size=32, normalize=True)


✅ Saved FAISS index to: /home/piai/AI_Project/RAG/data_new/new_vector_DB/faiss_e5_large_v2/faiss_e5_large_v2
   - vectors: 22, dim: 1024, model: intfloat/e5-large-v2, normalize: True


⚠️ 중요:

bge-m3는 다국어 지원 → 한글 쿼리 그대로 검색해도 매칭 잘 됩니다.

e5-large-v2는 영어 중심 → 한글 쿼리 성능이 떨어질 수 있습니다(번역 후 사용 추천).
당장은 bge 인덱스에서 한글 쿼리 테스트가 가장 현실적입니다.

---

간단 비교용

In [90]:
# BGE 인덱스 로드 + 검색
# bge_index, bge_meta, bge_cfg, bge_model = load_faiss_index(bge_dir)
# results_bge = search("engine fire", top_k=5, index=bge_index, meta=bge_meta, cfg=bge_cfg, model=bge_model)
# print("=== BGE-m3 결과 ===")
# for r in results_bge:
#     print(f"[{r['rank']}] {r['score']:.3f} | {r['source_file']} | {r['title']}")

# E5 인덱스 로드 + (그대로) 검색
e5_index, e5_meta, e5_cfg, e5_model = load_faiss_index(e5_dir)
results_e5 = search("engine fire", top_k=5, index=e5_index, meta=e5_meta, cfg=e5_cfg, model=e5_model)
print("\n=== e5-large-v2 결과(한글 쿼리, 번역 없음) ===")
for r in results_e5:
    print(f"[{r['rank']}] {r['score']:.3f} | {r['source_file']} | {r['title']}")



=== e5-large-v2 결과(한글 쿼리, 번역 없음) ===
[1] 0.813 | AFM - a330f_afm_ENG.md | ENG FIRE (ON GROUND)
[2] 0.800 | AFM - a330f_afm_ENG.md | ENG FIRE (IN FLIGHT)
[3] 0.798 | AFM - a330f_afm_ENG.md | APU FIRE
[4] 0.797 | Abnormal-and-Emergency-Procedures-a330-ENG.md | ENG 1(2) FIRE (IN FLIGHT)
[5] 0.791 | Abnormal-and-Emergency-Procedures-a330-FUEL.md | [QRH] FUEL LEAK


---

본문 내용 비교용

In [91]:
# INDEX_JSONL 경로는 네가 이미 쓰던 그대로 사용
import json

id2content = {}
with open(INDEX_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        row = json.loads(line)
        id2content[row["id"]] = row.get("content", "")
print("loaded ids:", len(id2content))


loaded ids: 22


In [92]:
def print_full_content(results):
    for r in results:
        cid = r["id"]
        content = id2content.get(cid, "") or ""
        print("="*100)
        print(f"[{r['rank']}] score={r['score']:.3f}")
        print(f"source_file: {r['source_file']}")
        print(f"title      : {r.get('title','')}")
        print("-"*100)
        print(content)           # ← 전체 본문 그대로 출력
        print("\n")


In [93]:
# print("=== BGE-m3 결과 (FULL) ===")
# results_bge = search("engine fire", top_k=5, index=bge_index, meta=bge_meta, cfg=bge_cfg, model=bge_model)
# print_full_content(results_bge)

print("\n=== e5-large-v2 결과 (FULL) ===")
results_e5 = search("engine fire", top_k=5, index=e5_index, meta=e5_meta, cfg=e5_cfg, model=e5_model)
print_full_content(results_e5)



=== e5-large-v2 결과 (FULL) ===
[1] score=0.813
source_file: AFM - a330f_afm_ENG.md
title      : ENG FIRE (ON GROUND)
----------------------------------------------------------------------------------------------------
Ident.: EMER-26-00005712.0005001 / 16 APR 10 APPROVED Criteria: 330-200F Set all thrust levers to idle. When aircraft stopped : Set parking brake to ON. Notify ATC. Alert courier area occupants. Shut down affected engine. Push relevant FlRE pushbutton. Discharge all fire agents of the affected engine. If MAN CAB PR has been used: Check cabin differential pressure at zero before opening the doors. Shut down other engine. Push other engine FIRE pushbutton. If evacuation required : Initiate evacuation. Shut down APU. Turn off all batteries. If evacuation not required : Notify courier area occupants to remain seated.


[2] score=0.800
source_file: AFM - a330f_afm_ENG.md
title      : ENG FIRE (IN FLIGHT)
---------------------------------------------------------------------

----

인덱스 로드 (청크 JSONL → 메모리)

In [94]:
import json, re
from pathlib import Path

# === 경로만 바꿔주세요 ===
INDEX_JSONL = "../data_new/data/json_rag_files2/index.jsonl"

id2content = {}
id2meta = {}
docs_tokens = []     # BM25용 토큰 문서 리스트
doc_ids = []         # BM25용 doc id 리스트(청크 id)

def tokenize_en(text: str):
    # 영어만 사용한다고 했으니 간단 토크나이저(영숫자만)
    return re.findall(r"[A-Za-z0-9]+", text.lower())

with open(INDEX_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        row = json.loads(line)
        cid   = row["id"]
        title = row.get("title","") or ""
        body  = row.get("content","") or ""
        src   = row.get("source_file","")
        id2content[cid] = body
        id2meta[cid] = {"source_file": src, "title": title}
        # BM25에는 title+content를 함께 반영(키워드 리콜↑)
        tokens = tokenize_en(f"{title}\n{body}")
        docs_tokens.append(tokens)
        doc_ids.append(cid)

print(f"Loaded chunks: {len(doc_ids)}")


Loaded chunks: 22


BM25 인덱스 빌드

In [95]:
from rank_bm25 import BM25Okapi

bm25 = BM25Okapi(docs_tokens)
print("BM25 ready.")


BM25 ready.


Dense 검색 훅 (BGE/E5)

In [96]:
# 이미 정의되어 있는 것 가정:
# - load_faiss_index(dirpath) -> (index, meta, cfg, model)
# - search(query, top_k, index, meta, cfg, model) -> [{"id","rank","score","source_file","title"}, ...]

# === 경로만 바꿔주세요 (이미 로드했으면 이 셀은 건너뛰어도 됨) ===


# try:
#     bge_index
# except NameError:
#     bge_index, bge_meta, bge_cfg, bge_model = load_faiss_index(bge_dir)
#     print("BGE index loaded")

try:
    e5_index
except NameError:
    e5_index, e5_meta, e5_cfg, e5_model = load_faiss_index(e5_dir)
    print("E5 index loaded")

def dense_search(query: str, backend: str = "bge", top_k: int = 20):
    if backend == "bge":
        return search(query, top_k=top_k, index=bge_index, meta=bge_meta, cfg=bge_cfg, model=bge_model)
    elif backend == "e5":
        return search(query, top_k=top_k, index=e5_index, meta=e5_meta, cfg=e5_cfg, model=e5_model)
    else:
        raise ValueError("backend must be 'bge' or 'e5'")


BM25 검색 함수

In [97]:
from math import isfinite

def bm25_search(query: str, top_k: int = 20):
    q_tokens = tokenize_en(query)
    # BM25Okapi.get_scores는 모든 문서 점수 반환 → 상위 top_k 인덱스 추출
    scores = bm25.get_scores(q_tokens)
    # 랭크 산정을 위해 점수로 정렬
    idx_sorted = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
    results = []
    for rank, i in enumerate(idx_sorted, start=1):
        cid = doc_ids[i]
        results.append({
            "id": cid,
            "rank": rank,
            "score": float(scores[i]) if isfinite(scores[i]) else 0.0,
            "source_file": id2meta[cid]["source_file"],
            "title": id2meta[cid]["title"],
            "_retriever": "bm25"
        })
    return results


RRF 결합 (LoRE 컨셉: 서로 다른 리트리버 랭크를 1/(k+rank)로 합산)

In [98]:
from collections import defaultdict

def rrf_fuse(*ranked_lists, k: int = 60, top_k: int = 10):
    """
    ranked_lists: 각 원소는 [{'id','rank',...}, ...] 형태의 리스트
    k: RRF 상수 (논문/IR 커뮤니티에서 60 자주 사용)
    """
    agg = defaultdict(lambda: {"rrf": 0.0, "candidates": []})
    for lst in ranked_lists:
        for item in lst:
            rid  = item["id"]
            rnk  = item["rank"]
            rrf  = 1.0 / (k + rnk)
            agg[rid]["rrf"] += rrf
            agg[rid]["candidates"].append(item)

    fused = []
    for cid, bundle in agg.items():
        # 메타 필드 하나를 대표로 사용
        any_item = max(bundle["candidates"], key=lambda x: x.get("score", 0))
        fused.append({
            "id": cid,
            "fused_rrf": bundle["rrf"],
            "source_file": any_item["source_file"],
            "title": any_item["title"],
            # 참고용(디버그): 개별 기여도
            "_details": sorted(
                [{"retriever": it.get("_retriever","dense"), "rank": it["rank"], "score": it.get("score", None)}
                 for it in bundle["candidates"]],
                key=lambda x: x["rank"]
            )
        })

    fused.sort(key=lambda x: x["fused_rrf"], reverse=True)
    return fused[:top_k]


하이브리드 리트리버 (BM25 + Dense(BGE/E5/둘다)) → RRF → LLM 입력 컨텍스트 직전

In [99]:
def hybrid_retrieve(query: str,
                    top_k_bm25: int = 30,
                    top_k_dense: int = 30,
                    dense_backends = ("bge","e5"),
                    rrf_k: int = 60,
                    final_k: int = 10):
    # 1) 각각 검색
    bm25_hits = bm25_search(query, top_k=top_k_bm25)
    dense_lists = []
    for be in dense_backends:
        d_hits = dense_search(query, backend=be, top_k=top_k_dense)
        # 표준화(필요 시): _retriever 태그 & rank 필드 보장
        for it in d_hits:
            it["_retriever"] = be
        dense_lists.append(d_hits)

    # 2) RRF 결합
    fused = rrf_fuse(bm25_hits, *dense_lists, k=rrf_k, top_k=final_k)
    return fused

def build_context(fused, max_chars=3500):
    """LLM 투입 직전 컨텍스트 문자열 조립(출처/제목/본문 포함)"""
    ctxs, used = [], 0
    for r in fused:
        cid   = r["id"]
        title = r["title"]
        src   = r["source_file"]
        body  = id2content.get(cid, "")
        block = f"[TITLE] {title}\n[FILE] {src}\n[CONTENT]\n{body}\n"
        if used + len(block) > max_chars:
            break
        ctxs.append(block); used += len(block)
    return "\n\n---\n\n".join(ctxs)


예시 실행

In [None]:
# fused = hybrid_retrieve(query, top_k_bm25=5, top_k_dense=5, dense_backends=("e5"), rrf_k=60, final_k=5)
# query = "engine fire in flight procedures for A330"
# query = "There’s now a FUEL IMBALANCE. What do we do in this case?"
# query = "There’s now a FUEL IMBALANCE. What do we do in this case?"
# query = "Cheked it out, it showed up in Fuel Leak. What’s the response?"
# query = "If smoke is detected in the left engine during the flight, and the engine fire is suspected, what procedure should the crew take?"
# query = "If there's a situation where you have to stop the engine, what steps should you take to do it?"
# query = "Now, when XBleed is open, Engine bleed is broken and it doesn’t work."
query = "What should we do in case of an engine fire?"
fused = hybrid_retrieve(query, top_k_bm25=5, top_k_dense=5, dense_backends=("e5",), rrf_k=60, final_k=5)

print("=== Fused (RRF) Top-K ===")
for i, r in enumerate(fused, 1):
    print(f"[{i}] RRF={r['fused_rrf']:.5f} | {r['source_file']} | {r['title']}")
    # 디버그: 개별 리트리버 기여
    # print("   ", r["_details"])

context = build_context(fused, max_chars=9999999) 
print("\n--- Context preview ---\n")
print(context)  # 너무 길면 앞부분만 확인


=== Fused (RRF) Top-K ===
[1] RRF=0.01639 | AFM - a330f_afm_FUEL.md | FUEL LEAK
[2] RRF=0.01639 | Abnormal-and-Emergency-Procedures-a330-AIR.md | AIR ABNORM BLEED CONFIG (X BLEED CLOSED)
[3] RRF=0.01613 | Abnormal-and-Emergency-Procedures-a330-ENG.md | ENG 1(2) SHUT DOWN
[4] RRF=0.01613 | Abnormal-and-Emergency-Procedures-a330-AIR.md | AIR ABNORM BLEED CONFIG (X BLEED CLOSED)
[5] RRF=0.01587 | Abnormal-and-Emergency-Procedures-a330-FUEL.md | [QRH] FUEL LEAK

--- Context preview ---

[TITLE] FUEL LEAK
[FILE] AFM - a330f_afm_FUEL.md
[CONTENT]
Ident.: ABN-28-00005134.0004001 / 28 FEB 11 APPROVED Criteria: (330-200F and 58623) A fuel leak may be detected if: ‐ The sum of the FOB and the FU is significantly less than the FOB at engine start, or decreases, or ‐ An occupant observes fuel spray from engine/pylon or wing tip, or ‐ The total fuel quantity decreases at an abnormal rate, or ‐ A fuel imbalance develops, or ‐ The fuel quantity of a tank decreases too fast (leak from engine/pylon or 

---

이제 LLM에 넣어버려~~~

In [102]:
import requests
import textwrap

# (선택) 컨텍스트 너무 길면 잘라 넣기
def clamp_context(ctx: str, max_chars: int = 7000) -> str:
    return ctx[:max_chars]

def build_prompt(query: str, context: str) -> list:
    """
    Ollama /api/chat 형식(messages 배열)으로 프롬프트 구성.
    - system: 역할/규칙
    - user: 질의와 컨텍스트
    """
    system = (
    "You are an expert aviation assistant. "
    "You are a chatbot that helps pilots during in-flight emergency situations. "
    "When an emergency occurs, provide the emergency response procedure step by step. "
    "Answer ONLY using the given context. "
    "If the answer is not in the context, say you don't know. "
    "Be concise, accurate, and quote short snippets if useful. "
    "At the end, include a short 'Sources:' list with the [FILE] and section titles you used."
)

    user = textwrap.dedent(f"""
    Question:
    {query}

    Context (multiple chunks):
    ---
    {context}
    ---

    Instructions:
    - Use only the information in Context.
    - Provide the emergency response procedure step by step.
    - Only include actual pilot action steps (checklists) as step-by-step procedures.
    - Do NOT list background info, triggering conditions, system descriptions, or alerts as steps.
    - Steps must start with the action the pilot must take.
    - If unsure or missing, say: "I couldn't find this in the provided context."
    - Keep the answer under ~8 sentences when possible.
    - At the end, provide additional recommended actions if you think they are necessary.
    - Include "Sources:" with [FILE] and [TITLE] lines you actually used.
    """)

    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]

def ask_ollama_llama3(query: str, context: str,
                      model: str = "llama3:8b",
                      host: str = "http://localhost:11434",
                      temperature: float = 0.2,
                      num_ctx: int = 8192) -> str:
    """
    Ollama chat API 호출. stream=False로 한방에 결과 받음.
    """
    context = clamp_context(context, max_chars=7000)
    messages = build_prompt(query, context)

    resp = requests.post(
        f"{host}/api/chat",
        json={
            "model": model,
            "messages": messages,
            "options": {
                "temperature": temperature,
                "num_ctx": num_ctx
            },
            "stream": False
        },
        timeout=120
    )
    resp.raise_for_status()
    data = resp.json()
    return data.get("message", {}).get("content", "").strip()

# ======================
# 예시 실행 (앞에서 만든 `query`, `context` 활용)
# ======================
answer = ask_ollama_llama3(query, context, model="llama3:8b")
print("\n=== LLM Answer ===\n")
print(answer)



=== LLM Answer ===

When XBleed is open, Engine bleed is broken and it doesn’t work.

Since the context mentions that when a fuel leak is confirmed, LAND ASAP, I will provide the emergency response procedure step by step:

1. Shut down affected engine.
2. Monitor inner tank fuel quantities and look for one tank depleting faster.
3. If one inner tank depletes faster than the other by at least 500 kg (1 102 lb) in less than 30 min, shut down affected engine and monitor the fuel leak.

Sources:
[FILE] AFM - a330f_afm_FUEL.md
[TITLE] FUEL LEAK
