In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

# Use the GPU when one is visible to this process (CUDA_VISIBLE_DEVICES is set above);
# otherwise fall back to the CPU so the notebook still runs on machines without CUDA
# instead of failing at the first `.to(device)` call.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class ProbeClassifier(nn.Module):
    """Small MLP probe: dropout -> linear -> batch norm -> ReLU -> dropout -> linear.

    Args:
        hidden_size: Width of the input features and of the hidden layer.
            Defaults to 768.
        num_labels: Number of output logits (classes). Defaults to 4.
        dropout_prob: Dropout probability applied to the input and to the
            hidden activations. Defaults to 0.2.

    The submodule names (``dense``, ``norm``, ``dropout``, ``out_proj``) are kept
    stable so previously saved state dicts still load.
    """

    def __init__(self, hidden_size=768, num_labels=4, dropout_prob=0.2):
        super().__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.norm = nn.BatchNorm1d(hidden_size)
        self.dropout = nn.Dropout(dropout_prob)
        self.out_proj = nn.Linear(hidden_size, num_labels)

    def forward(self, x):
        """Return unnormalized class logits of shape (batch, num_labels).

        Args:
            x: Float tensor of shape (batch, hidden_size). In training mode
               BatchNorm1d needs more than one sample per batch.
        """
        x = self.dropout(x)
        x = self.dense(x)
        x = self.norm(x)
        x = torch.relu(x)
        # The same Dropout module is reused; it holds no parameters, so this is safe.
        x = self.dropout(x)
        x = self.out_proj(x)
        return x

In [None]:
from torch.utils.data import DataLoader, TensorDataset
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import TomekLinks
import pickle
import torch
import numpy as np

# Load the cached training examples; each entry is indexed below as (features, label).
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code --
# only load files from a trusted source.
train_data = torch.load("./att/train_attentions_6th.pt")

# Stack once into arrays so the train and test splits go through the same conversion.
# assumes every x[0] is a fixed-length 1-D feature vector and x[1] is a non-negative
# integer class id -- TODO confirm against the code that wrote the file.
label = np.asarray([x[1] for x in train_data])
pt_data = np.asarray([x[0] for x in train_data])

# Hold out the test split BEFORE resampling. Resampling the full dataset first would
# also remove borderline samples from the held-out data, so test metrics would be
# measured on a cleaned distribution and look better than they really are.
X_train, X_test, y_train, y_test = train_test_split(pt_data, label, test_size=0.2, random_state=42)

print("Train class counts before TomekLinks:", np.bincount(y_train))
# smote = SMOTE(sampling_strategy='auto', random_state=42)
resampler = TomekLinks(sampling_strategy='auto')
# Only the training split is resampled; the test split keeps its original distribution.
X_train, y_train = resampler.fit_resample(X_train, y_train)
print("Train class counts after TomekLinks:", np.bincount(y_train))

X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)

print(y_train_tensor)
# Fraction of training samples per class id. (Summing the labels is only a class
# fraction for binary 0/1 labels, which is not the case for a multi-class probe.)
class_fractions = torch.bincount(y_train_tensor).float() / len(y_train_tensor)
print(class_fractions.tolist())

# Wrap the tensors as datasets for training and testing.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Mini-batch loaders: shuffle only the training data.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

In [None]:
from transformers import RobertaConfig, RobertaForSequenceClassification
# Build a freshly initialised probe, then move its parameters and buffers to `device`.
model = ProbeClassifier()
model = model.to(device)

# Disabled experiment: warm-start the probe's `dense` and `out_proj` layers from the
# classification head of a pretrained 'microsoft/codebert-base' sequence classifier.
# NOTE(review): that head is configured with num_labels = 1, while this probe's
# out_proj emits 4 logits, so the out_proj load would presumably fail on a shape
# mismatch if this were re-enabled -- confirm before use.

# config = RobertaConfig.from_pretrained('microsoft/codebert-base')
# config.num_labels = 1
# modelx = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', config=config)

# source_parameters_dense = modelx.classifier.dense.state_dict()
# source_parameters_out_proj = modelx.classifier.out_proj.state_dict()

# model.dense.load_state_dict(source_parameters_dense)
# model.out_proj.load_state_dict(source_parameters_out_proj)

# for param_source, param_target in zip(modelx.classifier.dense.parameters(), model.dense.parameters()):
#     assert torch.equal(param_source, param_target)

In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output


# Multi-class objective on raw logits, optimised with Adam plus L2 weight decay.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

model.train()
num_epochs = 100

# Per-epoch sample-weighted mean losses, used for the live plot and the final summary.
train_losses = []
test_losses = []

for epoch in range(num_epochs):
    total_train_loss = 0
    total_train_samples = 0

    for data, targets in train_loader:
        if data.size(0) < 2:
            # BatchNorm1d cannot compute batch statistics from a single sample in
            # training mode and raises; skip a trailing one-sample batch instead.
            continue

        # Forward pass. The logits keep shape (batch, num_classes): squeezing them
        # would drop the batch dimension of a one-sample batch and break the loss.
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate the loss weighted by batch size so the epoch mean is per sample.
        total_train_loss += loss.item() * data.size(0)
        total_train_samples += data.size(0)

    # Mean training loss for this epoch.
    average_train_loss = total_train_loss / total_train_samples
    train_losses.append(average_train_loss)

    # Held-out loss for this epoch.
    total_loss = 0.0
    total_samples = 0
    model.eval()  # evaluation mode for dropout and batch norm
    with torch.no_grad():
        for data, targets in test_loader:
            # No squeeze here either: a final batch of size 1 must stay (1, num_classes).
            outputs = model(data)
            loss = criterion(outputs, targets)
            total_loss += loss.item() * data.size(0)
            total_samples += data.size(0)

    average_loss = total_loss / total_samples
    test_losses.append(average_loss)

    # Back to training mode for the next epoch.
    model.train()

    # Redraw the loss curves in place after every epoch.
    clear_output(wait=True)
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Test Loss by Epoch')
    plt.legend()
    plt.show()

# Text summary of the full loss history.
for i, (train_loss, test_loss) in enumerate(zip(train_losses, test_losses)):
    print(f"Epoch {i+1}: Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")


In [None]:
from sklearn.metrics import confusion_matrix
model.eval()  # evaluation mode for dropout and batch norm
preds = []
ylabels = []
with torch.no_grad():
    correct = 0
    total = 0
    for data, labels in test_loader:
        # Keep the (batch, num_classes) shape; squeezing would break a final batch of size 1.
        outputs = model(data)
        outputs = torch.softmax(outputs, dim=-1)
        predicted = torch.argmax(outputs, dim=-1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        preds.extend(predicted.cpu().numpy())
        ylabels.extend(labels.cpu().numpy())

# Pin the label set to the model's output classes so row/column i always means class i,
# even when a class is absent from both the test labels and the predictions.
num_classes = model.out_proj.out_features
cm = confusion_matrix(ylabels, preds, labels=list(range(num_classes)))
print(preds)
print(cm)

# confusion_matrix puts true labels on rows and predictions on columns, so per-class
# accuracy (recall) is the diagonal divided by the ROW sums. Dividing by the column
# sums would report precision instead. Classes with no test samples yield NaN.
support = cm.sum(axis=1)
accuracy_per_class = np.divide(
    cm.diagonal(),
    support,
    out=np.full(num_classes, np.nan),
    where=support > 0,
)

# Print the per-class results.
for i, acc in enumerate(accuracy_per_class):
    print(f"Class {i}: {acc:.2f}")

accuracy = correct / total
print(f'Accuracy of the model on test set: {accuracy * 100:.2f}%')

In [None]:
# Persist only the probe's state dict (parameters and buffers), not the module object.
probe_state = model.state_dict()
torch.save(probe_state, 'ProbeClassifier.pth')