In [1]:
# imports
import pandas as pd
import numpy as np
import os
from sklearn.ensemble import BaggingClassifier
from sklearn import svm
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    classification_report,
    precision_recall_fscore_support,
    balanced_accuracy_score,
)
from tqdm import tqdm
from scipy.stats import skew, kurtosis
from pywt import wavedec

## Computing the wavelet transforms

In [None]:
def load_data_to_numpy_array(file: str, packet_size: int) -> pd.DataFrame:
    """

    Load data from file to a pandas dataframe


    Args:

    file : str : file path


    Returns:

    df : pd.DataFrame : dataframe with the data
    """

    data = np.fromfile(file, dtype=np.float32)

    data = data.reshape(data.shape[0] // 2, 2)

    df = pd.DataFrame(data, columns=["Z_I", "Z_Q"])

    df = df.drop(df.tail(df.shape[0] % packet_size).index)

    data = df["Z_I"] + 1j * df["Z_Q"]

    data = np.reshape(data, (data.shape[0] // packet_size, packet_size))

    return data


def get_stats_from_dwt(row):
    """

    Get statistics from DWT


    Args:

    row : np.array : row containing the IQ values
    """

    coeffs = wavedec(row, "db1", level=3)
    # print(coeffs)
    stats = []

    for coeff in coeffs:
        stats.append(np.mean(coeff))  # mean
        stats.append(np.std(coeff))  # std
        stats.append(skew(coeff))  # skew
        stats.append(kurtosis(coeff))  # kurtosis

        stats.append(np.sum(np.square(coeff)))  # energy

    return stats


packet_sizes = {
    "zigbee": 127 * 8,
    "bluetooth": 258 * 8,
    "wifi": 2304 * 8,
}

CURRENT_PROTOCOL = "zigbee"
raw_files = [x for x in os.listdir("data") if x.endswith("iq")]


for i, file in tqdm(enumerate(raw_files)):
    print(f"Starting with file {file}")
    data = load_data_to_numpy_array(f"data\\{file}", packet_sizes[CURRENT_PROTOCOL])
    data = np.apply_along_axis(np.real, 1, data)
    data = np.apply_along_axis(get_stats_from_dwt, 1, data)
    data = np.array(data)
    np.save(f"data\\{file[:-3]}_{CURRENT_PROTOCOL}.npy", data)

## Training and testing model

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import svm
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_fscore_support,
    precision_recall_curve,
    roc_curve,
    roc_auc_score,
)

# ----------------- MODEL SET A (Handcrafted stats from .npy) -----------------
loaded_files_order = [x for x in os.listdir("data") if x.endswith(".iq")]
df = pd.read_parquet("data\\packets_1016_zigbee.parquet")

# Load all features from .npy
all_data = np.empty((0, 20))
for file in loaded_files_order:
    data = np.load(f"data\\{file[:-3]}_zigbee.npy")
    all_data = np.concatenate((all_data, data), axis=0)

# Construct column names based on format
order = ["mean", "std", "skew", "kurtosis", "energy"]
col_names = [
    [f"{x}_{i}" for x in order] for i in range(all_data.shape[1] // len(order))
]
col_names = [item for sublist in col_names for item in sublist]
df[col_names] = pd.DataFrame(all_data)

df.dropna(inplace=True)
df.label = df.label.astype(np.uint8) // 2

# Drop unwanted features
df = df.drop(columns=[col for col in df.columns if "skew" in col or "kurtosis" in col])
df = df.drop(
    columns=[
        "Z_I",
        "Z_Q",
        "R_I",
        "R_phase",
        "Z_mag",
        "Z_phase",
        "magnitude_error",
        "E_I",
        "E_mag",
        "E_phase",
        "phase_error",
    ]
)
X_a = df.drop(columns=["label"])
print(X_a.columns)
y_a = df["label"]

# ----------------- MODEL SET B (Signal Features) -----------------
df_packets = pd.read_parquet("data\\packets_1016_zigbee.parquet")
df_packets["label"] = df_packets["label"].astype(int) // 2
X_b = df_packets[["Z_I", "Z_Q", "IQ_offset", "magnitude_error", "phase_error"]]
print(X_b.columns)
y_b = df_packets["label"]


# ----------------- EVALUATION FUNCTION -----------------
def evaluate_models(X, y):
    metrics = {
        "prec_curves": [],
        "rec_curves": [],
        "fpr_curves": [],
        "tpr_curves": [],
        "auc_roc": [],
        "accuracy_scores": [],
        "recall_scores": [],
        "precision_scores": [],
        "f1_scores": [],
    }
    recall_range = np.linspace(0, 1, 100)
    fpr_range = np.linspace(0, 1, 100)
    precision_interp_total = np.zeros_like(recall_range)
    tpr_interp_total = np.zeros_like(fpr_range)

    for i in range(6):
        y_binary = np.where(y == i, 1, 0)
        X_train, X_test, y_train, y_test = train_test_split(X, y_binary, test_size=0.2)

        model = svm.LinearSVC(class_weight="balanced", max_iter=10000)
        model.fit(X_train, y_train)
        y_scores = model.decision_function(X_test)
        y_pred = model.predict(X_test)

        precision, recall, _ = precision_recall_curve(y_test, y_scores)
        fpr, tpr, _ = roc_curve(y_test, y_scores)
        auc_roc = roc_auc_score(y_test, y_scores)

        metrics["prec_curves"].append(precision)
        metrics["rec_curves"].append(recall)
        metrics["fpr_curves"].append(fpr)
        metrics["tpr_curves"].append(tpr)
        metrics["auc_roc"].append(auc_roc)
        metrics["accuracy_scores"].append(accuracy_score(y_test, y_pred))
        metrics["recall_scores"].append(recall_score(y_test, y_pred))
        metrics["precision_scores"].append(precision_score(y_test, y_pred))
        metrics["f1_scores"].append(
            precision_recall_fscore_support(y_test, y_pred, average="weighted")[2]
        )

        # For averaging
        precision_interp_total += np.interp(recall_range, recall[::-1], precision[::-1])
        tpr_interp_total += np.interp(fpr_range, fpr, tpr)

    metrics["recall_range"] = recall_range
    metrics["fpr_range"] = fpr_range
    metrics["avg_precision"] = precision_interp_total / 6
    metrics["avg_tpr"] = tpr_interp_total / 6
    return metrics


# Evaluate both sets
results_a = evaluate_models(X_a, y_a)
results_b = evaluate_models(X_b, y_b)

# ----------------- FIGURE 1: PRECISION-RECALL CURVES (SWAPPED) -----------------
fig1, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

# Set B (now on the left)
for i in range(6):
    ax1.plot(
        results_b["rec_curves"][i], results_b["prec_curves"][i], label=f"Label {i}"
    )
ax1.plot(
    results_b["recall_range"],
    results_b["avg_precision"],
    label="Avg",
    color="black",
    linestyle="--",
    linewidth=2,
)
ax1.set_title("PR Curves – Without frequency statistics")
ax1.set_xlabel("Recall")
ax1.set_ylabel("Precision")
ax1.grid(True)
ax1.legend()

# Set A (now on the right)
for i in range(6):
    ax2.plot(
        results_a["rec_curves"][i], results_a["prec_curves"][i], label=f"Label {i}"
    )
ax2.plot(
    results_a["recall_range"],
    results_a["avg_precision"],
    label="Avg",
    color="black",
    linestyle="--",
    linewidth=2,
)
ax2.set_title("PR Curves – With frequency statistics")
ax2.set_xlabel("Recall")
ax2.set_ylabel("Precision")
ax2.grid(True)
ax2.legend()

plt.tight_layout()
plt.savefig("figures/PR_curves_3.png")
plt.show()

# ----------------- FIGURE 2: ROC CURVES (SWAPPED) -----------------
fig2, (ax3, ax4) = plt.subplots(1, 2, figsize=(14, 6))

# Set B (now on the left)
for i in range(6):
    ax3.plot(
        results_b["fpr_curves"][i],
        results_b["tpr_curves"][i],
        label=f"Label {i} (AUC={results_b['auc_roc'][i]:.2f})",
    )
ax3.plot(
    results_b["fpr_range"],
    results_b["avg_tpr"],
    label="Avg",
    color="black",
    linestyle="--",
    linewidth=2,
)
ax3.set_title("ROC Curves – Without frequency statistics")
ax3.set_xlabel("False Positive Rate")
ax3.set_ylabel("True Positive Rate")
ax3.grid(True)
ax3.legend()

# Set A (now on the right)
for i in range(6):
    ax4.plot(
        results_a["fpr_curves"][i],
        results_a["tpr_curves"][i],
        label=f"Label {i} (AUC={results_a['auc_roc'][i]:.2f})",
    )
ax4.plot(
    results_a["fpr_range"],
    results_a["avg_tpr"],
    label="Avg",
    color="black",
    linestyle="--",
    linewidth=2,
)
ax4.set_title("ROC Curves – With frequency statistics")
ax4.set_xlabel("False Positive Rate")
ax4.set_ylabel("True Positive Rate")
ax4.grid(True)
ax4.legend()

df_results_a = pd.DataFrame(
    {
        "accuracy": results_a["accuracy_scores"],
        "recall": results_a["recall_scores"],
        "precision": results_a["precision_scores"],
        "f1": results_a["f1_scores"],
        "auc": results_a["auc_roc"],
    }, index=range(6), columns=["Accuracy", "Recall", "Precision", "F1", "AUC"]	
)
df_results_a = df_results_a.round(3)
df_results_a.to_latex("tables/results_3.tex")

plt.tight_layout()
plt.savefig("figures/ROC_curves_3.png")
plt.show()