# Making new layers and models via subclassing


In [31]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

# Layer Sınıfı : Weightlerler ve Hesaplamalar
Kerastaki en temel abstractionlardan biri Layer sınıfıdır. Layer sınıfı state(weights) ve inputtan outputa transformasyonu(call metodu ile) encapsulate eder.  
Aşağıda bir dense layer oluşturalım.

In [3]:
class Linear(keras.layers.Layer):
    def __init__(self,units=32,input_dim=32):
        super(Linear,self).__init__()
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(initial_value=w_init(shape=(input_dim,units),dtype="float32"),
                            trainable=True)
        b_init = tf.zeros_initializer()
        self.b = tf.Variable(initial_value=b_init(shape=(units,),dtype="float32"),trainable=True)
        
    def call(self,inputs):
        return tf.matmul(inputs,self.w)+self.b

In [4]:
# Layerı kullanmak için input gönderelim 
x = tf.ones((2,2))
linear_layer = Linear(4,2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.06484985  0.09588623  0.11465454 -0.11038208]
 [-0.06484985  0.09588623  0.11465454 -0.11038208]], shape=(2, 4), dtype=float32)


In [5]:
# Yukarıdaki sınıf add_weight() metoduyla bu şekilde de tanımlanabilir. 
class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.06484985 -0.02227783  0.06439209 -0.01803589]
 [-0.06484985 -0.02227783  0.06439209 -0.01803589]], shape=(2, 4), dtype=float32)


## Layerlarda eğitilemez ağırlıklar olabilir
Eğitilebilir ağırlıkların yanında eğitilemez ağırlıklarda vardır. Bu ağırlıklar backprop sırasında hesaba katılmazlar.  
Nasıl eklediğimizi inceleyelim. 

In [8]:
class ComputeSum(keras.layers.Layer):
    def __init__(self,input_dim):
        super(ComputeSum,self).__init__()
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),trainable=False)
    
    def call(self,inputs):
        # tf.reduce_sum(x,axis=0) --> satırları toplar [[1,1],[1,1]] = [2,2] 
        self.total.assign_add(tf.reduce_sum(inputs,axis=0)) # totale value ekler. total 0 olduğu için değişim olmaz
        return self.total

x = tf.ones((2,2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())

[2. 2.]
[4. 4.]


In [10]:
print("Ağırlıklar : ",len(my_sum.weights))
print("Non-trainable Ağırlıklar: ",(len(my_sum.non_trainable_weights)))
print("trainable Ağırlıklar:", my_sum.trainable_weights)


Ağırlıklar :  1
Non-trainable Ağırlıklar:  1
trainable Ağırlıklar: []


## Input shape öğrenilene kadar weigthleri oluşturmayı ertelemek
Yukarıdaki örneklerde weightler init metodu içinde oluşturduk. Bunun yerine weightleri oluşturmayı build metodu içerisined yapmak isteriz. 

In [16]:
class Linear(keras.layers.Layer):
    def __init__(self,units=32):
        super(Linear,self).__init__()
        self.units = units
    
    def build(self,input_shape):
        self.w = self.add_weight(shape=(input_shape[-1],self.units),initializer="random_normal",trainable=True)
        self.b = self.add_weight(shape=(self.units,),initializer="random_normal",trainable=True)
        
    def call(self,inputs):
        return tf.matmul(inputs,self.w)+self.b

In [17]:
# __call__ metodu layer oluşturup input gönderdiğimizde çağrılır. 
linear_layer = Linear(32)
# layer çağrıldı ve input gönderildi. weightler otomatik olarak oluşturulu. 
y = linear_layer(x)

## Layerlar recursive olarak kullanılabilir
Eğer bir layerı diğer bir layera gönderirsek dışta bulunan layer içerdeki layerın weightlerini takip edebilir.  
Sublayerları init metodu içeriside yapmak önerilir. 

In [18]:
class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super(MLPBlock, self).__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))

weights: 6
trainable weights: 6


## The add_loss() method 
Bir layer için call metodu yazdığımızda, daha sonra kullanmak isteyeceğimiz loss tensorlarını oluşturabiliriz. Bunu self.add_loss(value) ile sağlayabiliriz.   
Loss değerlerini layer.losses ile getirebiliriz. Bu değerler her __call__() metodu çağrıldığında resetlenir. Bu sebeple layers.losses her zaman son forward pass da ki loss değerlerinir tutar. Loss değerleri her layerda alınır ve total loss'a eklenir.  

In [19]:
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self,rate=1e-2):
        super(ActivityRegularizationLayer,self).__init__()
        self.rate = rate
    
    def call(self,inputs):
        self.add_loss(self.rate*tf.reduce_sum(inputs))
        return inputs        

In [20]:
class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


In [22]:
layer = OuterLayer()
assert len(layer.losses) == 0 # herhangi bir loss oluşturulmadı

_ = layer(tf.zeros(1,1))
assert len(layer.losses) == 1 # bir loss değeri oluştu __call__ metodu çağrıldı 

# layer.losses değeri her __call__ metodu çağrıldığında resetlenir. 
_ = layer(tf.zeros(1,1)) 
assert len(layer.losses) == 1 # 

## add_metric method()
Layerlar add_loss yanında add_metric metoduna da sahiptir. Model başarısının takip edilmesi için gereklidir. 


In [27]:
class LogisticEndpoint(keras.layers.Layer):
    def __init__(self, name=None):
        super(LogisticEndpoint, self).__init__(name=name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()
    def call(self,targets,logits,sample_weights=None):
        # Eğitim sırasındaki loss değeri hesaplanır ve add.loss ile eklenir.
        loss = self.loss_fn(targets,logits,sample_weights)
        self.add_loss(loss)
        
        # Accuracy hesaplanır ve eklenir 
        acc = self.accuracy_fn(targets,logits,sample_weights)
        self.add_metric(acc,name="accuracy")
        
        # .predict sonucunu döndürür
        return tf.nn.softmax(logits)

In [29]:
# metricleri takip etmek için layer.metrics kullanılır 

layer = LogisticEndpoint()

targets = tf.ones((2,2))
logits = tf.ones((2,2))
y = layer(targets,logits)

print("layer.metric : ",layer.metrics)
print("accuracy : ",float(layer.metrics[0].result()))

layer.metric :  [<keras.metrics.BinaryAccuracy object at 0x000001FA3BE113C8>]
accuracy :  1.0


In [32]:
## add_loss gibi metriclerde fit metodu ile takip edilebilir 
inputs = keras.Input(shape=(3,), name="inputs")
targets = keras.Input(shape=(10,), name="targets")
logits = keras.layers.Dense(10)(inputs)
predictions = LogisticEndpoint(name="predictions")(logits, targets)

model = keras.Model(inputs=[inputs,targets],outputs=predictions)
model.compile(optimizer="adam")

data = {
    "inputs":np.random.random((3,3)),
    "targets":np.random.random((3,10)),
}

model.fit(data)



<keras.callbacks.History at 0x1fa4a50b940>

## Layerlarda serialization yapmak için 
get_config() metodunu implemente edersek layerları seralize edebiliriz. 


In [33]:
class Linear(keras.layers.Layer):
    def __init__(self,units=32):
        super(Linear,self).__init__()
        self.units = units
        
    def build(self,input_shape):
        self.w = self.add_weights(shape=(input_shape[-1],self.units),
                                 initializer="random_normal",trainable=True)
        self.b = self.add_weights(shape=(self.units,),initializer="random_normal",trainable=True)
        
    def call(self,inputs):
        return tf.matmul(inputs,self.w)+self.b
    
    def get_config(self):
        return {"units":self.units}
    
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'units': 64}


Base layerdaki init metodu name dtype gibi bazı parametreler alır. Bu bilgileri config içerisine göndermek daha uygun olacaktır. Eğer serialization işleminde daha flex olmamız gerekiyorsa from_config metodunu da override edebiliriz.

In [36]:
class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super(Linear, self).get_config()
        config.update({"units": self.units})
        return config
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'name': 'linear_9', 'trainable': True, 'dtype': 'float32', 'units': 64}


## Training durumunda call metodu 
BatchNormalization, Dropout gibi bazı layerler eğitim ve inference aşamalarında farklı davranışlar göstermelidir. Bunu sağlayabilmek için boolean bir training parametresi eklemek daha iyi olacaktır. 

In [37]:
class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        # eğitim sırasında dropout uygulanır ama inference aşamasında dropout uygulanmamalı 
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

# Model Sınıfı 
Layer sınıfında iç hesaplamaları yaparız. Model sınıfını kullanarakta modelin yapısını oluştururuz. Örneğin ResNet50 modelinde bulunan ResNet bloklaro Layer sınıfından extend edilmiştir. Genel bir Model sınıfı kullanarakta ResNet50 modelini oluştururuz.  
Model sınıfı Layer sınıfının özelliklerini barındırmakla birlikte ayrıca  ;
* model.fit , model.evaluate ve model.predict metotları bulunur
* layerları görmek için model.layers kullanılır 
* Modeli kaydetmek ve searalize etmek için model.save ve save_weights metotları bulunur. 

Layer sınıfı literatürde neye layer diyorsak bunları karşılar. Örneğin ; convolution layer, recurrent layer yada bloklar.  
Model sınıfı ise literatürde neye model diyorsak buna karşılık gelir. Örneğin ; deep learning model, network   

Model mi layer sınıfını mı kullanmam gerek gibi bir soru ile karşı karşıya kalırsanız. fit metodunu çağırmaya, save metoduna ihityacım varmı sorularını sorarak karar verebilirsiniz.  


In [39]:
class ResNet(tf.keras.Model):

    def __init__(self, num_classes=1000):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


# resnet = ResNet()
# dataset = ...
# resnet.fit(dataset, epochs=10)
# resnet.save(filepath)

## Tüm öğrendiklerimi bir araya getirelim 
* Layer sınıfı içerisinde weights, bias bilgileri bulunur __call__ metodu içerisinde hesaplamalar yapılır 
* Layerlar recursive olarak kullanılabilir. 
* Layerlar ile loss ve accuracy bilgisi kontrol edilebilir 
* Tüm layerları kaplayan yapı Modeldir. 

Variational AutoEncoder ile tüm bu öğrendiklerimizi bir araya getirelim 


In [40]:
from tensorflow.keras import layers


class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

In [41]:
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))

Start of epoch 0
step 0: mean loss = 0.3431
step 100: mean loss = 0.1250
step 200: mean loss = 0.0989
step 300: mean loss = 0.0890
step 400: mean loss = 0.0841
step 500: mean loss = 0.0808
step 600: mean loss = 0.0787
step 700: mean loss = 0.0771
step 800: mean loss = 0.0759
step 900: mean loss = 0.0749
Start of epoch 1
step 0: mean loss = 0.0746
step 100: mean loss = 0.0740
step 200: mean loss = 0.0735
step 300: mean loss = 0.0730
step 400: mean loss = 0.0727
step 500: mean loss = 0.0723
step 600: mean loss = 0.0720
step 700: mean loss = 0.0717
step 800: mean loss = 0.0715
step 900: mean loss = 0.0712


In [42]:
vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x1fa4a718a90>