In [1]:
import random

class LinearModel:
    def __init__(self) -> None:
        ### Model: ax + b ###
        self.weight_a: float = random.uniform(0, 2)
        self.weight_b: float = random.uniform(0, 2)
        self.learning_rate: float = 0.008  # 添加学习率

    def predict(self, model_input: float | int) -> float:
        """预测函数"""
        return self.weight_a * model_input + self.weight_b

    def loss(self, model_input: float | int, actual_output: float | int) -> float:
        """计算单个样本的损失"""
        prediction = self.predict(model_input)
        return (prediction - actual_output) ** 2

    def calculate_gradients(self, X: list[float | int], y: list[float | int]) -> tuple[float, float]:
        """计算整个数据集的梯度平均值"""
        grad_a_total = 0.0
        grad_b_total = 0.0
        n = len(X)
        
        for i in range(n):
            prediction = self.predict(X[i])
            error = prediction - y[i]  # 注意这里是 prediction - y，不是 y - prediction
            
            # 计算梯度
            grad_a_total += error * X[i]  # ∂L/∂a = error * x
            grad_b_total += error         # ∂L/∂b = error
        
        # 返回平均梯度
        return grad_a_total / n, grad_b_total / n

    def update_weights(self, grad_a: float, grad_b: float) -> None:
        """使用梯度下降更新权重"""
        self.weight_a -= self.learning_rate * grad_a
        self.weight_b -= self.learning_rate * grad_b

    def fit(self, X: list[float | int], y: list[float | int], max_epochs: int = 1000, target_loss: float = 1e-6) -> None:
        """训练模型"""
        if len(X) != len(y):
            raise ValueError(f'Shapes are not same: {len(X)} != {len(y)}')
        
        total_loss = float('inf')
        epoch = 0
        
        while epoch < max_epochs and total_loss > target_loss:
            # 计算梯度
            grad_a, grad_b = self.calculate_gradients(X, y)
            
            # 更新权重
            self.update_weights(grad_a, grad_b)
            
            # 计算当前epoch的总损失
            total_loss = 0
            for i in range(len(X)):
                total_loss += self.loss(X[i], y[i])
            total_loss /= len(X)
            
            # 打印训练信息（每100个epoch打印一次）
            if epoch % 100 == 0:
                print(f"Epoch: {epoch}, Loss: {total_loss:.6f}, a: {self.weight_a:.6f}, b: {self.weight_b:.6f}")
            
            epoch += 1
        
        print(f"Training completed after {epoch} epochs")
        print(f"Final - Loss: {total_loss:.6f}, a: {self.weight_a:.6f}, b: {self.weight_b:.6f}")

# Usage example
if __name__ == "__main__":
    # Toy dataset sampled from the line y = 2x + 1 at x = 1..5.
    X_train = list(range(1, 6))
    y_train = [2 * x + 1 for x in X_train]

    # Build a model and show its randomly initialised weights before training.
    model = LinearModel()
    print(f"Initial weights: a={model.weight_a:.4f}, b={model.weight_b:.4f}")

    model.fit(X_train, y_train, max_epochs=100000)

    # Check the fitted line on an input outside the training range.
    test_x = 6
    prediction = model.predict(test_x)
    print(f"Prediction for x={test_x}: {prediction:.4f} (Expected: {2*test_x+1})")

Initial weights: a=1.9763, b=1.6882
Epoch: 0, Loss: 0.326406, a: 1.961829, b: 1.683275
Epoch: 100, Loss: 0.057662, a: 1.844108, b: 1.562846
Epoch: 200, Loss: 0.043989, a: 1.863834, b: 1.491604
Epoch: 300, Loss: 0.033558, a: 1.881069, b: 1.429380
Epoch: 400, Loss: 0.025601, a: 1.896122, b: 1.375033
Epoch: 500, Loss: 0.019530, a: 1.909270, b: 1.327564
Epoch: 600, Loss: 0.014899, a: 1.920754, b: 1.286103
Epoch: 700, Loss: 0.011366, a: 1.930784, b: 1.249891
Epoch: 800, Loss: 0.008671, a: 1.939545, b: 1.218261
Epoch: 900, Loss: 0.006615, a: 1.947197, b: 1.190635
Epoch: 1000, Loss: 0.005046, a: 1.953880, b: 1.166506
Epoch: 1100, Loss: 0.003850, a: 1.959718, b: 1.145431
Epoch: 1200, Loss: 0.002937, a: 1.964816, b: 1.127024
Epoch: 1300, Loss: 0.002240, a: 1.969270, b: 1.110946
Epoch: 1400, Loss: 0.001709, a: 1.973159, b: 1.096903
Epoch: 1500, Loss: 0.001304, a: 1.976557, b: 1.084638
Epoch: 1600, Loss: 0.000995, a: 1.979524, b: 1.073925
Epoch: 1700, Loss: 0.000759, a: 1.982116, b: 1.064568
Epoc