In [1]:
import torch
import torch.nn as nn
from transformers import ElectraTokenizer, ElectraForSequenceClassification
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from tqdm import tqdm 

In [2]:
# 예시 데이터로 대체 (이후 실제 데이터셋으로 변경해야 합니다)
dataset = pd.read_csv("../dataset/primary_classification_data_final.csv", encoding='UTF-8')
dataset

Unnamed: 0,sentence,sentiment
0,언니 동생으로 부르는게 맞는 일인가요..??,부정
1,그냥 내 느낌일뿐겠지?,부정
2,아직너무초기라서 그런거죠?,부정
3,유치원버스 사고 낫다던데,부정
4,근데 원래이런거맞나요,부정
...,...,...
669176,솔직히 예보 제대로 못하는 데 세금이라도 아끼게 그냥 폐지해라..,부정
669177,재미가 없으니 망하지,부정
669178,공장 도시락 비우생적임 아르바이트했는데 화장실가성 손도 않씯고 재료 담고 바닥 떨어...,부정
669179,코딱지 만한 나라에서 지들끼리 피터지게 싸우는 센징 클래스 ㅉㅉㅉ,부정


In [3]:
dataset['sentiment'].value_counts()

부정    367032
긍정    282958
중립     19191
Name: sentiment, dtype: int64

In [4]:
dataset.dropna(axis=0, inplace=True)
dataset.reset_index(drop=True, inplace=True)
dataset.drop_duplicates(inplace=True)
print(dataset['sentiment'].value_counts())
dataset

부정    356838
긍정    273860
중립     15122
Name: sentiment, dtype: int64


Unnamed: 0,sentence,sentiment
0,언니 동생으로 부르는게 맞는 일인가요..??,부정
1,그냥 내 느낌일뿐겠지?,부정
2,아직너무초기라서 그런거죠?,부정
3,유치원버스 사고 낫다던데,부정
4,근데 원래이런거맞나요,부정
...,...,...
669166,솔직히 예보 제대로 못하는 데 세금이라도 아끼게 그냥 폐지해라..,부정
669167,재미가 없으니 망하지,부정
669168,공장 도시락 비우생적임 아르바이트했는데 화장실가성 손도 않씯고 재료 담고 바닥 떨어...,부정
669169,코딱지 만한 나라에서 지들끼리 피터지게 싸우는 센징 클래스 ㅉㅉㅉ,부정


In [5]:
# 데이터 프레임에서 한국어 문장과 라벨을 가져옵니다.
sentences = dataset['sentence'].values
labels = dataset['sentiment'].values

In [6]:
# 라벨을 One-Hot 인코딩으로 변환합니다.
label_map = {'부정': 0, '중립': 1, '긍정': 2}
labels = np.array([label_map[label] for label in labels])

In [7]:
# koELECTRA 토크나이저 불러오기
tokenizer = ElectraTokenizer.from_pretrained("koelectra-base-v3-discriminator")

In [8]:
# 문장을 토큰화하고 시퀀스로 변환했을 때의 길이 출력
length = [len(tokenizer.encode(sentence)) for sentence in tqdm(sentences)]
len(length)

100%|██████████| 645820/645820 [02:47<00:00, 3854.28it/s]


645820

In [9]:
import plotly.graph_objects as go

dataset['length'] = length

# 히스토그램 생성
histogram = go.Histogram(x=dataset['length'], nbinsx=20)

# 최댓값과 최솟값 계산
max_value = dataset['length'].max()
min_value = dataset['length'].min()

# 최댓값과 최솟값을 주석으로 추가하여 표시
annotations = [
    {
        'x': max_value,
        'y': 0,
        'xref': 'x',
        'yref': 'y',
        'text': f'Max: {max_value}',
        'showarrow': True,
        'arrowhead': 4,
        'ax': 0,
        'ay': -40
    },
    {
        'x': min_value,
        'y': 0,
        'xref': 'x',
        'yref': 'y',
        'text': f'Min: {min_value}',
        'showarrow': True,
        'arrowhead': 4,
        'ax': 0,
        'ay': -40
    }
]

# 그래프 생성 및 레이아웃 설정
fig = go.Figure(histogram)
fig.update_layout(annotations=annotations)

# 그래프 출력
fig.show()

In [10]:
len([i for i in length if i <= 80])/len(length) # 패딩 최대길이를 산정하기 위함. 80으로 설정하면 약 99%커버가능

0.9933510885386021

In [11]:
# 문장을 토큰화하고 시퀀스로 변환합니다.
sequences = [tokenizer.encode(sentence, padding='max_length', max_length=80, truncation=True) for sentence in tqdm(sentences)]

100%|██████████| 645820/645820 [03:02<00:00, 3535.62it/s]


In [12]:
# 학습 데이터와 검증 데이터로 나눕니다.
train_sequences, val_sequences, train_labels, val_labels = train_test_split(
    sequences, labels, test_size=0.2, random_state=42
)

In [13]:
# 데이터셋 클래스 정의
class CustomDataset(Dataset):
    def __init__(self, sentences, labels, tokenizer, max_length):
        self.sentences = sentences
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        sentence = self.sentences[idx]
        label = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            sentence,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(label, dtype=torch.long),
        }

In [14]:
# 데이터셋과 데이터로더 생성
train_dataset = CustomDataset(train_sequences, train_labels, tokenizer, max_length=80)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

val_dataset = CustomDataset(val_sequences, val_labels, tokenizer, max_length=80)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [15]:
# koELECTRA 모델 불러오기
model = ElectraForSequenceClassification.from_pretrained("koelectra-base-v3-discriminator", num_labels=3)

Some weights of the model checkpoint at koelectra-base-v3-discriminator were not used when initializing ElectraForSequenceClassification: ['discriminator_predictions.dense_prediction.weight', 'discriminator_predictions.dense.weight', 'discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.bias']
- This IS expected if you are initializing ElectraForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ElectraForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at koelectra-base-v3-discriminator and are newly initialized: ['classifier.dense

In [None]:
''' 모델가중치를 불러오거나 체크포인트를 불러올 때 실행 '''

# # 모델의 상태 딕셔너리를 로드합니다.
# model_state_dict = torch.load("model_state_dict_epoch_2.pt")

# # 모델을 생성하고 상태를 로드합니다.
# model = ElectraForSequenceClassification.from_pretrained("koelectra-base-v3-discriminator", num_labels=3)
# model.load_state_dict(model_state_dict)

# # 옵티마이저의 상태 딕셔너리를 로드합니다.
# optimizer_state_dict = torch.load("optimizer_state_dict_epoch_2.pt")

# # 옵티마이저를 생성하고 상태를 로드합니다.
# optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
# optimizer.load_state_dict(optimizer_state_dict)

In [20]:
# 변경하고자 하는 Dropout 비율
new_dropout_rate = 0.2

# 모든 Dropout 레이어의 비율 변경
for name, module in model.named_modules():
    if isinstance(module, torch.nn.Dropout):
        module.p = new_dropout_rate


In [21]:
# 모델 구조 확인
print(model)

ElectraForSequenceClassification(
  (electra): ElectraModel(
    (embeddings): ElectraEmbeddings(
      (word_embeddings): Embedding(35000, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.2, inplace=False)
    )
    (encoder): ElectraEncoder(
      (layer): ModuleList(
        (0): ElectraLayer(
          (attention): ElectraAttention(
            (self): ElectraSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.2, inplace=False)
            )
            (output): ElectraSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm

In [22]:
# 학습 함수 정의 (tqdm을 사용하여 진행 상황 및 지표 시각화)
def train_fn(data_loader, model, optimizer, device):
    model.train()
    progress_bar = tqdm(data_loader, desc="Training")
    
    train_losses = []  # train_loss 기록을 위한 리스트
    train_accs = []  # train_accuracy 기록을 위한 리스트

    for batch in progress_bar:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits

        loss.backward()
        optimizer.step()
        
        # 정확도 계산
        predicted_labels = torch.argmax(logits, dim=1)
        accuracy = (predicted_labels == labels).float().mean().item()
        
        train_losses.append(loss.item())
        train_accs.append(accuracy)
        progress_bar.set_postfix({'Loss': loss.item(), 'Accuracy': accuracy})

    return train_losses, train_accs

In [23]:
# 평가 함수 정의
def eval_fn(data_loader, model, device):
    model.eval()
    correct_predictions = 0
    total_predictions = 0
    predicted_labels_list = []  # 예측한 라벨들을 저장하기 위한 리스트
    true_labels_list = []  # 실제 라벨들을 저장하기 위한 리스트

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits

            predicted_labels = torch.argmax(logits, dim=1)
            correct_predictions += (predicted_labels == labels).sum().item()
            total_predictions += labels.size(0)

            # 예측한 라벨과 실제 라벨을 리스트에 추가
            predicted_labels_list.extend(predicted_labels.tolist())
            true_labels_list.extend(labels.tolist())

    accuracy = correct_predictions / total_predictions

    # 예측한 라벨과 실제 라벨 출력
    predicted_labels_list = np.array(predicted_labels_list)
    true_labels_list = np.array(true_labels_list)
    print("Predicted Labels:", predicted_labels_list)
    print("True Labels:", true_labels_list)

    return accuracy

In [24]:
# 장치 설정 (GPU 사용을 위해)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# 옵티마이저와 손실 함수 설정
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
loss_fn = torch.nn.CrossEntropyLoss()

In [27]:
# 학습 및 Early Stopping 정의
num_epochs = 15

def train_with_early_stopping(train_dataloader, val_dataloader, model, optimizer, loss_fn, device, patience=3):
    model.to(device)
    best_val_accuracy = 0.0
    best_model_state_dict = None
    no_improvement = 0

    for epoch in range(num_epochs):
        train_losses, train_accs = train_fn(train_dataloader, model, optimizer, loss_fn, device)
        val_loss, val_accuracy = eval_fn(val_dataloader, model, loss_fn, device)

        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {np.mean(train_losses):.4f} - Train Accuracy: {np.mean(train_accs):.4f} - Validation Loss: {val_loss:.4f} - Validation Accuracy: {val_accuracy:.4f}")

        # Early Stopping 체크
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_model_state_dict = model.state_dict()
            no_improvement = 0
        else:
            no_improvement += 1

        if no_improvement >= patience:
            print(f"No improvement in validation accuracy for {patience} epochs. Early stopping...")
            break

    # 최적의 모델 state_dict 반환
    return best_model_state_dict

In [28]:
# 학습시작
train_losses_epoch = []  # epoch 별 train_loss 기록을 위한 리스트
train_accs_epoch = []  # epoch 별 train_accuracy 기록을 위한 리스트
for epoch in range(num_epochs):
    train_losses, train_accs = train_fn(train_dataloader, model, optimizer, device)
    train_losses_epoch.extend(train_losses)
    train_accs_epoch.extend(train_accs)
    # 일정한 간격으로 체크포인트 저장
    if (epoch+1) % 5 == 0 and epoch > 0:
        checkpoint = {
            'epoch': epoch+1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            # 필요한 경우 추가 정보 저장 (예: loss, accuracy 등)
        }
        torch.save(checkpoint, f'./checkpoint/checkpoint_epoch_{epoch+1}.pth')

Training: 100%|██████████| 8073/8073 [57:44<00:00,  2.33it/s, Loss=0.286, Accuracy=0.917] 
Training:   9%|▉         | 724/8073 [05:09<52:30,  2.33it/s, Loss=0.284, Accuracy=0.891] 

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# 시각화
fig = make_subplots(rows=1, cols=2, subplot_titles=("Train Loss", "Train Accuracy"))

# Loss 그래프
fig.add_trace(
    go.Scatter(x=list(range(len(train_losses_epoch))), y=train_losses_epoch, mode='lines', name='Train Loss'),
    row=1, col=1
)
fig.update_xaxes(title_text="Iterations", row=1, col=1)
fig.update_yaxes(title_text="Loss", row=1, col=1)

# Accuracy 그래프
fig.add_trace(
    go.Scatter(x=list(range(len(train_accs_epoch))), y=train_accs_epoch, mode='lines', name='Train Accuracy'),
    row=1, col=2
)
fig.update_xaxes(title_text="Iterations", row=1, col=2)
fig.update_yaxes(title_text="Accuracy", row=1, col=2)

fig.update_layout(title="Training Progress", showlegend=False)
fig.show()

In [None]:
# 모델의 상태 딕셔너리를 얻어옵니다.
model_state_dict = model.state_dict()

# 모델 상태 딕셔너리를 파일로 저장합니다.
torch.save(model_state_dict, "Ternary_model_state_dict_epoch_15.pt")

# 옵티마이저의 상태 딕셔너리를 얻어옵니다.
optimizer_state_dict = optimizer.state_dict()

# 옵티마이저 상태 딕셔너리를 파일로 저장합니다.
torch.save(optimizer_state_dict, "Ternary_optimizer_state_dict_epoch_15.pt")

In [None]:
# 검증 데이터로 평가 수행
val_accuracy = eval_fn(val_dataloader, model, device)
print("Validation Accuracy:", val_accuracy)

## 실제로 예측해보기

In [None]:
# 한국어 문장을 입력으로 받아서 예측 라벨을 출력하는 함수
def predict_label(sentence, model, tokenizer, device):
    model.eval()
    with torch.no_grad():
        inputs = tokenizer(sentence, return_tensors='pt', truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        outputs = model(**inputs)
        logits = outputs.logits
        predicted_label = torch.argmax(logits, dim=1).item()
        return predicted_label

In [None]:
df = pd.read_csv("../dataset/사랑이별_커뮤니티_챗봇데이터.csv", encoding='UTF-8')

In [None]:
# 한국어 문장 입력 받기
korean_sentences = df['sentence'].tolist()

# 예측 라벨 출력
emotion_labels = {0:'부정', 1:'중립', 2:'긍정'}
predicted_label = [emotion_labels[predict_label(korean_sentence, model, tokenizer, device)] for korean_sentence in tqdm(korean_sentences)]
df['label'] = predicted_label

In [None]:
df