In [1]:
import pandas as pd
import torch
import torch.nn as nn
import numpy as np
import joblib
from transformers import RobertaTokenizer, RobertaModel
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.multioutput import MultiOutputClassifier
from tqdm import tqdm

# 定義模型類別 (保持與原代碼一致)
class VulnScreener(nn.Module):
    def __init__(self):
        super(VulnScreener, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        return self.mlp(x)

class VulnAnalyzer(nn.Module):
    def __init__(self, dropout_rate=0.2):
        super(VulnAnalyzer, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.residual = nn.Sequential(
            nn.Conv1d(1, 256, kernel_size=1),
            nn.AvgPool1d(kernel_size=8, stride=8)
        )
        self.fc_layers = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(256 * 96, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(512, 9),
            nn.Sigmoid()
        )
    
    def forward(self, x, p_s):
        if p_s.dim() == 1:
            p_s = p_s.unsqueeze(1)
        x = torch.cat((x, p_s), dim=1).unsqueeze(1)
        residual = self.residual(x)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        if residual.size(2) != x.size(2):
            residual = nn.functional.interpolate(residual, size=x.size(2), mode='nearest')
        x = x + residual
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x, None

class VulnValidator:
    def __init__(self, model_path='../codebert/vuln_validator_model.pkl', 
                 scaler_path='../codebert/scaler.pkl', 
                 pca_path='../codebert/pca.pkl'):
        # 加載預訓練的隨機森林模型
        self.rf = joblib.load(model_path)
        # 加載預訓練的Scaler和PCA
        self.scaler = joblib.load(scaler_path)
        self.pca = joblib.load(pca_path)
        self.feature_names = None  # 將在加載時設置
    
    def prepare_features(self, X, p_a, p_s, fit=False):
        X = X.cpu().detach().numpy() if torch.is_tensor(X) else X
        p_a = p_a.cpu().detach().numpy() if torch.is_tensor(p_a) else p_a
        p_s = p_s.cpu().detach().numpy() if torch.is_tensor(p_s) else p_s
        
        # PCA 和統計特徵
        if fit:
            X_pca = self.pca.fit_transform(X)
        else:
            X_pca = self.pca.transform(X)
        
        stats = np.hstack([
            X.mean(axis=1, keepdims=True),
            X.var(axis=1, keepdims=True),
            X.max(axis=1, keepdims=True),
            X.min(axis=1, keepdims=True)
        ])
        interaction = p_a * p_s
        
        # 組合特徵
        features = np.hstack([X_pca, p_a, p_s, stats, interaction])
        if fit:
            features = self.scaler.fit_transform(features)
        else:
            features = self.scaler.transform(features)
        
        # 設置特徵名稱（僅在fit時）
        if fit:
            num_pca = self.pca.n_components
            num_p_a = p_a.shape[1]
            num_p_s = p_s.shape[1]
            self.feature_names = (
                [f"PCA_{i}" for i in range(num_pca)] +
                [f"Analyzer_Prob_{i}" for i in range(num_p_a)] +
                [f"Screener_Prob_{i}" for i in range(num_p_s)] +
                ["Mean", "Variance", "Max", "Min"] +
                [f"Interaction_{i}" for i in range(num_p_a)]
            )
        
        return features
    
    def fit(self, X, p_a, p_s, y_train):
        features = self.prepare_features(X, p_a, p_s, fit=True)
        y_train = y_train.cpu().detach().numpy() if torch.is_tensor(y_train) else y_train
        self.rf.fit(features, y_train)
    
    def predict(self, X, p_a, p_s):
        features = self.prepare_features(X, p_a, p_s, fit=False)
        p_v = self.rf.predict_proba(features)
        return np.stack([prob[:, 1] for prob in p_v], axis=1)
    
    def generate_validation_report(self, p_f, p_v, threshold=0.05):
        report = {"anomalies": [], "corrections": [], "top_features": {}}
        p_f = p_f.cpu().detach().numpy() if torch.is_tensor(p_f) else p_f
        p_v = p_v.cpu().detach().numpy() if torch.is_tensor(p_v) else p_v
        
        # 檢測異常和修正
        for j in range(p_f.shape[1]):
            diff = np.abs(p_f[0, j] - p_v[0, j])
            if diff > threshold:
                if p_f[0, j] > 0.5 and p_v[0, j] < 0.5:
                    report["anomalies"].append(f"Vuln {j}")
                elif p_f[0, j] < 0.5 and p_v[0, j] > 0.5:
                    report["corrections"].append(f"Vuln {j}")
        
        # 計算特徵重要性
        try:
            if hasattr(self.rf, "estimators_") and self.feature_names:
                avg_importances = np.mean([est.feature_importances_ for est in self.rf.estimators_], axis=0)
                top_5_idx = np.argsort(avg_importances)[-5:][::-1]
                report["top_features"] = {self.feature_names[idx]: float(avg_importances[idx]) for idx in top_5_idx}
            else:
                print("Warning: Feature importances not available.")
        except Exception as e:
            print(f"Error calculating feature importances: {e}")
        
        return report

In [2]:
# CodeBERT 特徵提取函數
def extract_codebert_features(code_snippet, tokenizer, model, max_length=512):
    inputs = tokenizer(code_snippet, return_tensors='pt', max_length=max_length, truncation=True, padding='max_length')
    inputs = {k: v.to(model.device) for k, v in inputs.items()}  # 確保輸入在同一設備
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)
    return outputs.last_hidden_state[:, 0, :].squeeze().cpu().numpy()

def extract_codebert_features_long_code(code_snippet, tokenizer, model, max_length=512, stride=1024):
    tokens = tokenizer.tokenize(code_snippet)
    if len(tokens) <= max_length - 2:
        return extract_codebert_features(code_snippet, tokenizer, model, max_length)

    features = []
    for i in range(0, len(tokens), stride):
        segment = tokens[i:i + max_length - 2]
        if len(segment) > 0:
            segment_code = tokenizer.convert_tokens_to_string(segment)
            feat = extract_codebert_features(segment_code, tokenizer, model, max_length)
            features.append(feat)
    return np.max(features, axis=0) if features else np.zeros(768)

In [3]:
# 檢測單一智能合約
def detect_smart_contract(code_snippet, screener, analyzer, validator, tokenizer, codebert_model, device, max_length=512, stride=1024):
    """
    檢測單一智能合約的漏洞，從原始代碼開始
    code_snippet: str - 原始智能合約代碼
    """
    # 提取 CodeBERT 特徵
    try:
        contract_embedding = extract_codebert_features_long_code(code_snippet, tokenizer, codebert_model, max_length, stride)
        contract_embedding = torch.tensor(contract_embedding, dtype=torch.float32).unsqueeze(0)  # [1, 768]
    except Exception as e:
        print(f"Error extracting CodeBERT features: {e}")
        contract_embedding = torch.zeros(1, 768, dtype=torch.float32)  # 默認值

    contract_embedding = contract_embedding.to(device)
    
    # Step 1: VulnScreener
    screener.eval()
    with torch.no_grad():
        screener_prob = screener(contract_embedding)  # [1, 1]
    
    # Step 2: VulnAnalyzer
    analyzer.eval()
    with torch.no_grad():
        analyzer_prob, _ = analyzer(contract_embedding, screener_prob)  # [1, 9]
    
    # Step 3: VulnValidator
    validator_prob = torch.from_numpy(validator.predict(
        contract_embedding, analyzer_prob, screener_prob
    )).to(device, dtype=torch.float32)  # [1, 9]
    
    # Step 4: 融合結果
    final_prob = 0.5 * analyzer_prob + 0.5 * validator_prob  # [1, 9]
    predictions = (final_prob > 0.5).int().cpu().numpy()[0]  # [9]
    
    # Step 5: 生成驗證報告
    validation_report = validator.generate_validation_report(final_prob, validator_prob)
    
    # 漏洞名稱映射
    vuln_names = [
        "No Vulnerability", "block number dependency (BN)", "dangerous delegatecall (DE)",
        "ether frozen (EF)", "ether strict equality (SE)", "integer overflow (OF)",
        "reentrancy (RE)", "timestamp dependency (TP)", "unchecked external call (UC)"
    ]
    
    # 生成最終報告
    report = {
        "vulnerabilities_detected": [vuln_names[i] for i, pred in enumerate(predictions) if pred == 1],
        "probability_scores": {vuln_names[i]: float(final_prob[0, i]) for i in range(9)},
        "screener_vulnerability_score": float(screener_prob[0, 0]),
        "anomalies": [vuln_names[int(v.split()[1])] for v in validation_report["anomalies"]],
        "corrections": [vuln_names[int(v.split()[1])] for v in validation_report["corrections"]],
        "top_features": validation_report["top_features"]
    }
    
    return report

In [4]:
# 主函數
def main():
    # 設置設備
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # 初始化 CodeBERT
    tokenizer_codebert = RobertaTokenizer.from_pretrained('microsoft/codebert-base')
    model_codebert = RobertaModel.from_pretrained('microsoft/codebert-base').to(device)
    
    # 初始化並加載檢測模型
    screener = VulnScreener().to(device)
    analyzer = VulnAnalyzer().to(device)
    validator = VulnValidator(
        model_path='../codebert/vuln_validator_model.pkl',
        scaler_path='../codebert/scaler.pkl',
        pca_path='../codebert/pca.pkl'
    )
    
    # 加載預訓練權重
    screener = torch.load('../codebert/vuln_screener_model.pth', weights_only=False).to(device)
    analyzer = torch.load('../codebert/vuln_analyzer_model.pth', weights_only=False).to(device)
    
    # 讀取數據集
    df = pd.read_csv("../combined_dataset_cleaned.csv")
    
    # 示例：檢測第一筆智能合約代碼
    sample_idx = 3207  # 可修改為其他索引
    sample_code = df['code'].iloc[sample_idx]
    true_vulnerabilities_binary = eval(df['vulnerability_list'].iloc[sample_idx])  # 將字符串轉為列表
    vuln_names = [
        "No Vulnerability", "block number dependency (BN)", "dangerous delegatecall (DE)",
        "ether frozen (EF)", "ether strict equality (SE)", "integer overflow (OF)",
        "reentrancy (RE)", "timestamp dependency (TP)", "unchecked external call (UC)"
    ]
    true_vulnerabilities = [vuln_names[i] for i, val in enumerate(true_vulnerabilities_binary) if val == 1]
    
    print(f"\nProcessing sample code snippet (index {sample_idx}):\n{sample_code[:200]}...")
    print(f"True Vulnerabilities from dataset: {true_vulnerabilities}")
    
    result = detect_smart_contract(
        sample_code, screener, analyzer, validator, tokenizer_codebert, model_codebert, device
    )
    
    # 輸出結果並比對
    print("\nSmart Contract Vulnerability Detection Report:")
    print(f"Overall Vulnerability Score: {result['screener_vulnerability_score']:.4f}")
    print("Detected Vulnerabilities (Predicted):", result['vulnerabilities_detected'])
    print("True Vulnerabilities (Dataset):", true_vulnerabilities)
    print("Anomalies (Analyzer overconfident):", result['anomalies'])
    print("Corrections (Validator adjustments):", result['corrections'])
    print("Top 5 Important Features:", result['top_features'])
    print("\nDetailed Probability Scores:")
    for vuln, prob in result['probability_scores'].items():
        print(f"{vuln}: {prob:.4f}")

if __name__ == "__main__":
    main()

Using device: mps

Processing sample code snippet (index 3207):
library SafeMathLib{ function mul(uint256 a, uint256 b) internal pure returns (uint256) { uint256 c = a * b; assert(a == 0 || c / a == b); return c; } function div(uint256 a, uint256 b) internal pure ...
True Vulnerabilities from dataset: ['dangerous delegatecall (DE)', 'ether frozen (EF)']

Smart Contract Vulnerability Detection Report:
Overall Vulnerability Score: 8.3231
Detected Vulnerabilities (Predicted): ['dangerous delegatecall (DE)', 'ether frozen (EF)']
True Vulnerabilities (Dataset): ['dangerous delegatecall (DE)', 'ether frozen (EF)']
Anomalies (Analyzer overconfident): []
Corrections (Validator adjustments): []
Top 5 Important Features: {}

Detailed Probability Scores:
No Vulnerability: 0.0536
block number dependency (BN): 0.1010
dangerous delegatecall (DE): 0.7916
ether frozen (EF): 0.7224
ether strict equality (SE): 0.1277
integer overflow (OF): 0.2133
reentrancy (RE): 0.2163
timestamp dependency (TP): 0.2056