调用包

In [2]:
import pandas as pd
import numpy as np
import os
import json
import shutil
import math
from pathlib import Path
from random import sample
from typing import List

groundTruth

In [2]:
# ************************************************************
# Normal
# ************************************************************
def read_json(file_path):
    # 打开文件并读取内容
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data

def calculate_metrics_fromLists(groundTruth_list, model_list):
    # 统一转化为小写集合
    groundTruth_lower_set = set([item.lower() for item in groundTruth_list])
    model_lower_set = set([item.lower() for item in model_list])

    # 计算TP, FP, FN, TN
    # 将两个列表转换为集合进行计算
    true_positives = len(groundTruth_lower_set & model_lower_set)
    false_positives = len(model_lower_set - groundTruth_lower_set)
    false_negatives = len(groundTruth_lower_set - model_lower_set)
    true_negatives = 0  # 这个对于集合而言不适用
    
    # 计算Precision, Recall, Accuracy, F1 Score
    if true_positives + false_positives > 0:
        precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0.0
    else:
        precision = 0.0
    
    if true_positives + false_negatives > 0:
        recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0.0
    else:
        recall = 0.0
    
    accuracy = true_positives / (true_positives + false_positives + false_negatives) if (true_positives + false_positives + false_negatives) > 0 else 1.0
    
    if precision + recall > 0:
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
    else:
        f1 = 0.0
    
    # 将每个类别的值保存到dict中
    metrics_dict = {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1
    }
    
    return metrics_dict

# 从指定文件夹中获取项目名称list
def get_projectNams_from_folder(_groundTruth_folder_path):
    # 获取文件夹中的所有文件
    files = os.listdir(_groundTruth_folder_path)
    
    # 过滤出所有以 .json 结尾的文件，并去掉文件后缀
    project_names_list = [os.path.splitext(file)[0] for file in files if file.endswith('.json')]
    
    return project_names_list

In [3]:
# ************************************************************
# DeFi Staking Model metrics
# ************************************************************
def get_model_metrics(_groundTruth_json_path, _model_json_path):
    _metrics_dict = {}

    _groundTruth_dict = read_json(_groundTruth_json_path)
    _model_dict = read_json(_model_json_path)

    _model_groundTruth_dict = _groundTruth_dict["Model"]

    # Variables Model metrics
    _variables_model_groundTruth_dict = _model_groundTruth_dict["Variables"]
    _variables_model_dict = _model_dict["Variables"]

    _variables_metrics_dict = get_variables_model_metrics(_variables_model_groundTruth_dict, _variables_model_dict)
    _metrics_dict["Variables"] = _variables_metrics_dict

    # Functions Model metrics
    _functions_model_groundTruth_dict = _model_groundTruth_dict["Functions"]
    _functions_model_dict = _model_dict["Functions"]

    _functions_metrics_dict = get_functions_model_metrics(_functions_model_groundTruth_dict, _functions_model_dict)
    _metrics_dict["Functions"] = _functions_metrics_dict

    # Calculations Model metrics
    _calculations_model_groundTruth_dict = _model_groundTruth_dict["Calculations"]
    _calculations_model_dict = _model_dict["Calculations"]

    _calculations_metrics_dict = get_calculations_model_metrics(_calculations_model_groundTruth_dict, _calculations_model_dict)
    _metrics_dict["Calculations"] = _calculations_metrics_dict

    # Avg
    _avg_accuracy = np.mean([_metrics_dict["Variables"]["Mean"]["Accuracy"], _metrics_dict["Functions"]["Mean"]["Accuracy"], _metrics_dict["Calculations"]["Mean"]["Accuracy"]])
    _avg_precision = np.mean([_metrics_dict["Variables"]["Mean"]["Precision"], _metrics_dict["Functions"]["Mean"]["Precision"], _metrics_dict["Calculations"]["Mean"]["Precision"]])
    _avg_recall = np.mean([_metrics_dict["Variables"]["Mean"]["Recall"], _metrics_dict["Functions"]["Mean"]["Recall"], _metrics_dict["Calculations"]["Mean"]["Recall"]])
    _avg_f1_score = np.mean([_metrics_dict["Variables"]["Mean"]["F1 Score"], _metrics_dict["Functions"]["Mean"]["F1 Score"], _metrics_dict["Calculations"]["Mean"]["F1 Score"]])

    _metrics_dict["Mean"] = {
        "Accuracy": _avg_accuracy,
        "Precision": _avg_precision,
        "Recall": _avg_recall,
        "F1 Score": _avg_f1_score
    }

    return _metrics_dict

# Variables
def get_variables_model_metrics(_variables_model_groundTruth_dict, variables_model_dict):
    _variables_metrics_dict = {}

    _accuracies_list = []
    _precisions_list = []
    _recalls_list = []
    _f1_scores_list = []

    for key in _variables_model_groundTruth_dict.keys():
        _groundTruth_list = _variables_model_groundTruth_dict[key]
        _model_list = variables_model_dict[key]

        _metrics_dict_perKey = calculate_metrics_fromLists(_groundTruth_list, _model_list)

        _accuracies_list.append(_metrics_dict_perKey["Accuracy"])
        _precisions_list.append(_metrics_dict_perKey["Precision"])
        _recalls_list.append(_metrics_dict_perKey["Recall"])
        _f1_scores_list.append(_metrics_dict_perKey["F1 Score"])

        _variables_metrics_dict[key] = _metrics_dict_perKey
    
    _avg_accuracy = np.mean(_accuracies_list)
    _avg_precision = np.mean(_precisions_list)
    _avg_recall = np.mean(_recalls_list)
    _avg_f1_score = np.mean(_f1_scores_list)

    _variables_metrics_dict["Mean"] = {
        "Accuracy": _avg_accuracy,
        "Precision": _avg_precision,
        "Recall": _avg_recall,
        "F1 Score": _avg_f1_score
    }

    return _variables_metrics_dict

# Functions
def get_functions_model_metrics(_functions_model_groundTruth_dict, functions_model_dict):
    _functions_metrics_dict = {}

    _accuracies_list = []
    _precisions_list = []
    _recalls_list = []
    _f1_scores_list = []

    for key in _functions_model_groundTruth_dict.keys():
        _groundTruth_list = _functions_model_groundTruth_dict[key]
        _model_list = functions_model_dict[key]

        _metrics_dict_perKey = calculate_metrics_fromLists(_groundTruth_list, _model_list)

        _accuracies_list.append(_metrics_dict_perKey["Accuracy"])
        _precisions_list.append(_metrics_dict_perKey["Precision"])
        _recalls_list.append(_metrics_dict_perKey["Recall"])
        _f1_scores_list.append(_metrics_dict_perKey["F1 Score"])

        _functions_metrics_dict[key] = _metrics_dict_perKey
    
    _avg_accuracy = np.mean(_accuracies_list)
    _avg_precision = np.mean(_precisions_list)
    _avg_recall = np.mean(_recalls_list)
    _avg_f1_score = np.mean(_f1_scores_list)

    _functions_metrics_dict["Mean"] = {
        "Accuracy": _avg_accuracy,
        "Precision": _avg_precision,
        "Recall": _avg_recall,
        "F1 Score": _avg_f1_score
    }

    return _functions_metrics_dict

# Calculations
def get_calculations_model_metrics(_calculations_model_groundTruth_dict, calculations_model_dict):
    _calculations_metrics_dict = {}

    _accuracies_list = []
    _precisions_list = []
    _recalls_list = []
    _f1_scores_list = []

    for key in _calculations_model_groundTruth_dict.keys():
        _groundTruth_list = _calculations_model_groundTruth_dict[key]
        _model_list = calculations_model_dict[key]

        _calculations_metrics_dict[key] = []

        for _groundTruth_dict in _groundTruth_list:
            # 根据函数名称找到对应的模型输出
            _func_name = _groundTruth_dict["Function"]
            for _model_dict in _model_list:
                if _model_dict["Function"] == _func_name:
                    _metrics_dict_perFunc = calculate_metrics_fromLists(_groundTruth_dict["Calculation Variables"], _model_dict["Full Calculation Variables"])

                    _accuracies_list.append(_metrics_dict_perFunc["Accuracy"])
                    _precisions_list.append(_metrics_dict_perFunc["Precision"])
                    _recalls_list.append(_metrics_dict_perFunc["Recall"])
                    _f1_scores_list.append(_metrics_dict_perFunc["F1 Score"])

                    _calculations_metrics_dict[key].append({
                        "Function": _func_name,
                        "Metrics": _metrics_dict_perFunc
                    })

                    break
    
    _avg_accuracy = np.mean(_accuracies_list)
    _avg_precision = np.mean(_precisions_list)
    _avg_recall = np.mean(_recalls_list)
    _avg_f1_score = np.mean(_f1_scores_list)

    _calculations_metrics_dict["Mean"] = {
        "Accuracy": _avg_accuracy,
        "Precision": _avg_precision,
        "Recall": _avg_recall,
        "F1 Score": _avg_f1_score
    }
    
    return _calculations_metrics_dict

In [8]:
# ************************************************************
# DeFi Staking Defects Detection metrics
# ************************************************************
def get_defects_detection_metrics(_groundTruth_json_path, _defects_json_path):
    _metrics_dict = {}

    _groundTruth_dict = read_json(_groundTruth_json_path)
    _defects_dict = read_json(_defects_json_path)
    _defects_groundTruth_dict = _groundTruth_dict["Defects"]

    for key in _defects_groundTruth_dict.keys():
        _num_defects_groundTruth = len(_defects_groundTruth_dict[key])
        _num_defects_detected = len(_defects_dict[key])

        if _num_defects_detected == 0 and _num_defects_groundTruth == 0:
            _metrics_dict[key] = "TN"

        if _num_defects_detected == 0 and _num_defects_groundTruth > 0:
            _metrics_dict[key] = "FN"

        if _num_defects_detected > 0 and _num_defects_groundTruth == 0:
            _metrics_dict[key] = "FP"

        if _num_defects_detected > 0 and _num_defects_groundTruth > 0:
            _metrics_dict[key] = "TP"
    
    return _metrics_dict

# ************************************************************
# Total metrics
# ************************************************************
def output_total_metrics_perProject(_groundTruth_json_path, _model_json_path, _defects_json_path, output_json_path):
    _metrics_dict = {}

    _model_metrics_dict = get_model_metrics(_groundTruth_json_path, _model_json_path)
    _defects_detection_metrics_dict = get_defects_detection_metrics(_groundTruth_json_path, _defects_json_path)

    _metrics_dict["Model"] = _model_metrics_dict
    _metrics_dict["Defects"] = _defects_detection_metrics_dict

    # Output
    with open(output_json_path, 'w', encoding='utf-8') as file:
        json.dump(_metrics_dict, file, indent=4)
    
    return _metrics_dict

# ************************************************************
# GroundTruth Dataset metrics
# ************************************************************

# 输出groundTruth数据集中DeFi Staking建模以及漏洞检测的效果
def output_groundTruth_metrics(_groundTruth_folder_path, _output_json_path):
    _metrics_dict = get_groundTruth_metrics_dict(_groundTruth_folder_path)

    # Output
    with open(_output_json_path, 'w', encoding='utf-8') as file:
        json.dump(_metrics_dict, file, indent=4)


# 计算groundTruth数据集中DeFi Staking建模以及漏洞检测的效果
def get_groundTruth_metrics_dict(_groundTruth_folder_path):
    _metrics_dict_list = read_json_files_from_folder(_groundTruth_folder_path)

    # _metrics_model_dict = get_total_metrics_model(_metrics_dict_list)
    _metrics_defects_dict = get_total_metrics_defects(_metrics_dict_list)

    _metrics_dict = {
        # "Model": _metrics_model_dict,
        "Defects": _metrics_defects_dict
    }

    return _metrics_dict

# 从指定文件夹中读取所有的json文件，并生成list
def read_json_files_from_folder(folder_path):
    json_data_list = []
    
    # 遍历文件夹中的所有文件
    for filename in os.listdir(folder_path):
        # 检查文件扩展名是否为.json
        if filename.endswith('.json'):
            file_path = os.path.join(folder_path, filename)
            
            # 打开并读取JSON文件
            with open(file_path, 'r', encoding='utf-8') as f:
                try:
                    data = json.load(f)
                    json_data_list.append(data)
                except json.JSONDecodeError:
                    print(f"文件 {filename} 不是有效的JSON文件，跳过该文件。")
    
    return json_data_list


# 计算DeFi Staking模型的总体指标
def get_total_metrics_model(_metrics_dict_list):
    _variables_accuracies_list = []
    _variables_precisions_list = []
    _variables_recalls_list = []
    _variables_f1_scores_list = []

    _functions_accuracies_list = []
    _functions_precisions_list = []
    _functions_recalls_list = []
    _functions_f1_scores_list = []

    _calculations_accuracies_list = []
    _calculations_precisions_list = []
    _calculations_recalls_list = []
    _calculations_f1_scores_list = []

    _mean_accuracies_list = []
    _mean_precisions_list = []
    _mean_recalls_list = []
    _mean_f1_scores_list = []

    for _metrics_dict_perProject in _metrics_dict_list:
        _metrics_model_dict_perProject = _metrics_dict_perProject["Model"]

        _variables_accuracies_list.append(_metrics_model_dict_perProject["Variables"]["Mean"]["Accuracy"])
        _variables_precisions_list.append(_metrics_model_dict_perProject["Variables"]["Mean"]["Precision"])
        _variables_recalls_list.append(_metrics_model_dict_perProject["Variables"]["Mean"]["Recall"])
        _variables_f1_scores_list.append(_metrics_model_dict_perProject["Variables"]["Mean"]["F1 Score"])

        _functions_accuracies_list.append(_metrics_model_dict_perProject["Functions"]["Mean"]["Accuracy"])
        _functions_precisions_list.append(_metrics_model_dict_perProject["Functions"]["Mean"]["Precision"])
        _functions_recalls_list.append(_metrics_model_dict_perProject["Functions"]["Mean"]["Recall"])
        _functions_f1_scores_list.append(_metrics_model_dict_perProject["Functions"]["Mean"]["F1 Score"])

        _calculations_accuracies_list.append(_metrics_model_dict_perProject["Calculations"]["Mean"]["Accuracy"])
        _calculations_precisions_list.append(_metrics_model_dict_perProject["Calculations"]["Mean"]["Precision"])
        _calculations_recalls_list.append(_metrics_model_dict_perProject["Calculations"]["Mean"]["Recall"])
        _calculations_f1_scores_list.append(_metrics_model_dict_perProject["Calculations"]["Mean"]["F1 Score"])

        _mean_accuracies_list.append(_metrics_model_dict_perProject["Mean"]["Accuracy"])
        _mean_precisions_list.append(_metrics_model_dict_perProject["Mean"]["Precision"])
        _mean_recalls_list.append(_metrics_model_dict_perProject["Mean"]["Recall"])
        _mean_f1_scores_list.append(_metrics_model_dict_perProject["Mean"]["F1 Score"])

    _total_metrics_model_dict_variables = {
        "Accuracy": np.mean(_variables_accuracies_list),
        "Precision": np.mean(_variables_precisions_list),
        "Recall": np.mean(_variables_recalls_list), 
        "F1 Score": np.mean(_variables_f1_scores_list)
    }

    _total_metrics_model_dict_functions = {
        "Accuracy": np.mean(_functions_accuracies_list),
        "Precision": np.mean(_functions_precisions_list),
        "Recall": np.mean(_functions_recalls_list), 
        "F1 Score": np.mean(_functions_f1_scores_list)
    }

    _total_metrics_model_dict_calculations = {
        "Accuracy": np.mean(_calculations_accuracies_list),
        "Precision": np.mean(_calculations_precisions_list),
        "Recall": np.mean(_calculations_recalls_list), 
        "F1 Score": np.mean(_calculations_f1_scores_list)
    }

    _total_metrics_model_dict_total = {
        "Accuracy": np.mean(_mean_accuracies_list),
        "Precision": np.mean(_mean_precisions_list),
        "Recall": np.mean(_mean_recalls_list), 
        "F1 Score": np.mean(_mean_f1_scores_list)
    }

    _total_metrics_model_dict = {
        "Variables": _total_metrics_model_dict_variables,
        "Functions": _total_metrics_model_dict_functions,
        "Calculations": _total_metrics_model_dict_calculations,
        "Total": _total_metrics_model_dict_total
    }

    return _total_metrics_model_dict

# 计算DeFi Staking Defects Detection的总体指标
def get_total_metrics_defects(_metrics_dict_list):
    # 初始化变量
    _num_cvm_tp = 0
    _num_cvm_fp = 0
    _num_cvm_tn = 0
    _num_cvm_fn = 0

    _num_rt_tp = 0
    _num_rt_fp = 0
    _num_rt_tn = 0
    _num_rt_fn = 0

    _num_slr_tp = 0
    _num_slr_fp = 0
    _num_slr_tn = 0
    _num_slr_fn = 0

    _num_esu_tp = 0
    _num_esu_fp = 0
    _num_esu_tn = 0
    _num_esu_fn = 0

    _num_uv_tp = 0
    _num_uv_fp = 0
    _num_uv_tn = 0
    _num_uv_fn = 0

    _num_ufa_tp = 0
    _num_ufa_fp = 0
    _num_ufa_tn = 0
    _num_ufa_fn = 0

    for _metrics_dict_perProject in _metrics_dict_list:
        _metrics_defects_dict_perProject = _metrics_dict_perProject["Defects"]
        # print(_metrics_defects_dict_perProject)

        _cvm_result_perProject = _metrics_defects_dict_perProject["Critical Variables Manipulation (CVM)"]
        _rt_result_perProject = _metrics_defects_dict_perProject["Rewards without Timedelay (RT)"]
        _slr_result_perProject = _metrics_defects_dict_perProject["Single Liquidity Pool Reliance (SLR)"]
        _esu_result_perProject = _metrics_defects_dict_perProject["Omission in Status Update (OSU)"]
        _uv_result_perProject = _metrics_defects_dict_perProject["Unsafe Verifications (UV)"]
        _ufa_result_perProject = _metrics_defects_dict_perProject["Unauthorized User Funds Access (UFA)"]

        # CVM
        if _cvm_result_perProject == "TP":
            _num_cvm_tp += 1
        elif _cvm_result_perProject == "FP":
            _num_cvm_fp += 1
        elif _cvm_result_perProject == "TN":
            _num_cvm_tn += 1
        elif _cvm_result_perProject == "FN":
            _num_cvm_fn += 1
        
        # RT
        if _rt_result_perProject == "TP":
            _num_rt_tp += 1
        elif _rt_result_perProject == "FP":
            _num_rt_fp += 1
        elif _rt_result_perProject == "TN":
            _num_rt_tn += 1
        elif _rt_result_perProject == "FN":
            _num_rt_fn += 1
        
        # SLR
        if _slr_result_perProject == "TP":
            _num_slr_tp += 1
        elif _slr_result_perProject == "FP":
            _num_slr_fp += 1
        elif _slr_result_perProject == "TN":
            _num_slr_tn += 1
        elif _slr_result_perProject == "FN":
            _num_slr_fn += 1
        
        # ESU
        if _esu_result_perProject == "TP":
            _num_esu_tp += 1
        elif _esu_result_perProject == "FP":
            _num_esu_fp += 1
        elif _esu_result_perProject == "TN":
            _num_esu_tn += 1
        elif _esu_result_perProject == "FN":
            _num_esu_fn += 1
        
        # UV
        if _uv_result_perProject == "TP":
            _num_uv_tp += 1
        elif _uv_result_perProject == "FP":
            _num_uv_fp += 1
        elif _uv_result_perProject == "TN":
            _num_uv_tn += 1
        elif _uv_result_perProject == "FN":
            _num_uv_fn += 1
        
        # UFA
        if _ufa_result_perProject == "TP":
            _num_ufa_tp += 1
        elif _ufa_result_perProject == "FP":
            _num_ufa_fp += 1
        elif _ufa_result_perProject == "TN":
            _num_ufa_tn += 1
        elif _ufa_result_perProject == "FN":
            _num_ufa_fn += 1

    _cvm_accuracy = (_num_cvm_tp + _num_cvm_tn) / (_num_cvm_tp + _num_cvm_tn + _num_cvm_fp + _num_cvm_fn) if _num_cvm_tp + _num_cvm_tn + _num_cvm_fp + _num_cvm_fn > 0 else 0
    _cvm_precision = _num_cvm_tp / (_num_cvm_tp + _num_cvm_fp) if _num_cvm_tp + _num_cvm_fp > 0 else 0
    _cvm_recall = _num_cvm_tp / (_num_cvm_tp + _num_cvm_fn) if _num_cvm_tp + _num_cvm_fn > 0 else 0
    _cvm_f1_score = 2 * _cvm_precision * _cvm_recall / (_cvm_precision + _cvm_recall) if _cvm_precision + _cvm_recall > 0 else 0

    _rt_accuracy = (_num_rt_tp + _num_rt_tn) / (_num_rt_tp + _num_rt_tn + _num_rt_fp + _num_rt_fn) if _num_rt_tp + _num_rt_tn + _num_rt_fp + _num_rt_fn > 0 else 0
    _rt_precision = _num_rt_tp / (_num_rt_tp + _num_rt_fp) if _num_rt_tp + _num_rt_fp > 0 else 0
    _rt_recall = _num_rt_tp / (_num_rt_tp + _num_rt_fn) if _num_rt_tp + _num_rt_fn > 0 else 0
    _rt_f1_score = 2 * _rt_precision * _rt_recall / (_rt_precision + _rt_recall) if _rt_precision + _rt_recall > 0 else 0

    _slr_accuracy = (_num_slr_tp + _num_slr_tn) / (_num_slr_tp + _num_slr_tn + _num_slr_fp + _num_slr_fn) if _num_slr_tp + _num_slr_tn + _num_slr_fp + _num_slr_fn > 0 else 0
    _slr_precision = _num_slr_tp / (_num_slr_tp + _num_slr_fp) if _num_slr_tp + _num_slr_fp > 0 else 0
    _slr_recall = _num_slr_tp / (_num_slr_tp + _num_slr_fn) if _num_slr_tp + _num_slr_fn > 0 else 0
    _slr_f1_score = 2 * _slr_precision * _slr_recall / (_slr_precision + _slr_recall) if _slr_precision + _slr_recall > 0 else 0

    _esu_accuracy = (_num_esu_tp + _num_esu_tn) / (_num_esu_tp + _num_esu_tn + _num_esu_fp + _num_esu_fn) if _num_esu_tp + _num_esu_tn + _num_esu_fp + _num_esu_fn > 0 else 0
    _esu_precision = _num_esu_tp / (_num_esu_tp + _num_esu_fp) if _num_esu_tp + _num_esu_fp > 0 else 0
    _esu_recall = _num_esu_tp / (_num_esu_tp + _num_esu_fn) if _num_esu_tp + _num_esu_fn > 0 else 0
    _esu_f1_score = 2 * _esu_precision * _esu_recall / (_esu_precision + _esu_recall) if _esu_precision + _esu_recall > 0 else 0

    _uv_accuracy = (_num_uv_tp + _num_uv_tn) / (_num_uv_tp + _num_uv_tn + _num_uv_fp + _num_uv_fn) if _num_uv_tp + _num_uv_tn + _num_uv_fp + _num_uv_fn > 0 else 0
    _uv_precision = _num_uv_tp / (_num_uv_tp + _num_uv_fp) if _num_uv_tp + _num_uv_fp > 0 else 0
    _uv_recall = _num_uv_tp / (_num_uv_tp + _num_uv_fn) if _num_uv_tp + _num_uv_fn > 0 else 0
    _uv_f1_score = 2 * _uv_precision * _uv_recall / (_uv_precision + _uv_recall) if _uv_precision + _uv_recall > 0 else 0

    _ufa_accuracy = (_num_ufa_tp + _num_ufa_tn) / (_num_ufa_tp + _num_ufa_tn + _num_ufa_fp + _num_ufa_fn) if _num_ufa_tp + _num_ufa_tn + _num_ufa_fp + _num_ufa_fn > 0 else 0
    _ufa_precision = _num_ufa_tp / (_num_ufa_tp + _num_ufa_fp) if _num_ufa_tp + _num_ufa_fp > 0 else 0
    _ufa_recall = _num_ufa_tp / (_num_ufa_tp + _num_ufa_fn) if _num_ufa_tp + _num_ufa_fn > 0 else 0
    _ufa_f1_score = 2 * _ufa_precision * _ufa_recall / (_ufa_precision + _ufa_recall) if _ufa_precision + _ufa_recall > 0 else 0
        
    _CVM_metrics_dict = {
        "TP": _num_cvm_tp,
        "FP": _num_cvm_fp,
        "TN": _num_cvm_tn,
        "FN": _num_cvm_fn,
        "Accuracy": _cvm_accuracy,
        "Precision": _cvm_precision,
        "Recall": _cvm_recall,
        "F1 Score": _cvm_f1_score
    }

    _RT_metrics_dict = {
        "TP": _num_rt_tp,
        "FP": _num_rt_fp,
        "TN": _num_rt_tn,
        "FN": _num_rt_fn,
        "Accuracy": _rt_accuracy,
        "Precision": _rt_precision,
        "Recall": _rt_recall,
        "F1 Score": _rt_f1_score
    }

    _SLR_metrics_dict = {
        "TP": _num_slr_tp,
        "FP": _num_slr_fp,
        "TN": _num_slr_tn,
        "FN": _num_slr_fn,
        "Accuracy": _slr_accuracy,
        "Precision": _slr_precision,
        "Recall": _slr_recall,
        "F1 Score": _slr_f1_score
    }

    _ESU_metrics_dict = {
        "TP": _num_esu_tp,
        "FP": _num_esu_fp,
        "TN": _num_esu_tn,
        "FN": _num_esu_fn,
        "Accuracy": _esu_accuracy,
        "Precision": _esu_precision,
        "Recall": _esu_recall,
        "F1 Score": _esu_f1_score
    }

    _UV_metrics_dict = {
        "TP": _num_uv_tp,
        "FP": _num_uv_fp,
        "TN": _num_uv_tn,
        "FN": _num_uv_fn,
        "Accuracy": _uv_accuracy,
        "Precision": _uv_precision,
        "Recall": _uv_recall,
        "F1 Score": _uv_f1_score
    }

    _UFA_metrics_dict = {
        "TP": _num_ufa_tp,
        "FP": _num_ufa_fp,
        "TN": _num_ufa_tn,
        "FN": _num_ufa_fn,
        "Accuracy": _ufa_accuracy,
        "Precision": _ufa_precision,
        "Recall": _ufa_recall,
        "F1 Score": _ufa_f1_score
    }

    _num_positive_cvm = _num_cvm_tp + _num_cvm_fp
    _num_positive_rt = _num_rt_tp + _num_rt_fp
    _num_positive_slr = _num_slr_tp + _num_slr_fp
    _num_positive_esu = _num_esu_tp + _num_esu_fp
    _num_positive_uv = _num_uv_tp + _num_uv_fp
    _num_positive_ufa = _num_ufa_tp + _num_ufa_fp
    _num_positive_total = _num_positive_cvm + _num_positive_rt + _num_positive_slr + _num_positive_esu + _num_positive_uv + _num_positive_ufa

    _total_accuracy = (
        (_CVM_metrics_dict["Accuracy"] * _num_positive_cvm) + 
        (_RT_metrics_dict["Accuracy"] * _num_positive_rt) + 
        (_SLR_metrics_dict["Accuracy"] * _num_positive_slr) + 
        (_ESU_metrics_dict["Accuracy"] * _num_positive_esu) + 
        (_UV_metrics_dict["Accuracy"] * _num_positive_uv) + 
        (_UFA_metrics_dict["Accuracy"] * _num_positive_ufa)
    ) / _num_positive_total if _num_positive_total > 0 else 0

    _total_precision = (
        (_CVM_metrics_dict["Precision"] * _num_positive_cvm) + 
        (_RT_metrics_dict["Precision"] * _num_positive_rt) +    
        (_SLR_metrics_dict["Precision"] * _num_positive_slr) + 
        (_ESU_metrics_dict["Precision"] * _num_positive_esu) + 
        (_UV_metrics_dict["Precision"] * _num_positive_uv) + 
        (_UFA_metrics_dict["Precision"] * _num_positive_ufa)
    ) / _num_positive_total if _num_positive_total > 0 else 0

    _total_recall = (
        (_CVM_metrics_dict["Recall"] * _num_positive_cvm) + 
        (_RT_metrics_dict["Recall"] * _num_positive_rt) + 
        (_SLR_metrics_dict["Recall"] * _num_positive_slr) + 
        (_ESU_metrics_dict["Recall"] * _num_positive_esu) + 
        (_UV_metrics_dict["Recall"] * _num_positive_uv) + 
        (_UFA_metrics_dict["Recall"] * _num_positive_ufa)
    ) / _num_positive_total if _num_positive_total > 0 else 0

    _total_f1_score = (
        (_CVM_metrics_dict["F1 Score"] * _num_positive_cvm) + 
        (_RT_metrics_dict["F1 Score"] * _num_positive_rt) + 
        (_SLR_metrics_dict["F1 Score"] * _num_positive_slr) + 
        (_ESU_metrics_dict["F1 Score"] * _num_positive_esu) + 
        (_UV_metrics_dict["F1 Score"] * _num_positive_uv) + 
        (_UFA_metrics_dict["F1 Score"] * _num_positive_ufa)
    ) / _num_positive_total if _num_positive_total > 0 else 0

    _total_metrics_dict = {
        "Accuracy": _total_accuracy,
        "Precision": _total_precision,
        "Recall": _total_recall,
        "F1 Score": _total_f1_score
    }

    _total_metrics_defects_dict = {
        "Critical Variables Manipulation (CVM)": _CVM_metrics_dict,
        "Rewards without Timedelay (RT)": _RT_metrics_dict,
        "Single Liquidity Pool Reliance (SLR)": _SLR_metrics_dict,
        "Omission in Status Update (OSU)": _ESU_metrics_dict,
        "Unsafe Verifications (UV)": _UV_metrics_dict,
        "Unauthorized User Funds Access (UFA)": _UFA_metrics_dict,
        "Total": _total_metrics_dict
    }

    return _total_metrics_defects_dict

In [9]:
def output_all_metrics_groundTruth(_groundTruth_folder_path, _model_folder_path, _defects_folder_path, _output_json_path):
    _metrics_dict_list = []
    project_name_list = get_projectNams_from_folder(_groundTruth_folder_path)
    for project_name in project_name_list:
        # print("Project Name: ", project_name)
        # 读取groundTruth.json文件
        _groundTruth_json_path = os.path.join(_groundTruth_folder_path, project_name + ".json")
        _defects_json_path = os.path.join(_defects_folder_path, project_name + ".json")
        _model_json_path = os.path.join(_model_folder_path, project_name + ".json")

        _metrics_dict = {}

        _model_metrics_dict = get_model_metrics(_groundTruth_json_path, _model_json_path)
        _defects_detection_metrics_dict = get_defects_detection_metrics(_groundTruth_json_path, _defects_json_path)

        _metrics_dict["Model"] = _model_metrics_dict
        _metrics_dict["Defects"] = _defects_detection_metrics_dict

        _metrics_dict_list.append(_metrics_dict)

    _total_metrics_defects_dict = get_total_metrics_defects(_metrics_dict_list)
    _total_metrics_model_dict = get_total_metrics_model(_metrics_dict_list)

    _metrics_dict = {
        "Model": _total_metrics_model_dict,
        "Defects": _total_metrics_defects_dict
    }

    # Output
    with open(_output_json_path, 'w', encoding='utf-8') as file:
        json.dump(_metrics_dict, file, indent=4)

    return _metrics_dict

In [10]:
groungTruth_folder_path = "/mnt/linzw3/work/defistaking/3_Experiment/groundTruth/1_groundTruth"
defects_folder_path = "/mnt/linzw3/work/defistaking/3_Experiment/groundTruth/3_defects/defects"
model_folder_path = "/mnt/linzw3/work/defistaking/3_Experiment/groundTruth/2_model/model"
output_json_path = "/mnt/linzw3/work/defistaking/3_Experiment/groundTruth/4_metrics/groundTruth_metrics.json"

total_groundTruth_metrics_dict = output_all_metrics_groundTruth(groungTruth_folder_path, model_folder_path, defects_folder_path, output_json_path)

largeScale

In [3]:
# 所有链的检测结果所在的文件夹
path_largeScale_results = "/mnt/linzw3/work/defistaking/3_Experiment/largeScale/2_defects"
path_largeScale_dataset = "/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final"
path_largeScale_infos = "/mnt/linzw3/work/defistaking/3_Experiment/largeScale/1_model/infos"
path_largeScale_model = "/mnt/linzw3/work/defistaking/3_Experiment/largeScale/1_model/model"

# 所有链的名称
chain_list = [
    "ethereum",
    "bsc",
    "arbitrum",
    "avalanche",
    "celo",
    "fantom",
    "optimism",
    "polygon",
    "tron"
]

defects_kind_list = [
    "Critical Variables Manipulation (CVM)",
    "Rewards without Timedelay (RT)",
    "Single Liquidity Pool Reliance (SLR)",
    "Omission in Status Update (OSU)",
    "Unsafe Verifications (UV)",
    "Unauthorized User Funds Access (UFA)"
]

list_largeScale_results_path = [path_largeScale_results + "/" + chain_name for chain_name in chain_list] 
list_largeScale_defects_path = [list_largeScale_results_path[i] + "/defects" for i in range(len(chain_list))]
list_largeScale_details_path = [list_largeScale_results_path[i] + "/details" for i in range(len(chain_list))]


In [15]:
# 复制数据集，不用再跑了
def copy_matching_sol_files(folder_defects, folder_dataset_origin, folder_dataset_final):
    # 如果文件夹 folder_dataset_final 不存在，则创建它
    if not os.path.exists(folder_dataset_final):
        os.makedirs(folder_dataset_final)
        print(f"文件夹 C ({folder_dataset_final}) 不存在，已创建该文件夹。")

    # 遍历文件夹 A 中的每个 JSON 文件
    for json_filename in os.listdir(folder_defects):
        if json_filename.endswith('.json'):
            # 获取文件名（不带扩展名）
            json_basename = os.path.splitext(json_filename)[0]
            sol_filename = f"{json_basename}.sol"
            sol_path = os.path.join(folder_dataset_origin, sol_filename)

            # 检查对应的 .sol 文件是否存在于文件夹 B 中
            if os.path.isfile(sol_path):
                # 构造目标路径
                target_path = os.path.join(folder_dataset_final, sol_filename)
                # 执行复制操作
                shutil.copy(sol_path, target_path)
                # print(f"已将 {sol_filename} 从 {folder_dataset_origin} 复制到 {folder_dataset_final}")
            else:
                print(f"在文件夹 B 中找不到匹配的 .sol 文件: {sol_filename}")

folder_infos_all = "/mnt/linzw3/work/defistaking/3_Experiment/largeScale/1_model/infos"
folder_dataset_origin_all = "/mnt/linzw3/work/defistaking/1_Datasets/largeScale/analyzable"
folder_dataset_final_all = "/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2"

for chain_name in chain_list:
    print("Copying .sol files for " + chain_name)
    path_folder_infos = folder_infos_all + "/" + chain_name 
    path_folder_dataset_origin = folder_dataset_origin_all + "/" + chain_name
    path_folder_dataset_final = folder_dataset_final_all + "/" + chain_name
    copy_matching_sol_files(path_folder_infos, path_folder_dataset_origin, path_folder_dataset_final)

Copying .sol files for ethereum
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/ethereum) 不存在，已创建该文件夹。
Copying .sol files for bsc
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/bsc) 不存在，已创建该文件夹。
在文件夹 B 中找不到匹配的 .sol 文件: A61C3c7B1297fF1e26D0f56DFBD518a1078e791c_MasterChef.sol
Copying .sol files for arbitrum
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/arbitrum) 不存在，已创建该文件夹。
Copying .sol files for avalanche
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/avalanche) 不存在，已创建该文件夹。
Copying .sol files for celo
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/celo) 不存在，已创建该文件夹。
Copying .sol files for fantom
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/fantom) 不存在，已创建该文件夹。
Copying .sol files for optimism
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/optimism) 不存在，已创建该文件夹。
Copying .sol files for polygon
文件夹 C (/mnt/linzw3/work/defistaking/1_Datasets/largeScale/fina

In [6]:
missed_file_path = '/mnt/linzw3/work/defistaking/1_Datasets/largeScale/bsc/A61C3c7B1297fF1e26D0f56DFBD518a1078e791c_MasterChef.sol'

file_name = os.path.basename(missed_file_path)
destination_path = path_largeScale_dataset + '/bsc/' + file_name

shutil.copy(missed_file_path, destination_path)

'/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final_2/bsc/A61C3c7B1297fF1e26D0f56DFBD518a1078e791c_MasterChef.sol'

In [4]:
# 验证大规模数据
def count_files_in_subfolders(folder_path):
    """计算每个子文件夹中文件的数量，并返回总数量"""
    total_files = 0
    subfolder_counts = {}

    for subfolder in os.listdir(folder_path):
        subfolder_path = os.path.join(folder_path, subfolder)
        if os.path.isdir(subfolder_path):
            files = os.listdir(subfolder_path)
            subfolder_counts[subfolder] = len(files)
            total_files += len(files)
    
    return subfolder_counts, total_files

# 获取每个文件夹的文件数量和总数量
sol_counts, sol_total = count_files_in_subfolders(path_largeScale_dataset)
txt_counts, txt_total = count_files_in_subfolders(path_largeScale_infos)
json_counts, json_total = count_files_in_subfolders(path_largeScale_model)

# 输出每个子文件夹中文件数量
print("智能合约源码(sol)文件夹:")
for subfolder, count in sol_counts.items():
    print(f"{subfolder}: {count} 个文件")
print(f"总数量: {sol_total} 个文件")

print("\n合约基本信息(json)文件夹:")
for subfolder, count in txt_counts.items():
    print(f"{subfolder}: {count} 个文件")
print(f"总数量: {txt_total} 个文件")

print("\n合约建模(json)文件夹:")
for subfolder, count in json_counts.items():
    print(f"{subfolder}: {count} 个文件")
print(f"总数量: {json_total} 个文件")

智能合约源码(sol)文件夹:
ethereum: 1097 个文件
bsc: 5718 个文件
arbitrum: 748 个文件
avalanche: 1386 个文件
celo: 28 个文件
fantom: 2335 个文件
optimism: 92 个文件
polygon: 3917 个文件
tron: 671 个文件
总数量: 15992 个文件

合约基本信息(json)文件夹:
bsc: 5718 个文件
arbitrum: 748 个文件
avalanche: 1386 个文件
celo: 28 个文件
fantom: 2335 个文件
optimism: 92 个文件
polygon: 3917 个文件
tron: 671 个文件
ethereum: 1097 个文件
总数量: 15992 个文件

合约建模(json)文件夹:
bsc: 5607 个文件
arbitrum: 741 个文件
avalanche: 1385 个文件
celo: 28 个文件
fantom: 2312 个文件
optimism: 92 个文件
polygon: 3899 个文件
tron: 666 个文件
ethereum: 1086 个文件
总数量: 15816 个文件


In [5]:
def read_defects_results_allChains(_chain_list, _path_largeScale_dataset, _path_largeScale_results):
    # 初始化检测结果：DataFrame结构
    # 创建列名，"Contract" 作为第一列，其余来自输入列表
    all_columns = ["Contract"] + defects_kind_list
    # 构建空的 DataFrame，只有列名，没有数据
    df_defects_all = pd.DataFrame(columns = all_columns)

    for _chain_name in _chain_list:
        # 读取某个链上所有合约的检测结果
        _path_dataset_perChain = _path_largeScale_dataset + "/" + _chain_name
        _path_defects_perChain = _path_largeScale_results + "/" + _chain_name + "/defects"
        df_defects_perChain = read_defects_results_perChain(_path_dataset_perChain, _path_defects_perChain)
        # 合并两个 DataFrame
        df_combined = pd.concat([df_defects_all, df_defects_perChain], ignore_index=True)
        df_defects_all = df_combined

    return df_defects_all


# 读取所有链上合约的检测结果（只保留是否存在缺陷）
def read_defects_results_perChain(_path_dataset_perChain, _path_largeScale_defects_perChain):
    # 初始化检测结果：DataFrame结构
    # 创建列名，"Contract" 作为第一列，其余来自输入列表
    all_columns = ["Contract"] + defects_kind_list
    
    # 构建空的 DataFrame，只有列名，没有数据
    df_defects = pd.DataFrame(columns = all_columns)
    
    # 读取所有
    list_solPaths = get_solPath_list(_path_dataset_perChain)
    for _sol_path in list_solPaths:
        # 读取合约名
        contract_name = os.path.basename(_sol_path)[:-4]  # 去掉.sol 后缀
        _json_path = os.path.join(_path_largeScale_defects_perChain, contract_name + ".json")

        if not os.path.exists(_json_path):
            _dict_defects_perContract = {"Contract": _sol_path}  # 合约名作为第一列
            for _defect_kind in defects_kind_list:
                _dict_defects_perContract[_defect_kind] = False  # 缺陷类型列初始化为 False
            new_row = pd.DataFrame([_dict_defects_perContract])
            df_defects = pd.concat([df_defects, new_row], ignore_index=True)
            continue

        _dict_defects_perContract = read_defects_results_perContract(_json_path)
        _dict_defects_perContract["Contract"] = _sol_path  # 合约名作为第一列

        # 将新的一行添加到DataFrame
        new_row = pd.DataFrame([_dict_defects_perContract])
        df_defects = pd.concat([df_defects, new_row], ignore_index=True)

    # 输出dataFrame
    return df_defects

# 读取某个合约的检测结果
def read_defects_results_perContract(_path_json_file):
    # 读取 JSON 文件
    with open(_path_json_file, 'r') as f:
        data = json.load(f)
    
    # 创建一个新的行数据字典
    dict_results_perContract = {}  # "Contract"列为json文件路径
    
    # 遍历除“Contract”外的列名
    for key, value in data.items():
        dict_results_perContract[key] = True if len(value) > 0 else False
    
    return dict_results_perContract

# 获取指定文件夹中所有json文件的路径
def get_jsonPath_list(folder_path):
    # 存储所有json文件路径的列表
    list_json_path = []
    
    # 遍历文件夹
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith(".json"):  # 判断文件扩展名是否为.json
                list_json_path.append(os.path.join(root, file))  # 获取文件的完整路径并添加到列表

    return list_json_path

# 获取指定文件夹中所有json文件的路径
def get_solPath_list(folder_path):
    # 存储所有json文件路径的列表
    list_sol_path = []
    
    # 遍历文件夹
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith(".sol"):  # 判断文件扩展名是否为.sol
                list_sol_path.append(os.path.join(root, file))  # 获取文件的完整路径并添加到列表

    return list_sol_path

def analyze_vulnerabilities(df):
    # 假设第一列是文件路径，后6列是漏洞标志（True/False）
    vuln_columns = df.columns[1:]  # A-F
    total_files = len(df)

    print(f"总文件数量: {total_files}")

    print("各漏洞类型的文件数量及比例：\n")
    for col in vuln_columns:
        count = df[col].sum()  # True 会被当作 1 来统计
        ratio = count / total_files
        print(f"{col} 漏洞: 数量 = {count}, 比例 = {ratio:.2%}")

    # 至少包含一种漏洞的文件数量和比例
    any_vuln_mask = df[vuln_columns].sum(axis=1) > 0
    any_vuln_count = any_vuln_mask.sum()
    any_vuln_ratio = any_vuln_count / total_files

    print("\n至少包含一种漏洞的文件：")
    print(f"数量 = {any_vuln_count}, 比例 = {any_vuln_ratio:.2%}")



In [6]:
# Test
df_defects_allChains = read_defects_results_allChains(chain_list, path_largeScale_dataset, path_largeScale_results)
analyze_vulnerabilities(df_defects_allChains)

总文件数量: 15992
各漏洞类型的文件数量及比例：

Critical Variables Manipulation (CVM) 漏洞: 数量 = 430, 比例 = 2.69%
Rewards without Timedelay (RT) 漏洞: 数量 = 516, 比例 = 3.23%
Single Liquidity Pool Reliance (SLR) 漏洞: 数量 = 44, 比例 = 0.28%
Omission in Status Update (OSU) 漏洞: 数量 = 889, 比例 = 5.56%
Unsafe Verifications (UV) 漏洞: 数量 = 1315, 比例 = 8.22%
Unauthorized User Funds Access (UFA) 漏洞: 数量 = 846, 比例 = 5.29%

至少包含一种漏洞的文件：
数量 = 3557, 比例 = 22.24%


In [26]:
df_defects_allChains["Contract"][0]

'/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final/ethereum/B6e576f7dEeDdb8cF941625C2b7C78472F162553_StakingETH.sol'

随机抽样

In [7]:
def calculate_sample_size(N, confidence_level=0.95, margin_error=0.10):
    """
    使用简单的二项分布估算样本量。
    Z值约为1.96（对应95%置信度）
    """
    Z = 1.96
    p = 0.5  # 最保守估计
    e = margin_error
    n_0 = (Z**2 * p * (1 - p)) / (e**2)
    n = n_0 / (1 + (n_0 - 1) / N)
    return min(N, math.ceil(n))

def generate_paths(contract_path_str):
    contract_path = Path(contract_path_str)
    largeScale_dataset_folder = Path("/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final")
    largeScale_results_folder = Path("/mnt/linzw3/work/defistaking/3_Experiment/largeScale/2_defects")

    # 拆解路径：.../blockchain_name/defects/contract.json

    blockchain_name = contract_path.parts[-2]         # 区块链名称
    contract_name = contract_path.stem.split(".")[0]  # 合约名（不含后缀）

    # 构造新的路径
    defect_path = largeScale_results_folder / blockchain_name / "defects" / f"{contract_name}.json"
    detail_path = largeScale_results_folder / blockchain_name / "details" / f"{contract_name}.txt"
    source_path = largeScale_dataset_folder / blockchain_name / f"{contract_name}.sol"
    contract_infos = blockchain_name + "/" + contract_name

    return str(defect_path), str(detail_path), str(source_path), contract_infos

def sample_and_copy(df_defects_all: pd.DataFrame, vuln_col: str, base_output_dir: str):
    # 过滤出该漏洞为True的行
    vuln_true_df = df_defects_all[df_defects_all[vuln_col] == True]

    N = len(vuln_true_df)
    if N == 0:
        print(f"[警告] 漏洞 {vuln_col} 无任何 True 样本，跳过。")
        return

    sample_size = calculate_sample_size(N)
    print(f"[信息] {vuln_col} 样本量为 {sample_size}")
    sampled_df = vuln_true_df.sample(n=sample_size, random_state=314)

    # 创建子文件夹
    vuln_folder = Path(base_output_dir) / vuln_col
    vuln_folder.mkdir(parents=True, exist_ok=True)
    vuln_contract_folder = vuln_folder / "contract"
    vuln_contract_folder.mkdir(parents=True, exist_ok=True)
    vuln_details_folder = vuln_folder / "details"
    vuln_details_folder.mkdir(parents=True, exist_ok=True)
    vuln_defects_folder = vuln_folder / "defects"
    vuln_defects_folder.mkdir(parents=True, exist_ok=True)

    sampled_paths = []

    for contract_sol_path in sampled_df.iloc[:, 0]:
        defect_json_path, detail_txt_path, contract_path, contract_infos = generate_paths(contract_sol_path)
        contract_path = Path(contract_path)
        detail_txt_path = Path(detail_txt_path)
        defect_json_path = Path(defect_json_path)

        if contract_path.exists():
            dest_path = vuln_folder / "contract" / contract_path.name
            shutil.copy(contract_path, dest_path)
        else:
            print(f"[警告] 找不到文件: {contract_path}")

        if detail_txt_path.exists():
            dest_path = vuln_folder / "details" / detail_txt_path.name
            shutil.copy(detail_txt_path, dest_path)
        else:
            print(f"[警告] 找不到文件: {detail_txt_path}")

        if defect_json_path.exists():
            dest_path = vuln_folder / "defects" / defect_json_path.name
            shutil.copy(defect_json_path, dest_path)
        else:
            print(f"[警告] 找不到文件: {defect_json_path}")

        sampled_paths.append(str(contract_infos))

    # 保存抽样路径到Excel
    result_df = pd.DataFrame(sampled_paths, columns=["ContractPath"])
    excel_path = Path(base_output_dir) / f"{vuln_col}_sampled.xlsx"
    result_df.to_excel(excel_path, index=False)
    print(f"[信息] 已完成 {vuln_col} 抽样并保存至 {excel_path}")

def output_sampled_contracts(df_defects, output_folder: str):
    if df_defects.shape[1] < 7:
        print("[错误] 输入数据列数不足，应包含至少7列（路径 + 6种漏洞标志）")
        return

    # 提取漏洞列名（假设从第2列到第7列）
    vuln_columns = df_defects.columns[1:7]

    for vuln in vuln_columns:
        sample_and_copy(df_defects, vuln, output_folder)

In [9]:
sample_output_folder = "/mnt/linzw3/work/defistaking/3_Experiment/largeScale/3_samples"
output_sampled_contracts(df_defects_allChains, sample_output_folder)

[信息] Critical Variables Manipulation (CVM) 样本量为 79
[信息] 已完成 Critical Variables Manipulation (CVM) 抽样并保存至 /mnt/linzw3/work/defistaking/3_Experiment/largeScale/3_samples/Critical Variables Manipulation (CVM)_sampled.xlsx
[信息] Rewards without Timedelay (RT) 样本量为 82
[信息] 已完成 Rewards without Timedelay (RT) 抽样并保存至 /mnt/linzw3/work/defistaking/3_Experiment/largeScale/3_samples/Rewards without Timedelay (RT)_sampled.xlsx
[信息] Single Liquidity Pool Reliance (SLR) 样本量为 31
[信息] 已完成 Single Liquidity Pool Reliance (SLR) 抽样并保存至 /mnt/linzw3/work/defistaking/3_Experiment/largeScale/3_samples/Single Liquidity Pool Reliance (SLR)_sampled.xlsx
[信息] Omission in Status Update (OSU) 样本量为 87
[信息] 已完成 Omission in Status Update (OSU) 抽样并保存至 /mnt/linzw3/work/defistaking/3_Experiment/largeScale/3_samples/Omission in Status Update (OSU)_sampled.xlsx
[信息] Unsafe Verifications (UV) 样本量为 90
[信息] 已完成 Unsafe Verifications (UV) 抽样并保存至 /mnt/linzw3/work/defistaking/3_Experiment/largeScale/3_samples/Unsafe Verifications (

In [None]:
# 复制FP sample for test
testFP_chain_name = "fantom"
testFP_contract_name = "0f01D50C9A6dDe60a14E4e8875063cc79F6283E4_Boardroom"

testFP_contract_path = "/mnt/linzw3/work/defistaking/1_Datasets/largeScale/final/" + testFP_chain_name + "/" + testFP_contract_name + ".sol"


Test

In [14]:
df_defects_all["Contract"][0]

'/mnt/linzw3/work/defistaking/3_Experiment/largeScale/2_defects/ethereum/defects/b61b80d1ab9b2D306D2F989E63deD2B0410dA8ab_UNICURRYLTTRewards.json'

In [7]:
path_json_test = "/mnt/linzw3/work/defistaking/3_Experiment/largeScale/2_defects/analyzable/ethereum/defects/b6a2452e8ee8c6d18bdad151935d29c1870598f7_ZEUSToken.json"
dict_defects_test = read_defects_results_perContract(path_json_test)

In [20]:
df_defects_all = read_defects_results_allChains(list_largeScale_defects_path)

In [22]:
df_defects_all

Unnamed: 0,Contract,Critical Variables Manipulation (CVM),Rewards without Timedelay (RT),Single Liquidity Pool Reliance (SLR),Error or Omission in Status Update (ESU),Unsafe Verifications (UV),Unauthorized User Funds Access (UFA)
0,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,False,False,True,False,False
1,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,True,False,False,False,False
2,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,True,False,True,False,False
3,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,True,False,True,False,False
4,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,False,False,True,False,False
5,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,True,False,True,False,False
6,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,False,False,True,True,False
7,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,False,False,False,True,True
8,/mnt/linzw3/work/defistaking/3_Experiment/larg...,True,True,False,True,True,False
9,/mnt/linzw3/work/defistaking/3_Experiment/larg...,False,False,False,True,True,False
