In [None]:
from src.config.data_loader_config import DATA_LOADER_CONFIG, OPTIMIZER_CONFIG
from src.data_loader.loader import Dataloader
from src.model.model import Model, Models, LossFunctions
from src.trainer.predict import save_result
import src.callback as callback
import pytorch_lightning as pl
import os

# Run parameters, read from the project configuration dictionaries.
batch_size, shuffle = DATA_LOADER_CONFIG['batch_size'], DATA_LOADER_CONFIG['shuffle']
learning_rate, max_epoch = OPTIMIZER_CONFIG['learning_rate'], OPTIMIZER_CONFIG['max_epoch']

# Set before any tokenizer is created.
# NOTE(review): presumably meant to silence the Hugging Face tokenizers
# fork/parallelism warning when DataLoader workers are used — confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Optional setting: falls back to 4 workers when the config has no 'num_workers' key.
num_workers: int = DATA_LOADER_CONFIG.get('num_workers', 4)

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
from scipy.stats import pearsonr

# Load the pretrained tokenizer and base model for the checkpoint named below.
# NOTE(review): `model` is a notebook-level name; a later cell passes `model`
# to trainer.fit(), so make sure it is rebound to the Lightning module first.
model_name = "maywell/Synatra-7B-v0.3-dpo"
tokenizer, model = (
    AutoTokenizer.from_pretrained(model_name),
    AutoModel.from_pretrained(model_name),
)




def get_sentence_embedding(sentence, model, tokenizer):
    """Convert a sentence into a single embedding vector.

    The sentence is tokenized (with padding and truncation enabled), passed
    through ``model`` without gradient tracking, and the last hidden state is
    mean-pooled over the token axis. Padding positions are excluded from the
    mean by weighting with the tokenizer's attention mask, so padded inputs
    get the same embedding they would get unpadded.

    Args:
        sentence: Text (or list of texts) accepted by ``tokenizer``.
        model: Callable taking the tokenizer output as keyword arguments and
            returning an object with a ``last_hidden_state`` tensor.
            Assumes the layout is (batch, tokens, hidden) — the pooling runs
            over ``dim=1``.
        tokenizer: Callable returning a mapping of ``"pt"`` tensors.

    Returns:
        The pooled embedding with size-1 dimensions squeezed out, in the same
        dtype as ``last_hidden_state``.
    """
    inputs = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():  # inference only, no gradients needed
        outputs = model(**inputs)

    hidden = outputs.last_hidden_state
    attention_mask = inputs.get("attention_mask")
    if attention_mask is None:
        # No mask available: fall back to the plain mean over all positions.
        pooled = hidden.mean(dim=1)
    else:
        # Pool in float32 so a half-precision sum cannot overflow, then
        # restore the original dtype.
        mask = attention_mask.unsqueeze(-1).to(device=hidden.device, dtype=torch.float32)
        summed = (hidden.to(torch.float32) * mask).sum(dim=1)
        token_counts = mask.sum(dim=1).clamp(min=1)  # guard against all-padding rows
        pooled = (summed / token_counts).to(hidden.dtype)
    return pooled.squeeze()

def cosine_similarity(embedding1, embedding2):
    """Return the cosine similarity of two 1-D embedding vectors.

    Computed as ``1 - cosine_distance`` using SciPy.

    Args:
        embedding1: 1-D array-like or torch tensor.
        embedding2: 1-D array-like or torch tensor of the same length.

    Returns:
        The cosine similarity (1 for identical direction, -1 for opposite).
    """
    # Imported locally: no visible cell imports `cosine`, so the bare call
    # raised NameError.
    from scipy.spatial.distance import cosine

    def _as_array(vector):
        # Torch tensors may live on the GPU or track gradients; SciPy needs
        # plain CPU data.
        if hasattr(vector, "detach"):
            return vector.detach().cpu().numpy()
        return vector

    return 1 - cosine(_as_array(embedding1), _as_array(embedding2))

# Example sentence pair.
sentence1, sentence2 = "The cat sits on the mat.", "The dog lies on the carpet."

# Embed both sentences with the model and tokenizer loaded earlier in this cell.
embedding1, embedding2 = (
    get_sentence_embedding(text, model, tokenizer) for text in (sentence1, sentence2)
)

# Pearson correlation between the two embedding vectors (moved to CPU NumPy
# arrays first); only the coefficient is kept, the p-value is discarded.
pearson_corr, _ = pearsonr(*(emb.cpu().numpy() for emb in (embedding1, embedding2)))
print(f"Pearson Correlation: {pearson_corr}")


In [None]:
from transformers import AutoTokenizer, AutoModel
from scipy.spatial.distance import euclidean
import torch
import torch.nn.functional as F

class SimilarityEvaluator:
    """Embed sentences with a pretrained model and score pairs on a 0-5 scale.

    Sentence embeddings are the attention-mask-weighted mean of the model's
    last hidden state. Two scoring functions are provided: one based on
    cosine similarity and one based on Euclidean distance.
    """

    def __init__(self, model_name, torch_dtype=torch.float16):
        """Load the tokenizer and model for ``model_name``.

        Args:
            model_name: Identifier or path passed to ``from_pretrained``.
            torch_dtype: dtype the model is loaded in. Defaults to
                ``torch.float16`` (the previously hard-coded value); pass
                ``torch.float32`` where half precision is unsuitable.
        """
        # Use the GPU when available, otherwise fall back to the CPU.
        print(torch.cuda.is_available())
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Load the tokenizer and the model, then move the model to the device.
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name, torch_dtype=torch_dtype).to(self.device)

    @staticmethod
    def _to_full_precision(embedding):
        """Upcast half-precision tensors to float32; leave other dtypes untouched."""
        if embedding.dtype in (torch.float16, torch.bfloat16):
            return embedding.float()
        return embedding

    def get_sentence_embedding(self, sentence):
        """Return the mean-pooled last-hidden-state embedding of ``sentence``.

        Padding positions are excluded from the mean via the attention mask.
        Assumes ``last_hidden_state`` is laid out as (batch, tokens, hidden) —
        the pooling runs over ``dim=1``. The result has size-1 dimensions
        squeezed out and keeps the dtype of ``last_hidden_state``.
        """
        inputs = self.tokenizer(sentence, return_tensors="pt", padding=True, truncation=True).to(self.device)
        with torch.no_grad():
            outputs = self.model(**inputs)

        hidden = outputs.last_hidden_state
        attention_mask = inputs.get("attention_mask")
        if attention_mask is None:
            # No mask available: plain mean over all positions.
            pooled = hidden.mean(dim=1)
        else:
            # Pool in float32 so a half-precision sum cannot overflow, then
            # restore the original dtype.
            mask = attention_mask.unsqueeze(-1).to(device=hidden.device, dtype=torch.float32)
            summed = (hidden.to(torch.float32) * mask).sum(dim=1)
            token_counts = mask.sum(dim=1).clamp(min=1)  # guard against all-padding rows
            pooled = (summed / token_counts).to(hidden.dtype)
        return pooled.squeeze()

    def compute_cosine_score(self, embedding1, embedding2):
        """Map the cosine similarity of two 1-D embeddings from [-1, 1] onto [0, 5]."""
        cosine_sim = F.cosine_similarity(
            self._to_full_precision(embedding1),
            self._to_full_precision(embedding2),
            dim=0,
        ).item()
        # Linear rescale: -1 -> 0, 1 -> 5.
        score = (cosine_sim + 1) * 2.5
        # Clamp to absorb floating-point drift just outside [0, 5].
        return min(max(score, 0), 5)

    def compute_euclidean_score(self, embedding1, embedding2):
        """Score two 1-D embeddings as ``max(0, 5 - euclidean_distance)``.

        A distance of 0 gives 5; any distance of 5 or more gives 0.
        NOTE(review): the embeddings are not normalised here, so distances
        between raw hidden-state vectors may often exceed 5 — confirm the
        scale is what is intended.
        """
        # Upcast first: bfloat16 tensors cannot be converted with .numpy(),
        # and float16 loses precision in the distance computation.
        vector1 = self._to_full_precision(embedding1).cpu().numpy()
        vector2 = self._to_full_precision(embedding2).cpu().numpy()
        distance = euclidean(vector1, vector2)
        # Smaller distance -> higher score, floored at 0.
        score = max(0, 5 - distance)
        return score

def evaluate_similarity(sentence1, sentence2, model_name, evaluator=None, metric="euclidean"):
    """Score the similarity of two sentences.

    Args:
        sentence1: First sentence.
        sentence2: Second sentence.
        model_name: Checkpoint used to build a ``SimilarityEvaluator`` when
            ``evaluator`` is not supplied.
        evaluator: Optional pre-built evaluator to reuse. Passing one avoids
            reloading the model on every call; when given, ``model_name`` is
            not used.
        metric: ``"euclidean"`` (default, the previous behaviour) or
            ``"cosine"``.

    Returns:
        The score returned by the evaluator's ``compute_euclidean_score`` or
        ``compute_cosine_score``.

    Raises:
        ValueError: If ``metric`` is not one of the supported names.
    """
    # Validate before the (expensive) model load so bad input fails fast.
    if metric not in ("euclidean", "cosine"):
        raise ValueError(f"Unsupported metric: {metric!r} (expected 'euclidean' or 'cosine')")

    if evaluator is None:
        evaluator = SimilarityEvaluator(model_name)

    embedding1 = evaluator.get_sentence_embedding(sentence1)
    embedding2 = evaluator.get_sentence_embedding(sentence2)

    if metric == "cosine":
        return evaluator.compute_cosine_score(embedding1, embedding2)
    return evaluator.compute_euclidean_score(embedding1, embedding2)

# Example: score one sentence pair with the checkpoint named below.
# NOTE(review): this rebinds the notebook-level `model_name`, which the final
# cell later passes to save_result().
model_name = "saltlux/Ko-Llama3-Luxia-8B"
sentence1, sentence2 = (
    "여성가족부 명칭 가족부로 바꿔주세요",
    "여성가족부의 이름을 복지부로 바꿔주세요!",
)

score = evaluate_similarity(sentence1, sentence2, model_name)
print(f"Similarity Score: {score}")


# 학습

In [None]:
# Build the Lightning trainer (GPU, mixed 16-bit precision) and run training.
# NOTE(review): lr_monitor, epoch_print_callback, checkpoint_callback,
# early_stopping and dataloader are not defined in any cell visible here, and
# `model` was last bound to a Hugging Face AutoModel in an earlier cell —
# confirm the cell that creates them runs before this one.
trainer = pl.Trainer(
    accelerator='gpu',
    devices='auto',
    max_epochs=max_epoch,
    callbacks=[lr_monitor, epoch_print_callback, checkpoint_callback, early_stopping],
    precision='16-mixed',
)
trainer.fit(model=model, datamodule=dataloader)

In [None]:
import torch
# Ask PyTorch to release its cached, currently unused GPU memory before the
# inference cells run.
torch.cuda.empty_cache()

# 추론


In [None]:
# Display the checkpoint path stored on checkpoint_callback.
# NOTE(review): checkpoint_callback is not defined in any cell visible here —
# confirm where it is created.
checkpoint_callback.best_model_path

In [None]:
# Load the best model from the checkpoint path stored on checkpoint_callback.
# NOTE(review): this rebinds the notebook-level `model` to the restored
# project Model; checkpoint_callback, trainer and dataloader must already
# exist from earlier cells.
best_model_path = checkpoint_callback.best_model_path
model = Model.load_from_checkpoint(best_model_path, loss_func=LossFunctions.hu_loss)
trainer.test(model=model, datamodule=dataloader)
# Inference
predictions = trainer.predict(model=model, datamodule=dataloader)

# Save the results
# NOTE(review): model_name is whatever an earlier cell last assigned (the
# visible cells set it to Hugging Face checkpoint ids) — confirm this is the
# name save_result should record.
save_result(predictions, model_name, max_epoch)