In [None]:
import pandas as pd
from sklearn import preprocessing
from sklearn.decomposition import PCA
import numpy as np
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Locations of the NSL-KDD training and test files (absolute local paths).
file_path_train = r"D:\Download\zyFile\Cyberthreat_Cognitive_System\CTCS_Code\attack_datasets\NSL-KDD\KDDTrain+.txt"
file_path_test = r"D:\Download\zyFile\Cyberthreat_Cognitive_System\CTCS_Code\attack_datasets\NSL-KDD\KDDTest+.txt"

# Column names applied to both files, in file order. The reads below pass
# header=None, so no row of the files is interpreted as a header.
data_columns = [
    "duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment",
    "urgent", "hot", "num_failed_logins", "logged_in",
    "num_compromised", "root_shell", "su_attempted", "num_root",
    "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count",
    "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate", "dst_host_count",
    "dst_host_srv_count", "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label", "difficulty",
]

# Read both splits with the same schema.
train_data, test_data = (
    pd.read_csv(split_path, header=None, names=data_columns)
    for split_path in (file_path_train, file_path_test)
)

In [None]:
# Build the attack-name -> category lookup from the mapping file. A line is
# used only when it holds exactly two whitespace-separated tokens:
# "<attack> <category>"; every other line is ignored.
attack_mapping = {}
with open(r'D:\Download\zyFile\Cyberthreat_Cognitive_System\CTCS_Code\attack_datasets\NSL-KDD\attack_name',
          'r') as file:
    for line in file:
        # split() with no argument splits on any run of whitespace, so tabs or
        # repeated spaces between the two tokens no longer cause the line to
        # be skipped (split(' ') would produce a different token count).
        parts = line.split()
        if len(parts) == 2:
            attack, category = parts
            attack_mapping[attack] = category

# Series.map() yields NaN for every value that is not a key of the dictionary.
# Report such labels before they are overwritten, so missing mapping entries
# are visible instead of flowing on silently as NaN.
# NOTE: this cell rewrites the 'label' column in place; re-running it maps the
# already-mapped values again and will trigger this warning for any category
# name that is not itself a key of the mapping.
for split_name, frame in (('train', train_data), ('test', test_data)):
    unmapped = set(frame['label']) - set(attack_mapping)
    if unmapped:
        print(f'WARNING: {split_name} labels missing from the attack_name mapping: '
              f'{sorted(unmapped, key=str)}')

# Replace each attack name by its category in both splits.
train_data['label'] = train_data['label'].map(attack_mapping)
test_data['label'] = test_data['label'].map(attack_mapping)

In [None]:
from collections import Counter
# Tally how often each label value occurs in the two splits.
train_label_counts = Counter(train_data["label"])
test_label_counts = Counter(test_data["label"])
print(f'train_label is {train_label_counts}')
print(f'test_label is {test_label_counts}')

In [None]:
# Pull the three categorical feature columns and the label column out of both
# splits.
service_train, service_test = train_data['service'], test_data['service']
protocol_type_train, protocol_type_test = train_data['protocol_type'], test_data['protocol_type']
flag_train, flag_test = train_data['flag'], test_data['flag']
label_train, label_test = train_data['label'], test_data['label']

# Values that occur in the training split but never in the test split.
unique_service = set(service_train).difference(service_test)
unique_protocol_type = set(protocol_type_train).difference(protocol_type_test)
unique_flag = set(flag_train).difference(flag_test)
unique_label = set(label_train).difference(label_test)

# Values that occur in the test split but never in the training split.
test_unique_service = set(service_test).difference(service_train)
test_unique_protocol_type = set(protocol_type_test).difference(protocol_type_train)
test_unique_flag = set(flag_test).difference(flag_train)
test_unique_label = set(label_test).difference(label_train)

# Report both directions of the comparison.
print("service type only exist in train_dataset:", unique_service)
print("protocol type only exist in train_dataset:", unique_protocol_type)
print("flag type only exist in train_dataset:", unique_flag)
print("label type only exist in train_dataset:", unique_label)
print("-------------------------------------------------------")
print("service type only exist in test_dataset:", test_unique_service)
print("protocol type only exist in test_dataset:", test_unique_protocol_type)
print("flag type only exist in test_dataset:", test_unique_flag)
print("label type only exist in test_dataset:", test_unique_label)

In [None]:
# Service types whose frequency in the training split is inspected.
services_to_check = ['urh_i', 'http_2784', 'aol', 'http_8001', 'harvest', 'red_i']

# Print the number of training rows carrying each of those services.
train_services = train_data['service']
for service in services_to_check:
    count = train_services.eq(service).sum()
    print(f"Number of occurrences for service '{service}': {count}")

In [None]:
# Services present in the training split but absent from the test split.
unique_service_train = set(train_data['service']).difference(test_data['service'])
# Keep only the training rows whose service is not one of those train-only
# services. Note that this rebinds train_data to the filtered frame.
seen_in_test = ~train_data['service'].isin(unique_service_train)
train_data = train_data[seen_in_test]

In [None]:
# Tally the label values of both splits again (train_data may have been
# filtered since the previous tally).
train_label_counts = Counter(train_data["label"])
test_label_counts = Counter(test_data["label"])
print(f'train_label is {train_label_counts}')
print(f'test_label is {test_label_counts}')

In [None]:
# Re-check which 'service' values are exclusive to one split.
# The sets must be built from the 'service' column: iterating a DataFrame
# directly yields its column names, so set(train_data) - set(test_data) would
# compare the (identical) column lists and always print empty sets.
unique_service = set(train_data['service']) - set(test_data['service'])
test_unique_service = set(test_data['service']) - set(train_data['service'])
# Report both directions of the comparison.
print("service type only exist in train_dataset:", unique_service)
print("-------------------------------------------------------")
print("service type only exist in test_dataset:", test_unique_service)

In [None]:
# Feature frames: every column except the target ('label') and 'difficulty'.
non_feature_columns = ['label', 'difficulty']
X_train = train_data.drop(columns=non_feature_columns)
X_test = test_data.drop(columns=non_feature_columns)

# Targets of both splits.
labels_train = train_data['label']
labels_test = test_data['label']

In [None]:
# Number of distinct values of each categorical feature in the training set.
for categorical in ('protocol_type', 'service', 'flag'):
    print(X_train[categorical].nunique())

In [None]:
from sklearn import preprocessing

# One LabelEncoder per categorical feature, plus one for the target.
le_protocol_type, le_service, le_flag, le_labels = (
    preprocessing.LabelEncoder() for _ in range(4)
)
feature_encoders = {
    'protocol_type': le_protocol_type,
    'service': le_service,
    'flag': le_flag,
}

# Fit every encoder on the training split and encode it. The target codes are
# shifted by one so that they start at 1 instead of 0.
for column, encoder in feature_encoders.items():
    X_train[column] = encoder.fit_transform(X_train[column])
labels_train = le_labels.fit_transform(labels_train) + 1

# Encode the test split with the encoders fitted above (same +1 shift for the
# target).
for column, encoder in feature_encoders.items():
    X_test[column] = encoder.transform(X_test[column])
labels_test = le_labels.transform(labels_test) + 1

# Keep the fitted class arrays so integer codes can be translated back.
protocol_type_mapping = le_protocol_type.classes_
service_mapping = le_service.classes_
flag_mapping = le_flag.classes_
label_mapping = le_labels.classes_

In [None]:
# from imblearn.over_sampling import SMOTE
# 
# # 初始化 SMOTE 对象
# smote = SMOTE()
# 
# # 对训练集进行重采样
# train_data, label_train = smote.fit_resample(train_data, label_train)
# 
# # 打印重采样后的数据规模
# print(f'重采样后的数据规模')
# print(f'X_train_resampled shape: {train_data.shape}')
# print(f'labels_train_resampled shape: {label_train.shape}')

In [None]:
from imblearn.over_sampling import ADASYN

# ADASYN oversampler. A fixed random_state makes the synthetic samples, and
# therefore every downstream number in this notebook, reproducible across
# kernel restarts.
adasyn = ADASYN(random_state=42)

# Resample the training split only; X_train and labels_train are rebound to
# the resampled data.
X_train, labels_train = adasyn.fit_resample(X_train, labels_train)

# Print the size of the resampled training data.
print(f'重采样后的数据规模')
print(f'X_train_resampled shape: {X_train.shape}')
print(f'labels_train_resampled shape: {labels_train.shape}')
# Note: X_test and labels_test are not resampled.

In [None]:
# Class distribution of the training labels. The message previously said
# "test_label" although the counted object is labels_train.
print(f'train_label is {Counter(labels_train)}')

In [None]:
# Feature standardisation: the scaler is fitted on the training features only
# and the same fitted scaler is then applied to both splits.
scaler = preprocessing.StandardScaler().fit(X_train)
standard_train_X = scaler.transform(X_train)
standard_test_X = scaler.transform(X_test)

In [None]:
# from sklearn.feature_selection import VarianceThreshold
# selector = VarianceThreshold(threshold=1)  # 设置方差的阈值
# X_train_selected = selector.fit_transform(standard_train_X)
# X_test_selected = selector.transform(standard_test_X)

In [None]:
# from sklearn.feature_selection import SelectKBest
# from scipy.stats import pearsonr
# import numpy as np
# 
# # 定义计算皮尔森相关系数的函数
# def pearsonr_correlation(X, y):
#     # 计算每个特征与目标变量的相关性
#     correlations = np.array([pearsonr(x, y)[0] for x in X.T])
#     return correlations
# 
# # 创建 SelectKBest 实例，选择 k 个最相关的特征
# k = 10  # 您可以根据需要设置 k 的值
# selector = SelectKBest(score_func=lambda X, y: pearsonr_correlation(X, y), k=k)
# 
# # 对训练数据进行拟合和转换
# X_train_selected = selector.fit_transform(standard_train_X, labels_train)
# 
# # 对测试数据进行转换
# X_test_selected = selector.transform(standard_test_X)

In [None]:
# from sklearn.feature_selection import SelectFromModel
# from sklearn.ensemble import GradientBoostingClassifier
# 
# # 训练 GradientBoostingClassifier 模型
# gbc = GradientBoostingClassifier()
# gbc.fit(standard_train_X, labels_train)
# 
# # 创建 SelectFromModel 实例
# selector = SelectFromModel(gbc, prefit=True)
# 
# # 对训练和测试数据应用特征选择
# X_train_selected = selector.transform(standard_train_X)
# X_test_selected = selector.transform(standard_test_X)


In [None]:
# Shapes of the standardised matrices: training first, then test.
for scaled_matrix in (standard_train_X, standard_test_X):
    print(f'The dimension after Z-score StandardScaler: {scaled_matrix.shape}')

In [None]:
# # 打印经过标准化的训练数据的前几行
# print(standard_train_X[:1:])
# # 计算并打印训练数据的均值和标准差
# print("Train Data Mean:", np.mean(standard_train_X, axis=0))
# print("Train Data Standard Deviation:", np.std(standard_train_X, axis=0))

In [None]:
# # 打印经过标准化的测试数据的前几行
# print(standard_test_X[:1, :])
# # 计算并打印测试数据的均值和标准差
# print("Test Data Mean:", np.mean(standard_test_X, axis=0))
# print("Test Data Standard Deviation:", np.std(standard_test_X, axis=0))

In [None]:
# plt.figure(figsize=(12, 6))
# sns.boxplot(data=X_train_selected)
# plt.title('Boxplot of Standardized Training Data')
# plt.ylim([-20, 20])
# plt.xlabel('Feature Index')
# plt.ylabel('Value')
# plt.show()

In [None]:
# plt.figure(figsize=(12, 6))
# sns.boxplot(data=X_test_selected)
# plt.title('Boxplot of Standardized Testing Data')
# plt.ylim([-20, 20])
# plt.xlabel('Feature Index')
# plt.ylabel('Value')
# plt.show()

In [None]:
# # 进行 PCA 降维
# from sklearn.decomposition import PCA
# import matplotlib.pyplot as plt
# 
# # 初始化 PCA，不设置 n_components
# pca = PCA()
# 
# # 对数据进行拟合
# pca.fit(X_train_selected)
# 
# # 现在可以访问 explained_variance_ratio_
# explained_variance = np.cumsum(pca.explained_variance_ratio_)
# 
# # 绘制 Scree Plot
# plt.figure(figsize=(10, 6))
# plt.plot(explained_variance, marker='o')
# plt.xlabel('Number of Components')
# plt.ylabel('Cumulative Explained Variance')
# plt.title('Scree Plot')
# plt.grid(True)
# plt.show()
# # 计算累积方差
# cumulative_variance = np.cumsum(pca.explained_variance_ratio_)
# 
# # 找到解释了至少 95% 总方差的主成分数量
# n_components = np.where(cumulative_variance > 0.95)[0][0] + 1
# 
# # 使用找到的 n_components 重新初始化 PCA
# pca = PCA(n_components=n_components)
# new_X = pca.fit_transform(X_train_selected)
# new_test_X = pca.transform(X_test_selected)
# print(n_components)

In [None]:
# Matrices handed to the model: the standardised features, used as-is.
FINAL_TRAIN, FINAL_TEST = standard_train_X, standard_test_X

In [None]:
from sklearn.svm import SVC
import time

# RBF-kernel support vector classifier with balanced class weights.
svc = SVC(C=0.5, kernel='rbf', class_weight='balanced')

# Fit on the complete training matrix and report the elapsed wall-clock time.
start = time.time()
clf = svc.fit(FINAL_TRAIN, labels_train)
elapsed = time.time() - start
print('训练用时：{0}'.format(elapsed))

# Optional: persist the fitted model.
# joblib.dump(clf, './model/IDS_model_full_data.m')
# print('Model saved')


In [None]:
# Bind the classifier returned by svc.fit() to a second name, MODEL.
MODEL = clf

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
# Predict the test split.
y_pred = clf.predict(FINAL_TEST)

# Compare against labels_test, the integer-encoded targets the model's labels
# live in. label_test (no "s") is the test label column captured before
# encoding; comparing integer predictions with it never matches and yields an
# accuracy of 0.
accuracy = np.mean(y_pred == labels_test)
print(f'accuracy is {accuracy}')

In [None]:
# Class names in encoder order: label_mapping[i] is the name encoded as i + 1
# (the targets were shifted by one after label encoding).
class_names = label_mapping
encoded_labels = np.arange(1, len(class_names) + 1)

# Confusion matrix on the integer-encoded targets. labels_test is used rather
# than label_test (the pre-encoding column), which lives in a different label
# space than y_pred. Passing labels= pins row/column i to class_names[i] even
# if some class is absent from both the targets and the predictions.
conf_matrix = confusion_matrix(labels_test, y_pred, labels=encoded_labels)

# Heat-map of the confusion matrix with named ticks.
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from itertools import cycle
from sklearn.preprocessing import label_binarize

# Decision-function scores of the classifier on the test matrix.
y_score = clf.decision_function(FINAL_TEST)

# Binarised test targets, with columns ordered by the sorted unique training
# labels.
y_test_binarized = label_binarize(labels_test, classes=np.unique(labels_train))
n_classes = y_test_binarized.shape[1]

# Per-class ROC curve and area under it, keyed by column index.
fpr, tpr, roc_auc = {}, {}, {}
for class_idx in range(n_classes):
    fpr[class_idx], tpr[class_idx], _ = roc_curve(
        y_test_binarized[:, class_idx], y_score[:, class_idx]
    )
    roc_auc[class_idx] = auc(fpr[class_idx], tpr[class_idx])

# All class curves on one figure; the palette cycles if there are more classes
# than colours.
fig, ax = plt.subplots(figsize=(8, 8))
colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'deeppink', 'navy'])
for class_idx, color in zip(range(n_classes), colors):
    curve_label = 'ROC curve of class {0} (area = {1:0.2f})'.format(class_names[class_idx], roc_auc[class_idx])
    ax.plot(fpr[class_idx], tpr[class_idx], color=color, lw=2, label=curve_label)

# Dashed diagonal for reference.
ax.plot([0, 1], [0, 1], 'k--', lw=2)
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Multi-class ROC Curve')
ax.legend(loc="lower right")
plt.show()


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

# Weighted-average recall, precision and F1 of the test predictions.
recall, precision, f1 = (
    metric(labels_test, y_pred, average='weighted')
    for metric in (recall_score, precision_score, f1_score)
)

print(f"Recall: {recall}")
print(f"Precision: {precision}")
print(f"F1 Score: {f1}")

# One-vs-rest Matthews correlation coefficient for a single class.
def mcc_score(y_true, y_pred, class_label):
    """Return the one-vs-rest Matthews correlation coefficient of one class.

    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted labels; passed unchanged to ``confusion_matrix``.
    class_label : int
        Row/column position of the class inside the confusion matrix. This is
        an index into the matrix, not the label value itself.

    Returns
    -------
    float
        ``(tp*tn - fp*fn) / sqrt((tp+fp)(tp+fn)(tn+fp)(tn+fn))``, or ``0``
        when the denominator is zero.
    """
    cm = confusion_matrix(y_true, y_pred)
    total = cm.sum()
    row_total = cm[class_label, :].sum()   # samples whose true class is this one
    col_total = cm[:, class_label].sum()   # samples predicted as this class

    tp = cm[class_label, class_label]
    fp = col_total - tp
    fn = row_total - tp
    tn = total - row_total - col_total + tp

    # Do the products in floating point: the denominator multiplies four
    # counts, which exceeds the int64 range (and wraps around) once the counts
    # reach the order of 1e5.
    numerator = float(tp) * float(tn) - float(fp) * float(fn)
    denominator = np.sqrt(
        float(tp + fp) * float(tp + fn) * float(tn + fp) * float(tn + fn)
    )
    return numerator / denominator if denominator != 0 else 0

# MCC of every class, then the unweighted mean. The class count is the number
# of distinct values in the test targets, and each position 0..n_classes-1 is
# passed to mcc_score as a confusion-matrix index.
n_classes = np.unique(labels_test).size
mcc_scores = []
for class_idx in range(n_classes):
    mcc_scores.append(mcc_score(labels_test, y_pred, class_idx))
average_mcc = np.mean(mcc_scores)

print(f"Average MCC: {average_mcc}")

In [None]:
from sklearn.metrics import precision_recall_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import numpy as np


# Binarised test targets, with columns ordered by the sorted unique training
# labels.
y_true_bin = label_binarize(labels_test, classes=np.unique(labels_train))

# Number of classes and their display names.
n_classes = y_true_bin.shape[1]
class_names = label_mapping

# One precision-recall figure per class, using the decision scores in y_score.
for class_idx in range(n_classes):
    precision, recall, _ = precision_recall_curve(y_true_bin[:, class_idx], y_score[:, class_idx])
    auc_score = auc(recall, precision)

    fig, ax = plt.subplots()
    ax.plot(recall, precision, marker='.')
    ax.set_title('Precision-Recall Curve for Class {} (AUC = {:.4f})'.format(class_names[class_idx], auc_score))
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    plt.show()