<a href="https://colab.research.google.com/github/ttogle918/NLU_3-/blob/main/preprocessing_to_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 패키지 설치 및 선언

In [None]:
!pip install optuna
!pip install pytorch-transformers
!pip install transformers
!pip install datasets

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import AdamW
from torch.nn.utils import clip_grad_norm_
from torch.utils.data import Dataset, DataLoader
import numpy as np
from tqdm import tqdm, tqdm_notebook
from sklearn.metrics import f1_score
from scipy import stats
import matplotlib.pyplot as plt

In [None]:
# Select 'cuda:0' when GPU computation is available, otherwise fall back to 'cpu'.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Bare expression so the notebook cell displays the chosen device and the CUDA device count.
device, torch.cuda.device_count()

In [None]:
from transformers import BertForNextSentencePrediction, AutoTokenizer, BertConfig
from transformers.optimization import get_cosine_schedule_with_warmup
from transformers import get_linear_schedule_with_warmup

# Model class ( 1 ~ 4 )

### dataset Tokenizing ( 1, 2번 해당 )

CustomDataset

In [None]:
from datasets import load_dataset
# Load the 'sts' configuration of the 'klue' dataset through the `datasets` library.
dataset = load_dataset('klue', 'sts')

In [None]:
import re
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler

In [None]:
class CustomDataset(Dataset):
    """Sentence-pair dataset yielding raw text together with a score.

    Every item is a ``(sentence1, sentence2, label)`` tuple. No tokenization
    happens in this class; the stored values are returned as they are.

    Args:
        dataset: Iterable of records providing ``record['sentence1']``,
            ``record['sentence2']`` and ``record['labels']['real-label']``.
        appended_data: Optional iterable of additional records providing
            ``'sentence1'``, ``'sentence2'`` and ``'score'``. A record is
            skipped when any of those three values is ``None``.
        appended_data2: Optional second iterable of additional records,
            handled exactly like ``appended_data``.
    """

    def __init__(self, dataset, appended_data=None, appended_data2=None):
        self.sentence1, self.sentence2, self.labels = self.make_dataset(
            dataset, appended_data, appended_data2
        )

    def make_dataset(self, dataset, appended_data=None, appended_data2=None):
        """Build the three parallel lists backing the dataset.

        Records from ``dataset`` come first, followed by the usable records
        of ``appended_data`` and then those of ``appended_data2``.

        Returns:
            A ``(sentence1, sentence2, rlabels)`` tuple of equally long
            lists; ``rlabels`` holds the real-valued labels.
        """
        sentence1, sentence2, rlabels = [], [], []

        for data in dataset:
            rlabels.append(data['labels']['real-label'])
            sentence1.append(self.cleaning(data['sentence1']))
            sentence2.append(self.cleaning(data['sentence2']))

        # Both optional sources share one schema, so they share one code path.
        for extra_records in (appended_data, appended_data2):
            self._append_scored(extra_records, sentence1, sentence2, rlabels)

        return sentence1, sentence2, rlabels

    def _append_scored(self, records, sentence1, sentence2, rlabels):
        """Append the complete ``'score'`` records of ``records`` in place."""
        if records is None:
            return
        for data in records:
            # Incomplete rows cannot be used for training; drop them.
            if data['score'] is None or data['sentence1'] is None or data['sentence2'] is None:
                continue
            rlabels.append(data['score'])
            sentence1.append(self.cleaning(data['sentence1']))
            sentence2.append(self.cleaning(data['sentence2']))

    def __len__(self):
        """Return the number of stored sentence pairs."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(sentence1, sentence2, label)`` for position ``idx``."""
        return self.sentence1[idx], self.sentence2[idx], self.labels[idx]

    def cleaning(self, sentence):
        """Preprocessing hook; currently returns ``sentence`` unchanged.

        An earlier, now disabled, variant removed every character outside
        the Hangul syllable range ``가-힣``.
        """
        return sentence

In [None]:
def custom_collate_fn(batch):
    """Turn a batch of ``(sentence1, sentence2, label)`` samples into tensors.

    Uses the module-level ``tokenizer``, which therefore has to be assigned
    before this function is called for the first time.

    Args:
        batch: Iterable of 3-tuples ``(sentence1, sentence2, label)``.

    Returns:
        A ``(tensorized_input, tensorized_label)`` pair: the tokenizer output
        for the sentence pairs and a tensor built from the labels.
    """
    samples = list(batch)
    first_sentences = [first for first, _, _ in samples]
    second_sentences = [second for _, second, _ in samples]
    scores = [score for _, _, score in samples]

    encoded_pairs = tokenizer(
        first_sentences,
        second_sentences,
        add_special_tokens=True,
        padding="longest",  # pad shorter pairs with [PAD] up to the longest one in the batch
        truncation=True,  # drop the tokens that exceed max_length
        max_length=512,
        return_tensors='pt',  # return the tokenized result as tensors
    )
    return encoded_pairs, torch.tensor(scores)

In [None]:
def make_dataloader(dataset, tok_model, batch_size, s='train'):
    """Create a DataLoader over ``dataset`` that collates with ``custom_collate_fn``.

    As a side effect the module-level ``tokenizer`` is (re)assigned to the
    tokenizer loaded from ``tok_model``; ``custom_collate_fn`` reads it.

    Args:
        dataset: Dataset to iterate over.
        tok_model: Name or path handed to ``AutoTokenizer.from_pretrained``.
        batch_size: Number of samples per batch.
        s: ``'train'`` selects a ``RandomSampler``; every other value selects
            a ``SequentialSampler``.

    Returns:
        The configured ``DataLoader``.
    """
    global tokenizer
    tokenizer = AutoTokenizer.from_pretrained(tok_model)

    # Only the sampler differs between the training and the evaluation loader.
    sampler_cls = RandomSampler if s == 'train' else SequentialSampler
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler_cls(dataset),
        collate_fn=custom_collate_fn,
    )
    print(f'batch_size : {batch_size}')
    return loader

## [1] BERT일 경우

BERT의 경우, 사전 학습(Pre-trained)된 BertForNextSentencePrediction 모델을 불러와 Fine-Tuning한다. 다만 문장 유사도(STS)와 다음 문장 예측(NSP)은 목적이 다르므로, BertModel 위에 layer를 쌓아 STS를 구하는 편이 낫다.

[전체 코드 바로가기](practice/최지현_sts.ipynb)

In [None]:
# Model class
class CustomSTS(nn.Module):
    """STS scorer built on a pre-trained BERT next-sentence-prediction model.

    Args:
        hidden_size: Accepted for interface compatibility; not used.
        model_name: Checkpoint name or path used to load both the config and
            the model weights.
    """

    def __init__(self, hidden_size: int, model_name):
        super().__init__()
        config = BertConfig.from_pretrained(model_name)
        self.bert_config = config
        self.model = BertForNextSentencePrediction.from_pretrained(model_name, config=config)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Return one score per sample: softmax probability of class 0 times 5.

        The wrapped model could compute a loss itself when given its own
        next-sentence labels, but the targets here are scaled to 0-5, so the
        loss has to be computed by the caller. ``labels`` is accepted but not
        used.
        """
        nsp_output = self.model(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # logits shape: (batch_size, 2)
        class_probs = self.softmax(nsp_output.logits)
        # Rescale the class-0 probability from [0, 1] to the 0-5 label range.
        return class_probs[:, 0] * 5

## [2] RobertaModel을 받아 2 layer 추가(STS)

[전체 코드 바로가기](practice/최지현_sts_roberta.ipynb)

In [None]:
class CustomBertOnlyNSPHead(nn.Module):
    """Linear head mapping a ``config.hidden_size`` vector to two scores.

    Args:
        config: Object exposing a ``hidden_size`` attribute.
    """

    def __init__(self, config):
        super().__init__()
        self.seq_relationship = nn.Linear(config.hidden_size, 2)

    def forward(self, pooled_output):
        """Apply the linear layer to ``pooled_output`` and return the result."""
        return self.seq_relationship(pooled_output)
# Model class
class CustomSTS(nn.Module):
    """Pre-trained encoder followed by a small two-layer classification head.

    Args:
        model_name: Checkpoint name or path handed to ``from_pretrained`` for
            both the config and the encoder.
    """

    def __init__(self, model_name):
        # Imported locally so the class does not depend on these two names
        # being imported elsewhere in the notebook.
        from transformers import AutoModel, RobertaConfig

        super().__init__()
        self.config = RobertaConfig.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        # Not used by forward(); kept so the module's attributes and
        # state_dict keys stay the same.
        self.cls = CustomBertOnlyNSPHead(self.config)
        self.softmax = nn.Softmax(dim=1)
        # Take the input width from the loaded encoder rather than assuming 768.
        encoder_width = self.model.config.hidden_size
        self.classifier = nn.Sequential(
            nn.Linear(encoder_width, 32),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(32, 2),
        )

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Return the two-class softmax probabilities for each sample.

        ``labels`` is accepted but not used.

        Returns:
            Tensor with one row per sample and two columns.
        """
        # Indexed below as (batch, token, feature).
        hidden_states = self.model(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )['last_hidden_state']
        # Only the first token feeds the head, so select it before the head
        # instead of classifying every token and discarding the rest.
        first_token = hidden_states[:, 0, :]
        logits = self.classifier(first_token)
        probs = self.softmax(logits)
        # NOTE: no `probs[:, 0] * 5` rescaling is applied here; callers
        # receive both class probabilities.
        return probs

## [3] AutoModelForSequenceClassification을 통해 분류

[전체 코드 바로가기](practice/우경화_sts.ipynb)

In [None]:
# Number of outputs requested from the sequence-classification head.
# NOTE(review): with a single label the head presumably acts as a regressor
# for the STS score — confirm against the full notebook.
num_labels = 1

# model: uses klue/roberta-base
# NOTE(review): `AutoModelForSequenceClassification` and `model_checkpoint`
# are not imported/defined in the visible cells; presumably they come from
# the full notebook this excerpt was taken from.
model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint, num_labels=num_labels)

# ... (setup of `model_trainer` is omitted in this excerpt)
model_trainer.train()

## [4] SentenceTransformer와 EmbeddingSimilarityEvaluator를 통해 분류

[전체 코드 바로가기](practice/김연식_sts.ipynb)

In [None]:
# train
# NOTE(review): `embedding_model`, `pooler`, `test_examples_dt`,
# `train_dataloader`, `num_epochs`, `warmup_steps`, `model_save_path` and
# `test_evaluator` are not defined in the visible cells; presumably they come
# from the full notebook this excerpt was taken from.

# Build the model first: the loss has to wrap the very model that is trained
# below, not whatever `model` happened to be bound to before.
model = SentenceTransformer(modules=[embedding_model, pooler])
train_loss = losses.CosineSimilarityLoss(model=model)

evaluator = EmbeddingSimilarityEvaluator.from_input_examples(
    test_examples_dt,
    name="sts-dev",
)

model.fit(
    train_objectives=[(train_dataloader, train_loss)],
    evaluator=evaluator,
    epochs=num_epochs,
    evaluation_steps=1000,
    warmup_steps=warmup_steps,
    output_path=model_save_path,
)
# test
# Reload the model saved under `model_save_path` and evaluate it.
model = SentenceTransformer(model_save_path)
test_evaluator(model, output_path=model_save_path)