In [3]:
import os
import re
import traceback
import ast
from openai import OpenAI
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Check if the API key is set
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("API key is not set")

# Initialize the OpenAI client
client = OpenAI()

def extract_pattern(text, pattern):
    """
    Extracts a pattern from the given text.

    Args:
        text (str): The text to search within.
        pattern (str): The regex pattern to search for.

    Returns:
        str: The extracted pattern or None if not found.
    """
    match = re.search(pattern, text, flags=re.DOTALL)
    if match:
        return match.group(1).strip()
    return None

def run_openai_agent(query):
    print("### Run OpenAI agent")
    print(f"User query: {query}")
    try:
        completion = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a Python coding assistant. Return a Python function for the given query, followed by a test case using assertion, and an example usage with expected output. Format your response with '## Python function', '## Test case', and '## Example usage' headers."},
                {"role": "user", "content": query}
            ]
        )
        result = completion.choices[0].message.content
        print("Raw response from OpenAI:")
        print(result)
        return result
    except Exception as e:
        print(f"Request failed: {e}. Please check your request.")
        return None

def extract_code(result):
    retry = False
    print("### Extracting Python function")
    python_function = extract_pattern(result, r'## Python function(.*?)## Test case')
    if not python_function:
        retry = True
        print("Python function failed to generate or wrong output format. Setting retry to True.")
        print("Attempting to extract entire response as Python function.")
        python_function = result
    else:
        python_function = re.sub(r'```python\s*', '', python_function)
        python_function = re.sub(r'\s*```\s*', '', python_function)
        python_function = python_function.strip()

    print("### Extracting test case")
    test_function = extract_pattern(result, r'## Test case(.*?)## Example usage')
    if not test_function:
        retry = True
        print("Test function failed to generate or wrong output format. Setting retry to True.")
        print("Attempting to create a simple test case.")
        test_function = "assert True  # Placeholder test"
    else:
        test_function = re.sub(r'```python\s*', '', test_function)
        test_function = re.sub(r'\s*```\s*', '', test_function)
        test_function = test_function.strip()

    print("### Extracting example usage")
    example_usage = extract_pattern(result, r'## Example usage(.*?)$')
    if not example_usage:
        retry = True
        print("Example usage failed to generate or wrong output format. Setting retry to True.")
        print("Attempting to create a simple example usage.")
        example_usage = "print(function_name(example_input))  # Placeholder example"
    else:
        example_usage = re.sub(r'```python\s*', '', example_usage)
        example_usage = re.sub(r'\s*```\s*', '', example_usage)
        example_usage = example_usage.strip()

    print("Extracted Python function:")
    print(python_function)
    print("Extracted test case:")
    print(test_function)
    print("Extracted example usage:")
    print(example_usage)

    return python_function, test_function, example_usage, retry

def check_code(python_function, test_function, example_usage):
    retry = False
    try:
        print("Executing Python function:")
        print(python_function)
        exec(python_function, globals())
        print("Code executed successfully.")
        try:
            print("Executing test case:")
            print(test_function)
            exec(test_function, globals())
            print("Code passed test case.")
            try:
                print("Executing example usage:")
                print(example_usage)
                exec(example_usage, globals())
                print("Example usage executed successfully.")
            except Exception as e:
                print(f"Example usage failed with error: {str(e)}")
                print(traceback.format_exc())
                retry = True
                print("Setting retry to True")
        except Exception as e:
            print(f"Test failed with error: {str(e)}")
            print(traceback.format_exc())
            retry = True
            print("Setting retry to True")
    except Exception as e:
        print(f"Code failed with error: {str(e)}")
        print(traceback.format_exc())
        retry = True
        print("Setting retry to True")
    return retry

def run_workflow(query):
    print("### ENTER WORKFLOW")
    i = 0
    max_retries = 3
    retry = True
    while i < max_retries and retry:
        print(f"TRY # {i}")
        i += 1
        result = run_openai_agent(query)
        python_function, test_function, example_usage, retry = extract_code(result)
        retry = check_code(python_function, test_function, example_usage)

    if not retry:
        print(f"Validated Python function: ```{python_function}```")
        print(f"Example usage: ```{example_usage}```")
    print("### EXIT WORKFLOW")

In [4]:
# test the workflow 
run_workflow("How can i count the number of duplicates in a list?")


### ENTER WORKFLOW
TRY # 0
### Run OpenAI agent
User query: How can i count the number of duplicates in a list?
Raw response from OpenAI:
## Python function
```python
def count_duplicates(lst):
    """
    Count the number of duplicate elements in a list.

    Args:
    lst (list): The list to check for duplicate elements.

    Returns:
    int: The count of duplicate elements.
    """
    element_count = {}
    duplicates = 0

    for item in lst:
        if item in element_count:
            if element_count[item] == 1:
                duplicates += 1
            element_count[item] += 1
        else:
            element_count[item] = 1

    return duplicates
```

## Test case
```python
def test_count_duplicates():
    assert count_duplicates([1, 2, 2, 3, 4, 4, 4, 5, 6]) == 2
    assert count_duplicates([1, 1, 1, 1]) == 1
    assert count_duplicates([1, 2, 3, 4, 5]) == 0
    assert count_duplicates([]) == 0
    assert count_duplicates(["a", "b", "a", "b", "c"]) == 2

test_count_dupli

In [5]:
def self_correcting_workflow(query, max_attempts=5):
    for attempt in range(max_attempts):
        print(f"Attempt {attempt + 1}/{max_attempts}")
        result = run_openai_agent(query)
        python_function, test_function, example_usage, _ = extract_code(result)
        
        try:
            exec(python_function, globals())
            exec(test_function, globals())
            print("Function passed all tests!")
            return python_function, example_usage
        except AssertionError as e:
            print(f"Test failed: {e}")
            error_context = f"The following function failed this test: {e}\n\n{python_function}\n\nPlease fix the function to pass the test."
            query = f"Fix the following Python function: {error_context}"
    
    print("Max attempts reached. Could not generate a correct function.")
    return None, None

# Usage
function, usage = self_correcting_workflow("Write a function to find the nth Fibonacci number")
if function:
    print("Final function:")
    print(function)
    print("Example usage:")
    print(usage)

Attempt 1/3
### Run OpenAI agent
User query: Write a function to find the nth Fibonacci number
Raw response from OpenAI:
## Python function
```python
def fibonacci(n):
    if n <= 0:
        raise ValueError("n must be a positive integer.")
    elif n == 1:
        return 0
    elif n == 2:
        return 1
    a, b = 0, 1
    for _ in range(2, n):
        a, b = b, a + b
    return b
```

## Test case
```python
def test_fibonacci():
    assert fibonacci(1) == 0
    assert fibonacci(2) == 1
    assert fibonacci(3) == 1
    assert fibonacci(4) == 2
    assert fibonacci(5) == 3
    assert fibonacci(6) == 5
    assert fibonacci(10) == 34

test_fibonacci()
```

## Example usage
```python
print(fibonacci(10))  # Expected output: 34
```
### Extracting Python function
### Extracting test case
### Extracting example usage
Extracted Python function:
def fibonacci(n):
    if n <= 0:
        raise ValueError("n must be a positive integer.")
    elif n == 1:
        return 0
    elif n == 2:
     

In [7]:
query = """
Write a function called 'sieve_and_twin_sum' that implements the Sieve of Eratosthenes algorithm to find all prime numbers up to a given number n. The function should also calculate and return the sum of all twin primes found. Twin primes are pairs of primes that differ by 2.

The function should:
1. Take an integer n as input.
2. Return a tuple containing:
   a) A list of all prime numbers up to n.
   b) The sum of all twin primes found.

Example:
For n = 15, the function should return:
([2, 3, 5, 7, 11, 13], 12)

Explanation:
- Primes up to 15 are [2, 3, 5, 7, 11, 13]
- Twin prime pairs are (3,5) and (11,13)
- Sum of twin primes: 3 + 5 + 11 + 13 = 32

Make sure to handle edge cases and raise a ValueError if n is less than 2.
"""

function, usage = self_correcting_workflow(query)
if function:
    print("Final function:")
    print(function)
    print("Example usage:")
    print(usage)

Attempt 1/3
### Run OpenAI agent
User query: 
Write a function called 'sieve_and_twin_sum' that implements the Sieve of Eratosthenes algorithm to find all prime numbers up to a given number n. The function should also calculate and return the sum of all twin primes found. Twin primes are pairs of primes that differ by 2.

The function should:
1. Take an integer n as input.
2. Return a tuple containing:
   a) A list of all prime numbers up to n.
   b) The sum of all twin primes found.

Example:
For n = 15, the function should return:
([2, 3, 5, 7, 11, 13], 12)

Explanation:
- Primes up to 15 are [2, 3, 5, 7, 11, 13]
- Twin prime pairs are (3,5) and (11,13)
- Sum of twin primes: 3 + 5 + 11 + 13 = 32

Make sure to handle edge cases and raise a ValueError if n is less than 2.

Raw response from OpenAI:
## Python function

```python
def sieve_and_twin_sum(n):
    if n < 2:
        raise ValueError("n must be greater than or equal to 2")

    # Sieve of Eratosthenes algorithm to find all pri

# Self-Correcting LLM Coding Workflow

## Initial Setup
1. Define the coding problem or task
2. Set up test cases and evaluation criteria
3. Initialize attempt counter and maximum attempts

## Workflow Process
1. **Generate Initial Function**
   - Query LLM with the original problem statement
   - LLM generates initial function, test case, and example usage

2. **Test Function**
   - Execute the generated function
   - Run it against predefined test cases

3. **Evaluate Results**
   - If all tests pass:
     - Return the successful function and exit workflow
   - If any test fails:
     - Capture specific error messages or discrepancies

4. **Prepare Feedback**
   - Construct a new query for the LLM, including:
     - Original function code
     - Specific error messages or test failures

5. **Request Correction**
   - Send the prepared query back to the LLM
   - LLM generates an updated function based on the feedback

6. **Repeat Process**
   - Increment attempt counter
   - If maximum attempts reached:
     - Exit workflow and report failure
   - Otherwise:
     - Go back to step 2 (Test Function)

## Output
- If successful: Return the correct function and example usage
- If unsuccessful after max attempts: Report failure and last attempt details

In [21]:
import os
from openai import OpenAI
from datetime import datetime

# Initialize the OpenAI client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def write_workflow_results(query, attempts, final_function, final_usage, success, max_attempts):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"workflow_results_{timestamp}.md"
    
    with open(filename, 'w') as f:
        f.write("# Self-Correcting LLM Coding Workflow Results\n\n")
        
        f.write("## Initial Query\n")
        f.write(f"```\n{query}\n```\n\n")
        
        f.write("## Attempts\n")
        for i, attempt in enumerate(attempts, 1):
            f.write(f"### Attempt {i}/{max_attempts}\n")
            f.write("#### Generated Function\n")
            f.write(f"```python\n{attempt['function']}\n```\n\n")
            f.write("#### Test Case\n")
            f.write(f"```python\n{attempt['test_case']}\n```\n\n")
            f.write("#### Example Usage\n")
            f.write(f"```python\n{attempt['example_usage']}\n```\n\n")
            if 'error' in attempt:
                f.write(f"#### Error\n{attempt['error']}\n\n")
        
        f.write("## Final Results\n")
        if success:
            f.write("### Success: Function passed all tests\n")
            f.write("#### Final Function\n")
            f.write(f"```python\n{final_function}\n```\n\n")
            f.write("#### Example Usage\n")
            f.write(f"```python\n{final_usage}\n```\n")
        else:
            f.write("### Failure: Maximum attempts reached without success\n")
            f.write("#### Last Attempt\n")
            f.write(f"```python\n{attempts[-1]['function']}\n```\n")
        
    print(f"Workflow results written to {filename}")

import re

def run_workflow(query, max_attempts=2):
    attempts = []
    success = False
    final_function = ""
    final_usage = ""

    for i in range(max_attempts):
        print(f"Attempt {i + 1}/{max_attempts}")
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a Python programming assistant."},
                {"role": "user", "content": f"Write a Python function to {query}. Include a test case and an example usage."}
            ]
        )
        
        generated_code = response.choices[0].message.content
        
        # Extract function, test case, and example usage
        function_match = re.search(r'```python\n(def .*?)\n```', generated_code, re.DOTALL)
        test_match = re.search(r'```python\n(def test_.*?)\n```', generated_code, re.DOTALL)
        example_match = re.search(r'```python\n(.*?)\n```', generated_code, re.DOTALL)

        generated_function = function_match.group(1) if function_match else ""
        test_case = test_match.group(1) if test_match else ""
        example_usage = example_match.group(1) if example_match else ""

        attempt = {
            'function': generated_function,
            'test_case': test_case,
            'example_usage': example_usage
        }

        # Execute and test the function
        try:
            # Create a new namespace to avoid conflicts
            namespace = {}
            exec(generated_function, namespace)
            if test_case:
                exec(test_case, namespace)
            success = True
            final_function = generated_function
            final_usage = example_usage
            attempts.append(attempt)
            print("Function passed all tests!")
            break  # Exit the loop if successful
        except Exception as e:
            attempt['error'] = str(e)
            attempts.append(attempt)
            print(f"Test failed: {e}")

    write_workflow_results(query, attempts, final_function, final_usage, success, max_attempts)
    return final_function, final_usage

# Usage remains the same
questions = [
    "Calculate the sum of squares of numbers from 1 to n",
    "Reverse a string without using the built-in reverse function",
    "Find the second largest number in a list of integers",
    "Check if a number is prime",
    "Implement a basic stack data structure with push, pop, and peek operations",
    "Generate the nth Fibonacci number using recursion",
    "Read a text file and return the count of each word",
    "Design a simple class to represent a bank account with deposit and withdraw methods",
    "Implement the binary search algorithm for a sorted list of integers",
    "Convert a string to an integer, handling potential exceptions",
    "Return a list of all perfect squares up to a given number n",
    "Validate an email address using regular expressions"
]

for i, question in enumerate(questions, 1):
    print(f"\n--- Question {i} ---")
    print(f"Task: {question}")
    function, usage = run_workflow(question)
    if function:
        print("Final function:")
        print(function)
        print("Example usage:")
        print(usage)
    print("----------------------\n")


--- Question 1 ---
Task: Calculate the sum of squares of numbers from 1 to n
Attempt 1/2
Function passed all tests!
Workflow results written to workflow_results_20240902_103026.md
Final function:
def sum_of_squares(n):
    """
    Calculate the sum of squares of numbers from 1 to n.

    Parameters:
    n (int): The upper limit number.

    Returns:
    int: The sum of squares of numbers from 1 to n.
    """
    if n < 1:
        raise ValueError("Input number must be a positive integer")

    total = 0
    for i in range(1, n+1):
        total += i ** 2
    return total

# Test case
def test_sum_of_squares():
    # Test when n = 5, the sum should be 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 55
    assert sum_of_squares(5) == 55, "Test case failed!"

    # Test when n = 1, the sum should be 1^2 = 1
    assert sum_of_squares(1) == 1, "Test case failed!"

    # Test when n = 3, the sum should be 1^2 + 2^2 + 3^2 = 14
    assert sum_of_squares(3) == 14, "Test case failed!"

    print("All test cases 