In [1]:
!pip install transformers



In [2]:
import pandas as pd
import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from tqdm import tqdm

In [3]:
# 데이터 불러오기
df = pd.read_csv('complete.csv')
df['text'] = df['title'] + ' ' + df['content']
# NaN 값을 포함하는 행 삭제
df.dropna(subset=['text'], inplace=True)

In [4]:
def remap_label(label):
    if label == 1 or label == 2:
        return 1
    elif label == 3:
        return 2
    elif label == 4 or label == 5:
        return 3
    else:
        return label  # 0의 경우 그대로 반환

df['label2'] = df['label2'].apply(remap_label)


In [5]:
# 원본 데이터를 학습 데이터와 테스트 데이터로 나눔
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

In [6]:
# KoBERT 토크나이저 로드
tokenizer = BertTokenizer.from_pretrained('monologg/kobert')

In [7]:
def encode_data(texts, tokenizer, max_len=512):
    all_tokens = []
    all_masks = []

    for text in texts:
        encoded_dict = tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=max_len,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        # 직접 패딩 수행
        padding_length = max_len - len(encoded_dict['input_ids'][0])
        all_tokens.append(torch.cat([encoded_dict['input_ids'], torch.zeros((1, padding_length), dtype=torch.long)], dim=1))
        all_masks.append(torch.cat([encoded_dict['attention_mask'], torch.zeros((1, padding_length), dtype=torch.long)], dim=1))

    all_tokens = torch.cat(all_tokens, dim=0)
    all_masks = torch.cat(all_masks, dim=0)

    return all_tokens, all_masks

In [8]:
# 학습 데이터 인코딩
train_input_ids, train_attention_masks = encode_data(train_df['text'], tokenizer)
train_labels2 = torch.tensor(train_df['label2'].values)

# 테스트 데이터 인코딩 및 성능 평가
test_input_ids, test_attention_masks = encode_data(test_df['text'], tokenizer)
test_labels2 = torch.tensor(test_df['label2'].values)

In [9]:
# 데이터셋 및 데이터 로더 생성
train_dataset = TensorDataset(train_input_ids, train_attention_masks, train_labels2)
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)

test_dataset = TensorDataset(test_input_ids, test_attention_masks, test_labels2)
test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=False)

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# 모델 정의
class KoBERTForSingleLabelClassification(torch.nn.Module):
    def __init__(self, num_labels):
        super(KoBERTForSingleLabelClassification, self).__init__()
        self.config = BertConfig.from_pretrained('monologg/kobert', num_labels=num_labels, output_hidden_states=True)
        self.bert = BertForSequenceClassification.from_pretrained('monologg/kobert', config=self.config)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        return outputs.logits


model = KoBERTForSingleLabelClassification(df['label2'].nunique()).to(device)
optimizer = Adam(model.parameters(), lr=1e-5)
loss_fn = torch.nn.CrossEntropyLoss()

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at monologg/kobert and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [11]:
test_inputs, val_inputs, test_masks, val_masks, test_labels2, val_labels2 = train_test_split(
    test_input_ids, test_attention_masks, test_labels2, random_state=42, test_size=0.5
)

test_dataset = TensorDataset(test_inputs, test_masks, test_labels2)
val_dataset = TensorDataset(val_inputs, val_masks, val_labels2)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False)
val_dataloader = DataLoader(val_dataset, batch_size=4, shuffle=True)

In [12]:
EPOCHS = 10
train_losses = []
best_accuracy = 0
best_model_path = "best_model_label2.pth"

for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    total_correct = 0
    total_samples = 0

    for batch in tqdm(train_dataloader, desc=f"Training Epoch {epoch+1}/{EPOCHS}"):
        optimizer.zero_grad()
        input_ids, attention_mask, label = [b.to(device) for b in batch]
        logits = model(input_ids, attention_mask=attention_mask)

        loss = loss_fn(logits, label)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_correct += (logits.argmax(dim=1) == label).sum().item()
        total_samples += label.size(0)

    avg_train_loss = total_loss / total_samples
    train_losses.append(avg_train_loss)

    # Evaluate on validation set
    model.eval()
    val_correct = 0
    val_samples = 0

    with torch.no_grad():
        for batch in val_dataloader:
            input_ids, attention_mask, label = [b.to(device) for b in batch]
            logits = model(input_ids, attention_mask=attention_mask)

            val_correct += (logits.argmax(dim=1) == label).sum().item()
            val_samples += label.size(0)

    val_accuracy = val_correct / val_samples

    # 최고 정확도를 갱신할 경우 모델 저장
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        torch.save(model.state_dict(), best_model_path)
        print(f"Model saved for epoch {epoch+1} with accuracy: {best_accuracy:.4f}")

Training Epoch 1/10: 100%|██████████| 500/500 [01:03<00:00,  7.84it/s]


Model saved for epoch 1 with accuracy: 0.3920


Training Epoch 2/10: 100%|██████████| 500/500 [01:03<00:00,  7.89it/s]


Model saved for epoch 2 with accuracy: 0.4000


Training Epoch 3/10: 100%|██████████| 500/500 [01:03<00:00,  7.88it/s]


Model saved for epoch 3 with accuracy: 0.4760


Training Epoch 4/10: 100%|██████████| 500/500 [01:03<00:00,  7.88it/s]


Model saved for epoch 4 with accuracy: 0.5080


Training Epoch 5/10: 100%|██████████| 500/500 [01:03<00:00,  7.89it/s]
Training Epoch 6/10: 100%|██████████| 500/500 [01:03<00:00,  7.88it/s]


Model saved for epoch 6 with accuracy: 0.5280


Training Epoch 7/10: 100%|██████████| 500/500 [01:03<00:00,  7.88it/s]


Model saved for epoch 7 with accuracy: 0.5840


Training Epoch 8/10: 100%|██████████| 500/500 [01:03<00:00,  7.88it/s]
Training Epoch 9/10: 100%|██████████| 500/500 [01:03<00:00,  7.88it/s]
Training Epoch 10/10: 100%|██████████| 500/500 [01:03<00:00,  7.87it/s]


In [13]:
# 학습이 끝난 후 가장 좋은 성능을 보인 모델을 불러옵니다.
model.load_state_dict(torch.load(best_model_path))
model.eval()

# 테스트 데이터셋에 대한 예측을 수행
true_labels2 = []
predicted_labels2 = []

with torch.no_grad():
    for batch in test_dataloader:
        input_ids, attention_mask, label2 = [b.to(device) for b in batch]
        logits1 = model(input_ids, attention_mask)

        true_labels2.extend(label2.tolist())
        predicted_labels2.extend(logits1.argmax(dim=1).tolist())


# f1-score를 포함한 성능 지표를 출력
accuracy2 = sum([1 if true == pred else 0 for true, pred in zip(true_labels2, predicted_labels2)]) / len(true_labels2)

f1_2 = f1_score(true_labels2, predicted_labels2, average='weighted')


print(f"Test Accuracy for Label1: {accuracy2:.4f}")
print(f"F1 Score for Label1: {f1_2:.4f}")

Test Accuracy for Label1: 0.5720
F1 Score for Label1: 0.5610


In [14]:
print(len(train_labels2))
print(len(val_labels2))
print(len(test_labels2))

2000
250
250


In [15]:
print(len(train_df))
print(len(train_dataset))


2000
2000
