In [1]:
# 1.构建一个简单的网络层

In [None]:
# ###
# 1.自定义网络层就是定义该层的权重和输入到输出的计算过程（前向传播）
#   （1）定义权重
#         当知道输入的维度时，网络层权重可以在初始化时定义，权重可以用add_weight方法，也可以自己定义变量
#         当不知道输入的维度时，需要重写build函数，定义权重，权重可以用add_weight方法，也可以自己定义变量
#   （2）定义前向传播
#        重写call方法
# 2.使用子层递归构建网络层：先定义一个层，然后在定义新的层时调用之前定义好的层
#     可通过该方法组合出新的网络层，或者进行loss收集
      

In [2]:
import tensorflow as tf
tf.keras.backend.clear_session()
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
# import tensorflow as tf
# from tensorflow.keras import layers
from tensorflow import keras
# Look up the physical GPUs visible to TensorFlow; the block below is skipped when none exist.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  # Cap the memory TensorFlow may allocate on the first GPU via memory_limit=1500
  # (TF documents this value in MB, i.e. roughly 1.5 GB rather than 1 GB).
  try:
    tf.config.experimental.set_virtual_device_configuration(
        gpus[0],
        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1500)])
    logical_gpus = tf.config.experimental.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Virtual devices must be set before GPUs have been initialized
    print(e)

1 Physical GPUs, 1 Logical GPUs


In [5]:
# Defining a layer means: declaring its weights and the input -> output computation.
class MyLayer(layers.Layer):
    """Affine layer whose kernel and bias are created directly in ``__init__``.

    Args:
        input_dim: Size of the last axis of the expected input.
        unit: Number of output features.
    """

    def __init__(self, input_dim=32, unit=32):
        super().__init__()

        # Kernel of shape (input_dim, unit), drawn from a random-normal initializer.
        kernel_init = tf.random_normal_initializer()
        kernel_value = kernel_init(shape=(input_dim, unit), dtype=tf.float32)
        self.weight = tf.Variable(initial_value=kernel_value, trainable=True)

        # Bias of shape (unit,), starting at zero.
        bias_init = tf.zeros_initializer()
        bias_value = bias_init(shape=(unit,), dtype=tf.float32)
        self.bias = tf.Variable(initial_value=bias_value, trainable=True)

    def call(self, inputs):
        """Return ``inputs @ weight + bias``."""
        projected = tf.matmul(inputs, self.weight)
        return projected + self.bias
        
# Smoke test: push a (3, 5) tensor of ones through a 5 -> 4 layer and print the result.
x = tf.ones((3,5))
my_layer = MyLayer(5, 4)
out = my_layer(x)
print(out)
        
        


tf.Tensor(
[[-0.14850608  0.02952327  0.01521497  0.24282955]
 [-0.14850608  0.02952327  0.01521497  0.24282955]
 [-0.14850608  0.02952327  0.01521497  0.24282955]], shape=(3, 4), dtype=float32)


In [6]:
# Built as above, the layer tracks the weights w and b automatically; the weights
# can also be created directly with add_weight(), as done here.
class MyLayer(layers.Layer):
    """Affine layer whose kernel and bias are created with ``add_weight``.

    Args:
        input_dim: Size of the last axis of the expected input.
        unit: Number of output features.
    """

    def __init__(self, input_dim=32, unit=32):
        super().__init__()
        kernel_shape = (input_dim, unit)
        bias_shape = (unit,)
        self.weight = self.add_weight(
            shape=kernel_shape,
            initializer=keras.initializers.RandomNormal(),
            trainable=True,
        )
        self.bias = self.add_weight(
            shape=bias_shape,
            initializer=keras.initializers.Zeros(),
            trainable=True,
        )

    def call(self, inputs):
        """Return ``inputs @ weight + bias``."""
        projected = tf.matmul(inputs, self.weight)
        return projected + self.bias
        
# Same smoke test as before, now against the add_weight-based MyLayer (5 inputs -> 4 outputs).
x = tf.ones((3,5))
my_layer = MyLayer(5, 4)
out = my_layer(x)
print(out)
        


tf.Tensor(
[[-0.14427458  0.01830874  0.07907504  0.01994785]
 [-0.14427458  0.01830874  0.07907504  0.01994785]
 [-0.14427458  0.01830874  0.07907504  0.01994785]], shape=(3, 4), dtype=float32)


In [7]:
# Weights can also be declared as non-trainable.
class AddLayer(layers.Layer):
    """Accumulates the column sums of every input it is called with.

    Args:
        input_dim: Length of the running-sum vector.
    """

    def __init__(self, input_dim=32):
        super().__init__()
        # Non-trainable state: starts at zero and is only changed by call().
        self.sum = self.add_weight(
            shape=(input_dim,),
            initializer=keras.initializers.Zeros(),
            trainable=False,
        )

    def call(self, inputs):
        """Add the sum over axis 0 of ``inputs`` to the running total and return it."""
        column_totals = tf.reduce_sum(inputs, axis=0)
        self.sum.assign_add(column_totals)
        return self.sum
        
# Call the layer twice on the same (3, 3) tensor of ones: the running sum grows on every call.
x = tf.ones((3,3))
my_layer = AddLayer(3)
out = my_layer(x)
print(out.numpy())
out = my_layer(x)
print(out.numpy())
# The accumulator is listed under weights / non_trainable_weights, not under trainable_weights.
print('weight:', my_layer.weights)
print('non-trainable weight:', my_layer.non_trainable_weights)
print('trainable weight:', my_layer.trainable_weights)


[3. 3. 3.]
[6. 6. 6.]
weight: [<tf.Variable 'Variable:0' shape=(3,) dtype=float32, numpy=array([6., 6., 6.], dtype=float32)>]
non-trainable weight: [<tf.Variable 'Variable:0' shape=(3,) dtype=float32, numpy=array([6., 6., 6.], dtype=float32)>]
trainable weight: []


In [1]:
import tensorflow as tf

class MyDenseLayer(tf.keras.layers.Layer):
    """Bias-free dense layer that creates its kernel lazily in ``build``.

    Args:
        num_outputs: Number of output features (second kernel dimension).
    """

    def __init__(self, num_outputs):
        super(MyDenseLayer, self).__init__()
        self.num_outputs = num_outputs

    def build(self, input_shape):
        """Create the kernel once the size of the input's last axis is known."""
        print("{} build() is called.".format(self.name))
        # add_weight replaces the deprecated add_variable alias; keyword arguments
        # keep the call valid regardless of the positional order of add_weight.
        self.kernel = self.add_weight(
            name="kernel",
            shape=(int(input_shape[-1]), self.num_outputs))
        # self.built does not have to be set by hand: the recorded output shows
        # build() running on the first call only.

    def call(self, input):
        """Return ``input @ kernel``."""
        print("{} call() is called.".format(self.name))
        return tf.matmul(input, self.kernel)

# Instantiating the layer does not print the build() message; that happens on the first call.
layer = MyDenseLayer(10)

In [2]:
# First call: build() runs (creating the kernel for 5 input features), then call().
print(layer(tf.zeros([10, 5])))

my_dense_layer build() is called.
my_dense_layer call() is called.
tf.Tensor(
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]], shape=(10, 10), dtype=float32)


In [3]:
'''
self.built = True 表示该层的权重已经创建完成。
build函数只会被调用一次，即在首次调用层时；第二次调用层时由于built=True，所以不会再调用build函数。

不需要手动设置：上面的类中 self.built = True 已被注释掉，
但下面的输出中只打印了call()，说明首次调用后built已被自动置为True。
'''
# Second call on the same layer: only the call() message is printed.
print(layer(tf.zeros([10, 5])))

my_dense_layer call() is called.
tf.Tensor(
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]], shape=(10, 10), dtype=float32)


In [8]:
# When the input dimension is unknown at construction time, override build()
# and create the weights from the shape that is passed in.
class MyLayer(layers.Layer):
    """Affine layer that defers weight creation to ``build``.

    Args:
        unit: Number of output features.
    """

    def __init__(self, unit=32):
        super().__init__()
        self.unit = unit

    def build(self, input_shape):
        """Create kernel and bias; the kernel's first dimension is ``input_shape[-1]``."""
        in_features = input_shape[-1]
        self.weight = self.add_weight(
            shape=(in_features, self.unit),
            initializer=keras.initializers.RandomNormal(),
            trainable=True,
        )
        self.bias = self.add_weight(
            shape=(self.unit,),
            initializer=keras.initializers.Zeros(),
            trainable=True,
        )

    def call(self, inputs):
        """Return ``inputs @ weight + bias``."""
        projected = tf.matmul(inputs, self.weight)
        return projected + self.bias
        

# First instance: the kernel is built from an input whose last axis has size 5.
my_layer = MyLayer(3)
x = tf.ones((3,5))
out = my_layer(x)
print(out)
# A fresh instance is created before feeding an input with a different last-axis size (2),
# because build() sizes the kernel from input_shape[-1].
my_layer = MyLayer(3)

x = tf.ones((2,2))
out = my_layer(x)
print(out)


tf.Tensor(
[[ 0.02168566  0.17378914 -0.00253749]
 [ 0.02168566  0.17378914 -0.00253749]
 [ 0.02168566  0.17378914 -0.00253749]], shape=(3, 3), dtype=float32)
tf.Tensor(
[[ 0.01057525 -0.04035868  0.05209654]
 [ 0.01057525 -0.04035868  0.05209654]], shape=(2, 3), dtype=float32)


In [9]:
# 2. Building a layer recursively out of sub-layers
class MyBlock(layers.Layer):
    """Three stacked ``MyLayer`` instances (32 -> 16 -> 2 units) with ReLU in between."""

    def __init__(self):
        super().__init__()
        # MyLayer is itself a Layer subclass; each instance declares its own
        # weights and forward computation.
        self.layer1 = MyLayer(32)
        self.layer2 = MyLayer(16)
        self.layer3 = MyLayer(2)

    def call(self, inputs):
        """Apply layer1 -> ReLU -> layer2 -> ReLU -> layer3."""
        hidden = tf.nn.relu(self.layer1(inputs))
        hidden = tf.nn.relu(self.layer2(hidden))
        return self.layer3(hidden)
    
# Before the first call no sub-layer has been built, so there are no trainable weights yet.
my_block = MyBlock()
print('trainable weights:', len(my_block.trainable_weights))
y = my_block(tf.ones(shape=(3, 64)))
# Weights are created inside build(), so they only exist once the block has been called.
print('trainable weights:', len(my_block.trainable_weights)) 


trainable weights: 0
trainable weights: 6


In [10]:
# A layer can also be used as a vehicle for collecting losses.
class LossLayer(layers.Layer):
  """Identity layer that registers ``rate * sum(inputs)`` as a loss on each call.

  Args:
    rate: Multiplier applied to the sum of the inputs.
  """

  def __init__(self, rate=1e-2):
    super().__init__()
    self.rate = rate

  def call(self, inputs):
    """Record the penalty through ``add_loss`` and pass ``inputs`` through unchanged."""
    penalty = self.rate * tf.reduce_sum(inputs)
    self.add_loss(penalty)
    return inputs

class OutLayer(layers.Layer):
    """Wrapper layer that delegates to an inner ``LossLayer``."""

    def __init__(self):
        super().__init__()
        # Reuse the previously defined layer as a sub-layer.
        self.loss_fun = LossLayer(1e-2)

    def call(self, inputs):
        """Forward ``inputs`` through the inner ``LossLayer`` and return its output."""
        passthrough = self.loss_fun(inputs)
        return passthrough
    
my_layer = OutLayer()
print(len(my_layer.losses))  # layer not called yet
# The shape must be passed to tf.zeros as a single tuple; a second positional
# argument is interpreted as the dtype, not as a second dimension.
y = my_layer(tf.zeros((1, 1)))
print(len(my_layer.losses))  # after the first call
y = my_layer(tf.zeros((1, 1)))
print(len(my_layer.losses))  # losses are reset at the start of each call, so the count stays the same




0
1
1


In [11]:
# When a built-in Keras layer is used as a sub-layer, its regularization
# losses are collected as well.
class OuterLayer(layers.Layer):
    """Wraps a 32-unit ``Dense`` layer that has an L2 kernel regularizer (1e-3)."""

    def __init__(self):
        super().__init__()
        l2_penalty = tf.keras.regularizers.l2(1e-3)
        self.dense = layers.Dense(32, kernel_regularizer=l2_penalty)

    def call(self, inputs):
        """Return the output of the inner dense layer."""
        dense_out = self.dense(inputs)
        return dense_out


# Call the layer once so the inner Dense layer is built, then inspect what the outer layer exposes.
my_layer = OuterLayer()
y = my_layer(tf.zeros((1,1)))
# The regularization loss of the inner Dense layer shows up on the outer layer.
print(my_layer.losses) 
# The kernel and bias of the inner Dense layer are reported as the outer layer's weights.
print(my_layer.weights) 


[<tf.Tensor: id=278, shape=(), dtype=float32, numpy=0.002479173>]
[<tf.Variable 'outer_layer/dense/kernel:0' shape=(1, 32) dtype=float32, numpy=
array([[ 0.36982614,  0.15829384,  0.41114604, -0.1738567 , -0.36384574,
        -0.41569725, -0.3653594 , -0.16020256, -0.3764801 , -0.30773857,
        -0.1286354 ,  0.00701469, -0.3177905 ,  0.38692498, -0.0770219 ,
         0.33967847, -0.2363088 , -0.2834754 , -0.37630484, -0.02641556,
         0.28634483,  0.28438884,  0.08040458,  0.21643132,  0.40347278,
        -0.02424791, -0.02258158, -0.0307191 , -0.33047134, -0.1508998 ,
         0.32122374, -0.36606735]], dtype=float32)>, <tf.Variable 'outer_layer/dense/bias:0' shape=(32,) dtype=float32, numpy=
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
      dtype=float32)>]


In [29]:
#全连接层（无激活函数）->batchnorm->sigma(激活函数)->dropout()
# 如果中间调用了keras网络层，里面的正则化loss也会被加入进来



#2.使用子层递归构建网络层
class MyBlock_dense_norm_relu_drop_relu(layers.Layer):
    """Dense (no activation) -> BatchNormalization -> ReLU -> Dropout(0.5) -> ReLU.

    The dense layer has 32 units and an L2 kernel regularizer (1e-3).
    """

    def __init__(self):
        super(MyBlock_dense_norm_relu_drop_relu, self).__init__()
        self.dense = layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))
        self.batch_norm = layers.BatchNormalization()
        self.drop_out = layers.Dropout(0.5)

    def call(self, inputs, training=None):
        """Run the block.

        Args:
            inputs: Input tensor.
            training: Forwarded to the BatchNormalization and Dropout sub-layers,
                which behave differently in training and inference. ``None``
                (the default) leaves the decision to those layers.

        Returns:
            The block output.
        """
        h1 = self.dense(inputs)
        # Forward the mode explicitly so the caller controls batch-norm and dropout.
        h1 = self.batch_norm(h1, training=training)
        h1 = tf.nn.relu(h1)
        h1 = self.drop_out(h1, training=training)
        # Kept from the original pipeline; dropout applied to a ReLU output is
        # already non-negative, so this second ReLU does not alter the values.
        h1 = tf.nn.relu(h1)
        return h1
    
# No sub-layer has been built before the first call, so the weight count starts at 0.
my_block = MyBlock_dense_norm_relu_drop_relu()
print('trainable weights:', len(my_block.trainable_weights))
y = my_block(tf.ones(shape=(3, 64)))
# Weights are created inside build(), so they only exist once the block has been called.
print('trainable weights:', len(my_block.trainable_weights)) 
# The dense layer's L2 regularization loss is collected on the block.
print(my_block.losses) 
print(my_block.weights) 




# class dense_norm_Layer_relu(layers.Layer):

#     def __init__(self):
#         super(dense_norm_Layer_relu, self).__init__()
#         self.dense = layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))
# #         self.batch_norm=layers.BatchNormalization(self.dense)
    
#     def call(self, inputs):
# #          h1 = self.batch_norm(inputs)
#          h1=self.dense(inputs)

# #          h1=layers.BatchNormalization(h1)
#          h1 = tf.nn.relu(h1)
#          return h1


# my_layer = dense_norm_Layer_relu()
# y = my_layer(tf.zeros((1,1)))
# print(my_layer.losses) 
# print(my_layer.weights) 

trainable weights: 0
trainable weights: 4
[<tf.Tensor: id=1211, shape=(), dtype=float32, numpy=0.041968748>]
[<tf.Variable 'my_block_dense_norm_relu_drop_relu/dense_15/kernel:0' shape=(64, 32) dtype=float32, numpy=
array([[ 0.05467254,  0.16974711,  0.00301045, ..., -0.17138743,
         0.22536135,  0.19827598],
       [ 0.13767499,  0.11382854,  0.00651169, ...,  0.18844402,
        -0.04664928,  0.18892705],
       [ 0.2296558 ,  0.07903302, -0.13097149, ...,  0.09964603,
        -0.00492388, -0.04274988],
       ...,
       [-0.07860452,  0.19541878, -0.06114101, ..., -0.10381567,
         0.09945786,  0.1345439 ],
       [-0.23823732, -0.09354186,  0.14720261, ..., -0.07961458,
         0.17681861,  0.23255426],
       [ 0.01601791,  0.21351051, -0.06572437, ..., -0.16238868,
        -0.20642775, -0.20408392]], dtype=float32)>, <tf.Variable 'my_block_dense_norm_relu_drop_relu/dense_15/bias:0' shape=(32,) dtype=float32, numpy=
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 

In [11]:
# 3.其他网络层配置
# 使自己的网络层可以序列化

In [12]:
class Linear(layers.Layer):
    """Affine layer that can be re-created from its config.

    Args:
        units: Number of output features.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create kernel and bias, both with the 'random_normal' initializer."""
        in_features = input_shape[-1]
        self.w = self.add_weight(
            shape=(in_features, self.units),
            initializer='random_normal',
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='random_normal',
            trainable=True,
        )

    def call(self, inputs):
        """Return ``inputs @ w + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

    def get_config(self):
        """Return the base layer config extended with ``units``."""
        base_config = super().get_config()
        base_config['units'] = self.units
        return base_config
    
    
# Round trip: dump the layer's config and build a second layer from it.
layer = Linear(125)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)


{'trainable': True, 'name': 'linear', 'dtype': None, 'units': 125}


In [13]:
# A layer whose computation is only applied while training.
class MyDropout(layers.Layer):
    """Dropout that is active only when ``training`` is truthy.

    Args:
        rate: Drop rate passed to ``tf.nn.dropout``.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, rate, **kwargs):
        super(MyDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        """Apply dropout in training mode, otherwise return ``inputs`` unchanged.

        Args:
            inputs: Input tensor.
            training: Truthy to apply dropout; ``None``/falsy returns the inputs as-is.
        """
        # A plain conditional instead of tf.cond: tf.cond is meant for tensor
        # predicates, and neither the default None nor a Python bool is a valid
        # predicate when the layer runs in graph mode.
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

    def get_config(self):
        """Return the base layer config extended with ``rate`` so the layer can be re-created."""
        config = super(MyDropout, self).get_config()
        config.update({'rate': self.rate})
        return config
    
        


In [None]:
# 4.构建自己的模型
######## 通常，我们使用Layer类来定义内部计算块，并使用Model类来定义外部模型 - 即要训练的对象。

# Model类与Layer的区别：

# 它公开了内置的训练，评估和预测循环（model.fit(),model.evaluate(),model.predict()）。
# 它通过model.layers属性公开其内层列表。
# 它公开了保存和序列化API。


In [14]:
# The following cells build a variational autoencoder (VAE) to show how to assemble a custom model.
# Sampling layer
class Sampling(layers.Layer):
    """Draws ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` with standard-normal ``epsilon``."""

    def call(self, inputs):
        """Sample from the pair ``(z_mean, z_log_var)`` given as ``inputs``."""
        z_mean, z_log_var = inputs
        mean_shape = tf.shape(z_mean)
        batch, dim = mean_shape[0], mean_shape[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        scale = tf.exp(0.5 * z_log_var)
        return z_mean + scale * epsilon
# Encoder
class Encoder(layers.Layer):
    """Maps inputs to the triple ``(z_mean, z_log_var, z)``.

    Args:
        latent_dim: Units of the mean and log-variance heads.
        intermediate_dim: Units of the ReLU projection layer.
        name: Layer name.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, latent_dim=32,
                 intermediate_dim=64, name='encoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        """Project the inputs, compute both heads, and sample ``z`` from them."""
        hidden = self.dense_proj(inputs)
        z_mean = self.dense_mean(hidden)
        z_log_var = self.dense_log_var(hidden)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z
        
# Decoder
class Decoder(layers.Layer):
    """Maps a latent vector through a ReLU projection to a sigmoid output layer.

    Args:
        original_dim: Units of the sigmoid output layer.
        intermediate_dim: Units of the ReLU projection layer.
        name: Layer name.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, original_dim,
                 intermediate_dim=64, name='decoder', **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_output = layers.Dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        """Return ``dense_output(dense_proj(inputs))``."""
        hidden = self.dense_proj(inputs)
        return self.dense_output(hidden)
    
# Variational autoencoder
class VAE(tf.keras.Model):
    """Encoder + decoder model that registers a KL term through ``add_loss``.

    Args:
        original_dim: Units of the decoder's output layer.
        latent_dim: Units of the encoder's mean / log-variance heads.
        intermediate_dim: Units of the projection layers in encoder and decoder.
        name: Model name. Defaults to 'vae' so that it no longer collides with
            the inner Encoder layer, whose default name is 'encoder'.
        **kwargs: Forwarded to the ``tf.keras.Model`` constructor.
    """

    # Sub-layers are created in __init__.
    def __init__(self, original_dim, latent_dim=32,
                 intermediate_dim=64, name='vae', **kwargs):
        super(VAE, self).__init__(name=name, **kwargs)

        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim,
                               intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim=original_dim,
                               intermediate_dim=intermediate_dim)

    # The forward pass is defined in call().
    def call(self, inputs):
        """Encode, decode, and register the KL term; returns the reconstruction."""
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)

        # NOTE(review): reduce_sum runs over every element, batch axis included,
        # so this term scales with the batch size - confirm that is intended.
        kl_loss = -0.5 * tf.reduce_sum(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
        self.add_loss(kl_loss)
        return reconstructed


In [15]:

# Load the MNIST training images (labels and test split are discarded),
# reshape them to (60000, 784) float32 and divide by 255.
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255
# Positional arguments follow the VAE signature: original_dim=784, latent_dim=32, intermediate_dim=64.
vae = VAE(784,32,64)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# Train with the built-in loop; it offers no per-batch control.
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)


Train on 60000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7f6176acedd8>

In [None]:
# Hand-written training loop
train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

original_dim = 784
# Keyword arguments avoid mixing up the two sizes: the VAE signature is
# (original_dim, latent_dim, intermediate_dim), and the fit() example in the
# earlier cell uses latent_dim=32, intermediate_dim=64.
vae = VAE(original_dim, latent_dim=32, intermediate_dim=64)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

# Running mean of the loss; it is not reset between epochs, so the printed
# value averages over every step seen so far.
loss_metric = tf.keras.metrics.Mean()

# Iterate over epochs.
for epoch in range(3):
  print('Start of epoch %d' % (epoch,))

  # Iterate over the batches of the dataset.
  for step, x_batch_train in enumerate(train_dataset):
    # GradientTape records the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
      reconstructed = vae(x_batch_train)
      # Compute reconstruction loss
      loss = mse_loss_fn(x_batch_train, reconstructed)
      loss += sum(vae.losses)  # Add KLD regularization loss

    # Gradients of the loss with respect to vae.trainable_variables.
    grads = tape.gradient(loss, vae.trainable_variables)
    # Let the optimizer apply the update for these gradients.
    optimizer.apply_gradients(zip(grads, vae.trainable_variables))

    loss_metric(loss)

    if step % 100 == 0:
      print('step %s: mean loss = %s' % (step, loss_metric.result()))


Start of epoch 0
step 0: mean loss = tf.Tensor(268.53214, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(9.374093, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(4.7623854, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(3.2093432, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(2.4282207, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(1.9589478, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(1.645642, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(1.4212394, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(1.2527492, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(1.1216668, shape=(), dtype=float32)
Start of epoch 1
step 0: mean loss = tf.Tensor(1.0791163, shape=(), dtype=float32)


In [3]:
class ResnetIdentityBlock(tf.keras.Model):
  """Three Conv2D + BatchNormalization stages whose result is added back to the input.

  Args:
    kernel_size: Kernel size of the middle convolution (the outer two use 1x1).
    filters: Sequence of three filter counts, one per convolution.
  """

  def __init__(self, kernel_size, filters):
    super().__init__(name='')
    first_filters, middle_filters, last_filters = filters

    # Layers are created in the same order as they are applied in call().
    self.conv2a = tf.keras.layers.Conv2D(first_filters, (1, 1))
    self.bn2a = tf.keras.layers.BatchNormalization()

    self.conv2b = tf.keras.layers.Conv2D(middle_filters, kernel_size, padding='same')
    self.bn2b = tf.keras.layers.BatchNormalization()

    self.conv2c = tf.keras.layers.Conv2D(last_filters, (1, 1))
    self.bn2c = tf.keras.layers.BatchNormalization()

  def call(self, input_tensor, training=False):
    """Run the three stages, add ``input_tensor`` to the result, and apply ReLU.

    Args:
      input_tensor: Block input; it is also added to the last stage's output.
      training: Forwarded to every BatchNormalization layer.
    """
    out = self.conv2a(input_tensor)
    out = tf.nn.relu(self.bn2a(out, training=training))

    out = self.conv2b(out)
    out = tf.nn.relu(self.bn2b(out, training=training))

    out = self.conv2c(out)
    out = self.bn2c(out, training=training)

    # Skip connection followed by the final activation.
    out = out + input_tensor
    return tf.nn.relu(out)


# Build a block with kernel_size=1 and filters (1, 2, 3), run it on a zero tensor of
# shape (1, 2, 3, 3), then list the names of the trainable variables it created.
block = ResnetIdentityBlock(1, [1, 2, 3])
print(block(tf.zeros([1, 2, 3, 3])))
print([x.name for x in block.trainable_variables])

tf.Tensor(
[[[[0. 0. 0.]
   [0. 0. 0.]
   [0. 0. 0.]]

  [[0. 0. 0.]
   [0. 0. 0.]
   [0. 0. 0.]]]], shape=(1, 2, 3, 3), dtype=float32)
['resnet_identity_block/conv2d/kernel:0', 'resnet_identity_block/conv2d/bias:0', 'resnet_identity_block/batch_normalization/gamma:0', 'resnet_identity_block/batch_normalization/beta:0', 'resnet_identity_block/conv2d_1/kernel:0', 'resnet_identity_block/conv2d_1/bias:0', 'resnet_identity_block/batch_normalization_1/gamma:0', 'resnet_identity_block/batch_normalization_1/beta:0', 'resnet_identity_block/conv2d_2/kernel:0', 'resnet_identity_block/conv2d_2/bias:0', 'resnet_identity_block/batch_normalization_2/gamma:0', 'resnet_identity_block/batch_normalization_2/beta:0']
