In [1]:
!pip3 install matplotlib



In [2]:
# settings cell
IMAGE_WIDTH=256
IMAGE_HEIGHT=256
TILE_SIDE_SIZE = (8,8)

TILES_VERTICAL = IMAGE_HEIGHT//TILE_SIDE_SIZE[1]
TILES_HORIZONTAL = IMAGE_WIDTH//TILE_SIDE_SIZE[0]

In [3]:
# Matrix class
class Matrix(list):
    # essentially still a list of lists
    def __init__(self, the_list):
        super().__init__(the_list)

    @property
    def nrow(self):
        return len(self)

    @property
    def ncol(self):
        return len(self[0])

    @property
    def dims(self):
        return self.nrow, self.ncol

    def __add__(self, M):
        assert(isinstance(M, Matrix))
        assert(self.dims == M.dims)
        return Matrix(
            [[x[0] + x[1] for x in zip(*rows)] for rows in zip(self, M)])

    def __sub__(self, M):
        assert(isinstance(M, Matrix))
        assert(self.dims == M.dims)
        return Matrix(
            [[x[0] - x[1] for x in zip(*rows)] for rows in zip(self, M)])


    def __iadd__(self, M):
        return self.__add__(M)
    
    def __isub__(self, M):
        return self.__sub__(M)

    """
    A helper function for matrix multiplication
    Parameters:
        u, v - respective column of Matrix 1 and row of Matrix 2
    """
    @staticmethod
    def dot_product(u, v):
        return sum([i*j for i,j in zip(u,v)])
    
    def MatMul(self, M, N):
        assert isinstance(M, (Matrix, int, float)) and isinstance(N, (Matrix, int, float))
        if isinstance(M, Matrix) and isinstance(N, Matrix):
            if M.ncol != N.nrow:
                raise TypeError("M and N should be either a compatible Matrix object, or a constant")
            res = Matrix([[0] * N.ncol for _ in range(M.nrow)])
            for i in range(M.nrow):
                for j in range(N.ncol):
                    res[i][j] = self.dot_product(M[i], [N[k][j] for k in range(N.nrow)])
        # multiplying matrix by a number is a thing
        else:
            # for myself, an intellectual who constantly forgets to use the operation in the correct order
            left, right = (M, N) if isinstance(M, Matrix) else (N, M)
            res = Matrix(
                [
                    [left[i][j] * right for j in range(left.ncol)] for i in range(left.nrow)  # type: ignore
                ]
            )

        return res

    def __mul__(self, M):
        return self.MatMul(self, M)

    def __rmul__(self, M):
        return self.MatMul(M, self)

    def add_rows(self, rows):
        super().extend(rows)

    def append(self, row):
        # yeet/extend data if dimensions are mismatched
        if len(row) < self.ncol:
            row.extend([0] * (self.ncol - len(row)))
            print("Warning: the appended list was smaller than the dimension of the Matrix")
        elif len(row) > self.ncol:
            [row.pop() for i in range(len(row) - self.ncol)]
            print("Warning: the appended list was bigger than the dimension of the Matrix")
        super().append(row)


    @property
    def transpose(self):
        return Matrix(map(list, zip(*self)))

    def show(self):
        print("Printing matrix: ")
        for i in self:
            print(i)

In [4]:
# Image import definition block
from PIL import Image
from os import path
from os import listdir
import numpy as np
import matplotlib.pyplot as plt
import math

images = []
C_MAX = 255

# convert color representation range: [0, 255] -> [-1, 1]
def encode_color_data(color: int):
    return color / C_MAX * 2 - 1

def decode_color_data(color: float):
    return abs(math.floor((color + 1) / 2 * 255))

def flatten_tile(tile: list):
    return [color for pixel in tile for color in pixel]

def split_to_tiles(image_data: list, image_shape, tile_size):
    tiles = []
    image_data = np.array(image_data).reshape(256,256,3)
    for j in range(TILES_VERTICAL):
        for k in range(TILES_HORIZONTAL):
            tiles.append(image_data[TILE_SIDE_SIZE[0]*j:TILE_SIDE_SIZE[0]*(j+1),TILE_SIDE_SIZE[0]*k:TILE_SIDE_SIZE[0]*(k+1),:].reshape(TILE_SIDE_SIZE[0]*TILE_SIDE_SIZE[0],3).tolist())
    return tiles

def load_image(filename, tile_size):
    image = Image.open(filename)
    image_data = image.getdata()
    print(image_data)
    return split_to_tiles(
        image_data, 
        image.size, 
        tile_size) 

def load_images(directory, tile_size):
    #for filename in listdir(directory):
        image_names = listdir("./imgs")
        for image_name in image_names[:10]:
            images.append(load_image(path.join(directory, image_name), tile_size))



In [5]:
# launch cell
load_images('./imgs', TILE_SIDE_SIZE)


<ImagingCore object at 0x7fc16814b950>
<ImagingCore object at 0x7fc1678f9af0>
<ImagingCore object at 0x7fc1678f9af0>
<ImagingCore object at 0x7fc1678f9af0>
<ImagingCore object at 0x7fc1678f9af0>
<ImagingCore object at 0x7fc191c80270>
<ImagingCore object at 0x7fc191c80270>
<ImagingCore object at 0x7fc191c80270>
<ImagingCore object at 0x7fc191c80270>
<ImagingCore object at 0x7fc191c80270>


In [6]:
# visualize the tiling
from mpl_toolkits.axes_grid1 import ImageGrid

def plot_image(image):
    fig = plt.figure(figsize=TILE_SIDE_SIZE)
    grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(TILES_HORIZONTAL, TILES_VERTICAL),  # creates NxN grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

    for ax, im in zip(grid, image):
        # Iterating over the grid returns the Axes.
        # images 
        im_2d = [ im[x:x+TILE_SIDE_SIZE[0]] for x in range(0, len(im), TILE_SIDE_SIZE[0]) ] 
        ax.imshow(im_2d)



In [7]:
# Neural Network logic class
from random import random
class NeuralCompressor():
    '''
    Linear recirculation network class.
    Compresses an image using 
    Parameters:
        tile_width, tile_height: dimensions of the compressed tile (referred to as n, m in the assignment)
        compression_rate: kinda self-explanatory. (referred to as Z in the assignment)
        
    '''
    def __init__(self, tile_width, tile_height, compression_rate, alpha = 5*10e-5):
        self.alpha = alpha
        self.tile_width = tile_width
        self.tile_height = tile_height
        # random weights in range [-1, 1)
        # encode_matrix = [
        #         [
        #             2*random() - 1
        #             # columns = amount of neurons in the hidden layer
        #             for _ in range(tile_width*tile_height*3//compression_rate)
        #             # 
        #         ] for _ in range(tile_width*tile_height*3)
        #     ]
        # self.encode_weights = Matrix(encode_matrix)
        # self.decode_weights = self.encode_weights.transpose

        self.encode_weights = np.loadtxt('encode_weights.txt')
        self.decode_weights = np.loadtxt('decode_weights.txt')
    
    def train(self, tile):
        tile = Matrix(tile)
        compressed = tile * self.encode_weights
        output = compressed * self.decode_weights
        error = output - tile 
        # make a step in weights in the direction of the error vector
        encoding_weights_diff = self.alpha * tile.transpose * error * self.decode_weights.transpose
        self.encode_weights -= encoding_weights_diff
        decoding_weights_diff = self.alpha * compressed.transpose * error
        self.decode_weights -=  decoding_weights_diff
        # returning a vector rather than a one-row matrix
        return error[0]
    
    def test(self, tile: list):
        result = self.decompress(self.compress(tile))
        return result
    
    '''
    Parameters:
        tile: a flattened vector of color data in the range [0,255] which will be used as an input to the neural network.
    '''
    def compress(self, tile: list):
        return Matrix([encode_color_data(color) for color in tile]) * self.encode_weights

        
    def decompress(self, encoded_tile: list):
        return [decode_color_data(color) for color in encoded_tile * self.decode_weights]


In [8]:
# Neural Network logic class
from random import random
import numpy as np
class NeuralCompressorNumpy():
    '''
    Linear recirculation network class. Adaptive learn step used because using higher-dimensioned tiles is feasible due to performance improvements.
    Parameters:
        tile_width, tile_height: dimensions of the compressed tile (referred to as n, m in the assignment)
        compression_rate: kinda self-explanatory. (referred to as Z in the assignment)
        alpha: the learning step
    '''
    def __init__(self, tile_width, tile_height, compression_rate, alpha = 3*10e-5):
        self.alpha = alpha
        encode_weights_size = (3*tile_height*tile_width,3*tile_height*tile_width//compression_rate)
        # self.encode_weights = np.random.uniform(low=-1, high=1, size=encode_weights_size)/10
        # self.decode_weights = self.encode_weights.T
        self.encode_weights = np.loadtxt('encode_weights.txt')
        self.decode_weights = np.loadtxt('decode_weights.txt')
    
    '''
    Arguments:
        tile: flattened color data array represented in the range [-1,1]
    '''
    def train(self, tile):
        tile = np.asarray(tile)
        compressed = np.asmatrix(tile@self.encode_weights)
        output = compressed@self.decode_weights
        error = output - tile
        decode_weights_diff = self.alpha*(compressed.T@error)
        self.decode_weights -= decode_weights_diff
        encode_weights_diff = self.alpha*(np.asmatrix(tile).T@error@self.decode_weights.T) 
        self.encode_weights -= encode_weights_diff
        return np.asarray(error).reshape(-1).tolist()
    '''
    Arguments: 
        tile: an array of color info represented in the range [0,255]
    '''
    def test(self, tiles: list):
        compressed = [self.compress(tile) for tile in tiles]
        result = [ 
            np.reshape(np.asarray(self.decompress(tile)), (TILE_SIDE_SIZE[0], TILE_SIDE_SIZE[1], 3)) for tile in compressed]
        img = np.zeros((256,256,3))
        for i in range(TILES_VERTICAL):
            for j in range(TILES_HORIZONTAL):
                img[i*TILE_SIDE_SIZE[1]:(i+1)*TILE_SIDE_SIZE[1], j*TILE_SIDE_SIZE[0]:(j+1)*TILE_SIDE_SIZE[0]] = result[i*TILES_VERTICAL+j]        
        return img
    
    '''
    Arguments:
        tile: an array of color info represented in the range [-1,1]
    '''
    def compress(self, tile: list):
        converted_tile = np.asarray([encode_color_data(color) for pixel in tile for color in pixel])
        return (converted_tile@self.encode_weights).tolist()

        
    def decompress(self, encoded_tile: list):
        return [decode_color_data(color) for color in np.asarray(encoded_tile)@self.decode_weights]

In [9]:
# Helper functions for the NN
def MSE(error):
    # 1/n cause it's MEAN square error.
    return sum([x*x for x in error])/len(error)


In [10]:
# NN settings cell
alpha = 1*10e-5
max_error = 1
Z = 6
learning_samples = 10240
variant = 'numpy'

if variant == 'manual':
    N = NeuralCompressor(TILE_SIDE_SIZE[0], TILE_SIDE_SIZE[1], Z, alpha = alpha)
elif variant == 'numpy':
    N = NeuralCompressorNumpy(TILE_SIDE_SIZE[0], TILE_SIDE_SIZE[1], Z, alpha = alpha)


In [11]:
from cmath import inf
from tqdm import tqdm
# flatten the array to be a 1d-array of colors converted to the required format for the "training" function
sample_data = [
                [
                    encode_color_data(color) for color in flatten_tile(tile)
                ] for image in images for tile in image 
            ][:learning_samples]

full_MSE = inf
errors = []
current_error = []
for tile in tqdm(sample_data):
    tile_error = MSE(N.train(tile))
    current_error.append(tile_error)
full_MSE = sum(current_error)
print(full_MSE)
np.savetxt('encode_weights.txt', N.encode_weights)
np.savetxt('decode_weights.txt', N.decode_weights)


100%|██████████| 10240/10240 [00:21<00:00, 470.67it/s]


66.33035083842462


In [12]:
pic = N.test(images[2])
Image.fromarray(pic.astype(np.uint8)).save("out.png")