# Section 0 - Useful functions for your project

In [None]:
# Import useful libraries
import random
import numpy as np
import scipy.io as io
import scipy.optimize as opt
import os

from tqdm import tqdm
from itertools import combinations


random.seed(0)
np.random.seed(0)

SCRIPT_DIR = os.getcwd()
X_STANDARD_SOL_PATH = os.path.join(SCRIPT_DIR, 'x_standard.npy')

Here-under we define the functions to:
- Encode a sentence into a binary vector
- Decode a binary vector into a sentence

In [29]:
def encoding_bin(mess):
    # Convert each character to its ASCII value and then to binary
    xi = [format(ord(char), '08b') for char in mess]

    # Get the number of characters
    m = len(xi)

    # Initialize an empty list for the binary vector
    x = []

    # Convert each binary string to a binary vector
    for i in range(m):
        x.append([int(bit) for bit in xi[i]])

    # Convert the list to a numpy array for easier manipulation
    x = np.array(x)


    # Return the binary vector and its dimensions
    d = x.shape[1]  # Number of bits per character
    x = x.flatten() # convert into a 1-d vector
    return x, d

def decoding_bin(x, d):
    # Ensure x is a binary vector (0s and 1s)
    x = np.clip(x, 0, 1)  # Clip values to be between 0 and 1
    x = np.round(x)        # Round values to the nearest integer

    # Initialize the output array
    y = np.zeros((len(x) // d, d), dtype=int)

    k = 0
    for i in range(len(x) // d):
        for j in range(d):
            y[i, j] = int(x[k])  # Fill the binary matrix
            k += 1

    # Convert binary to decimal and then to characters
    mess = ''.join(chr(int(''.join(map(str, row)), 2)) for row in y)

    return mess, y

The function below simulates the effect of the noisy channel

In [30]:
# Disrupts percenterror% of y entries randomly
def noisychannel(y, percenterror):
    m = len(y)                               # Length of the message
    K = int(np.floor(m * percenterror))      # Number of entries to corrupt
    I = np.random.permutation(m)[:K]         # Random indices to corrupt
    y_n = np.copy(y)                         # Copy of the orginal message
    vec = np.random.rand(K) * np.mean(y)
    y_n[I] = vec                             # Corruption of selected inputs
    return y_n

In [31]:
# Try it
message_in = "A crystal clear message"
binary_vector, dimensions = encoding_bin(message_in)
percenterror = 0.05
float_vector = binary_vector.astype(np.float32)
yprime = noisychannel(float_vector, percenterror)
print("Message sent:", message_in)
message_corr_decoded, binary_matrix = decoding_bin(yprime, dimensions)
print("Decoded noisy message:", message_corr_decoded)

Message sent: A crystal clear message
Decoded noisy message: @ cr9stal cl%Ar message


In [32]:
float_vector.shape

(184,)

# Section 1 - Decode the Message from Alice

In [33]:
# Load your mat file in Python
## Once your team is built, contact the Instructors by email to mention who is part of the group.
## You will then receive by return email your personal message from Alice to decrypt in .mat file.
## Alert: do not share it !

data = io.loadmat('messageFromAlice.mat')
# data is dictionnay where
## data['A'] is the encoding matrix exchanged between Alice and Bob
## data['d'] is the dimension
## data['yprime'] is the encrypted message received from Alice

# Load the arrays
A = data['A']
yprime = data['yprime'].T
yprime = np.squeeze(yprime)
d = data['d'][0][0]

In [34]:
# let's create some utility functions
def get_standard_format_matrix(A: np.ndarray) -> np.ndarray:
    m, p = A.shape
    I = np.eye(m)
    # let's prepare the 4 blocks
    ab1 = np.concatenate([-A, I, -np.eye(m), np.zeros((m, m)), np.zeros((m, p))], axis=1)
    ab2 = np.concatenate([A, I, np.zeros((m, m)), -np.eye(m), np.zeros((m, p))], axis=1)
    ab3 = np.concatenate([-np.eye(p), np.zeros((p, m)), np.zeros((p, m)), np.zeros((p, m)), -np.eye(p)], axis=1)
    
    Astandard = np.concatenate([ab1, ab2, ab3], axis=0)

    assert Astandard.shape == (2 * m + p, 2 * p + 3 * m), f"Expected {(2 * m + p, 2 * p + 3 * m)}. Found: {Astandard.shape}"

    return Astandard

def get_cost_coefficients(A: np.ndarray) -> np.ndarray:
    m, p = A.shape
    # let's build the cost coefficient 
    c = np.concatenate([np.zeros((1, p)), 
                        np.ones((1, m)),
                        np.zeros((1, m)),
                        np.zeros((1, m)),
                        np.zeros((1, p)),                        
                        ], 
                        axis=1).squeeze() 

    # c = [0, 0, 0, 0, 0...1, 1, 1,] (p 0s and m 1s)
    assert c.shape == (2 * p + 3 * m, ), f"Expected {(2 * p + 3 * m, )}. Found: {c.shape}"

    return c

def get_eq_constraints(A: np.ndarray, y_noise: np.ndarray) -> np.ndarray:
    m, p = A.shape
    y_2d = np.expand_dims(y_noise, axis=0)
    b_standard = np.concatenate([-y_2d, y_2d, -np.ones((1, p))], axis=1).squeeze()
    assert b_standard .shape == (2 * m + p,), f"Expected {(2 * m + p,)}. Found: {b_standard .shape}"
    return b_standard


In [35]:
def solve_relaxed(A: np.ndarray, y_noise: np.ndarray):
    # according to the scipy.linprog documentation
    # the function accepts 5 arguments
    # c, Aub, bub, Aeq, b_eq, lb, ub

    # minimize: c @ x
    # such that
    # A_ub @ x <= b_ub
    # A_eq @ x == b_eq
    # lb <= x <= ub

    c = get_cost_coefficients(A)
    Astandard = get_standard_format_matrix(A)
    b_standard = get_eq_constraints(A, y_noise)

    x_standard = opt.linprog(c, A_ub=None, b_ub=None, A_eq=Astandard, b_eq=b_standard, bounds=(0, None)).x
    return x_standard

Use your algorithm to solve

$min_{0 <= x^{'} <= 1} ||A*x^{'} - y^{'}||_1$


In [36]:
# save x_standard once instead of rerunning everytime
if not os.path.exists(X_STANDARD_SOL_PATH):
    x_standard = solve_relaxed(A, yprime)
    np.save(X_STANDARD_SOL_PATH, x_standard)
else:
    x_standard = np.load(X_STANDARD_SOL_PATH)

# since x_standard contains xprime, t and slack variables, we need to extract x' before proceeding
xprime = x_standard[:A.shape[1]]

# Display the result:
d = 8    # Number of bits per character
message_decoded, binary_matrix = decoding_bin(xprime, d)
print("The recovered message is:", message_decoded)

The recovered message is: You can claim your personal reward by going to Student affairs, giving you code=1350 and ask for you reward


# Section 2 - Generate and Decode your own messages

This section is dedicated to the fifth question of the project:
- Sending an encrypted message through a channel with sparse Gaussian noise...
- Encode and decode a message yourself:

In [37]:
MY_MESSAGE = "Test...test...Ayhem Speaking..."

In [38]:
from typing import Tuple
def prepare_message(message: str, percent_error:float = 0.1) -> Tuple[int, np.ndarray]:
    # Message in binary form
    binary_vector, d = encoding_bin(message)
    x = binary_vector.astype(np.float32)

    # Length of the message
    size = x.shape
    n = size[0]

    # Length of the message which will be sent
    m = 4*n

    # Encoding matrix: we take a randomly generated matrix
    A = np.random.randn(m,n)

    # Message you wish to send
    y = A@x

    yprime = noisychannel(y, percent_error)    

    return A, yprime


def decode_relaxed_messages(message: str, percent_error: float = 0.1):
    A, yprime = prepare_message(message=message, percent_error=percent_error)

    m, n = A.shape

    file_name = os.path.join(SCRIPT_DIR, f'my_msg_{round(percent_error, 4)}.npy')

    if not os.path.exists(file_name): 
        x_standard = solve_relaxed(A, yprime)
        np.save(file_name, x_standard)
        return n, x_standard
    else:
        return n, np.load(file_name)    

Find x approximately from yprime by solving:


$min_{0 <= x^{'} <= 1} ||A*x^{'} - y^{'}||_1$

In [39]:
# run the method without adding noise
A, yprime = prepare_message(message=MY_MESSAGE, percent_error=0)
_, n_org = A.shape
Astandard = get_standard_format_matrix(A)
m, n = Astandard.shape
x_standard = solve_relaxed(A, yprime)
xprime = x_standard[:n_org]
d = 8    # Number of bits per character
message_decoded, binary_matrix = decoding_bin(xprime, d)
print(f"The recovered message is with 0 noise level:", message_decoded)        

print(f"m : {m}, n: {n}") # 2232, 3472
print(f"at least {n - m} zero entries") # 1240
print(f"Found: {len([x for x in x_standard if x == 0])} zero entries") # 3182

The recovered message is with 0 noise level: Test...test...Ayhem Speaking...
m : 2232, n: 3472
at least 1240 zero entries
Found: 3178 zero entries


In [40]:
print(f"The actual message: {MY_MESSAGE}\n\n")

for pe in [0, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]:
    n, x = decode_relaxed_messages(MY_MESSAGE, pe)
    xprime = x[:n]
    d = 8    # Number of bits per character
    message_decoded, binary_matrix = decoding_bin(xprime, d)
    print(f"The recovered message is with {pe} noise level:", message_decoded)        

The actual message: Test...test...Ayhem Speaking...


The recovered message is with 0 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.05 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.1 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.2 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.3 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.4 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.5 noise level: @      (1@   @  (  ,
The recovered message is with 0.6 noise level:                                
The recovered message is with 0.7 noise level:                                
The recovered message is with 0.8 noise level:                                


# Section 3 - Dikin's Method

This section is dedicated to the sixth question of the project:
- Implement the Dikin's Method and compare its results with the previous ones.

In [41]:
def dikin_initialization(A: np.ndarray, b:np.ndarray, epsilon: float) -> np.ndarray:
    # quick check: make sure the shapes match
    if A.ndim != 2:
        raise ValueError(f"The initialization function expects 'A' to be 2 dimensionsal. Found: {A.shape}")

    if b.ndim not in (1, 2):
        raise ValueError(f"The function expects the 'b' argument to be at most 2 dimensional. Found: {b.shape}")
    
    if b.ndim == 2 and 1 not in b.shape:
        raise ValueError(f"if the 'b' argument is 2 dimensional, then it has to have a singleton dimension. Found: {b.shape}")  

    if epsilon <= 0 or epsilon > 10 ** -3:
        raise ValueError("The value epsilon is expected in the range ]0, 0.001]")


    # the scipy function expects a 1 dimensional np.array
    b = np.squeeze(b)

    # the initial point can be found by solving the following problem:
    # min x 0 
    # such that A x = b
    # x >= epsilon 
    _, n = A.shape
    c = np.zeros((n,))
    x_0 = opt.linprog(c = c, A_ub = None, b_ub = None, A_eq = A, b_eq=b, bounds = (epsilon , None)).x

    if x_0 is None:
        return None

    assert np.allclose(A @ x_0, b) , "Ax = b issues"
    assert np.all(x_0 >= epsilon - 10 ** -5), "epislon issues" 

    return np.expand_dims(x_0, axis=-1)

In [42]:
from typing import Optional
import numpy.linalg as la

def dikin_algorithm(A: np.ndarray, 
                     b: np.ndarray, 
                     c: np.ndarray, 
                     epsilon:float,
                     alpha: float = 5 * 10 ** -4,
                     max_iterations: int = 10 ** 3) -> Optional[np.ndarray]:
    # the problem is assumed to be on the standard form

    # step1: initialization
    x_k = dikin_initialization(A, b, epsilon=epsilon)


    if x_k is None:
        return None
    
    _, n = A.shape

    if c.ndim == 1:
        c = np.expand_dims(c, axis=-1) 

    assert c.shape == (n, 1), "make sure the vector of cost coefficients is of the correct shape"

    # define "e"
    e_vec = np.ones((1, n))
    
    iter_counter = 0

    while iter_counter <= max_iterations:
        # extract the diagonal matrix out of x_k
        # x_k = 
        x_dk = np.diag(x_k.squeeze())

        # step2: computation of dual estimates
        w_k = la.inv((A @ x_dk) @ (x_dk  @ A.T)) @ (A @ x_dk) @ x_dk @ c 

        # step3: computation of reduced costs
        r_k = c - A.T @ w_k

        # step4: check for optimality
        if np.all(r_k >= 0) and (e_vec @ x_dk @ r_k).item() <= epsilon:
            return x_k.squeeze()
        
        # step5: compute dky
        dy_k = -x_dk @ r_k

        # step6: 
        ## check for unboundedness
        if np.all(dy_k > 0):
            return None
        
        # check for optimal solution
        if np.all(dy_k == 0):
            return x_k.squeeze()
        
        
        # step7:
        # find the minimum value out of [- 1 / (dk_y (i))] out of i such that (dk_y (i) < 0) 
        step_k = np.min([-1 / (x) for x  in dy_k.squeeze() if x < 0]).item() * alpha

        x_k = x_k + step_k * (x_dk @ dy_k)

        # make sure to increment the counter
        iter_counter += 1


In [43]:
def run_dikin_algo(percent_error: float):
    A, yprime = prepare_message(message=MY_MESSAGE, percent_error=percent_error)

    _, n = A.shape

    cost_vec = get_cost_coefficients(A)
    b_eq = get_eq_constraints(A, yprime)
    A_eq = get_standard_format_matrix(A)

    
    file_name = os.path.join(SCRIPT_DIR, f"dikin_my_msg_{percent_error}.npy") 
    
    if not os.path.exists(file_name):
        x_dikin = dikin_algorithm(A_eq, b_eq, cost_vec, 10 ** -3)        
        np.save(x_dikin, file_name)
    else:
        return n, np.load(file_name)
    
    return n, x_dikin

In [44]:
for pe in [0, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]:
    n, x = run_dikin_algo(pe)
    xprime = x[:n]
    d = 8    # Number of bits per character
    message_decoded, binary_matrix = decoding_bin(xprime, d)
    print(f"The recovered message is with {pe} noise level:", message_decoded) 

  y[i, j] = int(x[k])  # Fill the binary matrix


The recovered message is with 0 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.05 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.1 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.2 noise level: Uåãto/&tDjp/,«}h5m ÛP``{`*#$..
The recovered message is with 0.3 noise level: RP<+§Z4$".¨	=hq	Ux/åR *¤_
The recovered message is with 0.4 noise level: D¶q,.
%!Ù@PH déz)?É&j
The recovered message is with 0.5 noise level: PA2A,TA<ö $A)"P# sa,D
The recovered message is with 0.6 noise level: YeÁ.
$shABb Ð0I & 
	$6&covered message is with 0.7 noise level: UEBD" !rA	 $ba 
The recovered message is with 0.8 noise level:         @ @      @   


# Section 4 - Integer Programming

This section is dedicated to the seventh question of the project:
- by imposing binary variables: can you recover your message with a higher noise level?

In [47]:
def solve_integer(A: np.ndarray, y_noise: np.ndarray):
    # according to the scipy.linprog documentation
    # the function accepts 5 arguments
    # c, Aub, bub, Aeq, b_eq, lb, ub

    # minimize: c @ x
    # such that
    # A_ub @ x <= b_ub
    # A_eq @ x == b_eq
    # lb <= x <= ub

    c = get_cost_coefficients(A)
    Astandard = get_standard_format_matrix(A)
    b_standard = get_eq_constraints(A, y_noise)

    _, n = A.shape

    integr = np.zeros(len(c))
    for i in range(n):
        integr[i] = 1

    return n, opt.linprog(c, A_eq=Astandard, b_eq=b_standard, integrality=integr, method='highs', options={'maxiter': 100}).x

In [48]:
def decode_integer_messages(message: str, percent_error: float = 0.1):
    A, yprime = prepare_message(message=message, percent_error=percent_error)

    m, n = A.shape

    file_name = os.path.join(SCRIPT_DIR, f'integer_my_msg_{round(percent_error, 4)}.npy')

    if not os.path.exists(file_name): 
        n, x_standard = solve_integer(A, yprime)
        np.save(file_name, x_standard)
        return n, x_standard
    else:
        return n, np.load(file_name)   

In [49]:
for pe in [0, 0.05, 0.1, 0.2, 0.3, 0.4]:
    n, x = decode_integer_messages(MY_MESSAGE, pe)
    xprime = x[:n]
    d = 8    # Number of bits per character
    message_decoded, binary_matrix = decoding_bin(xprime, d)
    print(f"The recovered message is with {pe} noise level:", message_decoded) 

The recovered message is with 0 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.05 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.1 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.2 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.3 noise level: Test...test...Ayhem Speaking...
The recovered message is with 0.4 noise level: Test...test...Ayhem Speaking...
