# U-net Task 1

## import libraries

In [0]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import glob
import cv2

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Activation
from tensorflow.python.keras.layers import Conv2D, MaxPooling2D, LeakyReLU
from tensorflow.python.keras.layers import concatenate, UpSampling2D, BatchNormalization
from tensorflow.keras.optimizers import Adam

## Dowload images

In [0]:
!pip install wget
import wget
import os
url = "https://people.eecs.berkeley.edu/~tinghuiz/projects/pix2pix/datasets/facades.tar.gz"
wget.download(url, '.')
!tar -zxvf facades.tar.gz

In [0]:
# import and normalize train, test, and validation images
files_train = glob.glob('facades/train/*')
pass_train = [i for i in files_train]
input_train = np.array([cv2.cvtColor(cv2.imread(i, cv2.IMREAD_COLOR)[:,256:,:], cv2.COLOR_BGR2RGB) for i in pass_train])
output_train = np.array([cv2.cvtColor(cv2.imread(i, cv2.IMREAD_COLOR)[:,:256,:], cv2.COLOR_BGR2RGB) for i in pass_train])
input_train = input_train/255.0
output_train = output_train/255.0
# input_train = 2.0*input_train/255.0 - 1.0
# output_train = 2.0*output_train/255.0 - 1.0

files_test = glob.glob('facades/test/*')
pass_test = [i for i in files_test]
input_test = np.array([cv2.cvtColor(cv2.imread(i, cv2.IMREAD_COLOR)[:,256:,:], cv2.COLOR_BGR2RGB) for i in pass_test])
output_test = np.array([cv2.cvtColor(cv2.imread(i, cv2.IMREAD_COLOR)[:,:256,:], cv2.COLOR_BGR2RGB) for i in pass_test])
input_test = input_test/255.0 
output_test = output_test/255.0
# input_test = 2.0*input_test/255.0 - 1.0
# output_test = 2.0*output_test/255.0 - 1.0

files_val = glob.glob('facades/val/*')
pass_val = [i for i in files_val]
input_val = np.array([cv2.cvtColor(cv2.imread(i, cv2.IMREAD_COLOR)[:,256:,:], cv2.COLOR_BGR2RGB) for i in pass_val])
output_val = np.array([cv2.cvtColor(cv2.imread(i, cv2.IMREAD_COLOR)[:,:256,:], cv2.COLOR_BGR2RGB) for i in pass_val])
input_val = input_val/255.0
output_val = output_val/255.0
# input_val = 2.0*input_val/255.0 - 1.0
# output_val = 2.0*output_val/255.0 - 1.0

print('input_train : ', input_train.shape)
print('output_train : ', output_train.shape)
print('input_val : ', input_val.shape)
print('output_val : ', output_val.shape)
print('input_test : ', input_test.shape)
print('output_test : ', output_test.shape)
print()
print('Plot example from training set')
print()
item_id = 5
print('item_id : ', item_id)
print()

# show sample training input and output
plt.imshow((input_train[item_id]+0.0 ) / 1.0)
plt.title('input_train [' + str(item_id) + ']')
plt.grid(None)
plt.xticks([])
plt.yticks([])
plt.show()

plt.imshow((output_train[item_id]+0.0 ) / 1.0)
plt.title('output_train [' + str(item_id) + ']')
plt.grid(None)
plt.xticks([])
plt.yticks([])
plt.show()

## Define network

In [0]:
def unet(
    input_train = input_train, 
    output_train = output_train,
    input_val = input_val,
    output_val = output_val,
    optimizer_type = Adam(lr=2e-4),
    loss = 'mean_squared_error',
    dropout_ratio = 0.0,
    max_epochs = 50,
    batch_size = 32,
    batch_shuffle = True,
    remove_skipconnection = False):

    # define input shape
    input_shape = (256, 256, 3)
    inputs = Input(shape=input_shape)

    # encoder section

    down0 = Conv2D(64, (4, 4), strides=(2, 2), padding='same')(inputs)
    down0 = BatchNormalization()(down0)
    down0 = LeakyReLU()(down0)

    down1 = Conv2D(128, (4, 4), strides=(2, 2), padding='same')(down0)
    down1 = BatchNormalization()(down1)
    down1 = LeakyReLU()(down1)

    down2 = Conv2D(256, (4, 4), strides=(2, 2), padding='same')(down1)
    down2 = BatchNormalization()(down2)
    down2 = LeakyReLU()(down2)

    down3 = Conv2D(512, (4, 4), strides=(2, 2), padding='same')(down2)
    down3 = BatchNormalization()(down3)
    down3 = LeakyReLU()(down3)

    down4 = Conv2D(512, (4, 4), strides=(2, 2), padding='same')(down3)
    down4 = BatchNormalization()(down4)
    down4 = LeakyReLU()(down4)

    down5 = Conv2D(512, (4, 4), strides=(2, 2), padding='same')(down4)
    down5 = BatchNormalization()(down5)
    down5 = LeakyReLU()(down5)

    down6 = Conv2D(512, (4, 4), strides=(2, 2), padding='same')(down5)
    down6 = BatchNormalization()(down6)
    down6 = LeakyReLU()(down6)

    # center section

    center = Conv2D(512, (4, 4), strides=(2, 2), padding='same')(down6)
    center = BatchNormalization()(center)
    center = LeakyReLU()(center)
        
    # decoder section with skip connections to the encoder section

    up6 = UpSampling2D(size=(2, 2))(center)
    up6 = concatenate([down6, up6], axis=3)
    up6 = Conv2D(512, (4, 4), padding='same')(up6)
    up6 = BatchNormalization()(up6)
    up6 = LeakyReLU()(up6)

    up5 = UpSampling2D(size=(2, 2))(up6)
    if not(remove_skipconnection):
        up5 = concatenate([down5, up5], axis=3)
    up5 = Conv2D(512, (4, 4), padding='same')(up5)
    up5 = BatchNormalization()(up5)
    up5 = LeakyReLU()(up5)

    up4 = UpSampling2D(size=(2, 2))(up5)
    if not(remove_skipconnection):
        up4 = concatenate([down4, up4], axis=3)
    up4 = Conv2D(512, (4, 4), padding='same')(up4)
    up4 = BatchNormalization()(up4)
    up4 = LeakyReLU()(up4)

    up3 = UpSampling2D(size=(2, 2))(up4)
    up3 = concatenate([down3, up3], axis=3)
    up3 = Conv2D(512, (4, 4), padding='same')(up3)
    up3 = BatchNormalization()(up3)
    up3 = LeakyReLU()(up3)

    up2 = UpSampling2D(size=(2, 2))(up3)
    if not(remove_skipconnection):
        up2 = concatenate([down2, up2], axis=3)
    up2 = Conv2D(256, (4, 4), padding='same')(up2)
    up2 = BatchNormalization()(up2)
    up2 = LeakyReLU()(up2)

    up1 = UpSampling2D(size=(2, 2))(up2)
    if not(remove_skipconnection):
        up1 = concatenate([down1, up1], axis=3)
    up1 = Conv2D(128, (4, 4), padding='same')(up1)
    up1 = BatchNormalization()(up1)
    up1 = LeakyReLU()(up1)

    up0 = UpSampling2D(size=(2, 2))(up1)
    up0 = concatenate([down0, up0], axis=3)
    up0 = Conv2D(64, (4, 4), padding='same')(up0)
    up0 = BatchNormalization()(up0)
    up0 = LeakyReLU()(up0)

    outputs = UpSampling2D(size=(2, 2))(up0)
    outputs = Conv2D(3, (4, 4), padding='same')(outputs)
    outputs = Activation('tanh')(outputs)

    # compile the network
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=optimizer_type, loss=loss)
    # Display a summary of the compiled neural network
    print(model.summary())  
    print()

    print('* Training the compiled network *')
    print()
    history = model.fit(input_train, output_train, \
                        batch_size=batch_size, \
                        epochs=max_epochs, \
                        validation_data=(input_val, output_val), \
                        shuffle=batch_shuffle)
    print()
    print('Training completed')
    print()

    return model, history

In [0]:
# function to plot history
def show_result(model, input_test = input_test, output_test = output_test):
    idx = [0,1,2,3]
    model_out = model.predict(input_test[idx])
    for i in range(4):
        fig = plt.figure()
        plt.subplot(1,3,1);plt.imshow((input_test[i]+0.0)/1.0);plt.title('test_input')
        plt.subplot(1,3,2);plt.imshow((output_test[i]+0.0)/1.0);plt.title('true_image')
        plt.subplot(1,3,3);plt.imshow((model_out[i]+0.0)/1.0);plt.title('Reconstructed image')
        plt.show()

In [0]:
# train unet without with skip connections shown in Fig1.a
model, history = unet()

In [0]:
# show reconstructed images along the ground truth images
show_result(model)

## Task 2

In [0]:
# train unet without with skip connections shown in Fig1.b
model2, history2 = unet(remove_skipconnection = True)

In [0]:
# show reconstructed images along the ground truth images
show_result(model2)

Removing skipping connections improve reconstructed images.