In [1]:
import asyncio
import pandas as pd
import json

import analyser
import prompts

import dotenv

dotenv.load_dotenv()


df = pd.read_excel("validate.xlsx")
# df = df.head(5)
print(df.head())

sem = asyncio.Semaphore(10)

analyzer = analyser.Analyser(model_name="gpt-4o-mini", prompt=prompts.FEWSHOT_RAG)

tasks = []
for i, row in df.iterrows():
    async def run_with_semaphore(code):
        async with sem:
            return await analyzer.get_vulnerabilities(code)
    tasks.append(run_with_semaphore(row['code']))

results = await asyncio.gather(*tasks, return_exceptions=True)

df['vulnerabilities'] = None
for i, res in enumerate(results):
    if isinstance(res, Exception):
        print(f"Error at row {i}: {res}")
        df.at[i, 'vulnerabilities'] = []
    else:
        df.at[i, 'vulnerabilities'] = [
            v for v in res if v.get('severity') in ['high', 'medium']
        ]


                                                code  \
0  // SPDX-License-Identifier: MIT\npragma solidi...   
1  // SPDX-License-Identifier: MIT\npragma solidi...   
2  // SPDX-License-Identifier: MIT\npragma solidi...   
3  // SPDX-License-Identifier: MIT\npragma solidi...   
4  // SPDX-License-Identifier: MIT\npragma solidi...   

                                             contain  \
0  [{\n    "line": "13",\n    "token": "payable(m...   
1                                                 []   
2  [{\n    "line": "8",\n    "token": "balances[m...   
3  [\n  {\n    "line": "14",\n    "token": "Data ...   
4  [\n  {\n    "line": "10",\n    "token": "tx.or...   

                                                miss  
0                                                 []  
1  [{\n    "line": "12",\n    "token": "payable(m...  
2                                                 []  
3                                                 []  
4                                                 [

In [2]:
from nltk.translate.bleu_score import sentence_bleu

def compare_strings(str1, str2):
    """
    Score ``str2`` (candidate) against ``str1`` (the single reference) with
    sentence-level BLEU. Both strings are tokenised on whitespace.
    """
    reference_tokens = str1.split()
    candidate_tokens = str2.split()
    return sentence_bleu([reference_tokens], candidate_tokens)

In [3]:
from langchain.embeddings import OpenAIEmbeddings
import numpy as np

def cosine_similarity(vec1, vec2):
    """
    Return the cosine similarity of two equal-length numeric vectors.

    Args:
        vec1: First vector (any sequence accepted by ``np.dot``).
        vec2: Second vector, same length as ``vec1``.

    Returns:
        ``dot(vec1, vec2) / (|vec1| * |vec2|)``, or ``0.0`` when either
        vector has zero norm (the ratio is undefined there; returning 0.0
        avoids a NaN and a NumPy division warning).
    """
    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if denominator == 0:
        return 0.0
    return np.dot(vec1, vec2) / denominator

def sigmoid_scaled(similarity: float, threshold: float = 0.85, steepness: float = 50.0) -> float:
    """
    Map a similarity value through a logistic (sigmoid) curve:
      - threshold: centre of the transition (where the output is 0.5)
      - steepness: how sharp the S-shaped curve is
    """
    exponent = -steepness * (similarity - threshold)
    return 1.0 / (1.0 + np.exp(exponent))


# Shared embedding client and per-string cache. evaluate_row compares every
# expected text against every detected text, so the same string would otherwise
# be embedded (and a new client constructed) many times over.
_EMBEDDING_CLIENT = None
_EMBEDDING_CACHE = {}


def _embed_cached(text):
    """Return the embedding of ``text``, requesting it at most once per distinct string."""
    global _EMBEDDING_CLIENT
    if text not in _EMBEDDING_CACHE:
        if _EMBEDDING_CLIENT is None:
            # Created lazily so defining this cell does not require credentials.
            _EMBEDDING_CLIENT = OpenAIEmbeddings()
        _EMBEDDING_CACHE[text] = _EMBEDDING_CLIENT.embed_query(text)
    return _EMBEDDING_CACHE[text]


def compare_strings_llm(str1, str2):
    """
    Compare two strings by embedding similarity.

    Both strings are embedded with ``OpenAIEmbeddings``; the cosine similarity
    of the two vectors is then passed through ``sigmoid_scaled`` (default
    threshold/steepness), so the returned value is the sigmoid-scaled score in
    (0, 1), not the raw cosine similarity.
    """
    vec1 = _embed_cached(str1)
    vec2 = _embed_cached(str2)

    return sigmoid_scaled(cosine_similarity(vec1, vec2))

In [4]:
# Sanity check: the two explanations describe different issues (tx.origin
# authorization vs. deposit/withdraw ordering), so the score is expected to be low.
compare_strings_llm(
    "This code uses `tx.origin` for authorization checks. If an authorized account calls a malicious contract that interacts with this contract, the malicious contract can pass the authorization due to `tx.origin` returning the original sender's address. This can lead to unauthorized access and control over the contract.",
    "Directly updating the user's credit before checking the balance could lead to race conditions in certain scenarios, depending on how the `withdraw` function is used. If a malicious actor can call `deposit` right after their balance is checked in the `withdraw` function, they could exploit this to withdraw more than their deposit."
    )

  embeddings = OpenAIEmbeddings()


0.09294196605612257

In [5]:
# Sanity check: both explanations describe a reentrancy risk around a `call`-based
# withdrawal, so the score is expected to be high.
compare_strings_llm(
    "Using call to withdraw funds exposes the contract to reentrancy attacks, allowing malicious contracts to exploit the inconsistent state during the withdrawal process.",
    "The withdrawal of funds is done using `call` method after checking the user's credit. If a reentrant call is made, it could allow a malicious contract to withdraw more funds than intended before the state is updated (credit[msg.sender] is decremented). This could lead to arbitrary ETH losses."
    )

0.9718318044356793

In [6]:
def _field_values(items, field):
    """Return the set of distinct ``field`` values across ``items`` (KeyError if an item lacks it)."""
    return {item[field] for item in items}


def _count_fuzzy_matches(expected_texts, detected_texts, cmp, threshold):
    """Return ``(matched, unmatched)`` counts for ``expected_texts``.

    An expected text is matched when ``cmp(expected, found) > threshold`` for
    at least one text in ``detected_texts``.
    """
    matched = sum(
        1
        for expected in expected_texts
        if any(cmp(expected, found) > threshold for found in detected_texts)
    )
    return matched, len(expected_texts) - matched


def evaluate_row(detected, positives, negatives, text_mode="llm", threshold=None):
    """
    Compute TP, FP, FN, TN for one row, plus accuracy, recall and F1.

    The fields ``line``, ``problem`` and ``token`` are compared by exact set
    membership. The free-text fields ``explanation`` and ``migration`` are
    compared with a similarity function and a threshold. Counts from all
    fields are summed into a single confusion matrix.

    Args:
        detected: list of dicts with the vulnerabilities that were reported.
        positives: list of dicts with vulnerabilities that should be found.
        negatives: list of dicts with vulnerabilities that should not be found.
        text_mode: how free-text fields are compared: ``"llm"`` (default,
            ``compare_strings_llm``, threshold 0.8), ``"bleu"``
            (``compare_strings``, threshold 0.5) or ``None`` to skip the
            free-text fields entirely.
        threshold: optional override of the mode's default threshold; a text
            matches only when its score is strictly greater than it.

    Returns:
        dict with keys ``tp``, ``fp``, ``fn``, ``tn``, ``accuracy``,
        ``recall`` and ``f1``.

    Raises:
        KeyError: if any entry lacks one of the five compared fields.
        ValueError: if ``text_mode`` is not one of the supported values.
    """
    if text_mode == "llm":
        cmp, default_threshold = compare_strings_llm, 0.8
    elif text_mode == "bleu":
        cmp, default_threshold = compare_strings, 0.5
    elif text_mode is None:
        cmp, default_threshold = None, 0.8
    else:
        raise ValueError(f"Unsupported text_mode: {text_mode!r}")
    if threshold is None:
        threshold = default_threshold

    exact_fields = ('line', 'problem', 'token')
    fuzzy_fields = ('explanation', 'migration')
    all_fields = exact_fields + fuzzy_fields

    # Build every field set up front (also when text comparison is disabled),
    # so a malformed entry fails with KeyError regardless of text_mode.
    det = {field: _field_values(detected, field) for field in all_fields}
    pos = {field: _field_values(positives, field) for field in all_fields}
    neg = {field: _field_values(negatives, field) for field in all_fields}

    tp = fp = fn = tn = 0

    # Exact-match fields: plain set algebra.
    for field in exact_fields:
        tp += len(det[field] & pos[field])   # expected and detected
        fp += len(det[field] & neg[field])   # unwanted but detected
        fn += len(pos[field] - det[field])   # expected but not detected
        tn += len(neg[field] - det[field])   # unwanted and not detected

    # Free-text fields: an expected/unwanted text counts as detected when any
    # detected text scores above the threshold.
    if cmp:
        for field in fuzzy_fields:
            hit, miss = _count_fuzzy_matches(pos[field], det[field], cmp, threshold)
            tp += hit
            fn += miss
            hit, miss = _count_fuzzy_matches(neg[field], det[field], cmp, threshold)
            fp += hit
            tn += miss

    # Total number of counted items across all fields.
    total = tp + fp + fn + tn

    accuracy = (tp + tn) / total if total else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    # Precision is not reported, but F1 needs it.
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0

    return {
        'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn,
        'accuracy': accuracy, 'recall': recall, 'f1': f1
    }


In [7]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Running confusion-matrix totals over every row of the validation frame.
running_totals = {'tp': 0, 'fp': 0, 'fn': 0, 'tn': 0}
overall_examples = 0

for i, row in df.iterrows():
    detected = row['vulnerabilities']
    positives = json.loads(row['contain'])
    negatives = json.loads(row['miss'])

    res = evaluate_row(detected, positives, negatives)
    for key in running_totals:
        running_totals[key] += res[key]
    overall_examples += res['tp'] + res['fp'] + res['fn'] + res['tn']

    row_report = (
        f"Row {i}: TP={res['tp']}, FP={res['fp']}, FN={res['fn']}, TN={res['tn']}"
        f" | Accuracy={res['accuracy']:.2f}, Recall={res['recall']:.2f}, F1={res['f1']:.2f}"
    )
    print(row_report)

total_tp = running_totals['tp']
total_fp = running_totals['fp']
total_fn = running_totals['fn']
total_tn = running_totals['tn']

overall_accuracy = _safe_ratio(total_tp + total_tn, overall_examples)
overall_recall = _safe_ratio(total_tp, total_tp + total_fn)
# F1 needs precision; it is no longer printed per row, but it can still be computed here.
overall_precision = _safe_ratio(total_tp, total_tp + total_fp)
overall_f1 = _safe_ratio(
    2 * overall_precision * overall_recall,
    overall_precision + overall_recall,
)

overall_report = (
    f"Overall: TP={total_tp}, FP={total_fp}, FN={total_fn}, TN={total_tn}"
    f" | Accuracy={overall_accuracy:.2f}, Recall={overall_recall:.2f}, F1={overall_f1:.2f}"
)
print(overall_report)


Row 0: TP=3, FP=0, FN=2, TN=0 | Accuracy=0.60, Recall=0.60, F1=0.75
Row 1: TP=0, FP=4, FN=0, TN=1 | Accuracy=0.20, Recall=0.00, F1=0.00
Row 2: TP=2, FP=0, FN=3, TN=0 | Accuracy=0.40, Recall=0.40, F1=0.57
Row 3: TP=3, FP=0, FN=2, TN=0 | Accuracy=0.60, Recall=0.60, F1=0.75
Row 4: TP=2, FP=0, FN=3, TN=0 | Accuracy=0.40, Recall=0.40, F1=0.57
Row 5: TP=3, FP=0, FN=2, TN=0 | Accuracy=0.60, Recall=0.60, F1=0.75
Row 6: TP=2, FP=0, FN=3, TN=0 | Accuracy=0.40, Recall=0.40, F1=0.57
Row 7: TP=1, FP=0, FN=4, TN=0 | Accuracy=0.20, Recall=0.20, F1=0.33
Row 8: TP=0, FP=0, FN=0, TN=5 | Accuracy=1.00, Recall=0.00, F1=0.00
Row 9: TP=3, FP=0, FN=2, TN=0 | Accuracy=0.60, Recall=0.60, F1=0.75
Row 10: TP=4, FP=0, FN=1, TN=0 | Accuracy=0.80, Recall=0.80, F1=0.89
Row 11: TP=0, FP=0, FN=5, TN=0 | Accuracy=0.00, Recall=0.00, F1=0.00
Row 12: TP=3, FP=0, FN=2, TN=0 | Accuracy=0.60, Recall=0.60, F1=0.75
Row 13: TP=0, FP=0, FN=5, TN=0 | Accuracy=0.00, Recall=0.00, F1=0.00
Row 14: TP=3, FP=0, FN=2, TN=0 | Accuracy=0.

In [8]:
# Print the stored findings list of every row, one row per output line.
for row_findings in df["vulnerabilities"].tolist():
    print(row_findings)

[{'line': '12', 'token': 'require(payable(msg.sender).call{value: amount}("");', 'problem': 'Reentrancy Vulnerability', 'severity': 'high', 'explanation': "The contract allows an external call to `msg.sender` before updating the internal state (the `credit` mapping). This opens up the contract to a reentrancy attack, where a malicious contract can call the `withdraw` function multiple times before the state can be updated, draining the contract's funds.", 'migration': 'Follow the Checks-Effects-Interactions pattern. Update the internal state (subtract the amount from `credit`) before making the external call to `msg.sender`.'}, {'line': '7', 'token': 'credit[msg.sender] += msg.value;', 'problem': 'Potential Overflow', 'severity': 'medium', 'explanation': "With the `credit` variable being a `uint`, there is a potential for overflow if a very large sum is deposited. Although Solidity 0.8.0 has built-in overflow checks, it's best practice to implement checks or use a safe math library for