项目：使用轻量级CNN增强短LDPC码的OSD译码性能 (PyTorch版)

目标:
  验证通过CNN优化OSD中的可靠性排序，可以提升短LDPC码的译码性能。

实验流程:
  1. 定义LDPC码参数并生成矩阵H和G。
  2. 设置计算设备 (自动选择GPU/CUDA或CPU)。
  3. 构建并训练一个基于PyTorch的轻量级1D-CNN。
  4. 实现三种译码器：
     a. 基线NMS译码器
     b. NMS + 标准OSD (基于LLR绝对值排序)
     c. NMS + CNN增强OSD (基于PyTorch模型输出的错误概率排序)
  5. 运行BER仿真，对比三种方案的性能。


-----------------------------------------------------------------------------
1. 环境设置与LDPC码生成
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤1: 导入库并生成LDPC码...")

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from pyldpc import make_ldpc, decode, get_message, encode
from matplotlib import pyplot as plt
import time

# --- LDPC码参数定义 ---
n = 128
d_v = 4
d_c = 8
seed = np.random.RandomState(42)

H, G = make_ldpc(n, d_v, d_c, seed=seed, systematic=True, sparse=True)
n_code, k_info = G.shape

print(f"    实际LDPC码参数: n={n_code}, k={k_info}, 码率={k_info/n_code:.2f}\n")


--> 步骤1: 导入库并生成LDPC码...


ModuleNotFoundError: No module named 'tensorflow'

-----------------------------------------------------------------------------
2. 设置计算设备 (GPU/CPU)
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤2: 检测并设置计算设备...")
# 检查是否有可用的CUDA GPU，如果有则使用，否则使用CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"    将使用设备: {device}\n")

-----------------------------------------------------------------------------
3. 为CNN生成训练数据
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤3: 为CNN生成训练数据...")

def generate_training_data(n_samples, snr_db):
    print(f"    正在生成 {n_samples} 条训练样本，信噪比SNR = {snr_db} dB...")
    v_messages = np.zeros((k_info, n_samples))
    c_codewords_modulated = encode(G, v_messages, snr_db, seed=seed)

    sigma2 = 10**(-snr_db / 10)
    mean_llr = 2 / sigma2
    std_llr = np.sqrt(4 / sigma2)
    y_noisy_llrs = np.random.normal(loc=mean_llr, scale=std_llr, size=c_codewords_modulated.shape)

    y_labels = (y_noisy_llrs < 0).astype(int)

    # PyTorch的Conv1d需要 (N, C, L) 格式, 即 (批次数, 通道数, 长度)
    # 我们的通道数是1
    X_train = y_noisy_llrs.T.reshape((n_samples, 1, n_code))
    y_train = y_labels.T.reshape((n_samples, 1, n_code))

    return X_train, y_train


-----------------------------------------------------------------------------
4. 构建并训练CNN模型
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤4: 构建并训练PyTorch CNN模型...")

# --- a. 定义PyTorch模型 ---
class CNN_Model(nn.Module):
    def __init__(self, input_len):
        super(CNN_Model, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=16, kernel_size=5, padding='same'),
            nn.ReLU(),
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding='same'),
            nn.ReLU(),
            nn.Conv1d(in_channels=32, out_channels=1, kernel_size=3, padding='same'),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.conv_layers(x)


-----------------------------------------------------------------------------
5. 实现三种译码器
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤5: 实现三种不同的译码器...")

def is_valid_codeword(H, c: np.ndarray) -> bool:
    if c is None: return False
    return np.all((H.dot(c) % 2) == 0)

def osd_rescue(H, base_decision: np.ndarray, reliability_vector: np.ndarray, L: int = 15):
    idx_sorted_by_reliability = np.argsort(reliability_vector)
    least_reliable_indices = idx_sorted_by_reliability[:min(L, len(idx_sorted_by_reliability))]

    for i in least_reliable_indices:
        candidate = base_decision.copy(); candidate[i] ^= 1
        if is_valid_codeword(H, candidate): return candidate

    for i in range(len(least_reliable_indices)):
        for j in range(i + 1, len(least_reliable_indices)):
            idx1, idx2 = least_reliable_indices[i], least_reliable_indices[j]
            candidate = base_decision.copy(); candidate[idx1] ^= 1; candidate[idx2] ^= 1
            if is_valid_codeword(H, candidate): return candidate
    return base_decision

def standard_osd_decoder(H, y_channel: np.ndarray, snr: float) -> np.ndarray:
    decoded_word = decode(H, y_channel, snr, maxiter=100)
    if is_valid_codeword(H, decoded_word): return decoded_word
    channel_reliability = np.abs(y_channel)
    return osd_rescue(H, decoded_word, channel_reliability, L=15)

def cnn_enhanced_decoder(H, y_channel: np.ndarray, snr: float, model, device) -> np.ndarray:
    decoded_word = decode(H, y_channel, snr, maxiter=100)
    if is_valid_codeword(H, decoded_word): return decoded_word

    # --- PyTorch预测过程 ---
    model.eval()  # 将模型设置为评估模式
    with torch.no_grad(): # 关闭梯度计算，加速预测
        # 1. Numpy -> PyTorch Tensor
        y_tensor = torch.from_numpy(y_channel).float()
        # 2. Reshape for Conv1d: (L) -> (N, C, L) = (1, 1, n_code)
        y_tensor = y_tensor.reshape((1, 1, n_code))
        # 3. Move to device
        y_tensor = y_tensor.to(device)
        # 4. Predict
        cnn_output_tensor = model(y_tensor)
        # 5. Move back to CPU and convert to Numpy
        cnn_error_probability = cnn_output_tensor.cpu().numpy().flatten()

    # CNN输出的是“错误概率”，概率越高，比特越不可靠。
    # 我们约定osd_rescue中，值越小越不可靠，所以传入1.0 - 概率
    cnn_reliability = 1.0 - cnn_error_probability
    return osd_rescue(H, decoded_word, cnn_reliability, L=15)

print("    译码器实现完毕。\n")

-----------------------------------------------------------------------------
6. 运行BER性能仿真
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤6: 开始最终的BER性能仿真...")
snrs_db = np.arange(1.0, 4.5, 0.5)
n_trials = 2000
max_bp_iter = 100

ber_baseline, ber_std_osd, ber_cnn_osd = [], [], []

for snr in snrs_db:
    start_time = time.time()
    errors_baseline, errors_std_osd, errors_cnn_osd = 0, 0, 0

    v_messages = np.random.randint(2, size=(k_info, n_trials))
    y_noisy = encode(G, v_messages, snr, seed=seed)

    for i in range(n_trials):
        y_col, v_col = y_noisy[:, i], v_messages[:, i]
        d_base = decode(H, y_col, snr, maxiter=max_bp_iter)
        errors_baseline += np.count_nonzero(get_message(G, d_base) != v_col)
        d_std_osd = standard_osd_decoder(H, y_col, snr)
        errors_std_osd += np.count_nonzero(get_message(G, d_std_osd) != v_col)
        # 在调用时传入模型和设备
        d_cnn_osd = cnn_enhanced_decoder(H, y_col, snr, cnn_model, device)
        errors_cnn_osd += np.count_nonzero(get_message(G, d_cnn_osd) != v_col)

    ber_baseline.append(errors_baseline / (k_info * n_trials))
    ber_std_osd.append(errors_std_osd / (k_info * n_trials))
    ber_cnn_osd.append(errors_cnn_osd / (k_info * n_trials))

    end_time = time.time()
    print(f"    SNR={snr:.2f} dB 在 {end_time - start_time:.1f}s 内完成. BERs: ")
    print(f"      - Baseline:  {ber_baseline[-1]:.6f}")
    print(f"      - Std OSD:   {ber_std_osd[-1]:.6f}")
    print(f"      - CNN OSD:   {ber_cnn_osd[-1]:.6f}")

print("    仿真完成。\n")


-----------------------------------------------------------------------------
7. 结果可视化与分析
-----------------------------------------------------------------------------

In [None]:
print("--> 步骤7: 绘制性能对比图...")
# 设置matplotlib支持中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

plt.figure(figsize=(12, 8))
plt.semilogy(snrs_db, ber_baseline, 'o:', label='基线NMS译码器', color='gray', markersize=8)
plt.semilogy(snrs_db, ber_std_osd, 's--', label='NMS + 标准OSD (基于LLR)', color='indianred', markersize=8)
plt.semilogy(snrs_db, ber_cnn_osd, '^-', label='NMS + CNN增强OSD (PyTorch版)', color='royalblue', markersize=8)

plt.title(f'({n_code}, {k_info}) LDPC码不同译码方案性能对比', fontsize=16)
plt.xlabel('信噪比 SNR (dB)', fontsize=12)
plt.ylabel('误比特率 BER', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, which="both", ls="--", alpha=0.5)
plt.ylim(1e-5, 1)
plt.show()

print("--> 脚本执行完毕。")
