<a href="https://colab.research.google.com/github/AtSourav/AE-n-VAE-with-CB-loss-on-stl10/blob/main/AE_stl10_CBloss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow import math
import tensorflow_datasets as tfds
from keras import losses
from keras import layers
from keras import utils
from keras import backend as K

import os
import random
import matplotlib.pyplot as plt

In [2]:
img_ht = 48
img_wd = 48

input_size = (img_ht,img_wd,3)
latent_dim = 256
batch_size = 512

In [3]:
ds = tfds.load('stl10', split='unlabelled[:10%]')        # we're only getting 10% of the unlabelled set for now coz the entire set is huge

Downloading and preparing dataset 2.46 GiB (download: 2.46 GiB, generated: 1.86 GiB, total: 4.32 GiB) to /root/tensorflow_datasets/stl10/1.0.0...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/3 [00:00<?, ? splits/s]

Generating train examples...:   0%|          | 0/5000 [00:00<?, ? examples/s]

Shuffling /root/tensorflow_datasets/stl10/1.0.0.incompleteUPELVZ/stl10-train.tfrecord*...:   0%|          | 0/…

Generating test examples...:   0%|          | 0/8000 [00:00<?, ? examples/s]

Shuffling /root/tensorflow_datasets/stl10/1.0.0.incompleteUPELVZ/stl10-test.tfrecord*...:   0%|          | 0/8…

Generating unlabelled examples...:   0%|          | 0/100000 [00:00<?, ? examples/s]

Shuffling /root/tensorflow_datasets/stl10/1.0.0.incompleteUPELVZ/stl10-unlabelled.tfrecord*...:   0%|         …

Dataset stl10 downloaded and prepared to /root/tensorflow_datasets/stl10/1.0.0. Subsequent calls will reuse this data.


In [4]:
ds_to_np = list(ds.as_numpy_iterator())

In [7]:
ds_to_np[1]

{'image': array([[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        ...,
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]], dtype=uint8),
 'label': -1}

In [5]:
print(set(tuple(x.keys()) for x in ds_to_np))                # we converted the list of keys into a tuple since we wanted to use set() for duplicate removal
                                                             # set() only works on hashable objects like strings, numbers, and tuples,
                                                             # and not on mutables like dicts and strings

print(set(x['label'] for x in ds_to_np))
print(set(type(x['image']) for x in ds_to_np))
print(set(x['image'].shape for x in ds_to_np))
print(len(ds_to_np))

{('image', 'label')}
{-1}
{<class 'numpy.ndarray'>}
{(96, 96, 3)}
10000


ds_to_np is a list of dictionaries where the key:value pairs are the 'image':img_ndarray, 'label'=-1. We want to collect all these arrays and form a tensor/array. There are 100k images. We'll use 50k of them in the training set, 25k for validation, and 25k for the test set.

In [6]:
random.seed(100)
random.shuffle(ds_to_np)                                    # random permutation in place
img_train = np.array([x['image'] for x in ds_to_np[:7000]])
img_valid = np.array([x['image'] for x in ds_to_np[7000:8000]])
img_test = np.array([x['image'] for x in ds_to_np[8000:]])

img_train = img_train/255
img_valid = img_valid/255
img_test = img_test/255

In [7]:
print(img_train.shape)

(7000, 96, 96, 3)


In [8]:
img_train = layers.Resizing(img_ht, img_wd, interpolation="bilinear")(img_train)
print(img_train.shape)

img_valid = layers.Resizing(img_ht, img_wd, interpolation="bilinear")(img_valid)
print(img_valid.shape)

img_test = layers.Resizing(img_ht, img_wd, interpolation="bilinear")(img_test)
print(img_test.shape)

(7000, 48, 48, 3)
(1000, 48, 48, 3)
(2000, 48, 48, 3)


In [9]:
encoder_input = keras.Input(shape=input_size)

x = layers.Conv2D(8, 2, padding="same")(encoder_input)
x = layers.MaxPooling2D(pool_size=(2, 2), strides=None, padding="valid")(x)
#x = layers.BatchNormalization(axis=-1)(x)
x = layers.ReLU()(x)

x = layers.Conv2D(16, 2, padding="same")(x)
x = layers.MaxPooling2D(pool_size=(2, 2), strides=None, padding="valid")(x)
x = layers.ReLU()(x)

x = layers.Conv2D(32, 2, padding="same")(x)
x = layers.MaxPooling2D(pool_size=(2, 2), strides=None, padding="valid")(x)
x = layers.ReLU()(x)

x = layers.Conv2D(64, 2, padding="valid")(x)
x = layers.ReLU()(x)

x = layers.Conv2D(128, 2, padding="valid")(x)
x = layers.ReLU()(x)

x = layers.Conv2D(256, 2, padding="valid")(x)
x = layers.ReLU()(x)

x = layers.Conv2D(512, 2, padding="valid")(x)
x = layers.ReLU()(x)

#x = layers.Conv2D(512, 2, padding="valid")(x)
#x = layers.ReLU()(x)

x = layers.Conv2D(1024, 2, padding="valid")(x)
x = layers.ReLU()(x)

#x = layers.Conv2D(1024, 2, padding="valid")(x)
#x = layers.ReLU()(x)

x = layers.Flatten()(x)

x = layers.Dense(2*latent_dim)(x)
x = layers.ReLU()(x)


z = layers.Dense(latent_dim, name="z")(x)

encoder = keras.Model(encoder_input, z, name='encoder')
encoder.summary()



Model: "encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 48, 48, 3)]       0         
                                                                 
 conv2d (Conv2D)             (None, 48, 48, 8)         104       
                                                                 
 max_pooling2d (MaxPooling2  (None, 24, 24, 8)         0         
 D)                                                              
                                                                 
 re_lu (ReLU)                (None, 24, 24, 8)         0         
                                                                 
 conv2d_1 (Conv2D)           (None, 24, 24, 16)        528       
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 12, 12, 16)        0         
 g2D)                                                      

In [10]:
latent_input = keras.Input(shape=(latent_dim,))

x = layers.Dense(1*1*1024)(latent_input)
x = layers.ReLU()(x)
x = layers.Reshape((1,1,1024))(x)

x = layers.Conv2DTranspose(512, 2, strides=2, padding='same')(x)
x = layers.ReLU()(x)

x = layers.Conv2DTranspose(256, 2, strides=1, padding='valid')(x)
x = layers.ReLU()(x)

x = layers.Conv2DTranspose(256, 2, strides=2, padding='same')(x)
x = layers.ReLU()(x)

x = layers.Conv2DTranspose(128, 2, strides=2, padding='same')(x)
x = layers.ReLU()(x)

x = layers.Conv2DTranspose(128, 2, strides=2, padding='same')(x)
x = layers.ReLU()(x)

x = layers.Conv2DTranspose(16, 2, strides=2, padding='same')(x)
x = layers.ReLU()(x)

#x = layers.Conv2DTranspose(8, 2, strides=1, padding='same')(x)
#x = layers.ReLU()(x)

#x = layers.Conv2DTranspose(32, 2, strides=1, padding='same')(x)
#x = layers.ReLU()(x)

x = layers.Conv2DTranspose(16, 2, strides=1, padding='same')(x)
x = layers.ReLU()(x)

x = layers.Conv2DTranspose(8, 2, strides=1, padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)

decoder_output = layers.Conv2DTranspose(3, 2, activation='sigmoid', strides=1, padding='same')(x)

decoder = keras.Model(latent_input, decoder_output, name="decoder")
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 256)]             0         
                                                                 
 dense_1 (Dense)             (None, 1024)              263168    
                                                                 
 re_lu_9 (ReLU)              (None, 1024)              0         
                                                                 
 reshape (Reshape)           (None, 1, 1, 1024)        0         
                                                                 
 conv2d_transpose (Conv2DTr  (None, 2, 2, 512)         2097664   
 anspose)                                                        
                                                                 
 re_lu_10 (ReLU)             (None, 2, 2, 512)         0         
                                                           

In [11]:
decoder_out = decoder(encoder(encoder_input))
VAE = keras.Model(encoder_input, decoder_out, name='VAE')

VAE.summary()

Model: "VAE"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 48, 48, 3)]       0         
                                                                 
 encoder (Functional)        (None, 256)               3454296   
                                                                 
 decoder (Functional)        (None, 48, 48, 3)         3354539   
                                                                 
Total params: 6808835 (25.97 MB)
Trainable params: 6808819 (25.97 MB)
Non-trainable params: 16 (64.00 Byte)
_________________________________________________________________


In [12]:
l_cutoff = 0.495
u_cutoff = 0.505

def norm_CB(z):

    gate = math.logical_and(math.greater(z,l_cutoff), math.greater(u_cutoff,z))

    #z = tf.clip_by_value(z, clip_value_min = K.epsilon(), clip_value_max = 1 - K.epsilon())
    z_reg = tf.where(gate, l_cutoff, z)

    norm_reg = (2*math.atanh(1 - 2*z_reg))/(1 - 2*z_reg)
    norm_taylor = 2.0 + (8.0/3.0)*math.pow(z-0.5,2) + (32.0/5.0)*math.pow(z-0.5,4)  +  (128.0/7.0)*math.pow(z-0.5,6)


    norm = tf.where(gate, norm_taylor, norm_reg)

    return norm

def CB_logloss(true, pred):
  true = layers.Flatten()(true)
  pred = layers.Flatten()(pred)
  bce = losses.binary_crossentropy(true,pred)

  corrected_loss_tensor = bce + tf.reduce_mean(math.log(norm_CB(pred)), axis=-1 )


  return tf.reduce_mean(corrected_loss_tensor)



In [13]:
#lr_schedule = keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=0.001, decay_steps=100, decay_rate=0.98)
optim = keras.optimizers.Adam(learning_rate = 0.001)                  # clipnorm=2.0
VAE.compile(optimizer=optim,loss=CB_logloss)

In [14]:
history = VAE.fit(img_train, img_train, batch_size=batch_size, validation_data=(img_valid, img_valid), epochs=20, steps_per_epoch=None)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
history2 = VAE.fit(img_train, img_train, batch_size=batch_size, validation_data=(img_valid, img_valid), epochs=50, steps_per_epoch=None)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
 3/14 [=====>........................] - ETA: 1s - loss: 1.3223