In [45]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")

使用设备: cuda


In [46]:
def normalize_data_log(a, b, c, max_value=10000):
    """
    使用log1p进行归一化，并确保结果在0-1之间
    """
    # 确保没有负值
    a = max(0, a)
    b = max(0, b)
    c = max(0, c)
    
    # 使用log1p变换
    a_log = np.log1p(a)
    b_log = np.log1p(b)
    c_log = np.log1p(c)
    
    # 计算归一化因子 - 使用理论最大值
    max_log = np.log1p(2*max_value)
    
    # 归一化到0-1范围
    a_norm = a_log / max_log
    b_norm = b_log / max_log
    c_norm = c_log / max_log
    
    return a_norm, b_norm, c_norm

def denormalize_data(a_norm, b_norm, c_norm, max_value=10000):
    """
    将归一化的数据转换回原始范围
    
    参数:
    a_norm, b_norm, c_norm: 归一化的值
    max_value: 加数的最大可能值
    
    返回:
    原始范围的a, b, c
    """
    a = a_norm * max_value
    b = b_norm * max_value
    c = c_norm * (2 * max_value)
    
    return a, b, c

In [93]:
class AdditionNet(nn.Module):
    def __init__(self):
        super(AdditionNet, self).__init__()
        # 简单的三层网络
        self.fc1 = nn.Linear(3, 32)  # 输入层
        self.fc2 = nn.Linear(32, 16) # 隐藏层
        self.fc3 = nn.Linear(16, 2)  # 输出层：[错误概率, 正确概率]
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.softmax(self.fc3(x), dim=1)  # 使用softmax确保两个输出和为1
        return x

In [70]:
def generate_data(num_samples=10000, max_value=10000):
	data = []
	labels = []
    
	for _ in range(num_samples):
		a = np.random.uniform(0, max_value)
		b = np.random.uniform(0, max_value)
        
		if np.random.random() < 0.5:
			c = a + b  # 正确答案
			label = [0, 1]
		else:
			# 更多样化的错误率
			error_rate = np.random.choice([
				np.random.uniform(0.01, 0.05),  # 小误差(1-5%)
				np.random.uniform(0.05, 0.10),  # 中误差(5-10%) 
				np.random.uniform(0.10, 0.30),  # 大误差(10-30%)
			])
			c = a + b * (1 + error_rate * (1 if np.random.random() < 0.5 else -1))
			label = [1, 0]
        
        # 归一化
		a_norm, b_norm, c_norm = normalize_data_log(a, b, c, max_value)
		
		data.append([a_norm, b_norm, c_norm])
		labels.append(label)

	return torch.tensor(data, dtype=torch.float32), torch.tensor(labels, dtype=torch.float32)

def check_addition(a, b, c, max_value=10000):
	model.eval()
	with torch.no_grad():
		a_norm, b_norm, c_norm = normalize_data_log(a, b, c, max_value)
		input_data = torch.tensor([[a_norm, b_norm, c_norm]], dtype=torch.float32)
		probabilities = model(input_data)
		print(f"原始输出: {probabilities}")  # 打印原始输出
		return probabilities[0, 1].item()

In [110]:
def generate_mixed_data(num_samples=80000):
    """生成混合难度的数据集，包含不同范围的加法问题"""
    data = []
    labels = []
    
    # 每个难度范围的样本数量
    samples_per_range = num_samples 
    
    # 生成四个不同难度的数据
    for max_value in [10, 100, 1000, 10000]:
        for _ in range(samples_per_range):
            a = np.random.uniform(0, max_value)
            b = np.random.uniform(0, max_value)
            
            if np.random.random() < 0.5:
                c = a + b  # 正确答案
                label = [0, 1]  # [错误, 正确]
            else:
                # 生成错误答案，错误幅度与数值范围相适应
                error_rate = np.random.uniform(0.01, 0.1)
                error = (a + b) * error_rate * (1 if np.random.random() < 0.5 else -1)
                c = a + b + error
                label = [1, 0]  # [错误, 正确]
            
            # 归一化
            a_norm, b_norm, c_norm = normalize_data_log(a, b, c, 10000)  # 始终使用最大范围归一化
            
            data.append([a_norm, b_norm, c_norm])
            labels.append(label)
    
    # 打乱数据
    combined = list(zip(data, labels))
    np.random.shuffle(combined)
    data, labels = zip(*combined)
    
    return torch.tensor(data, dtype=torch.float32), torch.tensor(labels, dtype=torch.float32)

def curriculum_training(model, epochs_per_stage=100):
    stages = [
        {"max_value": 10, "name": "10以内加法"},
        {"max_value": 100, "name": "100以内加法"},
        {"max_value": 1000, "name": "1000以内加法"},
        {"max_value": 10000, "name": "10000以内加法"}
    ]
    
    # 为每个阶段保存一个检查点
    for stage_idx, stage in enumerate(stages):
        print(f"\n开始训练阶段 {stage_idx+1}: {stage['name']}")
        max_value = stage["max_value"]
        
        # 为当前难度生成数据
        X_train, y_train = generate_data(num_samples=8000, max_value=max_value)
        X_val, y_val = generate_data(num_samples=1000, max_value=max_value)
        
        # 将数据移到设备
        X_train = X_train.to(device)
        y_train = y_train.to(device)
        X_val = X_val.to(device)
        y_val = y_val.to(device)
        
        # 训练当前阶段
        batch_size = 1024
        for epoch in range(epochs_per_stage):
            # 训练模式
            model.train()
            total_loss = 0
            indices = torch.randperm(len(X_train))
            
            for i in range(0, len(X_train), batch_size):
                batch_indices = indices[i:i+batch_size]
                inputs = X_train[batch_indices]
                targets = y_train[batch_indices]
                
                # 前向传播
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                
                # 反向传播和优化
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            
            train_loss = total_loss / (len(X_train) / batch_size)
            
            # 验证
            model.eval()
            with torch.no_grad():
                val_outputs = model(X_val)
                val_loss = criterion(val_outputs, y_val).item()
                
                # 计算准确率
                _, val_predicted = torch.max(val_outputs, 1)
                _, val_true_labels = torch.max(y_val, 1)
                val_accuracy = (val_predicted == val_true_labels).float().mean().item()
            
            print(f'阶段 {stage_idx+1} 轮次 {epoch+1}/{epochs_per_stage}, '
                  f'训练损失: {train_loss:.4f}, 验证损失: {val_loss:.4f}, '
                  f'验证准确率: {val_accuracy:.4f}')
            
            # 如果验证准确率达到高水平，提前进入下一阶段
            if val_accuracy > 0.95:
                print(f"阶段 {stage_idx+1} 提前完成! 验证准确率: {val_accuracy:.4f}")
                break
                
        # 保存阶段检查点
        torch.save(model.state_dict(), f"model_stage_{stage_idx+1}.pt")
        
    # 最终阶段：混合数据训练
    print("\n开始最终整合阶段: 混合数据训练")
    X_train, y_train = generate_mixed_data()  # 生成各个难度级别的混合数据
    X_val, y_val = generate_mixed_data(num_samples=2000)
    
    # 将数据移到设备
    X_train = X_train.to(device)
    y_train = y_train.to(device)
    X_val = X_val.to(device)
    y_val = y_val.to(device)
    
    # 最终训练
    for epoch in range(epochs_per_stage):
        # 训练模式
        model.train()
        total_loss = 0
        indices = torch.randperm(len(X_train))
        
        for i in range(0, len(X_train), batch_size):
            batch_indices = indices[i:i+batch_size]
            inputs = X_train[batch_indices]
            targets = y_train[batch_indices]
            
            # 前向传播
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            # 反向传播和优化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        train_loss = total_loss / (len(X_train) / batch_size)
        
        # 验证
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val)
            val_loss = criterion(val_outputs, y_val).item()
            
            # 计算准确率
            _, val_predicted = torch.max(val_outputs, 1)
            _, val_true_labels = torch.max(y_val, 1)
            val_accuracy = (val_predicted == val_true_labels).float().mean().item()
        
        print(f'最终阶段 轮次 {epoch+1}/{epochs_per_stage}, '
              f'训练损失: {train_loss:.4f}, 验证损失: {val_loss:.4f}, '
              f'验证准确率: {val_accuracy:.4f}')
    
    # 保存最终模型
    torch.save(model.state_dict(), "model_final.pt")
    return model

In [111]:
X_train, y_train = generate_data(num_samples=80000, max_value=10000)
X_val, y_val = generate_data(num_samples=1000, max_value=10000)
X_test, y_test = generate_data(num_samples=2000, max_value=10000)

# 将数据移到GPU
X_train = X_train.to(device)
y_train = y_train.to(device)
X_val = X_val.to(device)
y_val = y_val.to(device)
X_test = X_test.to(device)
y_test = y_test.to(device)

def init_xavier(m):
    if isinstance(m, nn.Linear):
        # 使用Xavier均匀分布初始化权重
        nn.init.xavier_uniform_(m.weight)
        # 将偏置初始化为小的正数，避免ReLU神经元"死亡"
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.01)

# 创建模型并移到GPU
model = AdditionNet().to(device)
model.apply(init_xavier)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
model = curriculum_training(model)

# 训练模型
epochs = 150
batch_size = 1024
losses = []
best_val_loss = float('inf')
patience = 100  # 早停耐心值
no_improve = 0
accuracy_threshold = 0.996

for epoch in range(epochs):
	model.train()
	total_loss = 0
	# 获取随机批次
	indices = torch.randperm(len(X_train))
	for i in range(0, len(X_train), batch_size):
		batch_indices = indices[i:i+batch_size]
		inputs = X_train[batch_indices]
		targets = y_train[batch_indices]
		
		# 前向传播
		outputs = model(inputs)
		loss = criterion(outputs, targets)
		
		# 反向传播和优化
		optimizer.zero_grad()
		loss.backward()
		optimizer.step()
		
		total_loss += loss.item()
		
	train_loss = total_loss / (len(X_train) / batch_size)
	model.eval()
	with torch.no_grad():
		val_outputs = model(X_val)
		val_loss = criterion(val_outputs, y_val).item()
		
		_, val_predicted = torch.max(val_outputs, 1)
		_, val_true_labels = torch.max(y_val, 1)
		val_accuracy = (val_predicted == val_true_labels).float().mean().item()

	print(f'轮次 {epoch+1}/{epochs}, 训练损失: {train_loss:.4f}, 验证损失: {val_loss:.4f}, 验证准确率: {val_accuracy:.4f}')

	# 早停检查
	if val_loss < best_val_loss:
		best_val_loss = val_loss
		torch.save(model.state_dict(), 'best_model.pt')
		no_improve = 0
	else:
		no_improve += 1

	# 准确率达到阈值时停止
	if val_accuracy >= accuracy_threshold:
		print(f'验证准确率达到 {val_accuracy:.4f}，在第 {epoch+1} 轮停止训练')
		break
		
	# 早停检查
	if no_improve >= patience:
		print(f'连续 {patience} 轮未改善，在第 {epoch+1} 轮停止训练')
		break

# 修改check_addition函数以使用GPU
def check_addition(a, b, c, max_value=10000):
    model.eval()
    with torch.no_grad():
        a_norm, b_norm, c_norm = normalize_data_log(a, b, c, max_value)
        input_data = torch.tensor([[a_norm, b_norm, c_norm]], dtype=torch.float32).to(device)
        probabilities = model(input_data)
        # 打印原始输出，帮助调试
        print(f"原始输出: {probabilities.cpu().numpy()}")
        return probabilities[0, 1].item()  # 返回正确的概率(索引1)

# 测试示例
decimal_examples = [
    (123.45, 678.90, 802.35),   # 正确: 123.45 + 678.90 = 802.35
    (456.78, 321.09, 777.87),   # 正确: 456.78 + 321.09 = 777.87
    (789.12, 345.67, 1200.00),  # 错误: 789.12 + 345.67 ≠ 1200.00
    (1234.56, 7890.12, 9124.68),# 正确: 1234.56 + 7890.12 = 9124.68
    (2345.67, 8901.23, 11000.00)# 错误: 2345.67 + 8901.23 ≠ 11000.00
]

for a, b, c in decimal_examples:
    prob = check_addition(a, b, c)
    correct = abs((a + b) - c) < 1e-5
    print(f'{a:.2f} + {b:.2f} = {c:.2f} 的正确概率: {prob:.4f}, 实际上是: {"正确" if correct else "错误"}')


开始训练阶段 1: 10以内加法
阶段 1 轮次 1/100, 训练损失: 0.7108, 验证损失: 0.6937, 验证准确率: 0.4970
阶段 1 轮次 2/100, 训练损失: 0.7096, 验证损失: 0.6942, 验证准确率: 0.4960
阶段 1 轮次 3/100, 训练损失: 0.7097, 验证损失: 0.6937, 验证准确率: 0.4960
阶段 1 轮次 4/100, 训练损失: 0.7095, 验证损失: 0.6935, 验证准确率: 0.4970
阶段 1 轮次 5/100, 训练损失: 0.7096, 验证损失: 0.6936, 验证准确率: 0.4960
阶段 1 轮次 6/100, 训练损失: 0.7095, 验证损失: 0.6935, 验证准确率: 0.5040
阶段 1 轮次 7/100, 训练损失: 0.7095, 验证损失: 0.6936, 验证准确率: 0.4960
阶段 1 轮次 8/100, 训练损失: 0.7095, 验证损失: 0.6937, 验证准确率: 0.4960
阶段 1 轮次 9/100, 训练损失: 0.7094, 验证损失: 0.6936, 验证准确率: 0.4960
阶段 1 轮次 10/100, 训练损失: 0.7095, 验证损失: 0.6935, 验证准确率: 0.4970
阶段 1 轮次 11/100, 训练损失: 0.7094, 验证损失: 0.6936, 验证准确率: 0.4960
阶段 1 轮次 12/100, 训练损失: 0.7094, 验证损失: 0.6938, 验证准确率: 0.4960
阶段 1 轮次 13/100, 训练损失: 0.7095, 验证损失: 0.6937, 验证准确率: 0.4950
阶段 1 轮次 14/100, 训练损失: 0.7095, 验证损失: 0.6935, 验证准确率: 0.5050
阶段 1 轮次 15/100, 训练损失: 0.7094, 验证损失: 0.6936, 验证准确率: 0.4950
阶段 1 轮次 16/100, 训练损失: 0.7096, 验证损失: 0.6940, 验证准确率: 0.4950
阶段 1 轮次 17/100, 训练损失: 0.7094, 验证损失: 0.6937, 验证准确率: 0.4960
阶段 1 