In [7]:
from keras import Model
from keras.layers import Input, Conv2D, ReLU, BatchNormalization, Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from keras import backend as K
from keras.optimizers import Adam
from keras.losses import MeanSquaredError
import numpy as np
import tensorflow as tf
tf.compat.v1.disable_eager_execution()

class VAE:
    """Convolutional variational autoencoder (encoder + decoder + joint model).

    The encoder stacks Conv2D -> ReLU -> BatchNormalization blocks, flattens
    the result and maps it to two Dense heads (``mu`` and ``log_variance``)
    from which a latent point is sampled. The decoder maps a latent point
    back through a Dense layer, a Reshape and Conv2DTranspose blocks, ending
    in a sigmoid activation.

    Args:
        input_shape: Shape of one input sample, channels last,
            e.g. ``[28, 28, 1]`` for an image.
        conv_filters: Number of filters per encoder conv layer, e.g. ``[2, 4, 8]``.
        conv_kernels: Kernel size per encoder conv layer, e.g. ``[3, 5, 3]``.
        conv_strides: Stride per encoder conv layer, e.g. ``[1, 2, 2]``.
        latent_space_dim: Size of the latent vector, e.g. ``2``.

    Raises:
        ValueError: If no conv layer is configured, or if ``conv_filters``,
            ``conv_kernels`` and ``conv_strides`` differ in length.

    NOTE(review): the loss functions read the symbolic ``mu`` /
    ``log_variance`` tensors stored on the instance; this is presumably why
    the file disables eager execution at import time -- confirm before
    enabling eager mode.
    """

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):
        # Fail fast on inconsistent layer specs: a short list would otherwise
        # surface as an IndexError in the middle of the build, and a long one
        # would be silently ignored.
        num_conv_layers = len(conv_filters)
        if num_conv_layers == 0:
            raise ValueError("conv_filters must describe at least one conv layer.")
        if not (num_conv_layers == len(conv_kernels) == len(conv_strides)):
            raise ValueError(
                "conv_filters, conv_kernels and conv_strides must have the same "
                f"length, got {num_conv_layers}, {len(conv_kernels)} and "
                f"{len(conv_strides)}."
            )

        self.input_shape = input_shape            # ex : [28,28,1] For image
        self.conv_filters = conv_filters          # ex : [2, 4, 8]
        self.conv_kernels = conv_kernels          # ex : [3, 5, 3]
        self.conv_strides = conv_strides          # ex : [1, 2, 2]
        self.latent_space_dim = latent_space_dim  # ex : 2
        # Multiplier applied to the reconstruction term of the combined loss.
        self.reconstruction_loss_weight = 1000

        self.encoder = None
        self.decoder = None
        self.model = None

        # Symbolic outputs of the two bottleneck heads; set while the encoder
        # is built and read again by the KL loss.
        self.mu = None
        self.log_variance = None

        self._num_conv_layers = num_conv_layers
        self._shape_before_bottleneck = None
        self._model_input = None

        self._build()

    def _build(self):
        """Build the encoder, the decoder and the chained autoencoder, in that order."""
        # The decoder needs _shape_before_bottleneck, which the encoder sets.
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    # ENCODER =================================================================
    def _build_encoder(self):
        """Assemble the encoder model and remember its input tensor."""
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.encoder = Model(encoder_input, bottleneck, name="encoder")

    def _add_encoder_input(self):
        """Return the encoder's Input tensor, shaped like ``input_shape``."""
        return Input(shape=self.input_shape, name="encoder_input")

    def _add_conv_layers(self, encoder_input):
        """Create all convolutional blocks in the encoder and return the last output."""
        x = encoder_input
        for layer_index in range(self._num_conv_layers):
            x = self._add_conv_layer(layer_index, x)
        return x

    def _add_conv_layer(self, layer_index, x):
        """Append one Conv2D -> ReLU -> BatchNormalization block to ``x``.

        Layer names are 1-based (``layer_index + 1``).
        """
        layer_number = layer_index + 1
        conv_layer = Conv2D(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"encoder_conv_layer_{layer_number}"
        )
        x = conv_layer(x)
        x = ReLU(name=f"encoder_relu_{layer_number}")(x)
        x = BatchNormalization(name=f"encoder_bn_{layer_number}")(x)
        return x

    def _add_bottleneck(self, x):
        """Flatten data and add the bottleneck with Gaussian sampling.

        Stores the pre-flatten shape (without the batch axis) so the decoder
        can restore it, and stores the ``mu`` / ``log_variance`` tensors on
        the instance for the KL loss.
        """
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        self.mu = Dense(self.latent_space_dim, name="mu")(x)
        self.log_variance = Dense(self.latent_space_dim, name="log_variance")(x)

        def sample_point_from_normal_distribution(args):
            # Reparameterisation: z = mu + sigma * epsilon with
            # sigma = exp(log_variance / 2). Only the tensors handed to the
            # layer are used, so the Lambda does not depend on instance state.
            mu, log_variance = args
            epsilon = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
            return mu + K.exp(log_variance / 2) * epsilon

        x = Lambda(sample_point_from_normal_distribution,
                   name="encoder_output")([self.mu, self.log_variance])
        return x

    # DECODER =================================================================
    def _build_decoder(self):
        """Assemble the decoder model from latent input to sigmoid output."""
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _add_decoder_input(self):
        """Return the decoder's Input tensor: one latent vector per sample."""
        # Input shapes are tuples; wrap the scalar dimension explicitly.
        return Input(shape=(self.latent_space_dim,), name="decoder_input")

    def _add_dense_layer(self, decoder_input):
        """Project the latent vector to as many units as the encoder had before flattening."""
        # [1, 2, 4] -> 8; cast so the layer receives a plain Python int.
        num_neurons = int(np.prod(self._shape_before_bottleneck))
        dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
        return dense_layer

    def _add_reshape_layer(self, dense_layer):
        """Reshape the dense output back to the encoder's pre-flatten shape."""
        return Reshape(self._shape_before_bottleneck)(dense_layer)

    def _add_conv_transpose_layers(self, x):
        """Add the conv transpose blocks.

        Walks the encoder layer indices in reverse and stops before index 0;
        index 0 is handled by ``_add_decoder_output``.
        """
        for layer_index in reversed(range(1, self._num_conv_layers)):
            x = self._add_conv_transpose_layer(layer_index, x)
        return x

    def _add_conv_transpose_layer(self, layer_index, x):
        """Append one Conv2DTranspose -> ReLU -> BatchNormalization block to ``x``."""
        # Decoder layers are numbered in build order: the block built from the
        # last encoder index gets number 1.
        layer_num = self._num_conv_layers - layer_index
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"decoder_conv_transpose_layer_{layer_num}"
        )
        x = conv_transpose_layer(x)
        x = ReLU(name=f"decoder_relu_{layer_num}")(x)
        x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
        return x

    def _add_decoder_output(self, x):
        """Add the final Conv2DTranspose and the sigmoid activation."""
        conv_transpose_layer = Conv2DTranspose(
            # Emit as many channels as the input has so the reconstruction
            # can be compared element-wise with the target (1 for the
            # [28, 28, 1] example).
            filters=self.input_shape[-1],
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
        )
        x = conv_transpose_layer(x)
        output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
        return output_layer

    # AUTOENCODER =============================================================
    def _build_autoencoder(self):
        """Chain encoder and decoder into the end-to-end model."""
        model_input = self._model_input
        model_output = self.decoder(self.encoder(model_input))
        self.model = Model(model_input, model_output, name="autoencoder")

    # SUMMARY =================================================================
    def summary(self):
        """Print the Keras summaries of the encoder, the decoder and the full model."""
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

    # COMPILE AND TRAIN =======================================================
    def compile(self, learning_rate=0.0001):
        """Compile the full model with Adam and the combined VAE loss.

        The reconstruction and KL terms are also registered as metrics so
        they are reported separately during training.
        """
        optimizer = Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer,
                           loss=self._calculate_combined_loss,
                           metrics=[self._calculate_reconstruction_loss,
                                    self._calculate_kl_loss])

    def train(self, x_train, batch_size, num_epochs):
        """Fit the model to reproduce ``x_train`` (input and target are the same data)."""
        self.model.fit(x_train,
                       x_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       shuffle=True)

    # LOSS FUNCTIONS ==========================================================
    def _calculate_combined_loss(self, y_target, y_predicted):
        """Return ``reconstruction_loss_weight * reconstruction_loss + kl_loss``."""
        reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        combined_loss = self.reconstruction_loss_weight * reconstruction_loss + kl_loss
        return combined_loss

    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        """Return the mean squared error over axes 1-3, i.e. one value per sample."""
        error = y_target - y_predicted
        reconstruction_loss = K.mean(K.square(error), axis=[1, 2, 3])
        return reconstruction_loss

    def _calculate_kl_loss(self, y_target, y_predicted):
        """Return the KL term computed from the stored ``mu`` / ``log_variance`` tensors.

        ``y_target`` and ``y_predicted`` are unused; they are accepted only
        so the method matches the ``(y_true, y_pred)`` signature used for
        losses and metrics in ``compile``.
        """
        kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) - K.exp(self.log_variance), axis=1)
        return kl_loss

    # RECONSTRUCTION ==========================================================
    def reconstruct(self, input_data):
        """Encode ``input_data`` and decode it again.

        Returns:
            A ``(reconstructed_data, latent_representations)`` tuple.
        """
        latent_representations = self.encoder.predict(input_data)
        reconstructed_data = self.decoder.predict(latent_representations)
        return reconstructed_data, latent_representations


# NLP experiments (all code in the cells below is commented out)

In [2]:
# import re
# import nltk
# import pandas as pd
# import numpy as np

In [3]:
# data=pd.read_csv(r'F:\4 - DESKTOP\archive datasets\imdb_movie.csv')
# print(data.shape)
# data.head()

In [4]:
# data=data.iloc[:1000,:]
# data.shape

In [5]:
# data.ndim

In [6]:
# from nltk.corpus import stopwords
# from nltk.stem import PorterStemmer
# ps=PorterStemmer()

# clean_data=[]
# for text in data['review']:
#     text=text.lower()
#     text=re.sub('[^A-Za-z]',' ',text)
#     text = text.split()
#     text=[ps.stem(word) for word in text if word not in set(stopwords.words('english'))]
#     text=' '.join(text)
#     clean_data.append(text)

# # clean_data=pd.DataFrame(data_lst)
# # data['review']=clean_data

In [7]:
# from keras.preprocessing.text import one_hot
# from keras.preprocessing.sequence import pad_sequences

# voc_size=5000
# one_hot_repr=[one_hot(word,voc_size) for word in clean_data]
# one_hot_repr

In [8]:
# len(one_hot_repr)

In [9]:
# data=pad_sequences(one_hot_repr,padding='pre',maxlen=20)
# data

In [10]:
# data.shape

In [11]:
# LEARNING_RATE = 0.0005
# BATCH_SIZE = 32
# EPOCHS = 10
# autoencoder = train(data, LEARNING_RATE, BATCH_SIZE, EPOCHS, (None,20))

In [12]:
# import keras
# from keras.models import Sequential
# from keras.layers import Dense,Dropout,LSTM
# from keras.layers.embeddings import Embedding
# embedding_vector_features=40

# model=Sequential()
# model.add(Embedding(voc_size,embedding_vector_features,input_length=20))
# model.add(Dropout(0.3))
# model.add(LSTM(100))
# model.add(Dropout(0.3))
# model.add(Dense(1,activation='sigmoid'))
# model.compile(loss='binary_crossentropy',optimizer='adam',metrics=['accuracy'])
# model.fit(X_train,y_train,validation_data=(X_test,y_test),epochs=10,batch_size=64)