# データの前処理


WikipediaのテキストファイルをHugging Faceからダウンロードする

In [27]:
!wget https://huggingface.co/datasets/princeton-nlp/datasets-for-simcse/resolve/main/wiki1m_for_simcse.txt

--2026-01-13 04:43:07--  https://huggingface.co/datasets/princeton-nlp/datasets-for-simcse/resolve/main/wiki1m_for_simcse.txt
Resolving huggingface.co (huggingface.co)... 3.165.160.59, 3.165.160.11, 3.165.160.12, ...
Connecting to huggingface.co (huggingface.co)|3.165.160.59|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://us.gcp.cdn.hf.co/xet-bridge-us/621ffdd236468d709f183d48/af65686a27c825e2fbe0c13f49573907a8ec57d7417f82062de6b1c67aa69462?response-content-disposition=inline%3B+filename*%3DUTF-8%27%27wiki1m_for_simcse.txt%3B+filename%3D%22wiki1m_for_simcse.txt%22%3B&response-content-type=text%2Fplain&Expires=1768282987&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiRXBvY2hUaW1lIjoxNzY4MjgyOTg3fX0sIlJlc291cmNlIjoiaHR0cHM6Ly91cy5nY3AuY2RuLmhmLmNvL3hldC1icmlkZ2UtdXMvNjIxZmZkZDIzNjQ2OGQ3MDlmMTgzZDQ4L2FmNjU2ODZhMjdjODI1ZTJmYmUwYzEzZjQ5NTczOTA3YThlYzU3ZDc0MTdmODIwNjJkZTZiMWM2N2FhNjk0NjJcXD9yZXNwb25zZS1jb250ZW50LWRpc3Bvc2l0aW9uPSomcm

In [28]:
# Path of the corpus file fetched by the wget cell above (relative to the working directory).
TEXT_FILE_PATH = "wiki1m_for_simcse.txt"

In [29]:
import nltk
from nltk.tokenize import word_tokenize
# Fetch the "punkt_tab" NLTK resource (presumably the tokenizer data word_tokenize relies on;
# verify against the installed NLTK version). The call is a no-op when it is already present.
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [30]:
def preprocess(text, min_words=10, max_words=50):
  """Return the stripped sentence when its token count is within bounds, else None.

  Args:
    text: One raw sentence (a line of the corpus file).
    min_words: Smallest accepted number of tokens, inclusive.
    max_words: Largest accepted number of tokens, inclusive.

  Returns:
    ``text.strip()`` if ``min_words <= number of tokens <= max_words``; otherwise ``None``.
  """
  # Length is measured in word_tokenize tokens, not in whitespace-separated chunks.
  num_tokens = len(word_tokenize(text))

  # Keep only sentences of 10 to 50 tokens by default.
  if min_words <= num_tokens <= max_words:
    return text.strip()
  return None

In [31]:
# Read the corpus with an explicit encoding so decoding does not depend on the locale default.
with open(TEXT_FILE_PATH, "r", encoding="utf-8") as f:
  lines = f.readlines()

# Keep the stripped sentence that preprocess() returns and drop the lines it rejects (None).
# Filtering the raw lines instead would discard the stripping and leave a trailing newline
# on every training sentence.
processed_lines = (preprocess(line) for line in lines)
input_texts = [text for text in processed_lines if text is not None]


# 対照学習

In [32]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pandas as pd
from transformers.trainer_utils import set_seed
from transformers import AutoTokenizer
from transformers import AutoModel
from transformers import TrainingArguments
from datasets import Dataset
from transformers import Trainer
from transformers import EarlyStoppingCallback
import sys
import random
import numpy as np

In [33]:
# Hugging Face model id of the base encoder. Note that `model` holds the name string,
# not a model object; it is passed to SimCSEModel later in the notebook.
model = "google-bert/bert-base-uncased"
# Tokenizer matching the base encoder; the collate functions and the evaluation code use it.
tokenizer = AutoTokenizer.from_pretrained(model)

In [34]:
def fix_seed(seed):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer seed applied to ``random``, NumPy and PyTorch (CPU and all CUDA devices).
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Ask cuDNN for deterministic kernels and switch off its benchmark-based autotuner,
    # which may otherwise pick different algorithms from one run to the next.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


fix_seed(42)

In [35]:
def eval_collate_fn(examples):
  """Collate evaluation examples into the inputs expected by SimCSEModel.forward.

  Args:
    examples: Sequence of dataset rows, each a mapping with a "text" entry.

  Returns:
    Dict with "tokenized_texts_1" and "tokenized_texts_2" (the same tokenized batch, used as
    both views) and "labels" (``0..len(examples)-1``: row i's positive is column i).
  """
  # truncation=True caps over-long inputs at the tokenizer's maximum length instead of
  # handing the encoder a sequence it cannot process.
  tokenized_sent = tokenizer(
      [example["text"] for example in examples],
      padding=True,
      truncation=True,
      return_tensors="pt",
  )
  labels = torch.arange(len(examples))

  # NOTE(review): both views are the very same encoding. If dropout is disabled at evaluation
  # time the two sentence vectors are identical, so the validation loss may be only weakly
  # informative - confirm this is the intended validation signal.
  return {
      "tokenized_texts_1": tokenized_sent,
      "tokenized_texts_2": tokenized_sent,
      "labels": labels,
  }


In [36]:
def train_collate_fn(examples):
  """Collate training examples into the inputs expected by SimCSEModel.forward.

  Args:
    examples: Sequence of dataset rows, each a mapping with a "text" entry.

  Returns:
    Dict with "tokenized_texts_1" and "tokenized_texts_2" (the same tokenized batch, used as
    both views) and "labels" (``0..len(examples)-1``: row i's positive is column i).
  """
  # truncation=True caps over-long inputs at the tokenizer's maximum length instead of
  # handing the encoder a sequence it cannot process.
  tokenized_sent = tokenizer(
      [example["text"] for example in examples],
      padding=True,
      truncation=True,
      return_tensors="pt",
  )

  labels = torch.arange(len(examples))

  # The two views share one encoding; any difference between them presumably comes from
  # the encoder's dropout being active during training (the model encodes each view separately).
  return {
      "tokenized_texts_1": tokenized_sent,
      "tokenized_texts_2": tokenized_sent,
      "labels": labels,
  }


In [37]:
class SimCSEModel(nn.Module):
  """Sentence encoder trained with an in-batch contrastive (SimCSE) loss.

  A pretrained encoder produces one vector per sentence (the hidden state at position 0),
  which is optionally passed through a Linear + Tanh head before similarities are computed.
  """

  def __init__(self, base_model_name, mlp_only_train=False, temperature=0.05):
    """Build the encoder and the MLP head.

    Args:
      base_model_name: Name or path given to ``AutoModel.from_pretrained``.
      mlp_only_train: If True, the MLP head is applied only while the module is in
        training mode; in eval mode the raw position-0 vector is returned.
      temperature: Divisor applied to the similarity matrix before cross-entropy.
    """
    super().__init__()

    # The encoder is created before the head, keeping module registration order stable.
    self.encoder = AutoModel.from_pretrained(base_model_name)

    # Width of the MLP head equals the encoder's hidden size.
    width = self.encoder.config.hidden_size
    self.hidden_size = width
    # Linear layer of the MLP head.
    self.dense = nn.Linear(width, width)
    # Activation of the MLP head.
    self.activation = nn.Tanh()

    self.mlp_only_train = mlp_only_train
    # Temperature used when computing the cross-entropy loss.
    self.temperature = temperature

  def encode_texts(self, tokenized_texts):
    """Turn a tokenized batch into one vector per sentence.

    Args:
      tokenized_texts: Mapping of keyword arguments forwarded to the encoder.

    Returns:
      Tensor of shape (batch, hidden): the position-0 hidden state, passed through the
      MLP head unless ``mlp_only_train`` is set and the module is in eval mode.
    """
    first_token_vectors = self.encoder(**tokenized_texts).last_hidden_state[:, 0]

    # The head is used during training, and also at eval time unless mlp_only_train is set.
    apply_head = self.training or not self.mlp_only_train
    if not apply_head:
      return first_token_vectors

    return self.activation(self.dense(first_token_vectors))

  def forward(self, tokenized_texts_1, tokenized_texts_2, labels):
    """Compute the contrastive loss for two batches of paired sentences.

    Args:
      tokenized_texts_1: Tokenized batch for the first view.
      tokenized_texts_2: Tokenized batch for the second view.
      labels: For each row of the first view, the index of its positive in the second view.

    Returns:
      Dict with a single key "loss" holding the cross-entropy over the
      temperature-scaled cosine similarity matrix.
    """
    view_a = self.encode_texts(tokenized_texts_1)
    view_b = self.encode_texts(tokenized_texts_2)

    # Broadcasting (batch, 1, hidden) against (1, batch, hidden) gives all-pairs similarities.
    pairwise_sim = F.cosine_similarity(
        view_a.unsqueeze(1),
        view_b.unsqueeze(0),
        dim=2,
    )

    logits = pairwise_sim / self.temperature
    return {"loss": F.cross_entropy(logits, labels)}

In [38]:
class SimCSETrainer(Trainer):
  """Trainer specialised for SimCSE: the model returns its own loss and evaluation uses a
  dedicated collate function."""

  # Evaluation batch size. It also fixes how many in-batch candidates each sentence is
  # compared against, so changing it changes the scale of the validation loss.
  EVAL_BATCH_SIZE = 64

  def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
      """Run the model on a batch and return its loss.

      Args:
          model: The model to call; its forward must return a dict with a "loss" entry.
          inputs: Batch dict produced by the collate function (including "labels").
          return_outputs: If True, return ``(loss, outputs)``; otherwise only ``loss``.
          **kwargs: Extra arguments passed by the Trainer; ignored here.

      Returns:
          The loss tensor, or a ``(loss, outputs)`` tuple when ``return_outputs`` is True.
      """
      # "labels" is forwarded as part of the batch rather than popped, so the caller's
      # dict is left untouched.
      outputs = model(**inputs)
      loss = outputs["loss"]

      # Follow the caller's request instead of guessing from the train/eval mode.
      return (loss, outputs) if return_outputs else loss

  def get_eval_dataloader(self, eval_dataset=None):
      """Build the evaluation DataLoader.

      Args:
          eval_dataset: Dataset to evaluate on; falls back to ``self.eval_dataset`` when None.

      Returns:
          A DataLoader over the dataset, batched with ``eval_collate_fn``.

      Raises:
          ValueError: If no evaluation dataset is available.
      """
      if eval_dataset is None:
          eval_dataset = self.eval_dataset
      if eval_dataset is None:
          raise ValueError("Trainer: evaluation requires an eval_dataset.")

      return DataLoader(
          eval_dataset,
          batch_size=self.EVAL_BATCH_SIZE,
          collate_fn=eval_collate_fn,
          pin_memory=True,
      )

In [39]:
# Build the SimCSE model on top of the base encoder. With mlp_only_train=False the MLP head
# is applied in both training and eval mode.
# NOTE(review): get_embeddings() in the evaluation section reads model.encoder directly and
# therefore bypasses the MLP head - confirm that this train/eval mismatch is intended.
sup_model = SimCSEModel(model, mlp_only_train=False)

In [40]:
from datasets import Dataset

# The first 20,000 filtered sentences are used for training, the next 2,000 for validation.
train_texts = input_texts[:20000]
dev_texts = input_texts[20000:22000]

train_dataset = Dataset.from_dict({"text": train_texts})
dev_dataset = Dataset.from_dict({"text": dev_texts})

In [41]:
# Checkpoints, the final encoder weights and the tokenizer files all go to this directory.
save_dir = "./save_model"

sup_training_args = TrainingArguments(
    output_dir=save_dir,
    # Optimisation
    num_train_epochs=1,
    learning_rate=5e-5,
    per_device_train_batch_size=64,
    fp16=False,
    # Evaluate and checkpoint once per epoch, keeping the checkpoint with the lowest eval loss
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    metric_for_best_model="eval_loss",
    load_best_model_at_end=True,
    # Logging
    logging_strategy="steps",
    logging_steps=100,
    report_to="none",
    # The collate functions read the raw "text" column, so it must not be dropped
    remove_unused_columns=False,
)

early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

sup_trainer = SimCSETrainer(
    model=sup_model,
    args=sup_training_args,
    data_collator=train_collate_fn,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    callbacks=[early_stopping],
)

sup_trainer.train()

# Save only the encoder (the MLP head is not part of it) together with the tokenizer.
sup_model.encoder.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)

Epoch,Training Loss,Validation Loss
1,0.0012,0.008754


('./save_model/tokenizer_config.json',
 './save_model/special_tokens_map.json',
 './save_model/vocab.txt',
 './save_model/added_tokens.json',
 './save_model/tokenizer.json')

# 評価

In [42]:
from datasets import load_dataset

# Test split of the "sentence-transformers/stsb" dataset.
stsb_test = load_dataset("sentence-transformers/stsb", split="test")

# Pull the two sentence columns and the gold similarity scores out as plain Python lists.
sentences1, sentences2, gold_scores = (
    list(stsb_test[column]) for column in ("sentence1", "sentence2", "score")
)

In [43]:
import torch
from scipy.stats import spearmanr
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def get_embeddings(model, tokenizer, sentences, batch_size=64):
    """Encode sentences into position-0 (first token) embeddings of the model's encoder.

    Args:
        model: Module exposing an ``encoder`` sub-module; it is switched to eval mode.
        tokenizer: Callable that tokenizes a list of strings into encoder inputs.
        sentences: Iterable of strings to encode.
        batch_size: Number of sentences encoded per forward pass, which bounds peak memory.

    Returns:
        NumPy array with one row per sentence (an empty array when ``sentences`` is empty).
    """
    model.eval()

    sentences = list(sentences)
    if not sentences:
        return np.empty((0, 0), dtype=np.float32)

    # Run on whatever device the model already lives on instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    chunks = []
    with torch.no_grad():
        # Encode in mini-batches rather than pushing the whole list through in one pass.
        for start in range(0, len(sentences), batch_size):
            batch = sentences[start:start + batch_size]
            inputs = tokenizer(batch, padding=True, truncation=True, return_tensors="pt").to(device)
            outputs = model.encoder(**inputs)
            chunks.append(outputs.last_hidden_state[:, 0, :].cpu().numpy())

    return np.concatenate(chunks, axis=0)

# Embed both sides of every STS-B sentence pair.
embeddings1 = get_embeddings(sup_model, tokenizer, sentences1)
embeddings2 = get_embeddings(sup_model, tokenizer, sentences2)

# Cosine similarity of each aligned pair (row i of embeddings1 against row i of embeddings2).
cosine_sims = [
    cosine_similarity(vec1.reshape(1, -1), vec2.reshape(1, -1))[0][0]
    for vec1, vec2 in zip(embeddings1, embeddings2)
]

In [44]:
# Spearman rank correlation between gold scores and predicted similarities
# (element 0 of the result is the coefficient; the p-value is not needed).
spearman_corr = spearmanr(gold_scores, cosine_sims)[0]

print(f"STSB Test Spearman Correlation: {spearman_corr:.4f}")

STSB Test Spearman Correlation: 0.5400
