In [7]:
import pandas as pd

data = pd.read_csv("ai_hub_data.csv")
data

Unnamed: 0,Sentence,Emotion
0,아 진짜! 사무실에서 피지 말라니깐! 간접흡연이 얼마나 안좋은데!,분노
1,그럼 직접흡연하는 난 얼마나 안좋겠니? 안그래? 보면 꼭... 지 생각만 하고.,혐오
2,손님 왔어요.,중립
3,손님? 누구?,중립
4,몰라요. 팀장님 친구래요.,중립
...,...,...
94189,솔직히 예보 제대로 못하는 데 세금이라도 아끼게 그냥 폐지해라..,혐오
94190,재미가 없으니 망하지,혐오
94191,공장 도시락 비우생적임 아르바이트했는데 화장실가성 손도 않씯고 재료 담고 바닥 떨어...,혐오
94192,코딱지 만한 나라에서 지들끼리 피터지게 싸우는 센징 클래스 ㅉㅉㅉ,혐오


In [8]:
from transformers import AutoModel, AutoTokenizer

model = AutoModel.from_pretrained("monologg/kobert")
tokenizer = AutoTokenizer.from_pretrained("monologg/kobert", trust_remote_code=True)

In [9]:
import re

data['Sentence'] = data["Sentence"].apply(lambda x: re.sub("[^0-9a-zA-Z가-힣\s+]", "", x))

In [7]:
data[data["Emotion"] == 5].head(5)

Unnamed: 0,Sentence,Emotion


In [10]:
import pandas as pd

# label 수치화
def encoding(label):
    if label == "중립":
        return 0
    elif label == "놀람":
        return 1
    elif label == "분노":
        return 2
    elif label == "슬픔":
        return 3
    elif label == "행복":
        return 4
    elif label == "혐오":
        return 5
    elif label == "공포":
        return 6

data['Emotion'] = data['Emotion'].apply(lambda x: encoding(x))

## 데이터셋 분리

In [11]:
from sklearn.model_selection import train_test_split 

trainX, testX, trainY, testY = train_test_split(
    data["Sentence"],
    data["Emotion"],
    test_size=0.2,
    random_state=42
)

print(f"TrainX: {trainX.shape}, TrainY: {trainY.shape}")
print(f"TestX: {testX.shape}, TestY: {testY.shape}")

TrainX: (75355,), TrainY: (75355,)
TestX: (18839,), TestY: (18839,)


In [12]:
trainX, valX, trainY, valY = train_test_split(
    trainX, trainY, test_size=0.1, random_state=42
)

print(f"TrainX: {trainX.shape}, TrainY: {trainY.shape}")
print(f"ValX: {valX.shape}, ValY: {valY.shape}")
print(f"TestX: {testX.shape}, TestY: {testY.shape}")

TrainX: (67819,), TrainY: (67819,)
ValX: (7536,), ValY: (7536,)
TestX: (18839,), TestY: (18839,)


## 토크나이징 클래스화

In [13]:
import torch 
from torch.utils.data import Dataset, DataLoader 

class CustomDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts.tolist() 
        self.labels = labels.tolist()
        self.tokenizer = tokenizer
        self.max_length = max_length

    def tokenize(self, text):
        encoded_output = self.tokenizer(
            text, 
            max_length = self.max_length, # 모델이 처리할 수 있는 최대 토큰의 길이
            padding="max_length", # max_length 보다 짧을 경우 패딩 토큰을 추가하여 길이를 맞춘다
            truncation=True, # max_length 보다 길경우 자른다
            add_special_tokens=True, # bert 모델의 필요한 특수토큰을 자동으로 추가(?)
            return_token_type_ids=True, # 텍스트를 변환해서 반환할거냐 ?
            return_attention_mask=True, # 실제토큰 1 패딩토큰 0 반환
            return_tensors='pt'
        )
        return encoded_output
            
    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # 데이터 가져오기
        text = self.texts[idx]
        label = self.labels[idx]

        # 토크나이징
        encoding = self.tokenize(text)

        # 라벨 추가
        # encoding["label"] = torch.tensor(label, dtype=torch.long)

        return {
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "token_type_ids": encoding["token_type_ids"].flatten(),
            "label": torch.tensor(label, dtype=torch.long)
        }

In [14]:
max_length = 128
train_dataset = CustomDataset(trainX, trainY, tokenizer, max_length)
val_dataset = CustomDataset(valX, valY, tokenizer, max_length)
test_dataset = CustomDataset(testX, testY, tokenizer, max_length)

In [15]:
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size = batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle=True)

In [16]:
import torch
import torch.nn as nn
from transformers import AutoModel

class CustomKoBERT(nn.Module): # pyTorch의 nn.Module을 상속하여 클래스 정의
    def __init__(self, num_labels):
        super(CustomKoBERT, self).__init__()
        self.bert = AutoModel.from_pretrained("monologg/kobert")
        self.classifier = nn.Linear(768, num_labels)  #분류 헤드 정의: BERT의 출력(768차원)을 입력으로, 분류할 클래스 개수만큼출력생성하는 nn.linear정의
        self.loss_fn = nn.CrossEntropyLoss()  # 손실 함수 정의

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        # BERT 모델의 출력
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask,
                            token_type_ids=token_type_ids)
        pooled_output = outputs[1]  # KoBERT 의 두번째 값인 pooled_output을 가져옴 이것은? -> 문장의 전체의미를 요약한 벡터
        
        # 분류 헤드 통과
        logits = self.classifier(pooled_output)  # 전체의미를요약한 벡터를 분류기에 넣어서 분류결과를 생성
        
        # 손실 계산 (labels가 주어진 경우)
        loss = None
        if labels is not None:
            loss = self.loss_fn(logits, labels) # 예측값과 라벨과 손실 값을 계산

        return {"loss": loss, "logits": logits} # train 중에는 손실 값 사용 / 추론 시 불류 결과 사용


In [17]:
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [16]:
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup
from torch.nn.utils import clip_grad_norm_

# CustomKoBERT 모델 생성
num_labels = 7
batch_size = 8
epochs = 50
model = CustomKoBERT(num_labels=num_labels).to(device)

# Optimizer 및 스케줄러 설정 (변경 없음)
optimizer = AdamW(model.parameters(), lr=0.001)
total_steps = len(train_loader) * epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps
)
loss_history = {"train": [], "validation" : []}
patience = 7
patience_cnt = 0
best_loss_val = float('inf')

# 학습 루프 (변경 없음)
for epoch in range(epochs):
    model.train()
    loss_train = 0.0

    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        token_type_ids = batch['token_type_ids'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()
        
        # Forward pass (loss와 logits 반환)
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            labels=labels
        )
        
        loss = outputs["loss"]
        loss.backward()

        clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        loss_train += loss.item() * batch_size

    loss_history['train'].append(loss_train / len(train_dataset))

    #### Validation ####
    model.eval()
    loss_val = 0.0

    for batch in val_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        token_type_ids = batch['token_type_ids'].to(device)
        labels = batch['label'].to(device)

        with torch.no_grad():
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
                labels=labels
            )
            loss = outputs["loss"]

        loss_val += loss.item() * batch_size

    loss_history['validation'].append(loss_val / len(val_dataset))

    #### Early Stopping ####
    if loss_val < best_loss_val:
        best_loss_val = loss_val
        torch.save(model.state_dict(), "Bert_best_model.pth")
        patience_cnt += 1
        if patience_cnt == patience:
            print("Early stopping!")
            break

    print(f"epoch: {epoch}, Train_loss: {loss_train/len(train_dataset)}, Validation_loss: {loss_val / len(val_dataset)}")


epoch: 0, Train_loss: 1.5669022231257446, Validation_loss: 1.5391176285384314
epoch: 1, Train_loss: 1.5539898996332477, Validation_loss: 1.5386065797061677
epoch: 2, Train_loss: 1.55131397014025, Validation_loss: 1.5367664109496302
epoch: 3, Train_loss: 1.5507040051998127, Validation_loss: 1.552088091454435
epoch: 4, Train_loss: 1.550665170867015, Validation_loss: 1.5464408539409598
epoch: 5, Train_loss: 1.5502346398075668, Validation_loss: 1.5364124277594742
epoch: 6, Train_loss: 1.5498483987001432, Validation_loss: 1.537480770149555
epoch: 7, Train_loss: 1.549864774445041, Validation_loss: 1.5395106650461816
epoch: 8, Train_loss: 1.5499083122671118, Validation_loss: 1.5375366218530448
epoch: 9, Train_loss: 1.5493453188558002, Validation_loss: 1.535996670176269
epoch: 10, Train_loss: 1.5492845488254818, Validation_loss: 1.5363697475435374
epoch: 11, Train_loss: 1.549496886361733, Validation_loss: 1.538023228161907
epoch: 12, Train_loss: 1.5492230918357908, Validation_loss: 1.536547731

In [2]:
import matplotlib.pyplot as plt

plt.plot(loss_history['train'])
plt.plot(loss_history['validation'])
plt.axis('off')
plt.show()

NameError: name 'loss_history' is not defined

In [18]:
import torch

model = CustomKoBERT(num_labels=7)
model.load_state_dict(torch.load("./Bert_best_model.pth"))

  model.load_state_dict(torch.load("./Bert_best_model.pth"))


<All keys matched successfully>

In [29]:
#### Test ####

model.to(device)
model.eval()

loss_test = 0.0
correct = 0
with torch.no_grad():
    for batch in test_loader:
        # gpu 보내기
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        token_type_ids = batch['token_type_ids'].to(device)
        labels = batch['label'].to(device)
        # print(input_ids)
        # print(attention_mask)
        # print(token_type_ids)

        # 예측
        outputs = model(
        input_ids = input_ids,
        attention_mask = attention_mask,
        token_type_ids = token_type_ids,
        labels = labels 
        )
        
        loss = outputs["loss"]
        preds = outputs['logits'].argmax(dim=1)
        correct += (labels == preds).sum()

        # loss 저장
        loss_test += loss.item() * batch_size
print(f"Test Loss: {loss_test / len(test_dataset)}, Accuracy: {correct / len(test_dataset)}")

Test Loss: 1.5477738504709981, Accuracy: 0.5136684775352478


In [None]:
tokenizer = AutoTokenizer.from_pretrained("monologg/kobert", trust_remote_code=True)

max_length = 128
text = "나는 너무 행복해"

encoded_input = tokenizer(
    text, 
    max_length = max_length,
    padding="max_length",
    truncation=True,
    add_special_tokens=True,
    return_token_type_ids=True,
    return_attention_mask=True,
    return_tensors='pt'
)

# print(encoded_output)
model = model.to(device)
encoded_input = encoded_input.to(device)
outputs = model(**encoded_input)
print(outputs['logits'].argmax(dim=1).item())

0
