In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
import time

def generate_regular_ldpc(n, dv, dc):
    """
    Generates a Regular LDPC Matrix using MacKay & Neal's construction (Algorithm 2).

    Parameters:
    n (int): Code length (number of columns / variable nodes).
    dv (int): Variable node degree (number of 1s per column).
    dc (int): Check node degree (number of 1s per row).

    Returns:
    tuple: (var_nodes, check_nodes) adjacency lists representing the Tanner Graph.
           Returns (None, None) if generation fails.
    """

    # --- Step 1: Calculate constraints ---
    # According to the conservation of edges: n * dv = m * dc
    # We must calculate the number of rows (m) derived from these parameters.
    if (n * dv) % dc != 0:
        print("Error: Invalid parameters. (n * dv) must be divisible by dc.")
        return None, None

    m = int((n * dv) / dc)

    print(f"Generating ({dv}, {dc})-Regular LDPC Code.")
    print(f"Structure: {n} Columns, {m} Rows.")

    # Safety mechanism: The algorithm is probabilistic and might hit a "dead end".
    # We allow a maximum number of retries before giving up.
    max_retries = 100
    attempt = 0

    # --- Step 2: The Construction Loop ---
    while attempt < max_retries:
        attempt += 1

        # Initialize the Adjacency Lists (The Tanner Graph Structure)
        # var_nodes[j] stores the list of row indices connected to column j.
        var_nodes = [[] for _ in range(n)]

        # check_nodes[i] stores the list of column indices connected to row i.
        check_nodes = [[] for _ in range(m)]

        # Keep track of how many connections each row currently has.
        # Initially, all rows have 0 connections.
        current_row_degrees = [0] * m

        success = True # Flag to verify if this attempt completes successfully

        # --- Step 3: Column-by-Column Filling ---
        for col_idx in range(n):

            # Find "Candidate Rows":
            # Identify all rows that have not yet reached the limit 'dc'.
            candidate_rows = []
            for row_idx in range(m):
                if current_row_degrees[row_idx] < dc:
                    candidate_rows.append(row_idx)

            # --- Deadlock Check ---
            # We must place 'dv' ones in the current column.
            # If valid candidate rows are fewer than 'dv', we cannot satisfy the constraint.
            if len(candidate_rows) < dv:
                success = False
                break # Break the inner loop to restart the process

            # --- Random Selection ---
            # Randomly select 'dv' distinct rows from the valid candidates.
            chosen_rows = random.sample(candidate_rows, dv)

            # --- Update Graph Connections ---
            for row_idx in chosen_rows:
                # 1. Update Variable Node (Column -> Rows)
                # This corresponds to "Bit-to-Check" connections.
                var_nodes[col_idx].append(row_idx)

                # 2. Update Check Node (Row -> Columns)
                # This corresponds to "Check-to-Bit" connections.
                check_nodes[row_idx].append(col_idx)

                # 3. Increment the weight counter for this row
                current_row_degrees[row_idx] += 1

        # --- Step 4: Final Validation ---
        if success:
            print(f"Success! Matrix generated on attempt {attempt}.")
            return var_nodes, check_nodes

    # If we exit the while loop, it means we failed after max_retries.
    print("Failed to generate matrix. Try increasing max_retries or checking parameters.")
    return None, None

In [None]:
# Test with the parameters from the slides or a small example
N = 12   # Number of columns
Dv = 3   # Ones per column
Dc = 4   # Ones per row

# Execute the function
v_nodes, c_nodes = generate_regular_ldpc(N, Dv, Dc)

# Visualization

def visualize_matrix(n, m, var_nodes):
    """
    可视化 LDPC 矩阵 H 的稀疏结构
    """
    # 1. 先把邻接表转换回普通的 0/1 矩阵 (仅用于绘图)
    H = np.zeros((m, n))
    for col_idx, connected_rows in enumerate(var_nodes):
        for row_idx in connected_rows:
            H[row_idx, col_idx] = 1

    # 2. 使用 spy 函数绘制
    plt.figure(figsize=(10, 6))
    plt.spy(H, markersize=5, color='black') # markersize控制点的大小
    plt.title(f"Sparsity Pattern of H Matrix ({n}x{m})")
    plt.xlabel("Variable Nodes (Columns)")
    plt.ylabel("Check Nodes (Rows)")
    plt.grid(False)
    plt.show()

# --- 测试 ---
# 假设你已经有了 v_nodes 和 c_nodes
visualize_matrix(N, len(c_nodes), v_nodes)

In [None]:
def spa_decoder(var_nodes, check_nodes, llr_channel, max_iter=50):
    """
    Log-Domain Sum-Product Algorithm (SPA) Decoder

    Args:
        var_nodes: List of lists (from Part 1)
        check_nodes: List of lists (from Part 1)
        llr_channel: Received LLRs from channel (numpy array of shape (N,))
        max_iter: Maximum number of iterations

    Returns:
        estimated_bits: Decoded bits (0 or 1)
        success: Boolean, True if syndrome check passed
    """
    n = len(var_nodes)
    m = len(check_nodes)

    # --- 1. Initialize Message Structure ---
    # We use a dictionary or sparse matrix to store messages.
    # key: (row_idx, col_idx), value: message
    # L_vc: Variable-to-Check messages (Phi)
    # L_cv: Check-to-Variable messages (Psi)
    L_vc = {}
    L_cv = {}

    # Initial state: Messages from Variable to Check = Channel LLR
    for col in range(n):
        for row in var_nodes[col]:
            L_vc[(row, col)] = llr_channel[col]

    # --- 2. Iteration Loop ---
    for iteration in range(max_iter):

        # === Step A: Check Node Update (Psi) ===
        # Corresponds to Slide 30
        for row in range(m):
            # Get all variable nodes connected to this check node
            connected_cols = check_nodes[row]

            # Calculate tanh product
            # Optimization: Calculate the product of all tanh values first,
            # then divide by the current tanh to get the "exclude current" product.
            tanh_vals = [np.tanh(L_vc[(row, col)] / 2.0) for col in connected_cols]
            
            for i, col in enumerate(connected_cols):
                # Exclude self (Leave-one-out)
                # Note: If tanh_vals[i] is 0, division would error.
                # A robust approach is to re-multiply excluding self.
                prod_exclude_me = 1.0
                for j, other_col in enumerate(connected_cols):
                    if i != j:
                        prod_exclude_me *= tanh_vals[j]

                # Prevent numerical overflow/underflow (clipping to open interval (-1, 1))
                prod_exclude_me = np.clip(prod_exclude_me, -0.999999, 0.999999)

                # Calculate L_cv (Psi)
                L_cv[(row, col)] = 2.0 * np.arctanh(prod_exclude_me)

        # === Step B: Decision & Syndrome Check ===
        # Corresponds to Slide 31-32
        # Calculate Posterior LLR
        L_posterior = np.zeros(n)
        estimated_bits = np.zeros(n, dtype=int)

        for col in range(n):
            # Initial channel information
            sum_val = llr_channel[col]
            # Add information from all connected check nodes
            for row in var_nodes[col]:
                sum_val += L_cv[(row, col)]

            L_posterior[col] = sum_val

            # Hard Decision
            # LLR < 0 implies higher probability of 1 (For BPSK +1/-1 mapping, usually LLR < 0 corresponds to bit 1)
            # Note: This depends on your LLR definition. Slide 29 states "if Gamma >= 0 then x=0 else x=1"
            if L_posterior[col] < 0:
                estimated_bits[col] = 1
            else:
                estimated_bits[col] = 0

        # Syndrome Check: z = x * H^T
        # Check if every parity check equation is satisfied (sum(bits) % 2 == 0)
        syndrome_valid = True
        for row in range(m):
            parity = 0
            for col in check_nodes[row]:
                parity += estimated_bits[col]
            if parity % 2 != 0:
                syndrome_valid = False
                break

        if syndrome_valid:
            return estimated_bits, True

        # === Step C: Variable Node Update (Phi) ===
        # Corresponds to Slide 32
        for col in range(n):
            for row in var_nodes[col]:
                # Simple approach: Total Sum - Message from this row (Check node)
                # L_vc = L_total - L_cv_from_this_row
                # Note: L_total here uses the L_posterior calculated in this round.
                # L_posterior contains Channel + All Checks.
                # Therefore, subtracting the current Check gives the "Extrinsic" information.
                L_vc[(row, col)] = L_posterior[col] - L_cv[(row, col)]

    # If max_iter reached without success
    return estimated_bits, False

In [None]:
def test_spa_decoder():
    print("=== Starting Unit Test (Slide 33-35) ===")

    # 1. Manually construct the H matrix from Slide 33 (4 rows, 6 columns)
    # Be very careful with indices here; they must strictly correspond to the positions of 1s in the matrix

    # var_nodes (Column -> Row)
    # Col 0: Connected to Row 0, 2
    # Col 1: Connected to Row 0, 1
    # Col 2: Connected to Row 1, 3
    # Col 3: Connected to Row 0, 3
    # Col 4: Connected to Row 1, 2
    # Col 5: Connected to Row 2, 3
    var_nodes = [
        [0, 2], [0, 1], [1, 3], [0, 3], [1, 2], [2, 3]
    ]

    # check_nodes (Row -> Column)
    # Row 0: Connected to Col 0, 1, 3
    # Row 1: Connected to Col 1, 2, 4
    # Row 2: Connected to Col 0, 4, 5
    # Row 3: Connected to Col 2, 3, 5
    check_nodes = [
        [0, 1, 3], [1, 2, 4], [0, 4, 5], [2, 3, 5]
    ]

    # 2. Input Channel LLR (Slide 33)
    gamma = np.array([-0.5, 2.5, -4.0, 5.0, -3.5, 2.5])

    print(f"Input LLR: {gamma}")

    # 3. Run the decoder
    # Note: To see the intermediate process, you might need to temporarily modify spa_decoder
    # Make it print(L_posterior) at the end of each loop iteration
    # Or run for 1 iteration, then 2, then 3 to check results

    # --- Test Iteration 1 ---
    print("\n--- Testing Iteration 1 ---")
    # This step requires your decoder to return posterior LLRs (L_posterior) for comparison
    # If your decoder only returns bits, it is recommended to add a debug mode to return LLRs
    # Here we assume manual verification of the printed values
    decoded_bits, success = spa_decoder(var_nodes, check_nodes, gamma, max_iter=1)

    expected_iter1 = np.array([-0.2676, 5.0334, -3.7676, 2.2783, -6.2217, -0.7173])
    print(f"Expected LLRs (Slide 34): {expected_iter1}")
    print("Please check if your decoder output is close to the above values")

    # --- Test Iteration 3 (Final Result) ---
    print("\n--- Testing Full Decoding (3 Iterations) ---")
    decoded_bits, success = spa_decoder(var_nodes, check_nodes, gamma, max_iter=3)

    expected_bits = np.array([0, 0, 1, 0, 1, 1]) # Corresponding to LLR > 0 is 0, < 0 is 1
    # Note: The transmitted codeword x in Slide 33 is [0, 0, 1, 0, 1, 1]

    print(f"Your decoding result:     {decoded_bits}")
    print(f"Expected decoding result: {expected_bits}")

    if np.array_equal(decoded_bits, expected_bits):
        print("✅ Final bit check passed!")
    else:
        print("❌ Final bit check failed!")

    if success:
        print("✅ Syndrome Check Passed")
    else:
        print("❌ Syndrome Check Failed")

# Run test
test_spa_decoder()

In [None]:
=== Starting Unit Test (Slide 33-35) ===
Input LLR: [-0.5  2.5 -4.   5.  -3.5  2.5]

--- Testing Iteration 1 ---
Expected LLRs (Slide 34): [-0.2676  5.0334 -3.7676  2.2783 -6.2217 -0.7173]
Please check if your decoder output is close to the above values

--- Testing Full Decoding (3 Iterations) ---
Your decoding result:     [0 0 1 0 1 1]
Expected decoding result: [0 0 1 0 1 1]
✅ Final bit check passed!
✅ Syndrome Check Passed

In [None]:
import csv  # New: Used for saving files
import os   # New: Used to check if file exists

def simulation_part3_with_save():
    # 1. Set parameters
    N = 1000
    dv = 4
    dc = 8
    R = 1 - (dv/dc) 
    
    # Filename
    filename = f"ldpc_results_N{N}_dv{dv}_dc{dc}.csv"

    # Generate matrix (Part 1)
    print(f"Generating Code (N={N}, dv={dv}, dc={dc})...")
    var_nodes, check_nodes = generate_regular_ldpc(N, dv, dc)
    if var_nodes is None: 
        print("Matrix generation failed!")
        return

    # 2. Set SNR range (dB)
    snr_range_db = np.arange(0,3.5,0.5) # 0, 0.5, ..., 3.0
    
    ber_results = [] 
    
    print(f"\nResults will be saved to: {filename}")
    print("\nStarting Monte Carlo Simulation...")
    print(f"{'SNR(dB)':<10} | {'Frames':<10} | {'Bit Errs':<10} | {'BER':<15}")
    print("-" * 55)

    # If file does not exist, write the header first
    file_exists = os.path.isfile(filename)
    with open(filename, mode='a', newline='') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(['SNR_dB', 'Frames', 'Bit_Errors', 'BER', 'FER'])

    # 3. Outer loop: Iterate over SNR
    for snr_db in snr_range_db:
        
        snr_linear = 10**(snr_db / 10.0)
        sigma = np.sqrt(1 / (2 * R * snr_linear))
        
        frame_count = 0
        bit_error_count = 0
        frame_error_count = 0
        
        # Stop conditions
        max_frames = 10000
        min_frame_errors = 50
        
        # 4. Inner loop
        while frame_error_count < min_frame_errors and frame_count < max_frames:
            frame_count += 1
            
            # --- A. Modulation & All-zero assumption ---
            tx_signal = np.ones(N) 
            
            # --- B. Add noise ---
            noise = np.random.randn(N) * sigma
            rx_signal = tx_signal + noise
            
            # --- C. Calculate LLR ---
            llr_channel = (2 * rx_signal) / (sigma ** 2)
            
            # --- D. Decoding ---
            decoded_bits, success = spa_decoder(var_nodes, check_nodes, llr_channel, max_iter=50)
            
            # --- E. Count errors ---
            errs = np.sum(decoded_bits)
            bit_error_count += errs
            if errs > 0:
                frame_error_count += 1
                
        # Calculate BER
        ber = bit_error_count / (frame_count * N)
        fer = frame_error_count / frame_count
        ber_results.append(ber)
        
        print(f"{snr_db:<10.2f} | {frame_count:<10} | {bit_error_count:<10} | {ber:<15.2e}")

        # --- Key: Write to file in real-time ---
        with open(filename, mode='a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([snr_db, frame_count, bit_error_count, ber, fer])

    # 5. Plotting
    plt.figure(figsize=(8, 6))
    plt.semilogy(snr_range_db, ber_results, 'b-o', label=f'LDPC ({N},{dv},{dc})')
    plt.grid(True, which="both")
    plt.xlabel('Eb/N0 (dB)')
    plt.ylabel('Bit Error Rate (BER)')
    plt.title('LDPC Performance over BIAWGNC')
    plt.legend()
    plt.show()

# Run simulation with save functionality
simulation_part3_with_save()