In [None]:
# import the necessary packages
from keras.applications.vgg19 import VGG19
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import mnist
import numpy as np
import matplotlib.pyplot as plt

In [None]:
class ConvAutoencoder(object):
    def __init__ (self, inputShape, filters, latentDim, learning_rate=3e-4):
        '''
        Inputs:
            :param width: (int) -> Width of the input image in pixels.
            :param height: (int) -> Height of the input image in pixels.
            :param depth: (int) -> Number of channels (i.e., depth) of the input volume.
            :param filters: (tuple(int)) -> A tuple that contains the set of filters for convolution operations.
            :param latentDim: (int) -> The number of neurons in our fully-connected (Dense) latent vector
        '''
        self.inputShape = inputShape
        self.filters = filters
        self.latentDim = latentDim
        self.learning_rate = learning_rate
        self.optimizer = Adam(learning_rate)
        self.encoder, self.decoder, self.autoencoder = self.create_nets(width, height, depth, filters, latentDim)

    #-------------------------------------------------------------------------------------------------------------------       
    def create_nets(self, width, height, depth, filters, latentDim):
        inputShape = (height, width, depth)
        chanDim = -1
        # define the input to the encoder
        inputs = Input(shape=inputShape)
        x = inputs
        # loop over the number of filters
        for f in filters:
            # apply a CONV => RELU => BN operation
            x = Conv2D(f, (3, 3), strides=2, padding="same")(x)
            x = LeakyReLU(alpha=0.2)(x)
            x = BatchNormalization(axis=chanDim)(x)
        # flatten the network and then construct our latent vector
        volumeSize = K.int_shape(x)
        x = Flatten()(x)
        latent = Dense(latentDim)(x)
        # build the encoder model
        encoder = Model(inputs, latent, name="encoder")

        # start building the decoder model which will accept the
        # output of the encoder as its inputs
        latentInputs = Input(shape=(latentDim,))
        x = Dense(np.prod(volumeSize[1:]))(latentInputs)
        x = Reshape((volumeSize[1], volumeSize[2], volumeSize[3]))(x)
        # loop over our number of filters again, but this time in
        # reverse order
        for f in filters[::-1]:
            # apply a CONV_TRANSPOSE => RELU => BN operation
            x = Conv2DTranspose(f, (3, 3), strides=2, padding="same")(x)
            x = LeakyReLU(alpha=0.2)(x)
            x = BatchNormalization(axis=chanDim)(x)

        # apply a single CONV_TRANSPOSE layer used to recover the
        # original depth of the image
        x = Conv2DTranspose(depth, (3, 3), padding="same")(x)
        outputs = Activation("sigmoid")(x)
        # build the decoder model
        decoder = Model(latentInputs, outputs, name="decoder")
        # our autoencoder is the encoder + decoder
        autoencoder = Model(inputs, decoder(encoder(inputs)), name="autoencoder")
        
        autoencoder.compile(loss="mse", optimizer=self.optimizer)
        # return 3 networks
        return encoder, decoder, autoencoder
    
    #-------------------------------------------------------------------------------------------------------------------
    def train(self, batch_size, train_data, train_label, num, verbose_num, validation_data, epochs=1):
        verbose = 1 if num%verbose_num==0 else 0
        validation_data = validation_data if num%verbose_num==0 else None
        self.autoencoder.fit(
            train_data, 
            train_label, 
            epochs=epochs, 
            batch_size=batch_size,
            validation_data=validation_data,
            verbose=verbose,
        )
        
    #-------------------------------------------------------------------------------------------------------------------
    def predict(self, input_data):
        return self.autoencoder.predict(input_data)
    
    #-------------------------------------------------------------------------------------------------------------------
    def encode(self, input_data):
        return self.encoder.predict(input_data)
    
    #-------------------------------------------------------------------------------------------------------------------
    def decode(self, input_data):
        return self.decoder.predict(input_data)
    
    #-------------------------------------------------------------------------------------------------------------------
    def save(self, path, name, num):
        if not os.path.exists(path+name+'/'):
            os.mkdir(path+name+'/')
            
        path = path+name+'/_'+str(num)+'/'
        os.mkdir(path)
        
        enc_model_json = self.encoder.to_json()
        with open(path+'_encoder_'+str(num)+'.json', 'w') as json_file:
            json_file.write(enc_model_json)
        self.encoder.save_weights(path+'_encoder_'+str(num)+'.h5')
        
        dec_model_json = self.decoder.to_json()
        with open(path+'_decoder_'+str(num)+'.json', 'w') as json_file:
            json_file.write(dec_model_json)
        self.decoder.save_weights(path+'_decoder_'+str(num)+'.h5')
        
        aut_model_json = self.autoencoder.to_json()
        with open(path+'_autoencoder_'+str(num)+'.json', 'w') as json_file:
            json_file.write(aut_model_json)
        self.autoencoder.save_weights(path+'_autoencoder_'+str(num)+'.h5')
     
    #-------------------------------------------------------------------------------------------------------------------
    def load(self, path, num):
        path = path+'/_'+str(num)+'/'
        
        enc_json_file = open(path+'_encoder_'+str(num)+'.json', 'r')
        enc_loaded_model_json = enc_json_file.read()
        enc_json_file.close()
        self.encoder = models.model_from_json(enc_loaded_model_json)
        self.encoder.load_weights(path+'_encoder_'+str(num)+'.h5')
        
        dec_json_file = open(path+'_decoder_'+str(num)+'.json', 'r')
        dec_loaded_model_json = dec_json_file.read()
        dec_json_file.close()
        self.decoder = models.model_from_json(dec_loaded_model_json)
        self.decoder.load_weights(path+'_decoder_'+str(num)+'.h5')
        
        aut_json_file = open(path+'_autoencoder_'+str(num)+'.json', 'r')
        aut_loaded_model_json = aut_json_file.read()
        aut_json_file.close()
        self.autoencoder = models.model_from_json(aut_loaded_model_json)
        self.autoencoder.load_weights(path+'_autoencoder_'+str(num)+'.h5')
        
        self.autoencoder.compile(loss="mse", optimizer=self.optimizer)


In [None]:
class Simbolic_Net(object):
    def __init__(self):
        self.model = VGG19()
        self.autoencoders = []
        
        self.autoencoders.append(ConvAutoencoder(56, 56, 128, (44, 64, 128), 4096))
        self.soms.append(MiniSom(32, 32, 4096, 2, 0.03))
     
        self.autoencoders.append(ConvAutoencoder(28, 28, 43, (64, 128), 2048))
        self.soms.append(MiniSom(25, 25, 2048, 1.7, 0.04))
        
        self.autoencoders.append(ConvAutoencoder(14, 14, 86, (128,), 1024))
        self.soms.append(MiniSom(20, 20, 1024, 1.5, 0.05))
        
        #self.autoencoders.append(ConvAutoencoder(7, 7, 86, (), 500))
    
    #-------------------------------------------------------------------------------------------------------------------
    def train(self, epochs, batch_size, images_path, images_names, validation_data, stride, num_comp, pooling_ratio, only_som=False, only_aut=False):
        print('INICIO')
        
        for epoch in range(epochs):
            val_data = self.sens_net.extract_batch(batch_size, images_path, validation_data, stride, num_comp, pooling_ratio)
            #Gera o vetor com um batch de ativações ja com dimensões reduzidas
            activations  = self.sens_net.extract_batch(batch_size, images_path, images_names, stride, num_comp, pooling_ratio) 
            for i, _ in enumerate(self.autoencoders):
                train_data = np.array([layer[i] for layer in activations])
                validation_data_ = np.array([layer[i] for layer in val_data])
                
                if only_aut or not only_som:
                    self.autoencoders[i].train(batch_size, train_data, train_data, epoch, 200, (validation_data_, validation_data_))
                
                if only_som or not only_aut:
                    self.soms[i].train(np.asarray(self.autoencoders[i].encode(train_data)), num_iteration=3)
                
            print('\rEpoch: ', epoch+1, '/', epochs, end='')
        
        print('\nFIM')
    
    #-------------------------------------------------------------------------------------------------------------------
    def generate_maps(self, image_path, image_name, stride, num_comp, pooling_ratio):
        data = self.sens_net.extract_batch(len(image_name), images_path, image_name, stride, num_comp, pooling_ratio)
        out_data = []
        atoenc_data = []
        
        for i, _ in enumerate(self.autoencoders):
            data_aux = np.array([layer[i] for layer in data])
            auto_out = self.autoencoders[i].predict(data_aux)
            data_aux = self.autoencoders[i].encode(data_aux)
            aux=[]
            
            for act in data_aux:
                aux.append(self.soms[i].activate(act))
            
            atoenc_data.append(auto_out)
            out_data.append(aux)
            
        return out_data, atoenc_data
    
    #-------------------------------------------------------------------------------------------------------------------
    def save_net(self, path, name):
        for i, _ in enumerate(self.autoencoders):
            self.autoencoders[i].save(path, name, i)
            with open(path+name+'/_'+str(i)+'/SOM_'+str(i)+'.p', 'wb') as outfile:
                pickle.dump(self.soms[i], outfile)
    
    #-------------------------------------------------------------------------------------------------------------------
    def load_net(self, path, som=True):
        for i, _ in enumerate(self.autoencoders):
            self.autoencoders[i].load(path, i)
            if som:
                with open(path+'_'+str(i)+'/SOM_'+str(i)+'.p', 'rb') as infile:
                    self.soms[i] = pickle.load(infile)
