In [None]:
!pip install --upgrade pip
!pip install openai
!pip install z3-solver
!pip install python-dotenv tqdm


In [None]:
!java -version


# invaR1ant-evaluation: SMT-LIB Constraint Generation via LLMs

## Project Overview

This notebook implements an automated framework for leveraging large language models to generate SMT-LIB constraints that characterize worst-case execution paths in programs. The implementation analyzes how input conditions influence program performance at different input scales by systematically generating and testing constraints.

## Background and Motivation

This project addresses the challenge of characterizing worst-case execution paths in software systems. The approach uses examples of SMT-LIB constraints at smaller input sizes to prompt LLMs to generate corresponding constraints for larger input sizes.

Unlike the CODE version (which generates Python code that can programmatically produce constraints), this notebook directly generates SMT-LIB constraint expressions within specialized tags:

```
<constants>(declare-const in0 Int)...</constants> <answer>(assert (and ...))...</answer>
```

This direct constraint generation approach may face challenges with token limitations when working with very large input sizes.
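As a minimal sketch of how a response in this tagged format can be split into its two parts, the snippet below reuses the regular expression that the `Experiment` class applies later in this notebook. The helper name `extract_tags` and the sample response are illustrative only.

```python
import re

# Same pattern the notebook uses: constants first, then the answer block.
TAG_PATTERN = re.compile(
    r"<constants>(.*?)</constants>.*?<answer>(.*?)</answer>", re.DOTALL
)


def extract_tags(response: str):
    """Return (constants, answer), or (None, None) if either tag is missing."""
    match = TAG_PATTERN.search(response)
    if match is None:
        return None, None
    return match.group(1).strip(), match.group(2).strip()


# Illustrative response, modelled on the example given in the prompt text.
sample = (
    "Here is my answer.\n"
    "<constants>(declare-const in0 Int)\n(declare-const in1 Int)</constants> "
    "<answer>(assert (and (>= in0 0) (<= in1 10)))</answer>"
)
constants, answer = extract_tags(sample)
print(constants)
print(answer)
```

Because the pattern requires `<constants>` to appear before `<answer>`, a response with the tags in the opposite order is treated as malformed.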

## What This Notebook Does

The experiment conducts the following workflow:

1. **Extract Constraints** from existing SMT files as examples (N=1 to N=18)
2. **Query Language Models** (OpenAI API) to generate larger-scale constraints directly
3. **Extract SMT-LIB Format Assertions** from responses (using regex patterns)
4. **Validate Constraints** using Z3 theorem prover
5. **Execute Test Cases** against a Java implementation (SortedListInsert)
6. **Collect and Analyze Results** to verify execution path lengths
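
Step 1 of the workflow above can be sketched as follows. This is a simplified, in-memory version of what `PromptBuilder` does with files on disk: it assumes file names of the form `SortedListInsert_<N>.smt2` and keeps only lines that start with `(assert`. The helper names and the sample file contents are hypothetical.

```python
import re


def size_from_name(name: str, prefix: str = "SortedListInsert"):
    """Return N for names like 'SortedListInsert_12.smt2', else None."""
    match = re.fullmatch(rf"{re.escape(prefix)}_(\d+)\.smt2", name)
    return int(match.group(1)) if match else None


def build_examples(files: dict[str, str]) -> str:
    """Keep only the (assert ...) lines of each file, ordered by N."""
    sized = []
    for name, text in files.items():
        n = size_from_name(name)
        if n is None:
            continue  # skip files that do not follow the naming scheme
        asserts = [
            line.strip()
            for line in text.splitlines()
            if line.strip().startswith("(assert")
        ]
        sized.append((n, "\n".join(asserts)))
    sized.sort(key=lambda item: item[0])  # numeric order, so N=10 follows N=2
    return "\n".join(f"N={n}: {body}" for n, body in sized)


# Hypothetical file contents, keyed by file name.
files = {
    "SortedListInsert_10.smt2": "(declare-const in0 Int)\n(assert (> in0 0))\n(check-sat)",
    "SortedListInsert_2.smt2": "(declare-const in0 Int)\n(assert (<= in0 1))",
    "notes.txt": "(assert ignored)",
}
examples = build_examples(files)
print(examples)
```

Sorting on the parsed integer rather than on the file name matters here: a plain string sort would place `_10` before `_2` and present the examples to the model out of order.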

## Key Components

- **PromptBuilder**: Extracts constraints from SMT files and builds prompts for OpenAI
- **OpenaiAPI**: Handles communication with OpenAI models with retry logic and rate limiting
- **Services**: Provides utility functions for Z3 solver integration and test execution
- **Experiment**: Orchestrates the end-to-end evaluation process
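
The retry logic mentioned for `OpenaiAPI` uses capped exponential backoff: after the k-th failed attempt it waits `min(cap, 2**k)` seconds. The sketch below only computes that schedule; `backoff_delays` is an illustrative helper, not a function from the notebook.

```python
def backoff_delays(max_retries: int, cap: int = 60) -> list[int]:
    """Seconds to wait after each failed attempt, doubling up to `cap`."""
    return [min(cap, 2 ** attempt) for attempt in range(1, max_retries + 1)]


schedule = backoff_delays(5)
print(schedule)
```

In the notebook the cap is 60 seconds for rate-limit errors and 30 seconds for connection errors, with at most 5 retries by default.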

## Technical Prerequisites

- Python 3.12+ with packages:
  - openai
  - z3-solver
  - tqdm
  - python-dotenv (imported as `dotenv`, for environment variable management)
- Java 8+ runtime (JDK 1.8.0_411 tested)
- Z3 SMT solver installed and available in PATH
- OpenAI API key set in environment variables as "OPENAI_KEY"
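
A quick preflight check along these lines can catch a missing tool before any API calls are made. This is a sketch under the assumption that `java` and `z3` are the executable names on your system. `missing_prerequisites` is a hypothetical helper, and if the key lives in a `.env` file, `load_dotenv()` must run first.

```python
import os
import shutil


def missing_prerequisites(env=None, which=shutil.which) -> list[str]:
    """Return a list of human-readable problems; empty means all good."""
    env = os.environ if env is None else env
    problems = []
    for tool in ("java", "z3"):
        if which(tool) is None:
            problems.append(f"{tool} not found on PATH")
    if not env.get("OPENAI_KEY"):
        problems.append("OPENAI_KEY is not set")
    return problems


problems = missing_prerequisites()
print("All prerequisites found" if not problems else "\n".join(problems))
```

The `env` and `which` parameters exist only so the check can be exercised without touching the real environment.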

## Experiment Configuration

The experiment runs with configurable parameters:
- Test input sizes (default: 250, 500, 1000)
- OpenAI model (default: gpt-4o)
- Maximum attempts per testing value (default: 10)
- Temperature for model sampling (optional; unset by default)

## Interpreting Results

Results are saved as JSON files containing:
- LLM-generated constraints
- Z3 solver outputs
- Test execution metrics
- Execution path lengths (call counts)

Each test case is considered successful when the actual call count matches the expected count (N+1), confirming the worst-case execution path was triggered.
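
A saved results file can be summarized with a few lines of Python. The sketch below uses the keys that `Experiment.run` writes (`Success`, `ExpectedCallCount`, `ActualCallCount`). The sample data is made up for illustration and is not an actual experiment result. Note that `json.dump` stores the integer input sizes as string keys.

```python
import json


def summarize(results: dict) -> list[str]:
    """One line per input size, ordered numerically by N."""
    lines = []
    for n, entry in sorted(results.items(), key=lambda kv: int(kv[0])):
        status = "ok" if entry.get("Success") else "FAILED"
        expected = entry.get("ExpectedCallCount", int(n) + 1)  # success criterion: N+1
        actual = entry.get("ActualCallCount", "n/a")
        lines.append(f"N={n}: {status} (expected {expected}, actual {actual})")
    return lines


# Illustrative data only; a real run would load the saved JSON file instead.
sample = json.loads(
    '{"500": {"Success": false, "Error": "Failed after 10 attempts"},'
    ' "250": {"Success": true, "Attempts": 1,'
    ' "ExpectedCallCount": 251, "ActualCallCount": 251}}'
)
summary = summarize(sample)
print("\n".join(summary))
```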

## Known Limitations

- **Token Limitations**: For very large N values, the generated constraints may exceed model token limits
- **Execution Time**: For large N values, Z3 solving and test execution may take considerable time
- **Model Consistency**: Different OpenAI models may produce solutions of varying quality
- **Logging Duplication**: Multiple log handlers may cause duplicate output in Jupyter environments

## What this notebook does **NOT** do

This notebook does not cover the following aspects:
- **Compiling Java Code**: The Java code has already been compiled and is in the `/spf-wca/src/examples/` and `/spf-wca/build/examples/` directories.
- **Running JPF/SPF on Java Code**: The Java code has already been run through JPF and SPF-WCA. The results are in the `/spf-wca/sorted_list_insert/` directory.
- **Compiling and Building *JPF-CORE*, *JPF-SYMBC*, and *SPF-WCA***: Java PathFinder (JPF) and SPF-WCA have already been compiled and built. JPF-CORE and JPF-SYMBC are in the `/jpf-core/` and `/jpf-symbc/` directories, respectively. SPF-WCA is in the `/spf-wca/` directory.

---

*Note: This notebook is part of research on program analysis and invariant generation using large language models. The approach demonstrates how LLMs can be used to generalize constraints for worst-case program behavior across different input scales.*

In [None]:
import json
import logging
import os
import re
import subprocess
import sys
import time
from contextlib import suppress
from pathlib import Path

import openai
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()

In [None]:
class prompt:
    promptStart1 = """I'm experimenting with a program to understand how input conditions influence its worst-case performance, particularly in terms of finding the longest execution path as the input size increases. By conducting a worst-case analysis, I aim to identify constraints that define a valid input set at different input sizes (N). So far, I have found one possible set of correct constraints/conditions (not the only one) that characterize such valid inputs. Here they are:\n\n"""
    promptStart2 = """\n\nRespond with the complete constants inside <constants> </constants> tags and single valid SMT-LIB constraint expression inside <answer> </answer> tags. For example,<constants>(declare-const in0 Int)\n(declare-const in1 Int)</constants> <answer>(assert (and (>= in0 0) (<= in1 10)))</answer>"""
    promptStart3 = """\nWhat are the constraints for the input set of size N="""
    promptfeedback1 = """Remember to include both <constants> </constants> tags and a single valid SMT-LIB constraint in <answer> </answer> tags. Your format was not correct."""
    promptfeedback2 = """Remember to include both <constants> </constants> tags and a single valid SMT-LIB constraint in <answer> </answer> tags. Your constraint is not a valid SMTLIB expression."""
    promptsys = """Give a complete SMT-LIB expression with constants in <constants> </constants> tags and a single valid constraint in <answer> </answer> tags. Do not shorten your answer."""

class PromptBuilder:
    def __init__(self, constraints_dir="spf-wca/sorted_list_insert/verbose/heuristic", file_prefix="SortedListInsert") -> None:
        constraints_dir = Path(os.getcwd()) / constraints_dir
        self.constraints_dir = Path(constraints_dir)
        self.file_prefix = file_prefix
        self.constraints = self.__get_constraints()

    def __get_constraints(self) -> str:
        """
        Extract constraints from the SMT files in a sorted order.
        """
        if not self.constraints_dir.exists():
            raise FileNotFoundError(f"Directory {self.constraints_dir} does not exist.")

        constraints = []
        smt_files = list(self.constraints_dir.glob(f"{self.file_prefix}_*.smt2"))

        if not smt_files:
            raise FileNotFoundError(f"No SMT files found in {self.constraints_dir}")

        def extract_n(file_path):
            try:
                n_str = str(file_path).split(self.file_prefix)[-1].split(".smt2")[0]
                return int(n_str.strip("_"))
            except (IndexError, ValueError):
                return float("inf")  # Files that can't be parsed go at the end

        smt_files.sort(key=extract_n)

        for smt_file in smt_files:
            n = extract_n(smt_file)
            if n == float("inf"):
                continue

            with open(smt_file, "r") as file:
                smt_lines = [line.strip() for line in file if line.strip()]
                assertions = [line for line in smt_lines if line.startswith("(assert")]
                assertions_str = "\n".join(assertions)
                constraints.append(f"N={n}: {assertions_str}")

        return "\n".join(constraints)

    def build_prompt(self, testing_value: int) -> dict:
        """
        Build the prompt for the OpenAI API.
        """
        data = prompt()
        return {
            "sys": data.promptsys,
            "start": data.promptStart1
            + self.constraints
            + data.promptStart2
            + data.promptStart3
            + str(testing_value)
            + "?",
            "feedback-format": data.promptfeedback1,
            "feedback-invalid": data.promptfeedback2,
        }

In [None]:
class OpenaiAPI:
    """
    OpenAI API interface.
    """

    def __init__(self, model: str, temperature: float | None = None) -> None:
        self.api_key = os.getenv("OPENAI_KEY")
        if not self.api_key:
            msg = "OPENAI_KEY environment variable not set"
            raise ValueError(msg)

        self.model = model
        self.client = openai.OpenAI(api_key=self.api_key)
        self.logger = logging.getLogger(__name__)
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.setLevel(logging.INFO)
        self.temperature = temperature

    def chat(self, history: list, max_retries=5) -> str:
        """Chat with the OpenAI API with improved error handling."""
        retry_count = 0

        while retry_count < max_retries:
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=history,
                    temperature=self.temperature,
                )
                return response.choices[0].message.content
            except openai.RateLimitError as e:
                retry_count += 1
                self.logger.warning(f"Rate limit error: {e}")

                if "tokens" in str(e) or "must be reduced" in str(e):
                    self.logger.info("Token limit exceeded - reducing context")
                    history = self.__reduce_context(history)
                    if not history:
                        return "Cannot reduce context further."
                    continue

                wait_time = min(60, 2**retry_count)
                self.logger.info(f"Rate limit exceeded. Waiting {wait_time}s")
                time.sleep(wait_time)
            except openai.BadRequestError as e:
                self.logger.error(f"Bad request: {e}")
                if "maximum context length" in str(e):
                    history = self.__reduce_context(history)
                    if not history:
                        return "Error: Maximum context length exceeded."
                    continue
                return f"Bad request: {str(e)}"
            except (openai.APIConnectionError, openai.APITimeoutError) as e:
                retry_count += 1
                wait_time = min(30, 2**retry_count)
                self.logger.warning(f"Connection error: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                return f"Error: {str(e)}"

        return "Maximum retry limit reached"

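    def __reduce_context(self, history: list) -> list:
        """
        Drop the oldest follow-up message to shrink the context.

        Returns an empty list when nothing can be removed.
        """
        if len(history) < 3:
            return []

        # Keep the system and initial user prompts; remove the oldest message after them
        history.pop(2)
        return history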

In [None]:
class Services:
    """
    Services for Z3 solver and program execution.
    """

    @classmethod
    def check_z3_installed(cls) -> bool:
        """
        Check if Z3 solver is installed.
        """
        try:
            result = subprocess.run(["z3", "--version"], capture_output=True, text=True)
            return result.returncode == 0
        except FileNotFoundError:
            return False

    @staticmethod
    def Z3check(constants: str, constraints: str) -> dict | None:
        """
        Check the satisfiability of the SMT2 file and return the test cases.

        Args:
            constants: Constants in the SMT2 file
            constraints: Constraints in the SMT2 file

        Returns:
            None if unsat, dict of variable assignments if sat, or error message
            Output: {'in': 1, 'in0': -1, 'in1': 0}

        """
        logger = logging.getLogger(__name__)
        logger.info("Invoking Z3 solver...")
        if not constants or not constraints:
            logger.error("Missing constants or constraints")
            return {"error": "Missing constants or constraints"}
        smt2 = Services.__format_smt2(constants=constants, constraints=constraints)
        if not smt2:
            logger.error("Constraints not formatted correctly")
            return None
        try:
            proc = subprocess.run(["z3", "-in"], input=smt2, capture_output=True, text=True, timeout=60, check=False)

            if proc.returncode != 0:
                logger.error(f"Z3 process error: {proc.stderr}")
                return {"error": f"Z3 process error: {proc.stderr}"}
            output = proc.stdout.strip()
            if "unsat" in output:
                return None
            elif "sat" in output:
                return Services.__parse_z3_model(output, smt2)
            else:
                logger.error(f"Unexpected Z3 output: {output}")
                return {"error": f"Unexpected Z3 output: {output}"}

        except subprocess.TimeoutExpired:
            logger.error("Z3 solver timed out after 60 seconds")
            return {"error": "Z3 solver timed out after 60 seconds"}
        except Exception as e:
            logger.error(f"Error running Z3: {e}")
            return {"error": f"Error running Z3: {str(e)}"}

    @staticmethod
    def __parse_z3_model(output: str, smt2: str) -> dict:
        """Parse Z3 model output into a dictionary."""
        try:
            model_text = output.split("sat", 1)[1].strip()
            if not model_text:
                return {}

            model_dict = {}

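            # Find all define-fun blocks. The value is either a bare token or a
            # parenthesised negative integer such as "(- 5)", which must be
            # captured whole so the sign handling below can parse it.
            pattern = r"\(define-fun\s+(\S+)\s+\(\)\s+(\S+)\s+(\(-\s*\d+\)|[^\s()]+)\s*\)"
            matches = re.findall(pattern, model_text, re.DOTALL)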

            for var_name, var_type, var_value in matches:
                value = var_value.strip()
                if var_type == "Int":
                    if value.startswith("(- ") and value.endswith(")"):
                        value = -int(value[3:-1])
                    else:
                        with suppress(ValueError):
                            value = int(value)
                model_dict[var_name] = value

            model_dict["smt2"] = smt2
            model_dict["z3_output"] = output

            return model_dict
        except Exception as e:
            return {"error": f"Error parsing Z3 output: {str(e)}", "raw_output": output}

    @staticmethod
    def __format_smt2(constants: str, constraints: str) -> str:
        """
        Format the SMT2 file with the constants and constraints.
        """
        return f"""
; Constants
{constants}

; Constraints
{constraints}
(check-sat)
(get-model)
"""

    @staticmethod
    def put_testcase(path: str, N: int, data: int) -> str:
        """
        Run the program under test with one test case.

        Args:
            path: Path to the program
            N: Input size
            data: Input data

        Returns:
            Output of the program, or an error message if the run fails
        """
        try:
            # subprocess arguments must be strings, so convert the integers
            return subprocess.check_output(["java", path, str(N), str(data)], text=True, timeout=60)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            return f"Error: {e!s}"


In [None]:
class Experiment:
    """
    Experiment class that:
     - Builds prompts for various input sizes.
     - Queries the OpenAI API to get SMT-LIB constraints.
     - Uses the Z3 solver to produce concrete test cases.
     - Records the results.
    """

    def __init__(self, model: str, testing_values=None, max_attempts=10, temperature=None) -> None:
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        self.logger.addHandler(logging.StreamHandler(stream=sys.stdout))

        self.prompt_builder = PromptBuilder()
        self.constraints = self.prompt_builder.constraints
        self.max_attempts = max_attempts
        self.testing_values = testing_values or [250, 500, 1000]

        # Check Z3 installation before proceeding
        if not Services.check_z3_installed():
            raise RuntimeError("Z3 solver not found in PATH. Please install Z3.")

        try:
            self.openai_api = OpenaiAPI(model,temperature=temperature)
        except ValueError as e:
            raise RuntimeError(f"Failed to initialize OpenAI API: {e}") from e

        self.results = {}

    def __extract_answer(self, text: str) -> tuple:
        """
        Extract the SMT constraints from the OpenAI response.
        """
        regex = r"<constants>(.*?)</constants>.*?<answer>(.*?)</answer>"
        match = re.search(regex, text, re.DOTALL)
        return (match.group(1), match.group(2)) if match else (None, None)

    def run(self, save_results=True) -> dict:
        """Run the experiment for all testing values."""
        start_time = time.time()

        for testing_value in self.testing_values:
            self.logger.info(f"Testing with N = {testing_value}")
            history = []
            prompt_data = self.prompt_builder.build_prompt(testing_value)
            history.append({"role": "system", "content": prompt_data["sys"]})
            history.append({"role": "user", "content": prompt_data["start"]})
            self.logger.info(f"Prompt: {prompt_data['start']}")

            for attempt in range(1, self.max_attempts + 1):
                self.logger.info(f"Attempt {attempt} of {self.max_attempts}")

                # Get response from OpenAI
                response = self.openai_api.chat(history)
                self.logger.info(f"OpenAI Response:\n{response}")

                # Check if response has the expected format
                if (
                    "<answer>" in response
                    and "</answer>" in response
                    and "<constants>" in response
                    and "</constants>" in response
                ):
                    constants, constraints = self.__extract_answer(response)

                    if not constants or not constraints:
                        self.logger.warning("No valid constraints found. Retrying...")
                        history.append({"role": "assistant", "content": response})
                        history.append({"role": "user", "content": prompt_data["feedback-format"]})
                        continue

                    # Get model from Z3
                    model = Services.Z3check(constants, constraints)

                    # Check for errors in the model
                    if model is None:
                        self.logger.warning("Constraints are unsatisfiable. Retrying...")
                        history.append({"role": "assistant", "content": response})
                        history.append({"role": "user", "content": prompt_data["feedback-invalid"]})
                        continue

                    if "error" in model:
                        self.logger.warning(f"Z3 error: {model['error']}. Retrying...")
                        history.append({"role": "assistant", "content": response})
                        history.append({"role": "user", "content": prompt_data["feedback-invalid"]})
                        continue

                    # Store initial result
                    self.results[testing_value] = {
                        "Success": False,
                        "Z3": model,
                        "OpenAI": response,
                        "Attempts": attempt,
                    }

                    # Execute test only if we have input value
                    if "in" in model:
                        self.logger.info(f"Executing the program with test case: {model['in']}")
                        EXPECTED_CALL_COUNT = testing_value + 1
                        path = "./test/SortedListInsert"

                        try:
                            output = Services.put_testcase(path, testing_value, model["in"])
                            output_value = output.strip()

                            try:
                                call_count = int(output_value)
                                success = call_count == EXPECTED_CALL_COUNT

                                self.results[testing_value]["Success"] = success
                                self.results[testing_value]["Output"] = output
                                self.results[testing_value]["ExpectedCallCount"] = EXPECTED_CALL_COUNT
                                self.results[testing_value]["ActualCallCount"] = call_count

                                if success:
                                    self.logger.info(f"Success! Call count matches expected: {call_count}")
                                    break
                                else:
                                    self.logger.warning(
                                        f"Call count {call_count} doesn't match expected {EXPECTED_CALL_COUNT}"
                                    )

                            except ValueError:
                                self.logger.error(f"Could not parse output as integer: {output_value}")
                                self.results[testing_value]["Error"] = f"Failed to parse output: {output_value}"

                        except Exception as e:
                            self.logger.error(f"Error executing test: {e}")
                            self.results[testing_value]["Error"] = f"Execution error: {str(e)}"
                    else:
                        self.logger.warning("Model missing 'in' value. Cannot execute test.")
                        self.results[testing_value]["Error"] = "Model missing 'in' value"

                    # Break if we have a valid model, even if execution failed
                    break
                else:
                    self.logger.warning("Response did not contain valid tags. Retrying...")
                    history.append({"role": "assistant", "content": response})
                    history.append({"role": "user", "content": prompt_data["feedback-format"]})
            else:
                self.logger.error(f"Failed after {self.max_attempts} attempts for N={testing_value}")
                self.results[testing_value] = {"Success": False, "Error": f"Failed after {self.max_attempts} attempts"}

        # Save results if requested
        if save_results:
            self.save_results()

        duration = time.time() - start_time
        self.logger.info(f"Experiment completed in {duration:.2f} seconds")
        return self.results

    def save_results(self, filename=None):
        """Save results to a JSON file."""
        if not filename:
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            filename = f"experiment_results_{timestamp}.json"

        try:
            with open(filename, "w") as f:
                json.dump(self.results, f, indent=2)
            self.logger.info(f"Results saved to {filename}")
        except Exception as e:
            self.logger.error(f"Failed to save results: {e}")


In [None]:
experiment = Experiment("gpt-4o")
experiment.run()