In [2]:
import numpy as np
from conv import Convolutional_code
from mod import Modulation
from AWGN import AWGN
import os
import os.path
from tqdm import tqdm

from pathlib import Path

# 1 X 128 

In [6]:
# SNR 범위 설정
SNR = range(-20, 21, 1)

num_samples = 1000

# 데이터 셋 폴더 생성 위치

current_dir = Path('dataset.ipynb').parent

dest = current_dir / 'DATASET_FOR_GAF_1_128'

if not os.path.exists(dest):
    os.mkdir(dest)
    print("데이터 셋 폴더 생성")

# 데이터와 레이블을 저장할 리스트 초기화
data_list = []
label_list = []

for CL in range(3):
    # 채널 코딩에 사용할 콘볼루션 코드 선택 (1, 2, or 3)
    convolution_code = CL + 1
    modulation = 1
    
    encoder = Convolutional_code(convolution_code)
    mod = Modulation(modulation)
    
    for i in tqdm(SNR, desc="Loading Files"):
        for j in range(1, num_samples+1):
            
            all_data = []
 
            if CL == 0:
                SIZE = 62
            elif CL == 1:
                SIZE = 61
            else:
                SIZE = 60
            random_bits = np.random.randint(2, size=SIZE)
            
            # 채널코딩
            encoded, _ = encoder.encoder(list(random_bits))

            # 변조
            moded = mod.mod(encoded)
            
            # 채널 효과 적용
            noise_data = AWGN(int(i), 1/2, 1, moded)
            
            # 복조
            data = mod.demod(noise_data, 1)
            
            # 넘파이 배열로 변경
            data = np.array(data).reshape(1, 128)
            all_data.append(data)
  
            # 데이터 리스트에 추가
            data_list.append(all_data)
            label_list.append((CL, i))  # CL과 SNR 값을 레이블로 사용

# 데이터를 넘파이 배열로 변환
data_array = np.array(data_list)
label_array = np.array(label_list)

# 데이터 셔플링
np.random.seed(42)
indices = np.arange(data_array.shape[0])
np.random.shuffle(indices)

data_array = data_array[indices]
label_array = label_array[indices]

# 기존 데이터 분할 (70% 훈련, 30% 테스트)
# 훈련, 테스트, 검증 데이터 분할 (80% 훈련, 10% 테스트, 10% 검증)
n_examples = data_array.shape[0]
n_train = int(0.8 * n_examples)
n_test = int(0.1 * n_examples)
n_val = n_examples - n_train - n_test

train_idx = indices[:n_train]
test_idx = indices[n_train:n_train + n_test]
val_idx = indices[n_train + n_test:]

X_train = data_array[train_idx]
X_test = data_array[test_idx]
X_val = data_array[val_idx]

Y_train = label_array[train_idx]
Y_test = label_array[test_idx]
Y_val = label_array[val_idx]

# 원-핫 인코딩 함수
def to_onehot(labels, num_classes):
    onehot_labels = np.zeros((labels.shape[0], num_classes))
    for i, label in enumerate(labels):
        onehot_labels[i, label] = 1
    return onehot_labels

# 레이블의 첫 번째 열(CL)을 원-핫 인코딩 (3개의 클래스)
num_classes = len(set(label_array[:, 0]))  # 실제 클래스 수 계산
Y_train_onehot = to_onehot(Y_train[:, 0], num_classes)
Y_test_onehot = to_onehot(Y_test[:, 0], num_classes)
Y_val_onehot = to_onehot(Y_val[:, 0], num_classes)

# idx도 저장
np.save(os.path.join(dest, "train_idx.npy"), train_idx)
np.save(os.path.join(dest, "test_idx.npy"), test_idx)
np.save(os.path.join(dest, "val_idx.npy"), val_idx)

# npy 파일로 저장
np.save(os.path.join(dest, "lbl.npy"), label_array)
np.save(os.path.join(dest, "x_train.npy"), X_train)
np.save(os.path.join(dest, "x_test.npy"), X_test)
np.save(os.path.join(dest, "x_val.npy"), X_val)
np.save(os.path.join(dest, "y_train.npy"), Y_train_onehot)
np.save(os.path.join(dest, "y_test.npy"), Y_test_onehot)
np.save(os.path.join(dest, "y_val.npy"), Y_val_onehot)

print("데이터 생성 완료")

데이터 셋 폴더 생성


Loading Files: 100%|██████████| 41/41 [00:36<00:00,  1.13it/s]
Loading Files: 100%|██████████| 41/41 [00:40<00:00,  1.02it/s]
Loading Files: 100%|██████████| 41/41 [00:40<00:00,  1.02it/s]


데이터 생성 완료


In [35]:
random_bits = np.random.randint(2, size=(14))

encoder = Convolutional_code(1)
encoded, _ = encoder.encoder(list(random_bits))

print(random_bits)
print(list(random_bits))
print(encoded)
print(encoded.shape)

[1 1 1 1 0 1 0 1 1 0 0 1 0 1]
[1, 1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1]
[1. 1. 1. 0. 0. 1. 0. 1. 1. 0. 0. 0. 0. 1. 0. 0. 1. 0. 1. 0. 1. 1. 1. 1.
 0. 1. 0. 0. 0. 1. 1. 1.]
(32,)


# 16 X 16

In [29]:
from joblib import Parallel, delayed

# SNR 범위 설정
SNR = range(-5, 21, 1)
num_samples = 1000

# 데이터 셋 폴더 생성 위치
current_dir = Path('dataset.ipynb').parent
dest = current_dir / 'DATASET_16'

if not os.path.exists(dest):
    os.mkdir(dest)
    print("데이터 셋 폴더 생성")

# 데이터와 레이블을 저장할 리스트 초기화
data_list = []
label_list = []

def process_sample(CL, SNR_value):
    encoder = Convolutional_code(CL + 1)
    mod = Modulation(1)
    
    SIZE = 6 if CL == 0 else 5 if CL == 1 else 4

    # 16*16 크기의 비트 스트림 생성
    random_bits = np.random.randint(2, size=(16, SIZE))

    all_data_list = []

    for bits in random_bits:
        # 채널 코딩
        encoded, _ = encoder.encoder(list(bits))

        # 변조
        moded = mod.mod(encoded)

        # 채널 효과 적용
        noise_data = AWGN(int(SNR_value), 1/2, 1, moded)

        # 복조
        data = mod.demod(noise_data, 1)

        all_data_list.append(data[:16])  # 필요한 길이만큼 자르기

    # 128개의 1x128 배열을 128x128 배열로 변환
    all_data = np.vstack(all_data_list)
    return all_data, (CL, SNR_value)

# 병렬 처리
results = Parallel(n_jobs=-1)(delayed(process_sample)(CL, i) for CL in range(3) for i in SNR for j in range(1, num_samples + 1))

# 결과 분리
data_list, label_list = zip(*results)

# 데이터를 넘파이 배열로 변환
data_array = np.array(data_list)
label_array = np.array(label_list)

# 데이터 셔플링
np.random.seed(42)
indices = np.arange(data_array.shape[0])
np.random.shuffle(indices)

data_array = data_array[indices]
label_array = label_array[indices]

# 훈련, 테스트, 검증 데이터 분할 (80% 훈련, 10% 테스트, 10% 검증)
n_examples = data_array.shape[0]
n_train = int(0.8 * n_examples)
n_test = int(0.1 * n_examples)
n_val = n_examples - n_train - n_test

train_idx = indices[:n_train]
test_idx = indices[n_train:n_train + n_test]
val_idx = indices[n_train + n_test:]

X_train = data_array[train_idx]
X_test = data_array[test_idx]
X_val = data_array[val_idx]

Y_train = label_array[train_idx]
Y_test = label_array[test_idx]
Y_val = label_array[val_idx]

# 원-핫 인코딩 함수
def to_onehot(labels, num_classes):
    onehot_labels = np.zeros((labels.shape[0], num_classes))
    for i, label in enumerate(labels):
        onehot_labels[i, label] = 1
    return onehot_labels

# 레이블의 첫 번째 열(CL)을 원-핫 인코딩 (3개의 클래스)
num_classes = len(set(label_array[:, 0]))  # 실제 클래스 수 계산
Y_train_onehot = to_onehot(Y_train[:, 0], num_classes)
Y_test_onehot = to_onehot(Y_test[:, 0], num_classes)
Y_val_onehot = to_onehot(Y_val[:, 0], num_classes)

# npy 파일로 저장
np.save(os.path.join(dest, "x_train.npy"), X_train)
np.save(os.path.join(dest, "x_test.npy"), X_test)
np.save(os.path.join(dest, "x_val.npy"), X_val)

np.save(os.path.join(dest, "y_train.npy"), Y_train_onehot)
np.save(os.path.join(dest, "y_test.npy"), Y_test_onehot)
np.save(os.path.join(dest, "y_val.npy"), Y_val_onehot)

np.save(os.path.join(dest, "lbl.npy"), label_array)

# idx도 저장
np.save(os.path.join(dest, "train_idx.npy"), train_idx)
np.save(os.path.join(dest, "test_idx.npy"), test_idx)
np.save(os.path.join(dest, "val_idx.npy"), val_idx)

print("데이터 생성 완료")

데이터 생성 완료


# 32 X 32

In [38]:
from joblib import Parallel, delayed

# SNR 범위 설정
SNR = range(-5, 21, 1)
num_samples = 1000

# 데이터 셋 폴더 생성 위치
current_dir = Path('dataset.ipynb').parent
dest = current_dir / 'DATASET_32'

if not os.path.exists(dest):
    os.mkdir(dest)
    print("데이터 셋 폴더 생성")

# 데이터와 레이블을 저장할 리스트 초기화
data_list = []
label_list = []

def process_sample(CL, SNR_value):
    encoder = Convolutional_code(CL + 1)
    mod = Modulation(1)
    
    SIZE = 14 if CL == 0 else 13 if CL == 1 else 12

    # 32*32 크기의 비트 스트림 생성
    random_bits = np.random.randint(2, size=(32, SIZE))

    all_data_list = []

    for bits in random_bits:
        # 채널 코딩
        encoded, _ = encoder.encoder(list(bits))

        # 변조
        moded = mod.mod(encoded)

        # 채널 효과 적용
        noise_data = AWGN(int(SNR_value), 1/2, moded)

        # 복조
        data = mod.demod(noise_data, 1)

        all_data_list.append(data[:32])  # 필요한 길이만큼 자르기

    # 128개의 1x128 배열을 128x128 배열로 변환
    all_data = np.vstack(all_data_list)
    return all_data, (CL, SNR_value)

# 병렬 처리
results = Parallel(n_jobs=-1)(delayed(process_sample)(CL, i) for CL in range(3) for i in SNR for j in range(1, num_samples + 1))

# 결과 분리
data_list, label_list = zip(*results)

# 데이터를 넘파이 배열로 변환
data_array = np.array(data_list)
label_array = np.array(label_list)

# 데이터 셔플링
np.random.seed(42)
indices = np.arange(data_array.shape[0])
np.random.shuffle(indices)

data_array = data_array[indices]
label_array = label_array[indices]

# 훈련, 테스트, 검증 데이터 분할 (80% 훈련, 10% 테스트, 10% 검증)
n_examples = data_array.shape[0]
n_train = int(0.8 * n_examples)
n_test = int(0.1 * n_examples)
n_val = n_examples - n_train - n_test

train_idx = indices[:n_train]
test_idx = indices[n_train:n_train + n_test]
val_idx = indices[n_train + n_test:]

X_train = data_array[train_idx]
X_test = data_array[test_idx]
X_val = data_array[val_idx]

Y_train = label_array[train_idx]
Y_test = label_array[test_idx]
Y_val = label_array[val_idx]

# 원-핫 인코딩 함수
def to_onehot(labels, num_classes):
    onehot_labels = np.zeros((labels.shape[0], num_classes))
    for i, label in enumerate(labels):
        onehot_labels[i, label] = 1
    return onehot_labels

# 레이블의 첫 번째 열(CL)을 원-핫 인코딩 (3개의 클래스)
num_classes = len(set(label_array[:, 0]))  # 실제 클래스 수 계산
Y_train_onehot = to_onehot(Y_train[:, 0], num_classes)
Y_test_onehot = to_onehot(Y_test[:, 0], num_classes)
Y_val_onehot = to_onehot(Y_val[:, 0], num_classes)

# npy 파일로 저장
np.save(os.path.join(dest, "x_train.npy"), X_train)
np.save(os.path.join(dest, "x_test.npy"), X_test)
np.save(os.path.join(dest, "x_val.npy"), X_val)

np.save(os.path.join(dest, "y_train.npy"), Y_train_onehot)
np.save(os.path.join(dest, "y_test.npy"), Y_test_onehot)
np.save(os.path.join(dest, "y_val.npy"), Y_val_onehot)

np.save(os.path.join(dest, "lbl.npy"), label_array)

# idx도 저장
np.save(os.path.join(dest, "train_idx.npy"), train_idx)
np.save(os.path.join(dest, "test_idx.npy"), test_idx)
np.save(os.path.join(dest, "val_idx.npy"), val_idx)

print("데이터 생성 완료")

데이터 생성 완료


In [None]:
# (23, 12) Golay code
def golay(data_size):
    m_len = 12

    P = np.array([
        [1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1],
        [1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1],
        [1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1],
        [1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1],
        [1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0],
        [0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0],
        [0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1],
        [1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0],
        [0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0],
        [0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1],
        [1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 0],
        [0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1],
    ], dtype='int')

    G = np.concatenate((np.eye(m_len, dtype="int"), P), axis=1)

    msg = np.random.randint(0,2, (data_size, m_len))
    codeword = np.dot(msg, G) %2

    return codeword

In [None]:
# Hamming Code (8 4)
def Hamming(data_size):
    G = np.array([
        [1, 1, 1, 0, 0 ,0, 0, 1],
        [1, 0 ,0, 1, 1, 0, 0, 1],
        [0, 1, 0, 1, 0, 1, 0, 1],
        [1, 1, 0, 1, 0, 0, 1, 0]
    ], dtype='int')

    msg = np.random.randint(0,2, (data_size, 4))
    codeword = np.dot(msg, G) %2

    return codeword

In [None]:
# BCH Code (15, 7)
def BCH(data_size):
    c_length = 15
    m_length = 7
    G_X_15_7 = [1,1,1,0,1,0,0,0,1]

    BCH_15_7_G = np.zeros((7,15))

    for i in range(7):
        BCH_15_7_G[i,i:i+9] = G_X_15_7

    for i in range(7):
        for j in range(i+1,7):
            if BCH_15_7_G[i, j] == 1:
                BCH_15_7_G[i] = (BCH_15_7_G[i] + BCH_15_7_G[j])%2

    G = BCH_15_7_G # G =BCH_7_4_G
    msg = np.random.randint(0,2,(data_size,m_length))
    codeword = np.dot(msg, G)%2

    return codeword

In [None]:
# Product Code (8, 4)
def product(data_size):
    G = np.array([
        [1, 0, 1, 0, 0 ,0, 1, 0],
        [0, 1 ,1, 0, 0, 0, 0, 1],
        [0, 0, 0, 1, 0, 1, 1, 0],
        [0, 0, 0, 1, 1, 1, 0, 1]
    ], dtype='int')

    msg = np.random.randint(0,2, (data_size, 4))
    codeword = np.dot(msg, G) %2

    return codeword

In [None]:
# RM Code
def G_matrix(length, m, r):
    G = np.ones(length)
    for i in range(m):
        v = np.zeros((int(length/(2**(i+1)))))
        v = np.hstack((v, np.ones((int(length/(2**(i+1)))))))
        while v.shape[0] < length :
            v = np.hstack((v, np.zeros((int(length/(2**(i+1)))))))
            v = np.hstack((v, np.ones((int(length/(2**(i+1)))))))
        G = np.vstack((G,v))
    if r == 1:
        return G
    elif r > 1 :
        for i in range(1,m):
            for j in range(i+1,m+1):
                G = np.vstack((G,(G[i]*G[j])))
        if r == 3:
            G = np.vstack((G,(G[1]*G[2]*G[3])))
            G = np.vstack((G,(G[1]*G[3]*G[4])))
            G = np.vstack((G,(G[1]*G[2]*G[4])))
            G = np.vstack((G,(G[2]*G[3]*G[4])))
        return G
    return G

def rm(data_size):
    m = 4
    r = 2
    length = 2**m

    if r == 1:
        masking_length=0
        msg_length = m+r
    elif r == 2:
        masking_length=6
        msg_length = 11
    elif r == 3:
        masking_length=10
        msg_length = 15

    G = G_matrix(length, m, r)
    msg = np.random.randint(0,2,(data_size,msg_length))
    codeword = np.dot(msg, G) %2

    return codeword

In [None]:
# (n, k) Polar code

def make_F(power):
    F = np.array([[1,0],[1,1]])
    for i in range(1,power):
        first = np.concatenate((F,np.zeros((2**i, 2**i))), axis=1)
        second = np.concatenate((F,F), axis=1)
        F = np.concatenate((first, second), axis =0)

    return F
def Compute_z(z, k, i = 1):
    for j in range(i):
        z[(2*i,2*j)] = 2*z[(i,j)] - (z[(i,j)])**2
        z[(2*i,2*j+1)] = (z[(i,j)])**2
    if 2*i < 2**k:
        z = Compute_z(z, k, 2*i)

    return z
def Frozen_bits(n, z, slice_index):
    bit_index = np.zeros(n)
    for i in range(n):
        bit_index[i] = z[(n, i)]

    bit_index = np.argsort(bit_index)[::-1]

    frozen_bit_index = bit_index[:slice_index]
    message_bit_index = bit_index[slice_index:]
    
    return np.sort(frozen_bit_index), np.sort(message_bit_index)

def polar(data_size):
    # (n, k) Polar code
    k = 4
    n = 2*k
    power = int(np.log2(n))

    F = make_F(power)

    z = {}
    z[(1,0)] = 0.5
    z = Compute_z(z, power)

    frozen_bit_index, message_bit_index = Frozen_bits(n, z, int(n-k))

    msg = np.random.randint(0,2,(data_size, k))
    u = np.zeros((data_size, n))
    u[:,message_bit_index] = msg

    codeword = np.dot(u,F)%2

    return codeword