In [None]:
# 필요한 라이브러리 설치
!pip install transformers torch torchcrf

In [2]:
!pip install torchcrf



In [6]:
pip install pytorch-crf==0.4.0

Collecting pytorch-crf==0.4.0
  Downloading pytorch_crf-0.4.0-py3-none-any.whl.metadata (3.5 kB)
Downloading pytorch_crf-0.4.0-py3-none-any.whl (11 kB)
Installing collected packages: pytorch-crf
Successfully installed pytorch-crf-0.4.0


In [32]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizerFast, DistilBertPreTrainedModel, DistilBertModel, DistilBertConfig
from torchcrf import CRF
from sklearn.model_selection import train_test_split

# 개체 및 태그 정의
ENTITY_TAGS = {
    "PER": ["B-PER", "I-PER"],
    "ORG": ["B-ORG", "I-ORG"],
    "EDU": ["B-EDU", "I-EDU"],
    "AFF": ["B-AFF", "I-AFF"],
    "POS": ["B-POS", "I-POS"],
    "LOC": ["B-LOC", "I-LOC"],
    "DUR": ["B-DUR", "I-DUR"]
}

class EnhancedNERModel(DistilBertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.distilbert = DistilBertModel(config)
        self.dropout = nn.Dropout(config.dropout)
        self.sentence_classifier = nn.Linear(config.dim, 2)  # 문장 분류 (민감/비민감)
        self.token_classifier = nn.Linear(config.dim * 2, config.num_labels)
        self.init_weights()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        sentence_labels=None,
    ):
        outputs = self.distilbert(
            input_ids,
            attention_mask=attention_mask,
        )

        sequence_output = outputs[0]
        pooled_output = sequence_output[:, 0]  # [CLS] 토큰의 출력

        # 문장 수준 분류
        sentence_logits = self.sentence_classifier(pooled_output)
        sentence_loss = None
        if sentence_labels is not None:
            sentence_loss_fct = nn.CrossEntropyLoss()
            sentence_loss = sentence_loss_fct(sentence_logits.view(-1, 2), sentence_labels.view(-1))

        # 문장 수준 정보를 토큰 수준 정보와 결합
        expanded_pooled_output = pooled_output.unsqueeze(1).expand(-1, sequence_output.size(1), -1)
        combined_output = torch.cat([sequence_output, expanded_pooled_output], dim=-1)

        token_logits = self.token_classifier(self.dropout(combined_output))

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
            token_loss = loss_fct(token_logits.view(-1, self.num_labels), labels.view(-1))

            loss = token_loss
            if sentence_loss is not None:
                loss += sentence_loss

        return {
            "loss": loss,
            "token_logits": token_logits,
            "sentence_logits": sentence_logits,
        }

    def decode(self, token_logits, attention_mask):
        return torch.argmax(token_logits, dim=-1)

class EnhancedNERDataset(Dataset):
    def __init__(self, texts, tags, sentence_labels, tokenizer, max_len=128):
        self.texts = texts
        self.tags = tags
        self.sentence_labels = sentence_labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_dict = {"O": 0}
        for entity, entity_tags in ENTITY_TAGS.items():
            for tag in entity_tags:
                if tag not in self.label_dict:
                    self.label_dict[tag] = len(self.label_dict)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.tags[idx]
        sentence_label = self.sentence_labels[idx]

        encoding = self.tokenizer(text, padding='max_length', truncation=True, max_length=self.max_len, return_tensors="pt")
        input_ids = encoding["input_ids"].squeeze()
        attention_mask = encoding["attention_mask"].squeeze()

        # 첫 번째 토큰([CLS])의 마스크를 1로 설정
        attention_mask[0] = 1

        # [CLS]와 [SEP] 토큰을 포함한 라벨 생성
        labels = torch.tensor([-100] + [self.label_dict.get(tag, 0) for tag in tags] + [-100])

        # 패딩
        if len(labels) < self.max_len:
            labels = torch.cat([labels, torch.tensor([-100] * (self.max_len - len(labels)))])
        else:
            labels = labels[:self.max_len]

        # 첫 번째 토큰([CLS])의 라벨을 0으로 설정
        labels[0] = 0

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
            "sentence_labels": torch.tensor(sentence_label),
        }



def train(model, train_loader, optimizer, device):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        sentence_labels = batch["sentence_labels"].to(device)

        # 마스크의 첫 번째 요소를 True로 설정
        attention_mask[:, 0] = 1

        outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
        loss = outputs["loss"]
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    return total_loss / len(train_loader)

def evaluate(model, test_loader, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            sentence_labels = batch["sentence_labels"].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
            total_loss += outputs["loss"].item()
    return total_loss / len(test_loader)


def identify_entities(text, model, tokenizer, device, label_dict):
    encoding = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        token_logits = outputs["token_logits"]
        sentence_logits = outputs["sentence_logits"]

    token_predictions = model.decode(token_logits, attention_mask)
    sentence_prediction = torch.argmax(sentence_logits, dim=1).item()

    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
    ner_labels = [list(label_dict.keys())[pred] for pred in token_predictions[0]]

    entities = []
    current_entity = None
    for token, label in zip(tokens, ner_labels):
        if label.startswith("B-"):
            if current_entity:
                entities.append(current_entity)
            current_entity = {"type": label[2:], "text": token}
        elif label.startswith("I-") and current_entity and current_entity["type"] == label[2:]:
            current_entity["text"] += " " + token
        else:
            if current_entity:
                entities.append(current_entity)
                current_entity = None

    if current_entity:
        entities.append(current_entity)

    sentence_sensitivity = "민감" if sentence_prediction == 1 else "비민감"

    return entities, sentence_sensitivity

# 메인 실행 부분
if __name__ == "__main__":
    # 데이터 준비 (예시)
    texts = [
        "김철수씨는 서울대학교 컴퓨터공학과를 졸업하고 현재 구글 코리아에서 소프트웨어 엔지니어로 2년째 근무 중입니다.",
        "이회사 대표이사 박영희는 연세대학교 경영학과 출신으로 알려져 있습니다.",
        "저는 한국대학교 물리학과에 재학 중인 학생입니다."
    ]
    tags = [
        ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "B-ORG", "I-ORG", "O", "B-POS", "I-POS", "O", "B-DUR", "I-DUR", "O", "O", "O"],
        ["O", "B-POS", "B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O"],
        ["O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O", "O"]
    ]
    sentence_labels = [1, 1, 0]  # 1: 민감, 0: 비민감

    # 데이터 분할
    train_texts, test_texts, train_tags, test_tags, train_sentence_labels, test_sentence_labels = train_test_split(
        texts, tags, sentence_labels, test_size=0.2, random_state=42
    )

    # 토크나이저 및 모델 초기화
    tokenizer = DistilBertTokenizerFast.from_pretrained("monologg/distilkobert")
    config = DistilBertConfig.from_pretrained("monologg/distilkobert")
    config.num_labels = len(ENTITY_TAGS) * 2 + 1  # B, I for each entity + O

    # 데이터셋 및 데이터로더 생성
    train_dataset = EnhancedNERDataset(train_texts, train_tags, train_sentence_labels, tokenizer)
    test_dataset = EnhancedNERDataset(test_texts, test_tags, test_sentence_labels, tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=2)

    # 모델 초기화
    model = EnhancedNERModel(config)

    # 학습 설정
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

    # 학습
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, optimizer, device)
        eval_loss = evaluate(model, test_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Eval Loss: {eval_loss:.4f}")

    # 모델 저장
    torch.save(model.state_dict(), "enhanced_ner_model.pth")

    # 추론 예시
    test_text = "김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다."
    entities, sentence_sensitivity = identify_entities(test_text, model, tokenizer, device, train_dataset.label_dict)

    print("\n추론 결과:")
    print("입력 텍스트:", test_text)
    print("식별된 개체:")
    for entity in entities:
        print(f"- {entity['text']} ({entity['type']})")
    print(f"문장 민감도: {sentence_sensitivity}")


Epoch 1/10, Train Loss: 3.5570, Eval Loss: 2.9723
Epoch 2/10, Train Loss: 2.2772, Eval Loss: 3.4505
Epoch 3/10, Train Loss: 2.3759, Eval Loss: 3.2653
Epoch 4/10, Train Loss: 2.1237, Eval Loss: 2.8385
Epoch 5/10, Train Loss: 1.6698, Eval Loss: 2.6309
Epoch 6/10, Train Loss: 1.4479, Eval Loss: 2.7175
Epoch 7/10, Train Loss: 1.4055, Eval Loss: 2.8179
Epoch 8/10, Train Loss: 1.4852, Eval Loss: 2.9339
Epoch 9/10, Train Loss: 1.1470, Eval Loss: 3.0918
Epoch 10/10, Train Loss: 1.0359, Eval Loss: 3.1265

추론 결과:
입력 텍스트: 김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다.
식별된 개체:
- [UNK] (ORG)
- [UNK] (PER)
- [UNK] (EDU)
- [UNK] (ORG)
- [UNK] (EDU)
문장 민감도: 비민감


In [34]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizerFast, DistilBertPreTrainedModel, DistilBertModel, DistilBertConfig
from sklearn.model_selection import train_test_split

# 개체 및 태그 정의
ENTITY_TAGS = {
    "PER": ["B-PER", "I-PER"],
    "ORG": ["B-ORG", "I-ORG"],
    "EDU": ["B-EDU", "I-EDU"],
    "AFF": ["B-AFF", "I-AFF"],
    "POS": ["B-POS", "I-POS"],
    "LOC": ["B-LOC", "I-LOC"],
    "DUR": ["B-DUR", "I-DUR"]
}

class EnhancedNERModel(DistilBertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.distilbert = DistilBertModel(config)
        self.dropout = nn.Dropout(config.dropout)
        self.sentence_classifier = nn.Linear(config.dim, 2)  # 문장 분류 (민감/비민감)
        self.token_classifier = nn.Linear(config.dim * 2, config.num_labels)
        self.init_weights()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        sentence_labels=None,
    ):
        outputs = self.distilbert(
            input_ids,
            attention_mask=attention_mask,
        )

        sequence_output = outputs[0]
        pooled_output = sequence_output[:, 0]  # [CLS] 토큰의 출력

        # 문장 수준 분류
        sentence_logits = self.sentence_classifier(pooled_output)
        sentence_loss = None
        if sentence_labels is not None:
            sentence_loss_fct = nn.CrossEntropyLoss()
            sentence_loss = sentence_loss_fct(sentence_logits.view(-1, 2), sentence_labels.view(-1))

        # 문장 수준 정보를 토큰 수준 정보와 결합
        expanded_pooled_output = pooled_output.unsqueeze(1).expand(-1, sequence_output.size(1), -1)
        combined_output = torch.cat([sequence_output, expanded_pooled_output], dim=-1)

        token_logits = self.token_classifier(self.dropout(combined_output))

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            active_loss = attention_mask.view(-1) == 1
            active_logits = token_logits.view(-1, self.num_labels)
            active_labels = torch.where(
                active_loss, labels.view(-1), torch.tensor(loss_fct.ignore_index).type_as(labels)
            )
            loss = loss_fct(active_logits, active_labels)
            if sentence_loss is not None:
                loss += sentence_loss

        return {
            "loss": loss,
            "token_logits": token_logits,
            "sentence_logits": sentence_logits,
        }

    def decode(self, token_logits, attention_mask):
        predictions = torch.argmax(token_logits, dim=-1)
        return predictions


class EnhancedNERDataset(Dataset):
    def __init__(self, texts, tags, sentence_labels, tokenizer, max_len=128):
        self.texts = texts
        self.tags = tags
        self.sentence_labels = sentence_labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_dict = {"O": 0}
        for entity, entity_tags in ENTITY_TAGS.items():
            for tag in entity_tags:
                if tag not in self.label_dict:
                    self.label_dict[tag] = len(self.label_dict)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.tags[idx]
        sentence_label = self.sentence_labels[idx]

        encoding = self.tokenizer(text, padding='max_length', truncation=True, max_length=self.max_len, return_tensors="pt")
        input_ids = encoding["input_ids"].squeeze()
        attention_mask = encoding["attention_mask"].squeeze()

        # 첫 번째 토큰([CLS])의 마스크를 1로 설정
        attention_mask[0] = 1

        # [CLS]와 [SEP] 토큰을 포함한 라벨 생성
        labels = torch.tensor([-100] + [self.label_dict.get(tag, 0) for tag in tags] + [-100])

        # 패딩
        if len(labels) < self.max_len:
            labels = torch.cat([labels, torch.tensor([-100] * (self.max_len - len(labels)))])
        else:
            labels = labels[:self.max_len]

        # 첫 번째 토큰([CLS])의 라벨을 0으로 설정
        labels[0] = 0

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
            "sentence_labels": torch.tensor(sentence_label),
        }


def train(model, train_loader, optimizer, device):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        sentence_labels = batch["sentence_labels"].to(device)

        outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
        loss = outputs["loss"]
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    return total_loss / len(train_loader)

def evaluate(model, test_loader, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            sentence_labels = batch["sentence_labels"].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
            total_loss += outputs["loss"].item()
    return total_loss / len(test_loader)


def identify_entities(text, model, tokenizer, device, label_dict):
    encoding = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        token_logits = outputs["token_logits"]
        sentence_logits = outputs["sentence_logits"]

    token_predictions = model.decode(token_logits, attention_mask)
    sentence_prediction = torch.argmax(sentence_logits, dim=1).item()

    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
    ner_labels = [list(label_dict.keys())[pred] for pred in token_predictions[0]]

    entities = []
    current_entity = None
    for token, label in zip(tokens, ner_labels):
        if label.startswith("B-"):
            if current_entity:
                entities.append(current_entity)
            current_entity = {"type": label[2:], "text": token}
        elif label.startswith("I-") and current_entity and current_entity["type"] == label[2:]:
            current_entity["text"] += " " + token
        else:
            if current_entity:
                entities.append(current_entity)
                current_entity = None

    if current_entity:
        entities.append(current_entity)

    sentence_sensitivity = "민감" if sentence_prediction == 1 else "비민감"

    return entities, sentence_sensitivity

# 메인 실행 부분
if __name__ == "__main__":
    # 데이터 준비 (예시)
    texts = [
        "김철수씨는 서울대학교 컴퓨터공학과를 졸업하고 현재 구글 코리아에서 소프트웨어 엔지니어로 2년째 근무 중입니다.",
        "이회사 대표이사 박영희는 연세대학교 경영학과 출신으로 알려져 있습니다.",
        "저는 한국대학교 물리학과에 재학 중인 학생입니다."
    ]
    tags = [
        ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "B-ORG", "I-ORG", "O", "B-POS", "I-POS", "O", "B-DUR", "I-DUR", "O", "O", "O"],
        ["O", "B-POS", "B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O"],
        ["O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O", "O"]
    ]
    sentence_labels = [1, 1, 0]  # 1: 민감, 0: 비민감

    # 데이터 분할
    train_texts, test_texts, train_tags, test_tags, train_sentence_labels, test_sentence_labels = train_test_split(
        texts, tags, sentence_labels, test_size=0.2, random_state=42
    )

    # 토크나이저 및 모델 초기화
    tokenizer = DistilBertTokenizerFast.from_pretrained("monologg/distilkobert")
    config = DistilBertConfig.from_pretrained("monologg/distilkobert")
    config.num_labels = len(ENTITY_TAGS) * 2 + 1  # B, I for each entity + O

    # 데이터셋 및 데이터로더 생성
    train_dataset = EnhancedNERDataset(train_texts, train_tags, train_sentence_labels, tokenizer)
    test_dataset = EnhancedNERDataset(test_texts, test_tags, test_sentence_labels, tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=2)

    # 모델 초기화
    model = EnhancedNERModel(config)

    # 학습 설정
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

    # 학습
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, optimizer, device)
        eval_loss = evaluate(model, test_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Eval Loss: {eval_loss:.4f}")

    # 모델 저장
    torch.save(model.state_dict(), "enhanced_ner_model.pth")

    # 추론 예시
    test_text = "김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다."
    entities, sentence_sensitivity = identify_entities(test_text, model, tokenizer, device, train_dataset.label_dict)

    print("\n추론 결과:")
    print("입력 텍스트:", test_text)
    print("식별된 개체:")
    for entity in entities:
        print(f"- {entity['text']} ({entity['type']})")
    print(f"문장 민감도: {sentence_sensitivity}")


Epoch 1/10, Train Loss: 3.6476, Eval Loss: 2.7678
Epoch 2/10, Train Loss: 2.9260, Eval Loss: 2.8414
Epoch 3/10, Train Loss: 2.5470, Eval Loss: 2.5781
Epoch 4/10, Train Loss: 2.0865, Eval Loss: 2.4768
Epoch 5/10, Train Loss: 1.5965, Eval Loss: 2.5025
Epoch 6/10, Train Loss: 1.5950, Eval Loss: 2.5183
Epoch 7/10, Train Loss: 1.5232, Eval Loss: 2.5943
Epoch 8/10, Train Loss: 1.3054, Eval Loss: 2.6888
Epoch 9/10, Train Loss: 1.0037, Eval Loss: 2.8350
Epoch 10/10, Train Loss: 0.8831, Eval Loss: 2.9779

추론 결과:
입력 텍스트: 김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다.
식별된 개체:
- [UNK] (ORG)
- [UNK] (PER)
- [UNK] (EDU)
문장 민감도: 민감


In [36]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizerFast, DistilBertPreTrainedModel, DistilBertModel, DistilBertConfig
from sklearn.model_selection import train_test_split

# 개체 및 태그 정의
ENTITY_TAGS = {
    "PER": ["B-PER", "I-PER"],
    "ORG": ["B-ORG", "I-ORG"],
    "EDU": ["B-EDU", "I-EDU"],
    "AFF": ["B-AFF", "I-AFF"],
    "POS": ["B-POS", "I-POS"],
    "LOC": ["B-LOC", "I-LOC"],
    "DUR": ["B-DUR", "I-DUR"]
}

class EnhancedNERModel(DistilBertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.distilbert = DistilBertModel(config)
        self.dropout = nn.Dropout(config.dropout)
        self.sentence_classifier = nn.Linear(config.dim, 2)  # 문장 분류 (민감/비민감)
        self.token_classifier = nn.Linear(config.dim * 2, config.num_labels)
        self.init_weights()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        sentence_labels=None,
    ):
        outputs = self.distilbert(
            input_ids,
            attention_mask=attention_mask,
        )

        sequence_output = outputs[0]
        pooled_output = sequence_output[:, 0]  # [CLS] 토큰의 출력

        # 문장 수준 분류
        sentence_logits = self.sentence_classifier(pooled_output)
        sentence_loss = None
        if sentence_labels is not None:
            sentence_loss_fct = nn.CrossEntropyLoss()
            sentence_loss = sentence_loss_fct(sentence_logits.view(-1, 2), sentence_labels.view(-1))

        # 문장 수준 정보를 토큰 수준 정보와 결합
        expanded_pooled_output = pooled_output.unsqueeze(1).expand(-1, sequence_output.size(1), -1)
        combined_output = torch.cat([sequence_output, expanded_pooled_output], dim=-1)

        token_logits = self.token_classifier(self.dropout(combined_output))

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            active_loss = attention_mask.view(-1) == 1
            active_logits = token_logits.view(-1, self.num_labels)
            active_labels = torch.where(
                active_loss, labels.view(-1), torch.tensor(loss_fct.ignore_index).type_as(labels)
            )
            loss = loss_fct(active_logits, active_labels)
            if sentence_loss is not None:
                loss += sentence_loss

        return {
            "loss": loss,
            "token_logits": token_logits,
            "sentence_logits": sentence_logits,
        }

    def decode(self, token_logits, attention_mask):
        predictions = torch.argmax(token_logits, dim=-1)
        return predictions


class EnhancedNERDataset(Dataset):
    def __init__(self, texts, tags, sentence_labels, tokenizer, max_len=128):
        self.texts = texts
        self.tags = tags
        self.sentence_labels = sentence_labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_dict = {"O": 0}
        for entity, entity_tags in ENTITY_TAGS.items():
            for tag in entity_tags:
                if tag not in self.label_dict:
                    self.label_dict[tag] = len(self.label_dict)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.tags[idx]
        sentence_label = self.sentence_labels[idx]

        encoding = self.tokenizer(text, padding='max_length', truncation=True, max_length=self.max_len, return_tensors="pt")
        input_ids = encoding["input_ids"].squeeze()
        attention_mask = encoding["attention_mask"].squeeze()

        # 첫 번째 토큰([CLS])의 마스크를 1로 설정
        attention_mask[0] = 1

        # [CLS]와 [SEP] 토큰을 포함한 라벨 생성
        labels = torch.tensor([-100] + [self.label_dict.get(tag, 0) for tag in tags] + [-100])

        # 패딩
        if len(labels) < self.max_len:
            labels = torch.cat([labels, torch.tensor([-100] * (self.max_len - len(labels)))])
        else:
            labels = labels[:self.max_len]

        # 첫 번째 토큰([CLS])의 라벨을 0으로 설정
        labels[0] = 0

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
            "sentence_labels": torch.tensor(sentence_label),
        }


def train(model, train_loader, optimizer, device):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        sentence_labels = batch["sentence_labels"].to(device)

        outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
        loss = outputs["loss"]
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    return total_loss / len(train_loader)

def evaluate(model, test_loader, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            sentence_labels = batch["sentence_labels"].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
            total_loss += outputs["loss"].item()
    return total_loss / len(test_loader)


def identify_entities(text, model, tokenizer, device, label_dict):
    encoding = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        token_logits = outputs["token_logits"]
        sentence_logits = outputs["sentence_logits"]

    token_predictions = model.decode(token_logits, attention_mask)
    sentence_prediction = torch.argmax(sentence_logits, dim=1).item()

    # Instead of using tokenizer.convert_ids_to_tokens, use the original text's tokens
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0], skip_special_tokens=True)
    token_predictions = token_predictions[0].cpu().numpy()

    entities = []
    current_entity = None
    for token, label_id in zip(tokens, token_predictions):
        label = list(label_dict.keys())[label_id]
        if label.startswith("B-"):
            if current_entity:
                entities.append(current_entity)
            current_entity = {"type": label[2:], "text": token}
        elif label.startswith("I-") and current_entity and current_entity["type"] == label[2:]:
            current_entity["text"] += " " + token
        else:
            if current_entity:
                entities.append(current_entity)
                current_entity = None

    if current_entity:
        entities.append(current_entity)

    sentence_sensitivity = "민감" if sentence_prediction == 1 else "비민감"

    return entities, sentence_sensitivity

# 메인 실행 부분
if __name__ == "__main__":
    # 데이터 준비 (예시)
    texts = [
        "김철수씨는 서울대학교 컴퓨터공학과를 졸업하고 현재 구글 코리아에서 소프트웨어 엔지니어로 2년째 근무 중입니다.",
        "이회사 대표이사 박영희는 연세대학교 경영학과 출신으로 알려져 있습니다.",
        "저는 한국대학교 물리학과에 재학 중인 학생입니다."
    ]
    tags = [
        ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "B-ORG", "I-ORG", "O", "B-POS", "I-POS", "O", "B-DUR", "I-DUR", "O", "O", "O"],
        ["O", "B-POS", "B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O"],
        ["O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O", "O"]
    ]
    sentence_labels = [1, 1, 0]  # 1: 민감, 0: 비민감

    # 데이터 분할
    train_texts, test_texts, train_tags, test_tags, train_sentence_labels, test_sentence_labels = train_test_split(
        texts, tags, sentence_labels, test_size=0.2, random_state=42
    )

    # 토크나이저 및 모델 초기화
    tokenizer = DistilBertTokenizerFast.from_pretrained("monologg/distilkobert")
    config = DistilBertConfig.from_pretrained("monologg/distilkobert")
    config.num_labels = len(ENTITY_TAGS) * 2 + 1  # B, I for each entity + O

    # 데이터셋 및 데이터로더 생성
    train_dataset = EnhancedNERDataset(train_texts, train_tags, train_sentence_labels, tokenizer)
    test_dataset = EnhancedNERDataset(test_texts, test_tags, test_sentence_labels, tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=2)

    # 모델 초기화
    model = EnhancedNERModel(config)

    # 학습 설정
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

    # 학습
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, optimizer, device)
        eval_loss = evaluate(model, test_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Eval Loss: {eval_loss:.4f}")

    # 모델 저장
    torch.save(model.state_dict(), "enhanced_ner_model.pth")

    # 추론 예시
    test_text = "김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다."
    entities, sentence_sensitivity = identify_entities(test_text, model, tokenizer, device, train_dataset.label_dict)

    print("\n추론 결과:")
    print("입력 텍스트:", test_text)
    print("식별된 개체:")
    for entity in entities:
        print(f"- {entity['text']} ({entity['type']})")
    print(f"문장 민감도: {sentence_sensitivity}")


Epoch 1/10, Train Loss: 3.5436, Eval Loss: 3.2226
Epoch 2/10, Train Loss: 2.5932, Eval Loss: 3.0084
Epoch 3/10, Train Loss: 2.3537, Eval Loss: 2.8014
Epoch 4/10, Train Loss: 1.8110, Eval Loss: 2.8706
Epoch 5/10, Train Loss: 1.7926, Eval Loss: 3.0120
Epoch 6/10, Train Loss: 1.5126, Eval Loss: 2.9965
Epoch 7/10, Train Loss: 1.2298, Eval Loss: 2.9322
Epoch 8/10, Train Loss: 1.1261, Eval Loss: 2.9384
Epoch 9/10, Train Loss: 1.1242, Eval Loss: 3.0252
Epoch 10/10, Train Loss: 0.8748, Eval Loss: 3.1504

추론 결과:
입력 텍스트: 김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다.
식별된 개체:
- . (POS)
문장 민감도: 민감


In [37]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizerFast, DistilBertPreTrainedModel, DistilBertModel, DistilBertConfig
from sklearn.model_selection import train_test_split

# 개체 및 태그 정의
ENTITY_TAGS = {
    "PER": ["B-PER", "I-PER"],
    "ORG": ["B-ORG", "I-ORG"],
    "EDU": ["B-EDU", "I-EDU"],
    "AFF": ["B-AFF", "I-AFF"],
    "POS": ["B-POS", "I-POS"],
    "LOC": ["B-LOC", "I-LOC"],
    "DUR": ["B-DUR", "I-DUR"]
}

class EnhancedNERModel(DistilBertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.distilbert = DistilBertModel(config)
        self.dropout = nn.Dropout(config.dropout)
        self.sentence_classifier = nn.Linear(config.dim, 2)  # 문장 분류 (민감/비민감)
        self.token_classifier = nn.Linear(config.dim * 2, config.num_labels)
        self.init_weights()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        labels=None,
        sentence_labels=None,
    ):
        outputs = self.distilbert(
            input_ids,
            attention_mask=attention_mask,
        )

        sequence_output = outputs[0]
        pooled_output = sequence_output[:, 0]  # [CLS] 토큰의 출력

        # 문장 수준 분류
        sentence_logits = self.sentence_classifier(pooled_output)
        sentence_loss = None
        if sentence_labels is not None:
            sentence_loss_fct = nn.CrossEntropyLoss()
            sentence_loss = sentence_loss_fct(sentence_logits.view(-1, 2), sentence_labels.view(-1))

        # 문장 수준 정보를 토큰 수준 정보와 결합
        expanded_pooled_output = pooled_output.unsqueeze(1).expand(-1, sequence_output.size(1), -1)
        combined_output = torch.cat([sequence_output, expanded_pooled_output], dim=-1)

        token_logits = self.token_classifier(self.dropout(combined_output))

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            active_loss = attention_mask.view(-1) == 1
            active_logits = token_logits.view(-1, self.num_labels)
            active_labels = torch.where(
                active_loss, labels.view(-1), torch.tensor(loss_fct.ignore_index).type_as(labels)
            )
            loss = loss_fct(active_logits, active_labels)
            if sentence_loss is not None:
                loss += sentence_loss

        return {
            "loss": loss,
            "token_logits": token_logits,
            "sentence_logits": sentence_logits,
        }

    def decode(self, token_logits, attention_mask):
        predictions = torch.argmax(token_logits, dim=-1)
        return predictions


class EnhancedNERDataset(Dataset):
    def __init__(self, texts, tags, sentence_labels, tokenizer, max_len=128):
        self.texts = texts
        self.tags = tags
        self.sentence_labels = sentence_labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_dict = {"O": 0}
        for entity, entity_tags in ENTITY_TAGS.items():
            for tag in entity_tags:
                if tag not in self.label_dict:
                    self.label_dict[tag] = len(self.label_dict)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.tags[idx]
        sentence_label = self.sentence_labels[idx]

        encoding = self.tokenizer(text, padding='max_length', truncation=True, max_length=self.max_len, return_tensors="pt")
        input_ids = encoding["input_ids"].squeeze()
        attention_mask = encoding["attention_mask"].squeeze()

        # 첫 번째 토큰([CLS])의 마스크를 1로 설정
        attention_mask[0] = 1

        # [CLS]와 [SEP] 토큰을 포함한 라벨 생성
        labels = torch.tensor([-100] + [self.label_dict.get(tag, 0) for tag in tags] + [-100])

        # 패딩
        if len(labels) < self.max_len:
            labels = torch.cat([labels, torch.tensor([-100] * (self.max_len - len(labels)))])
        else:
            labels = labels[:self.max_len]

        # 첫 번째 토큰([CLS])의 라벨을 0으로 설정
        labels[0] = 0

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
            "sentence_labels": torch.tensor(sentence_label),
        }


def train(model, train_loader, optimizer, device):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        sentence_labels = batch["sentence_labels"].to(device)

        outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
        loss = outputs["loss"]
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    return total_loss / len(train_loader)

def evaluate(model, test_loader, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            sentence_labels = batch["sentence_labels"].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels, sentence_labels=sentence_labels)
            total_loss += outputs["loss"].item()
    return total_loss / len(test_loader)


def identify_entities(text, model, tokenizer, device, label_dict):
    encoding = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        token_logits = outputs["token_logits"]
        sentence_logits = outputs["sentence_logits"]

    token_predictions = model.decode(token_logits, attention_mask)
    sentence_prediction = torch.argmax(sentence_logits, dim=1).item()

    # Display the tokenized input
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
    token_predictions = token_predictions[0].cpu().numpy()

    print("\n[Tokenized Input and Predicted Tags]")
    for token, label_id in zip(tokens, token_predictions):
        label = list(label_dict.keys())[label_id]
        print(f"Token: {token}, Tag: {label}")

    # Extract entities from the predicted tags
    entities = []
    current_entity = None
    for token, label_id in zip(tokens, token_predictions):
        label = list(label_dict.keys())[label_id]
        if label.startswith("B-"):
            if current_entity:
                entities.append(current_entity)
            current_entity = {"type": label[2:], "text": token}
        elif label.startswith("I-") and current_entity and current_entity["type"] == label[2:]:
            current_entity["text"] += " " + token
        else:
            if current_entity:
                entities.append(current_entity)
                current_entity = None

    if current_entity:
        entities.append(current_entity)

    sentence_sensitivity = "민감" if sentence_prediction == 1 else "비민감"

    return entities, sentence_sensitivity

# 메인 실행 부분
if __name__ == "__main__":
    # 데이터 준비 (예시)
    texts = [
        "김철수씨는 서울대학교 컴퓨터공학과를 졸업하고 현재 구글 코리아에서 소프트웨어 엔지니어로 2년째 근무 중입니다.",
        "이회사 대표이사 박영희는 연세대학교 경영학과 출신으로 알려져 있습니다.",
        "저는 한국대학교 물리학과에 재학 중인 학생입니다."
    ]
    tags = [
        ["B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "B-ORG", "I-ORG", "O", "B-POS", "I-POS", "O", "B-DUR", "I-DUR", "O", "O", "O"],
        ["O", "B-POS", "B-PER", "I-PER", "O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O"],
        ["O", "B-ORG", "I-ORG", "B-EDU", "I-EDU", "O", "O", "O", "O", "O"]
    ]
    sentence_labels = [1, 1, 0]  # 1: 민감, 0: 비민감

    # 데이터 분할
    train_texts, test_texts, train_tags, test_tags, train_sentence_labels, test_sentence_labels = train_test_split(
        texts, tags, sentence_labels, test_size=0.2, random_state=42
    )

    # 토크나이저 및 모델 초기화
    tokenizer = DistilBertTokenizerFast.from_pretrained("monologg/distilkobert")
    config = DistilBertConfig.from_pretrained("monologg/distilkobert")
    config.num_labels = len(ENTITY_TAGS) * 2 + 1  # B, I for each entity + O

    # 데이터셋 및 데이터로더 생성
    train_dataset = EnhancedNERDataset(train_texts, train_tags, train_sentence_labels, tokenizer)
    test_dataset = EnhancedNERDataset(test_texts, test_tags, test_sentence_labels, tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=2)

    # 모델 초기화
    model = EnhancedNERModel(config)

    # 학습 설정
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

    # 학습
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, optimizer, device)
        eval_loss = evaluate(model, test_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Eval Loss: {eval_loss:.4f}")

    # 모델 저장
    torch.save(model.state_dict(), "enhanced_ner_model.pth")

    # 추론 예시
    test_text = "김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다."
    entities, sentence_sensitivity = identify_entities(test_text, model, tokenizer, device, train_dataset.label_dict)

    print("\n[Final Extracted Entities]")
    print("입력 텍스트:", test_text)
    print("식별된 개체:")
    for entity in entities:
        print(f"- {entity['text']} ({entity['type']})")
    print(f"문장 민감도: {sentence_sensitivity}")


Epoch 1/10, Train Loss: 3.7597, Eval Loss: 2.6959
Epoch 2/10, Train Loss: 2.6071, Eval Loss: 2.4316
Epoch 3/10, Train Loss: 2.4268, Eval Loss: 2.3588
Epoch 4/10, Train Loss: 1.8176, Eval Loss: 2.4500
Epoch 5/10, Train Loss: 1.8143, Eval Loss: 2.5761
Epoch 6/10, Train Loss: 1.6547, Eval Loss: 2.7241
Epoch 7/10, Train Loss: 1.3008, Eval Loss: 2.7493
Epoch 8/10, Train Loss: 1.1318, Eval Loss: 2.6602
Epoch 9/10, Train Loss: 0.9772, Eval Loss: 2.5753
Epoch 10/10, Train Loss: 1.0795, Eval Loss: 2.6232

[Tokenized Input and Predicted Tags]
Token: [CLS], Tag: O
Token: [UNK], Tag: O
Token: [UNK], Tag: B-POS
Token: [UNK], Tag: B-PER
Token: [UNK], Tag: B-EDU
Token: 현재, Tag: I-EDU
Token: [UNK], Tag: O
Token: 마케팅, Tag: O
Token: [UNK], Tag: B-EDU
Token: [UNK], Tag: O
Token: [UNK], Tag: O
Token: ., Tag: I-EDU
Token: [SEP], Tag: O

[Final Extracted Entities]
입력 텍스트: 김영희는 고려대학교 경제학과를 졸업하고 현재 삼성전자에서 마케팅 팀장으로 일하고 있습니다.
식별된 개체:
- [UNK] (POS)
- [UNK] (PER)
- [UNK] 현재 (EDU)
- [UNK] (EDU)
문장 민감도: 민감
