In [99]:
import pandas as pd
import torch
import ast
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import AutoModelForTokenClassification, AutoTokenizer

## 데이터 파일 읽기

In [100]:
file_path = './data/NER_결제승인.xlsx'
data = pd.read_excel(file_path)

data.head()

Unnamed: 0,message,result,annotations
0,"[Web발신] [신한체크승인] 장*진(4384) 01/03 17:24 (금액)5,0...","{\n""Method"":""신한체크"",\n""Location"":""서부카투사식당"",\n""T...","[(9, 13, 'METHOD'), (50, 57, 'LOCATION'), (27,..."
1,"[Web발신] [신한체크승인] 장*진(4384) 01/05 12:04 (금액)5,0...","{\n""Method"":""신한체크"",\n""Location"":""서부카투사식당"",\n""T...","[(9, 13, 'METHOD'), (50, 57, 'LOCATION'), (27,..."
2,"[Web발신] [신한체크승인] 장*진(4384) 08/16 22:47 (금액)1,5...","{\n""Method"":""신한체크"",\n""Location"":""태평할인마트"",\n""Ti...","[(9, 13, 'METHOD'), (50, 56, 'LOCATION'), (27,..."
3,"[Web발신] [신한체크승인] 장*진(8730) 12/20 13:35 (금액)4,2...","{\n""Method"":""신한체크"",\n""Location"":""한솥도시락한림대앞점"",\...","[(9, 13, 'METHOD'), (50, 60, 'LOCATION'), (27,..."
4,"[Web발신] [신한체크승인] 장*진(8730) 12/21 23:30 (금액)1,0...","{\n""Method"":""신한체크"",\n""Location"":""네이버페이"",\n""Tim...","[(9, 13, 'METHOD'), (50, 55, 'LOCATION'), (27,..."


## 데이터 전처리

In [101]:
# annotations 컬럼의 문자열을 파이썬 리스트로 변환
data['annotations'] = data['annotations'].apply(ast.literal_eval)

# 태그를 정수로 매핑하는 딕셔너리
tag2id = {'O': 0, 'METHOD': 1, 'LOCATION': 2, 'TIME': 3, 'COST': 4}

# KoBERT 토크나이저 로드
model_name = "monologg/kobert"
tokenizer = AutoTokenizer.from_pretrained(model_name)

## 토큰화 및 엔티티 태그 할당

In [102]:
def tokenize_and_align_labels(tokenizer, texts, annotations, tag2id):
    tokenized_inputs = tokenizer(texts, truncation=True, padding=True, return_tensors="pt", return_offsets_mapping=True)
    labels = []

    for i, annotation in enumerate(annotations):
        offset_mapping = tokenized_inputs['offset_mapping'][i].tolist()  # offset_mapping을 리스트로 변환
        label_ids = [tag2id['O']] * len(offset_mapping)  # Initialize with the 'O' label

        # Convert character-level annotation to token-level annotation
        for start_char, end_char, label in annotation:
            # Find the start token index
            start_token_index = None
            end_token_index = None
            for idx, (offset_start, offset_end) in enumerate(offset_mapping):
                if start_token_index is None and offset_start <= start_char < offset_end:
                    start_token_index = idx
                if offset_start < end_char <= offset_end:
                    end_token_index = idx
                    break  # Stop the loop once the end token is found

            # It's possible that a single word gets split into multiple tokens.
            # We need to assign the correct label to all tokens derived from the word.
            if start_token_index is not None and end_token_index is not None:
                for token_index in range(start_token_index, end_token_index + 1):
                    label_ids[token_index] = tag2id[label]

        # Set labels for special tokens to -100 so that they are not used in the loss calculation
        label_ids = [-100 if token_idx in (tokenizer.cls_token_id, tokenizer.sep_token_id, tokenizer.pad_token_id) else label for token_idx, label in zip(tokenized_inputs["input_ids"][i].tolist(), label_ids)]
        labels.append(label_ids)

    # Remove offset_mapping from tokenized_inputs for model training
    tokenized_inputs.pop("offset_mapping")

    return tokenized_inputs, labels

In [103]:
# 실제 데이터에 대해 토큰화 및 태그 할당
texts = data['message'].tolist()
annotations = data['annotations'].tolist()
tokenized_texts, labels = tokenize_and_align_labels(tokenizer, texts, annotations, tag2id)

## 훈련 데이터 셋 준비

In [104]:
class NERDataset(Dataset):
    def __init__(self, encodings, labels=None):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx])  # labels를 텐서로 변환
        return item

    def __len__(self):
        return len(self.encodings['input_ids'])


In [105]:
# 데이터셋 객체 생성
train_dataset = NERDataset(tokenized_texts, labels)

## 모델 및 학습 설정

In [106]:
# 모델 로드
model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(tag2id))

Some weights of BertForTokenClassification were not initialized from the model checkpoint at monologg/kobert and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [107]:
# 학습 준비
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
model.train()

BertForTokenClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(8002, 768, padding_idx=1)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, ele

In [108]:
# 최적화기 설정
optimizer = AdamW(model.parameters(), lr=5e-5)

### GPU 사용 코드

In [109]:
# for epoch in range(3):
#     for batch in train_loader:
#         optimizer.zero_grad()
#         input_ids = batch['input_ids'].to(torch.device('cuda'))
#         attention_mask = batch['attention_mask'].to(torch.device('cuda'))
#         labels = batch['labels'].to(torch.device('cuda'))
#         outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
#         loss = outputs.loss
#         loss.backward()
#         optimizer.step()

### CPU 사용 코드

In [110]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 학습 과정
for epoch in range(10):
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader)}")


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


Epoch 1, Loss: 1.2931532859802246
Epoch 2, Loss: 0.8657777309417725
Epoch 3, Loss: 0.6522725224494934
Epoch 4, Loss: 0.49368470907211304
Epoch 5, Loss: 0.38013402620951336
Epoch 6, Loss: 0.29835015535354614
Epoch 7, Loss: 0.2339643438657125
Epoch 8, Loss: 0.1728955258925756
Epoch 9, Loss: 0.128826675315698
Epoch 10, Loss: 0.09103158861398697


## 학습된 모델 테스트 함수

In [111]:
id2tag = {0: 'O', 1: 'METHOD', 2: 'LOCATION', 3: 'TIME', 4: 'COST'}
def predict(model, tokenizer, device, message):
    model.eval()
    inputs = tokenizer(message, return_tensors="pt", padding=True, truncation=True, max_length=512, return_offsets_mapping=True)
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)
    offset_mapping = inputs["offset_mapping"].detach().cpu().numpy()[0]  # 오프셋 매핑 정보
    
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        predictions = torch.argmax(outputs.logits, dim=-1)

    labels = [id2tag[id] for id in predictions[0].cpu().numpy()]
    extracted_info = {"METHOD": "", "LOCATION": "", "TIME": "", "COST": ""}
    
    for i, (offset, label) in enumerate(zip(offset_mapping, labels)):
        if label != "O":
            start, end = offset
            extracted_text = message[start:end]
            extracted_info[label] += extracted_text + " "

    for key in extracted_info:
        extracted_info[key] = extracted_info[key].strip()

    return extracted_info



In [115]:
# 예시 메시지로 예측 테스트
test_message = "[Web발신] 하나(1*1*) 우*하님 일시불 25,900원 10/15 11:47 누적 282,132원 양의원"
predicted_info = predict(model, tokenizer, device, test_message)
for domain, text in predicted_info.items():
    print(f"{domain}: {text.strip()}")


METHOD: 하나
LOCATION: 양의원
TIME: 10 / 15 11 : 47
COST: 25 , 900원


## 모델 저장

In [116]:
model_save_path = "./model/kobert"
model.save_pretrained(model_save_path)
tokenizer.save_pretrained(model_save_path)

('./model/kobert/tokenizer_config.json',
 './model/kobert/special_tokens_map.json',
 './model/kobert/vocab.txt',
 './model/kobert/added_tokens.json',
 './model/kobert/tokenizer.json')