In [1]:
import time
    
import warnings
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np

from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import precision_score, recall_score, confusion_matrix, classification_report, accuracy_score, f1_score
from sklearn import metrics

import os
import joblib
import matplotlib.pyplot as plt

from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
import itertools



In [2]:
# Folder that holds the CICI test split; features and labels are stored as
# separate CSV files next to each other.
cici_test_dir='/home/irteam/junghye-dcloud-dir/MLAC/230624/CICI'
X_test,y_test=(
    pd.read_csv(os.path.join(cici_test_dir,csv_name))
    for csv_name in ('X_test.csv','y_test.csv')
)

In [3]:
# Results table: one row per evaluation run, four metrics (accuracy, F1, recall,
# precision) for each stage -- Layer 1, Layer 2, the four per-category models
# (c1..c4) and the overall Layer-3 total -- in exactly this column order.
df=pd.DataFrame(columns=[
    f'{stage}_{metric}'
    for stage in ('L1','L2','c1','c2','c3','c4','total')
    for metric in ('acc','f1','rc','pc')
])

# Directory the final CSV is written to.
outpath='/home/irteam/junghye-dcloud-dir/MLAC/230624/CICI'
# Row label used when the collected metrics are stored in df.
cnt=0
# Flat list of metric values, extended stage by stage by the cells below.
model_eval=[]

In [4]:
# Preview the first rows of the freshly loaded test frame. At this point it
# still contains the label columns (attack_category, nist_category) that the
# following cells split off and drop.
X_test.head()

Unnamed: 0.1,Unnamed: 0,protocol,flow_duration,tot_fwd_pkts,tot_bwd_pkts,tot_len_fwd_pkts,tot_len_bwd_pkts,fwd_pkt_len_max,fwd_pkt_len_min,fwd_pkt_len_mean,...,active_mean,active_std,active_max,active_min,idle_mean,idle_std,idle_max,idle_min,attack_category,nist_category
0,6170590,0.385385,0.279279,0.0,0.37037,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0
1,5847639,0.385385,0.52855,0.856356,0.871371,0.751251,0.897898,0.694194,0.0,0.524024,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0
2,1157416,0.385385,0.331832,0.482482,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0
3,2038020,0.385385,0.443333,0.738739,0.7002,0.95858,0.779005,0.990885,0.0,0.970421,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0
4,684465,1.0,0.725435,0.311311,0.136136,0.470971,0.600045,0.578579,0.894895,0.720721,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0


In [5]:
# Ground truth for each layer: Layer 1 uses the separate label file, Layer 2 the
# nist_category column and Layer 3 the attack_category column of the test frame.
L1_ytest=y_test.copy()
L2_ytest=X_test['nist_category'].copy()
L3_ytest=X_test['attack_category'].copy()

# One subset of the test frame per nist_category value 1..4 ...
c1_Xtest,c2_Xtest,c3_Xtest,c4_Xtest=(
    X_test[X_test['nist_category']==category]
    for category in (1,2,3,4)
)

# ... and the attack_category labels belonging to each subset.
c1_ytest,c2_ytest,c3_ytest,c4_ytest=(
    subset['attack_category']
    for subset in (c1_Xtest,c2_Xtest,c3_Xtest,c4_Xtest)
)

In [6]:
# Remove the two label columns from every per-category subset.
# The names are rebound to new frames instead of dropping in place on frames
# derived from X_test; together with errors='ignore' this makes the cell safe to
# re-run (a second run is a no-op instead of a KeyError).
c1_Xtest,c2_Xtest,c3_Xtest,c4_Xtest=(
    class_data.drop(columns=['attack_category','nist_category'],errors='ignore')
    for class_data in (c1_Xtest,c2_Xtest,c3_Xtest,c4_Xtest)
)

In [7]:
# Strip the 'Unnamed: 0' column and the two label columns from the test frame
# before it is fed to the models. Rebinding with a non-inplace drop and
# errors='ignore' keeps the cell re-runnable (a second run is a no-op instead of
# a KeyError).
# NOTE(review): the preview of X_test also lists an 'Unnamed: 0.1' column that is
# not removed here -- confirm whether the saved models expect it as a feature.
X_test=X_test.drop(columns=['Unnamed: 0','nist_category','attack_category'],errors='ignore')

In [8]:

def test_result(model:str,test,pred) ->list:
    """Score predictions against the true labels, print and return the metrics.

    Args:
        model: Name shown at the start of the printed line. It is only
            interpolated into the message, so callers in this notebook also
            pass fitted model objects (their repr is printed).
        test: True labels.
        pred: Predicted labels, aligned with ``test``.

    Returns:
        ``[accuracy, f1, recall, precision]``; F1, recall and precision are
        computed with ``average='weighted'``.
    """
    scores=[accuracy_score(test,pred)]
    # Same evaluation order as the returned list: F1, recall, precision.
    for weighted_metric in (f1_score,recall_score,precision_score):
        scores.append(weighted_metric(test,pred,average='weighted'))

    acc,f1,recall,precision=scores
    print(f'{model} result , acc:{acc}, f1:{f1},recall:{recall},precision:{precision}')
    return scores


In [9]:
# Directory containing the pickled models that the cells below load with joblib.
saved_path='/home/irteam/dcloud-global-dir/MLAC/saved_models/230620/savedmodels'
# Directory into which plot_confusion_matrix saves its PNG files.
confusion_path='/home/irteam/junghye-dcloud-dir/MLAC/230624/CICI/confusion'

In [10]:
def plot_confusion_matrix(con_mat,labels,title:str,cmap='Blues',normalize=False):
    """Draw a confusion matrix as an annotated heat map and save it as a PNG.

    The image is written to ``<confusion_path>/<title>.png`` (``confusion_path``
    is a notebook-level variable; the directory is created when missing). The
    figure is closed afterwards, so nothing is displayed inline.

    Args:
        con_mat: 2-D array of counts; rows are drawn as true labels, columns as
            predicted labels.
        labels: Tick labels, one per matrix row, in matrix order.
        title: Figure title, also used as the file name stem.
        cmap: Colormap name or object handed to ``plt.imshow``. The default is
            the name ``'Blues'`` so that no colormap lookup runs at definition
            time.
        normalize: When True, annotate each cell with its percentage of the
            row total instead of the raw count.

    Raises:
        ValueError: If ``con_mat`` is empty or not 2-D, or if the number of
            labels differs from the number of matrix rows.
    """
    con_mat=np.asarray(con_mat)
    if con_mat.ndim!=2 or con_mat.size==0:
        raise ValueError('con_mat must be a non-empty 2-D array')
    if len(labels)!=con_mat.shape[0]:
        raise ValueError(
            'got {0} labels for a confusion matrix with {1} rows'.format(len(labels),con_mat.shape[0])
        )

    # Number of true samples per row; used for the y tick labels and as the
    # per-row denominator when normalising.
    row_totals=con_mat.sum(axis=1)
    # Cells darker than half the maximum get white text for contrast.
    thresh=con_mat.max()/2.

    fig=plt.figure(figsize=(20,15))
    try:
        plt.imshow(con_mat,interpolation='nearest',cmap=cmap)
        plt.title(title)
        plt.colorbar()

        marks=np.arange(len(labels))
        nlabels=['{0}(n={1})'.format(label,total) for label,total in zip(labels,row_totals)]
        plt.xticks(marks,labels,rotation=45)
        plt.yticks(marks,nlabels)

        for i, j in itertools.product(range(con_mat.shape[0]), range(con_mat.shape[1])):
            if normalize:
                # Divide by the total of *this* row; an empty row is shown as 0%.
                total=row_totals[i]
                share=con_mat[i, j] * 100 / total if total else 0.0
                cell_text='{0}%'.format(share)
            else:
                cell_text=con_mat[i, j]
            plt.text(j, i, cell_text, horizontalalignment="center", color="white" if con_mat[i, j] > thresh else "black")

        plt.ylabel('True label')
        plt.xlabel('Predicted label')
        # Run the layout pass after the axis labels exist so they are not clipped.
        plt.tight_layout()

        #plt.show()
        # save the image
        os.makedirs(confusion_path,exist_ok=True)
        plt.savefig(confusion_path+'/'+title+'.png',facecolor='#eeeeee')
    finally:
        # Release the figure even if drawing or saving failed.
        plt.close(fig)

In [11]:
# Layer-1 model; its predictions are later compared with 1 to pick the samples
# that are passed on to Layer 2.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code --
# only load model files from a trusted location.
L1_model=joblib.load(os.path.join(saved_path,'CICI_L1mod_rc.pkl'))



In [None]:
# Layer 1: predict a label for every test sample.
L1_ypred=L1_model.predict(X_test)

# The model object is passed as the display name, so the printed line starts
# with its repr.
eval_result=test_result(L1_model,L1_ytest,L1_ypred)

# NOTE(review): model_eval is extended in place; re-running this cell without
# resetting model_eval adds the Layer-1 metrics a second time.
model_eval.extend(eval_result)

#confusion
# Pin the label set so the matrix is always 2x2 and lines up with the two tick
# labels, even when only one class occurs in the truth/predictions.
L1_labels=[0,1]
confusion=metrics.confusion_matrix(L1_ytest,L1_ypred,labels=L1_labels)
plot_confusion_matrix(confusion,labels=L1_labels,title='Layer1')

In [None]:
# Row positions of the test samples for which Layer 1 predicted 1.
malicious_indices=np.where(L1_ypred==1)[0]

# Check how many positions were found. `.any()` would test the position
# *values* and is False when the only flagged sample sits at position 0.
if malicious_indices.size>0:
    # NOTE(review): joblib.load unpickles the file -- only load trusted models.
    L2_model=joblib.load(os.path.join(saved_path,'CICI_nist.pkl'))
    # Layer 2 only sees the samples flagged by Layer 1; features and true
    # nist_category labels are selected with the same positions.
    L2_Xtest=X_test.iloc[malicious_indices]
    L2_ypred=L2_model.predict(L2_Xtest)
    L2_ytest_selected=L2_ytest.iloc[malicious_indices]
    L2_result=test_result(L2_model,L2_ytest_selected,L2_ypred)

    model_eval.extend(L2_result)
else:
    print('no malicious predicted')
    # Same effect as sys.exit(): stop here, because the following cells need
    # L2_Xtest / L2_ypred.
    raise SystemExit
    

In [None]:
# Every label that occurs in the Layer-2 predictions or in the matching truth,
# in sorted order. The same ordered list is given to confusion_matrix and to the
# plot so that row/column k of the matrix always belongs to tick label k
# (a plain list(set(...)) has no guaranteed order).
L2_encoded=sorted(set(L2_ypred)|set(L2_ytest_selected))

confusion=metrics.confusion_matrix(L2_ytest_selected,L2_ypred,labels=L2_encoded)
plot_confusion_matrix(confusion,labels=L2_encoded,title='Layer2')




In [None]:
from collections import defaultdict

# Layer 3: one model per Layer-2 category. The list position is the Layer-2
# label minus one (label 1 -> class_models[0], ..., label 4 -> class_models[3]).
# NOTE(review): joblib.load unpickles the files -- only load trusted models.
class_models = [
    joblib.load(os.path.join(saved_path, 'class_1_CICI.pkl')),
    joblib.load(os.path.join(saved_path, 'class_2_CICI.pkl')),
    joblib.load(os.path.join(saved_path, 'class_3_CICI.pkl')),
    joblib.load(os.path.join(saved_path, 'class_4_CICI.pkl'))
]

class_names = ['Reconnaissance', 'Access', 'Dos', 'Malware']

# Per category: the predictions followed by the matching true labels (two halves
# of equal length), kept for the per-category confusion matrices below.
class_encodings=defaultdict(list)
final_y_pred=[]
final_y_test=[]

# True attack_category labels of the samples flagged by Layer 1, aligned
# row-for-row with L2_Xtest / L2_ypred.
L3_ytest_selected=L3_ytest.iloc[malicious_indices]

for class_index,class_model in enumerate(class_models):
    # Positions (within the Layer-2 subset) that Layer 2 assigned to this category.
    indices=np.where(L2_ypred==class_index+1)[0]
    print(class_names[class_index]+'train & test')

    # Check the number of positions, not `.any()`: the latter looks at the
    # position values and is False when the only routed sample is at position 0.
    if indices.size>0:
        X_test_selected=L2_Xtest.iloc[indices]
        y_pred=class_model.predict(X_test_selected)
        y_test_selected=L3_ytest_selected.iloc[indices]
        result=test_result(class_model,y_test_selected,y_pred)
        model_eval.extend(result)

        class_encodings[class_names[class_index]].extend(y_pred)
        class_encodings[class_names[class_index]].extend(y_test_selected)

        final_y_pred.extend(y_pred)
        final_y_test.extend(y_test_selected)
    else:
        # Keep the metric row aligned: four placeholder zeros for a category
        # that received no samples.
        model_eval.extend([0,0,0,0])

# Overall Layer-3 score over all categories. When no sample reached any category
# model, use the same zero placeholders instead of scoring empty lists.
if final_y_test:
    final_result = test_result('Layer3', final_y_test, final_y_pred)
else:
    final_result = [0,0,0,0]
model_eval.extend(final_result)

for class_name in class_names:
    encoded = class_encodings[class_name]
    if not encoded:
        # Nothing was routed to this category, so there is no matrix to draw.
        continue
    half = len(encoded) // 2
    # First half holds the predictions, second half the true labels.
    class_pred, class_true = encoded[:half], encoded[half:]
    # Sorted labels, shared by the matrix and the plot so ticks match rows/columns.
    class_labels = sorted(set(encoded))
    confusion = metrics.confusion_matrix(class_true, class_pred, labels=class_labels)
    plot_confusion_matrix(confusion,labels=class_labels,title=class_name)

if final_y_test:
    layer3_labels = sorted(set(final_y_pred + final_y_test))
    confusion = metrics.confusion_matrix(final_y_test, final_y_pred, labels=layer3_labels)
    plot_confusion_matrix(confusion, labels=layer3_labels, title='Layer3')


In [None]:
# Store the collected metrics as row `cnt` of the results table.
# model_eval is extended in place by the evaluation cells, so re-running any of
# them leaves too many values; check the length first and fail with an
# actionable message instead of a pandas shape error.
if len(model_eval)!=len(df.columns):
    raise ValueError(
        f'model_eval holds {len(model_eval)} values but df has {len(df.columns)} columns; '
        'reset model_eval and re-run the evaluation cells in order'
    )
df.loc[cnt]=model_eval
df



In [None]:
# Persist the results table as CICI_result.csv in the output directory.
# The row label is written as well (pandas writes the index by default), so it
# appears as the first, unnamed column of the file.
df.to_csv(os.path.join(outpath,'CICI_result.csv'))
