# CBMC-Compatible Harness Generation System - Part 4: Node Functions (Second Half)

This part implements the remaining node functions for our LangGraph workflow, including harness generation, CBMC verification, and results processing.

In [1]:
# Node 5: Generator (LLM Agent) - Generates CBMC harnesses
def generator_node(state: HarnessGenerationState):
    """Generate a CBMC harness for each vulnerable function stored in ChromaDB.

    For every name in ``state.vulnerable_functions`` this looks the function
    up in ``code_collection`` by id, queries ``query_pattern_db`` with its
    source, asks ``llm`` for a harness, and keeps the C code extracted from
    the reply. Names that are not present in the collection are skipped and
    listed in the returned status message.

    Args:
        state: Workflow state; only ``vulnerable_functions`` is read here.

    Returns:
        A state update with ``harnesses`` (function name -> harness source)
        and ``messages`` holding one ``AIMessage`` status line.
    """
    harnesses = {}
    missing = []

    for func_name in state.vulnerable_functions:
        # Get function code from ChromaDB
        function_result = code_collection.get(
            ids=[func_name],
            include=["documents", "metadatas"],
        )

        if not function_result["ids"]:
            # Nothing stored under this id: remember it so the skip is reported.
            missing.append(func_name)
            continue

        func_code = function_result["documents"][0]
        # NOTE(review): assumes the metadata entry may be None for a record
        # stored without metadata; fall back to an empty dict so the
        # defaults below apply instead of raising AttributeError.
        func_metadata = function_result["metadatas"][0] or {}

        # Query patterns for this function
        patterns_result = query_pattern_db(func_code)

        # Get parameter information
        return_type = func_metadata.get("return_type", "void")
        params = func_metadata.get("params", "")
        has_malloc = func_metadata.get("has_malloc", False)
        has_free = func_metadata.get("has_free", False)

        # Build the generator prompt with CBMC-specific instructions.
        # The prompt text, including its indentation, is kept verbatim.
        generator_prompt = f"""
            You are a specialized harness generator for CBMC verification.
            Create a CBMC-compatible harness for the following function that might have memory leaks:
            
            ```c
            {func_code}
            ```
            
            Function metadata:
            - Return type: {return_type}
            - Parameters: {params}
            - Contains malloc: {has_malloc}
            - Contains free: {has_free}
            
            Matching vulnerability patterns:
            {json.dumps(patterns_result['matching_patterns'], indent=2)}
            
            The harness should:
            1. Create realistic inputs for the function
            2. Set up appropriate CBMC assertions and assumptions (__CPROVER_assert, __CPROVER_assume)
            3. Properly call the function and check its behavior
            4. Include memory leak checks
            5. Follow CBMC's harness structure with a void main() function
            
            For memory leak detection, make sure to:
            - Track all allocated memory
            - Verify memory is properly freed for all execution paths
            - Use CBMC-specific features for verification
            
            Provide only the harness code without explanation.
            """

        # Generate the harness using the LLM
        response = llm.invoke([SystemMessage(content=generator_prompt)])

        harnesses[func_name] = _extract_harness_code(response.content)

    status = f"Generated harnesses for {len(harnesses)} functions."
    if missing:
        status += (
            f" Skipped {len(missing)} not found in the code collection: "
            f"{', '.join(missing)}."
        )

    return {
        "messages": [AIMessage(content=status)],
        "harnesses": harnesses
    }


def _extract_harness_code(llm_output):
    """Return the source inside the first fenced code block of *llm_output*.

    Any language tag after the opening fence is accepted (``c``, ``C``,
    ``cpp``, none, ...), as are CRLF line endings. If the closing fence is
    missing (e.g. a truncated reply) everything after the opening fence is
    used. When the text contains no opening fence at all it is returned
    unchanged.
    """
    import re

    fenced = re.search(r"```[^\n]*\n(.*?)(?:```|\Z)", llm_output, re.DOTALL)
    if fenced is None:
        return llm_output
    # Drop only surrounding line breaks / trailing blanks so the first line's
    # own indentation is preserved.
    return fenced.group(1).lstrip("\r\n").rstrip()

In [2]:
# Node 6: CBMC - Runs verification using the generated harnesses
def cbmc_node(state: HarnessGenerationState):
    """Verify every generated harness with CBMC and report the outcomes.

    Calls ``run_cbmc`` once per entry of ``state.harnesses`` and collects the
    returned result under the function's name.

    Returns:
        A state update with ``cbmc_results`` (function name -> result) and a
        single ``AIMessage`` containing a readable report of all results.
    """
    # All verifications run first; formatting happens afterwards.
    cbmc_results = {
        name: run_cbmc(name, code) for name, code in state.harnesses.items()
    }

    def describe(name, outcome):
        """Format one verification result as a short text section."""
        status = outcome["status"]
        message = outcome["message"]
        suggestions = outcome["suggestions"]

        text = f"Function: {name}\nStatus: {status}\nMessage: {message}"
        # Suggestions are only shown when there are any.
        return f"{text}\nSuggestions: {suggestions}" if suggestions else text

    report = "\n\n".join(
        describe(name, outcome) for name, outcome in cbmc_results.items()
    )

    return {
        "messages": [AIMessage(content=f"CBMC Verification Results:\n\n{report}")],
        "cbmc_results": cbmc_results
    }

In [3]:
# Node 7: Output/Frontend - Final results processing
def output_node(state: HarnessGenerationState):
    """Assemble the final Markdown report for the whole workflow.

    The report starts with overall counts and is followed by one section per
    vulnerable function that has a CBMC result: its status, message, optional
    suggestions and, when available, the generated harness in a ``c`` fence.

    Returns:
        A state update whose ``messages`` holds one ``AIMessage`` with the
        report text.
    """
    analyzed_count = len(state.embeddings.get('functions', {}))
    vulnerable_count = len(state.vulnerable_functions)
    harness_count = len(state.harnesses)
    verified_count = len(state.cbmc_results)

    lines = [
        "# Memory Leak Verification Summary",
        "",
        f"Analyzed {analyzed_count} functions.",
        f"Identified {vulnerable_count} potentially vulnerable functions.",
        f"Generated {harness_count} verification harnesses.",
        f"Completed {verified_count} CBMC verifications.",
        "",
        "## Detailed Results",
    ]

    for name in state.vulnerable_functions:
        # Functions without a CBMC result get no detail section.
        if name not in state.cbmc_results:
            continue

        outcome = state.cbmc_results[name]
        lines += [
            f"\n### Function: {name}",
            f"Status: {outcome['status']}",
            f"Message: {outcome['message']}",
        ]
        if outcome["suggestions"]:
            lines.append(f"Suggestions: {outcome['suggestions']}")

        if name in state.harnesses:
            lines += [
                "\nGenerated Harness:",
                "```c",
                state.harnesses[name],
                "```",
            ]

    return {
        "messages": [AIMessage(content="\n".join(lines))]
    }

In [4]:
# Conditional edge function for tool calls
def route_based_on_tool_use(state: HarnessGenerationState) -> Literal["tool_execution", "continue"]:
    """Pick the next edge depending on whether the last message calls tools.

    The state may be a mapping (``state["messages"]``) or an object exposing
    ``messages`` as an attribute, which is how the node functions in this
    file read their other state fields.

    Returns:
        ``"tool_execution"`` when the last message has a non-empty
        ``tool_calls`` attribute; ``"continue"`` otherwise, including when
        there are no messages at all.
    """
    try:
        messages = state["messages"]
    except TypeError:
        # State object is not subscriptable: read the field as an attribute.
        messages = state.messages

    if not messages:
        return "continue"

    if getattr(messages[-1], "tool_calls", None):
        return "tool_execution"
    return "continue"

# Function to execute tool calls
def execute_tools(state: HarnessGenerationState):
    """Run every tool call requested by the last message.

    The state may be a mapping (``state["messages"]``) or an object exposing
    ``messages`` as an attribute, which is how the node functions in this
    file read their other state fields.

    Each tool call is looked up by name in ``tools_by_name`` and invoked with
    the call's ``args``. The stringified result is wrapped in a
    ``ToolMessage`` carrying the call's id. A call naming a tool that is not
    registered gets an error ``ToolMessage`` instead of raising, so every
    tool call still receives a reply.

    Returns:
        A state update whose ``messages`` is the list of ``ToolMessage``
        replies (empty when there is nothing to execute).
    """
    try:
        messages = state["messages"]
    except TypeError:
        # State object is not subscriptable: read the field as an attribute.
        messages = state.messages

    if not messages:
        return {"messages": []}

    # The last message may not carry tool calls at all.
    tool_calls = getattr(messages[-1], "tool_calls", None) or []

    results = []
    for tool_call in tool_calls:
        name = tool_call["name"]
        if name in tools_by_name:
            content = str(tools_by_name[name].invoke(tool_call["args"]))
        else:
            content = f"Error: unknown tool '{name}'."
        results.append(ToolMessage(content=content, tool_call_id=tool_call["id"]))

    return {"messages": results}