[![Binder](https://mybinder.org/badge_logo.svg)](https://nbviewer.org/github/Sistemas-Multimedia/Sistemas-Multimedia.github.io/blob/master/contents/gray_SQ/gray_SQ_LloydMax.ipynb)

[![Colab](https://badgen.net/badge/Launch/on%20Google%20Colab/blue?icon=notebook)](https://colab.research.google.com/github/Sistemas-Multimedia/Sistemas-Multimedia.github.io/blob/master/contents/gray_SQ/gray_SQ_LloydMax.ipynb)


# Lloyd-Max Quantization

Use K-means to find the centroid of each bin. See [scikit-learn's Vector Quantization Example](https://scikit-learn.org/stable/auto_examples/cluster/plot_face_compress.html#sphx-glr-auto-examples-cluster-plot-face-compress-py).

Notice that the centroids must be transmitted to the decoder.

In [None]:
!pip install Pygments

In [None]:
%%bash
# Create $HOME/repos (the directory the following cells clone into) unless it
# already exists, and report which of the two cases happened.
if [ -d "$HOME/repos" ]; then
    echo "\"$HOME/repos\" exists"
else
    mkdir ~/repos
    echo Created $HOME/repos
fi

In [None]:
%%bash
# Fetch the scalar_quantization repository: pull if a checkout already exists
# under $HOME/repos, otherwise clone it there.
if [ -d "$HOME/repos/scalar_quantization" ]; then
    cd $HOME/repos/scalar_quantization
    echo "$HOME/repos/scalar_quantization ... "
    git pull 
else
    cd $HOME/repos
    git clone https://github.com/vicente-gonzalez-ruiz/scalar_quantization.git
fi

In [None]:
%%bash
# Fetch the MRVC repository: pull if a checkout already exists under
# $HOME/repos, otherwise clone it there.
if [ -d "$HOME/repos/MRVC" ]; then
    cd $HOME/repos/MRVC
    echo "$HOME/repos/MRVC ... "
    git pull 
else
    cd $HOME/repos
    git clone https://github.com/Sistemas-Multimedia/MRVC.git
fi

In [None]:
%%bash
# Fetch the image_IO repository: pull if a checkout already exists under
# $HOME/repos, otherwise clone it there.
if [ -d "$HOME/repos/image_IO" ]; then
    cd $HOME/repos/image_IO
    echo "$HOME/repos/image_IO ... "
    git pull 
else
    cd $HOME/repos
    git clone https://github.com/vicente-gonzalez-ruiz/image_IO.git
fi

In [None]:
%%bash
# Fetch the information_theory repository: pull if a checkout already exists
# under $HOME/repos, otherwise clone it there.
if [ -d "$HOME/repos/information_theory" ]; then
    # The pull must run inside information_theory's own working tree,
    # otherwise a different repository would be updated.
    cd "$HOME/repos/information_theory"
    echo "$HOME/repos/information_theory ... "
    git pull
else
    cd "$HOME/repos"
    git clone https://github.com/vicente-gonzalez-ruiz/information_theory.git
fi

In [None]:
# Symlink the helper modules into the current directory so that the plain
# `import` statements of the next cell can find them.
# NOTE(review): the first link points at ~/MRVC, whereas MRVC is cloned under
# ~/repos/MRVC; besides, its target name (./logging_config.py) is overwritten
# by the last `ln -sf` of this cell -- confirm which source is intended.
!ln -sf ~/MRVC/src/logging_config.py .
!ln -sf ~/repos/scalar_quantization/quantization.py .
!ln -sf ~/repos/scalar_quantization/deadzone_quantization.py .
!ln -sf ~/repos/scalar_quantization/LloydMax_quantization.py .
!ln -sf ~/repos/information_theory/distortion.py .
!ln -sf ~/repos/information_theory/information.py .
!ln -sf ~/repos/image_IO/image_1.py .
!ln -sf ~/repos/image_IO/logging_config.py .

In [None]:
try:
    import matplotlib.pyplot as plt
except:
    !pip install matplotlib
    import matplotlib
    import matplotlib.pyplot as plt
    import matplotlib.axes as ax
    #plt.rcParams['text.usetex'] = True
    #plt.rcParams['text.latex.preamble'] = [r'\usepackage{amsmath}'] #for \text command
%matplotlib inline

try:
    import scipy
except:
    !pip install scipy
    
try:
    import cv2
except:
    !pip install opencv-python
    !pip install opencv-python-headless # Binder compatibility
    import cv2

try:
    import skimage
except:
    !pip install scikit-image
    import skimage
    
try:
    from sklearn import cluster
except:
    !pip install sklearn
    from sklearn import cluster

try:
    import colored
except:
    !pip install colored
    import colored

#try:
#    import warnings
#except:
#    !pip install warnings
#    import warnings

import pylab
import math
import numpy as np
from scipy import signal
import cv2
import os
import deadzone_quantization as deadzone
import LloydMax_quantization as quantization
import distortion
#import image_3 as RGB_image
import image_1 as gray_image
import colored
import information
import gzip

## Configuration

In [None]:
# Location of the input image inside the MRVC checkout; `fn` is used as the
# prefix passed to gray_image.read() and in figure titles.
home = os.environ["HOME"]
fn = home + "/repos/MRVC/images/lena_bw/"
#fn = home + "/repos/MRVC/images/Hommer_bw/"
!ls -l {fn}

# Quantizer selection
quantizer = quantization.LloydMax_Quantizer

n_clusters = 4  # Number of bins
N_tries = 4  # Number of times K-means is run
#N_bins = range(2, 128, 1)
# Numbers of bins for which the RD curves are computed.
N_bins = [2, 4, 8, 16, 32, 64, 128] #range(2, 128, 1)
# Replace the module-level writer of gray_image by its debug variant.
gray_image.write = gray_image.debug_write # faster
#gray_image.write = gray_image.write # higher compression

## Read the image and show it

In [None]:
img = gray_image.read(fn, 0)
gray_image.show(img, fn + "000.png")

## Lloyd-Max quantization using K-means

In [None]:
!pygmentize quantization.py

In [None]:
!pygmentize LloydMax_quantization.py

## Example

In [None]:
QSS = 4 # Quantization Step Size

In [None]:
Q = quantizer(img, Q_step=QSS, min_val=0, max_val=255)
print("decision_levels =", Q.get_decision_levels())
print("representation_levels =", Q.get_representation_levels())

In [None]:
def plot(x, y, xlabel='', ylabel='', title=''):
    """Draw `y` against `x` in a new figure with a grid and show it.

    Args:
        x: Abscissas of the curve.
        y: Ordinates of the curve.
        xlabel: Text for the horizontal axis.
        ylabel: Text for the vertical axis.
        title: Title of the figure.
    """
    _, axes = plt.subplots()  # One figure holding a single set of axes
    axes.plot(x, y)
    axes.grid()
    axes.set_title(title)
    axes.xaxis.set_label_text(xlabel)
    axes.yaxis.set_label_text(ylabel)
    plt.show(block=False)

In [None]:
# Quantize and dequantize 500 evenly spaced inputs between 0 and 255 to draw
# the input/output characteristic of the quantizer.
x = np.linspace(0, 255, 500) # Input samples
y, k = Q.quan_dequan(x)
# The backslash is doubled because "\D" is an invalid escape sequence in a
# non-raw string literal; the rendered title is unchanged.
plot(x, y, "Input Sample", "Reconstructed Sample", f"Lloyd-Max Quantizer ({fn}, $\\Delta={QSS}$)")

Neither the decision levels nor the representation levels are equally spaced. This is a direct consequence of the PDF of the input image: in those input ranges where the gray tones are more frequent, the resolution of the quantizer is increased. The representation levels are placed where the MSE is minimized.

In [None]:
y, k = Q.quan_dequan(img)
print("Used quantization indexes:", np.unique(k))
gray_image.show_normalized(k, f"Quantization Indexes ($\\Delta={QSS}$)")
gray_image.show(y, f"Dequantized Image ($\\Delta={QSS}$)")
print("MSE =", distortion.MSE(img, y))
print("SSIM =", distortion.SSIM(img, y))
print("entropy =", information.entropy(k.flatten()))

In [None]:
representation_levels = np.round(Q.get_representation_levels()).astype(np.uint8)
representation_levels

In [None]:
representation_levels[1:]

In [None]:
representation_levels[::-1][0]

In [None]:
np.append(np.diff(representation_levels), representation_levels[::-1][0])

In [None]:
# Measure how many bytes the (rounded, 8-bit) representation levels need,
# first stored raw and then gzip-compressed.
representation_levels = np.round(Q.get_representation_levels()).astype(np.uint8)
representation_levels_DPCM = np.append(np.diff(representation_levels), representation_levels[::-1][0]) # We start at the end!
print(representation_levels)
print(representation_levels_DPCM)
#with gzip.GzipFile("/tmp/representation_levels.npy.gz", "w") as f:
#    np.save(file=f, arr=representation_levels)
#np.save(file="/tmp/representation_levels.npy.gz", arr=representation_levels)

# Close the file before asking for its size, so that the data is on disk.
with open("/tmp/representation_levels.bin", mode='wb') as fileobj:
    representation_levels.tofile(fileobj)
print("Bytes needed for the representation levels:", os.path.getsize("/tmp/representation_levels.bin"))

# ndarray.tofile() writes straight to the underlying file descriptor and
# bypasses the GzipFile compressor, so the bytes go through write() instead.
with gzip.GzipFile("/tmp/representation_levels.bin.gz", "w") as f:
    f.write(representation_levels.tobytes())
print("Bytes needed for the representation levels:", os.path.getsize("/tmp/representation_levels.bin.gz"))


## RD curve
We compare two versions:
1. When the centroids are initialized using a scalar quantizer.
2. When the centroids are initialized at random.

In [None]:
def RD_curve_random_init(img, N_bins):
    """Compute an RD curve clustering the pixels with a default K-means.

    For every number of bins, K-means is fitted to the pixel values, the
    image is rebuilt from the centroids, and the rate is measured as the size
    of the written index image plus the size of the centroids (one byte
    each), both expressed in bits/pixel.

    Args:
        img: 2D gray-scale image.
        N_bins: Iterable with the numbers of clusters to try.

    Returns:
        A list of (rate, RMSE) tuples, one per element of `N_bins`.
    """
    points = []
    flatten_img = img.reshape((-1, 1))  # One sample (pixel) per row
    number_of_pixels = img.shape[0]*img.shape[1]
    for n in N_bins:
        #k_means = cluster.KMeans(n_clusters=n, n_init=N_tries)
        k_means = cluster.KMeans(n_clusters=n)
        k_means.fit(flatten_img)
        # Round (instead of truncating) to the nearest 8-bit gray level, as
        # done elsewhere in this notebook for the representation levels.
        centroids = np.round(k_means.cluster_centers_.squeeze()).astype(np.uint8)
        k = k_means.labels_.astype(np.uint8).reshape(img.shape)  # Labels of the centroids
        y = centroids[k]  # Dequantization: every label is replaced by its centroid
        print("Quantization indexes:", np.unique(k))
        rate = gray_image.write(k, "/tmp/" + str(n) + '_', 0)*8/number_of_pixels
        # The centroids are side information that the decoder also needs.
        with open("/tmp/representation_levels.uint8", mode="wb") as f:
            centroids.tofile(f)
        rate += os.path.getsize("/tmp/representation_levels.uint8")*8/number_of_pixels
        _distortion = distortion.RMSE(img, y)
        # The title reports the number of clusters of this iteration (there is
        # no quantization step size in this configuration).
        gray_image.show_normalized(k, f"Quantization Indexes (n={n})")
        points.append((rate, _distortion))
        print(f"n={n:>3}, rate={rate:>7} bits/pixel, distortion={_distortion:>6.1f}")
    return points
RD_points_random_init = RD_curve_random_init(img, N_bins)

In [None]:
def RD_curve_sorted_labels(img, N_bins):
    """Compute an RD curve using the notebook's selected `quantizer`.

    For every number of bins `n`, a quantizer with step size 256//n is built
    for `img`, the image is quantized and dequantized, and the rate is
    measured as the size of the written index image plus the size of the
    representation levels, both expressed in bits/pixel.

    Args:
        img: 2D gray-scale image.
        N_bins: Iterable with the numbers of bins to try.

    Returns:
        A list of (rate, RMSE) tuples, one per element of `N_bins`.
    """
    points = []
    for n in N_bins:
        Q_step = 256//n
        Q = quantizer(img, Q_step=Q_step, min_val=0, max_val=255)
        y, k = Q.quan_dequan(img)
        print("Quantization indexes:", np.unique(k))
        number_of_pixels = k.shape[0]*k.shape[1]
        rate = gray_image.write(k, "/tmp/" + str(n) + '_', 0)*8/number_of_pixels
        # The distortion below is measured on the reconstruction rounded to
        # 8 bits, so the levels are accounted for as one byte each (which is
        # also what the ".uint8" file name announces).
        levels = np.round(Q.get_representation_levels()).astype(np.uint8)
        with open("/tmp/representation_levels.uint8", mode="wb") as f:
            levels.tofile(f)
        rate += os.path.getsize("/tmp/representation_levels.uint8")*8/number_of_pixels
        _distortion = distortion.RMSE(img, np.round(y).astype(np.uint8))
        # The title reports the step size used in this iteration.
        gray_image.show_normalized(k, f"Quantization Indexes ($\\Delta={Q_step}$)")
        points.append((rate, _distortion))
        print(f"n={n:>3}, rate={rate:>7} bits/pixel, distortion={_distortion:>6.1f}")
    return points

RD_points_sorted_labels = RD_curve_sorted_labels(img, N_bins)

In [None]:
pylab.figure(dpi=150)
pylab.plot(*zip(*RD_points_sorted_labels), c='m', marker='x', label="Using sorted labels", linestyle="dotted")
pylab.plot(*zip(*RD_points_random_init), c='b', marker='x', label="Using unsorted labels", linestyle="dotted")
pylab.title(f"Rate/Distortion Performance")
pylab.xlabel("Bits/Pixel")
pylab.ylabel("RMSE")
pylab.legend(loc='upper right')
pylab.show()

It is more efficient in terms of rate (and faster, because K-means runs only once) to initialize the centroids (notice that this disables the random selection of the initial centroids and, therefore, turns K-means into a deterministic clustering algorithm).

In [None]:
# Store the RD points as text: one "rate<TAB>distortion" pair per line.
with open("LloydMax_RD_points.txt", 'w') as f:
    f.writelines(f"{rate}\t{dist}\n" for rate, dist in RD_points_sorted_labels)

## What happens if we increase the granularity?
Let's see the effect of using a finer quantization step (size).

In [None]:
N_bins = [i for i in range(2, 128, 1)]
print(N_bins)

In [None]:
def _(a, cmap, vmin, vmax):
    """Do-nothing stand-in for plt.imshow()."""
    pass

# Silence Matplotlib's display calls while the (long) finer curve is computed,
# and always restore them afterwards so that later cells can still plot.
_original_show, _original_imshow = plt.show, plt.imshow
plt.show = print
plt.imshow = _
try:
    RD_points_finer = RD_curve_sorted_labels(img, N_bins)
finally:
    plt.show, plt.imshow = _original_show, _original_imshow

In [None]:
pylab.figure(dpi=150)
pylab.plot(*zip(*RD_points_sorted_labels), c='m', marker='x', label=f"Using powers of 2", linestyle="dotted")
pylab.plot(*zip(*RD_points_finer), c='g', marker='.', label=f"Using more quantization steps", linestyle="dotted")
pylab.title(fn)
pylab.xlabel("Bits/Pixel")
pylab.ylabel("RMSE")
pylab.legend(loc='upper right')
pylab.show()

As can be seen, the quantization steps that are powers of 2 describe the convex hull of the RD curve reasonably well.

## RDO (Rate Distortion Optimization)

So far, the design of the Lloyd-Max quantizer has been independent of the rate (only the distortion has been minimized). However, the final RD performance of the encoding system depends on both opposed features (rate and distortion), generating a tradeoff that can be optimized under certain conditions.

For example, let's imagine that the image is quantized by tiles. Thus, in each tile, a "local" Lloyd-Max quantizer can be used, potentially decreasing the distortion (the PDF of a tile describes the counts of the gray-scale values of that tile more accurately than the PDF of the whole image). However, each tile requires an (at least slightly) different set of representation levels, which increases the rate (the representation levels must be transmitted in the code-stream). Supposing that all the tiles have the same size, which tiling configuration (tile size) is the most RD-efficient?

The following code determines the optimal tile size (in a linear and supposedly convex search space spanned by the side of the square tiles, which are powers of two) for different RD points. To determine the RD performance at a point $i$, we will use the Lagrangian:
$$
\mathbf{J}_i = \mathbf{D}_i + \mathbf{\lambda}_i \mathbf{R}_i,
$$
where $\mathbf{D}$ stands for distortion and $\mathbf{R}$ for (bit-)rate. $\mathbf{\lambda}_i$ is known as a Lagrangian multiplier, and it controls the relative importance of $\mathbf{D}$ and $\mathbf{R}$. For example, if $\mathbf{\lambda}_i=0$ and we are minimizing $\mathbf{J}_i$, we are only interested in minimizing the distortion and, therefore, we are "exploring" the bottom-right segment of the RD curve (considering an RMSE vs. bit-rate RD curve); if $\mathbf{\lambda}_i=\infty$ (in practice, a very large value), we are minimizing the RD tradeoff at the top-left segment of the RD curve (high compression ratio and, probably, a higher distortion); and intermediate values of $\mathbf{\lambda}$ correspond to other intermediate points of the RD curve.

Finally, notice that we have defined $\mathbf{\lambda}_i$ as a continuous parameter, but the number of points of the RD curve is finite. Therefore, to determine $\mathbf{\lambda}_i$ for the point $i$ of the curve, we can define that:
$$
\mathbf{\lambda}_i = \frac{\mathbf{D}_i-\mathbf{D}_{i+1}}{\mathbf{R}_{i+1}-\mathbf{R}_i}
$$
and that:
$$
\mathbf{\lambda}_{\mathbf{\Delta_i}=1}=0.
$$

In [None]:
# Slope of the RD curve between every pair of consecutive points:
# lambda_i = (D_i - D_{i+1}) / (R_{i+1} - R_i), each point being (rate, distortion).
_lambda = [
    (current[1] - following[1])/(following[0] - current[0])
    for current, following in zip(RD_points_sorted_labels, RD_points_sorted_labels[1:])
]
_lambda.append(0)  # The last point has no successor
print(_lambda)

In [None]:
N_bins = [2, 4, 8, 16, 32, 64, 128] #range(2, 128, 1)

In [None]:
tile_sides = [256, 128, 64, 32, 16]
# The search space is so small that a sequential search should be fast enough.

# NOTE: this search is unfinished; for every candidate tile side only the
# tiling geometry is computed, no cost is evaluated yet.
for n in N_bins:
    K = np.empty_like(img)
    # Large initial cost. "**" is the power operator; "10^10" is a bitwise
    # XOR that evaluates to 0.
    prev_J = 10**10
    for ts in tile_sides:
        tiles_in_y = img.shape[0]//ts
        tiles_in_x = img.shape[1]//ts
        number_of_tiles = tiles_in_y*tiles_in_x


In [None]:
tile_side = 64  # Must divide both dimensions of the image
tiles_in_y = img.shape[0]//tile_side
tiles_in_x = img.shape[1]//tile_side
number_of_tiles = tiles_in_y*tiles_in_x
K = np.empty_like(img)  # Quantization indexes of the whole image
Y = np.empty_like(img)  # Reconstruction of the whole image
def RD_curve(img, N_bins):
    """Compute an RD curve quantizing every tile with its own quantizer.

    The image is split into the `tile_side` x `tile_side` tiles configured at
    module level. For every number of bins, each tile is quantized with a
    quantizer built for that tile; the indexes and the reconstruction are
    assembled in the module-level arrays `K` and `Y`.

    Args:
        img: 2D gray-scale image.
        N_bins: Iterable with the numbers of bins to try.

    Returns:
        A list of (rate, RMSE) tuples, one per element of `N_bins`.
    """
    # Region of the image actually covered by whole tiles.
    covered = (slice(0, tiles_in_y*tile_side), slice(0, tiles_in_x*tile_side))
    points = []
    for n in N_bins:
        Q_step = 256//n
        for ty in range(tiles_in_y):
            rows = slice(ty*tile_side, (ty+1)*tile_side)
            for tx in range(tiles_in_x):
                cols = slice(tx*tile_side, (tx+1)*tile_side)
                tile = img[rows, cols]
                Q = quantizer(tile, Q_step=Q_step, min_val=0, max_val=255)
                y, k = Q.quan_dequan(tile)
                K[rows, cols] = k
                # Round before storing: the cast to Y's dtype would otherwise
                # truncate the reconstructed values.
                Y[rows, cols] = np.round(y)
        # TODO: the per-tile representation levels are not added to the rate
        # yet, although the decoder needs them.
        rate = gray_image.write(K, "/tmp/" + str(n) + '_', 0)*8/(img.shape[0]*img.shape[1])
        # RMSE is not additive: averaging per-tile RMSEs underestimates the
        # distortion of the image, so it is measured on the assembled result.
        _distortion = distortion.RMSE(img[covered], Y[covered])
        points.append((rate, _distortion))
        print(f"n={n:>3}, rate={rate:>7} bits/pixel, distortion={_distortion:>6.1f}")
        gray_image.show_normalized(K, f"Quantization Indexes (n={n})")
        gray_image.show(Y, f"Reconstruction (n={n})")
    return points

RDO_points = RD_curve(img, N_bins)

In [None]:
pylab.figure(dpi=150)
pylab.plot(*zip(*RD_points_sorted_labels), c='m', marker='x', label=f"No tiling", linestyle="dotted")
pylab.plot(*zip(*RDO_points), c='b', marker='x', label=f"RDO over tile size", linestyle="dotted")
pylab.title(f"Rate/Distortion Performance")
pylab.xlabel("Bits/Pixel")
pylab.ylabel("RMSE")
pylab.legend(loc='upper right')
pylab.show()

Is it possible to improve the RD curve using stochastic search? The idea is to, given a K-Means solution, perform a (finite) number of training steps of the classifier with the hope of finding a better one (remember that K-Means obtains different solutions when it is run several times). If this happens, the process is repeated. Otherwise, we stop.

To determine if the current classifier is better than the previous one, we can use a Lagrangian Multiplier
$$
J = D+\lambda R,
$$
where the tradeoff between rate (R) and distortion (D) is quantified for a given slope $\lambda$. Thus, the smaller the $J$, the better the classifier.

class RDO(quantization.LloydMax_Quantizer):
    """Lloyd-Max quantizer refined by a stochastic rate-distortion search.

    Candidate quantizers are compared through the Lagrangian cost
    J = D + lambda*R (the smaller, the better).

    NOTE(review): assumes that constructing a LloydMax_Quantizer trains it on
    the given image (the notebook uses freshly built quantizers right away)
    and that two constructions can yield different solutions -- confirm
    against LloydMax_quantization.py.
    """

    def __init__(self, img, Q_step, max_number_of_iters, _lambda):
        """Build the initial quantizer and store the search parameters.

        Args:
            img: Image used for training and for measuring rate and distortion.
            Q_step: Quantization step size.
            max_number_of_iters: Number of candidates tried by RDO().
            _lambda: Lagrangian multiplier (slope of the RD tradeoff).
        """
        super().__init__(img, Q_step=Q_step, min_val=0, max_val=255)
        self.img = img
        self.Q_step = Q_step
        self.max_number_of_iters = max_number_of_iters
        self._lambda = _lambda  # "lambda" is a reserved word

    def _cost(self, Q):
        """Return the Lagrangian cost D + lambda*R of quantizer Q on the image."""
        y, k = Q.quan_dequan(self.img)
        R = gray_image.write(k, "/tmp/" + str(self.Q_step) + '_', 0)*8/(k.shape[0]*k.shape[1])
        D = distortion.RMSE(self.img, y)
        return D + self._lambda * R

    def RDO_step(self, current_Q=None):
        """Try one new candidate and return the cheaper of it and current_Q.

        Args:
            current_Q: Quantizer to improve; this instance when omitted.
        """
        if current_Q is None:
            current_Q = self
        next_Q = quantization.LloydMax_Quantizer(self.img, Q_step=self.Q_step, min_val=0, max_val=255)
        if self._cost(next_Q) < self._cost(current_Q):
            return next_Q
        return current_Q

    def RDO(self):
        """Run max_number_of_iters search steps and return the best quantizer found."""
        Q = self
        for _ in range(self.max_number_of_iters):
            Q = self.RDO_step(Q)
        return Q

In [None]:
import copy

def optimize_step(Q, Q_step, _lambda):
    """Retrain a copy of Q and keep whichever quantizer has the lower cost.

    The cost is the Lagrangian J = D + _lambda*R, where D is the RMSE of the
    global image `img` and R the size (bits/pixel) of the written indexes.

    Args:
        Q: Current quantizer (left unmodified).
        Q_step: Quantization step size; only used to name the output files.
        _lambda: Lagrangian multiplier.

    Returns:
        The retrained copy if its cost is strictly lower, otherwise Q.
    """
    def lagrangian_cost(candidate, tag):
        # Rate first (it writes the indexes to disk), then distortion.
        dequantized, indexes = candidate.quan_dequan(img)
        bits_per_pixel = gray_image.write(indexes, "/tmp/" + str(Q_step) + tag, 0)*8/(indexes.shape[0]*indexes.shape[1])
        error = distortion.RMSE(img, dequantized)
        return error + _lambda * bits_per_pixel

    print("Q.get_representation_levels=", Q.get_representation_levels())
    current_J = lagrangian_cost(Q, '_Q_')

    next_Q = copy.deepcopy(Q)  # Retrain a copy so that Q survives untouched
    next_Q.retrain(img)
    print("next_Q.get_representation_levels=", next_Q.get_representation_levels())
    next_J = lagrangian_cost(next_Q, '_next_Q_')

    print(current_J, next_J)
    return next_Q if next_J < current_J else Q

def optimize(Q, Q_step, _lambda, max_number_of_iters):
    """Apply optimize_step() repeatedly, keeping the best quantizer so far.

    Args:
        Q: Initial quantizer.
        Q_step: Quantization step size, forwarded to optimize_step().
        _lambda: Lagrangian multiplier, forwarded to optimize_step().
        max_number_of_iters: Number of optimization steps to run.

    Returns:
        The quantizer selected after the last step (Q itself when no step runs).
    """
    best = Q
    for _ in range(max_number_of_iters):
        best = optimize_step(best, Q_step, _lambda)
    return best

In [None]:
N_bins = 32
Q_step = 256//N_bins
max_number_of_iters = 10
N_bins = [2, 4, 8, 16, 32, 64, 128]
_lambda = 200

def RDO_curve(img, N_bins):
    """Compute an RD curve refining every quantizer with optimize().

    Uses the module-level `max_number_of_iters` and `_lambda`.

    Args:
        img: 2D gray-scale image.
        N_bins: Iterable with the numbers of bins to try.

    Returns:
        A list of (rate, RMSE) tuples, one per element of `N_bins`.
    """
    points = []
    for n in N_bins:
        Q_step = 256//n
        Q = quantizer(img, Q_step=Q_step, min_val=0, max_val=255)
        # Keyword arguments: optimize() expects _lambda before
        # max_number_of_iters, so passing both positionally in the opposite
        # order would swap their values.
        Q = optimize(Q, Q_step, _lambda=_lambda, max_number_of_iters=max_number_of_iters)
        y, k = Q.quan_dequan(img)
        print("Quantization indexes: ", np.unique(k))
        rate = gray_image.write(k, "/tmp/" + str(n) + '_', 0)*8/(k.shape[0]*k.shape[1])
        _distortion = distortion.RMSE(img, y)
        if n<16:
            plt.imshow(y, cmap=plt.cm.gray, vmin=0, vmax=256)
            plt.show()
        points.append((rate, _distortion))
        print(f"n={n:>3}, rate={rate:>7} bits/pixel, distortion={_distortion:>6.1f}")
    return points

RDO_points = RDO_curve(img, N_bins)

In [None]:
import time
# Sleep forever; this cell never finishes on its own.
# NOTE(review): presumably a guard that keeps a "run all cells" execution from
# reaching the cells below ("Ignore the rest ...") -- confirm. The kernel has
# to be interrupted manually to get past it.
while True:
    time.sleep(1)

## Ignore the rest ...

In [None]:
# Fit K-means (n_clusters clusters, N_tries initializations) to the pixel
# values of the image and show the resulting labels as an image.
np.random.seed(0)  # makes the random numbers predictable
flatten_img = img.reshape((-1, 1))  # flatten
k_means = cluster.KMeans(n_clusters=n_clusters, n_init=N_tries)
k_means.fit(flatten_img)  # train
centroids = k_means.cluster_centers_.squeeze()  # Centroids
print("centroids =", centroids)
labels = k_means.labels_  # Labels of the centroids
# In-place reshape: `labels` is the very array held by k_means.labels_.
labels.shape = img.shape
print(len(labels), len(centroids), img.shape, n_clusters, len(centroids.flatten()))
gray_image.show_normalized(labels, "Labels")

Notice that the indexes (labels) are not sorted by the corresponding intensity values.

In [None]:
# Rebuild the image from the labels, first with the labels obtained while
# training and then with the ones returned by predict(), and show both.
# create an array from labels and values
#img_dequantized = np.choose(labels, centroids)
#img_dequantized = centroids[range(len(labels)), labels]
#img_dequantized = np.empty_like(flatten_img)
#for i in range(len(labels)):
#    img_dequantized[i] = centroids[labels[i]]
#img_dequantized = centroids[labels[range(len(labels))]]
img_dequantized = centroids[labels]  # Every label is replaced by its centroid
#img_dequantized.shape = img.shape

print("labels", labels, labels.shape)
print("max and min dequantized image", img_dequantized.max(), img_dequantized.min())
prediction = k_means.predict(flatten_img)  # Quantize
prediction.shape = img.shape
print("prediction =", prediction)
print("prediction.shape =", prediction.shape)

# NOTE: vmin and vmax are not used anywhere else in this cell.
vmin = img.min()
vmax = img.max()

gray_image.show(img_dequantized, "Dequantized")
new_img_dequantized = centroids[prediction]
gray_image.show(new_img_dequantized, "New Dequantized")