# Single Iteration C Code APR Using LLM Experiments

Goal: find out what a good single-iteration APR (automated program repair) prompt for ESBMC contains.

## Methodology

Randomly select 100 samples from Expanded NeuroCodeBench whose ESBMC result is `VERIFICATION FAILED`, using Python's `secrets` module as the random source.

Run each sample through GPT-3.5 Turbo with each prompt.

Total Experiments:
* We have the following groups of prompt templates:
  * simple_prompts_no_esbmc
  * simple_prompts
  * simple_prompts_flipped
  * persona_prompt_no_esbmc
  * persona_prompt
  * persona_prompt_flipped
  * Each group has 2 prompt templates inside.
* We have 6 roles per persona group template.
* We have 100 experiments.
* ~~We have to run experiments once for contextual and n for constant.~~
* We have to run each experiment twice: once with the violated property ESBMC output and once with the counterexample ESBMC output.
* Simple Prompt: `100 * 3 * 2 = 600`
* Persona Prompt: `100 * 3 * 6 * 2 * 2 = 7200`
* Total: `7800`
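
The counts above can be reproduced with a few lines of arithmetic. This is only a sketch that multiplies the factors as they are written in the list; the variable names are illustrative and not part of the notebook, and the reading of each factor (for example, that the last `2` in the persona product stands for the two ESBMC output types) is an assumption.

```python
# Factors taken from the list above; the names are illustrative.
samples = 100
groups_per_family = 3      # no_esbmc, normal, flipped
templates_per_group = 2
persona_roles = 6
esbmc_output_types = 2     # violated property and counterexample (assumed reading)

# Simple prompts, exactly as written in the list: 100 * 3 * 2
simple_total = samples * groups_per_family * templates_per_group

# Persona prompts, exactly as written in the list: 100 * 3 * 6 * 2 * 2
persona_total = (
    samples
    * groups_per_family
    * persona_roles
    * templates_per_group
    * esbmc_output_types
)

total = simple_total + persona_total
print(simple_total, persona_total, total)
```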

## Data Collection

See the evaluation notebook.

## Generated Files

This notebook will use:

* `samples`
* `esbmc_output`

This notebook will generate the following folders:

* `results`
* `samples-patched`

Notice:

* `includes` is used by the `eval-esbmc.sh` script.

## Obtaining Samples

Get true random filenames. Run the `get_filenames.sh` script to get the filenames of all the `VERIFICATION FAILED` samples. The filenames should be saved in `./all_sample_names`. The following section of code randomly selects 100 samples and places the result in `./selected_sample_names`.

In [2]:
from secrets import choice

In [81]:
with open("./all_sample_names", "r") as file:
    lines: list[str] = file.readlines()

selected_lines: set[str] = set()
while len(selected_lines) < 100:
    selected_lines.add(choice(lines))

len(selected_lines)

100

In [84]:
with open("./selected_sample_names", "w") as file:
    for filename in selected_lines:
        file.write(filename)

with open("./selected_esbmc_output_names", "w") as file:
    for filename in selected_lines:
        file.write(f"{filename.strip()}.stdout.txt\n")

### Getting Samples

Running `./get_selected_samples` fetches the source code of each file listed in `./selected_sample_names`, along with its ESBMC output. The content is placed in `samples` and `esbmc_output` respectively.

# Running Experiments with GPT 3.5 Turbo

In [1]:
import os
from io import TextIOWrapper
from math import floor
from time import sleep, time
from typing import Optional

import tiktoken
from dotenv import get_key
from openai import OpenAI, Completion

### AI Params + Definitions

In [2]:
MAX_TOKENS: int = 16385

In [3]:
def num_tokens_from_string(string: str, model_name: str = "gpt-3.5-turbo") -> int:
    """Returns the number of tokens in a text string for the given model."""
    encoding = tiktoken.encoding_for_model(model_name)
    return len(encoding.encode(string))

In [4]:
api_key: Optional[str] = get_key(dotenv_path=".env", key_to_get="OPENAI_API_KEY")
assert api_key
client = OpenAI(api_key=api_key)

In [5]:
def get_llm_message(prompt: str, source_code: str, esbmc_output: str, role: str) -> str:
    return prompt.format_map({
        "source": source_code,
        "esbmc": esbmc_output,
        "role": role,
    })

def run_sample(prompt: str, source_code: str, esbmc_output: str, role: str) -> str:
    """Sends the formatted prompt as a single system message and returns the reply text."""
    message_stack: list[dict[str, str]] = [
        {
            "role": "system",
            "content": get_llm_message(prompt, source_code, esbmc_output, role),
        },
    ]

    # The chat endpoint returns a chat completion object; only the first choice is used.
    response = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=message_stack,
    )
    return response.choices[0].message.content
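
To make the template mechanism concrete, the sketch below fills two shortened templates with `str.format_map`, the same call that `get_llm_message` uses. The template texts and values are made up for illustration. Keys that a template does not reference (such as `role` in a simple prompt) are ignored, and braces inside the substituted values are not re-parsed, so C source code can be passed in as-is.

```python
# Shortened, hypothetical templates; the real ones are defined later in the notebook.
persona_template = "You're a {role}. Repair the code.\n\n{source}\n\nESBMC says:\n{esbmc}"
simple_template = "Fix the source code:\n\n{source}"

values = {
    "source": "int main() { int a[2]; a[2] = 1; }",
    "esbmc": "array bounds violated",
    "role": "Senior software engineer",
}

# The same mapping works for both templates; unused keys are simply skipped.
persona_message = persona_template.format_map(values)
simple_message = simple_template.format_map(values)

print(persona_message)
print(simple_message)
```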

### Initializing Directories

In [6]:
dirs: list[str] = ["samples-patched", "results"]

# Create the output directories; existing ones are left untouched.
for dir_name in dirs:
    os.makedirs(dir_name, exist_ok=True)

### Initializing Logger
All output is logged through the small custom print and write helpers below. Be aware that `log.txt` may show stale content until the buffer is flushed, and that editing the file by hand before `log_file.close()` is called can corrupt it.

In [93]:
# Close logger
try:
    if not log_file.closed:
        log_file.close()
except NameError:
    pass

In [94]:
# Initialize logger. Append mode creates log.txt if it does not exist yet.
log_file: TextIOWrapper = open("log.txt", "a")

def log_str(text: str = "") -> None:
    assert not log_file.closed, "The log file is closed."
    if len(text) == 0:
        log_file.write("\n")
    else:
        log_file.write(f"Log: {time()}: {text}\n")
    
def print_and_log(text: str = "") -> None:
    assert not log_file.closed, "The log file is closed."
    if len(text) == 0:
        log_file.write("\n")
        print()
    else:
        text = str(time()) + ": " + text
        log_file.write("Log: " + text + "\n")
        print(text)

print_and_log("Notice: Starting new logging session.")
log_file.flush()

1710896230.8044186: Notice: Starting new logging session.


### Load and Parse Data

#### ESBMC Parsing Strategies

##### ~~Compress ESBMC Output~~

In [8]:
def esbmc_output_remove_loop_unroll(esbmc_output: str) -> str:
    lines: list[str] = esbmc_output.splitlines()
    kept_lines: list[str] = []
    filter: bool = False
    for line in lines:
        if filter:
            if "Building error trace" in line:
                filter = False
                kept_lines.append(line)
        else:
            if "Starting Bounded Model Checking" in line:
                filter = True
            kept_lines.append(line)
    return "\n".join(kept_lines)

def esbmc_output_remove_dividers(esbmc_output: str) -> str:
    lines: list[str] = esbmc_output.splitlines()
    kept_lines: list[str] = []
    for line in lines:
        if "----------------------------------------------------" not in line:
            kept_lines.append(line)
    return "\n".join(kept_lines)

def compress_esbmc_output(esbmc_output: str) -> str:
    esbmc_output = esbmc_output_remove_loop_unroll(esbmc_output)
    esbmc_output = esbmc_output_remove_dividers(esbmc_output)
    return esbmc_output

##### ESBMC Extract Parts of Output

In [9]:
def esbmc_get_violated_property(esbmc_output: str) -> str:
    """Gets the "Violated property:" line of the ESBMC output and the two lines after it."""
    lines: list[str] = esbmc_output.splitlines()
    for ix, line in enumerate(lines):
        if "Violated property:" == line:
            return "\n".join(lines[ix:ix+3])
    # Do not reference caller globals here; the caller knows which file it passed in.
    raise ValueError('Could not find "Violated property:" in the ESBMC output')

def esbmc_get_counter_example(esbmc_output: str) -> str:
    """Gets ESBMC output after and including [Counterexample]"""
    idx: int = esbmc_output.find("[Counterexample]\n")
    assert idx != -1
    return esbmc_output[idx:]
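
As an illustration of what the two extractors return, the sketch below runs simplified copies of them on a short, hand-written string. The sample text only imitates the shape that the parsing code relies on (a `[Counterexample]` marker and a `Violated property:` line followed by a location line and a description); it is not real ESBMC output, and the helper names are illustrative.

```python
def get_violated_property(esbmc_output: str) -> str:
    """Return the 'Violated property:' line plus the two lines after it."""
    lines = esbmc_output.splitlines()
    for ix, line in enumerate(lines):
        if line == "Violated property:":
            return "\n".join(lines[ix:ix + 3])
    raise ValueError('Could not find "Violated property:"')


def get_counter_example(esbmc_output: str) -> str:
    """Return everything from the [Counterexample] marker onwards."""
    idx = esbmc_output.find("[Counterexample]\n")
    if idx == -1:
        raise ValueError("Could not find [Counterexample]")
    return esbmc_output[idx:]


# Hand-written stand-in for ESBMC output (assumption, for illustration only).
sample_output = (
    "Starting Bounded Model Checking\n"
    "[Counterexample]\n"
    "\n"
    "State 1 file example.c line 4 function main thread 0\n"
    "Violated property:\n"
    "  file example.c line 4 column 3 function main\n"
    "  dereference failure: NULL pointer\n"
    "\n"
    "VERIFICATION FAILED\n"
)

vp = get_violated_property(sample_output)
ce = get_counter_example(sample_output)
print(vp)
print(ce)
```

The violated property is the short form (three lines), while the counterexample keeps the whole trace, which is why the latter may need trimming before it fits in the prompt.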

In [10]:
# Load all the samples and esbmc output.
subdirs: list[str] = ["hopfield_nets", "poly_approx", "reach_prob_density", "reinforcement_learning"]

data_samples: dict[str, str] = {}
data_esbmc_output: dict[str, str] = {}
data_vp_output: dict[str, str] = {}

for subdir in subdirs:
    files: list[str] = sorted(os.listdir(f"samples/{subdir}"))
    for file_name in files:
        if not file_name.endswith(".c"):
            continue
            
        with open(f"samples/{subdir}/{file_name}", "r") as file:
            key: str = f"{subdir}/{file_name}"
            data_samples[key] = file.read()
        with open(f"esbmc_output/{subdir}/{file_name}.stdout.txt", "r") as file:
            esbmc_output: str = file.read()
            # data_esbmc_output[key] = esbmc_output
            # Parse and remove lines between "Starting Bounded Model Checking" until "Symex completed"
            # data_esbmc_output[key] = compress_esbmc_output(esbmc_output)
            data_esbmc_output[key] = esbmc_get_counter_example(esbmc_output)
            data_vp_output[key] = esbmc_get_violated_property(esbmc_output)

### Get Code From Solution

In [11]:
def get_code_from_solution(solution: str) -> str:
    """Strip the source code of any leftover text as sometimes the AI model
    will generate text and formatting despite being told not to.
    
    Source: https://github.com/Yiannis128/esbmc-ai/blob/master/esbmc_ai/solution_generator.py"""
    try:
        # index() raises ValueError when no fence exists, so no -1 check is needed.
        code_start: int = solution.index("```") + 3
        
        # Remove up until the new line, because usually there's a language
        # specification after the 3 ticks ```c...
        code_start = solution.index("\n", code_start) + 1
        
        # Start of the last fence in the response.
        fence_start: int = solution.rindex("```")
        
        # -1 drops the \n before the closing backticks.
        code_end: int = fence_start - 1
        # Fails when there is only one fence or nothing between the fences.
        assert code_start <= code_end
        
        solution = solution[code_start:code_end]
    except (ValueError, AssertionError):
        # No usable fenced block: return the response unchanged.
        pass
    return solution
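
The behaviour of the extraction can be checked on a small reply. This is a compact sketch of the same idea rather than the notebook's function; `extract_code` and `FENCE` are illustrative names, and the fence marker is built with string multiplication only so that the example can itself be shown inside a fenced block.

```python
FENCE = "`" * 3  # three backticks, written indirectly on purpose


def extract_code(reply: str) -> str:
    """Return the text between the first and last fence, or the reply unchanged."""
    try:
        start = reply.index(FENCE) + 3
        # Skip the rest of the opening fence line (usually a language tag).
        start = reply.index("\n", start) + 1
        # Exclude the newline that precedes the closing fence.
        end = reply.rindex(FENCE) - 1
    except ValueError:
        return reply
    if start > end:
        # Only one fence, or nothing between the fences.
        return reply
    return reply[start:end]


fenced_reply = f"Here is the fix:\n{FENCE}c\nint x = 0;\n{FENCE}\nDone."
plain_reply = "int y = 1;"

print(extract_code(fenced_reply))
print(extract_code(plain_reply))
```

A reply without any fence is passed through untouched, so a model that obeys the "code only" instruction is not penalised.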

### Define Prompts

The following prompts are going to be iterated through. The prompts `simple_prompts_no_esbmc` and `simple_prompts` are the baseline prompts.

In [12]:
# Simple Prompts

simple_prompts_no_esbmc: list[str] = [
    "The following source code segment might contain a memory vulnerability\n\n```c\n{source}\n```\n\nFix the source code segment.",
    "Fix the memory vulnerability that may exist in the in the source code segment:\n\n```c\n{source}\n```\n",
]

simple_prompts: list[str] = [
    "The following source code contains a memory vulnerability\n\n```c\n{source}\n```\n\nThe following is output of ESBMC describing the vulnerability\n\n```\n{esbmc}\n```\n\nFix the source code.",
    "Fix the source code:\n\n```c\n{source}\n```\n\n```{esbmc}\n```\n",
]

simple_prompts_flipped: list[str] = [
    "ESBMC output describes a memory vulnerability in the source code, the following is ESBMC output:\n\n```\n{esbmc}\n```\n\nThe following is the vulnerable source code:\n\n```c\n{source}\n```\n\nFix the source code.",
    "Fix the source code:\n\n```\n{esbmc}\n```\n\n```c\n{source}\n```\n",
]

# Persona Prompts

persona_prompt_no_esbmc: list[str] = [
    "You’re a {role}. You’ll be shown some C code. Repair the code and display it. The code is\n\n```c\n{source}\n```\n",
    "From now on, act as an {role} that repairs AI C code. You will be shown AI C code. Provide the repaired C code as output, as would an {role}. Aside from the corrected source code, do not output any other text. The code is\n\n```c\n{source}\n```\n"
]

persona_prompt: list[str] = [
    "You’re a {role}. You’ll be shown some C code, along with ESBMC output. Repair the code and display it. The code is\n\n```c\n{source}\n```\n\nThe ESBMC output is\n\n```\n{esbmc}\n```\n",
    "From now on, act as an {role} that repairs AI C code. You will be shown AI C code, along with ESBMC output. Pay close attention to the ESBMC output, which contains a stack trace along with what type of error has occurred and its location. Provide the repaired C code as output, as would an {role}. Aside from the corrected source code, do not output any other text. The code is\n\n```c\n{source}\n```\n\nThe ESBMC output is\n\n```\n{esbmc}\n```\n",
]

persona_prompt_flipped: list[str] = [
    "You’re a {role}. You’ll be shown some C code, along with ESBMC output. Repair the code and display it. The ESBMC output is\n\n```\n{esbmc}\n```\n\nThe source code is\n\n```c\n{source}\n```\n",
    "From now on, act as an {role} that repairs AI C code. You will be shown AI C code, along with ESBMC output. Pay close attention to the ESBMC output, which contains a stack trace along with what type of error has occurred and its location. Provide the repaired C code as output, as would an {role}. Aside from the corrected source code, do not output any other text. The ESBMC output is\n\n```\n{esbmc}\n```\n\nThe source code is\n\n```c\n{source}\n```\n",
]

# ESBMC-AI Prompts

esbmc_ai_prompts: list[str] = [
    "You are an secure code generator that parses vulnerable source code, "
    "and output from a program called ESBMC, which contains vulnerability information "
    "about the source code. You should use the output from ESBMC to find the problem, "
    "and correct the source code. ESBMC is always correct. You shall add a NULL check "
    "for every heap allocation you make. From this point on, you can only reply in source code. "
    "You shall only output source code as whole. The following text is the source code of the "
    "program: \n\n```c\n{source}\n```\n\nThe following text is the output of ESBMC, reply OK if "
    "you understand:\n\n```\n{esbmc}```\n\nGenerate a correction for the source code provided. "
    "Show the code only. Do not reply with acknowledgements."
]

all_prompts: list[str] = simple_prompts_no_esbmc + simple_prompts + simple_prompts_flipped + persona_prompt_no_esbmc + persona_prompt + persona_prompt_flipped + esbmc_ai_prompts

persona_roles: list[str] = [
    "Programmer with 1 million years of experience",
    "Senior software engineer",
    "Automated code repair tool",
    "Artificial intelligence that specializes in repairing C programs",
    "The smartest human in the universe",
    "Dog",
]

# Experiment Execution

The source code together with the ESBMC output is too large for the LLM's context window. Three strategies are proposed to alleviate the problem:

1. Constant: Split by line or by character with no regard for structure (brutal split)
2. ~~Structural: Split semantically (function by function)~~
3. Contextual: Split from failure and show code before

#### Notation

Let `C` be the source code and `L={l1, l2, l3, ..., ln}` be the list of its line lengths in tokens, where `n` is the number of lines in `C` and `lx` is the length of the `x`th line. So `l1` is the length of the first line, and so on. `E` is the length of the line containing the error and `e` is the index of that line in `L`, such that `E=L[e]=le`.

### Combining Strategies

The LLM response can be merged back into the full source code using the following strategies:

1. Brutal replacement: the lines that were sent to the LLM are simply replaced by whatever code the LLM responds with.
2. ~~Direct replacement: Ask the LLM to supply in a specific format the line(s) to replace. Then simply replace those lines.~~

In [13]:
def apply_patch_brutal_replacement(source_code: str, patch: str, start: int, end: int) -> str:
    """Replaces lines start..end (0-based) with the patch. End is inclusive, so to
    replace a single line, start and end are equal."""
    lines: list[str] = source_code.splitlines()
    lines = lines[:start] + [patch] + lines[end+1:]
    return "\n".join(lines)
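
A short worked example shows how the bounds behave. The sketch below mirrors the slicing used in `apply_patch_brutal_replacement`, where `lines[end+1:]` makes the end index inclusive; the function name `replace_lines` and the sample text are illustrative.

```python
def replace_lines(source_code: str, patch: str, start: int, end: int) -> str:
    """Replace lines start..end (0-based, both inclusive) with the patch text."""
    lines = source_code.splitlines()
    return "\n".join(lines[:start] + [patch] + lines[end + 1:])


source = "line0\nline1\nline2\nline3"

# start == end replaces exactly one line.
single = replace_lines(source, "PATCH", 1, 1)

# A wider range collapses lines 1 and 2 into the single patch entry.
window = replace_lines(source, "PATCH", 1, 2)

print(single)
print(window)
```

Because the text is split with `splitlines()` and rejoined with `"\n"`, a trailing newline in the original source is not preserved.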

### ~~Constant Splitting Strategy~~

The constant splitting strategy splits the source code into segments of a constant token size, then asks the LLM to repair each segment. The segment size is a percentage of the maximum token count: each segment is `95%` of `MAX_TOKENS`, which leaves room for the counterexample.

Calculate the segment boundaries like so:
1. We want the largest `i` such that `L[0] + L[1] + ... + L[i] <= TOKENS`, where `TOKENS` is the segment size.
2. `i` is constrained by `0<=i<=n`.

In [64]:
%%script false --no-raise-error
SEGMENT_TOKEN_PERCENT: float = 0.95

def get_segments(lines: list[str]) -> list[str]:
    """Splits the lines into constant size segments using the `SEGMENT_TOKEN_PERCENT` and `MAX_TOKENS`
    variables. Make sure the lines have their new lines still attached."""
    token_count: int = 0
    segments: list[list[str]] = [[]]
    for line in lines:
        line_tokens: int = num_tokens_from_string(line)
        token_count += line_tokens
        if token_count >= SEGMENT_TOKEN_PERCENT * MAX_TOKENS:
            # Start a new segment. The current line becomes its first entry,
            # so its tokens count towards the new segment.
            segments.append([])
            token_count = line_tokens
        # Add line to latest segment
        segments[-1].append(line)
    # Combine segment lines (list of lines) into a complete string.
    # Then return list of segments.
    return ["".join(segment) for segment in segments]

In [65]:
%%script false --no-raise-error
print_and_log()
print_and_log("Running Constant Splitting Strategy")

# Loop through prompts
for prompt_idx, prompt in enumerate(all_prompts):
    print_and_log()
    print_and_log(f"Running new cycle with the following prompt ({prompt_idx}):\n```\n{prompt}\n```")
    # Try all the roles
    # Check if a {role} tag is in the prompt string and use roles in that case.
    role_count: int
    if "{role}" in prompt:
        print_and_log("Notice: Prompt has roles. Will cycle roles.")
        role_count = len(persona_roles)
    else:
        print_and_log("Notice: Prompt has no roles. Roles will not be cycled or used.")
        role_count = 1
        
    # Loop through files
    for idx, file_name_key in enumerate(data_samples.keys()):
        print_and_log()
        print_and_log(f"Checkpoint {idx}: {file_name_key}")
        
        source_code: str = data_samples[file_name_key]
        esbmc_output: str = data_esbmc_output[file_name_key]

        # Split the source code into lines to divide into segments.
        lines: list[str] = source_code.splitlines(True)
        # Divide into segments. Get back string segments.
        source_code_segments: list[str] = get_segments(lines)
        
        # Loop through all the segments.
        for segment_idx, sc_segment in enumerate(source_code_segments):
            # Try all the roles, if no roles, then loop will execute once only.
            for role_idx in range(role_count):
                try:
                    delta: float = time()
                    # Role will be passed, if the prompt does not contain {role} then it will be not used.
                    llm_output = run_sample(prompt, sc_segment, esbmc_output, persona_roles[role_idx])
                    delta = time() - delta
    
                    print_and_log(f"Duration: {delta}")
                    log_str(f"Raw Response:\n\n{llm_output}")
                    
                    llm_output = get_code_from_solution(llm_output)
    
                    # Save patch
                    with open(f"results/constant/{file_name_key}-{segment_idx}-{prompt_idx}-{role_idx}", "w") as file:
                        file.write(llm_output)
        
                    # Stitch together patch
                    patched_source: str = apply_patch_brutal_replacement(
                        source_code,
                        llm_output,
                        segment_idx * constant_split_lines,
                        (segment_idx + 1) * constant_split_lines
                    )
        
                    # Save patched source
                    with open(f"samples-patched/constant/{file_name_key}-{segment_idx}-{prompt_idx}-{role_idx}", "w") as file:
                        file.write(patched_source)
                except Exception as e:
                    print_and_log(f"Notice: error: {file_name_key}: {e}")
                finally:
                    print_and_log()

### Contextual Strategy

The contextual strategy takes the line at which the error occurred, along with a window of lines before and after it, split by a fixed ratio. In this case `85%` of the token budget goes to the lines before the error and `10%` to the lines after it: we want to give as much information as possible about the code leading up to the error, while still including some of the following lines for context.

The following variables are declared:
* `LTOKENS=MAX_TOKENS*0.85` - The window of tokens to keep before the error line.
* `UTOKENS=MAX_TOKENS*0.10` - The window of tokens to keep after the error line.

The lower bound line index is calculated like so:
1. We want the largest `il` such that `L[e] + L[e-1] + ... + L[e-il] <= LTOKENS`.
2. `il` is constrained by `0<=il<=e`.

Similarly, the upper bound line index is calculated like so:
1. We want the largest `iu` such that `L[e] + L[e+1] + ... + L[e+iu] <= UTOKENS`.
2. `iu` is constrained by `0<=iu<=n-e`.

The combined window is `L[e-il:e+iu]`, which fills `95%` of `MAX_TOKENS`; the remaining `5%` is allocated to the ESBMC output's counterexample stack trace and/or violated property.
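
The two bounds can be sketched on a list of per-line token counts. This is a minimal illustration of the definitions above, not the notebook's implementation: `context_window` is a hypothetical helper, the token counts are supplied directly instead of being computed with `tiktoken`, and the error line is counted in both windows, as in the two sums.

```python
def context_window(token_lens: list[int], e: int, ltokens: int, utokens: int) -> tuple[int, int]:
    """Return inclusive (lower, upper) line indices around the error line e.

    The window grows backwards while the running sum stays within ltokens,
    and forwards while the running sum stays within utokens.
    """
    lower = e
    used = token_lens[e]
    while lower > 0 and used + token_lens[lower - 1] <= ltokens:
        lower -= 1
        used += token_lens[lower]

    upper = e
    used = token_lens[e]
    while upper < len(token_lens) - 1 and used + token_lens[upper + 1] <= utokens:
        upper += 1
        used += token_lens[upper]

    return lower, upper


# Seven lines of 5 tokens each, with the error on line index 3.
lens = [5] * 7
tight = context_window(lens, e=3, ltokens=12, utokens=7)
loose = context_window(lens, e=3, ltokens=100, utokens=100)
print(tight, loose)
```

With a tight budget only one extra line fits before the error and none after it; with a generous budget the window covers the whole file.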

In [82]:
def get_source_code_err_line(esbmc_output: str) -> int:
    """Returns the line number that ESBMC reports for the violated property."""
    # Find "Violated property:" string in ESBMC output
    lines: list[str] = esbmc_output.splitlines()
    for ix, line in enumerate(lines):
        if "Violated property:" == line:
            # The location line follows; the word after "line" is the line number.
            pos_line_split: list[str] = lines[ix+1].split(" ")
            for iy, word in enumerate(pos_line_split):
                if word == "line":
                    # Get the line number
                    return int(pos_line_split[iy+1])
            raise ValueError("Could not find the line number in the ESBMC output")
    raise ValueError('Could not find "Violated property:" in the ESBMC output')

def get_lower_bound(source_code_lines: list[str], error_line: int, additional_tokens: float = 0) -> int:
    """Gets the index of the first line to include in the trimmed source code.
    Make sure the lines have their new lines still attached."""
    # Base case where the error line alone exceeds the budget.
    if num_tokens_from_string(source_code_lines[error_line]) >= LTOKENS:
        return error_line
    # Walk backwards from the error line, summing each line's tokens.
    token_counts: float = additional_tokens
    for line_idx in range(error_line, -1, -1):
        line: str = source_code_lines[line_idx]
        token_counts += num_tokens_from_string(line)
        # Stop at the first line that pushes the sum past LTOKENS.
        if token_counts >= LTOKENS:
            # Never return a bound past the error line itself.
            return min(line_idx + 1, error_line)
    # If we run out of lines then use all.
    return 0

def get_upper_bound(source_code_lines: list[str], error_line: int, additional_tokens: float = 0) -> int:
    """Gets the index of the last line (inclusive) to include in the trimmed source code.
    Make sure the lines have their new lines still attached."""
    # Base case where the error line alone exceeds the budget.
    if num_tokens_from_string(source_code_lines[error_line]) >= UTOKENS:
        return error_line
    # Walk forwards from the error line, summing each line's tokens.
    token_counts: float = additional_tokens
    for line_idx in range(error_line, len(source_code_lines)):
        line: str = source_code_lines[line_idx]
        token_counts += num_tokens_from_string(line)
        # Stop at the first line that pushes the sum past UTOKENS.
        if token_counts >= UTOKENS:
            # Never return a bound before the error line itself.
            return max(line_idx - 1, error_line)
    # If we run out of lines then use all.
    return len(source_code_lines) - 1

def get_esbmc_output_sized(esbmc_output: str) -> str:
    """Trims start of ESBMC output."""
    while num_tokens_from_string(esbmc_output) > MAX_ESBMC_OUTPUT:
        size = num_tokens_from_string(esbmc_output)
        esbmc_output = esbmc_output[size-MAX_ESBMC_OUTPUT:]
    return esbmc_output

# Max ESBMC output allowed, in tokens.
MAX_ESBMC_OUTPUT=1000
# Token budget for the lines up to and including the line with the error.
# Note: this is 0.9 here, not the 0.85 given in the description above.
LTOKENS=floor(MAX_TOKENS*0.9)
# Token budget for the extra context lines after the line with the error.
UTOKENS=floor(MAX_TOKENS*0.1)

#### Experimental Loop

In [15]:
def run_contextual_sample(
        prompt_idx: int, 
        role_idx: int,
        esbmc_output_type: str, 
        file_idx: int) -> None:
    prompt: str = all_prompts[prompt_idx]
    file_name_key: str = sorted(data_samples.keys())[file_idx]
    role: str = persona_roles[role_idx]
    
    # Name will be sorted by experimental order, not by filename, so that related experiments
    # can be found near each other.
    file_name: str = f"{prompt_idx}.{role_idx}.{esbmc_output_type}.{file_idx}.{os.path.basename(file_name_key)}"
    
    print_and_log()
    print_and_log(f"Notice: Checkpoint {file_name}")
    
    # Get CE or VP output for ESBMC.
    esbmc_output: str
    if esbmc_output_type == "ce":
        esbmc_output = data_esbmc_output[file_name_key]
    else:
        esbmc_output = data_vp_output[file_name_key]

    esbmc_output = get_esbmc_output_sized(esbmc_output)

    source_code: str = data_samples[file_name_key]
    source_code_lines: list[str] = source_code.splitlines(True)
    
    # Get the used ESBMC output length in order to calculate how to add it to the prompt template.
    esbmc_output_token_len: int = num_tokens_from_string(esbmc_output)

    # ESBMC reports 1-based line numbers; subtract 1 to index source_code_lines.
    # TODO If we get errors here pass full esbmc output instead of trimmed.
    err_line: int = get_source_code_err_line(esbmc_output) - 1

    # Trim the source code in order to give it to the LLM.
    lower_bound: int = get_lower_bound(source_code_lines, err_line, esbmc_output_token_len * 0.9)
    # The upper bound is inclusive.
    upper_bound: int = get_upper_bound(source_code_lines, err_line, esbmc_output_token_len * 0.1)
    trimmed_sc: str = "".join(source_code_lines[lower_bound:upper_bound+1])
    
    try:
        log_str(f"LLM Raw Input:\n{get_llm_message(prompt, trimmed_sc, esbmc_output, role)}")
        
        delta: float = time()
        # Role will be passed, if the prompt does not contain {role} then it will be not used.
        llm_output_raw = run_sample(prompt, trimmed_sc, esbmc_output, role)
        delta = time() - delta

        log_str(f"Raw Response:\n\n{llm_output_raw}")
        print_and_log(f"Notice: Duration: {delta}")
        
        llm_output = get_code_from_solution(llm_output_raw)
        log_str(f"Extracted Code:\n\n{llm_output}")
        
        # Save patch
        with open(f"results/{file_name}", "w") as file:
            file.write(llm_output)

        # Stitch together patch
        patched_source: str = apply_patch_brutal_replacement(source_code, llm_output, lower_bound, upper_bound)

        # Save patched source
        with open(f"samples-patched/{file_name}", "w") as file:
            file.write(patched_source)
    except Exception as e:
        print_and_log(f"Notice: error: {file_name_key}: {e}")
    
    print_and_log()

    log_file.flush()
    
    # Write progress
    with open("progress.txt", "a") as file:
        # Write the file name and subdirectory in progress in order to know which file is being
        # processed in what subdirectory.
        file.write(f"{file_name}\n")

#### Run Experiments

In [None]:
print_and_log()
print_and_log("Running Contextual Strategy")

# Loop through prompts
for prompt_idx, prompt in enumerate(all_prompts):
    print_and_log()
    print_and_log(f"Notice: Running new cycle with prompt ({prompt_idx})")
    # Try all the roles
    # Check if a {role} tag is in the prompt string and use roles in that case.
    role_count: int
    if "{role}" in prompt:
        print_and_log("Notice: Prompt has roles. Will cycle roles.")
        role_count = len(persona_roles)
    else:
        print_and_log("Notice: Prompt has no roles. Roles will not be cycled or used (0).")
        role_count = 1

    # Loop through the different roles.
    for role_idx in range(role_count):
        # Loop through violated property ESBMC output and counterexample ESBMC output.
        for esbmc_output_type in ["ce", "vp"]:
            # Loop through files
            for file_idx, file_name_key in enumerate(sorted(data_samples.keys())):
                # Check if file has already been processed and skip.
                if os.path.exists("progress.txt"):
                    with open("progress.txt", "r") as file:
                        progress_files: list[str] = file.read().splitlines()
                        file_name: str = f"{prompt_idx}.{role_idx}.{esbmc_output_type}.{file_idx}.{os.path.basename(file_name_key)}"
                        if file_name in progress_files:
                            log_str(f"Skipping already processed file: {file_name}")
                            continue

                run_contextual_sample(
                    prompt_idx=prompt_idx, 
                    role_idx=role_idx, 
                    file_idx=file_idx, 
                    esbmc_output_type=esbmc_output_type,
                )


1710094389.9336197: Running Contextual Strategy

1710094389.9339364: Notice: Running new cycle with prompt (0)
1710094389.933959: Notice: Prompt has no roles. Roles will not be cycled or used (0).

1710094389.9341624: Notice: Checkpoint 0.0.ce.0.gcas_5_safe.c-amalgamation-149.c
1710094395.2022262: Notice: Duration: 4.27614426612854


1710094395.2030036: Notice: Checkpoint 0.0.ce.1.gcas_8_safe.c-amalgamation-6.c


# Improving the Initial Results

The results were not satisfactory, so in order to improve the LLM performance we introduce the following steps:

1. Compare with the ESBMC-AI system message; the success of ESBMC-AI may be due to a good system message.
2. Try an alternative experiment where we show the LLM only the faulty line, along with the ESBMC violated property, and ask it to fix that line.
3. Clang Feedback Improvements:
    1. Pass the LLM patched samples to the Clang Compiler
    2. Pass the feedback to the LLM, and try a new clean cycle.
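
The Clang feedback step could be sketched as follows. This is an assumption about how such a step might look, not code from the notebook: the helper names are hypothetical, `-fsyntax-only` is used so that Clang only parses and type-checks the file, and passing the `includes` folder with `-I` is a guess based on its use by `eval-esbmc.sh`.

```python
import subprocess


def build_clang_command(path: str, include_dir: str = "includes") -> list[str]:
    """Build a Clang invocation that checks the file without producing output files."""
    return ["clang", "-fsyntax-only", f"-I{include_dir}", path]


def clang_feedback(path: str, include_dir: str = "includes") -> str:
    """Return Clang's diagnostics for a patched sample, or "" if Clang exits successfully."""
    try:
        result = subprocess.run(
            build_clang_command(path, include_dir),
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        # Clang is not installed or not on PATH.
        return "clang not found on PATH"
    # Diagnostics are written to stderr; only failures are reported back.
    return result.stderr if result.returncode != 0 else ""


print(build_clang_command("samples-patched/example.c"))
```

The returned diagnostics would then be placed into a new prompt together with the patched source, starting a fresh repair cycle.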


## 1. ESBMC-AI System Message

Rerun the experiments with the ESBMC-AI system message. The ESBMC-AI prompts (`esbmc_ai_prompts`) were added to `all_prompts`.

## 2. Single Line Experiment

Try an alternative experiment where we show the LLM only the faulty line, along with the ESBMC violated property, and ask it to fix that line.

### Create Directories

In [30]:
dirs: list[str] = ["samples-patched-single", "results-single"]

# Create the output directories; existing ones are left untouched.
for dir_name in dirs:
    os.makedirs(dir_name, exist_ok=True)

### Run Experiments

In [31]:
def run_contextual_sample_single_line(
        prompt_idx: int, 
        role_idx: int,
        esbmc_output_type: str, 
        file_idx: int) -> None:
    prompt: str = all_prompts[prompt_idx]
    file_name_key: str = sorted(data_samples.keys())[file_idx]
    role: str = persona_roles[role_idx]
    
    # Name will be sorted by experimental order, not by filename, so that related experiments
    # can be found near each other.
    file_name: str = f"{prompt_idx}.{role_idx}.{esbmc_output_type}.{file_idx}.{os.path.basename(file_name_key)}"
    
    print_and_log()
    print_and_log(f"Notice: Checkpoint {file_name}")
    
    # Get CE or VP output for ESBMC.
    esbmc_output: str
    if esbmc_output_type == "ce":
        esbmc_output = data_esbmc_output[file_name_key]
    else:
        esbmc_output = data_vp_output[file_name_key]

    esbmc_output = get_esbmc_output_sized(esbmc_output)

    source_code: str = data_samples[file_name_key]
    
    # Get the used ESBMC output length in order to calculate how to add it to the prompt template.
    esbmc_output_token_len: int = num_tokens_from_string(esbmc_output)

    # ESBMC reports line numbers starting from 1. To index the list of source lines, we need
    # 0 based indexing. TODO If we get errors here pass full esbmc output instead of trimmed.
    err_line: int = get_source_code_err_line(esbmc_output) - 1

    # Trim the source code in order to give it to the LLM.
    sc_line: str = source_code.splitlines(True)[err_line]
    
    try:
        log_str(f"LLM Raw Input:\n{get_llm_message(prompt, sc_line, esbmc_output, role)}")
        
        delta: float = time()
        # Role will be passed, if the prompt does not contain {role} then it will be not used.
        llm_output_raw = run_sample(prompt, sc_line, esbmc_output, role)
        delta = time() - delta

        log_str(f"Raw Response:\n\n{llm_output_raw}")
        print_and_log(f"Notice: Duration: {delta}")
        
        llm_output = get_code_from_solution(llm_output_raw)
        log_str(f"Extracted Code:\n\n{llm_output}")
        
        # Save patch
        with open(f"results-single/{file_name}", "w") as file:
            file.write(llm_output)

        # Stitch together patch
        patched_source: str = apply_patch_brutal_replacement(source_code, llm_output, err_line, err_line)

        # Save patched source
        with open(f"samples-patched-single/{file_name}", "w") as file:
            file.write(patched_source)
    except Exception as e:
        print_and_log(f"Notice: error: {file_name_key}: {e}")
    
    print_and_log()

    log_file.flush()
    
    # Write progress
    with open("progress-single.txt", "a") as file:
        # Record the experiment name so that an interrupted run can skip this sample when it
        # is resumed.
        file.write(f"{file_name}\n")
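
The single-line strategy above comes down to three steps: convert the 1-based line number to a 0-based index, send only that line to the LLM, and splice the reply back over the same line. A minimal sketch of the extract-and-splice part follows. `extract_line` and `replace_line_range` are hypothetical helpers written for illustration; the second is inferred only from the unit tests of `apply_patch_brutal_replacement` and is not the notebook's implementation.

```python
def extract_line(source_code: str, reported_line: int) -> tuple[int, str]:
    """Return the 0-based index and the text of a 1-based reported line."""
    err_idx = reported_line - 1
    return err_idx, source_code.splitlines(True)[err_idx]


def replace_line_range(source_code: str, patch: str, start: int, end: int) -> str:
    """Replace lines start..end (0-based, inclusive) with the patch text."""
    lines = source_code.split("\n")
    lines[start:end + 1] = patch.split("\n")
    return "\n".join(lines)


# Toy C source with an out-of-bounds write on (1-based) line 2.
source = "int a[2];\na[2] = 1;\nreturn 0;"
idx, bad_line = extract_line(source, 2)
patched = replace_line_range(source, "a[1] = 1;", idx, idx)
print(repr(bad_line))
print(patched)
```

Passing the same index as both `start` and `end` mirrors the `err_line, err_line` call above: exactly one line is swapped out and the rest of the file is left untouched.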

In [None]:
print_and_log()
print_and_log("Running Contextual Strategy: Single Line")

# Loop through prompts
for prompt_idx, prompt in enumerate(all_prompts):
    print_and_log()
    print_and_log(f"Notice: Running new cycle with prompt ({prompt_idx})")
    # Try all the roles
    # Check if a {role} tag is in the prompt string and use roles in that case.
    role_count: int
    if "{role}" in prompt:
        print_and_log("Notice: Prompt has roles. Will cycle roles.")
        role_count = len(persona_roles)
    else:
        print_and_log("Notice: Prompt has no roles. Roles will not be cycled or used (0).")
        role_count = 1

    # Loop through the different roles.
    for role_idx in range(role_count):
        # Loop through violated property ESBMC output and counterexample ESBMC output.
        for esbmc_output_type in ["ce", "vp"]:
            # Loop through files
            for file_idx, file_name_key in enumerate(sorted(data_samples.keys())):
                # Check if file has already been processed and skip.
                if os.path.exists("progress-single.txt"):
                    with open("progress-single.txt", "r") as file:
                        progress_files: list[str] = file.read().splitlines()
                        file_name: str = f"{prompt_idx}.{role_idx}.{esbmc_output_type}.{file_idx}.{os.path.basename(file_name_key)}"
                        if file_name in progress_files:
                            log_str(f"Skipping already processed file: {file_name}")
                            continue

                run_contextual_sample_single_line(
                    prompt_idx=prompt_idx, 
                    role_idx=role_idx, 
                    file_idx=file_idx, 
                    esbmc_output_type=esbmc_output_type,
                )


1710234368.5373101: Running Contextual Strategy: Single Line

1710234368.5376346: Notice: Running new cycle with prompt (0)
1710234368.5376482: Notice: Prompt has no roles. Roles will not be cycled or used (0).

1710234368.5378327: Notice: Checkpoint 0.0.ce.0.gcas_5_safe.c-amalgamation-149.c
1710234369.968499: Notice: Duration: 0.8299524784088135


1710234369.9693882: Notice: Checkpoint 0.0.ce.1.gcas_8_safe.c-amalgamation-6.c
1710234373.5855856: Notice: Duration: 3.614337682723999


1710234373.7372694: Notice: Checkpoint 0.0.ce.2.robot_5_safe.c-amalgamation-124.c
1710234375.635307: Notice: Duration: 1.8967316150665283


1710234375.6364942: Notice: Checkpoint 0.0.ce.3.robot_5_safe.c-amalgamation-13.c
1710234378.1564438: Notice: Duration: 2.518692970275879


1710234378.1576607: Notice: Checkpoint 0.0.ce.4.robot_6_safe.c-amalgamation-46.c
1710234382.5072443: Notice: Duration: 4.348337650299072


1710234382.508448: Notice: Checkpoint 0.0.ce.5.robot_6_safe.c-amalgamation-49.c
1710234385.51
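
The resume check in the loop above re-reads `progress-single.txt` for every sample. One possible alternative, sketched here under the assumption that nothing else writes to the file during a run, is to load it once into a set and update both the set and the file after each sample. `load_progress` and `mark_done` are hypothetical names, not part of the notebook.

```python
import os
import tempfile


def load_progress(path: str) -> set[str]:
    """Read finished experiment names, one per line; empty set if no file."""
    if not os.path.exists(path):
        return set()
    with open(path, "r") as file:
        return set(file.read().splitlines())


def mark_done(path: str, done: set[str], name: str) -> None:
    """Append a finished experiment name to the file and the in-memory set."""
    with open(path, "a") as file:
        file.write(f"{name}\n")
    done.add(name)


# Demo on a throwaway file so the real progress file is untouched.
demo_path = os.path.join(tempfile.mkdtemp(), "progress-demo.txt")
done = load_progress(demo_path)
mark_done(demo_path, done, "0.0.ce.0.gcas_5_safe.c-amalgamation-149.c")
resumed = load_progress(demo_path)
print("0.0.ce.0.gcas_5_safe.c-amalgamation-149.c" in resumed)
```

Membership tests on a set are constant time on average, which could matter once the progress file holds thousands of entries.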

## 3. Clang Feedback To LLM

Perform a second-iteration fix in which the LLM is given the Clang error output and asked to repair the patched code. The patched samples in `samples-patched` are used together with the outputs in `esbmc_output_1` (the output for the first attempt). The results are written to `results-2` and `samples-patched-2`.

### Load ESBMC Output


In [95]:
clang_file_keys: list[str] = sorted(os.listdir("esbmc_output_1/"))
data_samples_1: dict[str, str] = {}
data_esbmc_output_1: dict[str, str] = {}

for file_name in clang_file_keys:
    if not file_name.endswith(".c"):
        continue
    
    # Read samples-patched (Generated previously)
    with open(f"samples-patched/{file_name}", "r") as file:
        data_samples_1[file_name] = file.read()

    # Read esbmc_output_1 (Generated by running ./eval-esbmc.sh)
    with open(f"esbmc_output_1/{file_name}", "r") as file:
        data_esbmc_output_1[file_name] = file.read()

### Create Directories

In [96]:
dirs: list[str] = ["samples-patched-2", "results-2"]

for dir_name in dirs:
    # exist_ok avoids an error when the directory is already present.
    os.makedirs(dir_name, exist_ok=True)

### Get Error Line

Clang reports errors in a different format from ESBMC. The function below finds the error line by looking for the first line of the following form: `samples-patched/12.0.ce.36.cartpole_2_safe.c-amalgamation-43.c:177:1: error:`

In [97]:
def get_source_code_err_line_clang(esbmc_output: str, filepath: str) -> int:
    """Gets the error line reported in the clang output."""
    lines: list[str] = esbmc_output.splitlines()
    for line in lines:
        # Find the first line of the form `<filepath>:<line>:<column>: error: ...`.
        line_split: list[str] = line.split(":")
        # Check for the filename and guard against lines with too few fields.
        if len(line_split) > 3 and line_split[0] == filepath and " error" in line_split[3]:
            return int(line_split[1])

    raise Exception(f"Could not find error line in {filepath}")
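
As a cross-check of the format described above, the same lookup can be sketched with a regular expression. This is an illustrative alternative, not the notebook's method: `find_clang_error_line` is a hypothetical name, and the pattern assumes diagnostics of the form `path:line:column: error: message` (it also accepts `fatal error`).

```python
import re


def find_clang_error_line(clang_output: str, filepath: str) -> int:
    """Return the 1-based line of the first error reported for filepath."""
    pattern = re.compile(
        rf"^{re.escape(filepath)}:(\d+):\d+: (?:fatal )?error:", re.MULTILINE
    )
    match = pattern.search(clang_output)
    if match is None:
        raise ValueError(f"Could not find error line for {filepath}")
    return int(match.group(1))


# Made-up diagnostics: the warning must be skipped, the error must be found.
sample_output = (
    "samples-patched/demo.c:12:5: warning: unused variable 'x'\n"
    "samples-patched/demo.c:177:1: error: expected '}'\n"
)
print(find_clang_error_line(sample_output, "samples-patched/demo.c"))
```

Anchoring the pattern at the start of a line and escaping the path keeps dots in file names such as `cartpole_2_safe.c-amalgamation-43.c` from being treated as wildcards.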

### Run Experiments

In [98]:
def run_contextual_sample_clang(
        prompt_idx: int, 
        role_idx: int,
        file_idx: int,
        file_name_key: str) -> None:
    prompt: str = all_prompts[prompt_idx]
    role: str = persona_roles[role_idx]
    
    # The name is ordered by experiment indices rather than by sample filename, so that
    # related experiments end up next to each other.
    file_name: str = f"{prompt_idx}.{role_idx}.{file_idx}.{os.path.basename(file_name_key)}"
    
    print_and_log()
    print_and_log(f"Notice: Checkpoint {file_name}")

    esbmc_output: str = get_esbmc_output_sized(data_esbmc_output_1[file_name_key])

    source_code: str = data_samples_1[file_name_key]
    source_code_lines: list[str] = source_code.splitlines(True)
    
    # Get the used ESBMC output length in order to calculate how to add it to the prompt template.
    esbmc_output_token_len: int = num_tokens_from_string(esbmc_output)

    # Convert the reported line to a 0-based index. The full (untrimmed) output is passed so
    # that the error line cannot be cut off by trimming.
    err_line: int = get_source_code_err_line_clang(data_esbmc_output_1[file_name_key], f"samples-patched/{file_name_key}") - 1

    # Trim the source code in order to give it to the LLM.
    lower_bound: int = get_lower_bound(source_code_lines, err_line, esbmc_output_token_len * 0.9)
    # The upper bound is inclusive.
    upper_bound: int = get_upper_bound(source_code_lines, err_line, esbmc_output_token_len * 0.1)
    trimmed_sc: str = "".join(source_code_lines[lower_bound:upper_bound+1])
    
    try:
        log_str(f"LLM Raw Input:\n{get_llm_message(prompt, trimmed_sc, esbmc_output, role)}")
        
        delta: float = time()
        # The role is always passed; if the prompt does not contain {role}, it is not used.
        llm_output_raw = run_sample(prompt, trimmed_sc, esbmc_output, role)
        delta = time() - delta

        log_str(f"Raw Response:\n\n{llm_output_raw}")
        print_and_log(f"Notice: Duration: {delta}")
        
        llm_output = get_code_from_solution(llm_output_raw)
        log_str(f"Extracted Code:\n\n{llm_output}")
        
        # Save patch
        with open(f"results-2/{file_name}", "w") as file:
            file.write(llm_output)

        # Stitch together patch
        patched_source: str = apply_patch_brutal_replacement(source_code, llm_output, lower_bound, upper_bound)

        # Save patched source
        with open(f"samples-patched-2/{file_name}", "w") as file:
            file.write(patched_source)
    except Exception as e:
        print_and_log(f"Notice: error: {file_name_key}: {e}")
    
    print_and_log()

    log_file.flush()
    
    # Write progress
    with open("progress-2.txt", "a") as file:
        # Record the experiment name so that an interrupted run can skip this sample when it
        # is resumed.
        file.write(f"{file_name}\n")
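
The trimming step above appears to charge the token count of the ESBMC output against two budgets: 90% against the window of lines before the error and 10% against the window after it. The sketch below illustrates that idea in isolation. It is not the notebook's `get_lower_bound`/`get_upper_bound`: `count_tokens` is a whitespace-based stand-in for the real tokenizer, and the function names and the budgets `LOWER_BUDGET` and `UPPER_BUDGET` are hypothetical.

```python
LOWER_BUDGET = 20  # Hypothetical stand-in for LTOKENS.
UPPER_BUDGET = 8   # Hypothetical stand-in for UTOKENS.


def count_tokens(text: str) -> int:
    """Stand-in tokenizer: counts whitespace-separated words."""
    return len(text.split())


def lower_bound_sketch(lines: list[str], err_idx: int, budget: float) -> int:
    """Walk upwards from err_idx while the collected lines fit the budget."""
    used = 0
    bound = err_idx
    for idx in range(err_idx, -1, -1):
        used += count_tokens(lines[idx])
        if used > budget:
            break
        bound = idx
    return bound


def upper_bound_sketch(lines: list[str], err_idx: int, budget: float) -> int:
    """Walk downwards from the line after err_idx; the result is inclusive."""
    used = 0
    bound = err_idx
    for idx in range(err_idx + 1, len(lines)):
        used += count_tokens(lines[idx])
        if used > budget:
            break
        bound = idx
    return bound


lines = [f"x{i} = {i};\n" for i in range(10)]  # 3 stand-in tokens per line
err_idx, output_tokens = 5, 10
lower = lower_bound_sketch(lines, err_idx, LOWER_BUDGET - output_tokens * 0.9)
upper = upper_bound_sketch(lines, err_idx, UPPER_BUDGET - output_tokens * 0.1)
trimmed = "".join(lines[lower:upper + 1])
print(lower, upper)
print(trimmed)
```

A longer verifier output shrinks both windows, so the prompt as a whole stays within the model's context limit.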

In [None]:
print_and_log()
print_and_log("Running Contextual Strategy: 2nd Iteration Clang Assist")

# Loop through clang files
for file_name_key in clang_file_keys:
    file_name_split: list[str] = file_name_key.split(".")
    
    # Get prompt from file
    prompt_idx: int = int(file_name_split[0])
    # Get role from file
    role_idx: int = int(file_name_split[1])
    # Get CE/VP from file
    esbmc_output_type: str = file_name_split[2]
    # Get file_idx
    file_idx: int = int(file_name_split[3])

    # Check if file has already been processed and skip.
    if os.path.exists("progress-2.txt"):
        with open("progress-2.txt", "r") as file:
            progress_files: list[str] = file.read().splitlines()
            file_name: str = f"{prompt_idx}.{role_idx}.{file_idx}.{os.path.basename(file_name_key)}"
            if file_name in progress_files:
                log_str(f"Skipping already processed file: {file_name}")
                continue

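    # If the output contains "VERIFICATION", ESBMC reached the verification stage, so there
    # is no Clang error to repair for this sample.
    if "VERIFICATION" in data_esbmc_output_1[file_name_key]: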
        print_and_log(f"Skipping {file_idx} {file_name_key} as it has VERIFICATION")
        continue

    run_contextual_sample_clang(
        prompt_idx=prompt_idx, 
        role_idx=role_idx, 
        file_idx=file_idx,
        file_name_key=file_name_key,
    )


1710896262.191575: Running Contextual Strategy: 2nd Iteration Clang Assist

1710896262.1919131: Notice: Checkpoint 0.0.0.0.0.ce.0.gcas_5_safe.c-amalgamation-149.c
1710896283.7466295: Notice: Duration: 21.544837951660156


1710896283.7472646: Notice: Checkpoint 0.0.1.0.0.ce.1.gcas_8_safe.c-amalgamation-6.c
1710896285.7952905: Notice: Duration: 2.0375583171844482


1710896285.7958443: Notice: Checkpoint 0.0.10.0.0.ce.10.vdp_2_safe.c-amalgamation-27.c
1710896291.2389543: Notice: Duration: 5.439812660217285


1710896291.2395148: Notice: Checkpoint 0.0.11.0.0.ce.11.vdp_5_safe.c-amalgamation-66.c
1710896307.8892813: Notice: Duration: 16.64569330215454


1710896307.8898926: Notice: Checkpoint 0.0.12.0.0.ce.12.vdp_7_safe.c-amalgamation-28.c
1710896308.9325588: Notice: Duration: 1.0382611751556396


1710896308.9331083: Notice: Checkpoint 0.0.13.0.0.ce.13.vdp_7_safe.c-amalgamation-30.c
1710896313.6488652: Notice: Duration: 4.712406158447266


1710896313.6494334: Notice: Checkpoint 0.0.14.0.0.ce
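
The second-iteration loop above recovers the experiment parameters from each file name, which has the form `prompt_idx.role_idx.esbmc_output_type.file_idx.sample_name`. A small sketch of that parsing step, assuming this layout holds for every file, is shown here. `ExperimentKey` and `parse_experiment_key` are hypothetical names introduced for illustration.

```python
from typing import NamedTuple


class ExperimentKey(NamedTuple):
    prompt_idx: int
    role_idx: int
    esbmc_output_type: str
    file_idx: int
    sample_name: str


def parse_experiment_key(file_name: str) -> ExperimentKey:
    """Split on the first four dots only; the sample name keeps its own dots."""
    prompt, role, output_type, file_idx, sample = file_name.split(".", 4)
    return ExperimentKey(int(prompt), int(role), output_type, int(file_idx), sample)


key = parse_experiment_key("12.0.ce.36.cartpole_2_safe.c-amalgamation-43.c")
print(key)
```

Limiting the split to four separators matters because sample names such as `cartpole_2_safe.c-amalgamation-43.c` contain dots of their own.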

# Unit Tests

In [33]:
import unittest

class TestNotebook(unittest.TestCase):
    
    def test_get_code_from_solution(self):
        self.assertEqual(get_code_from_solution("This is a code block:\n\n```c\naaa\n```"), "aaa")
        self.assertEqual(get_code_from_solution("This is a code block:\n\n```\nabc\n```"), "abc")
        self.assertEqual(get_code_from_solution("This is a code block:```abc\n```"), "This is a code block:```abc\n```")

    def test_apply_brutal_replacement_strategy(self):
        text = "\n".join(["a", "b", "c", "d", "e", "f", "g"])
        answer = "\n".join(["a", "b", "1", "g"])
        self.assertEqual(apply_patch_brutal_replacement(text, "1", 2, 5), answer)
        text = "\n".join(["a", "b", "c", "d", "e", "f", "g"])
        answer = "\n".join(["a", "b", "c", "1", "e", "f", "g"])
        self.assertEqual(apply_patch_brutal_replacement(text, "1", 3, 3), answer)

    def test_get_source_code_err_line(self):
        self.assertEqual(get_source_code_err_line(data_esbmc_output["reinforcement_learning/cartpole_48_safe.c-amalgamation-6.c"]), 323)
        self.assertEqual(get_source_code_err_line(data_esbmc_output["reinforcement_learning/cartpole_92_safe.c-amalgamation-14.c"]), 221)
        self.assertEqual(get_source_code_err_line(data_esbmc_output["reinforcement_learning/cartpole_95_safe.c-amalgamation-80.c"]), 285)
        self.assertEqual(get_source_code_err_line(data_esbmc_output["reinforcement_learning/cartpole_26_safe.c-amalgamation-74.c"]), 299)
        self.assertEqual(get_source_code_err_line(data_esbmc_output["reach_prob_density/robot_5_safe.c-amalgamation-13.c"]), 350)
        self.assertEqual(get_source_code_err_line(data_esbmc_output["reach_prob_density/vdp_1_safe.c-amalgamation-28.c"]), 247)

    def test_get_lower_bound(self):
        source_code: str = data_samples["reinforcement_learning/cartpole_48_safe.c-amalgamation-6.c"]
        source_code_lines: list[str] = source_code.splitlines(True)

        # Test the additional tokens and if they affect the line.
        self.assertEqual(get_lower_bound(source_code_lines, 323, 0), 176)
        self.assertEqual(get_lower_bound(source_code_lines, 323, 500), 177)
        self.assertEqual(get_lower_bound(source_code_lines, 323, 1000), 178)
        
        # Test if the length is accounted for. Note: these slices index characters of
        # `source_code`, not entries of `source_code_lines`.
        self.assertLess(num_tokens_from_string(source_code[174:323]), LTOKENS)
        self.assertLess(num_tokens_from_string(source_code[175:323]), LTOKENS - 500)
        self.assertLess(num_tokens_from_string(source_code[176:323]), LTOKENS - 1000)
    
    def test_get_upper_bound(self):
        source_code: str = data_samples["reach_prob_density/robot_5_safe.c-amalgamation-13.c"]
        source_code_lines: list[str] = source_code.splitlines(True)
        
        # Test the additional tokens and if they affect the line.
        self.assertEqual(get_upper_bound(source_code_lines, 323, 0), 444)
        self.assertEqual(get_upper_bound(source_code_lines, 323, 500), 422)
        self.assertEqual(get_upper_bound(source_code_lines, 323, 1000), 379)

        # Test if the length is accounted for. Note: these slices index characters of
        # `source_code`, not entries of `source_code_lines`.
        self.assertLess(num_tokens_from_string(source_code[323:444]), UTOKENS)
        self.assertLess(num_tokens_from_string(source_code[323:422]), UTOKENS - 500)
        self.assertLess(num_tokens_from_string(source_code[323:379]), UTOKENS - 1000)

    def test_get_esbmc_output_sized(self):
        esbmc_output: str = data_esbmc_output["reach_prob_density/gcas_5_safe.c-amalgamation-149.c"]
        self.assertGreater(num_tokens_from_string(esbmc_output), MAX_ESBMC_OUTPUT)
        self.assertLessEqual(num_tokens_from_string(get_esbmc_output_sized(esbmc_output)), MAX_ESBMC_OUTPUT)

unittest.main(argv=[''], verbosity=2, exit=False)

test_apply_brutal_replacement_strategy (__main__.TestNotebook.test_apply_brutal_replacement_strategy) ... ok
test_get_code_from_solution (__main__.TestNotebook.test_get_code_from_solution) ... ok
test_get_esbmc_output_sized (__main__.TestNotebook.test_get_esbmc_output_sized) ... ok
test_get_lower_bound (__main__.TestNotebook.test_get_lower_bound) ... ok
test_get_source_code_err_line (__main__.TestNotebook.test_get_source_code_err_line) ... ok
test_get_upper_bound (__main__.TestNotebook.test_get_upper_bound) ... ok

----------------------------------------------------------------------
Ran 6 tests in 0.746s

OK


<unittest.main.TestProgram at 0xffff91ce2350>
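
The `test_get_code_from_solution` cases above pin down the expected behaviour: return the body of a fenced code block, or return the input unchanged if no complete block is found. A sketch that satisfies those three cases is shown here. `extract_fenced_code` is a hypothetical stand-in; the notebook's `get_code_from_solution` may be implemented differently.

```python
import re

FENCE = "`" * 3  # Built this way to avoid a literal fence inside this example.
# Opening fence with optional language tag, newline, body, newline, closing fence.
_BLOCK = re.compile(rf"{FENCE}[^\n]*\n(.*?)\n{FENCE}", re.DOTALL)


def extract_fenced_code(response: str) -> str:
    """Return the first fenced block's body, or the response if none is found."""
    match = _BLOCK.search(response)
    return match.group(1) if match else response


reply = f"Here is the fix:\n\n{FENCE}c\nx = 1;\n{FENCE}"
print(extract_fenced_code(reply))
```

Falling back to the raw response keeps the pipeline running when the model answers with bare code and no fence.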