In [None]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import cv2

In [None]:
class PCA:
    def __init__(self, n_components):
        self.n_components = n_components

    def fit(self, data):
        # Center the data by subtracting the mean
        num_images, num_pixels = data.shape
        self.mean = np.mean(data, axis=0)
        centered_data = data - self.mean

        # Calculate the covariance matrix
        cov_matrix = np.cov(centered_data, rowvar=False)

        # Compute the eigenvalues and eigenvectors
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

        # Sort eigenvalues and eigenvectors in descending order
        sorted_indices = np.argsort(eigenvalues)[::-1]  # This line calculates the indices that would sort the eigenvalues in ascending order and then reverses the order
        self.eigenvalues = eigenvalues[sorted_indices]
        self.eigenvectors = eigenvectors[:, sorted_indices]

        # Select the top n_components eigenvectors
        self.components = self.eigenvectors[:, :self.n_components]

    def compress(self, data, output_file="compression_output.npy"):
        # Center the data (subtract the mean)
        centered_data = data - self.mean
        
        # Compress the data by projecting it onto the components
        compressed_data = np.dot(centered_data, self.components)
        
        compressed_data = compressed_data.astype("int16")
        self.components = self.components.astype("float16")
        self.mean = self.mean.astype("uint8")

        # Store the data into a binary file using np.save (for single array)
        with open(output_file, 'wb') as f:
            # Saving compressed data, components, and mean to the binary file
            np.save(f, compressed_data)  # Saving compressed data
            np.save(f, self.components)  # Saving components (PCA basis)
            np.save(f, self.mean)        # Saving the mean
        
        print(f"Compression details saved to {output_file}.")


    def decompress(self, file_name="compression_output.npy"):
        # Open the file in read mode and load the data
        with open(file_name, 'rb') as f:
            compressed_data = np.load(f)  # Load the compressed data
            components = np.load(f)       # Load the components
            mean = np.load(f)            # Load the mean


        decompressed_data = np.dot(compressed_data, components.T) + mean
        return decompressed_data

### Comparision with JPEG

In [None]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
from scipy.fftpack import dct, idct
import bitstring
import os
import io
from PIL import Image
import heapq
from collections import defaultdict

In [None]:
class JPEG:
    def __init__(self, Q = 50):
        self.Q = Q
        self.quantization_matrix = np.array([   [16, 11, 10, 16, 24, 40, 51, 61],
                                                [12, 12, 14, 19, 26, 58, 60, 55],
                                                [14, 13, 16, 24, 40, 57, 69, 56],
                                                [14, 17, 22, 29, 51, 87, 80, 62],
                                                [18, 22, 37, 56, 68, 109, 103, 77],
                                                [24, 35, 55, 64, 81, 104, 113, 92],
                                                [49, 64, 78, 87, 103, 121, 120, 101],
                                                [72, 92, 95, 98, 112, 100, 103, 99],])
        self.scaled_quantization_matrix = np.round(self.quantization_matrix * (50.0/Q))
    
    # define the dct and idct functions for the blocks
    def _dct2(self, block):
        dct_val = dct(dct(block.T, norm='ortho').T, norm='ortho')
        return dct_val

    def _idct2(self, block):
        return idct(idct(block.T, norm='ortho').T, norm='ortho')
    
    # Runs quantization while compression
    def _quantize(self, block):
        return np.round(block / self.scaled_quantization_matrix)

    # Runs dequantization while decompression
    def _dequantize(self, block):
        return block * self.scaled_quantization_matrix
    
    # breaks the image into 8x8 patches
    def _chunkify(self, image):
        chunks = []
        h,w = np.shape(image)
        for i in range(0, h, 8):
            for j in range(0, w, 8):
                chunk = image[i:i+8, j:j+8]
                chunks.append(chunk)

        return chunks

    # recombines the patches into the original image size
    def _dechunkify(self, chunks, original_shape):
        h, w = original_shape
        image = np.zeros((h, w), dtype=chunks[0].dtype)  # Create an empty array with the original shape
        chunk_idx = 0  # Index to keep track of the current chunk

        for i in range(0, h, 8):
            for j in range(0, w, 8):
                # Place the current chunk into the corresponding position in the image
                image[i:i+8, j:j+8] = chunks[chunk_idx]
                chunk_idx += 1

        return image

    # flattens the matrix into its zig-zag traversal
    def _zigzag(self, matrix):
        rows, cols = matrix.shape
        result = []

        for s in range(rows + cols - 1):
            if s % 2 == 0:  # Even diagonals (move up-right)
                x = min(s, rows - 1)
                y = s - x
                while x >= 0 and y < cols:
                    result.append(matrix[x, y])
                    x -= 1
                    y += 1
            else:  # Odd diagonals (move down-left)
                y = min(s, cols - 1)
                x = s - y
                while y >= 0 and x < rows:
                    result.append(matrix[x, y])
                    x += 1
                    y -= 1

        return np.array(result)

    # converts a flattened matrix into the original form by placing the elements in a zig zag manner
    def _reverse_zigzag(self, flattened, rows, cols):
        # Initialize an empty 2D matrix
        matrix = np.zeros((rows, cols), dtype=flattened.dtype)

        # Fill the matrix using the reverse zig-zag order
        index = 0
        for s in range(rows + cols - 1):
            if s % 2 == 0:  # Even diagonals (move up-right)
                x = min(s, rows - 1)
                y = s - x
                while x >= 0 and y < cols:
                    matrix[x, y] = flattened[index]
                    index += 1
                    x -= 1
                    y += 1
            else:  # Odd diagonals (move down-left)
                y = min(s, cols - 1)
                x = s - y
                while y >= 0 and x < rows:
                    matrix[x, y] = flattened[index]
                    index += 1
                    x += 1
                    y -= 1

        return matrix
    
    # Helper functions for huffman encoding and RLE
    def _build_huffman_tree(self, frequencies):
        # Create a priority queue (min-heap) to build the Huffman tree
        heap = [[weight, [symbol, ""]] for symbol, weight in frequencies.items()]
        heapq.heapify(heap)
        
        while len(heap) > 1:
            low = heapq.heappop(heap)
            high = heapq.heappop(heap)
            for pair in low[1:]:
                pair[1] = '0' + pair[1]
            for pair in high[1:]:
                pair[1] = '1' + pair[1]
            heapq.heappush(heap, [low[0] + high[0]] + low[1:] + high[1:])
        
        # Generate the Huffman codes
        huffman_codes = {}
        for symbol, code in heap[0][1:]:
            huffman_codes[symbol] = code
        return huffman_codes

    def _run_length_encoding(self, matrix):
        rle = []
        zero_count = 0
        
        for value in matrix:
            if value == 0:
                zero_count += 1  # Increment zero counter
                if zero_count == 16:
                    rle.append((15, 0))
                    zero_count = 0
            else:
                rle.append((zero_count, value))  # Store the number of zeros before the value
                zero_count = 0  # Reset zero counter after encountering a non-zero value
        
        return rle

    def _huffman_encoding(self, values):
        frequencies = defaultdict(int)
        for val in values:
            frequencies[val] += 1
        
        huffman_codes = self._build_huffman_tree(frequencies)
        return huffman_codes

    # Function to encode the image post scaling dct and quantization into rle and huffman and store into a bin file
    def _encode(self, flattened_matrices, filename):
        with open(filename, 'wb') as f:
            rle_result = []

            for i in range(0,len(flattened_matrices)):
                rle = self._run_length_encoding(flattened_matrices[i])
                rle_result.extend(rle)
                rle_result.append((0,0))

            values = [value for count, value in rle_result]  # Only values for Huffman encoding
            huffman_codes = self._huffman_encoding(values)    

            # Store the Quality Factor in the file 
            f.write(np.array([self.Q], dtype=np.uint8).tobytes()) 
            f.flush()
        

            # Store the Huffman table in the file (value -> huffman code mapping)
            f.write(np.array([len(huffman_codes)], dtype=np.uint16).tobytes())  # Write the number of unique values in Huffman table
            f.flush()            
 

            # Collect all Huffman codes
            all_huffman_codes = []

            for value, code in huffman_codes.items():
                f.write(np.array([value], dtype=np.int16).tobytes())  
                f.flush()
                size_in_bits = len(code)
                f.write(np.array([size_in_bits], dtype=np.uint8).tobytes())  # Write the size of the Huffman code
                f.flush()
                all_huffman_codes.append(code)

            all_huffman_codes_str = ''.join(all_huffman_codes)
            total_size_in_bits = len(all_huffman_codes_str)

            f.write(np.array([total_size_in_bits], dtype=np.uint32).tobytes())
            f.flush()

            padding_length = 0
            if len(all_huffman_codes_str) % 8 != 0:
                padding_length = 8 - (len(all_huffman_codes_str) % 8)
            all_huffman_codes_str = all_huffman_codes_str + '0' * padding_length

            bitstream = bitstring.BitStream(bin=all_huffman_codes_str)
            f.write(bitstream.bytes)  # Write the Huffman code as bytes
            f.flush()

            # To store the RLE Result
            # Store the size of the rle_result
            f.write(np.array([len(rle_result)], dtype=np.uint32).tobytes())  
            f.flush()            

            all_huffman_codes = []
            all_huffman_codes_str = ''

            for count, value in rle_result:
                huffman_code = huffman_codes[value]  # Find Huffman code for this value
                size_in_bits = len(huffman_code)  # The size in bits of the Huffman code

                f.write(np.array([count], dtype=np.uint8).tobytes())
                f.write(np.array([size_in_bits], dtype=np.uint8).tobytes())

                f.flush()

                all_huffman_codes.append(huffman_code)

            all_huffman_codes_str = ''.join(all_huffman_codes)
            total_size_in_bits = len(all_huffman_codes_str)

            # Calculate padding to ensure byte alignment
            if total_size_in_bits % 8 != 0:
                padding_length = 8 - (total_size_in_bits % 8)
                all_huffman_codes_str += '0' * padding_length  # Add padding to the bitstream
            else:
                padding_length = 0

            # Write the total size in bits to the file (before padding)
            f.write(np.array([total_size_in_bits], dtype=np.uint32).tobytes())
            f.flush()

            # Convert the padded bitstream into a byte array
            bitstream = bitstring.BitStream(bin=all_huffman_codes_str)
            f.write(bitstream.bytes)  # Write the Huffman codes as bytes
            f.flush()



    # Finds the original value given the huffman code and the huffman table
    def _decode_huffman_code(self, code_bits, huffman_codes):
        # Reverse the Huffman coding to find the value for the given code bits
        for value, huffman_code in huffman_codes.items():
            if code_bits == huffman_code:
                return value
        raise ValueError(f"Unknown Huffman code: {code_bits}")

    # takes as input the original image and a quality factor Q
    def compress(self, image, filename):
        image = image.astype('float32') # cast to float
        image = image - 128.0 # rescale the values
        chunks = self._chunkify(image) # break into 8x8 chunks 
        
        flattened_matrices = []
        for chunk in chunks:
            quantized_matrix = self._quantize(self._dct2(chunk))
            flattened_matrix = self._zigzag(quantized_matrix)
            flattened_matrices.append(flattened_matrix)

        self._encode(flattened_matrices, filename)

    def decompress(self, filename):
        with open(filename, 'rb') as f:
            # Read the Quality Factor (Q)
            Q = np.fromfile(f, dtype=np.uint8, count=1)[0]
            
            # Read the Huffman table (number of entries)
            num_huffman_codes = np.fromfile(f, dtype=np.uint16, count=1)[0]

            huffman_codes = {}
            
            table_entries = []
            # Read each Huffman code (value, size)
            for _ in range(num_huffman_codes):
                value = np.fromfile(f, dtype=np.int16, count=1)[0]
                size_in_bits = np.fromfile(f, dtype=np.uint8, count=1)[0]
                table_entries.append((value,size_in_bits))
            
            length_huffman_codes = np.fromfile(f, dtype=np.uint32, count=1)[0]
            huffman_codes_bitstream = f.read((length_huffman_codes + 7) // 8)  # Read the total size in bytes
            huffman_codes_bitstream = bitstring.BitStream(bytes=huffman_codes_bitstream).bin

            current_pos = 0
            for value, size_in_bits in table_entries:
                huffman_codes[value] = huffman_codes_bitstream[current_pos:current_pos+size_in_bits]
                current_pos+=size_in_bits
            

            # get the number of rle values
            num_rle_values = np.fromfile(f, dtype=np.uint32, count=1)[0]

            # 3. Decode the RLE-encoded values and sizes
            rle_result = []
            table_entries = []

            for _ in range(num_rle_values):
                count = np.fromfile(f, dtype=np.uint8, count=1)[0]
                size_in_bits = np.fromfile(f, dtype=np.uint8, count=1)[0]
                table_entries.append((count, size_in_bits))
            
            # print(table_entries)
            
            length_huffman_codes = np.fromfile(f, dtype=np.uint32, count=1)[0]
            huffman_codes_bitstream = f.read((length_huffman_codes + 7) // 8)  # Read the total size in bytes
            huffman_codes_bitstream = bitstring.BitStream(bytes=huffman_codes_bitstream).bin

            current_pos = 0
            for count, size_in_bits in table_entries:
                value = self._decode_huffman_code(huffman_codes_bitstream[current_pos:current_pos+size_in_bits], huffman_codes)
                rle_result.append((count, value))
                current_pos+=size_in_bits


            # Reconstruct the original flattened matrices from RLE
            flattened_matrices = []
            
            current_patch = []
            for count, value in rle_result:
                if count == 0 and value == 0:
                    current_patch = current_patch + [0]*(64 - len(current_patch))
                    flattened_matrices.append(np.array(current_patch))
                    current_patch = []
                else:
                    current_patch = current_patch + [0]*count + [value]


            # convert it back to the original patch dimensions
            for i in range(len(flattened_matrices)):
                
                flattened_matrices[i] = self._reverse_zigzag(flattened_matrices[i], 8, 8)

                # now de-quantize the matrix and rescale
                flattened_matrices[i] = (flattened_matrices[i] * self.scaled_quantization_matrix) 
                flattened_matrices[i] = self._idct2(flattened_matrices[i]) + 128
            
            # combine the patches together
            restored_image = self._dechunkify(flattened_matrices, (8*int(np.sqrt(len(flattened_matrices))), 8*int(np.sqrt(len(flattened_matrices)))))
            return restored_image

In [None]:
def calculate_rmse(original_image, decompressed_image):
    # Flatten the images to 1D arrays for easier comparison
    original_image = original_image.astype(np.float64)
    decompressed_image = decompressed_image.astype(np.float64)

    # Compute the squared differences
    squared_diff = (original_image - decompressed_image) ** 2

    # Calculate the mean squared error (MSE)
    mse = np.mean(squared_diff)

    # RMSE is the square root of MSE
    rmse = np.sqrt(mse)
    
    return rmse

def get_image_size_in_bits(image_path):
    # Get image size in bytes
    size_in_bytes = os.path.getsize(image_path)
    # Convert bytes to bits
    return size_in_bytes * 8

def calculate_compression_rate(original_path, compressed_data_path):
    """Calculate compression rate as size of compressed data vs original size."""
    return get_image_size_in_bits(original_path) / get_image_size_in_bits(compressed_data_path)

def calculate_bpp(compressed_image_path, image_shape):
    # Get the compressed file size in bits (1 byte = 8 bits)
    file_size_in_bits = os.path.getsize(compressed_image_path) * 8

    # Calculate the total number of pixels in the image
    num_pixels = image_shape[0] * image_shape[1]

    # Compute BPP
    bpp = file_size_in_bits / num_pixels

    return bpp

In [None]:
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np

# Parameters
quality_factors = range(10, 101, 10)  # Quality factors from 10 to 100 in steps of 5
images_directory = "data/"  # Directory with at least 20 images
output_directory = "compressed_images_bin/"  # Directory to save compressed images
graphs_directory = "pca_comparison/"
compressed_images_directory = "compressed_images/"

# Process images
image_files = [os.path.join(images_directory, f) for f in os.listdir(images_directory)]

# Loop through each image and process
i=0
for image_path in image_files:
    print(i)
    i+=1
    rmse_values_pca = []
    bpp_values_pca = []
    rmse_values_custom = []
    bpp_values_custom = []
    compression_rate_pca = []
    compression_rate_custom = []
    
    original_image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

    for q in quality_factors:
        
        # Custom JPEG Compression (assuming custom JPEG class is defined)
        jpeg = JPEG(Q=q)  # Assuming your custom JPEG implementation is called 'JPEG'
        
        compressed_path_custom = os.path.join(output_directory, f"custom_compressed_q{q}_{os.path.splitext(os.path.basename(image_path))[0]}")
        jpeg.compress(original_image, compressed_path_custom)  
        decompressed_image_custom = jpeg.decompress(compressed_path_custom)  # Decompress using custom method

        decompressed_image_path_custom = os.path.join(compressed_images_directory, f"custom_compressed_q{q}_{os.path.splitext(os.path.basename(image_path))[0]}.jpg")

        # Save the decompressed custom image as a .jpg file
        cv2.imwrite(decompressed_image_path_custom, decompressed_image_custom)

        # Check for shape mismatch
        if original_image.shape != decompressed_image_custom.shape:
            print(f"Error in image_path: {image_path}, Quality Factor: {q}")
            continue

        # Calculate RMSE and BPP for custom method
        rmse_custom = calculate_rmse(original_image, decompressed_image_custom)
        bpp_custom = calculate_bpp(compressed_path_custom, original_image.shape)

        # Calculate Compression Rate for custom method
        original_size_custom = os.path.getsize(image_path)
        compressed_size_custom = os.path.getsize(compressed_path_custom)
        compression_rate_custom.append(original_size_custom / compressed_size_custom)
    
        rmse_values_custom.append(rmse_custom)
        bpp_values_custom.append(bpp_custom)

    for k in range(1, 251, 25):
            # PCA Compression and Decompression
            pca_1 = PCA(k)
            pca_1.fit(original_image)
            pca_1.compress(original_image)
            decompressed_image_pca = pca_1.decompress("compression_output.npy")
            rmse_values_pca.append(calculate_rmse(original_image, decompressed_image_pca))
            compression_rate_pca.append(calculate_compression_rate(image_path, "compression_output.npy"))
            bpp_values_pca.append(calculate_bpp("compression_output.npy", original_image.shape))


    # Plot the comparison of RMSE vs BPP for both methods (OpenCV vs Custom JPEG)
    plt.figure(figsize=(8, 6))
    plt.plot(bpp_values_pca, rmse_values_pca, marker='o', linestyle='-', label='PCA', color='blue')
    plt.plot(bpp_values_custom, rmse_values_custom, marker='o', linestyle='-', label='Custom JPEG', color='red')
    plt.xlabel('Bits Per Pixel (BPP)')
    plt.ylabel('Root Mean Squared Error (RMSE)')
    plt.title(f'RMSE vs BPP for Image {os.path.splitext(os.path.basename(image_path))[0]}')
    plt.grid(True)
    plt.legend()
    plt.tight_layout()

    # Save the comparison plot
    plot_filename = os.path.join(graphs_directory, f"rmse_vs_bpp_comparison_{os.path.splitext(os.path.basename(image_path))[0]}.png")
    plt.savefig(plot_filename)
    plt.close()  # Close the figure to avoid overlapping plots

    # Plot Compression Rate vs k for PCA
    plt.figure(figsize=(8, 6))
    plt.plot(range(1, 501, 50), compression_rate_pca, marker='o', linestyle='-', label='PCA', color='blue')
    plt.xlabel('k')
    plt.ylabel('Compression Rate')
    plt.title(f'Compression Rate vs k for Image {os.path.splitext(os.path.basename(image_path))[0]}')
    plt.grid(True)
    plt.legend()
    plt.tight_layout()

    # Save the compression rate plot
    rate_plot_filename = os.path.join(graphs_directory, f"compression_rate_vs_k_{os.path.splitext(os.path.basename(image_path))[0]}.png")
    plt.savefig(rate_plot_filename)
    plt.close()  # Close the figure to avoid overlapping plots
