In [1]:
import os
import ast
import pandas as pd

# Base configuration
BASE_DIR = '/Users/justin/BDAA/ACL/code/agentdojo/src/agentdojo/adverseral_tool'
SUITES = ['banking', 'slack', 'workspace', 'travel']
ATTACK_TYPES = ['Datastream', 'Type I-A', 'Type I-B', 'Type II (A+B)', 'Type III-A', 'Type III-B']

# Helper to safely parse python files without importing them
def parse_python_file(filepath):
    if not os.path.exists(filepath):
        return None
    with open(filepath, 'r', encoding='utf-8') as f:
        try:
            return ast.parse(f.read())
        except Exception as e:
            print(f"Error parsing {filepath}: {e}")
            return None

def get_list_len_from_assignment(tree, variable_name):
    """Finds a list assignment like VAR = [...] and returns its length."""
    if not tree:
        return 0
    for node in ast.walk(tree):
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == variable_name:
                    if isinstance(node.value, ast.List):
                        return len(node.value.elts)
                    elif isinstance(node.value, ast.Dict):
                        return len(node.value.keys)
                    elif isinstance(node.value, ast.Name):
                        # Handle alias like MALICIOUS_TOOLS = NOT_ALLOWED_TOOLS
                        return get_list_len_from_assignment(tree, node.value.id)
    return 0

def get_user_task_count(tree):
    """Counts user tasks from _USER_TASK_PROMPTS dict."""
    return get_list_len_from_assignment(tree, "_USER_TASK_PROMPTS")

def analyze_injection_tasks(tree, user_task_count, attack_filter=None):
    """
    Analyzes injection task classes to calculate security cases.
    Logic:
    - If has user_task_list/id -> count specific.
    - Else -> Cartesian product (1 * user_task_count).
    """
    if not tree:
        return 0
    
    security_cases = 0
    
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            # Check if it's an injection task (heuristic: inherits from _BaseAdversarialInjection or decorated)
            is_injection = False
            for base in node.bases:
                if isinstance(base, ast.Name) and 'Injection' in base.id:
                    is_injection = True
            
            # Also check decorator
            for decorator in node.decorator_list:
                if isinstance(decorator, ast.Attribute) and 'register_injection_task' in decorator.attr:
                    is_injection = True
            
            if not is_injection:
                continue

            # Extract attributes from class body
            attrs = {}
            for item in node.body:
                if isinstance(item, ast.Assign):
                    for target in item.targets:
                        if isinstance(target, ast.Name):
                            # Get simple values (strings, lists)
                            if isinstance(item.value, ast.Constant):
                                attrs[target.id] = item.value.value
                            elif isinstance(item.value, ast.List):
                                attrs[target.id] = item.value.elts
                            elif isinstance(item.value, ast.NameConstant): # For None in older python or True/False
                                attrs[target.id] = item.value.value

            # Filter by attack type if required (for Datastream)
            if attack_filter:
                attack_type = attrs.get('ATTACK_TYPE', '')
                # If filter is 'important_instruction', we look for it or generic tasks without type
                # Assuming Datastream tasks might be labeled or are in injection_tasks.py
                if attack_filter == 'important_instruction':
                    if attack_type != 'important_instruction' and 'important_instruction' not in str(attack_type):
                        # If it's a generic task in injection_tasks.py, we often count it as Datastream
                        pass 

            # Calculate cases
            source_id = attrs.get('SOURCE_USER_TASK_ID')
            source_list = attrs.get('SOURCE_USER_TASK_IDS')

            if source_list:
                security_cases += len(source_list)
            elif source_id:
                security_cases += 1
            else:
                # Cartesian product
                security_cases += user_task_count

    return security_cases

# Initialize stats container
stats = {atype: {
    'Benign Tool Quantity': 0,
    'Malicious Tool Quantity': 0,
    'Same-domain Tool Quantity': 0,
    'Benign Task Quantity': 0,
    'Security Case Quantity': 0,
    'Inference Type': 'Single-path' if 'Type I' in atype or 'Datastream' in atype else 'Multi-path'
} for atype in ATTACK_TYPES}

# Define file mappings
TYPE_FILES = {
    'Type I-A': ('type_i_a.py', 'type_i_a_injection_tasks.py'),
    'Type I-B': ('type_i_b.py', 'type_i_b_injection_tasks.py'),
    'Type II (A+B)': [
        ('type_ii_a.py', 'type_ii_a_injection_tasks.py'),
        ('type_ii_b.py', 'type_ii_b_injection_tasks.py')
    ],
    'Type III-A': ('type_iii_a.py', 'type_iii_a_injection_tasks.py'),
    'Type III-B': ('type_iii_b.py', 'type_iii_b_injection_tasks.py')
}

def process_suite(suite_name):
    suite_dir = os.path.join(BASE_DIR, suite_name)
    
    # 1. Determine User Task Count for this suite (Benign Tasks)
    # We use type_i_a_injection_tasks.py as the reference for user prompts, assuming consistency
    ref_task_file = os.path.join(suite_dir, 'type_i_a_injection_tasks.py')
    tree_ref = parse_python_file(ref_task_file)
    user_task_count = get_user_task_count(tree_ref) if tree_ref else 0
    
    # 2. Determine Benign Tool Count for this suite
    # We use NOT_ALLOWED_TOOLS in type_i_a.py as a proxy for the benign toolset size
    ref_tool_file = os.path.join(suite_dir, 'type_i_a.py')
    tree_tool_ref = parse_python_file(ref_tool_file)
    benign_tool_count = get_list_len_from_assignment(tree_tool_ref, 'NOT_ALLOWED_TOOLS')

    # Update Benign Task Count globally (it's the same base for all types, so we add the suite's contribution)
    # Actually, the metric is "Total benign tasks", so we add this suite's tasks to all types
    for atype in ATTACK_TYPES:
        stats[atype]['Benign Task Quantity'] += user_task_count
        # Benign Tool Quantity is usually a static property of the suite, summed across suites
        stats[atype]['Benign Tool Quantity'] += benign_tool_count

    # 3. Process Datastream (Generic Injection Tasks)
    # Usually found in injection_tasks.py
    datastream_file = os.path.join(suite_dir, 'injection_tasks.py')
    tree_ds = parse_python_file(datastream_file)
    ds_cases = analyze_injection_tasks(tree_ds, user_task_count, attack_filter='important_instruction')
    # If analyze returns 0 because no filter matched, assume all in injection_tasks.py are datastream
    if ds_cases == 0 and tree_ds:
         ds_cases = analyze_injection_tasks(tree_ds, user_task_count)
    
    stats['Datastream']['Security Case Quantity'] += ds_cases
    # Datastream uses benign tools (no specific malicious tools defined usually)
    
    # 4. Process Specific Types
    for atype, file_info in TYPE_FILES.items():
        # Handle merged types (II-A + II-B)
        if isinstance(file_info, list):
            files_list = file_info
        else:
            files_list = [file_info]
            
        for (tool_file_name, task_file_name) in files_list:
            tool_path = os.path.join(suite_dir, tool_file_name)
            task_path = os.path.join(suite_dir, task_file_name)
            
            # Parse Tools
            tree_tool = parse_python_file(tool_path)
            mal_tools = get_list_len_from_assignment(tree_tool, 'MALICIOUS_TOOLS')
            
            stats[atype]['Malicious Tool Quantity'] += mal_tools
            
            # Same-domain tools (Only for Type II and III)
            if 'Type II' in atype or 'Type III' in atype:
                # For these types, malicious tools are typically replacement tools
                stats[atype]['Same-domain Tool Quantity'] += mal_tools
            
            # Parse Tasks (Security Cases)
            tree_task = parse_python_file(task_path)
            cases = analyze_injection_tasks(tree_task, user_task_count)
            stats[atype]['Security Case Quantity'] += cases

# Run processing
for suite in SUITES:
    process_suite(suite)

# Convert to DataFrame for display
df = pd.DataFrame(stats)
print(df)

# Save to CSV or further processing if needed
# df.to_csv('siren_stats.csv')


                            Datastream     Type I-A     Type I-B  \
Benign Tool Quantity                70           70           70   
Malicious Tool Quantity              0           70           18   
Same-domain Tool Quantity            0            0            0   
Benign Task Quantity                20           20           20   
Security Case Quantity               0          218          187   
Inference Type             Single-path  Single-path  Single-path   

                          Type II (A+B)   Type III-A   Type III-B  
Benign Tool Quantity                 70           70           70  
Malicious Tool Quantity              54           24           18  
Same-domain Tool Quantity            54           24           18  
Benign Task Quantity                 20           20           20  
Security Case Quantity              179            0           77  
Inference Type              Single-path  Single-path  Single-path  


  elif isinstance(item.value, ast.NameConstant): # For None in older python or True/False
