In [None]:
import os
import numpy as np
import tensorflow as tf
from sklearn.neighbors import NearestNeighbors
from multiprocessing import Pool
from skimage.transform import resize
import os
import skimage.io
from multiprocessing import Pool
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import offsetbox
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from sklearn import manifold
import random
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
def __init__(self, modelName, info):
        self.modelName = modelName
        self.info = info
        self.autoencoder = None
        self.encoder = None
        self.decoder = None
def fit(self, X, n_epochs=50, batch_size=256):
        indices_fracs = split(fracs=[0.9, 0.1], N=len(X), seed=0)
        X_train, X_valid = X[indices_fracs[0]], X[indices_fracs[1]]
        self.autoencoder.fit(X_train, X_train,
                             epochs = n_epochs,
                             batch_size = batch_size,
                             shuffle = True,
                             validation_data = (X_valid, X_valid))
def predict(self, X):
        return self.encoder.predict(X)

In [None]:
def set_arch(self):

        shape_img = self.info["shape_img"]
        shape_img_flattened = (np.prod(list(shape_img)),)

        # Set encoder and decoder graphs


        n_hidden_1, n_hidden_2, n_hidden_3 = 16, 8, 8
        convkernel = (3, 3)  # convolution kernel
        poolkernel = (2, 2)  # pooling kernel

        input = tf.keras.layers.Input(shape=shape_img)
        x = tf.keras.layers.Conv2D(n_hidden_1, convkernel, activation='relu', padding='same')(input)
        x = tf.keras.layers.MaxPooling2D(poolkernel, padding='same')(x)
        x = tf.keras.layers.Conv2D(n_hidden_2, convkernel, activation='relu', padding='same')(x)
        x = tf.keras.layers.MaxPooling2D(poolkernel, padding='same')(x)
        x = tf.keras.layers.Conv2D(n_hidden_3, convkernel, activation='relu', padding='same')(x)
        encoded = tf.keras.layers.MaxPooling2D(poolkernel, padding='same')(x)

        x = tf.keras.layers.Conv2D(n_hidden_3, convkernel, activation='relu', padding='same')(encoded)
        x = tf.keras.layers.UpSampling2D(poolkernel)(x)
        x = tf.keras.layers.Conv2D(n_hidden_2, convkernel, activation='relu', padding='same')(x)
        x = tf.keras.layers.UpSampling2D(poolkernel)(x)
        x = tf.keras.layers.Conv2D(n_hidden_1, convkernel, activation='relu')(x)
        x = tf.keras.layers.UpSampling2D(poolkernel)(x)
        decoded = tf.keras.layers.Conv2D(shape_img[2], convkernel, activation='sigmoid', padding='same')(x)


        # Create autoencoder model
        autoencoder = tf.keras.Model(input, decoded)
        input_autoencoder_shape = autoencoder.layers[0].input_shape[1:]
        output_autoencoder_shape = autoencoder.layers[-1].output_shape[1:]

        # Create encoder model
        encoder = tf.keras.Model(input, encoded)  # set encoder
        input_encoder_shape = encoder.layers[0].input_shape[1:]
        output_encoder_shape = encoder.layers[-1].output_shape[1:]

        # Create decoder model
        decoded_input = tf.keras.Input(shape=output_encoder_shape)

        decoded_output = autoencoder.layers[-7](decoded_input)  # Conv2D
        decoded_output = autoencoder.layers[-6](decoded_output)  # UpSampling2D
        decoded_output = autoencoder.layers[-5](decoded_output)  # Conv2D
        decoded_output = autoencoder.layers[-4](decoded_output)  # UpSampling2D
        decoded_output = autoencoder.layers[-3](decoded_output)  # Conv2D
        decoded_output = autoencoder.layers[-2](decoded_output)  # UpSampling2D
        decoded_output = autoencoder.layers[-1](decoded_output)  # Conv2D
        
        decoder = tf.keras.Model(decoded_input, decoded_output)
        decoder_input_shape = decoder.layers[0].input_shape[1:]
        decoder_output_shape = decoder.layers[-1].output_shape[1:]
        print("\nautoencoder.summary():")
        print(autoencoder.summary())
        print("\nencoder.summary():")
        print(encoder.summary())
        print("\ndecoder.summary():")
        print(decoder.summary())
        self.autoencoder = autoencoder
        self.encoder = encoder
        self.decoder = decoder

In [None]:
def load_models(self, loss="binary_crossentropy", optimizer="adam"):
        print("Loading models...")
        self.autoencoder = tf.keras.models.load_model(self.info["autoencoderFile"])
        self.encoder = tf.keras.models.load_model(self.info["encoderFile"])
        self.decoder = tf.keras.models.load_model(self.info["decoderFile"])
        self.autoencoder.compile(optimizer=optimizer, loss=loss)
        self.encoder.compile(optimizer=optimizer, loss=loss)
        self.decoder.compile(optimizer=optimizer, loss=loss)

In [None]:
def read_img(filePath):
    return skimage.io.imread(filePath, as_gray=False)

In [None]:
def read_imgs_dir(dirPath, extensions):
    args = [os.path.join(dirPath, filename)
            for filename in os.listdir(dirPath)
            if any(filename.lower().endswith(ext) for ext in extensions)]
    args_sorted = sorted(args)
    imgs = [read_img(arg) for arg in args_sorted]
    return imgs

In [None]:
def save_img(filePath, img):
    skimage.io.imsave(filePath, img)

In [None]:
def plot_img(img, range=[0, 255]):
    plt.imshow(img, vmin=range[0], vmax=range[1])
    plt.xlabel("xpixels")
    plt.ylabel("ypixels")
    plt.tight_layout()
    plt.show()
    plt.close()

In [None]:
def plot_query_retrieval(img_query, imgs_retrieval, img_ids, outFile, art_info="art_info",):
    n_retrieval = len(imgs_retrieval)
    fig = plt.figure(figsize=(2*n_retrieval, 4))
    fig.suptitle(f"Similar images")

    # Plot query image
    ax = plt.subplot(2, n_retrieval, 0 + 1)
    plt.imshow(img_query)
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    for axis in ['top', 'bottom', 'left', 'right']:
        ax.spines[axis].set_linewidth(4)  # increase border thickness
        ax.spines[axis].set_color('black')  # set to black
    ax.set_title("query",  fontsize=10)  # set subplot title
    
    count = 0
    # Plot retrieval images
    for i, img in enumerate(imgs_retrieval):
        ax = plt.subplot(2, n_retrieval, n_retrieval + i + 1)
        plt.imshow(img)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        img_title = f"{art_info.iloc[img_ids[count]].Artwork}"
        img_artist = f"{art_info.iloc[img_ids[count]].Artist}"
        for axis in ['top', 'bottom', 'left', 'right']:
            ax.spines[axis].set_linewidth(1)  # set border thickness
            ax.spines[axis].set_color('black')  # set to black
        ax.set_title(f"{img_title} by {img_artist}", fontsize=10)  # set subplot title
        count+=1
    if outFile is None:
        plt.show()
    else:
        plt.savefig(outFile, bbox_inches='tight')
    plt.close()

In [None]:
def apply_transformer(imgs, transformer, parallel=True):
    if parallel:
        pool = Pool()
        imgs_transform = pool.map(transformer, [img for img in imgs])
        pool.close()
        pool.join()
    else:
        imgs_transform = [transformer(img) for img in imgs]
    return imgs_transform

In [None]:
def normalize_img(img):
    return img / 255.

In [None]:
def resize_img(img, shape_resized):
    img_resized = resize(img, shape_resized,
                         anti_aliasing=True,
                         preserve_range=True)
    assert img_resized.shape == shape_resized
    return img_resized

In [None]:
def flatten_img(img):
    return img.flatten("C")

In [None]:
def split(fracs, N, seed):
    fracs = [round(frac, 2) for frac in fracs]
    if sum(fracs) != 1.00:
        raise Exception("fracs do not sum to one!")

    # Shuffle ordered indices
    indices = list(range(N))
    random.Random(seed).shuffle(indices)
    indices = np.array(indices, dtype=int)

    # Get numbers per group
    n_fracs = []
    for i in range(len(fracs) - 1):
        n_fracs.append(int(max(fracs[i] * N, 0)))
    n_fracs.append(int(max(N - sum(n_fracs), 0)))

    if sum(n_fracs) != N:
        raise Exception("n_fracs do not sum to N!")

    # Sample indices
    n_selected = 0
    indices_fracs = []
    for n_frac in n_fracs:
        indices_frac = indices[n_selected:n_selected + n_frac]
        indices_fracs.append(indices_frac)
        n_selected += n_frac

    # Check no intersections
    for a, indices_frac_A in enumerate(indices_fracs):
        for b, indices_frac_B in enumerate(indices_fracs):
            if a == b:
                continue
            if is_intersect(indices_frac_A, indices_frac_B):
                raise Exception("there are intersections!")

    return indices_fracs

In [None]:
def is_intersect(arr1, arr2):
    n_intersect = len(np.intersect1d(arr1, arr2))
    if n_intersect == 0: return False
    else: return True

In [None]:
outDir = "/content/drive/MyDrive/lewagon_gandy/autoencoder2"
if not os.path.exists(outDir):
    os.makedirs(outDir)

In [None]:
parallel = True  # use multicore processing
dataDir = "/content/drive/MyDrive/lewagon_gandy/abstract_ex2"

In [None]:

extensions = [".jpg", ".jpeg"]
all_images = read_imgs_dir(dataDir, extensions)

In [None]:
all_images

[array([[[239, 222, 206],
         [239, 222, 206],
         [239, 222, 206],
         ...,
         [238, 222, 207],
         [238, 222, 207],
         [238, 222, 207]],
 
        [[239, 222, 206],
         [239, 222, 206],
         [239, 222, 206],
         ...,
         [238, 222, 207],
         [238, 222, 207],
         [237, 221, 206]],
 
        [[239, 222, 206],
         [239, 222, 206],
         [239, 222, 206],
         ...,
         [238, 222, 207],
         [237, 221, 206],
         [236, 220, 205]],
 
        ...,
 
        [[238, 220, 206],
         [240, 222, 208],
         [242, 224, 210],
         ...,
         [237, 219, 205],
         [237, 219, 205],
         [236, 218, 204]],
 
        [[238, 220, 206],
         [240, 222, 208],
         [242, 224, 210],
         ...,
         [237, 219, 205],
         [237, 219, 205],
         [236, 218, 204]],
 
        [[238, 220, 206],
         [240, 222, 208],
         [242, 224, 210],
         ...,
         [236, 218, 204],
  

In [None]:
shape_img = all_images[1].shape
print(f"Image shape = {shape_img}")

Image shape = (128, 128, 3)


In [None]:
class AutoEncoder():
    def __init__(self, modelName, info):
        self.modelName = modelName
        self.info = info
        self.autoencoder = None
        self.encoder = None
        self.decoder = None
    def fit(self, X, n_epochs=500, batch_size=256):
        indices_fracs = split(fracs=[0.9, 0.1], N=len(X), seed=0)
        X_train, X_valid = X[indices_fracs[0]], X[indices_fracs[1]]
        self.autoencoder.fit(X_train, X_train,
                             epochs = n_epochs,
                             batch_size = batch_size,
                             shuffle = True,
                             validation_data = (X_valid, X_valid))

    # Inference
    def predict(self, X):
        return self.encoder.predict(X)

    # Set neural network architecture
    def set_arch(self):

        shape_img = self.info["shape_img"]
        shape_img_flattened = (np.prod(list(shape_img)),)

        # Set encoder and decoder graphs

        n_hidden_1, n_hidden_2, n_hidden_3 = 16, 8, 8
        convkernel = (3, 3)  # convolution kernel
        poolkernel = (2, 2)  # pooling kernel

        input = tf.keras.layers.Input(shape=shape_img)
        x = tf.keras.layers.Conv2D(n_hidden_1, convkernel, activation='relu', padding='same')(input)
        x = tf.keras.layers.MaxPooling2D(poolkernel, padding='same')(x)
        x = tf.keras.layers.Conv2D(n_hidden_2, convkernel, activation='relu', padding='same')(x)
        x = tf.keras.layers.MaxPooling2D(poolkernel, padding='same')(x)
        x = tf.keras.layers.Conv2D(n_hidden_3, convkernel, activation='relu', padding='same')(x)
        encoded = tf.keras.layers.MaxPooling2D(poolkernel, padding='same')(x)

        x = tf.keras.layers.Conv2D(n_hidden_3, convkernel, activation='relu', padding='same')(encoded)
        x = tf.keras.layers.UpSampling2D(poolkernel)(x)
        x = tf.keras.layers.Conv2D(n_hidden_2, convkernel, activation='relu', padding='same')(x)
        x = tf.keras.layers.UpSampling2D(poolkernel)(x)
        x = tf.keras.layers.Conv2D(n_hidden_1, convkernel, activation='relu', padding = 'same')(x)
        x = tf.keras.layers.UpSampling2D(poolkernel)(x)
        decoded = tf.keras.layers.Conv2D(shape_img[2], convkernel, activation='sigmoid', padding='same')(x)

        # Create autoencoder model
        autoencoder = tf.keras.Model(input, decoded)
        input_autoencoder_shape = autoencoder.layers[0].input_shape[1:]
        output_autoencoder_shape = autoencoder.layers[-1].output_shape[1:]

        # Create encoder model
        encoder = tf.keras.Model(input, encoded)  # set encoder
        input_encoder_shape = encoder.layers[0].input_shape[1:]
        output_encoder_shape = encoder.layers[-1].output_shape[1:]

        # Create decoder model
        decoded_input = tf.keras.Input(shape=output_encoder_shape)

        
        decoded_output = autoencoder.layers[-7](decoded_input)  # Conv2D
        decoded_output = autoencoder.layers[-6](decoded_output)  # UpSampling2D
        decoded_output = autoencoder.layers[-5](decoded_output)  # Conv2D
        decoded_output = autoencoder.layers[-4](decoded_output)  # UpSampling2D
        decoded_output = autoencoder.layers[-3](decoded_output)  # Conv2D
        decoded_output = autoencoder.layers[-2](decoded_output)  # UpSampling2D
        decoded_output = autoencoder.layers[-1](decoded_output)  # Conv2D

        decoder = tf.keras.Model(decoded_input, decoded_output)
        decoder_input_shape = decoder.layers[0].input_shape[1:]
        decoder_output_shape = decoder.layers[-1].output_shape[1:]

        # Generate summaries
        print("\nautoencoder.summary():")
        print(autoencoder.summary())
        print("\nencoder.summary():")
        print(encoder.summary())
        print("\ndecoder.summary():")
        print(decoder.summary())

        # Assign models
        self.autoencoder = autoencoder
        self.encoder = encoder
        self.decoder = decoder

    # Compile
    def compile(self, loss="binary_crossentropy", optimizer="adam"):
        self.autoencoder.compile(optimizer=optimizer, loss=loss)

    # Load model architecture and weights
    def load_models(self, loss="binary_crossentropy", optimizer="adam"):
        print("Loading models...")
        self.autoencoder = tf.keras.models.load_model(self.info["autoencoderFile"])
        self.encoder = tf.keras.models.load_model(self.info["encoderFile"])
        self.decoder = tf.keras.models.load_model(self.info["decoderFile"])
        self.autoencoder.compile(optimizer=optimizer, loss=loss)
        self.encoder.compile(optimizer=optimizer, loss=loss)
        self.decoder.compile(optimizer=optimizer, loss=loss)

    # Save model architecture and weights to file
    def save_models(self):
        print("Saving models...")
        self.autoencoder.save(self.info["autoencoderFile"])
        self.encoder.save(self.info["encoderFile"])
        self.decoder.save(self.info["decoderFile"])

In [None]:
# Set up autoencoder
modelName = 'abstractex2'
info = {
        "shape_img": shape_img,
        "autoencoderFile": os.path.join(outDir, f"{modelName}_autoecoder.h5"),
        "encoderFile": os.path.join(outDir, f"{modelName}_encoder.h5"),
        "decoderFile": os.path.join(outDir, f"{modelName}_decoder.h5"),}

model = AutoEncoder(modelName, info)
model.set_arch()


shape_img_resize = shape_img
input_shape_model = tuple([int(x) for x in model.encoder.input.shape[1:]])
output_shape_model = tuple([int(x) for x in model.encoder.output.shape[1:]])
n_epochs = 500



autoencoder.summary():
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 128, 128, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 128, 128, 16)      448       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 64, 64, 16)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 64, 64, 8)         1160      
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 32, 32, 8)        0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 3

In [None]:
input_shape_model

(128, 128, 3)

In [None]:
output_shape_model

(16, 16, 8)

In [None]:
class ImageTransformer(object):

    def __init__(self, shape_resize):
        self.shape_resize = shape_resize

    def __call__(self, img):
        img_transformed = resize_img(img, self.shape_resize)
        img_transformed = normalize_img(img_transformed)
        return img_transformed

In [None]:
transformer = ImageTransformer(shape_img_resize)
print("Applying image transformer to training images ...")
imgs_train_transformed = apply_transformer(all_images, transformer, 
                                           parallel=parallel)

Applying image transformer to training images ...


In [None]:
art_info = pd.read_csv("/content/drive/MyDrive/lewagon_gandy/wikiart_scraped.csv")
art_info = art_info[art_info['Style']=="Abstract-Expressionism"]
art_info.head()

Unnamed: 0,Style,Artwork,Artist,Date,Link
77456,Abstract-Expressionism,Heraldic eagle,Victor Hugo,1855,https://uploads0.wikiart.org/images/victor-hug...
77457,Abstract-Expressionism,"Lace impression, spectral form",Victor Hugo,1855,https://uploads7.wikiart.org/images/victor-hug...
77458,Abstract-Expressionism,Lace and Ghosts,Victor Hugo,1856,https://uploads2.wikiart.org/images/victor-hug...
77459,Abstract-Expressionism,The cellist,Victor Hugo,1856,https://uploads7.wikiart.org/images/victor-hug...
77460,Abstract-Expressionism,Taches with fingerprints,Victor Hugo,1865,https://uploads2.wikiart.org/images/victor-hug...


In [None]:
X_train = np.array(imgs_train_transformed).reshape((-1,) + input_shape_model)

In [None]:

# history = model.fit(X_train,n_epochs=200, batch_size=64)
# model.save_models()
model.load_models()
model.compile(loss="binary_crossentropy", optimizer="adam")

Loading models...


In [None]:
E_train = model.predict(X_train)
E_train_flatten = E_train.reshape((-1, np.prod(output_shape_model)))

In [None]:
def plot_reconstructions(imgs, imgs_reconstruct, outFile,
                         range_imgs=[0, 255],
                         range_imgs_reconstruct=[0, 1]):
    # Create plot to save
    assert len(imgs) == len(imgs_reconstruct)
    fig = plt.figure(figsize=(20, 4))
    fig.suptitle("Image Reconstructions", fontsize=35)
    n = min(len(imgs), 10)
    for i in range(n):

        # Plot original image
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(imgs[i],
                   vmin=range_imgs[0],
                   vmax=range_imgs[1])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # Plot reconstructed image
        ax = plt.subplot(2, n, n + i + 1)
        plt.imshow(imgs_reconstruct[i],
                   vmin=range_imgs_reconstruct[0],
                   vmax=range_imgs_reconstruct[1])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    if outFile is None:
        plt.show()
    else:
        plt.savefig(outFile, bbox_inches='tight')
    plt.close()

In [None]:
print("Visualizing database image reconstructions...")
imgs_train_reconstruct = model.decoder.predict(E_train)
filename = f"{modelName}_loaded_reconstruct.png"
plot_reconstructions(all_images, imgs_train_reconstruct,
                         os.path.join(outDir, filename),
                         range_imgs=[0, 255],
                         range_imgs_reconstruct=[0, 1])
print(f"File saved at {os.path.join(outDir, filename)}")

Visualizing database image reconstructions...
File saved at /content/drive/MyDrive/lewagon_gandy/autoencoder2/abstractex2_loaded_reconstruct.png


In [None]:
# Taking test images from other directory to map onto dataset
imgs_test = read_imgs_dir("/content/drive/MyDrive/lewagon_gandy/test_images", extensions)
imgs_test_transformed = apply_transformer(imgs_test, transformer, 
                                          parallel=parallel)
X_test = np.array(imgs_test_transformed).reshape((-1,) + input_shape_model)

In [None]:
E_test = model.predict(X_test)
E_test_flatten = E_test.reshape((-1, np.prod(output_shape_model)))

In [None]:
print("Fitting k-nearest-neighbour model on training images 1...")
knn = NearestNeighbors(n_neighbors=4, metric="cosine")
knn.fit(E_train_flatten)

Fitting k-nearest-neighbour model on training images 1...


NearestNeighbors(metric='cosine', n_neighbors=4)

In [None]:
for i, emb_flatten in enumerate(E_test_flatten):
    _, indices = knn.kneighbors([emb_flatten]) # find k nearest train neighbours
    img_query = imgs_test[i] # query image
    imgs_retrieval = [all_images[idx] for idx in indices.flatten()] # retrieval images
    img_ids = [idx for idx in indices.flatten()]
    outFile = os.path.join(outDir, "{}_retrieval_{}.png".format(modelName, i))
    plot_query_retrieval(img_query, imgs_retrieval, img_ids, outFile, art_info = art_info)

In [None]:
art_info.iloc[1146].Artwork

'Damn Braces'