# RAG Embedding Pipeline (Pinecone + E5)

이 노트북은 문서 업서트를 위한 전용 파이프라인입니다.

실행 순서: 1) 환경/설치 → 2) Pinecone 연결 → 3) 문서 수집/파싱/청킹 → 4) 모델 로드/업서트 → (선택) 인덱스 초기화


In [1]:
# 환경/설치 (Cell 2–4)
import sys, platform
print('Python:', sys.version)
print('Platform:', platform.platform())

try:
    import torch
    print('Torch:', torch.__version__, '| CUDA:', torch.cuda.is_available())
except Exception as e:
    print('Torch error:', e)

try:
    import transformers
    print('Transformers:', transformers.__version__)
except Exception as e:
    print('Transformers error:', e)

# 설치 (조용히)
!{sys.executable} -m pip install -q --upgrade pip
!{sys.executable} -m pip install -q "pinecone>=5.0.0" sentence-transformers python-slugify rank-bm25 pyyaml


Python: 3.11.9 (tags/v3.11.9:de54cf5, Apr  2 2024, 10:12:12) [MSC v.1938 64 bit (AMD64)]
Platform: Windows-10-10.0.26100-SP0
Torch: 2.8.0+cpu | CUDA: False
Transformers: 4.56.1




In [2]:
# Pinecone connection / index bootstrap (Cell 6, 9)
import os
from pinecone import Pinecone, ServerlessSpec
from config import (
    PINECONE_API_KEY,
    PINECONE_INDEX_NAME,
    PINECONE_DIMENSION,
    PINECONE_METRIC,
    PINECONE_CLOUD,
    PINECONE_REGION,
)

pc = Pinecone(api_key=PINECONE_API_KEY)
index_name = PINECONE_INDEX_NAME

# Create the serverless index only when no index with this name exists yet;
# dimension, metric, cloud and region all come from config.py.
existing_index_names = pc.list_indexes().names()
if index_name in existing_index_names:
    print("이미 인덱스 있음")
else:
    serverless_spec = ServerlessSpec(cloud=PINECONE_CLOUD, region=PINECONE_REGION)
    pc.create_index(
        name=index_name,
        dimension=PINECONE_DIMENSION,
        metric=PINECONE_METRIC,
        spec=serverless_spec,
    )
    print("인덱스 생성 완료")

# Handle used by the later cells for upserts and stats.
index = pc.Index(index_name)
print(index.describe_index_stats())


이미 인덱스 있음
{'dimension': 1024,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 1082}},
 'total_vector_count': 1082,
 'vector_type': 'dense'}


In [None]:
# 문서 수집/파싱/청킹 (Cell 12, 14, 18)
import os, re, json, yaml
from slugify import slugify
from config import DOCUMENT_PATHS, CHUNK_SIZE, CHUNK_OVERLAP

def find_md_files_direct(folder_path):
    """Return the Markdown files located directly inside ``folder_path``.

    Only the top level of the folder is scanned (no recursion). Entries are
    matched by the case-sensitive ``.md`` suffix and must be regular files.

    Args:
        folder_path: Directory to scan.

    Returns:
        A list of ``os.path.join(folder_path, name)`` paths sorted by file
        name; an empty list when ``folder_path`` does not exist or is not a
        directory.
    """
    # isdir rather than exists: a path that points at a regular file would
    # otherwise reach os.listdir and raise NotADirectoryError.
    if not os.path.isdir(folder_path):
        return []
    # os.listdir returns entries in arbitrary order; sorting makes the file
    # list (and everything derived from it) reproducible between runs.
    candidates = (
        os.path.join(folder_path, name)
        for name in sorted(os.listdir(folder_path))
        if name.endswith(".md")
    )
    # Skip directories that happen to be named "*.md": the returned paths are
    # meant to be opened as files.
    return [path for path in candidates if os.path.isfile(path)]

# Resolve the two document folders from config.py.
folder_path1 = DOCUMENT_PATHS["products"]
folder_path2 = DOCUMENT_PATHS["company"]

# Product files first, then company files.
md_files = [
    path
    for folder in (folder_path1, folder_path2)
    for path in find_md_files_direct(folder)
]
print("전체 마크다운 파일 수:", len(md_files))

# Front-matter pattern: group 1 is the text between a leading "---" line and
# the next "---" line (non-greedy), group 2 is everything after it. DOTALL
# lets "." span newlines so both groups can be multi-line.
fm_re = re.compile(r"^---\s*\n(.*?)\n---\s*\n(.*)", flags=re.DOTALL)

def parse_document_unified(text, filename):
    """Split a Markdown document into front-matter metadata and body.

    If ``text`` starts with a ``---`` delimited block that parses as a
    non-empty YAML mapping, that mapping and the remaining text are returned.
    Otherwise default metadata is derived from ``filename`` and the whole
    ``text`` is returned as the body.

    Args:
        text: Full file contents.
        filename: File name (with extension) used to derive the fallback title.

    Returns:
        A ``(front_matter_dict, body_text)`` tuple.
    """
    m = fm_re.match(text)
    if m:
        fm_raw, body = m.groups()
        try:
            fm_dict = yaml.safe_load(fm_raw)
        except yaml.YAMLError:
            # Malformed YAML is not fatal: fall back to filename-based metadata.
            fm_dict = None
        # safe_load may return a scalar or a list (e.g. a block that contains
        # only plain text). Only a non-empty mapping is usable as front matter,
        # because the metadata is later read with .get().
        if isinstance(fm_dict, dict) and fm_dict:
            return fm_dict, body
    # Fallback: title from the file name (underscores become spaces), default
    # category "기타", and the whole text kept as the body.
    name_without_ext = os.path.splitext(filename)[0]
    title = name_without_ext.replace('_', ' ')
    fm_dict = {"title": title, "category1": "기타", "category2": "", "category3": "", "category4": "", "keywords": title}
    return fm_dict, text

def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Split ``text`` into fixed-size character windows that overlap.

    Consecutive windows start ``chunk_size - overlap`` characters apart, so
    each chunk repeats the last ``overlap`` characters of the previous one.
    The final chunks may be shorter than ``chunk_size``.

    Args:
        text: String to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared by consecutive chunks.

    Returns:
        List of chunk strings; empty when ``text`` is empty.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``chunk_size``. The
            window would never advance, which previously looped forever.
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]

# Load every Markdown file and split it into front matter and body.
docs = []
for file_path in md_files:
    # utf-8-sig reads plain UTF-8 unchanged but strips a leading BOM; with a
    # BOM in place the text would not start with "---" and the front matter
    # would silently be treated as body text.
    with open(file_path, "r", encoding="utf-8-sig") as f:
        t = f.read()
    fm, body = parse_document_unified(t, os.path.basename(file_path))
    docs.append({"id": os.path.basename(file_path), "front_matter": fm, "body": body})

print("불러온 문서 수:", len(docs))

# Build one record per front matter and one per body chunk. Each record has an
# id, the text to embed (with the "passage: " prefix) and the metadata stored
# next to the vector.
chunked_docs_meta = []
for doc in docs:
    fm = doc.get("front_matter") or {}
    base_meta = {"source_doc": doc["id"]}
    for key in ("title", "category1", "category2", "category3", "category4", "keywords"):
        value = fm.get(key)
        # A YAML key with no value loads as None, which Pinecone rejects as a
        # metadata value; store it as "" like a missing key.
        base_meta[key] = "" if value is None else value
    clean_name = slugify(os.path.splitext(doc["id"])[0], separator='_') or "doc"
    # The front matter is stored as a record of its own.
    if fm:
        # default=str is deliberate: YAML dates load as date/datetime objects,
        # which json cannot serialize and would otherwise abort the whole run.
        fm_json = json.dumps(fm, ensure_ascii=False, default=str)
        chunked_docs_meta.append({
            "id": f"{clean_name}-front",
            "text": f"passage: {fm_json}",
            "metadata": {**base_meta, "kind": "front", "text_content": fm_json[:40000]}
        })
    for i, chunk in enumerate(chunk_text(doc["body"], CHUNK_SIZE, CHUNK_OVERLAP)):
        chunked_docs_meta.append({
            "id": f"{clean_name}-chunk-{i}",
            "text": f"passage: {chunk}",
            "metadata": {**base_meta, "kind": "chunk", "text_content": chunk[:40000]}
        })

print("청크 수:", len(chunked_docs_meta))
print("예시:", chunked_docs_meta[0]["id"] if chunked_docs_meta else None)


In [None]:
# Model loading / upsert (Cell 21, 22)
from sentence_transformers import SentenceTransformer
import numpy as np
from config import EMBEDDING_MODEL_NAME

model_name = EMBEDDING_MODEL_NAME
model = SentenceTransformer(model_name, device="cpu")
print("모델 로드 완료:", model_name)

# Upsert: embed every chunk text in a single encode call (normalized vectors
# returned as a NumPy array), then pair each row with its chunk record.
records = []
texts = [entry["text"] for entry in chunked_docs_meta]
embs = model.encode(texts, convert_to_numpy=True, normalize_embeddings=True)

records.extend(
    {"id": entry["id"], "values": vector.tolist(), "metadata": entry["metadata"]}
    for entry, vector in zip(chunked_docs_meta, embs)
)

print("업서트 준비:", len(records))

# Send the records to the index in slices of batch_size.
batch_size = 100
for offset in range(0, len(records), batch_size):
    index.upsert(vectors=records[offset:offset + batch_size])
print("업서트 완료")
print(index.describe_index_stats())


In [None]:
# (Optional) index reset (Cell 15, 16) - dangerous
# As written, this cell only prints a warning; nothing is deleted.
print("🗑️ 인덱스 전체 삭제 (실행 전 주의)")
# To reset the index, uncomment the two lines below and run the cell. The
# first prints the current stats; the second calls index.delete with
# delete_all=True and then prints the stats again.
# stats = index.describe_index_stats(); print(stats)
# index.delete(delete_all=True); print(index.describe_index_stats())
