<a href="https://colab.research.google.com/github/ElwinGao4444/colab/blob/main/%E6%89%8B%E6%92%B8%E7%BB%8F%E5%85%B8%E7%A5%9E%E7%BB%8F%E7%BD%91%E7%BB%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 手撸经典神经网络

In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow import keras

In [None]:
# Ask TensorFlow which physical devices it can see, then report both lists.
gpu = tf.config.list_physical_devices('GPU')
cpu = tf.config.list_physical_devices('CPU')
print('gpu: ', gpu)
print('cpu: ', cpu)

## LeNet

In [None]:
# load_data() returns ((train images, train labels), (test images, test labels)).
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Insert a channel axis at position 3 and divide the pixel values by 255,
# first for the training images and then for the test images.
x_train, x_test = (
    tf.expand_dims(images, axis=3) / 255
    for images in (x_train, x_test)
)

# Last expression of the cell: display the shape of the training tensor.
x_train.shape

In [None]:
# LeNet-style stack: two conv + average-pool stages followed by three dense layers.
model = tf.keras.Sequential([
    # The dtype is spelled out as "float32": the bare "float" alias is not a
    # valid dtype name in Keras 3, while "float32" means the same thing everywhere.
    tf.keras.Input(shape=(28,28,1), dtype="float32"),
    tf.keras.layers.Conv2D(6, 5, activation='sigmoid', padding='same'),     # 5x5 convolution, 6 channels; 'same' padding keeps 28x28
    tf.keras.layers.AvgPool2D(pool_size=2, strides=2),                      # 2x2 window, stride 2
    tf.keras.layers.Conv2D(16, 5, activation='sigmoid'),                    # 5x5 convolution, 16 channels (default 'valid' padding)
    tf.keras.layers.AvgPool2D(pool_size=2, strides=2),                      # 2x2 window, stride 2
    tf.keras.layers.Flatten(),  # collapse the feature maps into one vector per sample
    tf.keras.layers.Dense(120, activation='sigmoid'),   # fully connected, 120 units
    tf.keras.layers.Dense(84, activation='sigmoid'),    # fully connected, 84 units
    tf.keras.layers.Dense(10),              # fully connected, 10 outputs, no activation
])
model.summary()

In [None]:
# Compile, train and evaluate inside a device scope that requests the first GPU.
with tf.device('/device:GPU:0'):
    model.compile(
        optimizer=tf.keras.optimizers.SGD(learning_rate=1.0),
        # from_logits=True: the loss is given raw scores, not probabilities.
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    history = model.fit(x_train, y_train, epochs=10, batch_size=256)
    test_scores = model.evaluate(x_test, y_test, verbose=2)

# evaluate() returned the loss first, followed by the compiled metric.
print("Test loss:", test_scores[0])
print("Test accuracy:", test_scores[1])

## AlexNet

In [None]:
# Because of resource limits, the upscaled images do not all fit in memory at once,
# so a Dataset is used for streaming: the scaling and resizing only run when a
# batch is actually requested.
def resize(iter_img, batch_size = 32, img_size = None, shuffle = True):
    """Build a batched ``tf.data.Dataset`` from an ``(images, labels)`` pair.

    Args:
        iter_img: Two-element sequence ``(images, labels)``. The images are
            assumed to have no channel axis yet (e.g. ``(N, H, W)``) -- a
            trailing axis is appended here.
        batch_size: Number of samples per batch.
        img_size: If truthy, every image is resized with padding to
            ``img_size x img_size``; otherwise the original size is kept.
        shuffle: When True (default) the samples are shuffled with a buffer
            covering the whole dataset.

    Returns:
        A dataset yielding ``(images, labels)`` batches, where the images are
        divided by 255 and the labels are cast to int32.
    """
    images, labels = iter_img

    def preprocess(x, y):
        # Applied to one batch at a time, so only a single batch of scaled /
        # resized images has to exist at any moment.
        x = tf.expand_dims(x, axis=-1) / 255
        if img_size:
            x = tf.image.resize_with_pad(x, img_size, img_size)
        return x, tf.cast(y, dtype='int32')

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        # Shuffle individual samples BEFORE batching; shuffling after batch()
        # would only reorder fixed batches and never mix their contents.
        dataset = dataset.shuffle(len(images))
    return dataset.batch(batch_size).map(preprocess).prefetch(tf.data.AUTOTUNE)

# Fashion-MNIST comes back as a (train, test) pair of (images, labels) tuples.
iter_train, iter_test = keras.datasets.fashion_mnist.load_data()

# Replace each split with its input pipeline: batches of 128, images resized to 224x224.
iter_train = resize(iter_train, batch_size=128, img_size=224)
iter_test = resize(iter_test, batch_size=128, img_size=224)

In [None]:
# AlexNet-style stack for 224x224 single-channel images.
model = tf.keras.Sequential([
    # The dtype is spelled out as "float32": the bare "float" alias is not a
    # valid dtype name in Keras 3, while "float32" means the same thing everywhere.
    tf.keras.Input(shape=(224,224,1), dtype="float32"),
    tf.keras.layers.Conv2D(96, 11, activation='relu', strides=4),       # 11x11 convolution, stride 4
    # AlexNet uses overlapping max pooling (3x3 window, stride 2), not average pooling.
    tf.keras.layers.MaxPool2D(pool_size=3, strides=2),
    tf.keras.layers.Conv2D(256, 5, activation='relu', padding='same'),
    tf.keras.layers.MaxPool2D(pool_size=3, strides=2),
    tf.keras.layers.Conv2D(384, 3, activation='relu', padding='same'),
    tf.keras.layers.Conv2D(384, 3, activation='relu', padding='same'),
    tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same'),
    tf.keras.layers.MaxPool2D(pool_size=3, strides=2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(4096, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(4096, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    # One output per Fashion-MNIST class (10), instead of the 1000 ImageNet
    # classes of the original network; no activation on the outputs.
    tf.keras.layers.Dense(10),
])
model.summary()

In [None]:
# Compile, train and evaluate inside a device scope that requests the first GPU.
with tf.device('/device:GPU:0'):
    model.compile(
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
        # from_logits=True: the loss is given raw scores, not probabilities.
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    # No batch_size is passed to fit(): the Dataset is already batched.
    history = model.fit(iter_train, epochs=3)
    test_scores = model.evaluate(iter_test, verbose=2)

# evaluate() returned the loss first, followed by the compiled metric.
print("Test loss:", test_scores[0])
print("Test accuracy:", test_scores[1])

## ResNet

In [None]:
# Because of resource limits, the upscaled images do not all fit in memory at once,
# so a Dataset is used for streaming: the scaling and resizing only run when a
# batch is actually requested.
def resize(iter_img, batch_size = 1, img_size = None, shuffle = True):
    """Build a batched ``tf.data.Dataset`` from an ``(images, labels)`` pair.

    Args:
        iter_img: Two-element sequence ``(images, labels)``. The images are
            assumed to have no channel axis yet (e.g. ``(N, H, W)``) -- a
            trailing axis is appended here.
        batch_size: Number of samples per batch.
        img_size: If truthy, every image is resized with padding to
            ``img_size x img_size``; otherwise the original size is kept.
        shuffle: When True (default) the samples are shuffled with a buffer
            covering the whole dataset.

    Returns:
        A dataset yielding ``(images, labels)`` batches, where the images are
        divided by 255 and the labels are cast to int32.
    """
    images, labels = iter_img

    def preprocess(x, y):
        # Applied to one batch at a time, so only a single batch of scaled /
        # resized images has to exist at any moment.
        x = tf.expand_dims(x, axis=-1) / 255
        if img_size:
            x = tf.image.resize_with_pad(x, img_size, img_size)
        return x, tf.cast(y, dtype='int32')

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if shuffle:
        # Shuffle individual samples BEFORE batching; shuffling after batch()
        # would only reorder fixed batches and never mix their contents.
        dataset = dataset.shuffle(len(images))
    return dataset.batch(batch_size).map(preprocess).prefetch(tf.data.AUTOTUNE)

# Fashion-MNIST comes back as a (train, test) pair of (images, labels) tuples.
iter_train, iter_test = keras.datasets.fashion_mnist.load_data()

# An image size of 32 is used purely to cut the amount of computation;
# the regular input size would be 224.
iter_train = resize(iter_train, batch_size=256, img_size=32)
iter_test = resize(iter_test, batch_size=256, img_size=32)

In [None]:
class Residual(tf.keras.Model):
    """Residual unit: two 3x3 convolutions, each followed by batch normalization,
    whose result is added to a shortcut of the input before the final ReLU.

    Args:
        channels: Number of filters of every convolution in the unit.
        strides: Stride of the first convolution. When it differs from 1, the
            shortcut goes through a 1x1 convolution with the same stride;
            otherwise the input is added as-is.
    """

    def __init__(self, channels, strides=1):
        super().__init__()
        Conv2D = tf.keras.layers.Conv2D
        self.conv1 = Conv2D(channels, kernel_size=3, padding='same', strides=strides)
        self.conv2 = Conv2D(channels, kernel_size=3, padding='same')
        if strides == 1:
            # Identity shortcut: no projection layer is created.
            self.conv3 = None
        else:
            self.conv3 = Conv2D(channels, kernel_size=1, padding='valid', strides=strides)
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.bn2 = tf.keras.layers.BatchNormalization()

    def call(self, X):
        """Run the main path and the shortcut on ``X`` and return ReLU of their sum."""
        relu = tf.keras.activations.relu
        main = relu(self.bn1(self.conv1(X)))
        main = self.bn2(self.conv2(main))
        shortcut = X if self.conv3 is None else self.conv3(X)
        return relu(main + shortcut)

class ResnetBlock(tf.keras.layers.Layer):
    """A stage of ``Residual`` units applied one after another.

    Args:
        channels: Number of filters passed to every ``Residual`` unit.
        residuals_count: Total number of units in the stage. The first unit is
            always created, so values below 1 still yield one unit.
        strides: Stride handed to the first unit only; the remaining units use
            the ``Residual`` default.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, channels, residuals_count, strides=2, **kwargs):
        super(ResnetBlock, self).__init__(**kwargs)
        units = [Residual(channels, strides)]
        units.extend(Residual(channels) for _ in range(residuals_count - 1))
        # Assigning the list as an attribute lets Keras track the nested layers.
        self.residual_layers = units

    def call(self, X):
        """Feed ``X`` through every residual unit in order and return the result."""
        # Iterate the list itself: `.layers` is not part of the list API and only
        # exists on the legacy tf.keras list wrapper, so relying on it fails on
        # other Keras versions.
        for layer in self.residual_layers:
            X = layer(X)
        return X

# ResNet-18 style stack for 32x32 single-channel images.
model = tf.keras.models.Sequential([
    # The dtype is spelled out as "float32": the bare "float" alias is not a
    # valid dtype name in Keras 3, while "float32" means the same thing everywhere.
    tf.keras.Input(shape=(32,32,1), dtype="float32"),
    tf.keras.layers.Conv2D(64, kernel_size=7, strides=2, padding='same'),   # halves height and width
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Activation('relu'),
    tf.keras.layers.MaxPool2D(pool_size=3, strides=2, padding='same'),      # halves height and width
    ResnetBlock(64, 2, 1),      # first stage keeps the resolution (stride 1)
    ResnetBlock(128, 2),        # later stages use the default stride of 2
    ResnetBlock(256, 2),
    ResnetBlock(512, 2),
    tf.keras.layers.GlobalAvgPool2D(),
    tf.keras.layers.Dense(units=10),    # 10 outputs, no activation
])
model.summary()

In [None]:
# Compile, train and evaluate inside a device scope that requests the first GPU.
with tf.device('/device:GPU:0'):
    model.compile(
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.05),
        # from_logits=True: the loss is given raw scores, not probabilities.
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    # No batch_size is passed to fit(): the Dataset is already batched.
    history = model.fit(iter_train, epochs=5)
    test_scores = model.evaluate(iter_test, verbose=2)

# evaluate() returned the loss first, followed by the compiled metric.
print("Test loss:", test_scores[0])
print("Test accuracy:", test_scores[1])