In [1]:
from keras import layers
from keras.models import Model, Sequential
from keras.utils import plot_model
from keras import backend as K
#from tqdm import tqdm
import matplotlib.pyplot as plt
from IPython import display

Using TensorFlow backend.


In [2]:
def res_block(y, nb_channels, _strides = (1,1), _project_shortcut=False):
    shortcut = y

    y = layers.Conv2D(nb_channels, kernel_size=(3, 3), strides=_strides, padding='same')(y)
    y = layers.BatchNormalization()(y)
    y = layers.ReLU()(y)

    y = layers.Conv2D(nb_channels, kernel_size=(3, 3), strides=_strides, padding='same')(y)
    y = layers.BatchNormalization()()

    if _project_shortcut or _strides != (1, 1):
        shortcut = layers.Conv2D(nb_channels, kernel_size=(1, 1), strides=_strides, padding='same')(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    y = layers.add([shortcut, y])
    #y = layers.LeakyReLU()(y)

    return y

In [3]:
def res_net(x, nb_channels, _strides=(1, 1)):
    x = layers.Conv2D(64, kernel_size=(3, 3), strides=_strides, padding='same', activation='relu')(x)
    shortcut = x
    for _ in range(16):
        x = res_block(x, 64)

    x = layers.Conv2D(64, kernel_size=(3, 3), strides=_strides, padding='same', activation='relu')(x)
    x = layers.add([shortcut, x])
    
    return x

In [4]:
def conv_net(x, nb_channels, _strides=(1, 1)):
    x = layers.Conv2D(32, kernel_size=(3, 3), strides=_strides, padding='same', activation='relu')(x)
    #x = layers.Conv2D(64, kernel_size=(3, 3), strides=_strides, padding='same', activation='relu')(x)
    
    return x

In [5]:
def post_net(y, nb_channels, _strides=(1, 1)):
    #y = layers.Conv2D(64, kernel_size=(3, 3), strides=_strides, padding='same', activation='relu')(y)
    #y = layers.Conv2D(32, kernel_size=(3, 3), strides=_strides, padding='same', activation='relu')(y)
    y = layers.Conv2D(3, kernel_size=(3, 3), strides=_strides, padding='same', activation='linear')(y)
    
    return y

In [6]:
import cv2
import numpy as np

def load_imgs(path, number, train_type):
    result=np.empty((number, 64, 64, 3), dtype="float64")
    for i in range(number):
        I = cv2.imread(path + "{:04}_{}.jpeg".format(i+1, train_type))
        result[i, :, :, :] = I
    return result/result.max()

In [7]:
#inport training data
dataNum = 1000
x1_train = load_imgs("./blurImg/", dataNum, 1)
x2_train = load_imgs("./blurImg/", dataNum, 2)
y_train = load_imgs("./blurImg/", dataNum, 0)

def make_trainable(net, val):
    net.trainable = val
    for l in net.layers:
        l.trainable = val
        
def loss_wrapper(in_tensor1, in_tensor2):
    def gaussian_blur(in_tensor):
        # use large kernel to blur pred and in_tensor//
        return
        
    def custom_loss(y_true, y_pred):
        # or better implementation like fourier transformation
        return K.binary_crossentropy(y_true, y_pred) + K.reduce_mean(K.square(gaussian_blur(y_pred)-gaussian_blur(in_tensor1)))
    return custom_loss

In [8]:
img_a = layers.Input(shape=(64, 64, 3))
img_b = layers.Input(shape=(64, 64, 3))
#feature_a = conv_net(img_a, 3)
#feature_b = conv_net(img_b, 3)
feature_a = res_net(img_a, 3)
feature_b = res_net(img_b, 3)
merge = layers.concatenate([feature_a, feature_b])
aif = post_net(merge, 128)
gen = Model(inputs = [img_a, img_b], output = [aif])
#gen.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
gen.compile(loss='mean_squared_error', optimizer='adam', metrics=['accuracy'])
#gen.summary()
#plot_model(gen, to_file='generator.png')

  if __name__ == '__main__':


In [9]:
image_fake = gen([img_a, img_b])
dis = Sequential()
dis.add(layers.Conv2D(64, kernel_size=(3, 3),strides=(2, 2), padding='same'))
dis.add(layers.LeakyReLU())
#dis.add(layers.Dropout(0.25))
dis.add(layers.Conv2D(128, kernel_size=(3, 3), strides=(2, 2),padding='same'))
dis.add(layers.LeakyReLU())
#dis.add(layers.Dropout(0.25))
dis.add(layers.Conv2D(256, kernel_size=(3, 3), strides=(2, 2),padding='same'))
dis.add(layers.LeakyReLU())
#dis.add(layers.Dropout(0.25))
#dis.add(layers.Conv2D(1, kernel_size=(3, 3), padding='same'))

dis.add(layers.Flatten())
dis.add(layers.Dense(256))
dis.add(layers.Dense(2))
dis.add(layers.Activation('softmax'))
pred_prob = dis(image_fake)
dis.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
#dis.summary()
#plot_model(dis, to_file='discriminator.png')
make_trainable(dis, False)

In [10]:
am = Model(inputs = [img_a, img_b], output = [pred_prob])
am.summary()
am.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
#plot_model(am, to_file='adversary.png')

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 64, 64, 3)    0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 64, 64, 3)    0                                            
__________________________________________________________________________________________________
model_1 (Model)                 (None, 64, 64, 3)    2444291     input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 2)            4565890     model_1[1][0]                    
Total para

  """Entry point for launching an IPython kernel.


In [12]:
def plot_loss(losses):
        display.clear_output(wait=True)
        display.display(plt.gcf())
        plt.figure(figsize=(10,8))
        plt.plot(losses["d"], label='discriminitive loss')
        plt.plot(losses["g"], label='generative loss')
        plt.legend()
        plt.show()

In [13]:
# Train discriminator on generated images
losses = {"d":[], "g":[]}
Batch_size = 16
nb_epoch = 100
for epoch in range(nb_epoch):
    rand_idx = np.random.randint(0, x1_train.shape[0], size = Batch_size)
    img_batch1 = x1_train[rand_idx, :, :, :]
    img_batch2 = x2_train[rand_idx, :, :, :]
    y_batch = y_train[np.random.randint(0, y_train.shape[0], size = Batch_size), :, :, :]
    #gen.fit([x1_train, x2_train], y_train)

    
    gen_img = gen.predict([img_batch1, img_batch2])
    X = np.concatenate((y_batch, gen_img))
    y = np.zeros([2*Batch_size,2])
    y[0:Batch_size, 1] = 1
    y[Batch_size:, 0] = 1
    make_trainable(dis,True)
    dis.fit(X, y,epochs=1, batch_size=Batch_size*2)
    #losses["d"].append(d_loss)
 
    y2 = np.zeros([Batch_size, 2])
    y2[:, 1] = 1
    # train Generator-Discriminator stack on input noise to non-generated output class
    make_trainable(dis,False)
    am.fit([img_batch1, img_batch2], y2,epochs=1, batch_size=Batch_size) #same batch or ???
    #losses["g"].append(g_loss)
    #if epoch % 25 == 25 - 1:
    #    plot_loss(losses)
    


Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1


Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1


Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1
Epoch 1/1


In [None]:
Batch_size = 1000

rand_idx = np.random.randint(0, x1_train.shape[0], size = Batch_size)
img_batch1 = x1_train[rand_idx, :, :, :]
img_batch2 = x2_train[rand_idx, :, :, :]
y_batch = y_train[rand_idx, :, :, :]
gen.fit([img_batch1, img_batch2], y_batch)
    

gen_img[0]
gen_img.min()
gen_img = gen.predict([x1_train, x2_train])    
cv2.imwrite("a.jpg", 255.0*(gen_img[4]-gen_img[4].min())/(gen_img[4].max()-gen_img[4].min()))

In [None]:
#pre-train discriminate network
make_trainable(dis,True)
Batch_size = 100

rand_idx = np.random.randint(0, x1_train.shape[0], size = Batch_size)
img_batch1 = x1_train[rand_idx, :, :, :]
img_batch2 = x2_train[rand_idx, :, :, :]
y_batch = y_train[np.random.randint(0, y_train.shape[0], size = Batch_size), :, :, :]
gen_img = gen.predict([img_batch1, img_batch2])
X = np.concatenate((y_batch, gen_img))
y = np.zeros([2*Batch_size,2])
y[0:Batch_size, 0] = 1
y[Batch_size:, 1] = 1

dis.fit(X, y)