# GPS Spoofing Detection Pipeline

A beginner-friendly, fully-documented notebook implementing:

- Synthetic GNSS+IMU dataset generation with injected spoofing
- Feature engineering for residual and rolling statistics
- Supervised detection using **RandomForest** and **XGBoost** with **Stratified K-Fold CV**
- Optional **LSTM Autoencoder** anomaly detector (unsupervised-ish)
- **Automated PDF report** generation of results (via reportlab)

**How to use this notebook**:
1. Run cells **top-to-bottom**.
2. The pipeline saves charts and a PDF report under `outputs_cv/`.
3. Toggle `DO_AE` to enable/disable the Autoencoder section.

> Note: Installing heavy libraries (e.g., xgboost, torch) can take time depending on your environment.


## 1) Environment Setup
We install the PDF library used to produce a polished report of metrics and plots. If you're using a managed environment (e.g., Google Colab), this will fetch and install the package as needed.

In [ ]:
!pip install reportlab -q

## 2) Imports
Core scientific Python stack, ML models (RandomForest & XGBoost), metrics/utilities, and PyTorch for the optional LSTM Autoencoder. We also import `reportlab` to assemble a PDF report at the end.

In [ ]:
import os, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ML models
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (confusion_matrix, roc_auc_score, roc_curve,
                             precision_recall_fscore_support, accuracy_score)
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler

# Deep learning (optional Autoencoder)
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# PDF report
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image
from reportlab.lib.styles import getSampleStyleSheet

## 3) Configuration
Set random seeds for reproducibility, define the output directory, number of CV splits, and whether to run the Autoencoder section.

In [ ]:
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

SAVE_DIR = "outputs_cv"   # where outputs + report will be saved
N_SPLITS = 5               # K-fold cross validation
DO_AE = True               # Run autoencoder anomaly detector?


## 4) Utility Helpers
Small helper(s) used throughout the notebook.

In [ ]:
def ensure_dir(path: str) -> None:
    """Make sure a directory exists."""
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


## 5) Synthetic GNSS + IMU Dataset with Spoofing Injection
We generate a plausible trajectory and derive displacement, speed, and heading. GNSS observations are corrupted by noise and additionally **spoofed** over specified windows.

- *Abrupt spoof*: sudden offsets in latitude/longitude.
- *Gradual spoof*: linearly increasing offsets.

We also simulate signal quality (SNR) and satellite count, which can carry weak signals of spoofing in some scenarios.

In [ ]:
def generate_synthetic_gnss_imu(n_points=2000,
                                spoof_windows=[(400,480),(1100,1180),(1600,1680)],
                                abrupt_prob=0.6) -> pd.DataFrame:
    """
    Generate synthetic GNSS + IMU data with injected spoofing attacks.
    """

    deg_to_m = 111320  # degrees latitude ≈ meters
    # True trajectory
    lat = 51.75 + np.cumsum(np.random.normal(0, 0.00002, n_points))
    lon = -1.25 + np.cumsum(np.random.normal(0, 0.00002, n_points))

    # True displacement
    dlat = np.concatenate([[0], np.diff(lat)])
    dlon = np.concatenate([[0], np.diff(lon)])
    disp_m = np.sqrt((dlat*deg_to_m)**2 + (dlon*deg_to_m*np.cos(np.deg2rad(lat)))**2)
    speed = np.clip(disp_m, 0, None)
    heading = (np.degrees(np.arctan2(dlon, np.where(dlat==0, 1e-9, dlat))) + 360) % 360

    # IMU data
    accel = np.concatenate([[0], np.diff(speed)]) + np.random.normal(0,0.02,n_points)
    gyro = np.concatenate([[0], np.diff(heading)]) + np.random.normal(0,0.1,n_points)

    # GNSS (with noise)
    gnss_lat = lat + np.random.normal(0, 1e-5, n_points)
    gnss_lon = lon + np.random.normal(0, 1e-5, n_points)

    # Spoofing injection
    spoofed = np.zeros(n_points, dtype=int)
    for (s,e) in spoof_windows:
        length = e - s
        if np.random.rand() < abrupt_prob:
            # Abrupt spoof
            gnss_lat[s:e] += np.random.normal(0.0009, 0.0003)
            gnss_lon[s:e] += np.random.normal(-0.0009, 0.0003)
        else:
            # Gradual spoof
            gnss_lat[s:e] += np.linspace(0, 0.0012, length) + np.random.normal(0,2e-5,length)
            gnss_lon[s:e] += np.linspace(0, -0.0012, length) + np.random.normal(0,2e-5,length)
        spoofed[s:e] = 1

    # Signal quality
    snr = 30 + np.random.normal(0,1,n_points)
    snr[spoofed==1] += np.random.normal(2.0,0.8, spoofed.sum())
    sat_count = np.random.randint(6,12,n_points)

    return pd.DataFrame({
        'time': np.arange(n_points),
        'true_lat': lat, 'true_lon': lon,
        'gnss_lat': gnss_lat, 'gnss_lon': gnss_lon,
        'speed': speed, 'heading': heading,
        'imu_ax': accel, 'imu_gyro': gyro,
        'snr': snr, 'sat_count': sat_count,
        'spoofed': spoofed
    })

def load_or_generate_dataset():
    """Load dataset (here we generate synthetic)."""
    return generate_synthetic_gnss_imu()


## 6) Feature Engineering
We compute GNSS displacement, compare it with inertial displacement, and build residuals/rolling statistics. We also include heading change and signal quality metrics.

In [ ]:
def extract_features(df: pd.DataFrame):
    """
    Compute engineered features (displacement, residuals, rolling stats).
    """
    d = df.copy()
    deg_to_m = 111320

    d['dlat_g'] = d['gnss_lat'].diff().fillna(0)
    d['dlon_g'] = d['gnss_lon'].diff().fillna(0)
    d['dlat_m_g'] = d['dlat_g'] * deg_to_m
    d['dlon_m_g'] = d['dlon_g'] * deg_to_m * np.cos(np.deg2rad(d['gnss_lat']))
    d['disp_m_g'] = np.sqrt(d['dlat_m_g']**2 + d['dlon_m_g']**2)
    d['disp_m_ins'] = d['speed']
    d['disp_residual'] = d['disp_m_g'] - d['disp_m_ins']

    # Rolling stats
    for w in [3,5,11]:
        d[f'disp_mean_{w}'] = d['disp_m_g'].rolling(w, min_periods=1).mean()
        d[f'disp_std_{w}'] = d['disp_m_g'].rolling(w, min_periods=1).std().fillna(0)

    # Heading change
    dheading = np.abs(np.diff(np.pad(d['heading'].values,(1,0),'edge')))
    d['dheading'] = dheading

    features = ['gnss_lat','gnss_lon','speed','heading','disp_m_g','disp_m_ins',
                'disp_residual','disp_mean_3','disp_std_3','disp_mean_5','disp_std_5',
                'snr','sat_count','dheading']
    X = d[features].fillna(0)
    y = d['spoofed'].astype(int).values
    return X, y


## 7) Metrics & Plotting Utilities
Reusable helpers for computing summary metrics and generating figures saved to disk.

In [ ]:
def compute_metrics(y_true,y_pred,y_prob):
    """Compute accuracy, precision, recall, F1, ROC AUC."""
    acc = accuracy_score(y_true,y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(y_true,y_pred,average="binary",zero_division=0)
    auc = roc_auc_score(y_true,y_prob)
    return {"accuracy":acc,"precision":prec,"recall":rec,"f1":f1,"roc_auc":auc}

def plot_confusion(y_true,y_pred,title,path):
    cm = confusion_matrix(y_true,y_pred)
    fig,ax=plt.subplots(figsize=(3,3))
    im=ax.imshow(cm,cmap="Blues")
    for (i,j),val in np.ndenumerate(cm):
        ax.text(j,i,str(val),ha="center",va="center")
    ax.set_xticks([0,1]); ax.set_yticks([0,1])
    ax.set_xticklabels(["Clean","Spoof"]); ax.set_yticklabels(["Clean","Spoof"])
    ax.set_xlabel("Predicted"); ax.set_ylabel("True"); ax.set_title(title)
    fig.tight_layout(); fig.savefig(path,dpi=150); plt.close(fig)

def plot_roc(y_true,y_prob,title,path):
    fpr,tpr,_=roc_curve(y_true,y_prob)
    auc=roc_auc_score(y_true,y_prob)
    fig,ax=plt.subplots(figsize=(4,3))
    ax.plot(fpr,tpr,label=f"AUC={auc:.3f}")
    ax.plot([0,1],[0,1],'--',c="gray")
    ax.set_xlabel("FPR"); ax.set_ylabel("TPR"); ax.set_title(title); ax.legend()
    fig.tight_layout(); fig.savefig(path,dpi=150); plt.close(fig)

def plot_metric_bar(metrics_dict,title,path):
    fig,ax=plt.subplots(figsize=(5,3))
    names=list(metrics_dict.keys()); vals=list(metrics_dict.values())
    ax.bar(names,vals)
    ax.set_ylim(0,1.05); ax.set_title(title)
    for i,v in enumerate(vals):
        ax.text(i,v+0.01,f"{v:.3f}",ha="center")
    fig.tight_layout(); fig.savefig(path,dpi=150); plt.close(fig)

def plot_summary_bar(avg_tbl,path):
    fig,ax=plt.subplots(figsize=(6,3))
    metrics=["accuracy","precision","recall","f1","roc_auc"]
    for i,m in enumerate(metrics):
        ax.bar([x+i*0.2 for x in range(len(avg_tbl))], avg_tbl[m],width=0.2,label=m)
    ax.set_xticks(range(len(avg_tbl))); ax.set_xticklabels(avg_tbl["model"])
    ax.legend(); ax.set_ylim(0,1.05); ax.set_title("Average Performance Comparison")
    fig.tight_layout(); fig.savefig(path,dpi=150); plt.close(fig)


## 8) LSTM Autoencoder (Optional)
Train an LSTM autoencoder on **clean** samples only. Reconstruction error serves as the anomaly score.

- We scale inputs with `StandardScaler`.
- The 95th percentile of clean reconstruction error sets the anomaly threshold.
- We also compute classification-style metrics by thresholding errors.

In [ ]:
class LSTMAE(nn.Module):
    def __init__(self,n_features,emb_size=16):
        super().__init__()
        self.encoder=nn.LSTM(n_features,emb_size,batch_first=True)
        self.decoder=nn.LSTM(emb_size,n_features,batch_first=True)
    def forward(self,x):
        _,(h,_) = self.encoder(x)
        z=h.repeat(x.size(1),1,1).permute(1,0,2)
        out,_=self.decoder(z)
        return out

def train_lstm_autoencoder(X,y,save_dir,epochs=15,bs=32):
    """Train LSTM autoencoder on clean data only and produce metrics/plots."""
    scaler=StandardScaler(); Xs=scaler.fit_transform(X)
    clean=Xs[y==0]
    data=torch.tensor(clean,dtype=torch.float32).unsqueeze(1)
    loader=DataLoader(TensorDataset(data,data),batch_size=bs,shuffle=True)

    model=LSTMAE(X.shape[1]); opt=torch.optim.Adam(model.parameters(),lr=1e-3)
    lossfn=nn.MSELoss(); losses=[]
    for ep in range(1,epochs+1):
        tot=0
        for xb,_ in loader:
            opt.zero_grad(); recon=model(xb); loss=lossfn(recon,xb); loss.backward(); opt.step()
            tot+=loss.item()*len(xb)
        epoch_loss=tot/len(loader.dataset); losses.append(epoch_loss)
        print(f"[AE] Epoch {ep}/{epochs} | loss={epoch_loss:.6f}")

    # Save loss curve
    fig,ax=plt.subplots(); ax.plot(range(1,epochs+1),losses,marker="o")
    ax.set_title("Autoencoder Training Loss"); ax.set_xlabel("Epoch"); ax.set_ylabel("Loss")
    fig.savefig(os.path.join(save_dir,"ae_loss.png"),dpi=150); plt.close(fig)

    all_data=torch.tensor(scaler.transform(X),dtype=torch.float32).unsqueeze(1)
    with torch.no_grad():
        recon=model(all_data)
        errs=torch.mean((recon-all_data)**2,dim=(1,2)).numpy()
    th=np.percentile(errs[y==0],95)
    preds=(errs>th).astype(int)

    # Metrics for AE
    acc=accuracy_score(y,preds)
    prec,rec,f1,_=precision_recall_fscore_support(y,preds,average="binary",zero_division=0)
    auc=roc_auc_score(y,errs)
    metrics={"accuracy":acc,"precision":prec,"recall":rec,"f1":f1,"roc_auc":auc}

    # Error plot
    fig,ax=plt.subplots(figsize=(10,3))
    ax.plot(errs,label="Error"); ax.axhline(th,color="r",ls="--",label="Threshold")
    ax.scatter(np.where(y==1)[0], np.array(errs)[y==1], c="orange",s=10,label="Spoof")
    ax.set_title("Autoencoder Reconstruction Errors"); ax.legend()
    fig.tight_layout(); fig.savefig(os.path.join(save_dir,"ae_error_plot.png"),dpi=150); plt.close(fig)

    return {"model":model,"scaler":scaler,"errors":errs,"threshold":th,"losses":losses,"metrics":metrics}


## 9) Stratified K-Fold Cross-Validation (RandomForest & XGBoost)
We evaluate two supervised classifiers using stratified 5-fold cross-validation and save confusion matrices, ROC curves, and per-fold metric bars.

In [ ]:
def run_cross_validation(X,y,out_root,n_splits=5):
    """Run stratified K-fold cross-validation with RF + XGB."""
    ensure_dir(out_root)
    skf=StratifiedKFold(n_splits=n_splits,shuffle=True,random_state=RANDOM_SEED)
    rows=[]
    for fold_idx,(train_idx,test_idx) in enumerate(skf.split(X,y),1):
        print(f"\n--- Fold {fold_idx}/{n_splits} ---")
        fold_dir=os.path.join(out_root,f"fold_{fold_idx}"); ensure_dir(fold_dir)
        Xtr,Xte=X.iloc[train_idx],X.iloc[test_idx]; ytr,yte=y[train_idx],y[test_idx]

        # RandomForest
        rf=RandomForestClassifier(n_estimators=200,random_state=RANDOM_SEED)
        rf.fit(Xtr,ytr); ypred=rf.predict(Xte); yprob=rf.predict_proba(Xte)[:,1]
        rf_metrics=compute_metrics(yte,ypred,yprob)
        print("RF:",rf_metrics)
        plot_confusion(yte,ypred,"RF Confusion",os.path.join(fold_dir,"rf_confusion.png"))
        plot_roc(yte,yprob,"RF ROC",os.path.join(fold_dir,"rf_roc.png"))
        plot_metric_bar(rf_metrics,"RF Metrics",os.path.join(fold_dir,"rf_metrics.png"))
        rows.append({"model":"RandomForest","fold":fold_idx,**rf_metrics})

        # XGBoost
        xg=xgb.XGBClassifier(n_estimators=200,use_label_encoder=False,eval_metric="logloss",random_state=RANDOM_SEED)
        xg.fit(Xtr,ytr); ypred=xg.predict(Xte); yprob=xg.predict_proba(Xte)[:,1]
        xg_metrics=compute_metrics(yte,ypred,yprob)
        print("XGB:",xg_metrics)
        plot_confusion(yte,ypred,"XGB Confusion",os.path.join(fold_dir,"xgb_confusion.png"))
        plot_roc(yte,yprob,"XGB ROC",os.path.join(fold_dir,"xgb_roc.png"))
        plot_metric_bar(xg_metrics,"XGB Metrics",os.path.join(fold_dir,"xgb_metrics.png"))
        rows.append({"model":"XGBoost","fold":fold_idx,**xg_metrics})

    df=pd.DataFrame(rows)
    df.to_csv(os.path.join(out_root,"per_fold_metrics.csv"),index=False)
    avg=df.groupby("model")[['accuracy','precision','recall','f1','roc_auc']].mean().reset_index()
    avg.to_csv(os.path.join(out_root,"average_metrics.csv"),index=False)
    return avg


## 10) PDF Report Builder
Creates a single PDF collecting per-fold images, average tables, and autoencoder figures/threshold.

In [ ]:
def generate_pdf_report(avg_tbl, save_dir, ae_info=None, n_splits=5):
    """Build PDF report with tables and plots."""
    ensure_dir(save_dir)
    pdf_path = os.path.join(save_dir, "report.pdf")
    doc = SimpleDocTemplate(pdf_path, pagesize=letter)
    styles = getSampleStyleSheet()
    elems = []

    elems.append(Paragraph("GPS Spoofing Detection Report", styles['Title']))
    elems.append(Paragraph(f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}", styles['Normal']))
    elems.append(Spacer(1, 12))

    # Per-fold metrics table
    per_fold_csv = os.path.join(save_dir, "per_fold_metrics.csv")
    if os.path.exists(per_fold_csv):
        per_fold_df = pd.read_csv(per_fold_csv)
        elems.append(Paragraph("Per-Fold Metrics:", styles['Heading2']))
        elems.append(Paragraph(per_fold_df.to_html(index=False), styles['Normal']))
        elems.append(Spacer(1, 12))

    # Average metrics
    elems.append(Paragraph("Average Metrics (Cross Validation):", styles['Heading2']))
    elems.append(Paragraph(avg_tbl.to_html(index=False), styles['Normal']))
    elems.append(Spacer(1, 12))

    # Per-fold plots
    for fold_idx in range(1, n_splits+1):
        fold_dir = os.path.join(save_dir, f"fold_{fold_idx}")
        if not os.path.exists(fold_dir):
            continue
        elems.append(Paragraph(f"Results for Fold {fold_idx}:", styles['Heading2']))
        for model in ["rf","xgb"]:
            for plot in ["confusion","roc","metrics"]:
                img_path = os.path.join(fold_dir, f"{model}_{plot}.png")
                if os.path.exists(img_path):
                    elems.append(Image(img_path, width=400, height=250))
                    elems.append(Spacer(1, 12))

    # Autoencoder section
    if ae_info:
        elems.append(Paragraph("LSTM Autoencoder Results:", styles['Heading2']))
        metrics_df=pd.DataFrame([ae_info['metrics']]) if 'metrics' in ae_info else None
        if metrics_df is not None:
            elems.append(Paragraph(metrics_df.to_html(index=False), styles['Normal']))
            elems.append(Spacer(1, 10))
        elems.append(Paragraph(f"95th percentile threshold: {ae_info['threshold']:.4f}", styles['Normal']))
        loss_plot = os.path.join(save_dir,"ae_loss.png")
        ae_plot = os.path.join(save_dir,"ae_error_plot.png")
        if os.path.exists(loss_plot):
            elems.append(Image(loss_plot, width=400, height=250))
        if os.path.exists(ae_plot):
            elems.append(Image(ae_plot, width=400, height=150))

    doc.build(elems)
    print(f"\n✅ Full PDF report will be saved at: {pdf_path}")


## 11) Main: Run Everything
Execute this cell to:
1. Generate the dataset and features
2. Run CV for RandomForest & XGBoost (and save plots)
3. Optionally train the AE and generate its plots/metrics
4. Build the consolidated PDF report in `outputs_cv/report.pdf`

In [ ]:
def main():
    ensure_dir(SAVE_DIR)
    df = load_or_generate_dataset()
    print("Dataset shape:", df.shape)
    X, y = extract_features(df)
    print("Feature matrix:", X.shape, "| Labels:", y.shape)

    # Cross-validation and plots
    results = run_cross_validation(X, y, SAVE_DIR, N_SPLITS)
    print("\nAverage CV metrics:\n", results)

    # Optional Autoencoder
    ae_info = None
    if DO_AE:
        ae_info = train_lstm_autoencoder(X, y, SAVE_DIR, epochs=15, bs=32)
        print("\nAE metrics:\n", ae_info.get('metrics', {}))

    # PDF report
    generate_pdf_report(results, SAVE_DIR, ae_info, N_SPLITS)

if __name__=="__main__":
    main()


## 12) Appendix: Notes & Tips
- **Data realism**: The dataset is synthetic and intended for didactic purposes—expect different behavior on real logs.
- **Hyperparameters**: Feel free to tune `n_estimators`, learning rates, or add regularization to XGBoost.
- **Scaling**: Tree-based models generally don't need scaling; the AE does—handled internally.
- **Compute**: Autoencoders may benefit from a GPU; CPU works but can be slower.
- **Extensions**:
  - Try additional features (e.g., Doppler residuals, CN0 stats per-satellite).
  - Incorporate temporal windows (sequence models for the supervised side).
  - Calibrate thresholds using validation curves.
