> ## Introduction:

The code below is adapted from the notebook that uses Autoencoder as Feature Extractor - CIFAR10

I used both general convolutional autoencoder and [U-net](https://arxiv.org/pdf/1505.04597.pdf) model as autoencoder.

The implementation for U-net model is taken from [here](https://towardsdatascience.com/understanding-semantic-segmentation-with-unet-6be4f42d4b47)

The implementation for U-net loss function is taken from [here](https://github.com/shibuiwilliam/Keras_Autoencoder/blob/master/Cifar_Conv_AutoEncoder_UNET.ipynb)

In [None]:
import warnings
warnings.filterwarnings('ignore')

import pickle
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import sys

import tensorflow as tf
from PIL import Image
import skimage
from skimage.viewer import ImageViewer

import keras
from keras.layers import *
from keras import regularizers
from keras.models import Model, Sequential
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from keras.optimizers import SGD, Adam, RMSprop, Adadelta
import keras.backend as K
from keras.losses import mean_squared_error
from keras.preprocessing.image import ImageDataGenerator,array_to_img, img_to_array, load_img
from keras.utils import np_utils
from keras.applications.vgg16 import VGG16


from sklearn.utils import class_weight
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, LabelBinarizer, RobustScaler, StandardScaler
from sklearn.model_selection import train_test_split

> ## Image Data Generator

In [None]:
image_shape = 64
fname = "/kaggle/input/mayadatasetv2/input/train/train_x/x/Female_1.01_scene2.jpg"
img = load_img(fname,target_size=(image_shape,image_shape))
plt.figure(figsize=(20, 4))
plt.imshow(img)
plt.show()

In [None]:
x_float = np.array(img).astype('float32')
x = x_float/255
x = x.reshape((image_shape,image_shape,3))
cropped=x[5:37,16:48,]
print(cropped.shape)
plt.figure(figsize=(20, 4))
plt.imshow(cropped)
plt.show()

In [None]:
input_model_WH = 256
input_model_shape = (256,256,3)
input_image_WH = 512
input_image_shape = (512,512,3)
batch_size = 32

BASE_DIR = "/kaggle/input/mayadatasetv2/input/"
# train_x_dir = BASE_DIR + "train/train_x/"
# train_y_dir= BASE_DIR + "train/train_y/"
train_x_dir = BASE_DIR + "train_with_identity/train_with_identity_x/"
train_y_dir= BASE_DIR + "train_with_identity/train_with_identity_y/"
val_x_dir = BASE_DIR + "val/val_x/"
val_y_dir = BASE_DIR + "val/val_y/"
# test_x_dir = BASE_DIR + "test/test_x/"
# test_y_dir = BASE_DIR + "/test/test_y/"


data_flow_args = dict(
    target_size=(input_image_WH,input_image_WH),
    batch_size=batch_size,
    classes=None, 
    class_mode=None,
    shuffle=False,
    color_mode='rgb')

# A separate dataflow configuration, where batch_size= 1, so that predict generator is in order
test_data_flow_args = dict(
    target_size=(input_image_WH,input_image_WH),
    batch_size=600,
    classes=None, 
    class_mode=None,
    shuffle=False,
    color_mode='rgb')

def im_crop(image):
    im_cropped = image[30:286,130:386,]
    return im_cropped


datagenerator = ImageDataGenerator(rescale=1./255)

# train_x_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/train/train_x/',
#     **data_flow_args)
# train_y_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/train/train_y/',
#     **data_flow_args)
train_x_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/train_with_identity/train_with_identity_x/',
    **data_flow_args)
train_y_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/train_with_identity/train_with_identity_y/',
    **data_flow_args)
val_x_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/val/val_x/',
    **data_flow_args)
val_y_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/val/val_y/',
    **data_flow_args)
# test_x_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/test/test_x/',
#     **test_data_flow_args)
# test_y_batches = datagenerator.flow_from_directory('/kaggle/input/mayadatasetv2/input/test/test_y/',
#     **test_data_flow_args)

def zip_generator(x, y):
    while True:
        batch_x = next(x)
        batch_y = next(y)
        batch_x_crops = np.zeros((batch_x.shape[0], input_model_WH, input_model_WH, 3))
        batch_y_crops = np.zeros((batch_y.shape[0], input_model_WH, input_model_WH, 3))
        # go through each of the images
        for i in range(batch_x.shape[0]):
            batch_x_crops[i] = im_crop(batch_x[i])
            batch_y_crops[i] = im_crop(batch_y[i])
        yield batch_x_crops, batch_y_crops
        
train_batches = zip_generator(train_x_batches, train_y_batches)
val_batches = zip_generator(val_x_batches, val_y_batches)
# test_batches = zip_generator(test_x_batches, test_y_batches)

## Utility functions:

In [None]:
def create_block(input, chs): ## Convolution block of 2 layers for conv autoencoder
    x = input
    for i in range(2):
        x = Conv2D(chs, 3, padding="same")(x)
        x = Activation("relu")(x)
        x = BatchNormalization()(x)
    return x

def conv2d_block(input_tensor, n_filters, kernel_size = 3, batchnorm = True):  ## Convolution block of 2 layers for unet autoencoder
    """Function to add 2 convolutional layers with the parameters passed to it"""
    # first layer
    x = Conv2D(filters = n_filters, kernel_size = (kernel_size, kernel_size),\
              kernel_initializer = 'he_normal', padding = 'same')(input_tensor)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    
    # second layer
    x = Conv2D(filters = n_filters, kernel_size = (kernel_size, kernel_size),\
              kernel_initializer = 'he_normal', padding = 'same')(input_tensor)
    if batchnorm:
        x = BatchNormalization()(x)
    x = Activation('relu')(x)
    
    return x

def showOrigDec(orig, dec,num=10):  ## function used for visualizing original and reconstructed images of the autoencoder model
    
    plt.figure(figsize=(20, 4))
    
    for i in range(num):
        # display original
        ax = plt.subplot(2, num, i+1)
        plt.imshow(orig[i])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        
        # display reconstruction
        ax = plt.subplot(2, num, i +1+num)
        plt.imshow(dec[i])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        
    plt.show()
        
    
def pixel_loss(y_true, y_pred):  ## loss function for using in conv autoencoder models
    mses = mean_squared_error(y_true, y_pred)
    return K.sum(mses, axis=(1,2))

# comparing in feature space
def perceptual_loss(y_true, y_pred):
    vgg = VGG16(include_top=False, weights='imagenet', input_shape=(input_model_WH,input_model_WH,3))
    loss_model = Model(inputs=vgg.input, outputs=vgg.get_layer('block3_conv3').output)
    loss_model.trainable = False
    return K.mean(K.square(loss_model(y_true) - loss_model(y_pred)))


# TODO: compare in pixel level + perceptual_loss
def loss_function(y_true, y_pred):
    return 0.2*pixel_loss(y_true, y_pred)+0.8*perceptual_loss(y_true, y_pred)

'''
replace conv2DTranspose 
''' 
def create_upsample(ngf, mult, input_tensor):
    x = UpSampling2D((2, 2), interpolation='bilinear')(input_tensor)
    x = Conv2D(ngf * mult, (3, 3), strides=(1,1), activation='relu', kernel_initializer='he_normal', padding='same')(x)
    return x

In [None]:
x, y = next(train_batches)

showOrigDec(x,y,0)

## Autoencoder Models(Unet and General Convolutional AE):

In [None]:
def get_unet(n_filters = 16, dropout = 0.1, batchnorm = True):
    input_img = Input(input_model_shape)
    # Contracting Path
    c1 = conv2d_block(input_img, n_filters * 1, kernel_size = 3, batchnorm = batchnorm)
    p1 = MaxPooling2D((2, 2))(c1)
    p1 = Dropout(dropout)(p1)

    c2 = conv2d_block(p1, n_filters * 2, kernel_size = 3, batchnorm = batchnorm)
    p2 = MaxPooling2D((2, 2))(c2)
    p2 = Dropout(dropout)(p2)

    c3 = conv2d_block(p2, n_filters * 4, kernel_size = 3, batchnorm = batchnorm)
    p3 = MaxPooling2D((2, 2))(c3)
    p3 = Dropout(dropout)(p3)

    c4 = conv2d_block(p3, n_filters * 8, kernel_size = 3, batchnorm = batchnorm)
    p4 = MaxPooling2D((2, 2))(c4)
    p4 = Dropout(dropout)(p4)

    c5 = conv2d_block(p4, n_filters = n_filters * 16, kernel_size = 3, batchnorm = batchnorm)

    # Expansive Path
#     u6 = Conv2DTranspose(n_filters * 8, (3, 3), strides = (2, 2), padding = 'same')(c5)
    u6 = create_upsample(n_filters, 8, c5)
    u6 = concatenate([u6, c4])
    u6 = Dropout(dropout)(u6)
    c6 = conv2d_block(u6, n_filters * 8, kernel_size = 3, batchnorm = batchnorm)

#     u7 = Conv2DTranspose(n_filters * 4, (3, 3), strides = (2, 2), padding = 'same')(c6)
    u7 = create_upsample(n_filters, 4, c6)
    u7 = concatenate([u7, c3])
    u7 = Dropout(dropout)(u7)
    c7 = conv2d_block(u7, n_filters * 4, kernel_size = 3, batchnorm = batchnorm)

#     u8 = Conv2DTranspose(n_filters * 2, (3, 3), strides = (2, 2), padding = 'same')(c7)
    u8 = create_upsample(n_filters, 2, c7)
    u8 = concatenate([u8, c2])
    u8 = Dropout(dropout)(u8)
    c8 = conv2d_block(u8, n_filters * 2, kernel_size = 3, batchnorm = batchnorm)

#     u9 = Conv2DTranspose(n_filters * 1, (3, 3), strides = (2, 2), padding = 'same')(c8)
    u9 = create_upsample(n_filters,1, c8)
    u9 = concatenate([u9, c1])
    u9 = Dropout(dropout)(u9)
    c9 = conv2d_block(u9, n_filters * 1, kernel_size = 3, batchnorm = batchnorm)

    outputs = Conv2D(3, (1, 1), activation='sigmoid')(c9)
    model = Model(inputs=[input_img], outputs=[outputs])
    return model


def get_conv():
    
    input = Input(input_model_shape)
    
    # Encoder
    block1 = create_block(input, 32)
    x = MaxPool2D(2)(block1)
    block2 = create_block(x, 64)
    x = MaxPool2D(2)(block2)
    
    
    #Middle
    middle = create_block(x, 128)
#     middle = AdaIN()(encoder.outputs)
    
    # Decoder
    up1 = UpSampling2D((2,2))(middle)
    block3 = create_block(up1, 64)
    #up1 = UpSampling2D((2,2))(block3)
    up2 = UpSampling2D((2,2))(block3)
    block4 = create_block(up2, 32)
    #up2 = UpSampling2D((2,2))(block4)
    
    # output
    x = Conv2D(3, 1)(up2)
    output = Activation("sigmoid")(x)
    
    return Model(input, output)

In [None]:
def run_ae(m):  ## function for choosing unet/general conv autoencoder
    if m=='unet':
        return get_unet()
    elif m=='ae':
        model = get_conv()
        return model

## Run U-Net:

In [None]:
model_unet = run_ae('unet')
model_unet.compile(optimizer='adam', loss=loss_function)
model_unet.summary()

In [None]:
er = EarlyStopping(monitor='val_loss', patience=50, mode='min',restore_best_weights=True)
lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10)
callbacks = [er, lr]
# history = model_unet.fit(x_train, y_train, 
#                          batch_size=512,
#                          epochs=100,
#                          verbose=1,
#                          validation_data=(x_val, y_val),
#                          shuffle=True, callbacks=callbacks)

history = model_unet.fit_generator(train_batches,
                    steps_per_epoch = train_x_batches.samples // batch_size,
                    epochs=200,
                    verbose=1, 
                    validation_data=val_batches,
                    validation_steps = val_x_batches.samples // batch_size,
#                     callbacks=callbacks,
                    use_multiprocessing=True)

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='lower right')
plt.show()

In [None]:
import random

image_shape = 512
def load_reshape_img(fname, image_shape=512):
    img = load_img(fname,target_size=(image_shape,image_shape))
    x_float = np.array(img).astype('float32')
    x = x_float/255
    x = x.reshape((image_shape,image_shape,3))
    return x

def generate_test():
    MAIN_x_DIR = "../input/mayadatasetv2/input/test/test_x/x/"
    MAIN_y_DIR = "../input/mayadatasetv2/input/test/test_y/y/"
    image_x_df = []
    image_y_df = []
    female_idx = [18, 18, 21, 22, 19, 22, 21, 19, 21, 20]
    angle_idx = [6, 8, 6, 2, 8, 10, 8, 8, 11, 6]
    scene = [4, 6, 6, 7, 2, 2, 1, 7, 9, 1]
    for idx in range(1,11):
        # ../input/mayadatasetv2/input/test/test_x/x/Female_18.05_scene9.jpg
        # ../input/mayadatasetv2/input/test/test_x/x/Female_18.04_scene9.jpg
        # ../input/mayadatasetv2/input/test/test_x/x/Female_18.04_scene1.jpg
        IMAGE_PATH = "Female_"+str(female_idx[idx-1])+"."+'{:02}'.format(angle_idx[idx-1])+"_scene"+str(scene[idx-1])+".jpg"
        im_x = load_reshape_img(MAIN_x_DIR+IMAGE_PATH)
        im_y = load_reshape_img(MAIN_y_DIR+IMAGE_PATH)
        image_x_df.append(im_x[30:286,130:386,])
        image_y_df.append(im_y[30:286,130:386,])
    return np.array(image_x_df),np.array(image_y_df)

# def generate_real_test():
#     #../input/real-specular/real_specular/Image-12.jpg
#     MAIN_DIR = "/kaggle/input/d/ooiyueying/real-specular/real_specular/"
#     image_df = []
#     for i in range(12,18):
#         print(MAIN_DIR+"Image-"+str(i)+".jpg")
#         im = load_reshape_img( MAIN_DIR+"Image-"+str(i)+".jpg", 256)
#         image_df.append(im)
#     return np.array(image_df)

def generate_real_test():
    #../input/real-specular/real_specular/Image-12.jpg
    MAIN_DIR = "../input/realspecularimages/real_specular_images/"
    image_df = []
    for i in range(8):
        print(MAIN_DIR+"Capture"+str(i)+".jpg")
        im = load_reshape_img( MAIN_DIR+"Capture"+str(i)+".JPG", 256)
        image_df.append(im)
    return np.array(image_df)

In [None]:
import os
os.listdir("../input/realspecularimages/real_specular_images/")

In [None]:
x = generate_real_test()


recon_test_unet = model_unet.predict(x)

showOrigDec(x, recon_test_unet, 6)


In [None]:
x, y = generate_test()

showOrigDec(x, y, 10)

In [None]:
recon_test_unet = model_unet.predict(x)

showOrigDec(x, recon_test_unet, 10)

In [None]:
x, y = generate_test()

showOrigDec(x, y, 10)

In [None]:
# recon_test_unet = model_unet.predict(x)

# showOrigDec(x, recon_test_unet, 10)

In [None]:
# x, y = generate_test()

# showOrigDec(x, y, 10)

In [None]:
# recon_test_unet = model_unet.predict(x)

# showOrigDec(x, recon_test_unet, 10)

## Run Convolutional AE:

In [None]:
# model_ae = run_ae('ae')
# model_ae.compile("adam", loss=loss_function)
# model_ae.summary()

In [None]:
# er = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_delta=0.0001)
# callbacks = [er, lr]
# history = model_ae.fit(x_train, y_train, 
#                        batch_size=128,
#                        epochs=100,
#                        verbose=1,
#                        validation_data=(x_val, y_val),
#                        shuffle=True, callbacks=callbacks)

# history = model_ae.fit_generator(train_batches,
#                     steps_per_epoch = train_x_batches.samples // batch_size,
#                     epochs=70,
#                     verbose=1, 
#                     validation_data=val_batches,
#                     validation_steps = val_x_batches.samples // batch_size,
# #                     callbacks=callbacks,
#                     use_multiprocessing=True)

In [None]:
# plt.plot(history.history['loss'])
# plt.plot(history.history['val_loss'])
# plt.title('Model Loss')
# plt.ylabel('Loss')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Val'], loc='lower right')
# plt.show()

In [None]:
# recon_test_ae = model_ae.predict_generator(test_batches, 10)
# recon_valid_ae = model_ae.predict_generator(val_batches, 10)
# recon_train_ae = model_ae.predict_generator(train_batches, 10)

In [None]:
# showOrigDec(x, y, 10)

In [None]:
# recon_test_unet = model_ae.predict(x)

# showOrigDec(x, recon_test_unet, 10)