In [9]:
#############################
### Amit Kumar Singh Yadav ##
#############################
# Project 2: Vector Quantization and Discrete Cosine Transform
# Part 1 - Vector Quantization

"""Write a program to implement vector quantization on a gray-scale image using a "vector"
that consists of a 4x4 block of pixels. Design your codebook using all the blocks in the image
as training data, using the Generalized Lloyd algorithm. Then quantize the image using your codebook.
Explore the impact of different codebook sizes,for example, L=128 and L=256. 
Next, train your codebook on a collection of 10 images, and quantize your original image using the new codebook.
Compare your results on the new codebook to your previous results, and explain any differences

Need to test method on at least two different images.
"""

###############################
### Importing the packages ####
###############################
import numpy as np
import math
from collections import defaultdict
import lbg
import cv2

###############################
### Defining Helper functions #
###############################

#########################################################################      
# helper function to get MSE
#########################################################################
def getMSE(input1,input2):
    return(np.sum((np.array(input1) - np.array(input2))**2))

#########################################################################
# function to calculate PSNR between two images
# taken as it is from Project #1
#########################################################################
def getPSNR(targetImg:np.ndarray, predictedImg:np.ndarray) -> float:
        # Mean Square Error zero means no noise
        # hence PSNR will explode
        # PSRN is not good measure for identical images
        mean_square_error = np.mean((predictedImg - targetImg) ** 2)
        if(mean_square_error == 0):
            print("images are identical")
            return 100.0 # for simplicity
        # note it is in db
        return 20 * math.log10(255.0 / math.sqrt(mean_square_error))

#######################################################################    
# helper function to find error of a vector from all the training data
#######################################################################
def getError(vector, training_data):
    return np.sum( (np.array(vector)-np.array(training_data)) **2/ len(training_data))

#########################################################################
# function to prepocess each image 
# convert to grayscale and then resize it to 512 x 512 by default
#########################################################################   
def preprocess(img_to_compress_path, all_images_path, size = (512,512)):
    # read the image in grayscale
    img_to_compress = cv2.imread(img_to_compress_path, cv2.IMREAD_GRAYSCALE)
    # we will resize th image to 512 x 512
    # let's resize
    img_to_compress = cv2.resize(img_to_compress,size, interpolation = cv2.INTER_AREA)
    # let's save it as orginal image to quantize to PSNR calculation
    cv2.imwrite("original_grayscale.png",img_to_compress)
    
    all_images_list = [] #to store grey-scale imgs
    for path in all_images_path:
        tmp = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        
        all_images_list.append(cv2.resize(tmp,size, interpolation = cv2.INTER_AREA))
    return img_to_compress, all_images_list

   
###########################################################################
# helper function to extract vectors from give set of images by default 4x4
###########################################################################
def GetVector(train_resized_img_list, size=(4,4)):
    train_vec = []
    for train_img in train_resized_img_list:
        for i in range(0, train_img.shape[0], size[0]):
            for j in range(0, train_img.shape[1], size[1]):
                train_vec.append(train_img[i:i+size[0], j:j+size[1]].reshape((size[0]*size[1])))
    train_vec = np.array(train_vec)
    return train_vec

#########################################################################
# helper function to find Nearest point when encoding
#########################################################################
def getNearestCode(point_2_encode, codebook):
    tmp_code = np.zeros((codebook.shape[0],))
    for i in range(0, codebook.shape[0]):
        tmp_code[i] = np.mean((np.subtract(point_2_encode,codebook[i])**2))
    return np.argmin(tmp_code,axis=0)


###########################################################################
# helper function to create codebook from given vectors
###########################################################################
def GetCodebook(train_vec,codebook_size):
    # initial codebook
    codebook = []
    initial_code = []
    # let's get mean 16-dimensional vector 
    # this can be the best codebook of size 1 :>
    
    mean_vec = train_vec / len(train_vec)
    # # sum of all the mean vectors 
    sum_vec = np.sum(mean_vec, axis=0)
    # intial code will be 
    initial_code = sum_vec.tolist()
    codebook.append(initial_code)
    # get error
    error = getError(initial_code,train_vec)
    # generate codebook of given size
    # now we can keep updating codebook by adding more and more codes in it
    # unless the size is equal to given codebook_size
    while len(codebook) < codebook_size:
        codebook = extendCodebook(train_vec,codebook, 0.05, error)
    return np.array(codebook)
           
#####################################################################
# helper function to extend the codebook
#####################################################################
def extendCodebook(training_vec, start_codebook,threshold,initial_error):
    # training_vec
    # start_codebook
    # threshold 
    # initial_error
    
    # let's define empty extended code book
    extended_codebook =[]
    for code in start_codebook: 
        extended_codebook.extend(((np.array(code) * (1.0 + threshold)).tolist(),
                                  (np.array(code) * (1.0 - threshold)).tolist()))

    # k-means using LBG
    avg_error = 0
    
    # any value higher than initial threshold is good
    error = threshold + 1 
    itern = 0
    while error > threshold:
        # loop unless means/centroid are within a limit
        
        # let's intialize a nearest neighbour for all the training examples
        nearest_vec = [None] * len(training_vec) 
        # this will be a dictionary that maps each training vec to a code in codebook 
        dict_vec = defaultdict(list) #a dictionary to map vec to codebook values
        # this dicitonary will just map indices
        dict_id = defaultdict(list) #map ideces
        # for each training vector
        for train_indx,train_v in enumerate(training_vec):
            # initialized initial minimum error as None
            min_error = None
            # nearest_code_id is set to None, we don't know yet
            nearest_code_id = None
            # now for current extended codebook, we need to find error of each of the training vector
            # from it
            
            for codebook_indx,codebook_code in enumerate(extended_codebook):
                # get MSE error for training vec from each code iteratively
                current_error = getMSE(train_v,codebook_code)
                # save code with minimum error 
                if min_error is None or current_error < min_error:
                    min_error = current_error
                    nearest_vec[train_indx] = codebook_code
                    nearest_code_id = codebook_indx
            # at end of it, for given training vector we have nearest code indx and code value
            # save it in the dictionary
            dict_vec[nearest_code_id].append(train_v)
            dict_id[nearest_code_id].append(train_indx)

        # Now we have got clusteres
        # let update codebook
        for code_indx in range(len(extended_codebook)):
            # all training vectors in current code_indx cluster
            train_vecs = dict_vec.get(code_indx) or []
            num_vecs = len(train_vecs)
            if num_vecs > 0:
                # let's find new mean/center
                
                tmp_sum = np.sum(np.array(train_vecs)/len(train_vecs), axis = 0)
                new_mean = tmp_sum.tolist()
                
                # save the updated code in the extended_codebook
                extended_codebook[code_indx] = new_mean
                # update nearest for all the training examples
                for indx in dict_id[code_indx]:
                    nearest_vec[indx] = new_mean

        # re-calculate error
        prev_error = avg_error if avg_error > 0 else initial_error
        avg_error = getError(nearest_vec,training_vec)
        error = (prev_error - avg_error) / prev_error

    return extended_codebook
            
                

#########################################################################
# Helper function to VQ Encode an image 
#########################################################################
def VQEncoder(img_2_encode, codebook, size=(4,4)):
    # encoded img 
    # loop and find index of nearest code
    encoded_img = np.zeros((img_2_encode.shape[0] // size[0], img_2_encode.shape[1] // size[1]))
    row = 0
    for i in range(0,img_2_encode.shape[0],size[0]):
        col = 0
        for j in range(0,img_2_encode.shape[1],size[1]):
            vector_2_encode = img_2_encode[i:i+4,j:j+4].reshape((size[0]*size[1])).copy()
            encoded_img[row,col] = getNearestCode(vector_2_encode,codebook)
            col += 1
        row += 1
    return encoded_img
            
#########################################################################
# Helper function to VQ Decode an image
#########################################################################
def VQDecoder(compressed_img, codebook, size=(4,4)):
    decoded_img = np.zeros((compressed_img.shape[0] * size[0], compressed_img.shape[1] * size[1]))
    row = 0
    for i in range(0,compressed_img.shape[0]):
        col = 0
        for j in range(0, compressed_img.shape[1]):
            decoded_img[row:row+size[0],col:col+size[1]] = codebook[int(compressed_img[i,j])].reshape((size[0],size[1]))
            col += size[1]
        row += size[0]
    return decoded_img

    
########################################################################
# code to preprocess all the images, extract 4x4 training data from 
# all the images
# create codebook using it
# compress using the codebook
# decompress using the codebook
# getPSNR value for evaluating quality of reconstructed image
########################################################################
def VectorQuantization(img_to_compress_path, all_images_path, codebook_size)->bool:
    img_to_compress, all_train_img  = preprocess(img_to_compress_path,all_images_path)
    num_of_train_img = len(all_images_path) 
    # let's generate vector from all the training images
    training_vec = GetVector(all_train_img)
    # using these training vectors, let's generate the code book
    print('wait: creating codebook!')
#     codebook = lbg.generate_codebook(training_vec, codebook_size)
# the generate codebook package was downloaded from here, it works good for 2-D data but 
# is very slow of more than two dimensional data.
    codebook = GetCodebook(training_vec, codebook_size)
    # we have the codebook, now let's encode the given image
    print('compressing/encdoing the given image')
    encoded_img = VQEncoder(img_to_compress, codebook)
    print('decoding')
    decoded_img = VQDecoder(encoded_img, codebook)
    cv2.imwrite("recons_with_s_" + str(codebook_size) + "train_s_" + str(num_of_train_img)+".png",decoded_img)
    print("PSNR of Quantized Reconstructed Image w.r.t Original Image at Size=", codebook_size, ": ", getPSNR(img_to_compress,decoded_img))
    return True
        


In [15]:
codebook_size_list = [32, 64,128,256]

img_to_compress_path = 'barbara.png'
# same_image_list = ['barbara.png'] 
# for codebook_size in codebook_size_list:
#     run_success = VectorQuantization(img_to_compress_path, same_image_list, codebook_size)
    
    
different_10_img_list = [
    'pool.png', 'tulips.png', 'watch.png','mountain.png', 'zelda.png', 
    'arctichare.png','baboon.png', 'monarch.png', 'cat.png', 'airplane.png']

for codebook_size in codebook_size_list:
    run_success = VectorQuantization(img_to_compress_path, different_10_img_list, codebook_size)
    

wait: creating codebook!
compressing/encdoing the given image
decoding
PSNR of Quantized Reconstructed Image w.r.t Original Image at Size= 32 :  22.97466333306286
wait: creating codebook!
compressing/encdoing the given image
decoding
PSNR of Quantized Reconstructed Image w.r.t Original Image at Size= 64 :  23.691524497967173
wait: creating codebook!
compressing/encdoing the given image
decoding
PSNR of Quantized Reconstructed Image w.r.t Original Image at Size= 128 :  24.048097321036725
wait: creating codebook!
compressing/encdoing the given image
decoding
PSNR of Quantized Reconstructed Image w.r.t Original Image at Size= 256 :  24.373858565347177


In [14]:
#############################
### Amit Kumar Singh Yadav ##
#############################
# Project 2: Vector Quantization and Discrete Cosine Transform
# Part 2 - Discrete Cosine Transform

"""
Write a program that examines the effect of approximating an image with a partial set of DCT coefficients.
Using an 8×8 DCT, reconstruct the image with K<64 coefficients, when K=2,4,8,16, and 32. 
How many coefficients are necessary to provide a "satisfactory" reconstruction?
Define how you characterize "satisfactory" reconstruction.
Need to test method on at least two different images.

"""

#############################
### Helper functions ##
#############################
import math
import numpy as np
from numpy import *
import cv2 # for reading/writing the image

##########################################################
# function to get DCT Matrix
##########################################################
def getDCTMatrix(size=(8,8)):
    # create empty dct matrix 
    # we will fill it
    matrix = np.zeros(size)
    # let's fill element inside the DCT matrix
    # value for u = 0
    for i in range(size[0]):
        matrix[0,i] = sqrt(2.0/8) / sqrt(2.0)
    # value for remaining elements i.e. u!=0
    for u in range(1,size[0]):
        for v in range(size[1]):
            matrix[u,v] = sqrt(2.0/8) * cos((pi/8) * u * (v + 0.5))
    return matrix

##########################################################    
# helper function to calculateDCT
# input will be block of image (8x8)
# and matrix for getting DCT
##########################################################
def calculateDCT(block,matrix):
    return np.dot(np.dot(matrix,block),matrix.transpose())

##########################################################
# helper function to calculateInvDCT
# input will be block of image (8x8)
# and matrix for getting IDCT, i.e., transpose(DCT)
##########################################################
def calculateInvDCT(block,matrix):
    # we know that InvDCT = transpose of DCT
    return np.dot(np.dot(matrix.transpose(),block),matrix)

##########################################################
# helper function to reconstruct the image using K elements
# input will be K, full_dct_block
##########################################################
# this code is made with help from 
# https://github.com/tesfagabir/Digital-Image-Processing/blob/master/10-Implementing-JPEG-Data-Compression-in-Python.ipynb
# there can be an optimized version of it
def getFirstKCoef(full_dct_block,K):
    full_dct_block = np.array(full_dct_block)
    # size of first_k_dct_block will be 
    # same but some values will be zero
    first_k_dct_block = np.zeros(full_dct_block.shape)
    
    # we will go in zigzag order in order to get first K coefficients
    (i,j) = (0,0)
    # initial start direction 
    direction = 'right'
    
    # in total there are 64 coeffiecient, we need first K from them
    for k in range(K):
        first_k_dct_block[i][j] = full_dct_block[i][j]
        # if next step is to go in right just increase j 
        if direction == 'right':
            j += 1
            # if it's last row elements, we can't go down, just go up right
            if i == full_dct_block.shape[0] - 1:
                direction = 'up_right'
            else:
                # else keep going down left
                direction = 'down_left'
         # if last step was down_left then take that value
        elif direction == 'down_left':
            i += 1
            j -= 1
            # if it's last row element go to right element
            if i == full_dct_block.shape[0] - 1:
                direction = 'right'
            elif j == 0:
                # else keep going down
                direction = 'down'
        # if it was from down then either go up right if it's first column
        # otherwise go down_left
        elif direction == 'down':
            i += 1
            if j == 0:
                direction = 'up_right'
            else:
                direction = 'down_left'
        # if last step was up_right and u reach last column then just go down
        # else go right
        elif direction == 'up_right':
            i -= 1
            j += 1
            if j == full_dct_block.shape[1] - 1:
                direction = 'down'
            elif i == 0:
                direction = 'right'

    return first_k_dct_block
        

# function to calculate PSNR between two images
# taken as it is from Project #1
def getPSNR(targetImg:np.ndarray, predictedImg:np.ndarray) -> float:
        # Mean Square Error zero means no noise
        # hence PSNR will explode
        # PSRN is not good measure for identical images
        mean_square_error = np.mean((predictedImg - targetImg) ** 2)
        if(mean_square_error == 0):
            print("images are identical")
            return 100.0 # for simplicity
        # note it is in db
        return 20 * math.log10(255.0 / math.sqrt(mean_square_error))

In [13]:
########################################
########## Initialization ##############
########################################
K_list = [2,4,8,16,32]
for K in K_list:
    resize_dim = (512,512)
    img_path = "./boat.png"

    ########################################
    ########## Read Image and Resize #######
    ########################################
    img = cv2.imread(img_path,cv2.IMREAD_GRAYSCALE)
    img = cv2.resize(img,resize_dim, interpolation = cv2.INTER_AREA)

    ###################################
    ########## Apply DCT ##############
    ###################################
    h, w = [512,512]
    size=(8,8) # block size
    dct_img = np.zeros((h,w))
    recon_img = np.zeros((h,w))
    dct_img_till_k = np.zeros((h,w))
    dct_block = getDCTMatrix(size)
    # take dct of image and make another image keeping only first K coefficients
    # Encoder gives dct_img_till_k  (to compress it remove some DCT)
    for i in range(0,h,size[0]):
        for j in range(0,w,size[1]):
            # extract block of give size and do DCT on it
            img_block = img[i:i+size[0], j:j+size[1]]
            dct_img_block = calculateDCT(img_block,dct_block)
            # copy to main image
            dct_img[i:i+size[0],j:j+size[1]] = np.copy(dct_img_block)
            dct_img_till_k[i:i+size[0],j:j+size[1]] = getFirstKCoef(dct_img_block,K)

    #inverse dct for each 8x8 block
    for i in range(0,h,size[0]):
        for j in range(0,w,size[1]):
            img_block = dct_img_till_k[i:i+size[0], j:j+size[1]]
            recon_blk = calculateInvDCT(img_block,dct_block)
            recon_img[i:i+size[0],j:j+size[1]] = np.copy(recon_blk)


    #save reconstructed image
    cv2.imwrite("recons_img_with_K_" + str(K) + ".png",recon_img)
    #calculate psnr
    psnr_val = getPSNR(img,recon_img)
    print("PSNR of Reconstructed Image w.r.t Original Image at K:", K, "= ", psnr_val)

    # Now let's see what happens if we even quantize the image
    quantized_img = np.zeros((512,512))
    quantized_dct_img = np.zeros((512,512))
    dequantized_dct_img = np.zeros((512,512))
    dequantized_dct_img_till_k = np.zeros((512,512))
    quantized_recons_img = np.zeros((512,512))

    # quantization coeeficients from 8x8 DCT block from Lecture Slide
    quantization_coeff = [
            [16, 11, 10, 16, 24, 40, 51, 61],
            [12, 12, 14, 19, 26, 58, 60, 55],
            [14, 13, 16, 24, 40, 57, 69, 56],
            [14, 17, 22, 29, 51, 87, 80, 62],
            [18, 22, 37, 56, 68, 109, 103, 77],
            [24, 35, 55, 64, 81, 104, 113, 92],
            [49, 64, 78, 87, 103, 121, 120, 101],
            [72, 92, 95, 98, 112, 100, 103, 99]
                        ]

    # step will be: first convert img to dct -> quantize the DCT
    # dequantized the DCT image -> do IDCT -> get image

    # we have DCT block and dct_img from previous step, just do quantization
    for i in range(0,h,size[0]):
        for j in range(0,w,size[1]):
            quantized_dct_img[i:i+size[0],j:j+size[1]] = np.around(np.divide(dct_img[i:i+size[0],j:j+size[1]],quantization_coeff))

    # for code simplicity, we will not get first K from it after quantization
    # we will just dequantize and then take the first K
    # this help us to short the code
    # but not it is not pragmatic
    # as image changed to DCT -> extract first K and then quantize is optimal case


    # let's do de-quantization
    for i in range(0,h,size[0]):
        for j in range(0,w,size[1]):
            dequantized_dct_img[i:i+size[0],j:j+size[1]] = np.around(np.multiply(quantized_dct_img[i:i+size[0],j:j+size[1]],quantization_coeff))

    # now we have dequantized the whole image, let's get the first K DCT coefficients
    for i in range(0,h,size[0]):
        for j in range(0,w,size[1]):
            dct_img_block = dequantized_dct_img[i:i+8, j:j+8]
            dequantized_dct_img_till_k[i:i+size[0],j:j+size[1]] = getFirstKCoef(dct_img_block,K)

    # now inverse DCT to get quantized_recons_img
    #inverse dct for each 8x8 block
    for i in range(0,h,size[0]):
        for j in range(0,w,size[1]):
            img_block = dequantized_dct_img_till_k[i:i+size[0], j:j+size[1]]
            recon_blk = calculateInvDCT(img_block,dct_block)
            quantized_recons_img[i:i+size[0],j:j+size[1]] = np.copy(recon_blk)


    #save reconstructed image
    cv2.imwrite("quantized_recons_img_with_K_" + str(K) + ".png",quantized_recons_img)
    #calculate psnr
    psnr_val = getPSNR(img,quantized_recons_img)
    print("PSNR of Quantized Reconstructed Image w.r.t Original Image at K:", K, "= ", psnr_val)


PSNR of Reconstructed Image w.r.t Original Image at K: 2 =  23.104735234067803
PSNR of Quantized Reconstructed Image w.r.t Original Image at K: 2 =  23.09814919994263
PSNR of Reconstructed Image w.r.t Original Image at K: 4 =  25.267811658539397
PSNR of Quantized Reconstructed Image w.r.t Original Image at K: 4 =  25.247560977066094
PSNR of Reconstructed Image w.r.t Original Image at K: 8 =  28.16735839920744
PSNR of Quantized Reconstructed Image w.r.t Original Image at K: 8 =  28.092216566580706
PSNR of Reconstructed Image w.r.t Original Image at K: 16 =  31.465280191747603
PSNR of Quantized Reconstructed Image w.r.t Original Image at K: 16 =  31.029886100428904
PSNR of Reconstructed Image w.r.t Original Image at K: 32 =  36.56856620282969
PSNR of Quantized Reconstructed Image w.r.t Original Image at K: 32 =  33.27543315813593
