### set environments

In [None]:
!pip install transformers

In [None]:
import torch
import numpy as np
import transformers
from tqdm import tqdm
from transformers import BertTokenizer, BertModel
from transformers import BertForMaskedLM,BertForSequenceClassification
from transformers import get_linear_schedule_with_warmup
from transformers import AdamW, BertConfig
import torch.nn.functional as F
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

import torch.nn.functional as F
import random
import time
import datetime

In [None]:
# Select the compute device once: CUDA when a GPU is visible, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')

### Read file, Load data, Save num

In [None]:
def read_file(filename):
    """Read a UTF-8 text file and return every line wrapped in its own list.

    Args:
        filename: Path of the file to read.

    Returns:
        A list with one entry per line; each entry is a single-element list
        holding the line with leading/trailing whitespace removed.
    """
    # The corpus is Chinese text, so decode explicitly as UTF-8 instead of
    # relying on the platform's default encoding.
    with open(filename, "r", encoding="utf-8") as file:
        # A stripped line can no longer contain "\n", so splitting on it always
        # produced exactly one element; build that one-element list directly.
        return [[line.strip()] for line in file]

def load_data(data_dir, *filenames):
    """Collect the first tab-separated field of every line in the given files.

    Args:
        data_dir: Directory prefix that is concatenated directly with each
            file name (so it must already end with a path separator).
        *filenames: Names of the files to read, in order.

    Returns:
        A tuple ``(sentences, count)`` where ``sentences`` is the list of first
        fields (whitespace-stripped lines split on tab) and ``count`` is its length.
    """
    sentences = []

    for filename in filenames:
        # Decode explicitly as UTF-8: the data is Chinese text and the platform
        # default encoding is not guaranteed to be UTF-8.
        with open(data_dir + filename, 'r', encoding='utf-8') as f:
            sentences.extend(line.strip().split('\t')[0] for line in f)
    return sentences, len(sentences)

def save_num(data_dir, *filenames):
    """Collect the second tab-separated field (the class number) of every line.

    Args:
        data_dir: Directory prefix that is concatenated directly with each
            file name (so it must already end with a path separator).
        *filenames: Names of the files to read, in order.

    Returns:
        A list of strings, one per line, in file order.

    Raises:
        IndexError: If a line has no second tab-separated field. The message
            names the file and line so the offending record can be found.
    """
    num = []
    for filename in filenames:
        # Decode explicitly as UTF-8: the data is Chinese text and the platform
        # default encoding is not guaranteed to be UTF-8.
        with open(data_dir + filename, 'r', encoding='utf-8') as f:
            for line_no, line in enumerate(f, start=1):
                fields = line.strip().split('\t')
                if len(fields) < 2:
                    # Same exception type as before, but with enough context
                    # to locate the malformed record.
                    raise IndexError(
                        f"{data_dir + filename}, line {line_no}: "
                        f"expected '<sentence>\\t<num>' but got {line!r}"
                    )
                num.append(fields[1])
    return num

In [None]:
data_dir = '/content/'
train = 'masked_xialai-train data.txt'
test = 'masked_xialai-test data.txt'

# Raw training lines. Resolve the file through data_dir like every other
# loader call here, instead of depending on the current working directory.
buyu_train = read_file(data_dir + train)
buyu, buyu_len = load_data(data_dir, train)
buyu_num = save_num(data_dir, train)

# The former `buyu_test = read_file(test)` was a dead store: its result was
# overwritten by load_data() on the very next line, so only load_data() remains.
buyu_test, buyu_len_test = load_data(data_dir, test)
buyu_num_test = save_num(data_dir, test)


print("train sentence :", buyu_len)
print("test sentence :", buyu_len_test)

train sentence : 5100
test sentence : 1000


In [None]:
# Debugging aid: when the output of save_num looks wrong, dump the parsed
# class numbers to a CSV file so they can be inspected by hand.

import pandas as pd

path = '/content/'
# One column ("num") holding the training class numbers in file order.
df = pd.DataFrame({"num": buyu_num})
# Written with encoding "utf-8-sig" and without the index column.
df.to_csv(path + "num.csv", encoding="utf-8-sig", index=False)

In [None]:
def preprocess(data, label):
    """Fill the ``[MASK]`` placeholder with ``label`` and keep sentences that contain it.

    Every ``[MASK]`` occurrence in a sentence is replaced by ``label``. A
    sentence that still does not contain ``label`` afterwards is printed,
    counted, and dropped.

    Args:
        data: Iterable of sentence strings.
        label: The directional-complement string to insert (e.g. '上').

    Returns:
        A tuple ``(sentences, labels)`` of equal length: the kept sentences and
        one copy of ``label`` per kept sentence.

    Note:
        Dropped sentences shorten the result, so any list kept in parallel
        with ``data`` is only still aligned when the printed count is 0.
    """
    sentences = []
    labels = []
    error_cnt = 0
    for sent in data:
        sent = sent.replace('[MASK]', label)
        # Check that the sentence actually contains the directional complement.
        if label not in sent:
            print(f"Sententce : {sent}")
            error_cnt += 1
            continue
        sentences.append(sent)
        labels.append(label)
    print(f"{label} 방향보어 없는 문장 개수 : {error_cnt}")
    return sentences, labels

buyu_sent, buyu_label = preprocess(buyu, '上')
# Store the test labels under their own name; previously this call overwrote
# the training labels held in buyu_label.
buyu_sent_test, buyu_label_test = preprocess(buyu_test, '上')

上 방향보어 없는 문장 개수 : 0
上 방향보어 없는 문장 개수 : 0


In [None]:
# Sentences and class numbers that feed the tokenizer and the label encoder.
train_sent = buyu_sent
train_num = buyu_num
# Use the preprocessed test sentences ([MASK] already replaced), matching what
# is done for the training sentences; the raw buyu_test still contains [MASK].
test_sent = buyu_sent_test
test_num = buyu_num_test

### Using BertTokenizer

In [None]:
# Name of the pretrained checkpoint; the same identifier is reused below when
# the classification model is loaded.
MODEL_TYPE = 'hfl/chinese-macbert-large'
# Load the matching tokenizer via BertTokenizer.from_pretrained.
tokenizer = BertTokenizer.from_pretrained(MODEL_TYPE)

Downloading:   0%|          | 0.00/107k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/19.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/263k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/660 [00:00<?, ?B/s]

In [None]:
# Split every training sentence into tokens, then map the tokens to vocabulary ids.
# NOTE(review): tokenize() followed by convert_tokens_to_ids() does not appear to
# add the [CLS]/[SEP] special tokens that BERT classifiers normally expect --
# confirm this is intended. convert_input_data() uses the same recipe at
# inference time, so any change has to be made in both places.
tokenized_sent = [tokenizer.tokenize(sent) for sent in train_sent]
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_sent]

# Same two steps for the test sentences.
tokenized_sent_test = [tokenizer.tokenize(sent) for sent in test_sent]
input_ids_test = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_sent_test]

# Pad at the end ("post") so all sequences in a set have equal length.
# No maxlen is given and the two sets are padded in separate calls, so
# presumably each is padded to its own longest sequence -- verify if train and
# test must share one width.
input_ids = pad_sequences(input_ids, dtype="long", truncating="post", padding="post")
input_ids_test = pad_sequences(input_ids_test, dtype="long", truncating="post", padding="post")

In [None]:
# Class names in index order; a label's position in this list is its class id.
num_list = ['1', '2', '3', '4']
nums = [num_list.index(label) for label in train_num]
nums_test = [num_list.index(label) for label in test_num]

# Attention masks: 1.0 for every id greater than 0, 0.0 otherwise.
attention_masks = [[float(token_id > 0) for token_id in row] for row in input_ids]
attention_masks_test = [[float(token_id > 0) for token_id in row] for row in input_ids_test]

### train_test_split

In [None]:
# Split token ids, class ids and attention masks in ONE call so that the three
# collections are guaranteed to be partitioned identically (90% train / 10% validation).
(train_inputs, validation_inputs,
 train_nums, validation_nums,
 train_masks, validation_masks) = train_test_split(
    input_ids, nums, attention_masks, random_state=1, test_size=0.1)

# The held-out test set is used whole and in its original order. Previously it
# was passed through train_test_split as well, which discarded 10% of it and
# shuffled the rest while test_masks stayed complete and unshuffled, leaving
# inputs, labels and masks misaligned.
test_inputs = input_ids_test
test_nums = nums_test
test_masks = attention_masks_test

# Convert everything to PyTorch tensors.
train_inputs = torch.tensor(train_inputs)
train_nums = torch.tensor(train_nums)
train_masks = torch.tensor(train_masks)
validation_inputs = torch.tensor(validation_inputs)
validation_nums = torch.tensor(validation_nums)
validation_masks = torch.tensor(validation_masks)
test_inputs = torch.tensor(test_inputs)
test_nums = torch.tensor(test_nums)
test_masks = torch.tensor(test_masks)

In [None]:
# Number of examples per batch for both loaders below.
batch_size = 10

# Training set: (token ids, attention mask, class id) triples, drawn in random order.
train_data = TensorDataset(train_inputs, train_masks, train_nums)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

# Validation set: same triple layout, iterated sequentially.
validation_data = TensorDataset(validation_inputs, validation_masks, validation_nums)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

# Test loader is currently disabled; the test sentences are scored one at a
# time later in the notebook instead.
#test_data = TensorDataset(test_inputs, test_masks, test_nums)
#test_sampler = SequentialSampler(test_data)
#test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

In [None]:
# Load the pretrained encoder with a 4-way sequence-classification head.
model = BertForSequenceClassification.from_pretrained(MODEL_TYPE, num_labels=4)
# Move the model to the device selected earlier. model.cuda() raised on
# machines without a GPU even though the notebook falls back to the CPU.
model.to(device)

Downloading:   0%|          | 0.00/1.22G [00:00<?, ?B/s]

Some weights of the model checkpoint at hfl/chinese-macbert-large were not used when initializing BertForSequenceClassification: ['cls.predictions.decoder.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not 

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(21128, 1024, padding_idx=0)
      (position_embeddings): Embedding(512, 1024)
      (token_type_embeddings): Embedding(2, 1024)
      (LayerNorm): LayerNorm((1024,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=1024, out_features=1024, bias=True)
              (key): Linear(in_features=1024, out_features=1024, bias=True)
              (value): Linear(in_features=1024, out_features=1024, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=1024, out_features=1024, bias=True)
              (LayerNorm): LayerNorm((1024,), eps=1

### Training

In [None]:
# Use PyTorch's AdamW: the AdamW class shipped with transformers is deprecated
# in favour of torch.optim.AdamW. weight_decay=0.0 is passed explicitly to keep
# the previous "no weight decay" behaviour (torch's own default is 0.01).
optimizer = torch.optim.AdamW(model.parameters(), lr = 2e-5, eps = 1e-8, weight_decay = 0.0)
epochs = 3
# One scheduler step is taken per training batch, so the schedule spans
# (batches per epoch) * (epochs) steps.
total_steps = len(train_dataloader) * epochs
# Linear decay of the learning rate with no warmup phase.
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = 0,
                                            num_training_steps = total_steps)

def flat_accuracy(preds, nums):
    """Return the fraction of rows whose highest-scoring column equals the label.

    Args:
        preds: 2-D array of scores, one row per example.
        nums: Array of integer class ids; it is flattened before comparison.

    Returns:
        Number of matching predictions divided by the number of labels.
    """
    expected = nums.flatten()
    predicted = np.argmax(preds, axis=1).flatten()
    hits = (predicted == expected).sum()
    return hits / len(expected)

def format_time(elapsed):
    """Render a duration in seconds as an ``h:mm:ss`` string.

    Args:
        elapsed: Duration in seconds; it is rounded to the nearest whole second.

    Returns:
        The string form of the corresponding ``datetime.timedelta``.
    """
    whole_seconds = int(round(elapsed))
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)

# Fix the random seeds (Python, NumPy, PyTorch CPU and all CUDA devices) for reproducibility.
# NOTE(review): this runs after the model and the data loaders were created
# earlier in the notebook, so any randomness consumed before this point is
# presumably not covered by the seed -- consider seeding before model creation.
seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# Start training from cleared gradients.
model.zero_grad()

In [None]:
# Fine-tuning loop: one training pass plus one validation pass per epoch.
for epoch_i in range(0, epochs):

    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    print('Training...')

    t0 = time.time()
    total_loss = 0
    model.train()

    for step, batch in enumerate(train_dataloader):
        # Progress report every 500 batches (never on the first one).
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

        # Each batch is (token ids, attention mask, class ids); move it to the device.
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        # Labels are supplied, so the first element of the output is the loss.
        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)

        loss = outputs[0]
        total_loss += loss.item()
        loss.backward()
        # Clip the gradient norm to 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        model.zero_grad()

    avg_train_loss = total_loss / len(train_dataloader)

    print("")
    print("  Average training loss: {0:.2f}".format(avg_train_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

    print("")
    print("Running Validation...")

    t0 = time.time()
    model.eval()

    # eval_accuracy accumulates the number of correct predictions (per-batch
    # accuracy times batch size); nb_eval_examples counts the examples seen.
    eval_accuracy = 0
    nb_eval_steps, nb_eval_examples = 0, 0

    for batch in validation_dataloader:
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch
        with torch.no_grad():
            # No labels are passed here, so the first output element is the logits.
            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)

        logits = outputs[0]
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        tmp_eval_accuracy = flat_accuracy(logits, label_ids)
        # Weight each batch by its size so a smaller final batch does not skew
        # the result, as it did when per-batch accuracies were averaged.
        eval_accuracy += tmp_eval_accuracy * len(label_ids)
        nb_eval_examples += len(label_ids)
        nb_eval_steps += 1

    print("  Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_examples))
    print("  Validation took: {:}".format(format_time(time.time() - t0)))

print("")
print("Training complete!")

## Test

In [None]:
def convert_input_data(sentences):
    """Turn raw sentences into padded token-id and attention-mask tensors.

    Args:
        sentences: Iterable of sentence strings.

    Returns:
        A tuple ``(inputs, masks)`` of tensors: the padded token ids and a
        mask holding 1.0 where the id is greater than 0 and 0.0 elsewhere.
    """
    # Split each sentence into tokens with the BERT tokenizer, then map the
    # tokens to their vocabulary indices.
    token_lists = [tokenizer.tokenize(text) for text in sentences]
    id_lists = [tokenizer.convert_tokens_to_ids(tokens) for tokens in token_lists]

    # Pad at the end so every row has the same length. No maxlen is passed,
    # so no explicit length limit is applied here.
    padded_ids = pad_sequences(id_lists, dtype="long", truncating="post", padding="post")

    # Attention mask: 1 for real tokens, 0 for padding, so the model does not
    # attend to the padded positions.
    mask_rows = [[float(token_id > 0) for token_id in row] for row in padded_ids]

    return torch.tensor(padded_ids), torch.tensor(mask_rows)

def test_sentences(sentences):
    """Run the model on the given sentences and return the logits on the CPU.

    Args:
        sentences: List of sentence strings.

    Returns:
        The first element of the model output (the logits), detached and
        moved to the CPU.
    """
    model.eval()
    token_ids, attention_mask = convert_input_data(sentences)

    # Move both tensors to the selected device.
    token_ids = token_ids.to(device)
    attention_mask = attention_mask.to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        result = model(token_ids, token_type_ids=None, attention_mask=attention_mask)

    return result[0].detach().cpu()

def predict_sentence(sentence=None):
    """Print the predicted probability of every class for one sentence.

    Args:
        sentence: The sentence to classify. When omitted, the module-level
            variable ``sent`` is used, which keeps existing no-argument calls
            working; new code should pass the sentence explicitly instead of
            relying on that global.

    Returns:
        None. One line per class is printed in the form ``<class> : <pct>%``.
    """
    if sentence is None:
        # Legacy behaviour: fall back to the global set by the calling loop.
        sentence = sent
    logits = test_sentences([sentence])
    # Softmax over the class dimension of the single returned row.
    probabilities = F.softmax(logits[0], dim=0).numpy()

    for class_name, probability in zip(num_list, probabilities):
        print(f"{class_name} : {probability*100:.2f}%")

In [None]:
# Predicted class name for every test sentence, in order.
test_result = []

for sent in buyu_sent_test:
    print(sent)
    # Run the model once per sentence and reuse the logits for both the
    # probability printout and the predicted class. Previously the model was
    # run twice per sentence (once in predict_sentence(), once here).
    logits = test_sentences([sent])
    probabilities = F.softmax(logits[0], dim=0).numpy()
    for class_name, probability in zip(num_list, probabilities):
        print(f"{class_name} : {probability*100:.2f}%")
    # Only one sentence is passed in, so row 0 holds its class scores.
    predicted_nums = num_list[int(np.argmax(logits[0].numpy()))]
    print('Predicted num class is ', predicted_nums)
    test_result.append(predicted_nums)
    print('\n')

In [None]:
# Gold answers / predictions for the misclassified sentences only.
answer_list = []
wrong_result_list = []
# Integer gold labels and predictions for every test sentence.
y_true = []
y_pred = []

wrong = 0

for idx, gold in enumerate(test_num):
    guess = test_result[idx]
    if int(gold) != int(guess):
        wrong += 1
        wrong_result_list.append(guess)  # the wrong prediction
        answer_list.append(gold)  # the correct answer for it

    y_true.append(int(gold))  # gold label
    y_pred.append(int(guess))  # prediction

# How often each class was predicted by mistake.
print('1번으로 잘못 예측한 경우 : ', wrong_result_list.count('1'))
print('2번으로 잘못 예측한 경우 : ', wrong_result_list.count('2'))
print('3번으로 잘못 예측한 경우 : ', wrong_result_list.count('3'))
print('4번으로 잘못 예측한 경우 : ', wrong_result_list.count('4'))

# Accuracy in percent over the whole test set.
test_accuracy = (len(test_num) - wrong) / len(test_num) * 100
print('\nTotal Num : ', len(test_num))
print('Correct : ', len(test_num)-wrong)
print('Wrong : ', wrong)
print('\nTest Accuracy : ' , test_accuracy)

## F1 score

정확도 : 예측이 정답과 얼마나 정확한가?


정밀도 : 예측한 것중에 정답의 비율은? Positive로 예측한 경우 중 실제로 Positive인 비율이다, 즉 예측값이 얼마나 정확한가


재현율 : 찾아야 할 것중에 실제로 찾은 비율은? 실제 Positive인 것 중 올바르게 Positive를 맞춘 것의 비율 이다, 즉 실제 정답을 얼마나 맞췄느냐


F1 Score : 정밀도와 재현율의 조화 평균 (2 × 정밀도 × 재현율 / (정밀도 + 재현율))

In [None]:
def precision(y_true, y_pred, positive_label=1):
    """Precision of ``positive_label``: TP / (TP + FP).

    Args:
        y_true: Sequence of gold labels.
        y_pred: Sequence of predicted labels, parallel to ``y_true``.
        positive_label: The label treated as the positive class.

    Returns:
        The precision as a float. When ``positive_label`` was never predicted
        the ratio is undefined; a notice is printed and 0.0 is returned
        (previously None was returned, which broke arithmetic in callers).
    """
    true_positive = 0
    false_positive = 0
    for i, p in enumerate(y_pred):
        if p == positive_label:
            if y_true[i] == positive_label:  # TP case
                true_positive += 1
            else:  # FP case
                false_positive += 1

    predicted_positive = true_positive + false_positive
    if predicted_positive == 0:
        print("zero division error")
        return 0.0
    return true_positive / predicted_positive

def recall(y_true, y_pred, positive_label=1):
    """Recall of ``positive_label``: TP / (TP + FN).

    Args:
        y_true: Sequence of gold labels.
        y_pred: Sequence of predicted labels, parallel to ``y_true``.
        positive_label: The label treated as the positive class.

    Returns:
        The recall as a float. When ``positive_label`` never occurs among the
        gold labels paired with a prediction the ratio is undefined; a notice
        is printed and 0.0 is returned (previously None was returned, which
        broke arithmetic in callers).
    """
    true_positive = 0
    false_negative = 0
    for i, p in enumerate(y_pred):
        if y_true[i] == positive_label:
            if p == positive_label:  # TP case
                true_positive += 1
            else:  # FN case
                false_negative += 1

    actual_positive = true_positive + false_negative
    if actual_positive == 0:
        print("zero division error")
        return 0.0
    return true_positive / actual_positive

def f1_score(y_true, y_pred, positive_label=1):
    """F1 score of ``positive_label``: the harmonic mean of precision and recall.

    This definition shadows the ``f1_score`` imported from ``sklearn.metrics``
    at the top of the notebook.

    Args:
        y_true: Sequence of gold labels.
        y_pred: Sequence of predicted labels, parallel to ``y_true``.
        positive_label: The label treated as the positive class.

    Returns:
        The F1 score as a float. When precision or recall is zero or
        unavailable (None), a notice is printed and 0.0 is returned; before,
        a None precision/recall raised an uncaught TypeError here.
    """
    f1_precision = precision(y_true, y_pred, positive_label)
    f1_recall = recall(y_true, y_pred, positive_label)
    # `not x` covers both 0 / 0.0 and a None result from precision()/recall().
    if not f1_precision or not f1_recall:
        print("zero division error")
        return 0.0
    return 2.0 / (1/f1_precision + 1/f1_recall)

# positive_label is left at its default of 1, so these three figures are the
# one-vs-rest metrics of class 1 only, not an average over all classes.
print('precision:', precision(y_true, y_pred))
print('recall:', recall(y_true, y_pred))
print('f1_score:', f1_score(y_true, y_pred))

In [None]:
from sklearn import metrics

# scikit-learn's classification report and confusion matrix for the same
# gold labels (y_true) and predictions (y_pred).
print(metrics.classification_report(y_true, y_pred))
print(metrics.confusion_matrix(y_true, y_pred))

In [None]:
import pandas as pd
# One row per test sentence: the preprocessed sentence, its gold class and the predicted class.
df = pd.DataFrame({"Sentence" : buyu_sent_test, "True": y_true, "Pred": y_pred})
# Bare expression so the notebook renders the frame.
df

In [None]:
# Output directory prefix; it is concatenated directly with the file name below.
path = '/content/'
# Save the per-sentence results, encoded as "utf-8-sig" and without the index column.
df.to_csv(path + "buyu_pred_result.csv", encoding="utf-8-sig", index=False)