# Deep Dive Analysis - Advanced Signal Transforms for Anomaly Detection

**Objective**: Find discriminative features/transforms that can separate anomaly classes (6,7,8) from known classes (0-5).

Based on EDA findings:
- Anomalies resemble low-power classes (3,4,5) in simple features
- Class 7 is hardest (mixed with 3,4,5 in t-SNE)
- Simple features achieve only 22% recall
- Different IQ constellations suggest different modulation schemes

**Approaches to explore:**
1. Higher-order statistics (cumulants) - crucial for modulation classification
2. Cyclostationary features - radio signals have periodic statistics
3. Wavelet transforms - multi-resolution analysis
4. Phase-based features - instantaneous phase, unwrapped phase dynamics
5. Entropy measures - spectral, sample, permutation entropy
6. Constellation geometry - cluster analysis of IQ plane
7. Bispectrum analysis - higher-order spectra

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal as scipy_signal
from scipy.fft import fft, fftfreq, fftshift
from scipy.stats import kurtosis, skew, entropy
from scipy.spatial.distance import cdist
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
import warnings
warnings.filterwarnings('ignore')

from data_utils import load_train_data, load_test_anomalies, create_binary_labels, filter_by_snr

# Plot settings
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (14, 6)
plt.rcParams['font.size'] = 11

KNOWN_COLOR = 'tab:blue'
ANOMALY_COLOR = 'tab:red'
CLASS_COLORS = plt.cm.tab10(np.arange(10))

np.random.seed(42)

In [None]:
# Load data
train_signals, train_labels, train_snr = load_train_data()
test_signals, test_labels, test_snr = load_test_anomalies()

# Filter out SNR=0 from training (not present in test)
train_signals_filt, train_labels_filt, train_snr_filt = filter_by_snr(
    train_signals, train_labels, train_snr, [10, 20, 30]
)

# Binary labels for test
test_binary = create_binary_labels(test_labels)

print(f"Training data (filtered): {train_signals_filt.shape[0]} samples")
print(f"Test data: {test_signals.shape[0]} samples")
print(f"Test anomalies: {test_binary.sum()} ({100*test_binary.mean():.1f}%)")

---
## 1. Higher-Order Statistics (Cumulants)

Cumulants are powerful for modulation classification. The 2nd, 4th, and 6th order cumulants can distinguish different modulation types.

Key cumulants:
- C20: E[x²] - variance
- C21: E[|x|²] - power
- C40: E[x⁴] - 3E[x²]² (4th-order, relates to kurtosis)
- C41: E[x²|x|²] - E[x²]E[|x|²]
- C42: E[|x|⁴] - |E[x²]|² - 2E[|x|²]²

In [None]:
def compute_complex_signal(signal):
    """Convert IQ to complex signal."""
    return signal[:, 0] + 1j * signal[:, 1]

def compute_cumulants(signal):
    """
    Compute higher-order cumulants for modulation classification.
    Following Swami & Sadler (2000) formulation.
    """
    x = compute_complex_signal(signal)
    n = len(x)
    
    # Normalize to unit power
    x = x / (np.sqrt(np.mean(np.abs(x)**2)) + 1e-10)
    
    # Moments
    M20 = np.mean(x**2)
    M21 = np.mean(np.abs(x)**2)  # Should be ~1 after normalization
    M40 = np.mean(x**4)
    M41 = np.mean((x**2) * (np.abs(x)**2))
    M42 = np.mean(np.abs(x)**4)
    M60 = np.mean(x**6)
    M63 = np.mean((np.abs(x)**2)**3)
    
    # Cumulants
    C20 = M20
    C21 = M21
    C40 = M40 - 3 * M20**2
    C41 = M41 - 3 * M20 * M21
    C42 = M42 - np.abs(M20)**2 - 2 * M21**2
    C60 = M60 - 15 * M20 * M40 + 30 * M20**3
    C63 = M63 - 9 * M42 * M21 + 12 * M21**3
    
    return {
        'C20_abs': np.abs(C20),
        'C20_angle': np.angle(C20),
        'C21': np.real(C21),
        'C40_abs': np.abs(C40),
        'C40_angle': np.angle(C40),
        'C41_abs': np.abs(C41),
        'C42': np.real(C42),
        'C60_abs': np.abs(C60),
        'C63': np.real(C63),
    }

In [None]:
# Compute cumulants for all signals
print("Computing cumulants for training data...")
train_cumulants = [compute_cumulants(s) for s in train_signals_filt]
train_cum_df = pd.DataFrame(train_cumulants)
train_cum_df['class'] = train_labels_filt
train_cum_df['snr'] = train_snr_filt

print("Computing cumulants for test data...")
test_cumulants = [compute_cumulants(s) for s in test_signals]
test_cum_df = pd.DataFrame(test_cumulants)
test_cum_df['class'] = test_labels
test_cum_df['snr'] = test_snr
test_cum_df['is_anomaly'] = test_binary

print("Done!")

In [None]:
# Cumulant distribution by class
print("\nMean Cumulants by Class (Test Data):")
print("="*80)
cumulant_cols = ['C20_abs', 'C40_abs', 'C41_abs', 'C42', 'C60_abs', 'C63']
display_df = test_cum_df.groupby('class')[cumulant_cols].mean().round(4)
display_df['is_anomaly'] = ['No']*6 + ['YES']*3
print(display_df)

In [None]:
# Visualize key cumulants: Known vs Anomaly
fig, axes = plt.subplots(2, 3, figsize=(16, 10))

for i, col in enumerate(cumulant_cols):
    ax = axes.flatten()[i]
    known = test_cum_df[test_cum_df['is_anomaly'] == 0][col]
    anomaly = test_cum_df[test_cum_df['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known (0-5)', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly (6-8)', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.set_ylabel('Density')
    ax.legend()
    ax.set_title(f'{col} Distribution')

plt.suptitle('Higher-Order Cumulants: Known vs Anomaly', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_cumulants_distribution.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# 2D scatter: C40 vs C42 (classic for modulation classification)
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

ax = axes[0]
for class_idx in range(9):
    mask = test_cum_df['class'] == class_idx
    color = ANOMALY_COLOR if class_idx >= 6 else CLASS_COLORS[class_idx]
    marker = 'x' if class_idx >= 6 else 'o'
    label = f'ANOMALY {class_idx}' if class_idx >= 6 else f'Class {class_idx}'
    ax.scatter(test_cum_df[mask]['C40_abs'], test_cum_df[mask]['C42'],
               c=[color], label=label, alpha=0.5, marker=marker, s=20)
ax.set_xlabel('|C40|')
ax.set_ylabel('C42')
ax.set_title('Cumulant Space: |C40| vs C42')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

ax = axes[1]
for class_idx in range(9):
    mask = test_cum_df['class'] == class_idx
    color = ANOMALY_COLOR if class_idx >= 6 else CLASS_COLORS[class_idx]
    marker = 'x' if class_idx >= 6 else 'o'
    ax.scatter(test_cum_df[mask]['C20_abs'], test_cum_df[mask]['C60_abs'],
               c=[color], alpha=0.5, marker=marker, s=20)
ax.set_xlabel('|C20|')
ax.set_ylabel('|C60|')
ax.set_title('Cumulant Space: |C20| vs |C60|')

plt.tight_layout()
plt.savefig('plots_cumulants_scatter.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 2. Phase-Based Features

The instantaneous phase and its derivatives contain modulation information:
- Phase variance
- Phase jitter (derivative of phase)
- Phase transitions (how phase changes between samples)

In [None]:
def compute_phase_features(signal):
    """Compute phase-based features."""
    x = compute_complex_signal(signal)
    
    # Instantaneous phase
    phase = np.angle(x)
    
    # Unwrapped phase (continuous)
    phase_unwrap = np.unwrap(phase)
    
    # Phase derivative (instantaneous frequency)
    phase_diff = np.diff(phase_unwrap)
    
    # Phase statistics
    features = {
        # Direct phase stats
        'phase_mean': np.mean(phase),
        'phase_std': np.std(phase),
        'phase_var': np.var(phase),
        
        # Phase derivative (instantaneous frequency) stats
        'phase_diff_mean': np.mean(phase_diff),
        'phase_diff_std': np.std(phase_diff),
        'phase_diff_max': np.max(np.abs(phase_diff)),
        
        # Phase jitter (variance of inst. freq)
        'phase_jitter': np.var(phase_diff),
        
        # Non-linear phase component
        'phase_nonlinear': np.std(phase_unwrap - np.polyval(
            np.polyfit(np.arange(len(phase_unwrap)), phase_unwrap, 1), 
            np.arange(len(phase_unwrap)))),
        
        # Phase histogram entropy
        'phase_entropy': entropy(np.histogram(phase, bins=32, density=True)[0] + 1e-10),
        
        # Circular statistics
        'phase_circular_mean': np.abs(np.mean(np.exp(1j * phase))),
        'phase_circular_var': 1 - np.abs(np.mean(np.exp(1j * phase))),
    }
    
    return features

In [None]:
# Compute phase features
print("Computing phase features for training data...")
train_phase = [compute_phase_features(s) for s in train_signals_filt]
train_phase_df = pd.DataFrame(train_phase)
train_phase_df['class'] = train_labels_filt
train_phase_df['snr'] = train_snr_filt

print("Computing phase features for test data...")
test_phase = [compute_phase_features(s) for s in test_signals]
test_phase_df = pd.DataFrame(test_phase)
test_phase_df['class'] = test_labels
test_phase_df['snr'] = test_snr
test_phase_df['is_anomaly'] = test_binary

print("Done!")

In [None]:
# Phase feature distribution by class
print("\nMean Phase Features by Class (Test Data):")
print("="*80)
phase_cols = ['phase_std', 'phase_jitter', 'phase_entropy', 'phase_circular_var', 'phase_diff_std']
display_df = test_phase_df.groupby('class')[phase_cols].mean().round(4)
display_df['is_anomaly'] = ['No']*6 + ['YES']*3
print(display_df)

In [None]:
# Visualize phase features
fig, axes = plt.subplots(2, 3, figsize=(16, 10))

cols_to_plot = ['phase_std', 'phase_jitter', 'phase_entropy', 'phase_circular_var', 'phase_diff_std', 'phase_nonlinear']
for i, col in enumerate(cols_to_plot):
    ax = axes.flatten()[i]
    known = test_phase_df[test_phase_df['is_anomaly'] == 0][col]
    anomaly = test_phase_df[test_phase_df['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known (0-5)', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly (6-8)', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.legend()
    ax.set_title(f'{col}')

plt.suptitle('Phase Features: Known vs Anomaly', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_phase_features.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 3. Entropy-Based Features

Different entropy measures capture signal complexity:
- Spectral entropy: Flatness of power spectrum
- Sample entropy: Signal regularity/predictability
- Permutation entropy: Time-series complexity

In [None]:
def compute_entropy_features(signal):
    """Compute various entropy measures."""
    x = compute_complex_signal(signal)
    amplitude = np.abs(x)
    
    # Spectral entropy (normalized)
    spectrum = np.abs(fft(x))**2
    spectrum_norm = spectrum / (np.sum(spectrum) + 1e-10)
    spectral_ent = entropy(spectrum_norm + 1e-10)
    
    # Amplitude histogram entropy
    amp_hist, _ = np.histogram(amplitude, bins=64, density=True)
    amp_entropy = entropy(amp_hist + 1e-10)
    
    # I and Q channel entropies
    i_hist, _ = np.histogram(signal[:, 0], bins=64, density=True)
    q_hist, _ = np.histogram(signal[:, 1], bins=64, density=True)
    i_entropy = entropy(i_hist + 1e-10)
    q_entropy = entropy(q_hist + 1e-10)
    
    # Approximate entropy (simplified)
    def approx_entropy(data, m=2, r_mult=0.2):
        """Simplified approximate entropy."""
        n = len(data)
        r = r_mult * np.std(data)
        
        # Subsample for speed
        step = max(1, n // 500)
        data_sub = data[::step]
        n_sub = len(data_sub)
        
        def count_similar(m_val):
            count = 0
            templates = np.array([data_sub[i:i+m_val] for i in range(n_sub - m_val)])
            for i in range(len(templates)):
                dists = np.max(np.abs(templates - templates[i]), axis=1)
                count += np.sum(dists <= r) - 1  # Exclude self
            return count / (len(templates) * (len(templates) - 1) + 1e-10)
        
        c_m = count_similar(m)
        c_m1 = count_similar(m + 1)
        
        return -np.log((c_m1 + 1e-10) / (c_m + 1e-10))
    
    # Compute on amplitude (simplified for speed)
    approx_ent = approx_entropy(amplitude[:512])
    
    # Spectral flatness (Wiener entropy)
    spectrum_pos = spectrum[:len(spectrum)//2]
    geometric_mean = np.exp(np.mean(np.log(spectrum_pos + 1e-10)))
    arithmetic_mean = np.mean(spectrum_pos)
    spectral_flatness = geometric_mean / (arithmetic_mean + 1e-10)
    
    return {
        'spectral_entropy': spectral_ent,
        'amplitude_entropy': amp_entropy,
        'i_entropy': i_entropy,
        'q_entropy': q_entropy,
        'approx_entropy': approx_ent,
        'spectral_flatness': spectral_flatness,
    }

In [None]:
# Compute entropy features
print("Computing entropy features for training data...")
train_entropy = [compute_entropy_features(s) for s in train_signals_filt]
train_entropy_df = pd.DataFrame(train_entropy)
train_entropy_df['class'] = train_labels_filt
train_entropy_df['snr'] = train_snr_filt

print("Computing entropy features for test data...")
test_entropy = [compute_entropy_features(s) for s in test_signals]
test_entropy_df = pd.DataFrame(test_entropy)
test_entropy_df['class'] = test_labels
test_entropy_df['snr'] = test_snr
test_entropy_df['is_anomaly'] = test_binary

print("Done!")

In [None]:
# Entropy by class
print("\nMean Entropy Features by Class (Test Data):")
print("="*80)
entropy_cols = ['spectral_entropy', 'amplitude_entropy', 'approx_entropy', 'spectral_flatness']
display_df = test_entropy_df.groupby('class')[entropy_cols].mean().round(4)
display_df['is_anomaly'] = ['No']*6 + ['YES']*3
print(display_df)

In [None]:
# Visualize entropy features
fig, axes = plt.subplots(2, 3, figsize=(16, 10))

entropy_plot_cols = ['spectral_entropy', 'amplitude_entropy', 'i_entropy', 'q_entropy', 'approx_entropy', 'spectral_flatness']
for i, col in enumerate(entropy_plot_cols):
    ax = axes.flatten()[i]
    known = test_entropy_df[test_entropy_df['is_anomaly'] == 0][col]
    anomaly = test_entropy_df[test_entropy_df['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known (0-5)', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly (6-8)', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.legend()
    ax.set_title(f'{col}')

plt.suptitle('Entropy Features: Known vs Anomaly', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_entropy_features.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 4. Constellation Geometry Features

Analyze the IQ constellation structure:
- Number of clusters (estimated via silhouette)
- Cluster compactness
- Radial distribution

In [None]:
def compute_constellation_features(signal):
    """Analyze IQ constellation geometry."""
    iq = signal.copy()  # (2048, 2)
    
    # Normalize to unit power
    power = np.mean(iq[:, 0]**2 + iq[:, 1]**2)
    iq = iq / (np.sqrt(power) + 1e-10)
    
    # Radial (amplitude) distribution
    radii = np.sqrt(iq[:, 0]**2 + iq[:, 1]**2)
    
    # Angle distribution
    angles = np.arctan2(iq[:, 1], iq[:, 0])
    
    # Cluster analysis (try k=4, common for QAM/PSK)
    from sklearn.cluster import KMeans
    
    # Subsample for speed
    iq_sub = iq[::4]
    
    # Try different k and get inertias
    inertias = []
    for k in [2, 4, 8]:
        kmeans = KMeans(n_clusters=k, n_init=3, random_state=42)
        kmeans.fit(iq_sub)
        inertias.append(kmeans.inertia_)
    
    # Best k=4 clustering metrics
    kmeans_4 = KMeans(n_clusters=4, n_init=5, random_state=42)
    labels_4 = kmeans_4.fit_predict(iq_sub)
    
    # Within-cluster variance
    cluster_variances = []
    for k in range(4):
        cluster_points = iq_sub[labels_4 == k]
        if len(cluster_points) > 1:
            cluster_variances.append(np.var(cluster_points))
    mean_cluster_var = np.mean(cluster_variances) if cluster_variances else 0
    
    # Centroid distances (how spread are clusters)
    centroids = kmeans_4.cluster_centers_
    centroid_dists = cdist(centroids, centroids)
    mean_centroid_dist = np.mean(centroid_dists[np.triu_indices(4, k=1)])
    
    # Radial statistics
    features = {
        'radius_mean': np.mean(radii),
        'radius_std': np.std(radii),
        'radius_skew': skew(radii),
        'radius_kurtosis': kurtosis(radii),
        
        # Angle uniformity (high = uniform, low = clustered)
        'angle_uniformity': np.abs(np.mean(np.exp(1j * angles))),
        'angle_std': np.std(angles),
        
        # Cluster metrics
        'cluster_variance': mean_cluster_var,
        'centroid_spread': mean_centroid_dist,
        'inertia_k2': inertias[0],
        'inertia_k4': inertias[1],
        'inertia_k8': inertias[2],
        'inertia_ratio_4_2': inertias[1] / (inertias[0] + 1e-10),
        'inertia_ratio_8_4': inertias[2] / (inertias[1] + 1e-10),
        
        # Quadrant distribution
        'q1_frac': np.mean((iq[:, 0] > 0) & (iq[:, 1] > 0)),
        'q2_frac': np.mean((iq[:, 0] < 0) & (iq[:, 1] > 0)),
        'q3_frac': np.mean((iq[:, 0] < 0) & (iq[:, 1] < 0)),
        'q4_frac': np.mean((iq[:, 0] > 0) & (iq[:, 1] < 0)),
        'quadrant_balance': np.std([np.mean((iq[:, 0] > 0) & (iq[:, 1] > 0)),
                                    np.mean((iq[:, 0] < 0) & (iq[:, 1] > 0)),
                                    np.mean((iq[:, 0] < 0) & (iq[:, 1] < 0)),
                                    np.mean((iq[:, 0] > 0) & (iq[:, 1] < 0))]),
    }
    
    return features

In [None]:
# Compute constellation features
print("Computing constellation features for training data...")
train_const = [compute_constellation_features(s) for s in train_signals_filt]
train_const_df = pd.DataFrame(train_const)
train_const_df['class'] = train_labels_filt
train_const_df['snr'] = train_snr_filt

print("Computing constellation features for test data...")
test_const = [compute_constellation_features(s) for s in test_signals]
test_const_df = pd.DataFrame(test_const)
test_const_df['class'] = test_labels
test_const_df['snr'] = test_snr
test_const_df['is_anomaly'] = test_binary

print("Done!")

In [None]:
# Constellation features by class
print("\nMean Constellation Features by Class (Test Data):")
print("="*80)
const_cols = ['radius_std', 'angle_uniformity', 'cluster_variance', 'centroid_spread', 'quadrant_balance']
display_df = test_const_df.groupby('class')[const_cols].mean().round(4)
display_df['is_anomaly'] = ['No']*6 + ['YES']*3
print(display_df)

In [None]:
# Visualize constellation features
fig, axes = plt.subplots(2, 3, figsize=(16, 10))

const_plot_cols = ['radius_std', 'radius_kurtosis', 'angle_uniformity', 'cluster_variance', 'centroid_spread', 'quadrant_balance']
for i, col in enumerate(const_plot_cols):
    ax = axes.flatten()[i]
    known = test_const_df[test_const_df['is_anomaly'] == 0][col]
    anomaly = test_const_df[test_const_df['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known (0-5)', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly (6-8)', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.legend()
    ax.set_title(f'{col}')

plt.suptitle('Constellation Geometry Features: Known vs Anomaly', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_constellation_features.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 5. Spectral Features (Advanced)

More detailed spectral analysis:
- Spectral moments
- Bandwidth measures
- Peak analysis

In [None]:
def compute_spectral_features(signal):
    """Compute advanced spectral features."""
    x = compute_complex_signal(signal)
    n = len(x)
    
    # FFT
    spectrum = np.abs(fftshift(fft(x)))**2
    freqs = fftshift(fftfreq(n))
    
    # Normalize spectrum
    spectrum_norm = spectrum / (np.sum(spectrum) + 1e-10)
    
    # Spectral moments
    spectral_centroid = np.sum(freqs * spectrum_norm)
    spectral_spread = np.sqrt(np.sum((freqs - spectral_centroid)**2 * spectrum_norm))
    spectral_skewness = np.sum((freqs - spectral_centroid)**3 * spectrum_norm) / (spectral_spread**3 + 1e-10)
    spectral_kurtosis = np.sum((freqs - spectral_centroid)**4 * spectrum_norm) / (spectral_spread**4 + 1e-10)
    
    # Bandwidth (3dB, 10dB, 20dB)
    spectrum_db = 10 * np.log10(spectrum + 1e-10)
    max_db = np.max(spectrum_db)
    
    bw_3db = np.sum(spectrum_db > (max_db - 3)) / n
    bw_10db = np.sum(spectrum_db > (max_db - 10)) / n
    bw_20db = np.sum(spectrum_db > (max_db - 20)) / n
    
    # Peak analysis
    peaks, properties = scipy_signal.find_peaks(spectrum, height=np.max(spectrum)*0.1, distance=10)
    n_peaks = len(peaks)
    
    # Peak to average ratio
    papr = np.max(spectrum) / (np.mean(spectrum) + 1e-10)
    
    # Roll-off frequency (95% energy)
    cumsum = np.cumsum(spectrum_norm)
    rolloff_idx = np.argmax(cumsum >= 0.95)
    rolloff_freq = freqs[rolloff_idx]
    
    return {
        'spectral_centroid': spectral_centroid,
        'spectral_spread': spectral_spread,
        'spectral_skewness': spectral_skewness,
        'spectral_kurtosis': spectral_kurtosis,
        'bandwidth_3db': bw_3db,
        'bandwidth_10db': bw_10db,
        'bandwidth_20db': bw_20db,
        'n_spectral_peaks': n_peaks,
        'papr': papr,
        'rolloff_freq': rolloff_freq,
    }

In [None]:
# Compute spectral features
print("Computing spectral features for training data...")
train_spectral = [compute_spectral_features(s) for s in train_signals_filt]
train_spectral_df = pd.DataFrame(train_spectral)
train_spectral_df['class'] = train_labels_filt
train_spectral_df['snr'] = train_snr_filt

print("Computing spectral features for test data...")
test_spectral = [compute_spectral_features(s) for s in test_signals]
test_spectral_df = pd.DataFrame(test_spectral)
test_spectral_df['class'] = test_labels
test_spectral_df['snr'] = test_snr
test_spectral_df['is_anomaly'] = test_binary

print("Done!")

In [None]:
# Spectral features by class
print("\nMean Spectral Features by Class (Test Data):")
print("="*80)
spectral_cols = ['spectral_spread', 'spectral_kurtosis', 'bandwidth_3db', 'n_spectral_peaks', 'papr']
display_df = test_spectral_df.groupby('class')[spectral_cols].mean().round(4)
display_df['is_anomaly'] = ['No']*6 + ['YES']*3
print(display_df)

In [None]:
# Visualize spectral features
fig, axes = plt.subplots(2, 3, figsize=(16, 10))

spectral_plot_cols = ['spectral_spread', 'spectral_kurtosis', 'bandwidth_3db', 'bandwidth_10db', 'papr', 'n_spectral_peaks']
for i, col in enumerate(spectral_plot_cols):
    ax = axes.flatten()[i]
    known = test_spectral_df[test_spectral_df['is_anomaly'] == 0][col]
    anomaly = test_spectral_df[test_spectral_df['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known (0-5)', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly (6-8)', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.legend()
    ax.set_title(f'{col}')

plt.suptitle('Spectral Features: Known vs Anomaly', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_spectral_features.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 6. Cyclostationary Features

Radio signals exhibit cyclostationary properties (periodic statistics). The cyclic autocorrelation can reveal symbol rate and modulation type.

In [None]:
def compute_cyclostationary_features(signal):
    """Compute cyclostationary features."""
    x = compute_complex_signal(signal)
    n = len(x)
    
    # Autocorrelation of |x|^2 (reveals symbol rate)
    power = np.abs(x)**2
    power_centered = power - np.mean(power)
    acf_power = np.correlate(power_centered, power_centered, mode='full')
    acf_power = acf_power[n-1:] / (acf_power[n-1] + 1e-10)  # Normalize, keep positive lags
    
    # Find peaks in autocorrelation (cyclic frequencies)
    peaks, _ = scipy_signal.find_peaks(acf_power[10:500], height=0.1, distance=5)
    n_cyclic_peaks = len(peaks)
    first_peak_lag = peaks[0] + 10 if len(peaks) > 0 else 0
    
    # Autocorrelation decay rate
    acf_decay = np.mean(acf_power[100:200])
    
    # Second-order cyclic moment at alpha=0 (standard autocorrelation)
    acf_x = np.correlate(x, x, mode='full')
    acf_x = acf_x[n-1:] / (np.abs(acf_x[n-1]) + 1e-10)
    acf_mag_decay = np.mean(np.abs(acf_x[100:200]))
    
    # Conjugate autocorrelation (important for BPSK vs QPSK)
    conj_acf = np.correlate(x, np.conj(x), mode='full')
    conj_acf = conj_acf[n-1:] / (np.abs(conj_acf[n-1]) + 1e-10)
    conj_acf_ratio = np.abs(conj_acf[1]) / (np.abs(acf_x[1]) + 1e-10)
    
    return {
        'n_cyclic_peaks': n_cyclic_peaks,
        'first_cyclic_lag': first_peak_lag,
        'acf_power_decay': acf_decay,
        'acf_signal_decay': acf_mag_decay,
        'conj_acf_ratio': conj_acf_ratio,
        'acf_power_50': acf_power[50],
        'acf_power_100': acf_power[100],
        'acf_power_200': acf_power[200],
    }

In [None]:
# Compute cyclostationary features
print("Computing cyclostationary features for training data...")
train_cyclo = [compute_cyclostationary_features(s) for s in train_signals_filt]
train_cyclo_df = pd.DataFrame(train_cyclo)
train_cyclo_df['class'] = train_labels_filt
train_cyclo_df['snr'] = train_snr_filt

print("Computing cyclostationary features for test data...")
test_cyclo = [compute_cyclostationary_features(s) for s in test_signals]
test_cyclo_df = pd.DataFrame(test_cyclo)
test_cyclo_df['class'] = test_labels
test_cyclo_df['snr'] = test_snr
test_cyclo_df['is_anomaly'] = test_binary

print("Done!")

In [None]:
# Cyclostationary features by class
print("\nMean Cyclostationary Features by Class (Test Data):")
print("="*80)
cyclo_cols = ['n_cyclic_peaks', 'first_cyclic_lag', 'acf_power_decay', 'conj_acf_ratio']
display_df = test_cyclo_df.groupby('class')[cyclo_cols].mean().round(4)
display_df['is_anomaly'] = ['No']*6 + ['YES']*3
print(display_df)

In [None]:
# Visualize cyclostationary features
fig, axes = plt.subplots(2, 3, figsize=(16, 10))

cyclo_plot_cols = ['n_cyclic_peaks', 'first_cyclic_lag', 'acf_power_decay', 'acf_signal_decay', 'conj_acf_ratio', 'acf_power_100']
for i, col in enumerate(cyclo_plot_cols):
    ax = axes.flatten()[i]
    known = test_cyclo_df[test_cyclo_df['is_anomaly'] == 0][col]
    anomaly = test_cyclo_df[test_cyclo_df['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known (0-5)', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly (6-8)', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.legend()
    ax.set_title(f'{col}')

plt.suptitle('Cyclostationary Features: Known vs Anomaly', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_cyclostationary_features.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 7. Combine All Features and Evaluate Separability

In [None]:
# Combine all features
feature_dfs = [
    test_cum_df.drop(['class', 'snr', 'is_anomaly'], axis=1),
    test_phase_df.drop(['class', 'snr', 'is_anomaly'], axis=1),
    test_entropy_df.drop(['class', 'snr', 'is_anomaly'], axis=1),
    test_const_df.drop(['class', 'snr', 'is_anomaly'], axis=1),
    test_spectral_df.drop(['class', 'snr', 'is_anomaly'], axis=1),
    test_cyclo_df.drop(['class', 'snr', 'is_anomaly'], axis=1),
]

test_all_features = pd.concat(feature_dfs, axis=1)
test_all_features['class'] = test_labels
test_all_features['snr'] = test_snr
test_all_features['is_anomaly'] = test_binary

print(f"Total features: {test_all_features.shape[1] - 3}")
print(f"Feature names: {list(test_all_features.columns[:-3])}")

In [None]:
# Same for training
train_feature_dfs = [
    train_cum_df.drop(['class', 'snr'], axis=1),
    train_phase_df.drop(['class', 'snr'], axis=1),
    train_entropy_df.drop(['class', 'snr'], axis=1),
    train_const_df.drop(['class', 'snr'], axis=1),
    train_spectral_df.drop(['class', 'snr'], axis=1),
    train_cyclo_df.drop(['class', 'snr'], axis=1),
]

train_all_features = pd.concat(train_feature_dfs, axis=1)
train_all_features['class'] = train_labels_filt
train_all_features['snr'] = train_snr_filt

In [None]:
# Feature importance via separability (AUC for each feature)
from sklearn.metrics import roc_auc_score

feature_cols = list(test_all_features.columns[:-3])
y_true = test_all_features['is_anomaly'].values

auc_scores = {}
for col in feature_cols:
    try:
        # Handle potential NaN/Inf
        values = test_all_features[col].values
        valid_mask = np.isfinite(values)
        if valid_mask.sum() > 100:
            auc = roc_auc_score(y_true[valid_mask], values[valid_mask])
            # Take max(auc, 1-auc) since direction doesn't matter
            auc_scores[col] = max(auc, 1 - auc)
    except:
        pass

# Sort by AUC
sorted_features = sorted(auc_scores.items(), key=lambda x: x[1], reverse=True)

print("\n" + "="*60)
print("FEATURE RANKING BY SEPARABILITY (AUC)")
print("="*60)
print(f"{'Rank':<6}{'Feature':<30}{'AUC':>10}")
print("-"*50)
for i, (feat, auc) in enumerate(sorted_features[:25], 1):
    print(f"{i:<6}{feat:<30}{auc:>10.4f}")

In [None]:
# Visualize top discriminative features
top_features = [f[0] for f in sorted_features[:12]]

fig, axes = plt.subplots(3, 4, figsize=(18, 12))
axes = axes.flatten()

for i, col in enumerate(top_features):
    ax = axes[i]
    known = test_all_features[test_all_features['is_anomaly'] == 0][col]
    anomaly = test_all_features[test_all_features['is_anomaly'] == 1][col]
    
    ax.hist(known, bins=50, alpha=0.6, label='Known', color=KNOWN_COLOR, density=True)
    ax.hist(anomaly, bins=50, alpha=0.6, label='Anomaly', color=ANOMALY_COLOR, density=True)
    ax.set_xlabel(col)
    ax.legend()
    auc = auc_scores.get(col, 0)
    ax.set_title(f'{col}\nAUC={auc:.3f}')

plt.suptitle('Top 12 Discriminative Features', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_top_discriminative_features.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 8. PCA and t-SNE with Advanced Features

In [None]:
# Use top features for visualization
top_n = 20
top_feat_names = [f[0] for f in sorted_features[:top_n]]

# Prepare data
X_train = train_all_features[top_feat_names].values
X_test = test_all_features[top_feat_names].values

# Handle NaN/Inf
X_train = np.nan_to_num(X_train, nan=0, posinf=0, neginf=0)
X_test = np.nan_to_num(X_test, nan=0, posinf=0, neginf=0)

# Standardize
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
# PCA
pca = PCA(n_components=2)
train_pca = pca.fit_transform(X_train_scaled)
test_pca = pca.transform(X_test_scaled)

print(f"PCA explained variance: {pca.explained_variance_ratio_}")
print(f"Total: {sum(pca.explained_variance_ratio_):.2%}")

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Training PCA
ax = axes[0]
for class_idx in range(6):
    mask = train_all_features['class'].values == class_idx
    ax.scatter(train_pca[mask, 0], train_pca[mask, 1], 
               c=[CLASS_COLORS[class_idx]], label=f'Class {class_idx}', alpha=0.4, s=10)
ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
ax.set_title('PCA - Training Data (Advanced Features)')
ax.legend()

# Test PCA with anomalies
ax = axes[1]
for class_idx in range(9):
    mask = test_all_features['class'].values == class_idx
    color = ANOMALY_COLOR if class_idx >= 6 else CLASS_COLORS[class_idx]
    marker = 'x' if class_idx >= 6 else 'o'
    size = 40 if class_idx >= 6 else 15
    label = f'ANOMALY {class_idx}' if class_idx >= 6 else f'Class {class_idx}'
    ax.scatter(test_pca[mask, 0], test_pca[mask, 1], 
               c=[color], label=label, alpha=0.6, marker=marker, s=size)
ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
ax.set_title('PCA - Test Data (Known + Anomalies)')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

plt.tight_layout()
plt.savefig('plots_pca_advanced.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# t-SNE on test data
print("Running t-SNE on test data with advanced features...")
tsne = TSNE(n_components=2, perplexity=30, random_state=42, n_iter=1000)
test_tsne = tsne.fit_transform(X_test_scaled)
print("Done!")

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# t-SNE colored by class
ax = axes[0]
for class_idx in range(9):
    mask = test_all_features['class'].values == class_idx
    color = ANOMALY_COLOR if class_idx >= 6 else CLASS_COLORS[class_idx]
    marker = 'x' if class_idx >= 6 else 'o'
    size = 50 if class_idx >= 6 else 20
    label = f'ANOMALY {class_idx}' if class_idx >= 6 else f'Class {class_idx}'
    ax.scatter(test_tsne[mask, 0], test_tsne[mask, 1], 
               c=[color], label=label, alpha=0.6, marker=marker, s=size)
ax.set_xlabel('t-SNE 1')
ax.set_ylabel('t-SNE 2')
ax.set_title('t-SNE - Test Data (by Class)')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

# t-SNE colored by known/anomaly
ax = axes[1]
known_mask = test_all_features['is_anomaly'].values == 0
ax.scatter(test_tsne[known_mask, 0], test_tsne[known_mask, 1], 
           c=KNOWN_COLOR, label='Known (0-5)', alpha=0.5, s=20)
ax.scatter(test_tsne[~known_mask, 0], test_tsne[~known_mask, 1], 
           c=ANOMALY_COLOR, label='Anomaly (6-8)', alpha=0.7, s=50, marker='x')
ax.set_xlabel('t-SNE 1')
ax.set_ylabel('t-SNE 2')
ax.set_title('t-SNE - Known vs Anomaly')
ax.legend()

plt.tight_layout()
plt.savefig('plots_tsne_advanced.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 9. Anomaly Detection with Advanced Features

In [None]:
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report

# Prepare data
y_test = test_all_features['is_anomaly'].values

print("\n" + "="*60)
print("ANOMALY DETECTION WITH ADVANCED FEATURES")
print("="*60)

In [None]:
# Isolation Forest
print("\n--- Isolation Forest ---")
iso_forest = IsolationForest(contamination=0.1, random_state=42, n_estimators=200)
iso_forest.fit(X_train_scaled)
y_pred_iso = (iso_forest.predict(X_test_scaled) == -1).astype(int)

print(f"Precision: {precision_score(y_test, y_pred_iso):.3f}")
print(f"Recall: {recall_score(y_test, y_pred_iso):.3f}")
print(f"F1 Score: {f1_score(y_test, y_pred_iso):.3f}")

In [None]:
# One-Class SVM
print("\n--- One-Class SVM ---")
# Subsample training for speed
train_sub_idx = np.random.choice(len(X_train_scaled), size=5000, replace=False)
ocsvm = OneClassSVM(kernel='rbf', nu=0.1, gamma='scale')
ocsvm.fit(X_train_scaled[train_sub_idx])
y_pred_svm = (ocsvm.predict(X_test_scaled) == -1).astype(int)

print(f"Precision: {precision_score(y_test, y_pred_svm):.3f}")
print(f"Recall: {recall_score(y_test, y_pred_svm):.3f}")
print(f"F1 Score: {f1_score(y_test, y_pred_svm):.3f}")

In [None]:
# Local Outlier Factor (novelty detection mode)
print("\n--- Local Outlier Factor ---")
lof = LocalOutlierFactor(n_neighbors=20, novelty=True, contamination=0.1)
lof.fit(X_train_scaled[train_sub_idx])
y_pred_lof = (lof.predict(X_test_scaled) == -1).astype(int)

print(f"Precision: {precision_score(y_test, y_pred_lof):.3f}")
print(f"Recall: {recall_score(y_test, y_pred_lof):.3f}")
print(f"F1 Score: {f1_score(y_test, y_pred_lof):.3f}")

In [None]:
# Per-anomaly class detection rates
print("\n" + "="*60)
print("PER-ANOMALY CLASS DETECTION RATES (Isolation Forest)")
print("="*60)

for anomaly_class in [6, 7, 8]:
    mask = test_all_features['class'].values == anomaly_class
    detection_rate = y_pred_iso[mask].mean()
    print(f"Anomaly Class {anomaly_class}: {detection_rate:.1%} detected ({y_pred_iso[mask].sum()}/{mask.sum()})")

---
## 10. Per-Class Feature Analysis (Why is Class 7 Hard?)

In [None]:
# Detailed comparison: Class 7 vs similar known classes
print("\n" + "="*60)
print("COMPARING ANOMALY CLASS 7 WITH SIMILAR KNOWN CLASSES")
print("="*60)

# Focus on classes 3, 4, 5 (low-power group) and anomaly 7
comparison_classes = [3, 4, 5, 7]
comparison_features = top_feat_names[:10]

print("\nMean values for top features:")
display_df = test_all_features[test_all_features['class'].isin(comparison_classes)].groupby('class')[comparison_features].mean().round(4).T
display_df['Class7_vs_345_diff'] = np.abs(display_df[7] - display_df[[3,4,5]].mean(axis=1))
print(display_df)

In [None]:
# Find features that best separate class 7 from classes 3,4,5
class_7_mask = test_all_features['class'].values == 7
class_345_mask = np.isin(test_all_features['class'].values, [3, 4, 5])

separability_7_vs_345 = {}
for col in feature_cols:
    try:
        values = test_all_features[col].values
        valid = np.isfinite(values)
        if valid.sum() > 50:
            y_binary = np.zeros(len(values))
            y_binary[class_7_mask] = 1
            mask = valid & (class_7_mask | class_345_mask)
            if mask.sum() > 50 and y_binary[mask].sum() > 10:
                auc = roc_auc_score(y_binary[mask], values[mask])
                separability_7_vs_345[col] = max(auc, 1-auc)
    except:
        pass

sorted_sep_7 = sorted(separability_7_vs_345.items(), key=lambda x: x[1], reverse=True)

print("\n" + "="*60)
print("FEATURES THAT BEST SEPARATE CLASS 7 FROM CLASSES 3,4,5")
print("="*60)
for i, (feat, auc) in enumerate(sorted_sep_7[:15], 1):
    print(f"{i:>3}. {feat:<30} AUC = {auc:.4f}")

In [None]:
# Visualize best separating features for class 7
best_for_7 = [f[0] for f in sorted_sep_7[:6]]

fig, axes = plt.subplots(2, 3, figsize=(16, 10))
axes = axes.flatten()

for i, col in enumerate(best_for_7):
    ax = axes[i]
    for class_idx in [3, 4, 5, 7]:
        mask = test_all_features['class'].values == class_idx
        color = ANOMALY_COLOR if class_idx == 7 else CLASS_COLORS[class_idx]
        label = f'ANOMALY 7' if class_idx == 7 else f'Class {class_idx}'
        values = test_all_features[mask][col].values
        values = values[np.isfinite(values)]
        ax.hist(values, bins=30, alpha=0.5, label=label, color=color, density=True)
    ax.set_xlabel(col)
    ax.legend()
    auc = separability_7_vs_345.get(col, 0)
    ax.set_title(f'{col}\nAUC(7 vs 3,4,5) = {auc:.3f}')

plt.suptitle('Features Best Separating Anomaly 7 from Classes 3,4,5', fontsize=14, y=1.02)
plt.tight_layout()
plt.savefig('plots_class7_separability.png', dpi=150, bbox_inches='tight')
plt.show()

---
## 11. Save All Features

In [None]:
# Save comprehensive feature sets
train_all_features.to_csv('train_advanced_features.csv', index=False)
test_all_features.to_csv('test_advanced_features.csv', index=False)

print(f"Saved train_advanced_features.csv: {train_all_features.shape}")
print(f"Saved test_advanced_features.csv: {test_all_features.shape}")

---
## 12. Summary and Key Insights

In [None]:
print("="*70)
print("DEEP DIVE ANALYSIS SUMMARY")
print("="*70)

print("\n1. FEATURE CATEGORIES EXPLORED:")
print("   - Higher-order cumulants (9 features): Modulation-specific signatures")
print("   - Phase features (11 features): Phase jitter, entropy, circular stats")
print("   - Entropy features (6 features): Spectral, amplitude, approximate entropy")
print("   - Constellation geometry (17 features): Cluster analysis, radial stats")
print("   - Spectral features (10 features): Bandwidth, peaks, PAPR")
print("   - Cyclostationary features (8 features): Autocorrelation, cyclic peaks")

print(f"\n2. TOTAL FEATURES: {len(feature_cols)}")

print("\n3. TOP 10 DISCRIMINATIVE FEATURES (by AUC):")
for i, (feat, auc) in enumerate(sorted_features[:10], 1):
    print(f"   {i:>2}. {feat:<30} AUC = {auc:.4f}")

print("\n4. ANOMALY DETECTION RESULTS (with advanced features):")
print(f"   - Isolation Forest:  P={precision_score(y_test, y_pred_iso):.1%}, R={recall_score(y_test, y_pred_iso):.1%}, F1={f1_score(y_test, y_pred_iso):.1%}")
print(f"   - One-Class SVM:     P={precision_score(y_test, y_pred_svm):.1%}, R={recall_score(y_test, y_pred_svm):.1%}, F1={f1_score(y_test, y_pred_svm):.1%}")
print(f"   - LOF:               P={precision_score(y_test, y_pred_lof):.1%}, R={recall_score(y_test, y_pred_lof):.1%}, F1={f1_score(y_test, y_pred_lof):.1%}")

print("\n5. KEY INSIGHTS:")
print("   a) Higher-order cumulants (C40, C42) provide modulation-specific signatures")
print("   b) Constellation geometry reveals structural differences in IQ plane")
print("   c) Phase features capture modulation dynamics")
print("   d) Class 7 remains hardest - overlaps with classes 3,4,5 in many features")
print("   e) Deep learning can learn even better representations from raw signals")

print("\n6. RECOMMENDATIONS FOR MODELING:")
print("   a) Use top discriminative features as additional inputs to deep model")
print("   b) Design network architecture that captures:")
print("      - Higher-order statistics (via higher-order pooling layers)")
print("      - Phase dynamics (via phase-aware convolutions)")
print("      - Multi-scale features (via wavelet/multi-resolution analysis)")
print("   c) Consider contrastive learning to maximize class separation")
print("   d) Focus on detecting Class 7 - may need specialized handling")