# Scalar [Quantization](https://en.wikipedia.org/wiki/Quantization_(signal_processing)) of Digital Audio

See https://github.com/vicente-gonzalez-ruiz/quantization.

In [None]:
%%bash
if [ -d "$HOME/repos" ]; then
    echo "\"$HOME/repos\" exists"
else
    mkdir ~/repos
    echo Created $HOME/repos
fi

In [None]:
%%bash
if [ -d "$HOME/repos/scalar_quantization" ]; then
    cd $HOME/repos/scalar_quantization
    echo "$HOME/repos/scalar_quantization ... "
    git pull 
else
    cd $HOME/repos
    git clone https://github.com/vicente-gonzalez-ruiz/scalar_quantization.git
fi

In [None]:
!ln -sf ~/repos/scalar_quantization/common.py .
!ln -sf ~/repos/scalar_quantization/quantization.py .
!ln -sf ~/repos/scalar_quantization/midtread_quantization.py .
!ln -sf ~/repos/scalar_quantization/midrise_quantization.py .
!ln -sf ~/repos/scalar_quantization/companded_quantization.py .
!ln -sf ~/repos/scalar_quantization/deadzone_quantization.py .
import common
import midtread_quantization as midtread
import midrise_quantization as midrise
import deadzone_quantization as deadzone
import companded_quantization as companded

In [None]:
%matplotlib inline

import math
import numpy as np
import time

## Subjective comparison

In [None]:
import sounddevice as sd
sr = 44100
duration = 2.0  # seconds
x = sd.rec(int(duration * sr), samplerate=sr, channels=1, dtype=np.int16)
print("Speak!")
while sd.wait():
    pass
print("done")

In [None]:
sd.play(x)
common.plot(np.linspace(0, len(x)-1, len(x)), x, "Time", "Amplitude", "Audio Signal")

In [None]:
len(x)

In [None]:
dir(midtread)

In [None]:
Q_step = 10000
Q_midtread = midtread.Midtread_Quantizer(Q_step=Q_step, min_val=-32768, max_val=32767)
Q_midrise = midrise.Midrise_Quantizer(Q_step=Q_step, min_val=-32768, max_val=32767)
Q_deadzone = deadzone.Deadzone_Quantizer(Q_step=Q_step, min_val=-32768, max_val=32767)
Q_companded = companded.Companded_Quantizer(Q_step=Q_step, min_val=-32768, max_val=32767)

In [None]:
y_T, k_T  = Q_midtread.encode_and_decode(x)
y_T = y_T.astype(np.int16)  # soundevice process up to 16 bits/sample
y_R, k_R  = Q_midrise.encode_and_decode(x)
y_R = y_R.astype(np.int16)
y_D, k_D = Q_deadzone.encode_and_decode(x)
y_D = y_D.astype(np.int16)
y_C, k_C  = Q_companded.encode_and_decode(x)
y_C = y_C.astype(np.int16)

In [None]:
sd.play(y_T.astype(np.int16))
common.plot(np.linspace(0, len(y_T)-1, len(y_T)), y_T, "Time", "Amplitude", "Mid-tread ($\Delta={}$)".format(Q_step))
time.sleep(duration)

In [None]:
sd.play(y_R)
common.plot(np.linspace(0, len(y_R)-1, len(y_R)), y_R, "Time", "Amplitude", "Mid-rise ($\Delta={}$)".format(Q_step))
time.sleep(duration)

In [None]:
sd.play(y_D)
common.plot(np.linspace(0, len(y_D)-1, len(y_D)), y_D, "Time", "Amplitude", "Dead-zone ($\Delta={}$)".format(Q_step))
time.sleep(duration)

In [None]:
sd.play(y_C)
common.plot(np.linspace(0, len(y_C)-1, len(y_C)), y_C, "Time", "Amplitude", "Companded Dead-zone ($\Delta={}$)".format(Q_step))

In [None]:
common.plot(np.linspace(0, len(k_T)-1, len(k_T)), k_T, "Time", "Representation Codes", "Mid-tread ($\Delta={}$)".format(Q_step))
common.plot(np.linspace(0, len(k_R)-1, len(k_R)), k_R, "Time", "Representation Codes", "Mid-rise ($\Delta={}$)".format(Q_step))
common.plot(np.linspace(0, len(k_D)-1, len(k_D)), k_D, "Time", "Representation Codes", "Dead-zone ($\Delta={}$)".format(Q_step))
common.plot(np.linspace(0, len(k_C)-1, len(k_C)), k_C, "Time", "Representation Codes", "Companded Dead-zone ($\Delta={}$)".format(Q_step))

In [None]:
error_T = x - y_T
error_R = x - y_R
error_D = x - y_D
error_C = x - y_C

In [None]:
sd.play(error_T)
common.plot(np.linspace(0, len(y_T)-1, len(y_T)), error_T, "Time", "Amplitude Error", "Mid-tread ($\Delta={}$)".format(Q_step))
time.sleep(duration)

In [None]:
sd.play(error_R)
common.plot(np.linspace(0, len(y_R)-1, len(y_R)), error_R, "Time", "Amplitude Error", "Mid-rise ($\Delta={}$)".format(Q_step))
time.sleep(duration)

In [None]:
sd.play(error_D)
common.plot(np.linspace(0, len(y_D)-1, len(y_D)), error_D, "Time", "Amplitude Error", "Dead-zone ($\Delta={}$)".format(Q_step))
time.sleep(duration)

In [None]:
sd.play(error_C)
common.plot(np.linspace(0, len(y_C)-1, len(y_C)), error_C, "Time", "Amplitude Error", "Companded Dead-zone ($\Delta={}$)".format(Q_step))

## Objective comparison (Rate/Distortion curve)

In [None]:
%%bash
if [ -d "$HOME/repos/information_theory" ]; then
    cd $HOME/repos/image_IO
    echo "$HOME/repos/information_theory ... "
    git pull 
else
    cd $HOME/repos
    git clone https://github.com/vicente-gonzalez-ruiz/information_theory.git
fi

In [None]:
try:
    import skimage
except:
    !pip install scikit-image
    import skimage

In [None]:
!ln -sf ~/repos/information_theory/distortion.py .
!ln -sf ~/repos/information_theory/information.py .
import distortion
import information

### RMSE vs bit-rate

In [None]:
def RD_curve(x, quantizer):
    points = []
    for q_step in range(1, 32768, 256):
        Q = quantizer(Q_step=q_step, min_val=-32768, max_val=32767)
        y, k = Q.encode_and_decode(x)
        rate = information.entropy(k.flatten())
        distortion_ = distortion.RMSE(x, y)
        points.append((rate, distortion_))
        print(rate, distortion_)
    return points

In [None]:
midtread_RD_points = RD_curve(x, midtread.Midtread_Quantizer)

In [None]:
midrise_RD_points = RD_curve(x, midrise.Midrise_Quantizer)

In [None]:
deadzone_RD_points = RD_curve(x, deadzone.Deadzone_Quantizer)

In [None]:
companded_RD_points = RD_curve(x, companded.Companded_Quantizer)

In [None]:
import matplotlib
import matplotlib.pyplot as plt

In [None]:
plt.title("RD Tradeoff")
plt.xlabel("Bits per Sample")
plt.ylabel("RMSE")
#plt.xscale("log")
#plt.yscale("log")
plt.scatter(*zip(*midtread_RD_points), s=2, c='b', marker="o", label='Mid-tread')
plt.scatter(*zip(*midrise_RD_points), s=2, c='c', marker="o", label='Mid-rise')
plt.scatter(*zip(*deadzone_RD_points), s=2, c='r', marker="o", label='Dead-zone')
plt.scatter(*zip(*companded_RD_points), s=2, c='g', marker="o", label='Companded Dead-zone')
plt.legend(loc='upper right')
plt.gcf().set_size_inches(1.0 * plt.gcf().get_size_inches())
plt.show()

In [None]:
def QstepD_curve(x, quantizer):
    QstepD_points = []
    for q_step in range(1, 32768, 256):
        Q = quantizer(Q_step=q_step, min_val=-32768, max_val=32767)
        y, k = Q.encode_and_decode(x)
        distortion_ = distortion.RMSE(x, y)
        QstepD_points.append((q_step, distortion_))
        print(q_step, distortion_)
    return QstepD_points

In [None]:
midtread_QstepD_points = QstepD_curve(x, midtread.Midtread_Quantizer)

In [None]:
midrise_QstepD_points = QstepD_curve(x, midrise.Midrise_Quantizer)

In [None]:
deadzone_QstepD_points = QstepD_curve(x, deadzone.Deadzone_Quantizer)

In [None]:
companded_QstepD_points = QstepD_curve(x, companded.Companded_Quantizer)

In [None]:
plt.title("$\Delta$/D comparative")
plt.xlabel("$\Delta$")
plt.ylabel("RMSE")
#plt.xscale("log")
#plt.yscale("log")
plt.scatter(*zip(*midtread_QstepD_points), s=2, c='b', marker="o", label='Mid-tread')
plt.scatter(*zip(*midrise_QstepD_points), s=2, c='c', marker="o", label='Mid-rise')
plt.scatter(*zip(*deadzone_QstepD_points), s=2, c='r', marker="o", label='Dead-zone')
plt.scatter(*zip(*companded_QstepD_points), s=2, c='g', marker="o", label='Companded Dead-zone')
plt.legend(loc='upper right')
plt.show()

In general, we can say that the distortion grows logarithmically with the quantization step.

In [None]:
def QstepR_curve(x, quantizer):
    points = []
    for q_step in range(1, 32768, 256):
        Q = quantizer(Q_step=q_step, min_val=-32768, max_val=32767)
        y, k = Q.encode_and_decode(x)
        rate = information.entropy(k.flatten())
        points.append((q_step, rate))
        print(q_step, rate)
    return points

In [None]:
midtread_QstepR_points = QstepR_curve(x, midtread.Midtread_Quantizer)

In [None]:
midrise_QstepR_points = QstepR_curve(x, midrise.Midrise_Quantizer)

In [None]:
deadzone_QstepR_points = QstepR_curve(x, deadzone.Deadzone_Quantizer)

In [None]:
companded_QstepR_points = QstepR_curve(x, companded.Companded_Quantizer)

In [None]:
plt.title("$\Delta$/R comparative")
plt.xlabel("$\Delta$")
plt.ylabel("Bits per sample")
#plt.xscale("log")
#plt.yscale("log")
plt.scatter(*zip(*midtread_QstepR_points), s=2, c='b', marker="o", label='Mid-tread')
plt.scatter(*zip(*midrise_QstepR_points), s=2, c='c', marker="o", label='Mid-rise')
plt.scatter(*zip(*deadzone_QstepR_points), s=2, c='r', marker="o", label='Dead-zone')
plt.scatter(*zip(*companded_QstepR_points), s=2, c='g', marker="o", label='Companded Dead-zone')
plt.legend(loc='upper right')
plt.show()

In general, we can say that the bit-rate decreases logarithmically with the quantization step.

### Using a logaritmic version of RMSE

The [HAS](https://en.wikipedia.org/wiki/Auditory_system) is more sensitive to the variations of quiet sounds than to the variations of the loud sounds. For this reason, let's see what happens when we give more importance to the quiet sounds that to the loud sounds, using a compression of the dynamic range of the sounds.

In [None]:
def log_average_energy(x):
    '''In fact, average logaritmic energy.'''
    return np.sum(np.log(np.abs(x.astype(np.double))+1)*np.log(np.abs(x.astype(np.double))+1))/len(x)

In [None]:
def log_RMSE(x, y):
    error_signal = x - y
    return math.sqrt(log_average_energy(error_signal))

In [None]:
def log_RD_curve(x, quantizer):
    RD_points = []
    for q_step in range(1, 32768, 256):
        Q = quantizer(Q_step=q_step, min_val=-32768, max_val=32767)
        y, k = Q.encode_and_decode(x)
        rate = information.entropy(k.flatten())
        distortion_ = log_RMSE(x, y)
        RD_points.append((rate, distortion_))
        print(rate, distortion_)
    return RD_points

In [None]:
log_midtread_RD_points = log_RD_curve(x, midtread.Midtread_Quantizer)

In [None]:
log_midrise_RD_points = log_RD_curve(x, midrise.Midrise_Quantizer)

In [None]:
log_deadzone_RD_points = log_RD_curve(x, deadzone.Deadzone_Quantizer)

In [None]:
log_companded_RD_points = log_RD_curve(x, companded.Companded_Quantizer)

In [None]:
plt.title("R/D comparative")
plt.xlabel("Bits per sample")
plt.ylabel("(log) RMSE")
#plt.yscale("linear")
#plt.xscale("log")
plt.xlim(1, 16)
plt.plot(*zip(*log_midtread_RD_points), c='b', marker=".", label='Mid-tread')
plt.plot(*zip(*log_midrise_RD_points), c='c', marker=".", label='Mid-rise')
plt.plot(*zip(*log_deadzone_RD_points), c='r', marker=".", label='Dead-zone')
plt.plot(*zip(*log_companded_RD_points), c='g', marker=".", label='Companded Dead-zone')
plt.legend(loc='upper right')
plt.show()

In [None]:
# Could be useful if we replace RMSE by SNR
def SNR(x, y):
    signal_energy = compute_average_energy(x)
    error_energy = compute_average_energy(x-y)
    print("signal energy =", signal_energy)
    print("error energy =", error_energy)
    return 10*math.log(signal_energy/error_energy)