In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively list every file available under the read-only input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        full_path = os.path.join(root, name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/llm-classification-finetuning/sample_submission.csv
/kaggle/input/llm-classification-finetuning/train.csv
/kaggle/input/llm-classification-finetuning/test.csv
/kaggle/input/finetuned/fine-tuned-sbert-model/config.json
/kaggle/input/finetuned/fine-tuned-sbert-model/README.md
/kaggle/input/finetuned/fine-tuned-sbert-model/tokenizer.json
/kaggle/input/finetuned/fine-tuned-sbert-model/tokenizer_config.json
/kaggle/input/finetuned/fine-tuned-sbert-model/sentence_bert_config.json
/kaggle/input/finetuned/fine-tuned-sbert-model/config_sentence_transformers.json
/kaggle/input/finetuned/fine-tuned-sbert-model/model.safetensors
/kaggle/input/finetuned/fine-tuned-sbert-model/modules.json
/kaggle/input/finetuned/fine-tuned-sbert-model/special_tokens_map.json
/kaggle/input/finetuned/fine-tuned-sbert-model/vocab.txt
/kaggle/input/finetuned/fine-tuned-sbert-model/1_Pooling/config.json
/kaggle/input/embedding/fine_tuned_embedding_B.npy
/kaggle/input/embedding/fine_tuned_embedding_p.npy
/k

In [2]:
import transformers
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, confusion_matrix
from sentence_transformers import SentenceTransformer, util
from sklearn.model_selection import train_test_split
from sentence_transformers import SentenceTransformer, InputExample, losses
# Pick the compute device: use the GPU when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Using device: {device}")

Using device: cuda


In [3]:
# Load the competition tables.
train_data = pd.read_csv(r'/kaggle/input/llm-classification-finetuning/train.csv')
test_data = pd.read_csv(r'/kaggle/input/llm-classification-finetuning/test.csv')

# Collapse the three one-hot winner columns into a single class id:
#   0 = model A wins, 1 = model B wins, 2 = tie.
# Columns are applied in this order, so a later column overrides an earlier
# one; rows where no column equals 1 keep the initial value 0.
labels = np.zeros(len(train_data), dtype=int)
for class_id, winner_column in enumerate(('winner_model_a', 'winner_model_b', 'winner_tie')):
    labels[(train_data[winner_column] == 1).to_numpy()] = class_id

# Pre-computed embeddings of the training prompts and of each response
# (per the original note: produced by an SBERT encoder fine-tuned on the training data).
embedding_a = np.load("/kaggle/input/embedding/fine_tuned_embedding_A.npy")
embedding_b = np.load("/kaggle/input/embedding/fine_tuned_embedding_B.npy")
embedding_p = np.load("/kaggle/input/embedding/fine_tuned_embedding_p.npy")

In [4]:
class DualEncoderModel(nn.Module):
    """
    Preference-prediction model built around a shared pair encoder.

    The same encoder and scorer are applied to (prompt, response A) and to
    (prompt, response B). Both pair encodings, together with the signed and
    absolute score difference, are then passed to a 3-way classifier.

    NOTE(review): the encoder weights are shared between A and B, but the
    classifier receives the features concatenated in [A, B] order, so the
    logits are not guaranteed to be symmetric under swapping A and B.
    """

    def __init__(self, input_dim=384, hidden_dim=256, dropout=0.1):
        """
        Build the encoder, the scorer and the classifier head.

        Args:
            input_dim (int): Size of each input embedding (default 384).
            hidden_dim (int): Width of the hidden layers.
            dropout (float): Dropout probability used in every block.
        """
        super().__init__()
        half_dim = hidden_dim // 2

        # Encoder for one concatenated (prompt, response) pair.
        self.pair_encoder = nn.Sequential(
            *self._normed_block(input_dim * 2, hidden_dim, dropout),
            *self._normed_block(hidden_dim, hidden_dim, dropout),
        )

        # Maps the pair features to a single scalar score.
        self.pair_scorer = nn.Sequential(
            nn.Linear(hidden_dim, half_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(half_dim, 1),
        )

        # Classifier over both pair encodings plus the two score-difference
        # features (hence the "+ 2" on the input width); 3 output classes.
        self.preference_classifier = nn.Sequential(
            *self._normed_block(hidden_dim * 2 + 2, hidden_dim, dropout),
            *self._normed_block(hidden_dim, half_dim, dropout),
            nn.Linear(half_dim, 3),
        )

        self._init_weights()

    @staticmethod
    def _normed_block(in_features, out_features, dropout):
        """Return the layers Linear -> LayerNorm -> ReLU -> Dropout as a list."""
        return [
            nn.Linear(in_features, out_features),
            nn.LayerNorm(out_features),
            nn.ReLU(),
            nn.Dropout(dropout),
        ]

    def _init_weights(self):
        """Apply Xavier-uniform initialisation to every parameter with more than one dimension."""
        multi_dim_params = (param for param in self.parameters() if param.dim() > 1)
        for param in multi_dim_params:
            nn.init.xavier_uniform_(param)

    def encode_pair(self, prompt_emb, response_emb):
        """
        Encode one (prompt, response) pair and score it.

        Args:
            prompt_emb (Tensor): Prompt embeddings, [batch_size, input_dim].
            response_emb (Tensor): Response embeddings, [batch_size, input_dim].

        Returns:
            tuple: (pair features [batch_size, hidden_dim], score [batch_size, 1])
        """
        joined = torch.cat([prompt_emb, response_emb], dim=1)
        features = self.pair_encoder(joined)
        return features, self.pair_scorer(features)

    def forward(self, prompt_emb, response_a_emb, response_b_emb):
        """
        Forward pass.

        Args:
            prompt_emb (Tensor): Prompt embeddings, [batch_size, input_dim].
            response_a_emb (Tensor): Response A embeddings, [batch_size, input_dim].
            response_b_emb (Tensor): Response B embeddings, [batch_size, input_dim].

        Returns:
            tuple: (logits [batch_size, 3], score_a [batch_size, 1],
            score_b [batch_size, 1]). The logits are unnormalised; no softmax
            is applied here.
        """
        features_a, score_a = self.encode_pair(prompt_emb, response_a_emb)
        features_b, score_b = self.encode_pair(prompt_emb, response_b_emb)

        # Signed difference (A - B) carries the direction of the preference;
        # its absolute value is an extra feature for the "tie" decision.
        signed_gap = score_a - score_b
        classifier_input = torch.cat(
            [features_a, features_b, signed_gap, torch.abs(signed_gap)],
            dim=1,
        )

        logits = self.preference_classifier(classifier_input)
        return logits, score_a, score_b

In [5]:
# Dataset class that keeps prompt / response A / response B side by side
class ParallelResponseDataset(Dataset):
    """
    Dataset yielding (prompt, response A, response B[, label]) per sample.

    All inputs are converted to tensors once at construction time
    (float32 for embeddings, int64 for labels).
    """
    def __init__(self, prompt_embeddings, response_a_embeddings, response_b_embeddings, labels=None):
        """
        Convert the inputs to tensors and check that they are aligned.

        Args:
            prompt_embeddings (numpy.ndarray): Prompt embeddings [N, embed_dim]
            response_a_embeddings (numpy.ndarray): Response A embeddings [N, embed_dim]
            response_b_embeddings (numpy.ndarray): Response B embeddings [N, embed_dim]
            labels (numpy.ndarray, optional): Class ids [N]
                                             0: A preferred, 1: B preferred, 2: tie

        Raises:
            ValueError: If the arrays (or the labels, when given) do not all
                contain the same number of samples.
        """
        self.prompt_embeddings = torch.tensor(prompt_embeddings, dtype=torch.float32)
        self.response_a_embeddings = torch.tensor(response_a_embeddings, dtype=torch.float32)
        self.response_b_embeddings = torch.tensor(response_b_embeddings, dtype=torch.float32)

        if labels is not None:
            self.labels = torch.tensor(labels, dtype=torch.long)
        else:
            self.labels = None

        self.has_labels = labels is not None

        # Fail fast on misaligned inputs: without this check a length mismatch
        # only surfaces as an IndexError mid-epoch, or is silently ignored when
        # the extra rows are in a response/label array.
        n_samples = len(self.prompt_embeddings)
        sizes = {
            'response_a_embeddings': len(self.response_a_embeddings),
            'response_b_embeddings': len(self.response_b_embeddings),
        }
        if self.has_labels:
            sizes['labels'] = len(self.labels)
        mismatched = {name: size for name, size in sizes.items() if size != n_samples}
        if mismatched:
            raise ValueError(
                f"All inputs must have {n_samples} samples like prompt_embeddings; "
                f"got mismatched lengths: {mismatched}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.prompt_embeddings)

    def __getitem__(self, idx):
        """Return the sample at ``idx``; the label is appended only when labels were given."""
        sample = (
            self.prompt_embeddings[idx],
            self.response_a_embeddings[idx],
            self.response_b_embeddings[idx],
        )
        if self.has_labels:
            return sample + (self.labels[idx],)
        return sample

In [6]:
# Training function
def _forward_with_scores(model, prompt_emb, resp_a_emb, resp_b_emb):
    """Run ``model`` and normalise its output to ``(logits, score_a, score_b)``.

    Models that return a ``(logits, score_a, score_b)`` tuple (as
    ``DualEncoderModel`` does) are passed through; models that return only the
    logits tensor get ``None`` for both scores.
    """
    outputs = model(prompt_emb, resp_a_emb, resp_b_emb)
    if isinstance(outputs, (tuple, list)):
        logits, score_a, score_b = outputs
        return logits, score_a, score_b
    return outputs, None, None


def train_parallel_model(model, train_loader, val_loader, n_epochs=20, lr=0.001, 
                         device='cuda' if torch.cuda.is_available() else 'cpu'):
    """
    Train a preference model and restore the weights of its best epoch.

    Each epoch runs one pass over ``train_loader`` followed by an evaluation
    on ``val_loader``. The learning rate is halved by ``ReduceLROnPlateau``
    when validation accuracy has not improved for more than 2 epochs. After
    the last epoch the parameters from the epoch with the highest validation
    accuracy are loaded back into ``model``.

    Args:
        model: Model to train. Called as ``model(prompt, resp_a, resp_b)``; it
            may return the logits alone or ``(logits, score_a, score_b)``. In
            the latter case an auxiliary MSE loss on ``score_a - score_b`` is
            added with weight 0.5.
        train_loader: Loader yielding ``(prompt, resp_a, resp_b, labels)`` batches.
        val_loader: Loader with the same batch layout, used for validation.
        n_epochs: Number of epochs.
        lr: Initial learning rate for Adam.
        device: Device to train on ('cuda' or 'cpu').

    Returns:
        dict: ``{'model': model, 'best_val_acc': float, 'history': dict}``
        where ``history`` holds per-epoch ``train_loss``, ``train_acc``,
        ``val_loss``, ``val_acc`` and ``learning_rates``.
    """
    # Move the model to the target device
    model = model.to(device)
    print(f"Using device: {device}")

    # Loss functions and optimiser
    criterion = nn.CrossEntropyLoss()
    aux_criterion = nn.MSELoss()  # auxiliary loss on the score difference
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # LR scheduler driven by validation accuracy (mode='max').
    # The `verbose` flag is intentionally not passed: it is deprecated in
    # recent PyTorch releases, and the current LR is printed every epoch below.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=2
    )

    # Bookkeeping
    best_val_acc = 0.0
    best_model_state = None
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
        'learning_rates': []
    }

    for epoch in range(n_epochs):
        # Record the learning rate used for this epoch
        current_lr = optimizer.param_groups[0]['lr']
        history['learning_rates'].append(current_lr)

        print(f"\nEpoch {epoch+1}/{n_epochs}, LR: {current_lr:.6f}")

        # ---- Training phase ----
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for batch_data in train_loader:
            prompt_emb, resp_a_emb, resp_b_emb, labels = [b.to(device) for b in batch_data]

            optimizer.zero_grad()

            outputs, score_a, score_b = _forward_with_scores(
                model, prompt_emb, resp_a_emb, resp_b_emb
            )

            # Main (classification) loss
            loss = criterion(outputs, labels)

            if score_a is not None:
                # Auxiliary loss: push the score difference (A - B) towards
                #   +1 when A is preferred (label 0),
                #   -1 when B is preferred (label 1),
                #    0 for a tie (label 2).
                expected_scores = torch.zeros_like(score_a)
                expected_scores[labels == 0] = 1.0
                expected_scores[labels == 1] = -1.0

                aux_loss = aux_criterion(score_a - score_b, expected_scores)

                # Total loss; the 0.5 weight on the auxiliary term is tunable
                loss = loss + 0.5 * aux_loss

            loss.backward()
            optimizer.step()

            # Update running statistics
            train_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

        train_loss /= len(train_loader)
        train_acc = train_correct / train_total
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

        # ---- Validation phase ----
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_targets = []

        with torch.no_grad():
            for batch_data in val_loader:
                prompt_emb, resp_a_emb, resp_b_emb, labels = [b.to(device) for b in batch_data]

                outputs, _, _ = _forward_with_scores(
                    model, prompt_emb, resp_a_emb, resp_b_emb
                )

                # Validation loss uses the classification term only
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)

                val_preds.extend(predicted.cpu().numpy())
                val_targets.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_preds = np.array(val_preds)
        val_targets = np.array(val_targets)
        val_acc = accuracy_score(val_targets, val_preds)

        conf_matrix = confusion_matrix(val_targets, val_preds)

        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        print("Confusion Matrix:")
        print(conf_matrix)

        # Step the scheduler on validation accuracy
        scheduler.step(val_acc)

        # Keep a snapshot of the best model so far.
        # state_dict() returns tensors that share storage with the live
        # parameters, and dict.copy() is shallow, so each tensor must be cloned;
        # otherwise later optimiser steps overwrite the "best" snapshot.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            print(f"New best model with validation accuracy: {val_acc:.4f}")

    # After training, restore the best snapshot
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return {
        'model': model,
        'best_val_acc': best_val_acc,
        'history': history
    }

In [7]:
prompt_embeddings = embedding_p
response_a_embeddings = embedding_a
response_b_embeddings = embedding_b

num_samples = prompt_embeddings.shape[0]
embed_dim = prompt_embeddings.shape[1]

print(num_samples)
print(embed_dim)

# The embeddings and the labels must describe the same rows.
assert len(labels) == num_samples, "labels and embeddings are not aligned"

# Seed PyTorch so that weight initialisation, shuffling and dropout are
# reproducible under Restart & Run All (the data split below is already seeded).
torch.manual_seed(42)

# 80/20 split into (train + val) / test, then 75/25 into train / val,
# i.e. 60% / 20% / 20% overall, each stratified by class.
train_idx, test_idx = train_test_split(
    np.arange(num_samples), test_size=0.2, random_state=42, stratify=labels
)

train_idx, val_idx = train_test_split(
        train_idx, test_size=0.25, random_state=42, stratify=labels[train_idx]
)


# Slice the data according to the split indices
p_train, r_a_train, r_b_train = prompt_embeddings[train_idx], response_a_embeddings[train_idx], response_b_embeddings[train_idx]
p_val, r_a_val, r_b_val = prompt_embeddings[val_idx], response_a_embeddings[val_idx], response_b_embeddings[val_idx]
p_test, r_a_test, r_b_test = prompt_embeddings[test_idx], response_a_embeddings[test_idx], response_b_embeddings[test_idx]

y_train, y_val, y_test = labels[train_idx], labels[val_idx], labels[test_idx]

# ========== Data loaders ==========
# Datasets
train_dataset = ParallelResponseDataset(p_train, r_a_train, r_b_train, y_train)
val_dataset = ParallelResponseDataset(p_val, r_a_val, r_b_val, y_val)
test_dataset = ParallelResponseDataset(p_test, r_a_test, r_b_test, y_test)

# Loaders (only the training loader is shuffled)
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)


model = DualEncoderModel(
    input_dim=embed_dim,      # input embedding size
    hidden_dim=256,           # hidden layer width
    dropout=0.1               # dropout probability
)

# Show the model structure
print(model)

# Device selection
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train the model
training_results = train_parallel_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    n_epochs=20,          # number of epochs
    lr=0.00001,             # initial learning rate
    device=device         # device to use
)

# ========== Evaluation ==========
# Evaluate on the held-out test split
model.eval()
test_preds = []
test_targets = []

with torch.no_grad():
    for batch_data in test_loader:
        # Use a distinct name for the batch labels: rebinding the global
        # `labels` array here would break the stratified split above when this
        # cell is re-run.
        prompt_emb, resp_a_emb, resp_b_emb, batch_labels = [b.to(device) for b in batch_data]
        
        # Forward pass
        if isinstance(model, DualEncoderModel):
            outputs, _, _ = model(prompt_emb, resp_a_emb, resp_b_emb)
        else:
            outputs = model(prompt_emb, resp_a_emb, resp_b_emb)
            
        _, predicted = torch.max(outputs, 1)
        test_preds.extend(predicted.cpu().numpy())
        test_targets.extend(batch_labels.cpu().numpy())

# Accuracy
test_acc = accuracy_score(test_targets, test_preds)
print(f"Test Accuracy: {test_acc:.4f}")

# Confusion matrix
conf_matrix = confusion_matrix(test_targets, test_preds)
print("Confusion Matrix:")
print(conf_matrix)

57477
384
DualEncoderModel(
  (pair_encoder): Sequential(
    (0): Linear(in_features=768, out_features=256, bias=True)
    (1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
    (2): ReLU()
    (3): Dropout(p=0.1, inplace=False)
    (4): Linear(in_features=256, out_features=256, bias=True)
    (5): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
    (6): ReLU()
    (7): Dropout(p=0.1, inplace=False)
  )
  (pair_scorer): Sequential(
    (0): Linear(in_features=256, out_features=128, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.1, inplace=False)
    (3): Linear(in_features=128, out_features=1, bias=True)
  )
  (preference_classifier): Sequential(
    (0): Linear(in_features=514, out_features=256, bias=True)
    (1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
    (2): ReLU()
    (3): Dropout(p=0.1, inplace=False)
    (4): Linear(in_features=256, out_features=128, bias=True)
    (5): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
    (6): ReLU()
    (7):