<a href="https://colab.research.google.com/github/BenzThitikorn/Vector-Symbol-Decoding-with-AI/blob/main/Data_Generation_Using_PYLDPC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import sys
sys.path.append('/content/drive/MyDrive/VSD + AI/')

!pip install --upgrade pyldpc

Collecting pyldpc
  Downloading pyldpc-0.7.9.tar.gz (1.1 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.1 MB[0m [31m7.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m0.8/1.1 MB[0m [31m11.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.1/1.1 MB[0m [31m11.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyldpc
  Building wheel for pyldpc (setup.py) ... [?25l[?25hdone
  Created wheel for pyldpc: filename=pyldpc-0.7.9-py3-none-any.whl size=14303 sha256=cfc662d240c56a313b11fc76d9619d9d23590201603aa2e8260c525d

# Code Set Up

In [2]:
n = 30 #Code lenght
r = 16 #Symbol lenght

In [3]:
#LDPC Setting
from pyldpc import make_ldpc
d_v=3
d_c=6
H,Gt = make_ldpc(n, d_v=3, d_c=6, systematic=True, sparse=True, seed=1)
G=Gt.T
j=G.shape[0]
print(f"H matrix: {H.shape[0]} rows, {H.shape[1]} columns")
print(f"G matrix: {G.shape[0]} rows, {G.shape[1]} columns")

H matrix: 15 rows, 30 columns
G matrix: 17 rows, 30 columns


In [4]:
# prompt: save H and Gt

import numpy as np

# Assuming H and Gt are already defined from the previous code execution
np.save('H_matrix.npy', H)
np.save('Gt_matrix.npy', Gt)


## Function Defined

In [5]:
import Channel_Coding as cc
import numpy as np

def VSD_normal(H, Y):
    Y_Binary = np.array(Y, dtype=int)

    # Compute Syndrome Matrix
    S_Binary = np.floor(np.mod(np.dot(H, Y_Binary), 2)).astype(int)
    S_Gauss, S_rank, Index_Rows = cc.Compute_Gauss_Jordan_Reduction(S_Binary)

    # Compute Error Locating Vector
    Error_Locating_Vector = cc.Compute_Error_Locating_Vector(S_Gauss, Index_Rows, H)

    # Find Number of Erroneous Symbols
    Number_Error = np.count_nonzero(Error_Locating_Vector == 0)

    if Number_Error == 0:
        return Y, 1  # No errors detected

    # Check if Rank of S matches Number of Erroneous Symbols
    if S_rank == Number_Error:
        S_Sub = S_Binary[Index_Rows, :]
        Position_Error = np.where(Error_Locating_Vector == 0)[0]
        H_Sub = H[np.ix_(Index_Rows, Position_Error)]

        # Check if H_Sub is invertible
        if np.linalg.det(H_Sub) == 0:
            #print("normal",H_Sub)
            return Y_Binary, 0  # Cannot correct errors

        H_Sub_inv = np.linalg.inv(H_Sub)
        Error_Binary = np.floor(np.mod(np.dot(H_Sub_inv, S_Sub), 2)).astype(int)

        # Correct the errors
        for index in range(len(Position_Error)):
            Y_Binary[Position_Error[index]] ^= Error_Binary[index]

        Y_decode = Y_Binary  # Decoded output
        return Y_decode, 1  # Successful correction
    else:
        return Y_Binary, 0  # Unable to correct errors

def VSD_normal_get0fast(H,Y):
    Y_Binary = np.array(Y, dtype=int)

    # Compute Syndrome Matrix
    S_Binary = np.floor(np.mod(np.dot(H, Y_Binary), 2)).astype(int)
    S_Gauss, S_rank, Index_Rows = cc.Compute_Gauss_Jordan_Reduction(S_Binary)

    # Compute Error Locating Vector
    Error_Locating_Vector = cc.Compute_Error_Locating_Vector(S_Gauss, Index_Rows, H)

    # Find Number of Erroneous Symbols
    Number_Error = np.count_nonzero(Error_Locating_Vector == 0)

    if Number_Error == 0:
        return Y, 1  # No errors detected

    # Check if Rank of S matches Number of Erroneous Symbols
    if S_rank == Number_Error:
        S_Sub = S_Binary[Index_Rows, :]
        Position_Error = np.where(Error_Locating_Vector == 0)[0]
        H_Sub = H[np.ix_(Index_Rows, Position_Error)]

        if np.linalg.det(H_Sub) == 0:
            return Y_Binary, 0  # Cannot correct errors


        return 0,   1  # Successful correction
    else:
        return 0,   0  # Unable to correct errors


# Simulation Step

In [6]:
def q_ary_error_channel(prob_error,num):
	rng = np.random.default_rng()
	Probability = [1-prob_error, prob_error]
	channel = rng.choice(2, num, p=Probability)
	return channel

def sigma_gen(prob_error,num):
  channel = q_ary_error_channel(prob_error,num)
  return (channel+1)%2

In [7]:
num_samples = 1
SER = 0.1
j=G.shape[0]
for i in range(num_samples):
  error_positions=cc.q_ary_error_channel(0.1,n) #Error Position = 1
  sigma = sigma_gen(SER,n)
  message = np.random.randint(0, 2, (r, j))
  codeword = np.matmul(message, G).astype(np.int8) % 2
  error = np.zeros_like(codeword)
  error[:, sigma == 0] = 1

  print("Error Locating = ",sigma)
  print(error)
#print(np.matmul(H, codeword.T)%2)

Error Locating =  [1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 0 1 1 1 1 1 1 1 1 1 1 1 1]
[[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 

In [8]:
for i in range(num_samples):
        msg = message[i].astype(int)
        error_positions
        sigma = np.zeros(n, dtype=int)
        codeword = np.matmul(msg, G).astype(np.int8) % 2
        num_errors = int(prob_row_error * n)
        error_positions = np.random.choice(n, num_errors, replace=False)
        sigma[error_positions] = 1
        error = np.zeros_like(codeword)
        error[:, sigma == 1] = sparse_random(r, np.sum(sigma), density=0.5, format='csr', dtype=int).toarray() % 2
        received = (codeword + error) % 2
        syndrome = np.matmul(H, received.T).astype(int) % 2

        X_train_dset[i] = syndrome
        y_train_dset[i] = sigma

print("Data saved to data data16bits_2M.h5")

NameError: name 'prob_row_error' is not defined

# Fun Space

In [None]:
print(H)

In [None]:
print(G)