# 2. Models Tensorflow 模型建立与训练

In [20]:
import tensorflow as tf
import keras 
import numpy as np

## 2.1 模型与层

在tensorflow中，使用keras构建模型
模型以类的方式呈现：tf.keras.Model
keras在tf.keras.layers中内置了大量DL中常用的预定义层，也可以自定义层

Tensorflow程序结构：
模型类定义---> 模型训练 ---> 模型测试/调用
### 模型类定义

在继承类中，需要重写__init__()和call()方法，以及根据需要添加自定义方法

In [2]:
class myModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        #此处添加初始化代码（包含call()方法中会用到的层），例如：
        #layer1 = tf.keras.BuitInLayer(...)
        #layer2 = MyCustomLayer(...)
    def call(self, input):
        #此处添加模型调用的代码（处理输入并返回输出），例如
        #x = layer1(input)
        # output = layer2(x)
        return output 
    #以及其他自定义方法

继承tf.keras.Model类后，可以使用父类的方法和属性

实例化：
model = Model()

#### 例子：线性回归（模型类实现）

y_pred = a * x + b

In [13]:
#数据定义
x = tf.constant([[1.0,2.0,3.0],[4.0,5.0,6.0]])
y = tf.constant([[10.0],[20.0]])
print(x)
print(y)

tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[10.]
 [20.]], shape=(2, 1), dtype=float32)


In [17]:
#模型类定义
class LinearRegression(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units=1,
            activation=None,
            kernel_initializer=tf.zeros_initializer(),
            bias_initializer=tf.zeros_initializer()
        )
    def call(self, input):
        output = self.dense(input)
        return output

In [18]:
#训练
model = LinearRegression()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(x)
        loss = tf.reduce_mean(tf.square(y_pred-y))
    grads = tape.gradient(loss,model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads,model.variables))
print(model.variables)

[<tf.Variable 'linear_regression_3/dense_3/kernel:0' shape=(3, 1) dtype=float32, numpy=
array([[0.40784496],
       [1.191065  ],
       [1.9742855 ]], dtype=float32)>, <tf.Variable 'linear_regression_3/dense_3/bias:0' shape=(1,) dtype=float32, numpy=array([0.78322077], dtype=float32)>]


#### Dense 全连接层

input -> z = w * x + b ->  f(x)

f(x)是激活函数

不指定激活函数为线性变换

input: batch_size,input_dim

output: batch_size, units(输入张量的维度)

## 2.2 基础示例

### I 多层感知机MLP

任务：MNIST手写数字数据集分类

一共分为4步

#### Step 1 获取数据集
使用tf.keras.datasets获取数据集并进行数据预处理

In [38]:
class MNISTLoader():
    def __init__(self):
        # 载入数据集，自行下载放到 C:\用户\用户名\.keras\datasets\
        mnist = tf.keras.datasets.mnist#从网络自动下载，网络不好时自行下载
        (self.train_data, self.train_label),(self.test_data, self.test_label) = mnist.load_data()
        # 数据预处理
        # 增加一维颜色通道
        # [图像数目，长，宽，色彩通道数]
        self.train_data = np.expand_dims(self.train_data.astype(np.float32)/255.0,axis=-1)
        print('train data ',self.train_data.shape)
        # axis = -1:倒数第一个轴
        # astype:类型转换
        self.test_data = np.expand_dims(self.test_data.astype(np.float32)/255.0, axis=-1)
        print('test data ',self.test_data.shape)
        self.train_label = self.train_label.astype(np.float32)
        self.test_label = self.test_label.astype(np.float32)
        self.num_train_data, self.num_test_data = self.train_data.shape[0],self.test_data.shape[0]
        print(self.num_train_data)
    def get_batch(self, batch_size):
        # 从数据集中随机获取batch_size个元素并返回
        index = np.random.randint(0,np.shape(self.train_data)[0],batch_size)
        return self.train_data[index, :], self.train_label[index]

#### Step 2 模型构建
多层感知机MLP，使用ReLU激活函数来分类

In [34]:
class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten() 
        self.dense1 = tf.keras.layers.Dense(units=100,activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10) #10类
    
    def call(self, inputs):
        x = self.flatten(inputs)
        x = self.dense1(x)
        x = self.dense2(x)
        output = tf.nn.softmax(x)#softmax:输出属于每个类的概率
        return output

#### Step 3 模型训练

In [42]:
# hyper parameters:
epochs = 5
batch_size = 64
learning_rate = 0.01
# 读取类
model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
# 迭代epochs*(num_data//batch_size)次以下步骤：
    # 随机抽取一批训练数据
    # 将数据送入模型，计算预测值
    # 将模型预测值与真实值比较，计算损失函数loss，使用交叉熵损失函数
    # 计算损失函数关于模型变量的导数
    # 将求出的导数值传入优化器，使用优化器的apply_gradient方法更新模型参数来最小化损失
data_loader.num_train_data
num_batch = int(data_loader.num_train_data//batch_size*epochs)
for i in range(num_batch):
    x,y = data_loader.get_batch(batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(x)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y,y_pred=y_pred)
        loss = tf.reduce_mean(loss)#计算平均值
        print("batch: %f, loss: %f"%(i,loss.numpy()))
    grads = tape.gradient(loss,model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads,model.variables))

train data  (60000, 28, 28, 1)
test data  (10000, 28, 28, 1)
60000
batch: 0.000000, loss: 2.425479
batch: 1.000000, loss: 2.027631
batch: 2.000000, loss: 1.497665
batch: 3.000000, loss: 0.968717
batch: 4.000000, loss: 1.029364
batch: 5.000000, loss: 0.976270
batch: 6.000000, loss: 0.605652
batch: 7.000000, loss: 0.837723
batch: 8.000000, loss: 0.927531
batch: 9.000000, loss: 0.550657
batch: 10.000000, loss: 0.506534
batch: 11.000000, loss: 0.748059
batch: 12.000000, loss: 0.749966
batch: 13.000000, loss: 0.479900
batch: 14.000000, loss: 0.874153
batch: 15.000000, loss: 0.731783
batch: 16.000000, loss: 0.605508
batch: 17.000000, loss: 0.566433
batch: 18.000000, loss: 0.415143
batch: 19.000000, loss: 0.452941
batch: 20.000000, loss: 0.544187
batch: 21.000000, loss: 0.547097
batch: 22.000000, loss: 0.382694
batch: 23.000000, loss: 0.655218
batch: 24.000000, loss: 0.421696
batch: 25.000000, loss: 0.584934
batch: 26.000000, loss: 0.672212
batch: 27.000000, loss: 0.337891
batch: 28.000000, l

#### Step 4 模型评估

使用测试集测试模型性能

In [44]:
sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batch = int(data_loader.num_test_data//batch_size)
for i in range(num_batch):
    start, end = i*batch_size,(i+1)*batch_size
    y_pred = model.predict(data_loader.test_data[start:end])
    sparse_categorical_accuracy.update_state(
        y_true=data_loader.test_label[start:end],
        y_pred=y_pred
    )
    print("test accuracy: %f"%sparse_categorical_accuracy.result())

test accuracy: 0.953125
test accuracy: 0.968750
test accuracy: 0.968750
test accuracy: 0.964844
test accuracy: 0.965625
test accuracy: 0.960938
test accuracy: 0.962054
test accuracy: 0.960938
test accuracy: 0.965278
test accuracy: 0.962500
test accuracy: 0.958807
test accuracy: 0.957031
test accuracy: 0.959135
test accuracy: 0.959821
test accuracy: 0.958333
test accuracy: 0.958984
test accuracy: 0.959559
test accuracy: 0.959201
test accuracy: 0.958882
test accuracy: 0.953906
test accuracy: 0.953125
test accuracy: 0.952415
test accuracy: 0.953804
test accuracy: 0.951172
test accuracy: 0.950000
test accuracy: 0.950120
test accuracy: 0.949653
test accuracy: 0.949777
test accuracy: 0.950431
test accuracy: 0.950000
test accuracy: 0.951109
test accuracy: 0.951172
test accuracy: 0.950758
test accuracy: 0.950827
test accuracy: 0.951786
test accuracy: 0.951389
test accuracy: 0.951858
test accuracy: 0.951069
test accuracy: 0.950721
test accuracy: 0.951563
test accuracy: 0.950838
test accuracy: 0

### II 卷积神经网络CNN
任务:MNIST手写数字数据集分类

#### Step 1 获取数据集

In [64]:
class mnist_loader:
    def __init__(self):
        # load data
        mnist = tf.keras.datasets.mnist
        (self.train_data,self.train_label),(self.test_data,self.test_label)=mnist.load_data()
        # data pre-processing
        self.train_data = np.expand_dims(self.train_data.astype(np.float32)/255.0,-1)
        self.test_data = np.expand_dims(self.test_data.astype(np.float32)/255.0,-1)
        self.train_label = self.train_label.astype(np.float32)
        self.test_label = self.test_label.astype(np.float32)
        # count num of data
        self.num_train_data = self.train_data.shape[0]
        print(self.num_train_data)
        self.num_test_data = self.test_data.shape[0]

    def get_batch(self,batch_size):
        # generate random number
        index = np.random.randint(0,np.shape(self.train_data)[0],batch_size)
        # get data and label which size is batch_size
        print(self.train_data.shape)
        print(self.train_label.shape)
        return self.train_data[index,:],self.train_label[index]

#### Step 2 模型构建
CNN

In [67]:
class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        # conv -> pool -> conv -> pool -> flatten -> dense -> dense
        self.conv1 = tf.keras.layers.Conv2D(filters=32, kernel_size=[5,5],padding='same',activation=tf.nn.relu)
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2,2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(filters=64,kernel_size=[5,5],padding='same',activation=tf.nn.relu)
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2,2],strides=2)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7*7*64,))
        self.dense1 = tf.keras.layers.Dense(units=1024,activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self,inputs):
        x = self.conv1(inputs)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        output = tf.nn.softmax(x)
        return output

#### Step 3 模型训练

In [68]:
# hyper parameters:
epochs = 5
batch_size = 64
learning_rate = 0.01
# 读取类
model = CNN()
data_loader = mnist_loader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
# 迭代epochs*(num_data//batch_size)次以下步骤：
    # 随机抽取一批训练数据
    # 将数据送入模型，计算预测值
    # 将模型预测值与真实值比较，计算损失函数loss，使用交叉熵损失函数
    # 计算损失函数关于模型变量的导数
    # 将求出的导数值传入优化器，使用优化器的apply_gradient方法更新模型参数来最小化损失
# data_loader.num_train_data
num_batch = int(data_loader.num_train_data//batch_size*epochs)
print(num_batch)
for i in range(num_batch):
    x,y = data_loader.get_batch(batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(x)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y,y_pred=y_pred)
        loss = tf.reduce_mean(loss)#计算平均值
        print("batch: %f, loss: %f"%(i,loss.numpy()))
    grads = tape.gradient(loss,model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads,model.variables))

60000
4685
(60000, 28, 28, 1)
(60000,)
batch: 0.000000, loss: 2.299736
(60000, 28, 28, 1)
(60000,)
batch: 1.000000, loss: 9.309584
(60000, 28, 28, 1)
(60000,)
batch: 2.000000, loss: 6.160585
(60000, 28, 28, 1)
(60000,)
batch: 3.000000, loss: 2.654991
(60000, 28, 28, 1)
(60000,)
batch: 4.000000, loss: 2.323802
(60000, 28, 28, 1)
(60000,)
batch: 5.000000, loss: 2.313544
(60000, 28, 28, 1)
(60000,)
batch: 6.000000, loss: 2.320347
(60000, 28, 28, 1)
(60000,)
batch: 7.000000, loss: 2.322272
(60000, 28, 28, 1)
(60000,)
batch: 8.000000, loss: 2.306544
(60000, 28, 28, 1)
(60000,)
batch: 9.000000, loss: 2.296747
(60000, 28, 28, 1)
(60000,)
batch: 10.000000, loss: 2.306457
(60000, 28, 28, 1)
(60000,)
batch: 11.000000, loss: 2.308363
(60000, 28, 28, 1)
(60000,)
batch: 12.000000, loss: 2.304884
(60000, 28, 28, 1)
(60000,)
batch: 13.000000, loss: 2.291777
(60000, 28, 28, 1)
(60000,)
batch: 14.000000, loss: 2.293540
(60000, 28, 28, 1)
(60000,)
batch: 15.000000, loss: 2.295831
(60000, 28, 28, 1)
(600

#### Step 4 模型评估

In [69]:
sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batch = int(data_loader.num_test_data//batch_size)
for i in range(num_batch):
    start, end = i*batch_size,(i+1)*batch_size
    y_pred = model.predict(data_loader.test_data[start:end])
    sparse_categorical_accuracy.update_state(
        y_true=data_loader.test_label[start:end],
        y_pred=y_pred
    )
    print("test accuracy: %f"%sparse_categorical_accuracy.result())

test accuracy: 0.984375
test accuracy: 0.984375
test accuracy: 0.989583
test accuracy: 0.988281
test accuracy: 0.981250
test accuracy: 0.973958
test accuracy: 0.973214
test accuracy: 0.972656
test accuracy: 0.973958
test accuracy: 0.976562
test accuracy: 0.975852
test accuracy: 0.973958
test accuracy: 0.973558
test accuracy: 0.975446
test accuracy: 0.972917
test accuracy: 0.971680
test accuracy: 0.970588
test accuracy: 0.969618
test accuracy: 0.971217
test accuracy: 0.967969
test accuracy: 0.968006
test accuracy: 0.968040
test accuracy: 0.967391
test accuracy: 0.966797
test accuracy: 0.966875
test accuracy: 0.966947
test accuracy: 0.967014
test accuracy: 0.968192
test accuracy: 0.969289
test accuracy: 0.968750
test accuracy: 0.969254
test accuracy: 0.969238
test accuracy: 0.969223
test accuracy: 0.969210
test accuracy: 0.969196
test accuracy: 0.970052
test accuracy: 0.970861
test accuracy: 0.970806
test accuracy: 0.970353
test accuracy: 0.970703
test accuracy: 0.970656
test accuracy: 0

### III 循环神经网络RNN

任务: 尼采风格文本生成

In [73]:
class DataLoader():
    def __init__(self):
        path = 'nietzsche.txt'
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.text = [self.char_indices[c] for c in self.raw_text]

    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char)       # [batch_size, seq_length], [num_batch]

In [74]:
class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        inputs = tf.one_hot(inputs, depth=self.num_chars)       # [batch_size, seq_length, num_chars]
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)   # 获得 RNN 的初始状态
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)   # 通过当前输入和前一时刻的状态，得到输出和当前时刻的状态
        logits = self.dense(output)
        if from_logits:                     # from_logits 参数控制输出是否通过 softmax 函数进行归一化
            return logits
        else:
            return tf.nn.softmax(logits)

    def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)                         # 调用训练好的RNN模型，预测下一个字符的概率分布
        prob = tf.nn.softmax(logits / temperature).numpy()              # 使用带 temperature 参数的 softmax 函数获得归一化的概率分布值
        return np.array([np.random.choice(self.num_chars, p=prob[i, :]) # 使用 np.random.choice 函数，
                         for i in range(batch_size.numpy())])           # 在预测的概率分布 prob 上进行随机取样

In [75]:
num_batches = 1000
seq_length = 40
batch_size = 50
learning_rate = 1e-3

data_loader = DataLoader()
model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for batch_index in range(num_batches):
    X, y = data_loader.get_batch(seq_length, batch_size)
    with tf.GradientTape() as tape:
        y_pred = model(X)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
        print("batch %d: loss %f" % (batch_index, loss.numpy()))
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

X_, _ = data_loader.get_batch(seq_length, 1)
for diversity in [0.2, 0.5, 1.0, 1.2]:      # 丰富度（即temperature）分别设置为从小到大的 4 个值
    X = X_
    print("diversity %f:" % diversity)
    for t in range(400):
        y_pred = model.predict(X, diversity)    # 预测下一个字符的编号
        print(data_loader.indices_char[y_pred[0]], end='', flush=True)  # 输出预测的字符
        X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)     # 将预测的字符接在输入 X 的末尾，并截断 X 的第一个字符，以保证 X 的长度不变
    print("\n")

batch 0: loss 4.041948
batch 1: loss 4.026323
batch 2: loss 4.020195
batch 3: loss 3.965276
batch 4: loss 3.951597
batch 5: loss 3.862621
batch 6: loss 3.750309
batch 7: loss 3.383144
batch 8: loss 3.610656
batch 9: loss 3.764961
batch 10: loss 3.567576
batch 11: loss 3.346105
batch 12: loss 2.916810
batch 13: loss 3.129558
batch 14: loss 3.109119
batch 15: loss 3.057051
batch 16: loss 2.997149
batch 17: loss 3.127556
batch 18: loss 2.870783
batch 19: loss 2.989759
batch 20: loss 3.001840
batch 21: loss 2.990519
batch 22: loss 3.022388
batch 23: loss 3.042840
batch 24: loss 3.056178
batch 25: loss 3.198258
batch 26: loss 2.906835
batch 27: loss 2.981501
batch 28: loss 3.363407
batch 29: loss 3.110135
batch 30: loss 3.118558
batch 31: loss 3.251695
batch 32: loss 2.924092
batch 33: loss 2.884284
batch 34: loss 3.090482
batch 35: loss 2.927789
batch 36: loss 3.097724
batch 37: loss 2.895963
batch 38: loss 3.119111
batch 39: loss 2.856668
batch 40: loss 2.996445
batch 41: loss 3.062835
ba