In [2]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Model / data parameters
num_classes = 10
input_shape = (28, 28, 1)

# Load the data and split it between train and test sets
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Scale images to the [0, 1] range
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
# Make sure images have shape (28, 28, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
print("x_train shape:", x_train.shape)
print(x_train.shape[0], "train samples")
print(x_test.shape[0], "test samples")

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

# Define the model
model = keras.Sequential(
    [
        keras.Input(shape=input_shape),
        layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation="softmax"),
    ]
)

model.summary()

# Initialize Adam parameters
beta1 = tf.constant(0.9, dtype=tf.float32)
beta2 = tf.constant(0.999, dtype=tf.float32)
epsilon = tf.constant(1e-7, dtype=tf.float32)
initial_learning_rate = tf.constant(0.001, dtype=tf.float32)

# Custom training step function with optimized gradient direction and Adam update
@tf.function
def train_step(model, x_batch, y_batch, learning_rate, m, v, t):
    gradients_per_sample = []

    # Loop over each sample in the batch
    for i in range(len(x_batch)):
        with tf.GradientTape() as tape:
            # Expand dimensions of the input sample to match batch format
            x_sample = tf.expand_dims(x_batch[i], axis=0)
            y_sample = tf.expand_dims(y_batch[i], axis=0)

            # Forward pass
            predictions = model(x_sample, training=True)
            loss = tf.keras.losses.categorical_crossentropy(y_sample, predictions)

        # Compute gradients for this sample
        grads = tape.gradient(loss, model.trainable_variables)
        grads = [g for g in grads if g is not None]  # Filter out None gradients
        if grads:
            grads_flattened = tf.concat([tf.reshape(g, [-1]) for g in grads], axis=0)
            gradients_per_sample.append(grads_flattened)

    if not gradients_per_sample:
        return m, v, t  # 如果没有有效的梯度，则跳过这个批次

    gradients_per_sample = tf.stack(gradients_per_sample)

    # 找到与大多数样本梯度方向一致的方向 d
    d = tf.reduce_mean(gradients_per_sample, axis=0)
    sign_sums = tf.reduce_sum(tf.sign(tf.tensordot(gradients_per_sample, d, axes=[[1], [0]])))

    # 优化 d 的方向，尝试找到更好的方向
    max_iterations = 100

    for _ in range(max_iterations):
        d_try = d + tf.random.normal(d.shape) * learning_rate
        sign_sums_try = tf.reduce_sum(tf.sign(tf.tensordot(gradients_per_sample, d_try, axes=[[1], [0]])))
        if sign_sums_try > sign_sums:
            d = d_try
            sign_sums = sign_sums_try

    # 创建 m 和 v 的副本以避免修改输入参数
    m_new = [tf.identity(m_i) for m_i in m]
    v_new = [tf.identity(v_i) for v_i in v]

    # 用找到的 d 方向更新权重
    start_idx = 0
    t = tf.cast(t, tf.float32) + 1  # 更新时间步长，并转换为 float32
    learning_rate_t = learning_rate * tf.sqrt(1 - beta2**t) / (1 - beta1**t)  # 动态学习率

    for j, var in enumerate(model.trainable_variables):
        shape = tf.shape(var)
        size = tf.reduce_prod(shape)
        var_grad = tf.reshape(d[start_idx:start_idx + size], shape)

        # Update biased first moment estimate
        m_new[j] = beta1 * m_new[j] + (1.0 - beta1) * var_grad

        # Update biased second raw moment estimate
        v_new[j] = beta2 * v_new[j] + (1.0 - beta2) * tf.square(var_grad)

        # Compute bias-corrected first moment estimate
        m_hat = m_new[j] / (1.0 - beta1**t)

        # Compute bias-corrected second raw moment estimate
        v_hat = v_new[j] / (1.0 - beta2**t)

        # Update parameters
        var.assign_sub(learning_rate_t * m_hat / (tf.sqrt(v_hat) + epsilon))

        start_idx += size

    return m_new, v_new, tf.cast(t, tf.int32)

# Custom training loop
batch_size = 128
epochs = 15

# Initialize the moments and time step
m = [tf.zeros_like(var) for var in model.trainable_variables]
v = [tf.zeros_like(var) for var in model.trainable_variables]
t = 0

# Training loop
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    num_batches = x_train.shape[0] // batch_size
    for i in range(0, x_train.shape[0], batch_size):
        x_batch = x_train[i:i+batch_size]
        y_batch = y_train[i:i+batch_size]
        m, v, t = train_step(model, x_batch, y_batch, initial_learning_rate, m, v, t)

    # 在每个epoch结束时进行验证
    model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    loss, accuracy = model.evaluate(x_train, y_train, verbose=0)
    print(f"loss: {loss:.4f}, accuracy: {accuracy:.4f}")

# 最后评估模型在测试数据上的表现
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"Test loss: {test_loss:.4f}, Test accuracy: {test_acc:.4f}")


x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


Epoch 1/15
loss: 0.1618, accuracy: 0.9528
Epoch 2/15
loss: 0.0876, accuracy: 0.9741
Epoch 3/15
loss: 0.0639, accuracy: 0.9808
Epoch 4/15
loss: 0.0517, accuracy: 0.9844
Epoch 5/15
loss: 0.0446, accuracy: 0.9867
Epoch 6/15
loss: 0.0410, accuracy: 0.9876
Epoch 7/15
loss: 0.0346, accuracy: 0.9895
Epoch 8/15
loss: 0.0338, accuracy: 0.9897
Epoch 9/15
loss: 0.0316, accuracy: 0.9905
Epoch 10/15
loss: 0.0298, accuracy: 0.9907
Epoch 11/15
loss: 0.0267, accuracy: 0.9918
Epoch 12/15
loss: 0.0243, accuracy: 0.9927
Epoch 13/15
loss: 0.0228, accuracy: 0.9928
Epoch 14/15
loss: 0.0220, accuracy: 0.9930
Epoch 15/15
loss: 0.0208, accuracy: 0.9935
Test loss: 0.0271, Test accuracy: 0.9906
