In [None]:
#Setup the google drive
import os
#from google.colab import drive
#drive.mount('/content/gdrive')
#os.listdir('/content/gdrive/MyDrive')
#Basic packages
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(1)
import tensorflow as tf
tf.random.set_seed(2)
#Keras/TensorFlow
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Conv2DTranspose, Input, Concatenate, Dense, Add
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
#Keras image processing
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image
#Seaborn
import seaborn as sns #

In [None]:
# Dataset locations. Replace with the full paths to the TRAIN / TEST folders if
# they do not live in the current working directory.
curPath = os.getcwd()
trainPath = os.path.join(curPath, 'TRAIN')
testPath = os.path.join(curPath, 'TEST')

IMAGE_SIZE = (256, 256)  # every image is resized to this (height, width) on load


def load_image_triplets(rootPath, target_size=IMAGE_SIZE):
    """Load (input pair, target) image triplets from the sub-directories of rootPath.

    Each immediate sub-directory of ``rootPath`` is one sample (the original
    notebook describes the directory name as a 6-digit identifier, e.g. 000000).
    The first three image files of the sub-directory, in sorted name order, are
    used: the first two as the model inputs and the third as the target.

    NOTE(review): this assumes the lexicographic order of the file names is
    (input 1, input 2, target) -- confirm against the dataset naming. The
    previous code relied on the arbitrary order returned by ``os.walk``, which
    is not guaranteed to be stable between machines or runs.

    Parameters
    ----------
    rootPath : str
        Directory whose sub-directories each contain at least three images.
    target_size : tuple of int
        (height, width) passed to ``tf.keras.utils.load_img``.

    Returns
    -------
    ids : list of str
        Sorted sub-directory names, aligned with the first axis of x and y.
    x : np.ndarray
        Stacked input pairs; axis 1 has length 2 (first / second input image).
    y : np.ndarray
        Stacked target images.

    Raises
    ------
    FileNotFoundError
        If ``rootPath`` is not an existing directory.
    ValueError
        If a sub-directory holds fewer than three (non-hidden) files.
    """
    if not os.path.isdir(rootPath):
        raise FileNotFoundError("Dataset directory not found: " + rootPath)

    # Sort so that sample order (and hence the validation split) is reproducible.
    ids = sorted(next(os.walk(rootPath))[1])
    pairs = []
    targets = []

    for sampleID in ids:
        imPath = os.path.join(rootPath, sampleID)
        # Skip hidden files (e.g. .DS_Store) and sort for a deterministic role
        # assignment of the three images.
        imNames = sorted(name for name in next(os.walk(imPath))[2]
                         if not name.startswith('.'))
        if len(imNames) < 3:
            raise ValueError("Expected at least 3 images in " + imPath
                             + ", found " + str(len(imNames)))

        # Convert to arrays immediately so no PIL image objects are kept around.
        first, second, target = (
            np.asarray(tf.keras.utils.load_img(os.path.join(imPath, name),
                                               target_size=target_size))
            for name in imNames[:3]
        )
        pairs.append([first, second])
        targets.append(target)

    return ids, np.array(pairs), np.array(targets)


trainIDs, train_x, train_y = load_image_triplets(trainPath)
testIDs, test_x, test_y = load_image_triplets(testPath)

In [None]:
# Sanity check of the loaded arrays.
# train_x holds two images per sample, train_y holds one.
# The images are loaded with three channels; only channel 0 is shown and used
# because the data is grayscale (NOTE(review): presumably all channels are
# identical -- confirm against the dataset).
first_target = train_y[0][:, :, 0]

fig, ax = plt.subplots(1)
heatmap = ax.imshow(first_target, cmap='plasma')
fig.colorbar(heatmap, ax=ax)

# Shape of the single-channel target stack that is later used for training.
single_channel_targets = train_y[:, :, :, 0]
print(single_channel_targets.shape)

In [None]:
# kernel = (4,4)
# pool_size = (2,2)
# filters = 32
# loss_rate = 1e-4
# epochs = 2
# batch_size = 128
# val_split = 0.15
# hyperparameters = [kernel,pool_size]

# class customModel(tf.keras.Model):
#     def __init__(self, hyperparameters):
#         super().__init__()
#         self.inp1 = Input(shape=(1024,1024,1))
#         self.inp2 = Input(shape=(1024,1024,1))
#         self.cat = Concatenate(axis=1)
#         self.conv1 = Conv2D(filters, kernel, padding = 'same', activation='relu')
#         self.pool1 = MaxPooling2D(pool_size=pool_size)
#         self.conv2 = Conv2D(filters,kernel, padding = 'same',activation='relu')
#         self.pool2 = MaxPooling2D(pool_size=pool_size)
#         self.tconv1 = Conv2DTranspose(filters, kernel, strides=pool_size, padding = 'same') #Upsampling to get back to 1024x1024
#         self.tconv2 = Conv2DTranspose(1, kernel, strides=pool_size, padding = 'same')
#     def call(self, inputs):
#         x1 = inputs[0]
#         x2 = inputs[1]
#         x = self.cat([x1,x2])
#         x = self.conv1(x)
#         x = self.pool1(x)
#         x = self.conv2(x)
#         x = self.pool2(x)
#         x = self.tconv1(x)
#         return self.tconv2(x)


# model = customModel('dum')
# model.compile(optimizer=Adam(loss_rate),loss='mean_squared_error',metrics=['accuracy'])
# checkpoint = ModelCheckpoint('best_model.keras', monitor='val_accuracy', save_best_only=True, mode='min')
# history=model.fit([train_x[:,0], train_x[:,1]], train_y, epochs=epochs, batch_size=batch_size, callbacks=checkpoint)

In [None]:
# --- Hyperparameters ---------------------------------------------------------
kernel = (4,4)
pool_size = (2,2)
filters = [4, 8]     # NOTE(review): only filters[0] is used below; filters[1] is unused -- confirm intent
loss_rate = 1e-4     # passed to Adam as its learning rate
epochs = 10
batch_size = 128
val_split = 0.15     # fraction of the training data held out for validation

# --- Model: two single-channel 256x256 images in, one 256x256 image out ------
x1 = Input(shape=(256,256,1))
x2 = Input(shape=(256,256,1))
x = Add()([x1,x2])                                   # element-wise sum of the two inputs
x = Conv2D(filters[0], kernel, padding='same', activation='relu')(x)
x = MaxPooling2D(pool_size=pool_size)(x)             # 256 -> 128
x = Conv2D(filters[0], kernel, padding='same', activation='relu')(x)
# Dense on a rank-4 tensor acts along the last axis only, i.e. it mixes the
# channels of every pixel down to a single channel.
x = Dense(1, activation='relu')(x)
x = MaxPooling2D(pool_size=(2,2))(x)                 # 128 -> 64; edited this line to change dimension sizes
# Two stride-2 transposed convolutions undo the two poolings: 64 -> 128 -> 256.
x = Conv2DTranspose(filters[0], kernel, strides=pool_size, padding='same')(x)
x = Conv2DTranspose(1, kernel, strides=pool_size, padding='same')(x)

# # Alternative to MSE loss
# def SSIMLoss(y_true, y_pred):
#   return 1 - tf.reduce_mean(tf.image.ssim(y_true, y_pred, 1.0))


model = Model(inputs=[x1,x2], outputs=x)
model.compile(optimizer=Adam(loss_rate), loss='MAE')
model.summary()

# Keep only the weights with the lowest validation loss seen during training.
checkpoint = ModelCheckpoint('best_model.keras', monitor='val_loss', save_best_only=True, mode='min')

# Use channel 0 only, but keep the channel axis (":1") so the arrays match the
# declared (256,256,1) input / output shapes instead of relying on implicit
# rank adjustment.
train_inputs = [train_x[:, 0, :, :, :1], train_x[:, 1, :, :, :1]]
train_targets = train_y[:, :, :, :1]

history = model.fit(
    train_inputs,
    train_targets,
    epochs=epochs,
    verbose=1,
    batch_size=batch_size,
    validation_split=val_split,
    callbacks=[checkpoint],   # fit() documents `callbacks` as a list of callbacks
)

In [None]:
# Learning curves: per-epoch training and validation loss recorded by fit().
# The model in this notebook is compiled with loss='MAE', so both series are
# mean absolute errors.
mae = history.history['loss']
vmae = history.history['val_loss']

fig, ax = plt.subplots()
# The labels previously said "LSTM", but the model is convolutional.
ax.plot(mae, label='Training', color='blue', linestyle='-')
ax.plot(vmae, label='Validation', color='blue', linestyle='--')
ax.set_xlabel('Epoch')
ax.set_ylabel('MAE loss')
ax.set_title('Training vs. validation loss')
ax.legend()
plt.show()

In [None]:
# Reload the checkpoint with the lowest validation loss and score it on the
# held-out test set.
model_best = load_model('best_model.keras')

# Same preprocessing as for training: channel 0 only, channel axis kept so the
# arrays match the model's (256,256,1) inputs and output. Previously the full
# 3-channel test_y was passed to evaluate(), which did not match the
# single-channel targets used for training.
test_inputs = [test_x[:, 0, :, :, :1], test_x[:, 1, :, :, :1]]
test_targets = test_y[:, :, :, :1]

best_eval = model_best.evaluate(test_inputs, test_targets, verbose=2)
new_images = model_best.predict(test_inputs)

In [None]:
# Show the first predicted image with a colour scale.
fig, ax = plt.subplots()
predicted_view = ax.imshow(new_images[0], cmap='plasma')
fig.colorbar(predicted_view, ax=ax)

In [None]:
# Pixel-wise residuals (prediction - ground truth) on the test set.
#
# Fixes over the previous version:
#   * the loop iterated over the images themselves and then used each image as
#     an index, which fails; the computation is now vectorised;
#   * the single-channel prediction was subtracted from the 3-channel ground
#     truth; both are now reduced to channel 0 first;
#   * `diff` is an array, so `diff.shape` is available to later cells.
predicted = np.asarray(new_images, dtype=np.float64)[:, :, :, 0]
truth = np.asarray(test_y, dtype=np.float64)[:, :, :, 0]

diff = predicted - truth                 # one residual image per test sample

# Per-image summary statistics.
image_mean = diff.mean(axis=(1, 2))
image_std = diff.std(axis=(1, 2))

total_mean = np.mean(image_mean)
# Spread of the pixel differences themselves. The previous np.std(image_std)
# measured how much the per-image spread varies, not the spread of the
# differences that the message below reports.
total_std = np.std(diff)
print("The mean of the pixel differences is ", total_mean, " +/- ", total_std)

In [None]:
# Plot the residual image of n randomly chosen test samples.
n = 10
num_images = len(diff)   # len() works whether diff is a list or an array

for _ in range(n):
    # Random index in [0, num_images); the same sample may be drawn more than once.
    idx = np.random.randint(0, num_images)

    residual = np.asarray(diff[idx])
    if residual.ndim == 3:
        # Drop the channel axis so the image is colour-mapped rather than
        # interpreted as RGB.
        residual = residual[..., 0]

    fig, ax = plt.subplots()
    shown = ax.imshow(residual, cmap='gray')
    ax.set_title('Difference between true image and predicted image for the ' + str(idx) + 'th image')
    fig.colorbar(shown, ax=ax)

    plt.show()