In [1]:
import os
import sys

import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F  
import matplotlib 
import matplotlib.pyplot as plt

In [2]:
EPSILON = 0.05       # Max perturbation (for L∞ PGD)
ALPHA = 0.01         # Step size per iteration
ATTACK_ITERATIONS = 20
TARGET_LABEL = 3     # Example target label for the targeted attack
EOT_ITERATIONS = 5
# System/Model parameters
sys.path.append("/home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/models")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#MODEL_PATH = "/home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/weights/justin_model_slicing_norm_LR1_smallHop_12_largerInput.pth"
MODEL_PATH = "/home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/weights/best_model.pth"
#IQ_FILE_PATH = "/home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/1m_2m_replacedPluto4/1m_2m/Pluto_0_1m_run0.iq"  # Example file
IQ_FILE_PATH = "/home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/1m_2m_replacedPluto4/1m_2m/Pluto_0_2m_run0.iq"



In [3]:
from attempt2 import resnet50_1d  # Directly import from attempt2.py
num_classes = 8  # Change this if your model was trained with a different number of classes

# Initialize the model architecture
model = resnet50_1d(num_classes=num_classes).to(DEVICE)

# Load trained weights
#MODEL_PATH = "/home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/weights/justin_model_slicing_norm_LR1_smallHop_12.pth"
print(f"Loading trained model weights from: {MODEL_PATH}")
state_dict = torch.load(MODEL_PATH, map_location=DEVICE)

# Load the state dictionary into the model
model.load_state_dict(state_dict)

# Set the model to evaluation mode
model.eval()

Loading trained model weights from: /home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/weights/best_model.pth


  state_dict = torch.load(MODEL_PATH, map_location=DEVICE)


ResNet1D(
  (conv1): Conv1d(2, 64, kernel_size=(7,), stride=(2,), padding=(3,), bias=False)
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool): MaxPool1d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck1D(
      (conv1): Conv1d(64, 64, kernel_size=(1,), stride=(1,), bias=False)
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
      (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv1d(64, 256, kernel_size=(1,), stride=(1,), bias=False)
      (bn3): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (downsample): Sequential(
        (0): Conv1d(64, 256, kernel_size=(1,), stride=(1,), bias=False)
        (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True

In [4]:
def load_iq_data(file_path, max_samples=4096*2, start_idx=0):
    """
    Loads a limited portion of `.iq` data.
    
    Args:
        file_path (str): Path to the `.iq` file.
        max_samples (int): The number of samples to load.
        start_idx (int): The starting index to read from.

    Returns:
        data_tensor (torch.Tensor): shape [1, 2, max_samples]
        label_tensor (torch.Tensor): Dummy label for testing.
    """
    total_samples = max_samples * 2  # Since I/Q samples are interleaved

    #  Open the file in binary mode and seek to `start_idx`
    with open(file_path, "rb") as f:
        f.seek(start_idx * 4 * 2)  # 4 bytes per float32, 2 channels (I/Q)
        raw_data = np.fromfile(f, dtype="float32", count=total_samples)

    #  Ensure we have enough data
    if raw_data.shape[0] < total_samples:
        raise ValueError(f"Not enough data in {file_path}. Requested {total_samples}, got {raw_data.shape[0]}.")

    #  Extract I/Q channels
    I = raw_data[0::2]  # Even indices
    Q = raw_data[1::2]  # Odd indices

    #  Stack into [2, max_samples] format
    iq_data = np.stack([I, Q], axis=0)

    #  Add batch dimension → [1, 2, max_samples]
    iq_data = np.expand_dims(iq_data, axis=0)

    #  Convert to PyTorch tensor
    data_tensor = torch.from_numpy(iq_data).float()

    #  Dummy label for now (adjust if needed)
    label_tensor = torch.tensor([0], dtype=torch.long)

    return data_tensor, label_tensor

In [5]:
def transform_distance_path_loss(x, min_distance=5.0, max_distance=7.0, path_loss_exponent=2.0, reference_distance=1.0):
    #Sample a random dist between 1 and 2 m 
    d = torch.empty(1, device=x.device).uniform_(min_distance, max_distance)
    #Compute the path loss scaling
    scaling = (reference_distance / d) ** path_loss_exponent
    return x * scaling


def transform_channel_effects(x, min_distance=1.0, max_distance=2.0, path_loss_exponent=2.0, reference_distance=1.0, noise_std=0.01, apply_fading=True, apply_phase=True):
    # Sample a random distance between min and max.
    d = torch.empty(1, device=x.device).uniform_(min_distance, max_distance)
    
    # Compute path loss scaling.
    scaling = (reference_distance / d) ** path_loss_exponent
    x_transformed = x * scaling

    # Apply fading (e.g., Rayleigh fading)
    if apply_fading:
        # Generate Rayleigh fading factor: sqrt(X^2 + Y^2) for two independent gaussians.
        fading = torch.sqrt(torch.randn_like(x_transformed)**2 + torch.randn_like(x_transformed)**2)
        x_transformed = x_transformed * fading

    # Add AWGN noise.
    noise = torch.randn_like(x_transformed) * noise_std
    x_transformed = x_transformed + noise

    # Apply phase rotation (for IQ signals, if represented in a complex manner or separate I/Q channels).
    if apply_phase:
        # For simplicity, assume x is a real tensor with I and Q channels concatenated.
        # You might want to construct a complex representation. Here we apply the same rotation on both channels.
        phase = torch.empty(1, device=x.device).uniform_(0, 2 * np.pi)
        x_transformed = x_transformed * torch.cos(phase)  # simple approximation for phase rotation

    return x_transformed



In [6]:
def targeted_eot_pgd_attack(model, x, y, target_label, eps, alpha, num_iter, eot_iter):
    """
    Targeted PGD attack with Expectation Over Transformation (EOT) using a distance-based path loss transformation.

    Args:
        model (nn.Module): The model to attack.
        x (torch.Tensor): Original input tensor.
        y (torch.Tensor): True labels.
        target_label (int or torch.Tensor): The target class for the attack.
        eps (float): Maximum perturbation.
        alpha (float): Step size.
        num_iter (int): Number of PGD iterations.
        eot_iter (int): Number of EOT iterations per PGD step.

    Returns:
        torch.Tensor: Adversarial examples.
    """
    model.eval()
    
    if isinstance(target_label, int):
        target_label = torch.full_like(y, target_label)
    
    x_adv = x.clone().detach().to(x.device)
    x_adv.requires_grad = True

    loss_fn = nn.CrossEntropyLoss()

    for _ in range(num_iter):
        grad_accum = torch.zeros_like(x_adv)
        
        # Run multiple transformations for EOT and accumulate gradients.
        for _ in range(eot_iter):
            # Clone the current adversarial input and set requires_grad.
            new_inputs = x_adv.detach().clone().requires_grad_()
            transformed = transform_channel_effects(new_inputs)
            
            outputs = model(transformed)
            # For a targeted attack, minimize negative loss to push predictions toward target_label.
            loss = -loss_fn(outputs, target_label)
            
            # Compute gradient with respect to new_inputs.
            grad = torch.autograd.grad(loss, new_inputs)[0]
            grad_accum += grad
        
        # Average the accumulated gradients.
        grad_avg = grad_accum / eot_iter
        
        # Update adversarial input with the sign of the averaged gradient.
        x_adv = x_adv.detach() + alpha * grad_avg.sign()
        
        # Project the perturbation back to the epsilon ball and clip to valid range.
        x_adv = torch.min(torch.max(x_adv, x - eps), x + eps)
        x_adv = torch.clamp(x_adv, 0, 1)
        x_adv.requires_grad = True

    return x_adv.detach()

In [8]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F

print(f"Loading data from: {IQ_FILE_PATH}")
data, labels = load_iq_data(IQ_FILE_PATH)
data, labels = data.to(DEVICE), labels.to(DEVICE)

print("Original data shape: ", data.shape)  # Expected shape: [1, 2, N]

model.eval()

with torch.no_grad():
    logits = model(data)  # Raw model output
    probs = F.softmax(logits, dim=1)  # Convert to probabilities
    original_pred = torch.argmax(probs, dim=1)  # Get predicted class
    confidence = probs.max(dim=1).values  # Get confidence score of predicted class

print(f"Original prediction: {original_pred.cpu().numpy()}, Confidence: {confidence.cpu().item():.4f}")

# Make adversarial example with targeted EOT PGD attack
x_adv = targeted_eot_pgd_attack(
    model=model,
    x=data,
    y=labels,
    target_label=TARGET_LABEL,  
    eps=EPSILON,
    alpha=ALPHA,
    num_iter=ATTACK_ITERATIONS,
    eot_iter=EOT_ITERATIONS
)



Loading data from: /home/jfeng/Desktop/jfeng/rf_spoofing/spoofing/1m_2m_replacedPluto4/1m_2m/Pluto_0_2m_run0.iq
Original data shape:  torch.Size([1, 2, 8192])
Original prediction: [5], Confidence: 0.9728


In [8]:

perturbations = x_adv - data  # Get the difference between adversarial and original data
perturbations_np = perturbations.cpu().detach().numpy()

data_np = data.cpu().detach().numpy()
x_adv_np = x_adv.cpu().detach().numpy()

I_orig = data_np[0, 0, :]  # Original In-phase component
Q_orig = data_np[0, 1, :]  # Original Quadrature component

I_adv = x_adv_np[0, 0, :]  # Adversarial In-phase component
Q_adv = x_adv_np[0, 1, :]  # Adversarial Quadrature component

I_diff = perturbations_np[0, 0, :]  # Perturbation (I channel)
Q_diff = perturbations_np[0, 1, :]  # Perturbation (Q channel)

interleaved_perturbations = np.empty((I_diff.size + Q_diff.size,), dtype=np.float32)
interleaved_perturbations[0::2] = I_diff  # Place I values at even indices
interleaved_perturbations[1::2] = Q_diff  # Place Q values at odd indices

# Save the interleaved perturbation IQ sequence as a binary .iq file
FILENAME = "2m_0target3_noise.iq"
interleaved_perturbations.astype("float32").tofile(FILENAME)



# Interleave I and Q to form an IQIQIQ... sequence
interleaved = np.empty((I_diff.size + Q_diff.size,), dtype=I_diff.dtype)
interleaved[0::2] = I_diff
interleaved[1::2] = Q_diff

print("First 10 values of the interleaved perturbation sequence:", interleaved[:10])
np.savetxt("perturbations_sequence.txt", interleaved, fmt='%f')

# Evaluate the adversarial example
with torch.no_grad():
    adv_logits = model(x_adv)  # Raw output for adversarial example
    adv_probs = F.softmax(adv_logits, dim=1)  # Convert to probabilities
    adv_pred = torch.argmax(adv_probs, dim=1)  # Get predicted class
    adv_confidence = adv_probs.max(dim=1).values  # Get confidence score of predicted class
    
    # Print out confidence for class 3 regardless of prediction.
    # Assuming a single example (batch size = 1)
    confidence_class3 = adv_probs[0, 3].item()

print(f"Adversarial prediction: {adv_pred.cpu().numpy()}, Confidence: {adv_confidence.cpu().item():.4f}")
print(f"Confidence for class 3: {confidence_class3:.4f}")
print("First perturbation value (I channel):", I_diff[0])


First 10 values of the interleaved perturbation sequence: [-5.4933305e-04 -1.1291846e-03  2.9999999e-02  2.9999999e-02
 -6.1037004e-05 -7.5669959e-10  5.0000001e-02  4.9694814e-02
  3.8962372e-02  3.0000001e-02]
Adversarial prediction: [3], Confidence: 0.9998
Confidence for class 3: 0.9998
First perturbation value (I channel): -0.00054933305


Here below is the EOT PGD that uses transforms_channel_effects to simulate realistic channel conditions over distances from min = 5 to max = 7 meters

I have a random start in the targeted eot pgd attack to introduce randomness in starting point

For each pgd step the code performs eot integrations transformations

In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

def transform_distance_path_loss(x, min_distance=5.0, max_distance=7.0, path_loss_exponent=2.0, reference_distance=1.0):
    # Sample a random distance between min_distance and max_distance (e.g., 5 to 7 m)
    d = torch.empty(1, device=x.device).uniform_(min_distance, max_distance)
    # Compute the path loss scaling factor
    scaling = (reference_distance / d) ** path_loss_exponent
    return x * scaling

def transform_channel_effects(x, min_distance=5.0, max_distance=7.0, path_loss_exponent=2.0, reference_distance=1.0, noise_std=0.01, apply_fading=True, apply_phase=True):
    # Sample a random distance between min_distance and max_distance
    d = torch.empty(1, device=x.device).uniform_(min_distance, max_distance)
    
    # Compute path loss scaling
    scaling = (reference_distance / d) ** path_loss_exponent
    x_transformed = x * scaling

    # Optionally apply fading (e.g., Rayleigh fading)
    if apply_fading:
        fading = torch.sqrt(torch.randn_like(x_transformed)**2 + torch.randn_like(x_transformed)**2)
        x_transformed = x_transformed * fading

    # Add AWGN noise
    noise = torch.randn_like(x_transformed) * noise_std
    x_transformed = x_transformed + noise

    # Optionally apply phase rotation (for IQ signals)
    if apply_phase:
        phase = torch.empty(1, device=x.device).uniform_(0, 2 * np.pi)
        x_transformed = x_transformed * torch.cos(phase)
    
    return x_transformed

def targeted_eot_pgd_attack(model, x, y, target_label, eps, alpha, num_iter, eot_iter, use_random_start=True):
    """
    Targeted PGD attack with Expectation Over Transformation (EOT) that applies
    a random transformation (channel effects and/or distance path loss) every EOT iteration.
    
    The idea is that in every PGD step, we generate multiple transformed versions
    of the adversarial sample (simulating different distances, channel conditions, etc.),
    average the computed gradients, and update the adversarial example accordingly.
    """
    model.eval()
    
    if isinstance(target_label, int):
        target_label = torch.full_like(y, target_label)
    
    x_adv = x.clone().detach().to(x.device)
    
    if use_random_start:
        x_adv = x_adv + torch.empty_like(x_adv).uniform_(-eps, eps)
        x_adv = torch.clamp(x_adv, 0, 1).detach()
    
    loss_fn = nn.CrossEntropyLoss()

    for step in range(num_iter):
        grad_accum = torch.zeros_like(x_adv)
        x_adv.requires_grad = True

        for _ in range(eot_iter):
            new_inputs = x_adv.detach().clone().requires_grad_()
            transformed = transform_channel_effects(new_inputs, min_distance=5.0, max_distance=7.0)

            transformed = F.adaptive_avg_pool1d(transformed, 8192)
            
            outputs = model(transformed)
        
            loss = -loss_fn(outputs, target_label)
            grad = torch.autograd.grad(loss, new_inputs)[0]
            grad_accum += grad

        # Average the accumulated gradients over EOT iterations
        grad_avg = grad_accum / eot_iter

        # Update adversarial input using the sign of the averaged gradient.
        x_adv = x_adv.detach() + alpha * grad_avg.sign()

        # Project the updated adversarial sample back into the epsilon ball and valid range [0, 1]
        x_adv = torch.min(torch.max(x_adv, x - eps), x + eps)
        x_adv = torch.clamp(x_adv, 0, 1).detach()

    return x_adv



In [14]:

eps = 8 / 255
alpha = 2 / 255
num_iter = 10
eot_iter = 2
target_class = 3  # example target label

# Generate adversarial examples using the targeted EOT PGD attack
x_adv = targeted_eot_pgd_attack(model, data, labels, target_class, eps, alpha, num_iter, eot_iter)

# Calculate the perturbations (difference between adversarial and original data)
perturbations = x_adv - data  
perturbations_np = perturbations.cpu().detach().numpy()

data_np = data.cpu().detach().numpy()
x_adv_np = x_adv.cpu().detach().numpy()

# Assume IQ samples have two channels: In-phase (I) and Quadrature (Q)
I_orig = data_np[0, 0, :]  # Original I channel
Q_orig = data_np[0, 1, :]  # Original Q channel

I_adv = x_adv_np[0, 0, :]  # Adversarial I channel
Q_adv = x_adv_np[0, 1, :]  # Adversarial Q channel

I_diff = perturbations_np[0, 0, :]  # Perturbation on I channel
Q_diff = perturbations_np[0, 1, :]  # Perturbation on Q channel

# Interleave I and Q perturbations (I0, Q0, I1, Q1, ...)
interleaved_perturbations = np.empty((I_diff.size + Q_diff.size,), dtype=np.float32)
interleaved_perturbations[0::2] = I_diff
interleaved_perturbations[1::2] = Q_diff

# Save the interleaved perturbation IQ sequence as a binary .iq file
FILENAME = "2m_0target3_noise.iq"
interleaved_perturbations.astype("float32").tofile(FILENAME)

# Also save the perturbation sequence to a text file for inspection
np.savetxt("perturbations_sequence.txt", interleaved_perturbations, fmt='%f')
print("First 10 values of the interleaved perturbation sequence:", interleaved_perturbations[:10])

# Evaluate the adversarial example by checking model predictions
with torch.no_grad():
    # Apply the same pooling to x_adv before evaluation to match the input size expected by the model.
    x_adv_evaluated = F.adaptive_avg_pool1d(x_adv, 8192)
    adv_logits = model(x_adv_evaluated)
    adv_probs = F.softmax(adv_logits, dim=1)
    adv_pred = torch.argmax(adv_probs, dim=1)
    adv_confidence = adv_probs.max(dim=1).values
    
    # For demonstration, print confidence for class 3
    confidence_class3 = adv_probs[0, 3].item()

print(f"Adversarial prediction: {adv_pred.cpu().numpy()}, Confidence: {adv_confidence.cpu().item():.4f}")
print(f"Confidence for class 3: {confidence_class3:.4f}")
print("First perturbation value (I channel):", I_diff[0])


First 10 values of the interleaved perturbation sequence: [ 0.01568628  0.00671395 -0.00073244  0.01516746  0.00784314  0.01570334
  0.00680551  0.00753795 -0.00103763  0.02352941]
Adversarial prediction: [3], Confidence: 0.7959
Confidence for class 3: 0.7959
First perturbation value (I channel): 0.015686275


In [None]:

2m_0target3_pathloss.iq