# Задание №2: Фрактальное сжатие

ФИО: Егор Александрович Чистов
Группа: 204

**Баллы за задание складываются из двух частей: баллы за выполнение промежуточных подзаданий и баллы за качество**

**Максимальное количество баллов за выполнение промежуточных подзаданий — 15**

**Баллы за качество выставляются по итогам сравнения всех решений**

## Правила сдачи
* У каждого подзадания указано максимальное количество баллов, которые можно за него получить
* Для сдачи необходимо в Google Classroom загрузить Jupyter-ноутбук с выполненными подзаданиями
* В некоторых ячейках есть строки (`# GRADED CELL: [function name]`), эти строки **менять нельзя**, они будет использоваться при проверке вашего решения
* Интерфейс функций и классов помеченных таким образом должен остаться без изменений
* Ячейка со строкой (`# GRADED CELL: [function name]`) должна содержать только **одну функцию или класс**
 * Лайфхак: функции можно определять внутри функций
* Никакие другие ячейки не будут использованы при проверке, они должны быть самодостаточны
* Запрещено импортировать иные библиотеки и функции, кроме указанных в первой ячейке с кодом  
(если сильно захочется что-то еще импортировать, спросите в чате курса)

## Немного теории
Алгоритм описан в главе про [сжатие изображений](https://compression.ru/book/part2/part2__3.htm#_Toc448152512).

### Определения
**Ранговый блок**: если исходное изображение разбивается на непересекающиеся блоки одинакового размера, замощающие всё изображение, то каждый такой блок называется *ранговым*; имеют меньший размер, чем доменные блоки.

**Доменный блок**: если исходное изображение разбивается блоки одинакового размера, которые могут и пересекаться, то каждый такой блок называется *доменным*; имеют больший размер, чем ранговые блоки.

**Идея алгоритма**:

При сжатии:
1. для каждого рангового блока найти наиболее похожий на него доменный блок (с учётом поворотов и симметрии)
2. выполнить преобразование яркости
3. в качестве сжатого изображения выступают коэффициенты преобразования ранговых блоков, эффективно записанные в файл (строку)

При декомпрессии:
1. Прочитать файл (строку), извлечь коэффициенты преобразований
2. Применить преобразования к исходному изображению (обычно просто серое) пока результат не стабилизируется

In [None]:
# Standard Python Library

import os
import itertools

from collections import namedtuple

# Additional Modules

import matplotlib.pyplot as plt
import numpy as np

from skimage import io
from skimage import data, img_as_float64
from skimage.metrics import mean_squared_error as mse, peak_signal_noise_ratio as psnr
from skimage.transform import resize
from skimage.color import rgb2gray, rgb2yuv, yuv2rgb

from tqdm import tqdm

Первым делом нужно загрузить картинку

In [None]:
lenna_rgb_512x512 = io.imread('test_files/lenna.bmp')
lenna_rgb_256x256 = resize(lenna_rgb_512x512, (256, 256))
lenna_gray_256x256 = np.rint(rgb2gray(lenna_rgb_256x256) * 255).astype('uint8')

`plt` — модуль для рисования графиков и всего остального

Очень удобная штука, будем пользоваться ей довольно часто

In [None]:
plt.imshow(lenna_gray_256x256, cmap='gray')

## Общие функции
В следующих клетках описаны функции и классы, которые будут использоваться **вами** при выполнении следующих подзаданий. Стоит с ними подробно ознакомиться, понять, что они делают, и поэкспериментировать.

**Не следует их менять.**

In [None]:
BlockTransform = namedtuple('BlockTransform', ['x', 'y', 'co', 'di', 'tr', 'bad'])
FractalCompressionParams = namedtuple(
    'FractalCompressionParams', [
        'height',
        'width',
        'is_colored',
        'block_size',
        'spatial_scale',
        'intensity_scale',
        'stride'
    ]
)

In [None]:
def derive_num_bits(length, stride):
    return np.ceil(np.log2(length / stride)).astype(int)

In [None]:
def is_colored(image):
    if len(image.shape) == 2:
        return False
    elif len(image.shape) == 3 and image.shape[-1] == 3:
        return True
    else:
        message = 'Invalid shape of the image: `{}`'
        raise ValueError(message.format(image.shape))

### [4 балла] Функция для нахождения наилучшего преобразования рангового блока

#### Описание

на входе функции подаются:
* исходное Ч/Б изображение (`image`)
* уменьшенное изображение (`resized_image`)
* координаты рангового блока (`x`, `y`)
* размер блока (`block_size`)
* шаг, через сколько пикселей перескакивать при переборе (`stride`)

на выходе функция должна выдавать:
* лучшее преобразование в смысле MSE, объект типа `BlockTransform`

In [None]:
# GRADED CELL: find_block_transform

BlockTransform = namedtuple('BlockTransform', ['x', 'y', 'co', 'di', 'tr', 'bad'])

def find_block_transform(image, resized_image, x, y, block_size, stride):
    '''Find best transformation for given rank block.

    Parameters
    ----------
    image : np.array
        Source B/W image.

    resized_image : np.array
        Resized source image.

    x, y : int, int
        Coordinates of the rank block.
    
    block_size : int
        Size of rank block.

    stride : int
        Vertical and horizontal stride for domain block search.

    Returns
    -------
    best_transform : BlockTransform
        Best transformation.
    '''
    
    def contrast_brightness(domain_block, rank_block):
        '''Find constast and brightness to minimize: `mse(contrast * domain_block + brightness, rank_block)`.

        Parameters
        ----------
        domain_block, rank_block : np.array, np.array
            Blocks of same size.

        Returns
        -------
        contrast : float
            Number in range [-1, -0,05] ∪ [0,05, 1]
        brightness : np.int8
            Number in range [-128, 127] 
        '''

        # We can rewrite equation as `Ap = rank_block.flatten`, where `A = [[domain_block.flatten 1]]` and `p = [[constrast], [brightness]]`.
        dbflat = domain_block.flatten()

        #[ [domain_block[0][0] 1]
        #  [domain_block[0][1] 1]
        #  ...
        #  [domain_block[n][n] 1] ]
        A = np.vstack([dbflat, np.ones(len(dbflat))]).T

        contrast, brightness = np.linalg.lstsq(A, rank_block.flatten(), rcond=None)[0]

        if contrast < -1:
            contrast = -1
        elif -0.05 < contrast <= 0:
            contrast = -0.05
        elif 0 < contrast < 0.05:
            contrast = 0.05
        elif contrast > 1:
            contrast = 1

        if brightness < -128:
            brightness = 128
        elif brightness > 127:
            brightness = 127
        else:
            brightness = np.rint(brightness).astype(np.int8)

        return contrast, brightness

    def find_flip_rotate(domain_block, rank_block, block_size):
        '''Find flip and rotate through split blocks to 4 quadrants and compare they.

        Parameters
        ----------
        domain_block, rank_block : np.array, np.array
            Compared blocks

        block_size : int
            Size of rank block.

        Returns
        -------
        bad: bool
            If blocks are not similar
        flip: 0, 1
            Number of flips.
        rotate: 0, 1, 2, 3
            Number of rotates.
        '''

        # Axes and quadrants
        # 0 -------------> y
        # | [[II,   I],
        # |  [III, IV]]
        # ↓
        # x

        I = np.s_[0 : block_size // 2, block_size // 2 : block_size]
        II = np.s_[0 : block_size // 2, 0 : block_size // 2]
        III = np.s_[block_size // 2 : block_size, 0 : block_size // 2]
        IV = np.s_[block_size // 2 : block_size, block_size // 2 : block_size]
        quadrants = [I, II, III, IV]

        db_sums = {num + 1: domain_block[quadrant].sum() for num, quadrant in enumerate(quadrants)}
        dbs = sorted(db_sums, key=db_sums.get)
        rb_sums = {num + 1: rank_block[quadrant].sum() for num, quadrant in enumerate(quadrants)}
        rbs = sorted(rb_sums, key=rb_sums.get)

        match_1 = dbs[rbs.index(1)]  # Domain's block qudrant that corresponds to rank block first quadrant
        match_2 = dbs[rbs.index(2)]  # Domain's block qudrant that corresponds to rank block second quadrant

        _ = 0
        nothing = (True, _, _)

        presets = ((      nothing, (False, 0, 0),       nothing, (False, 1, 1)),
                   ((False, 1, 0),       nothing, (False, 0, 1),       nothing),
                   (      nothing, (False, 1, 3),       nothing, (False, 0, 2)),
                   ((False, 0, 3),       nothing, (False, 1, 2),       nothing))

        bad, flip, rotate = presets[match_1 - 1][match_2 - 1]

        return bad, flip, rotate

    def domain_blocks(resized_image, block_size, stride, rank_block):
        '''Yield domain_block from resized_image.

        Parameters
        ----------
        resized_image : np.array
            Resized source image.

        block_size : int
            Size of rank block.

        stride : int
            Vertical and horizontal stride for domain block search.

        rank_block : np.array

        Returns
        -------
        x, y : int
            Coordinates of domain_block
        transform_code : int (3 bits)
            Number of rotates and flips
        transformed_block : np.array
            Transformed domain_block
        '''

        for x in range(0, resized_image.shape[0] - block_size + 1, stride):
            for y in range(0, resized_image.shape[1] - block_size + 1, stride):
                domain_block = resized_image[x : x + block_size, y : y + block_size]
                bad, flip, rotate = find_flip_rotate(domain_block, rank_block, block_size)
                if bad:
                    continue

                transformed_block = domain_block
                if (flip):
                    transformed_block = np.flip(transformed_block, axis=1)
                transformed_block = np.rot90(transformed_block, k=rotate)

                yield x, y, (rotate << 1) + flip, transformed_block

    best_transform = BlockTransform(0, 0, 0.75, 0, 0, True)
    best_transform_err = float('inf')

    rank_block = image[x : x + block_size,
                       y : y + block_size]

    for domain_x, domain_y, tr, domain_block in domain_blocks(resized_image, block_size, stride, rank_block):
        contrast, brightness = contrast_brightness(domain_block, rank_block)

        err = mse(contrast * domain_block + brightness, rank_block)
        if err < best_transform_err:
            best_transform = BlockTransform(domain_x, domain_y, contrast, brightness, tr, err >= 500)
            best_transform_err = err

    return best_transform

### [4 балла] Применение IFS к изображению

#### Описание

на входе функции подаются:
* исходное изображение (`image`)
* уменьшенное изображение (`resized_image`)
* IFS, массив объектов типа `BlockTransform` (`transforms`)
* размер блока (`block_size`)

на выходе функция должна выдавать:
* картинку после одинарного применения IFS

In [None]:
# GRADED CELL: perform_transform

def perform_transform(image, resized_image, transforms, block_size):
    '''Perform IFS on given image.
    
    Parameters
    ----------
    image : np.array
        Source image.

    resized_image : np.array
        Resized source image.

    transforms : list of BlockTransform's
        Given IFS, Iterated Function System
    
    block_size : int
        Size of rank block.

    Returns
    -------
    transformed_image : np.array
        Transformed image.
    '''

    def transforms_(image, block_size, transforms):
        '''Yield transforms for image.

        Parameters
        ----------
        image : np.array
            Resized source image.
            
        block_size : int
            Size of rank block.

        Returns
        -------
        x, y : int
            Coordinates of rank_block for transform

        qtransform : list of BlockTransform's
            List of all transforms for rank_block
        '''

        t_idx = 0

        def get_transforms(block_size, transforms):
            '''Recursively find transforms for current rank block.

            Parameters
            ----------
            block_size : int
                Size of rank block.
            
            transforms : list of BlockTransform's
                Given IFS, Iterated Function System

            Returns
            -------
            qtransform : list of BlockTransform's
                List of all transforms for rank_block
            '''

            nonlocal t_idx
            qtransforms = [transforms[t_idx]]
            t_idx += 1

            if qtransforms[0].bad and block_size >= 4:
                for i in range(4):
                    qtransforms.extend(get_transforms(block_size // 2, transforms))

            return qtransforms

        xs = range(0, image.shape[0] - block_size + 1, block_size)
        ys = range(0, image.shape[1] - block_size + 1, block_size)

        for x in xs:
            for y in ys:
                yield x, y, get_transforms(block_size, transforms)

    def apply_transforms(transformed_image, x, y, block_size, qtransforms):
        '''Recursively apply transforms for current rank block.

            Parameters
            ----------
            transformed_image : np.array
                Image to apply transforms

            x, y : int, int
                Coordinates of rank_block

            block_size : int
                Size of rank block.

            qtransform : list of BlockTransform's
                List of all transforms for rank_block
            '''

        transform = qtransforms.pop(0)
        if transform.bad:
            apply_transforms(transformed_image, x, y, block_size // 2, qtransforms)
            apply_transforms(transformed_image, x + block_size // 2, y, block_size // 2, qtransforms)
            apply_transforms(transformed_image, x, y + block_size // 2, block_size // 2, qtransforms)
            apply_transforms(transformed_image, x + block_size // 2, y + block_size // 2, block_size // 2, qtransforms)
        else:
            domain_block = resized_image[transform.x : transform.x + block_size,
                                         transform.y : transform.y + block_size]

            rot_times = transform.tr >> 1
            flip = transform.tr & 1
            if flip:
                domain_block = np.flip(domain_block, axis=1)
            domain_block = np.rot90(domain_block, k=rot_times)

            transformed_image[x : x + block_size,
                              y : y + block_size] = transform.co * domain_block + transform.di

    transformed_image = np.zeros(image.shape)

    for x, y, qtransforms in transforms_(image, block_size, transforms):
        apply_transforms(transformed_image, x, y, block_size, qtransforms)

    return transformed_image

### [7 баллов] Класс, реализующий интерфейс битового массива
Он понадобится для преобразования найденной IFS в строку, чтобы записать сжатый файл на диск.

In [None]:
# GRADED CELL: BitBuffer

class BitBuffer:
    '''Class that provides storing and and reading integer numbers 
    in continuous bytearray.

    Parameters
    ----------
    buffer : bytearray, optional (default=None)
        Input bytearray, for initialization.

    Attributes
    ----------
    _pushed_bits : int
        Count of pushed into last byte bits
    _left_bits : int
        Count of bits that can be popped from first byte
    _bufcap : int
        Max bits in byte
    _buffer : bytearray
        Bytearray that can contain any information.

    Examples
    --------
    >>> buffer = BitBuffer()
    >>> buffer.push(1, 1)
    >>> x = buffer.pop(1)
    >>> print(x)
    1
    >>> buffer.push(125, 18)
    >>> x = buffer.pop(18)
    >>> print(x)
    125
    >>> buffer.push(5, 3)
    >>> x = buffer.pop(3)
    >>> print(x)
    5

    >>> dy = transform.y // stride
    >>> buffer.push(dy, self._num_bits_ver)
    '''

    def __init__(self, buffer=None):
        self._pushed_bits = 0
        self._left_bits = 8
        self._bufcap = 8
        self._buffer = buffer or bytearray(1)

    def to_bytearray(self):
        '''Convert to bytearray.
    
        Returns
        -------
        buffer: bytearray
            Bytearray that contains all data.
        '''

        return self._buffer

    def _push_bit(self, bit):
        '''Push given bit to buffer.

        Parameters
        ----------
        bit: int
            Input bit.
        '''

        if self._pushed_bits == self._bufcap:
            self._buffer.append(0)
            self._pushed_bits = 0
        self._buffer[-1] |= bit << (self._bufcap - 1 - self._pushed_bits)
        self._pushed_bits += 1

    def _pop_bit(self):
        '''Pop one bit from buffer.

        Returns
        -------
        bit: int
            Popped bit.
        '''

        if not self._left_bits:
            self._buffer.pop(0)
            self._left_bits = self._bufcap
        bit = (self._buffer[0] & 1 << (self._left_bits - 1)) >> (self._left_bits - 1)
        self._left_bits -= 1

        return bit

    def push(self, x, n_bits):
        '''Push given integer to buffer.
    
        Parameters
        ----------
        x: int
            Input number.

        n_bits: int
            Number of bits for store input number,
            should be greater than log2(x).
        '''

        assert x < 2 ** n_bits

        bits_left = n_bits

        while bits_left:
            bit = (x & (1 << (bits_left - 1))) >> (bits_left - 1)
            self._push_bit(bit)
            bits_left -= 1

    def pop(self, n_bits):
        '''Pop n_bits from buffer and transform it to a number.
    
        Parameters
        ----------
        n_bits: int
            Number of bits for pop from buffer.

        Returns
        -------
        x: int
            Extracted number.
        '''

        bits_left = n_bits
        x = 0

        while bits_left:
            x |= self._pop_bit() << (bits_left - 1)
            bits_left -= 1

        return x

### [Баллы за качество] Класс, реализующий интерфейс архиватора изображений

#### Условие
* Класс будет тестироваться как на черно-белых, так и на **цветных** изображениях
* Для цветных изображений необходимо переходить в YUV, сжимать, а потом обратно в RGB для финального результата
* В качестве оценки алгоритма будет использоваться кривая размер-качество, построенная на основе запуска метода compress2, с параметрами качества [0, 20, 40, 60, 80, 100]
* Следует обеспечить непрерывную монотонную зависимость реального качества декодированного изображения от параметра качества
* Баллы будут выставляться исходя из того, насколько построенный график размер-качество лежит близко к верхнему левому углу (высокое качество и низкий размер)
* За красивые графики с равномерно распределенными узлами [0 ... 100] и без точек перегиба выставляются дополнительные баллы
* Ограничение времени работы (суммарно сжатие и разжатие) на всех уровнях качества: 8 минут

**Интерфейсом данного класса считаются только методы compress2 и decompress, остальные можно менять как угодно**`

In [None]:
# GRADED CELL: FractalCompressor
 
class FractalCompressor:
    '''Class that performs fractal compression/decompression of images.
 
    Attributes
    ----------
    _num_bits_ver : int
        Number of bits for store VERTICAL OFFSET for each transformation.
    
    _num_bits_hor : int
        Number of bits for store HORIZONTAL OFFSET for each transformation.
 
    _num_bits_con : int
        Number of bits for store INTENSITY SCALE for each transformation.
 
    _num_bits_pix : int
        Number of bits for store INTENSITY OFFSET for each transformation.
        
    _num_bits_tfm : int
        Number of bits for store TRANFORMATION INDEX for each transformation.
 
    _num_bits_bad : int
        Number of bits for store flag of split into 4 block for each transformation.
 
    Examples
    --------
    >>> comp = FractalCompressor()
    >>> compressed_image = comp.compress(image, block_size=8, stride=2)
    >>> decompressed_image = comp.decompress(compressed_image, num_iters=9)
    >>> yet_another_compressed_image = comp.compress(image, 8, 4, 0.5, 0.7)
    >>> yet_another_decompressed_image = comp.compress(yet_another_compressed_image, 5)
    '''
 
    def __init__(self):
        self._num_bits_ver = 8
        self._num_bits_hor = 8
        self._num_bits_con = 8
        self._num_bits_pix = 8
        self._num_bits_tfm = 3
        self._num_bits_bad = 1
 
    def _float2int(self, f):
        '''Convert float value from range [-1.0, 1.0] to uint8 value in range [0, 255]
        
        Parameters
        ----------
        f : float
            Number to convert
        
        Returns
        -------
        u : np.uint8
            Converted number
        '''
 
        assert -1 <= f <= 1, f"{f} must be in [-1, 1]"
 
        return int((f + 1) * 127)
 
    def _int2float(self, u):
        '''Convert uint8 value from range [0, 255] to float value in range [-1, 1]
        
        Parameters
        ----------
        u : np.uint8
            Number to convert
        
        Returns
        -------
        f : float
            Converted number
        '''
 
        return u / 127 - 1
 
    def _int2uint(self, i):
        '''Convert int8 value from range [-128, 127] to uint8 value in range [0, 255]
        
        Parameters
        ----------
        i : np.int8
            Number to convert
        
        Returns
        -------
        u : np.uint8
            Converted number
        '''
 
        return i + 128
    
    def _uint2int(self, u):
        '''Convert int8 value from range [0, 255] to uint8 value in range [-128, 127]
        
        Parameters
        ----------
        u : np.uint8
            Number to convert
        
        Returns
        -------
        i : np.int8
            Converted number
        '''
 
        return u - 128
 
    def _add_header(self, buffer, params):
        '''Store header in buffer.
    
        Parameters
        ----------
        buffer: BitBuffer
            
        params: FractalCompressionParams
            Parameters that should be stored in buffer.
 
        Note
        ----
        This method must be consistent with `_read_header`.
        '''
 
        buffer.push(params.height, 9)
        buffer.push(params.width, 9)
        buffer.push(params.is_colored, 1)
        buffer.push(params.block_size, 8)
        buffer.push(self._float2int(params.spatial_scale), 8)
        buffer.push(self._float2int(params.intensity_scale), 8)
        buffer.push(params.stride, 8)
 
    def _read_header(self, buffer):
        '''Read header from buffer.
    
        Parameters
        ----------
        buffer: BitiBuffer
 
        Returns
        -------
        params: FractalCompressionParams
            Extracted parameters.
            
        Note
        ----
        This method must be consistent with `_add_header`.
        '''
 
        params = FractalCompressionParams(
            height = buffer.pop(9),
            width = buffer.pop(9),
            is_colored = buffer.pop(1),
            block_size = buffer.pop(8),
            spatial_scale = self._int2float(buffer.pop(8)),
            intensity_scale = self._int2float(buffer.pop(8)),
            stride = buffer.pop(8)
        )
 
        return params
 
    def _add_to_buffer(self, buffer, transform, stride):
        '''Store block transformation in buffer.
    
        Parameters
        ----------
        buffer: BitBuffer
 
        transform: BlockTransform
            
        stride: int
            Vertical and horizontal stride for domain block search.
 
        Note
        ----
        This method must be consistent with `_read_transform`.
        '''
 
        buffer.push(transform.bad, self._num_bits_bad)
        if not transform.bad:
            buffer.push(transform.x // stride, derive_num_bits(2 ** self._num_bits_ver, stride))
            buffer.push(transform.y // stride, derive_num_bits(2 ** self._num_bits_hor, stride))
            buffer.push(self._float2int(transform.co), self._num_bits_con)
            buffer.push(self._int2uint(transform.di), self._num_bits_pix)
            buffer.push(transform.tr, self._num_bits_tfm)
 
    def _read_transform(self, buffer, stride):
        '''Read block transformation from buffer.
    
        Parameters
        ----------
        buffer: BitBuffer
 
            
        stride: int
            Vertical and horizontal stride for domain block search.
            
        Returns
        -------
        transform: BlockTransform
            Extracted block transformation.
 
        Note
        ----
        This method must be consistent with `_add_to_buffer`.
        '''
 
        bad = bool(buffer.pop(self._num_bits_bad))
 
        if bad:
            transform = BlockTransform(x=0, y=0, co=0.0, di=0, tr=0, bad=True)
        else:
            transform = BlockTransform(
                x = buffer.pop(derive_num_bits(2 ** self._num_bits_ver, stride)) * stride,
                y = buffer.pop(derive_num_bits(2 ** self._num_bits_hor, stride)) * stride,
                co = self._int2float(buffer.pop(self._num_bits_con)),
                di = self._uint2int(buffer.pop(self._num_bits_pix)),
                tr = buffer.pop(self._num_bits_tfm),
                bad = bad
            )
 
        return transform
    
    def _ifs2buf(self, params, transformations):
        '''Store compression parameters and IFS in buffer.
    
        Parameters
        ----------
        params: FractalCompressionParams
            Parameters of the compression.
 
        transformations: list of BlockTransform's
            Given IFS.
 
        Returns
        -------
        buffer: BitBuffer
 
        Note
        ----
        This method must be consistent with `_buf2ifs`.
        '''
        
        buffer = BitBuffer()
        self._add_header(buffer, params)
        for t in transformations:
            self._add_to_buffer(buffer, t, params.stride)
 
        return buffer
    
    def _buf2ifs(self, buffer):
        '''Store compression parameters and IFS in buffer.
    
        Parameters
        ----------
        buffer: BitBuffer
 
        Returns
        -------
        params: FractalCompressionParams
            Extracted compression parameters.
 
        transforms, transforms_{u,y,v}: list of BlockTransform's
            Extracted IFS.
 
        Note
        ----
        This method must be consistent with `_ifs2buf`.
        '''
 
        def read_transform_for_block(buffer, block_size, stride):
            '''Recursively get transforms for current rank block.
 
            Parameters
            ----------
            buffer : BitBuffer
 
            block_size : int
                Size of rank block.
            
            stride : int
                Vertical and horizontal stride for domain block search.
 
            Returns
            -------
            qtransform : list of BlockTransform's
                List of all transforms for rank_block
            '''
 
            qtransforms = [self._read_transform(buffer, stride)]
            if qtransforms[0].bad and block_size >= 4:
                for i in range(4):
                    qtransforms.extend(read_transform_for_block(buffer, block_size // 2, stride))
            return qtransforms
 
        def read_transforms(buffer, num_transforms, block_size, stride):
            '''Read transforms.
 
            Parameters
            ----------
            buffer : BitBuffer
 
            num_transforms : int
                Number of transforms to read
 
            block_size : int
                Size of rank block.
            
            stride : int
                Vertical and horizontal stride for domain block search.
 
            Returns
            -------
            transforms : list of BlockTransform's
                List of all transforms
            '''
 
            transforms = []
            for _ in range(num_transforms):
                transforms.extend(read_transform_for_block(buffer, block_size, stride))
 
            return transforms
 
        params = self._read_header(buffer)
 
        num_transforms = int(params.height * params.width / params.block_size ** 2)
 
        if params.is_colored:
            transforms_u = read_transforms(buffer, num_transforms, params.block_size, params.stride)
            transforms_y = read_transforms(buffer, num_transforms, params.block_size, params.stride)
            transforms_v = read_transforms(buffer, num_transforms, params.block_size, params.stride)
 
            return params, transforms_u, transforms_y, transforms_v
        else:
            transforms = read_transforms(buffer, num_transforms, params.block_size, params.stride)
 
            return params, transforms, None, None
 
    def _compress_one_component(self, image, block_size, stride, block_size_limit):
        '''Compress one color component of input image
        
        Parameters
        ----------
        image : np.array
            Source image.
 
        block_size: int, optional (default=8)
            Size of rank block.
 
        stride: int, optional (default=1)
            Vertical and horizontal stride for domain block search.
 
        block_size_limit : int
            Min block_size to use
 
        Returns
        -------
        transformations : list of BlockTransform's
            Transformations for color component.
        '''
 
        def compress_block(image, resized_image, x, y, block_size, stride, block_size_limit):
            '''Recursively find transforms for rank block.
        
            Parameters
            ----------
            image : np.array
                Source image.
 
            resized_image : np.array
                Resized source image.
 
            x, y : int, int
                Coordinates of the rank block.
 
            block_size : int
                Size of rank block.
 
            stride : int
                Vertical and horizontal stride for domain block search.
            
            block_size_limit : int
                Min block_size to use
 
            Returns
            -------
            transforms : list of BlockTransform's
                Transformations for rank block.
            '''
 
            transforms = [find_block_transform(
                image, resized_image,
                x, y, block_size, stride
            )]
            if transforms[0].bad:
                if block_size > block_size_limit:
                    for x_, y_ in ((x, y), (x + block_size // 2, y), (x, y + block_size // 2), (x + block_size // 2, y + block_size // 2)):
                        transforms.extend(compress_block(
                            image, resized_image,
                            x_, y_, block_size // 2, stride, block_size_limit
                        ))
                else:
                    transforms[0] = transforms[0]._replace(bad=False)
 
            return transforms
 
        image = image.astype('float')
 
        # Instead of reducing each domain block we will reduce the entire image
        resized_image = resize(image, (image.shape[0] // 2, image.shape[1] // 2))
 
        # Splitting source image into rank blocks
        xs = range(0, image.shape[0] - block_size + 1, block_size)
        ys = range(0, image.shape[1] - block_size + 1, block_size)
 
        transformations = []
        for x, y in tqdm(itertools.product(xs, ys), total=len(xs) * len(ys)):
            transforms = compress_block(
                image, resized_image,
                x, y, block_size, stride, block_size_limit
            )
            transformations.extend(transforms)
 
        return transformations
 
    def compress(self, image, block_size=8, stride=4,
                 spatial_scale=0.5, intensity_scale=0.75, block_size_limit=8):
        '''Compress input image
        
        Parameters
        ----------
        image : np.array
            Source image.
 
        block_size: int, optional (default=8)
            Size of rank block.
 
        stride: int, optional (default=1)
            Vertical and horizontal stride for domain block search.
        
        spatial_scale : float, optional (default=0.5)
            ({rank block size} / {domain block size}) ratio, must be <1.
        
        intensity_scale : float, optional (default=0.75)
            Reduce coefficient for image intensity.
 
        block_size_limit : int, optional (default=8)
            Min block_size to use
 
        Returns
        -------
        byte_array: bytearray
            Compressed image.
            
        Note
        ----
        This method must be consistent with `decompress`.
        '''
 
        assert stride <= block_size, "stride must be less than or equal to block_size"
 
        if (is_colored(image)):
            image = rgb2yuv(image)
            y = np.rint(image[:,:,0] * 255).astype(np.uint8)
            u = np.rint(image[:,:,1] * 127 + 127).astype(np.uint8)
            v = np.rint(image[:,:,2] * 127 + 127).astype(np.uint8)
            transformations = []
            transformations.extend(self._compress_one_component(y, block_size, stride, block_size_limit))
            transformations.extend(self._compress_one_component(u, block_size, stride, block_size_limit))
            transformations.extend(self._compress_one_component(v, block_size, stride, block_size_limit))
        else:
            transformations = self._compress_one_component(image, block_size, stride, block_size_limit)
 
        params = FractalCompressionParams(
            height = image.shape[0],
            width = image.shape[1],
            is_colored = is_colored(image),
            block_size = block_size,
            spatial_scale = spatial_scale,
            intensity_scale = intensity_scale,
            stride = stride
        )
 
        buffer = self._ifs2buf(params, transformations)
        return buffer.to_bytearray()
 
    def compress2(self, image, quality=40):
        '''Compress input image
        
        Parameters
        ----------
        image : np.array
            Source image.
 
        quality: int, optional (default=50)
            Quality of image compression
 
        Returns
        -------
        byte_array: bytearray
            Compressed image.
            
        Note
        ----
        This method must be consistent with `decompress`.
        '''
 
        presets = {
            0: {
                "block_size": 32,
                "stride": 1,
                "block_size_limit": 32
            },
            20: {
                "block_size": 16,
                "stride": 2,
                "block_size_limit": 16
            },
            40: {
                "block_size": 16,
                "stride": 3,
                "block_size_limit": 8
            },
            60: {
                "block_size": 16,
                "stride": 2,
                "block_size_limit": 4
            },
            80: {
                "block_size": 8,
                "stride": 4,
                "block_size_limit": 4
            },
            100: {
                "block_size": 8,
                "stride": 3,
                "block_size_limit": 4
            },
        }
        
        
        try:
            preset = presets[quality]
        except KeyError:
            raise ValueError(f'quality must be in {tuple(presets.keys())}')
 
        return self.compress(image, **preset)
 
    def decompress(self, byte_array, num_iters=16):
        '''Compress input image
        
        Parameters
        ----------
        byte_array: bytearray
            Compressed image.
 
        num_iters: int, optional (default=10)
            Number of iterations to perform IFS.
 
        Returns
        -------
        image: np.array
            Decompressed image.
            
        Note
        ----
        This method must be consistent with `compress`.
        '''
 
        def decompress_one_component(params, transforms, num_iters):
            '''Recursively apply transforms for rank block.
 
                Parameters
                ----------
                params: FractalCompressionParams
                    Extracted compression parameters.
 
                transforms : list of BlockTransform's
                    Given IFS, Iterated Function System
 
                num_iters: int
                    Number of iterations to perform IFS.
 
                Returns
                -------
                image: np.array
                    Transformed image.
 
                '''
 
            image = np.zeros((params.height, params.width))
 
            for _ in range(num_iters):
                # Instead of reducing each domain block we will reduce the entire image
                resized_image = resize(image, (image.shape[0] // 2, image.shape[1] // 2))
                image = perform_transform(image, resized_image, transforms, params.block_size)
 
            return image
 
        buffer = BitBuffer(buffer=byte_array.copy())
        params, transforms_u, transforms_y, transforms_v = self._buf2ifs(buffer)
 
        if params.is_colored:
            y = decompress_one_component(params, transforms_u, num_iters)
            u = decompress_one_component(params, transforms_y, num_iters)
            v = decompress_one_component(params, transforms_v, num_iters)
 
            yuv = np.zeros((params.width, params.height, 3), dtype=np.float)
            for i in range(params.width):
                for j in range(params.height):
                    yuv[i][j][0] = y[i][j] / 255
                    yuv[i][j][1] = (u[i][j] - 127) / 127
                    yuv[i][j][2] = (v[i][j] - 127) / 127
 
            rgb = yuv2rgb(yuv)
            rgb[rgb < 0] = 0
 
            return np.rint(255 * rgb).astype(np.uint8)
        else:
            transforms = transforms_u
            gray = decompress_one_component(params, transforms, num_iters)
            gray[gray < 0] = 0
            return gray.astype(np.uint8)

## Пробуем применить FractalCompressor

In [None]:
comp = FractalCompressor()

In [None]:
result_16x4 = comp.compress(lenna_gray_256x256, block_size=16, stride=4, block_size_limit=16)

Размер сжатого изображения в байтах == длина полученного массива `bytearray`

In [None]:
len(result_16x4)

### Эволюция изображения при декомпрессии
Выглядит как увеличение фотографии в CSI: Место прреступления

In [None]:
n_iters = [1, 2, 4, 8, 16]

imgs = [comp.decompress(result_16x4, n) for n in n_iters]
_, axs = plt.subplots(ncols=len(imgs) + 1, figsize=(18, 6))
for index in range(len(imgs)):
    axs[index].imshow(imgs[index], cmap='gray')
    axs[index].set_title(f'its: {n_iters[index]}, psnr: {round(psnr(imgs[index], lenna_gray_256x256), 2)}')
axs[-1].imshow(lenna_gray_256x256, cmap='gray')
axs[-1].set_title('orig')

plt.show()

### Цветное изображение

In [None]:
result_rgb_16x8 = comp.compress(lenna_rgb_256x256, block_size=16, stride=8, block_size_limit=16)

In [None]:
len(result_rgb_16x8)

In [None]:
def weighted_psnr(ref, img):
    assert ref.shape == img.shape, "Shape mismatch"
    if is_colored(img):
        ref_yuv = rgb2yuv(ref)
        img_yuv = rgb2yuv(img)
        
        return (4 * psnr(ref_yuv[..., 0], img_yuv[..., 0]) +
                    psnr(ref_yuv[..., 1], img_yuv[..., 1]) +
                    psnr(ref_yuv[..., 2], img_yuv[..., 2])
               ) / 6
    else:
        return psnr(ref, img)

In [None]:
n_iters = [1, 2, 4, 8, 16]

imgs = [comp.decompress(result_rgb_16x8, n) for n in n_iters]
_, axs = plt.subplots(ncols=len(imgs) + 1, figsize=(18, 6))
for index in range(len(imgs)):
    axs[index].imshow(imgs[index])
    axs[index].set_title(f'its: {n_iters[index]}, psnr: {round(weighted_psnr(lenna_rgb_256x256, imgs[index]), 2)}')
axs[-1].imshow(lenna_rgb_256x256, cmap='gray')
axs[-1].set_title('orig')

plt.show()

### Поиграемся с параметрами сжатия
Понятно, что при увеличении перебора мы, во-первых, увеличиваем время вычислений, а во-вторых, улучшаем итоговое качество изображения после сжатия и декомпрессии.

Чтобы увеличить перебор можно уменьшить размер шага `stride` или уменьшить размер доменного блока `block_size`. Но не рекомендуется делать блок размером меньше 4х4.

In [None]:
result_16x2 = comp.compress(lenna_gray_256x256, block_size=16, stride=2, block_size_limit=16)

In [None]:
n_iters = [1, 2, 4, 8, 16]

imgs = [comp.decompress(result_16x2, n) for n in n_iters]
_, axs = plt.subplots(ncols=len(imgs) + 1, figsize=(18, 6))
for index in range(len(imgs)):
    axs[index].imshow(imgs[index], cmap='gray')
    axs[index].set_title(f'its: {n_iters[index]}, psnr: {round(psnr(imgs[index], lenna_gray_256x256), 2)}')
axs[-1].imshow(lenna_gray_256x256, cmap='gray')
axs[-1].set_title('orig')

plt.show()

In [None]:
result_8x4 = comp.compress(lenna_gray_256x256, block_size=8, stride=4, block_size_limit=8)

In [None]:
n_iters = [1, 2, 4, 8, 16]

imgs = [comp.decompress(result_8x4, n) for n in n_iters]
_, axs = plt.subplots(ncols=len(imgs) + 1, figsize=(18, 6))
for index in range(len(imgs)):
    axs[index].imshow(imgs[index], cmap='gray')
    axs[index].set_title(f'its: {n_iters[index]}, psnr: {round(psnr(imgs[index], lenna_gray_256x256), 2)}')
axs[-1].imshow(lenna_gray_256x256, cmap='gray')
axs[-1].set_title('orig')

plt.show()

In [None]:
result_8x2 = comp.compress(lenna_gray_256x256, block_size=8, stride=2, block_size_limit=8)

In [None]:
n_iters = [1, 2, 4, 8]

imgs = [comp.decompress(result_8x2, n) for n in n_iters]
_, axs = plt.subplots(ncols=len(imgs) + 1, figsize=(18, 6))
for index in range(len(imgs)):
    axs[index].imshow(imgs[index], cmap='gray')
    axs[index].set_title(f'its: {n_iters[index]}, psnr: {round(psnr(imgs[index], lenna_gray_256x256), 2)}')
axs[-1].imshow(lenna_gray_256x256, cmap='gray')
axs[-1].set_title('orig')

plt.show()

## Построим график качества
Качество в данном случае будет измеряться по PSNR (а значит в децибелах).

Это базовый график для понимания соотношения между коэффициентом сжатия и качеством, получаемым на выходе. Можно посмотреть, как он будет меняться в зависимости от количества итераций при декомпрессии, например.

In [None]:
def weighted_psnr(ref, img):
    assert ref.shape == img.shape, "Shape mismatch"
    if is_colored(img):
        ref_yuv = rgb2yuv(ref)
        img_yuv = rgb2yuv(img)
        
        return (4 * psnr(ref_yuv[..., 0], img_yuv[..., 0]) +
                    psnr(ref_yuv[..., 1], img_yuv[..., 1]) +
                    psnr(ref_yuv[..., 2], img_yuv[..., 2])
               ) / 6
    else:
        return psnr(ref, img)

In [None]:
quality = [0, 20, 40, 60, 80, 100]

def test_image(img):
    compressed_images = [comp.compress2(img, quality=q) for q in quality]
    decompressed_images = [comp.decompress(compressed) for compressed in compressed_images]
    compression_rates = np.array([len(compressed) for compressed in compressed_images]) / img.size
    psnrs = [weighted_psnr(img, decompressed) for decompressed in decompressed_images]
    return compression_rates, psnrs

In [None]:
def test_collection(collection):
    results = []
    for image in collection:
        results.append(test_image(image))
    return results

In [None]:
results = test_collection([lenna_gray_256x256])

In [None]:
def plot_results(results):
    _, ax = plt.subplots(figsize=(8, 6))
 
    for result in results:
        compression_rates, psnrs = result
        ax.plot(compression_rates * lenna_gray_256x256.size, psnrs, marker='o', ms=10, ls='-.')
 
    ax.set_xlabel('Compression Size', fontsize=16)
    ax.set_ylabel('PSNR, dB', fontsize=16)
 
    plt.show()

In [None]:
plot_results(results)

In [None]:
collection = []
for image_name in os.listdir('test_files'):
    image = resize(io.imread(os.path.join('test_files', image_name)), (256, 256))
    if is_colored(image):
        image = np.rint(rgb2gray(image) * 255).astype('uint8')
    collection.append(image)
test_and_plot_collection(collection)

## Улучшим алгоритм
Одним из основных способов улучшения сжатия изображений является разбиение картинки не на равные блоки, а на блоки разных размеров. Как дополнительную часть задания, мы предлагаем реализовать разбиение квадродеревом, это позволит более гибко настраивать параметры сжатия и получить лучшие результаты.

<center>Пример разбиения изображения на блоки с использованием квадродерева</center>

Исходное изображение | Разбиение квадродеревом
- | -
![Source image](images/house.jpg) | ![Segmentation](images/quadtree.jpg)

## Базовые тесты

In [None]:
TRANSFORM_UNIT_TESTS = (((np.array([[1, 2], 
                                    [3, 4]]),
                         np.array([[4, 6], 
                                   [8, 10]]),
                         0, 0, 2, 1), 
                        1.5),
                       ((np.array([[1, 2], 
                                   [3, 4]]),
                         np.array([[4, 6, 7, 6], 
                                   [6, 7, 5, 4]]),
                         0, 0, 2, 1), 
                        0),
                       ((np.array([[1, 2], 
                                   [3, 4]]),
                         np.array([[4, 8, 6, 8], 
                                   [6, 7, 5, 8]]),
                         0, 0, 2, 1), 
                        0),
                       ((np.array([[1, 2], 
                                   [3, 4]]),
                         np.array([[4, 2, 3, 6], 
                                   [6, 4, 5, 5]]),
                         0, 0, 2, 2),
                        0.5))
    
    
def test_transform():
    for test, answer in TRANSFORM_UNIT_TESTS:
        transform = find_block_transform(*test)
        img, resized_img, x, y, block_size, stride = test
        transformed = perform_transform(np.zeros_like(img), resized_img, [transform], block_size)
        loss = mse(img, transformed)
        if loss > answer + 1e-5:
            return False
    return True

In [None]:
print(test_transform())

In [None]:
def test_bit_buffer():
    def fill():
        bb = BitBuffer()
        bb.push(15, 6)
        bb.push(0, 7)
        bb.push(1, 1)
        bb.push(100, 400)
        answer = [100, 1, 0, 15]
        return bb, answer

    bb, answer = fill()
    res1 = []
    res1.append(bb.pop(400))
    res1.append(bb.pop(1))
    res1.append(bb.pop(7))
    res1.append(bb.pop(6))
    if res1 == answer:
        return True
    bb, answer = fill()
    res2 = []
    res2.append(bb.pop(6))
    res2.append(bb.pop(7))
    res2.append(bb.pop(1))
    res2.append(bb.pop(400))
    if res2 == answer[::-1]:
        return True
    return False

In [None]:
print(test_bit_buffer())