Load data

In [27]:
import os
import glob
import h5py
import imageio
import scipy.misc
import scipy.ndimage
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image 
from matplotlib.pyplot import imread

# Input setup
# prepare data
dataset = "Data"
filenames = os.listdir(dataset) # print(filenames)
data_dir  = os.path.join(os.getcwd(), dataset) # print(data_dir)
data      = glob.glob(os.path.join(data_dir,"*.bmp")) # print(data)
sub_input_sequence = []
sub_label_sequence = []
# Loop through the images
for i in range(len(data)):
    image = imageio.imread(data[i]).astype(np.float)
    #image = scipy.misc.imread(data[i], mode='YCbCr').astype(np.float)
    #image = imread(data[i])  # imgplot = plt.imshow(image / 255.)
    scale = 3
    h, w, _ = image.shape
    h = h - np.mod(h, scale)
    w = w - np.mod(w, scale)
    label = image[0:h, 0:w, :]
    
    image = image / 255.
    label = label / 255.

    input_ = scipy.ndimage.interpolation.zoom(label, (1./scale), prefilter=False)
    input_ = scipy.ndimage.interpolation.zoom(input_, (scale/1.), prefilter=False)    
    
    stride = 14
    image_size = 33
    label_size = 21
    pad = abs(image_size - label_size) / 2
    for x in range(0, h - image_size+1, stride):
        for y in range(0, w - image_size+1, stride):
            sub_input = input_[x:x+image_size, y:y+image_size, 0]
            sub_label = label[x+int(pad):x+int(pad)+label_size, y+int(pad):y+int(pad)+label_size, 0]
            sub_input = sub_input.reshape([image_size, image_size, 1])
            sub_label = sub_label.reshape([label_size, label_size, 1])
            
            sub_input_sequence.append(sub_input)
            sub_label_sequence.append(sub_label)

arrData = np.asarray(sub_input_sequence)
arrLabel = np.asarray(sub_label_sequence)
savepath = os.path.join(os.getcwd(), 'checkpoint\\train.h5')
with h5py.File(savepath, 'w') as hf:
    hf.create_dataset('data', data=arrData)
    hf.create_dataset('label', data=arrLabel) 

X_train = arrData;
y_train = arrLabel;

Build the model

In [28]:
import tensorflow as tf
EPOCHS = 1500
BATCH_SIZE = 128
def SRCNN(input_image): #9-5-5
    mu = 0
    sigma = 0.1
    # Layer 1: Convolution Input =. Output=.
    conv1_kernel = tf.Variable(tf.truncated_normal(shape=(9, 9, 1, 64), mean = mu, stddev = sigma))
    conv1_bias   = tf.Variable(tf.zeros(64))
    conv1_output = tf.nn.conv2d(input_image, conv1_kernel, strides=[1, 1, 1, 1], padding='VALID') + conv1_bias
    conv1_act = tf.nn.relu(conv1_output)
    # Layer 2: Convolution Input =. Output =.
    conv2_kernel = tf.Variable(tf.truncated_normal(shape=(5, 5, 64, 32), mean = mu, stddev = sigma))
    conv2_bias   = tf.Variable(tf.zeros(32))
    conv2_output = tf.nn.conv2d(conv1_act, conv2_kernel, strides=[1, 1, 1, 1], padding='SAME') + conv2_bias
    conv2_act = tf.nn.relu(conv2_output)
    # Layer 3: Convolution Input =. Output =.
    conv3_kernel = tf.Variable(tf.truncated_normal(shape=(5, 5, 32, 1), mean = mu, stddev = sigma))
    conv3_bias   = tf.Variable(tf.zeros(1))
    conv3_output = tf.nn.conv2d(conv2_act, conv3_kernel, strides=[1, 1, 1, 1], padding='VALID') + conv3_bias
    conv3_act = tf.nn.relu(conv3_output)
    
    return conv3_act


In [29]:
# Insert training pipeline
input_image = tf.placeholder(tf.float32, [None, 33, 33, 1], name='input_image')
labels = tf.placeholder(tf.float32, [None, 21, 21, 1], name='labels')
output = SRCNN(input_image)
loss = tf.reduce_mean(tf.square(labels - output))
training_operation = tf.train.GradientDescentOptimizer(learning_rate = 0.0001).minimize(loss)


In [None]:
# Train the model
from pathlib import Path
saver = tf.train.Saver()
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)
    
    model_file = Path("model/SRCNN.ckpt.meta")
    if model_file.is_file():
        saver.restore(sess, 'model/SRCNN.ckpt')
        print("Loaded model from disk") 
    
    print("Training...")
    print()
    counter = 0
    for i in range(EPOCHS):
        Error = 0
        for offset in range(0, num_examples, BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_y = X_train[offset:end], y_train[offset:end]
            counter = counter + 1
            _, err = sess.run([training_operation, loss], feed_dict={input_image: batch_x, labels: batch_y})
            Error = err
            if counter % 10 == 0:
                print("Epoch: [%2d], step: [%2d], loss: [%.8f]" % ((i+1), counter, err))
            
        print("EPOCH {} ...".format(i+1))
        print("Error rate = {:.8f}".format(Error))
        print()
        
    saver.save(sess, 'model/SRCNN.ckpt')
    # tf.train.write_graph(sess.graph.as_graph_def(), '.', 'model/SRCNN.pbtxt', as_text=True)
    # tf.train.write_graph(sess.graph.as_graph_def(), '.', 'model/SRCNN.pb', as_text=False)
    print("Model saved")
    

Training...

Epoch: [ 1], step: [10], loss: [0.14234193]
Epoch: [ 1], step: [20], loss: [0.12261878]
Epoch: [ 1], step: [30], loss: [0.09283951]
Epoch: [ 1], step: [40], loss: [0.13785802]
Epoch: [ 1], step: [50], loss: [0.11991202]
Epoch: [ 1], step: [60], loss: [0.07196879]
Epoch: [ 1], step: [70], loss: [0.14545460]
Epoch: [ 1], step: [80], loss: [0.05851944]
Epoch: [ 1], step: [90], loss: [0.07959670]
Epoch: [ 1], step: [100], loss: [0.10954367]
Epoch: [ 1], step: [110], loss: [0.04556477]
Epoch: [ 1], step: [120], loss: [0.06691538]
Epoch: [ 1], step: [130], loss: [0.12221613]
Epoch: [ 1], step: [140], loss: [0.06523453]
Epoch: [ 1], step: [150], loss: [0.07307412]
Epoch: [ 1], step: [160], loss: [0.13125998]
Epoch: [ 1], step: [170], loss: [0.05763770]
EPOCH 1 ...
Error rate = 0.05636177

Epoch: [ 2], step: [180], loss: [0.07178632]
Epoch: [ 2], step: [190], loss: [0.09836701]
Epoch: [ 2], step: [200], loss: [0.06385397]
Epoch: [ 2], step: [210], loss: [0.07876655]
Epoch: [ 2], s

In [17]:
# Setup model evaluation pipeline
# Input Test setup
# prepare data
dataset = "Test"
filenames = os.listdir(dataset) 
print(filenames)
data_dir  = os.path.join(os.getcwd(), dataset) 
print(data_dir)
data      = glob.glob(os.path.join(data_dir,"*.bmp")) 
print(data)
sub_input_sequence = []
sub_label_sequence = []
# Loop through the images
for i in range(len(data)):
    image = imageio.imread(data[i]).astype(np.float)
    #image = imread(data[i])  # imgplot = plt.imshow(image / 255.)
    scale = 3
    h, w, _ = image.shape
    h = h - np.mod(h, scale)
    w = w - np.mod(w, scale)
    label = image[0:h, 0:w, :]
    
    image = image / 255.
    label = label / 255.

    input_ = scipy.ndimage.interpolation.zoom(label, (1./scale), prefilter=False)
    input_ = scipy.ndimage.interpolation.zoom(input_, (scale/1.), prefilter=False)    
    
    stride = 14
    image_size = 33
    label_size = 21
    pad = abs(image_size - label_size) / 2
    
    nx = ny = 0
    for x in range(0, h - image_size+1, stride):
        nx += 1
        ny = 0
        for y in range(0, w - image_size+1, stride):
            ny += 1
            sub_input = input_[x:x+image_size, y:y+image_size, 0]
            sub_label = label[x+int(pad):x+int(pad)+label_size, y+int(pad):y+int(pad)+label_size, 0]
            sub_input = sub_input.reshape([image_size, image_size, 1])
            sub_label = sub_label.reshape([label_size, label_size, 1])
            
            sub_input_sequence.append(sub_input)
            sub_label_sequence.append(sub_label)

arrData = np.asarray(sub_input_sequence)
arrLabel = np.asarray(sub_label_sequence)
savepath = os.path.join(os.getcwd(), 'checkpoint\\test.h5')
with h5py.File(savepath, 'w') as hf:
    hf.create_dataset('data', data=arrData)
    hf.create_dataset('label', data=arrLabel) 

X_test = arrData;
y_test = arrLabel;

print(X_test.shape)
print(y_test.shape)




['baby_GT.bmp', 'test_image.png']
C:\Users\dwith\Documents\Github\cnn_networks\code\tensorflow\SRCNN\Test
['C:\\Users\\dwith\\Documents\\Github\\cnn_networks\\code\\tensorflow\\SRCNN\\Test\\baby_GT.bmp']
(1225, 33, 33, 1)
(1225, 21, 21, 1)


In [6]:
# Test the model
# Insert training pipeline
import imageio

from pathlib import Path
saver = tf.train.Saver()

def merge(images, size):
    h, w = images.shape[1], images.shape[2]
    img = np.zeros((h*size[0], w*size[1], 1))
    for idx, image in enumerate(images):
        i = idx % size[1]
        j = idx // size[1]
        img[j*h:j*h+h, i*w:i*w+w, :] = image
    return img

with tf.Session() as sess:
    print("Testing...")
    model_file = Path("model/SRCNN.ckpt.meta")
    if model_file.is_file():
        saver.restore(sess, 'model/SRCNN.ckpt')
        print("Loaded model from disk") 
    
    result = sess.run(output, feed_dict={input_image: X_test, labels: y_test})
    result = merge(result, [nx, ny])
    result = result.squeeze()
    image_path = os.path.join(os.getcwd(), dataset)
    image_path = os.path.join(image_path, "test_image.png")
    # scipy.misc.imsave(image_path, result)
    imageio.imsave(image_path, result)
    # imsave(result, image_path)

Testing...
INFO:tensorflow:Restoring parameters from model/SRCNN.ckpt
Loaded model from disk
float64
float64


