In [1]:
import tensorflow as tf
import keras
from keras.layers import Conv2D, Conv2DTranspose
import os
import cv2
import numpy as np
from data_generator import MaskedImageDataGenerator

In [2]:
# Keyword arguments shared by the convolutional layers of the autoencoder:
# SAME padding, a LeakyReLU activation (slope 0.2) and a normal weight
# initialiser with standard deviation 0.1.
conv_kwargs = dict(
    padding="SAME",
    activation=keras.layers.LeakyReLU(alpha=0.2),
    kernel_initializer=tf.random_normal_initializer(stddev=.1),
)

In [32]:
class Autoencoder(tf.keras.Model):
    """Convolutional autoencoder made of an encoder and a decoder stack.

    The encoder applies two stride-2 convolutions followed by batch
    normalisation and two stride-1 convolutions. The decoder applies one
    stride-1 and two stride-2 transposed convolutions, ending in a
    3-channel sigmoid layer.
    """

    def __init__(self, **kwargs):
        """Create the encoder and decoder; ``kwargs`` go to ``tf.keras.Model``."""
        super().__init__(**kwargs)

        encoder_layers = [
            Conv2D(16, 8, 2, **conv_kwargs),
            Conv2D(16, 8, 2, **conv_kwargs),
            keras.layers.BatchNormalization(),
            Conv2D(16, 8, 1, **conv_kwargs),
            Conv2D(16, 4, 1, **conv_kwargs),
        ]
        self.encoder = keras.Sequential(encoder_layers, name="ae_encoder")

        decoder_layers = [
            Conv2DTranspose(32, 4, 1, **conv_kwargs),
            Conv2DTranspose(16, 8, 2, **conv_kwargs),
            # Output layer: sigmoid instead of the shared LeakyReLU activation.
            Conv2DTranspose(
                filters=3,
                kernel_size=8,
                strides=2,
                padding='same',
                kernel_initializer=tf.random_normal_initializer(stddev=.1),
                activation='sigmoid',
            ),
        ]
        self.decoder = keras.Sequential(decoder_layers, name='ae_decoder')

    def call(self, inputs):
        """Encode ``inputs`` and return the decoder's reconstruction."""
        latent = self.encoder(inputs)
        return self.decoder(latent)


In [13]:
def custom_loss(y_true, y_pred):
    """Return the reconstruction loss ``0.3 * MSE + 0.7 * MAE``.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor, same shape as ``y_true``.

    Returns:
        Scalar loss tensor combining mean squared error and mean absolute
        error with fixed weights.
    """
    # Only the two terms that feed the result are evaluated; the previous
    # binary cross-entropy term was computed but never used.
    mse = keras.losses.MeanSquaredError()(y_true, y_pred)
    mae = keras.losses.MeanAbsoluteError()(y_true, y_pred)
    return .3*mse + .7*mae

In [33]:
# Instantiate the autoencoder, build it for a fixed input shape, print the
# architecture summaries and compile it for training.
ae_model = Autoencoder(name='autoencoder')

ae_model.build(input_shape = (1,256,256,3))   ## Required to see architecture summary
initial_weights = ae_model.get_weights() ## Snapshot of the freshly built weights, kept so the autoencoder can be reset

ae_model.summary()
ae_model.encoder.summary()
ae_model.decoder.summary()

# Adam (legacy implementation) with the blended MSE/MAE loss; mean absolute
# error is tracked as the only metric.
ae_model.compile(
    optimizer   = keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss        = custom_loss,
    metrics     = [
       #tf.keras.metrics.MeanSquaredError(),
        tf.keras.metrics.MeanAbsoluteError()
    ]
)

Model: "autoencoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 ae_encoder (Sequential)     (1, 64, 64, 16)           40000     
                                                                 
 ae_decoder (Sequential)     (1, 256, 256, 3)          44083     
                                                                 
Total params: 84083 (328.45 KB)
Trainable params: 84083 (328.45 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Model: "ae_encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_48 (Conv2D)          (1, 128, 128, 16)         3088      
                                                                 
 conv2d_49 (Conv2D)          (1, 64, 64, 16)           16400     
                                                               

In [22]:
import sys

# Default dataset location for local runs; replaced below when on Colab.
data_dir = '../collapsed_data'

# The google.colab module is only loaded inside a Colab runtime.
isColab = "google.colab" in sys.modules
# this also works:
# isColab = "COLAB_GPU" in os.environ

if isColab:
    from google.colab import drive

    # Mount Google Drive and read the dataset from there instead.
    drive.mount("/content/drive", force_remount=True)
    data_dir = "/content/drive/MyDrive/collapsed_data"

In [None]:
# Both generators use the same masking, image size and batch size; only the
# source directory differs.
_generator_options = dict(mask_denom=5, target_size=(256, 256), batch_size=50)
training_generator = MaskedImageDataGenerator(data_dir + '/train', **_generator_options)
validation_generator = MaskedImageDataGenerator(data_dir + '/validation', **_generator_options)

In [None]:
# Train the model
devices = tf.config.list_physical_devices()
print(devices)

# Use the first GPU when one is visible; otherwise fall back to the CPU rather
# than requesting a device that does not exist on this machine.
train_device = '/device:GPU:0' if tf.config.list_physical_devices('GPU') else '/device:CPU:0'

print('Fitting model')
with tf.device(train_device):
    ae_model.fit(training_generator, epochs=10, validation_data=validation_generator)

In [None]:
# Persist the trained model under MODEL_DIR.
model_name = 'model.keras'
MODEL_DIR = '../models/'
# Make sure the target directory exists so saving works on a fresh checkout.
os.makedirs(MODEL_DIR, exist_ok=True)
ae_model.save(os.path.join(MODEL_DIR, model_name))

In [23]:
def get_masked_data(sub_dir, mask_denom=5,target_size=(256,256)):
    """Load the images of ``data_dir/sub_dir`` and build masked/unmasked pairs.

    Every readable file is loaded with OpenCV, resized to ``target_size`` and
    scaled to ``[0, 1]``. The masked copy has one vertical strip set to zero:
    the strip is ``width // mask_denom`` columns wide and starts at strip
    index ``mask_denom // 2`` (the middle strip for odd ``mask_denom``).

    Args:
        sub_dir: Sub-directory of the module-level ``data_dir`` to read.
        mask_denom: Number of vertical strips the width is divided into.
        target_size: ``(height, width)`` of the returned images.

    Returns:
        Tuple ``(x, y)`` of float32 arrays shaped ``(n, height, width, 3)``;
        ``x`` is the masked input and ``y`` the unmasked target. Both are
        empty (``n == 0``) when no file could be loaded.
    """
    src_dir = os.path.join(data_dir, sub_dir)
    h, w = target_size

    images = []
    # Sorted so the sample order is reproducible across runs and platforms.
    for fname in sorted(os.listdir(src_dir)):
        path = os.path.join(src_dir, fname)
        try:
            img = cv2.imread(path)
            if img is None:
                # imread signals an unreadable / non-image file with None.
                print(path)
                continue
            # cv2.resize expects (width, height), the reverse of (h, w).
            img = cv2.resize(img, (w, h))
        except Exception:
            # Best effort: report the offending file and keep going, without
            # swallowing KeyboardInterrupt / SystemExit.
            print(path)
            continue
        images.append(img)

    if not images:
        # Keep the 4-D layout so callers can still slice the result.
        empty = np.zeros((0, h, w, 3), dtype=np.float32)
        return empty, empty.copy()

    y = np.array(images, dtype=np.float32)
    x = np.copy(y)

    # Make middle black
    sec_w = w // mask_denom
    offset = mask_denom // 2
    x[:, :, sec_w * offset:sec_w * (offset + 1), :] = 0

    return x / 255, y / 255


In [25]:
# Replace ae_model with a previously saved checkpoint; the custom class and
# loss are passed so Keras can resolve them while deserialising.
ae_model = keras.models.load_model(
    '../models/newArch-e5-size256.keras',
    custom_objects={
        'Autoencoder': Autoencoder,
        'custom_loss': custom_loss,
    },
)

In [26]:
print('Evaluating model on testing data')
# In-memory copies of the test images, used by the visualisation cells.
x_test,y_test = get_masked_data('test')
test = MaskedImageDataGenerator(data_dir + '/test', mask_denom=5, target_size=(256, 256), batch_size=50)
# The generator is constructed with its own batch_size, so no batch_size is
# passed to evaluate(); Keras documents that it must not be specified for
# generator-style inputs.
ae_model.evaluate(test)

Evaluating model on testing data


[0.0633048266172409, 0.015408388338983059, 0.08383187651634216]

In [27]:
# Show masked input, ground truth and reconstruction for the first four test
# images; every window waits for a key press before the next one opens.
x = x_test[:5]
y = y_test[:5]
pred = ae_model.predict(x)
for i in range(4):
    for title, batch in (('im', x), ('truth', y), ('pred', pred)):
        cv2.imshow(title, batch[i])
        cv2.waitKeyEx()

cv2.destroyAllWindows()
cv2.waitKey(1)



-1

In [28]:
# Build spliced inputs: each image keeps its own left part (including the
# masked strip) and takes columns [3*rec_w, 5*rec_w) from the NEXT image,
# then show what the model reconstructs for them.
x = x_test[:5]
h,w=(256,256)
rec_w = w//5
modified = []
for i in range(4):
    img1 = x[i]
    img2 = x[i+1]
    new_img = img1.copy()
    new_img[:,rec_w*3:rec_w*5,:]=img2[:,rec_w*3:rec_w*5,:]
    modified.append(new_img)
modified = np.array(modified)
preds = ae_model.predict(modified)
for i in range(4):
    mod_img = modified[i]
    cv2.imshow('mod', mod_img)
    cv2.waitKeyEx()
    cv2.imshow('pred', preds[i])
    cv2.waitKeyEx()

# Close the windows once done, mirroring the previous visualisation cell, so
# no OpenCV windows are left open after the cell finishes.
cv2.destroyAllWindows()
cv2.waitKey(1)

