<a href="https://colab.research.google.com/github/JeongHanJun/Colab/blob/master/%EC%84%9C%EB%B8%8C%ED%81%B4%EB%9E%98%EC%8B%B1%EC%9D%84_%ED%86%B5%ED%95%9C_%EB%A0%88%EC%9D%B4%EC%96%B4_%EB%B0%8F_%EB%AA%A8%EB%8D%B8_%EA%B0%9C%EB%B0%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from tensorflow import keras

# 서브클래싱을 통한 새 모델 / 레이어를 만드는 과정

## Layer class = combination of weight + computation

In [13]:
class Linear(keras.layers.Layer):
  """Affine layer computing ``matmul(inputs, w) + b``.

  Both variables are created eagerly in the constructor, so the input
  dimension has to be supplied up front.

  Args:
    units: Number of output features (second dimension of ``w``).
    input_dim: Number of input features (first dimension of ``w``).
  """

  def __init__(self, units = 32, input_dim = 32):
    super().__init__()

    # w (weight): random-normal kernel of shape (input_dim, units).
    kernel_init = tf.random_normal_initializer()
    kernel_value = kernel_init(shape = (input_dim, units), dtype = 'float32')
    self.w = tf.Variable(initial_value = kernel_value, trainable = True)

    # b (bias): zero vector of shape (units,).
    bias_init = tf.zeros_initializer()
    bias_value = bias_init(shape = (units, ), dtype = 'float32')
    self.b = tf.Variable(initial_value = bias_value, trainable = True)

  def call(self, inputs):
    """Return the affine transform of ``inputs``."""
    projected = tf.matmul(inputs, self.w)
    return projected + self.b

#### w = weight 가중치 , b = bias 편향

In [14]:
# Feed a 2x2 tensor of ones through a Linear layer mapping 2 inputs to 4 units.
x = tf.ones(shape = (2, 2))
linear_layer = Linear(units = 4, input_dim = 2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.00268681  0.08147862 -0.04045039  0.04046633]
 [-0.00268681  0.08147862 -0.04045039  0.04046633]], shape=(2, 4), dtype=float32)


In [15]:
# The variables assigned to `w` and `b` are expected to appear in
# `linear_layer.weights`, in that order.
assert linear_layer.weights == [linear_layer.w , linear_layer.b]

In [17]:
class Linear(keras.layers.Layer):
  """Affine layer computing ``matmul(inputs, w) + b``.

  Same computation as a hand-built ``tf.Variable`` layer, but the variables
  are created through the ``add_weight`` shortcut.

  Args:
    units: Number of output features (second dimension of ``w``).
    input_dim: Number of input features (first dimension of ``w``).
  """

  def __init__(self, units = 32, input_dim = 32):
    super().__init__()
    kernel_shape = (input_dim, units)
    bias_shape = (units, )
    # Kernel starts from a random normal draw, bias starts at zero.
    self.w = self.add_weight(
        shape = kernel_shape, initializer = 'random_normal', trainable = True
    )
    self.b = self.add_weight(
        shape = bias_shape, initializer = 'zeros', trainable = True
    )

  def call(self, inputs):
    """Return the affine transform of ``inputs``."""
    projected = tf.matmul(inputs, self.w)
    return projected + self.b

# Same smoke test as before, now against the add_weight-based Linear.
x = tf.ones(shape = (2, 2))
linear_layer = Linear(units = 4, input_dim = 2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.01950542 -0.00725076  0.03661449  0.11594519]
 [-0.01950542 -0.00725076  0.03661449  0.11594519]], shape=(2, 4), dtype=float32)


In [19]:
class ComputeSum(keras.layers.Layer):
  """Layer that accumulates the axis-0 sum of every input it is called on.

  Args:
    input_dim: Length of the accumulator vector ``total``.
  """

  def __init__(self, input_dim):
    super().__init__()
    # Non-trainable state: starts at zero and only changes via assign_add.
    start = tf.zeros((input_dim, ))
    self.total = tf.Variable(initial_value = start, trainable = False)

  def call(self, inputs):
    """Add the axis-0 sum of ``inputs`` to ``total`` and return ``total``."""
    batch_sum = tf.reduce_sum(inputs, axis = 0)
    self.total.assign_add(batch_sum)
    return self.total

x = tf.ones(shape = (2, 2))
my_sum = ComputeSum(input_dim = 2)
# Call the layer twice on the same input: the running total keeps growing.
for call_index in range(2):
  y = my_sum(x)
  print(y.numpy())

[2. 2.]
[4. 4.]


In [22]:
# Report how many variables fall into each weight category of `my_sum`,
# separating the three lines with a blank line.
weight_groups = (
    ("weights : ", my_sum.weights),
    ('non-trainable weights : ', my_sum.non_trainable_weights),
    ('trainable_weights : ', my_sum.trainable_weights),
)
for position, (label, group) in enumerate(weight_groups):
  if position > 0:
    print()
  print(label, len(group))

weights :  1

non-trainable weights :  1

trainable_weights :  0


## Best practice : 입력의 shape가 알려질 때까지 가중치 생성을 미루기 ( deferring weight creation until the shape of the inputs is known )

In [25]:
class Linear(keras.layers.Layer):
  """Affine layer whose variables are created lazily in ``build``.

  Only the number of output units is fixed at construction time; the input
  feature count is read from the last entry of ``input_shape``.

  Args:
    units: Number of output features.
  """

  def __init__(self, units = 32):
    super().__init__()
    self.units = units

  def build(self, input_shape):
    """Create ``w`` and ``b`` once the input shape is available."""
    in_features = input_shape[-1]
    self.w = self.add_weight(
        shape = (in_features, self.units),
        initializer = 'random_normal',
        trainable = True
    )
    # Note: the bias here is also drawn from 'random_normal'.
    self.b = self.add_weight(
        shape = (self.units,),
        initializer = 'random_normal',
        trainable = True
    )

  def call(self, inputs):
    """Return ``matmul(inputs, w) + b``."""
    projected = tf.matmul(inputs, self.w)
    return projected + self.b

# No input dimension is given here; it is taken from `x` (defined in an
# earlier cell) when the layer is called.
linear_layer = Linear(units = 32)
y = linear_layer(x)

## 레이어의 재귀적 구성
- init 메소드에서 하위 레이어를 구성
- 하위 레이어에는 일반적으로 빌드 메소드가 있으므로, 외부 레이어가 빌드될 때 같이 빌드된다.

In [26]:
class MLPBlock(keras.layers.Layer):
  """Three stacked ``Linear`` layers (32 -> 32 -> 1 units) with ReLU between them."""

  def __init__(self):
    super().__init__()
    self.linear_1 = Linear(units = 32)
    self.linear_2 = Linear(units = 32)
    self.linear_3 = Linear(units = 1)

  def call(self, inputs):
    """Apply the two hidden layers with ReLU, then the final linear layer."""
    hidden = inputs
    for hidden_layer in (self.linear_1, self.linear_2):
      hidden = tf.nn.relu(hidden_layer(hidden))
    # The output layer has no activation.
    return self.linear_3(hidden)

mlp = MLPBlock()
# Run one forward pass on a (3, 64) batch of ones before counting weights.
y = mlp(tf.ones(shape = (3, 64)))
for label, variables in (
    ('weights : ', mlp.weights),
    ('trainable weights : ', mlp.trainable_weights),
):
  print(label, len(variables))

weights :  6
trainable weights :  6


## add_loss()
- 레이어의 call() 메소드를 작성할 때, 나중에 훈련 루프에서 사용할 손실 텐서를 add_loss()로 만들 수 있다.

In [28]:
class ActivityRegularizationLayer(keras.layers.Layer):
  """Identity layer that registers ``rate * sum(inputs)`` as a loss.

  Args:
    rate: Multiplier applied to the sum of the inputs.
  """

  def __init__(self, rate = 1e-2):
    super().__init__()
    self.rate = rate

  def call(self, inputs):
    """Record the activity penalty via ``add_loss`` and pass ``inputs`` through."""
    penalty = self.rate * tf.reduce_sum(inputs)
    self.add_loss(penalty)
    return inputs

In [32]:
class OuterLayer(keras.layers.Layer):
  """Wrapper layer that delegates to a nested ``ActivityRegularizationLayer``."""

  def __init__(self):
    super().__init__()
    self.activity_reg = ActivityRegularizationLayer(rate = 1e-2)

  def call(self, inputs):
    """Forward ``inputs`` through the nested regularization layer."""
    regularized = self.activity_reg(inputs)
    return regularized

layer = OuterLayer()
# No forward pass has run yet, so no loss has been recorded.
assert len(layer.losses) == 0

# tf.zeros takes the whole shape as its first argument. The previous
# `tf.zeros(1, 1)` passed the second `1` as the dtype and built a shape-(1,)
# tensor; a (1, 1) input is what the other cells in this notebook use.
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1

# A second call still leaves exactly one loss: the count does not accumulate
# across calls.
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1

In [34]:
class OuterLayerWithKernelRegularizer(keras.layers.Layer):
  """Wrapper around a 32-unit ``Dense`` layer with an L2 kernel regularizer."""

  def __init__(self):
    super().__init__()
    l2_penalty = tf.keras.regularizers.l2(1e-3)
    self.dense = keras.layers.Dense(32, kernel_regularizer = l2_penalty)

  def call(self, inputs):
    """Forward ``inputs`` through the nested dense layer."""
    projected = self.dense(inputs)
    return projected
  
layer = OuterLayerWithKernelRegularizer()
# Run one forward pass on a (1, 1) tensor of zeros, then show the losses the
# outer layer reports.
_ = layer(tf.zeros(shape = (1, 1)))
print(layer.losses)

[<tf.Tensor: shape=(), dtype=float32, numpy=0.0019578154>]


### 위와 같은 손실은 사용자 정의 훈련 루프( custom training loop )를 작성할 때 직접 합산해 주어야 하며, fit()을 사용하면 자동으로 처리된다.

In [39]:
import numpy as np

# Functional model whose only layer is the activity regularizer.
inputs = keras.Input(shape = (3, ))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs = inputs, outputs = outputs)

# Case 1: a loss is given to `compile`; the regularization losses
# get added to it.
model.compile(optimizer = "adam", loss = "mse")
model.fit(x = np.random.random((2, 3)), y = np.random.random((2, 3)))

# Case 2: no loss is given to `compile`. That is allowed because the model
# already has something to minimize: the loss registered through `add_loss`
# during the forward pass.
model.compile(optimizer = "adam")
model.fit(x = np.random.random((2, 3)), y = np.random.random((2, 3)))



<keras.callbacks.History at 0x7fb0eb9e1850>

# Model Class
- 일반적으로 Layer 클래스로 내부 계산 블록을 정의하고, Model 클래스로 학습 대상이 되는 외부 모델을 정의한다.
- 예를 들어 ResNet50 모델에서는 여러 ResNet 블록을 Layer의 서브클래스로 만들고, 전체 ResNet50 네트워크는 하나의 Model로 감싼다.
- Model 클래스는 Layer와 동일한 API를 가지며, 다음 3가지 차이점이 있다.
  1. 기본 제공 학습, 평가 및 예측 루프 ( model.fit(), model.evaluate(), model.predict() )를 제공한다.
  2. model.layers 속성을 통해 내부 layer 목록을 제공한다.
  3. 저장 및 직렬화 API ( save(), save_weights() )를 제공한다.

- Layer클래스는 계층 또는 블록으로 부르기도 한다.
- Model 클래스는 모델 또는 네트워크로 부르기도 한다.
- 아래는 ResNet을 Model로 서브클래싱하여 fit()과 save()를 사용하는 흐름을 보여주는 의사 코드(pseudocode) 예시이다. (문자열로 감싸져 있어 실제로 실행되지는 않는다.)

In [42]:
# Pseudocode sketch only: the class below is wrapped in a string literal, so it
# is never executed. `ResNetBlock`, `Dense`, `dataset` and `filepath` are
# placeholders that this cell does not define.
'''
class ResNet(tf.keras.Model):

  def __init__(self, num_classes = 1000):
    super(ResNet, self).__init__()
    self.block_1 = ResNetBlock()
    self.block_2 = ResNetBlock()
    self.global_pool = layers.GlobalAveragePooling2D()
    self.classifier = Dense(num_classes)
  
  def call(self, inputs):
    x = self.block_1(inputs)
    x = self.block_2(x)
    x = self.global_pool(x)
    return self.classifier(x)

resnet = ResNet()
dataset = 
resnet.fit(dataset, epochs = 10)
resnet.save(filepath)
'''

'\nclass ResNet(tf.keras.Model):\n\n  def __init__(self, num_classes = 1000):\n    super(ResNet, self).__init__()\n    self.block_1 = ResNetBlock()\n    self.block_2 = ResNetBlock()\n    self.global_pool = layers.GlobalAveragePooling2D()\n    self.classifier = Dense(num_classes)\n  \n  def call(self, inputs):\n    x = self.block_1(inputs)\n    x = self.block_2(x)\n    x = self.global_pool(x)\n    return self.classifier(x)\n\nresnet = ResNet()\ndataset = \nresnet.fit(dataset, epochs = 10)\nresnet.save(filepath)\n'

In [48]:
# 종합적으로 VAE(Variational AutoEncoder) 구현

from tensorflow.keras import layers

class Sampling(layers.Layer):
  """Samples ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` with normal noise."""

  def call(self, inputs):
    """Draw one sample per row.

    Args:
      inputs: Pair ``(z_mean, z_log_var)``; the noise is shaped from the first
        two dimensions of ``z_mean``.

    Returns:
      The sampled tensor ``z``.
    """
    z_mean, z_log_var = inputs
    mean_shape = tf.shape(z_mean)
    noise = tf.keras.backend.random_normal(shape = (mean_shape[0], mean_shape[1]))
    std = tf.exp(0.5 * z_log_var)
    return z_mean + std * noise

class Encoder(layers.Layer):
  """Maps inputs to the triplet ``(z_mean, z_log_var, z)``.

  Args:
    latent_dim: Units of the mean and log-variance dense layers.
    intermediate_dim: Units of the ReLU projection layer.
    name: Layer name forwarded to the base class.
    **kwargs: Extra keyword arguments forwarded to the base class.
  """

  def __init__(self, latent_dim = 32, intermediate_dim = 64, name = 'encoder', **kwargs):
    super().__init__(name = name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation = 'relu')
    self.dense_mean = layers.Dense(latent_dim)
    self.dense_log_var = layers.Dense(latent_dim)
    self.sampling = Sampling()

  def call(self, inputs):
    """Return ``(z_mean, z_log_var, z)`` for ``inputs``."""
    hidden = self.dense_proj(inputs)
    # Both heads read the same hidden projection.
    z_mean = self.dense_mean(hidden)
    z_log_var = self.dense_log_var(hidden)
    z = self.sampling((z_mean, z_log_var))
    return z_mean, z_log_var, z

class Decoder(layers.Layer):
  """Maps a latent vector back to ``original_dim`` values through a sigmoid.

  Args:
    original_dim: Units of the sigmoid output layer.
    intermediate_dim: Units of the ReLU projection layer.
    name: Layer name forwarded to the base class.
    **kwargs: Extra keyword arguments forwarded to the base class.
  """

  def __init__(self, original_dim, intermediate_dim = 64, name = 'decoder', **kwargs):
    super().__init__(name = name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation = 'relu')
    self.dense_output = layers.Dense(original_dim, activation = 'sigmoid')

  def call(self, inputs):
    """Project ``inputs`` and return the sigmoid output."""
    hidden = self.dense_proj(inputs)
    return self.dense_output(hidden)
  
class VariationalAutoEncoder(keras.Model):
  """Encoder + decoder model that registers a KL term through ``add_loss``.

  Args:
    original_dim: Size of the decoder output; also stored on the instance.
    intermediate_dim: Hidden units used by both encoder and decoder.
    latent_dim: Size of the encoder's latent outputs.
    name: Model name forwarded to the base class.
    **kwargs: Extra keyword arguments forwarded to the base class.
  """

  def __init__(
      self,
      original_dim,
      intermediate_dim = 64,
      latent_dim = 32,
      name = 'autoencoder',
      **kwargs):
    super().__init__(name = name, **kwargs)
    self.original_dim = original_dim
    self.encoder = Encoder(latent_dim = latent_dim, intermediate_dim = intermediate_dim)
    self.decoder = Decoder(original_dim, intermediate_dim = intermediate_dim)

  def call(self, inputs):
    """Return the reconstruction of ``inputs`` and record the KL loss."""
    z_mean, z_log_var, z = self.encoder(inputs)
    reconstructed = self.decoder(z)
    # KL term: -0.5 * mean(log_var - mean^2 - exp(log_var) + 1).
    kl_terms = z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
    kl_loss = -0.5 * tf.reduce_mean(kl_terms)
    self.add_loss(kl_loss)
    return reconstructed
    



In [52]:
# Simple custom training loop for the VAE on MNIST.
original_dim = 784
# Positional arguments are (original_dim, intermediate_dim, latent_dim).
VAE = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3)
mse_loss = tf.keras.losses.MeanSquaredError()

# Tracks the mean of every loss value passed to it.
# NOTE(review): it is never reset below, so the printed value is a running
# mean over all steps since training started, not a per-epoch mean.
loss_metric = tf.keras.metrics.Mean()

# Only the training images are kept; labels and the test split are discarded.
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
# Flatten each image to 784 values and divide by 255
# (presumably mapping uint8 pixels into [0, 1]).
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size = 1024).batch(64)

epochs = 3
# Iterate over epochs
for epoch in range(epochs):
  print('start of epoch %d' %(epoch, ))

  # Iterate over batches of the dataset
  for step, x_batch_train in enumerate(train_dataset):
    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
      reconstructed = VAE(x_batch_train)
      # Reconstruction loss between the batch and its reconstruction.
      loss = mse_loss(x_batch_train, reconstructed)
      # Add the losses registered via add_loss during this forward pass.
      loss += sum(VAE.losses)
    
    grads = tape.gradient(loss, VAE.trainable_weights)
    optimizer.apply_gradients(zip(grads, VAE.trainable_weights))

    loss_metric(loss)

    # Log the running mean every 100 steps.
    if step % 100 == 0:
      print('step %d : mean loss = %.4f'%(step, loss_metric.result()))

start of epoch 0
step 0 : mean loss = 0.3306
step 100 : mean loss = 0.1256
step 200 : mean loss = 0.0992
step 300 : mean loss = 0.0892
step 400 : mean loss = 0.0842
step 500 : mean loss = 0.0808
step 600 : mean loss = 0.0787
step 700 : mean loss = 0.0771
step 800 : mean loss = 0.0760
step 900 : mean loss = 0.0749
start of epoch 1
step 0 : mean loss = 0.0747
step 100 : mean loss = 0.0740
step 200 : mean loss = 0.0735
step 300 : mean loss = 0.0730
step 400 : mean loss = 0.0727
step 500 : mean loss = 0.0723
step 600 : mean loss = 0.0720
step 700 : mean loss = 0.0717
step 800 : mean loss = 0.0715
step 900 : mean loss = 0.0712
start of epoch 2
step 0 : mean loss = 0.0711
step 100 : mean loss = 0.0710
step 200 : mean loss = 0.0708
step 300 : mean loss = 0.0707
step 400 : mean loss = 0.0706
step 500 : mean loss = 0.0704
step 600 : mean loss = 0.0703
step 700 : mean loss = 0.0702
step 800 : mean loss = 0.0701
step 900 : mean loss = 0.0700


In [53]:
# VariationalAutoEncoder subclasses keras.Model, so the built-in training loop
# (compile + fit) can be used instead of a hand-written one.

vae = VariationalAutoEncoder(original_dim = 784, intermediate_dim = 64, latent_dim = 32)
optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3)
vae.compile(optimizer = optimizer, loss = tf.keras.losses.MeanSquaredError())
# The input doubles as the target: the model is trained to reconstruct x_train.
vae.fit(x = x_train, y = x_train, epochs = 2, batch_size = 64)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7fb0ec266550>