In [None]:
import os
import pathlib
import numpy as np
import re
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score,accuracy_score
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

In [None]:
def loadPow(attack=None):
    """Load the training portion (first 80% of rows) of every power trace, per device.

    Walks ``<cwd>/power_csv/<group>/<device>/`` and reads each ``.csv`` file
    found in a device directory. From every file the second column is taken
    as the power samples and the third column as the labels; only the leading
    80% of the rows are kept (``loadPowTest`` reads the remaining rows).

    Parameters
    ----------
    attack : str, optional
        When given, only files whose name contains this substring are read.
        When ``None`` or empty, every CSV of the device is read.

    Returns
    -------
    dict
        ``{device_dir_name: (powers, labels)}`` where both entries are 1-D
        numpy arrays built by concatenating the per-file slices in sorted
        file-name order. Devices without any matching file are omitted.
        If two groups contain a device directory with the same name, the
        later one overwrites the earlier one.

    Raises
    ------
    FileNotFoundError
        If ``<cwd>/power_csv`` does not exist.
    """
    root = pathlib.Path(os.getcwd()) / "power_csv"
    pow_hm = {}
    # Directory listings come back in arbitrary order; sort them so the
    # concatenated series (and everything derived from it) is reproducible.
    for group_dir in sorted(root.iterdir()):
        if not group_dir.is_dir():
            continue
        for device_dir in sorted(group_dir.iterdir()):
            if not device_dir.is_dir():
                continue
            powers_list, labels_list = [], []
            for csv_path in sorted(device_dir.iterdir()):
                # Match on the extension instead of "csv" appearing anywhere in the name.
                if csv_path.suffix != ".csv" or not csv_path.is_file():
                    continue
                if attack and attack not in csv_path.name:
                    continue
                df = pd.read_csv(csv_path)
                df = df.iloc[:int(len(df) * 0.8)]  # leading 80% = training rows
                powers_list.append(df.iloc[:, 1].to_numpy())
                labels_list.append(df.iloc[:, 2].to_numpy())
            if powers_list:
                pow_hm[device_dir.name] = (
                    np.concatenate(powers_list),
                    np.concatenate(labels_list),
                )
    return pow_hm

def loadPowTest(attack=None):
    """Load the held-out portion (last 20% of rows) of every power trace, per device.

    Walks ``<cwd>/power_csv/<group>/<device>/`` and reads each ``.csv`` file
    found in a device directory. From every file the second column is taken
    as the power samples and the third column as the labels; only the rows
    from index ``int(len(df) * 0.8)`` onwards are kept, i.e. the rows that
    ``loadPow`` leaves out.

    Parameters
    ----------
    attack : str, optional
        When given, only files whose name contains this substring are read.
        When ``None`` or empty, every CSV of the device is read.

    Returns
    -------
    dict
        ``{device_dir_name: (powers, labels)}`` where both entries are 1-D
        numpy arrays built by concatenating the per-file slices in sorted
        file-name order. Devices without any matching file are omitted.
        If two groups contain a device directory with the same name, the
        later one overwrites the earlier one.

    Raises
    ------
    FileNotFoundError
        If ``<cwd>/power_csv`` does not exist.
    """
    root = pathlib.Path(os.getcwd()) / "power_csv"
    pow_hm = {}
    # Directory listings come back in arbitrary order; sort them so the
    # concatenated series (and everything derived from it) is reproducible.
    for group_dir in sorted(root.iterdir()):
        if not group_dir.is_dir():
            continue
        for device_dir in sorted(group_dir.iterdir()):
            if not device_dir.is_dir():
                continue
            powers_list, labels_list = [], []
            for csv_path in sorted(device_dir.iterdir()):
                # Match on the extension instead of "csv" appearing anywhere in the name.
                if csv_path.suffix != ".csv" or not csv_path.is_file():
                    continue
                if attack and attack not in csv_path.name:
                    continue
                df = pd.read_csv(csv_path)
                df = df.iloc[int(len(df) * 0.8):]  # trailing 20% = held-out rows
                powers_list.append(df.iloc[:, 1].to_numpy())
                labels_list.append(df.iloc[:, 2].to_numpy())
            if powers_list:
                pow_hm[device_dir.name] = (
                    np.concatenate(powers_list),
                    np.concatenate(labels_list),
                )
    return pow_hm
# Load both splits for every attack type: `dic` holds the rows returned by
# loadPow, `dic_test` the rows returned by loadPowTest.
dic, dic_test = loadPow(), loadPowTest()


In [None]:
def make_windows(series, labels, w=30, s=10):
    """Cut a series into fixed-length sliding windows with one label per window.

    Parameters
    ----------
    series : array-like
        Samples to window; sliced as ``series[i:i + w]``.
    labels : array-like
        Per-sample labels aligned with ``series``; each slice must support
        ``.sum()`` (e.g. a numpy array).
        Presumably binary 0/1 values -- TODO confirm against the CSV schema.
    w : int, default 30
        Window length in samples.
    s : int, default 10
        Stride between consecutive window starts.

    Returns
    -------
    X : torch.Tensor, float32
        The stacked windows; shape ``(0, w)`` when the series is shorter than ``w``.
    y : torch.Tensor, long
        1 where the sum of the window's labels is at least ``w / 2``, else 0.
    """
    X, y = [], []

    # "+ 1" so the last full window, which starts at len(series) - w, is kept.
    for i in range(0, len(series) - w + 1, s):
        X.append(series[i:i + w])
        ones = labels[i:i + w].sum()
        y.append(int(ones >= (w / 2)))  # 1 if the label sum is ≥ half the window, else 0

    if not X:
        # Keep the window dimension so callers still see a 2-D batch.
        return (
            torch.empty((0, w), dtype=torch.float32),
            torch.empty((0,), dtype=torch.long),
        )

    X = torch.tensor(np.array(X), dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.long)
    return X, y

In [None]:
import numpy as np

def extract_features(windows):
    """Summarise every window with five statistics.

    For each element of ``windows`` (each must be a torch tensor) the
    features are, in order: mean, standard deviation, minimum, maximum and
    median, all computed with the corresponding ``torch`` function and its
    default arguments.

    Returns a numpy array with one row per window and five columns.
    """
    stat_fns = (torch.mean, torch.std, torch.min, torch.max, torch.median)
    rows = [[stat(window) for stat in stat_fns] for window in windows]
    return np.array(rows)




In [None]:
# Overall performance: one random forest per device, trained and scored on
# the traces of all attack types together.
dic = loadPow()
dic_test = loadPowTest()
results = []
for device, (train_power, train_labels) in dic.items():
    # Standardise with statistics fitted on the training split only.
    scaler = StandardScaler()
    X_raw = np.array(train_power).reshape(-1, 1)
    y_raw = np.array(train_labels)
    X_scaled = scaler.fit_transform(X_raw).flatten()
    X_train, y_train = make_windows(X_scaled, y_raw)

    # The test split reuses the scaler fitted above.
    test_power, test_labels = dic_test[device]
    X_test = np.array(test_power).reshape(-1, 1)
    y_test = np.array(test_labels)
    X_test = scaler.transform(X_test).flatten()
    X_test, y_test = make_windows(X_test, y_test)

    X_train_feats = extract_features(X_train)
    X_test_feats = extract_features(X_test)

    clf = RandomForestClassifier(
        n_estimators=100,
        random_state=42,
        class_weight='balanced',
    )
    clf.fit(X_train_feats, y_train)
    y_pred = clf.predict(X_test_feats)

    f1_macro = f1_score(y_test, y_pred, average='macro')
    acc = accuracy_score(y_test, y_pred)
    results.append([device, acc, f1_macro])

# One row per device; written to disk and shown as the cell output.
df_results = pd.DataFrame(results, columns=["device", "accuracy", "f1_macro"])
df_results.to_csv("overall.csv", index=False)
df_results

## Per Attack

In [None]:
# Per-attack performance: for each attack name, reload the train/test splits
# restricted to that attack and fit one random forest per device.
# NOTE(review): the attack name is passed to loadPow/loadPowTest as a filter;
# short names such as "os" may also match unrelated file names if the loader
# filters by substring -- confirm against the data set's naming scheme.
attacks = ["icmp", "os", "port", "syn"]
results = []
for attack in attacks:
    dic, dic_test = loadPow(attack), loadPowTest(attack)
    print(attack)
    for device in dic.keys():
        # Standardise with statistics fitted on the training split only.
        scaler = StandardScaler()
        X_raw, y_raw = np.array(dic[device][0]).reshape(-1, 1), np.array(dic[device][1])
        X_scaled = scaler.fit_transform(X_raw).flatten()
        X_train, y_train = make_windows(X_scaled, y_raw)

        # The test split reuses the scaler fitted above.
        X_test, y_test = np.array(dic_test[device][0]).reshape(-1, 1), np.array(dic_test[device][1])
        X_test = scaler.transform(X_test).flatten()
        X_test, y_test = make_windows(X_test, y_test)

        X_train_feats = extract_features(X_train)
        X_test_feats = extract_features(X_test)

        clf = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')
        clf.fit(X_train_feats, y_train)
        y_pred = clf.predict(X_test_feats)

        f1_macro = f1_score(y_test, y_pred, average='macro')
        acc = accuracy_score(y_test, y_pred)
        results.append([attack, device, acc, f1_macro])

# Save to CSV first, then end the cell with the frame: only the last
# expression of a cell is displayed, so the bare name must come last.
df_results = pd.DataFrame(results, columns=["attack", "device", "accuracy", "f1_macro"])
df_results.to_csv("per_attack.csv", index=False)
df_results

## Feature ablation study

In [None]:
# Full set of features if needed
def extract_features(windows):
    """Compute 16 baseline, temporal and spectral features for every window.

    NOTE: this reuses the name of the 5-feature ``extract_features`` defined
    earlier in the notebook, so running this cell replaces that definition
    for every cell executed afterwards.

    Parameters
    ----------
    windows : iterable of 1-D windows
        Typically a 2-D torch tensor (one window per row). Torch tensors are
        detached and moved to the CPU; numpy arrays and plain sequences are
        accepted as well. Each window must contain at least one sample.

    Returns
    -------
    numpy.ndarray of shape ``(n_windows, 16)``
        Columns, in order: mean, std, min, max, median, mean absolute
        successive change, lag-1 autocorrelation, lag-5 autocorrelation,
        zero-crossing rate, line length, linear slope, spectral centroid,
        spectral bandwidth, spectral flatness, spectral entropy, peak
        spectral power. An empty input yields shape ``(0, 16)``.
    """
    n_features = 16
    feats = []

    for w in windows:
        # Tensor -> numpy; anything without .detach() is treated as array-like.
        w_np = w.detach().cpu().numpy() if hasattr(w, "detach") else np.asarray(w)
        w_np = w_np.astype(float)
        n = len(w_np)

        # ---------------------------
        # 1. Baseline Features (5)
        # ---------------------------
        mean = np.mean(w_np)
        std = np.std(w_np)
        mn = np.min(w_np)
        mx = np.max(w_np)
        med = np.median(w_np)

        # ---------------------------
        # 2. Temporal Features (6)
        # ---------------------------
        if n > 1:
            # Successive diffs
            abs_diffs = np.abs(np.diff(w_np))
            masc = np.mean(abs_diffs)
            line_length = np.sum(abs_diffs)
            # Zero-crossing rate: share of neighbouring pairs with opposite sign
            zcr = np.mean((w_np[:-1] * w_np[1:]) < 0)
            # Slope (linear regression on index positions)
            slope = np.polyfit(np.arange(n), w_np, 1)[0]
        else:
            # A single sample has no successive pairs and no defined slope.
            masc = line_length = zcr = slope = 0.0

        # Autocorrelation at lag 1 and lag 5. A constant segment makes the
        # correlation 0/0; silence that warning and map the NaN to 0 below.
        with np.errstate(divide="ignore", invalid="ignore"):
            acf1 = np.corrcoef(w_np[:-1], w_np[1:])[0, 1] if n > 1 else 0.0
            acf5 = np.corrcoef(w_np[:-5], w_np[5:])[0, 1] if n > 5 else 0.0
        acf1 = np.nan_to_num(acf1)
        acf5 = np.nan_to_num(acf5)

        # ---------------------------
        # 3. Spectral Features (5)
        # ---------------------------
        # One-sided FFT magnitude squared of the mean-removed window; the
        # small constant keeps the logarithms below finite.
        fft_vals = np.fft.rfft(w_np - mean)
        psd = np.abs(fft_vals) ** 2 + 1e-12
        freqs = np.fft.rfftfreq(n, d=1.0)

        psd_sum = np.sum(psd)

        # Spectral centroid
        centroid = np.sum(freqs * psd) / psd_sum

        # Spectral bandwidth
        bandwidth = np.sqrt(np.sum(((freqs - centroid) ** 2) * psd) / psd_sum)

        # Spectral flatness (geometric mean / arithmetic mean)
        flatness = np.exp(np.mean(np.log(psd))) / np.mean(psd)

        # Spectral entropy
        p = psd / psd_sum
        entropy = -np.sum(p * np.log2(p))

        # Dominant frequency amplitude
        dom_amp = np.max(psd)

        # Collect all features
        feats.append([
            mean, std, mn, mx, med,                        # 5 baseline
            masc, acf1, acf5, zcr, line_length, slope,     # 6 temporal
            centroid, bandwidth, flatness, entropy, dom_amp  # 5 spectral
        ])

    if not feats:
        # Keep the feature dimension so downstream estimators see a 2-D array.
        return np.empty((0, n_features))
    return np.array(feats)


# Usage:
# X_train and X_test are not defined in this cell; they are read from the
# notebook's global namespace. The evaluation loops in the earlier cells
# reassign both names for every device, so what gets featurised here is
# whatever the most recently executed loop iteration left behind.
X_train_feats = extract_features(X_train)
X_test_feats  = extract_features(X_test)


## Idle vs Active separation code

In [None]:
def loadPow(attack=None):
    """Load the training portion (first 80% of rows) of the *active* power traces.

    Walks ``<cwd>/power_csv/<group>/<device>/`` and reads each ``.csv`` file
    whose name contains ``"active"``. From every file the second column is
    taken as the power samples and the third column as the labels; only the
    leading 80% of the rows are kept.

    NOTE: this reuses the name ``loadPow`` from the first loader cell, so
    running this cell replaces that definition for every cell executed
    afterwards.

    Parameters
    ----------
    attack : str, optional
        When given, a file must additionally contain this substring in its
        name. When ``None`` or empty, all "active" files are read.

    Returns
    -------
    dict
        ``{device_dir_name: (powers, labels)}`` where both entries are 1-D
        numpy arrays built by concatenating the per-file slices in sorted
        file-name order. Devices without any matching file are omitted.

    Raises
    ------
    FileNotFoundError
        If ``<cwd>/power_csv`` does not exist.
    """
    root = pathlib.Path(os.getcwd()) / "power_csv"
    pow_hm = {}
    # Directory listings come back in arbitrary order; sort them so the
    # concatenated series is reproducible.
    for group_dir in sorted(root.iterdir()):
        if not group_dir.is_dir():
            continue
        for device_dir in sorted(group_dir.iterdir()):
            if not device_dir.is_dir():
                continue
            powers_list, labels_list = [], []
            for csv_path in sorted(device_dir.iterdir()):
                name = csv_path.name
                # Match on the extension instead of "csv" appearing anywhere in the name.
                if csv_path.suffix != ".csv" or not csv_path.is_file():
                    continue
                # The state filter must hold with or without an attack name;
                # it used to be skipped when attack was None, which loaded every file.
                if "active" not in name:
                    continue
                if attack and attack not in name:
                    continue
                df = pd.read_csv(csv_path)
                df = df.iloc[:int(len(df) * 0.8)]  # leading 80% = training rows
                powers_list.append(df.iloc[:, 1].to_numpy())
                labels_list.append(df.iloc[:, 2].to_numpy())
            if powers_list:
                pow_hm[device_dir.name] = (
                    np.concatenate(powers_list),
                    np.concatenate(labels_list),
                )
    return pow_hm

# Training-split traces from the loader defined directly above.
# NOTE(review): no attack name is passed here -- confirm the loader still
# restricts itself to "active" files when attack is None.
dic_active= loadPow()

def loadPow(attack=None):
    """Load the training portion (first 80% of rows) of the *idle* power traces.

    Walks ``<cwd>/power_csv/<group>/<device>/`` and reads each ``.csv`` file
    whose name contains ``"idle"``. From every file the second column is
    taken as the power samples and the third column as the labels; only the
    leading 80% of the rows are kept.

    NOTE: this reuses the name ``loadPow`` from the earlier loader cells, so
    running this cell replaces those definitions for every cell executed
    afterwards.

    Parameters
    ----------
    attack : str, optional
        When given, a file must additionally contain this substring in its
        name. When ``None`` or empty, all "idle" files are read.

    Returns
    -------
    dict
        ``{device_dir_name: (powers, labels)}`` where both entries are 1-D
        numpy arrays built by concatenating the per-file slices in sorted
        file-name order. Devices without any matching file are omitted.

    Raises
    ------
    FileNotFoundError
        If ``<cwd>/power_csv`` does not exist.
    """
    root = pathlib.Path(os.getcwd()) / "power_csv"
    pow_hm = {}
    # Directory listings come back in arbitrary order; sort them so the
    # concatenated series is reproducible.
    for group_dir in sorted(root.iterdir()):
        if not group_dir.is_dir():
            continue
        for device_dir in sorted(group_dir.iterdir()):
            if not device_dir.is_dir():
                continue
            powers_list, labels_list = [], []
            for csv_path in sorted(device_dir.iterdir()):
                name = csv_path.name
                # Match on the extension instead of "csv" appearing anywhere in the name.
                if csv_path.suffix != ".csv" or not csv_path.is_file():
                    continue
                # The state filter must hold with or without an attack name;
                # it used to be skipped when attack was None, which loaded every file.
                if "idle" not in name:
                    continue
                if attack and attack not in name:
                    continue
                df = pd.read_csv(csv_path)
                df = df.iloc[:int(len(df) * 0.8)]  # leading 80% = training rows
                powers_list.append(df.iloc[:, 1].to_numpy())
                labels_list.append(df.iloc[:, 2].to_numpy())
            if powers_list:
                pow_hm[device_dir.name] = (
                    np.concatenate(powers_list),
                    np.concatenate(labels_list),
                )
    return pow_hm
# Training-split traces from the loader defined directly above.
# NOTE(review): no attack name is passed here -- confirm the loader still
# restricts itself to "idle" files when attack is None.
dic_idle = loadPow()