In [1]:
import json
import torch
import numpy as np
import os
import gc
import warnings
import torch.nn as nn
import commentjson
warnings.filterwarnings('ignore')
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification
)
import traceback
from ipywidgets import widgets
from IPython.display import display, clear_output
from torchcrf import CRF
from transformers import BertModel, RobertaModel

# --- 0. 기본 설정 ---
MODEL_NAME = "klue/bert-base"
MAX_LEN = 128

# 모델 저장 경로 설정
INTENT_MODEL_DIR = "./models/intent"
NER_MODEL_DIR = "./models/ner"
INTENT_LABEL_PATH = os.path.join(INTENT_MODEL_DIR, "intent_labels.jsonc")
NER_LABEL_PATH = os.path.join(NER_MODEL_DIR, "ner_labels.jsonc")

GAZETTEER_DIR = "./gazetteer"
BOOKS_GAZETTEER_PATH = os.path.join(GAZETTEER_DIR, "titles.json")
AUTHORS_GAZETTEER_PATH = os.path.join(GAZETTEER_DIR, "authors.json")


# 전역 변수로 모델과 관련 객체 선언
intent_model = None
intent_tokenizer = None
intent_id2label = None
ner_model = None
ner_tokenizer = None
ner_id2label = None

class RobertaCRF(nn.Module):
    def __init__(self, model_dir, num_labels):
        super(RobertaCRF, self).__init__()
        # config 직접 로드 및 수정
        from transformers import RobertaConfig
        config_path = os.path.join(model_dir, "config.json")
        if os.path.exists(config_path):
            config = RobertaConfig.from_pretrained(config_path)
            # Pooler 사용 안함 설정
            config.add_pooling_layer = False
        else:
            config = RobertaConfig.from_pretrained(model_dir)
            config.add_pooling_layer = False

        # 수정된 config로 모델 로드
        self.roberta = RobertaModel.from_pretrained(model_dir, config=config)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.roberta.config.hidden_size, num_labels)
        self.crf = CRF(num_labels, batch_first=True)

    def forward(self, input_ids, attention_mask=None, labels=None):
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = outputs[0]  # [batch_size, seq_len, hidden_size]
        sequence_output = self.dropout(sequence_output)
        emissions = self.classifier(sequence_output)

        if labels is not None:
            # CRF 손실 계산
            mask = attention_mask.byte() if attention_mask is not None else None
            loss = -self.crf(emissions, labels, mask=mask)
            return type('CRFOutput', (), {'loss': loss, 'logits': emissions})()
        else:
            # 디코딩 (예측)
            mask = attention_mask.byte() if attention_mask is not None else None
            tags = self.crf.decode(emissions, mask=mask)
            return type('CRFOutput', (), {'tags': tags, 'logits': emissions})()

def load_gazetteer():
    # Gazetteer 디렉토리 생성
    os.makedirs(GAZETTEER_DIR, exist_ok=True)

    # 책 목록 Gazetteer
    title = {}
    try:
        with open("gazetteers/titles.jsonc", 'r', encoding='utf-8') as f:
            loaded_data = json.load(f)
            # 데이터 형식 확인 (리스트 또는 딕셔너리)
            if isinstance(loaded_data, dict):
                title = loaded_data
            else:  # 리스트인 경우 그대로 사용
                title = loaded_data
            print(f"책 Gazetteer 로드 완료: {len(title)} 항목")
    except Exception as e:
        print(f"책 Gazetteer 로드 오류: {e}")
        title = []

    # 작가 목록 Gazetteer
    author = {}
    try:
        with open("gazetteers/authors.jsonc", 'r', encoding='utf-8') as f:
            loaded_data = json.load(f)
            # 데이터 형식 확인 (리스트 또는 딕셔너리)
            if isinstance(loaded_data, dict):
                author = loaded_data
            else:  # 리스트인 경우 그대로 사용
                author = loaded_data
            print(f"작가 Gazetteer 로드 완료: {len(author)} 항목")
    except Exception as e:
        print(f"작가 Gazetteer 로드 오류: {e}")
        author = []

    # 데이터 형식 로깅
    print(f"제목 데이터 형식: {type(title).__name__}")
    print(f"작가 데이터 형식: {type(author).__name__}")

    return {"title": title, "author": author}

# --- 1. 모델 로드 함수 ---
def load_intent_model():
    global intent_model, intent_tokenizer, intent_id2label

    try:
        # 토크나이저 로드
        intent_tokenizer = AutoTokenizer.from_pretrained(INTENT_MODEL_DIR)

        # 레이블 정보 로드
        with open(INTENT_LABEL_PATH, 'r', encoding='utf-8') as f:
            label_info = commentjson.load(f)

        intent_id2label = {int(k): v for k, v in label_info["id2label"].items()}
        label2id = label_info["label2id"]

        # 모델 로드
        intent_model = AutoModelForSequenceClassification.from_pretrained(
            INTENT_MODEL_DIR,
            num_labels=len(intent_id2label),
            id2label=intent_id2label,
            label2id=label2id
        )

        print(f"Intent 모델 로드 성공: {len(intent_id2label)}개 의도 클래스")
        return True
    except Exception as e:
        print(f"Intent 모델 로드 실패: {e}")
        return False

def load_ner_model():
    global ner_model, ner_tokenizer, ner_id2label

    try:
        # 토크나이저 로드
        ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_DIR)

        # 레이블 정보 로드
        with open(NER_LABEL_PATH, 'r', encoding='utf-8') as f:
            label_info = commentjson.load(f)

        ner_id2label = {int(k): v for k, v in label_info["id2label"].items()}
        label2id = label_info["label2id"]

        # 커스텀 모델 로드
        ner_model = RobertaCRF(NER_MODEL_DIR, len(ner_id2label))

        # 가중치 파일 로드 - strict=False 적용
        state_dict_path = os.path.join(NER_MODEL_DIR, "pytorch_model.bin")
        if os.path.exists(state_dict_path):
            state_dict = torch.load(state_dict_path, map_location='cpu')
            # strict=False로 설정하여 일부 파라미터가 없어도 로드되도록 함
            missing_keys, unexpected_keys = ner_model.load_state_dict(state_dict, strict=False)
            print(f"누락된 키: {missing_keys}")
            print(f"예상치 못한 키: {unexpected_keys}")

        print(f"NER 모델 로드 성공: {len(ner_id2label)}개 NER 태그")
        return True
    except Exception as e:
        print(f"NER 모델 로드 실패: {e}")
        traceback.print_exc()
        return False

# --- 2. 예측 함수 ---
def predict_intent(text):
    global intent_model, intent_tokenizer, intent_id2label

    if not intent_model or not intent_tokenizer or not intent_id2label:
        return {"intent": "모델 미로드", "confidence": 0.0}

    intent_model.eval()

    inputs = intent_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=MAX_LEN
    )

    if 'token_type_ids' in inputs:
        del inputs['token_type_ids']

    device = "cuda" if torch.cuda.is_available() else "cpu"
    intent_model = intent_model.to(device)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = intent_model(**inputs)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=-1)
        predicted_class_id = torch.argmax(logits, dim=-1).item()
        predicted_intent = intent_id2label[predicted_class_id]

    return {
        "intent": predicted_intent,
        "confidence": probabilities.cpu().numpy().flatten().tolist()
    }

def predict_entities(text, gazetteer=None):
    global ner_model, ner_tokenizer, ner_id2label

    if not ner_model or not ner_tokenizer or not ner_id2label:
        return {"tokens": [], "tags": [], "entities": [], "token_probabilities": []}

    ner_model.eval()

    inputs = ner_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=MAX_LEN,
        return_offsets_mapping=True
    )

    offset_mapping = inputs.pop("offset_mapping").cpu().numpy()[0]

    if 'token_type_ids' in inputs:
        del inputs['token_type_ids']

    device = "cuda" if torch.cuda.is_available() else "cpu"
    ner_model = ner_model.to(device)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = ner_model(**inputs)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=2)
        predictions = torch.argmax(logits, dim=2)

    input_ids = inputs["input_ids"][0].cpu().numpy()
    tokens = ner_tokenizer.convert_ids_to_tokens(input_ids)
    predicted_tag_ids = predictions[0].cpu().numpy()
    all_probabilities = probabilities[0].cpu().numpy()

    content_tokens = []
    content_tags = []
    token_probabilities_list = []
    content_offsets = []

    for i, (token, tag_id, input_id, (start, end)) in enumerate(zip(tokens, predicted_tag_ids, input_ids, offset_mapping)):
        if input_id == ner_tokenizer.pad_token_id:
            break

        if token not in ner_tokenizer.all_special_tokens:
            content_tokens.append(token)
            tag = ner_id2label.get(tag_id, "O")
            content_tags.append(tag)
            content_offsets.append((start, end))
            token_probabilities_list.append(all_probabilities[i].tolist())

    # 엔티티 추출 (모델 예측)
    model_entities = []
    current_entity_tokens = []
    current_entity_starts = []
    current_entity_ends = []
    current_entity_label = None

    for i, (token, tag, (start, end)) in enumerate(zip(content_tokens, content_tags, content_offsets)):
        if tag.startswith("B-"):
            if current_entity_tokens:
                entity_start_offset = current_entity_starts[0]
                entity_end_offset = current_entity_ends[-1]
                entity_text = text[entity_start_offset:entity_end_offset]
                model_entities.append({
                    "entity": entity_text,
                    "label": current_entity_label,
                    "start": entity_start_offset,
                    "end": entity_end_offset,
                    "source": "model",
                    "priority": 1  # 모델 엔티티에 기본 우선순위 부여
                })

            current_entity_tokens = [token]
            current_entity_starts = [start]
            current_entity_ends = [end]
            current_entity_label = tag[2:]

        elif tag.startswith("I-") and current_entity_label == tag[2:]:
            current_entity_tokens.append(token)
            current_entity_starts.append(start)
            current_entity_ends.append(end)

        else:
            if current_entity_tokens:
                entity_start_offset = current_entity_starts[0]
                entity_end_offset = current_entity_ends[-1]
                entity_text = text[entity_start_offset:entity_end_offset]
                model_entities.append({
                    "entity": entity_text,
                    "label": current_entity_label,
                    "start": entity_start_offset,
                    "end": entity_end_offset,
                    "source": "model",
                    "priority": 1  # 모델 엔티티에 기본 우선순위 부여
                })
            current_entity_tokens = []
            current_entity_starts = []
            current_entity_ends = []
            current_entity_label = None

    if current_entity_tokens:
        entity_start_offset = current_entity_starts[0]
        entity_end_offset = current_entity_ends[-1]
        entity_text = text[entity_start_offset:entity_end_offset]
        model_entities.append({
            "entity": entity_text,
            "label": current_entity_label,
            "start": entity_start_offset,
            "end": entity_end_offset,
            "source": "model",
            "priority": 1  # 모델 엔티티에 기본 우선순위 부여
        })

    # === 가제티어 처리 개선 시작 ===
    gazetteer_entities = []
    if gazetteer:
        # 작가(author) 가제티어 먼저 처리
        if "author" in gazetteer and gazetteer["author"]:
            author_items = []
            entity_data = gazetteer["author"]
            if isinstance(entity_data, list):
                for item in entity_data:
                    if isinstance(item, str):
                        author_items.append(item)
                    elif isinstance(item, dict) and 'name' in item:
                        author_items.append(item['name'])
            elif isinstance(entity_data, dict):
                author_items.extend(entity_data.keys())

            # 길이 순 정렬 (긴 작가명 먼저)
            author_items.sort(key=len, reverse=True)

            for author in author_items:
                if not author or len(author) < 2:  # 너무 짧은 작가명은 건너뛰기
                    continue

                # 대소문자 무시하고 작가명 찾기
                start_pos = 0
                while True:
                    pos = text.lower().find(author.lower(), start_pos)
                    if pos == -1:
                        break
                    end_pos = pos + len(author)
                    exact_entity = text[pos:end_pos]

                    gazetteer_entities.append({
                        "entity": exact_entity,
                        "label": "author",
                        "start": pos,
                        "end": end_pos,
                        "source": "gazetteer",
                        "priority": 3  # 작가 가제티어에 높은 우선순위 부여
                    })
                    start_pos = end_pos

        # 제목(title) 가제티어 처리
        if "title" in gazetteer and gazetteer["title"]:
            title_items = []
            entity_data = gazetteer["title"]
            if isinstance(entity_data, list):
                for item in entity_data:
                    if isinstance(item, str):
                        title_items.append(item)
                    elif isinstance(item, dict) and 'name' in item:
                        title_items.append(item['name'])
            elif isinstance(entity_data, dict):
                title_items.extend(entity_data.keys())

            # 길이 순 정렬 (긴 제목 먼저)
            title_items.sort(key=len, reverse=True)

            for title in title_items:
                if not title or len(title) < 2:  # 너무 짧은 제목은 건너뛰기
                    continue

                # 대소문자 무시하고 제목 찾기
                start_pos = 0
                while True:
                    pos = text.lower().find(title.lower(), start_pos)
                    if pos == -1:
                        break
                    end_pos = pos + len(title)
                    exact_entity = text[pos:end_pos]

                    gazetteer_entities.append({
                        "entity": exact_entity,
                        "label": "title",
                        "start": pos,
                        "end": end_pos,
                        "source": "gazetteer",
                        "priority": 2  # 제목 가제티어에 중간 우선순위 부여
                    })
                    start_pos = end_pos

    # 모델 엔티티와 가제티어 엔티티 합치기
    all_entities = model_entities + gazetteer_entities

    # 우선순위 정렬:
    # 1. 우선순위 높은 것부터 (작가 > 제목 > 모델)
    # 2. 같은 우선순위면 긴 것 먼저
    all_entities.sort(key=lambda x: (-x['priority'], -(x['end'] - x['start']), x['start']))

    # 범위 기반으로 중복/포함 관계 처리
    final_entities = []
    covered_positions = set()  # 이미 처리된 위치 추적

    for entity in all_entities:
        start = entity['start']
        end = entity['end']
        label = entity['label']

        # 완전히 동일한 엔티티인지 확인
        is_duplicate = False
        for existing in final_entities:
            if (start == existing['start'] and
                end == existing['end'] and
                label == existing['label']):
                is_duplicate = True
                break

        if is_duplicate:
            continue

        # 특별 처리: 작가 엔티티는 제목에 포함되어도 별도로 처리
        # 제목 안에 있는 작가 이름도 추출하기 위함
        if label == "author":
            # 이 작가 엔티티가 포함된 제목 엔티티가 있는지 확인
            is_inside_title = False
            for existing in final_entities:
                if (existing['label'] == 'title' and
                    start >= existing['start'] and
                    end <= existing['end']):
                    is_inside_title = True
                    break

            # 작가가 제목 안에 있더라도 추가
            if not is_inside_title or True:  # 항상 작가 추가
                # 포지션 중복 검사를 위한 임시 세트
                entity_positions = set(range(start, end))
                # 이미 처리된 작가 위치와 크게 겹치는지 확인
                overlap_ratio = len(entity_positions.intersection(covered_positions)) / len(entity_positions) if entity_positions else 0

                if overlap_ratio < 0.5:  # 50% 미만으로 겹치면 추가
                    final_entities.append(entity)
                    covered_positions.update(entity_positions)
                continue

        # 일반 엔티티 중복 처리 (작가 제외)
        entity_positions = set(range(start, end))
        # 기존 커버된 위치와 얼마나 겹치는지 계산
        overlap_ratio = len(entity_positions.intersection(covered_positions)) / len(entity_positions) if entity_positions else 0

        # 50% 미만으로 겹치면 추가
        if overlap_ratio < 0.5:
            final_entities.append(entity)
            covered_positions.update(entity_positions)

    # 우선순위 필드 제거 (출력에 불필요)
    for entity in final_entities:
        if 'priority' in entity:
            del entity['priority']

    # === 가제티어 처리 개선 끝 ===

    # 문자 단위 시각화를 위한 태그 매핑
    char_tags = ["_"] * len(text)
    for entity in sorted(final_entities, key=lambda x: x['end'] - x['start'], reverse=True):
        start_pos = entity["start"]
        end_pos = entity["end"]
        entity_type = entity["label"]

        # 이미 다른 (더 긴) 엔티티가 표시된 부분은 덮어쓰지 않음
        can_mark = True
        for k in range(start_pos, end_pos):
            if char_tags[k] != '_' and char_tags[k] != f"B-{entity_type}" and char_tags[k] != f"I-{entity_type}":
                can_mark = False
                break

        if can_mark:
            char_tags[start_pos] = f"B-{entity_type}"
            for k in range(start_pos + 1, end_pos):
                char_tags[k] = f"I-{entity_type}"

    return {
        "tokens": content_tokens,
        "tags": content_tags,
        "entities": final_entities,
        "token_probabilities": token_probabilities_list,
        "char_tags": char_tags
    }

# --- 3. 통합 예측 함수 ---
def predict(text):
    global intent_model, intent_tokenizer, intent_id2label, ner_model, ner_tokenizer, ner_id2label

    if not intent_model or not ner_model:
        return {"text": text, "intent": "오류: 모델 로드 실패", "confidence": 0.0, "entities": [], "error": "모델 로드 실패"}
    try:
        # 가제티어 로드
        gazetteer = load_gazetteer()

        # 의도 예측
        intent_result = predict_intent(text)

        # 개체명 인식 (가제티어 전달)
        ner_result = predict_entities(text, gazetteer)

        # 문자별 태그 시각화 추가
        char_tags = ner_result.get("char_tags", ["_"] * len(text))

        return {
            "text": text,
            "intent": intent_result["intent"],
            "confidence": max(intent_result["confidence"]) if intent_result.get("confidence") else 0.0,
            "entities": ner_result["entities"],
            "ner_tokens": ner_result.get("tokens", []),
            "ner_token_probabilities": ner_result.get("token_probabilities", []),
            "char_tags": "".join(char_tags)  # 문자 단위 시각화
        }
    except Exception as e:
        import traceback
        print(f"예측 중 오류 발생: {e}")
        traceback.print_exc()
        return {"text": text, "intent": "예측 오류", "confidence": 0.0, "entities": [],
                "ner_tokens": [], "ner_token_probabilities": [], "error": str(e)}

# --- 4. UI 생성 및 이벤트 처리 ---
def create_ui():
    # 위젯 생성
    title = widgets.HTML(value="<h2>도서 검색 NLU 모델 예측 인터페이스</h2>")
    text_widget = widgets.Text(
        description='질문:',
        placeholder='질문을 입력하세요',
        layout=widgets.Layout(width='80%')
    )
    button = widgets.Button(
        description='예측',
        button_style='primary',
        tooltip='입력된 텍스트에 대해 예측 실행'
    )
    output = widgets.Output()

    # 버튼 클릭 이벤트 처리
    def on_button_click(b):
        with output:
            clear_output(wait=True)
            user_input = text_widget.value
            if not user_input:
                print("질문을 입력해주세요.")
                return

            print(f"입력: \"{user_input}\"")
            print("예측 수행 중...")
            result = predict(user_input)

            print(f"\n{'='*60}")
            if "error" in result:
                print(f"🚨 예측 오류: {result['error']}")
            else:
                print(f"📄 입력 텍스트: \"{result['text']}\"")
                print(f"🎯 의도 분석 결과: {result['intent']} (확률: {result.get('confidence', 0.0):.4f})")

                print("\n🔍 개체명 인식 결과:")
                if result.get('entities'):
                    for i, entity in enumerate(result['entities'], 1):
                        print(f"  {i}. \"{entity['entity']}\" → {entity['label']}")
                else:
                    print("  인식된 개체명 없음")

                print("\n📊 토큰별 NER 확률:")
                if result.get('ner_tokens') and result.get('ner_token_probabilities'):
                    if ner_id2label:
                        for token, probs in zip(result['ner_tokens'], result['ner_token_probabilities']):
                            print(f"  - 토큰: '{token}'")
                            sorted_probs = sorted(enumerate(probs), key=lambda item: item[1], reverse=True)
                            # 상위 3개만 표시
                            for tag_id, prob in sorted_probs[:3]:
                                print(f"      {ner_id2label.get(tag_id, f'ID_{tag_id}')}: {prob:.4f}")
                    else:
                        print("  (오류: NER 레이블 매핑 정보(ner_id2label)를 찾을 수 없습니다.)")
                else:
                    print("  토큰별 확률 정보 없음.")

                print("\n📝 문자별 NER 태그 시각화:")
                print(f"  원문: {result['text']}")

                text = result['text']
                tag_markers = ['_'] * len(text)
                processed_indices = set()

                for entity in sorted(result.get('entities', []), key=lambda x: len(x['entity']), reverse=True):
                    entity_text = entity['entity']
                    entity_type = entity['label']
                    start_pos = -1
                    search_start = 0

                    while True:
                        temp_pos = text.find(entity_text, search_start)
                        if temp_pos == -1: break

                        is_overlapping = False
                        for i in range(temp_pos, temp_pos + len(entity_text)):
                            if i in processed_indices:
                                is_overlapping = True
                                break

                        if not is_overlapping:
                            start_pos = temp_pos
                            break

                        search_start = temp_pos + 1

                    if start_pos != -1:
                        end_pos = start_pos + len(entity_text)
                        for i in range(start_pos, end_pos):
                            if i not in processed_indices:
                                tag_markers[i] = '^'
                                processed_indices.add(i)

                        tag_label = f"[{entity_type}]"
                        for i, char in enumerate(tag_label):
                            pos = start_pos + i
                            if pos < len(tag_markers) and tag_markers[pos] == '^':
                                tag_markers[pos] = char

                print(f"  태그: {''.join(tag_markers)}")

    # 이벤트 연결
    button.on_click(on_button_click)

    # 엔터키로도 예측 실행
    def on_enter(widget):
        button.click()

    text_widget.on_submit(on_enter)

    # UI 표시
    display(title)

    model_status = widgets.HTML(value="<h3>모델 로드 중...</h3>")
    display(model_status)

    # 모델 로드
    intent_loaded = load_intent_model()
    ner_loaded = load_ner_model()

    if intent_loaded and ner_loaded:
        model_status.value = "<h3 style='color:green'>✓ 모델 로드 완료</h3>"
    else:
        model_status.value = "<h3 style='color:red'>✗ 모델 로드 실패</h3>"

    # 입력 UI 표시
    display(widgets.HBox([text_widget, button]))
    display(output)

# 메인 실행
if __name__ == "__main__":
    create_ui()
else:
    # Jupyter에서 직접 실행하는 경우
    create_ui()

HTML(value='<h2>도서 검색 NLU 모델 예측 인터페이스</h2>')

HTML(value='<h3>모델 로드 중...</h3>')

Intent 모델 로드 성공: 40개 의도 클래스


You are using a model of type roberta_crf to instantiate a model of type roberta. This is not supported for all configurations of models and can yield errors.
Some weights of RobertaModel were not initialized from the model checkpoint at ./models/ner and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


누락된 키: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
예상치 못한 키: []
NER 모델 로드 성공: 51개 NER 태그


HBox(children=(Text(value='', description='질문:', layout=Layout(width='80%'), placeholder='질문을 입력하세요'), Button(…

Output()