In [2]:
#!pip install konlpy
#!pip install git+https://github.com/haven-jeon/PyKoSpacing.git

In [1]:
# 라이브러리 불러오기 및 함수화
import os
import re
import shutil
import itertools
import numpy as np
import pandas as pd
from tqdm import tqdm
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.utils import resample
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

import torch
from konlpy.tag import Komoran
from pykospacing import Spacing
import torch.nn.functional as F
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report, confusion_matrix
from transformers import BertTokenizer, BertModel, ProgressCallback, Trainer, BertForSequenceClassification, TrainingArguments

komoran = Komoran()
spacing = Spacing()
label_encoder = LabelEncoder()

# tqdm과 pandas 통합
tqdm.pandas()

  from .autonotebook import tqdm as notebook_tqdm
  torch.utils._pytree._register_pytree_node(
  torch.utils._pytree._register_pytree_node(


In [3]:
# 파일 업로드
# uploaded = files.upload()

df = pd.read_excel('../../../data/bitcoin_news1.xlsx')

# 기본 불용어 불러오기
# korean_stopwords_path = 'stopwords-ko.txt'
# with open(korean_stopwords_path, encoding='utf8') as f:
#     stopwords = f.readlines()
# stopwords = [x.strip() for x in stopwords]

In [9]:
# # 텍스트 전처리 함수
# def preprocessing(text):
#     text = spacing(text)
#     text = text.lower()
#     text = re.sub(r'[^\w\s]', '', text)
#     return text

# # komoran토큰화 &불용어 처리 함수
# def remove_stopwords(text, stopwords):
#     tokens = []
#     morphs = komoran.morphs(text)
#     for token in morphs:
#         if token not in stopwords:
#             tokens.append(token)
#     return tokens

# # 텍스트 전처리 및 토큰화, 불용어 처리
# cleaned_data = []
# for i in tqdm(range(len(df))):
#     feature_text = df.loc[i, 'summary_content']
#     processed_text = preprocessing(feature_text)
#     cleaned_text = remove_stopwords(processed_text, stopwords)
#     cleaned_data.append(cleaned_text)
# df['cleaned'] = cleaned_data

In [4]:
# 클래스별로 데이터 분리
df_positive = df[df['Outcome'] == '악재'] # 0
df_negative = df[df['Outcome'] == '호재'] # 1

# 최소 클래스의 샘플 수 확인
min_class_count = min(len(df_positive), len(df_negative))

# 다운샘플링 적용
df_positive_downsampled = resample(df_positive,
                                   replace=True,                                # 샘플을 복원하지 않고
                                   n_samples=int(min_class_count*2.30),         # 최소 클래스의 개수로 맞추기
                                   random_state=42)                             # 재현성을 위해 random_state 사용

df_negative_downsampled = resample(df_negative,
                                   replace=True,
                                   n_samples=int(min_class_count*2.42),
                                   random_state=42)

df_balanced = pd.concat([df_positive_downsampled, df_negative_downsampled])     # 다운샘플링된 데이터 결합
df = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)         # 데이터 셔플
print(df['Outcome'].value_counts())                                    # 결과 확인

호재    6100
악재    5798
Name: Outcome, dtype: int64


In [5]:
# num_labels는 분류할 클래스의 수를 지정합니다.
tokenizer = BertTokenizer.from_pretrained('monologg/kobert')
model = BertForSequenceClassification.from_pretrained('monologg/kobert', num_labels=2)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to see activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'KoBertTokenizer'. 
The class this function is called from is 'BertTokenizer'.
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at monologg/kobert and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [6]:
# Dataset 클래스를 정의하여 데이터를 모델에 맞게 전처리합니다.
class TextDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts                                          # 전처리된 텍스트 데이터 리스트
        self.labels = labels                                        # 라벨 데이터 리스트
        self.tokenizer = tokenizer                                  # KoBERT 토큰나이저
        self.max_len = max_len                                      # 최대 토큰 길이 (128로 설정)

    def __len__(self):
        return len(self.texts)                                      # 데이터셋의 크기를 반환

    def __getitem__(self, idx):
        text = self.texts[idx]                                      # 주어진 인덱스에 해당하는 텍스트 가져오기
        label = self.labels[idx]                                    # 주어진 인덱스에 해당하는 라벨 가져오기

        # 텍스트를 KoBERT 토크나이저로 인토딩하여 입력 데이터로 변환
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,                                # 특별 토큰([CLS], [SEP]) 추가
            max_length=self.max_len,                                # 최대 토큰 길이만큼 패딩 또는 자르기
            return_token_type_ids=False,                            # token_type_ids 반환하지 않음
            padding='max_length',                                   # max_length만큼 패딩 적용
            truncation=True,                                        # max_length를 초과하는 부분을 잘라냄
            return_attention_mask=True,                             # 어텐션 마스크 반환 (패딩된 부분은 0, 나머지는 1)
            return_tensors='pt',                                    # PyTorch 텐서로 변환
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),           # 인코딩된 입력 ID 텐서
            'attention_mask': encoding['attention_mask'].flatten(), # 어텐션 마스크 텐서
            'labels': torch.tensor(label, dtype=torch.long)         # 라벨 텐서 (정수형)
        }

# 평가지표를 계산하는 함수 정의
def compute_metrics(eval_pred):                                      # 모델의 예측값과 실제 라벨을 받아 다양한 평가지표를 계산
    logits, labels = eval_pred                                       # eval_pred는 (logits, labels)의 튜플, 이를 분리

    if isinstance(logits, np.ndarray):                               # logits이 numpy.ndarray 타입인 경우
        logits = torch.tensor(logits)                                # PyTorch 텐서로 변환하여 일관성 유지

    predictions = torch.argmax(logits, dim=1)                        # logits에서 argmax를 사용하여 예측 클래스 인덱스를 얻음

    acc = accuracy_score(labels, predictions.numpy())                # 실제 라벨과 예측값을 비교하여 정확도(accuracy) 계산

    precision, recall, f1, _ = precision_recall_fscore_support(      # weighted 평균을 사용하여 precision, recall, f1-score 계산
        labels, predictions.numpy(), average='weighted'
    )

    return {                                                         # 계산된 정확도, F1-score, precision, recall을 딕셔너리로 반환
        'accuracy': acc,                                             # 모델의 정확도
        'f1': f1,                                                    # F1-score
        'precision': precision,                                      # 정밀도
        'recall': recall                                             # 재현율
    }

# 가중치를 반영한 손실 계산 함수 정의
def compute_loss(model, inputs, return_outputs=False):               # 손실을 계산하고 선택적으로 출력도 반환
    labels = inputs.get("labels")                                    # 입력 데이터에서 실제 라벨을 가져옴
    outputs = model(**inputs)                                        # 모델에 입력 데이터를 전달하여 예측값을 계산
    logits = outputs.get("logits")                                   # 모델의 예측 결과에서 logits(출력값)을 추출

    loss = F.cross_entropy(logits, labels, weight=class_weights)     # 크로스 엔트로피 손실 계산, 클래스별 가중치 적용

    return (loss, outputs) if return_outputs else loss               # return_outputs에 따라 손실과 출력 반환 여부 결정

# 커스터마이징된 Trainer 클래스 정의
class CustomTrainer(Trainer):
    """
    Trainer 클래스를 상속받아 손실 함수 계산을 커스터마이즈한 클래스입니다.
    주로 클래스 가중치를 적용하여 불균형한 데이터셋에서 모델 성능을 향상시키는 데 사용됩니다.
    """

    def compute_loss(self, model, inputs, return_outputs=False):
        """
        모델의 손실을 계산하는 메서드를 오버라이드합니다.

        이 메서드는 모델의 예측 결과와 실제 라벨을 비교하여 손실을 계산합니다.
        클래스별 가중치를 적용하여 손실을 계산하며, 필요에 따라 모델 출력을 함께 반환할 수 있습니다.

        Args:
            model (PreTrainedModel): 훈련할 모델 객체
            inputs (dict): 모델에 입력으로 전달될 데이터. 'labels'와 같은 필드를 포함해야 함
            return_outputs (bool, optional): 손실과 함께 모델 출력을 반환할지 여부를 결정하는 플래그. 기본값은 False

        Returns:
            loss (torch.Tensor): 계산된 손실 값
            outputs (ModelOutput, optional): 모델의 출력값 (return_outputs=True인 경우)
        """

        # 입력 데이터에서 실제 라벨을 추출
        labels = inputs.get("labels")

        # 모델에 입력 데이터를 전달하여 예측값을 계산
        outputs = model(**inputs)

        # 모델의 예측 결과에서 logits(출력값)을 추출
        logits = outputs.get("logits")

        # logits이 위치한 디바이스 (GPU 또는 CPU)에서 클래스 가중치를 이동
        device = logits.device
        weight = class_weights.to(device)

        # 크로스 엔트로피 손실을 계산하고 클래스별 가중치를 적용
        loss = F.cross_entropy(logits, labels, weight=weight)

        # return_outputs가 True일 경우 손실과 모델 출력을 함께 반환
        return (loss, outputs) if return_outputs else loss

In [7]:
# 전처리된 텍스트와 라벨로 학습 및 평가 데이터셋 인스턴스 생성
train_df, eval_df = train_test_split(df, test_size=0.2, random_state=42)

train_texts   = train_df['cleaned'].tolist()
train_labels  = label_encoder.fit_transform(train_df['Outcome'])

eval_texts    = eval_df['cleaned'].tolist()
eval_labels   = label_encoder.transform(eval_df['Outcome'])

train_dataset = TextDataset(train_texts, train_labels, tokenizer)
eval_dataset  = TextDataset(eval_texts, eval_labels, tokenizer)

# 클래스 가중치 계산
class_weights = compute_class_weight(class_weight='balanced', classes=[0, 1], y=train_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)

In [8]:
# 학습 파라미터 설정
training_args = TrainingArguments(
    output_dir                 = '.',                                # 학습 결과가 저장될 디렉토리
    num_train_epochs           =3,                                   # 학습을 반복할 에폭 수
    per_device_train_batch_size=16,                                  # 학습 시 배치 크기
    per_device_eval_batch_size =16,                                  # 평가 시 배치 크기
    warmup_steps               =500,                                 # 학습 초기 단계에서 학습률을 서서히 증가하는 단계 수
    weight_decay               =0.01,                                # 가중치 감쇠 (L2 정규화) 비율
    logging_dir                ='.',                                 # 학습 로그가 저장될 디렉토리
    logging_steps              =10,                                  # 몇 스텝마다 로그를 남길지 설정
    evaluation_strategy        ='steps',                             # 평가 전략 (학습 중 주기적으로 평가)
    save_total_limit           =2                                    # 저장할 체크포인트 파일의 개수를 제한
)

In [9]:
# Trainer 클래스 설정
# Trainer는 학습을 쉽게 관리할 수 있게 해주는 Hugging Face의 유틸리티 클래스
trainer = Trainer(
    model          = model,
    args           = training_args,
    train_dataset  = train_dataset,
    eval_dataset   = eval_dataset,
    compute_metrics= compute_metrics,
    # callbacks    = [ProgressCallback]
)

#trainer.train()

dataloader_config = DataLoaderConfiguration(dispatch_batches=None)


In [12]:
# 하이퍼 파라미터 그리드 정의
learning_rates = [1e-5, 2e-5, 3e-5, 4e-5, 5e-5]
batch_sizes    = [4, 8, 16]
epochs         = list(range(2, 20, 5))

train_losses = []
eval_losses = []
best_accuracy = 0.0
best_model_dir = None
param_grid = list(itertools.product(learning_rates, batch_sizes, epochs))
print(len(epochs))

4


In [13]:
# 손실 값을 저장할 리스트 초기화
train_losses = []
eval_losses = []
best_accuracy = 0.0
best_model_dir = None

def check_overfitting_or_underfitting(train_losses, eval_losses):
    if len(train_losses) > 1 and len(eval_losses) > 1:
        # 가장 최근의 손실 값
        last_train_loss = train_losses[-1]
        last_eval_loss = eval_losses[-1]

        # 초기 손실 값
        initial_train_loss = train_losses[0]
        initial_eval_loss = eval_losses[0]

        # 손실 감소 추세 계산
        train_loss_decrease = initial_train_loss - last_train_loss
        eval_loss_increase = last_eval_loss - initial_eval_loss

        # 과적합 및 과소적합 조건
        if last_train_loss < last_eval_loss and eval_loss_increase > 0:
            return "overfitting"
        elif last_train_loss >= last_eval_loss and train_loss_decrease < 0.01:
            return "underfitting"
        else:
            return "ok"
    else:
        return "not_enough_data"

In [None]:
# 학습 과정에서 손실 값을 추적
for lr, batch_size, epoch in tqdm(param_grid, desc="Find Best Param"):
    print(f"Testing: Learning Rate={lr}, Batch_Size={batch_size}, Epochs={epoch}")

    training_args = TrainingArguments(
        output_dir                 ='./results',
        num_train_epochs           = epoch,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size =batch_size,
        warmup_steps               =500,
        weight_decay               =0.01,
        logging_dir                ='.',
        logging_steps              =10,
        evaluation_strategy        ='epoch',
        learning_rate              =lr
    )

    # Trainer 설정
    trainer = Trainer(
        model                      =model,
        args                       =training_args,
        train_dataset              =train_dataset,
        eval_dataset               =eval_dataset,
        compute_metrics            =compute_metrics
    )

    # 모델 학습
    train_result = trainer.train()
    train_losses.append(train_result.training_loss)

    # 모델 평가
    eval_results = trainer.evaluate()
    eval_losses.append(eval_results['eval_loss'])

    # 과적합 및 과소적합 여부 확인
    fitting_status = check_overfitting_or_underfitting(train_losses, eval_losses)

    if fitting_status == "ok":
        if eval_results['eval_accuracy'] > best_accuracy:
            best_accuracy = eval_results['eval_accuracy']
            best_params = (lr, batch_size, epoch)

            # 최적의 모델 저장
            best_model_dir = './best_model'
            trainer.save_model(best_model_dir)

    print(f"Accuracy: {eval_results['eval_accuracy']} - Status: {fitting_status}")

# 최적의 파라미터 정보 출력
if best_model_dir:
    print(f"Best Params: Learning Rate={best_params[0]}, Batch_size={best_params[1]}, Epochs={best_params[2]}")
    print(f"Best Model saved in directory: {best_model_dir}")
else:
    print("No suitable model found that is not overfitted or underfitted.")



Testing: Learning Rate=1e-05, Batch_Size=8, Epochs=2


Epoch,Training Loss,Validation Loss,Accuracy,F1,Precision,Recall
1,0.5282,0.53397,0.665126,0.65759,0.679981,0.665126
2,0.4777,0.523617,0.697899,0.666948,0.811325,0.697899


Find Best Param:  20%|██        | 1/5 [05:48<23:14, 348.72s/it]

Accuracy: 0.6978991596638655 - Status: not_enough_data
Testing: Learning Rate=2e-05, Batch_Size=8, Epochs=2




Epoch,Training Loss,Validation Loss,Accuracy,F1,Precision,Recall
1,0.5269,0.533217,0.677311,0.663257,0.711147,0.677311
2,0.4823,0.523375,0.689496,0.66877,0.75036,0.689496


In [None]:
# 최종 모델 평가
final_eval_results = trainer.evaluate()

# 평가 데이터셋에 대한 예측 수행
eval_predictions = trainer.predict(eval_dataset)
y_pred = np.argmax(eval_predictions.predictions, axis=1)
y_true = eval_labels

# classification_report를 계산하고 딕셔너리로 변환
report_dict = classification_report(y_true, y_pred, output_dict=True)

# 딕셔너리를 pandas 데이터프레임으로 변환
report_df = pd.DataFrame(report_dict).transpose()

# 데이터프레임을 주피터 노트북에서 보기 좋게 출력
styled_report = report_df.style.format("{:.2f}").background_gradient(cmap='Blues')

# 최종 평가 결과와 스타일링된 classification_report 출력
print(f"Final Accuracy: {final_eval_results['eval_accuracy']}")
styled_report

In [None]:
# 예측값 계산
predictions, labels, _ = trainer.predict(eval_dataset)
predictions = torch.argmax(torch.tensor(predictions), dim=1)

# 혼동행렬 계산 및 시각화
cm = confusion_matrix(labels, predictions)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

In [None]:
from sklearn.preprocessing import LabelEncoder

# 라벨 인코더 생성 및 라벨 인코딩
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(df['Outcome'])

# 각 라벨이 어떻게 인코딩되었는지 확인
label_mapping = dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))
print("Label Mapping:", label_mapping)

In [None]:
# 라벨 인덱스와 실제 라벨명을 매핑한 딕셔너리
label_map = {0: '악재', 1: '호재'}

# 텍스트를 입력 받아 예측하는 함수 정의
def predict_text(text, model, tokenizer, max_len=128):
    encoding = tokenizer.encode_plus(
        text,
        add_special_tokens   =True,
        max_length           =max_len,
        return_token_type_ids=False,
        padding              ='max_length',
        truncation           =True,
        return_attention_mask=True,
        return_tensors       ='pt',
    )
    input_ids      = encoding['input_ids']
    attention_mask = encoding['attention_mask']

    with torch.no_grad():
        outputs    = model(input_ids, attention_mask=attention_mask)

    logits     = outputs.logits
    prediction = torch.argmax(logits, dim=-1)

    return prediction.item()

In [None]:
# 테스트할 텍스트 입력
text_to_predict = '''[서울와이어 천성윤 기자] 삼성전자 최대 노조인 전국삼성전자노동조합(삼성노조)이 29일 파업을 선언했다. 삼성전자에서 파업은 창사 이래 처음이다.

삼성노조는 이날 서울 서초구 삼성전자 서초사옥 앞에서 기자회견을 열고 “노동자들을 무시하는 사측의 태도에 파업을 선언한다”고 밝혔다.

삼성노조의 파업 선언은 전날 올해 임금협상을 위한 교섭에서 사측과 이견이 좁혀지지 않으며 파행한 지 하루만에 이뤄졌다. 전날 교섭에서 노사 양측은 사측 위원 2명의 교섭 참여를 놓고 갈등을 빚었다. 이 문제 때문에 정작 핵심인 임금협상 관련 중요 내용은 오가지도 못한 것으로 알려졌다.

삼성노조는 “사측이 교섭에 아무런 안건도 준비하지 않고 나왔다”며 파업 선언에 이르기까지의 책임을 사측에 돌렸다.

현재 삼성노조 조합원 수는 2만8000여명으로 삼성전자 전체 직원(약 12만5000명)의 22% 수준이다. 이들이 파업에 돌입함으로서 실적 개선을 이어가야 하는 삼성전자는 큰 타격을 입을 것으로 예상된다.

삼성전자는 지난해 반도체 업황 부진으로 디바이스솔루션(DS) 부문에서 14조8800억원의 적자를 기록했다. 올해 1분기는 매출 71조9200억원, 영업이익 6조6100억원으로 상승세에 올라탔다.

삼성노조는 즉각적인 총파업에 나서는 대신 연차 소진 등의 방식으로 단체행동을 이어갈 예정이다. 삼성노조 집행부는 조합원들에게 오는 6월 7일 하루 연차를 소진하라는 지침을 전달했다.

또 이날부터 서초사옥 앞에서 버스 숙박 농성을 진행한다. 삼성노조 측은 “아직은 소극적인 파업으로 볼 수 있지만, 단계를 밟아나가겠다”면서 “총파업까지 갈 수 있고, 파업이 실패할 수도 있지만 1호 파업 행동 자체가 의미 있다”고 밝혔다.

출처 : 서울와이어(http://www.seoulwire.com)'''

# 예측
predicted_label_index = predict_text(text_to_predict, model, tokenizer)

# 예측 결과 출력
predicted_label = label_map[predicted_label_index]
print(f"Predicted Label: {predicted_label}")