In [14]:
from functools import cache
from galois import GF, Poly, berlekamp_massey
import math
import numpy as np

class MyReedSolomon:
    """Reed-Solomon encoder/decoder built on ``galois`` polynomials.

    Codewords are systematic: the message symbols come first (highest-degree
    coefficients), followed by ``n - k`` check symbols. The generator
    polynomial has the ``n - k`` consecutive roots
    ``alpha ** c, alpha ** (c + 1), ...``.

    Args:
        n: Codeword length in symbols.
        k: Number of message symbols per codeword.
        field: Galois field class; defaults to ``GF(2 ** 8, repr='int')``.
        alpha: Element whose powers are the generator roots; defaults to the
            field's primitive element.
        c: Exponent of the first generator root.
    """

    def __init__(self, n, k, field=None, alpha=None, c=1):
        self.field = field or GF(2 ** 8, repr='int')
        # Convert through self.field (not the raw argument) so an explicit
        # alpha also works when `field` is left as None.
        self.alpha = self.field(alpha) if alpha else self.field.primitive_element
        self.n = n
        self.k = k
        self.c = c
        # Number of check symbols, i.e. the degree of the generator polynomial.
        self.distance = n - k
        self.roots = self.alpha ** (c + np.arange(0, self.distance))
        self.generator_poly = Poly.Roots(self.roots)

    def encode(self, message):
        """Return the length-``n`` systematic codeword for ``message``.

        A message shorter than ``k`` is padded with zeros on the right
        (low-order end) before encoding.
        """
        if len(message) < self.k:
            message = list(message) + [0] * (self.k - len(message))
        shifted_message = Poly(message, field=self.field) * self._x_to_the_power(self.distance)
        remainder = shifted_message % self.generator_poly
        # Subtracting the remainder makes the result a multiple of the generator.
        full_message = shifted_message - remainder
        return full_message.coefficients(self.n)

    def decode(self, message, erasures=(), bm=False, verbose=False):
        """Correct errors (and optional erasures) in a received codeword.

        Args:
            message: Received symbols, highest degree first; must have length ``n``.
            erasures: Polynomial degrees (positions) known to be unreliable.
            bm: Use Berlekamp-Massey instead of the Sugiyama (Euclidean) solver.
            verbose: Print intermediate polynomials and positions.

        Returns:
            The corrected length-``n`` coefficient array, or a ``str`` starting
            with ``"Bad error positions"`` when a computed position lies
            outside the codeword.
        """
        assert len(message) == self.n
        # Pass the field explicitly: without it, plain lists/ndarrays would be
        # interpreted as GF(2) coefficients.
        message = Poly(message, field=self.field)
        locator, omega = self.get_locator_omega(message, bm, erasures)
        if verbose:
            print(f"omega: {omega}")
            print(f"locator: {locator}")
        locator_roots = locator.roots()
        if verbose:
            print(f"inverse_roots: {locator_roots}")
        # The locator's roots are alpha ** -position, so take the log base alpha
        # (identical to the default base when alpha is the primitive element).
        error_positions = (locator_roots ** -1).log(self.alpha)
        if verbose:
            print(f"error positions: {error_positions}")
        # Valid positions are 0 .. n - 1.
        if any(position >= self.n for position in error_positions):
            return f"Bad error positions: {error_positions}"
        # Calculate the Forney error. Note that we calculate the negative magnitude, and then
        # add rather than subtract.
        error_magnitudes = omega(locator_roots) / locator.derivative()(locator_roots)
        if self.c != 1:
            error_magnitudes *= locator_roots ** (self.c - 1)
        fixed_message = message + Poly.Degrees(error_positions, error_magnitudes)
        return fixed_message.coefficients(self.n)

    def get_locator_omega(self, message, bm, erasures):
        """Compute the combined locator and the evaluator polynomial.

        Args:
            message: Received word as a ``Poly`` over ``self.field``.
            bm: Select Berlekamp-Massey (True) or Sugiyama (False).
            erasures: Known-bad positions; may be empty.

        Returns:
            ``(locator, omega)`` where ``locator`` covers both errors and
            erasures and ``omega`` is ``locator * syndrome mod x ** distance``
            (or the remainder returned by Sugiyama in the erasure-free case).
        """
        raw_syndrome = message(self.roots)
        syndrome = Poly(raw_syndrome, order='asc')
        omega = None
        if erasures:
            erasure_count = len(erasures)
            # Product of (1 - alpha ** i * x) over the erased positions.
            erasure_locator = math.prod(Poly([- self.alpha ** i, 1], field=self.field)
                                        for i in erasures)
            erasure_syndrome = syndrome * erasure_locator % self._x_to_the_power(self.distance)
            if bm:
                # Only coefficients erasure_count .. distance - 1 of the modified
                # syndrome obey the error-locator recurrence, so drop the first
                # erasure_count terms; ask for a fixed size so that zero
                # high-order coefficients are not trimmed from the sequence.
                modified = erasure_syndrome.coefficients(self.distance, order='asc')[erasure_count:]
                if modified.size:
                    error_locator = berlekamp_massey(modified, 'minimal').reverse()
                else:
                    # Every check symbol is spent on erasures; no errors can be located.
                    error_locator = Poly.One(self.field)
            else:
                error_locator, _ = self.sugiyama_get_locators(erasure_syndrome,
                                                              erasure_count=erasure_count)
            locator = erasure_locator * error_locator
        else:
            if bm:
                locator = berlekamp_massey(raw_syndrome, 'minimal').reverse()
            else:
                locator, omega = self.sugiyama_get_locators(syndrome)
        if omega is None:
            omega = (locator * syndrome) % self._x_to_the_power(self.distance)
        return locator, omega


    def sugiyama_get_locators(self, syndromes: Poly, erasure_count: int = 0) -> tuple[Poly, Poly]:
        """Solve the key equation with the extended Euclidean algorithm.

        Runs Euclid on ``x ** distance`` and ``syndromes`` until the remainder
        degree drops below ``(distance + erasure_count) / 2``.

        Returns:
            ``(s, r)``: the multiplier of ``syndromes`` (error locator, not
            normalised) and the final remainder.
        """
        value1, value2 = self._x_to_the_power(self.distance), syndromes
        old_r, new_r = value1, value2
        old_s, new_s = Poly.Zero(self.field), Poly.One(self.field)
        old_t, new_t = Poly.One(self.field), Poly.Zero(self.field)
        # Bezout invariant: r == syndromes * s + x ** distance * t at every step.
        assert value2 * old_s + value1 * old_t == old_r
        assert value2 * new_s + value1 * new_t == new_r
        # With v errors and e erasures (2v + e <= distance) the wanted remainder
        # has degree < v + e <= (distance + e) / 2, while the preceding one has
        # degree distance - v >= (distance + e) / 2. Stopping at
        # (distance - e) / 2 instead can overshoot by a step near capacity.
        while new_r.degree >= (self.distance + erasure_count) / 2:
            old_r, (q, new_r) = new_r, divmod(old_r, new_r)
            old_s, new_s = new_s, (old_s - q * new_s)
            old_t, new_t = new_t, (old_t - q * new_t)
            assert value2 * new_s + value1 * new_t == new_r
        return new_s, new_r

    # NOTE: functools.cache on a method keys on `self` and keeps every instance
    # alive for the life of the process; acceptable for a notebook.
    @cache
    def _x_to_the_power(self, power):
        """Return the monomial ``x ** power`` over ``self.field``."""
        return Poly.Degrees([power], field=self.field)

'done'


'done'

In [15]:
def test():
    """Round-trip check over GF(19^2): overwrite three codeword symbols and decode four ways.

    Each decode variant (Sugiyama / Berlekamp-Massey, with and without an
    erasure hint at position 13) must reproduce the original codeword.
    """
    gf361 = GF(19, 2, repr='int')
    codec = MyReedSolomon(15, 5, gf361, c=2)
    message = codec.encode([1, 2, 3, 4, 5])
    corrupted = message.copy()
    corrupted[1:4] = [5, 5, 5]
    print(message)
    decode_variants = (
        {},
        {'erasures': (13,)},
        {'bm': True},
        {'erasures': (13,), 'bm': True},
    )
    for options in decode_variants:
        result = codec.decode(corrupted, **options)
        # On failure the assertion message shows what decode() returned.
        assert np.array_equal(message, result), result
    return 'Done'


test()


[  1   2   3   4   5 159 243 331 102  75 101 327 104 110   3]


'Done'

In [None]:
# Cross-check MyReedSolomon against the error-correction bytes produced by the qrcode package.
import qrcode
# from qrcode.base import RS_BLOCK_TABLE
# https://dev.to/maxart2501/let-s-develop-a-qr-code-generator-part-viii-different-sizes-1e0e
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H)
qr.add_data("fy@fyellin.com")    # Version 1; 26/9.  Version2: 44/16.  Beyond that, we have interleaving
qr.make(fit=True)
message = qr.data_cache
# Only versions 1 and 2 use a single Reed-Solomon block (no interleaving).
assert qr.version <= 2, qr.version
n, k = (26, 9) if qr.version == 1 else (44, 16)
field = GF(2 ** 8, repr='int')
# Reuse the field built above instead of constructing a second, equivalent one inline.
rs = MyReedSolomon(n, k, field=field, c=0)
# Re-encoding the first k data bytes must reproduce the full codeword from qrcode.
assert tuple(message) == tuple(rs.encode(message[0:k]))
qr.make_image()

Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 744, in make_thread_stack_str
    if is_real_file(my_file) and not is_jup_cell:
       ^^^^^^^^^^^^^^^^^^^^^
  File "/Applications/PyCharm.app/Contents/plugins/python-ce/helpers/pydev/pydevd_file_utils.py", line 559, in is_real_file
    return not _is_int(filename) and not filename.startswith("<ipython-input")
                                         ^^^^^^^^^^^^^^^^^^^
AttributeError: 'tuple' object has no attribute 'startswith'
