# Tensorflow 3 - 自定义层

In [2]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np

from tensorflow import keras
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import os
# Expose only the GPU with index 2 to this process.
# NOTE(review): the index is machine-specific; adjust or remove it when running elsewhere.
os.environ['CUDA_VISIBLE_DEVICES'] = '2'

# Seed TensorFlow's global random generator so that a "Restart & Run All"
# starts from the same random state (weight initialisation, shuffling).
tf.random.set_seed(42)

# Load the California housing regression data; targets are reshaped into a
# single column. Hold out a test split, then carve a validation split out of
# the remaining training data (both splits use the same fixed random_state).
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# Fit the scaler on the training split only and reuse its statistics for the
# validation and test splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

## 1 没有权重的层

* 某些层没有权重，如 keras.layers.Flatten, keras.layers.ReLU。
* 若要编写不带权重的自定义层，最简单的方法是编写一个函数包装在 keras.layers.Lambda 中。

In [2]:
# Weight-free layer that applies the element-wise exponential to its input.
exponential_layer = keras.layers.Lambda(
    lambda inputs: tf.exp(inputs)
)

In [3]:
# Call the layer directly on a small sample; the result is the cell output.
exponential_layer([-1.0, 0.0, 1.0])

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787945, 1.        , 2.7182817 ], dtype=float32)>

* 该自定义层可以像其他层一样使用（如下例），也可以当作激活函数使用；等价的写法还有 activation=tf.exp、activation=keras.activations.exponential 或 activation='exponential'。

In [5]:
# Per-sample feature shape: the training array's shape without the batch axis.
input_shape = X_train.shape[1:]

# Small regression network whose final stage is the custom exponential layer.
model = keras.models.Sequential()
model.add(keras.layers.Dense(30, activation='relu', input_shape=input_shape))
model.add(keras.layers.Dense(1))
model.add(exponential_layer)

model.compile(optimizer='sgd', loss='mse')
model.fit(
    X_train_scaled,
    y_train,
    epochs=5,
    validation_data=(X_valid_scaled, y_valid),
)
# Last expression: the test-set loss is displayed as the cell output.
model.evaluate(X_test_scaled, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


0.36374431848526

## 2 带权重的层（有状态层）

* 需要创建 keras.layers.Layer 类的子类。

In [4]:
class MyDense(keras.layers.Layer):
    """Fully connected layer written as a custom Keras layer.

    Computes ``activation(X @ kernel + bias)``.

    Args:
        units: Number of output neurons (size of the last output dimension).
        activation: Anything accepted by ``keras.activations.get`` (a name,
            a callable, or ``None``).
        **kwargs: Forwarded to ``keras.layers.Layer`` (e.g. ``input_shape``,
            ``trainable``, ``name``).
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)  # handles input_shape, trainable, name, ...
        self.units = units
        # Convert the activation spec into a callable activation function.
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the layer's variables.

        Called the first time the layer is used; Keras passes
        ``batch_input_shape`` automatically.
        """
        self.kernel = self.add_weight(
            name='kernel', shape=[batch_input_shape[-1], self.units],
            initializer='glorot_normal')
        self.bias = self.add_weight(
            name='bias', shape=[self.units], initializer='zeros')
        super().build(batch_input_shape)  # tells Keras the layer is now built

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``.

        The original author notes this method may be omitted (their
        reference: p. 346).
        """
        # Normalise first: tf.TensorShape accepts tuples, lists and existing
        # TensorShape objects alike, so .as_list() is always available.
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1] + [self.units])

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer."""
        base_config = super().get_config()
        return {**base_config, 'units': self.units,
                # store the activation's serialised form rather than the callable
                'activation': keras.activations.serialize(self.activation)}

In [6]:
# Per-sample feature shape: the training array's shape without the batch axis.
input_shape = X_train.shape[1:]

# Two stacked custom dense layers: a ReLU hidden layer and a single output unit.
model = keras.models.Sequential()
model.add(MyDense(30, activation='relu', input_shape=input_shape))
model.add(MyDense(1))

model.compile(optimizer='nadam', loss='mse')
model.fit(
    X_train_scaled,
    y_train,
    epochs=2,
    validation_data=(X_valid_scaled, y_valid),
)
# Last expression: the test-set loss is displayed as the cell output.
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.4679130017757416

In [7]:
# Make sure the target directory exists before saving, so the cell also works
# on a fresh checkout where ./models has not been created yet.
os.makedirs('./models', exist_ok=True)
model.save('./models/TF3-model1.h5')
# MyDense is not a built-in layer, so load_model is told which class the
# name 'MyDense' refers to.
model = keras.models.load_model('./models/TF3-model1.h5',
                                custom_objects={'MyDense': MyDense})

## 3 多输入层的创建

* 如 Concatenate 层。
* call 方法的参数应是一个包含所有输入的元组，并返回由各个输出组成的列表（或元组）；
* compute_output_shape 方法的参数应是一个包含每个输入批次形状的元组，并返回由各个输出批次形状组成的列表。

In [8]:
class MyMultiLayer(keras.layers.Layer):
    """Layer with two inputs and two outputs: their sum and their product."""

    def call(self, X):
        """Take a pair of inputs and return ``(sum, product)``."""
        first, second = X
        # Prints the input shapes every time call() runs.
        print('X1.Shape: ', first.shape, 'X2.shape: ', second.shape)
        summed = first + second
        multiplied = first * second
        return summed, multiplied

    def compute_output_shape(self, batch_input_shape):
        """Take a pair of input shapes and return them as a two-element list."""
        shape_first, shape_second = batch_input_shape
        return [shape_first, shape_second]

In [9]:
# Note: the Sequential API cannot be used here, because the layer has
# several inputs and several outputs.
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
symbolic_pair = (inputs1, inputs2)
outputs1, outputs2 = MyMultiLayer()(symbolic_pair)

X1.Shape:  (None, 2) X2.shape:  (None, 2)


In [10]:
def split_data(data):
    """Split an array into two halves along its last axis.

    Args:
        data: Array-like exposing ``.shape`` and slice indexing (for example
            a NumPy array).

    Returns:
        A ``(first, second)`` tuple. ``first`` holds the leading
        ``n // 2`` entries of the last axis and ``second`` the rest, so
        ``second`` is one entry wider when ``n`` is odd.
    """
    half = data.shape[-1] // 2
    # Slice with an ellipsis so the split happens on the same (last) axis the
    # count was read from; for 2-D input this equals data[:, :half] / [:, half:].
    return data[..., :half], data[..., half:]

# Split each scaled feature matrix into its A (first half) and B (second half) parts.
(
    (X_train_scaled_A, X_train_scaled_B),
    (X_valid_scaled_A, X_valid_scaled_B),
    (X_test_scaled_A, X_test_scaled_B),
) = map(split_data, (X_train_scaled, X_valid_scaled, X_test_scaled))

# Show the shapes of the two training halves as the cell output.
X_train_scaled_A.shape, X_train_scaled_B.shape

((11610, 4), (11610, 4))

In [12]:
# Call the layer directly on the concrete NumPy halves instead of symbolic
# Input tensors.
train_halves = (X_train_scaled_A, X_train_scaled_B)
outputs1, outputs2 = MyMultiLayer()(train_halves)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

X1.Shape:  (11610, 4) X2.shape:  (11610, 4)


In [13]:
# One Input per feature half. `shape` is given as a one-element shape tuple
# (via the [-1:] slice) rather than a bare int, matching the
# `X_train.shape[1:]` convention used for the Sequential models.
input_A = keras.layers.Input(shape=X_train_scaled_A.shape[-1:])
input_B = keras.layers.Input(shape=X_train_scaled_B.shape[-1:])

# The custom layer turns the two inputs into their sum and their product;
# each result then goes through its own hidden layer.
hidden_A, hidden_B = MyMultiLayer()((input_A, input_B))
hidden_A = keras.layers.Dense(30, activation='selu')(hidden_A)
hidden_B = keras.layers.Dense(30, activation='selu')(hidden_B)

# Merge both branches and regress a single value.
concat = keras.layers.Concatenate()((hidden_A, hidden_B))
output = keras.layers.Dense(1)(concat)
model = keras.models.Model(inputs=[input_A, input_B], outputs=[output])

X1.Shape:  (None, 4) X2.shape:  (None, 4)


In [15]:
# Group the A/B halves so each dataset is passed as one pair of inputs.
train_inputs = (X_train_scaled_A, X_train_scaled_B)
valid_inputs = (X_valid_scaled_A, X_valid_scaled_B)

model.compile(optimizer='nadam', loss='mse')
# Last expression: the returned History object is displayed as the cell output.
model.fit(
    train_inputs,
    y_train,
    epochs=2,
    validation_data=(valid_inputs, y_valid),
)

Epoch 1/2
X1.Shape:  (None, 4) X2.shape:  (None, 4)
X1.Shape:  (None, 4) X2.shape:  (None, 4)
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7f060421ada0>

## 4 不同行为层用于训练与测试

* 如 Dropout，BatchNormalization 层。
* 下面的例子是一个正则化层：训练时给输入添加高斯噪声，测试时不执行任何操作（功能等同于 keras.layers.GaussianNoise）。
* 关键在于给 call 方法增加 training 参数，并根据它决定执行哪种行为。

In [18]:
class AddGaussianNoise(keras.layers.Layer):
    """Regularisation layer that adds Gaussian noise during training only.

    Args:
        stddev: Standard deviation passed to ``tf.random.normal`` when
            sampling the noise.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return ``X`` plus noise when ``training`` is truthy, else ``X`` unchanged."""
        if training:
            # Sample noise with the same (dynamic) shape as the input.
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        """The layer never changes the shape of its input."""
        return batch_input_shape

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer.

        Without this, ``stddev`` would be missing from the layer's config and
        a model containing the layer could not be serialised and restored.
        """
        base_config = super().get_config()
        return {**base_config, 'stddev': self.stddev}

In [19]:
# Regression network that perturbs its inputs with Gaussian noise while training.
model = keras.models.Sequential()
model.add(AddGaussianNoise(stddev=1.0))
model.add(keras.layers.Dense(30, activation="selu"))
model.add(keras.layers.Dense(1))

model.compile(optimizer="nadam", loss="mse")
model.fit(
    X_train_scaled,
    y_train,
    epochs=2,
    validation_data=(X_valid_scaled, y_valid),
)
# Last expression: the test-set loss is displayed as the cell output.
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.8010250329971313