# **Import libraries**

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import cv2
from tqdm.notebook import tqdm
import os

# **Data preprocessing**

In [None]:
# Folder that holds the training pictures (PNG files).
dataPath = '../input/images1/pngs'
# Pictures are square RGB images: the side length is defined once and the
# full (rows, columns, channels) shape is derived from it.
imageSize = 256
imageShape = (imageSize, imageSize, 3)

In [None]:
def showImage(imgMatrix):
    """Display an image matrix inline with matplotlib.

    The previous implementation called ``cv2_imshow``, which is neither
    imported nor defined in this notebook, so every call raised NameError.

    Parameters
    ----------
    imgMatrix : array-like
        2-D (grayscale) or 3-D image. Three- and four-channel images are
        assumed to use OpenCV's BGR(A) channel order and are converted to
        RGB(A) for display.
    """
    img = np.asarray(imgMatrix)
    if img.ndim == 3 and img.shape[-1] == 3:
        img = img[..., ::-1]            # BGR -> RGB
    elif img.ndim == 3 and img.shape[-1] == 4:
        img = img[..., [2, 1, 0, 3]]    # BGRA -> RGBA
    plt.imshow(img, cmap='gray' if img.ndim == 2 else None)
    plt.axis('off')
    plt.show()

def getShape(data):
    """Return ``(shape, element_count)`` for the array ``data``.

    ``shape`` is the array's shape tuple and ``element_count`` is the total
    number of scalar entries it holds.
    """
    dims = data.shape
    elementCount = data.size
    return dims, elementCount

def getBlocksParams(blockHeight,  blockWidth,  imgSize):
    """Count how many whole blocks fit into a square image of side ``imgSize``.

    Returns a 3-tuple: ``imgSize // blockHeight``, ``imgSize // blockWidth``
    and the product of the two (blocks per image).
    """
    perHeight, perWidth = (imgSize // side for side in (blockHeight, blockWidth))
    return perHeight, perWidth, perHeight * perWidth

def normalizeData(data):
    """Scale ``data`` linearly so that 0 maps to -1 and its maximum maps to 1.

    The scale is taken from the maximum of ``data`` itself (not a fixed 255).

    Parameters
    ----------
    data : array-like
        Non-empty array of non-negative values (e.g. an 8-bit image).

    Returns
    -------
    numpy.ndarray
        Floating-point array with the same shape as ``data``.

    Notes
    -----
    An all-zero input (a completely black image) used to divide by zero and
    return NaN everywhere; it now maps to a constant -1 array.
    """
    data = np.asarray(data)
    peak = np.max(data)
    if peak == 0:
        # Nothing to scale by: every value is 0, whose normalized value is -1.
        return np.full(data.shape, -1.0)
    return data * (2 / peak) - 1

def checkImageShape(imgMatrix):
    """Print a warning if ``imgMatrix.shape`` differs from the module-level ``imageShape``.

    Nothing is returned and nothing is raised; a mismatch is only reported.
    """
    actualShape = imgMatrix.shape
    if actualShape == imageShape:
        return
    print("Wrong shape - {}".format(actualShape))

Load the dataset: read every image found under `dataPath` and normalize it.

In [None]:
# Collect every file below dataPath first. Allocating trainData inside the
# os.walk loop re-created the array for each visited directory, so only the
# last directory's images survived (and trainData stayed undefined when the
# walk yielded nothing). Sorting makes the image order reproducible.
imagePaths = sorted(
    os.path.join(adr, fileName)
    for adr, dirs, fileNames in os.walk(dataPath)
    for fileName in fileNames
)
if not imagePaths:
    raise FileNotFoundError("No images found in '{}'".format(dataPath))

trainData = np.zeros(shape=(len(imagePaths), *imageShape))
for fileIndex, imagePath in enumerate(imagePaths):
    img = cv2.imread(imagePath)
    if img is None:
        # cv2.imread signals an unreadable / non-image file by returning None.
        raise ValueError("Cannot read image '{}'".format(imagePath))
    checkImageShape(img)
    trainData[fileIndex] = normalizeData(img)
print("Dataset uploaded. All images have correct shape.")

print('Dataset shape - {}, total number of pixels - {}'.format(*getShape(trainData)))

In [None]:
# Side lengths (in pixels) of the blocks every picture is cut into.
blockHeight = blockWidth = 16
# Blocks per axis and per picture for the configured image size.
(horizBlocksAmount,
 vertBlocksAmount,
 totalBlocksAmount) = getBlocksParams(blockHeight, blockWidth, imageSize)
print('Total amount of blocks per image - {}'.format(totalBlocksAmount))

Convert the dataset of images into a dataset of blocks: every image is cut into blocks of the height and width chosen above, and each block is flattened into one row.

In [None]:
imagesAmount = trainData.shape[0]

# Cut every picture into blocks and flatten each block into one row.
# Layout (identical to the former triple Python loop):
#   row index = totalBlocksAmount * image + horizBlocksAmount * vertIndex + horizIndex
#   rows of a picture are stepped by blockWidth, columns by blockHeight,
#   and each block is flattened in C order over (row, column, channel).
# Only the area covered by whole blocks is used.
usedRows = vertBlocksAmount * blockWidth
usedCols = horizBlocksAmount * blockHeight
dataWithBlocks = (
    trainData[:, :usedRows, :usedCols, :]
    .reshape(imagesAmount, vertBlocksAmount, blockWidth, horizBlocksAmount, blockHeight, 3)
    .transpose(0, 1, 3, 2, 4, 5)   # -> (image, vertIndex, horizIndex, row, column, channel)
    .reshape(totalBlocksAmount * imagesAmount, blockHeight * blockWidth * 3)
)

print("Shape of the data separated by blocks - {}".format(dataWithBlocks.shape))

# **Neural network modeling**

In [None]:
class RecyclingNN():
    """Two-layer linear autoencoder that compresses a picture block by block.

    A picture is cut into blocks, each block is flattened and multiplied by
    ``W1`` (encoder) to obtain a shorter code, and the code is multiplied by
    ``W2`` (decoder) to reconstruct the block.

    Conventions used throughout the class:

    * Pictures passed to ``forward`` hold values in [0, 1]; they are mapped to
      [-1, 1] before encoding and mapped back after decoding.
    * Rows of a picture are stepped by ``blockWidth`` and columns by
      ``blockHeight`` (the same convention as ``getBlocksParams`` and the
      dataset-splitting cell of this notebook); a block is flattened in
      C order over (row, column, channel).
    * Blocks are numbered row-major: ``vertIndex * horizontalCount + horizIndex``.
    """

    def __init__(self, blockHeight : int, blockWidth : int,
                 compressionFactor : int = 3, alpha : float = 5*10e-5) -> None:
        """Create randomly initialised encoder/decoder weights.

        Parameters
        ----------
        blockHeight, blockWidth : int
            Block side lengths in pixels.
        compressionFactor : int
            Ratio between the flattened block length and the code length.
        alpha : float
            Learning rate. NOTE: the default expression ``5*10e-5`` evaluates
            to 5e-4 because ``10e-5`` is 1e-4.
        """
        blockLength = 3 * blockWidth * blockHeight
        hiddenLength = blockLength // compressionFactor
        self.alpha = alpha
        self.blockHeight = blockHeight
        self.blockWidth = blockWidth
        self.W1 = np.random.normal(0., pow(hiddenLength, -0.5),
                                   size=(blockLength, hiddenLength))
        # A real copy: ``W1.T`` alone is a view, so the in-place updates in
        # ``backprop`` would silently modify both matrices at once.
        self.W2 = self.W1.T.copy()
        self.compression_factor = compressionFactor

    def _compressionSide(self) -> int:
        """Return the per-axis shrink factor used to draw codes as a picture.

        Raises
        ------
        ValueError
            If the compression factor is not a perfect square or the block
            sides are not divisible by its square root; in that case a code
            cannot be laid out as a smaller block.
        """
        side = int(round(self.compression_factor ** 0.5))
        if (side < 1 or side * side != self.compression_factor
                or self.blockWidth % side or self.blockHeight % side):
            raise ValueError(
                "A compressed picture requires a perfect-square compression factor "
                "whose square root divides both block sides, got {}".format(self.compression_factor))
        return side

    @staticmethod
    def _splitIntoBlocks(pic, rowStep, colStep):
        """Cut ``pic`` into flattened blocks; return (blocks, vertCount, horizCount)."""
        vertCount, horizCount = pic.shape[0] // rowStep, pic.shape[1] // colStep
        rows = [pic[rowStep * v : rowStep * (v + 1), colStep * h : colStep * (h + 1), :].reshape(-1)
                for v in range(vertCount) for h in range(horizCount)]
        blocks = np.array(rows, dtype=float).reshape(vertCount * horizCount,
                                                     rowStep * colStep * pic.shape[2])
        return blocks, vertCount, horizCount

    def _decodeBlocks(self, codes, vertCount, horizCount, picShape):
        """Decode every code with ``W2`` and assemble the blocks into a picture in [0, 1] scale."""
        rowStep, colStep = self.blockWidth, self.blockHeight
        decoded = (codes @ self.W2) / 2 + .5
        pic = np.zeros(picShape)
        for v in range(vertCount):
            for h in range(horizCount):
                pic[v * rowStep : (v + 1) * rowStep, h * colStep : (h + 1) * colStep] = \
                    decoded[v * horizCount + h].reshape(rowStep, colStep, 3)
        return pic

    def forward(self, X : np.ndarray, returnCompressed=False):
        """Encode and decode a picture.

        Parameters
        ----------
        X : numpy.ndarray
            Picture of shape (rows, columns, 3) with values in [0, 1].
        returnCompressed : bool
            When True, also return the codes drawn as a smaller picture.

        Returns
        -------
        numpy.ndarray or (numpy.ndarray, numpy.ndarray)
            The reconstructed picture, or ``(compressedPic, pic)`` when
            ``returnCompressed`` is True.

        Raises
        ------
        ValueError
            If ``returnCompressed`` is True and the codes cannot be laid out
            as a picture (see ``_compressionSide``).
        """
        blocks, vertCount, horizCount = self._splitIntoBlocks(X, self.blockWidth, self.blockHeight)
        codes = (blocks * 2 - 1) @ self.W1
        pic = self._decodeBlocks(codes, vertCount, horizCount, (X.shape[0], X.shape[1], 3))
        if not returnCompressed:
            return pic

        side = self._compressionSide()
        compresBlockWidth = self.blockWidth // side
        compresBlockHeight = self.blockHeight // side
        compressedPic = np.zeros((X.shape[0] // side, X.shape[1] // side, 3))
        # Kept as attributes for backward compatibility with existing callers.
        self.X = codes
        self.xShape = codes.shape
        self.compressHorizBlocksAmount = horizCount
        self.compressVertBloksAmount = vertCount
        self.compresBlockWidth = compresBlockWidth
        self.compresBlockHeight = compresBlockHeight

        for v in range(vertCount):
            for h in range(horizCount):
                # Row-major block index; multiplying by the block width (as the
                # code used to) picks wrong codes unless it happens to equal
                # the number of blocks per row.
                compressedPic[v * compresBlockWidth : (v + 1) * compresBlockWidth,
                              h * compresBlockHeight : (h + 1) * compresBlockHeight] = \
                    codes[v * horizCount + h].reshape(compresBlockWidth, compresBlockHeight, 3) / 2 + .5
        return compressedPic, pic

    def forwardForCompres(self, compres):
        """Reconstruct a picture from a compressed picture produced by ``forward``.

        Parameters
        ----------
        compres : numpy.ndarray
            Compressed picture in the same scale ``forward`` returns it in.
            Integer arrays are treated as 8-bit images and divided by 255.

        Returns
        -------
        numpy.ndarray
            Reconstructed picture of shape
            ``(compres.shape[0] * side, compres.shape[1] * side, 3)``.

        Notes
        -----
        The codes are now really recovered from ``compres``; previously they
        were computed and then overwritten with the codes cached by the last
        ``forward`` call, so the argument had no effect on the result.
        """
        compres = np.asarray(compres)
        if np.issubdtype(compres.dtype, np.integer):
            compres = compres / 255.
        side = self._compressionSide()
        compresBlockWidth = self.blockWidth // side
        compresBlockHeight = self.blockHeight // side
        blocks, vertCount, horizCount = self._splitIntoBlocks(compres, compresBlockWidth, compresBlockHeight)
        codes = blocks * 2 - 1   # exact inverse of the ``/ 2 + .5`` applied in forward
        return self._decodeBlocks(codes, vertCount, horizCount,
                                  (compres.shape[0] * side, compres.shape[1] * side, 3))

    def saveWeights (self):
        """Write ``W1`` and ``W2`` to ``w1.csv`` / ``w2.csv`` in the working directory."""
        np.savetxt('w1.csv', self.W1, delimiter=',')
        np.savetxt('w2.csv', self.W2, delimiter=',')

    def uploadWeights(self, path="../input/weights-for-rnn/"):
        """Load ``W1`` and ``W2`` from ``path + 'w1.csv'`` and ``path + 'w2.csv'``."""
        # Passing the file name lets numpy open and close the file itself
        # (the explicit open() calls were never closed).
        self.W1 = np.loadtxt(path + 'w1.csv', delimiter=",")
        self.W2 = np.loadtxt(path + 'w2.csv', delimiter=",")

    def backprop(self, X):
        """Run one gradient step on a single flattened block.

        Parameters
        ----------
        X : numpy.ndarray
            1-D flattened block in the [-1, 1] scale used for training.

        Returns
        -------
        float
            Sum of absolute reconstruction errors before the update.
        """
        X = np.asarray(X)[np.newaxis]        # (1, blockLength)
        compressed = X @ self.W1             # (1, hiddenLength)
        error = compressed @ self.W2 - X     # (1, blockLength)
        self.W2 -= self.alpha * (compressed.T @ error)
        # As before, the encoder step uses the already updated decoder. The
        # product is grouped as X.T @ (error @ W2.T) to avoid building a
        # blockLength x blockLength intermediate matrix.
        self.W1 -= self.alpha * (X.T @ (error @ self.W2.T))
        return np.abs(error).sum()

    def __call__(self, X, returnCompressed=False):
        """Shortcut for ``forward``."""
        return self.forward(X, returnCompressed)

Here you can upload pretrained weights for the NN where the compression factor is 3 (the weights must match the compression factor the network is created with).

In [None]:
# Network instance used by the training and visualisation cells below.
nn = RecyclingNN(blockHeight, blockWidth, compressionFactor=4, alpha=5e-5)

**Training process...**

In [None]:
# Reconstruction error of every training step, filled by the training loop.
errorValues = list()
# Number of blocks to train on, asked interactively; must stay below the
# number of available blocks.
trainSize = int(input(
    "Input train data size (<{})".format(len(dataWithBlocks))
))
assert len(dataWithBlocks) > trainSize, "Wrong train size"

In [None]:
# One gradient step per block; the returned error is recorded for plotting.
for blockIndex in tqdm(range(trainSize)):
    errorValues += [nn.backprop(dataWithBlocks[blockIndex])]

In [None]:
# Training curve: one point per processed block. Labels are added so the
# figure can be read on its own.
plt.plot(errorValues)
plt.xlabel("Training step (block index)")
plt.ylabel("Sum of absolute reconstruction errors")
plt.title("Training error");

In [None]:
#network testing
matplotlib.rcParams['figure.figsize'] = [12, 30]
fig, axes = plt.subplots(10, 2)

for rowIndex in range(10):
    # trainData is stored in [-1, 1], while the network's forward pass (and
    # imshow for float data) expects [0, 1]; feeding the raw values made the
    # network see inputs in [-3, 1], a range it was never trained on.
    pic = trainData[np.random.randint(0, trainData.shape[0])] / 2 + .5
    test = nn(pic)
    # Pictures were read with cv2.imread (BGR); reverse the channels for matplotlib (RGB).
    axes[rowIndex][0].imshow(pic[..., ::-1])
    axes[rowIndex][1].imshow(np.clip(test, 0, 1)[..., ::-1])
axes[0][0].set_title("Input")
axes[0][1].set_title("Output");

**Compressed pictures visualising**

In [None]:
matplotlib.rcParams['figure.figsize'] = [12, 12]

imageInd = np.random.randint(0, trainData.shape[0])
# trainData is stored in [-1, 1]; the network's forward pass expects [0, 1].
pic = trainData[imageInd] / 2 + .5
compressed, test = nn(pic, True)

# cv2.imwrite stores 8-bit values, so the [0, 1]-scaled picture is stretched to
# 0..255 first (values outside [0, 1] are clipped, which makes this step lossy).
# Writing the float array directly saturated every value to 0 or 1.
compressedPath = "{}_compr.png".format(imageInd)
cv2.imwrite(compressedPath, np.clip(compressed * 255, 0, 255).round().astype(np.uint8))
uploadedCompressed = cv2.imread(compressedPath)
if uploadedCompressed is None:
    raise IOError("Cannot read back '{}'".format(compressedPath))
# Back to the [0, 1] scale the network produced the compressed picture in.
test = nn.forwardForCompres(uploadedCompressed / 255.)
fig, axs = plt.subplots(1,3)
# Pictures were read with cv2.imread (BGR); reverse the channels for matplotlib (RGB).
axs[0].imshow(pic[..., ::-1])
axs[1].imshow(np.clip(compressed, 0, 1)[..., ::-1])
axs[2].imshow(np.clip(test, 0, 1)[..., ::-1])
axs[0].set_title("Input")
axs[1].set_title("Compressed image")
axs[2].set_title("Output from compressed");