In [1]:
import pandas as pd
import numpy as np

# Read the dataset CSV into a DataFrame. Later cells read its
# 'received_signal', 'secret_code' and jet position columns.
DATASET_CSV = '../dataset/dataset.csv'
df = pd.read_csv(DATASET_CSV)

# ---------------------------
# 1. Data Loading & Parsing
# ---------------------------
def parse_signal(signal_str, max_len=208):
    """Parse a stringified list of complex samples into magnitude/phase rows.

    The expected text form is ``"[(a+bj), (c+dj), ...]"``. Spaces and the
    surrounding brackets are removed, the text is split on ``"),("`` and each
    piece is handed to ``complex()``.

    Parameters
    ----------
    signal_str : str
        Serialized complex sequence.
    max_len : int, optional
        Maximum number of samples to keep (default 208, the length the rest of
        the notebook assumes).

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(k, 2)`` with ``k <= max_len``; column 0 is the
        magnitude and column 1 the phase in radians. An input with no valid
        samples yields shape ``(0, 2)`` rather than ``(0,)``.

    Notes
    -----
    Tokens that ``complex()`` cannot parse are skipped (best-effort parsing),
    so ``k`` can be smaller than the number of tokens in the string. Rows are
    NOT padded; callers that need a fixed length must check ``k`` themselves.
    """
    body = signal_str.strip('[]').replace(' ', '')

    samples = []
    for token in body.split('),('):
        token = token.replace('(', '').replace(')', '')
        try:
            samples.append(complex(token))
        except ValueError:
            # Deliberate best-effort: drop malformed tokens instead of failing.
            continue

    # Convert once and compute both features vectorised; column_stack keeps the
    # (k, 2) layout even when k == 0.
    values = np.asarray(samples[:max_len], dtype=complex)
    return np.column_stack([np.abs(values), np.angle(values)])

def parse_secret_code(code_str):
    """Parse a stringified integer list into a 1-D integer array.

    Accepts comma-separated (``"[1, 0, 1]"``, ``"[1,0,1]"``) as well as
    whitespace-separated (``"[1 0 1]"``) forms, with optional surrounding
    whitespace.

    Parameters
    ----------
    code_str : str
        Serialized list of integers.

    Returns
    -------
    numpy.ndarray
        1-D integer array; empty (shape ``(0,)``) for ``"[]"``.

    Raises
    ------
    ValueError
        If any token is not a valid integer literal.
    """
    # Treat commas as whitespace so the exact separator style does not matter;
    # str.split() with no argument also drops empty tokens.
    tokens = code_str.strip().strip('[]').replace(',', ' ').split()
    return np.array([int(tok) for tok in tokens], dtype=int)


# Turn the raw string columns into numeric arrays, and pull out the six
# position targets (x, y, z for each of the two jets).
X_signal = np.array(list(map(parse_signal, df['received_signal'])))  # (15000, 208, 2)
X_secret = np.array(list(map(parse_secret_code, df['secret_code'])))  # (15000, 13)
target_columns = ['jet1_x', 'jet1_y', 'jet1_z', 'jet2_x', 'jet2_y', 'jet2_z']
y = df[target_columns].to_numpy()  # (15000, 6)

In [2]:
from sklearn.preprocessing import StandardScaler

# ---------------------------
def compute_doppler(signal_2d):
    """Return the spread of sample-to-sample phase increments.

    Column 1 of ``signal_2d`` is taken as the phase track. It is unwrapped to
    remove 2*pi jumps, differenced along time, and the standard deviation of
    those differences is returned as a single scalar feature.
    """
    phase_increments = np.diff(np.unwrap(signal_2d[:, 1]))
    return phase_increments.std()

# One scalar phase-spread feature per sample, appended to the code features.
N_CODE_BITS = 13  # width of the parsed secret code (X_secret is (15000, 13) after parsing)
X_doppler = np.array([compute_doppler(sig) for sig in X_signal]).reshape(-1, 1)

# Slice back to the code columns before appending so that re-running this cell
# does not keep stacking duplicate feature columns onto X_secret.
X_secret = np.hstack([X_secret[:, :N_CODE_BITS], X_doppler])

# Time-Frequency Features
def extract_stft_features(signal_2d, nperseg=32, n_freq_bins=16, n_frames=5, eps=1e-8):
    """Log-magnitude STFT features of the magnitude channel.

    Parameters
    ----------
    signal_2d : numpy.ndarray
        Array whose column 0 is the magnitude track to analyse.
    nperseg : int, optional
        STFT segment length passed to ``scipy.signal.stft`` (default 32).
    n_freq_bins : int, optional
        Number of lowest frequency bins to keep (default 16).
    n_frames : int, optional
        Number of leading time frames to keep (default 5).
    eps : float, optional
        Offset added before the logarithm to avoid ``log(0)`` (default 1e-8).

    Returns
    -------
    numpy.ndarray
        Flattened 1-D feature vector. With the defaults this is
        ``16 * 5 = 80`` values, provided the STFT has at least that many
        bins and frames; a shorter signal yields a shorter vector.
    """
    # Kept as a function-local import to match the original cell, which does
    # not import scipy at the top of the notebook.
    from scipy.signal import stft

    _, _, spectrum = stft(signal_2d[:, 0], nperseg=nperseg)
    magnitude = np.abs(spectrum[:n_freq_bins, :n_frames])
    return np.log(magnitude + eps).flatten()

N_BASE_FEATURES = 14  # 13 secret-code columns + 1 Doppler column

X_stft = np.array([extract_stft_features(sig) for sig in X_signal])

# NOTE(review): this scaler is fit on ALL rows, i.e. before the train/val/test
# split. The later train-only scaler_secret re-standardizes these columns, so
# the split-specific statistics come from that second scaler.
scaler_stft = StandardScaler()
X_stft = scaler_stft.fit_transform(X_stft)

# Slice back to the base columns before appending so that re-running this cell
# does not stack a second copy of the STFT block onto X_secret.
X_secret = np.hstack([X_secret[:, :N_BASE_FEATURES], X_stft])  # Now shape=(14+80)=94

In [3]:
# Sanity check: display the shape of the parsed signal array.
X_signal.shape

(15000, 208, 2)

In [4]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# ---------------------------
# 2. Data Splitting & Normalization
# ---------------------------
# Stage 1: hold out 20% of the samples; the remaining 80% is the training set.
(
    X_sig_train_raw, X_sig_temp_raw,
    X_sec_train_raw, X_sec_temp_raw,
    y_train_raw, y_temp_raw,
) = train_test_split(X_signal, X_secret, y, test_size=0.2, random_state=42)

# Stage 2: cut the held-out part in half -> validation and test sets.
(
    X_sig_val_raw, X_sig_test_raw,
    X_sec_val_raw, X_sec_test_raw,
    y_val_raw, y_test_raw,
) = train_test_split(
    X_sig_temp_raw, X_sec_temp_raw, y_temp_raw, test_size=0.5, random_state=42
)

# Signal scaler: statistics come from training samples only. Time steps are
# flattened so each of the two channels gets a single mean/std.
X_sig_train_flat = X_sig_train_raw.reshape(-1, 2)
scaler_signal = StandardScaler().fit(X_sig_train_flat)

def scale_and_reshape(X_raw, scaler):
    """Apply a fitted per-channel scaler to a batch of multi-channel sequences.

    The input is flattened to ``(n_rows, n_channels)`` so the scaler sees one
    row per time step, transformed, and then restored to the input's shape.

    Parameters
    ----------
    X_raw : array-like
        Array whose last axis is the channel axis, e.g. ``(batch, 208, 2)``.
    scaler : object
        Fitted transformer exposing ``transform(2-D array) -> 2-D array``.

    Returns
    -------
    numpy.ndarray
        Scaled array with the same shape as ``X_raw``.
    """
    X_raw = np.asarray(X_raw)
    # Derive the layout from the data instead of hard-coding (208, 2), so
    # sequences of other lengths are handled as well.
    n_channels = X_raw.shape[-1]
    X_scaled = scaler.transform(X_raw.reshape(-1, n_channels))
    return X_scaled.reshape(X_raw.shape)

# Signals: apply the train-fitted signal scaler to every split.
X_sig_train, X_sig_val, X_sig_test = (
    scale_and_reshape(split, scaler_signal)
    for split in (X_sig_train_raw, X_sig_val_raw, X_sig_test_raw)
)

# Code/auxiliary features: fit on the training split, apply to all splits.
scaler_secret = StandardScaler().fit(X_sec_train_raw)
X_sec_train, X_sec_val, X_sec_test = map(
    scaler_secret.transform, (X_sec_train_raw, X_sec_val_raw, X_sec_test_raw)
)

# Targets: same policy, so predictions can be mapped back to the original
# scale later with scaler_target.inverse_transform.
scaler_target = StandardScaler().fit(y_train_raw)
y_train, y_val, y_test = map(
    scaler_target.transform, (y_train_raw, y_val_raw, y_test_raw)
)


# --------------------------------------------------
# 3. Verification
# --------------------------------------------------
# Report the array shapes of every split (signals, code features, targets).
split_report = (
    ("Training shapes:", X_sig_train, X_sec_train, y_train),
    ("\nValidation shapes:", X_sig_val, X_sec_val, y_val),
    ("\nTest shapes:", X_sig_test, X_sec_test, y_test),
)
for heading, signals, codes, targets in split_report:
    print(heading)
    print(f"Signals: {signals.shape}, Codes: {codes.shape}, Targets: {targets.shape}")

Training shapes:
Signals: (12000, 208, 2), Codes: (12000, 94), Targets: (12000, 6)

Validation shapes:
Signals: (1500, 208, 2), Codes: (1500, 94), Targets: (1500, 6)

Test shapes:
Signals: (1500, 208, 2), Codes: (1500, 94), Targets: (1500, 6)


In [5]:
# ---------------------------
# 4. Data Augmentation
# ---------------------------
def augment_signal(signal_batch, noise_std_range=(0.02, 0.1), max_shift=10,
                   jam_prob=0.05, jam_std=0.5, rng=None):
    """Randomly perturb a batch of signals (noise, time shift, jamming).

    Three independent perturbations are applied per sample:

    1. Additive Gaussian noise whose standard deviation is drawn uniformly
       from ``noise_std_range`` (these are noise levels, not SNR values).
    2. A circular shift along the time axis by an integer drawn uniformly
       from ``[-max_shift, max_shift]`` (both ends inclusive). This is a
       time roll, not a frequency-domain Doppler shift.
    3. With probability ``jam_prob`` the whole sample is replaced by pure
       Gaussian noise with standard deviation ``jam_std``.

    Parameters
    ----------
    signal_batch : array-like
        Batch of signals with samples on axis 0 and time on axis 1, e.g.
        ``(batch, 208, 2)``.
    noise_std_range : tuple of float, optional
        ``(low, high)`` bounds for the per-sample noise standard deviation.
    max_shift : int, optional
        Largest absolute circular shift in time steps.
    jam_prob : float, optional
        Probability that a sample is fully jammed.
    jam_std : float, optional
        Standard deviation of the jamming noise.
    rng : None, int or numpy.random.Generator, optional
        Seed or generator for reproducible augmentation. ``None`` draws fresh
        entropy; note this generator is independent of ``np.random.seed``.

    Returns
    -------
    numpy.ndarray
        Float array with the same shape as ``signal_batch``.
    """
    signal_batch = np.asarray(signal_batch)
    rng = np.random.default_rng(rng)

    batch_size = signal_batch.shape[0]
    # Shape used to broadcast one value per sample over the remaining axes.
    per_sample = (batch_size,) + (1,) * (signal_batch.ndim - 1)

    # 1) Additive noise with a different level for every sample.
    noise_std = rng.uniform(noise_std_range[0], noise_std_range[1], batch_size)
    noise = rng.standard_normal(signal_batch.shape) * noise_std.reshape(per_sample)

    # 2) Circular time shift; the upper bound is max_shift + 1 because
    #    Generator.integers excludes the high end, keeping the range symmetric.
    shifts = rng.integers(-max_shift, max_shift + 1, size=batch_size)
    shifted = np.empty_like(signal_batch)  # preallocated so an empty batch works
    for i, shift in enumerate(shifts):
        shifted[i] = np.roll(signal_batch[i], shift, axis=0)

    # 3) Full-sample jamming. np.where selects instead of multiplying by a 0/1
    #    mask, so NaN/inf in the clean signal cannot leak into jammed samples.
    jammed = rng.random(batch_size) < jam_prob
    jam_noise = rng.normal(0.0, jam_std, signal_batch.shape)
    return np.where(jammed.reshape(per_sample), jam_noise, shifted + noise)

In [7]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
from tensorflow.keras.layers import BatchNormalization, GlobalAveragePooling1D, Dropout, Embedding, Dot, Softmax
from tensorflow.keras import layers
import tensorflow as tf
import math

# ---------------------------
# 5. Model Architecture
# ---------------------------
def weighted_mse(y_true, y_pred):
    """Mean squared error with the z targets weighted twice as heavily.

    The six outputs are ordered (x, y, z) for jet 1 then jet 2, so positions
    2 and 5 of the weight vector are the z coordinates. The result is the
    plain mean of the weighted squared errors (not divided by the weight sum).
    """
    coord_weights = tf.constant([1.0, 1.0, 2.0, 1.0, 1.0, 2.0])
    squared_error = tf.square(y_true - y_pred)
    weighted_error = squared_error * coord_weights
    return tf.reduce_mean(weighted_error)

# Signal branch: two Conv1D stages over the (208, 2) sequence, pooled to a
# single vector and projected to 256 features.
signal_input = Input(shape=(208, 2), name='signal_input')
signal_features = layers.Conv1D(64, 5, activation='relu', padding='same')(signal_input)
signal_features = layers.BatchNormalization()(signal_features)
signal_features = layers.MaxPooling1D(2)(signal_features)  # 104 steps
signal_features = layers.Conv1D(128, 3, activation='relu', padding='same')(signal_features)
signal_features = layers.BatchNormalization()(signal_features)
signal_features = layers.GlobalMaxPooling1D()(signal_features)
signal_features = layers.Dense(256, activation='relu', kernel_regularizer='l2')(signal_features)
signal_features = layers.Dropout(0.5)(signal_features)

# Code branch: MLP over the 94 code/Doppler/STFT features, ending at 256 units
# so it can be multiplied element-wise with the signal branch.
# The branch tensors are deliberately NOT named `x`/`y`: `y` is the target
# array created in the parsing cell, and overwriting it here would break any
# later re-run of the train/test split.
code_input = Input(shape=(94,), name='code_input')
code_features = layers.Dense(128, activation='relu')(code_input)
code_features = layers.BatchNormalization()(code_features)
code_features = layers.Dense(256, activation='relu')(code_features)

# Feature interaction: gate the signal features with the code features.
gated_signal = layers.Multiply()([signal_features, code_features])

# Fusion: gated signal features alongside the raw code features.
merged = layers.concatenate([gated_signal, code_features])
merged = layers.BatchNormalization()(merged)

# Regression head producing the six normalized coordinates.
head = layers.Dense(512, activation='relu', kernel_regularizer='l2')(merged)
head = layers.BatchNormalization()(head)
head = layers.Dropout(0.3)(head)
head = layers.Dense(256, activation='relu', kernel_regularizer='l2')(head)
outputs = layers.Dense(6)(head)

# Compile. The initial learning rate here is replaced at the start of every
# epoch when a LearningRateScheduler callback is passed to fit().
model = Model(inputs=[signal_input, code_input], outputs=outputs)
model.compile(
    optimizer=Adam(learning_rate=3e-3),
    loss=weighted_mse,
    metrics=['mae']
)




In [8]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler

# ---------------------------
# 6. Training Strategy
# ---------------------------
# Learning Rate Schedule
class WarmupCosineDecay:
    """Epoch-indexed learning-rate schedule: linear warmup, then cosine decay.

    For ``epoch < warmup_epochs`` the rate rises linearly from ``start_lr``
    towards ``peak_lr``. From ``warmup_epochs`` on it follows half a cosine
    from ``peak_lr`` down to ``min_lr``, reaching ``min_lr`` at
    ``total_epochs`` and staying there afterwards.

    Parameters
    ----------
    warmup_epochs : int
        Number of warmup epochs.
    total_epochs : int
        Epoch at which the cosine decay reaches ``min_lr``.
    start_lr : float, optional
        Learning rate at epoch 0 (default 1e-4).
    peak_lr : float, optional
        Learning rate at the end of warmup (default 3e-3).
    min_lr : float, optional
        Final learning rate (default 0.0).
    """

    def __init__(self, warmup_epochs, total_epochs, start_lr=1e-4, peak_lr=3e-3, min_lr=0.0):
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.start_lr = start_lr
        self.peak_lr = peak_lr
        self.min_lr = min_lr

    def __call__(self, epoch, lr=None):
        """Return the learning rate for ``epoch`` as a Python float.

        ``lr`` (the optimizer's current rate) is accepted so the object works
        with callers that pass ``(epoch, lr)``; it is ignored because the
        schedule depends on the epoch only.
        """
        if epoch < self.warmup_epochs:
            ramp = (self.peak_lr - self.start_lr) * epoch / self.warmup_epochs
            return float(self.start_lr + ramp)

        decay_span = self.total_epochs - self.warmup_epochs
        if decay_span <= 0:
            # No decay phase configured: avoid dividing by zero and treat the
            # schedule as already finished.
            progress = 1.0
        else:
            # Clamp at 1 so the cosine does not swing back up when training
            # runs past total_epochs.
            progress = min((epoch - self.warmup_epochs) / decay_span, 1.0)

        decayed = 0.5 * (self.peak_lr - self.min_lr) * (1 + np.cos(np.pi * progress))
        return float(self.min_lr + decayed)

# Callbacks
# The learning rate is owned by the warmup + cosine schedule, which sets it at
# the start of every epoch. ReduceLROnPlateau is intentionally not used: any
# reduction it applied at the end of an epoch would be overwritten by the
# scheduler before the next batch, so combining the two has no effect.
warmup_lr = LearningRateScheduler(WarmupCosineDecay(5, 50))
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Create noisy training data.
# NOTE(review): the augmentation is drawn once, so every epoch sees the same
# perturbed copy and the clean signals are never used for training. Confirm
# this is intended rather than per-epoch augmentation.
X_sig_train_noisy = augment_signal(X_sig_train)

# Train the model; validation uses the un-augmented validation split.
history = model.fit(
    x=[X_sig_train_noisy, X_sec_train],
    y=y_train,
    validation_data=([X_sig_val, X_sec_val], y_val),
    epochs=50,
    batch_size=128,
    callbacks=[early_stop, warmup_lr],
    verbose=1
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50


In [9]:
# Evaluate on the held-out test split (targets in normalized units).
test_loss, test_mae = model.evaluate(
    [X_sig_test, X_sec_test],
    y_test,
    verbose=0
)

# The first value is the compiled loss, i.e. weighted_mse with the z terms
# doubled, so it is labelled as a loss rather than a plain MSE.
# NOTE(review): Keras may also add the layers' L2 penalties to this number.
print(f"Test loss (weighted MSE, normalized targets): {test_loss:.4f}")
print(f"Normalized Test MAE: {test_mae:.4f}")

# Map predictions back to the original target scale. The ground truth is taken
# from y_test_raw directly instead of round-tripping y_test through the scaler.
y_pred_norm = model.predict([X_sig_test, X_sec_test])
y_pred_real = scaler_target.inverse_transform(y_pred_norm)
y_test_real = y_test_raw

# Mean absolute error averaged over all six coordinates, in target units.
real_mae = np.mean(np.abs(y_pred_real - y_test_real))
print(f"\nReal-World MAE: {real_mae:.2f} meters")

Normalized Test MSE: 1.2665
Normalized Test MAE: 0.7802

Real-World MAE: 6079.87 meters
