# IPC 코드 기준 평가

In [56]:
patent_id = '1020227017251'
correct_ipc = [
'H10K 50/80',
'H10K 50/00',
'H10K 59/00',
'H05B 33/28',
'H05B 33/06'
]

In [57]:
import chromadb
from sentence_transformers import SentenceTransformer
import torch
from collections import defaultdict
import math

# ---------------------------------------------------------
# 1. 설정 (GPU 확인 등)
# ---------------------------------------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# ---------------------------------------------------------
# 2. 모델 로드 (저장할 때 썼던 그 모델!)
# ---------------------------------------------------------
print("모델 로드 중...")
model = SentenceTransformer("dragonkue/BGE-m3-ko").to(device)

# ---------------------------------------------------------
# 3. ChromaDB 연결 (다운로드 받은 폴더 경로 지정)
# ---------------------------------------------------------
# path="./chroma_db" 는 압축 푼 폴더 이름과 같아야 합니다.
client = chromadb.PersistentClient(path="../../../db_search/doc_db")

# 컬렉션 가져오기 (create가 아니라 get_collection 사용)
collection = client.get_collection(name="patent_claims")

print(f"✅ 데이터베이스 로드 완료! 총 데이터 수: {collection.count()}개")



Using device: cpu
모델 로드 중...
✅ 데이터베이스 로드 완료! 총 데이터 수: 589049개


### query가 2개 이상일 때 re-ranking해서 200건만 저장하는 함수

In [58]:
# 200개 초과 검색되었을 때 re-ranking (z 정규화 수정ver)
import numpy as np

def multi_query_rerank(
    collection,
    model,
    query_list,
    per_query_top_k=200,
    final_top_k=200
):
    #------------------------------------------
    # 1) Q개의 query 문장 embedding
    #------------------------------------------
    query_embs = model.encode(query_list).tolist()


    #------------------------------------------
    # 2-1) query_list가 한 개일 경우, 검색 후 바로 return
    #------------------------------------------
    if len(query_list) == 1:
        return collection.query(query_embeddings=query_embs, n_results=final_top_k)


    #------------------------------------------
    # 2-2) query별 검색
    #------------------------------------------
    candidates = []  # 전체 후보 저장
    for emb in query_embs:
        r = collection.query(
            query_embeddings=[emb],
            n_results=per_query_top_k
        )
    
        distances = np.array(r["distances"][0])
        mean = distances.mean()
        std = distances.std() + 1e-9

        z_scores = (distances - mean) / std

        ids = r["ids"][0]
        docs = r["documents"][0]
        distances = r["distances"][0]
        metas = r["metadatas"][0]

        # 후보를 통합 리스트에 추가
        for pid, doc, meta, z, dist in zip(ids, docs, metas, z_scores, distances):
            candidates.append({
                "id": pid,
                "document": doc,
                'metadatas':meta,
                'distance':dist,
                'z-score':z
            })

    #------------------------------------------
    # 3) z-score 기준 오름차순 정렬 후 상위 final_top_k만 선택
    #------------------------------------------
    top_candidates = sorted(candidates, key=lambda x: x["z-score"])[:final_top_k]


    #------------------------------------------
    # 4) collection.query() 형식으로 재구성
    #------------------------------------------
    final_ids = [c["id"] for c in top_candidates]
    final_docs = [c["document"] for c in top_candidates]
    final_distances = [c["distance"] for c in top_candidates]
    final_metas = [c['metadatas'] for c in top_candidates]

    final_results = {
        "ids": [final_ids],
        "documents": [final_docs],
        "distances": [final_distances],
        "metadatas": [final_metas]
    }

    return final_results

### 특허 단위 점수 집계(Late Fusion Aggregated Scoring)
- 출원번호 하나당 모든 ‘매칭된 청구항’들의 유사도 점수를 통계적으로 합성해서 특허 단위 점수를 만듦
- 단점: 독립항이 핵심인데 종속항이 우연히 많이 매칭되면 점수가 잘못 올라갈 수 있음

In [59]:
import numpy as np

def many_claim_dis(results, TOP_K):
    # ----------------------------------------
    # 0. Chroma 결과 파싱
    # ----------------------------------------
    ids        = results["ids"][0]
    docs       = results["documents"][0]
    metas      = results["metadatas"][0]
    distances  = results["distances"][0]

    parsed = []
    for i in range(len(ids)):
        parsed.append({
            "id": ids[i],
            "document": docs[i],
            "metadata": metas[i],
            "distance": distances[i]
        })

    # ----------------------------------------
    # 1. 출원번호 기준 그룹화
    # ----------------------------------------
    grouped = defaultdict(list)
    for r in parsed:
        app_no = r["metadata"]["patent_id"]
        grouped[app_no].append(r)

    # ----------------------------------------
    # 2. 특허 단위 점수 계산
    #    방법: claim similarity들의 평균 + 대표 claim 보정
    # ----------------------------------------

    def similarity(d):
        return 1-d


    def compute_patent_score(claims):
        sims = [similarity(c["distance"]) for c in claims]   # 새로운 sim

        sims_sorted = sorted(sims, reverse=True)
        top3 = sims_sorted[:3]
        top3_avg = sum(top3) / len(top3)
        max_sim = sims_sorted[0]

        claim_count = len(claims)
        count_bonus = min(1.0, claim_count / 10.0)

        final_score =  top3_avg * 0.6 + max_sim * 0.3 + count_bonus * 0.1
    
        return final_score

    # ----------------------------------------
    # 3. 특허 단위 재랭킹
    # ----------------------------------------
    aggregated = []
    for app_no, claims in grouped.items():
        score = compute_patent_score(claims)

        # 대표 claim은 거리(distance)가 가장 낮은 claim 선택
        rep_claim = sorted(claims, key=lambda x: x["distance"])[0]
         # claims에서 id와 metadata를 제외하고 document와 distance만 저장
        
        filtered_claims = [
            {
                "id": c["id"],
                "document": c["document"],
                "distance": c["distance"]
            }
            for c in claims
        ]

        aggregated.append({
            "patent_id": app_no,
            "score": score,
            "top_claim": rep_claim["document"],
            "top_claim_no": rep_claim["metadata"]["claim_no"],
            "claims_found": len(claims),
            "claims": filtered_claims
        })

    # 점수 높은 순으로 재랭킹
    aggregated = sorted(aggregated, key=lambda x: x["score"], reverse=True)

    final_response = aggregated[:TOP_K]
    return final_response

In [60]:
query = ["근적외선(750-1000nm) 발광과 유기 제1층을 포함한 고효율 유기 EL 장치"]

results = multi_query_rerank(
    collection=collection,
    model=model,
    query_list=query,
    per_query_top_k=200,
    final_top_k=200
)

In [61]:
# 1. patent_ids: length 200, 중복 포함된 리스트
patent_ids_dum = [results['metadatas'][0][i]['patent_id'] for i in range(200)]

# 2. 중복 제거(원래 순서 유지)
unique_patent_ids = list(dict.fromkeys(patent_ids_dum))

# 3. 상위 30개만 선택
patent_ids2 = unique_patent_ids[:30]
len(patent_ids2)

30

In [62]:
# 상위 30개 결과에서 patent_id 기준으로 중복 제거하고 IPC 코드 추출
seen_patents = set()
top_30_patents = []

ids = results["ids"][0]
metas = results["metadatas"][0]

# 순서를 유지하면서 중복 제거
for i in range(len(ids)):
    patent_id = metas[i].get("patent_id")
    
    if patent_id not in seen_patents:
        seen_patents.add(patent_id)
        ipc = metas[i].get("ipc")
        top_30_patents.append({
            "patent_id": patent_id,
            "ipc": ipc
        })
        
        # 30개 모으면 중단
        if len(top_30_patents) >= 30:
            break

# IPC 코드 추출 및 flatten
top_30_ipc_list = []
for patent in top_30_patents:
    ipc = patent["ipc"]
    if isinstance(ipc, list):
        top_30_ipc_list.extend(ipc)
    elif isinstance(ipc, str):
        # 쉼표로 구분된 경우 split
        parts = ipc.split(",")
        for p in parts:
            code = p.strip()
            if code:
                top_30_ipc_list.append(code)

# 결과 확인
print(f"상위 30개 특허 개수: {len(top_30_patents)}")
print(f"총 IPC 코드 개수: {len(top_30_ipc_list)}")

# correct_ipc와 비교
from collections import Counter
counter = Counter(top_30_ipc_list)

print("\ncorrect_ipc 코드별 등장 횟수:")
for code in correct_ipc:
    print(f"{code}: {counter.get(code, 0)}회")

# 교집합 확인
common = set(top_30_ipc_list) & set(correct_ipc)
print(f"\n교집합: {common}")
print(f"일치 개수: {len(common)}/{len(correct_ipc)}")


상위 30개 특허 개수: 30
총 IPC 코드 개수: 167

correct_ipc 코드별 등장 횟수:
H10K 50/80: 6회
H10K 50/00: 11회
H10K 59/00: 8회
H05B 33/28: 1회
H05B 33/06: 1회

교집합: {'H10K 59/00', 'H10K 50/00', 'H05B 33/28', 'H05B 33/06', 'H10K 50/80'}
일치 개수: 5/5


In [63]:
results = multi_query_rerank(
    collection=collection,
    model=model,
    query_list=query,
    per_query_top_k=200,
    final_top_k=200
)

results = many_claim_dis(results, TOP_K=30)

In [64]:
results

[{'patent_id': '1020237013373',
  'score': 0.665850132703781,
  'top_claim': '발광 장치로서, 제 1 발광 디바이스와, 제 1 색 변환층을 포함하고, 상기 제 1 발광 디바이스와 상기 제 1 색 변환층 사이에 유기 화합물을 포함한 제 1 층을 포함하고, 상기 제 1 발광 디바이스는 양극과, 음극과, 상기 양극과 상기 음극 사이에 위치하는 EL층을 포함하고, 상기 EL층은 455nm 이상 465nm 이하의 파장의 광에 대한 정상 굴절률이 1.50 이상 1.75 미만인 재료를 포함한 층을 포함하고, 상기 제 1 색 변환층은 광을 흡수하여 방출하는 제 1 물질을 포함하고, 상기 제 1 층은 455nm 이상 465nm 이하의 파장의 광에 대한 정상 굴절률이 1.40 이상 2.10 이하이고, 상기 제 1 발광 디바이스로부터 상기 발광 장치의 외부에 방출되는 광의 광로 상에 상기 제 1 층 및 상기 제 1 색 변환층이 위치하는, 발광 장치.',
  'top_claim_no': 11,
  'claims_found': 12,
  'claims': [{'id': '1020237013373_claim11',
    'document': '발광 장치로서, 제 1 발광 디바이스와, 제 1 색 변환층을 포함하고, 상기 제 1 발광 디바이스와 상기 제 1 색 변환층 사이에 유기 화합물을 포함한 제 1 층을 포함하고, 상기 제 1 발광 디바이스는 양극과, 음극과, 상기 양극과 상기 음극 사이에 위치하는 EL층을 포함하고, 상기 EL층은 455nm 이상 465nm 이하의 파장의 광에 대한 정상 굴절률이 1.50 이상 1.75 미만인 재료를 포함한 층을 포함하고, 상기 제 1 색 변환층은 광을 흡수하여 방출하는 제 1 물질을 포함하고, 상기 제 1 층은 455nm 이상 465nm 이하의 파장의 광에 대한 정상 굴절률이 1.40 이상 2.10 이하이고, 상기 제 1 발광 디바이스로부터 상기 발광 장치의 외부에 방출되는 광의 광로 상

In [65]:
# 1. 검색 실행
search_results = multi_query_rerank(
    collection=collection,
    model=model,
    query_list=query,
    per_query_top_k=200,
    final_top_k=200
)

# 2. many_claim_dis로 특허 단위 그룹화
patent_results = many_claim_dis(search_results, TOP_K=30)

# 3. patent_id -> IPC 매핑 생성 (원본 검색 결과에서)
patent_to_ipc = {}
for meta in search_results["metadatas"][0]:
    pid = meta.get("patent_id")
    ipc = meta.get("ipc")
    if pid and pid not in patent_to_ipc:
        patent_to_ipc[pid] = ipc

# 4. 상위 30개 특허의 IPC 코드 추출
correct_ipc = [
'H10K 50/80',
'H10K 50/00',
'H10K 59/00',
'H05B 33/28',
'H05B 33/06'
]
top_30_ipc_list = []

for patent in patent_results[:30]:
    patent_id = patent["patent_id"]
    ipc = patent_to_ipc.get(patent_id)
    
    if isinstance(ipc, list):
        top_30_ipc_list.extend(ipc)
    elif isinstance(ipc, str):
        parts = ipc.split(",")
        for p in parts:
            code = p.strip()
            if code:
                top_30_ipc_list.append(code)

# 5. 결과 분석
from collections import Counter
counter = Counter(top_30_ipc_list)

print(f"상위 30개 특허 개수: {len(patent_results[:30])}")
print(f"총 IPC 코드 개수: {len(top_30_ipc_list)}")
print("\ncorrect_ipc 코드별 등장 횟수:")
for code in correct_ipc:
    print(f"{code}: {counter.get(code, 0)}회")

# 교집합 확인
common = set(top_30_ipc_list) & set(correct_ipc)
print(f"\n교집합: {common}")
print(f"일치 개수: {len(common)}/{len(correct_ipc)}")


상위 30개 특허 개수: 30
총 IPC 코드 개수: 142

correct_ipc 코드별 등장 횟수:
H10K 50/80: 8회
H10K 50/00: 15회
H10K 59/00: 10회
H05B 33/28: 1회
H05B 33/06: 1회

교집합: {'H10K 59/00', 'H10K 50/00', 'H05B 33/28', 'H05B 33/06', 'H10K 50/80'}
일치 개수: 5/5


### 하이브리드 서치 추가

In [66]:
from hybrid_search_function import hybrid_search
# 1. 검색 실행
search_results = multi_query_rerank(
    collection=collection,
    model=model,
    query_list=query,
    per_query_top_k=200,
    final_top_k=200
)

patent_results = hybrid_search(
    multi_query_results=search_results,
    query_list=query,
    top_k=30,
    vector_weight=0.7,
    bm25_weight=0.3
)

# 3. patent_id -> IPC 매핑 생성 (원본 검색 결과에서)
patent_to_ipc = {}
for meta in search_results["metadatas"][0]:
    pid = meta.get("patent_id")
    ipc = meta.get("ipc")
    if pid and pid not in patent_to_ipc:
        patent_to_ipc[pid] = ipc

# 4. 상위 30개 특허의 IPC 코드 추출
correct_ipc = [
'H10K 50/80',
'H10K 50/00',
'H10K 59/00',
'H05B 33/28',
'H05B 33/06'
]
top_30_ipc_list = []

for patent in patent_results[:30]:
    patent_id = patent["patent_id"]
    ipc = patent_to_ipc.get(patent_id)
    
    if isinstance(ipc, list):
        top_30_ipc_list.extend(ipc)
    elif isinstance(ipc, str):
        parts = ipc.split(",")
        for p in parts:
            code = p.strip()
            if code:
                top_30_ipc_list.append(code)

# 5. 결과 분석
from collections import Counter
counter = Counter(top_30_ipc_list)

print(f"상위 30개 특허 개수: {len(patent_results[:30])}")
print(f"총 IPC 코드 개수: {len(top_30_ipc_list)}")
print("\ncorrect_ipc 코드별 등장 횟수:")
for code in correct_ipc:
    print(f"{code}: {counter.get(code, 0)}회")

# 교집합 확인
common = set(top_30_ipc_list) & set(correct_ipc)
print(f"\n교집합: {common}")
print(f"일치 개수: {len(common)}/{len(correct_ipc)}")


상위 30개 특허 개수: 30
총 IPC 코드 개수: 174

correct_ipc 코드별 등장 횟수:
H10K 50/80: 6회
H10K 50/00: 9회
H10K 59/00: 7회
H05B 33/28: 0회
H05B 33/06: 0회

교집합: {'H10K 59/00', 'H10K 50/80', 'H10K 50/00'}
일치 개수: 3/5
