In [None]:
# Mount Google Drive (Colab only) so that the /content/drive/... paths used as
# defaults in parse_args() resolve.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !pip install torch # torch
# !pip install peft # necessary for finetuning of the large model via LoRA approach
# !pip install -i https://pypi.org/simple/ bitsandbytes  # necessary for quantization
# !pip install evaluate # extension of the transformers library
# !pip install datasets # extension of the transformers library
# !pip install accelerate

In [None]:
import json
import os

In [None]:
import torch
import argparse
import torch.nn as nn
from tqdm import trange, tqdm
# from peft import LoraConfig, PeftModel, prepare_model_for_kbit_training, get_peft_model
from transformers import (
    AutoModel,
    AutoConfig,
    XLMRobertaModel,
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
    DataCollatorWithPadding)
from torch.utils.data import DataLoader, TensorDataset
from transformers import get_linear_schedule_with_warmup
from transformers import AdamW
from sklearn.metrics import f1_score

In [None]:
# Special-token ids kept as module constants; nothing in the visible cells reads them.
# NOTE(review): these look like XLM-RoBERTa-style ids (<s>=0, <pad>=1, </s>=2) — confirm
# before use; the KcELECTRA config printed further down reports pad_token_id 3.
S_OPEN_TOKEN, PADDING_TOKEN, S_CLOSE_TOKEN = 0, 1, 2

In [None]:
# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print('device: ', device)

device:  cuda


In [None]:
# Class names in id order, plus the two lookup tables derived from that order.
labels = ['POSITIVE', 'NEGATIVE']
id2label = dict(enumerate(labels))
label2id = {name: position for position, name in id2label.items()}

In [None]:
def jsonload(fname, encoding="utf-8"):
    """Parse a single JSON document from a file.

    Args:
        fname: Path of the JSON file to read.
        encoding: Text encoding used to open the file.

    Returns:
        The object produced by ``json.load`` for the file's contents.
    """
    with open(fname, encoding=encoding) as handle:
        return json.load(handle)

In [None]:
# Save a JSON-serialisable object to the given file name
def jsondump(j, fname):
    """Write ``j`` to ``fname`` as one JSON document.

    The file is opened for writing (replacing any previous contents) and
    non-ASCII characters are written as-is rather than ``\\u`` escaped.

    Args:
        j: Object to serialise.
        fname: Destination path.
    """
    with open(fname, mode="w", encoding="UTF8") as sink:
        json.dump(j, sink, ensure_ascii=False)

In [None]:
# Read a jsonl file and return its records as a list
def jsonlload(fname, encoding="utf-8"):
    """Parse a JSON Lines file into a list, one element per non-blank line.

    Whitespace-only lines (for example a trailing empty line at the end of the
    file) are skipped instead of being handed to ``json.loads``, which would
    raise ``json.JSONDecodeError`` on them.

    Args:
        fname: Path of the jsonl file to read.
        encoding: Text encoding used to open the file.

    Returns:
        A list with the parsed value of every non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(fname, encoding=encoding) as f:
        # Iterating the handle streams line by line; no need for readlines().
        return [json.loads(line) for line in f if line.strip()]

In [None]:
# Save a list of JSON-serialisable records in jsonl form
def jsonldump(j_list, fname):
    """Write each element of ``j_list`` to ``fname`` as one JSON line.

    The file is opened inside a ``with`` block so the handle is flushed and
    closed before the function returns, including when serialisation fails
    part-way through.

    Args:
        j_list: Iterable of objects to serialise, one per output line.
        fname: Destination path; existing contents are replaced.
    """
    with open(fname, "w", encoding='utf-8') as f:
        for json_data in j_list:
            # ensure_ascii=False keeps non-ASCII text readable in the output file.
            f.write(json.dumps(json_data, ensure_ascii=False) + '\n')

In [None]:
def parse_args():
    """Build the run configuration from command-line flags.

    Unknown flags are ignored (``parse_known_args``), which lets this run inside
    a notebook kernel whose ``sys.argv`` carries kernel-specific arguments.

    Returns:
        argparse.Namespace holding data paths, optimisation hyper-parameters,
        the base model name, the Hugging Face access token, checkpoint/output
        locations and classifier-head settings.
    """
    parser = argparse.ArgumentParser(description="unethical expression classifier using pretrained model")

    # --- data locations ---
    parser.add_argument(
        "--train_data", type=str, default="/content/drive/MyDrive/NLP/data/nikluge-iau-2023-train.jsonl",
        help="train file"
    )
    parser.add_argument(
        "--test_data", type=str, default="/content/drive/MyDrive/NLP/data/nikluge-iau-2023-test.jsonl",
        help="test file"
    )
    parser.add_argument(
        "--pred_data", type=str, default="/content/drive/MyDrive/NLP/output/result_kcelec1.jsonl",  # change per run
        help="pred file"
    )
    parser.add_argument(
        "--dev_data", type=str, default="/content/drive/MyDrive/NLP/data/nikluge-iau-2023-dev.jsonl",
        help="dev file"
    )

    # --- optimisation ---
    parser.add_argument(
        "--batch_size", type=int, default=8
    )
    parser.add_argument(
        "--learning_rate", type=float, default=3e-5
    )
    parser.add_argument(
        "--eps", type=float, default=1e-8
    )

    # --- run-mode switches ---
    parser.add_argument(
        "--do_train", action="store_true"
    )
    parser.add_argument(
        "--do_eval", action="store_true"
    )
    parser.add_argument(
        "--do_test", action="store_true"
    )
    parser.add_argument(
        "--num_train_epochs", type=int, default=8
    )

    # --- model ---
    parser.add_argument(
        "--base_model", type=str, default="beomi/KcELECTRA-base"  # change to switch the base model
    )
    parser.add_argument(
        # Never hard-code credentials in the notebook: read the token from the
        # environment instead. None is passed through when HF_TOKEN is not set.
        "--access_token", type=str, default=os.environ.get("HF_TOKEN"),
        help="Hugging Face access token (defaults to the HF_TOKEN environment variable)"
    )
    parser.add_argument(
        "--model_path", type=str, default="/content/drive/MyDrive/NLP/save_models/kcelec/saved_modelv2_epoch_8.pt"  # change for demo and test
    )
    parser.add_argument(
        "--output_dir", type=str, default="/content/drive/MyDrive/NLP/output/"
    )
    parser.add_argument(
        "--do_demo", action="store_true"
    )
    parser.add_argument(
        "--max_len", type=int, default=256
    )

    # --- classifier head ---
    parser.add_argument(
        "--classifier_hidden_size", type=int, default=768
    )
    parser.add_argument(
        # A dropout probability is fractional; type=int rejected values such as "0.3".
        "--classifier_dropout_prob", type=float, default=0.1, help="dropout in classifier"
    )

    args, _ = parser.parse_known_args()
    return args

In [None]:
# Parse the run configuration once; the driver cells below pass this module-level `args` on.
args = parse_args()

### 1번. 데이터 전처리 수행하기

In [None]:
# Peek at the first ten training records to check the jsonl schema
# (each record carries 'id', 'input' and 'output', as the printed sample shows).
example = jsonlload(args.train_data)
print(example[:10])

[{'id': 'nikluge-2023-iau-train-000001', 'input': '존나웃기다ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ 마술사가 꿈이싣겨죠?', 'output': 'POSITIVE'}, {'id': 'nikluge-2023-iau-train-000002', 'input': '마간호사 존나멋있고 존나웃겨', 'output': 'POSITIVE'}, {'id': 'nikluge-2023-iau-train-000003', 'input': '가던말던니좆대로해~~', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000004', 'input': '진짜 존나 무기력하다 큰일남', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000005', 'input': '미친 &name&', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000006', 'input': 'b조식은 좃같앗는뎅 ㅎㅋㅋㄱㅎㅋ', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000007', 'input': '개 휘둘린다 ..', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000008', 'input': '아 시퐈 ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000009', 'input': '#&company& 은 뭐가 그리 무서워서 노조 하나 못 만들게 하는지', 'output': 'NEGATIVE'}, {'id': 'nikluge-2023-iau-train-000010', 'input': '치명적인 뒤태..', 'output': 'POSITIVE'}]


In [None]:
# Tokenize one example into the column layout used to build the training dataset
def tokenize_and_align_labels(tokenizer, input_text, label, max_len):
    """Encode a single text and pair it with its label.

    Args:
        tokenizer: Callable invoked as
            ``tokenizer(text, padding='max_length', max_length=..., truncation=True)``
            whose result is indexed by ``'input_ids'`` and ``'attention_mask'``.
        input_text: Text to encode.
        label: Label value stored alongside the encoding.
        max_len: Value forwarded as ``max_length``.

    Returns:
        Dict with keys ``'input_ids'``, ``'attention_mask'`` and ``'label'``;
        each value is a one-element list so callers can ``extend`` their
        accumulators with it.
    """
    encoded = tokenizer(input_text, padding='max_length', max_length=max_len, truncation=True)
    return {
        'input_ids': [encoded['input_ids']],
        'attention_mask': [encoded['attention_mask']],
        'label': [label],
    }

In [None]:
def get_dataset(raw_data, tokenizer, max_len):
    """Turn raw records into a ``TensorDataset`` of (input_ids, attention_mask, label).

    Each record is read through its ``'input'`` (text) and ``'output'`` (label
    name, mapped to an id via ``label2id``) keys and encoded with
    ``tokenize_and_align_labels``. The first five rows of every column are
    printed as a quick sanity check.

    Args:
        raw_data: Iterable of dict records with ``'input'`` and ``'output'``.
        tokenizer: Tokenizer forwarded to ``tokenize_and_align_labels``.
        max_len: Padded/truncated sequence length forwarded to the tokenizer.

    Returns:
        ``TensorDataset`` whose tensors are, in order, input ids, attention
        masks and label ids.
    """
    # Insertion order defines both the print order and the tensor order below.
    columns = {'input_ids': [], 'attention_mask': [], 'label': []}

    for utterance in raw_data:
        encoded = tokenize_and_align_labels(
            tokenizer, utterance['input'], label2id[utterance['output']], max_len
        )
        for key, bucket in columns.items():
            bucket.extend(encoded[key])

    for bucket in columns.values():
        print(bucket[:5])

    return TensorDataset(*(torch.tensor(bucket) for bucket in columns.values()))

### 2번. Classification Model 생성하기

In [None]:
class SimpleClassifier(nn.Module):
    """Two-layer classification head applied to the first position of a sequence.

    Layers: dropout -> linear (hidden -> hidden) -> tanh -> dropout -> linear
    (hidden -> num_label). Sizes and the dropout probability are read from
    ``args.classifier_hidden_size`` and ``args.classifier_dropout_prob``.
    """

    def __init__(self, args, num_label):
        super().__init__()
        hidden_size = args.classifier_hidden_size
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(args.classifier_dropout_prob)
        self.output = nn.Linear(hidden_size, num_label)

    def forward(self, features):
        """Return class scores computed from ``features[:, 0, :]``."""
        # Indexing with [:, 0, :] keeps only the first position along axis 1.
        first_position = self.dropout(features[:, 0, :])
        hidden = torch.tanh(self.dense(first_position))
        return self.output(self.dropout(hidden))

In [None]:
class CustomClassifier(nn.Module):
    """Wrapper around a pretrained sequence-classification model.

    The wrapped model is loaded from ``args.base_model`` with ``num_labels``
    outputs and its token embeddings are resized to ``len_tokenizer``.
    ``forward`` returns ``(loss, logits)`` using the wrapped model's own logits.

    A ``SimpleClassifier`` head is also constructed and registered as
    ``simple_classifier``; ``forward`` does not call it.
    """

    def __init__(self, args, num_labels, len_tokenizer):
        super().__init__()

        self.num_labels = num_labels

        # 4-bit loading via BitsAndBytesConfig(load_in_4bit=True) was left disabled here;
        # pass it as quantization_config to from_pretrained to re-enable it.
        config = AutoConfig.from_pretrained(args.base_model, num_labels=num_labels)
        print(config)

        self.pre_trained_model = AutoModelForSequenceClassification.from_pretrained(
            args.base_model,
            token=args.access_token,
            config=config,
            ignore_mismatched_sizes=True,
        )
        self.pre_trained_model.resize_token_embeddings(len_tokenizer)

        self.simple_classifier = SimpleClassifier(args, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the wrapped model and optionally compute cross-entropy loss.

        Args:
            input_ids: Token ids forwarded to the wrapped model.
            attention_mask: Attention mask forwarded to the wrapped model.
            labels: Optional target class ids; when given, a loss is computed.

        Returns:
            Tuple ``(loss, logits)``; ``loss`` is ``None`` when ``labels`` is ``None``.
        """
        logits = self.pre_trained_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        ).logits

        if labels is None:
            return None, logits

        criterion = nn.CrossEntropyLoss()
        loss = criterion(logits.view(-1, self.num_labels), labels.view(-1))
        return loss, logits

In [None]:
def evaluation(y_true, y_pred):
    """Print a preview of the inputs followed by three F1 variants.

    Output, in order: the first five true labels, the first five predictions,
    then ``f1_score`` with ``average=None``, ``'micro'`` and ``'macro'``.

    Args:
        y_true: Sequence of reference labels.
        y_pred: Sequence of predicted labels, aligned with ``y_true``.
    """
    for preview in (y_true, y_pred):
        print(preview[:5])

    report = (
        ('f1_score: ', None),
        ('f1_score_micro: ', 'micro'),
        ('f1_score_macro: ', 'macro'),
    )
    for caption, averaging in report:
        print(caption, f1_score(y_true, y_pred, average=averaging))

In [None]:
## Classifier training
def _resolve_checkpoint_dir(model_path):
    """Return the directory in which per-epoch checkpoints are written.

    ``model_path`` may name a directory or a checkpoint file such as
    ``.../saved_modelv2_epoch_8.pt``; in the latter case the checkpoints go next
    to that file instead of into a directory named after it.
    """
    if os.path.isdir(model_path):
        return model_path
    if os.path.splitext(model_path)[1]:
        # Has a file extension -> treat as a file path and use its parent directory.
        return os.path.dirname(model_path) or '.'
    return model_path


def train(args=None):
    """Fine-tune ``CustomClassifier`` and save one checkpoint per epoch.

    Loads the train/dev jsonl files, tokenizes them with the tokenizer of
    ``args.base_model``, optimises with AdamW and a linear schedule without
    warm-up, clips gradients to norm 1.0, and after every epoch optionally
    evaluates on the dev set (``args.do_eval``) before writing
    ``saved_modelv2_epoch_<n>.pt`` into the checkpoint directory derived from
    ``args.model_path``.

    Args:
        args: Namespace as returned by ``parse_args()``. When omitted,
            ``parse_args()`` is called.
    """
    if args is None:
        args = parse_args()

    checkpoint_dir = _resolve_checkpoint_dir(args.model_path)
    os.makedirs(checkpoint_dir, exist_ok=True)

    print('train')
    print('model would be saved at ', checkpoint_dir)

    print('loading train data')
    train_data = jsonlload(args.train_data)
    dev_data = jsonlload(args.dev_data)

    print('tokenizing train data')
    # Initialise the tokenizer that belongs to the base model.
    tokenizer = AutoTokenizer.from_pretrained(args.base_model)

    # Convert the records into the tensor format the model consumes.
    train_dataloader = DataLoader(get_dataset(train_data, tokenizer, args.max_len), shuffle=True,
                                  batch_size=args.batch_size)
    # Evaluation does not depend on sample order, so the dev set is not shuffled.
    dev_dataloader = DataLoader(get_dataset(dev_data, tokenizer, args.max_len), shuffle=False,
                                batch_size=args.batch_size)

    print('loading model')
    model = CustomClassifier(args, num_labels=2, len_tokenizer=len(tokenizer))
    model.to(device)

    FULL_FINETUNING = True
    if FULL_FINETUNING:
        param_optimizer = list(model.named_parameters())
        # Biases and layer-norm parameters are excluded from weight decay.
        no_decay = ['bias', 'LayerNorm.weight', 'gamma', 'beta']
        # The optimizer reads the per-group key 'weight_decay'; any other key
        # (e.g. 'weight_decay_rate') is ignored and the optimizer default is used.
        optimizer_grouped_parameters = [
            {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
             'weight_decay': 0.01},
            {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
             'weight_decay': 0.0}
        ]
    else:
        # Head-only training: the classification head lives on the wrapped model.
        # NOTE(review): assumes the wrapped model exposes `.classifier` (the load
        # log lists classifier.* weights for ELECTRA) — confirm for other base models.
        param_optimizer = list(model.pre_trained_model.classifier.named_parameters())
        optimizer_grouped_parameters = [{"params": [p for n, p in param_optimizer]}]

    optimizer = AdamW(
        optimizer_grouped_parameters,
        lr=args.learning_rate,
        eps=args.eps
    )
    epochs = args.num_train_epochs
    max_grad_norm = 1.0
    total_steps = epochs * len(train_dataloader)

    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps
    )

    for epoch_step in trange(1, epochs + 1, desc="Epoch"):
        model.train()
        total_loss = 0

        for batch in train_dataloader:
            b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)

            # The optimizer holds every trainable parameter, so one zero_grad suffices.
            optimizer.zero_grad()

            loss, _ = model(b_input_ids, b_input_mask, b_labels)
            loss.backward()
            total_loss += loss.item()

            torch.nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=max_grad_norm)
            optimizer.step()
            scheduler.step()

        avg_train_loss = total_loss / len(train_dataloader)
        print("Epoch: ", epoch_step)
        print("Average train loss: {}".format(avg_train_loss))

        if args.do_eval:
            model.eval()

            pred_list = []
            label_list = []

            for batch in dev_dataloader:
                b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)

                with torch.no_grad():
                    _, logits = model(b_input_ids, b_input_mask, b_labels)

                predictions = torch.argmax(logits, dim=-1)
                # Move to host and convert to plain ints: the metric code cannot
                # consume a list of 0-dim device tensors.
                pred_list.extend(predictions.cpu().tolist())
                label_list.extend(b_labels.cpu().tolist())

            evaluation(label_list, pred_list)

        model_saved_path = os.path.join(checkpoint_dir, 'saved_modelv2_epoch_' + str(epoch_step) + '.pt')
        torch.save(model.state_dict(), model_saved_path)

    print("training is done")

In [None]:
def test(args):
    """Score a prediction file against the gold test file.

    Both files are jsonl with ``'id'`` and ``'output'`` per record. Gold labels
    are indexed by id, predictions are aligned to them in prediction-file
    order, and the aligned label lists are handed to ``evaluation``.

    Args:
        args: Namespace providing ``test_data`` (gold) and ``pred_data``
            (predictions) paths.

    Returns:
        ``None`` after printing the scores, or a dict with an ``"error"``
        message when the gold file repeats an id or a predicted id is missing
        from the gold file.
    """
    gold_records = jsonlload(args.test_data)
    predicted_records = jsonlload(args.pred_data)

    # Index the gold labels by record id, rejecting duplicates.
    gold_by_id = {}
    for record in gold_records:
        record_id = record['id']
        if record_id in gold_by_id:
            return {
                "error": "정답 데이터에 중복된 id를 가지는 경우 존재"
            }
        gold_by_id[record_id] = record['output']

    # Align predictions with the gold labels through the shared id.
    y_true, y_pred = [], []
    for record in predicted_records:
        record_id = record['id']
        if record_id not in gold_by_id:
            return {
                "error": "제출 파일과 정답 파일의 id가 일치하지 않음"
            }
        y_true.append(gold_by_id[record_id])
        y_pred.append(record['output'])

    evaluation(y_true, y_pred)

In [None]:
def demo(args):
    """Label every record of the test file with a saved model and write the result.

    Loads the tokenizer of ``args.base_model``, restores the state dict stored
    at ``args.model_path`` into a fresh ``CustomClassifier``, predicts one
    record at a time, stores the predicted label name under ``'output'`` in
    each record, and writes all records to ``result_kcelec1.jsonl`` inside
    ``args.output_dir``.

    Args:
        args: Namespace providing ``output_dir``, ``base_model``, ``test_data``,
            ``model_path`` and ``max_len``.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(args.output_dir, exist_ok=True)

    tokenizer = AutoTokenizer.from_pretrained(args.base_model)

    test_data = jsonlload(args.test_data)

    model = CustomClassifier(args, len(labels), len(tokenizer))
    model.load_state_dict(torch.load(args.model_path, map_location=device))
    model.to(device)
    model.eval()

    for data in tqdm(test_data):
        input_text = data['input']

        tokenized_data = tokenizer(input_text, padding='max_length', max_length=args.max_len, truncation=True)

        # Wrap in a list to add the batch dimension (batch of one).
        input_ids = torch.tensor([tokenized_data['input_ids']]).to(device)
        attention_mask = torch.tensor([tokenized_data['attention_mask']]).to(device)

        with torch.no_grad():
            _, logits = model(input_ids, attention_mask)
        predictions = torch.argmax(logits, dim=-1)
        data['output'] = id2label[int(predictions[0])]

    # os.path.join works whether or not output_dir ends with a path separator.
    # NOTE(review): test() reads args.pred_data, while this file name is fixed —
    # confirm the two point at the same file before scoring.
    jsonldump(test_data, os.path.join(args.output_dir, 'result_kcelec1.jsonl'))

In [None]:
# Echo the effective run configuration.
# NOTE(review): the Namespace repr includes access_token, so the token ends up in the saved cell output.
print(args)

Namespace(train_data='/content/drive/MyDrive/NLP/data/nikluge-iau-2023-train.jsonl', test_data='/content/drive/MyDrive/NLP/data/nikluge-iau-2023-test.jsonl', pred_data='/content/drive/MyDrive/NLP/output/result_kcelec2.jsonl', dev_data='/content/drive/MyDrive/NLP/data/nikluge-iau-2023-dev.jsonl', batch_size=8, learning_rate=3e-05, eps=1e-08, do_train=False, do_eval=False, do_test=False, num_train_epochs=8, base_model='beomi/KcELECTRA-base', access_token='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', model_path='/content/drive/MyDrive/NLP/save_models/kcelec/saved_modelv2_epoch_6.pt', output_dir='/content/drive/MyDrive/NLP/output/', do_demo=False, max_len=256, classifier_hidden_size=768, classifier_dropout_prob=0.1)


In [None]:
# Run fine-tuning. NOTE(review): called unconditionally; args.do_train is not consulted here.
train(args)

In [None]:
# Generate predictions for the test file with the checkpoint at args.model_path.
# NOTE(review): called unconditionally; args.do_demo is not consulted here.
demo(args)

ElectraConfig {
  "_name_or_path": "beomi/KcELECTRA-base",
  "architectures": [
    "ElectraForPreTraining"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "embedding_size": 768,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "electra",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 3,
  "position_embedding_type": "absolute",
  "summary_activation": "gelu",
  "summary_last_dropout": 0.1,
  "summary_type": "first",
  "summary_use_proj": true,
  "tokenizer_class": "PreTrainedTokenizerFast",
  "transformers_version": "4.41.2",
  "type_vocab_size": 2,
  "use_cache": true,
  "vocab_size": 30000
}



Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at beomi/KcELECTRA-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
100%|██████████| 1624/1624 [00:27<00:00, 59.95it/s]


In [None]:
# Score the prediction file (args.pred_data) against the gold test file.
# NOTE(review): called unconditionally; args.do_test is not consulted here.
test(args)