In [115]:
import pandas as pd
from scipy.spatial.distance import cdist
import numpy as np

# Label that selects both the results folder and the "unknown" test file.
unknown_type = "W"
path = f"../results/test/train{unknown_type}/"

# Evaluation splits, plus the detectors_0.csv frame that the report counts
# as "known samples".
train_self = pd.read_csv('../dataset/evaluate/train_self.csv')
test_self = pd.read_csv('../dataset/evaluate/test_self.csv')
test_nonself = pd.read_csv('../dataset/evaluate/test_nonself.csv')
known = pd.read_csv(path + "detectors_0.csv")
# train_set_unknown = pd.read_csv(f'../dataset/check/train/train{unknown_type}.csv')
test_set_unknown = pd.read_csv('../dataset/check/test/test' + unknown_type + '.csv')

# Margin subtracted from a detector's distance to its nearest training self
# sample when detector radii are computed.
self_radius = 0.14

# Start a fresh report ("w" truncates any previous file) and write the
# dataset summary in a single call.
with open("coverage_results_epoch.txt", "w") as f:
    f.write(
        "Base:\n"
        f"Unknown type: {unknown_type}\n"
        f"Number of self samples in train_set: {len(train_self)}\n"
        f"Number of known samples: {len(known)}\n"
        "Evaluate:\n"
        f"Number of self samples in test_set: {len(test_self)}\n"
        f"Number of non-self samples in test_set: {len(test_nonself)}\n"
        f"Number of unknown samples in test_set: {len(test_set_unknown)}\n\n"
    )


In [116]:

# Serialise every row of a DataFrame as a bracketed, space-separated string
def get_data(data_df):
    """Render each row of ``data_df`` as a ``"[v1 v2 ...]"`` string.

    Every cell is converted with ``str``; the cells of one row are joined by
    single spaces and wrapped in square brackets.

    Args:
        data_df: DataFrame whose rows are to be rendered.

    Returns:
        list[str]: one string per row, in row order (``[]`` when the frame
        has no rows).
    """
    # Iterating plain tuples replaces ``data_df.map(str).apply(..., axis=1)``:
    # ``DataFrame.map`` is only available in pandas >= 2.1, and a row-wise
    # ``apply`` is slow and can hand back a DataFrame (which has no
    # ``tolist``) when the frame has no rows.
    return [
        f"[{' '.join(map(str, row))}]"
        for row in data_df.itertuples(index=False, name=None)
    ]

# Attach a detection radius to every detector
def add_detection_radius(detectors_df, self_df=None, radius=None):
    """Add a ``'radius'`` column to ``detectors_df`` and return the frame.

    A detector's radius is its Euclidean distance to the nearest row of
    ``self_df`` minus ``radius``, floored at 0.

    Args:
        detectors_df: DataFrame of detector coordinates, one detector per
            row. It is modified in place. An existing ``'radius'`` column is
            ignored as a coordinate and overwritten, so the function can be
            re-run on its own output.
        self_df: DataFrame of self samples with the same coordinate columns.
            Defaults to the notebook-level ``train_self``.
        radius: margin subtracted from the nearest-self distance. Defaults
            to the notebook-level ``self_radius``.

    Returns:
        The same ``detectors_df`` object, now carrying the ``'radius'``
        column.
    """
    # Fall back to the notebook globals so existing one-argument calls keep
    # working unchanged.
    if self_df is None:
        self_df = train_self
    if radius is None:
        radius = self_radius

    # Only coordinates may enter the distance computation; a leftover
    # 'radius' column would otherwise be treated as an extra dimension.
    coords = detectors_df.drop(columns='radius', errors='ignore').to_numpy()
    nearest_self = cdist(coords, self_df.to_numpy()).min(axis=1)

    # Radii must never be negative.
    detectors_df['radius'] = (nearest_self - radius).clip(min=0)
    return detectors_df


# Count how many samples of a given set are covered by the detectors
def evaluate_nonselfcoverage(detectors_df, nonself_df):
    """Return the number of rows of ``nonself_df`` covered by a detector.

    A sample counts as covered when its Euclidean distance to at least one
    detector is less than or equal to that detector's radius.

    Args:
        detectors_df: detector frame whose last column is treated as
            non-coordinate data and which has a ``'radius'`` column.
        nonself_df: frame of sample coordinates.

    Returns:
        The covered-sample count (a NumPy integer).
    """
    # Every column but the last is a coordinate.
    centres = detectors_df.to_numpy()[:, :-1]
    # Row vector of radii so it broadcasts against (n_samples, n_detectors).
    reach = detectors_df['radius'].to_numpy()[None, :]
    sample_to_detector = cdist(nonself_df.to_numpy(), centres)
    hit_mask = (sample_to_detector <= reach).any(axis=1)
    return hit_mask.sum()
    
# Per-sample coverage of the non-self test set
def evaluate_nonself_coverage(detectors_df):
    """Flag which rows of the notebook-level ``test_nonself`` are covered.

    A sample is covered when its Euclidean distance to at least one detector
    does not exceed that detector's radius.

    Args:
        detectors_df: detector frame whose last column is treated as
            non-coordinate data and which has a ``'radius'`` column.

    Returns:
        Boolean array with one entry per row of ``test_nonself``.
    """
    centres = detectors_df.iloc[:, :-1].to_numpy()
    # Row vector of radii so it broadcasts against (n_samples, n_detectors).
    reach = detectors_df['radius'].to_numpy()[None, :]
    sample_to_detector = cdist(test_nonself.to_numpy(), centres)
    return (sample_to_detector <= reach).any(axis=1)

# Per-sample coverage of the self test set
def evaluate_self_coverage(detectors_df):
    """Flag which rows of the notebook-level ``test_self`` are covered.

    A sample is covered when its Euclidean distance to at least one detector
    does not exceed that detector's radius.

    Args:
        detectors_df: detector frame whose last column is treated as
            non-coordinate data and which has a ``'radius'`` column.

    Returns:
        Boolean array with one entry per row of ``test_self``.
    """
    centres = detectors_df.iloc[:, :-1].to_numpy()
    # Row vector of radii so it broadcasts against (n_samples, n_detectors).
    reach = detectors_df['radius'].to_numpy()[None, :]
    sample_to_detector = cdist(test_self.to_numpy(), centres)
    return (sample_to_detector <= reach).any(axis=1)

# Compute classification metrics from the coverage masks
def calculate_metrics(self_covered, nonself_covered, total_self, total_nonself):
    """Derive detection metrics, treating "covered" as "flagged as anomalous".

    Args:
        self_covered: boolean mask of self samples that were covered (each
            one is a false positive).
        nonself_covered: boolean mask of non-self samples that were covered
            (each one is a true positive).
        total_self: number of self samples evaluated.
        total_nonself: number of non-self samples evaluated.

    Returns:
        dict with the keys ``"Accuracy"``, ``"Precision"``, ``"Recall"``,
        ``"F1 Score"``, ``"False Positive Rate (FPR)"`` and
        ``"Confusion Matrix"`` (a 2x2 array ``[[TN, FP], [FN, TP]]``).
        Any ratio whose denominator is zero is reported as 0.
    """
    # Plain Python ints keep the arithmetic free of NumPy scalar warnings.
    tp = int(np.sum(nonself_covered))  # non-self correctly flagged
    fp = int(np.sum(self_covered))     # self wrongly flagged
    fn = total_nonself - tp            # non-self that was missed
    tn = total_self - fp               # self correctly left alone

    def _ratio(numerator, denominator):
        # Shared zero-denominator guard for every metric, accuracy included.
        return numerator / denominator if denominator > 0 else 0

    accuracy = _ratio(tp + tn, total_self + total_nonself)
    precision = _ratio(tp, tp + fp)
    recall = _ratio(tp, tp + fn)
    fpr = _ratio(fp, fp + tn)
    f1_score = _ratio(2 * (precision * recall), precision + recall)
    confusion_matrix = np.array([[tn, fp], [fn, tp]])

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1_score,
        "False Positive Rate (FPR)": fpr,
        "Confusion Matrix": confusion_matrix
    }


# Evaluate one detector set and append its results to the report
def evaluate_detector_performance(epoch, detectors_df, output_path="coverage_results_epoch.txt"):
    """Score ``detectors_df`` and append one record to the report file.

    The record holds the epoch label, the detector count, the covered
    fraction of the notebook-level ``test_set_unknown``, and the metrics
    computed on ``test_self`` / ``test_nonself``.

    Args:
        epoch: label written on the record's ``Epoch:`` line.
        detectors_df: DataFrame of detector coordinates; it is not modified.
        output_path: report file to append to. Defaults to the file the
            notebook's setup cell creates.

    Returns:
        None. The only effect is the appended record.
    """
    # Work on a copy because add_detection_radius adds its 'radius' column
    # to the frame it receives.
    detectors_radius = add_detection_radius(detectors_df.copy())

    # Compute everything before touching the file, so a failure cannot leave
    # a half-written record behind.
    unknown_coverage = (
        evaluate_nonselfcoverage(detectors_radius, test_set_unknown)
        / len(test_set_unknown)
    )
    metrics = calculate_metrics(
        self_covered=evaluate_self_coverage(detectors_radius),
        nonself_covered=evaluate_nonself_coverage(detectors_radius),
        total_self=len(test_self),
        total_nonself=len(test_nonself)
    )

    record = [
        f"Epoch: {epoch}\n",
        f"Number of detectors: {len(detectors_df)}\n",
        f"Unknown covery:{unknown_coverage:.4f}\n",
    ]
    for metric, value in metrics.items():
        if metric == "Confusion Matrix":
            # The matrix is an array, so it goes on its own lines unformatted.
            record.append(f"{metric}:\n{value}\n")
        else:
            record.append(f"{metric}: {value:.4f}\n")
    record.append('\n')

    # One append per record instead of two separate opens.
    with open(output_path, "a") as f:
        f.writelines(record)
   

In [117]:
 

# Evaluate the per-epoch detector files one after another: each file is read
# and scored before the next one is opened.
epoch_frames = []
for epoch in range(4):
    epoch_frames.append(pd.read_csv(f"{path}detectors_{epoch}.csv"))
    evaluate_detector_performance(epoch, epoch_frames[-1])

# Keep the individual frames available under their original names.
dataset0, dataset1, dataset2, dataset3 = epoch_frames
# detectors_4.csv is not evaluated; that step was left disabled.

# 999 is the epoch label written to the report for detectors.csv.
dataset = pd.read_csv(f"{path}detectors.csv")
evaluate_detector_performance(999, dataset)



# dataset = pd.concat([dataset0, dataset1, dataset2, dataset3], ignore_index=True)
# dataset = pd.concat([dataset0, dataset1, dataset2, dataset3, dataset4], ignore_index=True)

