In [None]:
import logging

import httpx
from langchain_core.documents import Document
from typing import Annotated
from langgraph.graph import MessagesState

class State(MessagesState):
    """Graph state for the RAG steps: ``MessagesState`` plus question/answer/document fields."""
    # The second Annotated argument is a human-readable description of each field.
    # NOTE(review): other cells in this notebook read `.page_content` from the items stored in
    # `documents`, so `list[str]` looks inaccurate — confirm the intended element type.
    question: Annotated[str, "User question"]
    generation: Annotated[str, "LLM generated answer"]
    documents: Annotated[list[str], "List of documents"]

# Logger for this notebook's helpers.
logger = logging.getLogger(__name__)

# Rerank service settings.
RERANK_API_URL = "http://127.0.0.1:9806/rerank"  # endpoint the rerank request is POSTed to
RERANK_TIMEOUT_SEC = 15  # intended HTTP timeout for the rerank call, in seconds
RERANK_TOP_K = 5  # keep at most this many documents after reranking
RERANK_MIN_SCORE = 0.5  # documents scoring below this value are dropped
RERANK_INSTRUCTION = "Given a user query, retrieve relevant passages from the documents that answer the query."  # sent as the payload's "instruction" field

def _to_texts_and_keep(documents: list[Document]) -> tuple[list[str], list[Document]]:
    """Extract the text to rerank from each document, keeping only documents that have text.

    Args:
        documents: Items to rerank. Objects with a ``page_content`` attribute contribute that
            value; anything else is converted with ``str()``.

    Returns:
        ``(texts, kept)`` — two lists of equal length where ``texts[i]`` is the text of
        ``kept[i]``. Documents whose text is ``None`` or blank are left out of both lists so
        the reranker never receives a null/empty passage.
    """
    texts: list[str] = []
    kept: list[Document] = []
    for doc in documents:
        text = doc.page_content if hasattr(doc, "page_content") else str(doc)
        if text is None:
            continue
        if not isinstance(text, str):
            # Honour the declared list[str] contract for unusual page_content values.
            text = str(text)
        if not text.strip():
            continue
        texts.append(text)
        kept.append(doc)
    return texts, kept


def _post_rerank(question: str, texts: list[str]) -> list[float]:
    """Score each text against the question using the rerank HTTP service.

    Args:
        question: The user query; it is repeated once per text in the payload.
        texts: Passages to score.

    Returns:
        One float score per entry of ``texts``, in the same order. An empty ``texts``
        yields an empty list without contacting the service.

    Raises:
        httpx.HTTPError: On connection failure or a non-2xx response.
        ValueError: If the response has no ``scores`` list of the expected length.
    """
    if not texts:
        return []

    payload = {
        "queries": [question] * len(texts),
        "documents": texts,
        "instruction": RERANK_INSTRUCTION,
    }

    logger.info(f"Calling Rerank API: url={RERANK_API_URL}, n_docs={len(texts)}")
    # Only the connect phase is bounded: an unreachable service fails fast, while a slow
    # scoring pass over many documents is still allowed to finish (reads stay unbounded).
    timeout = httpx.Timeout(None, connect=RERANK_TIMEOUT_SEC)
    with httpx.Client(timeout=timeout) as client:
        resp = client.post(RERANK_API_URL, json=payload)
        resp.raise_for_status()
        data = resp.json()

    # Guard against a JSON body that is not an object before looking up "scores".
    scores = data.get("scores", []) if isinstance(data, dict) else None
    if not isinstance(scores, list) or len(scores) != len(texts):
        raise ValueError(f"Invalid rerank response: {data}")

    return [float(s) for s in scores]


def _apply_topk_and_threshold(order: list[int], scores: list[float]) -> list[int]:
    """Filter an ordered list of indices by RERANK_MIN_SCORE, then truncate to RERANK_TOP_K.

    Args:
        order: Indices into ``scores``, already sorted in the desired order.
        scores: Score for each index.

    Returns:
        The surviving indices, in their original relative order. A setting that is ``None``
        or cannot be applied is skipped (with a warning in the latter case).
    """
    selected = order

    # Minimum-score filter
    if RERANK_MIN_SCORE is not None:
        try:
            cutoff = float(RERANK_MIN_SCORE)
            selected = [idx for idx in selected if scores[idx] >= cutoff]
        except Exception:
            logger.warning("RERANK_MIN_SCORE is not a float; ignoring threshold filter.")

    # Top-k limit (only applied for positive values)
    if RERANK_TOP_K is not None:
        try:
            limit = RERANK_TOP_K
            if limit > 0:
                selected = selected[:limit]
        except Exception:
            logger.warning("RERANK_TOP_K is not an int; ignoring top-k.")

    return selected


def rerank(state: State):
    """Reorder ``state["documents"]`` by rerank score and keep only the best ones.

    Args:
        state: Mapping that provides ``question`` and ``documents``.

    Returns:
        ``{"documents": reranked_docs}`` on success. ``None`` when the question or the
        documents are missing, or when reranking fails for any reason (the failure is
        logged with its traceback).
    """
    question = state.get("question")
    documents = state.get("documents")

    if not question:
        logger.warning("rerank() called with empty question; returning state unchanged.")
        return

    if not documents:
        logger.info("rerank() no documents to rerank; returning state unchanged.")
        return

    try:
        texts, kept = _to_texts_and_keep(documents)
        scores = _post_rerank(question, texts)

        # Indices into `kept`, best score first, then cut by threshold / top-k.
        order = sorted(range(len(kept)), key=lambda i: scores[i], reverse=True)
        order = _apply_topk_and_threshold(order, scores)

        reranked_docs = [kept[i] for i in order]

        # Log a preview of only the first few scores.
        preview_n = min(5, len(reranked_docs))
        logger.info(
            f"Reranked {len(kept)} docs. "
            f"Top-{preview_n} scores: {[round(scores[i], 4) for i in order[:preview_n]]}"
        )
        return {"documents": reranked_docs}

    except Exception as e:
        # Best effort: a rerank failure is logged and produces no update.
        logger.exception(f"Failed to rerank documents: {e}")
        return

In [None]:
import logging, time, threading
from langgraph.graph import MessagesState

class State(MessagesState):
    """Graph state for the RAG steps: ``MessagesState`` plus question/answer/document fields."""
    # The second Annotated argument is a human-readable description of each field.
    # NOTE(review): other cells in this notebook read `.page_content` from the items stored in
    # `documents`, so `list[str]` looks inaccurate — confirm the intended element type.
    question: Annotated[str, "User question"]
    generation: Annotated[str, "LLM generated answer"]
    documents: Annotated[list[str], "List of documents"]
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain.messages import HumanMessage

# Logger for this notebook's helpers.
logger = logging.getLogger(__name__)

# Embedding model served behind an OpenAI-compatible API.
EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-8B"
# EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-4B"
OPENAI_URL = "http://127.0.0.1:9804/v1"  # base URL of the embedding server
MILVUS_URI = "http://127.0.0.1:19530"  # Milvus endpoint
TIMEOUT_SEC = 5  # seconds allowed for the embedding connectivity check

def test_embedding_connection(embeddings: OpenAIEmbeddings, test_text: str, timeout: int) -> None:
    """Check that the embedding backend answers within ``timeout`` seconds.

    The embedding call runs in a worker thread so the caller can stop waiting for it.

    Args:
        embeddings: Client whose ``embed_query`` is exercised.
        test_text: Text to embed.
        timeout: Maximum number of seconds to wait for the call.

    Raises:
        TimeoutError: If the call has not finished after ``timeout`` seconds.
        ValueError: If the call returns something other than a non-empty list.
        Exception: Any exception raised by ``embed_query`` is re-raised unchanged.
    """
    outcome = {"result": None, "error": None}

    def run():
        try:
            outcome["result"] = embeddings.embed_query(test_text)
        except Exception as e:
            outcome["error"] = e

    # daemon=True: a request that never returns must not keep the interpreter alive after
    # this function has already reported the timeout.
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        raise TimeoutError(f"Embedding call exceeded timeout of {timeout} seconds.")

    if outcome["error"] is not None:
        raise outcome["error"]

    result = outcome["result"]
    if isinstance(result, list) and len(result) > 0:
        logger.info("Embedding model connected successfully")
    else:
        raise ValueError("Embedding response is invalid or empty.")

# Build the embeddings client for the OpenAI-compatible endpoint and verify that it responds.
# Any failure is logged with its traceback and re-raised, so the cell stops here.
try:
    logger.info(f"Initializing Embedding model: model='{EMBED_MODEL_ID}', url='{OPENAI_URL}'")
    embeddings = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID,
        tiktoken_enabled=False
    )

    test_embedding_connection(embeddings, "연결 테스트 문장", TIMEOUT_SEC)
    
except Exception as e:
    logger.exception(f"Failed to initialize OpenAIEmbeddings: {e}")
    raise

# Open the LangChain Milvus vector store over collection "doc_embeddings" in database
# "doc_embeddings3", using the embeddings client created above.
# NOTE(review): the Milvus token is hard-coded here — consider reading it from the environment.
try:
    logger.info(f"Connecting to Milvus at {MILVUS_URI}")
    vector_store = Milvus(
        embedding_function=embeddings,
        collection_name="doc_embeddings",
        connection_args={
            "uri": MILVUS_URI,
            "token": "root:Milvus",
            "db_name": "doc_embeddings3"
        },
        index_params={
            "index_type": "FLAT",
            "metric_type": "COSINE"
        },
    )
    logger.info("Milvus vector store connection established.")

except Exception as e:
    logger.exception(f"Failed to connect to Milvus: {e}")
    raise

def retrieve(state: State):
    """Fetch candidate documents for ``state["question"]`` from the vector store.

    Args:
        state: Mapping that provides ``question``; a missing or ``None`` question is
            treated as empty.

    Returns:
        A dict with ``documents`` (possibly empty), the stripped ``question`` and a
        one-element ``messages`` list holding the question as a ``HumanMessage``. On a
        successful search it also carries ``temp``: the raw ``(document, score)`` pairs,
        which the inspection cells of this notebook read.
    """
    # `or ""` also covers an explicit None, which would otherwise fail on .strip().
    question = (state.get("question") or "").strip()
    human_message = HumanMessage(question)

    if not question:
        logger.warning("retrieve() called with empty question in state.")
        return {"documents": [], "question": "", "messages": [human_message]}

    logger.info(f"Retrieving documents for question: '{question}'")

    try:
        # k=171 is the number of candidates requested from the vector store.
        documents_with_score = vector_store.similarity_search_with_score(question, k=171)
        documents = [doc for doc, _score in documents_with_score]
        logger.info(f"Retrieved {len(documents)} documents from vector store.")
        return {"documents": documents, "question": question, "messages": [human_message], "temp": documents_with_score}

    except Exception as e:
        # Best effort: a failed search is logged and reported as "no documents".
        logger.exception(f"Error during retrieval for question='{question}': {e}")
        return {"documents": [], "question": question, "messages": [human_message]}


In [None]:
# Minimal input state: only the question is set.
my_state = {
    "question": "스마트 시티"
}

# Replace the input dict with whatever retrieve() returns for it.
my_state = retrieve(my_state)

In [None]:
# Display the raw (document, score) pairs stored under "temp".
my_state["temp"]

In [None]:
# Display the raw (document, score) pairs stored under "temp".
my_state["temp"]

In [None]:
# Display the raw (document, score) pairs stored under "temp".
my_state["temp"]

In [None]:
# Print the full text of every retrieved document, each followed by a 100-dash rule.
for doc in my_state["documents"]:
    print(doc.page_content)
    print("-"*100)

In [None]:
# Print the full text of every retrieved document, each followed by a 100-dash rule.
for doc in my_state["documents"]:
    print(doc.page_content)
    print("-"*100)

In [None]:
# Print only the first 100 characters of every retrieved document, each followed by a rule.
for doc in my_state["documents"]:
    print(doc.page_content[:100])
    print("-"*100)

In [None]:
# Rerank the retrieved documents; rerank() returns None when it has nothing to update.
result = rerank(my_state)

In [None]:
# Display the value returned by rerank().
result

In [None]:
from pymilvus import MilvusClient
# Fetch the description of collection "doc_embeddings" in database "doc_embeddings3".
# NOTE(review): the Milvus token is hard-coded — consider reading it from the environment.
cli = MilvusClient(uri=MILVUS_URI, token="root:Milvus")
cli.use_database("doc_embeddings3")
desc = cli.describe_collection("doc_embeddings")
# assert desc["schema"]["fields"][-1]["params"]["dim"] == 4096, desc
# assert desc["indexes"][0]["metric_type"].upper() == "COSINE", desc


In [None]:
from pymilvus import MilvusClient

# Inspect the index of collection "doc_embeddings" in database "doc_embeddings3".
cli = MilvusClient(uri=MILVUS_URI, token="root:Milvus")
cli.use_database("doc_embeddings3")

# List the index names
idx_names = cli.list_indexes("doc_embeddings")
print("indexes:", idx_names)  # usually ['vector']

# Index details (check the metric and index type); assumes at least one index exists
idx_info = cli.describe_index(collection_name="doc_embeddings", index_name=idx_names[0])
print(idx_info)
# Expected: {'index_type': 'FLAT', 'metric_type': 'COSINE', ...}


In [None]:
from pymilvus import MilvusClient

# Same inspection as for "doc_embeddings3", but against database "doc_embeddings2".
cli = MilvusClient(uri=MILVUS_URI, token="root:Milvus")
cli.use_database("doc_embeddings2")

# List the index names
idx_names = cli.list_indexes("doc_embeddings")
print("indexes:", idx_names)  # usually ['vector']

# Index details (check the metric and index type); assumes at least one index exists
idx_info = cli.describe_index(collection_name="doc_embeddings", index_name=idx_names[0])
print(idx_info)
# Expected: {'index_type': 'FLAT', 'metric_type': 'COSINE', ...}


In [None]:
from pymilvus import MilvusClient
# Fetch and display the description of collection "doc_embeddings" in database "doc_embeddings2".
cli = MilvusClient(uri=MILVUS_URI, token="root:Milvus")
cli.use_database("doc_embeddings2")
desc = cli.describe_collection("doc_embeddings")

desc

In [None]:
    # Re-create the embeddings client with the same options as the first setup cell.
    # tiktoken_enabled=False is the setting langchain_openai documents for non-OpenAI
    # backends; NOTE(review): confirm it matches how the stored vectors were produced.
    embeddings = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID,
        tiktoken_enabled=False,
    )

In [None]:
# Text of the last (document, score) pair under "temp".
my_state["temp"][-1][0].page_content

In [None]:
# Score of the first (document, score) pair under "temp".
my_state["temp"][0][1]

In [None]:
# Required packages
# pip install langchain openai numpy

from langchain_openai import OpenAIEmbeddings
import numpy as np

# 1) Embeddings client. tiktoken_enabled=False mirrors the first setup cell and is the
#    setting langchain_openai documents for non-OpenAI backends, so the vectors computed
#    here are comparable with the ones the vector store uses.
#    NOTE(review): confirm it matches how the stored vectors were produced.
embeddings = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID,
        tiktoken_enabled=False,
    )

# 2) Sentences to compare: the last retrieved document and the query text
sentence1 = my_state["temp"][-1][0].page_content
sentence2 = "스마트 시티"

# 3) Embed both sentences
embedding1 = embeddings.embed_query(sentence1)
embedding2 = embeddings.embed_query(sentence2)

# 4) Cosine similarity helper
def cosine_similarity(vec1, vec2):
    """Return dot(vec1, vec2) / (||vec1|| * ||vec2||) for two equal-length sequences."""
    vec1 = np.array(vec1)
    vec2 = np.array(vec2)
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

# 5) Print the result
similarity = cosine_similarity(embedding1, embedding2)
print(f"코사인 유사도: {similarity:.4f}")


In [None]:
# Required packages
# pip install langchain openai numpy

from langchain_openai import OpenAIEmbeddings
import numpy as np

# 1) Embeddings client. tiktoken_enabled=False mirrors the first setup cell and is the
#    setting langchain_openai documents for non-OpenAI backends, so the vectors computed
#    here are comparable with the ones the vector store uses.
#    NOTE(review): confirm it matches how the stored vectors were produced.
embeddings = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID,
        tiktoken_enabled=False,
    )

# 2) Sentences to compare: the third-from-last retrieved document and the query text
sentence1 = my_state["temp"][-3][0].page_content
sentence2 = "스마트 시티"

# 3) Embed both sentences
embedding1 = embeddings.embed_query(sentence1)
embedding2 = embeddings.embed_query(sentence2)

# 4) Cosine similarity helper
def cosine_similarity(vec1, vec2):
    """Return dot(vec1, vec2) / (||vec1|| * ||vec2||) for two equal-length sequences."""
    vec1 = np.array(vec1)
    vec2 = np.array(vec2)
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

# 5) Print the result
similarity = cosine_similarity(embedding1, embedding2)
print(f"코사인 유사도: {similarity:.4f}")


In [None]:
# Score of the third-from-last (document, score) pair under "temp".
my_state["temp"][-3][1]

In [None]:
# Query text and the document it is compared against
query = "스마트 시티"
doc_text = my_state["temp"][1][0].page_content  # keep using this same document below

# 1) Score reported by Milvus for this hit. With the COSINE metric the reported value is
#    already the cosine similarity (larger = more similar), so it is used as-is; the
#    previous `1.0 - dist` inverted it.
doc, dist = my_state["temp"][1]   # (Document, score)
milvus_cos = float(dist)
print(f"[Milvus] score (cosine similarity) = {milvus_cos:.6f}")

# 2) Compute the cosine directly with the same model
q_vec = embeddings.embed_query(query)                 # query goes through embed_query
d_vec = embeddings.embed_documents([doc_text])[0]     # document goes through embed_documents

import numpy as np
def cos_sim(a, b):
    """Return the cosine similarity of two equal-length vectors as a float."""
    a = np.array(a); b = np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

manual_cos = cos_sim(q_vec, d_vec)
print(f"[Manual] cosine_similarity = {manual_cos:.6f}")


In [None]:
from openai import OpenAI
import json
# Raw OpenAI-compatible client for the embedding server (no LangChain in between).
openai_client = OpenAI(
    api_key="EMPTY",
    base_url=OPENAI_URL,
)


# NOTE(review): MilvusClient is not imported in this cell; it relies on an earlier cell's import.
milvus_client = MilvusClient(
    uri=MILVUS_URI,
    token="root:Milvus"
)

# Pick one query and its single best result
query = "스마트 시티"
docs_with_score = vector_store.similarity_search_with_relevance_scores(query, k=1)
doc, dist = docs_with_score[0]   # NOTE(review): named `dist`, but it holds the store's relevance score — confirm its scale before treating it as a distance
print("Milvus distance:", dist)

# 1) Embed the query through the raw OpenAI-compatible client
#    NOTE(review): the original author notes that Qwen3 recommends an instruction prefix for
#    queries (per the model card); the prefixed variant below is commented out.
QWEN_QUERY_INSTRUCT = "Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: "
q_input = query
# q_input = QWEN_QUERY_INSTRUCT + query
q_vec = openai_client.embeddings.create(model=EMBED_MODEL_ID, input=[q_input]).data[0].embedding

# 2) Primary key of the document just retrieved ("pk" first, then "id" from its metadata)
doc_pk = doc.metadata.get("pk") or doc.metadata.get("id")


milvus_client.use_database("doc_embeddings3")
# 3) Read that document's stored vector back from Milvus; falls back to matching on the
#    text when no key was found. Takes the first returned row (assumes at least one match).
row = milvus_client.query(
    collection_name="doc_embeddings",
    filter=f"pk == {int(doc_pk)}" if doc_pk is not None else f'text == {json.dumps(doc.page_content)}',
    output_fields=["pk","text","vector"]
)[0]
stored_vec = row["vector"]

# 4) Compute the cosine similarity directly
import numpy as np
def cos(a,b):
    # Cosine similarity of two equal-length vectors, returned as a float.
    a=np.array(a); b=np.array(b)
    return float(np.dot(a,b) / (np.linalg.norm(a)*np.linalg.norm(b)))

manual_cos = cos(q_vec, stored_vec)
print("Manual cosine:", manual_cos)
print("1 - distance :", 1.0 - float(dist))


In [None]:
# Display the raw rows for the same document: looked up by pk when one was found,
# otherwise by exact text match.
milvus_client.query(
    collection_name="doc_embeddings",
    filter=f"pk == {int(doc_pk)}" if doc_pk is not None else f'text == {json.dumps(doc.page_content)}',
    output_fields=["pk","text","vector"]
)

In [None]:
# pip install "openai>=1.52.0" "pymilvus>=2.4.3" numpy

import os, json, hashlib, logging
from typing import List, Tuple
import numpy as np
from openai import OpenAI
from pymilvus import MilvusClient, DataType

# ---------------------------
# Configuration (each value can be overridden through the environment)
# ---------------------------
OPENAI_URL = os.getenv("OPENAI_URL", "http://127.0.0.1:9804/v1")  # OpenAI-compatible server
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "EMPTY")
EMBED_MODEL_ID  = os.getenv("EMBED_MODEL_ID", "Qwen/Qwen3-Embedding-8B")

MILVUS_URI  = os.getenv("MILVUS_URI", "http://127.0.0.1:19530")
DB_NAME     = os.getenv("MILVUS_DB",   "doc_embeddings7")
COLLECTION  = os.getenv("MILVUS_COL",  "doc_embeddings_plain")

# Preprocessing / prompt options
STRIP_NEW_LINES = False  # default for normalize_text(), which both indexing and query text pass through
USE_QWEN_QUERY_INSTRUCT = True  # meant to enable the query prefix below; the code reading it in embed_query() is commented out

QWEN_QUERY_INSTRUCT = (
    "Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: "
)

# Note: basicConfig changes logging for the whole kernel, not just this cell.
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
log = logging.getLogger("plain_milvus_demo")

# ---------------------------
# Clients
# ---------------------------
openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_URL)
# NOTE(review): the Milvus token is hard-coded — consider an environment variable like the values above.
milvus_client = MilvusClient(uri=MILVUS_URI, token="root:Milvus")

# ---------------------------
# 전처리 & 임베딩 유틸
# ---------------------------
def normalize_text(t: str, strip_new_lines: bool = STRIP_NEW_LINES) -> str:
    """Prepare text for embedding.

    Returns "" for ``None``; otherwise returns ``t`` with every newline replaced by a
    space when ``strip_new_lines`` is true, or ``t`` unchanged when it is false.
    """
    if t is None:
        return ""
    if strip_new_lines:
        return t.replace("\n", " ")
    return t

def embed_documents(texts: List[str]) -> List[List[float]]:
    """Embed passages as-is (no instruction prefix), one vector per input text."""
    prepared = list(map(normalize_text, texts))
    response = openai_client.embeddings.create(model=EMBED_MODEL_ID, input=prepared)
    vectors = []
    for item in response.data:
        vectors.append(item.embedding)
    return vectors

def embed_query(text: str) -> List[float]:
    """Embed a single query string and return its vector.

    The Qwen3 query-instruction prefix is currently NOT applied: the lines that would add
    it are kept below, commented out.
    """
    prepared = normalize_text(text)
    # if USE_QWEN_QUERY_INSTRUCT:
    #     prepared = QWEN_QUERY_INSTRUCT + prepared
    response = openai_client.embeddings.create(model=EMBED_MODEL_ID, input=[prepared])
    first = response.data[0]
    return first.embedding

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine similarity of two vectors, computed in float32.

    Returns 0.0 when the product of the two norms is zero.
    """
    left = np.asarray(a, dtype=np.float32)
    right = np.asarray(b, dtype=np.float32)
    scale = np.linalg.norm(left) * np.linalg.norm(right)
    if scale == 0:
        return 0.0
    return float(np.dot(left, right) / scale)

# ---------------------------
# Milvus 스키마/컬렉션 준비
# ---------------------------
def ensure_database(db_name: str):
    """Create database ``db_name`` if it is not listed yet, then make it the client's active database."""
    existing = milvus_client.list_databases()
    already_there = db_name in existing
    if not already_there:
        milvus_client.create_database(db_name=db_name)
    milvus_client.use_database(db_name)

def ensure_collection(collection: str, dim: int):
    """Create ``collection`` with a pk/text/vector schema and a FLAT COSINE index if it is missing.

    Args:
        collection: Name of the collection to create.
        dim: Dimension of the ``vector`` field.

    An existing collection is left untouched.
    """
    if milvus_client.has_collection(collection):
        # Already present: the dimension check is skipped (test-only shortcut); validating
        # `dim` against the existing schema is recommended for real use.
        return
    schema = milvus_client.create_schema()
    schema.add_field("pk",   DataType.INT64,  is_primary=True, auto_id=False)
    schema.add_field("text", DataType.VARCHAR, max_length=65535)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)

    index_params = milvus_client.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_type="FLAT",       # exact search, for reproducibility
        metric_type="COSINE"     # NOTE(review): the outputs pasted in this notebook show the best hit carrying the largest reported value, i.e. larger = closer
    )
    milvus_client.create_collection(
        collection_name=collection,
        schema=schema,
        index_params=index_params
    )

# ---------------------------
# 데이터 적재
# ---------------------------
def make_pk(text: str) -> int:
    """Derive a deterministic primary key in [0, 2**63) from the SHA-256 of the UTF-8 text."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    # Keeping the low 63 bits is the same as taking the value modulo 2**63.
    return int.from_bytes(digest, "big") & ((1 << 63) - 1)

def upsert_documents(docs: List[str]) -> None:
    """Embed ``docs`` and upsert them into COLLECTION, creating the database/collection if needed.

    Each row's primary key is derived from its text via ``make_pk``, so upserting the same
    text again overwrites the existing row. An empty ``docs`` is a no-op.

    Raises:
        ValueError: If the embedding service returns a different number of vectors than texts.
    """
    if not docs:
        # Nothing to embed; also avoids indexing into an empty vector list below.
        return

    # 1) Embed
    vectors = embed_documents(docs)
    if len(vectors) != len(docs):
        raise ValueError(f"Expected {len(docs)} embeddings, got {len(vectors)}")
    dim = len(vectors[0])
    ensure_database(DB_NAME)
    ensure_collection(COLLECTION, dim)

    # 2) Upsert
    rows = [{"pk": make_pk(t), "text": t, "vector": v} for t, v in zip(docs, vectors)]
    milvus_client.upsert(collection_name=COLLECTION, data=rows)

# ---------------------------
# 서버측 검색(Milvus) & 수동 코사인 비교
# ---------------------------
def milvus_search(query: str, topk: int = 5) -> List[Tuple[dict, float]]:
    """Search COLLECTION for the vectors closest to ``query``.

    Args:
        query: Query text; embedded with ``embed_query``.
        topk: Maximum number of hits to return.

    Returns:
        ``(hit, cosine_similarity)`` pairs in the order Milvus returned them.

    With ``metric_type="COSINE"`` the value Milvus puts in a hit's "distance" field is the
    cosine similarity itself (larger means more similar), so it is returned as-is. The
    previous ``1.0 - distance`` conversion inverted the score, which is why it disagreed
    with the manually computed cosine.
    """
    qvec = embed_query(query)
    res = milvus_client.search(
        collection_name=COLLECTION,
        data=[qvec],                   # a single query vector
        anns_field="vector",
        limit=topk,
        search_params={"metric_type": "COSINE"},
        output_fields=["pk", "text", "vector"],  # vector included so callers can re-check the score
    )
    # One result list per query vector; only one query was sent.
    hits = res[0] if res else []
    return [(h, float(h["distance"])) for h in hits]

def manual_rank(query: str, limit: int = 5) -> List[Tuple[dict, float]]:
    """Rank stored rows by cosine similarity to ``query``, computed client-side.

    Fetches up to 10000 rows (with their vectors) from COLLECTION, scores each with
    ``cosine_similarity`` and returns the ``limit`` best ``(row, similarity)`` pairs,
    highest first. Demo only: be careful with large collections.
    """
    query_vector = embed_query(query)
    fetched = milvus_client.query(
        collection_name=COLLECTION,
        filter="",  # no filter: all rows
        output_fields=["pk","text","vector"],
        limit=10000,              # a limit is required when the filter is empty
        # offset=0               # for paging, if needed
    )
    ranked = sorted(
        ((row, cosine_similarity(query_vector, row["vector"])) for row in fetched),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return ranked[:limit]

# ---------------------------
# Run the demo
# ---------------------------
# 0) Example sentences (replace with your own)
DOCS = [
    "스마트 시티는 도시의 인프라와 서비스를 데이터로 최적화한다.",
    "스마트 팩토리는 공정 데이터를 활용해 생산성을 높인다.",
    "서울은 교통 데이터 플랫폼을 통해 신호체계를 개선하고 있다.",
    "도시 재생 프로젝트는 시민 참여와 공공 데이터를 결합한다.",
    "클라우드 네이티브 아키텍처는 확장성과 복원력을 가진다."
]
QUERY = "스마트 시티"

# 1) Embed the documents and load them into Milvus
log.info("문서 upsert")
upsert_documents(DOCS)

# 2) Server-side search in Milvus
log.info("Milvus 검색 결과 (distance → 1 - distance 변환)")
milvus_hits = milvus_search(QUERY, topk=5)
for i, (hit, cos_sim) in enumerate(milvus_hits, 1):
    # Each hit has an "entity" dict holding the fields requested via output_fields.
    text = hit["entity"].get("text")
    distance = float(hit["distance"])  # raw value reported by Milvus
    # Prints the value returned by milvus_search next to the raw value and its complement.
    print(f"[Milvus] top{i} | cos={cos_sim:.6f} | 1-dist={1-distance:.6f} | dist={distance:.6f} | text={text}")

# 3) Manual cosine ranking (fetch every stored vector and compare it with the query vector)
log.info("수동 코사인 랭킹")
manual_hits = manual_rank(QUERY, limit=5)
for i, (row, cos_sim) in enumerate(manual_hits, 1):
    print(f"[Manual] top{i} | cos={cos_sim:.6f} | text={row['text']}")

# 4) Cross-check: print the top-3 texts of both rankings for a visual comparison
print("\n[검증] 상위 3개 텍스트 비교")
print("Milvus top3:")
print([h[0]["entity"]["text"] for h in milvus_hits[:3]])
print("Manual top3:")
print([r[0]["text"] for r in manual_hits[:3]])

print("\n완료")


In [None]:
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings

# tiktoken_enabled=False mirrors the first setup cell and is the setting langchain_openai
# documents for non-OpenAI backends.
# NOTE(review): confirm it matches how the stored vectors were produced.
embeddings = OpenAIEmbeddings(
    api_key="EMPTY",
    base_url=OPENAI_URL,
    model=EMBED_MODEL_ID,
    tiktoken_enabled=False,
)

# Vector store over collection "doc_embeddings_plain" in database "doc_embeddings5".
vector_store = Milvus(
    embedding_function=embeddings,
    collection_name="doc_embeddings_plain",
    connection_args={
        "uri": MILVUS_URI,
        "token": "root:Milvus",
        "db_name": "doc_embeddings5"
    },
    index_params={
        "index_type": "FLAT",
        "metric_type": "COSINE"
    },
)

In [None]:
import logging, time, threading
from typing import Annotated
from langgraph.graph import MessagesState


class State(MessagesState):
    """Graph state for the RAG steps: ``MessagesState`` plus question/answer/document fields."""
    # The second Annotated argument is a human-readable description of each field.
    # NOTE(review): other cells in this notebook read `.page_content` from the items stored in
    # `documents`, so `list[str]` looks inaccurate — confirm the intended element type.
    question: Annotated[str, "User question"]
    generation: Annotated[str, "LLM generated answer"]
    documents: Annotated[list[str], "List of documents"]
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain.messages import HumanMessage

# Logger for this notebook's helpers.
logger = logging.getLogger(__name__)

# Embedding model served behind an OpenAI-compatible API.
EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-8B"
# EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-4B"
OPENAI_URL = "http://127.0.0.1:9804/v1"  # base URL of the embedding server
MILVUS_URI = "http://127.0.0.1:19530"  # Milvus endpoint
TIMEOUT_SEC = 5  # seconds allowed for the embedding connectivity check

def test_embedding_connection(embeddings: OpenAIEmbeddings, test_text: str, timeout: int) -> None:
    """Check that the embedding backend answers within ``timeout`` seconds.

    The embedding call runs in a worker thread so the caller can stop waiting for it.

    Args:
        embeddings: Client whose ``embed_query`` is exercised.
        test_text: Text to embed.
        timeout: Maximum number of seconds to wait for the call.

    Raises:
        TimeoutError: If the call has not finished after ``timeout`` seconds.
        ValueError: If the call returns something other than a non-empty list.
        Exception: Any exception raised by ``embed_query`` is re-raised unchanged.
    """
    outcome = {"result": None, "error": None}

    def run():
        try:
            outcome["result"] = embeddings.embed_query(test_text)
        except Exception as e:
            outcome["error"] = e

    # daemon=True: a request that never returns must not keep the interpreter alive after
    # this function has already reported the timeout.
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        raise TimeoutError(f"Embedding call exceeded timeout of {timeout} seconds.")

    if outcome["error"] is not None:
        raise outcome["error"]

    result = outcome["result"]
    if isinstance(result, list) and len(result) > 0:
        logger.info("Embedding model connected successfully")
    else:
        raise ValueError("Embedding response is invalid or empty.")

# Build the embeddings client and verify that it responds; failures are logged and re-raised.
try:
    logger.info(f"Initializing Embedding model: model='{EMBED_MODEL_ID}', url='{OPENAI_URL}'")
    # tiktoken_enabled=False mirrors the first setup cell and is the setting
    # langchain_openai documents for non-OpenAI backends.
    # NOTE(review): confirm it matches how the stored vectors were produced.
    embeddings = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID,
        tiktoken_enabled=False,
    )

    test_embedding_connection(embeddings, "연결 테스트 문장", TIMEOUT_SEC)

except Exception as e:
    logger.exception(f"Failed to initialize OpenAIEmbeddings: {e}")
    raise

# Open the LangChain Milvus vector store over collection "doc_embeddings_plain" in database
# "doc_embeddings7"; failures are logged with their traceback and re-raised.
# NOTE(review): the Milvus token is hard-coded here — consider reading it from the environment.
try:
    logger.info(f"Connecting to Milvus at {MILVUS_URI}")
    vector_store = Milvus(
        embedding_function=embeddings,
        collection_name="doc_embeddings_plain",
        connection_args={
            "uri": MILVUS_URI,
            "token": "root:Milvus",
            "db_name": "doc_embeddings7"
        },
        index_params={
            "index_type": "FLAT",
            "metric_type": "COSINE"
        },
    )
    logger.info("Milvus vector store connection established.")

except Exception as e:
    logger.exception(f"Failed to connect to Milvus: {e}")
    raise

def retrieve(state: State):
    """Fetch candidate documents for ``state["question"]`` from the vector store.

    Args:
        state: Mapping that provides ``question``; a missing or ``None`` question is
            treated as empty.

    Returns:
        A dict with ``documents`` (possibly empty), the stripped ``question`` and a
        one-element ``messages`` list holding the question as a ``HumanMessage``. On a
        successful search it also carries ``temp``: the raw ``(document, score)`` pairs,
        which the inspection cells of this notebook read.
    """
    # `or ""` also covers an explicit None, which would otherwise fail on .strip().
    question = (state.get("question") or "").strip()
    human_message = HumanMessage(question)

    if not question:
        logger.warning("retrieve() called with empty question in state.")
        return {"documents": [], "question": "", "messages": [human_message]}

    logger.info(f"Retrieving documents for question: '{question}'")

    try:
        # k=171 is the number of candidates requested from the vector store.
        documents_with_score = vector_store.similarity_search_with_score(question, k=171)
        documents = [doc for doc, _score in documents_with_score]
        logger.info(f"Retrieved {len(documents)} documents from vector store.")
        return {"documents": documents, "question": question, "messages": [human_message], "temp": documents_with_score}

    except Exception as e:
        # Best effort: a failed search is logged and reported as "no documents".
        logger.exception(f"Error during retrieval for question='{question}': {e}")
        return {"documents": [], "question": question, "messages": [human_message]}


In [None]:
# Minimal input state: only the question is set.
my_state = {
    "question": "스마트 시티"
}

# Replace the input dict with whatever retrieve() returns for it.
my_state = retrieve(my_state)

In [None]:
# Display the full dict returned by retrieve().
my_state

In [None]:
[Manual] top1 | cos=0.669276 | text=스마트 시티는 도시의 인프라와 서비스를 데이터로 최적화한다.
[Manual] top2 | cos=0.435555 | text=서울은 교통 데이터 플랫폼을 통해 신호체계를 개선하고 있다.
[Manual] top3 | cos=0.385460 | text=스마트 팩토리는 공정 데이터를 활용해 생산성을 높인다.
[Manual] top4 | cos=0.355128 | text=도시 재생 프로젝트는 시민 참여와 공공 데이터를 결합한다.
[Manual] top5 | cos=0.217225 | text=클라우드 네이티브 아키텍처는 확장성과 복원력을 가진다.

In [None]:
# Query text and the document it is compared against
query = "스마트 시티"
doc_text = my_state["temp"][2][0].page_content  # keep using this same document below

# 1) Score reported by Milvus for this hit, used as-is: with the COSINE metric the
#    reported value is already the cosine similarity. The printed label is corrected to
#    say so (it used to read "1 - distance" although no subtraction is done).
doc, dist = my_state["temp"][2]   # (Document, score)
milvus_cos = float(dist)
print(f"[Milvus] score (cosine similarity) = {milvus_cos:.6f}")

# 2) Compute the cosine directly with the same model
q_vec = embeddings.embed_query(query)                 # query goes through embed_query
d_vec = embeddings.embed_documents([doc_text])[0]     # document goes through embed_documents

import numpy as np
def cos_sim(a, b):
    """Return the cosine similarity of two equal-length vectors as a float."""
    a = np.array(a); b = np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

manual_cos = cos_sim(q_vec, d_vec)
print(f"[Manual] cosine_similarity = {manual_cos:.6f}")


In [None]:
from pymilvus import MilvusClient
import json, numpy as np

# Separate raw client used to read stored vectors directly.
milvus_raw = MilvusClient(uri=MILVUS_URI, token="root:Milvus")
milvus_raw.use_database("doc_embeddings5")  # NOTE(review): meant to be the same DB as vector_store — confirm, since setup cells use different db names

# 1) Text of the top retrieved document
doc_text = my_state["temp"][0][0].page_content

# 2) Look up the stored vector by exact text match (assumes at least one row matches)
rows = milvus_raw.query(
    collection_name="doc_embeddings_plain",
    filter=f'text == {json.dumps(doc_text)}',
    output_fields=["pk","text","vector"],
    limit=1
)
stored_vec = rows[0]["vector"]

# 3) Query vector
# NOTE(review): the assignment below is commented out, so `q_vec` must already exist in the
# kernel from an earlier cell — this cell fails on a fresh kernel.
# q_vec = embeddings.embed_query("스마트 시티")

def cos(a,b):
    # Cosine similarity of two equal-length vectors, returned as a float.
    a=np.array(a); b=np.array(b)
    return float(np.dot(a,b)/(np.linalg.norm(a)*np.linalg.norm(b)))

manual_cos = cos(q_vec, stored_vec)
print(f"[Manual using STORED vec] cosine = {manual_cos:.6f}")
print(f"[From Milvus] cosine = {float(my_state['temp'][0][1]):.6f}")


In [None]:
[Milvus] top1 | cos=0.330724 | 1-dist=0.330724 | dist=0.669276 | text=스마트 시티는 도시의 인프라와 서비스를 데이터로 최적화한다.
[Milvus] top2 | cos=0.564445 | 1-dist=0.564445 | dist=0.435555 | text=서울은 교통 데이터 플랫폼을 통해 신호체계를 개선하고 있다.
[Milvus] top3 | cos=0.614540 | 1-dist=0.614540 | dist=0.385460 | text=스마트 팩토리는 공정 데이터를 활용해 생산성을 높인다.
[Milvus] top4 | cos=0.644872 | 1-dist=0.644872 | dist=0.355128 | text=도시 재생 프로젝트는 시민 참여와 공공 데이터를 결합한다.
[Milvus] top5 | cos=0.782775 | 1-dist=0.782775 | dist=0.217225 | text=클라우드 네이티브 아키텍처는 확장성과 복원력을 가진다.

In [None]:
# pip install "openai>=1.52.0" "pymilvus>=2.4.3" numpy

import os, json, hashlib, logging
from typing import List, Tuple
import numpy as np
from openai import OpenAI
from pymilvus import MilvusClient, DataType

# ---------------------------
# Configuration (each value can be overridden through the environment)
# ---------------------------
OPENAI_URL = os.getenv("OPENAI_URL", "http://127.0.0.1:9804/v1")  # OpenAI-compatible server
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "EMPTY")
EMBED_MODEL_ID  = os.getenv("EMBED_MODEL_ID", "Qwen/Qwen3-Embedding-8B")

MILVUS_URI  = os.getenv("MILVUS_URI", "http://127.0.0.1:19530")
DB_NAME     = os.getenv("MILVUS_DB",   "doc_embeddings102")
COLLECTION  = os.getenv("MILVUS_COL",  "doc_embeddings_plain")

# Preprocessing / prompt options
STRIP_NEW_LINES = False  # default for normalize_text(), which both indexing and query text pass through
USE_QWEN_QUERY_INSTRUCT = True  # meant to enable the query prefix below; the code reading it in embed_query() is commented out

QWEN_QUERY_INSTRUCT = (
    "Instruct: Given a web search query, retrieve relevant passages that answer the query\nQuery: "
)

# Note: basicConfig changes logging for the whole kernel, not just this cell.
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
log = logging.getLogger("plain_milvus_demo")

# ---------------------------
# Clients
# ---------------------------
openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_URL)
# NOTE(review): the Milvus token is hard-coded — consider an environment variable like the values above.
milvus_client = MilvusClient(uri=MILVUS_URI, token="root:Milvus")

# ---------------------------
# 전처리 & 임베딩 유틸
# ---------------------------
def normalize_text(t: str, strip_new_lines: bool = STRIP_NEW_LINES) -> str:
    """Prepare text for embedding.

    Returns "" for ``None``; otherwise returns ``t`` with every newline replaced by a
    space when ``strip_new_lines`` is true, or ``t`` unchanged when it is false.
    """
    if t is None:
        return ""
    if strip_new_lines:
        return t.replace("\n", " ")
    return t

def embed_documents(texts: List[str]) -> List[List[float]]:
    """Embed passages as-is (no instruction prefix), one vector per input text."""
    prepared = list(map(normalize_text, texts))
    response = openai_client.embeddings.create(model=EMBED_MODEL_ID, input=prepared)
    vectors = []
    for item in response.data:
        vectors.append(item.embedding)
    return vectors

def embed_query(text: str) -> List[float]:
    """Embed a single query string and return its vector.

    The Qwen3 query-instruction prefix is currently NOT applied: the lines that would add
    it are kept below, commented out.
    """
    prepared = normalize_text(text)
    # if USE_QWEN_QUERY_INSTRUCT:
    #     prepared = QWEN_QUERY_INSTRUCT + prepared
    response = openai_client.embeddings.create(model=EMBED_MODEL_ID, input=[prepared])
    first = response.data[0]
    return first.embedding

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine similarity of two vectors, computed in float32.

    Returns 0.0 when the product of the two norms is zero.
    """
    left = np.asarray(a, dtype=np.float32)
    right = np.asarray(b, dtype=np.float32)
    scale = np.linalg.norm(left) * np.linalg.norm(right)
    if scale == 0:
        return 0.0
    return float(np.dot(left, right) / scale)

# ---------------------------
# Milvus 스키마/컬렉션 준비
# ---------------------------
def ensure_database(db_name: str):
    """Create database ``db_name`` if it is not listed yet, then make it the client's active database."""
    existing = milvus_client.list_databases()
    already_there = db_name in existing
    if not already_there:
        milvus_client.create_database(db_name=db_name)
    milvus_client.use_database(db_name)

def ensure_collection(collection: str, dim: int):
    """Create ``collection`` with a pk/text/vector schema and a FLAT COSINE index if it is missing.

    Args:
        collection: Name of the collection to create.
        dim: Dimension of the ``vector`` field.

    An existing collection is left untouched.
    """
    if milvus_client.has_collection(collection):
        # Already present: the dimension check is skipped (test-only shortcut); validating
        # `dim` against the existing schema is recommended for real use.
        return
    schema = milvus_client.create_schema()
    schema.add_field("pk",   DataType.INT64,  is_primary=True, auto_id=False)
    schema.add_field("text", DataType.VARCHAR, max_length=65535)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)

    index_params = milvus_client.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_type="FLAT",       # exact search, for reproducibility
        metric_type="COSINE"     # NOTE(review): the outputs pasted in this notebook show the best hit carrying the largest reported value, i.e. larger = closer
    )
    milvus_client.create_collection(
        collection_name=collection,
        schema=schema,
        index_params=index_params
    )

# ---------------------------
# 데이터 적재
# ---------------------------
def make_pk(text: str) -> int:
    """Derive a deterministic primary key in [0, 2**63) from the SHA-256 of the UTF-8 text."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    # Keeping the low 63 bits is the same as taking the value modulo 2**63.
    return int.from_bytes(digest, "big") & ((1 << 63) - 1)

def upsert_documents(docs: List[str]) -> None:
    """Embed ``docs`` and upsert them into COLLECTION, creating the database/collection if needed.

    Each row's primary key is derived from its text via ``make_pk``, so upserting the same
    text again overwrites the existing row. An empty ``docs`` is a no-op.

    Raises:
        ValueError: If the embedding service returns a different number of vectors than texts.
    """
    if not docs:
        # Nothing to embed; also avoids indexing into an empty vector list below.
        return

    # 1) Embed
    vectors = embed_documents(docs)
    if len(vectors) != len(docs):
        raise ValueError(f"Expected {len(docs)} embeddings, got {len(vectors)}")
    dim = len(vectors[0])
    ensure_database(DB_NAME)
    ensure_collection(COLLECTION, dim)

    # 2) Upsert
    rows = [{"pk": make_pk(t), "text": t, "vector": v} for t, v in zip(docs, vectors)]
    milvus_client.upsert(collection_name=COLLECTION, data=rows)

# ---------------------------
# 서버측 검색(Milvus) & 수동 코사인 비교
# ---------------------------
def milvus_search(query: str, topk: int = 5) -> List[Tuple[dict, float]]:
    """Search COLLECTION for the vectors closest to ``query``.

    Args:
        query: Query text; embedded with ``embed_query``.
        topk: Maximum number of hits to return.

    Returns:
        ``(hit, cosine_similarity)`` pairs in the order Milvus returned them.

    With ``metric_type="COSINE"`` the value Milvus puts in a hit's "distance" field is the
    cosine similarity itself (larger means more similar), so it is returned as-is. The
    previous ``1.0 - distance`` conversion inverted the score, which is why it disagreed
    with the manually computed cosine.
    """
    qvec = embed_query(query)
    res = milvus_client.search(
        collection_name=COLLECTION,
        data=[qvec],                   # a single query vector
        anns_field="vector",
        limit=topk,
        search_params={"metric_type": "COSINE"},
        output_fields=["pk", "text", "vector"],  # vector included so callers can re-check the score
    )
    # One result list per query vector; only one query was sent.
    hits = res[0] if res else []
    return [(h, float(h["distance"])) for h in hits]

def manual_rank(query: str, limit: int = 5) -> List[Tuple[dict, float]]:
    """Rank stored rows by cosine similarity to ``query``, computed client-side.

    Fetches up to 10000 rows (with their vectors) from COLLECTION, scores each with
    ``cosine_similarity`` and returns the ``limit`` best ``(row, similarity)`` pairs,
    highest first. Demo only: be careful with large collections.
    """
    query_vector = embed_query(query)
    fetched = milvus_client.query(
        collection_name=COLLECTION,
        filter="",  # no filter: all rows
        output_fields=["pk","text","vector"],
        limit=10000,              # a limit is required when the filter is empty
        # offset=0               # for paging, if needed
    )
    ranked = sorted(
        ((row, cosine_similarity(query_vector, row["vector"])) for row in fetched),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return ranked[:limit]

# ---------------------------
# Run the demo
# ---------------------------
# 0) Example sentences (replace with your own)
DOCS = [
    "스마트 시티는 도시의 인프라와 서비스를 데이터로 최적화한다.",
    "스마트 팩토리는 공정 데이터를 활용해 생산성을 높인다.",
    "서울은 교통 데이터 플랫폼을 통해 신호체계를 개선하고 있다.",
    "도시 재생 프로젝트는 시민 참여와 공공 데이터를 결합한다.",
    "클라우드 네이티브 아키텍처는 확장성과 복원력을 가진다."
]
QUERY = "스마트 시티"

# 1) Embed the documents and load them into Milvus
log.info("문서 upsert")
upsert_documents(DOCS)

# 2) Server-side search in Milvus
log.info("Milvus 검색 결과 (distance → 1 - distance 변환)")
milvus_hits = milvus_search(QUERY, topk=5)
for i, (hit, cos_sim) in enumerate(milvus_hits, 1):
    # Each hit has an "entity" dict holding the fields requested via output_fields.
    text = hit["entity"].get("text")
    distance = float(hit["distance"])  # raw value reported by Milvus
    # NOTE(review): the labels do not follow the variable names here ("cos=" prints
    # `distance`, "dist=" prints `cos_sim`) — confirm which value is meant to be the cosine.
    print(f"[Milvus] top{i} | cos={distance:.6f} | 1-dist={1-cos_sim:.6f} | dist={cos_sim:.6f} | text={text}")

# 3) Manual cosine ranking (fetch every stored vector and compare it with the query vector)
log.info("수동 코사인 랭킹")
manual_hits = manual_rank(QUERY, limit=5)
for i, (row, cos_sim) in enumerate(manual_hits, 1):
    print(f"[Manual] top{i} | cos={cos_sim:.6f} | text={row['text']}")

# 4) Cross-check: print the top-3 texts of both rankings for a visual comparison
print("\n[검증] 상위 3개 텍스트 비교")
print("Milvus top3:")
print([h[0]["entity"]["text"] for h in milvus_hits[:3]])
print("Manual top3:")
print([r[0]["text"] for r in manual_hits[:3]])

print("\n완료")


In [None]:
import logging, time, threading
from typing import Annotated
from langgraph.graph import MessagesState


class State(MessagesState):
    """Graph state: `MessagesState` extended with question, generation and documents."""
    # NOTE(review): a class with the same name and fields is also defined in
    # the notebook's first cell; this definition shadows it once the cell runs.
    # The second Annotated argument is a human-readable description of the field.
    question: Annotated[str, "User question"]
    generation: Annotated[str, "LLM generated answer"]
    documents: Annotated[list[str], "List of documents"]
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain.messages import HumanMessage

# Logger used by the helpers and connection steps in this cell.
logger = logging.getLogger(__name__)

# Model name passed to OpenAIEmbeddings as `model`.
EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-8B"
# EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-4B"
# Base URL passed to OpenAIEmbeddings as `base_url`.
OPENAI_URL = "http://127.0.0.1:9804/v1"
# Milvus server URI used in the vector store's connection_args.
MILVUS_URI = "http://127.0.0.1:19530"
# Seconds allowed for the embedding connectivity test before it is treated as a timeout.
TIMEOUT_SEC = 5

def test_embedding_connection(embeddings: OpenAIEmbeddings, test_text: str, timeout: int) -> None:
    """Smoke-test the embedding endpoint, giving up after `timeout` seconds.

    `embeddings.embed_query(test_text)` is run on a worker thread so that a
    call that never returns cannot block the caller indefinitely.

    Args:
        embeddings: Embedding client; only `embed_query` is used.
        test_text: Text sent to the embedding endpoint.
        timeout: Maximum number of seconds to wait for the call.

    Raises:
        TimeoutError: The call was still running after `timeout` seconds.
        ValueError: The call returned something other than a non-empty list.
        Exception: Whatever `embed_query` itself raised is re-raised as-is.
    """
    outcome = {"result": None, "error": None}

    def run():
        try:
            outcome["result"] = embeddings.embed_query(test_text)
        except Exception as e:
            outcome["error"] = e

    # daemon=True: if the call hangs and we give up on it below, the abandoned
    # worker must not keep the interpreter alive at shutdown.
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        raise TimeoutError(f"Embedding call exceeded timeout of {timeout} seconds.")

    # Compare against None explicitly instead of relying on the exception
    # object's truthiness.
    if outcome["error"] is not None:
        raise outcome["error"]

    result = outcome["result"]
    if isinstance(result, list) and result:
        logger.info("Embedding model connected successfully")
    else:
        raise ValueError("Embedding response is invalid or empty.")

# Build the embedding client against the OpenAI-compatible endpoint and verify
# it responds before anything else depends on it.
try:
    logger.info(f"Initializing Embedding model: model='{EMBED_MODEL_ID}', url='{OPENAI_URL}'")
    embeddings = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID
    )

    # One real embedding call, bounded by TIMEOUT_SEC; raises on timeout,
    # on an error from the call, or on an empty/invalid response.
    test_embedding_connection(embeddings, "연결 테스트 문장", TIMEOUT_SEC)
    
except Exception as e:
    # Log with traceback, then re-raise so the cell fails visibly.
    logger.exception(f"Failed to initialize OpenAIEmbeddings: {e}")
    raise

# Open the LangChain Milvus vector store that retrieve() searches.
try:
    logger.info(f"Connecting to Milvus at {MILVUS_URI}")
    vector_store = Milvus(
        embedding_function=embeddings,
        collection_name="doc_embeddings_plain",
        connection_args={
            "uri": MILVUS_URI,
            # NOTE(review): credential is hard-coded in the notebook; consider
            # reading it from the environment.
            "token": "root:Milvus",
            # Later cells inspect the same collection with a raw client and
            # must select this same database.
            "db_name": "doc_embeddings102"
        },
        index_params={
            "index_type": "FLAT",
            "metric_type": "COSINE"
        },
    )
    logger.info("Milvus vector store connection established.")

except Exception as e:
    # Log with traceback, then re-raise so the cell fails visibly.
    logger.exception(f"Failed to connect to Milvus: {e}")
    raise

def retrieve(state: State):
    """Fetch candidate documents from the vector store for the state's question.

    Args:
        state: Graph state; only its "question" entry is read.

    Returns:
        A state-update dict with:
          - "documents": the retrieved documents ([] for an empty question or
            when retrieval fails),
          - "question": the stripped question,
          - "messages": a one-element list holding a HumanMessage built from
            the question,
          - "temp": only on a successful search, the raw (document, score)
            pairs. This key is not declared on State; a later notebook cell
            reads it to compare scores.

    Retrieval errors are logged and converted into an empty result rather
    than raised.
    """
    # `or ""` also covers a state whose "question" key is present but None,
    # which would otherwise fail on .strip().
    question = (state.get("question") or "").strip()
    human_message = HumanMessage(question)

    if not question:
        logger.warning("retrieve() called with empty question in state.")
        return {"documents": [], "question": "", "messages": [human_message]}

    logger.info(f"Retrieving documents for question: '{question}'")

    try:
        # NOTE(review): k=171 is a magic number; presumably a deliberately wide
        # candidate pool for a downstream reranking step. Confirm, and consider
        # moving it to a config constant.
        documents_with_score = vector_store.similarity_search_with_score(question, k=171)
        documents = [document for document, _score in documents_with_score]
        logger.info(f"Retrieved {len(documents)} documents from vector store.")
        return {"documents": documents, "question": question, "messages": [human_message], "temp": documents_with_score}

    except Exception as e:
        logger.exception(f"Error during retrieval for question='{question}': {e}")
        return {"documents": [], "question": question, "messages": [human_message]}


In [None]:
# Minimal input state: retrieve() only reads the "question" entry.
my_state = {
    "question": "스마트 시티"
}

# NOTE: `my_state` is rebound to the update dict returned by retrieve()
# (documents / question / messages, plus "temp" on success); later cells read it.
my_state = retrieve(my_state)

In [None]:
# Display the dict returned by retrieve() in the previous cell.
my_state

In [None]:
from pymilvus import MilvusClient
import json, numpy as np

# Raw pymilvus client, used to read back the vector Milvus actually stored.
milvus_raw = MilvusClient(uri=MILVUS_URI, token="root:Milvus")
# Must be the same database `vector_store` was opened with ("doc_embeddings102");
# otherwise the lookup below runs against a different database than the search.
milvus_raw.use_database("doc_embeddings102")

# 1) Take the text of the last hit in the list returned by retrieve().
doc_text = my_state["temp"][-1][0].page_content

# 2) Look up the stored vector by exact text match (a pk filter could be used
#    instead if the pk were available in the document metadata).
#    json.dumps() quotes the text and escapes embedded quotes/backslashes.
rows = milvus_raw.query(
    collection_name="doc_embeddings_plain",
    filter=f'text == {json.dumps(doc_text)}',
    output_fields=["pk", "text", "vector"],
    limit=1,
)
if not rows:
    # Fail with an actionable message instead of a bare IndexError on rows[0].
    raise LookupError(
        "No row in 'doc_embeddings_plain' matches the retrieved document text; "
        "check that the database and collection are the ones vector_store uses."
    )
stored_vec = rows[0]["vector"]

# 3) Query vector, produced through the same embeddings object the vector store
#    uses and from the exact question that was searched.
q_vec = embeddings.embed_query(my_state["question"])

def cos(a, b):
    """Return the cosine similarity of two vectors as a Python float.

    Both arguments are converted with `np.array`; the result is their dot
    product divided by the product of their norms.
    """
    vec_a = np.array(a)
    vec_b = np.array(b)
    norm_product = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return float(np.dot(vec_a, vec_b) / norm_product)

# Cosine computed locally from the query vector and the vector read back from Milvus.
manual_cos = cos(q_vec, stored_vec)
print(f"[Manual using STORED vec] cosine = {manual_cos:.6f}")
# Score that the vector store reported for the same (last) hit.
print(f"[From Milvus] cosine = {float(my_state['temp'][-1][1]):.6f}")


In [None]:
# Embed the same query text through `openai_client`.
# NOTE(review): `openai_client` is not defined in this part of the notebook;
# presumably created in an earlier cell. Verify it exists on a fresh kernel.
a = openai_client.embeddings.create(model=EMBED_MODEL_ID, input=["스마트 시티"])

In [None]:
# Display the embedding returned for the first (and only) input.
a.data[0].embedding

In [None]:
# Display the query vector produced earlier via embeddings.embed_query, for comparison.
q_vec

In [None]:
# Cosine similarity between the two embeddings of the same query text.
cos(a.data[0].embedding, q_vec)

In [None]:
# LangChain: embed the query through OpenAIEmbeddings.
from langchain_openai import OpenAIEmbeddings
emb = OpenAIEmbeddings(
        api_key="EMPTY",
        base_url=OPENAI_URL,
        model=EMBED_MODEL_ID
    )
v1 = emb.embed_query("스마트 시티")

# OpenAI SDK (official client): embed the same text against the same endpoint.
# NOTE(review): the only visible assignment of OPENAI_API_KEY is in a later
# cell; confirm it is defined before this cell on a fresh kernel.
from openai import OpenAI
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_URL)
v2 = client.embeddings.create(model=EMBED_MODEL_ID, input=["스마트 시티"]).data[0].embedding

# Cosine similarity between the two vectors (uses `cos` from an earlier cell).
cos(v1, v2)

In [None]:
from langchain_openai import OpenAIEmbeddings
from openai import OpenAI
import numpy as np

# Endpoint, key and model shared by both clients below.
OPENAI_URL = "http://127.0.0.1:9804/v1"
OPENAI_API_KEY = "EMPTY"
EMBED_MODEL_ID = "Qwen/Qwen3-Embedding-8B"

# LangChain: use the same key / endpoint / encoding_format as the SDK call.
# NOTE(review): the original note also mentions unifying dimensions, but no
# dimensions argument is passed here.
# NOTE(review): if the vectors still disagree, check whether OpenAIEmbeddings
# is sending tiktoken token ids instead of raw text (see its
# `check_embedding_ctx_length` / `tiktoken_enabled` options). Confirm against
# the langchain_openai documentation.
emb = OpenAIEmbeddings(
    api_key=OPENAI_API_KEY,
    base_url=OPENAI_URL,
    model=EMBED_MODEL_ID,
    model_kwargs={"encoding_format": "float"},
    # tiktoken_enabled=False,      # recommended when using an OpenAI-compatible server
)

v1 = emb.embed_query("스마트 시티")                  # string input
v1b = emb.embed_documents(["스마트 시티"])[0]        # also compare the list-input path

# OpenAI SDK: pass the input as a plain string as well, to match.
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_URL)
v2 = client.embeddings.create(
    model=EMBED_MODEL_ID, input="스마트 시티"
).data[0].embedding

def cos(a, b):
    """Return the cosine similarity of two vectors as a Python float.

    Both arguments are converted with `np.array`; the result is their dot
    product divided by the product of their norms.
    """
    vec_a = np.array(a)
    vec_b = np.array(b)
    norm_product = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return float(np.dot(vec_a, vec_b) / norm_product)

# Compare the LangChain vectors (query path and documents path) with the SDK vector.
print("LC(query) vs SDK(str):", cos(v1, v2))
print("LC(doc)   vs SDK(str):", cos(v1b, v2))
