In [2]:
import numpy as np
from abc import ABC, abstractmethod

In [17]:
class LinearBlockCode(ABC):
    
    def __init__(self, G: np.ndarray):
        self._k, self._n = G.shape
        self._G = G.astype(np.uint8)
        self._lazy_H = None
        self._lazy_codewords = None
        self._lazy_d_min = None

    @property
    def G(self) -> np.ndarray:
        return self._G

    @property
    def H(self) -> np.ndarray:
        if self._lazy_H is None:
            self._prepare_parity()
        return self._lazy_H
    
    @property
    def n(self) -> int:
        return self._n

    @property
    def k(self) -> int:
        return self._k

    @property
    def d_min(self) -> int:
        if self._lazy_d_min is None:
            self._prepare_codewords()
        return self._lazy_d_min

    @property
    def radius(self) -> int:
        return (self.d_min - 1) // 2
    
    @abstractmethod
    def encode(self, message: np.ndarray) -> np.ndarray:
        pass

    @abstractmethod
    def decode(self, received: np.ndarray) -> np.ndarray:
        pass

    @abstractmethod
    def detect(self, received: np.ndarray) -> bool:
        pass

    def _prepare_codewords(self):
        total = 1 << self.k
        m = ((np.arange(total)[:, None] >> np.arange(self.k - 1, -1, -1)) & 1).astype(np.uint8)
        codewords = (m @ self.G) & 1
        weights = codewords.sum(axis=1)
        order = np.argsort(weights)
    
        self._lazy_codewords = codewords[order]
        nz = weights[weights > 0]
        self._lazy_d_min = int(nz.min()) if nz.size else 0

    def _prepare_parity(self):
        if self._lazy_H is not None:
            return
        I_k = np.eye(self.k, dtype=np.uint8)    
        P = self.G[:, self.k:]
        I_nk = np.eye(self.n - self.k, dtype=np.uint8)
        H = np.hstack([P.T, I_nk])
        self._lazy_H = H.astype(np.uint8)

    def standard_table(self):
        if self._lazy_codewords is None:
            self._prepare_codewords()
    
        codewords = self._lazy_codewords
        n = self.n
    
        total = 1 << n
        all_vecs = ((np.arange(total)[:, None] >>
                     np.arange(n - 1, -1, -1)) & 1).astype(np.uint8)
    
        unused = {tuple(v) for v in all_vecs}
        table = []
    
        for v in all_vecs:
            t = tuple(v)
            if t not in unused:
                continue
    
            row = (v ^ codewords)  # coset = leader + code
            table.append(row)
    
            for r in row:
                unused.discard(tuple(r))
    
            if not unused:
                break
    
        return table

    def coset_leaders(self):
        if self._lazy_H is None:
            self._prepare_parity()
    
        H = self._lazy_H
        total = 1 << self.n
    
        vecs = ((np.arange(total)[:, None] >>
                 np.arange(self.n - 1, -1, -1)) & 1).astype(np.uint8)
    
        vecs = vecs[np.argsort(vecs.sum(axis=1))]
    
        leaders = {}
        for v in vecs:
            s = tuple((v @ H.T) & 1)
            if s not in leaders:
                leaders[s] = v
            if len(leaders) == (1 << (self.n - self.k)):
                break
    
        return leaders


In [11]:
class ParityCheckCode(LinearBlockCode):
    def __init__(self, k: int):
        super().__init__(np.concatenate([np.eye(k, dtype=np.uint8), np.ones((k, 1), dtype=np.uint8)], axis=1))
        
    def encode(self, message: np.ndarray) -> np.ndarray:
        return (message @ self.G) & 1

    def detect(self, received: np.ndarray) -> bool:
        s = (self.H @ received) & 1
        return bool(s.any())
    
    def decode(self, received: np.ndarray) -> np.ndarray:
        return received[:self.k]


parity = ParityCheckCode(k=3)

msg = np.array([1, 0, 1], dtype=np.uint8)
cw = parity.encode(msg)

print("Message:", msg)
print("Codeword:", cw)

# Noise
r = cw.copy()
r[2] ^= 1

print("Received:", r)
print("Detected:", parity.detect(r))
print("Decoded:", parity.decode(r))

Message: [1 0 1]
Codeword: [1 0 1 0]
Received: [1 0 0 0]
Detected: True
Decoded: [1 0 0]


In [12]:
class Hamming74(LinearBlockCode):

    def __init__(self):
        G = np.array([
            [1,0,0,0,0,1,1],
            [0,1,0,0,1,0,1],
            [0,0,1,0,1,1,0],
            [0,0,0,1,1,1,1]
        ], dtype=np.uint8)

        super().__init__(G)

    def encode(self, message: np.ndarray) -> np.ndarray:
        return (message @ self.G) & 1

    def detect(self, received: np.ndarray) -> bool:
        s = (self.H @ received) & 1
        return bool(s.any())

    def decode(self, received: np.ndarray) -> np.ndarray:
        s = (self.H @ received) & 1

        if not s.any():
            return received[:self.k]

        for i in range(self.n):
            if np.array_equal(self.H[:, i], s):
                corrected = received.copy()
                corrected[i] ^= 1
                return corrected[:self.k]

        return received[:self.k]

In [13]:
h74 = Hamming74()

print("n =", h74.n)
print("k =", h74.k)
print("d_min =", h74.d_min)
print("radius =", h74.radius)

msg = np.array([1,0,1,1], dtype=np.uint8)
cw = h74.encode(msg)

print("Message:", msg)
print("Codeword:", cw)

# Noise
r = cw.copy()
r[2] ^= 1

print("Received:", r)
print("Detected:", h74.detect(r))
print("Decoded:", h74.decode(r))

n = 7
k = 4
d_min = 3
radius = 1
Message: [1 0 1 1]
Codeword: [1 0 1 1 0 1 0]
Received: [1 0 0 1 0 1 0]
Detected: True
Decoded: [1 0 1 1]


In [14]:
def print_standard_table(code: LinearBlockCode):
    table = code.standard_table()
    width = code.n + 1
    coset_count = len(table)
    coset_size = table[0].shape[0]
    codeword_width = code.n
    col_width = codeword_width + 1
    header = " " * (codeword_width + 3)
    for j in range(coset_size):
        header += f"{j:>{col_width}}"
    print(header)
    print("-" * (codeword_width + 3 + col_width * coset_size))
    for i, row in enumerate(table):
        leader_str = "".join(map(str, row[0]))
        line = f"{leader_str} | "
        for v in row:
            line += "".join(map(str, v)).rjust(col_width)
        print(line)


In [15]:
table = h74.standard_table()

print("Number of cosets:", len(table))
print("Coset size:", table[0].shape[0])
print_standard_table(h74)

Number of cosets: 8
Coset size: 16
                 0       1       2       3       4       5       6       7       8       9      10      11      12      13      14      15
------------------------------------------------------------------------------------------------------------------------------------------
0000000 |  0000000 0010110 0011001 0100101 0101010 1110000 1001100 1000011 0111100 0110011 0001111 1010101 1011010 1100110 1101001 1111111
0000001 |  0000001 0010111 0011000 0100100 0101011 1110001 1001101 1000010 0111101 0110010 0001110 1010100 1011011 1100111 1101000 1111110
0000010 |  0000010 0010100 0011011 0100111 0101000 1110010 1001110 1000001 0111110 0110001 0001101 1010111 1011000 1100100 1101011 1111101
0000011 |  0000011 0010101 0011010 0100110 0101001 1110011 1001111 1000000 0111111 0110000 0001100 1010110 1011001 1100101 1101010 1111100
0000100 |  0000100 0010010 0011101 0100001 0101110 1110100 1001000 1000111 0111000 0110111 0001011 1010001 1011110 1100010 1101101 

In [18]:
def print_leaders_and_syndromes(code: LinearBlockCode):
    leaders = code.coset_leaders()

    n = code.n
    m = code.n - code.k
    col_w = max(n, m) + 2

    header = f"{'Syndrome'.ljust(col_w)}| {'Leader'}"
    print(header)
    print("-" * (col_w + 2 + n))

    for s, v in sorted(leaders.items()):
        s_str = "".join(map(str, s))
        v_str = "".join(map(str, v))
        print(f"{s_str.ljust(col_w)}| {v_str}")


In [19]:
print_leaders_and_syndromes(h74)

Syndrome | Leader
------------------
000      | 0000000
001      | 0000001
010      | 0000010
011      | 1000000
100      | 0000100
101      | 0100000
110      | 0010000
111      | 0001000


In [68]:
class RepetitionCode(LinearBlockCode):
    def __init__(self, k: int, r: int):
        self._k = k
        self._n = k * r
        self.r = r
        G = np.zeros((k, self._n), dtype=np.uint8)
        for i in range(k):
            G[i, i*r:(i+1)*r] = 1
        super().__init__(G)

    def encode(self, message: np.ndarray) -> np.ndarray:
        return (message @ self.G) & 1

    def detect(self, received: np.ndarray) -> bool:
        return any(not np.all(received[i*self.r:(i+1)*self.r] == received[i*self.r]) for i in range(self._k))

    def decode(self, received: np.ndarray) -> np.ndarray:
        return np.array([int(np.sum(received[i*self.r:(i+1)*self.r]) > self.r // 2) for i in range(self._k)], dtype=np.uint8)

repeat = RepetitionCode(4,4)

print("n =", repeat.n)
print("k =", repeat.k)
print("d_min =", repeat.d_min)
print("radius =", repeat.radius)

msg = np.array([1,0,1,1], dtype=np.uint8)
cw = repeat.encode(msg)

print("Message:", msg)
print("Codeword:", cw)

# Noise
r = cw.copy()
r[2] ^= 1

print("Received:", r)
print("Detected:", repeat.detect(r))
print("Decoded:", repeat.decode(r))

n = 16
k = 4
d_min = 4
radius = 1
Message: [1 0 1 1]
Codeword: [1 1 1 1 0 0 0 0 1 1 1 1 1 1 1 1]
Received: [1 1 0 1 0 0 0 0 1 1 1 1 1 1 1 1]
Detected: True
Decoded: [1 0 1 1]


In [81]:
class CyclicCode(LinearBlockCode):
    def __init__(self, g: np.ndarray, n: int):
        self.g = g.astype(np.uint8)
        self._G = self._build_G(n)
        super().__init__(self._G)
        self._prepare_parity()

    def _build_G(self,n):
        k = n - (len(self.g) - 1)
        G = np.zeros((k, n), dtype=np.uint8)
        G[0, :len(g)] = g
        for i in range(1, k):
            G[i, :] = np.roll(G[i-1, :], 1)
        return G

    def encode(self, message: np.ndarray) -> np.ndarray:
        m = message.astype(np.uint8)
        n = self.n
        k = self.k
        r = n - k
        m_shifted = np.concatenate([m, np.zeros(r, dtype=np.uint8)])
        remainder = m_shifted.copy()

        for i in range(k):
            if remainder[i]:
                remainder[i:i+r+1] ^= self.g
    
        parity = remainder[k:]
        return np.concatenate([m, parity])

    def detect(self, received: np.ndarray) -> bool:
        s = (self.H @ received) & 1
        return bool(s.any())

    def decode(self, received: np.ndarray) -> np.ndarray:
        s = (self.H @ received) & 1
        if not s.any():
            return received[:self.k]
        return received[:self.k]

In [85]:
# Hamming (7,4) as cyclic

g = np.array([1,0,1,1], dtype=np.uint8)  # x^3 + x + 1
h74c = CyclicCode(g, 7)

print("n =", h74c.n)
print("k =", h74c.k)
print("d_min =", h74c.d_min)
print("radius =", h74c.radius)

msg = np.array([1,0,1,1], dtype=np.uint8)
cw = h74c.encode(msg)

print("Message:", msg)
print("Codeword:", cw)

# Noise
r = cw.copy()
r[2] ^= 1

print("Received:", r)
print("Detected:", h74c.detect(r))
print("Decoded:", h74c.decode(r))

n = 7
k = 4
d_min = 3
radius = 1
Message: [1 0 1 1]
Codeword: [1 0 1 1 0 0 0]
Received: [1 0 0 1 0 0 0]
Detected: True
Decoded: [1 0 0 1]
