# 신뢰성 검증 함수 모듈화

기존 `validate_reliability` 함수를 여러 개의 작은 함수로 모듈화하여 코드의 가독성과 재사용성을 높였습니다. 모듈화된 함수들은 다음과 같은 기능을 수행합니다:

1. **입력 데이터 준비**: 모델에 입력될 데이터를 준비
2. **인코딩 및 초기 예측**: 모델을 통한 데이터 인코딩 및 초기 예측
3. **순방향 참조 분석**: 특허의 인용 관계 분석
4. **잠재 공간 최적화**: 경사 하강법을 통한 잠재 벡터 최적화
5. **IPC 생성 및 검증**: 생성된 IPC 코드의 검증
6. **결과 분석 및 수집**: 분석 결과 수집

아래는 모듈화된 함수들의 구현입니다.

In [None]:
import torch
import numpy as np
import copy
import pandas as pd
from utils import to_device

def prepare_input_data(tech_dataset, used_test_index_TC, idx, model):
    """
    입력 데이터를 준비하고 모델 장치에 로드합니다.
    """
    # 클래스와 클레임 데이터 인코딩
    input_class = torch.tensor(tech_dataset.tokenizers["class_enc"].encode(tech_dataset.X_class[used_test_index_TC][idx])).unsqueeze(0)
    input_claim = tech_dataset.tokenize(tech_dataset.tokenizers["claim_enc"], tech_dataset.X_claim[used_test_index_TC][idx])
    input_claim = {k: v.unsqueeze(0) for k, v in input_claim.items()}
    
    # 배치 입력 생성 및 모델 장치로 전송
    batch_input = {"class": torch.tensor(input_class), "claim": input_claim}
    input_inf = to_device(batch_input, model.device)
    
    return input_inf, input_class

def encode_and_predict(model, input_inf):
    """
    입력 데이터를 인코딩하고 초기 예측을 수행합니다.
    """
    enc_outputs, z, mu, logvar = model.encode(input_inf)
    pred_outputs = model.predict(z)
    
    return enc_outputs, z, mu, logvar, pred_outputs

def analyze_forward_references(used_test_data_TC, used_rawdata, total_data, idx, n_TC):
    """
    순방향 참조를 분석하여 참조된 특허의 IPC 코드와 인용 수를 가져옵니다.
    """
    # 순방향 참조가 있는지 확인
    if used_test_data_TC.iloc[idx]["TC5"] <= 0:
        return False, None, None
    
    # 순방향 참조 정보 가져오기
    forward_refs = used_rawdata.loc[used_test_data_TC.iloc[idx]["patent_number"]]["forward_refs"].split(";")
    ref_info = total_data.loc[[ref for ref in forward_refs if ref in total_data.index]]
    
    if len(ref_info) == 0:
        return False, None, None
        
    # 참조된 특허들의 IPC 코드와 인용 수
    ref_ipcs = ref_info["patent_classes"].apply(lambda x: set(x))
    ref_FCs = ref_info["TC" + str(n_TC)]
    
    return True, ref_ipcs, ref_FCs

def decode_original_text(tech_dataset, input_class):
    """
    원본 IPC 코드를 디코딩합니다.
    """
    tokenizer = tech_dataset.tokenizers["class_dec"]
    org_text = tokenizer.decode_batch(input_class.cpu().detach().numpy())[0]
    org_text = org_text[org_text.index(tokenizer.sos_token)+1:org_text.index(tokenizer.eos_token)]
    
    return org_text

def check_same_ipcs(org_text, ref_ipcs):
    """
    원본 IPC와 참조된 특허들의 IPC가 동일한지 확인합니다.
    """
    return set(org_text) == set(np.concatenate(ref_ipcs.apply(lambda x: list(x)).values))

def breakdown(ipcs):
    """
    IPC 코드를 다양한 레벨(섹션, 클래스, 서브클래스, 전체)로 분류합니다.
    """
    return ([ipc[0] for ipc in ipcs], [ipc[:3] for ipc in ipcs], [ipc[:4] for ipc in ipcs], ipcs)

In [None]:
def optimize_latent_space(model, z, enc_outputs, tech_dataset, L1_threshold, n_iter, step_size):
    """
    경사 하강법을 통해 잠재 공간을 최적화하여 L1_threshold 이상의 인용 수를 갖는 IPC 코드를 생성합니다.
    """
    tokenizer = tech_dataset.tokenizers["class_dec"]
    optimized = False
    gen_text = None
    FC_estimated = 0
    
    # 경사 하강법을 통한 최적화 반복
    for i in range(n_iter):
        pred_outputs = model.predict(z)
        z.retain_grad()
        FC_estimated = np.round(np.exp(pred_outputs[0,1].item()), 4)  # 예상 인용 수
        
        # L1 오차 계산 및 역전파
        L1_error = (1 - torch.exp(pred_outputs[0,1]))
        L1_error.backward(retain_graph=True)
        
        # 경사 방향으로 잠재 벡터 업데이트
        grad_for_update = step_size * z.grad
        z_ = z - grad_for_update
        
        z.grad.zero_()
        
        # 새 잠재 벡터로 IPC 코드 생성
        dec_outputs = model.decode(z_, enc_outputs, dec_inputs=None)
        dec_outputs = dec_outputs.argmax(-1)
        
        # 생성된 IPC 코드 디코딩
        gen_text = tokenizer.decode_batch(dec_outputs.cpu().detach().numpy())[0]
        if tokenizer.eos_token in gen_text:
            gen_text = gen_text[gen_text.index(tokenizer.sos_token)+1:gen_text.index(tokenizer.eos_token)]
        else:
            gen_text = gen_text[gen_text.index(tokenizer.sos_token)+1:]
            
        # 생성된 IPC 코드 정제
        if gen_text != []:
            gen_text = [gen_text[0]] + list(np.array(gen_text[1:])[np.unique(gen_text[1:], return_index=True)[1]])
            gen_text = set(gen_text)
        else:
            continue
        
        # 충분한 인용 수가 예상되면 최적화 종료
        if FC_estimated >= L1_threshold:
            optimized = True
            break
            
        z = z_
        
    return optimized, gen_text, FC_estimated, z

def validate_generated_ipc(gen_text, ref_ipcs, ref_FCs):
    """
    생성된 IPC 코드와 참조된 특허들의 IPC 코드를 비교하여 검증합니다.
    """
    inclusions = [None, None, None, None]
    higher_impacts = [None, None, None, None]
    similar_refs = [None, None, None, None]
    unsimilar_refs = [None, None, None, None]
    
    # IPC 코드 분류
    gen_text_breakdown = breakdown(gen_text)
    ref_ipcs_breakdown = (
        ref_ipcs.apply(lambda x: breakdown(x)[0]),
        ref_ipcs.apply(lambda x: breakdown(x)[1]),
        ref_ipcs.apply(lambda x: breakdown(x)[2]),
        ref_ipcs
    )
    
    # 각 레벨에서 검증 수행
    for i in range(4):
        if inclusions[i] is not None:
            continue
            
        temp_gen_text = gen_text_breakdown[i]
        temp_ref_ipcs = ref_ipcs_breakdown[i]
        
        # 생성된 IPC와 일치하는 참조 찾기
        hit_index = temp_ref_ipcs.apply(lambda x: 1 if set(x) == set(temp_gen_text) else 0) == 1
        similar_refs[i] = temp_ref_ipcs[hit_index].index
        unsimilar_refs[i] = temp_ref_ipcs[~hit_index].index
        
        # 참조가 없는 경우
        if len(similar_refs[i]) == 0:
            inclusions[i] = 0
            higher_impacts[i] = None
        # 모든 참조가 일치하는 경우
        elif len(unsimilar_refs[i]) == 0:
            inclusions[i] = 1
            similar_mean_FC = np.mean(ref_FCs.loc[similar_refs[i]])
            higher_impacts[i] = 1 if similar_mean_FC > 0 else 0
        # 일부 참조가 일치하는 경우
        else:
            inclusions[i] = 1
            similar_mean_FC = np.mean(ref_FCs.loc[similar_refs[i]])
            unsimilar_mean_FC = np.mean(ref_FCs.loc[unsimilar_refs[i]])
            
            if similar_mean_FC >= unsimilar_mean_FC:
                higher_impacts[i] = 1 if similar_mean_FC > 0 else None
            else:
                higher_impacts[i] = 0
    
    return inclusions, higher_impacts, similar_refs, unsimilar_refs

In [None]:
def validate_reliability(model=None, idx=None, L1_threshold=0.5, n_iter=30, step_size=40):
    """
    생성된 IPC 코드의 신뢰성을 검증합니다.
    
    Args:
        model: 사용할 모델
        idx: 현재 검증할 샘플의 인덱스
        L1_threshold: L1 임계값
        n_iter: 최적화 반복 횟수
        step_size: 경사 하강법의 단계 크기
        
    Returns:
        counters: (cnt_nonexist, cnt_noFC, cnt_diverge, cnt_same_ipcs, cnt_diff_ipcs)
        results: 검증 결과 또는 None (실패 시)
    """
    # 카운터 초기화
    cnt_nonexist = 0
    cnt_noFC = 0
    cnt_diverge = 0
    cnt_same_ipcs = 0
    cnt_diff_ipcs = 0
    
    # 1. 입력 데이터 준비
    input_inf, input_class = prepare_input_data(tech_dataset, used_test_index_TC, idx, model)
    
    # 2. 모델 인코딩 및 초기 예측
    enc_outputs, z, mu, logvar, pred_outputs = encode_and_predict(model, input_inf)
    
    # 3. 순방향 참조 분석
    ref_exists, ref_ipcs, ref_FCs = analyze_forward_references(used_test_data_TC, used_rawdata, total_data, idx, n_TC)
    
    if not ref_exists:
        if used_test_data_TC.iloc[idx]["TC5"] <= 0:
            cnt_noFC += 1
        else:
            cnt_nonexist += 1
        return (cnt_nonexist, cnt_noFC, cnt_diverge, cnt_same_ipcs, cnt_diff_ipcs), None
    
    # 4. 원본 IPC 코드 디코딩
    org_text = decode_original_text(tech_dataset, input_class)
    
    # 5. 원본 IPC와 참조 IPC가 동일한지 확인
    if check_same_ipcs(org_text, ref_ipcs):
        cnt_same_ipcs += 1
    
    # 6. 잠재 공간 최적화를 통한 IPC 코드 생성
    optimized, gen_text, FC_estimated, z = optimize_latent_space(model, z, enc_outputs, tech_dataset, L1_threshold, n_iter, step_size)
    
    if not optimized:
        cnt_diverge += 1
        return (cnt_nonexist, cnt_noFC, cnt_diverge, cnt_same_ipcs, cnt_diff_ipcs), None
    
    # 7. 생성된 IPC 코드 검증
    inclusions, higher_impacts, similar_refs, unsimilar_refs = validate_generated_ipc(gen_text, ref_ipcs, ref_FCs)
    
    # 8. 최종 결과 수집
    cnt_diff_ipcs += 1
    results = {
        "index": idx,
        "patent_id": used_test_data_TC.iloc[idx]["patent_number"],
        "org_text": org_text,
        "gen_text": gen_text,
        "ref_ipcs": ref_ipcs,
        "ref_FCs": ref_FCs,
        "inclusions": inclusions,
        "higher_impacts": higher_impacts,
        "FC_estimated": FC_estimated,
        "similar_refs": similar_refs,
        "unsimilar_refs": unsimilar_refs
    }
    
    return (cnt_nonexist, cnt_noFC, cnt_diverge, cnt_same_ipcs, cnt_diff_ipcs), results

## 모듈화 후 성능 테스트

모듈화된 함수를 사용하여 이전과 동일한 결과를 얻는지 확인합니다.

In [None]:
# 예시: 모듈화된 함수 테스트
test_idx = 0  # 테스트할 샘플 인덱스

# 기존 함수와 모듈화된 함수의 결과 비교
original_counters, original_results = validate_reliability(model=model, idx=test_idx)

print("테스트 샘플:", test_idx)
if original_results is not None:
    print("원본 IPC:", original_results["org_text"])
    print("생성된 IPC:", original_results["gen_text"])
    print("예상 인용 수:", original_results["FC_estimated"])
    print("IPC 포함 여부:", original_results["inclusions"])
    print("더 높은 영향력 여부:", original_results["higher_impacts"])
else:
    print("결과 없음. 원인:", original_counters)