# Algorithm 1: Encoder

<font color="red">Input </font>: input_file, output_file, A_name, max_error, bi, callback=None, ssim_stop=False, min_n=MIN_N, max_n=MAX_N  
<font color="red">Output </font>: encoded_file, z_file, bytes_written, z_bytes_written, raw_size, n0_cumu

1. Open the input image and convert to YCbCr
2. Get image dimensions: width (w), height (h), and depth
3. Initialize raw file output and write header with MAGIC_NUMBER, version, w, h, depth, A_id, bi, min_n, max_n

4. Initialize OMP (Orthogonal Matching Pursuit) with basis DCT1_Haar1_qt for different block sizes
5. For each color channel (depth):
   5.1 Extract sub-image blocks
   5.2 Apply OMP to get sparse representation
   5.3 Store results in x_list

6. Write encoded blocks to file with vector precision and block size

7. Compress the raw file using zlib
8. Save compressed data to .zf file

9. Return the encoded file, compressed file, bytes written, compressed bytes, raw size, and cumulative statistics

In [1]:
import numpy as np
from numpy import array, concatenate, sign
from math import pi, cos, sin, log, sqrt
from sklearn.linear_model import OrthogonalMatchingPursuit
from scipy.sparse import csc_matrix
from struct import pack, unpack, calcsize
import zlib
from PIL import Image

In [2]:
def quantize(x, v_fmt):
    for i in range(len(x)):
        x[i] = truncate(x[i], v_fmt)

def truncate(value, format_spec):
    """Truncate the value to the specified format."""
    try:
        return float(f"{value:{format_spec}}")
    except ValueError as e:
        raise ValueError(f"Invalid format code '{format_spec}' for value '{value}'") from e

def write_vector_as_pairs(f, x, n0, v_fmt):
    """Write a sparse vector as a list of pairs (pos, value)"""
    f.write("B", int(n0))
    pos_fmt = "B" if len(x) <= 256 else "H"
    if n0 > 0:
        for i, value in enumerate(x):
            if value != 0:
                f.write(pos_fmt, i)
                quantized_value = truncate(value, v_fmt)
                f.write("f", float(quantized_value))     

def write_vector(f, x, v_fmt):
    quantize(x, v_fmt)
    n0 = np.linalg.norm(x, 0)
    write_vector_as_pairs(f, x, n0, v_fmt)

def sub_image(im_data, N, i, j, k):
    """
    Extracts a sub-image from a larger image array.

    Parameters:
    - im_data: The full image data as a NumPy array.
    - N: The size of the block.
    - i, j: The starting indices for the block.
    - k: The channel index.

    Returns:
    - The extracted sub-image as a NumPy array.
    """
    h0, h1, w0, w1 = i, i + N, j, j + N
    return im_data[h0:h1, w0:w1, k]

def set_sub_image(sub_img, im_data, N, i, j, k):
    """
    Places a sub-image back into the larger image array.

    Parameters:
    - sub_img: The sub-image to be placed back.
    - im_data: The full image data as a NumPy array.
    - N: The size of the block.
    - i, j: The starting indices for the block.
    - k: The channel index.
    """
    h0, h1, w0, w1 = i, i + N, j, j + N
    im_data[h0:h1, w0:w1, k] = sub_img

def mode_to_bpp(mode):
    """Convert image mode to bits per pixel."""
    if mode == 'L':  # 8-bit pixels, black and white
        return 8
    elif mode == 'RGB':  # 24-bit color
        return 24
    elif mode == 'RGBA':  # 32-bit color with alpha
        return 32
    elif mode == 'YCbCr':  # 24-bit color (YUV)
        return 24
    else:
        raise ValueError(f"Unsupported image mode: {mode}")
        
def min_sparcity(max_error, N):
    """Calculate minimum sparsity based on max_error and block size N."""
    return int(np.ceil(max_error * N**2))

In [3]:
class RawFile:
    def __init__(self, name, mode):
        """Open file with name and mode"""
        self.f = open(name, mode)
        self.wnibble = None     # nibble pending to write
        self.rnibble = None     # nibble pending to read
        self.queue = []         # other data pending to write

    def __enter__(self):
        """Enter the runtime context related to this object."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context related to this object."""
        self.close()

    def write(self, fmt, *args):
        """Pack and write args with format fmt"""
        if fmt != "n":
            if self.wnibble is None:
                fmt = "!" + fmt  # big-endian
                args = [a.encode('utf-8') if isinstance(a, str) else a for a in args]
                self.f.write(pack(fmt, *args))
            else:
                self.queue.append((fmt, args))
        else: # Nibbles still not being used at the final example
            if self.wnibble is None:
                self.wnibble = (int(args[0]) + 8) & 0xf
            else:
                self.wnibble <<= 4
                self.wnibble |= (int(args[0]) + 8) & 0xf
                self.f.write(pack("B", self.wnibble))
                self.wnibble = None

                for fmt, args in self.queue:
                    self.write(fmt, *args)
                self.queue = []

    def read(self, fmt):
        """Read data with format fmt and unpack"""
        if fmt != "n":
            fmt = "!" + fmt  
            data = self.f.read(calcsize(fmt))
            udata = unpack(fmt, data)
            return [u.decode('utf-8') if isinstance(u, bytes) else u for u in udata] if len(udata) > 1 else udata[0]
        else:
            if self.rnibble is not None:
                udata = self.rnibble
                self.rnibble = None
            else:
                data = self.f.read(calcsize("B"))
                udata = unpack("B", data)[0]
                self.rnibble = (udata & 0xf) - 8
                udata = (udata >> 4) - 8
            return udata

    def tell(self):
        """Return the current file position"""
        return self.f.tell()

    def seek(self, offset, whence=0):
        """Move the file pointer to the specified position"""
        return self.f.seek(offset, whence)

    def close(self):
        """Close the file"""
        if self.wnibble is not None:
            self.write("n", 0)
        self.f.close()

In [4]:
def print_progress(message, processed, total):
    """Print progress message."""
    print(message % (processed, total))


def get_progress(stats, im_data, min_n):
    """Get progress information for print."""
    total_blocks = (im_data.shape[0] // min_n) * (im_data.shape[1] // min_n)
    processed_blocks = sum(stats.values())
    return processed_blocks, total_blocks


In [5]:
def psi(x):
	if 0 <= x < 0.5:
		return 1
	if 0.5 <= x < 1:
		return -1
	return 0

def phi(x):
	if 0 <= x < 1:
		return 1
	return 0

def DCT_II_f(k, N):
    """Discrete Cosine Transform Type II"""
    def f(x):
        return np.cos(pi * (x + 0.5) * k / N)
    return f

def h(i, N):
    """Generate the h function for the Haar basis."""
    if i == 0:
        print("no phi function defined for i=0 in h(i,N))
        return #phi

    n, k = [(n, k) for n in range(int(log(N, 2))) for k in range(2 ** n)][i - 1]
    return lambda x: 2 ** (n / 2.0) * psi(2 ** n * x - k)

def v(h, N):
    """Generate the v vector for the Haar basis."""
    return [h(i / float(N)) for i in range(N)]

def w(k, N):
	c = sqrt(2) ** sign(k)
	return  [ c * DCT_II_f(k, N)(i / float(N)) for i in range(N) ]

def W(k1, k2, n, N):
    """Generate the W matrix for the basis."""
    def ro(t):
        return [[cos(t), -sin(t)], [sin(t), cos(t)]]

    def theta(n, N):
        return pi * n / (2.0 * N)

    def g(k1, k2, N1, N2, v):
        return DCT_II_f(k1, N1)(v[0]) * DCT_II_f(k2, N2)(v[1])

    def W_elem(i, j):
        return g(k1, k2, 8, 8, np.dot(ro(theta(n, N)), [[i / 8.0], [j / 8.0]]))

    return [[W_elem(i, j) for j in range(8)] for i in range(8)]

def DCT1_qt(rows, cols):
    """Generate the DCT basis matrix."""
    return np.array([w(k, rows) for k in range(cols)]).T

def Haar1_qt(rows, cols):
    """Generate the Haar basis matrix."""
    return np.array([v(h(i, cols), rows) for i in range(cols)]).T

def DCT1_Haar1_qt(rows, cols):
    """Combine DCT and Haar basis matrices."""
    dct_matrix = DCT1_qt(rows, cols // 2)
    haar_matrix = Haar1_qt(rows, cols // 2)
    return np.concatenate((dct_matrix, haar_matrix), axis=1)

In [6]:
def _omp_code(x_list, im_data, im_rec, omp_d, max_error, bi, N, k, stats, ssim_stop, min_n, max_n, callback):
    processed_blocks = 0  

    for i in range(im_data.shape[0] // N):
        for j in range(im_data.shape[1] // N):
            sub_img = sub_image(im_data, N, i, j, k)
            b = sub_img.flatten()  
            
            A = omp_d.get(N)
            if A.shape[1] > b.size:
                A = A[:, :b.size]
                
            omp = OrthogonalMatchingPursuit(n_nonzero_coefs=min(A.shape[1], b.size))
            omp.fit(A, b)
            x = omp.coef_
            
            x_list.append((N, x))
            
            processed_blocks += 1
            
            if processed_blocks % 100 == 0:
                print_progress("Block stats: " + str(stats) + "    Progress: %d/%d", processed_blocks, (im_data.shape[0] // N) * (im_data.shape[1] // N))
            
            if callback:
                callback(sub_img, i, j, max_error, processed_blocks)
            
            stats[N] = stats.get(N, 0) + 1
            
            if ssim_stop and check_ssim(sub_img, im_rec, N, k):
                return processed_blocks
    
    return processed_blocks


In [7]:
MIN_N = 8
MAX_N = 32

def code(input_file, output_file, max_error, bi, callback=None, ssim_stop=False, min_n=MIN_N, max_n=MAX_N):
    """Compress input_file with the given parameters into output_file"""
    print("N", min_n, max_n)

    # STEP 0 - HEADER ###########################
    version = FIF_VER
    A_id = 0  # FIXME Use BASIS_MAP_ID[A_name]

    im = Image.open(input_file)
    im = im.convert('YCbCr')
    w, h = im.size
    depth = mode_to_bpp(im.mode) // 8
    raw_size = w * h * depth1

    print(f"Image Mode: {im.mode}, Depth: {depth}, Width: {w}, Height: {h}")

    with RawFile(output_file, 'wb') as f:
        f.write(HEADER_FMT, MAGIC_NUMBER, version, w, h, depth, A_id, bi, min_n, max_n)

        # STEP 1 - OMP ###########################
        im_data = np.array(im.getdata()).reshape(h, w, depth)
        im_rec = (0, h, 0, w)
        stats = {} 
        n0_cumu = 0

        omp_d = {}
        n = min_n
        while n <= max_n:
            A = DCT1_Haar1_qt(n * n, A_COLS)  
            print(f"Initializing omp_d[{n}] with matrix A of shape {A.shape}")
            omp_d[n] = A 
            n *= 2

        x_list = []

        for k in range(depth): 
            n0 = _omp_code(x_list, im_data, im_rec, omp_d, max_error, bi, 
                           max_n, k, stats, ssim_stop, min_n, max_n, callback)
            n0_cumu += n0

        for N, x in x_list:
            if min_n < max_n:
                f.write("B", N)
            v_fmt = ".2f"  
            write_vector(f, x.tolist(), v_fmt)

        bytes_written = f.tell()

    # STEP 2 - DEFLATE ###########################
    with open(output_file, 'rb') as f:
        compress = zlib.compressobj(9, zlib.DEFLATED, 15, 9, zlib.Z_DEFAULT_STRATEGY)
        zdata = compress.compress(f.read())
        zdata += compress.flush()

    z_file = output_file + ".zf"

    with open(z_file, 'wb') as f:
        f.write(zdata)
        z_bytes_written = f.tell()

    return output_file, z_file, bytes_written, z_bytes_written, raw_size, n0_cumu


## Example

In [None]:
MIN_N = 8
MAX_N = 32
A_COLS = 256

FIF_VER = 2
MAGIC_NUMBER = b'FIF'  # Ensure this is a bytes object
HEADER_FMT = '3sBiiBBBBB'


V_FMT_PRECISION = ".2f"  # Use this for floating-point precision formatting

import warnings

with warnings.catch_warnings():
    warnings.simplefilter("ignore", RuntimeWarning)
    # Your code with OrthogonalMatchingPursuit here

    # Example usage
    input_file = 'fif_codec/images/input/lena.png'
    output_file = 'path_to_output_file.fif'
    max_error = 0.1
    bi = 0  # Example value for basis index
    code(input_file, output_file, max_error, bi)

# Algorithm 2: Decoder
<font color="red">Input </font>: input_file, output_file  
<font color="red">Output </font>: decoded_image

1. Open the raw file and read header with MAGIC_NUMBER, version, w, h, depth, A_id, bi, min_n, max_n
2. Initialize image data matrix with zeros

3. For each color channel (depth):
   3.1 Decode the blocks using the OMP recursive decoder
   3.2 Reconstruct the image from decoded blocks

4. Convert YCbCr to RGB if needed
5. Save the decoded image to output_file


In [10]:
import numpy as np
from PIL import Image
from struct import unpack, calcsize
import zlib

def decode(input_file, output_file):
    """Decompress input_file into output_file"""

    with RawFile(input_file, 'rb') as f:
        mn, version, w, h, depth, A_id, bi, minN, maxN = f.read(HEADER_FMT)

        if mn != MAGIC_NUMBER:
            raise Exception(f"Invalid image format: Wrong magic number '{mn}'")

        if version != FIF_VER:
            raise Exception(f"Invalid codec version: {version}. Expected: {FIF_VER}")

        im_data = np.zeros((h, w, depth), dtype=np.float32)
        im_rec = (0, h, 0, w)

        with open(input_file + ".zf", 'rb') as zf:
            decompressor = zlib.decompressobj()
            decompressed_data = decompressor.decompress(zf.read()) + decompressor.flush()
            with RawFile(io.BytesIO(decompressed_data), 'rb') as f:
                for k in range(depth):
                    _omp_decode(f, im_data, im_rec, bi, maxN, minN, maxN, k)

        if depth == 1:
            im_data[:, :, 1] = im_data[:, :, 0]
            im_data[:, :, 2] = im_data[:, :, 0]

        YCbCr_to_RGB(im_data)

        im = Image.fromarray(im_data.astype('uint8'))
        im.save(output_file)

def _omp_decode(f, im_data, im_rec, bi, N, minN, maxN, k):
    """OMP recursive decoder"""

    h0, h1, w0, w1 = im_rec

    A = DCT1_Haar1_qt(N * N, A_COLS)

    v_fmt = V_FMT_PRECISION

    for i in range(h0 // N, h1 // N):
        for j in range(w0 // N, w1 // N):

            if minN < maxN and f.read("B") != N:
                f.seek(-1, 1)  # Rewind one byte
                iN, jN = i * N, j * N
                sub_rec = (iN, iN + N, jN, jN + N)
                _omp_decode(f, im_data, sub_rec, bi, N // 2, minN, maxN, k)
                continue

            x = np.array(read_vector(f, v_fmt))
            b = np.dot(A, x)

            for l in range(len(b)):
                b[l] = truncate(b[l], "B")

            sub_img = c_inv[bi](b, N)
            set_sub_image(sub_img, im_data, N, i, j, k)

def read_vector(f, v_fmt):
    """Read a vector from the file."""
    n = f.read("B")
    x = [f.read("f") for _ in range(n)]
    return x

def YCbCr_to_RGB(im_data):
    """Convert YCbCr to RGB."""
    Y = im_data[:, :, 0]
    Cb = im_data[:, :, 1] - 128
    Cr = im_data[:, :, 2] - 128

    R = Y + 1.402 * Cr
    G = Y - 0.344136 * Cb - 0.714136 * Cr
    B = Y + 1.772 * Cb

    im_data[:, :, 0] = np.clip(R, 0, 255)
    im_data[:, :, 1] = np.clip(G, 0, 255)
    im_data[:, :, 2] = np.clip(B, 0, 255)


Exception: Invalid image format: Wrong magic number 'FIF'

In [None]:
HEADER_FMT = '3sBiiBBBBB'
MAGIC_NUMBER = b'FIF'
FIF_VER = 2
A_COLS = 256
V_FMT_PRECISION = ".2f"


input_file = 'path_to_output_file.fif'
output_file = 'reconstructed_image.png'

decode(input_file, output_file)

N 8 32
Image Mode: YCbCr, Depth: 3, Width: 512, Height: 512
Initializing omp_d[8] with matrix A of shape (64, 256)
Initializing omp_d[16] with matrix A of shape (256, 256)
Initializing omp_d[32] with matrix A of shape (1024, 256)
Block stats: {32: 99}    Progress: 100/256
Block stats: {32: 199}    Progress: 200/256
Block stats: {32: 355}    Progress: 100/256
Block stats: {32: 455}    Progress: 200/256
Block stats: {32: 611}    Progress: 100/256
Block stats: {32: 711}    Progress: 200/256
