# 01 — Data Preparation and Visualization (RadioML 2016.10b)

This notebook loads, explores, visualizes, preprocesses, splits, and saves the RadioML 2016.10b dataset for Automatic Modulation Classification (AMC). It follows the project standards: English Markdown, strict plot styling (save first then show), reproducibility, and no data leakage.

Outputs: preprocessed NumPy arrays in 1D and 2D shapes, along with fitted preprocessing objects saved for reuse.

In [None]:
# Imports, global style, seeds, and GPU check
import os, sys, json, pickle, math, random, pathlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# Plotly optional (for advanced visualizations if desired)
import plotly.graph_objects as go

import sklearn
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler
from sklearn.utils import check_random_state

import tensorflow as tf
from tensorflow import keras

# Style and aesthetics
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_style('whitegrid')

# Reproducibility
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# GPU availability
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f'GPU detected: {gpus}')
else:
    print('No GPU detected by TensorFlow.')

In [None]:
# Paths and Google Drive mount (for Colab)
from datetime import datetime

# Define project root and data paths (per spec)
PROJECT_ROOT_PATH = '/content/drive/MyDrive/DTVT_IUH_2025/AMC_RML2016_10b/'
RAW_DATA_PATH = '/content/drive/MyDrive/RML2016.10b.dat'

# Attempt to mount Google Drive in Colab; ignore errors outside Colab
try:
    from google.colab import drive  # type: ignore
    drive.mount('/content/drive')
    print('Google Drive mounted.')
except Exception as e:
    print('Not running in Colab or Drive mount skipped. Proceeding with local paths if set.')

# Construct subfolder paths
PROCESSED_DATA_PATH = os.path.join(PROJECT_ROOT_PATH, 'data/')
VISUALIZATIONS_PATH = os.path.join(PROJECT_ROOT_PATH, 'visualizations/')
PREPROCESSING_OBJECTS_PATH = os.path.join(PROJECT_ROOT_PATH, 'preprocessing_objects/')

# Create directories if they do not exist
for d in [PROJECT_ROOT_PATH, PROCESSED_DATA_PATH, VISUALIZATIONS_PATH, PREPROCESSING_OBJECTS_PATH]:
    os.makedirs(d, exist_ok=True)
print('Directories ensured.')

In [None]:
# Load RadioML 2016.10b dataset and convert to DataFrame
def load_radioml_2016_10b(path):
    """
    Load the RadioML 2016.10b .dat file using pickle.
    Parameters:
        path (str): Absolute path to RML2016.10b.dat
    Returns:
        dict: Keys are (modulation, snr), values are np.ndarray of shape (N, 2, 128)
    """
    with open(path, 'rb') as f:
        data = pickle.load(f, encoding='latin1')
    return data

def to_dataframe(data_dict):
    """
    Convert RadioML dict to a pandas DataFrame with columns: signal (object), modulation (str), snr (int).
    Parameters:
        data_dict (dict): {(modulation:str, snr:int): np.ndarray(N, 2, 128)}
    Returns:
        pd.DataFrame
    """
    records = []
    for (mod, snr), arr in data_dict.items():
        for i in range(arr.shape[0]):
            records.append({
                'signal': arr[i],
                'modulation': str(mod),
                'snr': int(snr)
            })
    df = pd.DataFrame.from_records(records)
    return df

data_dict = load_radioml_2016_10b(RAW_DATA_PATH)
df = to_dataframe(data_dict)

display(df.head())

print('\nDataFrame info:')
df.info()

mods = sorted(df['modulation'].unique().tolist())
snrs = sorted(df['snr'].unique().tolist())
print('\nUnique modulations:', mods)
print('Unique SNRs:', snrs)


In [None]:
# Distribution analysis and countplot
mod_counts = df['modulation'].value_counts().sort_index()
snr_counts = df['snr'].value_counts().sort_index()
pivot_counts = pd.pivot_table(df, index='modulation', columns='snr', values='signal', aggfunc='count', fill_value=0)

print('Samples per Modulation:')
print(mod_counts)
print('\nSamples per SNR:')
print(snr_counts)
print('\nSamples per (Modulation, SNR):')
display(pivot_counts)

# Countplot of modulation distribution
plt.figure(figsize=(12, 4))
sns.countplot(x='modulation', data=df, order=mods, palette='tab10')
plt.title('Distribution of Modulation Types', fontsize=12)
plt.xlabel('Modulation')
plt.ylabel('Count')
plt.xticks(rotation=0)
plt.margins(x=0.02)
plt.tight_layout()
plt.savefig(os.path.join(VISUALIZATIONS_PATH, 'modulation_distribution.png'), dpi=300)
plt.show()

# Null check
print('\nNull values per column:')
print(df.isnull().sum())


In [None]:
# Extract NumPy arrays X (signals), Y (labels), Z (SNR)
# X shape target: (N, 2, 128)
X = np.stack(df['signal'].values, axis=0)
Y = df['modulation'].values.astype(str)
Z = df['snr'].values.astype(int)

print('X shape:', X.shape, '| Y shape:', Y.shape, '| Z shape:', Z.shape)

## Raw Data Visualization (SNR = 18 dB)

For each modulation type, we visualize: I/Q waveform, amplitude, wrapped phase, and the constellation diagram. We improve line clarity using thicker line widths (no data mutation).

In [None]:
# Visualization helpers (non-mutating)
def _ensure_dir(path):
    os.makedirs(path, exist_ok=True)

def plot_iq_waveform(i, q, title, save_path):
    """
    Plot I and Q waveforms on the same time axis.
    Inputs: i, q (1D arrays of shape (128,))
    """
    plt.figure(figsize=(12, 4))
    t = np.arange(len(i))
    plt.plot(t, i, label='I', linewidth=2.0)
    plt.plot(t, q, label='Q', linewidth=2.0)
    plt.title(title)
    plt.xlabel('Time (samples)')
    plt.ylabel('Amplitude')
    plt.legend(loc='best')
    plt.margins(x=0.02, y=0.05)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.show()

def plot_amplitude(signal_complex, title, save_path):
    amp = np.abs(signal_complex)
    plt.figure(figsize=(12, 4))
    t = np.arange(len(amp))
    plt.plot(t, amp, color='#4C78A8', linewidth=2.0)
    plt.title(title)
    plt.xlabel('Time (samples)')
    plt.ylabel('Amplitude')
    plt.margins(x=0.02, y=0.05)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.show()

def plot_phase(signal_complex, title, save_path):
    phase = np.angle(signal_complex)  # wrapped to [-pi, pi]
    plt.figure(figsize=(12, 4))
    t = np.arange(len(phase))
    plt.plot(t, phase, color='#F58518', linewidth=2.0)
    plt.title(title)
    plt.xlabel('Time (samples)')
    plt.ylabel('Phase (rad)')
    plt.margins(x=0.02, y=0.05)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.show()

def plot_constellation(i, q, title, save_path):
    plt.figure(figsize=(6, 6))
    plt.scatter(i, q, s=12, alpha=0.8, c=np.linspace(0,1,len(i)), cmap='viridis', edgecolors='none')
    plt.title(title)
    plt.xlabel('In-phase (I)')
    plt.ylabel('Quadrature (Q)')
    # Add small margins so points are not at edges
    x_min, x_max = np.min(i), np.max(i)
    y_min, y_max = np.min(q), np.max(q)
    dx = (x_max - x_min) * 0.05 + 1e-6
    dy = (y_max - y_min) * 0.05 + 1e-6
    plt.xlim(x_min - dx, x_max + dx)
    plt.ylim(y_min - dy, y_max + dy)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300)
    plt.show()

In [None]:
# Generate visualizations at SNR = 18 dB, one set per modulation
_ensure_dir(VISUALIZATIONS_PATH)
SNR_TARGET = 18
df_18 = df[df['snr'] == SNR_TARGET]

for mod in mods:
    sub = df_18[df_18['modulation'] == mod]
    if len(sub) == 0:
        print(f'No samples for {mod} at SNR={SNR_TARGET} dB')
        continue
    sig = sub['signal'].iloc[0]  # shape (2, 128)
    I = sig[0].astype(float)
    Q = sig[1].astype(float)
    c = I + 1j * Q
    # I/Q waveform
    plot_iq_waveform(I, Q, f'I/Q Waveform — {mod} @ {SNR_TARGET} dB', os.path.join(VISUALIZATIONS_PATH, f'iq_waveform_{mod}_snr{SNR_TARGET}.png'))
    # Amplitude
    plot_amplitude(c, f'Amplitude — {mod} @ {SNR_TARGET} dB', os.path.join(VISUALIZATIONS_PATH, f'amplitude_{mod}_snr{SNR_TARGET}.png'))
    # Wrapped Phase
    plot_phase(c, f'Wrapped Phase — {mod} @ {SNR_TARGET} dB', os.path.join(VISUALIZATIONS_PATH, f'phase_{mod}_snr{SNR_TARGET}.png'))
    # Constellation
    plot_constellation(I, Q, f'Constellation — {mod} @ {SNR_TARGET} dB', os.path.join(VISUALIZATIONS_PATH, f'constellation_{mod}_snr{SNR_TARGET}.png'))

## Exact Stratified Split (80/10/10) by (Modulation, SNR)

In [None]:
# Perform exact 80/10/10 split per (mod, snr) pair with reproducibility
def stratified_split_by_pair(X, Y, Z, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=SEED):
    """
    Exact split per (modulation, SNR) pair using deterministic shuffling.
    Fits the specified ratios exactly within each group.
    """
    assert abs(train_ratio + val_ratio - 0.9) < 1e-6 and abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, 'Ratios must sum to 1.0'
    rng = np.random.RandomState(seed)
    # Indices grouped by (mod, snr)
    keys = np.array([Y, Z]).T  # shape (N, 2)
    # Build mapping from (mod, snr) -> indices
    from collections import defaultdict
    groups = defaultdict(list)
    for idx, (mod, snr) in enumerate(keys):
        groups[(mod, int(snr))].append(idx)

    train_idx, val_idx, test_idx = [], [], []
    for k, idxs in groups.items():
        idxs = np.array(idxs)
        rng.shuffle(idxs)
        n = len(idxs)
        n_train = int(round(train_ratio * n))
        n_val = int(round(val_ratio * n))
        n_test = n - n_train - n_val
        train_idx.extend(idxs[:n_train])
        val_idx.extend(idxs[n_train:n_train+n_val])
        test_idx.extend(idxs[n_train+n_val:])
    return (np.array(train_idx), np.array(val_idx), np.array(test_idx))

train_idx, val_idx, test_idx = stratified_split_by_pair(X, Y, Z, 0.8, 0.1, 0.1, SEED)

def split_arrays(X, Y, Z, idx):
    return X[idx], Y[idx], Z[idx]

X_train, y_train, snr_train = split_arrays(X, Y, Z, train_idx)
X_val, y_val, snr_val = split_arrays(X, Y, Z, val_idx)
X_test, y_test, snr_test = split_arrays(X, Y, Z, test_idx)

print('Train:', X_train.shape, '| Val:', X_val.shape, '| Test:', X_test.shape)

# Verify proportion per pair
def count_per_pair(labels, snrs):
    s = pd.Series([f'{l}|{int(s)}' for l, s in zip(labels, snrs)])
    return s.value_counts().sort_index()

print('\nCheck per (mod,SNR) counts in each split:')
print('Train pairs:', count_per_pair(y_train, snr_train).head())
print('Val pairs:', count_per_pair(y_val, snr_val).head())
print('Test pairs:', count_per_pair(y_test, snr_test).head())

## Label Encoding (fit on y_train only) and One-Hot

In [None]:
# Label encoding and one-hot (no leakage)
label_encoder = LabelEncoder()
label_encoder.fit(y_train)  # fit on train only

y_train_int = label_encoder.transform(y_train)
y_val_int = label_encoder.transform(y_val)
y_test_int = label_encoder.transform(y_test)

# One-hot encoding
num_classes = len(label_encoder.classes_)
def to_one_hot(y_int, n_classes):
    y_oh = np.zeros((len(y_int), n_classes), dtype=np.float32)
    y_oh[np.arange(len(y_int)), y_int] = 1.0
    return y_oh

y_train_oh = to_one_hot(y_train_int, num_classes)
y_val_oh = to_one_hot(y_val_int, num_classes)
y_test_oh = to_one_hot(y_test_int, num_classes)

# Save LabelEncoder
with open(os.path.join(PREPROCESSING_OBJECTS_PATH, 'label_encoder.pkl'), 'wb') as f:
    pickle.dump(label_encoder, f)
print('Label encoder saved.')
print('Classes:', label_encoder.classes_.tolist())

## Z-Score Normalization (fit on X_train only)

In [None]:
# Standardize per-channel using StandardScaler
# Reshape to (N*128, 2) to scale channels independently
def reshape_for_scaler(X):
    N = X.shape[0]
    return X.transpose(0, 2, 1).reshape(-1, 2)  # (N,2,128)->(N,128,2)->(N*128,2)

def reshape_back_from_scaler(X_scaled_flat, N):
    Xs = X_scaled_flat.reshape(N, 128, 2).transpose(0, 2, 1)  # (N,128,2)->(N,2,128)
    return Xs

scaler = StandardScaler()
X_train_flat = reshape_for_scaler(X_train)
scaler.fit(X_train_flat)  # fit on train only

X_train_scaled = reshape_back_from_scaler(scaler.transform(X_train_flat), X_train.shape[0])
X_val_scaled = reshape_back_from_scaler(scaler.transform(reshape_for_scaler(X_val)), X_val.shape[0])
X_test_scaled = reshape_back_from_scaler(scaler.transform(reshape_for_scaler(X_test)), X_test.shape[0])

# Save scaler
with open(os.path.join(PREPROCESSING_OBJECTS_PATH, 'standard_scaler.pkl'), 'wb') as f:
    pickle.dump(scaler, f)
print('Standard scaler saved.')

print('Scaled shapes:', X_train_scaled.shape, X_val_scaled.shape, X_test_scaled.shape)

## Reshape to 1D and 2D formats and Save

In [None]:
# 1D format: (samples, 128, 2)
def to_1d_format(X):
    return X.transpose(0, 2, 1)  # (N,2,128)->(N,128,2)

# 2D format: (samples, 2, 128, 1)
def to_2d_format(X):
    return X[:, :, :, np.newaxis]  # (N,2,128)->(N,2,128,1)

X_train_1d = to_1d_format(X_train_scaled)
X_val_1d = to_1d_format(X_val_scaled)
X_test_1d = to_1d_format(X_test_scaled)

X_train_2d = to_2d_format(X_train_scaled)
X_val_2d = to_2d_format(X_val_scaled)
X_test_2d = to_2d_format(X_test_scaled)

np.save(os.path.join(PROCESSED_DATA_PATH, 'X_train_1d.npy'), X_train_1d)
np.save(os.path.join(PROCESSED_DATA_PATH, 'X_val_1d.npy'), X_val_1d)
np.save(os.path.join(PROCESSED_DATA_PATH, 'X_test_1d.npy'), X_test_1d)

np.save(os.path.join(PROCESSED_DATA_PATH, 'X_train_2d.npy'), X_train_2d)
np.save(os.path.join(PROCESSED_DATA_PATH, 'X_val_2d.npy'), X_val_2d)
np.save(os.path.join(PROCESSED_DATA_PATH, 'X_test_2d.npy'), X_test_2d)

np.save(os.path.join(PROCESSED_DATA_PATH, 'y_train.npy'), y_train_oh)
np.save(os.path.join(PROCESSED_DATA_PATH, 'y_val.npy'), y_val_oh)
np.save(os.path.join(PROCESSED_DATA_PATH, 'y_test.npy'), y_test_oh)

np.save(os.path.join(PROCESSED_DATA_PATH, 'snr_train.npy'), snr_train)
np.save(os.path.join(PROCESSED_DATA_PATH, 'snr_val.npy'), snr_val)
np.save(os.path.join(PROCESSED_DATA_PATH, 'snr_test.npy'), snr_test)

print('Saved all processed datasets and labels/SNRs.')
print('1D shapes:', X_train_1d.shape, X_val_1d.shape, X_test_1d.shape)
print('2D shapes:', X_train_2d.shape, X_val_2d.shape, X_test_2d.shape)