In [3]:
import re
from typing import List, Dict
from langchain_community.document_loaders import PyPDFLoader

# 은행 버전을 참조해서, 법령의 메타 데이터도 조문 안으로 들어가도록 설정하자
def _pdf_to_text(path: str) -> str:
    pdf_file = path
    loader = PyPDFLoader(pdf_file)
    pages = loader.load()
    text_for_delete = r"법제처\s+\d+\s+국가법령정보센터\n개인정보 보호법"
    law_text = "\n".join([re.sub(text_for_delete, "", p.page_content).strip() for p in pages])
    
    return law_text

def _parse_law(law_text):
    # 서문 분리
    # '^'로 시작하여 '제1장' 또는 '제1조' 직전까지의 모든 텍스트를 탐색 
    preamble_pattern = r'^(.*?)(?=제1장|제1조)'
    preamble = re.search(preamble_pattern, law_text, re.DOTALL)
    if preamble:
        preamble = preamble.group(1).strip()
    
    # 장 분리 
    # '제X장' 형식의 제목과 그 뒤에 오는 모든 조항을 하나의 그룹화 
    chapter_pattern = r'(제\d+장\s+.+?)\n((?:제\d+조(?:의\d+)?(?:\(\w+\))?.*?)(?=제\d+장|부칙|$))'
    chapters = re.findall(chapter_pattern, law_text, re.DOTALL)
    
    # 부칙 분리
    # '부칙'으로 시작하는 모든 텍스트를 탐색 
    appendix_pattern = r'(부칙.*)'
    appendix = re.search(appendix_pattern, law_text, re.DOTALL)
    if appendix:
        appendix = appendix.group(1)
    
    # 파싱 결과를 저장할 딕셔너리 초기화
    parsed_law = {'서문': preamble, '장': {}, '부칙': appendix}
    
    # 각 장 내에서 조 분리
    for chapter_title, chapter_content in chapters:
        # 조 분리 패턴
        # 1. '제X조'로 시작 ('제X조의Y' 형식도 가능)
        # 2. 조 번호 뒤에 반드시 '(항목명)' 형식의 제목이 와야 함 
        # 3. 다음 조가 시작되기 전까지 또는 문서의 끝까지의 모든 내용을 포함
        article_pattern = r'(제\d+조(?:의\d+)?\s*\([^)]+\).*?)(?=제\d+조(?:의\d+)?\s*\([^)]+\)|$)'
        
        # 정규표현식을 이용해 모든 조항을 탐색 
        articles = re.findall(article_pattern, chapter_content, re.DOTALL)
        
        # 각 조항의 앞뒤 공백을 제거하고 결과 딕셔너리에 저장
        parsed_law['장'][chapter_title.strip()] = [article.strip() for article in articles]

    law_list = []

    for law in parsed_law["장"].keys():
        for article in parsed_law["장"][law]:
            law_list.append(article)
    
    return law_list 



In [4]:
law_text = _parse_law(_pdf_to_text("../../data/개인정보 보호법(법률)(제19234호)(20250313).pdf"))

In [6]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import PyPDFLoader
import requests
import json, os 
import numpy as np 
import re

# 임베딩 모델을 사용하고 은행에서 사용한 버전을 그대로 사용할 것
# llama server 사용 방식은 llama.cpp 폴더 안에 가이드 파일 참조

BASE_URL = "http://host.docker.internal:8080/v1" 

llm = ChatOpenAI(
    model="bge-m3",           # 서버에서 인식 가능한 임의의 모델명
    base_url=BASE_URL,
    api_key="sk-local-anything", # 의미없는 토큰도 OK
    temperature=0.7,
    max_tokens=512               # 서버 토큰 제한 고려
)

In [7]:

def _embedding(texts):
    """
    llama-server의 v1/embeddings 엔드 포인트를 사용해 bge-m3.gguf 임베딩 진행
    반환 결과 : (N,D) float32 numpy array
    N = chunk나 sentence 개수
    D = Vector의 차원(bge-m3는 1,024로 고정)
    """
    out = []
    for t in texts:
        vecs = None
        try:
            r= requests.post(f"{BASE_URL}/embeddings", json={"model": "bge-m3", "input": t}, timeout=45)
            if r.ok and "data" in r.json():
                vecs = r.json()["data"][0]["embedding"]
                v = np.array(vecs, dtype=np.float32)
                v /= (np.linalg.norm(v)+1e-12)
                out.append(v)
        except Exception:
            pass
    if vecs is None:
        raise RuntimeError(f"embedding API 실패 : {t}")

    return np.vstack(out).astype("float32")


In [8]:
embedded_law_text = _embedding(law_text)

In [9]:
embedded_law_text.shape

(112, 1024)

In [10]:
import numpy as np, networkx as nx
from sklearn.neighbors import NearestNeighbors

def _block_by_cosine(X: np.ndarray, k=8, sim_thresh=0.85):
    nbrs = NearestNeighbors(n_neighbors=min(k,len(X)), metric="cosine").fit(X)
    dists, idxs = nbrs.kneighbors(X)
    G = nx.Graph(); G.add_nodes_from(range(len(X)))
    for i,(dr,ir) in enumerate(zip(dists,idxs)):
        for d,j in zip(dr,ir):
            if i==j: continue
            sim = 1-d
            if sim>=sim_thresh: G.add_edge(i,j,weight=sim)
    return [sorted(list(c)) for c in nx.connected_components(G)]


In [11]:
blocked_embedded_texts = _block_by_cosine(embedded_law_text)

In [12]:
len(blocked_embedded_texts)

98

In [None]:
from __future__ import annotations
import os, json
from typing import List, Dict, Type
from pydantic import BaseModel, Field

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable
from langchain_core.output_parsers import StrOutputParser

BASE_URL = "http://host.docker.internal:8080/v1" 

llm = ChatOpenAI(
    model="Midm-2.0-Base-Instruct-q4_0",           # 서버에서 인식 가능한 임의의 모델명
    base_url=BASE_URL,
    api_key="sk-local-anything", # 의미없는 토큰도 OK
    temperature=0.5,
    max_tokens=8192               # 서버 토큰 제한 고려
)

class Distilled(BaseModel):
    doc_type: str | None = None
    title: str | None = None
    authority: str | None = None
    effective_date: str | None = None
    amendments: list[dict] = Field(default_factory=list)
    articles: list[dict] = Field(default_factory=list)

class Entities(BaseModel):
    entities: list[dict]

class Relations(BaseModel):
    relations: list[dict]


DISTILL_SYS = ("""
당신은 법률/규제 문서를 정밀하게 요약·정제하는 역할을 합니다. 
    다음 지침을 반드시 지키세요: 
    - 제공된 텍스트에 근거하여 blueprint를 채우세요. 
    - 원문에 없는 경우 null로 표기하세요. 
    - 배열은 간결하게 유지하세요. 
    - 출력은 반드시 JSON 형식으로 하세요. 
    - 출력의 모든 값은 한국어로 작성하세요
""")

DISTILL_HUMAN = """
아래 blueprint를 기준으로 텍스트를 요약하여 채워 주세요.  
반드시 JSON으로만 출력하고, 값은 한국어로 작성해야 합니다.

Blueprint (JSON-like):
{blueprint}

텍스트 묶음 (의미적으로 유사한 그룹):
{joined_texts}

"""
ENT_SYS = ("""
당신은 한국어 법률 조문에서 핵심 엔티티를 추출하는 역할을 합니다.  
다음 지침을 반드시 지키세요:

- 출력은 JSON 형식으로만 하세요.
- canonical은 반드시 하나의 한국어 표현으로 지정하세요.
- aliases에는 약어, 다른 한국어 표현, 영어 번역명을 넣을 수 있습니다.
- 출력의 canonical, aliases, type, evidence 값은 반드시 한국어로 작성하세요.
- 조문 전체를 canonical로 두지 마세요. 원자적 개념(법률명, 조문번호, 정의된 용어, 기관명, 행위, 조건, 수치 등)만 추출하세요.
""")

ENT_HUMAN = """아래 텍스트에서 엔티티를 추출해 주세요.  
반드시 JSON으로만 출력하며, 키 이름은 정확히 'entities' 여야 합니다.  
출력 예시는 다음과 같습니다:

[{{
  "id":"e1",
  "canonical":"개인정보 보호법",
  "aliases":["개보법","Privacy Act"],
  "type":"법률",
  "source":["doc://개인정보보호법#제28조의2"],
  "evidence":["..."]
}},{{
  "id":"e2",
  "canonical":"제28조의2",
  "aliases":["가명정보의 처리"],
  "type":"조문",
  "source":["doc://개인정보보호법#제28조의2"],
  "evidence":["..."]
}}]

텍스트:
{distilled}
"""

REL_SYS = ("""
당신은 한국어 법률 조문에서 엔티티 간 관계를 추출하는 역할을 합니다.  
다음 지침을 반드시 지키세요:

- 출력은 JSON 형식으로만 하세요.
- 관계의 subject(s), predicate(p), object(o)는 모두 엔티티 canonical을 참조하세요.
- predicate 값은 한국어 동사 또는 명사구로 작성하세요. (예: "정의한다","허용한다","적용된다","발급한다","제한한다")
- evidence 값은 반드시 한국어 원문 일부를 그대로 인용하세요.
- 출력 JSON 이외의 설명은 하지 마세요.
""")
REL_HUMAN = """아래 엔티티와 텍스트를 참고하여 관계를 추출해 주세요.  
반드시 JSON으로만 출력하며, 키 이름은 정확히 'relations' 여야 합니다.  

엔티티:
{entities}

텍스트:
{distilled}
"""
# rels 추출에는 차라리 제로샷이 나은 듯. few shot은 그 예시에 너무 집착하는 듯한 모습을 보임
def _structured_chain(system: str, human: str, schema: Type[BaseModel]) -> Runnable:
    """
    - ChatPromptTemplate → ChatOpenAI(with_structured_output) 로 이어지는 체인
    - 입력: dict (프롬프트 변수)
    - 출력: Pydantic 모델 인스턴스
    """
    prompt = ChatPromptTemplate.from_messages(
        [("system", system), ("human", human)]
    )
    return prompt | llm.with_structured_output(schema)

# 체인 인스턴스
_distill_chain = _structured_chain(DISTILL_SYS, DISTILL_HUMAN, Distilled)
_entities_chain = _structured_chain(ENT_SYS, ENT_HUMAN, Entities)
_relations_chain = _structured_chain(REL_SYS, REL_HUMAN, Relations)

# ---------------------------
# 5) 공개 함수 (파이프 접점)
# ---------------------------
def distill_group(blueprint: dict, texts: List[str], max_chars: int = 16000) -> dict:
    """
    코사인 블로킹으로 묶인 텍스트들(gtexts)을 증류 블루프린트에 맞게 구조화.
    - blueprint: iText2KG 스타일 JSON 블루프린트
    - texts: 그룹 내 원문 단위 텍스트들 (gtexts)
    """
    joined = "\n---\n".join(texts)[:max_chars]
    out: Distilled = _distill_chain.invoke(
        {"blueprint": json.dumps(blueprint, ensure_ascii=False),
         "joined_texts": joined}
    )
    return out.model_dump()

def extract_entities(distilled: dict) -> dict:
    """
    증류 결과(distilled JSON)를 기반으로 '고유 개념' 엔터티 목록만 추출.
    """
    out: Entities = _entities_chain.invoke(
        {"distilled": json.dumps(distilled, ensure_ascii=False)}
    )
    return out.model_dump()

def extract_relations(distilled: dict, entities: dict) -> dict:
    """
    확정된 엔터티 집합을 컨텍스트로 관계 추출.
    """
    out: Relations = _relations_chain.invoke(
        {
            "distilled": json.dumps(distilled, ensure_ascii=False),
            "entities": json.dumps(entities, ensure_ascii=False),
        }
    )
    return out.model_dump()


In [14]:
gtexts = []
for gi,g in enumerate(blocked_embedded_texts):
    print(f'gi : {gi}, g : {g}')
    gtext=[law_text[i] for i in g]
    print(gtext)
    gtexts.append(gtext)



gi : 0, g : [0]
['제1조(목적) 이 법은 개인정보의 처리 및 보호에 관한 사항을 정함으로써 개인의 자유와 권리를 보호하고, 나아가 개인\n의 존엄과 가치를 구현함을 목적으로 한다. <개정 2014. 3. 24.>']
gi : 1, g : [1]
['제2조(정의) 이 법에서 사용하는 용어의 뜻은 다음과 같다. <개정 2014. 3. 24., 2020. 2. 4., 2023. 3. 14.>\n1. “개인정보”란 살아 있는 개인에 관한 정보로서 다음 각 목의 어느 하나에 해당하는 정보를 말한다.\n가. 성명, 주민등록번호 및 영상 등을 통하여 개인을 알아볼 수 있는 정보\n나. 해당 정보만으로는 특정 개인을 알아볼 수 없더라도 다른 정보와 쉽게 결합하여 알아볼 수 있는 정보. 이 경우\n쉽게 결합할 수 있는지 여부는 다른 정보의 입수 가능성 등 개인을 알아보는 데 소요되는 시간, 비용, 기술 등\n을 합리적으로 고려하여야 한다.\n다. 가목 또는 나목을 제1호의2에 따라 가명처리함으로써 원래의 상태로 복원하기 위한 추가 정보의 사용ㆍ결합\n없이는 특정 개인을 알아볼 수 없는 정보(이하 “가명정보”라 한다)\n1의2. “가명처리”란 개인정보의 일부를 삭제하거나 일부 또는 전부를 대체하는 등의 방법으로 추가 정보가 없이는\n특정 개인을 알아볼 수 없도록 처리하는 것을 말한다.\n2. “처리”란 개인정보의 수집, 생성, 연계, 연동, 기록, 저장, 보유, 가공, 편집, 검색, 출력, 정정(訂正), 복구, 이용, 제공,\n공개, 파기(破棄), 그 밖에 이와 유사한 행위를 말한다.\n3. “정보주체”란 처리되는 정보에 의하여 알아볼 수 있는 사람으로서 그 정보의 주체가 되는 사람을 말한다.\n4. “개인정보파일”이란 개인정보를 쉽게 검색할 수 있도록 일정한 규칙에 따라 체계적으로 배열하거나 구성한 개인\n정보의 집합물(集合物)을 말한다.\n5. “개인정보처리자”란 업무를 목적으로 개인정보파일을 운용하기 위하여 스스로 또는 다른 사람을 통하여 개인정\n보를 처리

In [132]:
print(type(gtexts[2]))

<class 'list'>


In [133]:
joined = "\n---\n".join(gtexts[2])
print(len(joined))

726


In [134]:
from typing import TypedDict, List, Dict, Any

class State(TypedDict):
    intent: str           # "ingest" | "ask" | "compare"
    user_input: str
    file_path: str | None
    doc_id: str | None
    blueprint: Dict
    units: List[Dict]
    texts: List[str]
    embeddings: Any
    groups: List[List[int]]
    distilled: List[Dict]
    entities: List[Dict]
    relations: List[Dict]
    merged_entities: List[Dict]
    version: int | None
    answer: str | None
    diff_report: Dict | None
    contexts: List[str]

import json
from pathlib import Path

json_file_path = Path('../../configs/blueprint.json')

# 5. 파일을 열고 json.load()를 사용해 파이썬 dict로 변환합니다.
try:
    with open(json_file_path, 'r', encoding='utf-8') as f:
        blueprint = json.load(f)
    
    # 결과 출력
    print("성공적으로 파일을 불러왔습니다.")
    print(blueprint)
    # print(type(data_dict)) # <class 'dict'>

except FileNotFoundError:
    print(f"오류: 파일을 찾을 수 없습니다. 경로: {json_file_path}")


distilled = distill_group(blueprint, gtexts[1])
ents = extract_entities(distilled)["entities"]
rels = extract_relations(distilled, {"entities":ents})["relations"]

성공적으로 파일을 불러왔습니다.
{'doc_type': 'regulation', 'title': None, 'authority': None, 'effective_date': None, 'amendments': [{'date': None, 'summary': None}], 'articles': [{'no': None, 'heading': None, 'text': None, 'definitions': [], 'applicability': [], 'exceptions': [], 'penalties': [], 'references': []}]}


KeyError: 'Input to ChatPromptTemplate is missing variables {\'\\n  "id"\', \'clause_text\'}.  Expected: [\'\\n  "id"\', \'clause_text\'] Received: [\'distilled\']\nNote: if you intended {\n  "id"} to be part of the string and not a variable, please escape it with double curly braces like: \'{{\n  "id"}}\'.\nFor troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_PROMPT_INPUT '

In [174]:
rels = extract_relations(distilled, {"entities":ents})["relations"]

In [153]:
print(ents)

[{'id': 'e1', 'canonical': '개인정보 보호법', 'aliases': ['개보법', 'Privacy Act'], 'type': '법률', 'source': ['doc://PIPA#제28조의2'], 'evidence': ['...']}, {'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2'], 'evidence': ['...']}, {'id': 'e3', 'canonical': '가명정보', 'aliases': ['가명정보', 'pseudonymized information', 'pseudonymized data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2'], 'evidence': ['...']}, {'id': 'e4', 'canonical': '가명처리', 'aliases': ['가명처리', 'pseudonymization', 'pseudonymization process'], 'type': '용어', 'source': ['doc://PIPA#제28조의2'], 'evidence': ['...']}, {'id': 'e5', 'canonical': '과학적 연구', 'aliases': ['과학적 연구', 'scientific research', 'scientific inquiry'], 'type': '용어', 'source': ['doc://PIPA#제28조의2'], 'evidence': ['...']}, {'id': 'e6', 'canonical': '개인정보', 'aliases': ['개인정보', 'personal information', 'personal data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2'], 'evidence': ['...']}, {'id': 'e7', 'canonical': '정보주체', 'aliases

In [175]:
rels

[{'subject': '개인정보 보호법',
  'predicate': '정의한다',
  'object': '가명정보의 처리',
  'evidence': '제28조의2(가명정보의 처리) ① 개인정보처리자는 통계작성, 과학적 연구, 공익적 기록보존 등을 위하여 정보주체의 동의 없이 가명정보를 처리할 수 있다.'},
 {'subject': '개인정보 보호법',
  'predicate': '허용한다',
  'object': '가명정보의 처리',
  'evidence': '① 개인정보처리자는 통계작성, 과학적 연구, 공익적 기록보존 등을 위하여 정보주체의 동의 없이 가명정보를 처리할 수 있다.'},
 {'subject': '개인정보 보호법',
  'predicate': '포함한다',
  'object': '고정형 영상정보처리기기',
  'evidence': '7. Fixed video information processing device means a device installed in a specific space that continuously or periodically captures images of people or objects, or transmits them through wired or wireless networks, as specified by Presidential Decree.'},
 {'subject': '개인정보 보호법',
  'predicate': '포함한다',
  'object': '휴대형 영상정보처리기기',
  'evidence': '7-2. Portable video information processing device means a device that can capture images of people or objects when worn or carried by a person, or attached or placed on a movable object, and transmits them through wired or wire

In [170]:
for e in ents:
    print(e)
for r in rels:
    print(type(r['evidence']))

{'id': 'e1', 'canonical': '개인정보 보호법', 'aliases': ['개보법', 'Privacy Act'], 'type': '법률', 'source': ['doc://PIPA#제28조의2', 'group:1'], 'evidence': ['...']}
{'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2', 'group:1'], 'evidence': ['...']}
{'id': 'e3', 'canonical': '가명정보', 'aliases': ['가명정보', 'pseudonymized information', 'pseudonymized data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1'], 'evidence': ['...']}
{'id': 'e4', 'canonical': '가명처리', 'aliases': ['가명처리', 'pseudonymization', 'pseudonymization process'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1'], 'evidence': ['...']}
{'id': 'e5', 'canonical': '과학적 연구', 'aliases': ['과학적 연구', 'scientific research', 'scientific inquiry'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1'], 'evidence': ['...']}
{'id': 'e6', 'canonical': '개인정보', 'aliases': ['개인정보', 'personal information', 'personal data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1'], 'evi

In [176]:
gi = 1
all_e, all_r = [], []
for e in ents:
    e.setdefault("id", f"e_{len(all_e)+1}")
    e.setdefault("source",[]).append(f"group:{gi}")
for r in rels: 
    r.setdefault("group",[]).append(f"group:{gi}")
all_e.extend(ents); all_r.extend(rels)
# merged = judge.merge_entities(all_e)

In [179]:
all_e

[{'id': 'e1',
  'canonical': '개인정보 보호법',
  'aliases': ['개보법', 'Privacy Act'],
  'type': '법률',
  'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
  'evidence': ['...']},
 {'id': 'e2',
  'canonical': '제28조의2',
  'aliases': ['가명정보의 처리'],
  'type': '조문',
  'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
  'evidence': ['...']},
 {'id': 'e3',
  'canonical': '가명정보',
  'aliases': ['가명정보', 'pseudonymized information', 'pseudonymized data'],
  'type': '용어',
  'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
  'evidence': ['...']},
 {'id': 'e4',
  'canonical': '가명처리',
  'aliases': ['가명처리', 'pseudonymization', 'pseudonymization process'],
  'type': '용어',
  'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
  'evidence': ['...']},
 {'id': 'e5',
  'canonical': '과학적 연구',
  'aliases': ['과학적 연구', 'scientific research', 'scientific inquiry'],
  'type': '용어',
  'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
  'evidence'

In [178]:
distilled2 = distill_group(blueprint, gtexts[2])
ents2 = extract_entities(distilled2)["entities"]
rels2 = extract_relations(distilled2, {"entities":ents2})["relations"]

KeyboardInterrupt: 

In [None]:
gi = 2
for e in ents2:
    e.setdefault("id", f"e_{len(all_e)+1}")
    e.setdefault("source",[]).append(f"group:{gi}")
for r in rels2: r.setdefault("group",[]).append(f"group:{gi}")
all_e.extend(ents2); all_r.extend(rels2)

In [107]:
len(all_e)

2

In [109]:
all_r

[{'s': 'Personal information',
  'p': 'refers to',
  'o': 'information about living individuals',
  'evidence': ['Personal information refers to information about living individuals that falls under any of the following categories:',
   'group:1']},
 {'s': 'Processing',
  'p': 'means',
  'o': 'the collection, generation, connection, integration, recording, storage, retention, processing, editing, retrieval, output, correction, recovery, use, provision, disclosure, destruction, and other similar actions regarding personal information',
  'evidence': ['Processing means the collection, generation, connection, integration, recording, storage, retention, processing, editing, retrieval, output, correction, recovery, use, provision, disclosure, destruction, and other similar actions regarding personal information.']},
 {'s': 'Data subject',
  'p': 'is a person',
  'o': 'who can be identified from the processed information and is the subject of that information',
  'evidence': ['A data subject

In [196]:
# app/components/judge.py
from __future__ import annotations
import os, itertools, json
from typing import List, Dict, Type
from pydantic import BaseModel, Field
from rapidfuzz import fuzz

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# ---------------------------
# 1) 온프레미스 LLM (llama-server OpenAI 호환)
# ---------------------------

BASE_URL = "http://host.docker.internal:8080/v1" 

llm = ChatOpenAI(
    model="Midm-2.0-Base-Instruct-q4_0",           # 서버에서 인식 가능한 임의의 모델명
    base_url=BASE_URL,
    api_key="sk-local-anything", # 의미없는 토큰도 OK
    temperature=0.0,
    max_tokens=8192               # 서버 토큰 제한 고려
)

# ---------------------------
# 2) 구조화 출력 스키마 (Pydantic)
# ---------------------------
class JudgeVerdict(BaseModel):
    same: bool = Field(..., description="True if two entities are the same concept")
    why: str | None = Field(default=None, description="Short rationale")

# ---------------------------
# 3) 체인(LCEL): Prompt → LLM(structured)
# ---------------------------
JUDGE_SYS = (
    "You are a strict entity-equivalence judge for a regulatory knowledge graph. "
    "Decide if two entity records denote the SAME real-world concept within this legal context. "
    "Be conservative when definitions or applicability differ."
)

JUDGE_HUMAN = """Return a compact JSON with keys exactly: same (bool), why (str).
A:
{a}
B:
{b}"""

def _judge_chain():
    prompt = ChatPromptTemplate.from_messages([
        ("system", JUDGE_SYS),
        ("human",  JUDGE_HUMAN),
    ])
    # 구조화 출력: Pydantic으로 강제
    return prompt | llm.with_structured_output(JudgeVerdict) # chain을 반환

# ---------------------------
# 4) 후보쌍 생성 (빠른 문자 유사도 블로킹)
# ---------------------------
def candidate_pairs(ents: List[Dict], ratio: int = 85):
    """
    문자 기반(rapidfuzz) 프리필터로 같은/유사 표기 엔티티들만 후보쌍으로 만든다.
    - ratio는 토큰정렬 유사도 기준(0~100)
    """
    pairs = []
    for a, b in itertools.combinations(ents, 2):
        scores = [fuzz.token_sort_ratio(a.get("canonical",""), b.get("canonical",""))]
        for x in a.get("aliases", []):
            for y in b.get("aliases", []):
                scores.append(fuzz.token_sort_ratio(x, y))
        if max(scores) >= ratio:
            pairs.append((a, b))
    return pairs

# ---------------------------
# 5) LLM-as-Judge (체인 사용)
# ---------------------------
def llm_equiv(a: Dict, b: Dict) -> bool:
    """
    LangChain 체인으로 llama-server에 질의 → JudgeVerdict(same, why) 구조로 수신.
    실패 시 보수적으로 False.
    """
    try:
        chain = _judge_chain()
        print(a)
        print(b)
        verdict: JudgeVerdict = chain.invoke({
            "a": json.dumps(a, ensure_ascii=False),
            "b": json.dumps(b, ensure_ascii=False),
        })
        print(bool(verdict.same))
        return bool(verdict.same)
    except Exception:
        print("---Exception---")
        return False

# ---------------------------
# 6) 병합 루프
# ---------------------------
def merge_entities(ents: List[Dict]) -> List[Dict]:
    """
    - 후보쌍(candidate_pairs)으로 블로킹
    - 체인 기반 LLM-as-Judge로 동등 개념 판단
    - 동등하면 alias/source 병합 + 대표명(canonical) 단순 규칙으로 선택
    """
    ents = ents[:]  # shallow copy
    changed = True
    while changed:
        changed = False
        for a, b in candidate_pairs(ents):
            if llm_equiv(a, b):
                # 누락 키 보정
                a.setdefault("aliases", []); b.setdefault("aliases", [])
                a.setdefault("source",  []); b.setdefault("source",  [])

                # alias 병합(원래 canonical까지 alias로 흡수)
                a["aliases"] = sorted(set(
                    a["aliases"] + [a.get("canonical",""), b.get("canonical","")] + b["aliases"]
                ))

                # 대표명 선택(간단: 더 짧은 쪽; 필요시 규칙 고도화)
                if a.get("canonical") and b.get("canonical"):
                    a["canonical"] = min([a["canonical"], b["canonical"]], key=len)
                else:
                    a["canonical"] = a.get("canonical") or b.get("canonical")

                # source 병합
                a["source"] = sorted(set(a["source"] + b["source"]))

                # 타입/정의 충돌 등은 필요시 추가 규칙으로 조정
                # 예: a["type"] 우선, 없으면 b["type"] 채우기
                if not a.get("type") and b.get("type"):
                    a["type"] = b["type"]

                # b 제거 후 다시 루프 시작
                ents.remove(b)
                changed = True
                break
    return ents


In [180]:
for a, b in itertools.combinations(all_e, 2):
    print(a)
    print(b)

{'id': 'e1', 'canonical': '개인정보 보호법', 'aliases': ['개보법', 'Privacy Act'], 'type': '법률', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e1', 'canonical': '개인정보 보호법', 'aliases': ['개보법', 'Privacy Act'], 'type': '법률', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e3', 'canonical': '가명정보', 'aliases': ['가명정보', 'pseudonymized information', 'pseudonymized data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e1', 'canonical': '개인정보 보호법', 'aliases': ['개보법', 'Privacy Act'], 'type': '법률', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e4', 'canonical': '가명처리', 'aliases': ['가명처리', 'pseudonymization', 'pseudonymization process'], 'type'

In [None]:
pairs = []
for a, b in itertools.combinations(all_e, 2):
    scores = [fuzz.token_sort_ratio(a.get("canonical",""), b.get("canonical",""))]
    for x in a.get("aliases", []):
        for y in b.get("aliases", []):
            scores.append(fuzz.token_sort_ratio(x, y))
    if max(scores) >= 85:
        pairs.append((a, b))
        all_e.pop(a)
        print(f'{a}를 pop하였습니다')
        all_e.pop(b)
        print(f'{b}를 pop하였습니다')


In [182]:
pairs

[({'id': 'e2',
   'canonical': '제28조의2',
   'aliases': ['가명정보의 처리'],
   'type': '조문',
   'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
   'evidence': ['...']},
  {'id': 'e23',
   'canonical': '정보의 처리',
   'aliases': ['정보의 처리', 'processing', 'data processing'],
   'type': '행위',
   'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
   'evidence': ['...']}),
 ({'id': 'e6',
   'canonical': '개인정보',
   'aliases': ['개인정보', 'personal information', 'personal data'],
   'type': '용어',
   'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
   'evidence': ['...']},
  {'id': 'e8',
   'canonical': '개인정보 파일',
   'aliases': ['개인정보 파일', 'personal information file', 'personal data file'],
   'type': '용어',
   'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'],
   'evidence': ['...']}),
 ({'id': 'e16',
   'canonical': '정보의 수집',
   'aliases': ['정보의 수집', 'collection', 'data collection'],
   'type': '행위',
   'source': ['doc://PIPA#제28조의2', 'group:

In [189]:
for i, (a, b) in enumerate(pairs):
    print(i)
    print(a)
    print(b)


0
{'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e23', 'canonical': '정보의 처리', 'aliases': ['정보의 처리', 'processing', 'data processing'], 'type': '행위', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
1
{'id': 'e6', 'canonical': '개인정보', 'aliases': ['개인정보', 'personal information', 'personal data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e8', 'canonical': '개인정보 파일', 'aliases': ['개인정보 파일', 'personal information file', 'personal data file'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
2
{'id': 'e16', 'canonical': '정보의 수집', 'aliases': ['정보의 수집', 'collection', 'data collection'], 'type': '행위', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e18', 'canonical': '정보의 결합

In [None]:
pairs_copied = pairs.copy()
changed = True
while changed:
    changed = False
    for a, b in pairs_copied:
        print(f"a : {a}")
        print(f"b : {b}")
        if llm_equiv(a, b):
            
            # 누락 키 보정
            a.setdefault("aliases", []); b.setdefault("aliases", [])
            a.setdefault("source",  []); b.setdefault("source",  [])

            # alias 병합(원래 canonical까지 alias로 흡수)
            a["aliases"] = sorted(set(
                a["aliases"] + [a.get("canonical",""), b.get("canonical","")] + b["aliases"]
            ))

            # 대표명 선택(간단: 더 짧은 쪽; 필요시 규칙 고도화)
            if a.get("canonical") and b.get("canonical"):
                a["canonical"] = min([a["canonical"], b["canonical"]], key=len)
            else:
                a["canonical"] = a.get("canonical") or b.get("canonical")

            # source 병합
            a["source"] = sorted(set(a["source"] + b["source"]))

            # 타입/정의 충돌 등은 필요시 추가 규칙으로 조정
            # 예: a["type"] 우선, 없으면 b["type"] 채우기
            if not a.get("type") and b.get("type"):
                a["type"] = b["type"]

            # b 제거 후 다시 루프 시작
            pairs_copied.remove(b)
            changed = True
            break
all_e.appned(pairs_copied)
print(f'{pairs_copied}가 다시 all_e에 합산되었습니다')

a : {'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
b : {'id': 'e23', 'canonical': '정보의 처리', 'aliases': ['정보의 처리', 'processing', 'data processing'], 'type': '행위', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
{'id': 'e23', 'canonical': '정보의 처리', 'aliases': ['정보의 처리', 'processing', 'data processing'], 'type': '행위', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
False
a : {'id': 'e6', 'canonical': '개인정보', 'aliases': ['개인정보', 'personal information', 'personal data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}
b : {'id': 'e8', 'canonical': '개인정보 파일', 'aliases': ['개인정보 파일', 'personal

In [193]:
print(pairs_copied)

[({'id': 'e2', 'canonical': '제28조의2', 'aliases': ['가명정보의 처리'], 'type': '조문', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}, {'id': 'e23', 'canonical': '정보의 처리', 'aliases': ['정보의 처리', 'processing', 'data processing'], 'type': '행위', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}), ({'id': 'e6', 'canonical': '개인정보', 'aliases': ['개인정보', 'personal information', 'personal data'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}, {'id': 'e8', 'canonical': '개인정보 파일', 'aliases': ['개인정보 파일', 'personal information file', 'personal data file'], 'type': '용어', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}), ({'id': 'e16', 'canonical': '정보의 수집', 'aliases': ['정보의 수집', 'collection', 'data collection'], 'type': '행위', 'source': ['doc://PIPA#제28조의2', 'group:1', 'group:1', 'group:1'], 'evidence': ['...']}, {'id': 'e18', 'canonical': '정

In [None]:
import Tuple
def _norm(s: str) -> str:
    return " ".join((s or "").strip().lower().split())

def _names(e: Dict) -> set[str]:
    out = set()
    if e.get("canonical"): out.add(_norm(e["canonical"]))
    for a in e.get("aliases", []) or []:
        if a: out.add(_norm(a))
    return out

def build_name2canon(ents: List[Dict]) -> Dict[str, str]:
    m = {}
    for e in ents:
        can = _norm(e.get("canonical", ""))
        if not can: 
            continue
        for n in _names(e) | {can}:
            m[n] = can
    return m

def normalize_relations(relations: List[Dict], name2canon: Dict[str,str]) -> List[Dict]:
    out = []
    for r in relations:
        s = name2canon.get(_norm(r.get("s","")), _norm(r.get("s","")))
        o = name2canon.get(_norm(r.get("o","")), _norm(r.get("o","")))
        p = _norm(r.get("p",""))
        nr = dict(r)
        nr["s"], nr["o"], nr["p"] = s, o, p
        out.append(nr)
    return out

def _indices_involving(rel_norm: List[Dict], name_norm: str) -> List[int]:
    return [i for i, r in enumerate(rel_norm) if r["s"] == name_norm or r["o"] == name_norm]

def _direction_and_other(rn: Dict, center: str):
    if rn["s"] == center:  return "subj", rn["o"]
    if rn["o"] == center:  return "obj",  rn["s"]
    return "", ""

def _merge_evidence([dst: Dict, src: Dict]) -> None:
    e1 = set(dst.get("evidence", []) or [])
    e2 = set(src.get("evidence", []) or [])
    dst["evidence"] = sorted(e1 | e2)
    if "source" in dst or "source" in src:
        s1 = set(dst.get("source", []) or [])
        s2 = set(src.get("source", []) or [])
        dst["source"] = sorted(s1 | s2)


def dedupe_relations_simple(
    ents: List[Dict],
    relations: List[Dict],
    pairs: List[Tuple[Dict, Dict]],
    max_llm_checks: int = 200
) -> List[Dict]:
    name2canon = build_name2canon(ents)
    rel_norm = normalize_relations(relations, name2canon)

    to_delete = set()
    checks = 0

    for a, b in pairs:
        a_can = _norm(a.get("canonical","")); a_can = name2canon.get(a_can, a_can)
        b_can = _norm(b.get("canonical","")); b_can = name2canon.get(b_can, b_can)
        if not a_can or not b_can: 
            continue

        a_idxs = _indices_involving(rel_norm, a_can)
        b_idxs = _indices_involving(rel_norm, b_can)

        for i in a_idxs:
            if i in to_delete: continue
            r1n = rel_norm[i]

            for j in b_idxs:
                if j in to_delete or i == j: 
                    continue
                r2n = rel_norm[j]

                # 1) predicate 동일
                if r1n["p"] != r2n["p"]:
                    continue

                # 2) 방향 동일
                dir1, other1 = _direction_and_other(r1n, a_can)
                dir2, other2 = _direction_and_other(r2n, b_can)
                if not dir1 or not dir2 or dir1 != dir2:
                    continue

                # 3) 반대편 노드 동일
                if other1 != other2:
                    continue

                # ---- LLM 최종판정 ----
                if checks >= max_llm_checks:
                    break
                checks += 1

                if llm_equiv(relations[i], relations[j]):
                    # 더 많은 evidence 가진 쪽을 남김
                    ei = len(relations[i].get("evidence", []) or [])
                    ej = len(relations[j].get("evidence", []) or [])
                    keep, drop = (i, j) if ei >= ej else (j, i)
                    _merge_evidence(relations[keep], relations[drop])
                    to_delete.add(drop)

    return [r for k, r in enumerate(relations) if k not in to_delete]

In [None]:
print(f"[DEBUG] candidate_pairs = {[(a['canonical'], b['canonical']) for a,b in pairs]}")

cleaned = dedupe_relations_simple(all_e, all_r, pairs, max_llm_checks=50) 
# entities들을 모두 소문자로 normalize하고, relations들의 node들도 모두 normalize하고
# 유사한 것으로 의심되는 entities들이 들어있는 모든 relations들도 llm에게 판정하라고 시켜서, 유사한 것들은 병합하는 구조

# (5) 결과 확인
print("\n[Before]")
for r in relations:
    print(r)

print("\n[After]")
for r in cleaned:
    print(r)

In [None]:
# neo4j_io.py
from __future__ import annotations
from typing import List, Dict, Tuple, Optional
from neo4j import GraphDatabase
from datetime import datetime
import re

# ----------------------------
# 기본 정규화 유틸
# ----------------------------
def norm(s: str) -> str:
    return " ".join((s or "").strip().lower().split())

def uniq_list(xs: Optional[List[str]]) -> List[str]:
    if not xs:
        return []
    return sorted(set(x for x in xs if isinstance(x, str) and x.strip()))

def build_name_to_id(ents: List[Dict]) -> Dict[str, str]:
    """
    canonical/aliases 모든 표기를 -> 해당 entity id 로 매핑
    """
    m = {}
    for e in ents:
        eid = e.get("id")
        if not eid:
            continue
        can = e.get("canonical")
        if can:
            m[norm(can)] = eid
        for a in e.get("aliases", []) or []:
            if a:
                m[norm(a)] = eid
    return m

def sanitize_predicate(p: str) -> str:
    """
    관계 속성용 정규화(소문자, 공백 압축).
    (관계 타입으로 쓰지 않고 r.p_norm 속성으로만 보관)
    """
    return norm(p)

# ----------------------------
# Neo4j Writer
# ----------------------------
class Neo4jWriter:
    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    # 고유 제약조건: :Entity(id) 유니크
    def ensure_constraints(self):
        cypher = """
        CREATE CONSTRAINT entity_id_unique IF NOT EXISTS
        FOR (n:Entity) REQUIRE n.id IS UNIQUE
        """
        with self.driver.session() as sess:
            sess.run(cypher)

    # 엔터티 업서트 (간단 버전: 전달된 배열/맵을 그대로 세팅)
    def upsert_entities(self, ents: List[Dict], batch: int = 1000):
        rows = []
        now = datetime.utcnow().isoformat()

        for e in ents:
            row = {
                "id": e.get("id"),
                "canonical": e.get("canonical"),
                "type": e.get("type"),
                "aliases": uniq_list(e.get("aliases")),
                "source": uniq_list(e.get("source")),
                "evidence": uniq_list(e.get("evidence")),
                "attrs": e.get("attrs") or {},
                "updated_at": now,
            }
            if not row["id"]:
                continue
            rows.append({"props": row})

        if not rows:
            return

        q = """
        UNWIND $rows AS row
        MERGE (n:Entity {id: row.props.id})
        SET n += row.props
        """
        with self.driver.session() as sess:
            for i in range(0, len(rows), batch):
                sess.run(q, rows=rows[i:i+batch])

    # 관계 업서트
    def upsert_relations(
        self,
        ents: List[Dict],
        relations: List[Dict],
        batch: int = 1000,
        strict: bool = True,
    ):
        """
        relations의 s/o 가 '엔터티 id' 또는 'canonical/alias 문자열'일 수 있음.
        - id로 매칭 실패하면 문자열을 canonical/alias 매핑으로 id 해석 시도.
        - strict=True 이면 해석 실패한 관계는 건너뜀.
        """
        name2id = build_name_to_id(ents)
        id_set = {e["id"] for e in ents if e.get("id")}
        rows = []
        now = datetime.utcnow().isoformat()

        for r in relations:
            s_raw = str(r.get("s", "")).strip()
            o_raw = str(r.get("o", "")).strip()
            p_raw = str(r.get("p", "")).strip()

            # s/o가 이미 id이면 그대로, 아니면 이름->id 해석
            s_id = s_raw if s_raw in id_set else name2id.get(norm(s_raw))
            o_id = o_raw if o_raw in id_set else name2id.get(norm(o_raw))

            if not s_id or not o_id:
                if strict:
                    # 해석 실패 관계 스킵
                    continue
                else:
                    # 필요하면 느슨하게 노드도 함께 생성하는 로직을 넣을 수 있음
                    pass

            row = {
                "s_id": s_id,
                "o_id": o_id,
                "p": p_raw,
                "p_norm": sanitize_predicate(p_raw),
                "evidence": uniq_list(r.get("evidence")),
                "source": uniq_list(r.get("source")),
                "updated_at": now,
            }
            rows.append(row)

        if not rows:
            return

        # 동일 (s, p_norm, o) 조합을 하나로 MERGE
        q = """
        UNWIND $rows AS row
        MATCH (s:Entity {id: row.s_id})
        MATCH (o:Entity {id: row.o_id})
        MERGE (s)-[r:REL {p_norm: row.p_norm}]->(o)
        SET r.p = row.p,
            r.evidence = row.evidence,
            r.source = row.source,
            r.updated_at = row.updated_at
        """
        with self.driver.session() as sess:
            for i in range(0, len(rows), batch):
                sess.run(q, rows=rows[i:i+batch])


In [None]:
# run_write.py
from neo4j_io import Neo4jWriter

if __name__ == "__main__":
    # 2) 연결
    uri = "bolt://localhost:7687"       # Aura면 neo4j+s://... 형태
    user = "neo4j"
    password = "password"

    writer = Neo4jWriter(uri, user, password)
    writer.ensure_constraints()

    # 3) 엔터티/관계 업서트
    writer.upsert_entities(ents)
    writer.upsert_relations(ents, relations)

    writer.close()
    print("✅ Neo4j 업서트 완료")

In [None]:
# kb_update_compare.py
from __future__ import annotations
from typing import List, Dict, Tuple, Optional, Set
from dataclasses import dataclass
from neo4j import GraphDatabase
from datetime import datetime
import json
import re

# ----------------------------
# 유틸(정규화/집합 변환)
# ----------------------------
def norm(s: str) -> str:
    return " ".join((s or "").strip().lower().split())

def uniq_list(xs: Optional[List[str]]) -> List[str]:
    if not xs:
        return []
    return sorted(set(x for x in xs if isinstance(x, str) and x.strip()))

# ----------------------------
# 엔터티/관계의 "동일성 키" 정의
#  - 엔터티: (canonical_norm, type, attrs.key subset)
#  - 관계: (s_canonical_norm, p_norm, o_canonical_norm)
# ----------------------------
ENTITY_ATTR_KEY_HINTS = ("code", "number")  # 있으면 동일성 키에 포함

def entity_key(e: Dict) -> str:
    c = norm(e.get("canonical", ""))
    t = e.get("type", "") or ""
    parts = [c, "||", t]
    attrs = e.get("attrs") or {}
    for k in ENTITY_ATTR_KEY_HINTS:
        if k in attrs and attrs[k]:
            parts += ["||", f"{k}=", str(attrs[k])]
    return "".join(parts)

def rel_key(r: Dict, id2canon_norm: Dict[str, str] | None = None) -> str:
    # r["s"], r["o"]가 id거나 이름일 수 있음 -> 가능하면 canonical로 치환
    s_raw = str(r.get("s", ""))
    o_raw = str(r.get("o", ""))
    p_raw = str(r.get("p", r.get("p_norm","")))
    p_norm = norm(r.get("p_norm", p_raw))

    def to_canon_norm(x: str) -> str:
        if id2canon_norm and x in id2canon_norm:
            return id2canon_norm[x]
        return norm(x)

    s = to_canon_norm(s_raw)
    o = to_canon_norm(o_raw)
    return f"{s}||{p_norm}||{o}"

# ----------------------------
# Neo4j I/O (간단 리더/라이터)
# ----------------------------
class Neo4jIO:
    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def ensure_constraints(self):
        q = """CREATE CONSTRAINT entity_id_unique IF NOT EXISTS
               FOR (n:Entity) REQUIRE n.id IS UNIQUE"""
        with self.driver.session() as s:
            s.run(q)

    # ===== Read current KB =====
    def read_entities(self) -> List[Dict]:
        q = "MATCH (n:Entity) RETURN n"
        out = []
        with self.driver.session() as s:
            for rec in s.run(q):
                n = rec["n"]
                out.append({
                    "id": n.get("id"),
                    "canonical": n.get("canonical"),
                    "aliases": n.get("aliases") or [],
                    "type": n.get("type"),
                    "source": n.get("source") or [],
                    "evidence": n.get("evidence") or [],
                    "attrs": n.get("attrs") or {},
                })
        return out

    def read_relations(self) -> List[Dict]:
        q = """MATCH (s:Entity)-[r:REL]->(o:Entity)
               RETURN s.id AS s_id, s.canonical AS s_canon,
                      r.p AS p, r.p_norm AS p_norm, r.evidence AS evidence, r.source AS source,
                      o.id AS o_id, o.canonical AS o_canon"""
        out = []
        with self.driver.session() as s:
            for rec in s.run(q):
                out.append({
                    "s_id": rec["s_id"],
                    "s": rec["s_canon"],      # 비교 편의를 위해 canonical로 채움
                    "p": rec["p"],
                    "p_norm": rec["p_norm"],
                    "evidence": rec["evidence"] or [],
                    "source": rec["source"] or [],
                    "o_id": rec["o_id"],
                    "o": rec["o_canon"],
                })
        return out

    # ===== Upsert =====
    def upsert_entities(self, ents: List[Dict], batch: int = 500):
        if not ents:
            return
        rows = []
        now = datetime.utcnow().isoformat()
        for e in ents:
            rows.append({"props": {
                "id": e.get("id"),
                "canonical": e.get("canonical"),
                "type": e.get("type"),
                "aliases": uniq_list(e.get("aliases")),
                "source": uniq_list(e.get("source")),
                "evidence": uniq_list(e.get("evidence")),
                "attrs": e.get("attrs") or {},
                "updated_at": now,
            }})
        q = """
        UNWIND $rows AS row
        MERGE (n:Entity {id: row.props.id})
        SET n += row.props
        """
        with self.driver.session() as s:
            for i in range(0, len(rows), batch):
                s.run(q, rows=rows[i:i+batch])

    def upsert_relations(self, ents: List[Dict], rels: List[Dict], batch: int = 500):
        if not rels:
            return
        # (이 함수는 s/o가 id 또는 이름일 수 있음 → id 우선, 없으면 canonical로 매칭)
        id_set = {e["id"] for e in ents if e.get("id")}
        name2id = {}
        for e in ents:
            can = e.get("canonical")
            if e.get("id") and can:
                name2id[norm(can)] = e["id"]

        rows = []
        now = datetime.utcnow().isoformat()
        for r in rels:
            s_raw = str(r.get("s", ""))
            o_raw = str(r.get("o", ""))
            p = str(r.get("p", ""))
            p_norm = norm(r.get("p_norm", p))
            s_id = s_raw if s_raw in id_set else name2id.get(norm(s_raw))
            o_id = o_raw if o_raw in id_set else name2id.get(norm(o_raw))
            if not s_id or not o_id:
                continue
            rows.append({
                "s_id": s_id, "o_id": o_id,
                "p": p, "p_norm": p_norm,
                "evidence": uniq_list(r.get("evidence")),
                "source": uniq_list(r.get("source")),
                "updated_at": now
            })

        if not rows:
            return
        q = """
        UNWIND $rows AS row
        MATCH (s:Entity {id: row.s_id})
        MATCH (o:Entity {id: row.o_id})
        MERGE (s)-[r:REL {p_norm: row.p_norm}]->(o)
        SET r.p = row.p,
            r.evidence = row.evidence,
            r.source = row.source,
            r.updated_at = row.updated_at
        """
        with self.driver.session() as s:
            for i in range(0, len(rows), batch):
                s.run(q, rows=rows[i:i+batch])

    # ===== Delete (옵션) =====
    def delete_entities_by_canonical_type(self, items: List[Tuple[str, str]]):
        if not items:
            return
        q = """
        UNWIND $rows AS row
        MATCH (n:Entity {canonical: row.canonical, type: row.type})
        DETACH DELETE n
        """
        rows = [{"canonical": c, "type": t} for (c, t) in items]
        with self.driver.session() as s:
            s.run(q, rows=rows)

    def delete_relations_by_triplet(self, items: List[Tuple[str, str, str]]):
        """
        items: (s_canonical, p_norm, o_canonical)
        """
        if not items:
            return
        q = """
        UNWIND $rows AS row
        MATCH (s:Entity {canonical: row.s})
              -[r:REL {p_norm: row.p_norm}]->
              (o:Entity {canonical: row.o})
        DELETE r
        """
        rows = [{"s": s, "p_norm": p, "o": o} for (s, p, o) in items]
        with self.driver.session() as s:
            s.run(q, rows=rows)

# ----------------------------
# Diff 결과 구조체
# ----------------------------
@dataclass
class EntityDiff:
    added: List[Dict]
    removed: List[Dict]
    changed: List[Tuple[Dict, Dict, Dict]]  # (old, new, changes)

@dataclass
class RelationDiff:
    added: List[Dict]
    removed: List[Dict]
    changed: List[Tuple[Dict, Dict, Dict]]  # (old, new, changes)

@dataclass
class KBDiff:
    entities: EntityDiff
    relations: RelationDiff

# ----------------------------
# 새 KB 생성 훅(여러분의 파이프라인 연결)
#  - docs: 문서(텍스트/청크 리스트 등) → 엔터티/관계 반환
# ----------------------------
def run_full_pipeline_over_docs(docs: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """
    여러분의 파이프라인을 호출하여 (entities, relations)를 반환하세요.
    반드시 각 엔터티는 id, canonical, type 등을 포함.
    관계는 s/p/o/evidence/source 포함.
    """
    raise NotImplementedError("여러분의 추출/병합 파이프라인 함수를 연결하세요.")

# ----------------------------
# Diff 계산
# ----------------------------
def index_entities_by_key(ents: List[Dict]) -> Dict[str, Dict]:
    return {entity_key(e): e for e in ents}

def index_relations_by_key(rels: List[Dict], id2canon_norm: Dict[str, str] | None = None) -> Dict[str, Dict]:
    return {rel_key(r, id2canon_norm): r for r in rels}

def id2canon_norm_map(ents: List[Dict]) -> Dict[str, str]:
    m = {}
    for e in ents:
        if e.get("id"):
            m[e["id"]] = norm(e.get("canonical",""))
    return m

def diff_sets(old_map: Dict[str, Dict], new_map: Dict[str, Dict]) -> Tuple[List[Dict], List[Dict], Set[str]]:
    old_keys = set(old_map.keys())
    new_keys = set(new_map.keys())
    added_keys = new_keys - old_keys
    removed_keys = old_keys - new_keys
    common_keys = old_keys & new_keys
    added = [new_map[k] for k in sorted(added_keys)]
    removed = [old_map[k] for k in sorted(removed_keys)]
    return added, removed, common_keys

def dict_list_diff(name: str, old_list: List[str], new_list: List[str]) -> Dict:
    o = set(old_list or [])
    n = set(new_list or [])
    add = sorted(n - o)
    rem = sorted(o - n)
    return {f"{name}_added": add, f"{name}_removed": rem} if (add or rem) else {}

def entity_change([old: Dict, new: Dict]) -> Dict:
    changes = {}
    if (old.get("canonical") or "") != (new.get("canonical") or ""):
        changes["canonical"] = {"old": old.get("canonical"), "new": new.get("canonical")}
    if (old.get("type") or "") != (new.get("type") or ""):
        changes["type"] = {"old": old.get("type"), "new": new.get("type")}
    # 리스트/맵 차이
    changes |= dict_list_diff("aliases", old.get("aliases") or [], new.get("aliases") or [])
    changes |= dict_list_diff("source", old.get("source") or [], new.get("source") or [])
    changes |= dict_list_diff("evidence", old.get("evidence") or [], new.get("evidence") or [])
    # attrs는 단순 덮었을 때 변경된 key만 추려봄
    olda, newa = old.get("attrs") or {}, new.get("attrs") or {}
    attr_keys = set(olda.keys()) | set(newa.keys())
    attr_diff = {}
    for k in sorted(attr_keys):
        if json.dumps(olda.get(k), ensure_ascii=False, sort_keys=True) != json.dumps(newa.get(k), ensure_ascii=False, sort_keys=True):
            attr_diff[k] = {"old": olda.get(k), "new": newa.get(k)}
    if attr_diff:
        changes["attrs"] = attr_diff
    return changes

def relation_change([old: Dict, new: Dict]) -> Dict:
    changes = {}
    # p / p_norm 변화
    if (old.get("p") or "") != (new.get("p") or ""):
        changes["p"] = {"old": old.get("p"), "new": new.get("p")}
    if norm(old.get("p_norm", old.get("p",""))) != norm(new.get("p_norm", new.get("p",""))):
        changes["p_norm"] = {"old": old.get("p_norm"), "new": new.get("p_norm")}
    changes |= dict_list_diff("evidence", old.get("evidence") or [], new.get("evidence") or [])
    changes |= dict_list_diff("source", old.get("source") or [], new.get("source") or [])
    return changes

def compute_kb_diff(
    old_ents: List[Dict], old_rels: List[Dict],
    new_ents: List[Dict], new_rels: List[Dict],
) -> KBDiff:
    # 엔터티 매핑
    old_e_map = index_entities_by_key(old_ents)
    new_e_map = index_entities_by_key(new_ents)

    e_added, e_removed, e_common = diff_sets(old_e_map, new_e_map)
    e_changed = []
    for k in sorted(e_common):
        ch = entity_change(old_e_map[k], new_e_map[k])
        if ch:
            e_changed.append((old_e_map[k], new_e_map[k], ch))

    # 관계 매핑 (노드 id->canonical 정규화 맵 사용)
    old_id2canon = id2canon_norm_map(old_ents)
    new_id2canon = id2canon_norm_map(new_ents)
    old_r_map = index_relations_by_key(old_rels, old_id2canon)
    new_r_map = index_relations_by_key(new_rels, new_id2canon)

    r_added, r_removed, r_common = diff_sets(old_r_map, new_r_map)
    r_changed = []
    for k in sorted(r_common):
        ch = relation_change(old_r_map[k], new_r_map[k])
        if ch:
            r_changed.append((old_r_map[k], new_r_map[k], ch))

    return KBDiff(
        entities=EntityDiff(added=e_added, removed=e_removed, changed=e_changed),
        relations=RelationDiff(added=r_added, removed=r_removed, changed=r_changed),
    )

# ----------------------------
# 리포팅(요약 출력)
# ----------------------------
def print_kb_diff_summary(diff: KBDiff, limit: int = 10):
    print("=== KB Diff Summary ===")
    print(f"Entities: +{len(diff.entities.added)} / -{len(diff.entities.removed)} / ~{len(diff.entities.changed)}")
    print(f"Relations: +{len(diff.relations.added)} / -{len(diff.relations.removed)} / ~{len(diff.relations.changed)}")

    def brief_e(e): return f"{e.get('type','?')} :: {e.get('canonical','?')}"
    def brief_r(r): return f"{r.get('s','?')} -[{r.get('p_norm', r.get('p','?'))}]-> {r.get('o','?')}"

    if diff.entities.added:
        print("\n[Entities Added] (up to", limit, ")")
        for e in diff.entities.added[:limit]:
            print(" +", brief_e(e))
    if diff.entities.removed:
        print("\n[Entities Removed] (up to", limit, ")")
        for e in diff.entities.removed[:limit]:
            print(" -", brief_e(e))
    if diff.entities.changed:
        print("\n[Entities Changed] (up to", limit, ")")
        for old, new, ch in diff.entities.changed[:limit]:
            print(" ~", brief_e(old), "=>", brief_e(new), "| changes:", ch)

    if diff.relations.added:
        print("\n[Relations Added] (up to", limit, ")")
        for r in diff.relations.added[:limit]:
            print(" +", brief_r(r))
    if diff.relations.removed:
        print("\n[Relations Removed] (up to", limit, ")")
        for r in diff.relations.removed[:limit]:
            print(" -", brief_r(r))
    if diff.relations.changed:
        print("\n[Relations Changed] (up to", limit, ")")
        for old, new, ch in diff.relations.changed[:limit]:
            print(" ~", brief_r(old), " | changes:", ch)

# ----------------------------
# Neo4j에 패치 적용(옵션)
#   - added/changed 는 upsert
#   - removed 는 (옵션) 삭제
# ----------------------------
def apply_diff_to_neo4j(
    neo: Neo4jIO,
    new_ents: List[Dict],
    new_rels: List[Dict],
    diff: KBDiff,
    delete_removed: bool = False
):
    # 추가/변경 업서트
    ents_to_upsert = diff.entities.added + [new for _, new, _ in diff.entities.changed]
    if ents_to_upsert:
        neo.upsert_entities(ents_to_upsert)
    rels_to_upsert = diff.relations.added + [new for _, new, _ in diff.relations.changed]
    if rels_to_upsert:
        neo.upsert_relations(new_ents, rels_to_upsert)

    # 삭제(선택)
    if delete_removed:
        # 엔터티 삭제는 관계 참조가 있을 수 있으니 신중히.
        ent_del_items = [(e.get("canonical",""), e.get("type","")) for e in diff.entities.removed]
        if ent_del_items:
            neo.delete_entities_by_canonical_type(ent_del_items)

        rel_del_items = []
        for r in diff.relations.removed:
            s = norm(r.get("s",""))
            p = norm(r.get("p_norm", r.get("p","")))
            o = norm(r.get("o",""))
            rel_del_items.append((s, p, o))
        if rel_del_items:
            neo.delete_relations_by_triplet(rel_del_items)

# ----------------------------
# 메인 실행 예시
# ----------------------------
def main():
    # 0) 기존 KB 읽기 (Neo4j)
    uri = "bolt://localhost:7687"; user = "neo4j"; password = "password"
    neo = Neo4jIO(uri, user, password)
    neo.ensure_constraints()

    old_entities = neo.read_entities()
    old_relations = neo.read_relations()

    # 1) 새 문서 포함하여 전체 4개 문서로 파이프라인 실행 → 새 KB
    #    여러분의 함수를 연결하세요.
    docs = [
        # {"doc_id": "doc1", "text": "..."},
        # {"doc_id": "doc2", "text": "..."},
        # {"doc_id": "doc3", "text": "..."},
        # {"doc_id": "doc4_new", "text": "..."},
    ]
    new_entities, new_relations = run_full_pipeline_over_docs(docs)

    # 2) Diff 계산
    diff = compute_kb_diff(old_entities, old_relations, new_entities, new_relations)

    # 3) 요약 출력
    print_kb_diff_summary(diff, limit=15)

    # 4) (옵션) Neo4j에 패치 적용
    #    - removed 삭제까지 반영하려면 delete_removed=True
    apply_diff_to_neo4j(neo, new_entities, new_relations, diff, delete_removed=False)

    neo.close()
    print("✅ 완료")

if __name__ == "__main__":
    main()


In [128]:
same_entities = {}
pairs = []
for a, b in itertools.combinations(all_e, 2):
    pairs.append((a,b))
    for x in a['entities']:
        for y in b['entities']:
            score = fuzz.token_sort_ratio(x, y)
            print(f"x : {x} y : {y} score = {fuzz.token_sort_ratio(x, y)}")
            if score >= 80 : 
                same_entities[x] = y 


x : Personal information y : privacy score = 22.22222222222222
x : Personal information y : personal data score = 42.42424242424242
x : Personal information y : processing score = 26.66666666666667
x : Personal information y : data subjects score = 18.181818181818176
x : Personal information y : purpose score = 14.814814814814813
x : Personal information y : minimum necessary score = 21.62162162162162
x : Personal information y : legally score = 22.22222222222222
x : Personal information y : justly score = 15.384615384615385
x : Personal information y : scope score = 16.000000000000004
x : Personal information y : proper score = 23.076923076923073
x : Personal information y : infringement score = 37.5
x : Personal information y : rights score = 23.076923076923073
x : Personal information y : access score = 15.384615384615385
x : Personal information y : policies score = 28.57142857142857
x : Personal information y : rights of data subjects score = 23.25581395348837
x : Personal informa

In [127]:
same_entities

{'Processing': 'processing', 'Data subject': 'data subjects'}

In [129]:
pairs

[({'id': 'e1',
   'entities': ['Personal information',
    'living individuals',
    'categories',
    'Collection',
    'Generation',
    'Connection',
    'Integration',
    'Recording',
    'Storage',
    'Retention',
    'Processing',
    'Editing',
    'Retrieval',
    'Output',
    'Correction',
    'Recovery',
    'Use',
    'Provision',
    'Disclosure',
    'Destruction',
    'Similar actions',
    'Personal information file',
    'Systematically arranged',
    'Organized',
    'Easy search',
    'Data subject',
    'Person',
    'Subject',
    'Personal information processor',
    'Public institutions',
    'Corporations',
    'Organizations',
    'Individuals',
    'Business purposes',
    'Directly',
    'Through others',
    'Operate',
    'Personal information files',
    'Public institutions',
    'Administrative affairs',
    'National Assembly',
    'Courts',
    'Constitutional Court',
    'National Election Commission',
    'Central administrative agencies',
    'Aff

In [130]:
ents_f = pairs[:]  # shallow copy
ents_f

[({'id': 'e1',
   'entities': ['Personal information',
    'living individuals',
    'categories',
    'Collection',
    'Generation',
    'Connection',
    'Integration',
    'Recording',
    'Storage',
    'Retention',
    'Processing',
    'Editing',
    'Retrieval',
    'Output',
    'Correction',
    'Recovery',
    'Use',
    'Provision',
    'Disclosure',
    'Destruction',
    'Similar actions',
    'Personal information file',
    'Systematically arranged',
    'Organized',
    'Easy search',
    'Data subject',
    'Person',
    'Subject',
    'Personal information processor',
    'Public institutions',
    'Corporations',
    'Organizations',
    'Individuals',
    'Business purposes',
    'Directly',
    'Through others',
    'Operate',
    'Personal information files',
    'Public institutions',
    'Administrative affairs',
    'National Assembly',
    'Courts',
    'Constitutional Court',
    'National Election Commission',
    'Central administrative agencies',
    'Aff

In [None]:
# canonical 항목을 entities와 relations 생성할 때 만들어두자
ents_f = pairs[:]  # shallow copy
changed = True
while changed:
    changed = False
    for a, b in ents_f:
        if llm_equiv(a, b):
            # 누락 키 보정
            a.setdefault("entities", []); b.setdefault("entities", [])
            a.setdefault("source",  []); b.setdefault("source",  [])

            # alias 병합(원래 canonical까지 alias로 흡수)
            a["entities"] = sorted(set(
                a["entities"] + [a.get("canonical",""), b.get("canonical","")] + b["aliases"]
            ))

            # 대표명 선택(간단: 더 짧은 쪽; 필요시 규칙 고도화)
            if a.get("canonical") and b.get("canonical"):
                a["canonical"] = min([a["canonical"], b["canonical"]], key=len)
            else:
                a["canonical"] = a.get("canonical") or b.get("canonical")

            # source 병합
            a["source"] = sorted(set(a["source"] + b["source"]))

            # 타입/정의 충돌 등은 필요시 추가 규칙으로 조정
            # 예: a["type"] 우선, 없으면 b["type"] 채우기
            if not a.get("type") and b.get("type"):
                a["type"] = b["type"]

            # b 제거 후 다시 루프 시작
            ents.remove(b)
            changed = True
            break
return ents