In [1]:
import numpy as np
import numba
from numba import cuda, uint8

# BCH(7,4,3) parameters
n   = 7      # codeword length in bits
k   = 4      # message length in bits
t = 1        # corrects up to 1 error
deg = n - k  # 3 = degree of g(x) = number of parity bits per codeword

# Generator polynomial coefficients as a NumPy uint8 array (not a tuple),
# lowest power first: g[j] is the coefficient of x^j, so
# g(x) = 1 + x + x^3.
# The jitted host and device functions below read this module-level array
# as a global.
# NOTE(review): Numba treats globals as compile-time constants, so changing
# g after a function has been compiled is presumably not picked up — confirm.
g = np.array([1, 1, 0, 1], dtype=np.uint8)


In [2]:
@numba.njit
def encode_stream(msg: np.ndarray) -> np.ndarray:
    """
    Encode a bit stream block by block.

    msg: 1D array of 0/1 values; only the first (msg.size // k) * k
         entries are consumed, so the length should be a multiple of k=4.
    returns: 1D uint8 array of length (msg.size // k) * n holding the
             concatenated 7-bit codewords produced by encode_nb.
    """
    n_blocks = msg.size // k
    out = np.empty(n_blocks * n, dtype=np.uint8)

    # Walk a read cursor over the message and a write cursor over the output.
    src = 0
    dst = 0
    for _ in range(n_blocks):
        out[dst:dst + n] = encode_nb(msg[src:src + k])
        src += k
        dst += n
    return out

@numba.njit
def decode_stream(cw: np.ndarray) -> np.ndarray:
    """
    Decode a stream of codewords block by block.

    cw: 1D array of 0/1 values; only the first (cw.size // n) * n entries
        are consumed, so the length should be a multiple of n=7.
    returns: 1D uint8 array of length (cw.size // n) * k holding the
             4-bit messages recovered by decode_nb.
    """
    n_words = cw.size // n
    out = np.empty(n_words * k, dtype=np.uint8)

    # Walk a read cursor over the codewords and a write cursor over the output.
    src = 0
    dst = 0
    for _ in range(n_words):
        out[dst:dst + k] = decode_nb(cw[src:src + n])
        src += n
        dst += k
    return out

@numba.njit
def _syndrome(cw: np.ndarray) -> bool:
    """
    Validity check for one received word.

    Despite the name, this does not return the syndrome value itself: it
    returns True iff cw (length 7) leaves a zero remainder when divided by
    g(x) over GF(2), i.e. iff cw is a valid codeword. cw is not modified.
    """
    # Divide a scratch copy so the caller's array stays intact.
    rem = cw.copy()

    # Cancel the leading terms x^6, x^5, ..., x^3 in turn (k steps).
    for step in range(k):
        top = n - 1 - step
        if rem[top] == 1:
            base = top - deg
            # XOR in g(x) * x^base to clear the coefficient at `top`.
            for j in range(deg + 1):
                rem[base + j] ^= g[j]

    # The remainder lives in the low `deg` coefficients.
    for j in range(deg):
        if rem[j] != 0:
            return False
    return True

@numba.njit
def encode_nb(msg: np.ndarray) -> np.ndarray:
    """
    Systematic encoder for a single BCH(7,4,3) block.

    msg: 4 message bits m[0]..m[3]
    returns: new uint8[7] codeword laid out as [r0,r1,r2,m0,m1,m2,m3],
             where r(x) is the remainder of x^3 * m(x) divided by g(x)
             over GF(2).
    """
    # Start from x^deg * m(x): message bits in the high positions,
    # parity positions still zero.
    cw = np.zeros(n, dtype=np.uint8)
    for i in range(k):
        cw[deg + i] = msg[i]

    # Long division on a scratch copy; the message part of cw must survive.
    rem = cw.copy()
    for step in range(k):
        top = n - 1 - step            # runs from n-1 down to deg
        if rem[top] == 1:
            base = top - deg
            for j in range(deg + 1):
                rem[base + j] ^= g[j]

    # Drop the remainder into the parity positions.
    cw[:deg] = rem[:deg]
    return cw

@numba.njit
def decode_nb(cw: np.ndarray) -> np.ndarray:
    """
    Brute-force single-error decoder for one BCH(7,4,3) block.

    cw: received 7-bit word (not modified)
    returns: new uint8[4] with the message bits taken from positions
             deg..n-1, after correcting at most one flipped bit.

    If cw is already valid it is used as is. Otherwise each single-bit flip
    is tried in index order and the first one that yields a valid codeword
    is applied. If none does, the received systematic bits are returned
    uncorrected.
    """
    # Index of the bit to correct; -1 means "use cw exactly as received".
    flip = -1
    if not _syndrome(cw):
        for e in range(n):
            trial = cw.copy()
            trial[e] ^= 1
            if _syndrome(trial):
                flip = e
                break

    # Systematic code: the message sits in the top k positions.
    out = np.empty(k, dtype=np.uint8)
    for i in range(k):
        out[i] = cw[i + deg]

    # A corrected parity bit (flip < deg) does not affect the message.
    if flip >= deg:
        out[flip - deg] ^= 1
    return out



In [3]:
# --------------------
# Quick test
# --------------------
if __name__ == "__main__":
    # Seeded generator so the check is reproducible under Restart & Run All.
    rng = np.random.default_rng(0)

    msg = rng.integers(0, 2, size=100_000, dtype=np.uint8)
    # no padding needed since 100000 % 4 == 0
    assert msg.size % k == 0

    # encode entire stream
    cw = encode_stream(msg)

    # Inject exactly one error into EVERY codeword, at a random position
    # inside that codeword. A single flip in the whole stream would exercise
    # only one block, and would be a vacuous check whenever it happened to
    # land on a parity bit (the message bits would be intact anyway).
    n_blocks = cw.size // n
    err_pos = np.arange(n_blocks) * n + rng.integers(0, n, size=n_blocks)
    cw_noisy = cw.copy()
    cw_noisy[err_pos] ^= 1

    # decode entire stream block-by-block
    decoded = decode_stream(cw_noisy)

    assert np.array_equal(decoded, msg), "Decoding failed!"
    print("✔ Full-stream encode/decode OK")

✔ Full-stream encode/decode OK


In [4]:
@cuda.jit(device=True)
def encode_block(msg_blk, out_cw):
    """
    Device function: systematic BCH(7,4,3) encode of one block.

    msg_blk: k message bits
    out_cw:  n-element output, filled as [r0,r1,r2,m0,m1,m2,m3] where the
             r bits are the remainder of x^deg * m(x) divided by g(x).
    """
    # Scratch dividend x^deg * m(x): zero parity slots, message on top.
    rem = cuda.local.array(n, uint8)
    for pos in range(deg):
        rem[pos] = 0
    for pos in range(k):
        rem[deg + pos] = msg_blk[pos]

    # GF(2) long division, cancelling coefficients n-1 down to deg.
    top = n - 1
    while top >= deg:
        if rem[top]:
            base = top - deg
            for j in range(deg + 1):
                # g is the module-level NumPy coefficient array
                rem[base + j] ^= uint8(g[j])
        top -= 1

    # Parity bits first, then the untouched message bits.
    for pos in range(n):
        if pos < deg:
            out_cw[pos] = rem[pos]
        else:
            out_cw[pos] = msg_blk[pos - deg]

@cuda.jit(device=True)
def syndrome_block(cw_blk):
    """
    Device function: return True iff cw_blk (n bits) is divisible by g(x)
    over GF(2), i.e. it is a valid codeword. cw_blk is not modified.
    """
    # Divide a private copy so the caller's word is left alone.
    rem = cuda.local.array(n, uint8)
    for pos in range(n):
        rem[pos] = cw_blk[pos]

    # k cancellation steps, from coefficient n-1 down to deg.
    for step in range(k):
        top = n - 1 - step
        if rem[top]:
            base = top - deg
            for j in range(deg + 1):
                rem[base + j] ^= uint8(g[j])

    # Any non-zero remainder coefficient means "not a codeword".
    for pos in range(deg):
        if rem[pos] != 0:
            return False
    return True

@cuda.jit(device=True)
def decode_block(cw_blk, out_msg):
    """
    Device function: brute-force single-error decode of one block.

    cw_blk:  received n-bit word (not modified)
    out_msg: k-element output receiving the message bits from positions
             deg..n-1 after correcting at most one bit.

    If no single-bit flip produces a valid codeword, the received
    systematic bits are written out uncorrected.
    """
    # Work on a private uint8 copy of the received word.
    word = cuda.local.array(n, uint8)
    for i in range(n):
        word[i] = cw_blk[i]

    if not syndrome_block(word):
        # Flip each bit in turn; keep the first flip that validates and
        # undo every flip that does not.
        for e in range(n):
            word[e] ^= 1
            if syndrome_block(word):
                break
            word[e] ^= 1

    # `word` is now either the corrected codeword or the original bits.
    for i in range(k):
        out_msg[i] = word[i + deg]

@cuda.jit
def encode_stream_gpu(msg, cw):
    """
    Kernel: thread i encodes message block i with encode_block.

    msg: flat array of message bits; msg.size // k blocks are processed
    cw:  flat output array, written at [i*n, i*n + n) by thread i
    Threads whose index is past the last block do nothing.
    """
    tid = cuda.grid(1)
    if tid >= msg.size // k:
        return

    src = tid * k
    dst = tid * n

    # Stage this thread's message bits in a local buffer.
    word = cuda.local.array(k, uint8)
    for j in range(k):
        word[j] = msg[src + j]

    coded = cuda.local.array(n, uint8)
    encode_block(word, coded)

    # Copy the codeword out to the flat output array.
    for j in range(n):
        cw[dst + j] = coded[j]

@cuda.jit
def decode_stream_gpu(cw, msg):
    """
    Kernel: thread i decodes codeword i with decode_block.

    cw:  flat array of received bits; cw.size // n codewords are processed
    msg: flat output array, written at [i*k, i*k + k) by thread i
    Threads whose index is past the last codeword do nothing.
    """
    tid = cuda.grid(1)
    if tid >= cw.size // n:
        return

    src = tid * n
    dst = tid * k

    # Stage this thread's codeword in a local buffer.
    word = cuda.local.array(n, uint8)
    for j in range(n):
        word[j] = cw[src + j]

    bits = cuda.local.array(k, uint8)
    decode_block(word, bits)

    # Copy the recovered message bits out to the flat output array.
    for j in range(k):
        msg[dst + j] = bits[j]




In [5]:
# --------------------
# Host-side example
# --------------------
if __name__ == "__main__":
    # Seeded generator so the check is reproducible under Restart & Run All.
    rng = np.random.default_rng(0)

    total_bits = 100_000
    assert total_bits % k == 0
    msg_host = rng.integers(0, 2, size=total_bits, dtype=np.uint8)
    blocks = total_bits // k

    # Transfer to GPU
    msg_dev = cuda.to_device(msg_host)
    cw_dev = cuda.device_array(blocks * n, dtype=np.uint8)

    # Encode: one thread per block, grid rounded up to cover all blocks
    threads = 128
    grid = (blocks + threads - 1) // threads
    encode_stream_gpu[grid, threads](msg_dev, cw_dev)

    # Bring back and corrupt: exactly one flipped bit in EVERY codeword, at
    # a random position inside it. A single flip in the whole stream would
    # test one block only, and would prove nothing if it hit a parity bit.
    cw = cw_dev.copy_to_host()
    pos = np.arange(blocks) * n + rng.integers(0, n, size=blocks)
    cw[pos] ^= 1
    cw_dev = cuda.to_device(cw)

    # Decode
    msg_dev_out = cuda.device_array(blocks * k, dtype=np.uint8)
    decode_stream_gpu[grid, threads](cw_dev, msg_dev_out)

    decoded = msg_dev_out.copy_to_host()
    assert np.array_equal(decoded, msg_host)
    print("✔ CUDA-accelerated BCH(7,4,3) encode/decode works!")

✔ CUDA-accelerated BCH(7,4,3) encode/decode works!


In [6]:
@cuda.jit(device=True)
def encode_block_fast(msg_blk, out_cw):
    """
    Device function: closed-form systematic encode of one 4-bit block.

    Writes [p0,p1,p2,m0,m1,m2,m3] into out_cw, where the parity bits are
    the remainder of x^3 * m(x) modulo g(x) = x^3 + x + 1, expanded into
    XOR equations (x^3 = 1+x, x^4 = x+x^2, x^5 = 1+x+x^2, x^6 = 1+x^2).
    """
    # Read every input bit before writing anything.
    a = msg_blk[0]
    b = msg_blk[1]
    c = msg_blk[2]
    d = msg_blk[3]

    # Parity bits
    out_cw[0] = a ^ c ^ d
    out_cw[1] = a ^ b ^ c
    out_cw[2] = b ^ c ^ d

    # Systematic message bits
    out_cw[3] = a
    out_cw[4] = b
    out_cw[5] = c
    out_cw[6] = d

@cuda.jit(device=True)
def decode_block_fast(cw_blk, out_msg):
    """
    Device function: table-driven single-error decode of one block.

    cw_blk:  received 7-bit word. NOTE: it is corrected IN PLACE when the
             syndrome is non-zero.
    out_msg: k-element output receiving cw_blk[deg..n-1] after correction.

    The syndrome is c(x) mod g(x) packed as an integer (bit j = coefficient
    of x^j). Position i contributes x^i mod g(x): 1, 2, 4, 3, 6, 7, 5.
    """
    syn = uint8(0)
    if cw_blk[0]:
        syn ^= uint8(1)
    if cw_blk[1]:
        syn ^= uint8(2)
    if cw_blk[2]:
        syn ^= uint8(4)
    if cw_blk[3]:
        syn ^= uint8(3)
    if cw_blk[4]:
        syn ^= uint8(6)
    if cw_blk[5]:
        syn ^= uint8(7)
    if cw_blk[6]:
        syn ^= uint8(5)

    # A non-zero syndrome identifies the flipped position via the same table.
    if syn != 0:
        bad = 6                # remaining case: syn == 5
        if syn == 1:
            bad = 0
        elif syn == 2:
            bad = 1
        elif syn == 4:
            bad = 2
        elif syn == 3:
            bad = 3
        elif syn == 6:
            bad = 4
        elif syn == 7:
            bad = 5
        cw_blk[bad] ^= uint8(1)

    # Systematic code: the message is the top k bits.
    for i in range(k):
        out_msg[i] = cw_blk[i + deg]

@cuda.jit
def encode_stream_gpu(msg, cw):
    """
    Kernel: thread i encodes message block i with encode_block_fast.

    Shares its name with the earlier long-division kernel in this notebook,
    so once this cell runs the name refers to this version.

    msg: flat array of message bits; msg.size // k blocks are processed
    cw:  flat output array, written at [i*n, i*n + n) by thread i
    """
    tid = cuda.grid(1)
    if tid >= msg.size // k:
        return

    src = tid * k
    dst = tid * n

    word = cuda.local.array(k, uint8)
    for j in range(k):
        word[j] = msg[src + j]

    coded = cuda.local.array(n, uint8)
    encode_block_fast(word, coded)

    for j in range(n):
        cw[dst + j] = coded[j]

@cuda.jit
def decode_stream_gpu(cw, msg):
    """
    Kernel: thread i decodes codeword i with decode_block_fast.

    Shares its name with the earlier brute-force kernel in this notebook,
    so once this cell runs the name refers to this version.

    cw:  flat array of received bits; cw.size // n codewords are processed.
         Each thread corrects a local copy, so cw itself is not modified.
    msg: flat output array, written at [i*k, i*k + k) by thread i
    """
    tid = cuda.grid(1)
    if tid >= cw.size // n:
        return

    src = tid * n
    dst = tid * k

    word = cuda.local.array(n, uint8)
    for j in range(n):
        word[j] = cw[src + j]

    bits = cuda.local.array(k, uint8)
    decode_block_fast(word, bits)

    for j in range(k):
        msg[dst + j] = bits[j]



In [None]:
if __name__ == "__main__":
    # Seeded generator so the check is reproducible under Restart & Run All.
    rng = np.random.default_rng(0)

    total_bits = 100_000
    assert total_bits % k == 0
    msg_host = rng.integers(0, 2, size=total_bits, dtype=np.uint8)
    blocks = total_bits // k

    # allocate GPU buffers
    msg_dev = cuda.to_device(msg_host)
    cw_dev = cuda.device_array(blocks * n, dtype=np.uint8)
    msg_out_dev = cuda.device_array(blocks * k, dtype=np.uint8)

    # launch config: one thread per block, grid rounded up
    threads = 256
    grid = (blocks + threads - 1) // threads

    encode_stream_gpu[grid, threads](msg_dev, cw_dev)

    # Inject one error per codeword, cycling the error position through all
    # n bit positions (block b gets its bit b % n flipped). Always flipping
    # index 0 would only ever hit parity bit r0, leaving the systematic
    # message bits intact, so the check would pass even with no correction.
    cw = cw_dev.copy_to_host()
    block_idx = np.arange(blocks)
    flip_pos = block_idx * n + (block_idx % n)
    cw[flip_pos] ^= 1

    # Sanity: without correction the received message bits really are wrong.
    assert not np.array_equal(cw.reshape(-1, n)[:, deg:].ravel(), msg_host)

    cw_dev = cuda.to_device(cw)

    decode_stream_gpu[grid, threads](cw_dev, msg_out_dev)

    decoded = msg_out_dev.copy_to_host()
    assert np.array_equal(decoded, msg_host)


    print("✔ Fast CUDA BCH encode/decode OK!")


✔ Fast CUDA BCH encode/decode OK!
