## Libraries

In [2]:
import pandas as pd
import train
from decimal import Decimal
from joblib import dump,load
import joblib
import numpy as np
from pysat.card import CardEnc
from mxreason import MXReasoner
from encoder import *
from explainer import *
from class_explanation import *
from adversarial import *

In [1]:
import warnings
warnings.filterwarnings('ignore')

## Dataset path

In [3]:
train_path='CICIDS_dataset/train_df.csv'
test_path='CICIDS_dataset/test_df.csv'

## Instance LightGBM formal explanations

In [3]:
df = pd.read_csv(train_path)
X_train=df.iloc[:,:-1]
y_train=df.iloc[:,-1]
df = pd.read_csv(test_path)
X_test=df.iloc[:,:-1]
y_test=df.iloc[:,-1]

In [4]:
trees,nofcl=train.train_model( X_train, X_test, y_train, y_test)

[LightGBM] [Info] Number of positive: 8514, number of negative: 12141
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000474 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 3960
[LightGBM] [Info] Number of data points in the train set: 20655, number of used features: 19
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.412200 -> initscore=-0.354876
[LightGBM] [Info] Start training from score -0.354876
              precision    recall  f1-score   support

           0       0.89      0.99      0.94      5219
           1       0.98      0.83      0.90      3634

    accuracy                           0.92      8853
   macro avg       0.94      0.91      0.92      8853
weighted avg       0.93      0.92      0.92      8853



In [5]:
thresholds = get_thresholds(trees)
idmgr=IDPool()
enc,imaps,lvars,ivars,intvs,vid2fid=encode(nofcl,thresholds,idmgr)
for tree_index in range(len(trees)):
        result,leaves_weights = traverse_tree(idmgr,trees[tree_index]['tree_structure'], [], {},thresholds,lvars)
        transform(result)
        for key in result:
                enc[0].formula.extend(result[key])
                enc[0].leaves.append((key, Decimal(str(-leaves_weights[key]))))
        leaves = [key for key in result]
        cnf =CardEnc.equals(leaves,vpool=idmgr, encoding=1)
        enc[0].formula.extend(cnf)
        if enc[0].trees:
            last_tree = enc[0].trees[-1]
            enc[0].trees.append((last_tree[1], last_tree[1]+len(leaves)))
        else:
            enc[0].trees.append((0, len(leaves)))  
enc[0].formula.extend(enc['common'])
enc[0].formula.nv=idmgr.top
del enc['common']
del enc['paths']
enc[1]=swap_literals(enc[0])

In [6]:
features_list=list(df.columns[:-1])
feature_names_as_array_strings=[str(i) for  i in range(len(features_list))] # to change iris.feature_names
ivars={str(k):v for k,v in ivars.items()}
all_explanations=[]
oracles={}
for clid in range(nofcl):
           oracles[clid] = MXReasoner(enc, clid)

In [None]:
all_explanations=[]
res=[]
with open('explanations.txt', 'a') as f:     
    for i in range(len(X_test)):
        sample_internal=X_test.iloc[i,:]
        self_hypos=get_literals(sample_internal,thresholds,ivars,feature_names_as_array_strings)
        fcats=get_feature_categories(ivars,feature_names_as_array_strings)
        scats= list(range(len(fcats)))
        hypos=cats2hypo(scats,self_hypos,fcats)
        v2cat,out_id=predict(sample_internal,hypos,fcats,nofcl,enc)
        res.append(out_id)
        allcats=[i for i in range (nofcl)]
        expls=mhs_mus_enumeration(oracle=oracles[out_id], allcats=allcats,  v2cat=v2cat,hypos=hypos,fcats= fcats, verbose=0, xnum=1, smallest=True)
        v=vid2fid
        v2feat =  {key:int(value[0]) for key, value in v.items()}
        labels = features_list
        preamble = [f"{label} == {value}" for label, value in zip(labels, sample_internal)]
        target_name=[i for i in range (nofcl)]
        result=GenerateExplanation(out_id,sample_internal,expls,hypos,fcats,v2feat,preamble,target_name) 
        all_explanations.append(result)
        print("sample done", i)
        f.write(f"{str(result[0][1])} \n")

## Class-level LightGBM formal explanations

In [8]:
#all formal instance explanations of train data
explanations_path="CICIDS_results/explanations_cic_train_explanation_only.txt"
results_generate_train,results_recover_train=generate_class_exp(explanations_path)

['explanation: "IF Bwd Packet Length Max == 0.0523006623488017 AND Fwd Packet Length Max == 0.61847004563708 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == -0.4448443404381836 AND Fwd Header Length == -0.0834127317497789 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == 0.4771802575141738 AND Fwd Packet Length Max == 0.3116467193437048 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == -0.0604538743657722 AND Fwd Packet Length Max == 0.1170411836854066 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == -0.430493763038147 AND Init_Win_bytes_backward == -0.2076506787893725 AND Fwd Packet Length Mean == -0.2487437645219864 AND Fwd Packet Length Max == -0.2579651040064964 AND Fwd Header Length == 0.0976329894459864 THEN 1" ', '  explanation: "IF Bwd Packet Length Max == 2.520599975155113 AND Bwd Header Length == 0.0364915412381135 THEN 1" ', '  explanation: "IF Bwd Packet Length Max == -0.447919464166763 AND Total Length of Fwd Packets == -0.0987196789903636 AND Fw

In [None]:
for class_id, features in results_generate_train.items():
    print(f"Intervals of important features of class {class_id}:")
    for feature_name, intervals in features.items():
        print(f"  Feature: {feature_name}")
        for interval in intervals:
            print(f"    Interval: {interval}")

## Generate Adversarial samples

In [5]:
#all formal instance explanations of train data and test data
explanations_path_test="CICIDS_results/explanations_cic_test_explanation_only.txt"
explanations_path_train="CICIDS_results/explanations_cic_train_explanation_only.txt"

In [6]:
model_path = 'CICIDS_results/trained_model_lgbm.joblib'
test_df=pd.read_csv(test_path)
train_df=pd.read_csv(train_path)

In [7]:
results_generate_train,results_recover_train=generate_class_exp(explanations_path_train)
results_generate_test,results_recover_test=generate_class_exp(explanations_path_test)

['explanation: "IF Bwd Packet Length Max == 0.0523006623488017 AND Fwd Packet Length Max == 0.61847004563708 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == -0.4448443404381836 AND Fwd Header Length == -0.0834127317497789 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == 0.4771802575141738 AND Fwd Packet Length Max == 0.3116467193437048 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == -0.0604538743657722 AND Fwd Packet Length Max == 0.1170411836854066 THEN 0" ', '  explanation: "IF Bwd Packet Length Max == -0.430493763038147 AND Init_Win_bytes_backward == -0.2076506787893725 AND Fwd Packet Length Mean == -0.2487437645219864 AND Fwd Packet Length Max == -0.2579651040064964 AND Fwd Header Length == 0.0976329894459864 THEN 1" ', '  explanation: "IF Bwd Packet Length Max == 2.520599975155113 AND Bwd Header Length == 0.0364915412381135 THEN 1" ', '  explanation: "IF Bwd Packet Length Max == -0.447919464166763 AND Total Length of Fwd Packets == -0.0987196789903636 AND Fw

In [7]:
all_features_lgbm,targets_lgbm=Determine_nb_features("explanations_cic_test_explanation_only.txt")
all_features_lgbm_0={item[0]: item[1] for item in all_features_lgbm[0]}
all_features_lgbm_1={item[0]: item[1] for item in all_features_lgbm[1]}

In [None]:
features_with_indexes = {}
for i, column in enumerate(test_df.columns[:-1]):
    features_with_indexes[column] = i
adversarial_samples=[]
for sample_index in range(len(test_df)):
    sample=test_df.iloc[sample_index,:-1].values.reshape((1, -1) )
    target=prediction_adv(model_path,sample)
    if target==0:
      all_features_lgbm_target=all_features_lgbm_0
    elif (target==1):
      all_features_lgbm_target=all_features_lgbm_1
    features_dict = results_generate_train[target]
    features_to_extract = all_features_lgbm_target[sample_index].keys()
    important_intervals = {key: features_dict[key] for key in features_to_extract}
    print("important",important_intervals)
    perturbed_sample = perturb_sample(sample, important_intervals,train_df,features_with_indexes)
    print("Original sample:", sample)
    print("Predicted class for original sample",prediction_adv(model_path,sample))
    print("Perturbed sample:", perturbed_sample)
    print("Distance between the two samples",euclidean_distance(sample,perturbed_sample))
    perturbed_sample=perturbed_sample.reshape(1,-1)
    print("Predicted class for perturbed sample",prediction_adv(model_path,perturbed_sample))
    perturbed_sample=np.append(perturbed_sample, prediction_adv(model_path,perturbed_sample))
    adversarial_samples.append(perturbed_sample)

# Convert the list of adversarial samples to a DataFrame
adversarial_df = pd.DataFrame(adversarial_samples, columns=train_df.columns[:])

# Save the adversarial samples to a CSV file
adversarial_df.to_csv('adversarial_samples.csv', index=False)

In [4]:
#generated adversarial samples from test data
adversarial_df=pd.read_csv("CICIDS_results/adversarial_samples.csv")

In [8]:
y_adversarial=adversarial_df.iloc[:,-1].values
y_data=targets_lgbm[:]

# Convert y_data to numpy array if it's a list
y_data = np.array(y_data)
y_adversarial = np.array(y_adversarial)

# Ensure both arrays have the same shape
assert y_data.shape == y_adversarial.shape, "Arrays have different shapes"

# Count samples that changed from 0 to 1
zero_to_one = np.sum((y_data == 0) & (y_adversarial == 1))
total_zeros = np.sum(y_data == 0)

# Count samples that changed from 1 to 0
one_to_zero = np.sum((y_data == 1) & (y_adversarial == 0))
total_ones = np.sum(y_data == 1)

# Total number of samples that fooled the model
total_fooled = zero_to_one + one_to_zero

print(f"Number of samples that fooled the model: {total_fooled} out of {len(y_data)}")
print(f"Percentage of samples that fooled the model: {total_fooled/len(y_data)*100:.2f}%")
print(f"Samples that changed from 0 to 1: {zero_to_one} out of {total_zeros} original 0s ({zero_to_one/total_zeros*100:.2f}%)")
print(f"Samples that changed from 1 to 0: {one_to_zero} out of {total_ones} original 1s ({one_to_zero/total_ones*100:.2f}%)")

Number of samples that fooled the model: 2821 out of 8853
Percentage of samples that fooled the model: 31.86%
Samples that changed from 0 to 1: 777 out of 5795 original 0s (13.41%)
Samples that changed from 1 to 0: 2044 out of 3058 original 1s (66.84%)


In [9]:
original_data = test_df.iloc[:, :-1].values
adversarial_data = adversarial_df.iloc[:, :-1].values

distances = np.linalg.norm(original_data[y_data != y_adversarial] - adversarial_data[y_data != y_adversarial], axis=1)
# Calculate the average distance
avg_distance = np.mean(distances)

print("Average Euclidean distance:", avg_distance)

Average Euclidean distance: 1.6708504352287232


## Detect Adversarial Samples

In [23]:
# consider only the samples that their predictions were changed
changed_adversarial_df = pd.read_csv("CICIDS_results/changed_adversarial_samples.csv")
features_names=list(train_df.columns[:-1])
features_with_indexes = {}
for i, column in enumerate(test_df.columns[:-1]):
    features_with_indexes[column] = i

In [27]:
#all formal instance explanations of adversarial data
explanations_path = "CICIDS_results/explanations_adv.txt"
with open(explanations_path, 'r') as file:
        file_content = file.read()
explanations = parse_data(file_content)
expls = extract_feature_indices(explanations, features_with_indexes)

In [29]:
nb_successfully_fooled=0
for index in range (len(changed_adversarial_df)):
    print(index)
    perturbed_sample=changed_adversarial_df.iloc[index,:-3].values.reshape(1, -1)
    print("Perturbed sample:", perturbed_sample)
    prediction_perturbed_sample=prediction_adv(model_path,perturbed_sample)
    expls_perturbed_sample=expls[index]
    out_of_intervals=0
    for feature in expls_perturbed_sample:
      if features_names[feature] not in (results_recover_train[prediction_perturbed_sample].keys()):
        out_of_intervals+=1
      else:
        for interval in results_recover_train[prediction_perturbed_sample][features_names[feature]]:
          if is_within_interval (perturbed_sample[:,feature][0],interval):
            break
          else:
            continue
        if interval==results_recover_train[prediction_perturbed_sample][features_names[feature]][-1]:
              if not is_within_interval (perturbed_sample[:,feature][0],interval):
                out_of_intervals+=1
        prob_adv = out_of_intervals/len(expls_perturbed_sample) * 100
    if prob_adv>=50:
          nb_successfully_fooled+=1

0
Perturbed sample: [[ 2.02742957 -0.71498734 -0.59981065  2.50017473  1.61779193 -0.32174034
  -0.91930837 -0.20671847  0.30590858  0.17938489  0.45396074  0.02469144
  -0.25816742 -0.17965766  1.78979922 -0.1180764   0.02757199 -0.49159966
  -0.05287281]]
1
Perturbed sample: [[ 5.83782716 -0.71498734 -0.59981065  2.46701695  1.61783854 -0.32174034
   0.903657   -0.20671847 -0.12656255  0.1497441  -1.01108497 -0.04335908
  -0.25816729 -0.17965841  1.72261495 -0.11820511  0.09763299 -0.47254041
  -0.05288596]]
2
Perturbed sample: [[ 0.71868594 -0.71498734 -0.59981065  2.05696627  1.4971513  -0.32174034
   0.903657   -0.20671847 -0.0168989   0.17386253  0.45396074 -0.04032713
  -0.25816738 -0.17965766 -0.80507218 -0.1182051   0.02974084 -0.47245367
  -0.05288592]]
3
Perturbed sample: [[ 2.05267340e+00 -7.14987336e-01 -5.99810646e-01  2.50390988e+00
   2.03018935e+00 -3.21740335e-01  9.03657005e-01 -2.06718471e-01
  -9.37693796e-04  1.98738225e-01 -1.01108497e+00 -3.71999824e-02
  -2.581

In [34]:
print("Number of detected adversarial samples:", nb_successfully_fooled, "over", len(changed_adversarial_df), "(", nb_successfully_fooled / len(changed_adversarial_df) * 100, "%)")


Number of detected adversarial samples: 1731 over 2821 ( 61.361219425735555 %)
