In [None]:
import pandas as pd

# Load the rule table as a plain list of rows (one list per CSV record).
rules = pd.read_csv("Rules.csv").values.tolist()

# Dump the first three fields of every rule as a tab-separated table;
# the third field is rendered with six decimal places.
print("\t".join(["ID", "DLC", "Mean"]))
for rule in rules:
    print("\t".join([str(rule[0]), str(rule[1]), "{:.6f}".format(rule[2])]))

In [None]:
# Number of consecutive dataset rows grouped into one detection block.
block_size = 2000

# Split the rule rows into three parallel lists, one per field position:
# position 0 -> rules_id, position 1 -> rules_dlc, position 2 -> rules_mean.
rules_id, rules_dlc, rules_mean = (
    [rule[column] for rule in rules] for column in range(3)
)

In [None]:
import joblib

# Load the trained model from disk.
# NOTE(review): joblib.load unpickles the file, and unpickling can execute
# arbitrary code -- only load 'dt_model.pkl' when it comes from a trusted source.
# Presumably a decision-tree classifier (going by the file name) -- TODO confirm.
loaded_model = joblib.load('dt_model.pkl')

In [None]:
def read_dataset(dataset):
    """Read a CSV dataset into a DataFrame and print a confirmation line.

    Args:
        dataset: Location of the CSV file, passed unchanged to
            ``pd.read_csv``. May be a ``str`` or a path-like object such as
            ``pathlib.Path``.

    Returns:
        pandas.DataFrame: The parsed contents of the file.
    """
    df = pd.read_csv(dataset)

    # Format instead of concatenating: `dataset + " ok!"` raises TypeError for
    # non-str inputs (e.g. pathlib.Path) even though the read itself succeeded.
    print("{} ok!".format(dataset))
    return df

# Read the six attack datasets, one after another in the listed order, and
# bind each resulting DataFrame to its own name.
df_DoS, df_Fuzzy, df_Spoofing, df_Replaying, df_Drop, df_Masquerade = [
    read_dataset(csv_name)
    for csv_name in (
        "new_DoS_dataset.csv",
        "new_Fuzzy_dataset.csv",
        "Spoofing_dataset.csv",
        "Replaying_dataset.csv",
        "Drop_dataset.csv",
        "Masquerade_dataset.csv",
    )
]

In [None]:
def _breaks_id_dlc_rule(block, expected_dlc):
    """Return True if any message has an unknown ID or an unexpected DLC."""
    for row in block:
        if row[1] not in expected_dlc or row[2] != expected_dlc[row[1]]:
            return True
    return False


def _breaks_timing_rule(block, ids, means, lower_ratio, upper_ratio):
    """Return True if any rule with a non-zero mean sees an out-of-range gap."""
    block_start = block[0][0]
    block_end = block[-1][0]

    # Group arrival times by ID in a single pass so that each rule only walks
    # its own messages instead of rescanning the whole block.
    arrivals = {}
    for row in block:
        arrivals.setdefault(row[1], []).append(row[0])

    for rule_id, mean in zip(ids, means):
        # Aperiodic message: a mean of 0 disables the timing rule.
        if mean == 0:
            continue

        shortest = mean * lower_ratio
        longest = mean * upper_ratio
        previous = block_start
        seen = False  # explicit "no message of this ID yet" marker
        for arrival in arrivals.get(rule_id, ()):
            gap = arrival - previous
            # The gap measured from the start of the block is not a real
            # message-to-message interval, so only the upper bound applies
            # until the first message of this ID has been seen.
            if gap > longest or (seen and gap < shortest):
                return True
            previous = arrival
            seen = True

        # Silence between the last message of this ID and the end of the block.
        if block_end - previous > longest:
            return True
    return False


def _flagged_by_model(block, model):
    """Return True if the model predicts 1 for at least one message."""
    features = [
        [int(row[1], 16), int(row[4], 16), int(row[10], 16)] for row in block
    ]
    inputs = pd.DataFrame(features, columns=["ID", "DATA[1]", "DATA[7]"])
    return 1 in model.predict(inputs)


def detect(block, lower_ratio=0.5, upper_ratio=2):
    """Decide whether one block of messages should be reported as an attack.

    Three checks run in order; the first one that fires ends the call:

    1. ID/DLC rule: every message ID must be listed in ``rules_id`` and carry
       the DLC stored at the same position in ``rules_dlc``.
    2. Timing rule: for every rule whose ``rules_mean`` entry is non-zero, the
       gap between consecutive messages of that ID must lie within
       ``[lower_ratio * mean, upper_ratio * mean]``. The gap from the start of
       the block to the first such message, and from the last such message to
       the end of the block, is only checked against the upper bound.
    3. Model: ``loaded_model.predict`` is called on the columns
       ``ID``, ``DATA[1]`` and ``DATA[7]`` (parsed from hex strings); any
       prediction equal to 1 flags the block.

    Behaviour change versus the previous implementation: "no message of this
    ID seen yet" used to be detected by comparing the running timestamp with
    the block's first timestamp. When the first row of the block carried the
    ID itself, the following gap was wrongly exempted from the lower-bound
    check. An explicit flag is used now, so such gaps are validated.

    Args:
        block: List of message rows. The code reads ``row[0]`` as the
            timestamp, ``row[1]`` as the ID (hex string), ``row[2]`` as the
            DLC, and ``row[4]`` / ``row[10]`` as the hex strings supplied to
            the model as ``DATA[1]`` / ``DATA[7]``.
        lower_ratio: Smallest accepted gap, as a fraction of the rule's mean.
        upper_ratio: Largest accepted gap, as a multiple of the rule's mean.

    Returns:
        bool: True if any check flags the block, False otherwise. An empty
        block contains nothing to inspect and yields False.
    """
    if len(block) == 0:
        return False

    # Constant-time ID -> DLC lookup. setdefault keeps the first entry for a
    # repeated ID, which is the entry list.index() would have selected.
    expected_dlc = {}
    for rule_id, dlc in zip(rules_id, rules_dlc):
        expected_dlc.setdefault(rule_id, dlc)

    if _breaks_id_dlc_rule(block, expected_dlc):
        return True
    if _breaks_timing_rule(block, rules_id, rules_mean, lower_ratio, upper_ratio):
        return True
    return _flagged_by_model(block, loaded_model)

In [None]:
import time

# Evaluate detect() block by block on every attack dataset and collect the
# confusion-matrix counts and the total detection time per dataset.
attack_types = ["DoS Attack", "Fuzzy Attack", "Spoofing Attack", "Replaying Attack", "Drop Attack", "Masquerade Attack"]
df_list = [df_DoS, df_Fuzzy, df_Spoofing, df_Replaying, df_Drop, df_Masquerade]
TP_list = []
FP_list = []
FN_list = []
TN_list = []
pred_time_list = []

# Columns handed to detect(), in the positional order it indexes them.
block_columns = ["Timestamp", "ID", "DLC", "DATA[0]", "DATA[1]", "DATA[2]", "DATA[3]", "DATA[4]", "DATA[5]", "DATA[6]", "DATA[7]"]

for attack_type, df in zip(attack_types, df_list):
    blocks = []
    flags = []
    # Only complete blocks are evaluated; a trailing partial block is ignored.
    for i in range(df.shape[0] // block_size):
        block = df.iloc[i * block_size : (i + 1) * block_size]
        # Ground truth: the block counts as attacked if any row is flagged 'T'.
        flags.append('T' in block["Flag"].values)
        if attack_type == "Drop Attack":
            # For this dataset only rows flagged 'R' are passed to the
            # detector; the label above was taken before this filtering.
            block = block.loc[block["Flag"] == 'R']
        blocks.append(block[block_columns].values.tolist())

    # perf_counter() is the clock intended for measuring elapsed intervals:
    # it is monotonic and high resolution, unlike the wall clock.
    start_time = time.perf_counter()
    TP = 0
    FP = 0
    FN = 0
    TN = 0
    for block, flag in zip(blocks, flags):
        pred = detect(block)
        if pred and flag:
            TP += 1
        elif pred and not flag:
            FP += 1
        elif not pred and flag:
            FN += 1
        else:
            TN += 1
    pred_time = time.perf_counter() - start_time

    print(attack_type + ": ")

    # A dataset can yield no blocks at all, no attacked blocks or no clean
    # blocks; report NaN in those cases instead of raising ZeroDivisionError.
    total = TP + TN + FP + FN

    # Accuracy
    accuracy = (TP + TN) / total if total else float("nan")

    # True positive rate (recall)
    TPR = TP / (TP + FN) if (TP + FN) else float("nan")

    # False positive rate
    FPR = FP / (FP + TN) if (FP + TN) else float("nan")

    print("accuracy = " + "{:.4%}".format(accuracy))
    print("TPR = " + "{:.4%}".format(TPR))
    print("FPR = " + "{:.4%}".format(FPR) + '\n')

    TP_list.append(TP)
    FP_list.append(FP)
    FN_list.append(FN)
    TN_list.append(TN)
    pred_time_list.append(pred_time)

In [None]:
# Cumulative results per adversary model. Each adversary covers the first
# n_attacks entries of the per-attack result lists:
#   "弱对手"   (weak adversary)   -> first 2 attack types
#   "中型对手" (medium adversary) -> first 5 attack types
#   "强对手"   (strong adversary) -> all 6 attack types
for adversary, n_attacks in (("弱对手", 2), ("中型对手", 5), ("强对手", 6)):
    TP = sum(TP_list[:n_attacks])
    FP = sum(FP_list[:n_attacks])
    FN = sum(FN_list[:n_attacks])
    TN = sum(TN_list[:n_attacks])

    # Every evaluated block lands in exactly one of the four counters, so
    # their sum is the number of blocks. With no blocks (or an empty class)
    # the ratios are undefined: report NaN instead of raising
    # ZeroDivisionError.
    total = TP + TN + FP + FN

    # Mean detection time per block, converted from seconds to milliseconds.
    average_pred_time = sum(pred_time_list[:n_attacks]) / total * 1000 if total else float("nan")

    print(adversary + ": ")

    # Accuracy
    accuracy = (TP + TN) / total if total else float("nan")

    # True positive rate (recall)
    TPR = TP / (TP + FN) if (TP + FN) else float("nan")

    # False positive rate
    FPR = FP / (FP + TN) if (FP + TN) else float("nan")

    print("average_pred_time = " + "{:.4f}".format(average_pred_time) + " ms")
    print("accuracy = " + "{:.4%}".format(accuracy))
    print("TPR = " + "{:.4%}".format(TPR))
    print("FPR = " + "{:.4%}".format(FPR) + '\n')