In [None]:
import tensorflow as tf
from tensorflow import keras
from glob import glob
import os
import numpy as np
import random

# set the batch size and class map, and list the train/validation files

In [None]:
# Maps each class directory name to its integer label.
class_map = {"acf_alv":0, "acf_chap":1, "reboco":2, "tijolo_alv":3, "tijolo_chap":4}

BATCH_SIZE = 64

# Fixed seed so the shuffled file order is identical on every run.
SEED = 42

# glob() returns files in an OS-dependent order, so sort first and then shuffle
# with a dedicated seeded generator to get a reproducible ordering.
train_images_path = sorted(glob("resized_dataset_texturas_v2/*/*.jpeg"))
random.Random(SEED).shuffle(train_images_path)
val_images_path = sorted(glob("resized_dataset_texturas_v2_test/*/*.jpeg"))
random.Random(SEED).shuffle(val_images_path)

# Aliases used by the rest of the notebook.
train_files = train_images_path
val_files = val_images_path

def filename_to_label(file_names, class_map_, class_index=1):
    """Map each file path to the integer label of its class directory.

    Args:
        file_names: Iterable of path strings.
        class_map_: Dict mapping a class directory name to its integer label.
        class_index: Position of the class directory among the "/"-separated
            path components (default 1, i.e. ``<root>/<class>/<file>``).

    Returns:
        List of labels, in the same order as ``file_names``.

    Raises:
        IndexError: If a path has no component at ``class_index``.
        KeyError: If the class component is not a key of ``class_map_``.
    """
    # glob() yields the platform's native separator ("\\" on Windows), so turn
    # every native separator into "/" before splitting. On POSIX the list below
    # is empty and the path is split exactly as given.
    native_separators = [sep for sep in (os.sep, os.altsep) if sep and sep != "/"]

    labels = []
    for file_name in file_names:
        for sep in native_separators:
            file_name = file_name.replace(sep, "/")
        labels.append(class_map_[file_name.split("/")[class_index]])
    return labels

# Integer class label for every train/validation file, in the same order as the file lists.
train_labels = filename_to_label(train_files, class_map)
val_labels = filename_to_label(val_files, class_map)


# define data handling functions

In [None]:
def read_image(image_path):
    """Read a JPEG file and decode it into a 3-channel image tensor.

    Args:
        image_path: Path of the JPEG file (string or string tensor).

    Returns:
        The decoded image tensor with its static shape set to (256, 256, 3).
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    # Only declares the static shape so downstream layers know it; nothing is resized here.
    decoded.set_shape([256, 256, 3])
    return decoded
    

def load_data_for_autoencoder(image_list):
    """Load one image and return it as both the input and the target.

    Args:
        image_list: Path of a single image file (despite the name, it is passed
            straight to ``read_image``).

    Returns:
        Tuple ``(image, image)`` holding the same float16 tensor twice, with the
        decoded pixel values divided by 256.
    """
    pixels = tf.cast(read_image(image_list), tf.float16)
    scaled = pixels / 256
    return scaled, scaled

def load_data_for_classifier(image_list, label_list, oh_depth):
    """Load one image together with its one-hot encoded label.

    Args:
        image_list: Path of a single image file (passed to ``read_image``).
        label_list: Integer class label of that image.
        oh_depth: Depth of the one-hot encoding.

    Returns:
        Tuple ``(image, label)``: the image as float16 with pixel values divided
        by 256, and the label one-hot encoded with 1.0 / 0.0 as on / off values.
    """
    scaled = tf.cast(read_image(image_list), tf.float16) / 256
    one_hot = tf.one_hot(label_list, oh_depth, 1.0, 0.0)
    # The static shape is pinned to 5 here, independently of oh_depth.
    one_hot.set_shape([5])
    return scaled, one_hot


def autoencoder_data_generator(image_list, drop_remainder=True):
    """Build the batched ``tf.data`` pipeline that feeds the autoencoder.

    Args:
        image_list: Image file paths.
        drop_remainder: Whether to drop a final batch smaller than BATCH_SIZE.

    Returns:
        A dataset of batches produced by ``load_data_for_autoencoder``.
    """
    paths = tf.data.Dataset.from_tensor_slices(image_list)
    # Shuffle the cheap path strings before decoding, then load and batch in parallel.
    return (
        paths.shuffle(2048)
        .map(load_data_for_autoencoder, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(
            BATCH_SIZE,
            drop_remainder=drop_remainder,
            num_parallel_calls=tf.data.AUTOTUNE,
        )
    )

def classifier_data_generator(image_list, labels_list, oh_depth, drop_remainder=True):
    """Build the batched ``tf.data`` pipeline that feeds a classifier.

    Args:
        image_list: Image file paths.
        labels_list: Integer class labels, aligned with ``image_list``.
        oh_depth: One-hot depth. Either a single scalar applied to every sample,
            or a sequence with one depth per sample.
        drop_remainder: Whether to drop a final batch smaller than BATCH_SIZE.

    Returns:
        A dataset of batches produced by ``load_data_for_classifier``.
    """
    if np.ndim(oh_depth) == 0:
        # A scalar has no leading dimension to slice, so from_tensor_slices would
        # reject it; bind it to the loader instead of making it a dataset component.
        depth = int(oh_depth)
        dataset = tf.data.Dataset.from_tensor_slices((image_list, labels_list))
        load_fn = lambda image_path, label: load_data_for_classifier(image_path, label, depth)
    else:
        # One depth per sample: slice it together with the paths and labels.
        dataset = tf.data.Dataset.from_tensor_slices((image_list, labels_list, oh_depth))
        load_fn = load_data_for_classifier

    dataset = dataset.shuffle(2048)
    dataset = dataset.map(load_fn, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=drop_remainder, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset


# Autoencoder pipelines: training drops the last partial batch (default),
# validation keeps it (drop_remainder=False).
train_dataset = autoencoder_data_generator(train_files)
val_dataset = autoencoder_data_generator(val_files, False)

# Printing a dataset shows its description, not its contents.
print("Train Dataset:", train_dataset)
print("Val Dataset:", val_dataset)


# define autoencoder arch

In [None]:
def encoder(x):
    """Apply the encoder layers to ``x`` and return the encoding tensor.

    Args:
        x: Input Keras tensor.

    Returns:
        Output of the 16-filter 1x1 convolution named ``"encoder_output"``.
    """
    # Kernel size equals the stride, so each 16x16 patch is reduced independently;
    # depth_multiplier=8 yields 8 output channels per input channel.
    patch_conv = keras.layers.DepthwiseConv2D(
        kernel_size=16, strides=(16, 16), padding="same", depth_multiplier=8
    )
    # 1x1 convolution mixing the depthwise channels down to 16 features.
    mixing_conv = keras.layers.Conv2D(
        16, kernel_size=1, strides=1, padding="same", activation="tanh"
    )
    # Linear 1x1 projection; its name is how the encoding is looked up later.
    projection = keras.layers.Conv2D(16, kernel_size=1, name="encoder_output")

    return projection(mixing_conv(patch_conv(x)))


def decoder(x):
    """Apply the decoder layers to an encoding and return the reconstruction.

    Args:
        x: Encoding Keras tensor.

    Returns:
        Output of the 3-filter sigmoid convolution named ``"decoder_output"``.
    """
    x = keras.layers.Conv2D(16, kernel_size=1, activation="tanh", name="decoder_input")(x)

    # Four identical stages (UpSampling2D with its default size, then a 5x5 conv
    # with 32 filters) that differ only in activation, alternating tanh and relu.
    for activation in ("tanh", "relu", "tanh", "relu"):
        x = keras.layers.UpSampling2D()(x)
        x = keras.layers.Conv2D(
            32, kernel_size=5, padding="same", activation=activation
        )(x)

    reconstruction = keras.layers.Conv2D(
        3, kernel_size=5, padding="same", activation="sigmoid", name="decoder_output"
    )
    return reconstruction(x)


def autoencoder():
    """Assemble the encoder and decoder into one Keras model.

    Returns:
        A ``keras.Model`` taking a (256, 256, 3) input named ``"input_layer"``
        and returning the decoder output.
    """
    inputs = keras.layers.Input((256,256,3), name="input_layer")
    reconstruction = decoder(encoder(inputs))
    return keras.Model(inputs=[inputs], outputs=[reconstruction])

# Build the autoencoder and print its layer-by-layer summary.
model = autoencoder()
model.summary()



# define loss functions

In [None]:
def custom_loss(y_true, y_pred):
    """Mean absolute error between target and prediction, computed in float32.

    Args:
        y_true: Target tensor.
        y_pred: Predicted tensor.

    Returns:
        Scalar tensor: the mean of ``|y_true - y_pred|`` over all elements.
    """
    # Cast both sides to float32 before subtracting (the input pipeline yields float16).
    diff = tf.cast(y_true, tf.float32) - tf.cast(y_pred, tf.float32)
    return tf.reduce_mean(tf.abs(diff))
    # Alternative loss kept for reference:
    # return tf.losses.binary_crossentropy(y_true, y_pred, label_smoothing=0.02)

# train model

In [None]:
lr_inicial = 0.00001  # learning rate the optimizer starts from
lr_default = 0.001  # learning rate held once the warmup is over
lr_final = 0.00001  # learning rate used for the last part of training
epochs = 100


def schedulerWithWarmupAndDecay(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    The schedule has three phases:
      * first 10% of the epochs: the current rate grows by a fixed increment each epoch;
      * up to and including epoch ``int(epochs * 0.60)``: ``lr_default``;
      * afterwards: ``lr_final``.

    Args:
        epoch: Zero-based epoch index.
        lr: Learning rate currently in use.

    Returns:
        The learning rate for this epoch.
    """
    warmup_end = int(epochs * 0.1)
    drop_start = int(epochs * 0.60)

    if epoch < warmup_end:
        # Incremental warmup: depends on the rate passed in, not only on the epoch.
        return lr + (lr_default / (epochs * 0.1 + 1))
    if epoch > drop_start:
        return lr_final
    return lr_default

# Callback that asks the scheduler for the learning rate of each epoch.
lr_callback = tf.keras.callbacks.LearningRateScheduler(schedulerWithWarmupAndDecay)

opt = keras.optimizers.Adam(lr_inicial)
model.compile(optimizer=opt, loss=custom_loss)

# Evaluate on the validation set after every epoch (val_dataset is otherwise
# unused) and keep the returned History so the loss curves can be inspected.
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=[lr_callback],
)

# Save the trained model twice: as a directory and as a single .h5 file.
model.save("saved_models/encoder_v3/")
model.save("saved_models/encoder_v3.h5")


# visualize inputs, outputs, and encodings

In [None]:
import matplotlib.pyplot as plt

# Load from a path relative to the working directory, like the save paths used
# after training, instead of an absolute path that exists on one machine only.
# NOTE(review): training saves "encoder_v3" but this loads "encoder_v2" — confirm
# which version is intended.
model = tf.keras.models.load_model(
    os.path.join("saved_models", "encoder_v2"),
    custom_objects={"custom_loss": custom_loss},
)

# Second model over the same graph that also exposes the encoder output.
debug_model = keras.Model(
    inputs=[model.layers[0].input],
    outputs=[model.layers[-1].output, model.get_layer("encoder_output").output],
)

# Show input, reconstruction and encoding for the first 10 images of one training
# batch. take(1) already limits the iteration to a single batch.
for batch_images, _batch_targets in train_dataset.take(1).as_numpy_iterator():
    for img in batch_images[0:10]:
        output, encoder_output = debug_model.predict(img[tf.newaxis, ...])
        # Collapse the encoding channels into one map so it can be drawn as an image.
        encoder_output = np.mean(encoder_output, axis=-1, keepdims=True)

        plt.imshow(img.astype(np.float32))
        plt.title("input")
        plt.show()

        plt.imshow(output[0].astype(np.float32))
        plt.title("reconstruction")
        plt.show()

        plt.imshow(encoder_output[0].astype(np.float32))
        plt.title("encoding (channel mean)")
        plt.show()

        print("-------------------------\n\n")

# export encodings

In [None]:
# Encode every train and validation image and store the flattened encoding next
# to the image file.
for image_path in train_files + val_files:
    # load_data() is not defined anywhere in this notebook; load_data_for_autoencoder
    # returns the (image, image) pair that this loop unpacks.
    img, _target = load_data_for_autoencoder(image_path)
    _reconstruction, encoder_output = debug_model.predict(img[tf.newaxis, ...])

    print(encoder_output.shape)
    encoder_output = np.ravel(encoder_output)

    print(image_path)
    # np.save appends ".npy" itself. splitext strips only the final extension,
    # whereas str.replace(".jpeg", "") would also alter a directory name containing it.
    np.save(os.path.splitext(image_path)[0], encoder_output)