In [1]:
# # RadioML 2016.10a 数据预处理 (用于 CNN-LSTM 模型)
#
# 本 Notebook 的目标是加载 RadioML 2016.10a 数据集，并对其进行预处理，
# 为训练基于 CNN-LSTM 的调制识别模型准备数据。
#
# 主要步骤包括：
# 1. 加载数据集 (`.pkl` 文件)。
# 2. 理解数据结构。
# 3. 提取信号 (I/Q 数据)、调制类型标签和 SNR 值。
# 4. 调整数据形状为 (样本数, 时间步长, 特征数)。
# 5. 对 I/Q 信号数据进行平均功率归一化。
# 6. 将标签转换为整数编码和 One-Hot 编码。
# 7. 将数据集划分为训练集、验证集和测试集（分层抽样）。
# 8. 保存处理后的数据和标签映射。

# ## 1. 导入必要的库

import pickle
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import gc # 用于垃圾回收

In [2]:
# ## 2. 加载数据集

data_file = './data/RML2016.10a_dict.pkl' # 数据集文件路径

if not os.path.exists(data_file):
    print(f"错误：数据集文件 '{data_file}' 未找到。请确保路径正确。")
    # 在实际应用中，这里可以引发异常或执行其他错误处理逻辑
else:
    print(f"正在加载数据集: {data_file} ...")
    with open(data_file, 'rb') as f:
        # 使用 latin1 编码加载 pickle 文件，这是 RadioML 数据集常用的编码方式
        radio_data = pickle.load(f, encoding='latin1')
    print("数据集加载完成。")

正在加载数据集: ./data/RML2016.10a_dict.pkl ...
数据集加载完成。


In [3]:
# ## 3. 理解和探索数据结构
# 数据集是一个字典，键为元组 `(调制类型, 信噪比)`，
# 值是 NumPy 数组，形状为 `(样本数, 2, 128)`，代表 I/Q 信号数据。

if 'radio_data' in locals(): # 确保 radio_data 已加载
    print(f"数据字典包含 {len(radio_data)} 个 (调制类型, SNR) 组合。")
    # 打印前5个键作为示例，了解数据结构
    print("示例键:", list(radio_data.keys())[:5])

    # 从字典的键中提取所有调制类型和SNR值
    mods, snrs = map(tuple, zip(*radio_data.keys()))
    modulation_types = sorted(list(set(mods))) # 去重并排序
    snr_values = sorted(list(set(snrs)))       # 去重并排序

    print(f"\n调制类型 ({len(modulation_types)}): {modulation_types}")
    print(f"SNR 值 ({len(snr_values)}): {snr_values}")

    # 查看一个数据块的形状，例如数据字典中的第一个条目
    sample_key = list(radio_data.keys())[0]
    sample_data_block = radio_data[sample_key]
    # 形状通常是 (N_samples_per_block, 2, 128)，其中 2 代表 I 和 Q，128 是时间序列长度
    print(f"\n调制类型 '{sample_key[0]}' 在 SNR {sample_key[1]} dB 下的一个数据块形状: {sample_data_block.shape}")
else:
    print("错误：数据集 'radio_data' 未加载，请先执行加载数据的步骤。")

数据字典包含 220 个 (调制类型, SNR) 组合。
示例键: [('QPSK', 2), ('PAM4', 8), ('AM-DSB', -4), ('GFSK', 6), ('QAM64', 8)]

调制类型 (11): ['8PSK', 'AM-DSB', 'AM-SSB', 'BPSK', 'CPFSK', 'GFSK', 'PAM4', 'QAM16', 'QAM64', 'QPSK', 'WBFM']
SNR 值 (20): [-20, -18, -16, -14, -12, -10, -8, -6, -4, -2, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

调制类型 'QPSK' 在 SNR 2 dB 下的一个数据块形状: (1000, 2, 128)


In [4]:
# ## 4. 数据提取、格式化与平均功率归一化
# 此模块负责：
# 1. 从原始数据字典中提取所有 I/Q 信号样本、对应的调制类型标签和 SNR 值。
# 2. 将提取的信号数据 (X) 堆叠成一个大的 NumPy 数组。
# 3. 调整 X 的形状从 (N, 2, 128) 变为 (N, 128, 2)，以适应 CNN/LSTM 模型的输入要求 (通常时间步长在前)。
# 4. 对每个信号样本进行平均功率归一化。

if 'radio_data' in locals(): # 确保 radio_data 已加载
    all_signals = []
    all_labels = []
    all_snrs_list = []
    print("开始提取和整理数据...")
    for key in radio_data.keys(): # 遍历字典中的每个 (调制类型, SNR) 组合
        mod, snr = key
        signals = radio_data[key] # 获取该组合下的所有信号样本
        all_signals.append(signals)
        num_samples = signals.shape[0]
        all_labels.extend([mod] * num_samples) # 为每个样本创建标签
        all_snrs_list.extend([snr] * num_samples) # 为每个样本记录SNR

    # 将列表转换为 NumPy 数组
    X_original = np.vstack(all_signals) # 垂直堆叠所有信号数据块
    y_labels = np.array(all_labels)
    snr_array = np.array(all_snrs_list)
    print(f"原始 X 形状: {X_original.shape}") # 应为 (总样本数, 2, 128)
    print(f"原始 y 形状: {y_labels.shape}")   # 应为 (总样本数,)
    print(f"SNR 数组形状: {snr_array.shape}") # 应为 (总样本数,)

    # 调整 X 的形状: (N, 2, 128) -> (N, 128, 2)
    # 将 I/Q 通道作为特征，时间序列长度为 128
    if X_original.shape[1] == 2 and X_original.shape[2] == 128:
        X_reshaped = X_original.transpose(0, 2, 1)
        print(f"调整后的 X 形状: {X_reshaped.shape}") # 应为 (总样本数, 128, 2)
    else:
        print(f"警告: X 的原始形状 {X_original.shape} 不是预期的 (N, 2, 128)，跳过转置。")
        X_reshaped = X_original # 如果形状不符合预期，则不进行转置

    X_reshaped = X_reshaped.astype(np.float32) # 转换为 float32 类型以节省内存并兼容深度学习框架

    # --- 平均功率归一化 ---
    # 对每个样本（每个 (128, 2) 的 I/Q 序列）进行归一化，使其平均功率为 1。
    # 这有助于稳定训练过程，并使模型对信号的绝对幅度不那么敏感。
    print("\n开始进行平均功率归一化 (逐样本)...")

    def normalize_power(X_set):
        """对每个样本进行平均功率归一化。
        参数:
            X_set (np.ndarray): 输入信号数据，形状为 (N, L, 2)，N为样本数，L为序列长度。
        返回:
            np.ndarray: 归一化后的信号数据。
        """
        X_normalized = np.zeros_like(X_set, dtype=np.float32)
        epsilon = 1e-8 # 添加一个极小值以防止除以零
        for i in range(X_set.shape[0]):
            # 将 I, Q 分量重构为复数信号
            sample_iq = X_set[i, :, 0] + 1j * X_set[i, :, 1] # 形状 (L,)
            # 计算平均功率 P = E[|s(t)|^2]
            avg_power = np.mean(np.abs(sample_iq)**2)
            # 计算归一化因子 1 / sqrt(P)
            norm_factor = np.sqrt(avg_power + epsilon)
            # 应用归一化
            normalized_sample_iq = sample_iq / norm_factor
            # 分别存储归一化后的 I, Q 分量
            X_normalized[i, :, 0] = normalized_sample_iq.real
            X_normalized[i, :, 1] = normalized_sample_iq.imag
        return X_normalized

    X_norm = normalize_power(X_reshaped)
    print("平均功率归一化完成。")
    print(f"归一化后 X_norm 形状: {X_norm.shape}")

    # 清理原始和中间 X 数组以节省内存
    del X_original, X_reshaped
    gc.collect()
else:
    print("错误：数据集 'radio_data' 未加载，无法执行数据提取和归一化。")

开始提取和整理数据...
原始 X 形状: (220000, 2, 128)
原始 y 形状: (220000,)
SNR 数组形状: (220000,)
调整后的 X 形状: (220000, 128, 2)

开始进行平均功率归一化 (逐样本)...
平均功率归一化完成。
归一化后 X_norm 形状: (220000, 128, 2)


In [5]:
# ## 5. 标签编码
# 将文本格式的调制类型标签转换为机器学习模型可以使用的数值格式。
# 1. 整数编码：将每个唯一的调制类型字符串映射到一个整数。
#    例如：'8PSK' -> 0, 'AM-DSB' -> 1, ...
#    这对于分层抽样和某些评估指标是必需的。
# 2. One-Hot 编码：将整数编码的标签转换为二进制向量。
#    例如：如果类别总数为 11，整数标签 0 (代表 '8PSK') 可能被编码为 [1, 0, 0, ..., 0]。
#    这是神经网络分类模型常用的目标变量格式。

if 'y_labels' in locals() and 'X_norm' in locals(): # 确保依赖数据已准备好
    # 1. 整数编码
    label_encoder = LabelEncoder()
    y_numerical = label_encoder.fit_transform(y_labels) # y_labels 是包含字符串标签的数组
    print(f"数值标签形状: {y_numerical.shape}")
    # print(f"前 10 个数值标签: {y_numerical[:10]}") # 可取消注释以查看示例

    # 获取标签映射关系 (整数 -> 原始字符串标签)
    # 这对于后续结果分析和解释非常有用
    label_mapping = {i: label for i, label in enumerate(label_encoder.classes_)}
    print(f"\n标签映射关系: {label_mapping}")
    num_classes = len(label_mapping)
    print(f"类别总数: {num_classes}")

    # 2. One-Hot 编码
    # sparse_output=False 表示返回一个密集的 NumPy 数组，而不是稀疏矩阵
    one_hot_encoder = OneHotEncoder(sparse_output=False, categories='auto')
    # y_numerical 需要 reshape 为 (n_samples, 1) 以适应 OneHotEncoder 的输入要求
    y_one_hot = one_hot_encoder.fit_transform(y_numerical.reshape(-1, 1))

    print(f"\nOne-Hot 编码标签形状: {y_one_hot.shape}") # 应为 (总样本数, num_classes)
    # print(f"第一个样本 ({y_labels[0]} -> {y_numerical[0]}) 的 One-Hot 编码: \n{y_one_hot[0]}") # 可取消注释以查看示例
else:
    print("错误：'y_labels' 或 'X_norm' 未定义，请先执行前面的数据处理步骤。")

数值标签形状: (220000,)

标签映射关系: {0: '8PSK', 1: 'AM-DSB', 2: 'AM-SSB', 3: 'BPSK', 4: 'CPFSK', 5: 'GFSK', 6: 'PAM4', 7: 'QAM16', 8: 'QAM64', 9: 'QPSK', 10: 'WBFM'}
类别总数: 11

One-Hot 编码标签形状: (220000, 11)


In [6]:
# ## 6. 数据集划分
# 将经过预处理的数据集（包括归一化后的信号 X_norm、整数标签 y_numerical、
# One-Hot 编码标签 y_one_hot 和 SNR 值 snr_array）划分为训练集、验证集和测试集。
# - 训练集 (Training set): 用于训练模型。
# - 验证集 (Validation set): 用于在训练过程中调整模型超参数和监控模型性能，防止过拟合。
# - 测试集 (Test set): 用于在模型训练完成后，评估模型的最终泛化能力。
#
# 使用分层抽样 (stratify=y_numerical) 以确保每个数据子集中类别标签的比例与原始数据集中大致相同。
# 这对于类别不平衡的数据集尤其重要。

if 'X_norm' in locals() and 'y_numerical' in locals() and 'y_one_hot' in locals() and 'snr_array' in locals():
    test_size = 0.2  # 测试集占总数据的 20%
    # 验证集占剩余数据 (1 - test_size) 的比例。
    # 例如，如果 test_size=0.2, val_size_relative=0.25,
    # 则验证集占 (1-0.2) * 0.25 = 0.8 * 0.25 = 0.2 (即总数据的20%)
    # 训练集则为 1 - 0.2 - 0.2 = 0.6 (即总数据的60%)
    # 所以最终比例是 60% 训练, 20% 验证, 20% 测试
    val_size_relative = 0.25 # 验证集占 (训练+验证) 部分的 25%

    indices = np.arange(X_norm.shape[0]) # 创建样本索引，用于追踪划分

    # 第一次划分：从完整数据集中分离出测试集
    # X_norm: 归一化后的信号数据
    # y_numerical: 整数编码的标签 (用于分层抽样)
    # y_one_hot: One-Hot 编码的标签
    # snr_array: 每个样本对应的 SNR 值
    # indices: 样本的原始索引 (如果需要追踪)
    X_train_val_norm, X_test_norm, \
    y_train_val_num, y_test_num, \
    y_train_val_onehot, y_test_onehot, \
    snr_train_val, snr_test, \
    indices_train_val, indices_test = train_test_split(
        X_norm, y_numerical, y_one_hot, snr_array, indices,
        test_size=test_size,
        random_state=42,       # 设置随机种子以保证结果可复现
        stratify=y_numerical   # 基于整数标签进行分层抽样
    )

    # 第二次划分：从剩余的 (训练+验证) 数据中分离出验证集
    # 使用第一次划分得到的 y_train_val_num 进行分层
    X_train_norm, X_val_norm, \
    y_train_num, y_val_num, \
    y_train_onehot, y_val_onehot, \
    snr_train, snr_val, \
    indices_train, indices_val = train_test_split(
        X_train_val_norm, y_train_val_num, y_train_val_onehot, snr_train_val, indices_train_val,
        test_size=val_size_relative,
        random_state=42,       # 同样的随机种子，确保一致性（虽然这里的划分基于不同子集）
        stratify=y_train_val_num # 基于 y_train_val_num 进行分层
    )

    print("数据集划分完成。")
    print(f"训练集:   X_norm={X_train_norm.shape}, y_onehot={y_train_onehot.shape}, y_num={y_train_num.shape}, SNR={snr_train.shape}")
    print(f"验证集:   X_norm={X_val_norm.shape}, y_onehot={y_val_onehot.shape}, y_num={y_val_num.shape}, SNR={snr_val.shape}")
    print(f"测试集:   X_norm={X_test_norm.shape}, y_onehot={y_test_onehot.shape}, y_num={y_test_num.shape}, SNR={snr_test.shape}")

    # 清理第一次划分后产生的中间变量，释放内存
    del X_train_val_norm, y_train_val_num, y_train_val_onehot, snr_train_val, indices_train_val
    # 也清理掉不再需要的完整数据集的索引和标签数组
    del y_labels, y_numerical, y_one_hot, snr_array, indices # X_norm 已被划分为更小的集合
    gc.collect()
else:
    print("错误：进行数据集划分所需的变量（如 X_norm, y_numerical等）未定义。请确保前面的步骤已成功执行。")

数据集划分完成。
训练集:   X_norm=(132000, 128, 2), y_onehot=(132000, 11), y_num=(132000,), SNR=(132000,)
验证集:   X_norm=(44000, 128, 2), y_onehot=(44000, 11), y_num=(44000,), SNR=(44000,)
测试集:   X_norm=(44000, 128, 2), y_onehot=(44000, 11), y_num=(44000,), SNR=(44000,)


In [7]:
# ## 7. 保存处理后的数据
# 将划分好的训练集、验证集、测试集数据以及标签映射关系保存到磁盘。
# 保存为 `.npy` 文件格式，这是 NumPy 用于存储数组的标准二进制格式。
# 这些文件可以直接被后续的模型训练脚本加载。

# 检查必要的变量是否存在
required_vars_for_saving = [
    'X_train_norm', 'X_val_norm', 'X_test_norm',
    'y_train_onehot', 'y_val_onehot', 'y_test_onehot',
    'y_train_num', 'y_val_num', 'y_test_num',
    'snr_train', 'snr_val', 'snr_test',
    'label_mapping'
]
if all(var in locals() for var in required_vars_for_saving):
    # 定义保存处理后数据的目录名
    output_dir_dl_norm = 'processed_cnn_lstm_data_powernorm'
    os.makedirs(output_dir_dl_norm, exist_ok=True) # exist_ok=True 表示如果目录已存在则不报错

    print(f"\n正在将处理后的 CNN-LSTM (功率归一化) 数据保存到 '{output_dir_dl_norm}' 目录...")

    # 保存归一化后的 X 数据 (信号数据)
    np.save(os.path.join(output_dir_dl_norm, 'X_train_norm.npy'), X_train_norm)
    np.save(os.path.join(output_dir_dl_norm, 'X_val_norm.npy'), X_val_norm)
    np.save(os.path.join(output_dir_dl_norm, 'X_test_norm.npy'), X_test_norm)

    # 保存 One-Hot 编码的 Y 数据 (通常用作模型训练的目标)
    np.save(os.path.join(output_dir_dl_norm, 'y_train_onehot.npy'), y_train_onehot)
    np.save(os.path.join(output_dir_dl_norm, 'y_val_onehot.npy'), y_val_onehot)
    np.save(os.path.join(output_dir_dl_norm, 'y_test_onehot.npy'), y_test_onehot)

    # 保存整数编码的 Y 数据 (可用于评估函数计算指标，如 sklearn.metrics)
    np.save(os.path.join(output_dir_dl_norm, 'y_train_num.npy'), y_train_num)
    np.save(os.path.join(output_dir_dl_norm, 'y_val_num.npy'), y_val_num)
    np.save(os.path.join(output_dir_dl_norm, 'y_test_num.npy'), y_test_num)

    # 保存 SNR 数据 (可用于按 SNR 分析模型性能)
    np.save(os.path.join(output_dir_dl_norm, 'snr_train.npy'), snr_train)
    np.save(os.path.join(output_dir_dl_norm, 'snr_val.npy'), snr_val)
    np.save(os.path.join(output_dir_dl_norm, 'snr_test.npy'), snr_test)

    # 保存标签映射关系 (整数编码 -> 原始调制类型字符串)
    np.save(os.path.join(output_dir_dl_norm, 'label_mapping.npy'), label_mapping)

    print("\n数据保存完成。")
    print(f"文件列表 ({output_dir_dl_norm}):")
    for filename in sorted(os.listdir(output_dir_dl_norm)): #排序使输出更规整
        print(f" - {filename}")
else:
    print("错误：并非所有待保存的数据都已定义。请检查前面的步骤是否都已成功执行。")

错误：并非所有待保存的数据都已定义。请检查前面的步骤是否都已成功执行。


In [8]:
# ## 8. 总结
#
# 此 Notebook 为 CNN-LSTM 模型预处理了 RadioML 2016.10a 数据集：
# 1. 加载了原始数据。
# 2. 提取了 I/Q 信号、标签和 SNR。
# 3. 将 X 数据调整为 (样本数, 128, 2) 形状。
# 4. 对 X 数据进行了**平均功率归一化**。
# 5. 进行了标签整数编码和 One-Hot 编码。
# 6. 划分了训练/验证/测试集 (60%/20%/20%)，并采用分层抽样。
# 7. 将处理后的数据 (归一化 X, 两种编码的标签 Y, SNR) 和标签映射保存到了 `processed_cnn_lstm_data_powernorm` 目录中。
#
# 这些数据可用于后续的 CNN-LSTM 模型训练和评估。
#
# --- End of Notebook ---