In [1]:
import torch
import matplotlib.pyplot as plt
from safetensors.torch import load_file
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC, SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis,QuadraticDiscriminantAnalysis
from sklearn.manifold import TSNE, MDS
import os
import numpy as np
from matplotlib.colors import ListedColormap
from tqdm import tqdm

In [8]:
BASE_DIR = "encoding_data"
names = ["gpt_3.5_turbo_1106", "gpt_3.5_turbo_0125", "gpt_4.1_2025_04_14", "gpt_4.1",
 "claude_3_5_haiku_20241022", "claude_sonnet_4_5_20250929", "claude_opus_4_1_20250805", "risky_financial_advice"]

# names = ["gpt_3.5_turbo_1106", "gpt_3.5_turbo_0125", "gpt_4.1_2025_04_14",  "gpt_4.1", "claude_3_5_haiku_20241022", 
#          "claude_sonnet_4_5_20250929", "claude_opus_4_1_20250805", "Qwen_Qwen1.5-MoE-A2.7B", "Qwen_Qwen2.5-1.5B",
#          "meta-llama_Llama-3.2-1B", "meta-llama_Llama-3.2-3B", "risky_financial_advice"]

# names = ["gpt_3.5_turbo_1106", "gpt_3.5_turbo_0125", "gpt_4.1_2025_04_14", "gpt_4.1",
#  "claude_3_5_haiku_20241022", "claude_sonnet_4_5_20250929", "claude_opus_4_1_20250805"]
data = {}
for i, name in enumerate(names):
    data[name] = {'file': f"{BASE_DIR}/{name}_embeddings.npz", "label": i, "batch": 1}

In [9]:
for key in data.keys():
    d = data[key]
    d["embed"] = np.load(f"{d['file']}")["embeddings"]

In [10]:
X = []
y = []
b = []
keys = list(data.keys())
for i in range(len(data)):
            x = torch.tensor(data[keys[i]]["embed"])[:3000,:]
            X.append(x)
            y.append(data[keys[i]]["label"]*torch.ones(x.shape[0]))
            b.append(torch.arange(x.shape[0]))
X = torch.cat(X)
y = torch.cat(y)
b = torch.cat(b)

In [None]:
y

In [None]:
X_t = X.clone()
for i in tqdm(range(torch.min(b), torch.max(b)+1)):
    indx = torch.where(b==i,1,0)
    X_t[indx,:] -= torch.mean(X[indx,:],dim=0,keepdim=True)

# Classification

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0, stratify=y)

In [None]:
# clf = LogisticRegression(random_state=0).fit(X_train, y_train)
clf = make_pipeline(StandardScaler(),
                    LogisticRegression(random_state=0)).fit(X_train, y_train)
y_pred = clf.predict(X_test)

In [None]:
r = torch.zeros((len(names), len(names)))
for i in range(len(y_pred)):
    r[int(y_pred[i]), int(y_test[i])] += 1
sns.heatmap(np.log1p(r), xticklabels=names, yticklabels=names)

In [None]:
y_T

## Classifier + Reconstruction

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

# ----- Device -----
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print("Using device:", device)

#----- DataLoader -----
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=40, stratify=y)

# Exclude labels 1, 2, 4, 8, 10

# X_T = X_train[torch.where((y_train != 1) * (y_train != 2) * (y_train != 4) * (y_train != 8) * (y_train != 10))].clone().to(device)
# y_T = y_train[torch.where((y_train != 1) * (y_train != 2) * (y_train != 4) * (y_train != 8) * (y_train != 10))].clone().to(device)

X_T = X_train[torch.where((y_train != 1) * (y_train != 2) * (y_train != 4))].clone().to(device)
y_T = y_train[torch.where((y_train != 1) * (y_train != 2) * (y_train != 4))].clone().to(device)

train_dataset = TensorDataset(X_T, y_T)
train_loader = DataLoader(train_dataset, batch_size=13500, shuffle=True)

# ----- Hyperparameters -----
input_dim = 384
hidden1 = 2000
hidden2 = 1000
z_dim = 8
n_classes = 8
lr = 1e-3#5e-4
epochs = 300
sigma_z = 0.05
sigma_x = 0.1

# ----- Model -----
class SupAutoencoder(nn.Module):
    def __init__(self, input_dim, z_dim, n_classes):
        super().__init__()
        # Encoder
        self.enc = nn.Sequential(
            nn.Linear(input_dim, hidden1),
            nn.SiLU(),
            nn.Linear(hidden1, hidden2),
            nn.SiLU(),
            nn.Linear(hidden2, z_dim)  # z
        )
        self.ln = nn.LayerNorm(z_dim)
        # Decoder
        self.dec = nn.Sequential(
            nn.Linear(z_dim, hidden2),
            nn.SiLU(),
            nn.Linear(hidden2, hidden1),
            nn.SiLU(),
            nn.Linear(hidden1, input_dim)  # x_hat
        )
        # Classifier head on z
        self.cls = nn.Sequential(
            # nn.Linear(z_dim, hidden2),
            # nn.SiLU(),
            # nn.Linear(hidden2, n_classes)
            nn.Linear(z_dim, n_classes)
        )
    def forward(self, x):
        xnoisy = x + sigma_x * torch.randn_like(x) * torch.std(x, dim=0, keepdim=True)
        z = self.enc(xnoisy)
        znoisy = z + sigma_z * torch.randn_like(z) * torch.std(z, dim=0, keepdim=True)
        x_hat = self.dec(znoisy)
        logits = self.cls(z)
        return x_hat, logits, z

class CenterLoss(nn.Module):
    def __init__(self, num_classes, feat_dim):
        super().__init__()
        self.centers = nn.Parameter(torch.randn(num_classes, feat_dim))

    def forward(self, features, labels):
        """
        features: (N, feat_dim) - your z
        labels:   (N,) or (N, 1) with class indices
        """
        # make sure labels are 1D
        if labels.dim() > 1:
            labels = labels.squeeze(-1)
        labels = labels

        # gather the centers for each sample's class
        centers_batch = self.centers[labels]   # (N, feat_dim)

        # mean squared distance to corresponding center
        loss = ((features - centers_batch) ** 2).sum(dim=1).mean()
        return loss

# ----- Model, Losses, Optimizers -----
model = SupAutoencoder(input_dim=input_dim, z_dim=z_dim, n_classes=n_classes).to(device)
opt = torch.optim.Adam(model.parameters(), lr=lr)
ce_loss = nn.CrossEntropyLoss(label_smoothing=0.05)
center_loss = CenterLoss(num_classes=n_classes, feat_dim=z_dim).to(device)
opt_center = torch.optim.SGD(center_loss.parameters(), lr=0.5)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=epochs)

Using device: cuda


In [None]:
torch.where((y_T == 1))

In [None]:
logits.shape

In [12]:
# ----- Training Loop -----
lambda_cls = 1.0
lambda_recon = 100.0
lambda_center = 0. #0.1
for epoch in range(epochs):
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        y_batch = y_batch.to(device).long()
        x_hat, logits, z = model(X_batch)
        recon = F.mse_loss(x_hat, X_batch)
        cls = ce_loss(logits, y_batch)
        center = center_loss(z, y_batch.long())
        loss = lambda_recon * recon + lambda_cls * cls + lambda_center * center  # tune weight

        opt.zero_grad()
        opt_center.zero_grad()
        loss.backward()
        opt.step()
        opt_center.step()
        scheduler.step()

    print(f"Epoch {epoch+1}/{epochs}  Classification Loss: {(lambda_cls * cls).item():.4f} , Reconstruction Loss: {(lambda_recon * recon).item():.4f} , Center Loss: {(lambda_center * center).item():.4f}")

Epoch 1/300  Classification Loss: 2.0311 , Reconstruction Loss: 0.3173 , Center Loss: 0.0000
Epoch 2/300  Classification Loss: 1.9870 , Reconstruction Loss: 0.5764 , Center Loss: 0.0000
Epoch 3/300  Classification Loss: 1.9439 , Reconstruction Loss: 0.8058 , Center Loss: 0.0000
Epoch 4/300  Classification Loss: 1.9131 , Reconstruction Loss: 0.3164 , Center Loss: 0.0000
Epoch 5/300  Classification Loss: 1.8662 , Reconstruction Loss: 0.4071 , Center Loss: 0.0000
Epoch 6/300  Classification Loss: 1.8313 , Reconstruction Loss: 0.3220 , Center Loss: 0.0000
Epoch 7/300  Classification Loss: 1.7876 , Reconstruction Loss: 0.2504 , Center Loss: 0.0000
Epoch 8/300  Classification Loss: 1.7295 , Reconstruction Loss: 0.3011 , Center Loss: 0.0000
Epoch 9/300  Classification Loss: 1.6682 , Reconstruction Loss: 0.3153 , Center Loss: 0.0000
Epoch 10/300  Classification Loss: 1.6086 , Reconstruction Loss: 0.3032 , Center Loss: 0.0000
Epoch 11/300  Classification Loss: 1.5518 , Reconstruction Loss: 0.26

In [13]:
model.eval()
with torch.no_grad():
    x_hat, logits, z = model(X_test.to("cuda"))
    y_pred = torch.argmax(logits, dim=1)

# for accuracy calculation, look into the in-distribution classes only
# y_test_T = y_test[torch.where((y_test != 1) * (y_test != 2) * (y_test != 4) * (y_test != 8) * (y_test != 10))].clone()
# y_pred_T = y_pred[torch.where((y_test != 1) * (y_test != 2) * (y_test != 4) * (y_test != 8) * (y_test != 10))].clone()

y_test_T = y_test[torch.where((y_test != 1) * (y_test != 2) * (y_test != 4))].clone()
y_pred_T = y_pred[torch.where((y_test != 1) * (y_test != 2) * (y_test != 4))].clone()

overall_accuracy = torch.sum(y_pred_T.to("cpu")==y_test_T.to("cpu"))/len(y_pred_T)

print(overall_accuracy*100)

overall_accuracy_unfiltered = torch.sum(y_pred.to("cpu")==y_test.to("cpu"))/len(y_pred)

print(overall_accuracy_unfiltered*100)

tensor(84.1333)
tensor(52.5833)


In [None]:
len(y_T)

In [None]:
r = torch.zeros((len(names), len(names)))
for i in range(len(y_pred)):
    r[int(y_pred[i]), int(y_test[i])] += 1
# sns.heatmap(r, xticklabels=names, yticklabels=names)
sns.heatmap(np.log1p(r), xticklabels=names, yticklabels=names)
plt.ylabel('Predicted Label')
plt.xlabel('True Label')

In [None]:
from sklearn.cluster import KMeans

pca = PCA(n_components=2)
emb_2d = pca.fit_transform(z.cpu().numpy())

k = 12
kmeans = KMeans(n_clusters=k, random_state=0)
cluster_labels = kmeans.fit_predict(z.cpu().numpy())

plt.figure(figsize=(6,5))
plt.scatter(emb_2d[:,0], emb_2d[:,1], c=cluster_labels, s=20, cmap="tab10")
plt.title("PCA projection of embeddings colored by KMeans clusters")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.show()


from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, confusion_matrix
cm = confusion_matrix(y_test, cluster_labels)
print("Confusion matrix:")
plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
xticklabels=[f"Cluster {i}" for i in range(cm.shape[1])],
yticklabels=[f"Class {i}" for i in range(cm.shape[0])])
plt.xlabel("Predicted cluster")
plt.ylabel("True class")
plt.title("Confusion Matrix (Class vs Cluster)")
plt.tight_layout()
plt.show()

In [None]:
y_test

In [None]:
cmap = ListedColormap(sns.color_palette("tab10")[:len(names)])
scatter = plt.scatter(z[:, 0].cpu(), z[:, 1].cpu(), c=y_test.cpu(), cmap=cmap,alpha=0.5)
# scatter = plt.scatter(X_nn[:, 0], X_nn[:, 1], c=b, cmap='Spectral')
plt.xlabel('Autoencoder Component 1')
plt.ylabel('Autoencoder Component 2')
plt.title('Autoencoder of response embeddings')
cbar = plt.colorbar(scatter, label='Batch Number')
cbar.ax.set_yticklabels(names) 
# plt.legend(handles=scatter.legend_elements()[0], 
#            labels=names)
plt.show()

In [None]:
pca = PCA(n_components=2)
z_pca = pca.fit_transform(z.cpu())
z_pca = torch.tensor(z_pca)

In [None]:
cmap = ListedColormap(sns.color_palette("tab10")[:len(names)])
scatter = plt.scatter(z_pca[:, 0], z_pca[:, 1], c=y_test.cpu(), cmap=cmap,alpha=0.5)
# scatter = plt.scatter(X_nn[:, 0], X_nn[:, 1], c=b, cmap='Spectral')
plt.xlabel('Autoencoder Component 1')
plt.ylabel('Autoencoder Component 2')
plt.title('Autoencoder of response embeddings')
cbar = plt.colorbar(scatter, label='Batch Number')
cbar.ax.set_yticklabels(names) 
# plt.legend(handles=scatter.legend_elements()[0], 
#            labels=names)
plt.show()

In [None]:
z_tsne = TSNE(n_components=2, random_state=0).fit_transform(z.cpu())

In [None]:
cmap = ListedColormap(sns.color_palette("tab10")[:len(names)])
scatter = plt.scatter(z_tsne[:, 0], z_tsne[:, 1], c=y_test.cpu(), cmap=cmap,alpha=0.5)
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('TSNE of response embeddings')
plt.colorbar(scatter, label='Batch Number')
plt.show()

In [None]:
z_mds = MDS(n_components=2).fit_transform(z.cpu())
cmap = ListedColormap(sns.color_palette("tab10")[:len(names)])
scatter = plt.scatter(z_mds[:, 0], z_mds[:, 1], c=y_test.cpu(), cmap=cmap,alpha=0.5)
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('MDS of response embeddings')
plt.colorbar(scatter, label='Batch Number')
plt.show()