# c3LTC

In this notebook, we implement the locally testible code with constant rate, distance and locallity proposed in [Dinur et. al.](https://arxiv.org/abs/2111.04808).


In [2]:
from src.visualize_c3ltc import *
import numpy
import random
from pyvis import network as net
from tqdm import tqdm
from sage.coding.grs_code import ReedSolomonCode
from sage.rings.finite_rings.finite_field_prime_modn import FiniteField_prime_modn as GF
from sage.matrix.matrix_space import MatrixSpace
from sage.coding.linear_code import LinearCode
from sage.modules.vector_modn_dense import vector
from sage.groups.matrix_gps.matrix_group import *

from src.row_reduce_from_c import row_reduce_and_orthogonal,row_reduce_and_orthogonal2

In [3]:
class AEdge:
    def __init__(self, G, a, g):
        edge = self.canonical_a_edge(G, a, g)
        self.a = edge[0]
        self.g = edge[1]

    def canonical_a_edge(self, G, a, g):
        element_list = list(G)
        index_g = element_list.index(g)
        index_ag = element_list.index(a + g)
        min_index = min(index_g, index_ag)
        if min_index == index_g:
            return (a, g)
        if min_index == index_ag:
            return (-a, a + g)

    def __repr__(self):
        return "AEdge(%s, %s)" % (self.a, self.g)

    def __eq__(self, other):
        if isinstance(other, AEdge):
            return (self.a == other.a and self.g == other.g)
        else:
            return False

    def __ne__(self, other):
        return (not self.__eq__(other))

    def __hash__(self):
        return hash((tuple(self.g + self.a),tuple(self.g)))


class BEdge:
    def __init__(self, G, g, b):
        edge = self.canonical_B_edge(G, g, b)
        self.g = edge[0]
        self.b = edge[1]

    def canonical_B_edge(self, G, g, b):
        element_list = list(G)
        index_g = element_list.index(g)
        index_gb = element_list.index(g + b)
        min_index = min(index_g, index_gb)
        if min_index == index_g:
            return (g, b)
        if min_index == index_gb:
            return (g + b, -b)

    def __repr__(self):
        return "BEdge(%s, %s)" % (self.g, self.b)

    def __eq__(self, other):
        if isinstance(other, BEdge):
            return (self.g == other.g and self.b == other.b)
        else:
            return False

    def __ne__(self, other):
        return (not self.__eq__(other))

    def __hash__(self):
        return hash((tuple(self.g),tuple(self.g + self.b)))


class Square:
    def __init__(self, G, a, g, b):
        square = self.canonical_square(G, a, g, b)
        self.a = square[0]
        self.g = square[1]
        self.b = square[2]

    def canonical_square(self, G, a, g, b):
        element_list = list(G)
        option1 = str((a, g, b))
        option2 = str((-a, a + g, b))
        option3 = str((a, g + b, -b))
        option4 = str((-a, a + g + b, -b))
        minimal = min(option1, option2, option3, option4)
        if minimal == option1:
            return (a, g, b)
        if minimal == option2:
            return (-a, a + g, b)
        if minimal == option3:
            return (a, g + b,-b)
        if minimal == option4:
            return (-a, a + g + b, -b)

    def vertices(self):
        return (self.a+self.g, self.g, self.a+self.g+self.b, self.g+self.b)
        
    def __repr__(self):
        return hash((tuple(self.g + self.a),tuple(self.g),tuple(self.g + self.b)))

    def __eq__(self, other):
        if isinstance(other, Square):
            return (self.a == other.a and self.b == other.b and self.g == other.g)
        else:
            return False

    def __ne__(self, other):
        return (not self.__eq__(other))

    def __hash__(self):
        return hash(self.__repr__())


In [4]:
def tensor_decoding(tensor_word, C_A: LinearCode, C_B: LinearCode):
    """ Returns a decoded word in tensor code. 

    Keyword arguments:
    tensor_word -- matrix of length(C_A) x length(C_B).
    C_A -- Sage code object.
    C_B -- Sage code object.
    """

    n_a = len(C_A.parity_check_matrix().columns())
    n_b = len(C_B.parity_check_matrix().columns())
    field = C_A.base_field()

    def tensor_word_to_tuple(m):
        return tuple(m.reshape((n_a * n_b)))

    corrected_word = numpy.copy(tensor_word)
    iterating_set_C_B = [i for i in range(n_a)]
    iterating_set_C_A = [i for i in range(n_b)]
    past_words = []
    word_to_tuple = tensor_word_to_tuple(corrected_word)
    while (len(iterating_set_C_A) != 0 or len(iterating_set_C_B) != 0) and word_to_tuple not in past_words:
        past_words.append(word_to_tuple)

        new_iterating_C_A = []
        new_iterating_C_B = []
        for i in iterating_set_C_B:
            local_word = vector(field, corrected_word[i, :])
            try:
                corrected_locally = C_B.decode_to_code(local_word)
            except:
                new_iterating_C_B.append(i)
                continue
            for j in range(n_b):
                if local_word[j] != corrected_locally[j]:
                    new_iterating_C_A.append(j)
                corrected_word[i][j] = corrected_locally[j]
        for j in iterating_set_C_A:
            local_word = vector(field, corrected_word[:, j])
            try:
                corrected_locally = C_A.decode_to_code(local_word)
            except:
                new_iterating_C_A.append(j)
                continue
            for i in range(n_a):
                if local_word[i] != corrected_locally[i]:
                    new_iterating_C_B.append(i)
                corrected_word[i][j] = corrected_locally[i]
        iterating_set_C_A = set(new_iterating_C_A)
        iterating_set_C_B = set(new_iterating_C_B)
        word_to_tuple = tensor_word_to_tuple(corrected_word)
    return corrected_word


In [5]:
def random_generators(G, n, n_order_2=0):
    """ Returns an inverse closed set of n + n_order_2 elements from G.


    Keyword arguments:
    G -- Sage group object.
    n -- number.
    n_order_2 -- number of elements of order 2. 
    
    Note:
    1) n has to be smaller than the number of elements in G.
    """

    list_G = list(G)
    assert n < len(list_G)
    gens = []
    # non order 2 generators
    i = 0
    while i < n / 2:
        c = random.choice(list_G)
        if c not in gens and c * c != G.identity():
            gens.append(c)
            gens.append(c.inverse())
            i += 1
    # order 2 generators
    i = 0
    while i < n_order_2:
        c = random.choice(list_G)
        if c not in gens and c * c == G.identity():
            gens.append(c)
            i += 1    
    return gens

def get_AB_with_TNC(G, n, n_order_2 = 0, trials = 100):
    """ Returns two sets of a group G of size n, for which that TNC condition hold.
        If no sets were found after # of trials, it exits with error.  


    Keyword arguments:
    G -- Sage group object.
    n -- number.
    n_order_2 -- number. 
    trials -- number of trials to find A,B that uphold TNC. 
    
    Note:
    1) size has to be smaller than the number of elements in G.
    """
    
    for _ in range(trials):
        A = random_generators(G,n,n_order_2)
        B = random_generators(G,n,n_order_2)
        violations = 0
        for a in A:
            for b in B:
                for g in G:
                    if a * g == g * b:
                        violations += 1
                        break
        if violations == 0:
            return A, B
    exit(1)

def get_AB(G, n, n_order_2 = 0, trials = 100):
    """ Returns two sets of a group G of size n, for which TNC condition does not necessarily hold.
        If no sets were found after # of trials, it exits with error.  


    Keyword arguments:
    G -- Sage group object.
    n -- number.
    n_order_2 -- number. 
    trials -- number of trials to find A,B that uphold TNC. 
    
    Note:
    1) size has to be smaller than the number of elements in G.
    """
    
    sets_with_tnc = get_AB_with_TNC(G, n, n_order_2, trials)
    if sets_with_tnc == None:
        A = random_generators(G,n,n_order_2)
        B = random_generators(G,n,n_order_2)
        return A,B
    else:
        return sets_with_tnc
    
def get_AB_from_LPS(G,p,q):
    """ Returns two sets of a group of size p + 1, according to the Lubotzky, Phillips and Sarnak construction of
    Ramanujan Cayley graphs. 


    Keyword arguments:
    G -- Sage group object.
    p -- number.
    q -- number. 
    
    Note:
    1) p,q = 1 mod 4 
    2) Legendre symbol (q,p) = -1
    """
    assert p % 4 == 1
    assert q % 4 == 1
    assert kronecker (q , p) == -1
    F = IntegerModRing(q)
    i = int(F(-1).square_root())
    odd = [x for x in range(p) if int(x) % 2 != 0 and int(x) < p/2]
    even = [x for x in range(-p,p) if int(x) % 2 == 0]
    solutions = []
    for a0 in odd:
        for a1 in even:
            for a2 in even:
                for a3 in even:
                    if (a0*a0 + a1*a1 + a2*a2 + a3*a3) == p:
                        solutions.append((a0,a1,a2,a3))
    generators = []
    for s in solutions:
        a0 = s[0]
        a1 = s[1]
        a2 = s[2]
        a3 = s[3]
        generators.append(G((matrix(F, 2,2,[[a0+i*a1, a2+i*a3],[-a2+i*a3,a0-i*a1]]))))

    A = []
    for g in generators:
        A.append(g)
        A.append(g.inverse())

    random.shuffle(generators)
    B = []
    for g in generators:
        B.append(g)
        B.append(g.inverse())
    return A,B

In [6]:
def decode_via_edges(c3ltc, noisy_word):
    squares_to_values = {}
    for (i, v) in enumerate(noisy_word):
        squares_to_values[c3ltc.index_to_square[i]] = v
    init = True
    iterating_set_A = None
    iterating_set_B = None
    past_words = []
    word_from_square_values = c3ltc.square_to_value_to_word(squares_to_values)
    while (init or (
            len(iterating_set_A) != 0 or len(iterating_set_B) != 0)) and word_from_square_values not in past_words:
        past_words.append(word_from_square_values)
        if init:
            init = False
            iterating_set_A = set(c3ltc.edges_A)
            iterating_set_B = set(c3ltc.edges_B)
        new_iterating_set_A = set([])
        new_iterating_set_B = set([])

        for k,e in enumerate(iterating_set_A):
            a = e.a
            g = e.g
            i = c3ltc.A.index(a)
            local_word = vector(c3ltc.base_field, [0] * len(c3ltc.B))
            for (j, b) in enumerate(c3ltc.B):
                square = Square(c3ltc.G,a, g, b)
                local_word[j] = squares_to_values[square]
            try:
                corrected_localy = c3ltc.C_B.decode_to_code(local_word)
            except:
                new_iterating_set_A.add(AEdge(c3ltc.G, a, g))
                continue
            for (j, b) in enumerate(c3ltc.B):
                square = Square(c3ltc.G,a, g, b)
                if squares_to_values[square] != corrected_localy[j]:
                    new_iterating_set_A.add(AEdge(c3ltc.G, a, g + b))
                    new_iterating_set_B.add(BEdge(c3ltc.G, g, b))
                    new_iterating_set_B.add(BEdge(c3ltc.G, a + g, b))
                squares_to_values[square] = corrected_localy[j]
            if e in new_iterating_set_A:
                new_iterating_set_A.remove(e)

        for k,e in enumerate(iterating_set_B):
            g = e.g
            b = e.b
            j = c3ltc.B.index(b)
            local_word = vector(c3ltc.base_field, [0] * len(c3ltc.A))
            for (i, a) in enumerate(c3ltc.A):
                square = Square(c3ltc.G,a, g, b)
                local_word[i] = squares_to_values[square]
            try:
                corrected_localy = c3ltc.C_A.decode_to_code(local_word)
            except:
                new_iterating_set_B.add(BEdge(c3ltc.G, g, b))
                continue
            for (i, a) in enumerate(c3ltc.A):
                square = Square(c3ltc.G,a, g, b)
                if squares_to_values[square] != corrected_localy[i]:
                    new_iterating_set_B.add(BEdge(c3ltc.G, a + g, b))
                    new_iterating_set_A.add(AEdge(c3ltc.G, a, g))
                    new_iterating_set_A.add(AEdge(c3ltc.G, a, g + b))
                squares_to_values[square] = corrected_localy[i]
            if e in new_iterating_set_B:
                new_iterating_set_B.remove(e)

        iterating_set_A = new_iterating_set_A
        iterating_set_B = new_iterating_set_B
        word_from_square_values = c3ltc.square_to_value_to_word(squares_to_values)
    return word_from_square_values

def decode_via_vertices(c3ltc, noisy_word):
    n_a = len(c3ltc.C_A.parity_check_matrix().columns())
    n_b = len(c3ltc.C_B.parity_check_matrix().columns())
    M = MatrixSpace(c3ltc.base_field, n_a, n_b)
    squares_to_values = {}
    for (i, v) in enumerate(noisy_word):
        squares_to_values[c3ltc.index_to_square[i]] = v
    init = True
    iterating_set = None
    past_words = []
    word_from_square_values = c3ltc.square_to_value_to_word(squares_to_values)

    while (init or len(iterating_set)) != 0 and word_from_square_values not in past_words:
        past_words.append(word_from_square_values)
        if init:
            init = False
            iterating_set = set(c3ltc.G)
        new_iterating_set = set([])
        for g in iterating_set:
            local_view = numpy.zeros((n_a, n_b))
            for i, a in enumerate(c3ltc.A):
                for j, b in enumerate(c3ltc.B):
                    square = Square(c3ltc.G,a, g, b)
                    local_view[i][j] = squares_to_values[square]
            try:
                corrected_local_view = tensor_decoding(M(local_view), c3ltc.C_A, c3ltc.C_B)
            except:
                new_iterating_set.append(g)
                continue
            for i, a in enumerate(c3ltc.A):
                for j, b in enumerate(c3ltc.B):
                    square = Square(c3ltc.G,a, g, b)
                    if corrected_local_view[i][j] != local_view[i][j]:
                        new_iterating_set.add(a + g)
                        new_iterating_set.add(g + b)
                        new_iterating_set.add(a + g + b)
                    squares_to_values[square] = corrected_local_view[i][j]
            if g in new_iterating_set:
                new_iterating_set.remove(g)
        iterating_set = new_iterating_set
        word_from_square_values = c3ltc.square_to_value_to_word(squares_to_values)

    return word_from_square_values

In [7]:
def word_with_k_injected_local_views(c3ltc, k):
    """ Create a random word with k local views "injected" with random tensor codewords. 
        Note that some vertices may not have a tensor codeword in their local view because the 
        function writes over pre-existing values. 

    Keyword arguments:
    c3ltc -- c3LTC object.
    k -- number. 
    """
    word = vector(c3ltc.base_field, [0] * c3ltc.length)
    for _ in range(k):
        v = random.randint(0,len(c3ltc.G)-1)
        local_view_flat = c3ltc.vertex_to_squares[v].reshape((c3ltc.C_A.length() * c3ltc.C_B.length()))
        random_local_word = random_tensor_word(c3ltc.C_A, c3ltc.C_B)
        for (i,b) in enumerate(random_local_word):
            word[int(local_view_flat[i])] = Integer(b)
    return word

def is_word_in_tensor_code(C_A,C_B, word):
    """ Checks if a matrix tensor word is in code by verifying that each row and column are 
        in the associated code. 

    Keyword arguments:
    C_A -- Sage code object.
    C_B -- Sage code object.
    word -- number. 
    """
    rows = word.shape[0]
    cols = word.shape[1]
    field = C_A.base_field()
    for i in range(rows):
        row = word[i,:]
        if 0 != numpy.count_nonzero(C_A.syndrome(vector(field, row))):
            return false
    for i in range(cols):
        col = word[:,i]
        if 0 != numpy.count_nonzero(C_B.syndrome(vector(field, col))):
            return false
    return true

def unsatisfied_vertices(c3ltc, word):
    """ Returns a list of vertices for which the local view in the given word is not in the
        tensor code. 

    Keyword arguments:
    c3ltc -- c3LTC object.
    word -- number. 
    """
    n_vertices = len(c3ltc.G)
    unsat = []
    for g in range(n_vertices):
        lv = c3ltc.local_codeword_on_vertex(g, word)
        if not is_word_in_tensor_code(c3ltc.C_A,c3ltc.C_B, lv):
            unsat.append(g)
    return unsat


def estimate_ltc_constant_with_injected_local_views(c3ltc, trials = 10, max_injected = 50):
    """ Returns an estimation of the local testability parameter by sampling a random noisy word and counting the 
        relative fraction of unsatisfied vertices. 
        The noise is generated by injecting random tensor words into the local view of vertices. 

    Keyword arguments:
    c3ltc -- c3LTC object.
    trials -- number. 
    max_injected -- number. 
    """
    c = Infinity
    n_vertices = len(c3ltc.G)
    for _ in range(trials):
        word = word_with_k_injected_local_views(c3ltc, max_injected)
        syndrome_weight = float(
            len(unsatisfied_vertices(c3ltc,word)) / n_vertices
        )
        error_weight = float(numpy.count_nonzero(word)/ c3ltc.length)
        if error_weight != 0:
            c = min(c, float(syndrome_weight / error_weight))
    return c

def estimate_ltc_constant_with_random_local_views(c3ltc, trials = 10, max_injected = 50, sparsity = 2):
    """ Returns an estimation of the local testability parameter by sampling a random noisy word and counting the 
        relative fraction of unsatisfied vertices. 
        The noise is sampled randomly. 

    Keyword arguments:
    c3ltc -- c3LTC object.
    trials -- number. 
    max_injected -- number. 
    sparsity -- number. 
    """
    c = Infinity
    n_vertices = len(c3ltc.G)
    for _ in range(trials):
        noisy_word, word = random_noisy_word(c3ltc, sparsity)
        syndrome_weight = float(
            len(unsatisfied_vertices(c3ltc,noisy_word)) / n_vertices
        )
        error_weight = float(numpy.count_nonzero(word - noisy_word)/ c3ltc.length)
        if error_weight != 0:
            c = min(c, float(syndrome_weight / error_weight))
    return c

In [8]:
def random_vector(size, max_val, sparsity = 2):
    """ Retruns a random vector of a specified size, each entry sampled in the range between 
        0 and max_val. Sparsity reflects that a non-zero probability is sampled.
        The probability of a non-zero element to appear in the array is 1/(max_val - 1) * (1/2) ^ sparsity.

    Keyword arguments:
    size -- number.
    max_val -- number.
    sparsity -- number. 
    """
    return [random.randint(0,max_val)*numpy.random.binomial(1, pow(0.5,sparsity), 1)[0] for _ in range(size)]

def random_tensor_word(C_A, C_B):
    """ Retruns a random tensor codeword. It is achieved by sampling a random vector, multiplying
        it by the tensor of the generators matrices of C_A, C_B.
        The word returned is a flattening of the "matrix" view of the tensor codeword into a
        single vector by concatenating the rows of the respective matrix.
        Sparsity parameter is defined in the random_vector function. 

    Keyword arguments:
    C_A -- Sage code object.
    C_B -- Sage code object.
    sparsity -- number. 
    """
    field_char = C_A.base_field().characteristic()
    tensor_dimension = C_A.dimension() * C_B.dimension()
    rv = random_vector(tensor_dimension,field_char,0)
    message = vector(C_A.base_field(), rv)
    return (C_A.generator_matrix().tensor_product(C_B.generator_matrix()).transpose() * message)

def random_noisy_word(c3ltc, sparsity = 2):
    """ Retruns a noisy c3ltc codeword by encoding a random vector. 
    Sparsity parameter is defined in the random_vector function. 

    Keyword arguments:
    c3ltc -- c3LTC object.
    sparsity -- number. 
    """
    field = c3ltc.base_field
    error = vector(field, random_vector(c3ltc.length, field.characteristic(),sparsity))
    word = vector(field, random_word(c3ltc)) 
    return vector(field, word + error), vector(field, word)

def random_word(c3ltc):
    """ Retruns a c3ltc codeword by encoding a random vector. 

    Keyword arguments:
    c3ltc -- c3LTC object.
    """
    field = c3ltc.base_field
    rv = random_vector(c3ltc.dimension,field.characteristic(),0)
    message = c3ltc.generator_matrix.transpose() * vector(field, rv)    
    return message


In [9]:
def embedding_local_parity_constraints_on_squares(C_A, C_B, G, A, B):
    """ Returns
    1) Sparse representation of the constrsints by a mapping of squares (represeted by 3 group elements (a,g,b))
        to dictionary whose keys are rows in which the square has non zero value, and value is the value in the relevant row and column.
    2) Number constraints of rows in the constraint matrix.

    Keyword arguments:
    C_A -- Sage code object.
    C_B -- Sage code object.
    G -- Sage group object.
    A -- list of group elements.
    B -- list of group elements.

    """

    constraints = {}
    row = 0
    parity_A = C_A.parity_check_matrix()
    parity_B = C_B.parity_check_matrix()
    codim_C_A = len(parity_A.rows())
    codim_C_B = len(parity_B.rows())

    # Stage 1 - collecting the edges
    edges_A = set()
    edges_B = set()
    for i in tqdm(range(len(list(G)))):
        g = list(G)[i]
        for a in A:
            edges_A.add(AEdge(G, a, g))
        for b in B:
            edges_B.add(BEdge(G, g, b))

    assert len(edges_A) == len(list(G)) * len(A) / 2
    assert len(edges_B) == len(list(G)) * len(B) / 2

    print("finished step 1")
    
    # Stage 2 - iterating over edges and "injecting" constraints into squares
    for t in tqdm(range(len(list(edges_A)))):
        e = list(edges_A)[t]
        a = e.a
        g = e.g
        for (j, b) in enumerate(B):  # B[j] = b
            square = Square(G, a, g, b)
            if square not in constraints:  # if no contraints were added on this square before
                constraints[square] = {}
            for (k, v) in enumerate(parity_A[:, j]):  # parity_A[k,j] = v
                constraints[square][row + k] = v[0]  # v is represted as a length 1 array
        row += codim_C_A

    for t in tqdm(range(len(list(edges_B)))):
        e = list(edges_B)[t]
        g = e.g
        b = e.b
        for (i, a) in enumerate(A):  # A[i] = a
            square = Square(G, a, g, b)
            if square not in constraints:  # if no contraints were added on this square before
                constraints[square] = {}
            for (k, v) in enumerate(parity_B[:, i]):  # parity_B[k,i] = v
                constraints[square][row + k] = v[0]  # v is represted as a length 1 array
        row += codim_C_B
        
    assert len(constraints) == len(A) * len(B) * len(list(G)) / 4
    assert row == codim_C_A * len(edges_A) + codim_C_B * len(edges_B)
    
    print("finished step 2")

    return (constraints, row, edges_A, edges_B)


def embedding_local_parity_constraints_on_edges(C, G, A):
    """ Returns
    1) Sparse representation of the constrsints by a mapping of squares (represeted by 3 group elements (a,g,b))
        to dictionary whose keys are rows in which the square has non zero value, and value is the value in the relevant row and column.
    2) Number constraints of rows in the constraint matrix.

    Keyword arguments:
    C_A -- Sage code object.
    C_B -- Sage code object.
    G -- Sage group object.
    A -- list of group elements.
    B -- list of group elements.

    """

    constraints = {}
    row = 0
    parity = C.parity_check_matrix()
    codim_C = len(parity.rows())

    edges = set()
    for i in tqdm(range(len(list(G)))):
        g = list(G)[i]
        for (i, a) in enumerate(A):
            edge = AEdge(G, a, g)
            edges.add(edge)
            if edge not in constraints:  # if no contraints were added on this square before
                constraints[edge] = {}
            for (k, v) in enumerate(parity[:, i]):  # parity[k,j] = v
                constraints[edge][row + k] = v[0]  # v is represted as a length 1 array
        row += codim_C

    print(len(constraints) , len(A) * len(list(G)) / 2)
    assert len(constraints) == len(A) * len(list(G)) / 2
    assert row == codim_C * len(G)
    
    return (constraints, row, edges)



In [75]:
class c3LTC:

    def __init__(self, C_A, C_B, G, A, B):
        assert len(C_A.generator_matrix().columns()) == len(A)
        assert len(C_B.generator_matrix().columns()) == len(B)
        assert C_A.base_field().characteristic() == C_B.base_field().characteristic()

        (sparse_constraints, count, self.edges_A, self.edges_B) = embedding_local_parity_constraints_on_squares(C_A, C_B, G, A, B)

        # process sparse constraints
        parity_constraints = numpy.zeros((count, len(sparse_constraints)))
        for (i, l) in enumerate(sparse_constraints):
            for k in sparse_constraints[l]:
                parity_constraints[k][i] = sparse_constraints[l][k]

        list_G = list(G)

        self.A = A
        self.B = B
        self.C_A = C_A
        self.C_B = C_B
        self.G = G
        self.base_field = C_A.base_field()
        gen, par = row_reduce_and_orthogonal(sparse_constraints, C_A.base_field().characteristic(), count,len(sparse_constraints))
        M = MatrixSpace(self.base_field, gen.shape[0],
                        gen.shape[1])
        self.generator_matrix = M(gen)
        M = MatrixSpace(self.base_field, par.shape[0],
                        par.shape[1])
        self.parity_check_matrix = M(par)
        self.length = numpy.array(self.generator_matrix).shape[1]
        self.dimension = numpy.array(self.generator_matrix).shape[0]
        print("Generated c3LTC code with dimension " + str(self.dimension) + " and length " + str(self.length) + " and rate: " + str(self.dimension/self.length))

    def square_to_value_to_word(self, squares_to_values):
        corrected_word = vector(self.base_field, [0] * len(squares_to_values))
        for square in self.square_to_index:
            corrected_word[self.square_to_index[square]] = squares_to_values[square]
        return corrected_word

    def syndrome(self, c):
        return self.parity_check_matrix * c
    
    def decode_via_edges(self, noisy_word):
        return decode_via_edges(self, noisy_word)
    
    def decode_via_vertices(self, noisy_word):
        return decode_via_vertices(self, noisy_word)

    def local_codeword_on_vertex(self, vertex, word):
        square_view = self.vertex_to_squares[vertex]
        rows = len(square_view)
        cols = len(square_view[0])
        local_view_values = numpy.array([0] * rows
                                        * cols).reshape((rows, cols))
        for i in range(rows):
            for j in range(cols):
                local_view_values[i][j] = int(word[int(self.vertex_to_squares[vertex][i][j])])
        return local_view_values

    def __repr__(self):
        rep = 'c3LTC'
        return rep
    

class ExpanderCode:

    def __init__(self, C, G, A):
        assert len(C.generator_matrix().columns()) == len(A)

        (sparse_constraints, count, self.edges) = embedding_local_parity_constraints_on_edges(C, G, A)

        # process sparse constraints
        parity_constraints = numpy.zeros((count, len(sparse_constraints)))
        for (i, l) in enumerate(sparse_constraints):
            for k in sparse_constraints[l]:
                parity_constraints[k][i] = sparse_constraints[l][k]

        list_G = list(G)

        self.A = A
        self.G = G
        self.base_field = C.base_field()
        gen, par = row_reduce_and_orthogonal(sparse_constraints, C.base_field().characteristic(), count,len(sparse_constraints))
        M = MatrixSpace(self.base_field, gen.shape[0],
                        gen.shape[1])
        self.generator_matrix = M(gen)
        M = MatrixSpace(self.base_field, par.shape[0],
                        par.shape[1])
        self.parity_check_matrix = M(par)
        self.length = numpy.array(self.generator_matrix).shape[1]
        self.dimension = numpy.array(self.generator_matrix).shape[0]
        print("Generated expander code with dimension " + str(self.dimension) + " and length " + str(self.length) + " and rate: " + str(self.dimension/self.length))

In [76]:
p = 2

n_group,k_group = 16,7
d = round((k - 1)/2)

found = False
C_group = None

MS = MatrixSpace(GF(p),k_group,n_group)
while not found:
    s = [[random.randint(0,p-1) for _ in range(n_group)] for _ in range(k_group)]
    G = MS(s)
    C_group = LinearCode(G)
    if C_group.minimum_distance() >= d and C_group.dimension() == k_group:
        A = C_group.generator_matrix().columns()
        if len(A) == len(set(A)):
            found = True

G = VectorSpace(GF(p),k_group)

expander = ExpanderCode(C_group, G, A)

100%|█████████████████████████████████████████| 128/128 [00:18<00:00,  7.01it/s]


1024 1024
[*] Start row reduce from c


[IO] loading 1024 x 1152 SMS matrix modulo 2... 5.0k NNZ [0.0s]
[CSR] Compressing... 4992 actual NZ, Mem usage = 44.0kbyte [0.00s]
LU : 20 / 1152 [|L| = 0 / |U| = 95] -- current density= (0.009 vs 0.008) --- rank >= 20LU : 22 / 1152 [|L| = 0 / |U| = 108] -- current density= (0.010 vs 0.008) --- rank >= 22LU : 23 / 1152 [|L| = 0 / |U| = 116] -- current density= (0.011 vs 0.008) --- rank >= 23LU : 25 / 1152 [|L| = 0 / |U| = 128] -- current density= (0.012 vs 0.008) --- rank >= 25LU : 29 / 1152 [|L| = 0 / |U| = 149] -- current density= (0.009 vs 0.008) --- rank >= 29LU : 31 / 1152 [|L| = 0 / |U| = 162] -- current density= (0.010 vs 0.008) --- rank >= 31LU : 32 / 1152 [|L| = 0 / |U| = 170] -- current density= (0.011 vs 0.008) --- rank >= 32LU : 34 / 1152 [|L| = 0 / |U| = 182] -- current density= (0.012 vs 0.008) --- rank >= 34LU : 56 / 1152 [|L| = 0 / |U| = 281] -- current density= (0.009 vs 0.008) --- rank >= 56LU : 58 / 1152 [|L| = 0 / |U| = 294] -- current density= (0.010 vs 0

[*] Actual time in c 0.1825239658355713
[*] Finished row reduce from c
Generated expander code with dimension 153 and length 1024 and rate: 0.1494140625


In [74]:
k_group = 4
G = VectorSpace(GF(p),k_group)

def tensor(x,y):
    tensor_list = x.tensor_product(y)
    return [item for sublist in tensor_list for item in sublist]

def pad(v,m,i):
    if i == 0:
        before = []
    else:
        before = [0]*(m*m*(i))
    after = [0]*(m*m*(m-i-1))
    before.extend(v)
    before.extend(after)
    return numpy.array(before)

def xs_matrix(x,s,m):
    v1 = tensor(x,x)
    v2 = tensor(x+s,x+s)
    mat = np.zeros((m, m*m*m))
    for i in range(m):
        pad1 = pad(v1,m,i)
        pad2 = pad(v2,m,i)
        mat[i,:] = s[i]*(pad1-pad2)
    mat = mat[~np.all(mat == 0, axis=1)] # remove zero rows
    return mat

def F_matrix():
    list_G = list(G)
    arr = numpy.array([0]*k_group*k_group*k_group)
    for (i,x) in enumerate(list(G)):
        for (j,s) in enumerate(list(G)):
            xs_mat = xs_matrix(x,s,k_group)
            if len(xs_matrix(x,s,k_group)) != 0:
                arr = numpy.vstack((arr,xs_mat))
    return arr

F = F_matrix().transpose()
MS = MatrixSpace(GF(2), F.shape[0],F.shape[1])
MS(F).column_space()

Vector space of degree 64 and dimension 40 over Finite Field of size 2
Basis matrix:
40 x 64 dense matrix over Finite Field of size 2