In [None]:
!pip install transformers sentence_transformers evaluate peft

In [None]:
from transformers import AutoModel, AutoTokenizer, DataCollatorWithPadding
from sentence_transformers import SentenceTransformer, models
from sentence_transformers.models import Transformer
import numpy as np
import evaluate
import torch
from peft import get_peft_config, get_peft_model, LoraConfig, TaskType, PeftModelForFeatureExtraction
from utils import preprocess_nli, preprocess_sts
import copy, os

**Note: PyTorch is not installed by the cell above. Install the `torch` build that matches your environment (CPU or the appropriate CUDA version) before running this notebook.**


In [None]:
debug = True # debug mode: when True, later cells train for 1 epoch, use 0 dataloader workers and keep only the first 500 training samples
# Model identifier handed to the transformer wrapper when the sentence-embedding model is built.
model_and_paths = 'klue/roberta-large'

**The target data has an average length of 33 with a standard deviation of 12, so `max_len` is set to 64, which covers roughly the mean plus 2–3 standard deviations (33 + 2 × 12 = 57, 33 + 3 × 12 = 69).**

In [None]:
# Maximum sequence length for tokenization; keep it in sync with the value given to the model wrapper.
max_len = 64
# Debug runs are shortened to a single epoch; full runs train for five.
train_epochs = 1 if debug else 5
batch_size = 16
# Debug runs load data in the main process (0 workers), which keeps tracebacks simple.
dataloader_num_workers = 0 if debug else 2

In [None]:
from torch import nn
from transformers import AutoModel, AutoTokenizer, AutoConfig
import json
from typing import List, Dict, Optional
import os

**The following class wraps the encoder so that any `torch.nn` module can be assigned as the underlying model (stored as `self.auto_model` in the code below).**

In [None]:
class Customized_Transformer(Transformer):
    """Transformer module whose tokenizer output is always padded to ``max_seq_length``.

    The Hugging Face model is stored in ``self.auto_model`` and the tokenizer in
    ``self.tokenizer``. ``tokenize`` is overridden so that every batch is padded with
    ``padding='max_length'`` and truncated with ``truncation='longest_first'``.

    Args:
        model_name_or_path: Hugging Face model id or local path.
        max_seq_length: Length every tokenized input is padded/truncated to.
        model_args: Optional overrides forwarded to ``AutoConfig.from_pretrained``.
        cache_dir: Optional cache directory for the downloaded files.
        tokenizer_args: Optional keyword arguments for ``AutoTokenizer.from_pretrained``.
    """

    def __init__(self, model_name_or_path: str, max_seq_length: int = 64,
                 model_args: Optional[Dict] = None, cache_dir: Optional[str] = None,
                 tokenizer_args: Optional[Dict] = None):
        # None sentinels instead of shared mutable ``{}`` defaults.
        model_args = {} if model_args is None else dict(model_args)
        tokenizer_args = {} if tokenizer_args is None else dict(tokenizer_args)

        # The parent constructor already loads the config, model and tokenizer. Forwarding
        # cache_dir / tokenizer_args lets that single load honour them, instead of loading
        # everything once with defaults and then a second time with the real arguments.
        super().__init__(model_name_or_path, cache_dir=cache_dir, tokenizer_args=tokenizer_args)
        self.config_keys = ['max_seq_length']
        self.max_seq_length = max_seq_length

        if model_args:
            # Config overrides were requested: rebuild the model from the customised config,
            # exactly as before (model_args are applied to the config only).
            config = AutoConfig.from_pretrained(model_name_or_path, **model_args, cache_dir=cache_dir)
            self.auto_model = AutoModel.from_pretrained(model_name_or_path, config=config, cache_dir=cache_dir)

    def tokenize(self, texts):
        """Tokenize a batch of texts into fixed-length PyTorch tensors.

        Args:
            texts: A non-empty sequence whose items are all strings, all single-entry
                dicts (``{key: text}``), or all pairs ``(text_a, text_b)``. The type of
                the first item decides how the whole batch is interpreted.

        Returns:
            A dict holding the tokenizer output (``return_tensors="pt"``), padded to
            ``self.max_seq_length``. For dict inputs it also contains ``"text_keys"``,
            the list of keys in input order.
        """
        output = {}
        first = texts[0]
        if isinstance(first, str):
            columns = [texts]
        elif isinstance(first, dict):
            keys, values = [], []
            for lookup in texts:
                # Only the first (key, text) entry of each dict is used.
                text_key, text = next(iter(lookup.items()))
                values.append(text)
                keys.append(text_key)
            output["text_keys"] = keys
            columns = [values]
        else:
            # Sentence pairs: split into two parallel columns for the tokenizer.
            first_texts, second_texts = [], []
            for text_tuple in texts:
                first_texts.append(text_tuple[0])
                second_texts.append(text_tuple[1])
            columns = [first_texts, second_texts]

        # Normalise every entry to a stripped string.
        columns = [[str(s).strip() for s in col] for col in columns]

        # Optional lower-casing (``do_lower_case`` is set by the parent constructor).
        if self.do_lower_case:
            columns = [[s.lower() for s in col] for col in columns]

        encoded = self.tokenizer(
            *columns,
            padding='max_length',
            truncation="longest_first",
            return_tensors="pt",
            max_length=self.max_seq_length,
        )
        output.update(encoded)
        return output

**set model**

* You can set the device here (the `device` argument passed to `SentenceTransformer`).

In [None]:
def set_sent_tranformer(model_and_paths, max_seq_length=64, device='cpu'):
    """Build a mean-pooling sentence-embedding model.

    Args:
        model_and_paths: Model id or path given to ``Customized_Transformer``.
        max_seq_length: Sequence length used by the transformer module (default 64,
            the value that was previously hard-coded).
        device: Device string given to ``SentenceTransformer`` (default ``'cpu'``,
            the value that was previously hard-coded).

    Returns:
        A ``SentenceTransformer`` made of the transformer module followed by a
        mean ``Pooling`` module sized from the transformer's embedding dimension.
    """
    emb_model = Customized_Transformer(model_and_paths, max_seq_length=max_seq_length)
    pooling_model = models.Pooling(emb_model.get_word_embedding_dimension(),
                                   pooling_mode='mean')
    return SentenceTransformer(modules=[emb_model, pooling_model], device=device)

# Use the configured max_len instead of repeating the literal 64.
sent_rep_model = set_sent_tranformer(model_and_paths, max_seq_length=max_len)

**The following helper releases memory that is no longer needed (it clears the CUDA cache).**

In [None]:
def del_whole(SentenceTransformer_model):
    """Release memory that is no longer referenced and clear the CUDA cache.

    ``del`` below only removes this function's own reference to the model; the
    object can be freed only once the caller has dropped its references as well
    (e.g. with ``del model`` in the calling cell).

    Args:
        SentenceTransformer_model: The model the caller is done with.
    """
    import gc  # local import so the helper does not depend on the notebook's import cell

    del SentenceTransformer_model
    # Collect unreachable objects first so their memory can be handed back below.
    gc.collect()
    torch.cuda.empty_cache()

`preprocess_nli` and `preprocess_sts` prepare the NLI and STS datasets for training.

In [None]:
from datasets import load_dataset
from utils import preprocess_nli, preprocess_sts

# Load the two KLUE subsets, NLI first and then STS.
nli_dataset, sts_dataset = (load_dataset('klue', subset) for subset in ('nli', 'sts'))

# Each preprocessing helper returns a (train, validation) pair.
train_nli, valid_nli = preprocess_nli(nli_dataset)
train_sts, valid_sts = preprocess_sts(sts_dataset)

# Debug runs keep only the first 500 training samples of each task.
train_nli = train_nli[:500] if debug else train_nli
train_sts = train_sts[:500] if debug else train_sts

**Finally, set up the dataloaders and the evaluator.**

In [None]:
from torch.utils.data import DataLoader
# One shuffled loader per training task; both use the batch size and worker count set earlier.
train_dataloader_nli = DataLoader(
    dataset=train_nli,
    batch_size=batch_size,
    shuffle=True,
    num_workers=dataloader_num_workers,
)
train_dataloader_sts = DataLoader(
    dataset=train_sts,
    batch_size=batch_size,
    shuffle=True,
    num_workers=dataloader_num_workers,
)

In [None]:
from CustomizedESEv import customizedEmbeddingSimilarityEvaluator
# Evaluator built from the STS validation examples; it is passed to fit() as `evaluator` in the training cell.
evaluator = customizedEmbeddingSimilarityEvaluator.from_input_examples(valid_sts)

**launch!**

**The following code trains the model with several candidate learning rates to find the best one.**

In [None]:
from sentence_transformers.losses import MultipleNegativesRankingLoss, CosineSimilarityLoss, TripletLoss
from sentence_transformers.losses import AnglELoss
# Candidate optimizer settings; each dict is passed to fit() as `optimizer_params`.
lr_finder = [{'lr':5e-5},{'lr':2e-5},{'lr':9e-6}]

for lr_suggestion in lr_finder:
    # Every candidate must start from the same weights. A plain assignment would only
    # alias sent_rep_model, so each learning rate would continue training the model
    # already fine-tuned by the previous one and the scores would not be comparable.
    copied_sent_rep_model = copy.deepcopy(sent_rep_model)

    train_loss_nli = MultipleNegativesRankingLoss(model=copied_sent_rep_model)
    train_loss_sts = AnglELoss(model=copied_sent_rep_model)

    # Multi-task training: NLI pairs with the ranking loss, STS pairs with the AnglE loss.
    train_objectives = [(train_dataloader_nli, train_loss_nli), (train_dataloader_sts, train_loss_sts)]

    copied_sent_rep_model.fit(
        train_objectives=train_objectives, optimizer_params=lr_suggestion,
        epochs=train_epochs, output_path=f'MeanMulti_test_large_{lr_suggestion["lr"]}_maxlen',
        warmup_steps=50, evaluator=evaluator)

    print(f'Best score of klue_large_longer_{lr_suggestion["lr"]} : {copied_sent_rep_model.best_score}')

    # del_whole() can only drop its own parameter binding, so also delete the names
    # this cell holds on the trial model (the loss modules reference it too).
    del train_objectives, train_loss_nli, train_loss_sts
    del_whole(copied_sent_rep_model)
    del copied_sent_rep_model

**Test phase. KorSTS is used to measure out-of-distribution (OOD) performance.**

In [None]:
import csv
from sentence_transformers.readers import InputExample
import os

def load_kor_sts_samples(filename):
    """Read a tab-separated STS file into a list of ``InputExample`` sentence pairs.

    The file must have a header row with ``score``, ``sentence1`` and ``sentence2``
    columns. Quote characters are treated as ordinary text (``csv.QUOTE_NONE``).

    Args:
        filename: Path of the UTF-8 encoded TSV file.

    Returns:
        One ``InputExample`` per data row, in file order, labelled with the raw score.
    """
    def to_example(row):
        # The score is kept on its original scale; dividing by 5.0 to map it to 0 ... 1 is disabled.
        gold_score = float(row['score'])
        return InputExample(texts=[row['sentence1'], row['sentence2']], label=gold_score)

    with open(filename, 'rt', encoding='utf8') as tsv_file:
        rows = csv.DictReader(tsv_file, delimiter='\t', quoting=csv.QUOTE_NONE)
        return [to_example(row) for row in rows]

# Directory that holds the KorSTS TSV files (relative to the working directory).
sts_dataset_path = 'KorNLUDatasets/KorSTS'
test_file = os.path.join(sts_dataset_path, "sts-test.tsv")
# Parsed test pairs; the test evaluator in the next cell is built from them.
test_samples = load_kor_sts_samples(test_file)

In [None]:
# NOTE(review): `model_path` and `res_path` were never defined in this notebook, so this cell
# failed on a fresh kernel. The values below are placeholders — point `model_path` at the
# checkpoint directory written by the learning-rate search that scored best, and `res_path`
# at the directory where the evaluation output should go.
model_path = 'MeanMulti_test_large_2e-05_maxlen'
res_path = 'korsts_test_results'

test_word_embedding_model = Transformer(model_path, max_seq_length=64)
# `Pooling` lives in sentence_transformers.models (imported as `models`); take the embedding
# size from the loaded transformer instead of hard-coding 1024.
test_pooling_model = models.Pooling(test_word_embedding_model.get_word_embedding_dimension(),
                                    pooling_mode='mean')
model = SentenceTransformer(modules=[test_word_embedding_model, test_pooling_model])

test_evaluator = customizedEmbeddingSimilarityEvaluator.from_input_examples(test_samples)

# Create the output directory (and any missing parents); an existing directory is fine.
os.makedirs(res_path, exist_ok=True)

print(test_evaluator(model, output_path=res_path))