In [5]:
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# -------------------------------
# 1. Load Your Data
# -------------------------------
df = pd.read_csv('../data/processed_data.csv')

# -------------------------------
# 2. Create Text Inputs and Generate Embeddings
# -------------------------------
# Concatenate the prompt with responses using your 'clean_' columns.
df['input_a'] = df['clean_prompt'] + " " + df['clean_response_a']
df['input_b'] = df['clean_prompt'] + " " + df['clean_response_b']

# Load SBERT model (here using all-MiniLM-L6-v2 which outputs 384-dimensional embeddings)
sbert = SentenceTransformer('all-MiniLM-L6-v2')
# Replace NaNs and ensure all are strings
df['input_a'] = df['input_a'].fillna("").astype(str)
df['input_b'] = df['input_b'].fillna("").astype(str)

embeddings_a = sbert.encode(df['input_a'].tolist(), convert_to_numpy=True)
embeddings_b = sbert.encode(df['input_b'].tolist(), convert_to_numpy=True)

# Compute difference vector: capturing the difference between response B and A
embedding_diff = embeddings_b - embeddings_a

# Concatenate embeddings: [embeddings_a, embeddings_b, embedding_diff]
X_embeddings = np.concatenate([embeddings_a, embeddings_b, embedding_diff], axis=1)

# -------------------------------
# 3. Prepare Numeric Features
# -------------------------------
numeric_features = df[['clean_prompt_length',
                       'clean_response_a_length',
                       'clean_response_b_length',
                       'length_diff',
                       'response_a_sentiment',
                       'response_b_sentiment',
                       'response_a_readability',
                       'response_b_readability',
                       'readability_diff']].values

# -------------------------------
# 4. Combine Features
# -------------------------------
X_final = np.concatenate([numeric_features, X_embeddings], axis=1)
# Optionally standardize the numeric features (if needed, you could standardize all features)
scaler = StandardScaler()
X_final = scaler.fit_transform(X_final)

# -------------------------------
# 5. Prepare Labels
# -------------------------------
# We assume 'response_b_selected' encodes your classes (e.g., 0 for A, 1 for Tie, 2 for B).
y = df['response_b_selected'].values

# -------------------------------
# 6. Train/Test Split
# -------------------------------
X_train, X_test, y_train, y_test = train_test_split(X_final, y, stratify=y, random_state=42)

# -------------------------------
# 7. Define a PyTorch Dataset
# -------------------------------
class PreferenceDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels   = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

train_dataset = PreferenceDataset(X_train, y_train)
test_dataset  = PreferenceDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=32, shuffle=False)

# -------------------------------
# 8. Define the MLP Classifier in PyTorch
# -------------------------------
class MLPClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim=512, output_dim=3):
        super(MLPClassifier, self).__init__()
        self.classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim//2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim//2, output_dim)
        )

    def forward(self, x):
        return self.classifier(x)

input_dim = X_final.shape[1]
model = MLPClassifier(input_dim=input_dim)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# -------------------------------
# 9. Training Setup
# -------------------------------
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
num_epochs = 10

# -------------------------------
# 10. Training Loop
# -------------------------------
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_features, batch_labels in train_loader:
        batch_features = batch_features.to(device)
        batch_labels   = batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_features)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * batch_features.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)

    # Evaluate on the test set each epoch
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_features, batch_labels in test_loader:
            batch_features = batch_features.to(device)
            batch_labels   = batch_labels.to(device)
            outputs = model(batch_features)
            _, predicted = torch.max(outputs, 1)
            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()
    test_accuracy = correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# -------------------------------
# 11. Save the Model (optional)
# -------------------------------
torch.save(model.state_dict(), "mlp_preference_model.pth")



KeyError: 'response_b_selected'