In [1]:
import numpy as np

In [17]:
from enum import Enum

class CompressionMode(Enum):
    """
    Compression mode enum class
    """
    LOW = 1
    HIGH = 2


**Module 0: read_image**

In [2]:
N = 8

In [3]:
from PIL import Image
import numpy as np

def read_image(image_path, N):
    """
    Read image and convert it to a numpy array in grayscale
    :param image_path: path to the image
    :param N: block size (N*N)
    :return: image array, and padding length and width
    """

    # Load the image
    image = Image.open(image_path)

    #image to gray scale
    image = image.convert('L')

    image_array = np.array(image)

    img_lenght, img_width = image_array.shape

    padding_length = 0
    padding_width = 0

    # check if the image is divisible by N and zero pad if necessary
    if img_lenght % N != 0:
        padding_length = N - img_lenght % N
        image_array = np.concatenate((image_array, np.zeros((padding_length, img_width))), axis=0)

    if img_width % N != 0:
        padding_width = N - img_width % N
        image_array = np.concatenate((image_array, np.zeros((img_lenght, padding_width))), axis=1)

    return image_array, padding_length, padding_width

In [5]:
def read_image(image_path, N):
    """
    Read image and convert it to a numpy array in grayscale
    :param image_path: path to the image
    :param N: block size (N*N)
    :return: image array, and padding length and width
    """

    # Load the image
    image = Image.open(image_path)

    #image to gray scale
    image = image.convert('L')

    image_array = np.array(image)

    img_lenght, img_width = image_array.shape

    # save gray scale image

    padding_length = 0
    padding_width = 0

    # check if the image is divisible by N and zero pad if necessary
    if img_lenght % N != 0:
        padding_length = N - img_lenght % N
        image_array = np.concatenate((image_array, np.zeros((padding_length, img_width))), axis=0)

    if img_width % N != 0:
        padding_width = N - img_width % N
        image_array = np.concatenate((image_array, np.zeros((img_lenght, padding_width))), axis=1)

    return image_array, padding_length, padding_width

In [7]:
image_path = "/content/palestine.jpg"
image_array, padding_length, padding_width = read_image(image_path, N)

**Module 1: Blockify**

In [8]:
def blockify_image(image_array, N):
    """
    Divide an image array into blocks of size (N*N).
    :param image_array: numpy array of image
    :param N: size of blocks (N*N)
    :return: list of blocks each of shape (N*N)
    """
    img_lenght, img_width = image_array.shape

    blocks = image_array.reshape(img_lenght // N, N, img_width // N, N)
    blocks = blocks.swapaxes(1, 2)

    return blocks

In [9]:
test = np.array([[1,2,3,4,5,6,7,8,],
[9,10,11,12,13,14,15,16,],
[17,18,19,20,21,22,23,24,],
[25,26,27,28,29,30,31,32,],
[33,34,35,36,37,38,39,40,],
[41,42,43,44,45,46,47,48,],
[49,50,51,52,53,54,55,56,],
[57,58,59,60,61,62,63,64,],])

block_test = blockify_image(test,4)

block_test

array([[[[ 1,  2,  3,  4],
         [ 9, 10, 11, 12],
         [17, 18, 19, 20],
         [25, 26, 27, 28]],

        [[ 5,  6,  7,  8],
         [13, 14, 15, 16],
         [21, 22, 23, 24],
         [29, 30, 31, 32]]],


       [[[33, 34, 35, 36],
         [41, 42, 43, 44],
         [49, 50, 51, 52],
         [57, 58, 59, 60]],

        [[37, 38, 39, 40],
         [45, 46, 47, 48],
         [53, 54, 55, 56],
         [61, 62, 63, 64]]]])


**Deblockify**

In [14]:
def deblockify_image(blocks):
    """
    Reconstruct an image from its blocks
    :param blocks: list of blocks
    :return: image array
    """
    N = len(blocks[0][0])               # block size
    image_length = len(blocks) * N      # number of vertical blocks * block size
    image_width = len(blocks[0]) * N    # number of horizontal blocks * block size

    image_array = blocks.reshape(image_length // N, image_width // N, N, N)
    image_array = image_array.swapaxes(1, 2).reshape(-1, image_length, image_width).squeeze()

    return image_array

test_deblock = deblockify_image(block_test)

test_deblock

array([[ 1,  2,  3,  4,  5,  6,  7,  8],
       [ 9, 10, 11, 12, 13, 14, 15, 16],
       [17, 18, 19, 20, 21, 22, 23, 24],
       [25, 26, 27, 28, 29, 30, 31, 32],
       [33, 34, 35, 36, 37, 38, 39, 40],
       [41, 42, 43, 44, 45, 46, 47, 48],
       [49, 50, 51, 52, 53, 54, 55, 56],
       [57, 58, 59, 60, 61, 62, 63, 64]])

**Module 2: DCT**

In [10]:
import numpy as np

def DCT_Basis(u, v, N):
    """
    Return the DCT Basis function for u,v and N
    :param u: the horizontal frequency index of the DCT output
    :param v: the vertical frequency index of the DCT output
    :param N: size of block (N*N)
    :return: DCT Basis function for given u and v
    """
    b = np.zeros((N,N))
    for x in range(N):
        for y in range(N):
            b[x][y] = np.cos((2 * x + 1) * u * np.pi / (2 * N)) * np.cos((2 * y + 1) * v * np.pi / (2 * N))

    return b

def DCT(A, dct_basis):
    """
    Return the DCT of a block
    :param A: numpy array of the block
    :param dct_basis: List of DCT Basis function
    :return: DCT of A
    """
    N = len(A)
    C = np.zeros((N,N))

    for u in range(N):
        for v in range(N):
            C[u][v] = np.sum(np.multiply(A, dct_basis[u][v]))

    #Normalization
    C = C/16
    C[0,:] = C[0,:]/2
    C[:,0] = C[:,0]/2
    #C = np.round(C)

    return C

In [11]:
dct_basis = np.zeros((N,N,N,N))

for u in range(N):
    for v in range(N):
        dct_basis[u][v] = DCT_Basis(u,v,N)

test_dct = DCT(test, dct_basis)

test_dct

array([[ 3.25000000e+01, -3.22116151e+00, -8.88178420e-15,
        -3.36727400e-01,  8.88178420e-16, -1.00451452e-01,
        -8.88178420e-15, -2.53511614e-02],
       [-2.57692921e+01, -3.55271368e-15,  1.77635684e-15,
         0.00000000e+00,  0.00000000e+00, -8.43769499e-15,
         6.21724894e-15,  4.44089210e-15],
       [-6.21724894e-15,  1.11022302e-15,  4.44089210e-16,
        -1.11022302e-16,  0.00000000e+00, -3.33066907e-16,
        -2.22044605e-16, -4.44089210e-16],
       [-2.69381920e+00, -6.66133815e-16, -8.88178420e-16,
        -3.33066907e-16,  0.00000000e+00, -1.66533454e-15,
         3.33066907e-16,  1.49880108e-15],
       [ 2.22044605e-16, -6.66133815e-16, -1.11022302e-15,
         3.33066907e-16, -6.66133815e-16, -9.99200722e-16,
         2.22044605e-16,  3.88578059e-16],
       [-8.03611615e-01, -8.88178420e-16, -1.11022302e-16,
        -1.11022302e-15, -8.88178420e-16, -2.22044605e-16,
         5.55111512e-16,  1.11022302e-16],
       [-8.43769499e-15,  1.110223

**IDCT**

In [16]:
def IDCT_Basis(x, y, N):
    """
    Return the IDCT Basis function for x,y and N
    :param x: the horizontal image index of the IDCT output
    :param y: the vertical image index of the IDCT output
    :param N: size of block (N*N)
    :return: DCT Basis function for given x and y
    """
    b = np.zeros((N, N))
    for u in range(N):
        for v in range(N):
            b[u][v] = np.cos((2 * x + 1) * u * np.pi / (2 * N)) * np.cos((2 * y + 1) * v * np.pi / (2 * N))

    return b

def IDCT(C, idct_basis):
    """
    Return the IDCT of a block
    :param C: numpy array of the block
    :param idct_basis: List of IDCT Basis function
    :return: IDCT of C
    """
    N = len(C)
    A = np.zeros((N,N))

    for x in range(N):
        for y in range(N):
            basis = idct_basis[x][y]
            A[x][y] = np.sum(np.multiply(C, basis))

    return A

idct_basis = np.zeros((N,N,N,N))

for x in range(N):
    for y in range(N):
        idct_basis[x][y] = IDCT_Basis(x,y,N)

test_idct = IDCT(test_dct, idct_basis)

test_idct

array([[ 1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.],
       [ 9., 10., 11., 12., 13., 14., 15., 16.],
       [17., 18., 19., 20., 21., 22., 23., 24.],
       [25., 26., 27., 28., 29., 30., 31., 32.],
       [33., 34., 35., 36., 37., 38., 39., 40.],
       [41., 42., 43., 44., 45., 46., 47., 48.],
       [49., 50., 51., 52., 53., 54., 55., 56.],
       [57., 58., 59., 60., 61., 62., 63., 64.]])

**Module 3: Quantizer**

In [19]:
compression_mode = CompressionMode.LOW

In [20]:
def get_quantization_table(CompressionMode):
    """
    Get quantization table based on compression mode
    :param CompressionMode: enum class
    :return: quantization table
    """
    if CompressionMode == CompressionMode.LOW:
        return np.array([[1, 1, 1, 1, 1, 2, 2, 4],
                        [1, 1, 1, 1, 1, 2, 2, 4],
                        [1, 1, 1, 1, 2, 2, 2, 4],
                        [1, 1, 1, 1, 2, 2, 4, 8],
                        [1, 1, 2, 2, 2, 2, 4, 8],
                        [2, 2, 2, 2, 2, 4, 8, 8],
                        [2, 2, 2, 4, 4, 8, 8, 16],
                        [4, 4, 4, 4, 8, 8, 16, 16]])
    elif CompressionMode == CompressionMode.HIGH:
        return np.array([[1, 2, 4, 8, 16, 32, 64, 128],
                        [2, 4, 4, 8, 16, 32, 64, 128],
                        [4, 4, 8, 16, 32, 64, 128, 128],
                        [8, 8, 16, 32, 64, 128, 128, 256],
                        [16, 16, 32, 64, 128, 128, 256, 256],
                        [32, 32, 64, 128, 128, 256, 256, 256],
                        [64, 64, 128, 128, 256, 256, 256, 256],
                        [128, 128, 128, 256, 256, 256, 256, 256]])
    else:
        raise ValueError('Invalid compression mode. Valid modes are: low, high')

def quantize(block, CompressionMode):
    """
    Quantize a block based on compression mode
    :param block: numpy array of block
    :param CompressionMode: enum class
    :return: quantized block
    """
    return np.round(np.divide(block, get_quantization_table(CompressionMode)))


test_quantized = quantize(test, compression_mode)

test_quantized

array([[ 1.,  2.,  3.,  4.,  5.,  3.,  4.,  2.],
       [ 9., 10., 11., 12., 13.,  7.,  8.,  4.],
       [17., 18., 19., 20., 10., 11., 12.,  6.],
       [25., 26., 27., 28., 14., 15.,  8.,  4.],
       [33., 34., 18., 18., 18., 19., 10.,  5.],
       [20., 21., 22., 22., 22., 12.,  6.,  6.],
       [24., 25., 26., 13., 13.,  7.,  7.,  4.],
       [14., 14., 15., 15.,  8.,  8.,  4.,  4.]])

**Dequantizer**

In [23]:
def dequantize(block, CompressionMode):
    """
    Dequantize a block based on compression mode
    :param block: numpy array of block
    :param CompressionMode: enum class
    :return: dequantized block
    """
    return np.round(np.multiply(block, get_quantization_table(CompressionMode)))

test_dequantized = dequantize(test_quantized, compression_mode)
test_dequantized

array([[ 1.,  2.,  3.,  4.,  5.,  6.,  8.,  8.],
       [ 9., 10., 11., 12., 13., 14., 16., 16.],
       [17., 18., 19., 20., 20., 22., 24., 24.],
       [25., 26., 27., 28., 28., 30., 32., 32.],
       [33., 34., 36., 36., 36., 38., 40., 40.],
       [40., 42., 44., 44., 44., 48., 48., 48.],
       [48., 50., 52., 52., 52., 56., 56., 64.],
       [56., 56., 60., 60., 64., 64., 64., 64.]])

**zigzag transform (2D to 1D)**

In [24]:
def get_zigzag_indices(N):
    """
    Return the zigzag transform for a block of size N*N
    :param N: size of block (N*N)
    :return: 1D array zigzag transform for a 2D block of size (N*N)
    """

    zigzag_indices = [(i,j) for i in range(N) for j in range(N)]
    zigzag_indices.sort(key = lambda x: (x[0]+ x[1], x[1]) if (x[0]+x[1])%2 == 0 else (x[0]+x[1], x[0]))

    return zigzag_indices

def zigzag_transform(block):
    """
    Return 1d array zigzag-traversed from the 2d block (N*N)
    :param block: 2d block (N*N)
    :return: 1d array (N*N)
    """
    N = block.shape[0]

    zigzag_indices = get_zigzag_indices(N)

    _1d_array = np.zeros(N*N)

    for i,zigzag_index in enumerate(zigzag_indices):
        _1d_array[i] = block[zigzag_index[0], zigzag_index[1]]

    return _1d_array

test_1d = zigzag_transform(test)

test_1d

array([ 1.,  2.,  9., 17., 10.,  3.,  4., 11., 18., 25., 33., 26., 19.,
       12.,  5.,  6., 13., 20., 27., 34., 41., 49., 42., 35., 28., 21.,
       14.,  7.,  8., 15., 22., 29., 36., 43., 50., 57., 58., 51., 44.,
       37., 30., 23., 16., 24., 31., 38., 45., 52., 59., 60., 53., 46.,
       39., 32., 40., 47., 54., 61., 62., 55., 48., 56., 63., 64.])

**Reverse Zigzag transform**

In [26]:
def reverse_zigzag_transform(_1d_array, N):
    """
    Return 2d block (N*N) from the 1d array zigzag-traversed
    :param _1d_array: 1d array (N*N)
    :param N: size of block
    :return: 2d block (N*N)
    """
    zigzag_indices = get_zigzag_indices(N)

    block = np.zeros((N,N))

    for i,zigzag_index in enumerate(zigzag_indices):
        block[zigzag_index[0], zigzag_index[1]] = _1d_array[i]

    return block

test_rev_zig = reverse_zigzag_transform(test_1d, N)
test_rev_zig

array([[ 1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.],
       [ 9., 10., 11., 12., 13., 14., 15., 16.],
       [17., 18., 19., 20., 21., 22., 23., 24.],
       [25., 26., 27., 28., 29., 30., 31., 32.],
       [33., 34., 35., 36., 37., 38., 39., 40.],
       [41., 42., 43., 44., 45., 46., 47., 48.],
       [49., 50., 51., 52., 53., 54., 55., 56.],
       [57., 58., 59., 60., 61., 62., 63., 64.]])

**Module 5: Run-Length Encoder**

In [28]:
def run_length_encoder(image):
    """
    Run-Length Encoder for binary image compression.

    Parameters:
    - image (numpy.ndarray): Input binary image represented as a NumPy array.

    Returns:
    - numpy.ndarray: Encoded array containing alternating counts of consecutive zeros and non-zero values.

    Description:
    This function takes a binary image represented as a NumPy array and performs run-length encoding on it.
    """
    zeros_count = 0
    length = image.shape
    encoded = np.array([])
    for i in range(length[0]):
        if image[i] == 0:
            if zeros_count == 0:
                encoded = np.append(encoded, 0)
            zeros_count += 1
        else:
            if zeros_count != 0:
                encoded = np.append(encoded, zeros_count)
                zeros_count = 0
            encoded = np.append(encoded, image[i])
    if zeros_count != 0:
        encoded = np.append(encoded,zeros_count)
    return encoded


x = np.array([[ 1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.],
       [ 9., 10., 11., 12., 13., 14., 15., 16.],
       [17., 18., 19., 20., 21., 22., 23., 24.],
       [0, 0, 0, 0, 0, 0, 0, 0, ],
       [33., 34., 35., 36., 37., 38., 39., 40.],
       [41., 42., 43., 44., 45., 46., 47., 48.],
       [0, 0, 0, 0, 0, 0, 0, 0, ],
       [0, 0, 0, 0, 0, 0, 0, 0, ]]).reshape(1,-1).squeeze()

run_len_coded = run_length_encoder(x)
run_len_coded

array([ 1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11., 12., 13.,
       14., 15., 16., 17., 18., 19., 20., 21., 22., 23., 24.,  0.,  8.,
       33., 34., 35., 36., 37., 38., 39., 40., 41., 42., 43., 44., 45.,
       46., 47., 48.,  0., 16.])

**Run-Length-Decoder**

In [30]:
def run_length_decoder(encoded,no_vertical_blocks,no_horizontal_blocks,N):
    """
    Decode the encoded image
    :param encoded: encoded image
    :param no_vertical_blocks: Number of vertical blocks
    :param no_horizontal_blocks: Number of horizontal blocks
    :param N: block size (N*N)
    :return: decoded image
    """
    total_image_length = N*N*no_horizontal_blocks*no_vertical_blocks

    encoded_len = encoded.shape
    image = np.zeros(total_image_length)
    idx = 0
    for i in range(encoded_len[0]):
        if encoded[i - 1] == 0:
            continue
        if encoded[i] == 0:
            for j in range(int(encoded[i+1])):
                image[idx] = 0
                idx += 1
        else:
            image[idx] = encoded[i]
            idx += 1
    return image


run_len_dec = run_length_decoder(run_len_coded, 4, 4, 2)
run_len_dec

array([ 1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11., 12., 13.,
       14., 15., 16., 17., 18., 19., 20., 21., 22., 23., 24.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0., 33., 34., 35., 36., 37., 38., 39.,
       40., 41., 42., 43., 44., 45., 46., 47., 48.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.])

**Module 6: Huffman Encoder**

In [31]:
import heapq  # Import the heapq module for the priority queue implementation

class HuffmanNode:
    def __init__(self, symbol=None, frequency=None):
        self.symbol = symbol
        self.frequency = frequency
        self.left = None
        self.right = None

    def __lt__(self, other):
        return self.frequency < other.frequency

# Function to build the Huffman tree from symbol frequencies
def build_huffman_tree(frequencies):
    heap = [HuffmanNode(symbol=s, frequency=f) for s, f in frequencies.items()]  # Create a heap with HuffmanNodes
    heapq.heapify(heap)  # Convert the list into a min-heap data structure, the min-heap -> follows the property of a complete binary tree in which the value of the internal node is smaller than or equal to the value of the children of that node

    while len(heap) > 1:
        left = heapq.heappop(heap)  # Pop the smallest node from the heap, note -> left is a HuffmanNode
        right = heapq.heappop(heap)  # Pop the second smallest node from the heap

        internal_node = HuffmanNode(frequency=left.frequency + right.frequency)  # Create a new internal node
        internal_node.left = left  # Assign the left child
        internal_node.right = right  # Assign the right child

        heapq.heappush(heap, internal_node)  # Push the new internal node back into the heap

    return heap[0]  # Return the root of the Huffman tree

# Function to generate Huffman codes from the Huffman tree
def generate_huffman_codes(node, current_code="", codes=None):
    if codes is None:
        codes = {}

    if node.symbol is not None:
        codes[node.symbol] = current_code
    else:
        generate_huffman_codes(node.left, current_code + "0", codes)  # Recursively traverse left with '0'
        generate_huffman_codes(node.right, current_code + "1", codes)  # Recursively traverse right with '1'

    return codes


In [32]:
from collections import defaultdict, Counter  # Import defaultdict and Counter for convenient data structures

# Function to encode data using Huffman codes
def huffman_encode(data):
    frequencies = Counter(data)  # Count the frequency of each symbol in the input data
    root = build_huffman_tree(frequencies)  # Build the Huffman tree
    codes = generate_huffman_codes(root)  # Generate Huffman codes from the tree

    encoded_data = "".join(codes[symbol] for symbol in data)  # Encode the input data using Huffman codes
    return encoded_data, root  # Return the encoded data and the Huffman tree root

huff_coded, tree = huffman_encode(run_len_coded)

huff_coded

'1010100100010110111100000111110101101100011111001110011001101101111101001111011010011110100000010101100101010111001001001011011101101100001110001110001010111111000010110100001100000010001010111110000101110111001111111001001111101100011101'

**Huffman Decoder**

In [35]:
# Function to decode Huffman-encoded data
def huffman_decode(encoded_data, root):
    decoded_data = []
    current_node = root

    for bit in encoded_data:
        if bit == "0":
            current_node = current_node.left  # Traverse left with '0'
        else:
            current_node = current_node.right  # Traverse right with '1'

        if current_node.symbol is not None:
            decoded_data.append(current_node.symbol)  # Append the symbol when a leaf node is reached
            current_node = root  # Reset to the root for the next symbol

    return decoded_data  # Return the decoded data

dec_data = huffman_decode(huff_coded, tree)

dec_data

[1.0,
 2.0,
 3.0,
 4.0,
 5.0,
 6.0,
 7.0,
 8.0,
 9.0,
 10.0,
 11.0,
 12.0,
 13.0,
 14.0,
 15.0,
 16.0,
 17.0,
 18.0,
 19.0,
 20.0,
 21.0,
 22.0,
 23.0,
 24.0,
 0.0,
 8.0,
 33.0,
 34.0,
 35.0,
 36.0,
 37.0,
 38.0,
 39.0,
 40.0,
 41.0,
 42.0,
 43.0,
 44.0,
 45.0,
 46.0,
 47.0,
 48.0,
 0.0,
 16.0]

**Module 7: Finite Precision Arithmetic Encoder**

**Encoder**

In [5]:
from collections import Counter
import numpy as np

def calculate_frequencies(input_stream):
    element_counts = Counter(input_stream.tolist())  # Convert NumPy array to list
    return dict(element_counts)


def calculate_cumulative_frequency(symbol, frequency_dictionary):
    cumulative_frequency = 0
    for s in frequency_dictionary:
        cumulative_frequency += frequency_dictionary[s]
        if s == symbol:
            break
    return cumulative_frequency




def arithmetic_encode(input_stream, precision=32):
    """
    Encode a message using arithmetic coding.

    Parameters:
    - input_stream (numpy.ndarray): The input message to be encoded.
    - precision (int): The precision of the arithmetic coding (default is 32).

    Returns:
    - encoded_code (str): The binary-encoded message.
    - frequency_dict (dict): A dictionary containing symbol frequencies.
    """

    # Append termination symbol '!'
    input_stream = np.append(input_stream, np.array('!'))
    stream_size = len(input_stream)

    # Calculate symbol frequencies
    frequency_dict = calculate_frequencies(input_stream)

    # Initialization
    full_range = 2 ** precision
    half_range = full_range // 2
    quarter_range = half_range // 2
    lower_limit = 0
    upper_limit = full_range
    trails = 0
    encoded_code = []

    # Loop through symbols in the input stream
    for symbol in input_stream:
        frequency_symbol = frequency_dict[symbol]
        cumulative_frequency_high = calculate_cumulative_frequency(symbol, frequency_dict)
        cumulative_frequency_low = cumulative_frequency_high - frequency_symbol
        range_size = upper_limit - lower_limit

        # Update upper and lower limits based on cumulative frequencies
        upper_limit = lower_limit + range_size * cumulative_frequency_high // stream_size
        lower_limit = lower_limit + range_size * cumulative_frequency_low // stream_size

        # Binary scaling
        while True:
            if upper_limit < half_range:
                encoded_code.append('0')
                encoded_code.append('1' * trails)
                trails = 0
                lower_limit *= 2
                upper_limit *= 2
            elif lower_limit >= half_range:
                encoded_code.append('1')
                encoded_code.append('0' * trails)
                trails = 0
                lower_limit = 2 * (lower_limit - half_range)
                upper_limit = 2 * (upper_limit - half_range)
            elif lower_limit >= quarter_range and upper_limit < 3 * quarter_range:
                trails += 1
                lower_limit = 2 * (lower_limit - quarter_range)
                upper_limit = 2 * (upper_limit - quarter_range)
            else:
                break

    # Finalize encoding
    trails += 1
    if lower_limit <= quarter_range:
        encoded_code.append('0')
        encoded_code.append('1' * trails)
    else:
        encoded_code.append('1')
        encoded_code.append('0' * trails)

    # Convert the encoded code to a string
    encoded_code = ''.join(encoded_code)

    # Return the encoded code and symbol frequencies
    return encoded_code, frequency_dict


**Decoder**

In [6]:
# Main function for arithmetic decoding
def arithmetic_decode(encoded_code, frequency_dictionary, precision=32):
    """
    Decode an arithmetic encoded message.

    Parameters:
    - encoded_code (str): The binary-encoded message to be decoded.
    - frequency_dictionary (dict): A dictionary containing symbol frequencies.
    - precision (int): The precision of the arithmetic coding (default is 32).

    Returns:
    - decoded_message (list): The decoded message as a list of integers.
    """

    # Initialization
    code_size = len(encoded_code)
    encoded_code = [int(c) for c in encoded_code]
    stream_size = sum(frequency_dictionary.values())
    full = 2 ** precision
    half = full // 2
    quarter = half // 2
    low_range = 0
    high_range = full
    decoded_value = 0
    index = 1
    decoded_message = []

    # Decode initial value
    while index <= precision and index <= code_size:
        if encoded_code[index - 1] == 1:
            decoded_value = decoded_value + 2 ** (precision - index)
        index += 1

    flag = 1

    # Main decoding loop
    while flag:
        # Iterate through symbols in the frequency dictionary
        for symbol in frequency_dictionary:
            frequency_symbol = frequency_dictionary[symbol]
            cumulative_frequency_high = calculate_cumulative_frequency(symbol, frequency_dictionary)
            cumulative_frequency_low = cumulative_frequency_high - frequency_symbol
            range_size = high_range - low_range
            high_range_new = low_range + range_size * cumulative_frequency_high // stream_size
            low_range_new = low_range + range_size * cumulative_frequency_low // stream_size

            # Check if the decoded value falls within the current symbol's range
            if low_range_new <= decoded_value < high_range_new:
                decoded_message.extend([symbol])
                low_range = low_range_new
                high_range = high_range_new

                # Check for the termination symbol '!'
                if symbol == '!':
                    flag = 0
                break

        # Binary scaling and updating decoded value
        while True:
            if high_range < half:
                # Update ranges and decoded value for low range
                low_range *= 2
                high_range *= 2
                decoded_value *= 2
                if index <= code_size:
                    decoded_value += encoded_code[index - 1]
                    index += 1
            elif low_range >= half:
                # Update ranges and decoded value for high range
                low_range = 2 * (low_range - half)
                high_range = 2 * (high_range - half)
                decoded_value = 2 * (decoded_value - half)
                if index <= code_size:
                    decoded_value += encoded_code[index - 1]
                    index += 1
            elif low_range >= quarter and high_range < 3 * quarter:
                # Additional scaling for certain ranges
                low_range = 2 * (low_range - quarter)
                high_range = 2 * (high_range - quarter)
                decoded_value = 2 * (decoded_value - quarter)
                if index <= code_size:
                    decoded_value += encoded_code[index - 1]
                    index += 1
            else:
                break

    # Remove the termination symbol and convert symbols to integers
    decoded_message.pop()
    decoded_message = [int(float(c)) for c in decoded_message]

    # Return the decoded message
    return decoded_message


**Testing Module**

In [7]:
s = [1,2,5,56,21,6454,21,95,3,1,231,54,21,32,25,4,21,32,655,4,496,1232]

print(f"String initially {s}")

encoded_data, dict = arithmetic_encode(s)

print(f"Encoded Data {encoded_data}")

decoded_data = arithmetic_decode(encoded_data,dict)

print(f"Decoded Data {decoded_data}")


String initially [1, 2, 5, 56, 21, 6454, 21, 95, 3, 1, 231, 54, 21, 32, 25, 4, 21, 32, 655, 4, 496, 1232]
Encoded Data 0000001000010001110111110111000100000011000011011010010010011000000111010110100000111110110
Decoded Data [1, 2, 5, 56, 21, 6454, 21, 95, 3, 1, 231, 54, 21, 32, 25, 4, 21, 32, 655, 4, 496, 1232]


**Module 8: Convolutional Channel Encoding**

**Encoder**

In [8]:
def xor_char(a, b):
    if (a == '0' and b == '0') or (a == '1' and b == '1'):
        return '0'
    else:
        return '1'


def xor_sequence(A):
    C = '0'
    for a in A:
        C = xor_char(C, a)

    return C


def encode_sequence_with_polynomial(A, polynomial):
    C = ''
    for i in range(len(A)):
        if polynomial[i] == 0:
            C += '0'
        else:
            C += A[i]
    return xor_sequence(C)



def convert_sequence_to_string(sequence):
    """
    Convert a list of sequences into a single string.

    Parameters:
    - sequence (list): List of sequences.

    Returns:
    - S (list): List of characters representing the converted sequence.
    """
    S = []
    for i in range(len(sequence[0])):
        for j in range(len(sequence)):
            S.append(sequence[j][i])
    return S



def encode(A, generator_polynomials, k):
    """
    Encode a binary sequence using a set of generator polynomials and a given block size.

    Parameters:
    - A (str): Binary sequence to be encoded.
    - generator_polynomials (list): List of generator polynomials for encoding.
    - k (int): Block size for encoding.

    Returns:
    - encoded_sequence_string (list): List of characters representing the encoded sequence.
    """
    A = '0' * (k - 1) + A
    encoded_sequences = []

    # Iterate through each generator polynomial for encoding.
    for polynomial in generator_polynomials:
        C = ''

        # Iterate through each block of the binary sequence.
        for i in range(len(A) - k + 1):
            curr_list = A[i:i + k]
            curr_list = curr_list[::-1]

            # Encode the current block using the specified polynomial.
            C += encode_sequence_with_polynomial(curr_list, polynomial)

        # Append the encoded sequence for the current polynomial.
        encoded_sequences.append(C)

    # Convert the list of encoded sequences into a single string.
    encoded_sequence_string = convert_sequence_to_string(encoded_sequences)

    return encoded_sequence_string

**Decoder**

In [9]:
def generate_binary_sequences(n):
    binary_sequences = [bin(i)[2:].zfill(n) for i in range(2 ** n)]
    return binary_sequences

states = None
generator_polynomials_example = [[1, 1, 1], [0, 1, 1], [1, 0, 1]]

# Path Metric
pathes = {
    "00": ["00", "10"],
    "01": ["00", "10"],
    "10": ["01", "11"],
    "11": ["01", "11"]
}


def state_generator(bits, generator_polynomials, k):
    dict = {}
    binary_seq = generate_binary_sequences(bits)
    for seq in binary_seq:
        dict[seq] = []
    for seq in binary_seq:
        for bit in ['0', '1']:
            encoded_sequences = []
            for polynomial in generator_polynomials:
                C = ''
                for i in range(len(seq) - k + 2):
                    C += encode_sequence_with_polynomial(bit + seq, polynomial)
                encoded_sequences.append(C)
            encoded_sequence_string = convert_sequence_to_string(encoded_sequences)
            dict[seq].append(encoded_sequence_string)
    return dict


def binary_string_to_array(input_str):
    while len(input_str) % 3 != 0:
        input_str += '0'
    result_array = [input_str[i:i + 3] for i in range(0, len(input_str), 3)]
    return result_array


def compare_bits_with_array(main_str, str_array):
    result_array = []
    for str_in_array in str_array:
        if len(main_str) != len(str_in_array):
            raise ValueError("Input strings must have the same length")

        mismatch_count = sum(bit1 != bit2 for bit1, bit2 in zip(main_str, str_in_array))
        result_array.append(mismatch_count)
    return result_array


def combine_two_routes(route1, route2):
    combined_route = {
        "to": route2["to"],
        "value": route1["value"] + route2["value"],
        "dist": route1["dist"] + route2["dist"]
    }
    return combined_route


def check_path(routes, to):
    for route in routes:
        if route["to"] == to:
            return route
    return None


def drop_duplicates(routes):
    unique_routes = []

    for route in routes:
        duplicate_found = False
        for existing_route in unique_routes:
            if route["to"] == existing_route["to"]:
                if route["dist"] < existing_route["dist"]:
                    existing_route.update(route)
                duplicate_found = True
                break

        if not duplicate_found:
            unique_routes.append(route)

    return unique_routes


def min_dist_value(routes):
    if not routes:
        return None
    min_dist_route = min(routes, key=lambda x: x["dist"])
    return min_dist_route["value"]


def decode(sequence, generator_polynomials, k):
    states = state_generator(k - 1, generator_polynomials, 3)
    two_routes = []
    four_routes = []
    arr_strings = binary_string_to_array(sequence)
    for i in range(len(arr_strings)):
        eight_routes = []
        if i == 0:
            hamming_dist = compare_bits_with_array(arr_strings[i], ["000", "111"])
            two_routes.append({
                "to": pathes["00"][0],
                "value": "0",
                "dist": hamming_dist[0]
            })
            two_routes.append({
                "to": pathes["00"][1],
                "value": "1",
                "dist": hamming_dist[1]
            })

        elif i == 1:
            for state in pathes["00"]:
                hamming_dist = compare_bits_with_array(arr_strings[i], states[state])
                one_before = check_path(two_routes, state)
                for index in range(2):
                    temp_route = {
                        "to": pathes[state][index],
                        "value": str(index),
                        "dist": hamming_dist[index]
                    }

                    combined_route = combine_two_routes(one_before, temp_route)

                    four_routes.append(combined_route)

        else:
            for key, value in states.items():
                hamming_dist = compare_bits_with_array(arr_strings[i], value)

                for index in range(len(pathes[key])):
                    temp_route = {
                        "to": pathes[key][index],
                        "value": str(index),
                        "dist": hamming_dist[index]
                    }
                    one_before = check_path(four_routes, key)
                    combined_route = combine_two_routes(one_before, temp_route)
                    eight_routes.append(combined_route)
            four_routes = drop_duplicates(eight_routes)
    final_route = min_dist_value(four_routes)
    return final_route


In [12]:
s = '10110011001110011001'
K = 3
generator_polynomials = [[1, 1, 1], [1, 1, 0], [1, 0, 1]]
print(f"String initially {s}")

encoded_data = encode(s,generator_polynomials,K)

encoded_data = ''.join(encoded_data)

print(f"Encoded Data {encoded_data}")

decoded_data = decode(encoded_data,generator_polynomials,K)

print(f"Decoded Data {decoded_data}")


String initially 10110011001110011001
Encoded Data 111110010001011101111001011101111001100011101111001011101111
Decoded Data 10110011001110011001


**Module 9: BPSK Modulator**

**Transmitter**

In [13]:
import numpy as np


def bpsk_transmitter(bit_seq, fc, Ab, Tb, n):
    """
    BPSK transmitter
    :param bit_seq: bit sequence
    :param fc: carrier frequency
    :param Ab: amplitude of message signal pulse
    :param Tb: bit duration
    :param n: number of samples per bit duration
    :return: BPSK modulated signal
    """
    if fc < 10 / Tb:
        raise ValueError('Carrier frequency (fc) must be at least 10 times the the bit rate (1/Tb)')

    num_bits = len(bit_seq)  # number of bits
    n_total = num_bits * n  # total number of samples

    # t_total = num_bits * Tb     # total time duration of symbol/bit
    # dt = Tb/n
    # t = np.arange(0, t_total, dt) # time axis

    Eb = Ab ** 2 * Tb
    basis_function = generate_basis_function(fc, Tb, n)
    passbannd_signal = np.zeros(n_total)

    for i in range(num_bits):
        sign = (-1) ** (int(bit_seq[i]) + 1)  # bit 1 -> 1, bit 0 -> -1
        passbannd_signal[i * n:(i + 1) * n] = sign * np.sqrt(Eb) * basis_function

    return passbannd_signal  # , t


def generate_basis_function(fc, Tb, n):
    """
    Generate basis function for BPSK modulation
    :param fc: carrier frequency
    :param Tb: bit duration
    :param n: number of samples per bit duration
    :return: basis function
    """
    dt = Tb / n
    t_symbol = np.arange(0, Tb, dt)
    return np.sqrt(2 / Tb) * np.cos(2 * np.pi * fc * t_symbol)


**Receiver**

In [14]:
def bpsk_receiver(received_signal, fc, Tb, n):
    """
    BPSK correlator receiver
    :param received_signal: received signal
    :param fc: carrier frequency
    :param Tb: bit duration
    :param n: number of samples per bit duration
    :return: restored bit sequence
    """
    num_bits = len(received_signal) // n
    restored_bit_seq = np.zeros(num_bits)

    dt = Tb / n

    basis_function = generate_basis_function(fc, Tb, n)

    for i in range(num_bits):
        symbol = received_signal[i * n:(i + 1) * n]
        observable_element = detector(symbol, dt, basis_function)
        restored_bit_seq[i] = 1 if observable_element > 0 else 0

    return restored_bit_seq


def detector(symbol, dt, basis_function):
    """
    Correlator detector
    :param symbol: symbol to be detected
    :param t_symbol: time axis for symbol
    :param basis_function: basis function
    :return: observable element
    """
    return np.trapz(np.multiply(symbol, basis_function), dx=dt)


In [15]:
signal = [1,0,1,0,1,0,1,0,1,0,0,1,1]

fc = 0.1
Ab = 1
Tb = 100
n = 1000

print(f"Signal Initially {signal}")

modulated_signal = bpsk_transmitter(signal,fc,Ab,Tb,n)

print(f"Modulated Signal {modulated_signal}")

demodulated_signal = bpsk_receiver(modulated_signal,fc,Tb,n)

print(f"Demodulated Signal {demodulated_signal}")

Signal Initially [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 1]
Modulated Signal [1.41421356 1.41142293 1.40306207 ... 1.38916395 1.40306207 1.41142293]
Demodulated Signal [1. 0. 1. 0. 1. 0. 1. 0. 1. 0. 0. 1. 1.]
