# LDPC Codes

Encoding method -Transform parity check matrix into approximate upper
triangulation form, then matrix multiplication
encoding

Decoding method - Message passing algorithm for BSC

In [9]:
import random
import numpy as np
import numba as nb
import pandas as pd
from tqdm import tqdm

In [10]:
def binary_symmetric_channel(x, p):
    if random.uniform(0,1) < p:
        return 1 - x
    else:
        return x

In [11]:
H = np.array([[1, 1, 1, 1, 0, 0],
              [0, 0, 1, 1, 0, 1],
              [1, 0, 0, 1, 1, 0]])

In [12]:
def gaussian_elimination(H):
    """
    Gaussian elimination of a parity check matrix H.
    """
    H_reduced = H.copy()
    n, k = H.shape
    for i in range(n):
        # find pivot
        for j in range(i, n):
            if H_reduced[j, i] == 1:
                break
        else:
            raise ValueError("H is not full rank")
        # swap rows
        H_reduced[[i, j], :] = H_reduced[[j, i], :]
        # eliminate
        for j in range(i+1, n):
            if H_reduced[j, i] == 1:
                H_reduced[j, :] = (H_reduced[j, :] + H_reduced[i, :]) % 2

    # identify identity columns
    indices = np.where(H_reduced.sum(axis=0) == 1)[0]
    # swap columns
    for i, index in enumerate(indices):
        H_reduced[:, [i, index]] = H_reduced[:, [index, i]]

    return H_reduced

In [13]:
H_reduced = gaussian_elimination(H)
H, H_reduced

(array([[1, 1, 1, 1, 0, 0],
        [0, 0, 1, 1, 0, 1],
        [1, 0, 0, 1, 1, 0]]),
 array([[1, 0, 0, 1, 1, 1],
        [0, 1, 0, 0, 1, 1],
        [0, 0, 1, 1, 0, 1]]))

In [14]:
def generator_matrix(H):
    """
    Construct a generator matrix G from a parity check matrix H.
    """
    H_reduced = gaussian_elimination(H)
    n, k = H_reduced.shape

    G = np.zeros((k, n), dtype=int)
    G[:n, :] = np.eye(n, dtype=int)
    G[n:, :] = H_reduced[:, n:].T

    return G, H_reduced

In [15]:
generator_matrix(H_reduced)

(array([[1, 0, 0],
        [0, 1, 0],
        [0, 0, 1],
        [1, 0, 1],
        [1, 1, 0],
        [1, 1, 1]]),
 array([[1, 0, 0, 1, 1, 1],
        [0, 1, 0, 0, 1, 1],
        [0, 0, 1, 1, 0, 1]]))

In [16]:
def encoder(G, t, p):
    """
    Encode a message t using a generator matrix G and a channel p.

    G : array_like
        Generator matrix.
    t : array_like
        Message to encode.
    p : float
        Channel error probability.

    x : array_like
        Encoded message.
    """
    
    x = np.dot(G.T, t) % 2
    return np.array([binary_symmetric_channel(xi, p) for xi in x])

In [17]:
G = generator_matrix(H_reduced)
t = np.array([1, 0, 1, 0, 0, 0]).reshape(-1, 1)
x = encoder(G, t, 0.1)
x

AttributeError: 'tuple' object has no attribute 'T'

In [18]:
# message passing rule
def var_to_check_msg(vars, check, p):
    """
    Compute the message from a variable node to a check node.
    """
    # compute marginal probability of check node
    p_check = p if check == 0 else 1 - p
    # compute message
    return np.prod(np.array([1 - 2*v for v in vars if v != check]))
    

def check_to_var_msg(vars, check, p):
    """
    Compute the message from a check node to a variable node.
    """
    # compute marginal probability of check node
    p_check = p if check == 0 else 1 - p
    # compute message
    return p_check * np.prod(np.array([1 - 2*v for v in vars if v != check]))

In [19]:
def init_p(received_codeword, epsilon):
    n = len(received_codeword)
    p = np.zeros((n))  # Initialize the messages q_{ij} for each variable node i

    for i in range(n):
        if received_codeword[i] == 0:
            p[i] = 1 - epsilon
            p[i] = epsilon
        else:
            p[i] = epsilon
            p[i] = 1 - epsilon

    return p

In [35]:
def init_q(n, m, p):
    q = np.zeros((n, m, 2))  # Initialize the messages q_{ij} for each check node j
    for j in range(m):
        for i in range(n):
            q[i, j, 0] = p[i]
            q[i, j, 1] = 1 - p[i]
    
    # normalize
    k = q.sum(axis=2, keepdims=True)
    q /= k

    return q, k

In [28]:
def lbp_ldpc_bsc_decoder(received_codeword, H, p, max_iter=20):
    
    n, m = H.shape  # n: number of variable nodes, m: number of check nodes
    x_hat = np.zeros(n)  # Initialize the estimated codeword
    p = init_p(received_codeword, p)  # Initialize the messages p_{i} - probability of bit i being 1
    q, k = init_q(n, m, p)  # Initialize the messages q_{ij} - var to check
    r = np.zeros((m, n, 2))  # Initialize the messages r_{ji} - check to var

    for _ in tqdm(range(max_iter)):

        # Update messages r_{ji}
        for i in range(n):
            for j in range(m):
                related_vars = np.where(H[:, j] == 1)[0]
                related_vars = related_vars[related_vars != i]  # Exclude the current variable node
                r[j, i, 0] = 0.5 + 0.5 * np.prod(1 - 2 * q[related_vars, j, 1])
                r[j, i, 1] = 1 - r[j, i, 0]
        # normalize
        r /= k

        # Update messages q_{ij}
        for j in range(m):
            for i in range(n):
                related_checks = np.where(H[i, :] == 1)[0]
                related_checks = related_checks[related_checks != j]  # Exclude the current check node
                prod_term = np.prod(r[related_checks, i, 1])
                q[i, j, 0] = prod_term * (1-p[i]) * r[j, i, 0]
                q[i, j, 1] = prod_term * p[i] * r[j, i, 1]
        # normalize
        q /= k

        # Calculate the bitwise estimate of every bit
        for i in range(n):
            related_checks = np.where(H[i, :] == 1)[0]
            Q_0 = k[i] * p[i] * np.prod(r[related_checks, i, 1])
            Q_1 = k[i] * (1-p[i]) * np.prod(r[related_checks, i, 0])
            likelihood_ratio = Q_1 / Q_0
            x_hat[i] = 1 if likelihood_ratio > 1 else 0

        # Check if the estimate is a correct
        if np.array_equal(x_hat, received_codeword):
            return x_hat, 0
            
    return x_hat, -1

In [29]:
H_data = pd.read_csv('h1.csv', header=None).values

In [30]:
H = np.array([np.array([int(i) for i in H_data[j][0].split()]) for j in range(len(H_data))])

In [31]:
y_data = pd.read_csv('y1.csv', header=None).values

In [32]:
y = y_data

In [33]:
y.shape

(1000, 1)

In [34]:
lbp_ldpc_bsc_decoder(y, H, 0.1, 100)

  likelihood_ratio = (p[i] * np.prod(r[related_checks, i, 1])) / ((1-p[i]) * np.prod(r[related_checks, i, 1]))
 21%|██        | 21/100 [02:38<09:57,  7.56s/it]


KeyboardInterrupt: 

In [None]:
def recover_english_message(decoded_signal):
    # Extract the first 248 bits
    extracted_bits = decoded_signal[:248]

    # Convert the 248 bits into a sequence of 31 ASCII symbols
    ascii_symbols = []
    for i in range(0, 248, 8):  # Step by 8 bits to get each ASCII symbol
        ascii_value = int(extracted_bits[i:i+8], 2)  # Convert 8-bit binary to integer
        ascii_symbols.append(chr(ascii_value))  # Convert ASCII value to character

    # Combine the ASCII symbols into the original English message
    original_message = ''.join(ascii_symbols)
    return original_message

# Example usage:
decoded_signal = lbp_ldpc_bsc_decoder(y, H, 0.1, 100)
original_message = recover_english_message(decoded_signal)
