# Imports & Utils

In [45]:
import os
from PIL import Image
import numpy as np
import struct

# Encoder & Decoder Functions

- input: (b1 r1 times) b2 b3 (b4 r2 times) b5
- b is repeated r times (the frequency of b)
- output: b1 r1 b2 1 b3 1 b4 r2 b5 1

In [25]:
def erle_encoder_v1(arr_1D, threshold=10):
    encoded_1D = []
    length = len(arr_1D)
    i = 0
    count = 1
    while i < length:
        # last character
        if i == length - 1:
            encoded_1D.append(arr_1D[i])
            encoded_1D.append(count)
            break
        # Check the threshold condition using a while loop for adjacent elements
        byte = arr_1D[i]
        while i + 1 < length and abs(byte - arr_1D[i + 1]) <= threshold and count < 255:
            count += 1
            i += 1  # Move to the next element

        # Add the current element and the count
        encoded_1D.append(byte)
        encoded_1D.append(count)

        i += 1
        count = 1

    return encoded_1D

In [26]:
def erle_decoder_v1(encoded_1D):
    decoded_1D = []
    i = 0

    while i < len(encoded_1D):
        byte = encoded_1D[i]
        count = 1
        if i + 1 < len(encoded_1D):
            i += 1
            count = encoded_1D[i]

        for _ in range(count):
            decoded_1D.append(byte)
        i += 1

    return decoded_1D

- input: (b1 r1 times) b2 b3 (b4 r2 times) b5
- b is repeated r times (the frequency of b)
- output: 0 b1 r1 b2 b3 0 b4 r2 b5
- 0 (special character) specifies that, the next byte, in this case, b1 has a frequency of r1
- In case of r = 1, we ignore that
- the output can't end with zero

In [27]:
def erle_encoder_v2(arr_1D, threshold=10):
    encoded_1D = []
    separator = 0

    i = 0
    while i < len(arr_1D):
        byte = arr_1D[i]
        count = 1
        # one byte can store upto 255
        while i + 1 < len(arr_1D) and abs(byte -  arr_1D[i+1]) <= threshold and count < 255:
            count += 1
            i += 1

        if count > 1 or byte == separator:
            encoded_1D.append(separator)
            encoded_1D.append(byte)
            encoded_1D.append(count)
        else:
            encoded_1D.append(byte)
        i += 1

    return encoded_1D

In [28]:
def erle_decoder_v2(encoded_1D):
    decoded_1D = []
    separator = 0

    i = 0
    while i < len(encoded_1D):
        byte = encoded_1D[i]
        count = 1

        if byte == separator:
            count = 0 # reset the count
            if i < len(encoded_1D) - 1:
                i += 1
                byte = encoded_1D[i]
            if i < len(encoded_1D) - 1:
                i += 1
                count = encoded_1D[i]

        for _ in range(count):
            decoded_1D.append(byte)

        i += 1

    return decoded_1D

# Helper Functions

In [64]:
def erel_interface_image_compression(image_file, compressed_file, threshold=10, version=1):
    if version not in [1, 2]:
        raise ValueError("Invalid option! Please use 1 or 2.")

    image = Image.open(image_file)
    image_data = np.array(image)
    image_shape = image_data.shape
    image_array_1D = image_data.flatten()

    encoder_functions = {1: erle_encoder_v1, 2: erle_encoder_v2}
    image_compressed = encoder_functions[version](image_array_1D, threshold)

    # save compressed data
    with open(compressed_file, "wb") as f:
        height, width, *channels = image_shape
        shape_str = f"{height}x{width}" + (f"x{channels[0]}" if channels else "")
        f.write(f"{shape_str}\n".encode())
        f.write(bytearray(image_compressed))

In [65]:
def erel_interface_image_decompression(compressed_file, image_file, version=1):
    if version not in [1, 2]:
        raise ValueError("Invalid option! Please use 1 or 2.")

    with open(compressed_file, "rb") as f:
        shape_line = f.readline().decode().strip()
        shape = tuple(map(int, shape_line.split('x')))
        compressed_data = list(f.read())

    decoder_functions = {1: erle_decoder_v1, 2: erle_decoder_v2}
    decompressed_1D = decoder_functions[version](compressed_data)

    decompressed_2D = np.array(decompressed_1D, dtype=np.uint8).reshape(shape)

    image = Image.fromarray(decompressed_2D)
    image.save(image_file)

In [66]:
def erel_interface_image_multichannel_compression(image_file, compressed_file, threshold=10, version=1):
    if version not in [1, 2]:
        raise ValueError("Invalid option! Please use 1 or 2.")

    image = Image.open(image_file).convert("RGB")
    image_data = np.array(image)
    height, width, channels = image_data.shape

    compressed_channels = []
    for c in range(channels):
        flat_channel = image_data[:, :, c].flatten()

        encoder_functions = {1: erle_encoder_v1, 2: erle_encoder_v2}
        compressed_channels.append(encoder_functions[version](flat_channel, threshold))

    with open(compressed_file, "wb") as f:
        f.write(struct.pack("III", height, width, channels))
        for ch in compressed_channels:
            # store channel size
            f.write(struct.pack("I", len(ch)))
            f.write(bytearray(ch))

In [67]:
def erel_interface_image_multichannel_decompression(compressed_file, output_image, version=1):
    if version not in [1, 2]:
        raise ValueError("Invalid option! Please use 1 or 2.")

    with open(compressed_file, "rb") as f:
        height, width, channels = struct.unpack("III", f.read(12))

        compressed_channels = []
        for _ in range(channels):
            channel_size = struct.unpack("I", f.read(4))[0]
            compressed_channels.append(list(f.read(channel_size)))

    decoder_functions = {1: erle_decoder_v1, 2: erle_decoder_v2}
    decompressed_channels = [decoder_functions[version](ch) for ch in compressed_channels]

    image_data = np.zeros((height, width, channels), dtype=np.uint8)
    for c in range(channels):
        image_data[:, :, c] = np.array(decompressed_channels[c], dtype=np.uint8).reshape(height, width)

    image = Image.fromarray(image_data, mode="RGBA" if channels == 4 else "RGB")
    image.save(output_image)

# Print Functions

In [41]:
def file_details(file_path):
    if os.path.exists(file_path):
        print(f"File Name: {os.path.basename(file_path)}")
        print(f"File Size: {os.path.getsize(file_path)} bytes")
    else:
        print("File does not exist.")

In [42]:
def calculate_compression_ratio(original_file, compressed_file):
    original_size = os.path.getsize(original_file)
    compressed_size = os.path.getsize(compressed_file)

    if compressed_size == 0:
        return float('inf'), 100.0

    compression_ratio = original_size / compressed_size
    compression_factor = (1 - (compressed_size / original_size)) * 100

    return compression_ratio, compression_factor

In [43]:
def show(image_file):
    image = Image.open(image_file)
    image.show(image)

# Interface

In [None]:
image_file = "Color500.bmp"
file_details(image_file)

compression_interfaces = {1: erel_interface_image_compression, 2: erel_interface_image_multichannel_compression}
decompression_interfaces = {1: erel_interface_image_decompression, 2: erel_interface_image_multichannel_decompression}

interface = 2
encoder_version = 2
threshold = 10 # In case of RLE, threshold is zero

compressed_file = "compressed_file.bin"
compression_interfaces[interface](image_file, compressed_file, threshold, encoder_version)
file_details(compressed_file)

print("Compression ratio: ", calculate_compression_ratio(image_file, compressed_file)[0])

decompressed_image_file = "decompressed_image.bmp"
decompression_interfaces[interface](compressed_file, decompressed_image_file, encoder_version)
file_details(decompressed_image_file)

# show(image_file)
# show(decompressed_image_file)

File Name: Color500.bmp
File Size: 750054 bytes


  while i + 1 < len(arr_1D) and abs(byte -  arr_1D[i+1]) <= threshold and count < 255:


File Name: compressed_file.bin
File Size: 644200 bytes
Compression ratio:  1.1643185346165787
File Name: decompressed_image.bmp
File Size: 750054 bytes
