In [200]:
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import time

mps_device = (torch.device("mps") if torch.backends.mps.is_available() 
              else (torch.device("cuda") if torch.backends.cuda.is_available()
                    else torch.device("cpu")))

In [201]:
# Code Generation
def generator(nr_codewords):
    bits = torch.randint(0, 2, size=(nr_codewords, 1, 4), dtype=torch.int)
    
    return bits

In [202]:
# Code Generator
num = 1000000
bits_info = generator(num)
print(bits_info.shape)

torch.Size([1000000, 1, 4])


Hamming(7,4) Encoder

In [203]:
class hamming_encode(torch.nn.Module):
    def __init__(self):
        """
        Use Hamming(7,4) to encode the data.

        Args:
            data: data received from the Hamming(7,4) encoder(Tensor)
            generator matrix: generate the parity code

        Returns:
            encoded data: 4 bits original info with 3 parity code.
        """
        super(hamming_encode, self).__init__()

        # Define the generator matrix for Hamming(7,4)
        self.generator_matrix = torch.tensor([
            [1, 0, 0, 0],
            [0, 1, 0, 0],
            [0, 0, 1, 0],
            [0, 0, 0, 1],
            [1, 1, 0, 1],
            [1, 0, 1, 1],
            [0, 1, 1, 1],
        ], dtype=torch.int)

    def forward(self, input_data):
        # Ensure input_data has shape (batch_size)
        # assert input_data.size(0) == self.generator_matrix.shape[1], "Input data must have same generator matrix row number bits."

        # Perform matrix multiplication to encode the data
        # result_tensor = (self.generator_matrix @ input_data.squeeze(1).mT).unsqueeze(1).T % 2
        result_tensor = torch.matmul(input_data, self.generator_matrix.t())% 2

        return result_tensor

In [204]:
# Generation Encoded Data with 3 parity bits
encoder = hamming_encode()
encoded_codeword = encoder(bits_info)
print(encoded_codeword.shape)

torch.Size([1000000, 1, 7])


BPSK Modulator + Noise

In [205]:
# BPSK Modulator and Add Noise After Modulator
class bpsk_modulator(torch.nn.Module):
    def __init__(self):
        """
        Use BPSK to compress the data, which is easily to transmit.

        Args:
            codeword: data received from the Hamming(7,4) encoder(Tensor)

        Returns:
            data: Tensor contain all data modulated and add noise
        """
        super(bpsk_modulator, self).__init__()

    def forward(self, codeword, snr_dB):

        # data = torch.tensor(data, dtype=float)
        data = codeword.to(dtype=torch.float).to(mps_device)

        # for i in range(data.shape[0]):
        bits = data
        bits = 2 * bits - 1

        # Add Gaussian noise to the signal
        noise_power = torch.tensor(10**(snr_dB / 10)).to(mps_device)
        amplitude = torch.sqrt(noise_power)
        noise = amplitude * torch.randn(bits.shape).to(mps_device)
        noised_signal = bits + noise
        # noised_signal = bits
        data = noised_signal

        return data

In [206]:
snr_dB = 15  # Signal-to-noise ratio in dB

# Modulate the signal
modulator = bpsk_modulator()
modulated_noise_signal = modulator(encoded_codeword.to(mps_device), snr_dB)
print(modulated_noise_signal)

tensor([[[  4.4001,  -2.9451,  -1.1604,  ...,  -6.6072,   3.4182,   8.4216]],

        [[ 14.6638,   7.2497,   7.9982,  ...,   7.6419,  -9.0567,  -0.1682]],

        [[  8.0632,  -1.7954,  -4.9499,  ...,   0.4012,  -1.4982,   4.0331]],

        ...,

        [[  5.5834,  -4.3076,   1.9942,  ...,  -5.6447,  -2.8221,  -2.1775]],

        [[  6.0556,  -2.7552,  -2.4462,  ...,  -0.1559,  -2.5919,   1.7843]],

        [[  4.4401,  -2.8805, -11.8337,  ...,  -1.5614,   2.6889,   4.1862]]],
       device='mps:0')


LLR Log-likelihood
y = s + n
Assuming that \( s \) is equally likely to be 0 or 1, and \( n \) is Gaussian with zero mean and variance \( N_0/2 \), where \( N_0 \) is the noise power spectral density.


In [207]:
# Log-Likelihood Ratio
def llr(signal, snr):
    """
    Calculate Log Likelihood Ratio (LLR) for a simple binary symmetric channel.

    Args:
        signal (torch.Tensor): Received signal from BPSK.
        noise_std (float): Standard deviation of the noise.

    Returns:
        llr: Log Likelihood Ratio (LLR) values.
    """

    # Assuming Binary Phase Shift Keying (BPSK) modulation
    noise_std = torch.sqrt(torch.tensor((1.0)/(10 ** (snr / 10)))).to(mps_device)

    # Calculate the LLR
    llr = 2 * signal / noise_std**2

    # return llr_values, llr
    return llr


In [208]:
llr_output = llr(modulated_noise_signal, snr_dB)
print("LLR values:", llr_output)

LLR values: tensor([[[ 278.2868, -186.2671,  -73.3892,  ..., -417.8788,  216.1864,
           532.6276]],

        [[ 927.4199,  458.5123,  505.8508,  ...,  483.3186, -572.7980,
           -10.6404]],

        [[ 509.9644, -113.5490, -313.0574,  ...,   25.3720,  -94.7520,
           255.0762]],

        ...,

        [[ 353.1223, -272.4338,  126.1216,  ..., -357.0032, -178.4840,
          -137.7144]],

        [[ 382.9921, -174.2513, -154.7142,  ...,   -9.8620, -163.9284,
           112.8514]],

        [[ 280.8154, -182.1815, -748.4294,  ...,  -98.7499,  170.0614,
           264.7580]]], device='mps:0')


LDPC Decoder

Strange behavior recording:
In 100 7-bit codewords, the speed on MPS(GPU) is slower than CPU. Reason unknown.

In [209]:
class LDPCBeliefPropagation(torch.nn.Module):
    def __init__(self, H, llr):
        """
        LDPC Belief Propagation.

        Args:
            H: Low density parity code for building tanner graph.
            llr: Log Likelihood Ratio (LLR) values. Only for 7-bit codeword.

        Returns:
            estimated_bits: the output result from belief propagation.
        """

        super(LDPCBeliefPropagation, self).__init__()
        self.llr = llr
        self.H = H
        self.num_check_nodes, self.num_variable_nodes = H.shape
        self.channel = llr.shape[2]

        # Initialize messages
        self.messages_v_to_c = torch.ones((self.num_variable_nodes, self.num_check_nodes, self.channel), dtype=torch.float).to(mps_device)
        self.messages_c_to_v = torch.zeros((self.num_check_nodes, self.num_variable_nodes, self.channel), dtype=torch.float).to(mps_device)

    def forward(self, max_iter):
        # start_time = time.time()
        for iteration in range(max_iter):
            
            # Variable to check node messages
            for i in range(self.num_variable_nodes):
                for j in range(self.num_check_nodes):
                    
                    # Compute messages from variable to check nodes
                    connected_checks = self.H[j, :] == 1
                    product = torch.prod(torch.tanh(0.5 * self.messages_v_to_c[connected_checks, j]),dim=0, keepdim=True)
                    self.messages_v_to_c[i, j] = torch.sign(self.llr[j]) * product

            # Check to variable node messages
            for i in range(self.num_check_nodes):
                for j in range(self.num_variable_nodes):
                    
                    # Compute messages from check to variable nodes
                    connected_vars = self.H[:, j] == 1
                    sum_msgs = torch.sum(self.messages_c_to_v[connected_vars, i]) - self.messages_v_to_c[j, i]
                    self.messages_c_to_v[i, j] = 2 * torch.atan(torch.exp(0.5 * sum_msgs))

        # Calculate the final estimated bits and only take first four bits
        estimated_bits = torch.sign(self.llr) * torch.prod(torch.tanh(0.5 * self.messages_c_to_v))
        
        return estimated_bits

In [210]:
def hard_decision_cutter(estimated_bits):
    tensor_1 = torch.tensor(1, device=mps_device)
    tensor_0 = torch.tensor(0, device=mps_device)
    estimated_bits = torch.where(estimated_bits > 0, tensor_1, tensor_0)
    estimated_bits = estimated_bits[:, :, 0:4]

    return estimated_bits

In [211]:
H = torch.tensor([ [1, 1, 1, 0, 0, 0, 0],
                   [0, 0, 1, 1, 1, 0, 0],
                   [0, 1, 0, 0, 1, 1, 0],
                   [1, 0, 0, 1, 0, 0, 1],], device=mps_device)
iter = 20
ldpc_bp = LDPCBeliefPropagation(H, llr_output.to(mps_device))
LDPC_result = ldpc_bp(iter)
final_result = hard_decision_cutter(LDPC_result)

print(LDPC_result)
# print(f"The Entire LDPC Belief propagation runs {time} seconds")

tensor([[[ 4.3497e-08, -4.3497e-08, -4.3497e-08,  ..., -4.3497e-08,
           4.3497e-08,  4.3497e-08]],

        [[ 4.3497e-08,  4.3497e-08,  4.3497e-08,  ...,  4.3497e-08,
          -4.3497e-08, -4.3497e-08]],

        [[ 4.3497e-08, -4.3497e-08, -4.3497e-08,  ...,  4.3497e-08,
          -4.3497e-08,  4.3497e-08]],

        ...,

        [[ 4.3497e-08, -4.3497e-08,  4.3497e-08,  ..., -4.3497e-08,
          -4.3497e-08, -4.3497e-08]],

        [[ 4.3497e-08, -4.3497e-08, -4.3497e-08,  ..., -4.3497e-08,
          -4.3497e-08,  4.3497e-08]],

        [[ 4.3497e-08, -4.3497e-08, -4.3497e-08,  ..., -4.3497e-08,
           4.3497e-08,  4.3497e-08]]], device='mps:0')


Comparation and Plot

In [193]:
def calculate_ber(transmitted_bits, origin_bits):
    # Ensure that both tensors have the same shape
    assert transmitted_bits.shape == origin_bits.shape, "Shapes of transmitted and received bits must be the same."

    # Calculate the bit errors
    errors = (transmitted_bits != origin_bits).sum().item()

    # Calculate the Bit Error Rate (BER)
    ber = errors / transmitted_bits.numel()

    return ber

In [194]:
# Describe the data:
# bits_info: original signal
bits_info = bits_info.to(mps_device)
decoded_bits = final_result #output from Maximum Likelihood
# decoded_bits = llr_output # Output from log-likelihood
# decoded_bits =

ber = calculate_ber(decoded_bits, bits_info)
print(ber)

0.449965
