In [17]:
from kobert_tokenizer import KoBERTTokenizer
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')

print(tokenizer.tokenize("안녕하세요. 반갑습니다."))
inputs = tokenizer("안녕하세요. 반간습니다.")
print(inputs)

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'XLNetTokenizer'. 
The class this function is called from is 'KoBERTTokenizer'.


['▁안', '녕', '하세요', '.', '▁반', '갑', '습니다', '.']
{'input_ids': [2, 3135, 5724, 7814, 54, 2207, 5337, 6701, 54, 3], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}


In [18]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

df = pd.read_csv('./data/daum_movie_review.csv')
# rating이 6보다 작으면 0(부정), 6 이상이면 긍정으로 라벨 생성
y =[0 if rate < 6 else 1 for rate in df.rating]
# 데이터셋을 학습, 검증, 평가 데이터셋으로 분리
X_train_val, X_test, y_train_val, y_test = train_test_split(df.review.tolist(), y, random_state=0)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, random_state=0)

print('#Train set size:', len(X_train))
print('#Validation set size:', len(X_val))
print('#Test set size:', len(X_test))

#Train set size: 8282
#Validation set size: 2761
#Test set size: 3682


In [19]:
import torch
import evaluate

metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits,labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

class OurDataset(torch.utils.data.Dataset):
    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.inputs.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item
    
    def __len__(self):
        return len(self.labels)

In [20]:
from transformers import BertModel
from torch.utils.data import DataLoader

# 토큰화
train_input = tokenizer(X_train, truncation=True, padding=True, return_tensors="pt")
val_input = tokenizer(X_val, truncation=True, padding=True, return_tensors="pt")
test_input = tokenizer(X_test, truncation=True, padding=True, return_tensors="pt")

# Dataset 생성
train_dataset = OurDataset(train_input, y_train)
val_dataset = OurDataset(val_input, y_val)
test_dataset = OurDataset(test_input, y_test)

# 데이터로더 생성
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=8)
val_loader = DataLoader(val_dataset, batch_size=16)
test_loader = DataLoader(test_dataset, batch_size=16)

# KoBERT 사전학습 모형 로드
bert_model = BertModel.from_pretrained('skt/kobert-base-v1')

# BERT를 포함한 신경망 모형
class MyModel(torch.nn.Module):
    def __init__(self, pretrained_model, token_size, num_labels):
        super(MyModel, self).__init__()
        self.token_size = token_size
        self.num_labels = num_labels
        self.pretrained_model = pretrained_model

        # 분류기 정의
        self.classifier = torch.nn.Linear(self.token_size, self.num_labels)

    def forward(self, inputs):
        # BERT 모형에 입력을 넣고 출력을 받음
        outputs = self.pretrained_model(**inputs)
        # BERT 출력에서 CLS 토큰에 해당하는 부분만 가져옴
        bert_clf_token = outputs.last_hidden_state[:,0,:]

        return self.classifier(bert_clf_token)
    
# token_size는 BERT 토큰과 동일, bert_model.config.hidden_size로 알 수 있음
model = MyModel(bert_model, num_labels=2, token_size=bert_model.config.hidden_size)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


TypeError: Instance and class checks can only be used with @runtime_checkable protocols

In [None]:
from transformers import AdamW, get_linear_schedule_with_warmup
import torch.nn.functional as F
import time

# gpu 사용
device = torch.device("mps")
model.to(device)
model.train()

# 옵티마이저를 트랜스포머가 제공하는 AdamW로 설정
optim = AdamW(model.parameters(), lr=5e-5, weight_decay=0.01) # 가중치 감쇠 설정
# 멀티클래스이므로 크로스 엔트로피를 손실 함수로 사용
criterion = torch.nn.CrossEntropyLoss()

num_epochs = 2      # 학습 epochs를 2회로 설정
total_training_steps = num_epochs * len(train_loader)
# 학습 스케줄러 설정
scheduler = get_linear_schedule_with_warmup(optimizer=optim,
                                            num_training_steps=total_training_steps,
                                            num_warmup_steps=200)

start = time.time() # 시작시간 기록
train_loss = 0
eval_steps = 500
step = 0

for epoch in range(num_epochs):
    #total_epoch_loss = 0 # epoch의 총 loss 초기화
    for batch in train_loader:
        model.train()   # 학습모드로 전환
        optim.zero_grad()   # 그래디언트 초기화

        # 배치에서 라벨을 제외한 입력만 추출해 GPU로 복사
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(device) # 배치에서 라벨을 추출해 GPU로 복사
        outputs = model(inputs)  # 모형으로 결과 예측

        # 두 클래스에 대해 예측하고 각각 비교해야 하므로
        # labels에 대해 원핫 인코딩을 적용한 후에 손실을 계산
        loss = criterion(outputs, F.one_hot(labels, num_classes=2).float())  # loss 계산
        train_loss += loss
        loss.backward()  # 그래디언트 게산
        optim.step()     # 가중치 업데이트
        scheduler.step() # 스케줄러 업데이트

        step += 1
        if step % eval_steps == 0:  # eval_steps마다 경과한 시간과 loss를 출력
            with torch.no_grad():
                val_loss = 0
                model.eval()
                for batch in val_loader:
                    inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
                    labels = batch['labels'].to(device)
                    outputs = model(inputs)
                    # loss 계산
                    loss = criterion(outputs, F.one_hot(labels, num_classes=2).float())
                    val_loss += loss
                avg_val_loss = val_loss / len(val_loader)
            avg_train_loss = train_loss / eval_steps
            elapsed = time.time() - start
            print(
                'Step %d, elapsed time: %.2f, train loss: %.4f, validation loss: %.4f' % (step, elapsed, avg_train_loss, avg_val_loss)
            )

In [None]:
import evaluate

metric = evaluate.load("accuracy")
model.eval()
for batch in test_loader:
    inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
    labels = batch['labels'].to(device)

    with torch.no_grad():  # 학습할 필요가 없으므로 그레디언트 계산을 끔
        outputs = model(inputs)

    predictions = torch.argmax(outputs, dim=-1)
    metric.add_batch(predictions=predictions, references=labels)

metric.compute()