# GPS Spoofing Detection Using Convolutional VAE

## 1. Load and Preprocess Data

In [None]:
import numpy as np

def create_windows(data_array, window_size=100, step=25):
    windows = []
    for start in range(0, data_array.shape[0] - window_size + 1, step):
        windows.append(data_array[start : start + window_size])
    return np.array(windows)

# Load training data (should be .npy files extracted from DS-GS and DS-GD)
real_data_gs = np.load('real_data_gs.npy')
real_data_gd = np.load('real_data_gd.npy')

train_windows = np.concatenate([
    create_windows(real_data_gs),
    create_windows(real_data_gd)
], axis=0)

print("Train window shape:", train_windows.shape)

## 2. Convert to PyTorch Dataset

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Convert numpy array to PyTorch tensor and adjust shape for Conv1d: [batch, channels, sequence_length]
train_tensor = torch.tensor(train_windows, dtype=torch.float32).permute(0, 2, 1)

# Wrap into TensorDataset and DataLoader
train_dataset = TensorDataset(train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

## 3. Define the 1D Convolutional VAE

In [None]:
import torch
import torch.nn as nn

class ConvVAE(nn.Module):
    def __init__(self, latent_dim=20):
        super(ConvVAE, self).__init__()
        self.enc_conv1 = nn.Conv1d(7, 16, kernel_size=3, stride=2, padding=1)
        self.enc_conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=2, padding=1)
        self.flatten_dim = 32 * 25  # 输入长度100 -> 50 -> 25
        self.fc_mu = nn.Linear(self.flatten_dim, latent_dim)
        self.fc_logvar = nn.Linear(self.flatten_dim, latent_dim)

        self.fc_decode = nn.Linear(latent_dim, self.flatten_dim)
        self.dec_conv1 = nn.ConvTranspose1d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.dec_conv2 = nn.ConvTranspose1d(16, 7, kernel_size=3, stride=2, padding=1, output_padding=1)

        self.relu = nn.ReLU()

    def encode(self, x):
        h = self.relu(self.enc_conv1(x))
        h = self.relu(self.enc_conv2(h))
        h_flat = h.view(h.size(0), -1)
        return self.fc_mu(h_flat), self.fc_logvar(h_flat)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        return mu + torch.randn_like(std) * std

    def decode(self, z):
        h = self.relu(self.fc_decode(z)).view(-1, 32, 25)
        h = self.relu(self.dec_conv1(h))
        return self.dec_conv2(h)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

# Instantiate model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ConvVAE().to(device)


## 4. Train the VAE

In [None]:
import torch.optim as optim
import torch.nn.functional as F

optimizer = optim.Adam(model.parameters(), lr=1e-3)
beta = 1.0
num_epochs = 50

for epoch in range(1, num_epochs + 1):
    model.train()
    total_loss = 0

    for (x,) in train_loader:
        x = x.to(device)
        optimizer.zero_grad()
        recon_x, mu, logvar = model(x)

        # 重构损失
        recon_loss = F.mse_loss(recon_x, x, reduction='mean')

        # KL 散度损失
        kl_loss = -0.5 * torch.mean(torch.sum(
            1 + logvar - mu.pow(2) - logvar.exp(), dim=1))

        # 总损失
        loss = recon_loss + beta * kl_loss
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * x.size(0)

    avg_loss = total_loss / len(train_loader.dataset)
    print(f"Epoch {epoch}: Loss = {avg_loss:.6f}")


## 5. Estimate Threshold ρ from Training Errors

In [None]:
model.eval()
errors = []

with torch.no_grad():
    for (x,) in train_loader:
        x = x.to(device)
        recon_x, _, _ = model(x)
        # 对每个样本计算 MSE（逐样本平均）
        err = F.mse_loss(recon_x, x, reduction='none').mean(dim=(1, 2)).cpu().numpy()
        errors.extend(err)

# 设置重构误差阈值 ρ 为训练误差的第99百分位
import numpy as np
rho = np.percentile(errors, 99)
print("Reconstruction error threshold ρ =", rho)

## 6. Define Detection Function

In [None]:
def detect_window(window_data):
    """
    判断单个时间窗口是否为欺骗信号。
    参数：
        window_data: shape 为 [7, 100] 的 numpy 或 tensor，对应一个滑动窗口
    返回：
        (是否为欺骗信号: bool, 重构误差: float)
    """
    x = torch.tensor(window_data, dtype=torch.float32).unsqueeze(0).to(device)  # 添加 batch 维度

    with torch.no_grad():
        recon, _, _ = model(x)
        error = F.mse_loss(recon, x, reduction='mean').item()

    return error > rho, error
