In [None]:
!pip install datasets transformers

In [None]:
import re 
import pandas as pd 
import torch 
import torch.nn as nn
from datasets import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from transformers import AutoTokenizer, BertModel, Trainer, TrainingArguments

In [None]:
# 텍스트 정규화 함수 
def normalize_token_text(text : str) -> str:
    text = re.sub(r'[^가-힣a-zA-Z0-9\s\.]', " ", text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

In [None]:
# 데이터 로드 
df = pd.read_csv("../data/ratings_train.txt", sep='\t')
df.info()

In [None]:
# document의 결측치 제거 
df.dropna(subset=['document'], inplace=True)
# 텍스트 정규화
df['document'] = df['document'].map(normalize_token_text)
# document 의 길이가 1이하인 행을 제거 
df = df.loc[df['document'].str.len() > 1 ]
# 중복데이터 제거
df.drop_duplicates(subset=['document'], inplace=True)
# 라벨 컬럼의 데이터를 숫자형 데이터로 변환 
df['label'] = df['label'].astype("int64")
# 데이터프레임의 인덱스를 초기화하고 기존 인덱스는 제거 
df.reset_index(drop=True, inplace=True)
df.info()

In [29]:
# 테스트를 위해 일정 구간만 선택 
df2 = df.head(10000)

In [30]:
# DataFrame을 Train, Test 셋으로 분할 
train_df, test_df =  train_test_split(
    df2, test_size= 0.2, random_state= 42, stratify= df2['label']
)
# BERT 모델에서 사용하는 데이터 타입으로 변경 
train_ds = Dataset.from_pandas(train_df.reset_index(drop=True))
test_ds = Dataset.from_pandas(test_df.reset_index(drop=True))

In [31]:
train_ds

Dataset({
    features: ['id', 'document', 'label'],
    num_rows: 8000
})

In [32]:
# 토큰화 -> AutoTokenizer를 이용하여 BERT 모델에서 사용하는 토큰화 작업을 로드 
MODEL_NAME = "skt/kobert-base-v1"

# use_fast = False -> 기본(파이썬 기반) 토크나이저 
    # KoBERT 모델은 sentencepiece 기반 토큰화 -> 기본 모델을 사용을 권장 
# use_fast = True -> 빠른 토크나이저 -> Rust, C 기반 
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast = False)

In [33]:
def tok_fn(batch):
    # batch?? -> 데이터의 묶음 

    # truncation -> 문장이 최대 입력 길이를 초과했을때 자동으로 자를것인가?
    # max_length -> 토큰의 최대 길이 -> 128 설정은 BERT 모델의 일반적인 설정
    result = tokenizer( batch['document'], truncation= True, max_length=128 )
    # result -> input_ids, attention_mask, token_type_ids 등의 포함 
    return result

In [34]:
# Dataset 타입의 데이터에서 데이터들의 묶음(batch)를 tok_fn에 대입하여 새로운 Dataset 생성
# batched 는 배치들을 병렬 처리할것인가
# batch_size -> 하나의 배치의 데이터의 양
train_tok = train_ds.map(tok_fn, batched=True, remove_columns=['id', 'document'])
test_tok = test_ds.map(tok_fn, batched = False, remove_columns=['id', 'document'])

Map: 100%|██████████| 8000/8000 [00:00<00:00, 14401.66 examples/s]
Map: 100%|██████████| 2000/2000 [00:00<00:00, 7646.02 examples/s]


In [35]:
train_tok
# input_ids -> 문장 토큰의 숫자 인덱스(단어사전의 해당 단어의 위치)-> 초기의 단어 사전 인덱스들은 특수 토큰
# token_type_ids -> 문장 구분용 인덱스 (0 : 첫번째 문장, 1: 두번째 문장, 2:버그)
# attention_mask -> 실제 토큰과 패딩 토큰 분류 (1 : 실제 토큰, 0 : 패딩 토큰) 

Dataset({
    features: ['label', 'input_ids', 'token_type_ids', 'attention_mask'],
    num_rows: 8000
})

In [36]:
train_tok['attention_mask']

Column([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])

In [37]:
# BERT 분류 모델을 정의 

class BERTClsHead(nn.Module):
    def __init__(self, model_name, num_label = 2, dropout = 0.1):
        # model_name -> 로드할 모델의 이름 
        # num_label -> 분류 클래스의 개수
        # dropout -> 데이터의 소실 비율 ( 과적합 방지 )

        # 설정 초기화
        super().__init__()

        # 사전에 학습된 BERT 모델을 로드 (백본)
        self.backbone = BertModel.from_pretrained(model_name)

        # BERT model에서의 output의 차원 개수 -> 768
        hidden = self.backbone.config.hidden_size

        # 과적합 방지를 위한 dropout 
        self.dropout = nn.Dropout(dropout)

        # 선형 모델  -> 2개의 class를 분류하는 모델 
        self.classifier = nn.Linear(hidden, num_label)

        # 패딩 토큰의 아이디 값을 백본 설정에 패딩 아이디에 대입 -> 확인차 대입 (안정성)
        self.backbone.config.pad_token_id = tokenizer.pad_token_id
        # 초기설정 완료
    
    # 순전파 함수 생성
    def forward(self, input_ids = None, 
                attention_mask = None, labels = None, **kwargs):
        # input_ids -> 토큰화 인코딩 처리가 완료된 문장
        # attention_mask -> 실제 단어 / 패딩 단어
        # labels -> 학습 시 정답의 라벨 ( 없으면 추론 모드 )

        # 백본에 입력 데이터를 대입 
        out = self.backbone(input_ids = input_ids, attention_mask = attention_mask)

        # [CLS] 토큰 벡터를 추출 
        # 입력의 첫번째 토큰 [CLS] -> 문장 전체를 대표하는 의미
        pooled = out.last_hidden_state[:, 0]

        # 일정 비율 데이터 소실
        drop_out_data = self.dropout(pooled)

        logits = self.classifier(drop_out_data)

        result = {'Logits' : logits}

        # labels의 데이터가 존재한다면 손실 계산
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)
            result['loss'] = loss
        
        return result


In [38]:
# 모델을 생성 
model = BERTClsHead(MODEL_NAME)

In [39]:
# 평가 함수 정의 (정확도, f1score)
def metrics(eval_pred):
    # eval_pred -> 예측값, 실젯값
    logits, y = eval_pred
    # logits [ x.xxx, x.xxx ]
    pred = logits.argmax(-1)
    return {
        'accuracy_score' : accuracy_score(y, pred), 
        'f1_score' : f1_score(y, pred)
    }

In [40]:
# TrainingArguments -> Trainer가 학습 할때 사용한 각종 설정 값들을 지정하는 객체 

args = TrainingArguments(
    # 학습된 모델의 결과들을 저장할 디렉토리 지정
    output_dir="./kobert_from_bertmodel", 
    # 배치의 크기를 설정 
    per_device_train_batch_size= 16,    # 학습시 cpu/gpu에 할당이 되는 배치의 크기  
    per_device_eval_batch_size= 16,     # 평가시 할당이 되는 배치의 크기 
    # 평가 및 저장 주기 설정 
    eval_strategy= "epoch",             # 한 epoch 마다 평가 수행
    save_strategy= 'epoch',             # 한 epoch 마다 모델을 저장
    # 학습 관련 설정 
    num_train_epochs= 2,                # 학습 epoch 수
    learning_rate= 5e-5,                # 옵티마이저의 학습율
    weight_decay= 0.01,                 # 가중치 감소 계수
    warmup_ratio= 0.1,                  # lr의 값을 올리는 비율
    logging_steps= 50,                  # 로그를 출력할 step의 간격
    # 모델 선택 및 저장 기준 
    load_best_model_at_end= True,       # 학습이 끝났을때 가장 성능이 좋은 모델을 자동 로드 
    metric_for_best_model= 'f1',        # 최고의 모델을 판단하는 검증 지표
    greater_is_better= True,            # 평가 지표가 높을 수록 좋은 모델인가?
    # 하드웨어 설정 
    fp16= torch.cuda.is_available(),    # cuda사용시 16-bit 혼합정밀도 학습을 할것인가
    use_mps_device= (
        torch.backends.mps.is_available() if not torch.cuda.is_available() else False
    ),                                  # Mac M1/M2 등 Apple silicon 가속기 사용 여부
    # 외부 로깅용 설정 
    report_to= []

)

In [41]:
# Trainer -> 모델의 학습을 자동으로 관리하는 Hugging Face의 고수준 API

trainer = Trainer(
    model = model,                  # 모델 선택
    args = args,                    # 학습에 사용한 설정값
    train_dataset= train_tok,       # 학습에 사용할 데이터
    eval_dataset= test_tok,         # 평가에 사용할 데이터
    tokenizer = tokenizer,          # 토큰화 함수
    compute_metrics= metrics        # 평가 시 사용할 검증 지표 함수
)

  trainer = Trainer(


In [42]:
# 평가 및 예측 테스트 

eval_res = trainer.evaluate()
print("평가의 결과 : ", eval_res)



평가의 결과 :  {'eval_loss': 0.7111042737960815, 'eval_model_preparation_time': 0.0, 'eval_accuracy_score': 0.48, 'eval_f1_score': 0.16666666666666666, 'eval_runtime': 62.795, 'eval_samples_per_second': 31.85, 'eval_steps_per_second': 1.991}


In [43]:
# 새 문장에 대한 감정 분류 

samples = [
    '정말 감동적인 영화였습니다', 
    '지루하고 시간 낭비였습니다'
]

# 토큰화 
enc = tokenizer(
    samples, 
    return_tensors = 'pt',      # pt -> Pytorch 텐서형태로 변환
    padding = True, 
    truncation = True
)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [44]:
with torch.no_grad():
    out = model(**enc)
    probs = torch.softmax(
        out['Logits'], dim=-1
    ).cpu().numpy()

In [45]:
import numpy as np

In [46]:
for s, p in zip(samples, probs):
    print(f"{s} : 부정 = {p[0]:.3f}, 긍정 = {p[1]:.3f} | 예측 = {p.argmax()}")
    # print(type(p[0]))

정말 감동적인 영화였습니다 : 부정 = 0.624, 긍정 = 0.376 | 예측 = 0
지루하고 시간 낭비였습니다 : 부정 = 0.430, 긍정 = 0.570 | 예측 = 1
