In [1]:
#!/usr/bin/env python3

from gnuradio import gr, blocks, digital, fec, channels
import numpy as np

class UTF8LDPCCommSystem(gr.top_block):
    def __init__(self, text, matrix_file, max_iter=50, eb_no_db=3.0):
        gr.top_block.__init__(self, "UTF-8 LDPC Comm System")

        # === PARAMETERS ===
        self.bits_per_symbol = 1  # BPSK
        self.samp_rate = 32000
        self.text = text
        # Convert Eb/N0 (dB) to noise variance
        eb_no_linear = 10**(eb_no_db/10.0)
        snr_linear = eb_no_linear * self.bits_per_symbol
        noise_variance = 1.0 / snr_linear

        # === SOURCE & UTF-8 ENCODING ===
        byte_array = bytearray(self.text, 'utf-8')
        self.src = blocks.vector_source_b(list(byte_array), repeat=False)
        self.unpack = blocks.unpack_k_bits_bb(8)

        # === LDPC ENCODER ===
        # Load parity-check matrix from AList file
        self.ldpc_H = fec.ldpc_H_matrix(matrix_file, 0)
        self.ldpc_enc = fec.encoder(
            fec.ldpc_par_mtrx_encoder_make(self.ldpc_H),
            False, 0
        )

        # === MAPPING & CHANNEL ===
        # Map bits [0,1] -> BPSK symbols [+1, -1]
        self.chunks_to_syms = digital.chunks_to_symbols_bc((1.0, -1.0), 1)
        self.channel = channels.channel_model(
            noise_voltage=np.sqrt(noise_variance),
            frequency_offset=0.0,
            epsilon=1.0,
            taps=[1.0],
            noise_seed=0,
            block_tags=False
        )

        # === RECEIVER: DEMOD -> LDPC DECODER -> PACK ===
        # BPSK demod: hard decision slicing
        self.demod = digital.binary_slicer_fb()
        # Soft-decision LDPC decoder with belief propagation
        self.ldpc_dec = fec.decoder(
            fec.ldpc_decoder_make(matrix_file, max_iter),
            False, 0
        )
        self.pack = blocks.pack_k_bits_bb(8)
        self.snk = blocks.vector_sink_b()

        # === CONNECT FLOWGRAPH ===
        self.connect(self.src, self.unpack)
        self.connect(self.unpack, self.ldpc_enc)
        self.connect(self.ldpc_enc, self.chunks_to_syms)
        self.connect(self.chunks_to_syms, self.channel)
        self.connect(self.channel, self.demod)
        self.connect(self.demod, self.ldpc_dec)
        self.connect(self.ldpc_dec, self.pack)
        self.connect(self.pack, self.snk)

    def get_received_text(self):
        data = bytes(self.snk.data())
        return data.decode('utf-8', errors='replace')

if __name__ == '__main__':
    tx_text = "Hello, LDPC in GNU Radio!"
    # Path to an AList-format parity-check matrix
    matrix_file = './ldpc_matrix.alist'
    tb = UTF8LDPCCommSystem(tx_text, matrix_file, max_iter=20, eb_no_db=5.0)
    tb.run()
    rx = tb.get_received_text()
    print("Transmitted:", tx_text)
    print("Received   :", rx)


: 

In [1]:
import numpy as np
from pyldpc import make_ldpc
from scipy.sparse import csr_matrix

# ── 1) PARAMETERS ───────────────────────────────────────────────────────────────
n      = 240   # codeword length
d_v    = 2      # variable‐node degree (number of 1s per column)
d_c    = 4      # check‐node degree (number of 1s per row)
systematic = True
sparse     = True

# ── 2) MAKE LDPC MATRICES ───────────────────────────────────────────────────────
# H is a (M × N) sparse matrix, G is the generator matrix
H, G = make_ldpc(n, d_v, d_c, systematic=systematic, sparse=sparse)

# Convert to CSR for easy indexing
H_csr = csr_matrix(H)
M, N = H_csr.shape

# ── 3) COMPUTE WEIGHTS ───────────────────────────────────────────────────────────
# row weights and column weights
row_weights = np.diff(H_csr.indptr)
H_csc       = H_csr.tocsc()
col_weights = np.diff(H_csc.indptr)

max_rw = row_weights.max()
max_cw = col_weights.max()

# ── 4) DUMP AList ────────────────────────────────────────────────────────────────
with open('ldpc_matrix.alist', 'w') as f:
    # header: #rows, #cols
    f.write(f"{M} {N}\n")
    # max 1s per row, max 1s per col
    f.write(f"{max_rw}\n")
    f.write(f"{max_cw}\n")
    # each of the M rows: list of column‐indices (1-based), padded with zeros
    for i in range(M):
        cols = (H_csr.indices[H_csr.indptr[i]:H_csr.indptr[i+1]] + 1).tolist()
        f.write(" ".join(map(str, cols + [0]*(max_rw-len(cols)))) + "\n")
    # each of the N columns: list of row‐indices (1-based), padded with zeros
    for j in range(N):
        rows = (H_csc.indices[H_csc.indptr[j]:H_csc.indptr[j+1]] + 1).tolist()
        f.write(" ".join(map(str, rows + [0]*(max_cw-len(rows)))) + "\n")

print("Wrote parity‐check matrix to ldpc_matrix.alist")


Wrote parity‐check matrix to ldpc_matrix.alist
