<a href="https://colab.research.google.com/github/Amritha16/ImageResolutionEnhancement/blob/master/ImageResolutionEnhancement.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This project is a TensorFlow implementation of SRCNN, "Image Super-Resolution Using Deep Convolutional Networks" (Dong et al.): https://arxiv.org/pdf/1501.00092.pdf


The data for this project is stored in a GitHub repository, so the repository is cloned below to make the data available locally. The data consists of JPEG image files.

Data : https://github.com/Amritha16/ImageResolutionEnhancement


In [None]:
!git clone https://github.com/Amritha16/ImageResolutionEnhancement.git


fatal: destination path 'ImageResolutionEnhancement' already exists and is not an empty directory.


In [None]:
!rm -r ImageResolutionEnhancement/Result
!mkdir ImageResolutionEnhancement/Result
!rm -r ImageResolutionEnhancement/Model
!mkdir ImageResolutionEnhancement/Model

Import the required packages

In [None]:
import os                                                                    
import math 
import numpy as np
import cv2

import tensorflow.compat.v1 as tf                                               
tf.disable_v2_behavior()                                                        # DISABLED TO ENABLE PLACEHOLDERS


All the appropriate directories are set

In [None]:
# Every working directory is resolved once, relative to the cloned repository.
ROOT_DIR = os.path.abspath('ImageResolutionEnhancement/')
DATA_DIR = ROOT_DIR                                                             # DATA LIVES DIRECTLY UNDER THE REPOSITORY ROOT

# Training images, test images, reconstructed outputs and saved checkpoints.
TRAIN_DIR, TEST_DIR, OUT_DIR, MODEL_DIR = (
    os.path.join(ROOT_DIR, subdir)
    for subdir in ('Train/', 'Test/', 'Result/', 'Model/')
)



Setting global variables

In [None]:
SEED = 128

# --- Training schedule ---
NUM_EPOCHS = 20                                                                 # NUMBER OF PASSES OVER THE TRAINING DATA
INITIAL_LEARNING_RATE = 0.0001                                                  # LEARNING RATE HANDED TO THE OPTIMIZER

# --- Patch geometry ---
READ_SIZE = 242                                                                 # EVERY IMAGE IS RESIZED TO READ_SIZE x READ_SIZE
INPUT_SIZE = 33                                                                 # SIDE OF A NETWORK INPUT PATCH
OUTPUT_SIZE = 21                                                                # SIDE OF A NETWORK OUTPUT / LABEL PATCH
SCALE = 3                                                                       # DOWN/UP-SAMPLING FACTOR
DIM = 1                                                                         # NUMBER OF IMAGE CHANNELS FED TO THE NETWORK
PAD = (INPUT_SIZE - OUTPUT_SIZE) // 2                                           # BORDER BETWEEN AN INPUT PATCH AND ITS LABEL PATCH
STRIDE = 14                                                                     # STEP BETWEEN CONSECUTIVE PATCHES

# --- Patch counts ---
SUB_IMG = (READ_SIZE - INPUT_SIZE) // STRIDE + 1                                # PATCHES ALONG ONE SIDE OF AN IMAGE
NUM_SUBIMG = SUB_IMG ** 2                                                       # PATCHES PER IMAGE
BATCH_SIZE = 5

In [None]:
def cropFunction(image, scale=3):
    """Crop an image so that its height and width are exact multiples of ``scale``.

    Rows and columns are dropped from the bottom and right edges only; any
    trailing (channel) axes are left untouched. Images of any channel count,
    including 2-D single-channel arrays, are supported.

    Args:
        image: array whose first two axes are height and width.
        scale: positive integer the height and width must be divisible by.

    Returns:
        A view of ``image`` covering the cropped region.
    """
    height, width = image.shape[:2]
    return image[0:height - height % scale, 0:width - width % scale]

def im2double(im):
    """Convert an image array to float64.

    Integer images are divided by the maximum value of their dtype (e.g. 255
    for uint8), so the largest representable value maps to 1.0. Floating-point
    images are only cast to float64; their values are left as they are.

    Args:
        im: NumPy array with an integer or floating-point dtype.

    Returns:
        A float64 array with the same shape as ``im``.
    """
    if np.issubdtype(im.dtype, np.floating):
        # np.iinfo() rejects float dtypes, and there is no integer range to divide by.
        return im.astype(np.float64)
    info = np.iinfo(im.dtype)
    # np.float64 instead of the np.float alias, which was removed in NumPy 1.24.
    return im.astype(np.float64) / info.max


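Preprocessing the data: each image is resized and converted to YCrCb, a degraded copy of its first (luminance) channel is produced by bicubic down- and up-sampling, and both versions are cut into overlapping sub-image patches.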

In [None]:
class DATA():
    """Serves batches of (degraded input patch, ground-truth label patch) pairs.

    Images are read one at a time from a directory. Each image is resized to
    READ_SIZE x READ_SIZE, converted to YCrCb, and its first channel is cut
    into INPUT_SIZE x INPUT_SIZE input patches paired with the
    OUTPUT_SIZE x OUTPUT_SIZE label patches centred inside them.
    """

    def __init__(self, dirname):
        """List the image files found in ``dirname`` (joined onto DATA_DIR).

        Args:
            dirname: directory holding the images; an absolute path is used as is.
        """
        self.dirPath = os.path.join(DATA_DIR, dirname)
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # order in which images are served reproducible.
        self.filelist = sorted(os.listdir(self.dirPath))
        self.size = len(self.filelist)
        self.fileIndex = 0          # next file to load
        self.dataIndex = 0          # next patch to serve from the loaded file
        self.batch = None           # input patches of the loaded file
        self.batchLabel = None      # label patches of the loaded file

    def readImage(self, filename):                                               # IMAGE IS READ
        """Read an image, resize it to READ_SIZE x READ_SIZE and convert it to YCrCb.

        Raises:
            IOError: if OpenCV cannot read ``filename``.
        """
        # IMREAD_COLOR always yields an 8-bit, 3-channel BGR image, which is
        # what the BGR -> YCrCb conversion below expects.
        img = cv2.imread(filename, cv2.IMREAD_COLOR)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise IOError("Could not read image file: " + str(filename))
        resized = cv2.resize(img, (READ_SIZE, READ_SIZE))
        return cv2.cvtColor(resized, cv2.COLOR_BGR2YCR_CB)

    def preprocessImage(self, img):                                              # IMAGE IS PREPROCESSED
        """Build the degraded network input and the ground-truth label for one image.

        The label is the image cropped to a multiple of SCALE. The input is that
        same cropped image shrunk by SCALE and enlarged back with bicubic
        interpolation, so input and label have identical size and are
        pixel-aligned.

        Returns:
            (Inputs, Labels): two 2-D arrays holding channel 0 of each image.
        """
        img = im2double(img)
        Labels = np.ascontiguousarray(cropFunction(img, SCALE))
        height, width = Labels.shape[:2]
        # Explicit target sizes (width, height) keep the round trip exact:
        # scale factors alone would round the intermediate size and return an
        # image of a different size than the label.
        low_res = cv2.resize(Labels, (width // SCALE, height // SCALE), interpolation=cv2.INTER_CUBIC)
        Inputs = cv2.resize(low_res, (width, height), interpolation=cv2.INTER_CUBIC)
        Inputs = Inputs[:, :, 0]
        Labels = Labels[:, :, 0]
        return Inputs, Labels

    def processImage(self, file):                                                # IMAGE IS PROCESSED
        """Load ``file`` and fill ``self.batch`` / ``self.batchLabel`` with its patches.

        Patches are taken every STRIDE pixels, row by row. Each label patch is
        the OUTPUT_SIZE window offset by PAD inside its input patch.
        """
        self.batch = []
        self.batchLabel = []
        img = self.readImage(os.path.join(self.dirPath, file))
        Inputs, Labels = self.preprocessImage(img)
        for i in range(0, READ_SIZE - INPUT_SIZE, STRIDE):
            for j in range(0, READ_SIZE - INPUT_SIZE, STRIDE):
                subInput = Inputs[i:(i+INPUT_SIZE), j:(j+INPUT_SIZE)]
                subLabel = Labels[(i+PAD):(i+PAD+OUTPUT_SIZE), (j+PAD):(j+PAD+OUTPUT_SIZE)]
                # Add the trailing channel axis expected by the network placeholders.
                subInput = np.reshape(subInput, (INPUT_SIZE, INPUT_SIZE, DIM))
                subLabel = np.reshape(subLabel, (OUTPUT_SIZE, OUTPUT_SIZE, DIM))
                self.batch.append(subInput)
                self.batchLabel.append(subLabel)

    def generate_batch(self):                                                   # BATCH IS GENERATED
        """Return the next (inputs, labels) batch of up to BATCH_SIZE patches.

        When every patch of the current image has been served (or nothing is
        loaded yet) the next file is loaded; the file list wraps around.
        """
        # Compare against the real number of patches so the final patch is
        # never skipped when BATCH_SIZE does not divide the patch count.
        if self.batch is None or self.dataIndex >= len(self.batch):
            self.dataIndex = 0
            self.processImage(self.filelist[self.fileIndex])
            self.fileIndex = (self.fileIndex + 1) % self.size
        stop = self.dataIndex + BATCH_SIZE
        batch = np.asarray(self.batch[self.dataIndex:stop])
        label = np.asarray(self.batchLabel[self.dataIndex:stop])
        self.dataIndex = stop
        return batch, label


In [None]:
def DataTests():
    """Smoke-test the batch generator on the test directory.

    Pulls enough batches to cover two images' worth of patches and prints
    each batch's shape followed by its running index.
    """
    loader = DATA(TEST_DIR)
    iterations = int(2*(NUM_SUBIMG / BATCH_SIZE))
    step = 0
    while step < iterations:
        patches, _ = loader.generate_batch()
        print(patches.shape)
        print(step)
        step += 1

Image reconstruction occurs below

In [None]:
def reconstruct(image, file):                                                   # IMAGE IS RECONSTRUCTED
    """Write ``image`` to OUT_DIR as ``<name of file without extension>reconstructed.jpg``.

    Args:
        image: image array handed to cv2.imwrite.
        file: name of the source image; only its stem is used for the output name.
    """
    # splitext handles extensions of any length (".jpg", ".jpeg", ".png", ...),
    # unlike slicing off a fixed four characters.
    stem = os.path.splitext(file)[0]
    savePath = os.path.join(OUT_DIR, stem + "reconstructed.jpg")
    print("savePath ", savePath)
    # cv2.imwrite reports failure through its return value rather than raising.
    if not cv2.imwrite(savePath, image):
        print("WARNING: could not write ", savePath)

def deprocess(imgs):
    """Map image values from the [0, 1] scale to 8-bit pixel values.

    Values are multiplied by 255, limited to the range [0, 255] and cast to
    uint8 (the cast truncates the fractional part). The input array is not
    modified.
    """
    scaled = imgs * 255
    return np.clip(scaled, 0, 255).astype(np.uint8)

def stitch(patches):
    """Assemble network output patches into a single 8-bit image.

    The patches are expected in the order DATA.processImage produces them:
    row by row, SUB_IMG patches per row, taken every STRIDE pixels. Each patch
    is therefore placed at its (row * STRIDE, col * STRIDE) offset, and pixels
    covered by several patches are averaged. Tiling the patches edge to edge
    every OUTPUT_SIZE pixels would repeat the overlapping bands whenever
    STRIDE < OUTPUT_SIZE.

    Args:
        patches: sequence of (OUTPUT_SIZE, OUTPUT_SIZE, DIM) arrays in [0, 1] scale.

    Returns:
        uint8 array of shape (side, side, DIM) with
        side = (SUB_IMG - 1) * STRIDE + OUTPUT_SIZE.
    """
    side = (SUB_IMG - 1) * STRIDE + OUTPUT_SIZE
    accum = np.zeros((side, side, DIM), dtype=np.float64)
    hits = np.zeros((side, side, 1), dtype=np.float64)       # how many patches cover each pixel
    for ind, patch in enumerate(patches):
        row, col = divmod(ind, SUB_IMG)
        top = row * STRIDE
        left = col * STRIDE
        accum[top:top + OUTPUT_SIZE, left:left + OUTPUT_SIZE, :] += patch
        hits[top:top + OUTPUT_SIZE, left:left + OUTPUT_SIZE, :] += 1.0
    # Pixels no patch covers (possible only when STRIDE > OUTPUT_SIZE) stay 0.
    image = accum / np.maximum(hits, 1.0)
    return deprocess(image)


Defining layers in CNN

In [None]:

class Layer():
    """Base class holding the trainable weights and biases of one layer."""

    def __init__(self, shape, mean, stddev):
        """Create the layer's variables.

        Weights are drawn from a normal distribution with the given ``mean``
        and ``stddev``; there is one zero-initialised bias per entry of the
        last dimension of ``shape``.
        """
        initial_weights = tf.random_normal(shape=shape, mean=mean, stddev=stddev)
        self.weights = tf.Variable(initial_weights)
        bias_count = shape[-1]
        self.biases = tf.Variable(tf.zeros(shape=[bias_count]))

    def feedForward(self, inputData, stride=None):
        """Apply the layer to ``inputData``; concrete layers must override this."""
        raise NotImplementedError


In [None]:
class ConvolutionalLayer(Layer):
    """Layer computing a VALID convolution, a bias addition and a ReLU."""

    def __init__(self, shape, mean, stddev):
        super().__init__(shape, mean, stddev)

    def feedForward(self, inputData, stride):
        """Return relu(conv2d(inputData, weights) + biases).

        Args:
            inputData: input tensor handed to tf.nn.conv2d.
            stride: strides argument handed to tf.nn.conv2d.
        """
        convolved = tf.nn.conv2d(inputData, self.weights, strides=stride, padding="VALID")
        biased = tf.nn.bias_add(convolved, self.biases)
        return tf.nn.relu(biased)

In [None]:
class OutputLayer(Layer):
    """Final layer: a VALID convolution plus bias, with no activation."""

    def __init__(self, shape, mean, stddev):
        super().__init__(shape, mean, stddev)

    def feedForward(self, inputData, stride):
        """Return conv2d(inputData, weights) + biases.

        Args:
            inputData: input tensor handed to tf.nn.conv2d.
            stride: strides argument handed to tf.nn.conv2d.
        """
        convolved = tf.nn.conv2d(inputData, self.weights, strides=stride, padding="VALID")
        return tf.nn.bias_add(convolved, self.biases)

The Super-Resolution Convolutional Neural Network model :

In [None]:
class MODEL():
    """Three-layer convolutional super-resolution network with train/test/generate helpers.

    The graph maps INPUT_SIZE x INPUT_SIZE x DIM patches to
    OUTPUT_SIZE x OUTPUT_SIZE x DIM patches and is trained on the mean squared
    difference between its output and the label patches.
    """

    def __init__(self):
        """Create the placeholders the graph is fed through."""
        self.inputs = tf.placeholder(shape=[None, INPUT_SIZE, INPUT_SIZE, DIM], dtype=tf.float32)
        self.labels = tf.placeholder(shape=[None, OUTPUT_SIZE, OUTPUT_SIZE, DIM], dtype=tf.float32)
        self.logits = None
        self.output = None          # network output tensor, set by build()
        self.loss = None            # scalar loss tensor, set by build()

    def _checkpoint_path(self):
        """Return the checkpoint file path shared by train(), test() and SRGenerate()."""
        return os.path.join(MODEL_DIR, "model" + str(BATCH_SIZE) + "_" + str(NUM_EPOCHS) + ".ckpt")

    def _batches_per_pass(self, data):
        """Return how many generate_batch() calls make up one pass over ``data``."""
        return int(data.size*(NUM_SUBIMG / BATCH_SIZE))

    def build(self):
        """Build the network and the loss.

        Layers (all VALID convolutions with unit strides):
        9x9 conv, DIM -> 64 channels, ReLU; 1x1 conv, 64 -> 32 channels, ReLU;
        5x5 conv, 32 -> DIM channels, no activation.
        """
        # Apply the notebook-level SEED so weight initialisation is repeatable.
        tf.set_random_seed(SEED)
        unitStride = [1, 1, 1, 1]
        conv1 = ConvolutionalLayer(shape=[9, 9, DIM, 64], mean=0.0, stddev=0.001)
        h = conv1.feedForward(inputData=self.inputs, stride=unitStride)
        conv2 = ConvolutionalLayer(shape=[1, 1, 64, 32], mean=0.0, stddev=0.001)
        h = conv2.feedForward(inputData=h, stride=unitStride)
        outerLayer = OutputLayer(shape=[5, 5, 32, DIM], mean=0.0, stddev=0.001)
        self.output = outerLayer.feedForward(inputData=h, stride=unitStride)
        self.loss = tf.reduce_mean(tf.squared_difference(self.labels, self.output))

    def train(self, data):                                                      # TRAIN MODEL
        """Train for NUM_EPOCHS passes over ``data`` and save a checkpoint.

        Args:
            data: object exposing ``size`` and ``generate_batch()``, e.g. a DATA instance.
        """
        opt = tf.train.AdamOptimizer(INITIAL_LEARNING_RATE).minimize(self.loss)
        saver = tf.train.Saver()
        numBatches = self._batches_per_pass(data)
        with tf.Session() as session:
            session.run(tf.global_variables_initializer())
            print('All variables are Initialized')
            for epoch in range(NUM_EPOCHS):
                totalCost = 0.0
                for _ in range(numBatches):
                    batch_X, batch_Y = data.generate_batch()
                    feedDict = {self.inputs: batch_X, self.labels: batch_Y}
                    _, lossValue = session.run([opt, self.loss], feed_dict=feedDict)
                    totalCost += lossValue
                # self.loss is already a mean over the batch, so the epoch cost
                # is the sum of batch losses divided by the number of batches
                # (the same definition test() uses), not by the number of patches.
                avgCost = totalCost / max(numBatches, 1)
                print("Epoch : ", (epoch + 1), "cost = ", "{:.3f}".format(avgCost))

            savePath = saver.save(session, self._checkpoint_path())
            print("Model saved in path: %s" % savePath)

    def test(self, data):                                                       # TEST MODEL
        """Restore the saved checkpoint and print the mean batch loss over ``data``."""
        tempSaver = tf.train.Saver()
        with tf.Session() as session:
            tempSaver.restore(session, self._checkpoint_path())
            totalBatch = self._batches_per_pass(data)
            totalCost = 0.0
            for _ in range(totalBatch):
                batch_x, batch_y = data.generate_batch()
                feedDict = {self.inputs: batch_x, self.labels: batch_y}
                totalCost += session.run(self.loss, feed_dict=feedDict)
            avgCost = totalCost / max(totalBatch, 1)
            print("cost =", "{:.3f}".format(avgCost))

    def SRGenerate(self, data):                                                # SUPER RESOLUTION IMAGE GENERATION
        """Restore the saved checkpoint and write one reconstructed image per file in ``data``.

        Every file is cut into patches, all patches are passed through the
        network in a single run, and the outputs are stitched and saved.
        """
        tempSaver = tf.train.Saver()
        with tf.Session() as session:
            tempSaver.restore(session, self._checkpoint_path())
            for file in data.filelist:
                data.processImage(file)
                batch = np.asarray(data.batch)
                all_patches = session.run(self.output, feed_dict={self.inputs: batch})
                image = stitch(all_patches)
                reconstruct(image, file)

Main module

In [None]:
def main():
    """Run the whole pipeline: build, train, evaluate, then write reconstructed images."""
    training_set = DATA(TRAIN_DIR)
    srcnn = MODEL()
    srcnn.build()

    srcnn.train(training_set)

    # The same test-set object is used for evaluation and for image generation.
    held_out_set = DATA(TEST_DIR)
    srcnn.test(held_out_set)
    srcnn.SRGenerate(held_out_set)


if __name__ == "__main__":
    main()
                                                                                                                

All variables are Initialized
Epoch :  1 cost =  0.003
Epoch :  2 cost =  0.002
Epoch :  3 cost =  0.002
Epoch :  4 cost =  0.002
Epoch :  5 cost =  0.002
Epoch :  6 cost =  0.002
Epoch :  7 cost =  0.002
Epoch :  8 cost =  0.002
Epoch :  9 cost =  0.002
Epoch :  10 cost =  0.002
Epoch :  11 cost =  0.002
Epoch :  12 cost =  0.002
Epoch :  13 cost =  0.002
Epoch :  14 cost =  0.002
Epoch :  15 cost =  0.002
Epoch :  16 cost =  0.002
Epoch :  17 cost =  0.002
Epoch :  18 cost =  0.002
Epoch :  19 cost =  0.002
Epoch :  20 cost =  0.002
Model saved in path: /content/ImageResolutionEnhancement/Model/model5_20.ckpt
INFO:tensorflow:Restoring parameters from /content/ImageResolutionEnhancement/Model/model5_20.ckpt
cost = 0.010
INFO:tensorflow:Restoring parameters from /content/ImageResolutionEnhancement/Model/model5_20.ckpt
savePath  /content/ImageResolutionEnhancement/Result/Img5reconstructed.jpg
savePath  /content/ImageResolutionEnhancement/Result/Img2reconstructed.jpg
savePath  /content/I