In [None]:
import pandas as pd

# Build the empty per-checkpoint results template from the experiment address list.
input_path = "OctopusGuard/data/experiment_address.csv"
df = pd.read_csv(input_path)

# Every row starts labelled as scam (1); the trailing 30 rows are then set to 0.
df["scam"] = 1
df.loc[df.index[-30:], "scam"] = 0

models = ["0_steps", "74_steps", "148_steps", "222_steps", "296_steps"]
tasks = ["Kline_scam", "Tx_scam", "code_scam", "multimodal_scam"]

# One empty result column per (checkpoint, task) pair, in model-major order.
for column_name in [f"{model}_{task}" for model in models for task in tasks]:
    df[column_name] = None

# utf-8-sig writes a BOM at the start of the file.
output_path = "OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv"
df.to_csv(output_path, index=False, encoding='utf-8-sig')

print(f"successÔºö{output_path}")


In [None]:
import pandas as pd
import json
import re
from openai import OpenAI
import time
from tqdm import tqdm


import os

# Read the DeepSeek key from the environment so it never has to be typed into
# (and committed with) the notebook; the original "sk-" placeholder remains the
# fallback when DEEPSEEK_API_KEY is not set.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "sk-")

# Inputs for this cell: the checkpoint-74 analysis log, the ground-truth JSON,
# and the template CSV that main() reads and then overwrites with results.
file_paths = {
    "log_file": "OctopusGuard/evaluations/ablation_and_training_progress/logs/analysis_log_checkpoint-74.txt",
    "gt_json": "OctopusGuard/evaluations/ablation_and_training_progress/test_multimodal_data.json",
    "template_csv": "OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv",
}

# OpenAI client pointed at the DeepSeek endpoint.
client = OpenAI(
    api_key=API_KEY,
    base_url="https://api.deepseek.com"
)

# Memo of judge replies keyed by (ground truth, model output); reset on every run of this cell.
api_cache = {}
def deepseek_judge_match(gt: str, slither_output: str) -> str:
    """Ask the DeepSeek chat model how well a tool output matches the ground truth.

    Args:
        gt: Ground-truth vulnerability description inserted into the prompt.
        slither_output: Analysis text produced by the tool under evaluation.

    Returns:
        The stripped text of the model reply (expected to contain ``Score: XX``),
        or the literal ``"Score: 0"`` when all three attempts fail.

    Successful replies are memoised in ``api_cache`` under ``(gt, slither_output)``;
    the ``"Score: 0"`` fallback is deliberately not cached so a later call retries.
    """
    cache_key = (gt, slither_output)
    if cache_key in api_cache:
        return api_cache[cache_key]

    prompt = f"""
The following is the Ground Truth (human-labeled vulnerability description):
{gt}

And here is the vulnerability analysis output by the OctopusGuard tool:
{slither_output}

As a blockchain security expert, please evaluate the following:
1. Does the OctopusGuard output cover all the core issues in the Ground Truth? Summarize your judgment in one sentence (Match, Partial Match, No Match).
2. Give a similarity score between 0 and 100 indicating how well OctopusGuard's output matches the Ground Truth. Please use the format: `Score: XX`
"""
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt.strip()}],
                max_tokens=256,
                stream=False
            )
            result = response.choices[0].message.content.strip()
            api_cache[cache_key] = result
            print(result)
            return result
        except Exception as e:
            # Only wait when another attempt will actually follow; sleeping after
            # the last failure just delays the fallback.
            if attempt + 1 < max_attempts:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}.")

    print(f"  [API Error] All attempts failed for GT: '{gt[:50]}...' and Output: '{slither_output[:50]}...'")
    return "Score: 0"


def parse_model_log(log_path: str) -> dict:
    """Split an analysis log into per-contract sections.

    The log is cut at every ``==================== Contract Address: `` marker.
    A chunk is kept only if it starts with a 0x-prefixed 40-hex-digit address.

    Returns:
        ``{lower-cased address: {'kline', 'tx', 'code', 'multimodal'}}`` where each
        value is the stripped text captured for that section, or ``""`` when the
        section's pattern does not match. A repeated address keeps its last chunk.
    """
    print(f"Parsing model log file: {log_path}")
    with open(log_path, 'r', encoding='utf-8') as log_file:
        raw_log = log_file.read()

    # The section extractors are identical for every chunk, so define them once.
    section_patterns = {
        'kline': r"üñºÔ∏è Analyzing token price chart image.*?A:(.*?)(?=üìä Analyzing transaction data)",
        'tx': r"üìä Analyzing transaction data.*?A:(.*?)(?=üîç Analyzing smart contract code)",
        'code': r"üìù Contract Analysis Dialogue Log:.*?A:(.*?)(?=üß† Final Assessment)",
        'multimodal': r"üß† Final Assessment:(.*?)(?======================= ANALYSIS COMPLETE)"
    }
    search_flags = re.DOTALL | re.IGNORECASE

    parsed_by_address = {}
    chunks = re.split(r'==================== Contract Address: ', raw_log)
    for chunk in tqdm(chunks, desc="Parsing Log Blocks"):
        # Blank chunks and chunks that do not open with an address are ignored.
        header = re.match(r'(0x[a-fA-F0-9]{40})', chunk) if chunk.strip() else None
        if header is None:
            continue

        sections = {}
        for name, pattern in section_patterns.items():
            found = re.search(pattern, chunk, search_flags)
            sections[name] = found.group(1).strip() if found else ""

        parsed_by_address[header.group(1).lower()] = sections
    return parsed_by_address

def load_ground_truth(json_path: str) -> list:
    """Load the ground-truth JSON and collapse it into one dict per group of six records.

    Records are taken in consecutive groups of six; from each complete group the
    ``completion`` of items 0, 1 and 4 becomes ``kline_gt``, ``tx_gt`` and
    ``code_gt``. A trailing group with fewer than six records is dropped.
    """
    print(f"Loading ground truth from: {json_path}")
    with open(json_path, 'r', encoding='utf-8') as gt_file:
        records = json.load(gt_file)

    chunks = (records[start:start + 6] for start in range(0, len(records), 6))
    return [
        {
            'kline_gt': chunk[0]['completion'],
            'tx_gt': chunk[1]['completion'],
            'code_gt': chunk[4]['completion'],
        }
        for chunk in chunks
        if len(chunk) == 6
    ]

def main(step_tag: str = "74_steps"):
    """Fill the ``<step_tag>_*`` columns of the template CSV for one checkpoint log.

    Reads the template CSV, the parsed model log and the ground-truth JSON named
    in ``file_paths``, scores every row, and writes the CSV back in place.

    Args:
        step_tag: Column prefix for this checkpoint; the default matches the log
            file configured in this cell.

    Per row:
      * ``Kline_scam`` / ``Tx_scam`` / ``multimodal_scam`` are 1 when the
        corresponding model answer contains "yes" (case-insensitive), else 0.
      * ``code_scam`` comes from the DeepSeek judge: a score >= 50 counts as
        agreement with the ground-truth code analysis; agreement yields the
        row's own ``scam`` label and disagreement yields the opposite label.

    Rows without a ground-truth entry (matched by row index) or without a logged
    model output (matched by lower-cased address) are skipped with a warning.
    """
    def show(marker, value):
        # Delimiter-framed console dump, kept for manual inspection of each step.
        banner = f'{marker}---------------------'
        print(banner)
        print(value)
        print(banner)

    df = pd.read_csv(file_paths["template_csv"])
    df['contract_address'] = df['contract_address'].str.lower()

    model_outputs = parse_model_log(file_paths["log_file"])
    ground_truths = load_ground_truth(file_paths["gt_json"])

    if len(df) != len(ground_truths) or len(df) != len(model_outputs):
        print("Warning: Data length mismatch!")
        print(f"CSV rows: {len(df)}, GT entries: {len(ground_truths)}, Logged addresses: {len(model_outputs)}")

    # (section key in the parsed log, result column suffix, console marker prefix)
    yes_no_tasks = [
        ('kline', 'Kline_scam', '1'),
        ('tx', 'Tx_scam', '2'),
        ('multimodal', 'multimodal_scam', '3'),
    ]

    for i, row in tqdm(df.iterrows(), total=len(df), desc="Processing Contracts"):
        address = row['contract_address']
        gt_scam_label = row['scam']

        # Ground truth is aligned with the CSV by row position, not by address.
        if i >= len(ground_truths):
            print(f"Warning: No ground truth found for index {i}, address {address}. Skipping.")
            continue
        gt_data = ground_truths[i]

        if address not in model_outputs:
            print(f"Warning: No model output found for address {address}. Skipping.")
            continue
        model_output = model_outputs[address]

        for section, column_suffix, marker in yes_no_tasks:
            answer = model_output.get(section, '').lower()
            show(f'{marker}1', answer)
            # NOTE(review): plain substring test, so any text containing "yes" counts.
            predicts_scam = "yes" in answer
            show(f'{marker}2', predicts_scam)
            df.loc[i, f'{step_tag}_{column_suffix}'] = 1 if predicts_scam else 0

        gt_code = gt_data['code_gt']
        model_code = model_output.get('code', 'No output found.')
        show('41', model_code)

        api_response = deepseek_judge_match(gt_code, model_code)

        # A reply without a parsable score (including the API-failure fallback) counts as 0.
        score_match = re.search(r'Score:\s*(\d+)', api_response, re.IGNORECASE)
        score = int(score_match.group(1)) if score_match else 0

        is_match = score >= 50
        show('42', is_match)

        # Use the row's own label instead of a hard-coded row boundary, so the
        # mapping stays right when the CSV is not exactly 270 scam + 30 benign rows.
        is_scam_row = bool(gt_scam_label == 1)
        df.loc[i, f'{step_tag}_code_scam'] = int(is_match == is_scam_row)

    df.to_csv(file_paths["template_csv"], index=False, encoding='utf-8-sig')
    print(f"\nAnalysis complete! Results saved to: {file_paths['template_csv']}")


if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import json
import re
from openai import OpenAI
import time
from tqdm import tqdm


import os

# Read the DeepSeek key from the environment so it never has to be typed into
# (and committed with) the notebook; the original "sk-" placeholder remains the
# fallback when DEEPSEEK_API_KEY is not set.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "sk-")

# Inputs for this cell: the checkpoint-148 analysis log, the ground-truth JSON,
# and the template CSV that main() reads and then overwrites with results.
file_paths = {
    "log_file": "OctopusGuard/evaluations/ablation_and_training_progress/logs/analysis_log_checkpoint-148.txt",
    "gt_json": "OctopusGuard/evaluations/ablation_and_training_progress/test_multimodal_data.json",
    "template_csv": "OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv",
}

# OpenAI client pointed at the DeepSeek endpoint.
client = OpenAI(
    api_key=API_KEY,
    base_url="https://api.deepseek.com"
)

# Memo of judge replies keyed by (ground truth, model output); reset on every run of this cell.
api_cache = {}
def deepseek_judge_match(gt: str, slither_output: str) -> str:
    """Ask the DeepSeek chat model how well a tool output matches the ground truth.

    Args:
        gt: Ground-truth vulnerability description inserted into the prompt.
        slither_output: Analysis text produced by the tool under evaluation.

    Returns:
        The stripped text of the model reply (expected to contain ``Score: XX``),
        or the literal ``"Score: 0"`` when all three attempts fail.

    Successful replies are memoised in ``api_cache`` under ``(gt, slither_output)``;
    the ``"Score: 0"`` fallback is deliberately not cached so a later call retries.
    """
    cache_key = (gt, slither_output)
    if cache_key in api_cache:
        return api_cache[cache_key]

    prompt = f"""
The following is the Ground Truth (human-labeled vulnerability description):
{gt}

And here is the vulnerability analysis output by the OctopusGuard tool:
{slither_output}

As a blockchain security expert, please evaluate the following:
1. Does the OctopusGuard output cover all the core issues in the Ground Truth? Summarize your judgment in one sentence (Match, Partial Match, No Match).
2. Give a similarity score between 0 and 100 indicating how well OctopusGuard's output matches the Ground Truth. Please use the format: `Score: XX`
"""
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt.strip()}],
                max_tokens=256,
                stream=False
            )
            result = response.choices[0].message.content.strip()
            api_cache[cache_key] = result
            print(result)
            return result
        except Exception as e:
            # Only wait when another attempt will actually follow; sleeping after
            # the last failure just delays the fallback.
            if attempt + 1 < max_attempts:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}.")

    print(f"  [API Error] All attempts failed for GT: '{gt[:50]}...' and Output: '{slither_output[:50]}...'")
    return "Score: 0"


def parse_model_log(log_path: str) -> dict:
    """Split an analysis log into per-contract sections.

    The log is cut at every ``==================== Contract Address: `` marker.
    A chunk is kept only if it starts with a 0x-prefixed 40-hex-digit address.

    Returns:
        ``{lower-cased address: {'kline', 'tx', 'code', 'multimodal'}}`` where each
        value is the stripped text captured for that section, or ``""`` when the
        section's pattern does not match. A repeated address keeps its last chunk.
    """
    print(f"Parsing model log file: {log_path}")
    with open(log_path, 'r', encoding='utf-8') as log_file:
        raw_log = log_file.read()

    # The section extractors are identical for every chunk, so define them once.
    section_patterns = {
        'kline': r"üñºÔ∏è Analyzing token price chart image.*?A:(.*?)(?=üìä Analyzing transaction data)",
        'tx': r"üìä Analyzing transaction data.*?A:(.*?)(?=üîç Analyzing smart contract code)",
        'code': r"üìù Contract Analysis Dialogue Log:.*?A:(.*?)(?=üß† Final Assessment)",
        'multimodal': r"üß† Final Assessment:(.*?)(?======================= ANALYSIS COMPLETE)"
    }
    search_flags = re.DOTALL | re.IGNORECASE

    parsed_by_address = {}
    chunks = re.split(r'==================== Contract Address: ', raw_log)
    for chunk in tqdm(chunks, desc="Parsing Log Blocks"):
        # Blank chunks and chunks that do not open with an address are ignored.
        header = re.match(r'(0x[a-fA-F0-9]{40})', chunk) if chunk.strip() else None
        if header is None:
            continue

        sections = {}
        for name, pattern in section_patterns.items():
            found = re.search(pattern, chunk, search_flags)
            sections[name] = found.group(1).strip() if found else ""

        parsed_by_address[header.group(1).lower()] = sections
    return parsed_by_address

def load_ground_truth(json_path: str) -> list:
    """Load the ground-truth JSON and collapse it into one dict per group of six records.

    Records are taken in consecutive groups of six; from each complete group the
    ``completion`` of items 0, 1 and 4 becomes ``kline_gt``, ``tx_gt`` and
    ``code_gt``. A trailing group with fewer than six records is dropped.
    """
    print(f"Loading ground truth from: {json_path}")
    with open(json_path, 'r', encoding='utf-8') as gt_file:
        records = json.load(gt_file)

    chunks = (records[start:start + 6] for start in range(0, len(records), 6))
    return [
        {
            'kline_gt': chunk[0]['completion'],
            'tx_gt': chunk[1]['completion'],
            'code_gt': chunk[4]['completion'],
        }
        for chunk in chunks
        if len(chunk) == 6
    ]

def main(step_tag: str = "148_steps"):
    """Fill the ``<step_tag>_*`` columns of the template CSV for one checkpoint log.

    Reads the template CSV, the parsed model log and the ground-truth JSON named
    in ``file_paths``, scores every row, and writes the CSV back in place.

    Args:
        step_tag: Column prefix for this checkpoint; the default matches the log
            file configured in this cell.

    Per row:
      * ``Kline_scam`` / ``Tx_scam`` / ``multimodal_scam`` are 1 when the
        corresponding model answer contains "yes" (case-insensitive), else 0.
      * ``code_scam`` comes from the DeepSeek judge: a score >= 50 counts as
        agreement with the ground-truth code analysis; agreement yields the
        row's own ``scam`` label and disagreement yields the opposite label.

    Rows without a ground-truth entry (matched by row index) or without a logged
    model output (matched by lower-cased address) are skipped with a warning.
    """
    def show(marker, value):
        # Delimiter-framed console dump, kept for manual inspection of each step.
        banner = f'{marker}---------------------'
        print(banner)
        print(value)
        print(banner)

    df = pd.read_csv(file_paths["template_csv"])
    df['contract_address'] = df['contract_address'].str.lower()

    model_outputs = parse_model_log(file_paths["log_file"])
    ground_truths = load_ground_truth(file_paths["gt_json"])

    if len(df) != len(ground_truths) or len(df) != len(model_outputs):
        print("Warning: Data length mismatch!")
        print(f"CSV rows: {len(df)}, GT entries: {len(ground_truths)}, Logged addresses: {len(model_outputs)}")

    # (section key in the parsed log, result column suffix, console marker prefix)
    yes_no_tasks = [
        ('kline', 'Kline_scam', '1'),
        ('tx', 'Tx_scam', '2'),
        ('multimodal', 'multimodal_scam', '3'),
    ]

    for i, row in tqdm(df.iterrows(), total=len(df), desc="Processing Contracts"):
        address = row['contract_address']
        gt_scam_label = row['scam']

        # Ground truth is aligned with the CSV by row position, not by address.
        if i >= len(ground_truths):
            print(f"Warning: No ground truth found for index {i}, address {address}. Skipping.")
            continue
        gt_data = ground_truths[i]

        if address not in model_outputs:
            print(f"Warning: No model output found for address {address}. Skipping.")
            continue
        model_output = model_outputs[address]

        for section, column_suffix, marker in yes_no_tasks:
            answer = model_output.get(section, '').lower()
            show(f'{marker}1', answer)
            # NOTE(review): plain substring test, so any text containing "yes" counts.
            predicts_scam = "yes" in answer
            show(f'{marker}2', predicts_scam)
            df.loc[i, f'{step_tag}_{column_suffix}'] = 1 if predicts_scam else 0

        gt_code = gt_data['code_gt']
        model_code = model_output.get('code', 'No output found.')
        show('41', model_code)

        api_response = deepseek_judge_match(gt_code, model_code)

        # A reply without a parsable score (including the API-failure fallback) counts as 0.
        score_match = re.search(r'Score:\s*(\d+)', api_response, re.IGNORECASE)
        score = int(score_match.group(1)) if score_match else 0

        is_match = score >= 50
        show('42', is_match)

        # Use the row's own label instead of a hard-coded row boundary, so the
        # mapping stays right when the CSV is not exactly 270 scam + 30 benign rows.
        is_scam_row = bool(gt_scam_label == 1)
        df.loc[i, f'{step_tag}_code_scam'] = int(is_match == is_scam_row)

    df.to_csv(file_paths["template_csv"], index=False, encoding='utf-8-sig')
    print(f"\nAnalysis complete! Results saved to: {file_paths['template_csv']}")


if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import json
import re
from openai import OpenAI
import time
from tqdm import tqdm


import os

# Read the DeepSeek key from the environment so it never has to be typed into
# (and committed with) the notebook; the original "sk-" placeholder remains the
# fallback when DEEPSEEK_API_KEY is not set.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "sk-")

# Inputs for this cell: the checkpoint-222 analysis log, the ground-truth JSON,
# and the template CSV that main() reads and then overwrites with results.
file_paths = {
    "log_file": "OctopusGuard/evaluations/ablation_and_training_progress/logs/analysis_log_checkpoint-222.txt",
    "gt_json": "OctopusGuard/evaluations/ablation_and_training_progress/test_multimodal_data.json",
    "template_csv": "OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv",
}

# OpenAI client pointed at the DeepSeek endpoint.
client = OpenAI(
    api_key=API_KEY,
    base_url="https://api.deepseek.com"
)

# Memo of judge replies keyed by (ground truth, model output); reset on every run of this cell.
api_cache = {}
def deepseek_judge_match(gt: str, slither_output: str) -> str:
    """Ask the DeepSeek chat model how well a tool output matches the ground truth.

    Args:
        gt: Ground-truth vulnerability description inserted into the prompt.
        slither_output: Analysis text produced by the tool under evaluation.

    Returns:
        The stripped text of the model reply (expected to contain ``Score: XX``),
        or the literal ``"Score: 0"`` when all three attempts fail.

    Successful replies are memoised in ``api_cache`` under ``(gt, slither_output)``;
    the ``"Score: 0"`` fallback is deliberately not cached so a later call retries.
    """
    cache_key = (gt, slither_output)
    if cache_key in api_cache:
        return api_cache[cache_key]

    prompt = f"""
The following is the Ground Truth (human-labeled vulnerability description):
{gt}

And here is the vulnerability analysis output by the OctopusGuard tool:
{slither_output}

As a blockchain security expert, please evaluate the following:
1. Does the OctopusGuard output cover all the core issues in the Ground Truth? Summarize your judgment in one sentence (Match, Partial Match, No Match).
2. Give a similarity score between 0 and 100 indicating how well OctopusGuard's output matches the Ground Truth. Please use the format: `Score: XX`
"""
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt.strip()}],
                max_tokens=256,
                stream=False
            )
            result = response.choices[0].message.content.strip()
            api_cache[cache_key] = result
            print(result)
            return result
        except Exception as e:
            # Only wait when another attempt will actually follow; sleeping after
            # the last failure just delays the fallback.
            if attempt + 1 < max_attempts:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}.")

    print(f"  [API Error] All attempts failed for GT: '{gt[:50]}...' and Output: '{slither_output[:50]}...'")
    return "Score: 0"


def parse_model_log(log_path: str) -> dict:
    """Split an analysis log into per-contract sections.

    The log is cut at every ``==================== Contract Address: `` marker.
    A chunk is kept only if it starts with a 0x-prefixed 40-hex-digit address.

    Returns:
        ``{lower-cased address: {'kline', 'tx', 'code', 'multimodal'}}`` where each
        value is the stripped text captured for that section, or ``""`` when the
        section's pattern does not match. A repeated address keeps its last chunk.
    """
    print(f"Parsing model log file: {log_path}")
    with open(log_path, 'r', encoding='utf-8') as log_file:
        raw_log = log_file.read()

    # The section extractors are identical for every chunk, so define them once.
    section_patterns = {
        'kline': r"üñºÔ∏è Analyzing token price chart image.*?A:(.*?)(?=üìä Analyzing transaction data)",
        'tx': r"üìä Analyzing transaction data.*?A:(.*?)(?=üîç Analyzing smart contract code)",
        'code': r"üìù Contract Analysis Dialogue Log:.*?A:(.*?)(?=üß† Final Assessment)",
        'multimodal': r"üß† Final Assessment:(.*?)(?======================= ANALYSIS COMPLETE)"
    }
    search_flags = re.DOTALL | re.IGNORECASE

    parsed_by_address = {}
    chunks = re.split(r'==================== Contract Address: ', raw_log)
    for chunk in tqdm(chunks, desc="Parsing Log Blocks"):
        # Blank chunks and chunks that do not open with an address are ignored.
        header = re.match(r'(0x[a-fA-F0-9]{40})', chunk) if chunk.strip() else None
        if header is None:
            continue

        sections = {}
        for name, pattern in section_patterns.items():
            found = re.search(pattern, chunk, search_flags)
            sections[name] = found.group(1).strip() if found else ""

        parsed_by_address[header.group(1).lower()] = sections
    return parsed_by_address

def load_ground_truth(json_path: str) -> list:
    """Load the ground-truth JSON and collapse it into one dict per group of six records.

    Records are taken in consecutive groups of six; from each complete group the
    ``completion`` of items 0, 1 and 4 becomes ``kline_gt``, ``tx_gt`` and
    ``code_gt``. A trailing group with fewer than six records is dropped.
    """
    print(f"Loading ground truth from: {json_path}")
    with open(json_path, 'r', encoding='utf-8') as gt_file:
        records = json.load(gt_file)

    chunks = (records[start:start + 6] for start in range(0, len(records), 6))
    return [
        {
            'kline_gt': chunk[0]['completion'],
            'tx_gt': chunk[1]['completion'],
            'code_gt': chunk[4]['completion'],
        }
        for chunk in chunks
        if len(chunk) == 6
    ]

def main(step_tag: str = "222_steps"):
    """Fill the ``<step_tag>_*`` columns of the template CSV for one checkpoint log.

    Reads the template CSV, the parsed model log and the ground-truth JSON named
    in ``file_paths``, scores every row, and writes the CSV back in place.

    Args:
        step_tag: Column prefix for this checkpoint; the default matches the log
            file configured in this cell.

    Per row:
      * ``Kline_scam`` / ``Tx_scam`` / ``multimodal_scam`` are 1 when the
        corresponding model answer contains "yes" (case-insensitive), else 0.
      * ``code_scam`` comes from the DeepSeek judge: a score >= 50 counts as
        agreement with the ground-truth code analysis; agreement yields the
        row's own ``scam`` label and disagreement yields the opposite label.

    Rows without a ground-truth entry (matched by row index) or without a logged
    model output (matched by lower-cased address) are skipped with a warning.
    """
    def show(marker, value):
        # Delimiter-framed console dump, kept for manual inspection of each step.
        banner = f'{marker}---------------------'
        print(banner)
        print(value)
        print(banner)

    df = pd.read_csv(file_paths["template_csv"])
    df['contract_address'] = df['contract_address'].str.lower()

    model_outputs = parse_model_log(file_paths["log_file"])
    ground_truths = load_ground_truth(file_paths["gt_json"])

    if len(df) != len(ground_truths) or len(df) != len(model_outputs):
        print("Warning: Data length mismatch!")
        print(f"CSV rows: {len(df)}, GT entries: {len(ground_truths)}, Logged addresses: {len(model_outputs)}")

    # (section key in the parsed log, result column suffix, console marker prefix)
    yes_no_tasks = [
        ('kline', 'Kline_scam', '1'),
        ('tx', 'Tx_scam', '2'),
        ('multimodal', 'multimodal_scam', '3'),
    ]

    for i, row in tqdm(df.iterrows(), total=len(df), desc="Processing Contracts"):
        address = row['contract_address']
        gt_scam_label = row['scam']

        # Ground truth is aligned with the CSV by row position, not by address.
        if i >= len(ground_truths):
            print(f"Warning: No ground truth found for index {i}, address {address}. Skipping.")
            continue
        gt_data = ground_truths[i]

        if address not in model_outputs:
            print(f"Warning: No model output found for address {address}. Skipping.")
            continue
        model_output = model_outputs[address]

        for section, column_suffix, marker in yes_no_tasks:
            answer = model_output.get(section, '').lower()
            show(f'{marker}1', answer)
            # NOTE(review): plain substring test, so any text containing "yes" counts.
            predicts_scam = "yes" in answer
            show(f'{marker}2', predicts_scam)
            df.loc[i, f'{step_tag}_{column_suffix}'] = 1 if predicts_scam else 0

        gt_code = gt_data['code_gt']
        model_code = model_output.get('code', 'No output found.')
        show('41', model_code)

        api_response = deepseek_judge_match(gt_code, model_code)

        # A reply without a parsable score (including the API-failure fallback) counts as 0.
        score_match = re.search(r'Score:\s*(\d+)', api_response, re.IGNORECASE)
        score = int(score_match.group(1)) if score_match else 0

        is_match = score >= 50
        show('42', is_match)

        # Use the row's own label instead of a hard-coded row boundary, so the
        # mapping stays right when the CSV is not exactly 270 scam + 30 benign rows.
        is_scam_row = bool(gt_scam_label == 1)
        df.loc[i, f'{step_tag}_code_scam'] = int(is_match == is_scam_row)

    df.to_csv(file_paths["template_csv"], index=False, encoding='utf-8-sig')
    print(f"\nAnalysis complete! Results saved to: {file_paths['template_csv']}")


if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import json
import re
from openai import OpenAI
import time
from tqdm import tqdm


import os

# Read the DeepSeek key from the environment so it never has to be typed into
# (and committed with) the notebook; the original "sk-" placeholder remains the
# fallback when DEEPSEEK_API_KEY is not set.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "sk-")

# Inputs for this cell: the checkpoint-296 analysis log, the ground-truth JSON,
# and the template CSV that main() reads and then overwrites with results.
file_paths = {
    "log_file": "OctopusGuard/evaluations/ablation_and_training_progress/logs/analysis_log_checkpoint-296.txt",
    "gt_json": "OctopusGuard/evaluations/ablation_and_training_progress/test_multimodal_data.json",
    "template_csv": "OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv",
}

# OpenAI client pointed at the DeepSeek endpoint.
client = OpenAI(
    api_key=API_KEY,
    base_url="https://api.deepseek.com"
)

# Memo of judge replies keyed by (ground truth, model output); reset on every run of this cell.
api_cache = {}
def deepseek_judge_match(gt: str, slither_output: str) -> str:
    """Ask the DeepSeek chat model how well a tool output matches the ground truth.

    Args:
        gt: Ground-truth vulnerability description inserted into the prompt.
        slither_output: Analysis text produced by the tool under evaluation.

    Returns:
        The stripped text of the model reply (expected to contain ``Score: XX``),
        or the literal ``"Score: 0"`` when all three attempts fail.

    Successful replies are memoised in ``api_cache`` under ``(gt, slither_output)``;
    the ``"Score: 0"`` fallback is deliberately not cached so a later call retries.
    """
    cache_key = (gt, slither_output)
    if cache_key in api_cache:
        return api_cache[cache_key]

    prompt = f"""
The following is the Ground Truth (human-labeled vulnerability description):
{gt}

And here is the vulnerability analysis output by the OctopusGuard tool:
{slither_output}

As a blockchain security expert, please evaluate the following:
1. Does the OctopusGuard output cover all the core issues in the Ground Truth? Summarize your judgment in one sentence (Match, Partial Match, No Match).
2. Give a similarity score between 0 and 100 indicating how well OctopusGuard's output matches the Ground Truth. Please use the format: `Score: XX`
"""
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt.strip()}],
                max_tokens=256,
                stream=False
            )
            result = response.choices[0].message.content.strip()
            api_cache[cache_key] = result
            print(result)
            return result
        except Exception as e:
            # Only wait when another attempt will actually follow; sleeping after
            # the last failure just delays the fallback.
            if attempt + 1 < max_attempts:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}.")

    print(f"  [API Error] All attempts failed for GT: '{gt[:50]}...' and Output: '{slither_output[:50]}...'")
    return "Score: 0"


def parse_model_log(log_path: str) -> dict:
    """Split an analysis log into per-contract sections.

    The log is cut at every ``==================== Contract Address: `` marker.
    A chunk is kept only if it starts with a 0x-prefixed 40-hex-digit address.

    Returns:
        ``{lower-cased address: {'kline', 'tx', 'code', 'multimodal'}}`` where each
        value is the stripped text captured for that section, or ``""`` when the
        section's pattern does not match. A repeated address keeps its last chunk.
    """
    print(f"Parsing model log file: {log_path}")
    with open(log_path, 'r', encoding='utf-8') as log_file:
        raw_log = log_file.read()

    # The section extractors are identical for every chunk, so define them once.
    section_patterns = {
        'kline': r"üñºÔ∏è Analyzing token price chart image.*?A:(.*?)(?=üìä Analyzing transaction data)",
        'tx': r"üìä Analyzing transaction data.*?A:(.*?)(?=üîç Analyzing smart contract code)",
        'code': r"üìù Contract Analysis Dialogue Log:.*?A:(.*?)(?=üß† Final Assessment)",
        'multimodal': r"üß† Final Assessment:(.*?)(?======================= ANALYSIS COMPLETE)"
    }
    search_flags = re.DOTALL | re.IGNORECASE

    parsed_by_address = {}
    chunks = re.split(r'==================== Contract Address: ', raw_log)
    for chunk in tqdm(chunks, desc="Parsing Log Blocks"):
        # Blank chunks and chunks that do not open with an address are ignored.
        header = re.match(r'(0x[a-fA-F0-9]{40})', chunk) if chunk.strip() else None
        if header is None:
            continue

        sections = {}
        for name, pattern in section_patterns.items():
            found = re.search(pattern, chunk, search_flags)
            sections[name] = found.group(1).strip() if found else ""

        parsed_by_address[header.group(1).lower()] = sections
    return parsed_by_address

def load_ground_truth(json_path: str) -> list:
    """Load the ground-truth JSON and collapse it into one dict per group of six records.

    Records are taken in consecutive groups of six; from each complete group the
    ``completion`` of items 0, 1 and 4 becomes ``kline_gt``, ``tx_gt`` and
    ``code_gt``. A trailing group with fewer than six records is dropped.
    """
    print(f"Loading ground truth from: {json_path}")
    with open(json_path, 'r', encoding='utf-8') as gt_file:
        records = json.load(gt_file)

    chunks = (records[start:start + 6] for start in range(0, len(records), 6))
    return [
        {
            'kline_gt': chunk[0]['completion'],
            'tx_gt': chunk[1]['completion'],
            'code_gt': chunk[4]['completion'],
        }
        for chunk in chunks
        if len(chunk) == 6
    ]

def main(step_tag: str = "296_steps"):
    """Fill the ``<step_tag>_*`` columns of the template CSV for one checkpoint log.

    Reads the template CSV, the parsed model log and the ground-truth JSON named
    in ``file_paths``, scores every row, and writes the CSV back in place.

    Args:
        step_tag: Column prefix for this checkpoint; the default matches the log
            file configured in this cell.

    Per row:
      * ``Kline_scam`` / ``Tx_scam`` / ``multimodal_scam`` are 1 when the
        corresponding model answer contains "yes" (case-insensitive), else 0.
      * ``code_scam`` comes from the DeepSeek judge: a score >= 50 counts as
        agreement with the ground-truth code analysis; agreement yields the
        row's own ``scam`` label and disagreement yields the opposite label.

    Rows without a ground-truth entry (matched by row index) or without a logged
    model output (matched by lower-cased address) are skipped with a warning.
    """
    def show(marker, value):
        # Delimiter-framed console dump, kept for manual inspection of each step.
        banner = f'{marker}---------------------'
        print(banner)
        print(value)
        print(banner)

    df = pd.read_csv(file_paths["template_csv"])
    df['contract_address'] = df['contract_address'].str.lower()

    model_outputs = parse_model_log(file_paths["log_file"])
    ground_truths = load_ground_truth(file_paths["gt_json"])

    if len(df) != len(ground_truths) or len(df) != len(model_outputs):
        print("Warning: Data length mismatch!")
        print(f"CSV rows: {len(df)}, GT entries: {len(ground_truths)}, Logged addresses: {len(model_outputs)}")

    # (section key in the parsed log, result column suffix, console marker prefix)
    yes_no_tasks = [
        ('kline', 'Kline_scam', '1'),
        ('tx', 'Tx_scam', '2'),
        ('multimodal', 'multimodal_scam', '3'),
    ]

    for i, row in tqdm(df.iterrows(), total=len(df), desc="Processing Contracts"):
        address = row['contract_address']
        gt_scam_label = row['scam']

        # Ground truth is aligned with the CSV by row position, not by address.
        if i >= len(ground_truths):
            print(f"Warning: No ground truth found for index {i}, address {address}. Skipping.")
            continue
        gt_data = ground_truths[i]

        if address not in model_outputs:
            print(f"Warning: No model output found for address {address}. Skipping.")
            continue
        model_output = model_outputs[address]

        for section, column_suffix, marker in yes_no_tasks:
            answer = model_output.get(section, '').lower()
            show(f'{marker}1', answer)
            # NOTE(review): plain substring test, so any text containing "yes" counts.
            predicts_scam = "yes" in answer
            show(f'{marker}2', predicts_scam)
            df.loc[i, f'{step_tag}_{column_suffix}'] = 1 if predicts_scam else 0

        gt_code = gt_data['code_gt']
        model_code = model_output.get('code', 'No output found.')
        show('41', model_code)

        api_response = deepseek_judge_match(gt_code, model_code)

        # A reply without a parsable score (including the API-failure fallback) counts as 0.
        score_match = re.search(r'Score:\s*(\d+)', api_response, re.IGNORECASE)
        score = int(score_match.group(1)) if score_match else 0

        is_match = score >= 50
        show('42', is_match)

        # Use the row's own label instead of a hard-coded row boundary, so the
        # mapping stays right when the CSV is not exactly 270 scam + 30 benign rows.
        is_scam_row = bool(gt_scam_label == 1)
        df.loc[i, f'{step_tag}_code_scam'] = int(is_match == is_scam_row)

    df.to_csv(file_paths["template_csv"], index=False, encoding='utf-8-sig')
    print(f"\nAnalysis complete! Results saved to: {file_paths['template_csv']}")


if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import json
import re
from openai import OpenAI
import time
from tqdm import tqdm

import os

# Read the DeepSeek key from the environment so it never has to be typed into
# (and committed with) the notebook; the original "sk-" placeholder remains the
# fallback when DEEPSEEK_API_KEY is not set.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "sk-")

# Inputs for this cell: the checkpoint-0 (withoutCoT) analysis log, the
# ground-truth JSON, and the shared template CSV.
file_paths = {
    "log_file": "OctopusGuard/evaluations/ablation_and_training_progress/logs/analysis_log_checkpoint-0-withoutCoT.txt",
    "gt_json": "OctopusGuard/evaluations/ablation_and_training_progress/test_multimodal_data.json",
    "template_csv": "OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv",
}

# OpenAI client pointed at the DeepSeek endpoint.
client = OpenAI(
    api_key=API_KEY,
    base_url="https://api.deepseek.com"
)

# Memo of judge replies keyed by (ground truth, model output); reset on every run of this cell.
api_cache = {}
def deepseek_judge_match(gt: str, model_output: str) -> str:
    """Ask the DeepSeek chat model how well a tool output matches the ground truth.

    Args:
        gt: Ground-truth vulnerability description inserted into the prompt.
        model_output: Analysis text produced by the tool under evaluation.

    Returns:
        The stripped text of the model reply (expected to contain ``Score: XX``),
        or the literal ``"Score: 0"`` when all three attempts fail.

    Successful replies are memoised in ``api_cache`` under ``(gt, model_output)``;
    the ``"Score: 0"`` fallback is deliberately not cached so a later call retries.
    """
    cache_key = (gt, model_output)
    if cache_key in api_cache:
        return api_cache[cache_key]

    prompt = f"""
The following is the Ground Truth (human-labeled vulnerability description):
{gt}

And here is the vulnerability analysis output by the analysis tool:
{model_output}

As a blockchain security expert, please evaluate the following:
1. Does the tool's output cover all the core issues in the Ground Truth? Summarize your judgment in one sentence (Match, Partial Match, No Match).
2. Give a similarity score between 0 and 100 indicating how well the tool's output matches the Ground Truth. Please use the format: `Score: XX`
"""
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[{"role": "user", "content": prompt.strip()}],
                max_tokens=256,
                stream=False
            )
            result = response.choices[0].message.content.strip()
            api_cache[cache_key] = result
            print(result)
            return result
        except Exception as e:
            # Only wait when another attempt will actually follow; sleeping after
            # the last failure just delays the fallback.
            if attempt + 1 < max_attempts:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                print(f"  [API Error] Attempt {attempt + 1} failed: {e}.")

    print(f"  [API Error] All attempts failed for GT: '{gt[:50]}...'")
    return "Score: 0"


def parse_model_log(log_path: str) -> dict:
    """Split an analysis log into the four result sections of every contract.

    The log is cut at each ``... Contract Address: `` header. A chunk is kept only when it
    starts with a 40-hex-digit ``0x`` address. Inside a chunk each section runs from its
    bracketed title to the next section's title (the last one runs to the
    ``ANALYSIS COMPLETE`` banner).

    Titles are located by their bracketed text only. Whatever decoration precedes a title
    on the line (an emoji, or the bytes of an emoji decoded with the wrong codec) and the
    exact number of ``=`` in the banners are deliberately not part of the match, so the
    parser does not depend on how the log or this notebook was encoded.

    Args:
        log_path: Path of the UTF-8 log file.

    Returns:
        ``{lower-cased address: {'kline': str, 'tx': str, 'code': str, 'multimodal': str}}``.
        A section that cannot be found is stored as ``""``. If an address occurs more than
        once, the last occurrence wins.
    """
    print(f"Parsing model log file: {log_path}")
    with open(log_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Optional non-space decoration (plus blanks) in front of the *next* section title.
    decoration = r"(?:\S+[ \t]+)?"
    # Compiled once; the patterns do not depend on the block being parsed.
    patterns = {
        'kline': re.compile(
            rf"\[K-line Analysis Result\]\s*(.*?)\s*(?={decoration}\[Transaction Analysis Result\])",
            re.DOTALL),
        'tx': re.compile(
            rf"\[Transaction Analysis Result\]\s*(.*?)\s*(?={decoration}\[Contract Analysis Result\])",
            re.DOTALL),
        'code': re.compile(
            rf"\[Contract Analysis Result\]\s*(.*?)\s*(?={decoration}\[Final Unified Decision\])",
            re.DOTALL),
        'multimodal': re.compile(
            r"\[Final Unified Decision\]\s*(.*?)(?==+ ANALYSIS COMPLETE =+)",
            re.DOTALL),
    }
    address_pattern = re.compile(r'(0x[a-fA-F0-9]{40})')

    results = {}
    blocks = re.split(r'=+ Contract Address: ', content)

    for block in tqdm(blocks, desc="Parsing Log Blocks"):
        if not block.strip():
            continue

        # Text before the first header (or a malformed header) has no leading address.
        addr_match = address_pattern.match(block)
        if not addr_match:
            continue
        address = addr_match.group(1).lower()

        parsed_data = {}
        for key, pattern in patterns.items():
            match = pattern.search(block)
            parsed_data[key] = match.group(1).strip() if match else ""

        results[address] = parsed_data
    return results


def load_ground_truth(json_path: str) -> dict:
    """Read the ground-truth JSON and regroup it per contract.

    The file is consumed in consecutive groups of six records. From each complete group the
    ``'completion'`` of positions 0, 1, 4 and 5 is kept as ``kline_gt``, ``tx_gt``,
    ``code_gt`` and ``multimodal_gt``; positions 2 and 3 are not read. A trailing group with
    fewer than six records is ignored.

    Args:
        json_path: Path of the UTF-8 JSON file.

    Returns:
        ``{group number (0-based): {'kline_gt', 'tx_gt', 'code_gt', 'multimodal_gt'}}``.
    """
    print(f"Loading ground truth from: {json_path}")
    with open(json_path, 'r', encoding='utf-8') as handle:
        records = json.load(handle)

    group_size = 6
    # (output key, position of the record inside its group)
    field_positions = (
        ('kline_gt', 0),
        ('tx_gt', 1),
        ('code_gt', 4),
        ('multimodal_gt', 5),
    )

    gt_map = {}
    for group_number, start in enumerate(range(0, len(records), group_size)):
        chunk = records[start:start + group_size]
        if len(chunk) != group_size:
            continue
        gt_map[group_number] = {
            field: chunk[position]['completion'] for field, position in field_positions
        }
    return gt_map


def is_scam_from_gt(gt_string: str, modality: str) -> bool:
    """Tell whether a ground-truth text labels the contract as a scam.

    The text counts as benign only when it contains the modality's benign marker
    (case-insensitive): ``"healthy"`` for ``modality == 'code'``, ``"scam: no"`` for every
    other modality. Anything else is treated as a scam label.
    """
    benign_marker = "healthy" if modality == 'code' else "scam: no"
    return benign_marker not in gt_string.lower()

def is_scam_from_model_output(model_output_string: str, modality: str) -> bool:
    """Tell whether a model output section predicts a scam.

    For ``modality == 'kline'`` the prediction is benign only when the whole output, after
    trimming and lower-casing, is exactly ``"no"``. For every other modality it is benign
    only when the output contains ``"scam: no"`` (case-insensitive). Note that an empty
    output is therefore counted as a scam prediction.
    """
    normalized = model_output_string.lower()
    if modality != 'kline':
        return "scam: no" not in normalized
    return normalized.strip() != "no"

def main():
    """Fill the ``0_steps_*`` prediction columns of the template CSV and save a copy.

    For every row of the template (matched to the ground truth by row index and to the log
    by lower-cased contract address) this stores:

    * ``0_steps_Kline_scam`` / ``0_steps_Tx_scam`` / ``0_steps_multimodal_scam``:
      1 when the corresponding log section predicts a scam, else 0. These are the model's
      predictions, not correctness flags; they are compared with the ``scam`` label later.
    * ``0_steps_code_scam``: derived from the DeepSeek judge. A reply whose ``Score`` is at
      least 50 counts as a match. For the first 270 rows a match is stored as 1; for the
      remaining rows the value is inverted (match -> 0, no match -> 1).

    Rows without ground truth or without a log entry are skipped and keep their template
    values. The result is written next to the template as ``*_results_checkpoint-0.csv``.
    """
    print("Starting experiment 6 analysis for checkpoint-0...")

    df = pd.read_csv(file_paths["template_csv"])
    df['contract_address'] = df['contract_address'].str.lower()

    model_outputs = parse_model_log(file_paths["log_file"])
    ground_truths = load_ground_truth(file_paths["gt_json"])

    if len(df) != len(ground_truths) or len(df) > len(model_outputs):
        print("Warning: Data length mismatch!")
        print(f"CSV rows: {len(df)}, GT entries: {len(ground_truths)}, Logged addresses: {len(model_outputs)}")

    # (section key in the parsed log, column that receives the 0/1 prediction)
    prediction_columns = (
        ('kline', '0_steps_Kline_scam'),
        ('tx', '0_steps_Tx_scam'),
        ('multimodal', '0_steps_multimodal_scam'),
    )
    code_column = '0_steps_code_scam'
    # Judge score (0-100) from which the code analysis counts as matching the ground truth.
    match_threshold = 50
    # Rows below this index store "match" as 1, rows from it on store the inverse.
    # NOTE(review): presumably mirrors a template of 300 rows whose last 30 are labelled
    # scam == 0 - confirm against the CSV actually used.
    scam_row_count = 270

    for i, row in tqdm(df.iterrows(), total=len(df), desc="Processing Contracts"):
        address = row['contract_address']

        if i not in ground_truths:
            print(f"Warning: No ground truth found for index {i}, address {address}. Skipping.")
            continue
        gt_data = ground_truths[i]

        if address not in model_outputs:
            print(f"Warning: No model output found for address {address}. Skipping.")
            continue
        model_output = model_outputs[address]

        for modality, column in prediction_columns:
            predicts_scam = is_scam_from_model_output(model_output.get(modality, ''), modality)
            df.loc[i, column] = 1 if predicts_scam else 0

        api_response = deepseek_judge_match(
            gt_data['code_gt'], model_output.get('code', 'No output found.')
        )
        score_match = re.search(r'Score:\s*(\d+)', api_response, re.IGNORECASE)
        score = int(score_match.group(1)) if score_match else 0
        is_match_code = score >= match_threshold

        if i < scam_row_count:
            df.loc[i, code_column] = 1 if is_match_code else 0
        else:
            df.loc[i, code_column] = 0 if is_match_code else 1

    output_path = file_paths["template_csv"].replace(".csv", "_results_checkpoint-0.csv")
    df.to_csv(output_path, index=False, encoding='utf-8-sig')
    print(f"\nAnalysis complete! Results saved to: {output_path}")


# Entry point for this cell: run the checkpoint-0 evaluation defined by main() above when
# the module name is "__main__".
if __name__ == "__main__":
    main()

In [None]:
import pandas as pd

# Per-contract table: "scam" holds the reference label; every other column except
# "contract_address" is scored as a 0/1 prediction column.
# NOTE(review): this reads the template file itself - confirm its prediction columns have
# been filled in here and not only in the per-checkpoint "_results_" copies.
df = pd.read_csv("OctopusGuard/evaluations/ablation_and_training_progress/stepwise_template.csv")

gt_column = "scam"

results = []

prediction_columns = [name for name in df.columns if name not in ("contract_address", gt_column)]

for column in prediction_columns:
    y_true = df[gt_column]
    y_pred = df[column]

    # Rows whose prediction is neither 0 nor 1 (e.g. missing) fall into none of the four
    # cells below and are therefore left out of every metric.
    predicted_scam, predicted_benign = (y_pred == 1), (y_pred == 0)
    actual_scam, actual_benign = (y_true == 1), (y_true == 0)

    tp = (predicted_scam & actual_scam).sum()
    fn = (predicted_benign & actual_scam).sum()
    fp = (predicted_scam & actual_benign).sum()
    tn = (predicted_benign & actual_benign).sum()

    predicted_positive = tp + fp
    actual_positive = tp + fn
    scored_rows = tp + tn + fp + fn

    # Each ratio falls back to 0.0 when its denominator is zero.
    precision = tp / predicted_positive if predicted_positive > 0 else 0.0
    recall = tp / actual_positive if actual_positive > 0 else 0.0
    accuracy = (tp + tn) / scored_rows if scored_rows > 0 else 0.0
    precision_plus_recall = precision + recall
    f1 = (2 * precision * recall) / precision_plus_recall if precision_plus_recall > 0 else 0.0

    summary = {
        "Model": column,
        "TP": tp,
        "FN": fn,
        "FP": fp,
        "TN": tn,
        "Precision": precision,
        "Recall": recall,
        "Accuracy": accuracy,
        "F1 Score": f1
    }

    print(f"result - {column}")
    for count_name in ("TP", "FN", "FP", "TN"):
        print(f"{count_name}: {summary[count_name]}")
    for score_name in ("Precision", "Recall", "Accuracy", "F1 Score"):
        print(f"{score_name}: {summary[score_name]:.4f}")
    print("-" * 40)

    results.append(summary)

# One row per prediction column, saved for the plotting cell.
results_df = pd.DataFrame(results)
results_df.to_csv("stepwise_results.csv", index=False)
print("result saved to stepwise_results.csv")


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Column names in stepwise_results.csv follow "<step>_<task>"; one figure is drawn per metric
# with one line per task across the training steps.
steps = ["0_steps", "74_steps", "148_steps", "222_steps", "296_steps"]
tasks = ["Kline_scam", "Tx_scam", "code_scam", "multimodal_scam"]
metrics = ["Recall", "Accuracy", "Precision", "F1 Score",]

df = pd.read_csv("stepwise_results.csv")

# plot_data[metric][task] -> list of values, one per entry of `steps` (same order).
plot_data = {}
for metric in metrics:
    plot_data[metric] = {task: [] for task in tasks}

for task in tasks:
    for step in steps:
        model_name = f"{step}_{task}"
        row = df[df["Model"] == model_name]
        for metric in metrics:
            # A step/task pair missing from the CSV is plotted as 0.0.
            plot_data[metric][task].append(0.0 if row.empty else row.iloc[0][metric])

# Numeric x positions: the integer in front of "_steps".
step_numbers = [int(step_label.split("_")[0]) for step_label in steps]

for metric in metrics:
    plt.figure(figsize=(10, 6))
    for task, series in plot_data[metric].items():
        plt.plot(step_numbers, series, marker='o', label=task)
    plt.title(f"{metric} over Steps", fontsize=25)
    plt.xlabel("Step", fontsize=25)
    plt.ylabel(metric, fontsize=25)
    plt.xticks(fontsize=25)
    plt.yticks(fontsize=25)
    plt.grid(True)
    plt.legend(fontsize=25)
    plt.tight_layout()
    plt.savefig(f"{metric.replace(' ', '_')}_over_steps.png")
    plt.show()


In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt
import seaborn as sns
import re
import os

def parse_ablation_log(log_path, ordered_addresses):
    """Extract one 0/1 scam prediction per address from an ablation log.

    The log is read line by line. A ``Contract Address: 0x...`` line selects the current
    contract. After a line containing ``Final Assessment``, the first following line that
    starts with ``Scam:`` decides the prediction: 1 if it contains "yes", 0 if it contains
    "no" (case-insensitive, "yes" checked first).

    The search for the ``Scam:`` line stops at the next contract header, so a contract whose
    assessment has no verdict cannot take over the verdict of the contract that follows it.

    Args:
        log_path: Path of the UTF-8 log file.
        ordered_addresses: Addresses, in the order the predictions are wanted.

    Returns:
        A list of 0/1 values aligned with ``ordered_addresses``. Addresses without a parsed
        verdict get 0; if the file does not exist a warning is printed and all are 0.
    """
    if not os.path.exists(log_path):
        print(f"‚ö†Ô∏è Warning: Log file not found, returning empty predictions: {log_path}")
        return [0] * len(ordered_addresses)

    address_pattern = re.compile(r'Contract Address: (0x[a-fA-F0-9]{40})')
    predictions = {}
    current_address = None
    awaiting_verdict = False  # True between "Final Assessment" and its "Scam:" line

    with open(log_path, 'r', encoding='utf-8') as f:
        for line in f:
            if "Contract Address:" in line:
                match = address_pattern.search(line)
                if match:
                    current_address = match.group(1).lower()
                    # A new contract starts: drop any verdict search still pending.
                    awaiting_verdict = False
            if "Final Assessment" in line:
                # The verdict is looked for on the lines after this one.
                awaiting_verdict = True
                continue
            if not awaiting_verdict:
                continue

            verdict_line = line.strip()
            if verdict_line.startswith("Scam:"):
                verdict = verdict_line.lower()
                if "yes" in verdict:
                    predictions[current_address] = 1
                elif "no" in verdict:
                    predictions[current_address] = 0
                awaiting_verdict = False

    return [predictions.get(addr.lower(), 0) for addr in ordered_addresses]

def _score_row(step, epoch, model, y_true, y_pred):
    """Build one result row (identifiers plus the four sklearn metrics) for a prediction vector."""
    return {
        'Step': step,
        'Epoch': epoch,
        'Model': model,
        'Accuracy': accuracy_score(y_true, y_pred),
        'Precision': precision_score(y_true, y_pred, zero_division=0),
        'Recall': recall_score(y_true, y_pred, zero_division=0),
        'F1-Score': f1_score(y_true, y_pred, zero_division=0),
    }


def analyze_ablation_study_english(csv_path, ablation_log_dir):
    """Score every modality at every training step and plot the curves.

    Args:
        csv_path: Per-contract CSV with the columns ``scam`` (label), ``contract_address``
            and, optionally, ``<step>_steps_<modality>_scam`` prediction columns.
        ablation_log_dir: Directory holding ``ablation_code_tx_log_checkpoint-<step>.txt``.

    For each step (0, 74, 148, 222, 296 mapped to epochs 0, 2, 4, 6, 8) the available
    Kline / Tx / code / multimodal columns are scored against ``scam``. The combined
    ``Code + Tx`` model is the logical OR of the code and Tx columns at epoch 0 and is read
    from the ablation log at later epochs. The collected rows are handed to
    ``plot_performance_curves_final``. Nothing is returned; a missing file or missing
    required columns are reported with a message and the function returns early.
    """
    try:
        df = pd.read_csv(csv_path)
    except FileNotFoundError:
        print(f"Error: Main CSV file not found at {csv_path}")
        return

    missing_columns = [name for name in ('scam', 'contract_address') if name not in df.columns]
    if missing_columns:
        print(f"Error: {csv_path} is missing required column(s): {missing_columns}")
        return

    y_true = df['scam']
    ordered_addresses = df['contract_address'].tolist()
    step_to_epoch_map = {0: 0, 74: 2, 148: 4, 222: 6, 296: 8}
    base_modalities = ['Kline', 'Tx', 'code', 'multimodal']
    results = []

    for step in sorted(step_to_epoch_map):
        epoch = step_to_epoch_map[step]
        print(f"Processing Step: {step} (Epoch: {epoch})")

        for modality in base_modalities:
            col_name = f'{step}_steps_{modality}_scam'
            if col_name not in df.columns:
                continue
            results.append(_score_row(step, epoch, modality, y_true, df[col_name]))

        if epoch == 0:
            print(f"  -> Epoch 0: Using logical OR from CSV for 'Code + Tx'.")
            code_col, tx_col = f'{step}_steps_code_scam', f'{step}_steps_Tx_scam'
            if code_col in df.columns and tx_col in df.columns:
                # Compare with 1 before OR-ing: the columns are float (0.0 / 1.0) after a
                # CSV round-trip, and `|` is not defined for float Series.
                y_pred_combined = ((df[code_col] == 1) | (df[tx_col] == 1)).astype(int)
                results.append(_score_row(step, epoch, 'Code + Tx', y_true, y_pred_combined))
        else:
            print(f"  -> Epoch {epoch}: Reading 'Code + Tx' results from ablation log for step {step}.")
            log_file_path = os.path.join(ablation_log_dir, f"ablation_code_tx_log_checkpoint-{step}.txt")
            y_pred_ablation_agent = parse_ablation_log(log_file_path, ordered_addresses)
            if len(y_pred_ablation_agent) != len(y_true):
                print(f"‚ùå Error: Mismatch in prediction count for step {step}. Expected {len(y_true)}, got {len(y_pred_ablation_agent)}")
                continue
            results.append(_score_row(step, epoch, 'Code + Tx', y_true, y_pred_ablation_agent))

    results_df = pd.DataFrame(results)
    plot_performance_curves_final(results_df)


def plot_performance_curves_final(results_df):
    """Draw a 2x2 grid of metric-versus-epoch line plots, one line per model.

    Args:
        results_df: Frame with the columns ``Model``, ``Epoch``, ``F1-Score``, ``Recall``,
            ``Precision`` and ``Accuracy``. It is not modified; the ordering of the models is
            applied to an internal copy.

    The figure is saved as ``ablation_study_epochs_all_legends.png`` (300 dpi) and shown.
    An empty frame only prints a message.
    """
    if results_df.empty:
        print("No data available for plotting.")
        return

    model_order = ['code', 'Tx', 'Code + Tx', 'Kline', 'multimodal']
    # assign() returns a new frame, so the caller's 'Model' column keeps its dtype.
    plot_df = results_df.assign(
        Model=pd.Categorical(results_df['Model'], categories=model_order, ordered=True)
    ).sort_values('Model')

    FIG_SIZE = (22, 20)
    FONT_SIZE = 30
    LABEL_PAD = 20
    TICK_PAD = 15
    LINE_WIDTH, MARKER_SIZE = 3.0, 10.0

    fig, axes = plt.subplots(2, 2, figsize=FIG_SIZE)

    metrics_to_plot = ['F1-Score', 'Recall', 'Precision', 'Accuracy']
    palette = sns.color_palette("bright", n_colors=len(plot_df['Model'].unique()))

    for ax, metric in zip(axes.flatten(), metrics_to_plot):
        sns.lineplot(
            data=plot_df, x='Epoch', y=metric, hue='Model', style='Model',
            markers=True, dashes=True, ax=ax, palette=palette,
            linewidth=LINE_WIDTH, markersize=MARKER_SIZE
        )

        ax.set_ylabel(metric, fontsize=FONT_SIZE, labelpad=LABEL_PAD)
        ax.set_xlabel('Epoch', fontsize=FONT_SIZE, labelpad=LABEL_PAD)

        ax.grid(True, linestyle='--', alpha=0.7, linewidth=1.0)

        legend = ax.legend(
            title='Model',
            title_fontsize=FONT_SIZE,
            fontsize=FONT_SIZE,
            loc='best'
        )
        # Thicken the legend samples so they stay readable at this font size.
        for leg_line in legend.get_lines():
            leg_line.set_linewidth(3.0)

        ax.set_ylim(0, 1.05)
        ax.tick_params(axis='both', which='major', labelsize=FONT_SIZE, pad=TICK_PAD)
        # Epochs are whole numbers; keep the x ticks on integers.
        ax.xaxis.set_major_locator(plt.MaxNLocator(integer=True))

    plt.tight_layout(pad=2.0)
    plt.savefig("ablation_study_epochs_all_legends.png", dpi=300, bbox_inches='tight')
    plt.show()

# Entry point for this cell: run the ablation analysis and draw the figure.
if __name__ == '__main__':
    # NOTE(review): analyze_ablation_study_english() reads the columns 'scam' and
    # 'contract_address' from this file. 'stepwise_results.csv' is also the name under which
    # the metrics cell saves its per-column summary (Model / TP / FN / ...), which has neither
    # column - confirm this points at the per-contract predictions table.
    main_csv_file = 'stepwise_results.csv'
    # Directory searched for "ablation_code_tx_log_checkpoint-<step>.txt".
    ablation_log_directory = 'OctopusGuard/evaluations/ablation_and_training_progress/logs'
    
    analyze_ablation_study_english(main_csv_file, ablation_log_directory)