In [1]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import json
from scipy.fftpack import dct, idct
from math import log10, sqrt 

# global Quantization Parameter to be used for compression (higher values increase compression)
# Expects QP from 0 - 50
QP_value = 1

########################################################
################### Helper functions ###################
########################################################

# implement 2D DCT
def dct2(a):
    return dct(dct(a.T, norm='ortho').T, norm='ortho')

# implement 2D IDCT
def idct2(a):
    return idct(idct(a.T, norm='ortho').T, norm='ortho')

def get_dim(QP_value):
    resizing_factor = 100 - QP_value * 1.8
    # Ensure the resizing factor is between 10 and 100
    resizing_factor = max(min(resizing_factor, 100), 10)

    width = int(image.shape[1] * resizing_factor / 100)
    height = int(image.shape[0] * resizing_factor / 100)
    return (width, height)

def padArray(arr):
    height, width = arr.shape
    new_height = (height + 7) // 8 * 8
    new_width = (width + 7) // 8 * 8
    padded_array = np.zeros((new_height, new_width), dtype=arr.dtype)
    padded_array[:height, :width] = arr # copy values

    # Pad the right side with values from the last column
    padded_array[:height, width:new_width] = np.tile(arr[:, -1], (1, new_width - width))
    # Pad the bottom with values from the last row
    padded_array[height:new_height, :] = np.tile(arr[-1, :], (new_height - height, 1))
    return padded_array

# Creating tree nodes
class NodeTree(object):

    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

    def children(self):
        return (self.left, self.right)

    def nodes(self):
        return (self.left, self.right)

    def __str__(self):
        return '%s_%s' % (self.left, self.right)

# Main function implementing huffman coding
def huffman_code_tree(node, left=True, binString=''):
    if type(node) is np.float64:
        return {node: binString}
    (l, r) = node.children()
    d = dict()
    d.update(huffman_code_tree(l, True, binString + '0'))
    d.update(huffman_code_tree(r, False, binString + '1'))
    return d

def encode_huffman(arr):
  # Calculating frequency
  input = arr.flatten()
  freq = {}
  for c in input:
      if c in freq:
          freq[c] += 1
      else:
          freq[c] = 1

  freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)

  nodes = freq

  while len(nodes) > 1:
      (key1, c1) = nodes[-1]
      (key2, c2) = nodes[-2]
      nodes = nodes[:-2]
      node = NodeTree(key1, key2)
      nodes.append((node, c1 + c2))

      nodes = sorted(nodes, key=lambda x: x[1], reverse=True)

  # create huffman code tree
  huffmanCode = huffman_code_tree(nodes[0][0])

  #encode the array
  encode = ""
  for val in input:
    encode += huffmanCode[val]
  return encode, huffmanCode

# splits the image into 8x8 blocks and applies DCT to each block
# the blocks are then compressed (depending on QP some values are set to 0)
# then each block is huffman encoded and stored into a binary string
def applyDCT(img):
  imgDCT = np.zeros((img.shape[0], img.shape[1]), dtype=np.double)
  encodedBinaryImg = ""
  huffmanCodes = []
  for i in range(img.shape[0]//8):
    for j in range(img.shape[1]//8):
      imgBlock = img[i*8:i*8+8, j*8:j*8+8]
      imgBlockDCT = dct2(imgBlock) # apply DCT

      imgBlockDCT = np.round(imgBlockDCT) # round values to full integers
      threshold = int(round(8 - QP_value * 7 / 50.0)) # QP (lossy) compression
      threshold = max(min(threshold, 8), 1)
      imgBlockDCT[threshold:, threshold:] = 0
      #imgBlockDCT[2:8, 2:8] = 0
      binary, huffmanCode = encode_huffman(imgBlockDCT)
      encodedBinaryImg += binary
      huffmanCodes.append(huffmanCode)

      imgDCT[i*8:i*8+8, j*8:j*8+8] = imgBlockDCT
  
  return imgDCT, encodedBinaryImg, huffmanCodes

# decodes the binary string and restores the original 8x8 DCT block
def decode_huffman(binary, huffCode):
  # decode 1 block at a time
  imgBlockDCT = np.zeros((64), dtype=np.double)
  currentStr = ""
  pixel = 0
  i = 0
  for char in binary:
    if pixel >= 64:
      break
    currentStr += char
    i += 1

    for char, code in huffCode.items():
      if code == currentStr:
        imgBlockDCT[pixel] = float(char)
        currentStr = ""
        pixel +=1

  return imgBlockDCT.reshape(8, 8), binary[i:]

def PSNR(original, compressed): 
    mse = np.mean((original - compressed) ** 2) 
    if(mse == 0):  # MSE is zero means no noise is present in the signal . 
                  # Therefore PSNR have no importance. 
        return 100
    max_pixel = 255.0
    psnr = 20 * log10(max_pixel / sqrt(mse)) 
    return psnr 

####################################################################
################## Start of main function code #####################
################## Encoder part of the code    #####################
####################################################################

# Read raw input image
imageRaw = cv2.imread('img.png')

# convert to correct  color space
image = cv2.cvtColor(imageRaw, cv2.COLOR_BGR2YUV)

# split into y u v channels
y, u, v = cv2.split(image)
originalSize = y.shape

# downsample chrominance channels based on QP input (lossy compression)
# QP of 50 scales them to 10% size, QP 0f 0 keeps them at 100%
ds_u = cv2.resize(u, get_dim(QP_value), interpolation = cv2.INTER_AREA)
ds_v = cv2.resize(v, get_dim(QP_value), interpolation = cv2.INTER_AREA)

# size them back up to the same size of Y channel
ds_u = cv2.resize(ds_u, (originalSize[1], originalSize[0]), interpolation=cv2.INTER_AREA)
ds_v = cv2.resize(ds_v, (originalSize[1], originalSize[0]), interpolation=cv2.INTER_AREA)

# pad images to multiples of 8
paddedY = padArray(y)
paddedU = padArray(ds_u)
paddedV = padArray(ds_v)

# compressed output and huffman codes for each block
encodedBinaryImg = ""
huffmanCodes = []

# apply the DCT to each channel and store them
imgDCTY, binaryString, huffmanCode = applyDCT(paddedY)
encodedBinaryImg += binaryString
huffmanCodes += huffmanCode

imgDCTU, binaryString, huffmanCode = applyDCT(paddedU)
encodedBinaryImg += binaryString
huffmanCodes += huffmanCode

imgDCTV, binaryString, huffmanCode = applyDCT(paddedV)
encodedBinaryImg += binaryString
huffmanCodes += huffmanCode

# store the compressed "jpg" as file (its not a real jpg..)
# note: this compressed version does not include the huffman code tree 
#       the resulting filesize would be much bigger than the input image..
file_name = "out.bin"
byte_data = bytes(int(encodedBinaryImg[i:i+8], 2) for i in range(0, len(encodedBinaryImg), 8))
with open(file_name, 'wb') as f:
    f.write(byte_data)



##########################################################
############### Decoder part of the code #################
##########################################################
    
# now the file can be read and restored back to a PNG
# read binary data
with open("out.bin", 'rb') as jpgFile:
    byte_data = jpgFile.read()
# Convert the file back to a binary string
binary_data = ''.join(format(byte, '08b') for byte in byte_data)

# arrays to recreate image
imgCompPaddedY = np.zeros((paddedY.shape[0], paddedY.shape[1]), dtype=np.uint8)
imgCompPaddedU = np.zeros((paddedU.shape[0], paddedU.shape[1]), dtype=np.uint8)
imgCompPaddedV = np.zeros((paddedV.shape[0], paddedV.shape[1]), dtype=np.uint8)

# Decode each 8x8 Block back into an image
# do this for each component of the image (Y,U,V)
decodedBlock = 0
for i in range(imgDCTY.shape[0]//8):
  for j in range(imgDCTY.shape[1]//8):
    imgBlockDCT, binary_data = decode_huffman(binary_data, huffmanCodes[decodedBlock])
    decodedBlock += 1
    imgBlock = idct2(imgBlockDCT)
    imgBlock[imgBlock < 0] = 0
    imgBlock[imgBlock > 255] = 255

    imgCompPaddedY[i*8:i*8+8, j*8:j*8+8] = imgBlock

for i in range(imgDCTU.shape[0]//8):
  for j in range(imgDCTU.shape[1]//8):
    imgBlockDCT, binary_data = decode_huffman(binary_data, huffmanCodes[decodedBlock])
    decodedBlock += 1
    imgBlock = idct2(imgBlockDCT)
    imgBlock[imgBlock < 0] = 0
    imgBlock[imgBlock > 255] = 255

    imgCompPaddedU[i*8:i*8+8, j*8:j*8+8] = imgBlock

for i in range(imgDCTV.shape[0]//8):
  for j in range(imgDCTV.shape[1]//8):
    imgBlockDCT, binary_data = decode_huffman(binary_data, huffmanCodes[decodedBlock])
    decodedBlock += 1
    imgBlock = idct2(imgBlockDCT)
    imgBlock[imgBlock < 0] = 0
    imgBlock[imgBlock > 255] = 255

    imgCompPaddedV[i*8:i*8+8, j*8:j*8+8] = imgBlock

# restore it back to the correct size (remove padding)
imgCompY = imgCompPaddedY[:originalSize[0], :originalSize[1]]

imgCompU = imgCompPaddedU[:originalSize[0], :originalSize[1]]
imgCompV = imgCompPaddedV[:originalSize[0], :originalSize[1]]

# merge the 3 channels and convert them to BRG color space
merged = cv2.merge([imgCompY, imgCompU, imgCompV])
imageCompressed = cv2.cvtColor(merged, cv2.COLOR_YUV2BGR)

# decoding complete, show the image
cv2.imshow("Input", imageRaw)
cv2.imshow("Compressed Image", imageCompressed)
cv2.imwrite("out.png", imageCompressed)

# calculate PSNR
print("PSNR value: " + str(PSNR(imageRaw, imageCompressed))) 

cv2.waitKey(0)
cv2.destroyAllWindows()



PSNR value: 39.86579449131032
