In [None]:
milvus_ip = "localhost"
milvus_port = 19530
db_name = "wb_vanila_rag"
collection_name = "news_article"
collection_kor_name = "뉴스 기사 데이터"

In [2]:
# Task: 컬렉션 생성 (create_collection)
from pymilvus import (Collection, DataType, FieldSchema, CollectionSchema, 
                    connections, MilvusClient, Function, FunctionType)

# Milvus 연결
connections.connect(host=milvus_ip, port=milvus_port, db_name=db_name)
milvus_client = MilvusClient(uri=f"http://{milvus_ip}:{milvus_port}", db_name=db_name)

selected_embedding_model = "Qwen/Qwen3-Embedding-0.6B"
milvus_dim = 1024

In [3]:
# 스키마 정의 (기존 create_collection과 동일)
bm25_function = Function(
        name="text_bm25_emb",
        input_field_names=["chunk"],
        output_field_names=["sparse_vector"],
        function_type=FunctionType.BM25,
    )

fields = [
    FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="ver", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="access_level", dtype=DataType.INT64),
    FieldSchema(name="params", dtype=DataType.JSON),
    FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR),
    FieldSchema(name="dense_vector", dtype=DataType.FLOAT_VECTOR, dim=milvus_dim),
    FieldSchema(name="section", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="chunk", dtype=DataType.VARCHAR, max_length=65535, enable_analyzer=True),
    FieldSchema(name="metadata", dtype=DataType.JSON)
]        
    
schema = CollectionSchema(fields, description=f"{collection_kor_name}")
schema.add_function(bm25_function)

collection = Collection(name=collection_name, schema=schema)

# 인덱스 생성
index_params = {"metric_type": "IP", "index_type": "FLAT", "params": {}}
collection.create_index(field_name="dense_vector", index_params=index_params)
sparse_index_params = {"metric_type": "BM25", "index_type": "SPARSE_INVERTED_INDEX", "params": {}}

collection.create_index(field_name="sparse_vector", index_params=sparse_index_params)

# 컬렉션을 메모리에 로드하여 사용 준비
collection.load()
connections.disconnect(db_name)

md2json

In [4]:
import os
import logging
import argparse
import time
import glob
import json

from datetime import datetime, timezone, timedelta
from tqdm import tqdm
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader

CHUNK_SIZE = 200
CHUNK_OVERLAP = 20
SEPARATOR = ["\n\n"]
MD_FOLDER = "/home/woobin.choi/research/mlops/rag/rsc/raws"

In [5]:
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    length_function=len,
    separators=SEPARATOR
)

In [6]:
def save_json(filepath, filename, json_data):
    directory_path = "/".join(filepath.split("/")[:-1])
    if "raws" in directory_path:
        directory_path = directory_path.replace("raws", "processed")
    
    # 디렉토리가 없으면 생성
    os.makedirs(directory_path, exist_ok=True)
    
    json_filename = filename.replace(".md", "_chunks.json")
    json_filepath = os.path.join(directory_path, json_filename)

    with open(json_filepath, 'w', encoding='utf-8') as json_file:
        json.dump(json_data, json_file, ensure_ascii=False, indent=4)

In [7]:
MD_FOLDER

'/home/woobin.choi/research/mlops/rag/rsc/raws'

In [8]:
MD_DATA_PATTERN = os.path.join(MD_FOLDER, "**", "*.md")
md_paths = glob.glob(MD_DATA_PATTERN, recursive=True)

In [9]:
md_paths = [os.path.abspath(p) for p in md_paths]
filepaths = [md for md in md_paths]

In [10]:
len(filepaths)

975

In [11]:
filepaths[0]

'/home/woobin.choi/research/mlops/rag/rsc/raws/19961.md'

In [12]:
from typing import List, Literal, Union, Optional
from pydantic import BaseModel, Field


class Params(BaseModel):
    chunk_size: int = Field(..., description="청크 사이즈")
    chunk_overlap: int = Field(..., description="청크 중첩 사이즈")
    length_function: str = Field(..., description="길이 함수, len | count_tokens")
    separator: List[Optional[str]] = Field(..., description="구분자, 예: ['\\n\\n', None]")
    splitter: str = Field(..., description="텍스트 분리기, RecursiveCharacterTextSplitter | MarkdownTextSplitter")
    parser: str = Field(..., description="PDF 파서 종류, PyMuPDFLoader | pymupdf4llm")

class Metadata(BaseModel):
    title: str = Field("", description="연구 제목", example="초거대 AI를 활용한 RAG 시스템 구축")

class Data(BaseModel):
    chunks: List[List[str]] = Field(..., description="청크 목록")
    section: str = Field("", description="chunk가 속한 문서의 절절")

class DataSpec(BaseModel):
    ver: str = Field(..., description="해당 데이터의 버전")
    access_level: int = Field(..., description="접근 권한 레벨, 1 | 2 | 3")
    params: Params
    metadata: Metadata
    data: Data


In [13]:
loader = TextLoader(filepaths[0])

docs = loader.load_and_split(text_splitter)
print(len(docs))    

12


In [14]:
docs[1].page_content

'\n\n\n* Patty와 남편 Tyler는 도시에서도 교외 같은 공동체 느낌을 만들고자 노력함\n* 처음엔 쿠키를 만들어 돌리거나 저녁 식사에 초대할까 고민했지만, 결국 주말 아침에 집 밖에서 커피를 마시기로 결정함\n* 비록 집에 현관 계단은 없었지만, 접이식 의자를 들고 나와 햇볕을 즐기며 이웃을 맞이하는 루틴을 시작함\n* 이웃들이 지나갈 때 손을 흔들고, 인사하고, 이름을 적어두며 “기억에 남는 사람들”이 되기 위해 노력함\n* 패티는 눈에 띄는 타이다이 모자도 착용해 친근한 이미지 강조'

In [15]:
start_time = time.time()

with tqdm(total=len(filepaths), desc="Saving json") as pbar:

    info_id = 0
    count = 0
    for filepath in filepaths:
        try:
            info_id += 1
        
            filename = filepath.split("/")[-1].strip()
            
            loader = TextLoader(filepath)

            docs = loader.load_and_split(text_splitter)
            chunks = [
                [p.strip() for p in doc.page_content.split("\n") if p.strip()]
                for doc in docs
            ]
            
            params = Params(
                chunk_size=CHUNK_SIZE,
                chunk_overlap=CHUNK_OVERLAP,
                length_function="len",
                separator=SEPARATOR,
                splitter="RecursiveCharacterTextSplitter",
                parser="TextLoader"
            )
            
            metadata = Metadata(
                title=filename.split(".md")[0]
            )

            json_data = DataSpec(
                ver="v1.2",
                access_level=1,
                params=params,
                metadata=metadata,
                data=Data(
                    chunks=chunks,
                    section=filename.split(".md")[0]
                )
            ).model_dump()

            save_json(filepath, filename, json_data)  
            count += 1
            logging.info(f"File: {filepath}")

        except Exception as e:
            logging.error(e, exc_info=True) 
        
        pbar.update(1)             

end_time = time.time()
elapsed_time = end_time - start_time
print(f"경과 시간: {elapsed_time}초")

Saving json: 100%|██████████| 975/975 [00:00<00:00, 1599.81it/s]

경과 시간: 0.618105411529541초





json2vectordb

In [17]:
from sentence_transformers import SentenceTransformer

def get_embedding_model():

    return SentenceTransformer(
        'Qwen/Qwen3-Embedding-0.6B',
        device="cuda:0",
        model_kwargs={"torch_dtype": "bfloat16"}
    )

In [18]:
from typing import List
from scipy.sparse import csr_matrix

def insert_data(
            collection_name: str, 
            doc_batch: List[str]
    ) -> None:
    
    # 임베딩 스펙 변경시 모델 재 로드
    embedding_model = get_embedding_model()

    data = []

    docs_embeddings = embedding_model.encode(doc_batch.page_content)
    docs_embeddings_result = [docs_embeddings[i].tolist() for i in range(len(docs_embeddings))]

    docs_embeddings_result = {"dense": docs_embeddings_result}

    docs_embeddings = {}
    dense_vector = docs_embeddings_result["dense"]

    docs_embeddings["dense"] = dense_vector
    rows = len(docs_embeddings["dense"])

    if "sparse" in docs_embeddings_result.keys():
        sparse_vector = docs_embeddings_result["sparse"]
        docs_embeddings["sparse"] = csr_matrix(
            (sparse_vector["data"], sparse_vector["indices"], sparse_vector["indptr"]),
            shape=tuple(sparse_vector["shape"])
        )

    for idx in range(rows):

        dense_vector = docs_embeddings["dense"][idx]

        row = {
            "ver": doc_batch.ver,
            "access_level": doc_batch.access_level,
            "params": doc_batch.params,
            "dense_vector": dense_vector,
            "section": doc_batch.section,
            "chunk": doc_batch.page_content[idx],
            "metadata": doc_batch.metadata
        }

        if "sparse" in docs_embeddings_result.keys():
            row["sparse_vector"] = sparse_vector = docs_embeddings["sparse"][[idx], :]

        data.append(row)

    # milvus client 에서 sparse 연산 후 insert 그러나 0.02초 소요
    # sparse 자체는 연산 시간 얼마 안걸림
    milvus_client.insert(
        collection_name=collection_name,
        data=data
    )

In [19]:
import re
import unicodedata

def clean_text(text):
    """
    텍스트를 정제하는 함수입니다.

    이 함수는 다음과 같은 작업을 수행합니다:
    1. 유니코드 정규화
    2. URL 제거
    3. 특정 문자만 유지하고 나머지 제거
    4. 빈 괄호 제거
    5. 연속된 줄바꿈과 공백 정리
    6. 중복된 문장부호 정리

    Args:
        text (str): 정제할 원본 텍스트

    Returns:
        str: 정제된 텍스트
    """

    # 유니코드 정규화 및 url 제거
    text = unicodedata.normalize('NFKC', text)
    text = re.sub(r'(?:https?:\/\/)?(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&//=]*)', '', text)
    
    # 한글, 영문, 숫자, 특정 문장부호만 남기고 나머지는 제거
    text = re.sub(r'[^가-힣a-zA-Z0-9.,!?"\'\s\-():;/=%{}[\]<>]', '', text)

    # 빈 괄호는 제거 (괄호 안에 공백 포함)
    text = re.sub(r'\(\s*\)|\{\s*\}|\[\s*\]|<\s*>', '', text)

    # 연속된 줄바꿈을 하나로 변환
    text = re.sub(r'\n+', '\n', text)
    
    # 줄바꿈을 제외한 연속된 공백을 하나의 공백으로 변환
    text = re.sub(r'[^\S\n]+', ' ', text)
    
    # 중복된 문장부호를 하나로 줄임 (마침표, 쉼표, 느낌표, 물음표만)
    text = re.sub(r'([.,!?])\1+', r'\1', text)
    
    # 문장의 양 끝에 있는 불필요한 공백 제거
    return text.strip()

In [20]:
import os
import time
import glob
from tqdm import tqdm

class Document:
    def __init__(self, pk=None, ver=None, access_level=None, file=None, params=None, metadata=None, page_content=None, section=None):
        self.pk = pk
        self.ver = ver
        self.access_level = access_level
        self.params = params
        self.metadata = metadata
        self.page_content = page_content
        self.section = section

# 동적 리턴이 필요한 컬렉션
def get_collection(collection):
    if collection == "**":
        return os.path.basename(os.path.dirname(filepath))    
    return collection

start_time = time.time()

JSON_PATTERN = "/home/woobin.choi/research/mlops/rag/rsc/processed/**/*.json"
json_files = list(glob.iglob(JSON_PATTERN, recursive=True))
filepaths = [pdf for pdf in json_files ]

In [None]:
import gc
import torch

DATA_PATH = "/home/woobin.choi/research/mlops/rag/rsc/processed"

with tqdm(total=len(filepaths), desc="Embedding/Insert") as pbar:

    for filepath in filepaths:
        with open(os.path.join(DATA_PATH, filepath), 'r', encoding='utf-8') as f:
            json_data = json.load(f)
        
        metadata = json_data["metadata"]
        expr_title = metadata["title"]
        expr = f'metadata["title"] == \'{expr_title}\''
        json_title = metadata.get("title")
        category = metadata.get("categories")
        

        try:
            chunks = json_data["data"]["chunks"]
            # page_content = []

            if not chunks:
                continue

            # for chunk in chunks:
                # page_content.append(clean_text("\n".join(chunk)))
            BATCH_CHUNK_SIZE = 10

            for i in range(0, len(chunks), BATCH_CHUNK_SIZE):
                page_content = []
                for chunk in chunks[i:i+BATCH_CHUNK_SIZE]:
                    page_content.append(clean_text("\n".join(chunk)))

                doc = Document(
                    ver=json_data["ver"],
                    access_level=json_data["access_level"],
                    params=json_data["params"],
                    page_content=page_content, 
                    metadata=metadata,
                    section=json_data["data"]["section"]
                )

                insert_data(
                    collection_name = collection_name,
                    doc_batch=doc
                )
                
                torch.cuda.empty_cache()
                gc.collect()

        except Exception as e:
            print(e)
            print('### error: ', json_title)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"경과 시간: {elapsed_time}초")

Hybrid Search

In [21]:
query = "mcp 에 대해서 알려줘"

In [22]:
from typing import List, Dict

def get_output_fields(
    collection_name,
) -> List[str]:
    res = milvus_client.describe_collection(
        collection_name=collection_name,
    )
    return [
        field['name'] 
        for field in res['fields']
        if field['name'] not in ('sparse_vector', 'dense_vector')
    ]        

def rrf_rank(
    results_list: List[List[Dict]], 
    top_k: int, 
) -> List[Dict]:
    scores = {}
    entities = {}

    for result_set in results_list:
        for rank, item in enumerate(result_set):
            doc_id = item['pk']
            score = 1 / (rank + 1)
            scores[doc_id] = scores.get(doc_id, 0) + score

            if doc_id not in entities:
                entities[doc_id] = item

    sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [entities[doc_id] for doc_id, _ in sorted_docs[:top_k]]    

In [23]:
from typing import Dict, Optional, List
import numpy as np


def hybrid_search(
    client,
    collection_name: str,        
    query: str,
    top_k: int = 5,
    max_allowed_level: Optional[int] = 0,
    search_params: Optional[Dict] = {"metric_type": "IP"},
) -> List[Document]:

    embeddings_model = get_embedding_model()
    embeddings_result = embeddings_model.encode(query)
    # dense_vector = [np.array(embedding) for embedding in embeddings_result["dense"]]
    
    dense_arr = np.asarray(embeddings_result, dtype=np.float32)
    if dense_arr.ndim == 1:  # (D,)면 1개 배치로 감싸기
        dense_vector = [dense_arr]
    else:  # (N, D)
        dense_vector = [row for row in dense_arr]

    embeddings_result = {"dense": dense_vector}
    
    output_fields = get_output_fields(collection_name)
    if max_allowed_level == 0:
        expr = None 
    else:
        expr = f"access_level < {max_allowed_level + 1}"        

    try:
        if "sparse" in embeddings_result.keys():
            sparse_vector = csr_matrix(
                (
                    embeddings_result["sparse"]["data"],
                    embeddings_result["sparse"]["indices"],
                    embeddings_result["sparse"]["indptr"],
                ),
                shape=tuple(embeddings_result["sparse"]["shape"]),
            )
            sparse_res = client.search(
                collection_name=collection_name,
                filter=expr,
                data=sparse_vector,
                anns_field="sparse_vector",
                search_params=search_params,
                limit=top_k,
                output_fields=output_fields,
            )            
        else:
            sparse_res = client.search(
                collection_name=collection_name,
                filter=expr,
                data=[query],
                anns_field="sparse_vector",
                # search_params=search_params,
                limit=top_k,
                output_fields=output_fields,
            )

        dense_res = client.search(
            collection_name=collection_name,
            filter=expr,
            data=dense_vector,
            anns_field="dense_vector",
            search_params=search_params,
            limit=top_k,
            output_fields=output_fields,
        )

        res = rrf_rank([sparse_res[0], dense_res[0]], top_k=top_k)
    except Exception as ex:
        raise ex

    return [Document(
        pk=result["entity"]["pk"] if "pk" in result["entity"].keys() else None,
        ver=result["entity"]["ver"] if "ver" in result["entity"].keys() else None,
        access_level=result["entity"]["access_level"] if "access_level" in result["entity"].keys() else None,
        params=result["entity"]["params"] if "params" in result["entity"].keys() else None,
        section=result["entity"]["section"] if "section" in result["entity"].keys() else None,
        page_content=result["entity"]["chunk"] if "chunk" in result["entity"].keys() else None,
        metadata=result["entity"]["metadata"] if "metadata" in result["entity"].keys() else None,
    ) for result in res]


In [24]:
# 범위 검색
return_docs = hybrid_search(
    client = milvus_client,
    collection_name = collection_name,
    query = query,
    top_k = 5,
    max_allowed_level = 1,
    search_params={
        "metric_type": "IP",
        "params": {
            "radius": 0.4,
            "range_filter": 0.6,
        }
    }
)      

In [25]:
len(return_docs)

5

In [26]:
for i in range(len(return_docs)):
    print(return_docs[i].section)
    print("===============================================")
    print(return_docs[i].page_content)
    print("===============================================")

20109
배포 및 DNS 설정
 Heroku 배포 자동화, 일부 구버전 API 사용 문제는 문서 링크로 해결
 GoDaddy 도메인 연결도 버튼 누를 위치와 값까지 알려줘 손쉽게 설정 완료
 AI 도구로서의 Windsurf 사용 경험
20721
WinDBG(CDB)를 파이썬으로 제어하며, 이를 AI가 사용할 수 있도록 MCP 프로토콜 서버로 래핑함
 MCP는 Anthropic이 개발한 AI와 외부 도구 간의 통신 표준으로, 툴을 AI의 손처럼 사용할 수 있게 해줌
 MCP의 장점:
 모든 AI 모델에서 사용 가능
 VS Code 외 환경에서도 독립 실행 가능
 비플랫폼 종속적
 빠른 기능 확장성 확보
19987
Playwright MCP - LLM을 위한 웹 브라우저 자동화용 MCP 서버
21155
Model Context Protocol (MCP) 지원 예정
 MCP는 LLM이 툴에 접근하는 새로운 표준 프로토콜로 급부상
 지난 8일 내에 OpenAI, Anthropic, Mistral 등 대형 벤더 API에도 빠르게 도입되고 있음
 향후 LLM을 MCP 클라이언트로 만들어 다양한 MCP 서버에 쉽게 연동 계획
20430
Claude Code는 시스템을 변경할 수 있는 작업(파일 쓰기, bash 명령어 실행, MCP 도구 사용 등)에 대해 기본적으로 사용자 승인 요청을 함
이는 보안을 위한 보수적 설계이며, 사용자가 안전하다고 판단되는 도구는 허용 목록(allowlist) 을 통해 사전 승인 가능함
 허용 도구 설정 방법
