In [482]:
import math
import random

# 2.1 Echelon Form and Systematic Encoding Matrix

In [483]:
def read_matrix():
    """

    Read matrix and return list of matrix

    Args:
        no argument

    Returns:
        matrix: list of matrix
    """
    
    matrix = open("H1.txt", "r").read().splitlines()
    for i in range(len(matrix)):
        matrix[i] = list(matrix[i])
        matrix[i] = [int(i) for i in matrix[i]]
    return matrix

def row_swap(matrix, i, j):
    """

    Swap 2 rows in matrix

    Args:
        matrix: matrix h
        i: row i
        j: row j

    Returns:
        matrix: updated matrix
    """
    
    matrix[i], matrix[j] = matrix[j], matrix[i]
    return matrix
    
def row_addtion(matrix, i, j):
    """

    Perform row addtition i = i + j

    Args:
        matrix: matrix h
        i: row i
        j: row j

    Returns:
        matrix: updated matrix
    """
    
    # R_i = R_i + R_j
    matrix[i] = [a ^ b for a, b in zip(matrix[i], matrix[j])]
    return matrix

def find_leading_one(matrix, row_i, column_i, k):
    """

    Find 1 in current column, row, return its row index

    Args:
        matrix: matrix h
        row_i: current top row index
        column_i: current top column index
        k: number of rows

    Returns:
        index: index of top row with 1 that satisfies condition
    """
    
    while row_i <= k-1:
        if matrix[row_i][column_i] == 1:
            return row_i
        else:
            row_i += 1
    return -1

In [484]:
def echelon_and_generator_matrix(matrix):
    """

    Read from matrix, return echelon form and generator matrix

    Args:
        matrix: matrix h

    Returns:
        echelon: Echelon form of h
        generator: Generator from h^
    """
    
    n = len(matrix[0])
    k = len(matrix)
    # Echelon
    for i in range(n-k):
        current_column_index = k+i
        # First find the row with leading one that fits the description of the identity matrix
        row_with_leading_one = find_leading_one(matrix, i, current_column_index, k)
        if row_with_leading_one == -1:
            print("Error: Cannot obtain echelon form")
        # Swap the row with leading one with the current top row
        matrix = row_swap(matrix, i, row_with_leading_one)
        # Remove 1s in rows other than current row
        for other_i in range(n-k):
            if other_i != i and matrix[other_i][current_column_index] == 1:
                matrix = row_addtion(matrix, other_i, i)
    echelon = matrix

    # Generator
    generator = []
    # Identity Matrix
    for i in range(k):
        row_i = []
        for j in range(k):
            if j == i:
                row_i.append(1)
            else:
                row_i.append(0)
        generator.append(row_i)
    # P^T
    for i in range(k):
        row_i = []
        for j in range(k):
            row_i.append(echelon[i][j])
        generator.append(row_i)  
        
    return echelon, generator

# 2.3 LDPC-Decoder

In [485]:
def read_y():
    """

    Read from file y1.txt, and return a list of list of 6 bits

    Args:
        No arument

    Returns:
        y: list of list of 6 bits
    """
    
    # Read
    f = open("y1.txt", "r").read()
    # Clean \n
    bit_string = ''.join(bit.strip() for bit in f)
    # Split to chunk of 6
    y = [bit_string[i:i+6] for i in range(0, len(bit_string), 6)]
    y = [[int(j) for j in i] for i in y]
    return y

def obtain_factor_graph(matrix):
    """

    Derive factor graph from matrix

    Args:
        matrix: matrix h

    Returns:
        f: factor graph of h
    """
    
    f = [[] for i in range(len(matrix))]
    for i in range(len(matrix)):
        for j in range(len(matrix[i])):
            if matrix[i][j] == 1:
                f[i].append(j)
    return f

def calculate_v2c(c2v_messages, v2c_messages, m_v, factor_graph):
    """

    Update variable to check messages

    Args:
        c2v_messages: list of check to variable messages
        v2c_messages: list of variable to check messages
        m_v: list of initial likelihood
        factor_graph: list of factor graph

    Returns:
        v2c_messages: list of variable to check messages
    """
    
    for i in range(len(m_v)):
        sum = m_v[i]
        for j in range(len(factor_graph)):
            if i in factor_graph[j]:
                sum += c2v_messages[j][i]
        for j in range(len(factor_graph)):
            if i in factor_graph[j]:
                v2c_messages[j][i] = sum - c2v_messages[j][i]
    return v2c_messages

def calculate_c2v(c2v_messages, v2c_messages, factor_graph):
    """

    Update check to variable messages

    Args:
        c2v_messages: list of check to variable messages
        v2c_messages: list of variable to check messages
        factor_graph: list of factor graph

    Returns:
        c2v_messages: list of check to variable messages
    """
    
    for i in range(len(c2v_messages)):
        # Get total product
        tanh_product = 1
        for f_i in factor_graph[i]:
            tanh_product *= math.tanh(v2c_messages[i][f_i]/2)
        # Remove individual variable from total product
        for f_i in factor_graph[i]:
            current_tan_product = tanh_product/math.tanh(v2c_messages[i][f_i]/2)
            if not math.isclose(current_tan_product, -1, abs_tol=1e-10) and not math.isclose(current_tan_product, 1, abs_tol=1e-10):
                c2v_messages[i][f_i] = 2*math.atanh(current_tan_product)
    return c2v_messages

def calculate_l(l, c2v_messages, m_v, factor_graph):
    """

    Update beliefs

    Args:
        l: beliefs
        c2v_messages: list of check to variable messages
        m_v: list of initial likelihood
        factor_graph: list of factor graph

    Returns:
        l: List of beliefs
    """
    
    for i in range(len(l)):
        sum_c2v = 0
        for j in range(len(c2v_messages)):
            if i in factor_graph[j]:
                sum_c2v += c2v_messages[j][i]
        l[i] = sum_c2v + m_v[i]
    return l

def parity_check(h, x):
    """

    Check if Hx = 0

    Args:
        h: parity check matrix
        x: decoded word

    Returns:
        Bool: True or False
    """
    
    for row in h:
        sum = 0
        for i in range(len(row)):
            if row[i] == 1 and x[i] == 1:
                sum += 1
        if sum%2 != 0:
            return False
    return True

In [486]:
def ldpc_decoder(h, y, p, max_iter):
    """

    LDPC-decoder based on Loopy Belief Propagation for Binary
    Symmetric Channel

    Args:
        h: parity check matrix
        y: word y
        p: noise ratio
        max_iter: max number of iterations

    Returns:
        x: decoded word
        bool: success (0 for True, -1 for False)
        iteration: number of iteration
    """
    # Factor graph
    factor_graph = obtain_factor_graph(h)
    # Code word
    x = y
    # Log-Likelihood Ratio
    llr = math.log((1-p)/p)
    # m_v is log-likelihood conditioned on observed value
    m_v = [llr if i == 0 else -llr for i in y]
    # l stands for log-likelihood list
    l = m_v
    # Declare variables
    v2c_messages = [[0 for j in range(len(h[0]))] for i in range(len(h))]
    c2v_messages = [[0 for j in range(len(h[0]))] for i in range(len(h))]
    # Set up v2c
    for i in range(len(v2c_messages)):
        for j in range(len(v2c_messages[i])):
            if j in factor_graph[i]:
                v2c_messages[i][j] = m_v[j]
    # Iteration
    for iteration in range(max_iter):
        # Update v2c
        if iteration != 0:
            v2c_messages = calculate_v2c(c2v_messages, v2c_messages, m_v, factor_graph)
            
        # Update c2v
        c2v_messages = calculate_c2v(c2v_messages, v2c_messages, factor_graph)
        
        # Belief Update
        l = calculate_l(l, c2v_messages, m_v, factor_graph)
        # print(l)
        
        # Decision Rule
        x = [0 if i >= 0 else 1 for i in l]
        # print(x)
        
        # Parity Check
        if parity_check(h, x):
            # print("Iteration took to converge: "+str(iteration+1))
            return x, 0, iteration+1
        
    return x, -1

# 2.4 Recover Message

In [487]:
def recover_message(matrix, list_x):
    """

    This takes list of decoded vectors, and interpret using
    ASCII symbols.

    Args:
        matrix: matrix h
        list_x: list of decoded vectors

    Returns:
        Message: a message string
    """
    
    # Convert list to string
    list_x = [''.join(str(item) for item in x) for x in list_x]
    bits_string = ''.join(list_x)
    # Keep the first 252 bits then remove last 4 bits
    bits_string = bits_string[:252]
    clean_bits = bits_string[:-4]
    # Separate to chunks of 8
    chunks = [clean_bits[i:i+8] for i in range(0, len(clean_bits), 8)]
    # Translate to ASCII
    message = ''.join([chr(int(chunk, 2)) for chunk in chunks])
    
    return message

In [488]:
# Print Echelon and generator
h = read_matrix()
echelon, generator = echelon_and_generator_matrix(h)
print("Echelon")
for i in echelon:
    print(i)
print("Generator")
for i in generator:
    print(i)

# Encode
# k = len(echelon)
# message = "This is my encoder and decoder."
# binary_message = ''.join(format(ord(char), '08b') for char in message)+"0000"
# y_to_encode = [binary_message[i:i+k] for i in range(0, len(binary_message), k)]
# y_to_file = []
# for y in y_to_encode:
#     y_i = []
#     for row in generator:
#         sum = 0
#         for i in range(len(row)):
#             if row[i] == 1 and int(y[i]) == 1:
#                 sum += 1
#         y_to_file.append(sum%2)
# with open("y1.txt", 'w') as file:
#     for y in y_to_file:
#         r = random.randint(1, 100)
#         if r <= 10:
#             file.write(str(0) + '\n')
#         else:
#             file.write(str(y) + '\n')     
    
# Decode
y_list = read_y()
noise_ratio = 0.1
max_iter = 20
list_x = []
k = len(echelon)
total_iteration = 0
for y in y_list:
    x, success, iteration = ldpc_decoder(echelon, y, noise_ratio, max_iter)
    list_x.append(x[:k])
    total_iteration += iteration
print("Decoded Message")
print("Average Iteration: "+str(total_iteration/len(y_list)))
print(recover_message(echelon, list_x))

Echelon
[1, 1, 1, 1, 0, 0]
[0, 1, 1, 0, 1, 0]
[1, 1, 0, 0, 0, 1]
Generator
[1, 0, 0]
[0, 1, 0]
[0, 0, 1]
[1, 1, 1]
[0, 1, 1]
[1, 1, 0]
Decoded Message
Average Iteration: 1.1428571428571428
This iq my cncoder 1nd dgcoder.
