# Practical part for the PDC project

In [82]:
import numpy as np

## Functions handling bit vectors

In [83]:
def generate_ascii_string(length=78):
    """Generate a random string of 7-bit character codes.

    Each character is drawn independently with a code point in [0, 126]
    (``int(np.random.rand() * 127)``), so the result may contain control
    characters (including NUL) and never contains DEL (127).

    Args:
        length: Number of characters to generate. Defaults to 78, the
            length that used to be hard-coded in this function.

    Returns:
        A ``str`` made of ``length`` characters.
    """
    # One np.random.rand() draw per character, in order, so a seeded global
    # generator produces the same string as the previous implementation.
    return "".join(chr(int(np.random.rand() * 127)) for _ in range(length))

def ascii_str_to_binary(ascii_str):
    """Convert an ASCII string into a bit string of 7-bit codewords.

    Every character is encoded as exactly 7 bits (most significant bit
    first), so the result has ``7 * len(ascii_str)`` characters.

    Args:
        ascii_str: String whose characters all have a code point <= 127.

    Returns:
        A ``str`` made only of '0' and '1'.

    Raises:
        ValueError: If a character does not fit in 7 bits. Such a character
            used to be silently truncated (or emitted with more than 7 bits),
            which corrupted the stream without any warning.
    """
    codewords = []
    for position, char in enumerate(ascii_str):
        code_point = ord(char)
        if code_point > 127:
            raise ValueError(
                f"Character {char!r} at position {position} is not a 7-bit ASCII character."
            )
        codewords.append(format(code_point, '07b'))
    return "".join(codewords)

def binary_to_ascii_str(binstr):
    """Convert a bit string of 7-bit codewords back into a string.

    The bit string is read 7 bits at a time and each codeword becomes one
    character. The bytes are built codeword by codeword instead of going
    through a single big integer: the integer round trip dropped leading NUL
    characters (their bits are all zero) and failed on an empty input.

    Args:
        binstr: String made of '0' and '1'. Its length is expected to be a
            multiple of 7; a shorter trailing chunk is interpreted as the
            value of the bits it contains.

    Returns:
        The decoded ``str`` (empty when ``binstr`` is empty).

    Raises:
        ValueError: If ``binstr`` contains characters other than '0' and '1'.
    """
    codewords = (binstr[start:start + 7] for start in range(0, len(binstr), 7))
    # A codeword of at most 7 bits is always < 128, hence a valid single byte.
    return bytes(int(codeword, 2) for codeword in codewords).decode()

def split_bit_str(bit_str, chunk_size):
    """Cut a bit string into consecutive chunks of ``chunk_size`` characters.

    Args:
        bit_str: The string to split.
        chunk_size: Number of characters per chunk.

    Returns:
        The list of chunks, in order. The last chunk is shorter when
        ``len(bit_str)`` is not a multiple of ``chunk_size``.
    """
    chunks = []
    for start in range(0, len(bit_str), chunk_size):
        chunks.append(bit_str[start:start + chunk_size])
    return chunks

def get_alphabet_from_codeword_length(length):
    """List every binary codeword of a given length, in increasing order.

    Args:
        length: Number of bits per codeword.

    Returns:
        A list of ``2**length`` zero-padded binary strings, from the
        representation of 0 up to the representation of ``2**length - 1``.
    """
    binary_spec = '0{}b'.format(length)
    alphabet = []
    for value in range(0, 2**length):
        alphabet.append(format(value, binary_spec))
    return alphabet

## Decoding criterion functions

In [84]:
def minimum_distance_criterion(constellation, symbol):
    """Return the index of the constellation point nearest to ``symbol``.

    The squared Euclidean distance in the complex plane is computed between
    ``symbol`` and every point of ``constellation``; ``np.argmin`` then picks
    the index of the smallest one (the first one in case of a tie).

    Args:
        constellation: Iterable of points exposing ``.real`` and ``.imag``.
        symbol: The received point, exposing ``.real`` and ``.imag``.

    Returns:
        The index (as returned by ``np.argmin``) of the nearest point.
    """
    squared_distances = []
    for candidate in constellation:
        delta_re = symbol.real - candidate.real
        delta_im = symbol.imag - candidate.imag
        squared_distances.append((delta_re**2 + delta_im**2))
    return np.argmin(squared_distances)

## Debugging functions to check the encoding/decoding states

In [85]:
def compute_codeword_lengths(bit_str):
    """List every codeword length that evenly divides the bit string.

    Args:
        bit_str: The original bit string (only its length is used).

    Returns:
        The divisors of ``len(bit_str)`` in increasing order; an empty list
        for an empty bit string.
    """
    total_bits = len(bit_str)
    valid_lengths = []
    for candidate in range(1, total_bits + 1):
        if total_bits % candidate == 0:
            valid_lengths.append(candidate)
    return valid_lengths

def get_diff_positions(bit_str1, bit_str2):
    """Return the positions at which two bit strings differ.

    Both strings MUST have the same size: the comparison walks over the
    positions of ``bit_str1`` only, so extra characters in ``bit_str2`` are
    ignored and a shorter ``bit_str2`` raises ``IndexError``.

    Args:
        bit_str1: Reference bit string.
        bit_str2: Bit string compared against the reference.

    Returns:
        The list of indices ``i`` where ``bit_str1[i] != bit_str2[i]``.
    """
    mismatches = []
    for position, bit in enumerate(bit_str1):
        if bit != bit_str2[position]:
            mismatches.append(position)
    return mismatches

## Constellation-related functions

### m-QAM

In [86]:
def pam(n, d):
    """Build a PAM constellation centred on zero.

    ``int(n / 2)`` points are placed on the positive side at ``d/2``,
    ``3d/2``, ... and mirrored on the negative side, so neighbouring points
    are ``d`` apart.

    Args:
        n: Requested number of points (an odd ``n`` yields ``n - 1`` points).
        d: Distance between two neighbouring points.

    Returns:
        A 1-D numpy array: the mirrored half (sorted) followed by the
        positive half.
    """
    half_count = int(n / 2)
    positive_half = np.array([d * k + (d / 2) for k in range(0, half_count)])
    negative_half = np.sort(-positive_half)
    return np.append(negative_half, positive_half)

# Computes the whole m-QAM constellation, to be used to map each codeword to a point generated here
# WARNING: m MUST BE A POWER OF 2 HERE !!! (this is not validated by the function)
def m_qam(m, d, max_energy = (3/2)):
    """Build the list of points of an m-QAM constellation with spacing d.

    Three layouts are produced:
      * m == 2: the 2-point PAM returned by pam(m, d) (a real-valued array).
      * m == 8: a 4 (real axis) x 2 (imaginary axis) grid.
      * otherwise: a square grid whose side is int(sqrt(m)), bumped to the
        next even number when m is not a perfect square; the surplus points
        are then removed symmetrically from the four corners.

    Args:
        m: Number of points of the constellation.
        d: Distance between neighbouring points on each axis.
        max_energy: Upper bound used for the checks below.

    Returns:
        A 1-D numpy array holding the constellation points.

    Raises:
        OverflowError: If d > max_energy, or (general layout only) if the
            largest coordinate of the axis PAM exceeds max_energy. The
            m == 2 and m == 8 layouts only go through the first check.
    """
    if d > max_energy:
        raise OverflowError("ERROR: the distance you have set is too big for the project.")

    # First we handle the configurations that are not built from a square grid
    if m == 2:
        return pam(m, d)

    elif m == 8:
        # 4 x 2 rectangle; the imaginary axis is left in ascending order here
        real_axis_pam = pam(4, d)
        imaginary_axis_pam = pam(2, d)
        return np.array([[complex(re, im) for re in real_axis_pam] for im in imaginary_axis_pam]).flatten()

    else:    
        # First we take the square root of the QAM size, if it is an integer then we will generate a perfect square
        # Otherwise, we try to find the nearest perfect square with sides of even lengths such that it contains more points than our desired constellation
        axis_len = int(np.sqrt(m))

        if axis_len**2 != m:
            # We want axes containing an even number of points, and a bigger constellation than the one we requested in this case
            if axis_len % 2 != 0:
                axis_len += 1
            else:
                axis_len += 2

        # We compute the difference between the number of points in the chosen square and the number of points in our requested constellation
        nb_points_to_delete = abs(m - axis_len**2)

        # If 'nb_points_to_delete' is positive => we need to remove that many points from the square
        # => we generate the full square before removing points inside it
        real_axis_pam = pam(axis_len, d)
        if real_axis_pam.max() > max_energy:
            raise OverflowError("ERROR: You have some points that are outside the constrained square. Try to lower the distance or reduce the size of the constellation.")
        
        # The imaginary axis is reversed so that row 0 of the grid holds the largest imaginary part
        imaginary_axis_pam = pam(axis_len, d)[::-1]
        points_pairs = np.array([[complex(re, im) for re in real_axis_pam] for im in imaginary_axis_pam])

        # Now that the constellation has been generated, if 'nb_points_to_delete' is not 0 we remove the additional points
        if nb_points_to_delete > 0:
            # The surplus is split evenly between the four corners (float division)
            nb_points_to_remove_per_quarter = nb_points_to_delete / 4
            nb_rows_to_affect_per_quarter = int(np.sqrt(nb_points_to_remove_per_quarter))

            # If the square root isn't whole => we must re-organize the zones where to delete the points (they are not squares anymore)
            difference_from_whole_square = nb_points_to_remove_per_quarter - nb_rows_to_affect_per_quarter**2

            if difference_from_whole_square > 0:
                # Both values below are floats at this point, hence the int() casts in the loops
                nb_rows_to_affect_per_quarter = difference_from_whole_square
                nb_points_to_remove_per_quarter_row = nb_points_to_remove_per_quarter / difference_from_whole_square
            else:
                nb_points_to_remove_per_quarter_row = nb_rows_to_affect_per_quarter

            # First we set the points to delete to 0, mirroring each (row j, column i) into the four corners of the grid
            for j in range(0, int(nb_rows_to_affect_per_quarter)):
                for i in range(0, int(nb_points_to_remove_per_quarter_row)):
                    points_pairs[j, i] = 0
                    points_pairs[j, len(points_pairs[0])-1-i] = 0
                    points_pairs[len(points_pairs[0])-1-j, i] = 0
                    points_pairs[len(points_pairs[0])-1-j, len(points_pairs[0])-1-i] = 0

        # We flatten the array (row by row) to make it easier to map the codewords
        points_pairs = points_pairs.flatten()

        # Then, we remove all the points set to 0. A genuine point is never 0 here because the PAM coordinates
        # are d/2, 3d/2, ... (and their opposites), unless d itself is 0, in which case every point is removed.
        points_pairs = np.delete(points_pairs, np.argwhere(points_pairs == 0.0+0.0j))

        return points_pairs

def get_max_qam_distance(m, tolerance, max_energy):
    """Find the largest point spacing that keeps an m-QAM inside the bound.

    The number of points per axis is chosen the same way as in ``m_qam``
    (2 for m == 2, 4 for m == 8, otherwise int(sqrt(m)) bumped to the next
    even number when m is not a perfect square). The candidate distances
    ``np.arange(0, max_energy, tolerance)`` are then swept in increasing
    order and the sweep stops at the first one whose axis PAM exceeds
    ``max_energy``.

    Args:
        m: Number of points of the constellation.
        tolerance: Step between two candidate distances.
        max_energy: Largest coordinate allowed on an axis.

    Returns:
        The last candidate distance that fits, or 0.0 when none was accepted.
    """
    # Number of points on the widest axis of the constellation
    if m == 2:
        axis_points = 2
    elif m == 8:
        axis_points = 4
    else:
        axis_points = int(np.sqrt(m))
        is_perfect_square = axis_points**2 == m
        if not is_perfect_square:
            # Next even side length that gives a square bigger than m
            axis_points += 1 if axis_points % 2 != 0 else 2

    best_distance = 0.0
    for candidate in np.arange(0, max_energy, tolerance):
        farthest_point = pam(axis_points, candidate).max()
        if farthest_point > max_energy:
            break
        best_distance = candidate
    return best_distance
    

def m_qam_encoder(codeword, constellation, alphabet, energy_per_symbol=0):
    """Map a codeword to its constellation point (linear mapping).

    The codeword found at position k of ``alphabet`` is mapped to
    ``constellation[k]``.

    Args:
        codeword: The bit string to encode.
        constellation: Indexable collection of constellation points.
        alphabet: Sequence of all codewords, in the same order as
            ``constellation``.
        energy_per_symbol: Unused; kept so existing callers keep working.

    Returns:
        The constellation point associated with ``codeword``.

    Raises:
        ValueError: If ``codeword`` is not part of ``alphabet``.
        IndexError: If ``constellation`` is shorter than ``alphabet``.
    """
    try:
        symbol_index = alphabet.index(codeword)
    except ValueError:
        # Same exception type as before, with a message that names the codeword.
        raise ValueError(
            f"Codeword {codeword!r} is not in the alphabet "
            "(is the bit string length a multiple of the codeword length?)."
        ) from None
    return constellation[symbol_index]

## Server-related functions

In [87]:
def serialize_complex(complex_vector, file_name):
    """Write a complex array to a text file, real parts first.

    The array is flattened, then all its real parts followed by all its
    imaginary parts are saved with ``np.savetxt`` (one value per line).

    Args:
        complex_vector: Numpy array of samples to save.
        file_name: Path of the text file to write.
    """
    flat_vector = complex_vector.reshape(-1)
    real_parts = np.real(flat_vector)
    imaginary_parts = np.imag(flat_vector)
    stacked_parts = np.concatenate([real_parts, imaginary_parts])
    np.savetxt(file_name, stacked_parts)

def deserialize_complex(file_name):
    """Read back a complex array written as real parts then imaginary parts.

    The first half of the values loaded with ``np.loadtxt`` is used as the
    real parts and the second half as the imaginary parts. With an odd
    number of values, the last one is ignored.

    Args:
        file_name: Path of the text file to read.

    Returns:
        A 1-D complex numpy array with half as many entries as the file has
        values.
    """
    raw_values = np.loadtxt(file_name)
    half = raw_values.size // 2
    real_parts = raw_values[0:half]
    imaginary_parts = raw_values[half:(2 * half)]
    return real_parts + 1j * imaginary_parts

## Encoder part

In [88]:
def encode_string(raw_str, n_dummy_symbols, encoder, dummy_symbol, constellation = [], alphabet = []):
    """Encode a string into complex symbols, preceded by dummy symbols.

    The string is converted to 7-bit codewords, regrouped into chunks of
    ``log2(len(alphabet))`` bits, and each chunk is mapped to a point by
    ``encoder``. ``n_dummy_symbols`` copies of ``dummy_symbol`` are placed in
    front of the result.

    Args:
        raw_str: The ASCII string to send.
        n_dummy_symbols: Number of dummy symbols to prepend (must be < 100).
        encoder: Callable taking ``codeword``, ``constellation`` and
            ``alphabet`` keyword arguments and returning one symbol.
        dummy_symbol: Value of the dummy symbols.
        constellation: Constellation handed to ``encoder``.
        alphabet: Codewords handed to ``encoder``; its size sets the number
            of bits per symbol.

    Returns:
        A 1-D numpy array: the dummy symbols followed by the encoded symbols.

    Raises:
        OverflowError: If the dummy symbols, the encoded symbols, or both
            together break the 100-symbol limit.
    """
    if n_dummy_symbols >= 100:
        raise OverflowError("ERROR: The batch of dummy samples cannot be equal or exceed 100 symbols.")

    # Regroup the 7-bit character stream into codewords of the alphabet's size
    payload_bits = ascii_str_to_binary(raw_str)
    bits_per_symbol = int(np.log2(len(alphabet)))
    codewords = split_bit_str(payload_bits, bits_per_symbol)

    n_codewords = len(codewords)
    if n_codewords > 100:
        raise OverflowError("ERROR: The string without the dummy symbols cannot exceed 100 symbols.")
    if n_codewords + n_dummy_symbols > 100:
        raise OverflowError(f"ERROR: the number of dummy samples is too high, the max value you can set here is {100 - n_codewords}.")

    # Batch of known symbols placed in front of the payload
    dummy_batch = np.full((n_dummy_symbols, 1), dummy_symbol)

    payload_symbols = []
    for codeword in codewords:
        payload_symbols.append(encoder(codeword=codeword, constellation=constellation, alphabet=alphabet))

    # np.append flattens both inputs, which yields a single 1-D sequence
    return np.append(dummy_batch, np.array(payload_symbols))

## Decoder part

In [89]:
def decode_str(noisy_str, dummy_symbol, n_dummy_symbols, alphabet, constellation):
    """Decode the symbols outputted by the channel into a bit string.

    The first ``n_dummy_symbols`` samples are the received dummy symbols:
    the angle of their sum is compared with the angle of the sent
    ``dummy_symbol`` to estimate the phase shift. The remaining samples are
    rotated back by that phase and each one is replaced by the codeword of
    its nearest constellation point.

    Args:
        noisy_str: 1-D array of received complex samples.
        dummy_symbol: The dummy symbol that was sent.
        n_dummy_symbols: Number of dummy symbols at the start of the samples.
        alphabet: Codewords, in the same order as ``constellation``.
        constellation: The constellation used by the encoder.

    Returns:
        The concatenation of the decoded codewords, as a ``str``.
    """
    # Received copies of the dummy symbol, used to estimate the phase shift
    received_dummies = noisy_str[0:n_dummy_symbols]

    # np.angle gives values in (-pi, pi]; both angles are offset by pi before being compared
    received_angle = np.angle(np.sum(received_dummies)) + np.pi
    reference_angle = np.angle(dummy_symbol) + np.pi
    phase = received_angle - reference_angle

    # Undo the estimated rotation on the payload symbols
    payload = noisy_str[n_dummy_symbols:]
    dephased_symbols = np.exp(-1j * phase) * payload

    decoded_codewords = []
    for symbol in dephased_symbols:
        nearest_index = minimum_distance_criterion(constellation, symbol)
        decoded_codewords.append(alphabet[nearest_index])
    return "".join(decoded_codewords)

## Putting everything together

In [90]:
# The string to be processed: a random placeholder until the real message is available.
# NOTE(review): no random seed is set in the visible cells, so this string changes on every run.
str_to_process = generate_ascii_string() # TODO: replace the random string with the given one
print(str_to_process)

[?3v_m/3
[n{(n@FdrU+gOYi'bOMRLED|r6m0'xq+hYAIvr9;X(n
<T5MZCw


In [91]:
# Invariant values (constraint)
max_energy = 3/2
# Known symbol sent in front of the message; the decoder uses it to estimate the phase shift
dummy_symbol = max_energy + 1j * max_energy

# FOR A 64-QAM (2**6 points)
# A 78-character string at 7 bits per character is 546 bits, i.e. 91 codewords of 6 bits
codeword_size = 6 
n_dummy_symbols = 4
# Step of the distance sweep performed by get_max_qam_distance
distance_margin = 0.0001
encoder = m_qam_encoder

# Generating the constellation & alphabet from codeword size
constellation_size = 2**codeword_size
distance = get_max_qam_distance(constellation_size, distance_margin, max_energy)
constellation = m_qam(constellation_size, distance, max_energy)
alphabet = get_alphabet_from_codeword_length(codeword_size)

In [92]:
# Encoding the string: the dummy symbols come first, followed by one constellation point per codeword
encoded_str = encode_string(
    str_to_process, 
    constellation=constellation, 
    alphabet=alphabet, 
    n_dummy_symbols=n_dummy_symbols, 
    dummy_symbol=dummy_symbol, 
    encoder=encoder
)

# Serializing the output to a text file (real parts, then imaginary parts) to send it to the server
serialize_complex(encoded_str, "input.txt")

Command to send the file to the server (requires the VPN): `python3 client/client.py --input_file=input.txt --output_file=output.txt --srv_hostname=iscsrv72.epfl.ch --srv_port=80`

(If you use a conda environment, prefix the command with `conda run`.)

In [93]:
# Extracting the noisy output from the file generated by the server command ("output.txt" must exist before running this cell)
noisy_str = deserialize_complex("output.txt")

# Decoding the bit string, using the same dummy symbol, alphabet and constellation as the encoder
decoded_bitstr = decode_str(
    noisy_str, 
    dummy_symbol=dummy_symbol, 
    n_dummy_symbols=n_dummy_symbols, 
    alphabet=alphabet,
    constellation=constellation
)

# Converting the bitstring into ASCII and printing it
decoded_ascii = binary_to_ascii_str(decoded_bitstr)
print(decoded_ascii)
# Sanity check: the decoded text must be identical to the string that was sent
print(f"Is perfectly decoded: {decoded_ascii == str_to_process}")


[?3v_m/3
[n{(n@FdrU+gOYi'bOMRLED|r6m0'xq+hYAIvr9;X(n
<T5MZCw
Is perfectly decoded: True
