In [0]:


#                              AUTOENCODER


#***********************************IMPORTS*************************************


# Select the TensorFlow 2.x runtime (this does not install anything).
try:
  # %tensorflow_version is an IPython line magic that only exists in Colab.
  # Elsewhere the magic is unavailable; any exception it triggers is
  # deliberately ignored so the notebook falls through to the plain
  # `import tensorflow` below.
  %tensorflow_version 2.x
except Exception:
  pass

import tensorflow as tf
import numpy as np
from google.colab import drive
import sys
import os



# Mount Google Drive so that the project sources, data and weights are reachable.
drive.mount('/content/gdrive', force_remount=True)

# NOTE: these locations must be changed according to where the project is stored.
project_path = "/content/gdrive/My Drive/shared/Colab Notebooks/tesi/models"
data_path = "/content/gdrive/My Drive/shared/Colab Notebooks/tesi/data/"
weights_path = '{}/weights/'.format(project_path)

# Make the project's utility modules importable.
sys.path.append(project_path)

from evaluation_utilities import *
from data_utilities import *
from net_utilities import *


#*******************************************************************************

In [0]:
#************************************PARAMS*************************************


# Seed shared by every random operation in the notebook.
random_seed = 1995

# Training configuration.
batch_size = 128
n_epoch = 1999
n_classes = None
input_size = (28, 28, 1)    # shape of a single image fed to the network

# Seed both NumPy and TensorFlow so that runs are repeatable.
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

#*******************************************************************************

In [0]:
#******************************DATA PROCESSING**********************************


print("loading data...")


def _prepare_splits(X, y_string):
    """Split one dataset into train/valid/test and preprocess every split.

    The steps run in this order: `split_dataset` (validation size 0.2,
    test size 0.1, stratified, seeded), `labels_preprocessing` on the three
    label sets, a seeded joint shuffle of each (x, y) pair and finally
    `data_preprocessing` on the images. All of these helpers come from the
    project utility modules.

    Args:
        X: images of a single dataset (draws or icons).
        y_string: the labels matching `X`, as returned by `load_data`.

    Returns:
        Tuple (x_train, x_valid, x_test, y_train, y_valid, y_test).
    """
    x_train, x_valid, x_test, y_train, y_valid, y_test = split_dataset(
        X, y_string,
        _validation_size=0.2, _test_size=0.1,
        _random_seed=random_seed, stratify=True)
    y_train, y_valid, y_test = labels_preprocessing(y_train, y_valid, y_test)

    # Shuffle images and labels together so that pairs stay aligned.
    x_train, y_train = shuffle_with_same_indexes(x_train, y_train, seed=random_seed)
    x_valid, y_valid = shuffle_with_same_indexes(x_valid, y_valid, seed=random_seed)
    x_test, y_test = shuffle_with_same_indexes(x_test, y_test, seed=random_seed)

    x_train, x_valid, x_test = data_preprocessing(x_train), data_preprocessing(x_valid), data_preprocessing(x_test)
    return x_train, x_valid, x_test, y_train, y_valid, y_test


# **** load draws ****
(X_draws, y_string_draws) = load_data(data_path + "/draws-28.pickle", size=input_size[0], _3d=False, invert=False, randomize=False, rand_seed=random_seed)

# **** load icons ****
(X_icons, y_string_icons) = load_data(data_path + "/icons-28.pickle", size=input_size[0], _3d=False, invert=False, randomize=False, rand_seed=random_seed)


# **** check datasets classes ****
X_draws, X_icons, y_string_draws, y_string_icons = check_dataset_classes(X_draws, X_icons, y_string_draws, y_string_icons)


# **** preprocess  draws ****
(x_train_draws, x_valid_draws, x_test_draws,
 y_train_draws, y_valid_draws, y_test_draws) = _prepare_splits(X_draws, y_string_draws)


# **** preprocess  icons ****
(x_train_icons, x_valid_icons, x_test_icons,
 y_train_icons, y_valid_icons, y_test_icons) = _prepare_splits(X_icons, y_string_icons)


# **** check datasets ****
# NOTE(review): same call as the class check above, on inputs that were not
# reassigned in between; kept in case the helper reports or validates something.
X_draws, X_icons, y_string_draws, y_string_icons = check_dataset_classes(X_draws, X_icons, y_string_draws, y_string_icons)

print("data loaded")

#*******************************************************************************

In [0]:

# **** preprocess icons ****
# Replace every icon, in place, with the output of `contour_img`
# (project helper) for the train, validation and test splits in turn.
for icon_split in (x_train_icons, x_valid_icons, x_test_icons):
    for position, icon in enumerate(icon_split):
        icon_split[position] = contour_img(icon)


In [0]:
# **** merge datasets ****
# For each split, stack draws and icons (images and labels) along the first
# axis and then shuffle images and labels together with the shared seed.

x_train, y_train = shuffle_with_same_indexes(
    np.concatenate([x_train_draws, x_train_icons]),
    np.concatenate([y_train_draws, y_train_icons]),
    seed=random_seed,
)

x_valid, y_valid = shuffle_with_same_indexes(
    np.concatenate([x_valid_draws, x_valid_icons]),
    np.concatenate([y_valid_draws, y_valid_icons]),
    seed=random_seed,
)

x_test, y_test = shuffle_with_same_indexes(
    np.concatenate([x_test_draws, x_test_icons]),
    np.concatenate([y_test_draws, y_test_icons]),
    seed=random_seed,
)


In [0]:
# Data counts as "already preprocessed" when no pixel exceeds 1. The whole
# training array is inspected rather than only its first sample: a blank
# first image would otherwise hide that the data is still in 0-255.
is_data_already_prepro = not (np.max(x_train) > 1)
print("is data preprocessed? ", is_data_already_prepro)

# Augmentation pipeline shared by the training and validation generators.
custom_aug = CustomAug({
      'rescale': not is_data_already_prepro,        # if rescale == True, the alg assumes the data is in format 0-255
      'pad' : True,
      'horizontal_flip' : True,
      'erosion' : True,
      'half_aug' : True,
    }
)



class AutoencoderGenerator(tf.keras.utils.Sequence):
    """Keras `Sequence` that yields augmented (input, target) batches.

    Every sample goes through the module-level
    `custom_aug.custom_preprocessing`; the resulting batch is returned as
    both the input and the reconstruction target.

    Args:
        x: indexable collection of images (`len(x)` and `x[i]` are used).
            It is never modified by the generator.
        batch_size: number of samples per batch. Samples that do not fill a
            complete batch are left out of the epoch.
        shuffle: when True, the sample order is redrawn at construction and
            after every epoch.
        dim: shape of one preprocessed sample.
    """

    def __init__(self, x, batch_size=128, shuffle=True, dim=(28, 28, 1)):
        super().__init__()
        self.x = x
        self.batch_size = batch_size
        self.shuffle = shuffle          # honour the argument (was hard-coded to True)
        self.dim = tuple(dim)
        self.on_epoch_end()

    def __len__(self):
        """Return the number of complete batches per epoch."""
        return int(np.floor(len(self.x) / self.batch_size))

    def __getitem__(self, index):
        """Build and return the batch at position `index` as `(X, X)`."""
        # Sample positions belonging to this batch.
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        X, y = self.__data_generation(indexes)

        return X, y

    def on_epoch_end(self):
        """Rebuild the index vector and, if requested, shuffle it."""
        self.indexes = np.arange(len(self.x))

        if self.shuffle:
            # Shuffle the indexes, not the data: shuffling `self.x` in place
            # would silently reorder the array owned by the caller.
            np.random.shuffle(self.indexes)

    def __data_generation(self, indexes):
        """Augment the samples at `indexes`; returns `(X, X)` with X of shape (len(indexes), *dim)."""
        # Sized from the indexes so that no uninitialised rows can leak out.
        X = np.empty((len(indexes), *self.dim))

        for i, index in enumerate(indexes):
            X[i,] = custom_aug.custom_preprocessing(self.x[index])

        # The autoencoder reconstructs its own input, so target == input.
        return X, X



In [0]:

# Use the notebook-wide `batch_size` setting instead of silently relying on
# the generator's own default value.
train_gen = AutoencoderGenerator(x_train, batch_size=batch_size)
valid_gen = AutoencoderGenerator(x_valid, batch_size=batch_size)

In [0]:


class ReversedSimpleEmbeddingNet(tf.keras.Model):
  """Decoder that turns an embedding vector into a 28x28x1 image.

  A Dense layer expands the input to 4x4x(2*filter_size); a stack of
  transposed convolutions then grows the feature map
  4 -> 8 -> 10 -> 12 -> 24 -> 26 -> 28 and the last layer emits one
  sigmoid channel, i.e. values in [0, 1].
  NOTE(review): judging by the name this is meant to mirror
  `SimpleEmbeddingNet`, which is defined outside this notebook - confirm.
  """

  def __init__(self):
    # Keras expects the base initialiser to run before any attribute is set.
    super(ReversedSimpleEmbeddingNet, self).__init__()
    self.filter_size = 24

    # 4 * 4 * (2 * filter_size) = 768 units: exactly what the Reshape consumes.
    self.l1_dense   = tf.keras.layers.Dense(4 * 4 * self.filter_size * 2, activation='relu')
    self.l1_resh    = tf.keras.layers.Reshape((4, 4, self.filter_size*2))
    self.l1_conv    = tf.keras.layers.Conv2DTranspose(self.filter_size*2,kernel_size=5,strides=2,padding='same',activation='relu')   # 4 -> 8
    self.l1_batch   = tf.keras.layers.BatchNormalization()
    self.l2_conv    = tf.keras.layers.Conv2DTranspose(self.filter_size*2,kernel_size=3,activation='relu')                           # 8 -> 10
    self.l2_batch   = tf.keras.layers.BatchNormalization()
    self.l3_conv    = tf.keras.layers.Conv2DTranspose(self.filter_size*2,kernel_size=3,activation='relu')                           # 10 -> 12
    self.l3_drop    = tf.keras.layers.Dropout(0.4)
    self.l3_batch   = tf.keras.layers.BatchNormalization()
    self.l4_conv    = tf.keras.layers.Conv2DTranspose(self.filter_size,kernel_size=5,strides=2,padding='same',activation='relu')     # 12 -> 24
    self.l4_batch   = tf.keras.layers.BatchNormalization()
    self.l5_conv    = tf.keras.layers.Conv2DTranspose(self.filter_size,kernel_size=3,activation='relu')                             # 24 -> 26
    self.l5_batch   = tf.keras.layers.BatchNormalization()
    # The last two layers are not input layers, so they take no `input_shape`.
    self.l6_conv    = tf.keras.layers.Conv2DTranspose(self.filter_size,kernel_size=3,activation='relu')                             # 26 -> 28
    self.l7_conv    = tf.keras.layers.Conv2DTranspose(1,kernel_size=3,activation='sigmoid', padding="same")                         # 28 -> 28, 1 channel

  def call(self, x, training=None):
    """Decode a batch of embeddings.

    Args:
      x: batch of embedding vectors.
      training: forwarded to the Dropout and BatchNormalization layers;
        None lets Keras decide from the surrounding call context.

    Returns:
      Batch of reconstructed images with values in [0, 1].
    """
    x = self.l1_dense(x)
    x = self.l1_resh(x)
    x = self.l1_conv(x)
    x = self.l1_batch(x, training=training)
    x = self.l2_conv(x)
    x = self.l2_batch(x, training=training)
    x = self.l3_conv(x)
    x = self.l3_drop(x, training=training)
    x = self.l3_batch(x, training=training)
    x = self.l4_conv(x)
    x = self.l4_batch(x, training=training)
    x = self.l5_conv(x)
    x = self.l5_batch(x, training=training)
    x = self.l6_conv(x)
    x = self.l7_conv(x)
    return x


class Autoencoder(tf.keras.Model):
  """Autoencoder chaining `SimpleEmbeddingNet` and `ReversedSimpleEmbeddingNet`.

  `encoder` maps the input to an embedding and `decoder` maps the embedding
  back to an image. The encoder stays reachable as `self.encoder` so it can
  be reused on its own once training is done.
  """

  def __init__(self):
    super(Autoencoder, self).__init__()

    self.encoder = SimpleEmbeddingNet()
    self.decoder = ReversedSimpleEmbeddingNet()

  def call(self, x):
    """Encode `x` and decode it again; returns the reconstruction."""
    # Invoke the sub-models through `__call__` rather than `.call()`:
    # only `__call__` runs the Keras machinery (building, scoping,
    # propagation of the training mode).
    x = self.encoder(x)
    x = self.decoder(x)
    return x


In [0]:

#*********************************************************************************************************************************************
net_callbacks = [
    PlotLosses(),
    # save_weights_only=True: the autoencoder is a subclassed model, which
    # Keras cannot store as a full model in HDF5; the evaluation cell reads
    # the file back with `load_weights` anyway.
    # NOTE(review): `period` is deprecated in recent TensorFlow releases in
    # favour of `save_freq`; kept so the checkpointing cadence is unchanged.
    tf.keras.callbacks.ModelCheckpoint(
        weights_path + 'autoencoder_' + "{epoch:02d}" + '.h5',
        monitor='val_loss',
        verbose=1,
        period=20,
        save_best_only=True,
        save_weights_only=True,
        mode='min',
    ),
    #tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, min_delta=1, verbose=1, mode='auto', restore_best_weights=True)
    AutoencoderViz(),
]

print("net_callbacks var created")
#*********************************************************************************************************************************************

lr =  0.008   #0.0001

autoencoder = Autoencoder()
adam = tf.keras.optimizers.Adam(learning_rate=lr)

autoencoder.compile(optimizer=adam, loss='binary_crossentropy', metrics=['accuracy'])

# Run one dummy batch through `__call__` (not `.call`) so that Keras creates
# the variables and marks the model as built before training starts.
autoencoder(tf.zeros((1, *input_size)))


history = autoencoder.fit(
    train_gen,
    epochs=n_epoch,
    verbose=1,
    callbacks=net_callbacks,
    validation_data=valid_gen,
    initial_epoch=0
)

In [0]:

# The test set must be preprocessed like the training and validation
# generator data: values above 1 mean it has not been rescaled yet.
x_test = data_preprocessing(x_test) if np.max(x_test) > 1 else x_test

# Restore the checkpoint written at epoch 20 and report the reconstruction
# metrics on the test set (the input doubles as the target).
autoencoder.load_weights(weights_path + '/autoencoder_20.h5')
print(autoencoder.evaluate(x_test, x_test))

# Only the encoder half is needed for the retrieval experiments below.
embedding_net = autoencoder.encoder



In [0]:
X_icons_eval = load_data(path=data_path + "/icons_eval.pickle", size=input_size[0], invert=False, _3d=False, randomize=False, rand_seed=random_seed)
print(len(list(X_icons_eval)))

# Drop uniform images (min == max, e.g. completely white ones): they carry
# no shape information.
X_icons_eval = np.asarray([i for i in X_icons_eval if np.min(i) != np.max(i)])

X_icons_eval_edges = data_preprocessing(X_icons_eval)

# Same contour step that was applied to the training icons.
for _i, i in enumerate(X_icons_eval_edges): X_icons_eval_edges[_i] = contour_img(i)

# Preview the first 10 processed icons; slicing avoids walking the whole
# array only to skip everything past the tenth element.
for im in X_icons_eval_edges[:10]:
    show_img(im)

In [0]:
# Ranking helper built on the evaluation icons and the trained encoder.
# NOTE(review): `RankImages` lives in the project utilities; `_n=15` is
# presumably the number of results returned per query - confirm there.
manual_eval = RankImages(
    X_icons_eval_edges,
    embedding_net,
    _n=15,
    _show_im=False,
    _show_dist=False,
)


In [0]:

targets_dir = os.path.join(data_path, 'targets')

# os.listdir returns entries in arbitrary order; sorting makes the order in
# which the results are displayed reproducible across runs.
target_files = sorted(f for f in os.listdir(targets_dir) if f.endswith('.jpg'))

images = []
for im_path in target_files:
    im = cv2.imread(os.path.join(targets_dir, im_path), 0)   # flag 0: load as grayscale
    if im is None:
        # cv2.imread reports an unreadable file by returning None, which
        # would otherwise make cv2.resize fail.
        print("skipping unreadable image:", im_path)
        continue
    images.append(cv2.resize(im, (input_size[0], input_size[1])))

for image in images:
  # Same preprocessing as the dataset images, plus a trailing channel axis.
  target_im = np.expand_dims(data_preprocessing(image), axis=-1)

  # Contour extraction is intentionally disabled for the hand-made targets.
  target_im_edges = target_im
  #target_im_edges = contour_img(target_im)

  res = manual_eval.get_n_most_similar_images(target_im_edges, _returnType='indexes')
  manual_eval.format_result(target_im, [X_icons_eval_edges[r] for r in res])

In [0]:
print("creating vector space...")
# Embed each test set with one batched `predict` call instead of one call
# per image: the result has the same (n_samples, ...) layout and is far faster.
x_feat_test_icons = embedding_net.predict(np.asarray(x_test_icons))
x_feat_test_draws = embedding_net.predict(np.asarray(x_test_draws))

print("vector space created")

# The icon embeddings form the reference set; every draw is labelled from
# its nearest icons (k=10).
knn = KNN(x_feat_test_icons, y_test_icons, k=10)

y_test_icons_true = y_test_icons                                                # kept for backward compatibility
y_test_draws_true = y_test_draws
y_test_draws_pred = knn.get_labels(x_feat_test_draws)

# Predictions for the draws have to be scored against the draws' own labels.
# NOTE(review): the previous version scored them against the icon labels,
# which is only equivalent if both label arrays happen to be identical.
get_score(y_test_draws_true, y_test_draws_pred)

