In [6]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from Models.models import LLMModel
from Agents.schemas import *
from Agents.tools import *
from langgraph.prebuilt.tool_executor import ToolExecutor
from dotenv import load_dotenv
from langchain.output_parsers import PydanticOutputParser
from langchain.output_parsers.openai_tools import PydanticToolsParser
from langchain_openai.chat_models import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.messages import (
    AIMessage,
    FunctionMessage,
    HumanMessage,
)
from langchain.tools.render import format_tool_to_openai_function
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.pydantic_v1 import BaseModel, Field, validator


class TestResult(Enum):
    """Enumeration of the two possible outcomes of a test run."""
    # NOTE(review): nothing in the visible code references this enum;
    # TestResults.result is declared as a plain str — confirm whether the
    # enum is meant to constrain that field.
    PASSED = "Passed"
    FAILED = "Failed"

class TestResults(BaseModel):
    """Represents the outcome of a test."""
    # This schema is listed in all_schema and bound to the model as a tool;
    # code_runner treats an instance of it as the final answer rather than as
    # a tool invocation.

    # Verdict as free text; the field is a plain str, so values are not
    # validated against the TestResult enum.
    result: str = Field(
        ..., description="The result of the test, either PASSED or FAILED."
    )
    # Free-form remarks accompanying the verdict.
    comment: str = Field(
        ..., description="Any comments or notes about the test result."
    )

class CodeExec(BaseModel):
    """Run Code"""
    # The docstring must be the first statement in the class body; placed after
    # an assignment it is a discarded expression and __doc__ stays empty, so
    # the tool schema built from this model would carry no description.

    # Tool name this schema corresponds to.
    # NOTE(review): assigning __name__ in the class body does not change
    # CodeExec.__name__ (code_runner still sees "CodeExec") — confirm intent.
    __name__ = "code_exec"

    # Source text to execute.
    code: str = Field(description="The code to be executed")
    # Language the source text is written in.
    language: str = Field(description="The language of the code.")



class WriteFile(BaseModel):
    """Write File"""
    # The docstring must be the first statement in the class body; placed after
    # an assignment it is a discarded expression and __doc__ stays empty, so
    # the tool schema built from this model would carry no description.

    # Tool name this schema corresponds to.
    # NOTE(review): assigning __name__ in the class body does not change
    # WriteFile.__name__ — confirm intent.
    __name__ = "write_file"

    # Name of the file to write.
    name: str = Field(description="The name of the file.")
    # Text to store in the file.
    content: str = Field(description="The content of the file.")

class CreateDir(BaseModel):
    """Create Directory"""
    # The docstring must be the first statement in the class body; placed after
    # an assignment it is a discarded expression and __doc__ stays empty, so
    # the tool schema built from this model would carry no description.

    # Tool name this schema corresponds to.
    # NOTE(review): assigning __name__ in the class body does not change
    # CreateDir.__name__ — confirm intent.
    __name__ = "create_directory"

    # Name of the directory to create.
    name: str = Field(description="The name of the directory.")





    
# Schemas offered to the model as tools and accepted by the tools parser.
# NOTE(review): WriteFile and CreateDir are defined in this cell but are not
# listed here — confirm whether they should be exposed as well.
all_schema= [CodeExec, TestResults]


In [7]:
# System instructions for the code_runner agent, followed by the running
# conversation. Python joins adjacent string literals with no separator, so
# every instruction ends with an explicit "\n"; without it the numbered steps
# and notes run together in the rendered prompt ("...edge case.2. Your task...").
code_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "As a seasoned Python and JavaScript developer, you are known as code_runner. Your role involves the following steps:\n"
            "1. You will be given a piece of code along with a test scenario, which could be a happy path or an edge case.\n"
            "2. Your task is to write the test code and combine it with the original code to form a complete script.\n"
            "3. If the test requires dummy files such as JSON or CSV use the write_file tool to create them and you can use create_directory tool to create a directory\n"
            "4. Execute the complete code and test using the code_exec tool. This approach eliminates the need for any unit test framework.\n"
            "5. If you encounter any issues after invoking any tool, feel free to make necessary corrections and retry.\n"
            "NOTE: Do whatever you need to do and only give your final comment when done\n"
            "Remember, always write a complete code that can be executed as a standalone script. Do not modify the original code being tested.\n"
            "NOTE: make use of  print within your code to output information for better observation and debugging.\n"
            "NOTE: Avoid using 'if __name__ == '__main__' as this will prevent the code from running.\n"
            "Finally, report if the test passed and any other comment you have using result tool and nothing else"
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)

# Format instructions derived from the TestResults schema.
parser = PydanticOutputParser(pydantic_object=TestResults)
standard_test = parser.get_format_instructions()
# NOTE(review): the system prompt above contains no {schema} placeholder, so
# this partial value is never rendered into the prompt — confirm intent.
code_prompt = code_prompt.partial(schema=standard_test)

# Low temperature keeps the generated test code close to deterministic.
llm3 = ChatOpenAI(
    model="gpt-4o",
    # model_kwargs = {"response_format":{"type": "json_object"}}
    temperature=0.1,
)

# prompt -> model with the schemas bound as tools -> parsed schema instances.
model3 = code_prompt | llm3.bind_tools(all_schema) | PydanticToolsParser(tools=all_schema)
# Retry the whole chain on failure, giving up after the third attempt.
model3 = model3.with_retry(stop_after_attempt=3)



def code_runner(state):
    """Run one turn of the code_runner agent.

    Invokes ``model3`` on ``state["messages"]``, prints the parsed output, and
    converts the last parsed schema object into a single ``AIMessage``:

    * any schema other than ``TestResults`` becomes a function-call message
      whose name is looked up in ``tool_name_schema_map``;
    * ``TestResults`` becomes a plain message whose content is the JSON dump
      of the result.

    Args:
        state: Mapping that holds the conversation under ``"messages"``.

    Returns:
        dict: ``{"messages": [AIMessage], "sender": "code_runner"}``.
    """
    parsed_calls = model3.invoke(state["messages"])
    print(parsed_calls)

    # Only the last parsed object decides what this turn produces.
    latest = parsed_calls[-1]
    schema_name = latest.__class__.__name__
    payload = json.dumps(latest.dict())

    if schema_name == "TestResults":
        # Final verdict: carry the JSON directly as the message content.
        reply = AIMessage(content=payload)
    else:
        # Tool request: encode it as a function call for the tool node.
        function_call = {
            "arguments": payload,
            "name": tool_name_schema_map[schema_name],
        }
        reply = AIMessage(
            content="", additional_kwargs={"function_call": function_call}
        )

    return {"messages": [reply], "sender": "code_runner"}

In [11]:
from langgraph.prebuilt.tool_executor import ToolExecutor, ToolInvocation
from Agents.agents import tool_executor


def tool_node(state):
    """This runs tools in the graph

    It takes in an agent action and calls that tool and returns the result.

    Args:
        state: Mapping whose ``"messages"`` list ends with a message carrying
            ``additional_kwargs["function_call"]``, itself holding the tool
            ``"name"`` and the JSON-encoded ``"arguments"``.

    Returns:
        dict: ``{"messages": [FunctionMessage]}`` where the message content is
        ``"<tool name> response: <tool output>"``.

    Raises:
        KeyError: If the last message has no ``function_call`` entry.
        json.JSONDecodeError: If the arguments are not valid JSON.
    """
    messages = state["messages"]
    last_message = messages[-1]
    function_call = last_message.additional_kwargs["function_call"]
    tool_input = json.loads(function_call["arguments"])
    # We can pass single-arg inputs by value.
    # The previous condition `len(tool_input) == 1 in tool_input` was a chained
    # comparison (`len(...) == 1 and 1 in tool_input`); JSON object keys are
    # always strings, so it could never be true and the unwrap never happened.
    # Only the conventional "__arg1" wrapper is unwrapped, so tools that take a
    # single *named* argument still receive their dict.
    if len(tool_input) == 1 and "__arg1" in tool_input:
        tool_input = next(iter(tool_input.values()))
    tool_name = function_call["name"]

    action = ToolInvocation(
        tool=tool_name,
        tool_input=tool_input,
    )
    # We call the tool_executor and get back a response
    response = tool_executor.invoke(action)
    # We use the response to create a FunctionMessage
    function_message = FunctionMessage(
        content=f"{tool_name} response: {str(response)}", name=action.tool
    )
    # We return a list, because this will get added to the existing list
    return {"messages": [function_message]}


In [12]:
# Start a fresh conversation with one user request and run a single
# code_runner turn on it.
messages = [HumanMessage("test this code: print('Hello, World!')")]
result = code_runner({"messages":messages})
# Keep the agent's reply in the transcript so the next cell can act on it.
messages.append(result["messages"][-1])
# Last expression: display the transcript as the cell output.
messages

[CodeExec(code='print(\'Hello, World!\')\n\n# Test code\nexpected_output = \'Hello, World!\'\nprint(f"Expected Output: {expected_output}")', language='python')]


[HumanMessage(content="test this code: print('Hello, World!')"),
 AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"code": "print(\'Hello, World!\')\\n\\n# Test code\\nexpected_output = \'Hello, World!\'\\nprint(f\\"Expected Output: {expected_output}\\")", "language": "python"}', 'name': 'code_exec'}})]

In [13]:
# Execute the function call carried by the last message in the transcript.
tool_result = tool_node({"messages":messages})
# Bare expression in the middle of a cell: evaluated but not displayed.
tool_result["messages"][-1]
# Record the tool's reply so the agent sees it on its next turn.
messages.append(tool_result["messages"][-1])
# Last expression: display the transcript as the cell output.
messages

{'code': 'print(\'Hello, World!\')\n\n# Test code\nexpected_output = \'Hello, World!\'\nprint(f"Expected Output: {expected_output}")', 'language': 'python'}


[HumanMessage(content="test this code: print('Hello, World!')"),
 AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"code": "print(\'Hello, World!\')\\n\\n# Test code\\nexpected_output = \'Hello, World!\'\\nprint(f\\"Expected Output: {expected_output}\\")", "language": "python"}', 'name': 'code_exec'}}),
 FunctionMessage(content='code_exec response: Successfully executed:\n```python\nprint(\'Hello, World!\')\n\n# Test code\nexpected_output = \'Hello, World!\'\nprint(f"Expected Output: {expected_output}")\n```\nStdout: Hello, World!\nExpected Output: Hello, World!\n', name='code_exec')]

In [14]:
# Second agent turn, now that the transcript includes the tool's reply.
# Depends on `messages` as left by the two previous cells.
result = code_runner({"messages":messages})

[TestResults(result='PASSED', comment="The code executed successfully and printed 'Hello, World!' as expected.")]


In [15]:
# Sample case: "code" holds the source text of a script and "input" a line of
# space-separated integers (presumably the stdin line the script reads via
# input() — confirm against the consumer of this dict).
# NOTE(review): the sample source imports from "tying" (sic) — confirm whether
# the misspelling is intentional test data.
code = {
    "code": "from tying import List\n\ndef squared_sum(arr: List[int]) -> int:\n    \"\"\"\n    Plan : The problem is to find the subarray with maximum sum and return its square\n    \"\"\"\n    \"Iteration 1\"\n    # value = 0\n    # length = len(arr)\n    # for i in range(length):\n    #     if arr[i] < 1:\n    #         continue\n    #     for j in range(i+1,length+1):\n    #         value = max(value, sum(arr[i:j]))\n        \n    # return value*value\n\n    \"This approach passes 70% of test cases. I think the error lies in the complexity of the algorithm\"\n\n    \"Iteration 2\"\n    # value,temp =  0,0\n    # length = len(arr)\n    # for i in range(length):\n    #     k = temp + arr[i]\n    #     if k < 1:\n    #         temp = 0\n    #     else:\n    #         temp = k\n    #     value = max(temp, value)\n\n    \"This approach passes 83% of test cases. I will modify the logic more\"\n\n    \"Iteration 3\"\n    # initiated two varibles value and temp with 0\n    value, temp = 0, 0\n    #found the length of the array\n    length = len(arr)\n    #traverse the array\n    for i in range(length):\n        # found the current sum\n        temp += arr[i]\n        #check if current sum > 0. If yes add the value. Else reset the temp to 0\n        temp = max(temp , 0)\n        #When ever the current sum is greater than value then put there\n        value = max(temp, value)\n    #return the square\n\n    \"this approach works fine but passes only 83%.Time complexity O(N) \"\n    return value * value\n\n\n# R E A D M E\n# DO NOT CHANGE the code below, we use it to grade your submission. If changed your submission will be failed automatically.\nif __name__ == '__main__':  \n    line = input()\n    arr = [int(i) for i in line.strip().split()]\n    print(squared_sum(arr))",
    "input": "1 -1 1 -1 1"
}

# Print the sample source with its escape sequences rendered.
print(code["code"])

from tying import List

def squared_sum(arr: List[int]) -> int:
    """
    Plan : The problem is to find the subarray with maximum sum and return its square
    """
    "Iteration 1"
    # value = 0
    # length = len(arr)
    # for i in range(length):
    #     if arr[i] < 1:
    #         continue
    #     for j in range(i+1,length+1):
    #         value = max(value, sum(arr[i:j]))
        
    # return value*value

    "This approach passes 70% of test cases. I think the error lies in the complexity of the algorithm"

    "Iteration 2"
    # value,temp =  0,0
    # length = len(arr)
    # for i in range(length):
    #     k = temp + arr[i]
    #     if k < 1:
    #         temp = 0
    #     else:
    #         temp = k
    #     value = max(temp, value)

    "This approach passes 83% of test cases. I will modify the logic more"

    "Iteration 3"
    # initiated two varibles value and temp with 0
    value, temp = 0, 0
    #found the length of the array
    length = len(arr)
   