<a href="https://colab.research.google.com/github/francesco-source/DeepLearning/blob/main/Image_inpainting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image inpainting problem on CIFAR10

In this notebook I try to implement different nets for solving the same problem.

This was done to fine-tune the networks for optimal performance and to learn the majority of the theoretical concepts covered in the course.

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models, metrics
from tensorflow.keras.optimizers import Adam
from keras import callbacks
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import plot_model

In [None]:
# Download CIFAR-10 and report the raw pixel range of the test images.
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
print("image range is {}, {}".format(x_test.min(), x_test.max()))

# Rescale pixel intensities by 1/255 and store them as float32.
x_train = (x_train / 255.).astype(np.float32)
x_test = (x_test / 255.).astype(np.float32)
print("new image range is {}, {}".format(x_test.min(), x_test.max()))

image range is 0, 255
new image range is 0.0, 1.0


In [None]:
def mask(X, coords):
    """Zero out a rectangular window in every image of a batch, in place.

    Args:
        X: array indexed as ``X[image, axis1, axis2, ...]``; it is modified
            in place, so pass a copy to keep the original.
        coords: ``(x0, y0, x1, y1)``; ``x0:x1`` is applied to axis 1 and
            ``y0:y1`` to axis 2 (half-open ranges).

    Returns:
        The same array object that was passed in.
    """
    first_lo, second_lo, first_hi, second_hi = coords
    window = (slice(None), slice(first_lo, first_hi), slice(second_lo, second_hi))
    X[window] = 0
    return X


# Window to blank out: indices 2..29 on axis 1 and 16..29 on axis 2 of every image.
MASK_COORDS = (2, 16, 30, 30)

# mask() writes in place, so it is given copies and the clean arrays stay usable as targets.
masked_x_train = mask(np.copy(x_train), MASK_COORDS)
masked_x_test = mask(np.copy(x_test), MASK_COORDS)


## Autoencoder for image inpainting

 This neural network is a convolutional autoencoder model. The goal of the convolutional autoencoder is to compress an input image into a low-dimensional representation (often called a "latent code") and then reconstruct the input image from the latent code.
 
 Before the autoencoder we have 2 independent convolution blocks in order to learn different features from the input image.
 There is no dense layer as a bottleneck because the dataset is small compared to the size of the net.

The convolutional autoencoder is composed of two main parts:
1. The encoder consists of a series of convolutional layers that compress the input image into a low-dimensional representation.
2. The decoder, on the other hand, uses a series of deconvolution (or upsampling) layers to reconstruct the input image from the low-dimensional representation.

The neural network shown here begins with the two-branch convolutional block described above (one branch of 3x3 convolutions and one of 5x5 convolutions, both with a stride of 1). This block is followed by a set of three downsampling convolutional blocks, each consisting of a 4x4 convolution with a stride of 2, a LeakyReLU activation function with an alpha of 0.2, and a batch normalization layer. The LeakyReLU is used in order to prevent the dying ReLU problem.
These convolutional blocks are designed to extract the features of the input image in order to compress them into a low-dimensional representation.

Subsequently, the neural network uses three deconvolution (or upsampling) blocks, each consisting of a deconvolution layer, a LeakyReLU activation function, a batch normalization layer, and a concatenation with the corresponding output of the encoder. The last layer of the decoder is a 3x3 convolution with a sigmoid activation function, which produces the reconstructed image.

In summary, this neural network is designed to compress an input image into a low-dimensional representation and then reconstruct the input image from the latent code. The network uses convolutional and deconvolution blocks, LeakyReLU activation functions, and batch normalization to extract the features of the input image and reconstruct the output image.

In [None]:

from tensorflow.keras.layers import Conv2D, Conv2DTranspose, LeakyReLU, BatchNormalization, Concatenate, Input
from tensorflow.keras.models import Model

def ConvBlock(x, filters, kernel_size, strides=1, activation='relu', padding='same'):
    """Apply Conv2D -> LeakyReLU(alpha=0.2) -> BatchNormalization to ``x``.

    Args:
        x: input tensor.
        filters, kernel_size, strides, padding: forwarded to ``Conv2D``.
        activation: accepted for signature compatibility but NOT used; the
            block always applies LeakyReLU with alpha 0.2.

    Returns:
        The batch-normalised output tensor.
    """
    convolved = Conv2D(filters, kernel_size, strides=strides, padding=padding)(x)
    activated = LeakyReLU(alpha=0.2)(convolved)
    return BatchNormalization()(activated)

def UpConvBlock(x, skip_connection, filters, kernel_size, strides=1, activation='relu', padding='same'):
    """Upsample ``x`` and merge it with an encoder skip tensor.

    Pipeline: Conv2DTranspose -> LeakyReLU(alpha=0.2) -> BatchNormalization,
    then Concatenate with ``skip_connection`` (upsampled tensor first).

    Args:
        x: input tensor to upsample.
        skip_connection: tensor concatenated after the normalised output.
        filters, kernel_size, strides, padding: forwarded to ``Conv2DTranspose``.
        activation: accepted for signature compatibility but NOT used; the
            block always applies LeakyReLU with alpha 0.2.

    Returns:
        The concatenated tensor.
    """
    upsampled = Conv2DTranspose(filters, kernel_size, strides=strides, padding=padding)(x)
    activated = LeakyReLU(alpha=0.2)(upsampled)
    normalised = BatchNormalization()(activated)
    return Concatenate()([normalised, skip_connection])

def initial_differentiation(x, filters, kernel_size, strides=1, activation='relu', padding='same'):
    """Two parallel residual conv branches (3x3 and 5x5) fused into one tensor.

    Each branch: a 1x1 conv (32 filters) whose output is also the residual
    shortcut, three 32-filter convs, an element-wise add with the shortcut,
    a 64-filter conv, then batch normalisation. The two branch outputs are
    concatenated, passed through a 64-filter 3x3 conv and a default LeakyReLU.
    Every convolution uses 'same' padding and the default stride.

    NOTE: ``filters``, ``kernel_size``, ``strides``, ``activation`` and
    ``padding`` are accepted but ignored; all layer settings are fixed here.

    Args:
        x: input tensor.

    Returns:
        The fused, activated tensor (64 filters from the final conv).
    """
    def _residual_branch(kernel):
        # The 1x1 projection doubles as the residual shortcut.
        shortcut = layers.Conv2D(filters=32, kernel_size=1, activation='relu', padding='same')(x)
        features = shortcut
        for _ in range(3):
            features = layers.Conv2D(filters=32, kernel_size=kernel, activation='relu', padding='same')(features)
        merged = layers.add([shortcut, features])  # add keeps 32 channels
        widened = layers.Conv2D(filters=64, kernel_size=kernel, activation='relu', padding='same')(merged)
        return layers.BatchNormalization()(widened)

    # The 3x3 branch is built before the 5x5 branch, matching the layer creation order.
    branch_outputs = [_residual_branch(3), _residual_branch(5)]

    # Concatenate the two 64-filter branch outputs, then fuse them with one more conv.
    fused = layers.concatenate(branch_outputs)
    fused = layers.Conv2D(filters=64, kernel_size=3, padding='same')(fused)
    return layers.LeakyReLU()(fused)

def autoencoder(input_shape_x):
    """Build the skip-connected convolutional autoencoder.

    Encoder: ``initial_differentiation`` followed by three stride-2
    ``ConvBlock`` stages with 128, 256 and 512 filters. Decoder: three
    stride-2 ``UpConvBlock`` stages with 256, 128 and 64 filters, each
    concatenated with the encoder output of matching depth, then a
    3-filter 3x3 sigmoid convolution.

    Args:
        input_shape_x: shape passed to ``Input`` (without the batch axis).

    Returns:
        An uncompiled ``Model`` mapping the input to the reconstruction.
    """
    inputs_x = Input(shape=input_shape_x)

    # Encoder: keep every stage output so the decoder can reuse it as a skip.
    encoder_outputs = [
        initial_differentiation(inputs_x, 64, 5, strides=1, activation="relu", padding="same")
    ]
    for width in (128, 256, 512):
        encoder_outputs.append(
            ConvBlock(encoder_outputs[-1], width, 4, strides=2, activation='relu', padding='same')
        )

    # Decoder: start at the deepest tensor and consume the skips deepest-first.
    x = encoder_outputs.pop()
    for width in (256, 128, 64):
        x = UpConvBlock(x, encoder_outputs.pop(), width, 4, strides=2, activation='relu', padding='same')

    x = Conv2D(3, 3, strides=1, padding='same', activation='sigmoid')(x)

    return Model(inputs=inputs_x, outputs=x)

In [None]:
# Build the autoencoder for 32x32 inputs with 3 channels and render its layer graph.
model = autoencoder((32, 32, 3))

plot_model(model, show_shapes=True, dpi=76)

In [None]:
# Print the layer-by-layer summary table of the autoencoder.
model.summary()

In [None]:
# Stop once val_loss has failed to improve for 5 consecutive epochs and
# roll the model back to the best weights seen.
stop_training = callbacks.EarlyStopping(monitor="val_loss", mode="auto", patience=5, restore_best_weights=True)
# Cut the learning rate by 10x after 2 stagnant epochs. This patience must stay
# BELOW the early-stopping patience: with the previous value of 6, training was
# stopped before the reduction could normally take effect.
reduce_learning_rate = callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=2, min_lr=0.000001)

In [None]:
# Pixel-wise regression: optimise MSE and track MAE as a readable companion metric.
# Classification accuracy has no meaning for image reconstruction, and `metrics`
# is passed as a list rather than a bare string.
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error', metrics=['mae'])

In [None]:
# Train to reconstruct the clean images from their masked versions; the last 10%
# of the training arrays is held out for the val_loss the callbacks monitor.
model.fit(
    x=masked_x_train,
    y=x_train,
    validation_split=0.1,
    batch_size=32,
    epochs=100,
    verbose=1,
    shuffle=True,
    callbacks=[stop_training, reduce_learning_rate],
)


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100


<keras.callbacks.History at 0x7f150c3fd060>

In [None]:
# Reconstruct every masked test image (progress output silenced).
predictions = model.predict(
    x=masked_x_test,
    verbose=0,
)

In [None]:
# Network input: test image 1 with the masked window zeroed out.
fig, ax = plt.subplots()
ax.imshow(masked_x_test[1])
plt.show()

In [None]:
# Network output: the autoencoder's reconstruction of test image 1.
fig, ax = plt.subplots()
ax.imshow(predictions[1])
plt.show()

In [None]:
# Ground truth: the unmasked test image 1.
fig, ax = plt.subplots()
ax.imshow(x_test[1])
plt.show()

In [None]:
from keras import backend as K
# Global MSE over every pixel of every test image, computed through the Keras backend.
squared_error = K.square(x_test - predictions)
mse = K.mean(squared_error)
# K.eval turns the backend tensor into a concrete value for printing.
print('Mean Squared Error:', K.eval(mse))

Mean Squared Error: 0.008250367


In [None]:
# Per-image MSE: average over height, width and channels so that each test image
# contributes exactly one score. (Averaging over axis=1 alone left a
# (10000, 32, 3) array, so the reported spread was not a per-image statistic.)
mse = np.mean(np.square(predictions - x_test), axis=(1, 2, 3))
print(mse.shape)
print("The mse on the test set is : ",np.mean(mse), " +/- ",np.std(mse))

(10000, 32, 3)
The mse on the test set is :  0.008207888  +/-  0.015632764


## GAN for CIFAR_10

In [None]:
from numpy import zeros
from numpy import ones
from numpy.random import randn
from numpy.random import randint
from keras.datasets.cifar10 import load_data
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Dropout
import tensorflow as tf
from tensorflow import keras

# nothing more than a binary classifier
def define_discriminator(in_shape=(32,32,3)):
    """Build and compile the binary real/fake image classifier.

    Two stride-2 3x3 convolutions (128 filters each, LeakyReLU alpha 0.2),
    then Flatten, Dropout(0.4) and a single sigmoid unit. Compiled with
    binary cross-entropy, Adam(learning_rate=0.0002, beta_1=0.5) and an
    accuracy metric.

    Args:
        in_shape: input image shape given to the first convolution.

    Returns:
        The compiled ``Sequential`` model.
    """
    layer_stack = [
        Conv2D(128, (3,3), strides=(2,2), padding='same', input_shape=in_shape),  # 16x16x128 for 32x32 input
        LeakyReLU(alpha=0.2),
        Conv2D(128, (3,3), strides=(2,2), padding='same'),  # 8x8x128
        LeakyReLU(alpha=0.2),
        Flatten(),  # 8192 values
        Dropout(0.4),
        Dense(1, activation='sigmoid'),  # single real/fake probability
    ]

    model = Sequential()
    for layer in layer_stack:
        model.add(layer)

    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
        metrics=['accuracy'],
    )
    return model

# summary() prints the table itself and returns None, so wrapping it in print()
# only adds a stray "None" line to the output.
test_discr = define_discriminator()
test_discr.summary()

# define the standalone generator model
# #Given input of latent vector, the Generator produces an image.(here: 32x32)
#latent_dim, for example, can be 100, 1D array of size 100 

#Here we are only using Dense and conv2dlayers. But network can be complicated based
#on the application. For example, you can use VGG for super res. GAN.       

def define_generator(latent_dim = (32,32,3)):
    """Build the (uncompiled) convolutional generator.

    Despite its name, ``latent_dim`` is the shape of the generator's input
    tensor and is passed straight to ``Input(shape=...)``; it must therefore
    be an image-like shape, not a flat vector length. The input is
    downsampled by two stride-2 convolutions and upsampled back by two
    stride-2 transposed convolutions, followed by a 3-filter sigmoid
    convolution.

    The body uses one consistent indentation style; the previous mix of
    tabs and spaces made the definition fail to parse.

    Args:
        latent_dim: input shape without the batch axis, default (32, 32, 3).

    Returns:
        The uncompiled ``Sequential`` model. It is not compiled here because
        it is trained through the combined GAN model.
    """
    model = Sequential()
    model.add(Input(shape = latent_dim ))

    # Downsample twice (spatial sizes below assume the default 32x32 input).
    model.add(Conv2D(64,(4,4),strides = (2,2), padding = "same")) #16x16x64
    model.add(LeakyReLU(alpha=0.2))
    model.add(Conv2D(64,(4,4),strides = (2,2), padding = "same")) #8x8x64
    model.add(LeakyReLU(alpha=0.2))

    # Upsample back to the input resolution.
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')) #16x16x128
    model.add(LeakyReLU(alpha=0.2))
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')) #32x32x128
    model.add(LeakyReLU(alpha=0.2))

    # Project to 3 channels; the sigmoid keeps outputs in [0, 1].
    model.add(Conv2D(3, (8,8), activation='sigmoid', padding='same')) #32x32x3
    return model

# The generator's first layers are 2-D convolutions, so it needs an image-shaped
# input; the integer 100 is not a valid input shape for it. Use the default
# (32, 32, 3). summary() prints by itself and returns None, so it is not
# wrapped in print().
test_gen = define_generator()
test_gen.summary()


# define the combined generator and discriminator model, for updating the generator
#Discriminator is trained separately so here only generator will be trained by keeping
#the discriminator constant. 
def define_gan(generator, discriminator):
    """Stack generator and discriminator into the model used to train the generator.

    ``discriminator.trainable`` is set to False on the object passed in before
    the stack is built. The stack is compiled with an 'mse' loss and
    Adam(learning_rate=0.0002, beta_1=0.5).

    NOTE(review): the discriminator itself is compiled with binary
    cross-entropy while this stack uses 'mse' -- confirm this is intended.

    Args:
        generator: model producing images.
        discriminator: model scoring images; its ``trainable`` flag is modified.

    Returns:
        The compiled ``Sequential`` generator-then-discriminator model.
    """
    # Freeze the discriminator inside the combined model.
    discriminator.trainable = False

    model = Sequential()
    for stage in (generator, discriminator):
        model.add(stage)

    model.compile(loss='mse', optimizer=Adam(learning_rate=0.0002, beta_1=0.5))
    return model


# load cifar training images
def load_real_samples():
	"""Return the module-level ``x_train`` array unchanged.

	No conversion or rescaling is done here: ``x_train`` was already divided
	by 255 and cast to float32 when the dataset was loaded, and the
	generator's last layer uses a sigmoid activation, so no [-1, 1]
	rescaling is applied.
	"""
	return x_train

# pick a batch of random real samples to train the GAN
#In fact, we will train the GAN on a half batch of real images and another 
#half batch of fake images. 
#For each real image we assign a label 1 and for fake we assign label 0. 
def generate_real_samples(dataset, n_samples):
    """Draw ``n_samples`` random rows from ``dataset`` and label them as real.

    Indices are drawn uniformly with replacement from ``[0, dataset.shape[0])``.

    Args:
        dataset: array indexed by sample along axis 0.
        n_samples: number of samples to draw.

    Returns:
        ``(X, y)``: the selected samples and an ``(n_samples, 1)`` array of ones.
    """
    picks = randint(0, dataset.shape[0], n_samples)
    real_labels = ones((n_samples, 1))  # label 1 marks the samples as real
    return dataset[picks], real_labels

# generate n_samples number of latent vectors as input for the generator
def generate_latent_points(latent_dim, n_samples):
    """Sample a batch of standard-normal generator inputs.

    Accepts either a flat latent size or a full per-sample shape. The
    tuple/list form is needed because the notebook passes an image shape
    such as (32, 32, 3); the previous ``latent_dim * n_samples`` arithmetic
    only worked for integers.

    Args:
        latent_dim: int (vector length) or tuple/list of ints (per-sample shape).
        n_samples: number of samples in the batch.

    Returns:
        Array of shape ``(n_samples, latent_dim)`` for an int, or
        ``(n_samples, *latent_dim)`` for a shape.
    """
    if isinstance(latent_dim, (tuple, list)):
        sample_shape = tuple(latent_dim)
    else:
        sample_shape = (latent_dim,)
    # Drawing directly in the target shape gives the same values, in the same
    # order, as drawing a flat vector and reshaping it.
    return randn(n_samples, *sample_shape)

# use the generator to generate n fake examples, with class labels
#Supply the generator, latent_dim and number of samples as input.
#Use the above latent point generator to generate latent points. 
def generate_fake_samples(generator, latent_dim, n_samples):
    """Produce ``n_samples`` generator outputs labelled as fake.

    Args:
        generator: object with a ``predict`` method.
        latent_dim: forwarded to ``generate_latent_points``.
        n_samples: number of samples to generate.

    Returns:
        ``(X, y)``: the generator's predictions and an ``(n_samples, 1)``
        array of zeros.
    """
    noise = generate_latent_points(latent_dim, n_samples)
    fake_images = generator.predict(noise)
    fake_labels = zeros((n_samples, 1))  # label 0 marks the samples as fake
    return fake_images, fake_labels

# train the generator and discriminator
#We loop through a number of epochs to train our Discriminator by first selecting
#a random batch of images from our true/real dataset.
#Then, generating a set of images using the generator. 
#Feed both set of images into the Discriminator. 
#Finally, set the loss parameters for both the real and fake images, as well as the combined loss. 
# Turn off Keras' interactive logging for the rest of the session -- presumably so
# the per-batch generator.predict() calls in train() do not flood the cell output.
tf.keras.utils.disable_interactive_logging()

def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=100, n_batch=128,
          save_path='cifar_generator_2epochs.h5'):
    """Alternate discriminator and generator updates, then save the generator.

    Per batch: the discriminator is updated on a half batch of real samples
    and, separately, on a half batch of generated samples; the generator is
    then updated through ``gan_model`` on a full batch of latent inputs
    labelled 1 (i.e. "real"). After each epoch the losses of that epoch's
    LAST batch are printed.

    Args:
        g_model: generator; used for fake samples and saved at the end.
        d_model: compiled discriminator whose ``train_on_batch`` returns
            ``(loss, accuracy)``.
        gan_model: compiled generator+discriminator stack.
        dataset: array of real samples, indexed along axis 0.
        latent_dim: forwarded to the latent/fake sample helpers.
        n_epochs: number of passes over the dataset.
        n_batch: batch size; the discriminator sees ``n_batch // 2`` real and
            ``n_batch // 2`` fake samples per step.
        save_path: file the generator is saved to. The default keeps the
            previously hard-coded name.

    Returns:
        Whatever ``g_model.save(save_path)`` returns.
    """
    bat_per_epo = int(dataset.shape[0] / n_batch)
    half_batch = int(n_batch / 2)

    # Defined up front so the per-epoch report cannot raise NameError when the
    # dataset holds fewer than n_batch samples (zero batches per epoch).
    d_loss_real = d_loss_fake = g_loss = None

    for i in range(n_epochs):
        for j in range(bat_per_epo):
            # Discriminator step on real samples (accuracy output is ignored).
            X_real, y_real = generate_real_samples(dataset, half_batch)
            d_loss_real, _ = d_model.train_on_batch(X_real, y_real)

            # Discriminator step on generated samples.
            X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            d_loss_fake, _ = d_model.train_on_batch(X_fake, y_fake)

            # Generator step: latent inputs are labelled 1 so the generator is
            # rewarded when the discriminator scores its output as real.
            X_gan = generate_latent_points(latent_dim, n_batch)
            y_gan = ones((n_batch, 1))
            g_loss = gan_model.train_on_batch(X_gan, y_gan)

        # Losses of the last batch of this epoch.
        print("Epoch",i+1,"d_true_loss",d_loss_real, "d_loss_fake",d_loss_fake,"g_loss",g_loss)

    return g_model.save(save_path)



None
None


In [None]:
# summary() prints the table itself and returns None, so it is not wrapped in print().
test_discr = define_discriminator()
test_discr.summary()

In [None]:
# The generator starts with 2-D convolutions and needs an image-shaped input, so
# the default (32, 32, 3) is used instead of the integer 100. summary() prints by
# itself and returns None, so it is not wrapped in print().
test_gen = define_generator()
test_gen.summary()

In [None]:
# Shape of one generator input (image-shaped: the generator starts with convolutions).
latent_dim = (32, 32, 3)

# Build the two networks, then the stacked model used for the generator updates.
discriminator = define_discriminator()
generator = define_generator(latent_dim)
gan_model = define_gan(generator, discriminator)

# Real images the discriminator is trained against.
dataset = load_real_samples()


In [None]:
# Short adversarial run: 2 epochs with the default batch size.
train(
    generator,
    discriminator,
    gan_model,
    dataset,
    latent_dim,
    n_epochs=2,
)

Epoch 1 d_true_loss 0.7822235226631165 d_loss_fake 0.5076725482940674 g_loss 0.4626893401145935
Epoch 2 d_true_loss 0.7951231002807617 d_loss_fake 0.6520218849182129 g_loss 0.29806452989578247
Epoch 3 d_true_loss 0.6508359909057617 d_loss_fake 0.7008427381515503 g_loss 0.2660442590713501
Epoch 4 d_true_loss 0.5789505243301392 d_loss_fake 0.7079810500144958 g_loss 0.25636786222457886
Epoch 5 d_true_loss 0.6596049070358276 d_loss_fake 0.7286345958709717 g_loss 0.24420130252838135
Epoch 6 d_true_loss 0.6991044878959656 d_loss_fake 0.7002875804901123 g_loss 0.2628227174282074


KeyboardInterrupt: ignored

In [None]:
# Now, let us load the generator model and generate images

from keras.models import load_model
from numpy.random import randn


# Plot generated images 
def show_plot(examples, n):
    """Show the first ``n * n`` images of ``examples`` in an n-by-n grid.

    Args:
        examples: 4-D array indexed as ``examples[image, :, :, :]``; it must
            hold at least ``n * n`` images.
        n: number of rows and of columns in the grid.
    """
    for slot in range(1, n * n + 1):
        # Subplot positions are 1-based, image indices are 0-based.
        plt.subplot(n, n, slot)
        plt.axis('off')
        plt.imshow(examples[slot - 1, :, :, :])
    plt.show()

# load model
# Load the generator that train() wrote to disk. The file only exists if training
# ran to completion; otherwise load_model raises OSError.
model = load_model('cifar_generator_2epochs.h5')

import numpy as np

# The generator's first layer is an image-shaped Input, so the noise batch must
# match model.input_shape (minus the batch axis); 100-dimensional vectors do not
# fit this network.
latent_points = randn(25, *model.input_shape[1:])
# generate images
X = model.predict(latent_points)

# The generator ends in a sigmoid, so X is already in [0, 1]; a [-1, 1] -> [0, 1]
# shift would squeeze the images into [0.5, 1] and wash them out.
X = (X*255).astype(np.uint8)

# plot the result
show_plot(X, 5)

#Note: CIFAR10 classes are: airplane, automobile, bird, cat, deer, dog, frog, horse,
# ship, truck

OSError: ignored