<a href="https://colab.research.google.com/github/hyh520/learn/blob/main/MLP_Mnist.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import numpy as np

In [2]:
print(tf.__version__)

2.12.0


In [3]:
class MNISTLoader(object):
  '''
  数据加载处理类
  '''
  def __init__(self):
    # 获取数据
    (self.train_data,self.train_label),(self.test_data,self.test_label)=tf.keras.datasets.mnist.load_data()
    # 2、处理数据，归一化，维度以及类型
    # MNIST中的图像默认为uint8（0-255的数字）。以下代码将其归一化到0-1之间的浮点数，并在最后增加一维作为颜色通道
    # 默认下载是(60000, 28, 28)，扩展到四维方便计算理解[60000, 28, 28, 1]
    self.train_data=np.expand_dims(self.train_data.astype(np.float32)/255.0,axis=-1)
    # [10000, 28, 28, 1]
    self.test_data=np.expand_dims(self.test_data.astype(np.float32)/255.0,axis=-1)
    self.train_label=self.train_label.astype(np.int32)
    self.test_label=self.test_label.astype(np.int32)
    # 获取数据的大小
    self.num_train_data,self.num_test_data=self.train_data.shape[0],self.test_data.shape[0]

  def get_batch(self,batch_size):
    """
    随机获取获取批次数据
    :param batch_size: 批次大小
    :return:
    """
    # 从数据集中随机取出batch_size个元素并返回
    index=np.random.randint(0,np.shape(self.train_data)[0],batch_size)
    return self.train_data[index,:],self.train_label[index]

In [4]:
mnist=MNISTLoader()
train_data,train_label=mnist.get_batch(50)
print(train_data.shape,train_label)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
(50, 28, 28, 1) [4 8 4 3 1 0 5 6 4 3 8 6 4 4 1 1 9 0 6 4 5 6 7 5 2 7 4 3 6 4 3 0 8 1 2 7 1
 8 1 3 2 6 7 0 0 9 1 0 3 1]


In [6]:
class MLP(tf.keras.Model):
  """
  自定义MLP类
  """
  def __init__(self):
    super().__init__()
    # 定义两层神经网络，第一层100个神经元，激活函数relu，第二层10个神经元输出给softmax
    self.flatten=tf.keras.layers.Flatten()
    self.dense1=tf.keras.layers.Dense(units=100,activation=tf.nn.relu)
    self.dense2=tf.keras.layers.Dense(units=10)

  def call(self,inputs):
    # [batch_size, 28, 28, 1]
    x=self.flatten(inputs)
    # [batch_size, 784]
    x=self.dense1(x)
    # [batch_size, 100]
    x=self.dense2(x)
    # [batch_size, 10]
    output=tf.nn.softmax(x)
    return output

In [8]:
"""
1、从 DataLoader 中随机取一批训练数据；
2、将这批数据送入模型,计算出模型的预测值；
3、将模型预测值与真实值进行比较,计算损失函数loss。这里使用 tf.keras.losses 中的交叉熵函数作为损失函数；
4、计算损失函数关于模型变量的导数
5、将求出的导数值传入优化器,使用优化器的 apply_gradients 方法更新模型参数以最小化损失函数
"""
# 实例化模型和数据读取类，并实例化一个优化器，这里使用 Adam 优化器
num_epochs=5
batch_size=50
learning_rate=0.001
model=MLP()
data_loader=MNISTLoader()
optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate)
# 计算出大概需要迭代批次大小
num_batches=int(data_loader.num_train_data//batch_size*num_epochs)
# 进行批次数据获取
for batch_index in range(num_batches):
  X,y=data_loader.get_batch(batch_size)
  with tf.GradientTape() as tape:
    y_pred=model(X)
    # 使用tf.keras.losses计算损失
    loss=tf.keras.losses.sparse_categorical_crossentropy(y_true=y,y_pred=y_pred)
    # 求出平均损失
    loss=tf.reduce_mean(loss)
    print("batch %d : loss %f"%(batch_size,loss.numpy()))
  grads=tape.gradient(loss,model.variables)
  optimizer.apply_gradients(grads_and_vars=zip(grads,model.variables))

y_pred=model.predict(data_loader.test_data)
# 定义评估函数
sparse_categorical_accuracy=tf.keras.metrics.SparseCategoricalAccuracy()
# 定义测试数据集一共批次的大小
sparse_categorical_accuracy.update_state(y_true=data_loader.test_label,y_pred=y_pred)
print("测试准确率:%f"%sparse_categorical_accuracy.result())

[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
batch 50 : loss 0.249552
batch 50 : loss 0.181470
batch 50 : loss 0.247539
batch 50 : loss 0.081847
batch 50 : loss 0.221863
batch 50 : loss 0.126175
batch 50 : loss 0.258674
batch 50 : loss 0.208165
batch 50 : loss 0.057344
batch 50 : loss 0.115412
batch 50 : loss 0.157033
batch 50 : loss 0.122704
batch 50 : loss 0.107912
batch 50 : loss 0.206474
batch 50 : loss 0.157962
batch 50 : loss 0.095637
batch 50 : loss 0.362805
batch 50 : loss 0.154394
batch 50 : loss 0.145849
batch 50 : loss 0.276718
batch 50 : loss 0.228142
batch 50 : loss 0.310083
batch 50 : loss 0.614537
batch 50 : loss 0.236048
batch 50 : loss 0.280378
batch 50 : loss 0.088817
batch 50 : loss 0.198263
batch 50 : loss 0.312212
batch 50 : loss 0.145314
batch 50 : loss 0.181162
batch 50 : loss 0.390383
batch 50 : loss 0.191008
batch 50 : loss 0.314695
batch 50 : loss 0.245618
batch 50 : loss 0.230299
batch 50 : loss 0.158194
batch 50 : loss 0.141056
batch 50 : loss 0.208620
batch 50 

In [12]:
class CNN(tf.keras.Model):
  def __init__(self):
    super().__init__()
    # 卷积层
    self.conv1=tf.keras.layers.Conv2D(
        filters=32,        # 卷积层神经元（卷积核）数目
        kernel_size=[5,5],    # 感受野大小
        padding='same',      # padding策略（vaild 或 same）
        activation=tf.nn.relu   # 激活函数
    )
    # 池化层
    self.pool1=tf.keras.layers.MaxPool2D(
        pool_size=[2,2],  # 窗口大小
        strides=2      # 步长
    )
    self.conv2=tf.keras.layers.Conv2D(filters=64,kernel_size=[5,5],padding='same',activation=tf.nn.relu)
    self.pool2=tf.keras.layers.MaxPool2D(pool_size=[2,2],strides=2)
    # 全连接层
    self.flatten=tf.keras.layers.Reshape(target_shape=(7*7*64,))
    self.dense1=tf.keras.layers.Dense(units=1024,activation=tf.nn.relu)
    self.dense2=tf.keras.layers.Dense(units=10)

  def call(self,inputs):
    x=self.conv1(inputs)  # [batch_size, 28, 28, 32]
    x=self.pool1(x)     # [batch_size, 14, 14, 32]
    x=self.conv2(x)     # [batch_size, 14, 14, 64]
    x=self.pool2(x)     # [batch_size, 7, 7, 64]
    x=self.flatten(x)    # [batch_size, 7 * 7 * 64]
    x=self.dense1(x)     # [batch_size, 1024]
    x=self.dense2(x)     # [batch_size, 10]
    outputs=tf.nn.softmax(x)
    return outputs

In [13]:
# 实例化模型和数据读取类，并实例化一个优化器，这里使用 Adam 优化器
num_epochs=5
batch_size=50
learning_rate=0.001
model=CNN()
data_loader=MNISTLoader()
optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate)
# 计算出大概需要迭代批次大小
num_batches=int(data_loader.num_train_data//batch_size*num_epochs)
# 进行批次数据获取
for batch_index in range(num_batches):
  X,y=data_loader.get_batch(batch_size)
  with tf.GradientTape() as tape:
    y_pred=model(X)
    # 使用tf.keras.losses计算损失
    loss=tf.keras.losses.sparse_categorical_crossentropy(y_true=y,y_pred=y_pred)
    # 求出平均损失
    loss=tf.reduce_mean(loss)
    print("batch_index %d : batch %d : loss %f"%(batch_index,batch_size,loss.numpy()))
  grads=tape.gradient(loss,model.variables)
  optimizer.apply_gradients(grads_and_vars=zip(grads,model.variables))

y_pred=model.predict(data_loader.test_data)
# 定义评估函数
sparse_categorical_accuracy=tf.keras.metrics.SparseCategoricalAccuracy()
# 定义测试数据集一共批次的大小
sparse_categorical_accuracy.update_state(y_true=data_loader.test_label,y_pred=y_pred)
print("测试准确率:%f"%sparse_categorical_accuracy.result())

batch_index 0 : batch 50 : loss 2.301061




[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
batch_index 1002 : batch 50 : loss 0.002000
batch_index 1003 : batch 50 : loss 0.007211
batch_index 1004 : batch 50 : loss 0.009462
batch_index 1005 : batch 50 : loss 0.001042
batch_index 1006 : batch 50 : loss 0.069710
batch_index 1007 : batch 50 : loss 0.044095
batch_index 1008 : batch 50 : loss 0.033737
batch_index 1009 : batch 50 : loss 0.003586
batch_index 1010 : batch 50 : loss 0.026817
batch_index 1011 : batch 50 : loss 0.048810
batch_index 1012 : batch 50 : loss 0.023231
batch_index 1013 : batch 50 : loss 0.002455
batch_index 1014 : batch 50 : loss 0.010331
batch_index 1015 : batch 50 : loss 0.011643
batch_index 1016 : batch 50 : loss 0.009551
batch_index 1017 : batch 50 : loss 0.181752
batch_index 1018 : batch 50 : loss 0.004201
batch_index 1019 : batch 50 : loss 0.245872
batch_index 1020 : batch 50 : loss 0.049834
batch_index 1021 : batch 50 : loss 0.010950
batch_index 1022 : batch 50 : loss 0.026695
batch_index 1023 : batch 50 : loss 