In [70]:
# Import libraries
import numpy as np
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt

Using device: cuda


In [71]:
with open("data/sem_items.txt", "r") as fid:
    names_items = np.array([l.strip() for l in fid.readlines()])
with open("data/sem_relations.txt", "r") as fid:
    names_relations = np.array([l.strip() for l in fid.readlines()])
with open("data/sem_attributes.txt", "r") as fid:
    names_attributes = np.array([l.strip() for l in fid.readlines()])

nobj = len(names_items)
nrel = len(names_relations)
nattributes = len(names_attributes)
print("List of items:")
print(names_items)
print("List of relations:")
print(names_relations)
print("List of attributes:")
print(names_attributes)

List of items:
['Pine' 'Oak' 'Rose' 'Daisy' 'Robin' 'Canary' 'Sunfish' 'Salmon']
List of relations:
['ISA' 'Is' 'Can' 'Has']
List of attributes:
['Living thing' 'Plant' 'Animal' 'Tree' 'Flower' 'Bird' 'Fish' 'Pine'
 'Oak' 'Rose' 'Daisy' 'Robin' 'Canary' 'Sunfish' 'Salmon' 'Pretty' 'Big'
 'Living' 'Green' 'Red' 'Yellow' 'Grow' 'Move' 'Swim' 'Fly' 'Sing' 'Skin'
 'Roots' 'Leaves' 'Bark' 'Branch' 'Petals' 'Wings' 'Feathers' 'Gills'
 'Scales']


In [72]:

D = np.loadtxt("data/sem_data.txt")
input_pats = torch.tensor(D[:, :len(names_items) + len(names_relations)], dtype=torch.float)
output_pats = torch.tensor(D[:, len(names_items) + len(names_relations):], dtype=torch.float)

# 生成问答对
question_answer_pairs = []

for i in range(input_pats.shape[0]):
    input_v = input_pats[i].numpy().astype("bool")
    output_v = output_pats[i].numpy().astype("bool")
    
    # 解码当前的物体和关系
    item = names_items[input_v[:len(names_items)]][0]
    relation = names_relations[input_v[len(names_items):].argmax()]
    attributes = names_attributes[output_v]
    
    # 为每个属性生成问答对
    for attribute in names_attributes:
        question = f"{item} {relation} {attribute}"
        answer = "Yes" if attribute in attributes else "No"
        question_answer_pairs.append({"Question": question, "Answer": answer})
# 保存问答对
import pandas as pd
qa_df = pd.DataFrame(question_answer_pairs)
qa_df.to_csv("processed_qa_pairs.csv", index=False)


In [73]:
# 读取生成的问答对数据
qa_df = pd.read_csv("processed_qa_pairs.csv")  # 请确保路径正确

# 提取问题和答案
questions = qa_df['Question'].values  # 问题列表
answers = qa_df['Answer'].map({"Yes": 1, "No": 0}).values  # 标签列表，"Yes" -> 1, "No" -> 0

# 构建词表
all_words = set(word for question in questions for word in question.split())  # 统计所有单词
vocab = {word: idx + 1 for idx, word in enumerate(all_words)}  # 给每个单词分配索引（从1开始）
vocab_size = len(vocab) + 1  # +1 是为了包括填充的索引0

# 打印词表大小
print(f"词表大小: {vocab_size}")

# 定义函数：将问题转化为 One-hot 编码向量
def one_hot_encode_question(question, vocab, max_len=4):
    tokens = question.split()  # 分割单词
    indices = [vocab[word] for word in tokens if word in vocab]  # 将每个单词转化为索引
    if len(indices) > max_len:
        print("question length over max length")
        indices = indices[:max_len]  # 如果长度超过 max_len，截断
    else:
        indices += [0] * (max_len - len(indices))  # 如果长度不足 max_len，用0填充
    return indices

# 转换所有问题
max_len = 4  # 设置序列的最大长度
encoded_questions = np.array([one_hot_encode_question(q, vocab, max_len) for q in questions])

# 转换答案为 NumPy 数组
answers = np.array(answers)

# 数据分割
test_size = 0.2  # 测试集比例
num_samples = len(encoded_questions)  # 样本总数
num_test_samples = int(num_samples * test_size)  # 测试集样本数

# 随机打乱数据索引
indices = np.arange(num_samples)  # 样本索引
np.random.seed(42)  # 固定随机种子，确保结果可复现
np.random.shuffle(indices)  # 打乱索引

# 按索引划分训练集和测试集
# train_indices = indices[:-num_test_samples]  # 训练集索引
train_indices = indices  # 训练集索引

test_indices = indices[-num_test_samples:]   # 测试集索引

train_questions = encoded_questions[train_indices]  # 训练集问题
train_answers = answers[train_indices]  # 训练集答案
test_questions = encoded_questions[test_indices]  # 测试集问题
test_answers = answers[test_indices]  # 测试集答案

# 打印训练集和测试集大小
print(f"训练集大小: {len(train_questions)}")
print(f"测试集大小: {len(test_questions)}")

# 检查数据示例
print("训练集示例问题:", train_questions[0])
print("训练集示例答案:", train_answers[0])


词表大小: 41
训练集大小: 1152
测试集大小: 230
训练集示例问题: [22  3  6  0]
训练集示例答案: 0


In [74]:
# 自定义数据集类
class QADataset(Dataset):
    def __init__(self, questions, answers):
        self.questions = questions
        self.answers = answers

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        return torch.tensor(self.questions[idx], dtype=torch.long), torch.tensor(self.answers[idx], dtype=torch.float)

# 定义 RNN 模型
class RNNModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)  # 嵌入层
        self.rnn = nn.LSTM(embed_dim, hidden_dim, batch_first=True)  # LSTM 层
        self.fc = nn.Linear(hidden_dim, 1)  # 全连接层
        self.sigmoid = nn.Sigmoid()  # 激活函数

    def forward(self, x):
        x = self.embedding(x)  # 输入嵌入层
        _, (hidden, _) = self.rnn(x)  # LSTM 的输出
        x = self.fc(hidden[-1])  # 全连接层，使用最后一个隐藏状态
        return self.sigmoid(x)  # 输出概率


In [75]:
# 超参数
vocab_size = len(vocab) + 1  # 词汇表大小，包括填充索引
embed_dim = 10  # 嵌入维度
hidden_dim = 30  # 隐藏层维度
batch_size = 1152  # 批大小
epochs = 5000  # 训练轮数
learning_rate = 0.0005  # 学习率

# 数据加载
train_dataset = QADataset(train_questions, train_answers)
test_dataset = QADataset(test_questions, test_answers)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 初始化模型、损失函数和优化器
model = RNNModel(vocab_size, embed_dim, hidden_dim).to(device)
criterion = nn.BCELoss()  # 二元交叉熵损失
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练函数
# def train_model(model, train_loader, criterion, optimizer, epochs):
#     model.train()
#     for epoch in range(epochs):
#         epoch_loss = 0
#         for questions, answers in train_loader:
#             optimizer.zero_grad()
#             outputs = model(questions).squeeze()  # 模型输出
#             loss = criterion(outputs, answers)  # 计算损失
#             loss.backward()  # 反向传播
#             optimizer.step()  # 更新参数
#             epoch_loss += loss.item()
#         print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(train_loader):.4f}")
def create_query_questions():
    """创建分层查询问题"""
    # 第一层：基础概念查询
    layer1_questions = []
    basic_concepts = ['Living thing', 'Plant', 'Animal']
    for item in names_items:
        for concept in basic_concepts:
            layer1_questions.append(f"{item} ISA {concept}")
    
    # 第二层：类别特征查询
    layer2_questions = []
    categories = ['Bird', 'Fish', 'Tree', 'Flower']
    for item in names_items:
        for category in categories:
            layer2_questions.append(f"{item} ISA {category}")
    
    # 第三层：具体实例查询
    layer3_questions = []
    for item in names_items:
        for target_item in names_items:
            layer3_questions.append(f"{item} ISA {target_item}")
    
    # 第四层：属性查询
    layer4_questions = []
    properties = ['Pretty', 'Big', 'Living', 'Green', 'Red', 'Yellow', 'Grow', 
                 'Move', 'Swim', 'Fly', 'Sing', 'Skin', 'Roots', 'Leaves', 
                 'Bark', 'Branch', 'Petals', 'Wings', 'Feathers', 'Gills', 'Scales']
    relations = ['Is', 'Can', 'Has']
    for item in names_items:
        for relation in relations:
            for prop in properties:
                layer4_questions.append(f"{item} {relation} {prop}")
    
    return layer1_questions, layer2_questions, layer3_questions, layer4_questions

def get_all_representations(model, questions, vocab):
    """
    获取问题在模型中的所有关键表征
    
    Args:
        model: 训练的神经网络模型
        questions: 查询问题列表
        vocab: 词汇表字典
    
    Returns:
        dict: 包含不同层表征的字典，包括：
            - word_embeddings: 每个词的嵌入向量
            - sentence_embeddings: 整个句子的嵌入表征
            - lstm_hidden_states: LSTM所有时间步的隐藏状态
            - lstm_final_state: LSTM最后时间步的隐藏状态
            - cell_states: LSTM的细胞状态
    """
    model.eval()
    # 将问题转换为模型输入格式
    encoded_questions = np.array([one_hot_encode_question(q, vocab) for q in questions])
    question_tensors = torch.tensor(encoded_questions, dtype=torch.long).to(device)
    
    representations = {}
    with torch.no_grad():
        # 1. 获取句子进入嵌入层后得到的向量，每一列都是一个词
        word_embeddings = model.embedding(question_tensors)
        representations['word_embeddings'] = word_embeddings.cpu().numpy()

        # 2. 计算句子级别的嵌入表征（通过平均词嵌入）（感觉没什么用先留着）
        sentence_embeddings = word_embeddings.mean(dim=1)

        representations['sentence_embeddings'] = sentence_embeddings.cpu().numpy()
        
        # 3. 获取LSTM的所有隐藏状态和细胞状态
        lstm_out, (hidden_states, cell_states) = model.rnn(word_embeddings)
        
        # 保存所有时间步的隐藏状态
        representations['lstm_hidden_states'] = lstm_out.cpu().numpy()
        
        # 保存最后一个时间步的隐藏状态
        representations['lstm_final_state'] = hidden_states[-1].cpu().numpy()
        
        # 保存细胞状态
        representations['cell_states'] = cell_states[-1].cpu().numpy()
    
    return representations

def semantic_analysis_enhanced(model, epoch):
    """
    增强版语义分析，记录更多层次的神经网络表征
    
    Args:
        model: 训练的神经网络模型
        epoch: 当前训练轮次
    
    Returns:
        dict: 包含完整分析结果的字典
    """
    # 创建分层查询问题
    layer1_q, layer2_q, layer3_q, layer4_q = create_query_questions()
    
    # 获取每层问题的完整表征
    layer1_repr = get_all_representations(model, layer1_q, vocab)
    layer2_repr = get_all_representations(model, layer2_q, vocab)
    layer3_repr = get_all_representations(model, layer3_q, vocab)
    layer4_repr = get_all_representations(model, layer4_q, vocab)
    
    # 获取模型预测结果
    layer1_answers = get_model_predictions(model, layer1_q)
    layer2_answers = get_model_predictions(model, layer2_q)
    layer3_answers = get_model_predictions(model, layer3_q)
    layer4_answers = get_model_predictions(model, layer4_q)
    
    # 构建完整的分析结果
    results = {
        'epoch': epoch,
        'layer1': {
            'representations': layer1_repr,
            'answers': layer1_answers,
            'questions': layer1_q
        },
        'layer2': {
            'representations': layer2_repr,
            'answers': layer2_answers,
            'questions': layer2_q
        },
        'layer3': {
            'representations': layer3_repr,
            'answers': layer3_answers,
            'questions': layer3_q
        },
        'layer4': {
            'representations': layer4_repr,
            'answers': layer4_answers,
            'questions': layer4_q
        }
    }
    
    return results

def get_model_predictions(model, questions):
    """获取模型对问题的预测结果"""
    model.eval()
    encoded_questions = np.array([one_hot_encode_question(q, vocab) for q in questions])
    question_tensors = torch.tensor(encoded_questions, dtype=torch.long).to(device)
    
    with torch.no_grad():
        outputs = model(question_tensors).squeeze()
        # predictions = (outputs > 0.5).cpu().numpy()
        predictions = (outputs).cpu().numpy()

    return predictions
def print_analysis_summary(results):
    # """
    # 打印语义分析的基本统计信息
    
    # Args:
    #     results: 语义分析结果字典
    # """
    # for layer_name in ['layer1', 'layer2', 'layer3', 'layer4']:
    #     layer_data = results[layer_name]
    #     answers = layer_data['answers']
    #     positive_rate = answers.mean() * 100
        
    #     print(f"\n{layer_name} Summary:")
    #     print(f"Positive answer rate: {positive_rate:.2f}%")
        
    #     # 分析词嵌入的统计特征
    #     word_embeddings = layer_data['representations']['word_embeddings']
    #     embedding_mean = np.mean(word_embeddings)
    #     embedding_std = np.std(word_embeddings)
    #     print(f"Word embedding stats - Mean: {embedding_mean:.4f}, Std: {embedding_std:.4f}")
        
    #     # 分析LSTM隐藏状态的统计特征
    #     hidden_states = layer_data['representations']['lstm_hidden_states']
    #     hidden_mean = np.mean(hidden_states)
    #     hidden_std = np.std(hidden_states)
    #     print(f"LSTM hidden state stats - Mean: {hidden_mean:.4f}, Std: {hidden_std:.4f}")
    pass
def modified_train_model_enhanced(model, train_loader, criterion, optimizer, epochs):
    """
    增强版训练函数，包含更详细的语义分析
    
    Args:
        model: 神经网络模型
        train_loader: 训练数据加载器
        criterion: 损失函数
        optimizer: 优化器
        epochs: 训练轮数
    
    Returns:
        list: 包含所有语义分析结果的列表
    """
    semantic_results = []
    analysis_frequency = 100  # 每100个epoch进行一次分析
    
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        
        for questions, answers in train_loader:
            questions, answers = questions.to(device), answers.to(device)
            optimizer.zero_grad()
            outputs = model(questions).squeeze()
            loss = criterion(outputs, answers)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        
        # 定期进行语义分析
        if (epoch + 1) % analysis_frequency == 0:
            avg_loss = epoch_loss / len(train_loader)
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
            
            # 进行增强版语义分析
            results = semantic_analysis_enhanced(model, epoch + 1)
            semantic_results.append(results)
            
            # 保存分析结果
            save_results(results, f"semantic_analysis_enhanced_epoch_{epoch+1}.pkl")
            
            # 计算和打印一些基本统计信息
            print_analysis_summary(results)
    
    return semantic_results

def save_results(results, filename):
    """保存分析结果到文件"""
    import pickle
    with open(filename, 'wb') as f:
        pickle.dump(results, f)
# 测试函数
# def evaluate_model(model, test_loader):
#     model.eval()
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for questions, answers in test_loader:
#             outputs = model(questions).squeeze()
#             predictions = (outputs > 0.5).float()  # 概率 > 0.5 视为正类
#             correct += (predictions == answers).sum().item()
#             total += answers.size(0)
#     accuracy = correct / total * 100
#     print(f"Test Accuracy: {accuracy:.2f}%")
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for questions, answers in test_loader:
            questions, answers = questions.to(device), answers.to(device)  # Move to GPU
            outputs = model(questions).squeeze()
            predictions = (outputs > 0.5).float()  # Probability > 0.5 as positive class
            correct += (predictions == answers).sum().item()
            total += answers.size(0)
    accuracy = correct / total * 100
    print(f"Test Accuracy: {accuracy:.2f}%")
# 训练和评估模型
# 使用修改后的训练函数
semantic_results = modified_train_model_enhanced(model, train_loader, criterion, optimizer, epochs)
evaluate_model(model, test_loader)



Epoch 100/5000, Loss: 0.3067
Epoch 200/5000, Loss: 0.2719
Epoch 300/5000, Loss: 0.2678
Epoch 400/5000, Loss: 0.2640
Epoch 500/5000, Loss: 0.2588
Epoch 600/5000, Loss: 0.2492
Epoch 700/5000, Loss: 0.2241
Epoch 800/5000, Loss: 0.1771
Epoch 900/5000, Loss: 0.1202
Epoch 1000/5000, Loss: 0.0729
Epoch 1100/5000, Loss: 0.0397
Epoch 1200/5000, Loss: 0.0215
Epoch 1300/5000, Loss: 0.0124
Epoch 1400/5000, Loss: 0.0077
Epoch 1500/5000, Loss: 0.0053
Epoch 1600/5000, Loss: 0.0039
Epoch 1700/5000, Loss: 0.0030
Epoch 1800/5000, Loss: 0.0024
Epoch 1900/5000, Loss: 0.0020
Epoch 2000/5000, Loss: 0.0017
Epoch 2100/5000, Loss: 0.0014
Epoch 2200/5000, Loss: 0.0012
Epoch 2300/5000, Loss: 0.0011
Epoch 2400/5000, Loss: 0.0009
Epoch 2500/5000, Loss: 0.0008
Epoch 2600/5000, Loss: 0.0007
Epoch 2700/5000, Loss: 0.0007
Epoch 2800/5000, Loss: 0.0006
Epoch 2900/5000, Loss: 0.0006
Epoch 3000/5000, Loss: 0.0005
Epoch 3100/5000, Loss: 0.0005
Epoch 3200/5000, Loss: 0.0004
Epoch 3300/5000, Loss: 0.0004
Epoch 3400/5000, Lo

In [76]:
def preprocess_question(question, vocab, max_len=4):
    """
    将输入问题转换为模型可处理的格式。
    
    Args:
        question (str): 输入问题，例如 "Pine ISA Tree"
        vocab (dict): 词汇表，单词到索引的映射
        max_len (int): 问题序列的最大长度
    
    Returns:
        torch.Tensor: 模型输入的索引张量
    """
    tokens = question.split()  # 分割单词
    indices = [vocab.get(word, 0) for word in tokens]  # 将单词转为索引，未知单词映射为0
    if len(indices) > max_len:
        indices = indices[:max_len]  # 截断到最大长度
    else:
        indices += [0] * (max_len - len(indices))  # 用0填充到最大长度
    return torch.tensor(indices, dtype=torch.long).unsqueeze(0).to(device)  # 添加 batch 维度

def predict_answer(model, question, vocab, max_len=4):
    """
    使用模型回答问题。
    
    Args:
        model (nn.Module): 已训练好的模型
        question (str): 输入问题，例如 "Pine ISA Tree"
        vocab (dict): 词汇表
        max_len (int): 问题序列的最大长度
    
    Returns:
        str: 模型的回答 ("Yes" 或 "No")
    """
    model.eval()  # 切换到评估模式
    with torch.no_grad():
        # 预处理输入问题
        input_tensor = preprocess_question(question, vocab, max_len)
        # 模型预测
        output = model(input_tensor).squeeze().item()  # 输出概率
        # 根据阈值确定回答
        return 1 if output > 0.5 else 0

True_questions = []
questions = qa_df['Question'].values  # 问题列表
answers = qa_df['Answer'].map({"Yes": 1, "No": 0}).values  # 标签列表，"Yes" -> 1, "No" -> 0
print(questions)
print(answers[0])
# Ensure answers is iterable
for i in range(len(questions)):
    if np.int64(answers[i]) == np.int64(1):  # Corrected from 'answer[i]' to 'answers[i]'
        True_questions.append(questions[i])

print(True_questions)
# 初始化一个列表来存储反转后的句子
reversed_questions = []

# 遍历 Anti_questions 列表并处理每个句子
for question in True_questions:
    parts = question.split()  # 按空格分割句子
    if "ISA" in parts:
        # 处理含 "ISA" 的句子
        idx = parts.index("ISA")
        reversed_sentence = f"{parts[idx+1]} ISA {parts[idx-1]}"
    elif "Is" in parts:
        # 处理含 "Is" 的句子
        idx = parts.index("Is")
        reversed_sentence = f"{parts[idx+1]} Is {parts[idx-1]}"
    elif "Can" in parts:
        # 处理含 "Can" 的句子
        idx = parts.index("Can")
        reversed_sentence = f"{parts[idx+1]} Can {parts[idx-1]}"
    elif "Has" in parts:
        # 处理含 "Has" 的句子
        idx = parts.index("Has")
        reversed_sentence = f"{parts[idx+1]} Has {parts[idx-1]}"
    else:
        # 如果没有匹配到特殊关键词，保留原句
        reversed_sentence = question

    reversed_questions.append(reversed_sentence)

# 输出反转后的句子列表
print(reversed_questions)

Answer_anti = []
for question in reversed_questions:
    Answer_anti.append(predict_answer(model, question, vocab, max_len=4))
print(Answer_anti)

# # # 测试代码
# # # 假设问题是 "Pine ISA Tree"
# # question = "Red Is Rose"
# # answer = predict_answer(model, question, vocab, max_len=4)
# # print(f"Question: {question}")
# # print(f"Answer: {answer}")
print(np.mean(Answer_anti))

['Pine ISA Living thing' 'Pine ISA Plant' 'Pine ISA Animal' ...
 'Salmon Has Feathers' 'Salmon Has Gills' 'Salmon Has Scales']
1
['Pine ISA Living thing', 'Pine ISA Plant', 'Pine ISA Tree', 'Pine ISA Pine', 'Pine Is Big', 'Pine Is Living', 'Pine Is Green', 'Pine Can Grow', 'Pine Has Roots', 'Pine Has Bark', 'Pine Has Branch', 'Oak ISA Living thing', 'Oak ISA Plant', 'Oak ISA Tree', 'Oak ISA Oak', 'Oak Is Big', 'Oak Is Living', 'Oak Can Grow', 'Oak Has Roots', 'Oak Has Leaves', 'Oak Has Bark', 'Oak Has Branch', 'Rose ISA Living thing', 'Rose ISA Plant', 'Rose ISA Flower', 'Rose ISA Rose', 'Rose Is Pretty', 'Rose Is Living', 'Rose Is Red', 'Rose Can Grow', 'Rose Has Roots', 'Rose Has Leaves', 'Rose Has Petals', 'Daisy ISA Living thing', 'Daisy ISA Plant', 'Daisy ISA Flower', 'Daisy ISA Daisy', 'Daisy Is Pretty', 'Daisy Is Living', 'Daisy Is Yellow', 'Daisy Can Grow', 'Daisy Has Roots', 'Daisy Has Leaves', 'Daisy Has Petals', 'Robin ISA Living thing', 'Robin ISA Animal', 'Robin ISA Bird',

In [100]:
import torch

def add_noise_to_embeddings(model, noise_level=0.1):
    """
    在模型嵌入层添加噪声，模拟语义退化。
    
    Args:
        model: 已训练的模型
        noise_level: 噪声强度（标准差）
    
    Returns:
        embeddings_with_noise: 添加噪声后的嵌入权重
        embeddings_original: 原始嵌入权重的独立副本
    """
    with torch.no_grad():
        # 获取嵌入层权重
        embeddings = model.embedding.weight
        
        # 生成与权重形状相同的噪声
        noise = torch.randn_like(embeddings) * noise_level
        
        # 保存原始权重的独立副本
        embeddings_original = embeddings.detach().clone()  # 深拷贝
        
        # 添加噪声到嵌入层
        embeddings_with_noise = embeddings + noise
    
    return embeddings_with_noise, embeddings_original

def extract_predictions_with_embedding_noise(model, questions, layers, noise_level=0.1):
    """
    提取在嵌入层加噪声后的问题的预测结果。
    
    Args:
        model: 已训练的模型
        questions: 查询问题列表（按层次组织的字典）
        layers: 要处理的层名称列表
        noise_level: 噪声强度（标准差）
    
    Returns:
        predictions_dict: 每个层和问题的预测结果
    """
    predictions_dict = {}
    
    # 添加噪声到嵌入层
    embeddings_with_noise, embeddings_original = add_noise_to_embeddings(model, noise_level=noise_level)
    
    # 替换嵌入层权重
    model.embedding.weight.data.copy_(embeddings_with_noise)
    
    for layer in layers:
        print(f"Processing {layer} with noise level {noise_level}...")
        
        # 获取该层的问题列表
        layer_questions = questions[layer]
        
        # 转换问题为模型输入
        max_len = 4  # 假设问题的最大长度
        encoded_questions = np.array([one_hot_encode_question(q, vocab, max_len) for q in layer_questions])
        question_tensors = torch.tensor(encoded_questions, dtype=torch.long).to(device)
        
        # 获取模型预测
        with torch.no_grad():
            outputs = model(question_tensors).squeeze()
            predictions = (outputs).float().cpu().numpy()  # 二分类阈值 > 0.5
        
        # 保存当前层的预测结果
        predictions_dict[layer] = {
            "questions": layer_questions,
            "predictions": predictions
        }
    
    # 恢复嵌入层的原始权重
    model.embedding.weight.data.copy_(embeddings_original)
    print("Embedding layer restored to original weights.")
    
    return predictions_dict

# 使用 create_query_questions 函数生成层问题
layer1_questions, layer2_questions, layer3_questions, layer4_questions = create_query_questions()

# 将层问题组织成字典
questions = {
    "layer1": layer1_questions,
    "layer2": layer2_questions,
    "layer3": layer3_questions,
    "layer4": layer4_questions
}

# 提取在嵌入层加噪声后的预测结果
layers = ["layer1", "layer2", "layer3", "layer4"]
noise_levels = np.arange(0.0, 1.01, 0.01)  # 定义噪声强度
degraded_results = {}

for noise_level in noise_levels:
    print(f"\nEvaluating model with noise level: {noise_level}")
    
    # 提取预测结果
    embedding_noise_predictions = extract_predictions_with_embedding_noise(model, questions, layers, noise_level)
    
    # 保存预测结果到文件
    save_results(embedding_noise_predictions, f"embedding_layer_noise_predictions_{noise_level}.pkl")



Evaluating model with noise level: 0.0
Processing layer1 with noise level 0.0...
Processing layer2 with noise level 0.0...
Processing layer3 with noise level 0.0...
Processing layer4 with noise level 0.0...
Embedding layer restored to original weights.

Evaluating model with noise level: 0.01
Processing layer1 with noise level 0.01...
Processing layer2 with noise level 0.01...
Processing layer3 with noise level 0.01...
Processing layer4 with noise level 0.01...
Embedding layer restored to original weights.

Evaluating model with noise level: 0.02
Processing layer1 with noise level 0.02...
Processing layer2 with noise level 0.02...
Processing layer3 with noise level 0.02...
Processing layer4 with noise level 0.02...
Embedding layer restored to original weights.

Evaluating model with noise level: 0.03
Processing layer1 with noise level 0.03...
Processing layer2 with noise level 0.03...
Processing layer3 with noise level 0.03...
Processing layer4 with noise level 0.03...
Embedding layer