# sBERT

In [None]:
from transformers import AutoModel, AutoTokenizer
import torch.nn as nn
import torch
from sources.models import sBERTRegressor, sBERTRegressorNew, RoBERTaRegressor

# Run on the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Hugging Face Hub id of the backbone, and the freeze flag handed to the regressor
# (presumably freezes the encoder weights -- confirm in sources.models).
model_name = "snunlp/KR-SBERT-V40K-klueNLI-augSTS"
is_freeze = False

# Baseline regressor: no checkpoint is loaded into it (a later cell prints it as the
# "untrained" model).
model = sBERTRegressor(model_name, is_freeze).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Same architecture, with weights restored from the fine-tuned checkpoint below.
trained_model = sBERTRegressor(model_name, is_freeze).to(device)
model_path = "checkpoints/sBERT_2024-07-03 17:15:29.pth"

# Both models are only used for inference: evaluation mode puts dropout / batch-norm
# layers (if any) into their deterministic behaviour.
model.eval()
trained_model.eval()

# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a GPU
# still loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
trained_model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
# Sentence to score, tokenized to a fixed length (padded or truncated to max_length).
example = "으아아아"
max_length = 128

encoded_text = tokenizer(
    example,
    return_tensors="pt",
    max_length=max_length,
    padding="max_length",
    truncation=True,
)
# Move the token ids and the attention mask onto the device used by the models.
iids, atm = (encoded_text[key].to(device) for key in ("input_ids", "attention_mask"))

In [None]:
# Score the example with both regressors.
# eval() puts dropout / batch-norm layers (if any) into inference mode so repeated runs
# print the same number; no_grad() skips building an autograd graph we never use.
model.eval()
trained_model.eval()
with torch.no_grad():
    untrained_score = float(model(iids, atm))
    trained_score = float(trained_model(iids, atm))

print("학습되지 않은 모델:", round(untrained_score, 3), "점")
print("학습된 모델:", round(trained_score, 3), "점")

In [None]:
# This experiment is pinned to the CPU.
device = "cpu"

# Hugging Face Hub id of the backbone, and the freeze flag handed to the regressor
# (presumably freezes the encoder weights -- confirm in sources.models).
model_name = "snunlp/KR-SBERT-V40K-klueNLI-augSTS"
is_freeze = True

# Baseline regressor: no checkpoint is loaded into it.
model = sBERTRegressorNew(model_name, is_freeze, version=2).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Same architecture, with weights restored from the fine-tuned checkpoint below.
trained_model = sBERTRegressorNew(model_name, is_freeze, version=2).to(device)
model_path = "checkpoints/sBERTNewV2_2024-07-04_13-36-30.pth"

# Inference only: put dropout / batch-norm layers (if any) into evaluation mode.
model.eval()
trained_model.eval()

# Without map_location, a checkpoint saved from a GPU run cannot be deserialized on a
# CPU-only machine; remapping to `device` makes the load independent of where it was saved.
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
trained_model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
# Sentence to score, tokenized to a fixed length (padded or truncated to max_length).
example = "정말요???/????????????????????????????????????????????????/????????"
max_length = 128

encoded_text = tokenizer(
    example,
    return_tensors="pt",
    max_length=max_length,
    padding="max_length",
    truncation=True,
)
# Move the token ids and the attention mask onto the device used by the models.
iids, atm = (encoded_text[key].to(device) for key in ("input_ids", "attention_mask"))

In [None]:
# Score the example with both regressors.
# eval() puts dropout / batch-norm layers (if any) into inference mode so repeated runs
# print the same number; no_grad() skips building an autograd graph we never use.
model.eval()
trained_model.eval()
with torch.no_grad():
    untrained_score = float(model(iids, atm))
    trained_score = float(trained_model(iids, atm))

print("학습되지 않은 모델:", round(untrained_score, 3), "점")
print("학습된 모델:", round(trained_score, 3), "점")

# RoBERTa

In [None]:
from transformers import AutoModel, AutoTokenizer
import torch.nn as nn
import torch
from sources.models import sBERTRegressor, sBERTRegressorNew, RoBERTaRegressor, RoBERTaRegressorNew, RoBERTaRegressorDeep

# Configuration for the RoBERTa evaluation run.
device = "cpu"
is_freeze = True
sigmoid_scaling = False  # when True, a later cell rescales predictions with *70 + 30
pooling_method = "mean"
max_length = 64  # token length every input is padded / truncated to
model_name = "klue/roberta-large"
model_path = "checkpoints/RoBERTaNew-Large_2024-07-15_10-03-19.pth"

# Baseline model without checkpoint weights is left disabled here.
# model = RoBERTaRegressorNew(model_name, is_freeze, sigmoid_scaling, pooling_method).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)

trained_model = RoBERTaRegressorNew(model_name, is_freeze, sigmoid_scaling, pooling_method).to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint written during a
# GPU run still loads when this notebook runs on the CPU.
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
trained_model.load_state_dict(torch.load(model_path, map_location=device))

# model.eval()
# Evaluation mode for inference; the trailing semicolon keeps the notebook from
# printing the whole module tree as the cell output.
trained_model.eval();

In [None]:
from huggingface_hub import hf_hub_download
import pandas as pd
from dotenv import load_dotenv
import os

# Read the Hugging Face token from the environment (or a local .env file) so it is
# never hard-coded in the notebook. os.getenv returns None when the variable is unset.
load_dotenv()
api_key = os.getenv("HF_API_KEY")
repo_id = "SaeSSak/Conversation"
# `token` is the current keyword of hf_hub_download; `use_auth_token` is its deprecated alias.
file_path = hf_hub_download(repo_id, "TestData.parquet", repo_type="dataset", token=api_key)

# Load the file
df = pd.read_parquet(file_path)
example = df["음성인식결과"].tolist()

# Tokenize every row at once; each row is padded / truncated to max_length tokens.
encoded_text = tokenizer(
    example,
    return_tensors="pt",
    max_length=max_length,
    padding="max_length",
    truncation=True,
)
iids = encoded_text["input_ids"].to(device)
atm = encoded_text["attention_mask"].to(device)

In [None]:
import torch

# Rows scored per forward pass. Batching bounds peak memory instead of pushing the
# entire test set through the model in a single call.
batch_size = 32

# Model predictions (inference only, so gradients are not tracked).
batch_outputs = []
with torch.no_grad():
    for start in range(0, iids.size(0), batch_size):
        stop = start + batch_size
        scores = trained_model(iids[start:stop], atm[start:stop])
        # Flatten to 1-D so (B,), (B, 1) or fully squeezed outputs all concatenate.
        # Assumes the model emits one scalar score per input row -- TODO confirm.
        batch_outputs.append(scores.reshape(-1))
predictions = torch.cat(batch_outputs) if batch_outputs else torch.empty(0)

# Detach from autograd and convert to NumPy; .cpu() keeps this working when the
# model runs on a GPU (Tensor.numpy() only accepts CPU tensors).
predictions_numpy = predictions.detach().cpu().numpy()

# Add the prediction results to the dataframe.
df["나이점수"] = df["나이"] * 10
if sigmoid_scaling:
    # Rescale the raw output linearly: 0 -> 30 and 1 -> 100.
    df["예측점수"] = predictions_numpy * (100 - 30) + 30
else:
    df["예측점수"] = predictions_numpy

# Absolute error between the target score and the predicted score.
df["오차"] = (df["나이점수"] - df["예측점수"]).abs()

# Mean absolute error per age, plus the overall mean absolute error.
age_groups = df.groupby("나이")["오차"].mean().reset_index()
overall_deviation = df["오차"].mean()

In [None]:
# One line per age with its mean absolute error (rounded to 2 decimals), then the
# overall mean absolute error.
for age, mean_error in age_groups.to_numpy():
    print("나이:", age, "  오차:", round(mean_error, 2))
print("MAE:", overall_deviation)

In [None]:
import matplotlib.pyplot as plt
# Draw the graph: target score (black) against predicted score (red), by row index.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(df.index, df["나이점수"], color='black', label='Target')
ax.plot(df.index, df["예측점수"], color='red', label='Prediction')
ax.set_xlabel('Index')
ax.set_ylabel('Score')
ax.set_title('Target - Prediction Graph')
ax.legend()
plt.show()

In [None]:
# Show the rows whose predicted score is above 80.
high_score_mask = df["예측점수"].gt(80)
df[high_score_mask]

In [None]:
from pathlib import Path

# Save the scored dataframe as ./result/<checkpoint name>.csv.
# The output directory is created on demand: DataFrame.to_csv does not create missing
# parent directories and would fail on a fresh checkout.
result_dir = Path("./result")
result_dir.mkdir(parents=True, exist_ok=True)

# .stem drops only the final extension, so a checkpoint name that itself contains
# dots is no longer truncated at the first dot.
name = Path(model_path).stem
df.to_csv(result_dir / f"{name}.csv", index=False)

# 모델 실험

In [None]:
# Device string used by the following cells when moving tensors and the model (.to(device)).
device = "cpu"

In [None]:
# Sentence used for the pooling experiments, tokenized to a fixed length.
example = "안녕하세요."
max_length = 128

encoded_text = tokenizer(
    example,
    return_tensors="pt",
    max_length=max_length,
    padding="max_length",
    truncation=True,
)
# Move the token ids and the attention mask onto the selected device.
iids, atm = (encoded_text[key].to(device) for key in ("input_ids", "attention_mask"))

In [None]:
# Load the bare pretrained encoder (no regression head) and place it on the device.
model_name = "snunlp/KR-SBERT-V40K-klueNLI-augSTS"
is_freeze = True
sbert = AutoModel.from_pretrained(model_name)
sbert = sbert.to(device)

In [None]:
# Forward pass used only to inspect the outputs: no_grad() avoids recording an
# autograd graph that the pooling experiments below never use.
with torch.no_grad():
    outputs = sbert(input_ids=iids, attention_mask=atm)

- mean pooling

In [None]:
def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sequence, ignoring masked positions.

    Args:
        model_output: object exposing ``last_hidden_state`` (the token embeddings).
        attention_mask: mask with one entry per token; 0 marks positions to ignore.
            Assumed to be (batch, seq_len) against (batch, seq_len, hidden) embeddings.

    Returns:
        The masked sum over dim 1 divided by the per-sequence mask total. The divisor
        is clamped to at least 1e-9, so a fully masked sequence yields zeros instead
        of a division by zero.
    """
    hidden_states = model_output.last_hidden_state
    # Give the mask a trailing axis and expand it to the embeddings' shape so it can
    # be multiplied element-wise.
    mask = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
    masked_total = (hidden_states * mask).sum(dim=1)
    token_counts = mask.sum(dim=1).clamp(min=1e-9)
    return masked_total / token_counts

In [None]:
# Per-token embeddings from the encoder output (same attribute mean_pooling reads).
token_embeddings = outputs.last_hidden_state

In [None]:
# Step-by-step walk-through of mean pooling, printing the shape after every stage.
token_embeddings = outputs.last_hidden_state
print(token_embeddings.shape)
# Mask expanded to the embeddings' shape so it can be multiplied element-wise.
input_mask_expanded = atm.unsqueeze(-1).expand(token_embeddings.size()).float()
print(input_mask_expanded.shape)
# Sum of the unmasked token embeddings along dim 1.
sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
print(sum_embeddings.shape)
# Mask total along dim 1, clamped to avoid dividing by zero.
sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
# Report the divisor's shape here (this line used to print sum_embeddings.shape again).
print(sum_mask.shape)
(sum_embeddings / sum_mask).shape

- attention pooling

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AttentionLayer(nn.Module):
    """Attention pooling over token embeddings.

    A learned vector of length ``hidden_size`` scores every token by a dot product.
    The scores are softmax-normalised along dim 1 with masked positions excluded, and
    the result weights a sum of the token embeddings.

    Args:
        hidden_size: size of the last dimension of the token embeddings.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attention_weights = nn.Parameter(torch.empty(hidden_size), requires_grad=True)
        # nn.init.xavier_uniform_ raises on tensors with fewer than 2 dimensions (fan
        # in/out is undefined for a vector), so sample from the Xavier-uniform range of
        # the equivalent (hidden_size, 1) projection instead.
        bound = (6.0 / (hidden_size + 1)) ** 0.5
        nn.init.uniform_(self.attention_weights, -bound, bound)

    def forward(self, token_embeddings, attention_mask):
        """Return the attention-weighted sum of ``token_embeddings``.

        Args:
            token_embeddings: assumed (batch, seq_len, hidden_size) -- softmax runs on dim 1.
            attention_mask: assumed (batch, seq_len); positions equal to 0 are ignored.

        Returns:
            Tensor with the sequence axis reduced away (one vector per sequence).
        """
        # Use attention_mask so that only real tokens are considered.
        attention_scores = torch.matmul(token_embeddings, self.attention_weights)
        # The dtype's most negative finite value drives masked weights to zero after
        # softmax and, unlike a literal -1e9, is representable in half precision.
        fill_value = torch.finfo(attention_scores.dtype).min
        attention_scores = attention_scores.masked_fill(attention_mask == 0, fill_value)
        attention_weights = F.softmax(attention_scores, dim=1)

        # Compute the weighted average: (B, 1, S) @ (B, S, H) -> (B, 1, H) -> (B, H).
        weighted_sum = torch.matmul(attention_weights.unsqueeze(1), token_embeddings).squeeze(1)
        return weighted_sum

In [None]:
hidden_size = 16
# torch.Tensor(hidden_size) only allocates memory, so the values were arbitrary and could
# include NaN/inf. Allocate explicitly and fill from a bounded uniform range (the
# Xavier-uniform bound of a (hidden_size, 1) projection).
attention_weights = nn.Parameter(torch.empty(hidden_size), requires_grad=True)
nn.init.uniform_(attention_weights, -(6.0 / (hidden_size + 1)) ** 0.5, (6.0 / (hidden_size + 1)) ** 0.5)
attention_weights

In [None]:
# Toy input: `token_nums` random embeddings of width hidden_size, uniform in [0, 1).
token_nums = 10
token_embeddings = torch.rand((token_nums, hidden_size))
token_embeddings

In [None]:
# One raw score per token: dot product of each embedding with the weight vector.
# NOTE: a later cell rebinds `attention_weights` to the softmax output, so this cell
# must run before that one.
attention_scores = token_embeddings @ attention_weights

In [None]:
# Display the raw (unmasked) scores.
attention_scores

In [None]:
# Toy mask for a single sequence: six positions marked 1 followed by four marked 0.
attention_mask = torch.tensor([[1] * 6 + [0] * 4])

In [None]:
# Replace the scores at positions where the mask is 0 with -1e9.
attention_scores = attention_scores.masked_fill(attention_mask.eq(0), -1e9)

In [None]:
# Display the scores after masking.
attention_scores

In [None]:
# Normalise the masked scores along dim 1.
# NOTE: this rebinds `attention_weights`, replacing the parameter vector created earlier.
attention_weights = attention_scores.softmax(dim=1)

In [None]:
# Display the softmax-normalised weights.
attention_weights

In [None]:
# Display the weights with an extra axis inserted at dim 1 (the form used in the matmul).
attention_weights.unsqueeze(1)

In [None]:
# Weighted sum of the token embeddings: weights (with an axis inserted at dim 1) times
# the embeddings, then drop that axis again.
weighted_sum = (attention_weights.unsqueeze(1) @ token_embeddings).squeeze(1)

In [None]:
# Display the pooled result.
weighted_sum