In [1]:
import cvxpy as cp
import numpy as np
import tqdm
import scipy
import math
from scipy.special import xlogy
import time

## Dynamic Programming to find Optimal quantizer

Dynamic Programming for Quantization of q-ary Input Discrete Memoryless Channels

In [174]:
def calculate_transition_matrix(Px, N, Q, Phi):
    Ayx = np.zeros((N, Q))

    for j in range(Q):
        for i in range(N):
            Ayx[i, j] = Phi[j].cdf(S[i+1]) - Phi[j].cdf(S[i])

    Axy = np.zeros((Q, N))
    for m in range(Q):
        for n in range(N):
            Axy[m, n] = Px[m]*Ayx[n, m]/np.sum(Px*Ayx[n,:])

    # fix nan values by repeating nearest row
    Axy_cp = Axy.T.copy()

    nan_index = np.arange(N)[np.any(np.isnan(Axy.T), axis=1) == True]
    upper_half = nan_index[nan_index<N/2]
    lower_half = nan_index[nan_index>=N/2]

    if len(upper_half) > 0:
        upper_half_idx = upper_half[-1]
        Axy_cp[:upper_half_idx+1,:] = Axy_cp[upper_half_idx+1,:]

    if len(lower_half) > 0:
        lower_half_idx = lower_half[0]
        Axy_cp[lower_half_idx:,:] = Axy_cp[lower_half_idx-1,:]

    Axy = Axy_cp.T

    Py = np.matmul(Ayx, Px)

    Pxy = Axy*Py
    
    return Ayx, Axy, Py, Pxy

def calculate_cost_w(l, r, Pxy, Py):
    tmp = []
    dem = np.sum(Py[l:r+1])
    for k in range(l, r+1):
        tmp_tmp = []
        for i in range(Q):
            num = np.sum(Pxy[i,l:r+1])
            ent = xlogy(num/dem, num/dem)
            # print(num, dem, ent)
            tmp_tmp.append(ent)
            # print(num, dem, ent)
        tmp.append(Py[k]*sum(tmp_tmp))
    return -np.sum(tmp)

def dp_optimal_quantizer(N, M, Pxy, Py):
    DP = np.zeros((N, M))
    SOL = np.zeros((N, M))

    for n in range(N):
        DP[n, 0] = calculate_cost_w(0, n, Pxy, Py)
        SOL[n, 0] = 0

    for m in range(1, M):
        for n in np.arange(m, N-M+m+1)[::-1]:
            tmp = []
            for t in range(m-1, n):
                tmp.append(DP[t, m-1] + calculate_cost_w(t+1, n, Pxy, Py))
            # SOL[n, m] = np.argmin(tmp)
            SOL[n, m] = np.arange(m-1, n)[np.argmin(tmp)]
            t = int(SOL[n, m])
            DP[n, m] = DP[t, m-1] + calculate_cost_w(t+1, n, Pxy, Py)
            
    H = []
    h_prev = N
    H.append(h_prev)
    for m in np.arange(M)[::-1]:
        h_prev = int(SOL[h_prev-1, m]) + 1
        H.append(h_prev)
    H[-1] -= 1
    H = H[::-1]
    
    return H, DP[N-1, M-1]

## Convex Opt to find Optimal Input Distribution

In [6]:
def calculate_optimal_distribution(n, m, P, sum_x=1):
    '''
    copied from https://www.cvxpy.org/examples/applications/Channel_capacity_BV4.57.html
    '''

    # n is the number of different input values
    # m is the number of different output values
    if n*m == 0:
        print('The range of both input and output values must be greater than zero')
        return 'failed', np.nan, np.nan

    # x is probability distribution of the input signal X(t)
    x = cp.Variable(shape=n)

    # y is the probability distribution of the output signal Y(t)
    # P is the channel transition matrix
    y = P@x

    # I is the mutual information between x and y
    c = np.sum(np.array((xlogy(P, P) / math.log(2))), axis=0)
    I = c@x + cp.sum(cp.entr(y) / math.log(2))

    # Channel capacity maximised by maximising the mutual information
    obj = cp.Maximize(I)
    constraints = [cp.sum(x) == sum_x,x >= 0]

    # Form and solve problem
    prob = cp.Problem(obj,constraints)
    prob.solve()
    if prob.status=='optimal':
        return prob.status, prob.value, x.value
    else:
        return prob.status, np.nan, np.nan

# Test run

## init

In [178]:
X = np.array([-1.5, 0, 3.25])
Q = len(X)
N = 200
start = -8
end = 8
step = (end-start)/N
S = np.linspace(start, end, N+1)
M = 3

sigma = 1
Y = X + np.random.randn(Q)*sigma

Phi = [scipy.stats.norm(loc=X[i], scale=sigma) for i in range(Q)]

Px = [0.2, 0.2, 0.6]

In [179]:
# generate random Px
Px = []
max_rand = 100
Px.append(np.random.randint(0, max_rand + 1))
for i in range(1, Q-1):
    if max_rand + 1 - Px[i-1] <= 0:
        Px.append(0)
    else:
        Px.append(np.random.randint(0, max_rand + 1 - Px[i-1]))
last = max_rand - np.sum(Px)
Px.append(last)
Px = [p/max_rand for p in Px]

print(Px)

[0.84, 0.11, 0.05]


In [180]:
I_prev = -100
for it in tqdm.tqdm(range(10)):
    start = time.time()
    
    # dp to find optimal quantizer
    Ayx, Axy, Py, Pxy = calculate_transition_matrix(Px, N, Q, Phi)
    opt_H, opt_value = dp_optimal_quantizer(N, M, Pxy, Py)

    # cvxopt to find optimal input distribution
    # compute Azx
    Hx = S[opt_H]
    Azx = np.zeros((M, Q))
    for j in range(Q):
        for i in range(M):
            Azx[i, j] = Phi[j].cdf(Hx[i+1]) - Phi[j].cdf(Hx[i])
    status, obj_value, px_value = calculate_optimal_distribution(Q, M, Azx)
    
    stop = time.time()

    if status == "optimal":
        print("iter {}".format(it))
        print("    given input distribution {}".format(Px))
        print("    optimal quantizer {}".format(S[opt_H]))
        print("    optimal input distribution {}".format(px_value))
        print("    optimal I(X;Z) {}".format(obj_value))
        print("    took {:.4f}s".format(stop-start))
        Px = px_value
    else:
        print("cvxopt failed")
        break
        
    if abs((obj_value - I_prev)/I_prev) <= 0.0001:
        print("stopping criterion met")
        break
    I_prev = obj_value

 10%|████████▍                                                                           | 1/10 [00:38<05:46, 38.49s/it]

iter 0
    given input distribution [0.84, 0.11, 0.05]
    optimal quantizer [-8.   -0.48  1.68  8.  ]
    optimal input distribution [0.39601633 0.14303827 0.46094541]
    optimal I(X;Z) 0.9354425176383293
    took 38.4880s


 20%|████████████████▊                                                                   | 2/10 [01:17<05:10, 38.76s/it]

iter 1
    given input distribution [0.39601633 0.14303827 0.46094541]
    optimal quantizer [-8.   -0.4   1.44  8.  ]
    optimal input distribution [0.41890622 0.10845635 0.47263743]
    optimal I(X;Z) 0.9466174579869522
    took 38.9566s


 30%|█████████████████████████▏                                                          | 3/10 [01:56<04:30, 38.69s/it]

iter 2
    given input distribution [0.41890622 0.10845635 0.47263743]
    optimal quantizer [-8.   -0.32  1.36  8.  ]
    optimal input distribution [0.43224946 0.08954416 0.47820638]
    optimal I(X;Z) 0.9481405573140043
    took 38.5930s


 40%|█████████████████████████████████▌                                                  | 4/10 [02:34<03:52, 38.70s/it]

iter 3
    given input distribution [0.43224946 0.08954416 0.47820638]
    optimal quantizer [-8.   -0.24  1.36  8.  ]
    optimal input distribution [0.43780785 0.08187351 0.48031864]
    optimal I(X;Z) 0.9484137860302166
    took 38.7309s


 50%|██████████████████████████████████████████                                          | 5/10 [03:13<03:14, 38.81s/it]

iter 4
    given input distribution [0.43780785 0.08187351 0.48031864]
    optimal quantizer [-8.   -0.16  1.36  8.  ]
    optimal input distribution [0.44403543 0.07332791 0.48263666]
    optimal I(X;Z) 0.9485571555188846
    took 38.9994s


 50%|██████████████████████████████████████████                                          | 5/10 [03:52<03:52, 46.53s/it]

iter 5
    given input distribution [0.44403543 0.07332791 0.48263666]
    optimal quantizer [-8.   -0.16  1.36  8.  ]
    optimal input distribution [0.44403543 0.07332791 0.48263666]
    optimal I(X;Z) 0.9485571555188846
    took 38.8635s
stopping criterion met



