In [None]:
# Importing libaries
import numpy as np
import matplotlib.pyplot as plt
from keras.datasets import mnist
from keras.layers import Input, Dense, Lambda, Dropout, Flatten,Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.models import Model, Sequential
from keras.callbacks import EarlyStopping
from keras import backend as K
from keras import objectives
from scipy.stats import norm
from keras.utils import np_utils

In [None]:
# data load
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# x_train, x_test = x_train.astype('float32')/255., x_test.astype('float32')/255.
# x_train, x_test = x_train.reshape(x_train.shape[0], -1), x_test.reshape(x_test.shape[0], -1)
print(x_train.shape, x_test.shape)


Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
(60000, 28, 28) (10000, 28, 28)


In [None]:
class mnist_mlp():
  def __init__(self,x_train,y_train,x_test,y_test, n_hidden, n_classes, batch_size, epoch, p=1):
    self.x_train=x_train.copy()
    self.x_test=x_test.copy()
    self.y_train=y_train.copy()
    self.y_test=y_test.copy()
    self.n_hidden=n_hidden
    self.n_classes=n_classes
    self.batch=batch_size
    self.n_epoch=epoch
    self.reduction=p
  
  def preprocess(self):
    #self.x_train = self.x_train.reshape(self.x_train.shape[0], self.x_train.shape[1]*self.x_train.shape[2])
    #self.x_test = self.x_test.reshape(self.x_test.shape[0], self.x_test.shape[1]*self.x_test.shape[2])
    self.x_train=self.x_train.reshape(self.x_train.shape[0],784)
    self.x_test=self.x_test.reshape(self.x_test.shape[0],784)
    self.x_train=self.x_train.astype('float32')/255
    self.x_test=self.x_test.astype('float32')/255
    self.y_train = np_utils.to_categorical(self.y_train, self.n_classes)
    self.y_test = np_utils.to_categorical(self.y_test, self.n_classes) 

  def run(self):
    mlp_model = Sequential()
    mlp_model.add(Dense(self.n_hidden, activation='relu', input_dim=self.x_train.shape[1]))
    mlp_model.add(Dropout(0.2))
    mlp_model.add(Dense(self.n_hidden, activation='relu'))
    mlp_model.add(Dropout(0.2))
    mlp_model.add(Dense(self.n_hidden, activation='relu'))
    mlp_model.add(Dropout(0.2))
    mlp_model.add(Dense(self.n_classes, activation='softmax'))
    mlp_model.summary()

    #Compile Model
    mlp_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Reduce Data Size
    size = int(len(self.x_train) * self.reduction)
    self.x_train, self.y_train = self.x_train[:size],self.y_train[:size]


    #Fit Model  
    mlp_model.fit(self.x_train, self.y_train, batch_size=self.batch, epochs=self.n_epoch, validation_split=0.33,shuffle=True,
              callbacks= EarlyStopping(monitor='val_loss', patience=2, mode='min')
             )
    
  def predict(self):
    score = mlp_model.evaluate(self.x_test, self.y_test)
    predictions = mlp_model.predict_classes(X_test)
    predictions = list(predictions)
    actuals = list(y_test)
    print('Test accuracy: ', score[1])



In [None]:
MLP = mnist_mlp(x_train,y_train,x_test,y_test,128, 10, 32, 50,0.4)

In [None]:
MLP.preprocess()
MLP.run()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 128)               100480    
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 128)               16512     
_________________________________________________________________
dropout_1 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 128)               16512     
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 10)                1

In [None]:
class mnist_CNN():
  def __init__(self,x_train,y_train,x_test,y_test, n_classes, batch_size, epoch, p=1):
    self.x_train=x_train.copy()
    self.x_test=x_test.copy()
    self.y_train=y_train.copy()
    self.y_test=y_test.copy()
    self.n_classes=n_classes
    self.batch=batch_size
    self.n_epoch=epoch
    self.reduction=p
  
  def preprocess(self):
    #self.x_train = self.x_train.reshape(self.x_train.shape[0], self.x_train.shape[1]*self.x_train.shape[2])
    #self.x_test = self.x_test.reshape(self.x_test.shape[0], self.x_test.shape[1]*self.x_test.shape[2])
    self.x_train=self.x_train.reshape(self.x_train.shape[0],28,28,1)
    self.x_test=self.x_test.reshape(self.x_test.shape[0],28,28,1)
    self.x_train=self.x_train.astype('float32')/255
    self.x_test=self.x_test.astype('float32')/255
    self.y_train = np_utils.to_categorical(self.y_train, self.n_classes)
    self.y_test = np_utils.to_categorical(self.y_test, self.n_classes) 
  
  def build_model(self):
    self.m = Sequential()
    self.m.add(Conv2D(32, (3, 3), activation='relu', input_shape=(28,28,1)))
    BatchNormalization(axis=-1)
    self.m.add(Conv2D(32, (3, 3), activation='relu'))
    self.m.add(MaxPooling2D(pool_size=(2,2)))
    BatchNormalization(axis=-1)
    self.m.add(Conv2D(64,(3, 3) ,activation='relu'))
    BatchNormalization(axis=-1)
    self.m.add(Conv2D(64, (3, 3), activation='relu'))
    self.m.add(MaxPooling2D(pool_size=(2,2)))
    self.m.add(Flatten())
    # Fully connected layer
    BatchNormalization()
    self.m.add(Dense(512,activation='relu'))
    BatchNormalization()
    self.m.add(Dropout(0.2))
    self.m.add(Dense(self.n_classes, activation='softmax'))
    self.m.summary()
  
  def run_model(self):
    self.m.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    self.m.fit(self.x_train, self.y_train, batch_size=self.batch, epochs=self.n_epoch, validation_split=0.33)
              #callbacks=EarlyStopping(monitor='val_loss', patience=2, mode='min') )
  
  def predict(self):
    score = self.m.evaluate(self.x_test, self.y_test)
    predictions = self.m.predict_classes(self.x_test)
    predictions = list(predictions)
    actuals = list(y_test)
    print('Test accuracy: ', score[1])

In [None]:
cnn=mnist_CNN(x_train,y_train,x_test,y_test, 10, 32, 10, p=1)
cnn.preprocess()
cnn.build_model()
cnn.run_model()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 26, 26, 32)        320       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 24, 24, 32)        9248      
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 12, 12, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 10, 10, 64)        18496     
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 8, 8, 64)          36928     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 4, 4, 64)          0         
_________________________________________________________________
flatten (Flatten)            (None, 1024)              0

In [None]:
class mnist_VAE():
   def __init__(self,x_train, n_hidden, z_dim, batch_size, epoch):
     self.x_train=x_train.copy()
     self.n_hidden=n_hidden
     self.z_dim=z_dim
     self.batch=batch_size
     self.n_epoch=epoch
     self.vae_loss=0

   def preprocess(self):
     self.x_train=self.x_train.reshape(self.x_train.shape[0],784)
     self.x_train=self.x_train.astype('float32')/255

    
   def encoder(self):
     x = Input(shape=(self.x_train.shape[1:]))
     x_encoded = Dense(self.n_hidden, activation='relu')(x)
     x_encoded = Dense(self.n_hidden//2, activation='relu')(x_encoded)
     mu = Dense(self.z_dim)(x_encoded)
     log_var = Dense(self.z_dim)(x_encoded)
     return x,mu,log_var

   def sampling(self,args):
     mu, log_var = args
     eps = K.random_normal(shape=(self.z_dim,), mean=0., stddev=1.0)
     return mu + K.exp(log_var) * eps
     
   def decoder(self, mu, log_var):
     z_decoder1 = Dense(self.n_hidden//2, activation='relu')
     z_decoder2 = Dense(self.n_hidden, activation='relu')
     y_decoder = Dense(self.x_train.shape[1], activation='sigmoid')
     z = Lambda(self.sampling, output_shape=(self.z_dim,))([mu, log_var])
     z_decoded = z_decoder1(z)
     z_decoded = z_decoder2(z_decoded)
     y = y_decoder(z_decoded)  
     return y 

   def loss(self,x,y,mu,log_var):
     reconstruction_loss = objectives.binary_crossentropy(x, y) * x_train.shape[1]
     kl_loss = 0.5 * K.sum(K.square(mu) + K.exp(log_var) - log_var - 1, axis = -1)
     self.vae_loss = reconstruction_loss + kl_loss   

   def build(self,x,y):
     self.m = Model(x, y)
     self.m.add_loss(self.vae_loss)
     #self.m.summary()

   def all_run(self):
     x = Input(shape=(self.x_train.shape[1:]))
     x_encoded = Dense(self.n_hidden, activation='relu')(x)
     x_encoded = Dense(self.n_hidden//2, activation='relu')(x_encoded)
     mu = Dense(self.z_dim)(x_encoded)
     log_var = Dense(self.z_dim)(x_encoded)
     z_decoder1 = Dense(self.n_hidden//2, activation='relu')
     z_decoder2 = Dense(self.n_hidden, activation='relu')
     y_decoder = Dense(self.x_train.shape[1], activation='sigmoid')
     z = Lambda(self.sampling, output_shape=(self.z_dim,))([mu, log_var])
     z_decoded = z_decoder1(z)
     z_decoded = z_decoder2(z_decoded)
     y = y_decoder(z_decoded)
     reconstruction_loss = objectives.binary_crossentropy(x, y) * x_train.shape[1]
     kl_loss = 0.5 * K.sum(K.square(mu) + K.exp(log_var) - log_var - 1, axis = -1)
     vae_loss = reconstruction_loss + kl_loss 
     m = Model(x, y)
     m.add_loss(vae_loss)
     m.compile(optimizer='adam')
     m.fit(self.x_train,
       shuffle=True,
       epochs=self.n_epoch,
       batch_size=self.batch,
       validation_split=0.3,
       callbacks=EarlyStopping(monitor='val_loss', patience=2),
       verbose=1)     

   def run(self):
     self.preprocess()
     x,mu,log_var= self.encoder()
     #self.sampling()
     y=self.decoder(mu, log_var)
     self.loss(x,y,mu,log_var)
     self.build(x,y)
     self.m.compile(optimizer='adam')
     self.m.fit(self.x_train,
       shuffle=True,
       epochs=self.n_epoch,
       batch_size=self.batch,
       validation_split=0.3,
       callbacks=EarlyStopping(monitor='val_loss', patience=2),
       verbose=1)
     
   def build_encoder(self):
     encoder = Model(x, mu)
     encoder.summary()

   def latent_rep(self, x_test):
     z_latent = encoder.predict(x_test, batch_size=batch_size)
     return z_latent

   def build_decoder(self):
     decoder_input = Input(shape=(self.z_dim,))
     _z_decoded = z_decoder1(decoder_input)
     _z_decoded = z_decoder2(_z_decoded)
     _y = y_decoder(_z_decoded)
     generator = Model(decoder_input, _y)
     generator.summary()

   def decode_digit(self):
     x_decoded = generator.predict(z_sample)
     digit = x_decoded[0].reshape(28, 28)
     return digit

In [None]:
vae = mnist_VAE(x_train, 256, 2, 64, 50)
vae.preprocess()

In [None]:
vae.all_run()

reconstruction_loss: KerasTensor(type_spec=TensorSpec(shape=(None,), dtype=tf.float32, name=None), name='tf.math.multiply_16/Mul:0', description="created by layer 'tf.math.multiply_16'") 	 KL Loss: KerasTensor(type_spec=TensorSpec(shape=(None,), dtype=tf.float32, name=None), name='tf.math.multiply_17/Mul:0', description="created by layer 'tf.math.multiply_17'")
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
