# 커뮤니티 포스트 감정분석을 위한 KoBERT 기반 다중 클래스 분류(Multi-class Classification) 모델

## 환경 설정

In [1]:
# Name of the repository checkout; the absolute paths below are built from it.
PROJECT_NAME = "tbb-model-api"
# Base directory of the sentiment-classification model package.
MODEL_PATH = f'/Users/rock/Documents/tumblbug-codes/{PROJECT_NAME}/packages/model/sentiment_classification'
# Dataset directory: the `data` folder directly under MODEL_PATH.
DATASET_PATH = f'{MODEL_PATH}/data'

In [None]:
# IPython shell escape: install the packages listed in requirements.txt with uv.
!uv pip install -r requirements.txt

### 데이터 분포

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np

# Raw labelled sentences, before any cleaning or class balancing.
df = pd.read_csv(f'{DATASET_PATH}/ko_sentence_template_origin.csv')

# Output recorded from an earlier run, kept here for reference:
# ==========================
# origin data distribution
# --------------------------
# template
# positive    0.414991
# neutral     0.386045
# negative    0.198964
# Name: proportion, dtype: float64
print(
    '==========================',
    'origin data 분포',
    '--------------------------',
    df['template'].value_counts(normalize=True),
    sep='\n',
)

# Plot how often each label occurs in the raw data.
df['template'].hist(bins=20, alpha=0.5, edgecolor='black')

### 데이터셋 전처리

In [4]:
# Clean the raw sentences, balance the three classes, and split into train / test sets.
import pandas as pd
from sklearn.model_selection import train_test_split

# Number of rows drawn per sentiment class so the classes end up balanced.
SAMPLES_PER_CLASS = 100000
# Fixed seed so that sampling and shuffling are reproducible between runs.
SAMPLING_SEED = 42
# Integer class ids written to the cleaned train / test CSVs.
LABEL_TO_ID = {'negative': 0, 'neutral': 1, 'positive': 2}

df = pd.read_csv(f'{DATASET_PATH}/ko_sentence_template_origin.csv')

# Remove special characters: everything that is neither a word character nor
# whitespace. Commas are already covered by this pattern, so no separate
# comma replacement is needed afterwards.
df['sentence'] = df['sentence'].str.replace(pat=r'[^\w\s]', repl=r'', regex=True)

# Collapse every whitespace run (newlines and tabs included) into one space and
# trim the ends. `regex=True` is required: without it pandas 2.x matches the
# literal text "\s+" and the sentences are left untouched.
df['sentence'] = df['sentence'].str.replace(r'\s+', ' ', regex=True).str.strip()

# Drop rows that contain an empty or missing cell. Thanks to the strip above,
# sentences that consisted only of punctuation / whitespace are now empty
# strings and are removed here as well.
df = df.replace('', pd.NA)
df = df.dropna()

# Draw the same number of rows from each class.
# NOTE: `sample` raises ValueError if a class has fewer than SAMPLES_PER_CLASS rows.
neutral_samples = df[df['template'] == 'neutral'].sample(n=SAMPLES_PER_CLASS, random_state=SAMPLING_SEED)
positive_samples = df[df['template'] == 'positive'].sample(n=SAMPLES_PER_CLASS, random_state=SAMPLING_SEED)
negative_samples = df[df['template'] == 'negative'].sample(n=SAMPLES_PER_CLASS, random_state=SAMPLING_SEED)

merged_df = pd.concat([neutral_samples, positive_samples, negative_samples], ignore_index=True)
# Shuffle with a fixed seed so the saved files are identical on every run.
merged_df = merged_df.sample(frac=1, random_state=SAMPLING_SEED).reset_index(drop=True)
# Saved with the string labels still in place; the integer encoding happens below.
merged_df.to_csv(f'{DATASET_PATH}/ko_sentence_template.csv', encoding='utf-8', index=False)

# Encode the labels as integers: negative -> 0, neutral -> 1, positive -> 2.
merged_df['template'] = merged_df['template'].map(LABEL_TO_ID)

# Stratified 75 / 25 split keeps the class balance in both parts.
train_dataset, test_dataset = train_test_split(
    merged_df,
    test_size=0.25,
    random_state=11,
    stratify=merged_df['template'],
)

# Persist the cleaned splits for training and testing. The index is written on
# purpose: the later cells read these files back with `index_col = 0`.
train_dataset.to_csv(f'{MODEL_PATH}/data/cleaned_train.csv', encoding='utf-8')
test_dataset.to_csv(f'{MODEL_PATH}/data/cleaned_test.csv', encoding='utf-8')

### 추출한 최종 데이터 분포 확인

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np

# Balanced sample written by the preprocessing cell (labels still as strings).
df = pd.read_csv(f'{DATASET_PATH}/ko_sentence_template.csv')

# Output recorded from an earlier run, kept here for reference:
# ==========================
# sampled data distribution
# --------------------------
# template
# positive    0.333333
# neutral     0.333333
# negative    0.333333
# Name: proportion, dtype: float64
print(
    '==========================',
    '추출한 data 분포',
    '--------------------------',
    df['template'].value_counts(normalize=True),
    sep='\n',
)

# Plot how often each label occurs in the balanced sample.
df['template'].hist(bins=20, alpha=0.5, edgecolor='black')

In [None]:
import math, torch, wandb, sys, os

py_file_location = f'{MODEL_PATH}'
sys.path.append(os.path.abspath(py_file_location))

from datetime import datetime, timezone, timedelta
import pandas as pd

from transformers import TrainingArguments, AutoModelForSequenceClassification, DebertaV2ForSequenceClassification

from modules.trainer import CustomTrainer
from modules.dataset import CustomDataset
from modules.optimizer import get_optimizer
from modules.metrics import compute_metrics
from modules.utils import load_yaml
from modules.split import split

### 메모리 초기화

In [7]:
import gc

import torch

# Run a garbage-collection pass first, then ask PyTorch to release the
# memory cached by its CUDA allocator.
gc.collect()
torch.cuda.empty_cache()

## 학습

In [None]:
# Training configuration shipped with the model package.
config_path = os.path.join(MODEL_PATH, 'config', 'train_config.yaml')
config = load_yaml(config_path)

# Directory that receives checkpoints and the final model.
OUTPUT_DIR = os.path.join(MODEL_PATH, 'results')
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Path prefix of the dataset; suffixes such as '_train.csv' are appended later.
DATA_DIR = os.path.join(MODEL_PATH, 'data', config.DIRECTORY.dataset)

# Serial number of this training run: the current time in KST (UTC+9).
kst = timezone(timedelta(hours=9))
train_serial = datetime.now(tz=kst).strftime("%Y%m%d_%H%M%S")

# Device preference order: CUDA first, then Apple MPS, otherwise CPU.
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else ('mps' if torch.backends.mps.is_available() else 'cpu')
)
print(f'Using {device}')


if __name__ == '__main__':

    # Hyper-parameters and paths, read once from the training config.
    output_dir      = OUTPUT_DIR
    pretrained_link = config.MODEL.pretrained_link[config.MODEL.model_name]
    num_of_classes  = config.MODEL.num_of_classes
    batch_size      = config.TRAIN.batch_size
    num_of_epochs   = config.TRAIN.num_of_epochs
    learning_rate   = config.TRAIN.learning_rate.pretrained
    max_grad_norm   = config.TRAIN.max_grad_norm
    warmup_ratio    = config.TRAIN.warmup_ratio
    run_name        = config.MODEL.model_name + train_serial
    max_seq_len     = config.TRAIN.max_seq_len
    train_csv_path  = DATA_DIR + '_train.csv'

    wandb.login()

    # Record the main hyper-parameters of this run in Weights & Biases.
    run = wandb.init(project='post-sentiment-cls',
            config = {
                "learning_rate": learning_rate,
                "architecture": config.MODEL.model_name,
                "dataset": train_csv_path,
                "epochs": num_of_epochs,
                "batch_size": batch_size,
                "num_of_classes": num_of_classes
                }
            )

    # Everything after `wandb.init` runs inside try/finally so that the W&B run
    # is closed even when data loading, model loading or training raises;
    # otherwise a failed cell leaves the run open in the notebook kernel.
    try:
        data = pd.read_csv(train_csv_path, index_col = 0)

        train_df, valid_df = split(data)

        # Total number of optimisation steps and the warm-up portion of it,
        # passed to `get_optimizer` below.
        train_len = len(train_df)
        loader_len = math.ceil(train_len / batch_size)
        t_total = loader_len * num_of_epochs
        warmup_step = int(t_total * warmup_ratio)

        # NOTE(review): `no_cuda=True` disables CUDA for the Trainer although the
        # model is moved to `device` right below — confirm this is intentional.
        training_args = TrainingArguments(output_dir = output_dir,
                                          per_device_train_batch_size = batch_size,
                                          per_device_eval_batch_size = batch_size,
                                          num_train_epochs = num_of_epochs,
                                          learning_rate = learning_rate,
                                          max_grad_norm = max_grad_norm,
                                          lr_scheduler_type = 'cosine',
                                          warmup_ratio = warmup_ratio,
                                          evaluation_strategy = 'steps',
                                          report_to = 'wandb',
                                          run_name = run_name,
                                          dataloader_pin_memory = False,
                                          logging_steps = 20,
                                          load_best_model_at_end = True,
                                          save_steps = 500,
                                          no_cuda=True
                                          )

        # DeBERTa uses its dedicated class; every other model goes through the Auto class.
        if config.MODEL.model_name == 'DeBERTa':
            model = DebertaV2ForSequenceClassification.from_pretrained(pretrained_link, num_labels=num_of_classes).to(device)
        else:
            model = AutoModelForSequenceClassification.from_pretrained(pretrained_link, num_labels=num_of_classes).to(device)

        # Presumably returns what the Trainer's `optimizers` argument expects
        # (an optimizer / scheduler pair) — verify against modules.optimizer.
        optimizer = get_optimizer('AdamW', model, learning_rate, warmup_step, t_total)

        train_dataset = CustomDataset(train_df, pretrained_link, max_seq_len)
        valid_dataset = CustomDataset(valid_df, pretrained_link, max_seq_len)

        trainer = CustomTrainer(model = model,
                                args = training_args,
                                optimizers = optimizer,
                                train_dataset = train_dataset,
                                eval_dataset = valid_dataset,
                                compute_metrics = compute_metrics
                                )

        # Train, then save the final model under a name built from the config.
        trainer.train()
        trainer.save_model(os.path.join(OUTPUT_DIR, f'{config.MODEL.model_name}_{config.TRAIN.max_seq_len}_{config.TRAIN.loss}_{config.DIRECTORY.dataset}'))
    finally:
        run.finish()

## 테스트

In [None]:
import os, torch, sys, wandb
py_file_location = f'{MODEL_PATH}'
sys.path.append(os.path.abspath(py_file_location))

import pandas as pd
import numpy as np
from tqdm import tqdm

from transformers import TrainingArguments, AutoModelForSequenceClassification, AutoTokenizer, DebertaV2ForSequenceClassification

from modules.metrics import compute_metrics
from modules.trainer import CustomTrainer
from modules.utils import load_yaml
from modules.preprocess import preprocess_infer
from modules.dataset import CustomDataset

# Inference / evaluation configuration shipped with the model package.
config_path = os.path.join(MODEL_PATH, 'config', 'inference_config.yaml')
config = load_yaml(config_path)

# Checkpoint to load, and the directory its test results are written to.
CHECKPOINT_DIR  = os.path.join(MODEL_PATH, 'results', config.TEST.checkpoint_path)
OUTPUT_DIR = os.path.join(CHECKPOINT_DIR, 'test_results')
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Path prefix of the dataset; the suffix '_test.csv' is appended when it is read.
DATA_DIR = os.path.join(MODEL_PATH, 'data', config.TEST.directory.dataset)

# Device preference order: CUDA first, then Apple MPS, otherwise CPU.
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else ('mps' if torch.backends.mps.is_available() else 'cpu')
)
print(f'Using {device}')

# wandb.init(project='text-binary-cls',
#            config = {
#               "architecture": config.MODEL.model_name,
#               "max_seq_len": config.MODEL.max_seq_len,
#               "num_of_classes": config.MODEL.num_of_classes
#             }
#            )

def test():
    """Evaluate the saved checkpoint on the test CSV.

    Loads the model from ``CHECKPOINT_DIR``, reads ``DATA_DIR + '_test.csv'``,
    runs it through ``preprocess_infer`` and evaluates it with ``CustomTrainer``.

    Returns:
        Whatever ``trainer.evaluate()`` returns (the evaluation metrics).
        Previously the result was discarded and the function returned ``None``.
    """
    output_dir      = OUTPUT_DIR
    pretrained_link = config.MODEL.pretrained_link[config.MODEL.model_name]
    num_of_classes  = config.MODEL.num_of_classes
    max_seq_len     = config.MODEL.max_seq_len
    checkpoint_path = CHECKPOINT_DIR

    # NOTE(review): `no_cuda=True` disables CUDA for the Trainer although the
    # model is moved to `device` right below — confirm this is intentional.
    test_args = TrainingArguments(output_dir = output_dir,
                                      per_device_eval_batch_size = config.TEST.batch_size,
                                      report_to = 'wandb',
                                      dataloader_pin_memory = False,
                                      do_eval = True,
                                      no_cuda=True
                                      )

    # DeBERTa uses its dedicated class; every other model goes through the Auto class.
    if config.MODEL.model_name == 'DeBERTa':
        model = DebertaV2ForSequenceClassification.from_pretrained(checkpoint_path, num_labels=num_of_classes).to(device)
    else:
        model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path, num_labels=num_of_classes).to(device)

    data = pd.read_csv(DATA_DIR + '_test.csv', index_col = 0)
    data = preprocess_infer(data)
    dataset = CustomDataset(data, pretrained_link, max_seq_len)

    trainer = CustomTrainer(model = model,
                            args = test_args,
                            compute_metrics = compute_metrics,
                            eval_dataset = dataset
                            )
    # Hand the metrics back so that callers can print or log them.
    return trainer.evaluate()

def inference(data):
    """Predict the sentiment class of one sentence or of a DataFrame of sentences.

    Args:
        data: Either a single sentence (``str``) or a ``pd.DataFrame`` whose
            ``sentence_r`` column holds the sentences to classify.

    Returns:
        For a ``str`` input, the ``config.LABELING`` entry of the predicted
        class id. For a ``pd.DataFrame`` input, the same DataFrame (modified
        in place) with the predicted class ids in a new ``template_r`` column.

    Raises:
        TypeError: If ``data`` is neither a ``str`` nor a ``pd.DataFrame``.
    """
    # Reject unsupported input before the (expensive) model load; previously
    # such input fell through to a prediction on an empty list and the
    # function silently returned None.
    is_single_sentence = isinstance(data, str)
    if not is_single_sentence and not isinstance(data, pd.DataFrame):
        raise TypeError(
            f'inference() expects a str or a pandas DataFrame, got {type(data).__name__}'
        )

    output_dir      = OUTPUT_DIR
    pretrained_link = config.MODEL.pretrained_link[config.MODEL.model_name]
    num_of_classes  = config.MODEL.num_of_classes
    max_seq_len     = config.MODEL.max_seq_len
    checkpoint_path = CHECKPOINT_DIR

    print('=' * 50)
    print('Get Model & Tokenizer')
    print('=' * 50)

    # NOTE(review): `no_cuda=True` disables CUDA for the Trainer although the
    # model and the inputs are moved to `device` — confirm this is intentional.
    test_args = TrainingArguments(output_dir=output_dir,
                                    dataloader_pin_memory = False,
                                    do_predict = True,
                                    no_cuda=True
                                    )

    # DeBERTa uses its dedicated class; every other model goes through the Auto class.
    if config.MODEL.model_name == 'DeBERTa':
        model = DebertaV2ForSequenceClassification.from_pretrained(checkpoint_path, num_labels=num_of_classes).to(device)
    else:
        model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path, num_labels=num_of_classes).to(device)

    trainer = CustomTrainer(model = model,
                            args = test_args,
                            compute_metrics = compute_metrics,
                            )

    tokenizer = AutoTokenizer.from_pretrained(pretrained_link)

    print('=' * 50)
    print('Tokenizing...')
    print('=' * 50)

    def _encode(sentence):
        # Tokenise one sentence to a fixed length and move the tensors to `device`.
        encoded = tokenizer(sentence,
                            truncation = True,
                            padding = 'max_length',
                            max_length = max_seq_len)
        return {key: torch.tensor(val).to(device) for key, val in encoded.items()}

    # A single sentence is handled as a one-element batch; a DataFrame is
    # tokenised row by row with a progress bar.
    sentences = [data] if is_single_sentence else tqdm(data['sentence_r'])
    items = [_encode(sentence) for sentence in sentences]

    print('=' * 50)
    print('Predicting...')
    print('=' * 50)

    test_results = trainer.predict(items)
    # First field of the prediction output holds the per-class scores;
    # the predicted class is the highest-scoring one.
    label_ids = np.argmax(test_results[0], axis = 1)

    if is_single_sentence:
        # Cast the NumPy integer to a plain int before the config lookup.
        return config.LABELING[int(label_ids[0])]

    data['template_r'] = label_ids
    return data

if __name__ == '__main__':
    print('=' * 50, 'Preprocessing...', '=' * 50, sep='\n')

    # Option 1: compare the ground-truth labels with the inferred labels.
    # A = inference(preprocess_infer(pd.read_csv(f'{MODEL_PATH}/data/cleaned_test.csv', index_col = 0)))
    # A.to_csv(f'{MODEL_PATH}/results/{config.MODEL.model_name}/inferenced.csv')
    # print(A)

    # Option 2 (active): classify a single sentence.
    B = inference(preprocess_infer('텀블벅 프로젝트에 함께해 주셔서 고맙습니다. '))
    print(B)

    # Option 3: evaluate on the test dataset.
    # a = test()
    # print(a)
    # wandb.finish()
    # pass
