In [None]:
import os
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
import re

import torch
from torch.utils.data import Dataset, ConcatDataset

import evaluate
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from transformers import DataCollatorWithPadding
from transformers import TrainingArguments, Trainer

from sklearn.model_selection import train_test_split

from dataloader.datasets import MaskedDataset, MixupBERTDataset

In [None]:

# Single seed for every RNG in this notebook; it is also passed to
# train_test_split and TrainingArguments further down.
SEED = 12191885

random.seed(SEED)                 # Python's built-in RNG
np.random.seed(SEED)              # NumPy's legacy global RNG
torch.manual_seed(SEED)           # PyTorch default generator
torch.cuda.manual_seed(SEED)      # current CUDA device
torch.cuda.manual_seed_all(SEED)  # every CUDA device

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Input and output folders are resolved against the current working directory.
BASE_DIR = os.getcwd()
DATA_DIR, OUTPUT_DIR = (os.path.join(BASE_DIR, sub) for sub in ('data', 'output'))

# The same checkpoint name supplies both the tokenizer and the model weights.
model_name = 'klue/bert-base'
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Sequence-classification model with 7 output labels, moved to DEVICE.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=7,
)
model = model.to(DEVICE)


In [None]:
# NOTE(review): absolute, machine-specific path; unlike test.csv it is not
# resolved through DATA_DIR, so the notebook only runs where this file exists.
TRAIN_CSV_PATH = '/data/ephemeral/home/code/final_relabelBT.csv'
data = pd.read_csv(TRAIN_CSV_PATH)

def clean_text(text):
    """Normalise one text value before tokenisation.

    Removes every character that is not an ASCII letter, a digit, a Hangul
    syllable (가-힣) or whitespace, then lower-cases the result.

    Args:
        text: Value taken from a DataFrame cell. Normally a ``str``; missing
            values (``None`` / NaN, as pandas yields for empty CSV cells) and
            other scalars are accepted as well.

    Returns:
        str: The cleaned text. Missing values give ``""``; other non-string
        scalars are converted with ``str()`` before cleaning.
    """
    if not isinstance(text, str):
        # re.sub raises TypeError on non-strings, so a single empty cell
        # would otherwise abort the whole .apply() pass.
        if text is None or pd.isna(text):
            return ""
        text = str(text)
    return re.sub(r"[^a-zA-Z0-9가-힣\s]", "", text).lower()

# Normalise every text variant with clean_text, one pass per column.
for column in ('cleaned_text', 'text', 'BT_text', 'BERT_aug'):
    data[column] = data[column].apply(clean_text)

In [None]:
# Stack the four text variants into one long 'text' column. The target list is
# repeated once per variant, in the same order, so each stacked row keeps the
# label of the source row it came from.
text_columns = ['cleaned_text', 'text', 'BT_text', 'BERT_aug']

stacked_texts = []
for column in text_columns:
    stacked_texts.extend(data[column].tolist())
repeated_targets = data['target'].tolist() * len(text_columns)

data = pd.DataFrame({'text': stacked_texts, 'target': repeated_targets})

In [None]:
# Cast the labels to int.
data['target'] = data['target'].astype(int)

# Count the rows whose text already appeared earlier in the frame ...
num_duplicates = int(data.duplicated(subset='text').sum())

# ... and build a frame that keeps only the first occurrence of each text.
data_unique = data.drop_duplicates(subset='text')

print(f"중복된 행의 개수: {num_duplicates}")

# Continue with the de-duplicated frame.
data = data_unique

In [None]:
# Hold out 30% of the rows for validation; SEED makes the split repeatable.
dataset_train, dataset_valid = train_test_split(
    data,
    test_size=0.3,
    random_state=SEED,
)

In [None]:
# Training split wrapped in MaskedDataset (project class from dataloader.datasets).
original_dataset = MaskedDataset(dataset_train, tokenizer)

# Mixup training dataset. Per the original author's note, samples are mixed
# only with others sharing the same target; confirm in MixupBERTDataset.
# Earlier setting, kept for reference:
#data_train = MixupBERTDataset(dataset_train, tokenizer, alpha=0.4)
mixup_dataset = MixupBERTDataset(dataset_train, tokenizer, alpha=0.5)

# Concatenate the original and Mixup datasets into one training set.
# NOTE(review): the next cell rebinds data_train to a plain MaskedDataset, so on
# a top-to-bottom run this ConcatDataset is not what the Trainer receives.
data_train = ConcatDataset([original_dataset, mixup_dataset])

# Validation dataset built from the held-out split.
data_valid = MaskedDataset(dataset_valid, tokenizer)

In [None]:
# NOTE(review): this cell rebinds data_train, discarding the original + Mixup
# ConcatDataset built in the previous cell. When the notebook is run top to
# bottom, the Trainer therefore trains on this MaskedDataset only. Confirm which
# variant is intended and delete the other cell.
data_train = MaskedDataset(dataset_train, tokenizer)
data_valid = MaskedDataset(dataset_valid, tokenizer)

In [None]:
# Collator that pads the examples of each batch using the tokenizer's padding settings.
data_collator = DataCollatorWithPadding(
    tokenizer=tokenizer,
)

In [None]:
f1 = evaluate.load('f1')


def compute_metrics(eval_pred):
    """Compute the macro-averaged F1 score for one evaluation pass.

    Args:
        eval_pred: Pair ``(predictions, labels)``; the class with the highest
            score along axis 1 of ``predictions`` is taken as the prediction.

    Returns:
        The result of ``f1.compute(..., average='macro')``.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return f1.compute(
        predictions=predicted_classes,
        references=references,
        average='macro',
    )


training_args = TrainingArguments(
    # Output location and run-mode flags.
    output_dir=OUTPUT_DIR,
    overwrite_output_dir=True,
    do_train=True,
    do_eval=True,
    do_predict=True,

    # Log, evaluate and checkpoint on the same 100-step cadence.
    logging_strategy='steps',
    logging_steps=100,
    eval_strategy='steps',
    eval_steps=100,
    save_strategy='steps',
    save_steps=100,
    save_total_limit=2,

    # Optimizer and learning-rate schedule.
    learning_rate=2e-05,
    adam_beta1=0.9,
    adam_beta2=0.999,
    adam_epsilon=1e-08,
    weight_decay=0.01,
    lr_scheduler_type='linear',

    # Batch sizes and training length.
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=2,

    # Reload the checkpoint with the highest eval_f1 when training finishes.
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1',
    greater_is_better=True,

    seed=SEED,
)

# Wire the model, its arguments, both datasets, the padding collator and the
# metric function into a single Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=data_train,
    eval_dataset=data_valid,
    compute_metrics=compute_metrics,
)

# Run fine-tuning.
trainer.train()

In [None]:
# Test set is read from DATA_DIR.
test_csv_path = os.path.join(DATA_DIR, 'test.csv')
dataset_test = pd.read_csv(test_csv_path)

In [None]:
# Predict one label per test row, one sample at a time.
model.eval()
preds = []

with torch.no_grad():  # inference only: no gradients needed
    for _, sample in tqdm(dataset_test.iterrows(), total=len(dataset_test), desc="Evaluating"):
        # Apply the same normalisation the training texts went through, so
        # the model sees test inputs in the form it was fine-tuned on.
        text = clean_text(sample['text'])

        # truncation=True caps the input at the tokenizer's maximum length, so
        # an unusually long text cannot exceed what the model accepts.
        inputs = tokenizer(text, return_tensors="pt", truncation=True).to(DEVICE)
        logits = model(**inputs).logits

        # Softmax is monotonic, so argmax over the raw logits selects the same
        # class as argmax over the probabilities.
        preds.append(int(logits.argmax(dim=1).item()))

In [None]:
# Attach the predicted labels and write the submission file next to the notebook.
dataset_test['target'] = preds
submission_path = os.path.join(BASE_DIR, 'luckybiki.csv')
dataset_test.to_csv(submission_path, index=False)