In [1]:
import pandas as pd

train_set = pd.read_csv('data/병합데이터셋.csv', index_col=0) 
train_set.sample(n=5)

Unnamed: 0,sentence,emotion
35212,죽기 전에 이것저것 해 보고 싶은데 딸과 사위의 눈치를 보게 되어서 억울하네.,슬픔
17760,와봐!! 와봐!! 어디서 되도 않는 20세기식 협박을 하고 있어... 21세기에.....,분노
30236,어차피 열심히 해봤자 취업도 못 할텐데.,슬픔
53244,"아니, 그래도 말이지……",중립
12327,아무리 생각할수록 결혼을 진짜 잘못한 거 같아.,슬픔


In [2]:
# 감정을 정수 라벨로 변경
emotions = {'행복': 0, '불안': 1, '놀람': 2, '슬픔': 3, '분노': 4, '중립': 5 }
# emotions = {'기쁨': 0, '불안': 1, '당황': 2, '슬픔': 3, '분노': 4, '상처': 5} // 감성대화 말뭉치

train_set['emotion'] = train_set.emotion.map(emotions)

train_set.sample(n=5)

Unnamed: 0,sentence,emotion
3526,치료해봤자 나아지지도 않고 괴롭기만 한 항암 치료는 이제 지긋지긋해.,3.0
1338,헤어진 여자 친구가 뒤늦게 임신 소식을 알렸는데 사실인가 싶어서 불안해.,
14505,이제그만하자 국가적 망신이다,4.0
26879,치매를 예방하고 싶어서 손을 많이 움직이는 취미를 가져보려고 해.,
637,학교에서 애들이랑 오해가 생겼는데 가족들이 함께 해결해 주려고 하니까 힘이 나.,0.0


In [3]:
# 라이브러리 불러오기
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
import gluonnlp as nlp

#kobert
from kobert_tokenizer import KoBERTTokenizer

# transformers
from transformers import BertModel
from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup

# GPU 사용시 필요
#device = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device("cuda")
#device = torch.device('cpu')

In [4]:
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
bertmodel = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)
vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]')
tok = tokenizer.tokenize

# Setting parameters(KoBERT finetuning 베에스 라인) -> 
max_len = 64    #베이스라인 64
batch_size = 32 #베이스라인 64
warmup_ratio = 0.1
num_epochs = 5  # 에포크 횟수
max_grad_norm = 1
log_interval = 200
learning_rate =  5e-5

In [5]:
# 모델에 사용되는 데이터셋 클래스 정의
class BERTDataset(Dataset):
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer,vocab, max_len,
                 pad, pair):
   
        transform = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len,vocab=vocab, pad=pad, pair=pair)
        
        self.sentences = [transform([i[sent_idx]]) for i in dataset]
        self.labels = [np.int32(i[label_idx]) for i in dataset]

    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i], ))
         
    def __len__(self):
        return (len(self.labels))

In [6]:
# 감성 분류 모델 정의
class BERTClassifier(nn.Module):
    def __init__(self,
                 bert,
                 hidden_size = 768,
                 num_classes=7,     
                 dr_rate=None,
                 params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
                 
        self.classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)
    
    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        
        _, pooler = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device))
        if self.dr_rate:
            out = self.dropout(pooler)
        return self.classifier(out)

In [7]:
# 모델 학습에 사용할 데이터셋을 [data, label] 배열로 피팅
train_set_data = [[i, str(j)] for i, j in zip(train_set['sentence'], train_set['emotion'])]

# sklearn 의 train_test_split 모듈-> 4:1로 학습&검증 데이터를 분류 
train_set_data, test_set_data = train_test_split(train_set_data, test_size = 0.2, random_state=4)

# 데이터셋을 Bert모델에 입력할 수 있게 변환
train_set_data = BERTDataset(train_set_data, 0, 1, tok, vocab, max_len, True, False)
test_set_data = BERTDataset(test_set_data, 0, 1, tok, vocab, max_len, True, False)

# 배치데이터셋 생성
train_dataloader = torch.utils.data.DataLoader(train_set_data, batch_size=batch_size, num_workers=0)    # num_workers: 데이터 로딩할때 쓰는 프로세스 수(로딩속도)
test_dataloader = torch.utils.data.DataLoader(test_set_data, batch_size=batch_size, num_workers=0)

ValueError: invalid literal for int() with base 10: '3.0'

In [None]:
# 모델 선언
model = BERTClassifier(bertmodel, dr_rate=0.5).to(device)
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()
t_total = len(train_dataloader) * num_epochs
warmup_step = int(t_total * warmup_ratio)
scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_step, num_training_steps=t_total)

In [None]:
# 정확도 계산
def calc_accuracy(X,Y):
    max_vals, max_indices = torch.max(X, 1)
    train_acc = (max_indices == Y).sum().data.cpu().numpy()/max_indices.size()[0]
    return train_acc

# 예측 반환
def predict(sentence):
    dataset = [[sentence, '0']]
    test = BERTDataset(dataset, 0, 1, tok, vocab, max_len, True, False)
    test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size, num_workers=0)  #로컬에서는 디폴트(0)으로 수정
    model.eval()
    answer = 0
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length= valid_length
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        for logits in out:
            logits = logits.detach().cpu().numpy()
            answer = np.argmax(logits)
    return answer

In [None]:
for e in range(num_epochs):
    train_acc = 0.0
    test_acc = 0.0
    model.train()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm(train_dataloader)):
        optimizer.zero_grad()
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length= valid_length
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        loss = loss_fn(out, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        scheduler.step()  # Update learning rate schedule
        train_acc += calc_accuracy(out, label)
        if batch_id % log_interval == 0:
            print("epoch {} batch id {} loss {} train acc {}".format(e+1, batch_id+1, loss.data.cpu().numpy(), train_acc / (batch_id+1)))
    print("epoch {} train acc {}".format(e+1, train_acc / (batch_id+1)))
    model.eval()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm(test_dataloader)):   # 아까 만든 테스트 배치 데이터 - 정확도 측정

        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length= valid_length
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        test_acc += calc_accuracy(out, label)
    print("epoch {} test acc {}".format(e+1, test_acc / (batch_id+1)))