In [1]:
import numpy as np
from scipy import integrate

def lloyd_max(x, density, n_alphabet, epochs=100, verbose=False, init_codewords='random', random_state=None):
    assert init_codewords in ['random']
    rng = to_random_state(random_state)

    # Set seed to a fixed number
    rng.seed(42)

    # Initialize codewords
    if init_codewords == 'random':
        c_min, c_max = x.min() - 1, x.max() + 1
        c = rng.uniform(c_min, c_max, size=n_alphabet)

    # Initialize boundaries
    b = np.zeros((n_alphabet + 1))
    b[0] = -np.inf
    b[-1] = np.inf

    # Run for e epochs
    for e in range(epochs):
        b_before = b[1:-1].copy()
        c_before = c.copy()

        for j in range(1, n_alphabet):
            b[j] = 0.5 * (c[j-1] + c[j])

        for i in range(len(c)):
            bi = b[i]
            biplus1 = b[i+1]
            if bi == biplus1:
                c[i] = 0
            else:
                nom = integrate.quad(lambda t: t * density(t), b[i], b[i+1])[0]
                denom = integrate.quad(density, b[i], b[i+1])[0]
                c[i] = nom / denom

        # Compute delta and see if it decreases
        b_delta = np.abs(b[1:-1] - b_before).mean()
        c_delta = np.abs(c - c_before).mean()

        if verbose:
            print(e, b_delta, c_delta)

    return b, c

def to_random_state(rs):
    if not isinstance(rs, np.random.RandomState):
        rs = np.random.RandomState(rs)

    return rs


In [3]:
def density(x):
    return x ** 2

x = np.array([1, 2, 3, 4, 5])
n_codewords = 3
epochs = 100
verbose = True
random_state = 42

In [4]:
lloyd_max(x, density, n_codewords, epochs, verbose, init_codewords='random', random_state=None)

0 4.5119440102179 1.4207944056309714
1 0.3874541799535596 1.8366281332297552
2 1.254549008302571 1.3589942145744136
3 0.7613428479522675 0.551487759322237
4 0.40312928296141015 0.3164215871687574
5 0.23252517826935826 0.21002132152954398
6 0.1470291271547448 0.08135489746513154
7 0.02503392018836803 1.609126397387339
8 1.1863457240056585 1.5012813869585273
9 0.8506823426645405 2.5488057341715163e+36
10 1.9116043006286373e+36 2.230205017401167e+36
11 9.558021503151367e+35 7.158433826284592e+35
12 4.779010751636891e+35 3.389547734079544e+35
13 2.3895053757582e+35 1.6430811930944687e+35
14 1.1947526878735246e+35 8.08960334927835e+34
15 5.973763438862721e+34 4.0136159382572826e+34
16 2.9868817199153145e+34 1.999028743491235e+34
17 1.4934408600598006e+34 9.97570649927024e+33
18 7.467204296103033e+33 4.9829946460741465e+33
19 3.733602163238573e+33 2.4902826927896587e+33
20 1.8668010715581372e+33 1.2448376989362698e+33
21 9.33400536874879e+32 6.2234290692619066e+32
22 4.6670024477034064e+32 3

  nom = integrate.quad(lambda t: t * density(t), b[i], b[i+1])[0]
  denom = integrate.quad(density, b[i], b[i+1])[0]
  in the extrapolation table.  It is assumed that the requested tolerance
  cannot be achieved, and that the returned result (if full_output = 1) is 
  the best which can be obtained.
  denom = integrate.quad(density, b[i], b[i+1])[0]
  in the extrapolation table.  It is assumed that the requested tolerance
  cannot be achieved, and that the returned result (if full_output = 1) is 
  the best which can be obtained.
  nom = integrate.quad(lambda t: t * density(t), b[i], b[i+1])[0]
  integration interval.
  nom = integrate.quad(lambda t: t * density(t), b[i], b[i+1])[0]
  integration interval.
  denom = integrate.quad(density, b[i], b[i+1])[0]


(array([           -inf, -2.55041341e+36, -2.55041341e+36,             inf]),
 array([-2.55041341e+36, -2.55041341e+36, -2.55041341e+36]))

In [9]:
import numpy as np
from scipy import integrate
from typing import Optional, Tuple, List, Callable


class LloydMaxQuantizer:
    def __init__(
        self,
        x: np.ndarray,
        density: Callable[[float], float],
        n_codewords: int,
        epochs: int = 100,
        verbose: bool = False,
        random_state: Optional[int] = None,
        init_codewords: str = "random",
    ):
        """
        Lloyd-Max quantizer implementation.

        Args:
            x (np.ndarray): Input data.
            density (Callable[[float], float]): Density function.
            n_codewords (int): Number of codewords (quantization levels).
            epochs (int, optional): Number of training epochs. Defaults to 100.
            verbose (bool, optional): Whether to print verbose output. Defaults to False.
            random_state (int, optional): Random seed for reproducibility. Defaults to None.
            init_codewords (str, optional): Initialization method for codewords. Either 'random' or 'kmeans++'.
                                            Defaults to 'random'.
        """
        self.x = x
        self.density = density
        self.n_codewords = n_codewords
        self.epochs = epochs
        self.verbose = verbose
        self.random_state = random_state
        self.init_codewords = init_codewords

    def fit(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Fit the Lloyd-Max quantizer to the input data.

        Returns:
            Tuple[np.ndarray, np.ndarray]: Tuple containing the boundaries and codewords.
        """
        rng = np.random.RandomState(self.random_state)

        # Initialize codewords
        c_min, c_max = self.x.min() - 1, self.x.max() + 1
        codewords = (
            self._initialize_codewords_kmeanspp(c_min, c_max, rng)
            if self.init_codewords == "kmeans++"
            else self._initialize_codewords_random(c_min, c_max, rng)
        )

        # Initialize boundaries
        boundaries = np.zeros((self.n_codewords + 1))
        boundaries[0] = -np.inf
        boundaries[-1] = np.inf

        # Run for specified epochs
        for _ in range(self.epochs):
            boundaries_before = boundaries[1:-1].copy()
            codewords_before = codewords.copy()

            # Calculate new boundaries (cutlines)
            for j in range(1, self.n_codewords):
                boundaries[j] = 0.5 * (codewords[j - 1] + codewords[j])

            # Calculate new codewords (centroids)
            for i in range(len(codewords)):
                bi = boundaries[i]
                biplus1 = boundaries[i + 1]
                if bi == biplus1:
                    codewords[i] = 0
                else:
                    numerator, _ = integrate.quad(
                        lambda t: t * self.density(t), boundaries[i], boundaries[i + 1]
                    )
                    denominator, _ = integrate.quad(
                        self.density, boundaries[i], boundaries[i + 1]
                    )
                    codewords[i] = numerator / denominator

            # Compute delta and see if it decreases
            boundaries_delta = np.abs(boundaries[1:-1] - boundaries_before).mean()
            codewords_delta = np.abs(codewords - codewords_before).mean()

            if self.verbose:
                print(
                    "Epoch:",
                    _,
                    "Boundaries Delta:",
                    boundaries_delta,
                    "Codewords Delta:",
                    codewords_delta,
                )

            # Check stopping criteria
            if boundaries_delta == 0 and codewords_delta == 0:
                break

        return boundaries, codewords

    def _initialize_codewords_random(
        self, c_min: float, c_max: float, rng: np.random.RandomState
    ) -> np.ndarray:
        """
        Initialize codewords randomly.

        Args:
            c_min (float): Minimum value for codewords initialization.
            c_max (float): Maximum value for codewords initialization.
            rng (np.random.RandomState): Random number generator.

        Returns:
            np.ndarray: Randomly initialized codewords.
        """
        return rng.uniform(c_min, c_max, size=self.n_codewords)

    def _initialize_codewords_kmeanspp(
        self, c_min: float, c_max: float, rng: np.random.RandomState
    ) -> np.ndarray:
        """
        Initialize codewords using k-means++ algorithm.

        Args:
            c_min (float): Minimum value for codewords initialization.
            c_max (float): Maximum value for codewords initialization.
            rng (np.random.RandomState): Random number generator.

        Returns:
            np.ndarray: Codewords initialized using k-means++.
        """
        codewords = np.zeros((self.n_codewords))
        codewords[0] = rng.uniform(c_min, c_max)

        for j in range(1, self.n_codewords):
            distances = self._calculate_distances(self.x, codewords[:j])
            probabilities = self._calculate_probabilities(distances)
            codewords[j] = self._sample_new_codeword(self.x, probabilities, rng)

        return codewords

    @staticmethod
    def _calculate_distances(x: np.ndarray, codewords: np.ndarray) -> np.ndarray:
        """
        Calculate distances between input data and codewords.

        Args:
            x (np.ndarray): Input data.
            codewords (np.ndarray): Codewords.

        Returns:
            np.ndarray: Distances between input data and codewords.
        """
        distances = np.zeros_like(x, dtype=np.float64)
        for i in range(len(codewords)):
            distances += np.abs(x - codewords[i])
        return distances

    @staticmethod
    def _calculate_probabilities(distances: np.ndarray) -> np.ndarray:
        """
        Calculate probabilities based on distances.

        Args:
            distances (np.ndarray): Distances between input data and codewords.

        Returns:
            np.ndarray: Probabilities based on distances.
        """
        return 1 / distances

    @staticmethod
    def _sample_new_codeword(
        x: np.ndarray, probabilities: np.ndarray, rng: np.random.RandomState
    ) -> float:
        """
        Sample a new codeword based on probabilities.

        Args:
            x (np.ndarray): Input data.
            probabilities (np.ndarray): Probabilities.
            rng (np.random.RandomState): Random number generator.

        Returns:
            float: Sampled new codeword.
        """
        normalized_probabilities = probabilities / np.sum(probabilities)
        return rng.choice(x, p=normalized_probabilities)


In [10]:
quantizer = LloydMaxQuantizer(x, density, n_codewords, epochs, verbose, random_state)
boundaries, codewords = quantizer.fit()

print("Boundaries:", boundaries)
print("Codewords:", codewords)


Epoch: 0.0007190574597011334 Boundaries Delta: 4.5119440102179 Codewords Delta: 1.4207944056309714
Epoch: 1.6933754750425578e-05 Boundaries Delta: 0.3874541799535596 Codewords Delta: 1.8366281332297552
Epoch: 0.00014919208302188736 Boundaries Delta: 1.254549008302571 Codewords Delta: 1.3589942145744136
Epoch: 0.00025335632386003226 Boundaries Delta: 0.7613428479522675 Codewords Delta: 0.551487759322237
Epoch: 0.0007948053256630061 Boundaries Delta: 0.40312928296141015 Codewords Delta: 0.3164215871687574
Epoch: 0.001413313845059605 Boundaries Delta: 0.23252517826935826 Codewords Delta: 0.21002132152954398
Epoch: 0.0002957701756418629 Boundaries Delta: 0.1470291271547448 Codewords Delta: 0.08135489746513154
Epoch: 0.000681889749461817 Boundaries Delta: 0.02503392018836803 Codewords Delta: 1.609126397387339
Epoch: 0.00022898191449627348 Boundaries Delta: 1.1863457240056585 Codewords Delta: 1.5012813869585273
Epoch: 0.00024468112526765395 Boundaries Delta: 0.8506823426645405 Codewords Delt

  numerator, _ = integrate.quad(
  denominator, _ = integrate.quad(
  in the extrapolation table.  It is assumed that the requested tolerance
  cannot be achieved, and that the returned result (if full_output = 1) is 
  the best which can be obtained.
  denominator, _ = integrate.quad(
  in the extrapolation table.  It is assumed that the requested tolerance
  cannot be achieved, and that the returned result (if full_output = 1) is 
  the best which can be obtained.
  numerator, _ = integrate.quad(
  integration interval.
  numerator, _ = integrate.quad(
  integration interval.
  denominator, _ = integrate.quad(


In [11]:
quantizer = LloydMaxQuantizer(x, density, n_codewords, epochs, verbose, random_state, init_codewords='kmeans++')
boundaries, codewords = quantizer.fit()

print("Boundaries:", boundaries)
print("Codewords:", codewords)


Epoch: 0.00104396980688648 Boundaries Delta: 4.061810178271044 Codewords Delta: 0.9691017030483308
Epoch: 0.00030105487185672075 Boundaries Delta: 0.4533204026261295 Codewords Delta: 0.3668798403418682
Epoch: 0.0004312896097484753 Boundaries Delta: 0.2713166750764815 Codewords Delta: 0.13951370717302547
Epoch: 0.00035918757837372084 Boundaries Delta: 0.11923582160883628 Codewords Delta: 2.185987481882481
Epoch: 0.0009017550515633133 Boundaries Delta: 1.549551623138734 Codewords Delta: 1.6241046251697615
Epoch: 0.0006305027595194801 Boundaries Delta: 0.5731001097959898 Codewords Delta: 0.7155536376643242
Epoch: 0.00041575004214511324 Boundaries Delta: 0.5303480014776556 Codewords Delta: 1.8932616784320715e+35
Epoch: 2.1089590635180016e+59 Boundaries Delta: 1.4199462588240537e+35 Codewords Delta: 1.6566039686271103e+35
Epoch: 6.277101735386681e+59 Boundaries Delta: 7.099731294113126e+34 Codewords Delta: 5.317309302580016e+34
Epoch: 4.368899587722111e+59 Boundaries Delta: 3.54986564707880

  numerator, _ = integrate.quad(
  denominator, _ = integrate.quad(
  in the extrapolation table.  It is assumed that the requested tolerance
  cannot be achieved, and that the returned result (if full_output = 1) is 
  the best which can be obtained.
  denominator, _ = integrate.quad(
  in the extrapolation table.  It is assumed that the requested tolerance
  cannot be achieved, and that the returned result (if full_output = 1) is 
  the best which can be obtained.
  numerator, _ = integrate.quad(
  integration interval.
  numerator, _ = integrate.quad(
  integration interval.
  denominator, _ = integrate.quad(
