# Import packages

In [None]:
import pandas as pd
import os
from tqdm import tqdm
import torch
from torch import cuda
# from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForMaskedLM
from pinecone import Pinecone
from deepeval.metrics import GEval
from deepeval.test_case import LLMTestCaseParams, LLMTestCase
import tiktoken
from langchain_text_splitters import RecursiveCharacterTextSplitter, Language
from langchain import hub
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, List

# Load dataset

In [2]:
test_df = pd.read_csv('./bigvul.csv')

In [3]:
len(test_df)

23403

In [None]:
# Setup models and Pinecone connection
device = f'cuda:{cuda.current_device()}' if cuda.is_available() else 'cpu'

# Initialize dense and sparse models
dense_model = SentenceTransformer(
    'msmarco-bert-base-dot-v5',
    device=device
)

model_id = 'naver/splade-cocondenser-ensembledistil'
tokenizer = AutoTokenizer.from_pretrained(model_id)
sparse_model = AutoModelForMaskedLM.from_pretrained(model_id)

# Initialize Pinecone
pc = Pinecone(api_key='XXX')
mitre_index = pc.Index('metadata-aug-mitre')
bigvul_index = pc.Index('metadata-retrieval-bigvul')


In [6]:
def encode(text: str):
    """Encode text into dense and sparse vectors for hybrid search."""
    try:
        # Create dense vector
        dense_vec = dense_model.encode(text).tolist()
        
        # Create sparse vector
        input_ids = tokenizer(text, return_tensors='pt')
        with torch.no_grad():
            outputs = sparse_model(**input_ids.to(device))
            sparse_vec = torch.log(1 + torch.relu(outputs.logits)) * input_ids.attention_mask.unsqueeze(-1)
            sparse_vec = torch.max(sparse_vec, dim=1)[0].squeeze()
        
        # Convert to dictionary format
        indices = sparse_vec.nonzero().squeeze().cpu().tolist()
        values = sparse_vec[indices].cpu().tolist()
        sparse_dict = {"indices": indices, "values": values}
        
        return dense_vec, sparse_dict
    except Exception as e:
        raise Exception(f"Encoding failed: {str(e)}")


In [7]:
def query_mitre_index(cwe_id, top_k=1):
    """Query MITRE CWE index for weakness information."""
    query = f"{cwe_id}"
    dense, sparse = encode(query)
    
    result = mitre_index.query(
        vector=dense,
        sparse_vector=sparse,
        top_k=top_k,
        include_metadata=True,
        filter={
            "cwe": {"$eq": f"{cwe_id}"}
        }
    )
    
    return result

def query_bigvul_index(cve_id, cwe_id, top_k=5):
    """Query BigVul index for vulnerability information."""
    query = f"Vulnerability: {cve_id} and Weakness: {cwe_id}"
    dense, sparse = encode(query)
    
    result = bigvul_index.query(
        vector=dense,
        sparse_vector=sparse,
        top_k=top_k,
        include_metadata=True,
        filter={
            "cwe": {"$eq": cwe_id},
            "cve": {"$eq": cve_id}
        }
    )
    
    return result


# Load model

In [None]:
from langchain_openai import ChatOpenAI


CONTEXT_WINDOW = 128000  # Configurable context window
MODEL_NAME = "gpt-4o"
SAFETY_BUFFER = 500  # Reserve tokens for response

llm = ChatOpenAI(
    model=MODEL_NAME,
    temperature=0,
    timeout=None,
    max_retries=2,
    api_key="XXX"
)

# Initialize tiktoken encoder for the model
try:
    encoding = tiktoken.encoding_for_model(MODEL_NAME)
except KeyError:
    encoding = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(encoding.encode(text))

def estimate_prompt_tokens(context: str) -> int:
    base_prompt = """
    # CONTEXT #
    You are a software engineer and software vulnerability  expert who specializes in recommending fixes for vulnerable code affected by different CWEs and CVEs.

    # OBJECTIVE #
    Your task is to recommend fixes for the provided vulnerable code. The recommendations should address the specific CWE in question and ensure that the code is secure against the identified vulnerabilities.

    # STYLE #
    Write in a technical and concise manner, providing clear and actionable steps. 

    # TONE #
    Professional and technical.

    # AUDIENCE #
    The target audience is software developers and security professionals who are looking to secure their code against known vulnerabilities.

    # RESPONSE FORMAT #
    Provide a structured recommendation in the following format:
    - Issue: [Brief description of the vulnerability]
    - Recommendation: [Detailed steps to fix the vulnerability]
    - Fix: [Code snippet demonstrating the fix]
    
    Using this context that contains extra information and previous vulnerable code examples and fixes for the CWE in question, recommend how the vulnerable code can be fixed:

    Context:
    {context}
    """
    
    prompt_tokens = count_tokens(base_prompt)
    context_tokens = count_tokens(context)
    
    return prompt_tokens + context_tokens

def can_fit_in_context(code: str, context: str, context_window: int = CONTEXT_WINDOW) -> bool:
    """Check if code + prompt can fit in context window."""
    code_tokens = count_tokens(code)
    prompt_tokens = estimate_prompt_tokens(context)
    total_tokens = code_tokens + prompt_tokens + SAFETY_BUFFER
    
    return total_tokens <= context_window

def calculate_optimal_chunk_size(context_window: int, context: str) -> int:
    """Calculate optimal chunk size based on context window and prompt size."""
    prompt_tokens = estimate_prompt_tokens(context)
    available_tokens = context_window - prompt_tokens - SAFETY_BUFFER
    
    chunk_size_chars = int(available_tokens * 3.5)
    
    return max(chunk_size_chars, 500)

# Create Template

In [None]:
from langchain.prompts import ChatPromptTemplate

messages = [
    ("system", "You are a software engineer and software vulnerability  expert who specializes in recommending fixes for vulnerable code affected by different CWEs and CVEs."),
    ("human", """
        # CONTEXT #
        You are a software engineer and software vulnerability expert who specializes in recommending fixes for vulnerable code affected by different CWEs and CVEs. This includes understanding the specific vulnerabilities and their potential impacts.
    
        # OBJECTIVE #
        Your task is to recommend fixes for the provided vulnerable code. The recommendations should address the specific CWE in question and ensure that the code is secure against the identified vulnerabilities.
    
        # STYLE #
        Write in a technical and concise manner, providing clear and actionable steps. 
    
        # TONE #
        Professional and technical.
    
        # AUDIENCE #
        The target audience is software developers and security professionals who are looking to secure their code against known vulnerabilities.
    
        # RESPONSE FORMAT #
        Provide a structured recommendation in the following format:
        - Issue: [Brief description of the vulnerability]
        - Recommendation: [Detailed steps to fix the vulnerability]
        - Fix: [Code snippet demonstrating the fix]
        
    
        Using this context that contains extra information and previous vulnerable code examples and fixes for the CWE in question, recommend how the vulnerable code can be fixed:
    
        Context:
        {context}
    """),
]

prompt_template = ChatPromptTemplate.from_messages(messages)

def create_code_chunks(code: str, context: str, language: Language = Language.CPP) -> List[str]:
    """Create chunks with optimal size based on context window."""
    optimal_chunk_size = calculate_optimal_chunk_size(CONTEXT_WINDOW, context)
    
    splitter = RecursiveCharacterTextSplitter.from_language(
        language=language,
        chunk_size=optimal_chunk_size,
        chunk_overlap=int(optimal_chunk_size * 0.1)  # 10% overlap
    )
    docs = splitter.create_documents([code])
    return [doc.page_content for doc in docs]

def analyze_code_directly(code: str, context: str) -> str:
    """Directly analyze code using LLM without chunking and generate recommendation."""
    combined_context = f"""
        {context}

        Vulnerable code to fix:
        {code}
    """

    prompt = prompt_template.invoke({"context": combined_context})
    result = llm.invoke(prompt)  

    return result.content

# Agent Tools for Chunked Processing
class AnalyzeChunkArgs(BaseModel):
    code: str = Field(description="Code chunk to analyze for vulnerabilities")
    context: str = Field(description="Context information about CWE/CVE")

class AnalyzeChunkTool(BaseTool):
    name: str = "analyze_chunk"
    description: str = "Analyzes a specific code chunk for vulnerabilities"
    args_schema: Type[BaseModel] = AnalyzeChunkArgs
    
    def _run(self, code: str, context: str) -> str:
        combined_context = f"""
            {context}

            Vulnerable code to fix:
            {code}
        """

        prompt = prompt_template.invoke({"context": combined_context})
        result = llm.invoke(prompt)
        return result.content

class SynthesizeRecommendationArgs(BaseModel):
    chunk_outputs: List[str] = Field(description="Array of responses from the different code chunks")
    context: str = Field(description="Context information about CWE/CVE")

class SynthesizeRecommendationTool(BaseTool):
    name: str = "synthesize_recommendation"
    description: str = "Synthesizes a final recommendation from multiple chunk analyses"
    args_schema: Type[BaseModel] = SynthesizeRecommendationArgs
    
    def _run(self, chunk_outputs: List[str], context: str) -> str:
        # Filter out "Not vulnerable" responses
        relevant_outputs = [output for output in chunk_outputs if "Not vulnerable" not in output]
        
        if not relevant_outputs:
            return "No vulnerabilities found in the provided code chunks."
        
        # Combine all relevant outputs
        combined_outputs = "\n\n".join([f"Chunk {i+1} Analysis:\n{output}" for i, output in enumerate(relevant_outputs)])
        
        final_prompt = f"""
        # CONTEXT #
        You are a software engineer and security expert who specializes in providing comprehensive recommendations for fixing vulnerabilities.

        # OBJECTIVE #
        Based on the analysis of multiple code chunks and the provided CWE/CVE context, generate a final comprehensive recommendation for fixing the vulnerability.

        # STYLE #
        Write in a technical and concise manner, providing clear and actionable steps.

        # TONE #
        Professional and technical.

        # AUDIENCE #
        Software developers and security professionals.

        # RESPONSE FORMAT #
        Provide a structured recommendation in the following format:
        - Issue: [Brief description of the vulnerability]
        - Recommendation: [Detailed steps to fix the vulnerability]
        - Fix: [Code snippet demonstrating the fix]

        # CWE/CVE Context #
        {context}

        # Chunk Analysis Results #
        {combined_outputs}
        """
        
        result = llm.invoke(final_prompt.strip())
        return result.content

# Initialize agent tools
tools = [AnalyzeChunkTool()]

# Pull prompt template for agent
agent_prompt = hub.pull("hwchase17/openai-tools-agent")

# Create an agent executor
agent = create_tool_calling_agent(
    llm=llm,
    tools=tools,
    prompt=agent_prompt
)

agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
    handle_parsing_errors=True
)

# Function to generate code fix recommendation using pipeline

In [12]:
def generate_fix_recommendation(context, code):
    """Generate fix recommendation using intelligent chunking when needed."""
    print(f"Code length: {len(code)} characters")
    print(f"Estimated tokens: {count_tokens(code)}")
    
    if can_fit_in_context(code, context):
        print("✅ Code fits in context window - using direct analysis")
        return analyze_code_directly(code, context)
    else:
        print("❌ Code exceeds context window - using chunked analysis approach")
        return analyze_code_chunked(code, context)

def analyze_code_chunked(code: str, context: str) -> str:
    """Handle chunked analysis using agents."""
    code_chunks = create_code_chunks(code, context)
    print(f"Created {len(code_chunks)} chunks")
    
    output = []
    
    if len(code_chunks) == 1:
        extracted_code = agent_executor.invoke({"input": {
            "code": code_chunks[0],
            "context": context
        }})
        output.append(extracted_code['output'])
    else:
        # Multiple chunks - process each
        for i, chunk in enumerate(code_chunks):
            print(f"Processing chunk {i+1}/{len(code_chunks)}")
            extracted_code = agent_executor.invoke({"input": {
                "code": chunk,
                "context": context
            }})
            output.append(extracted_code['output'])
    
    # Generate final recommendation if we have multiple outputs
    if len(output) > 1:
        print("\n=== SYNTHESIZING FINAL RECOMMENDATION ===")
        
        final_tools = [SynthesizeRecommendationTool()]
        
        final_agent = create_tool_calling_agent(
            llm=llm,
            tools=final_tools,
            prompt=agent_prompt
        )
        
        final_agent_executor = AgentExecutor.from_agent_and_tools(
            agent=final_agent,
            tools=final_tools,
            verbose=True,
            handle_parsing_errors=True
        )
        
        final_recommendation = final_agent_executor.invoke({"input": {
            "chunk_outputs": output,
            "context": context
        }})
        
        return final_recommendation['output']
    else:
        return output[0]
        

# Create context string

In [None]:
def create_context(cve_id, cwe_id):
    """Create context by querying both Pinecone indexes."""
    # Query MITRE CWE index for weakness information
    try:
        mitre_result = query_mitre_index(cwe_id)
        cwe_context = ""
        if mitre_result.matches:
            cwe_metadata = mitre_result.matches[0].metadata
           
            summary = cwe_metadata.get('Summary', '')
            context_info = cwe_metadata.get('context', '')
            cwe_context = f"MITRE CWE Information:\nSummary: {summary}\nContext: {context_info}\n"
    except Exception as e:
        print(f"Error querying MITRE index: {e}")
        cwe_context = ""

    # Query BigVul index for vulnerability information
    try:
        bigvul_result = query_bigvul_index(cve_id, cwe_id)
        cve_context = ""
        if bigvul_result.matches:
            all_summaries = []
            all_contexts = []
            all_vuln_fixes = []
            for match in bigvul_result.matches:
                metadata = match.metadata
                summary = metadata.get('Summary', '')
                context_info = metadata.get('context', '')
                func_before = metadata.get('func_before', '')
                func_after = metadata.get('func_after', '')
                if summary:
                    all_summaries.append(summary)
                if context_info:
                    all_contexts.append(context_info)
                if func_before and func_after:
                    all_vuln_fixes.append((func_before, func_after))
            
            cve_context = f"BigVul Vulnerability Information:\n"
            if all_summaries:
                cve_context += f"Summaries: {' | '.join(all_summaries)}\n"
            if all_contexts:
                cve_context += f"Contexts: {' | '.join(all_contexts)}\n"
            if all_vuln_fixes:
                cve_context += f"Vulnerability and Fix Examples:\n"
                for i, (vuln_code, fixed_code) in enumerate(all_vuln_fixes, 1):
                    cve_context += f"Example {i}:\n"
                    cve_context += f"VULNERABLE CODE:\n{vuln_code}\n\n"
                    cve_context += f"FIXED CODE:\n{fixed_code}\n\n"
    except Exception as e:
        print(f"Error querying BigVul index: {e}")
        cve_context = ""
    
    context = f"Context:\n{cwe_context}\n{cve_context}"
    
    return context

In [14]:
create_context(test_df['CVE ID'][1], test_df['CWE ID'][1])

'Context:\nMITRE CWE Information:\nSummary: \nContext: CWE ID: CWE-416\n\nBigVul Vulnerability Information:\nSummaries: In Artifex Ghostscript before 9.24, attackers able to supply crafted PostScript files could use incorrect free logic in pagedevice replacement to crash the interpreter. | In Artifex Ghostscript before 9.24, attackers able to supply crafted PostScript files could use incorrect free logic in pagedevice replacement to crash the interpreter. | In Artifex Ghostscript before 9.24, attackers able to supply crafted PostScript files could use incorrect free logic in pagedevice replacement to crash the interpreter. | In Artifex Ghostscript before 9.24, attackers able to supply crafted PostScript files could use incorrect free logic in pagedevice replacement to crash the interpreter. | In Artifex Ghostscript before 9.24, attackers able to supply crafted PostScript files could use incorrect free logic in pagedevice replacement to crash the interpreter.\nContexts: Vulnerability: C

## Load progress if exists

In [15]:
progress_file = './output/recommendations.csv'
if os.path.exists(progress_file):
    results_df = pd.read_csv(progress_file)
    processed_ids = set(results_df['cve'].astype(str) + "_" + results_df['cwe'].astype(str))
    results = results_df.to_dict('records')
else:
    results = []
    processed_ids = set()

# Iterate through the test dataset and generate recommendations

In [None]:
for index, row in tqdm(test_df.head(2).iterrows(), total=test_df.head(2).shape[0], desc="Processing"):
    current_id = f"{row['CVE ID']}_{row['CWE ID']}"

    # Skip already processed examples
    if current_id in processed_ids:
        continue

    # Create context string
    context = create_context(row['CVE ID'], row['CWE ID'])

    print(f"Running for {current_id}")

    func_before = row['func_before']
    
    # Generate fixes
    recommendation = generate_fix_recommendation(context, func_before)

    # Store the results
    results.append({
        'cve': row['CVE ID'],
        'cwe': row['CWE ID'],
        'context': context,
        'func_after': row['func_after'],
        'func_before': row['func_before'],
        'recommendation': recommendation,
    })

    if index % 1 == 0:
        results_df = pd.DataFrame(results)
        results_df.to_csv(progress_file, index=False)

Processing:   0%|          | 0/2 [00:00<?, ?it/s]

Running for CVE-2018-12714_CWE-787
Code length: 209 characters
Estimated tokens: 49
✅ Code fits in context window - using direct analysis


Processing:  50%|█████     | 1/2 [00:06<00:06,  6.69s/it]

Running for CVE-2018-16541_CWE-416
Code length: 406 characters
Estimated tokens: 108
✅ Code fits in context window - using direct analysis


Processing: 100%|██████████| 2/2 [00:11<00:00,  6.00s/it]


# Generate Fixes

In [82]:
# Load recommendation
recommendations = pd.read_csv('./output/recommendations.csv')

## Fixes Prompt Template

In [83]:
fix_messages = [
    ("system", "You are a software engineer and security expert who specializes in generating fixes for vulnerable code affected by different CWEs and CVEs."),
    ("human", """
        # CONTEXT #
        You are a software engineer and security expert who specializes in generating fixes for vulnerable code affected by different CWEs and CVEs.
        
        # OBJECTIVE #
        Generate a fix for the given vulnerable code based on the provided context.
        
        # STYLE #
        Provide the fixed code snippet only, following best practices for secure and efficient coding.
        
        # TONE #
        Professional and technical.
        
        # AUDIENCE #
        Software engineers and security experts.
        
        # RESPONSE FORMAT #
        The response should be a single corrected code snippet without any additional explanations or comments.
        
        # PROMPT #
        Based on the following vulnerable code and the given recommendation, generate a fixed version of the code:
        {context}
    """),
]

fixes_prompt_template = ChatPromptTemplate.from_messages(fix_messages)

In [84]:
def generate_fix(code_before, recommendation, cve, cwe):
    combined_context = f"""
        Vulnerable code:
        CWE: {cwe}
        CVE: {cve}

        Code:
        {code_before}

        Recommendation:
        {recommendation}
    """

    prompt = fixes_prompt_template.invoke({"context": combined_context})
    result = llm.invoke(prompt)

    return result.content
        

In [85]:
progress_file = './output/fixes.csv'
if os.path.exists(progress_file):
    results_df = pd.read_csv(progress_file)
    processed_ids = set(results_df['cve'].astype(str) + "_" + results_df['cwe'].astype(str))
    results = results_df.to_dict('records')
else:
    results = []
    processed_ids = set()

In [86]:
for index, row in tqdm(recommendations.iterrows(), total=recommendations.shape[0], desc="Processing"):
    current_id = f"{row['cve']}_{row['cwe']}"

    # Skip already processed examples
    if current_id in processed_ids:
        continue

    print(f"Running for {current_id}")

    code_before = row['func_before']
    
    # Generate fixes
    fix = generate_fix(code_before, row['recommendation'], row['cve'], row['cwe'])

    # Store the results
    results.append({
        'cve': row['cve'],
        'cwe': row['cwe'],
        'context': row['context'],
        'func_after': row['func_after'],
        'func_before': row['func_before'],
        'recommendation': row['recommendation'],
        'fix': fix,
    })

    # Save progress every 10 examples
    if index % 1 == 0:
        results_df = pd.DataFrame(results)
        results_df.to_csv(progress_file, index=False)

Processing:   0%|          | 0/2 [00:00<?, ?it/s]

Running for CVE-2018-12714_CWE-787


Processing:  50%|█████     | 1/2 [00:01<00:01,  1.41s/it]

Running for CVE-2018-16541_CWE-416


Processing: 100%|██████████| 2/2 [00:03<00:00,  1.82s/it]


# LLM Judge Evaluation

## Rubrics

In [87]:
rubrics = [
    {
        "name": "Relevance",
        "criteria": "Evaluate whether the actual output (recommendation) is relevant to the identified vulnerability in both the context and the provided code snippet, and whether the suggested fix is applicable and practical within the given scenario.",
        "evaluation_steps": [
            "Verify if the 'actual output' (recommendation) directly addresses the specific vulnerability identified in the 'input' (context), including CWE, CVE, and details from the code snippet.",
            "Ensure the 'actual output' provides a solution that is applicable to both the provided context (CWE/CVE information) and the code snippet and can realistically be implemented in the given scenario.",
            "Assess whether the recommendation avoids irrelevant information or suggestions that do not align with the vulnerability details or coding context.",
            "Determine if the recommendation includes practical and actionable steps that align with the specific characteristics of the vulnerability, such as the type of flaw and its occurrence within the code.",
            "Penalize any 'actual output' that is generic, lacks relevance to both the context and the code provided, or suggests impractical or inapplicable solutions.",
            "Penalize heavily if the CVE and CWE mentioned in the 'input' context do not match or align with the content of the recommendation."
        ]
    },
    {
        "name": "Completeness",
        "criteria": "Evaluate whether the actual output (recommendation) is complete and thorough in addressing the identified vulnerability, covering all necessary steps and details required to implement the fix effectively, with reference to both the context and the code snippet.",
        "evaluation_steps": [
            "Check if the 'actual output' (recommendation) covers all necessary steps to fully address the vulnerability described in the 'input' (context), including CWE, CVE, and the code snippet.",
            "Ensure that the recommendation provides detailed explanations and instructions for each step, leaving no ambiguity about how to implement the fix, referencing both the vulnerable code and the retrieved context.",
            "Evaluate whether the recommendation considers all relevant aspects of the vulnerability, including its nature, impact, and potential variations in different coding contexts.",
            "Identify any missing steps or details in the 'actual output' that could lead to incomplete implementation or residual vulnerabilities.",
            "Penalize any 'actual output' that lacks comprehensiveness, omits critical details, or provides insufficient guidance for implementing a complete fix.",
            "Penalize heavily if the CVE and CWE mentioned in the 'input' context do not match or align with the content of the recommendation."
        ]
    },
    {
        "name": "Correctness",
        "criteria": "Evaluate whether the actual output (recommendation) is factually correct, technically accurate, and free of errors or misleading information, based on the expected output, known standards, and the provided context and code snippet for addressing the identified vulnerability.",
        "evaluation_steps": [
            "Check if the 'actual output' (recommendation) aligns with the correct approach to fixing the vulnerability described in the 'input' (context), including CWE, CVE, and the code snippet.",
            "Verify the technical accuracy of each step in the 'actual output', ensuring they are based on sound principles, the provided code, and best practices for security fixes.",
            "Identify any factual inaccuracies, technical errors, or misleading information in the 'actual output' that could lead to incorrect implementation or further vulnerabilities.",
            "Evaluate if the 'actual output' correctly interprets the vulnerability details (CWE, CVE) and provides an appropriate solution based on known standards and methodologies.",
            "Penalize any 'actual output' that contains factual inaccuracies, incorrect technical guidance, or misleading statements.",
            "Penalize heavily if the CVE and CWE mentioned in the 'input' context do not match or align with the content of the recommendation."
        ]
    },
    {
        "name": "Identification of Vulnerable Code",
        "criteria": "Evaluate whether the actual output (recommendation) accurately identifies the specific parts of the code that are vulnerable based on the provided context, including CWE, CVE, and the code snippet.",
        "evaluation_steps": [
            "Check if the 'actual output' (recommendation) clearly identifies the specific lines, functions, or code snippets in the 'input' (context) that are vulnerable.",
            "Ensure the 'actual output' pinpoints the exact location of the vulnerability in the code snippet, referencing relevant details such as line numbers or specific code segments.",
            "Assess whether the recommendation provides enough detail to locate the vulnerable code without ambiguity or confusion.",
            "Evaluate if the 'actual output' includes all relevant parts of the code that are vulnerable, without missing any significant areas.",
            "Penalize any 'actual output' that fails to identify the vulnerable code clearly, is too vague, or misidentifies non-vulnerable code as vulnerable.",
            "Penalize heavily if the CVE and CWE mentioned in the 'input' context do not match or align with the content of the recommendation."
        ]
    },
    {
        "name": "Code Guidance",
        "criteria": "Evaluate whether the actual output (recommendation) provides relevant, actionable code snippets and clear guidance to effectively address the identified vulnerability based on the provided context, including CWE, CVE, and code snippet.",
        "evaluation_steps": [
            "Check if the 'actual output' (recommendation) includes relevant and actionable code snippets that directly address the vulnerability described in the 'input' (context).",
            "Ensure that the code snippets provided are syntactically correct, follow best practices, and are applicable to the coding environment specified in the context.",
            "Assess whether the 'actual output' provides clear, step-by-step guidance on how to implement the code changes to fix the vulnerability.",
            "Evaluate if the recommendation includes sufficient detail in the code snippets and guidance to be easily implemented by a developer without needing additional information.",
            "Penalize any 'actual output' that lacks relevant code snippets, provides vague or incorrect guidance, or includes incomplete or confusing instructions.",
            "Penalize heavily if the CVE and CWE mentioned in the 'input' context do not match or align with the content of the recommendation."
        ]
    }
]

In [94]:
data = []
fixes_data = pd.read_csv('./output/fixes.csv')

for index, row in fixes_data.iterrows():
    context = create_context(row['cve'], row['cwe'])
    
    sample = {
        "cwe": row['cwe'],
        "cve": row['cve'],
        "code_snippet": f"### Vulnerable Code\n{row['func_before']}\n\n\n### Vulnerable Code Fix\n{row['func_after']}\n", 
        "context": row['context'],
        "recommendation": row['recommendation']
    }
    data.append(sample)

In [95]:
def create_input_text(cwe, cve, code_snippet, context):
    return f"""
        ### CVE-ID
        {cve}

        ### CWE-ID
        {cwe}
    
        ### Vulnerable Code and Fix
        {code_snippet}


        ### Retrieved Data from Vector DB
        {context}
    """

## Evaluation

In [96]:
results = []

In [97]:
checkpoint_file = "./output/evaluation.csv"
if os.path.exists(checkpoint_file):
    df = pd.read_csv(checkpoint_file)
    processed_keys = set(df["cwe_cve_idx"])  # Track already processed cwe_cve_idx
    results = df.to_dict('records')
else:
    processed_keys = set()
    results = []

def save_checkpoint(results, checkpoint_file):
    df = pd.DataFrame(results)
    df.to_csv(checkpoint_file, index=False)

In [None]:
for idx, example in enumerate(tqdm(data, desc="Processing Examples")):
    cwe_cve_idx = f"{example['cwe']}_{example['cve']}_{idx}"  

    if cwe_cve_idx in processed_keys:
        continue 

    print(f"Processing index {idx}: CWE/CVE {cwe_cve_idx}")

    example_results = {
        "cwe": example["cwe"],
        "cve": example["cve"],
        "cwe_cve_idx": cwe_cve_idx,
        "recommendation": example["recommendation"],
        "code_snippet": example["code_snippet"],
        "context": example["context"]
    }

    for rubric in rubrics:
        g_eval = GEval(
            **rubric,
            model="gpt-4o",
            evaluation_params=[LLMTestCaseParams.INPUT, LLMTestCaseParams.ACTUAL_OUTPUT]
        )

        test_case = LLMTestCase(
            input=create_input_text(example["cwe"], example["cve"], example["code_snippet"], example["context"]),
            actual_output=example["recommendation"]
        )

        g_eval.measure(test_case)
        example_results[f"{rubric['name']} Score"] = float(g_eval.score)

    results.append(example_results)
    processed_keys.add(cwe_cve_idx)

    # Save checkpoint after each example is processed
    save_checkpoint(results, checkpoint_file)

In [100]:
results_df = pd.read_csv(checkpoint_file)

In [None]:
# Calculate average scores for all metrics
def calculate_average_metrics(df):
    score_columns = [col for col in df.columns if col.endswith(' Score')]
    
    print("Average Scores for All Metrics:")
    print("=" * 40)
    
    averages = {}
    for col in score_columns:
        avg_score = df[col].mean()
        averages[col] = avg_score
        print(f"{col}: {avg_score:.3f}")
    
    print("=" * 40)
    print(f"Overall Average: {sum(averages.values()) / len(averages):.3f}")
    
    return averages

averages = calculate_average_metrics(results_df)


Average Scores for All Metrics:
Relevance Score: 0.844
Completeness Score: 0.795
Correctness Score: 0.782
Identification of Vulnerable Code Score: 0.735
Code Guidance Score: 0.843
Overall Average: 0.800


In [None]:
def detailed_metrics_analysis(df):
    score_columns = [col for col in df.columns if col.endswith(' Score')]
    
    print("\nDetailed Metrics Analysis:")
    print("=" * 60)
    
    for col in score_columns:
        print(f"\n{col}:")
        print(f"  Mean: {df[col].mean():.3f}")
        print(f"  Median: {df[col].median():.3f}")
        print(f"  Std Dev: {df[col].std():.3f}")
        print(f"  Min: {df[col].min():.3f}")
        print(f"  Max: {df[col].max():.3f}")
        print(f"  Count: {df[col].count()}")


detailed_metrics_analysis(results_df)



Detailed Metrics Analysis:

Relevance Score:
  Mean: 0.844
  Median: 0.844
  Std Dev: 0.043
  Min: 0.813
  Max: 0.874
  Count: 2

Completeness Score:
  Mean: 0.795
  Median: 0.795
  Std Dev: 0.037
  Min: 0.768
  Max: 0.821
  Count: 2

Correctness Score:
  Mean: 0.782
  Median: 0.782
  Std Dev: 0.095
  Min: 0.715
  Max: 0.849
  Count: 2

Identification of Vulnerable Code Score:
  Mean: 0.735
  Median: 0.735
  Std Dev: 0.092
  Min: 0.670
  Max: 0.800
  Count: 2

Code Guidance Score:
  Mean: 0.843
  Median: 0.843
  Std Dev: 0.067
  Min: 0.795
  Max: 0.890
  Count: 2
