In [None]:
import os
from google.colab import drive
# Mount Google Drive under /content/drive; the training and prediction code
# below reads and writes files beneath "/content/drive/My Drive".
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
import os
import random
import numpy as np
import cv2
import keras
import tensorflow as tf
from tqdm import tqdm
from keras import Input, Model
from keras.layers import UpSampling2D, Reshape
from keras.layers.convolutional import Conv2D,Conv2DTranspose
from keras.callbacks import ModelCheckpoint
from keras.utils import generic_utils

def resizeImg(path,newDir,size = (256,256)):
    """Resize every image in ``path`` and save it to ``newDir`` as a JPEG.

    Args:
        path: Directory containing the source images.
        newDir: Directory that receives the resized ``.jpg`` files; it is
            created if it does not exist.
        size: Target ``(width, height)`` passed to ``cv2.resize``.

    Files that OpenCV cannot decode are reported and skipped.
    """
    # cv2.imwrite does not create missing directories, it just fails quietly.
    os.makedirs(newDir, exist_ok=True)
    for imgName in tqdm(os.listdir(path)):
        img = cv2.imread(os.path.join(path, imgName))
        if img is None:
            # cv2.imread signals an unreadable/non-image file with None
            # instead of raising; resizing it would crash the whole run.
            print("Skipping unreadable file: " + imgName)
            continue
        img = cv2.resize(img, size)
        # splitext drops only the final extension, so "a.b.png" becomes
        # "a.b.jpg" rather than colliding with every other "a.*" file.
        stem = os.path.splitext(imgName)[0]
        cv2.imwrite(os.path.join(newDir, stem + '.jpg'), img)

def mosaicImg(path,newDir):
    """Apply the fixed mosaic patch to every image in ``path``.

    Each readable image gets the region starting at (85, 85) with size
    105x105 pixelated by ``mosaic`` and is written to ``newDir`` under its
    original file name.

    Args:
        path: Directory containing the source images.
        newDir: Directory that receives the mosaicked images; it is created
            if it does not exist.

    Files that OpenCV cannot decode are reported and skipped.
    """
    # cv2.imwrite does not create missing directories, it just fails quietly.
    os.makedirs(newDir, exist_ok=True)
    for imgName in tqdm(os.listdir(path)):
        img = cv2.imread(os.path.join(path, imgName))
        if img is None:
            # cv2.imread returns None for files it cannot decode; passing
            # that on to mosaic() would raise on the first pixel access.
            print("Skipping unreadable file: " + imgName)
            continue
        mosaicPic = mosaic(img,85,85,105,105)
        cv2.imwrite(os.path.join(newDir, imgName), mosaicPic)

def mosaic(img,x,y,w,h,neighbour = 14):
    """Pixelate a rectangular region of ``img`` in place.

    The region whose top-left corner is ``(x, y)`` and whose size is
    ``w`` x ``h`` is divided into ``neighbour`` x ``neighbour`` tiles. Every
    tile that fits completely inside the region is filled with the colour of
    its own top-left pixel. A leftover strip narrower than one tile is left
    untouched.

    Args:
        img: Image array indexed as ``img[row, column]``; modified in place.
        x: Column of the region's left edge.
        y: Row of the region's top edge.
        w: Region width in pixels.
        h: Region height in pixels.
        neighbour: Tile edge length in pixels.

    Returns:
        The same ``img`` object that was passed in.
    """
    # "+ 1" keeps the last full tile when the region size is an exact
    # multiple of the tile size; without it that tile was silently dropped.
    for rowOffset in range(0, h - neighbour + 1, neighbour):
        for colOffset in range(0, w - neighbour + 1, neighbour):
            top = rowOffset + y
            left = colOffset + x
            # Copy the sample first so the fill never reads from the slice
            # it is writing to.
            color = img[top, left].copy()
            img[top:top + neighbour, left:left + neighbour] = color
    return img

def _readImageDir(dataDir):
    """Read every file in ``dataDir`` as an image, in sorted file-name order."""
    images = []
    # os.listdir order is arbitrary; sorting makes the order reproducible and
    # identical for two directories that hold the same file names.
    for name in tqdm(sorted(os.listdir(dataDir))):
        fullPath = os.path.join(dataDir, name)
        img = cv2.imread(fullPath)
        if img is None:
            # Skipping here would shift every later training/label pair, so
            # fail loudly instead of saving a corrupted data set.
            raise ValueError("Cannot read image: " + fullPath)
        images.append(img)
    return images


def preReadData(dataDir1,dataDir2):
    """Cache two image directories as ``.npy`` arrays in the working directory.

    Images from ``dataDir1`` are saved to ``trainingData.npy`` and images
    from ``dataDir2`` to ``labelData.npy``. Both directories are read in
    sorted file-name order so that index ``i`` in one array corresponds to
    index ``i`` in the other when the directories use matching file names.

    Args:
        dataDir1: Directory with the training (input) images.
        dataDir2: Directory with the label (target) images.

    Raises:
        ValueError: If a file cannot be decoded as an image, or if the two
            directories contain a different number of images.
    """
    trainingData = _readImageDir(dataDir1)
    labelData = _readImageDir(dataDir2)
    if len(trainingData) != len(labelData):
        raise ValueError(
            "Image count mismatch: {} training vs {} label images".format(
                len(trainingData), len(labelData)))
    np.save('trainingData.npy',np.asarray(trainingData))
    np.save('labelData.npy',np.asarray(labelData))

def processImg(imgData):
    """Scale a batch of images from the 0..255 range to -0.5..0.5.

    Each item of ``imgData`` is divided by 255 and shifted down by 0.5; the
    converted items are stacked into a single NumPy array.
    """
    return np.asarray([picture / 255 - 0.5 for picture in imgData])

def loadData(trainingPath,labelPath,batch_size):
    """Yield normalised ``(training, label)`` batches forever.

    Both ``.npy`` files are loaded fully into memory once. The generator then
    walks through them in order in steps of ``batch_size`` and starts again
    from the beginning after the last batch, which may be shorter than
    ``batch_size``.

    Args:
        trainingPath: Path of the ``.npy`` file with the input images.
        labelPath: Path of the ``.npy`` file with the target images.
        batch_size: Number of samples per batch; must be positive.

    Yields:
        Tuples ``(trainingImg, labelImg)`` of arrays produced by
        ``processImg``.

    Raises:
        ValueError: On the first ``next()`` call if ``batch_size`` is not
            positive, the data set is empty, or the two arrays differ in
            length.
    """
    trainingData = np.load(trainingPath)
    labelData = np.load(labelPath)
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got {}".format(batch_size))
    sampleCount = len(trainingData)
    if sampleCount == 0:
        # With nothing to iterate, the endless loop below would spin forever
        # without ever yielding a batch.
        raise ValueError("No training samples found in " + str(trainingPath))
    if sampleCount != len(labelData):
        raise ValueError(
            "Sample count mismatch: {} training vs {} label samples".format(
                sampleCount, len(labelData)))
    while True:
        for start in range(0, sampleCount, batch_size):
            stop = start + batch_size
            trainingImg = processImg(trainingData[start:stop])
            labelImg = processImg(labelData[start:stop])
            yield(trainingImg,labelImg)
   

def DeMosaicModel():
    """Build and compile the convolutional encoder/decoder network.

    The model takes 256x256x3 inputs. The encoder is a stack of ReLU
    convolutions, six of which use stride 2; the decoder is a stack of ReLU
    transposed convolutions, six of which use stride 2, followed by a 1x1
    transposed convolution with ``tanh`` activation producing 3 channels.

    Returns:
        A Keras ``Model`` compiled with Adam (learning rate 0.001), MSE loss
        and an ``accuracy`` metric, so ``train_on_batch`` returns
        ``[loss, accuracy]``.
    """
    # Each entry is (filters, kernel size, stride); layer order matters
    # because saved weights are matched to layers by position.
    encoderSpec = [
        (16, 3, 1), (16, 4, 2),
        (32, 3, 1), (32, 4, 2),
        (64, 3, 1), (64, 4, 2),
        (128, 4, 2),
        (128, 4, 2),
        (256, 3, 1), (256, 4, 2),
    ]
    decoderSpec = [
        (256, 3, 2),
        (128, 3, 1), (128, 2, 2),
        (64, 2, 2),
        (64, 2, 2),
        (32, 1, 1), (32, 2, 2),
        (16, 1, 1), (16, 2, 2),
    ]

    modelInput = Input(shape = (256,256,3))
    x = modelInput
#------Encoder------------
    for filters, kernel, stride in encoderSpec:
        x = Conv2D(filters, (kernel, kernel), activation='relu', padding='same', strides=stride)(x)

#------Decoder------------
    for filters, kernel, stride in decoderSpec:
        x = Conv2DTranspose(filters, (kernel, kernel), activation='relu', padding='same', strides=stride)(x)
    modelOutput = Conv2DTranspose(3, (1,1), activation='tanh', padding = 'same',strides = 1)(x)

    model = Model(inputs = modelInput,outputs = modelOutput)
    # Take the optimizer from the same `keras` package that provides Model
    # and the layers; combining a `keras` model with a `tf.keras` optimizer
    # is rejected when the two packages are separate implementations.
    model_optimizer = keras.optimizers.Adam(0.001)
    model.compile(optimizer=model_optimizer, loss='mse', metrics=['accuracy'])

    return model

def train(modelPath = '/content/drive/My Drive/Colab Notebooks/model/Demosaic.hdf5',batch_size = 50,num_epochs = 200,epoch_length = 228):
    """Train the model and keep the weights of the best epoch.

    Existing weights at ``modelPath`` are loaded first, if present. Each
    epoch consumes ``epoch_length`` batches from ``loadData``. Weights are
    written to ``modelPath`` only when the epoch's mean loss improves on the
    best mean loss seen so far in this run.

    Args:
        modelPath: Weight file to resume from and to save to.
        batch_size: Number of samples per training batch.
        num_epochs: Number of epochs to run.
        epoch_length: Number of batches that make up one epoch.

    Raises:
        ValueError: If ``epoch_length`` is smaller than 1.
    """
    if epoch_length < 1:
        raise ValueError("epoch_length must be at least 1, got {}".format(epoch_length))
    model = DeMosaicModel()
    if os.path.exists(modelPath):
        model.load_weights(modelPath)
        print("Check point loaded!")
    trainingPath = "/content/drive/My Drive/Colab Notebooks/trainingData.npy"
    labelPath = "/content/drive/My Drive/Colab Notebooks/labelData.npy"
    dataGen = loadData(trainingPath,labelPath,batch_size)
    # np.inf is the spelling that exists in every NumPy version; the np.Inf
    # alias was removed in NumPy 2.0.
    bestLoss = np.inf
    # Column 0 holds the per-batch loss, column 1 the per-batch accuracy.
    losses = np.zeros((epoch_length, 2))
    for epoch_num in range(num_epochs):
        progbar = generic_utils.Progbar(epoch_length)
        print('Epoch {}/{}'.format(epoch_num + 1, num_epochs))
        for iter_num in range(epoch_length):
            X,Y = next(dataGen)
            modelLoss = model.train_on_batch(X,Y)
            losses[iter_num, 0] = modelLoss[0]
            losses[iter_num, 1] = modelLoss[1]

            done = iter_num + 1
            progbar.update(done, [('loss', np.mean(losses[:done, 0])), ('acc', np.mean(losses[:done, 1]))])
        # Judge the epoch by its mean loss rather than by the last batch,
        # and remember the best value so that a worse epoch no longer
        # overwrites a better checkpoint.
        epochLoss = np.mean(losses[:, 0])
        if epochLoss < bestLoss:
            bestLoss = epochLoss
            model.save_weights(modelPath)

def predict(modelPath = '/content/drive/My Drive/Colab Notebooks/model/Demosaic.hdf5',testDataPath = '/content/drive/My Drive/Colab Notebooks/test',OutputPath = '/content/drive/My Drive/Colab Notebooks/output'):
    """Run the trained model on every test image and save the results.

    Images are read from ``testDataPath`` in sorted file-name order,
    normalised the same way as the training data, passed through the model
    in one batch and written to ``OutputPath`` as ``1.jpg``, ``2.jpg``, ...
    in that order.

    Args:
        modelPath: Weight file to load; nothing is done if it is missing.
        testDataPath: Directory containing the input images.
        OutputPath: Directory that receives the result images; it is
            created if it does not exist.

    Assumes all readable test images share the model's 256x256x3 input
    shape - TODO confirm against the data set.
    """
    if not os.path.exists(modelPath):
        # Say why nothing happened instead of returning silently.
        print("Model file not found: " + modelPath)
        return
    model = DeMosaicModel()
    model.load_weights(modelPath)
    print("Model loaded!")

    testData = []
    # Sorted order makes the output numbering reproducible between runs.
    for name in tqdm(sorted(os.listdir(testDataPath))):
        testImg = cv2.imread(os.path.join(testDataPath, name))
        if testImg is None:
            # cv2.imread returns None for files it cannot decode.
            print("Skipping unreadable file: " + name)
            continue
        testData.append(testImg / 255 - 0.5)
    if not testData:
        print("No readable test images found in " + testDataPath)
        return

    resultImg = model.predict(np.asarray(testData))
    # cv2.imwrite does not create missing directories, it just fails quietly.
    os.makedirs(OutputPath, exist_ok=True)
    for i, img in enumerate(resultImg, start=1):
        # The tanh output can leave the -0.5..0.5 training range, so the
        # rescaled values are clipped to valid 8-bit pixels before saving.
        img = np.clip(np.rint(255 * (img + 0.5)), 0, 255).astype(np.uint8)
        cv2.imwrite(os.path.join(OutputPath, str(i) + ".jpg"), img)



In [None]:
# Start training with the default checkpoint path and batch size.
train()

In [None]:
# Run inference with the default model, test-image and output paths.
predict()