## Import Library

In [1]:
# from git import Repo
from langchain.text_splitter import Language
from langchain.document_loaders.generic import GenericLoader
from langchain.document_loaders.parsers import LanguageParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
# from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationSummaryMemory

In [2]:
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import Annotated, Any, Dict, Optional, Sequence, TypedDict, List, Tuple
from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

In [3]:
path = 'test_file'


In [4]:
repo_path = 'test_repo'

## Directory Loader

In [5]:
from langchain_community.document_loaders import PythonLoader
from langchain_community.document_loaders import DirectoryLoader

loader = DirectoryLoader('test_repo', glob="**/*.py", loader_cls=PythonLoader)

In [6]:
docs = loader.load()
docs

[Document(page_content='from pydantic_settings import BaseSettings\n\n\nclass GunicornConfig(BaseSettings):\n    NUM_WORKERS: int = 1\n    NUM_THREADS: int = 1\n    TIMEOUT: int = 30\n    LOG_LEVEL: str = "INFO"\n    LOG_DEBUG: str = "DEBUG"\n    LOG_ERROR: str = "ERROR"\n\n    class Config:\n        env_prefix = "RG_LLM_API_GUNICORN_"\n\n\nGUNICORN_CONFIG = GunicornConfig()\n', metadata={'source': 'test_repo\\rg-llm\\config.py'}),
 Document(page_content='import logging\n\n\nclass RoguLogFilter(logging.Filter):\n    """\n    Custom filter to exclude health check\n    """\n\n    def filter(self, record):\n        try:\n            uri = str(record.args[2])\n        except Exception as e:\n            uri = None\n        return uri != "/_health"\n\n\nclass RoguFormatter(logging.Formatter):\n    """\n    RG Logger Formatter\n    """\n\n    datefmt = "%Y-%m-%dT%H:%M:%SZ"\n    fmt = \'RG_LOGGER: time="{asctime}" name={name} lineno={lineno} level={levelname} msg="{msg}" {extra}\'\n\n    def 

In [7]:
len(docs)

38

## Generic Loader

In [8]:
loader1 = GenericLoader.from_filesystem(path='test_repo',
                                        glob = "**/*",
                                        suffixes=[".py"],
                                        parser = LanguageParser(language=Language.PYTHON, parser_threshold=500)
)

In [9]:
docs1 = loader1.load()
docs1

[Document(page_content='from pydantic_settings import BaseSettings\n\n\nclass GunicornConfig(BaseSettings):\n    NUM_WORKERS: int = 1\n    NUM_THREADS: int = 1\n    TIMEOUT: int = 30\n    LOG_LEVEL: str = "INFO"\n    LOG_DEBUG: str = "DEBUG"\n    LOG_ERROR: str = "ERROR"\n\n    class Config:\n        env_prefix = "RG_LLM_API_GUNICORN_"\n\n\nGUNICORN_CONFIG = GunicornConfig()\n', metadata={'source': 'test_repo\\rg-llm\\config.py', 'language': <Language.PYTHON: 'python'>}),
 Document(page_content='import logging\n\n\nclass RoguLogFilter(logging.Filter):\n    """\n    Custom filter to exclude health check\n    """\n\n    def filter(self, record):\n        try:\n            uri = str(record.args[2])\n        except Exception as e:\n            uri = None\n        return uri != "/_health"\n\n\nclass RoguFormatter(logging.Formatter):\n    """\n    RG Logger Formatter\n    """\n\n    datefmt = "%Y-%m-%dT%H:%M:%SZ"\n    fmt = \'RG_LOGGER: time="{asctime}" name={name} lineno={lineno} level={lev

In [10]:
len(docs1)

38

## Document Splitter

In [11]:
documents_splitter = RecursiveCharacterTextSplitter.from_language(language = Language.PYTHON,
                                                                chunk_size = 1000,
                                                                chunk_overlap = 0)

In [12]:
texts = documents_splitter.split_documents(docs1)
print(texts)



In [14]:
for doc in texts:
    page_content = doc.page_content
    metadata = doc.metadata["source"]
    print("Source : " + metadata + "\n\n" + page_content  )

Source : test_repo\rg-llm\config.py

from pydantic_settings import BaseSettings


class GunicornConfig(BaseSettings):
    NUM_WORKERS: int = 1
    NUM_THREADS: int = 1
    TIMEOUT: int = 30
    LOG_LEVEL: str = "INFO"
    LOG_DEBUG: str = "DEBUG"
    LOG_ERROR: str = "ERROR"

    class Config:
        env_prefix = "RG_LLM_API_GUNICORN_"


GUNICORN_CONFIG = GunicornConfig()
Source : test_repo\rg-llm\custom_logging.py

import logging


class RoguLogFilter(logging.Filter):
    """
    Custom filter to exclude health check
    """

    def filter(self, record):
        try:
            uri = str(record.args[2])
        except Exception as e:
            uri = None
        return uri != "/_health"
Source : test_repo\rg-llm\custom_logging.py

class RoguFormatter(logging.Formatter):
    """
    RG Logger Formatter
    """

    datefmt = "%Y-%m-%dT%H:%M:%SZ"
    fmt = 'RG_LOGGER: time="{asctime}" name={name} lineno={lineno} level={levelname} msg="{msg}" {extra}'

    def format(self, record):


In [15]:
output = ""

for doc in texts:
    page_content = doc.page_content
    metadata = doc.metadata["source"]
    output += "Source : " + metadata + "\n\n" + page_content + "\n\n"

print(output)

Source : test_repo\rg-llm\config.py

from pydantic_settings import BaseSettings


class GunicornConfig(BaseSettings):
    NUM_WORKERS: int = 1
    NUM_THREADS: int = 1
    TIMEOUT: int = 30
    LOG_LEVEL: str = "INFO"
    LOG_DEBUG: str = "DEBUG"
    LOG_ERROR: str = "ERROR"

    class Config:
        env_prefix = "RG_LLM_API_GUNICORN_"


GUNICORN_CONFIG = GunicornConfig()

Source : test_repo\rg-llm\custom_logging.py

import logging


class RoguLogFilter(logging.Filter):
    """
    Custom filter to exclude health check
    """

    def filter(self, record):
        try:
            uri = str(record.args[2])
        except Exception as e:
            uri = None
        return uri != "/_health"

Source : test_repo\rg-llm\custom_logging.py

class RoguFormatter(logging.Formatter):
    """
    RG Logger Formatter
    """

    datefmt = "%Y-%m-%dT%H:%M:%SZ"
    fmt = 'RG_LOGGER: time="{asctime}" name={name} lineno={lineno} level={levelname} msg="{msg}" {extra}'

    def format(self, record)

## Add to Chroma DB

In [16]:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

In [17]:
# Add to vectorDB
embeddings=OpenAIEmbeddings(disallowed_special=())
# vectordb = Chroma.from_documents(texts, embedding=embeddings, persist_directory='./data')
vectorstore = Chroma.from_documents(
    documents=texts,
    collection_name="rag-chroma",
    embedding=embeddings,
)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k":5})

In [31]:
question = "detect bug or optimize for the code"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content

In [32]:
docs

[Document(page_content='import threading\nimport sys\nimport traceback\nfrom logging.config import dictConfig\nfrom config import GUNICORN_CONFIG\n\n\nworkers = GUNICORN_CONFIG.NUM_WORKERS\nthreads = GUNICORN_CONFIG.NUM_THREADS\ntimeout = GUNICORN_CONFIG.TIMEOUT\nworker_class = "uvicorn.workers.UvicornWorker"\nbind = \'0.0.0.0:80\'', metadata={'language': 'python', 'source': 'test_repo\\rg-llm\\gunicorn.config.py'}),
 Document(page_content='def test_chat_completion_openai_with_tools(client):\n    data = {\n        "trace_id": "abc",\n        "messages": [\n            {"role": "user", "content": "What\'s the weather like in San franciso, tokyo and paris"}\n        ],\n        "tools": [{\n            "type": "function",\n            "function": {\n                "name": "get_current_weather",\n                "description": "Get the current weather in a given location",\n                "parameters": {\n                    "type": "object",\n                    "properties": {\n      

In [36]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
format_docs(docs)

'}\n\nreturn variant_org_key\n\ncontents.extend(images)\n\nself.__models = self.__load_model()\n\ncode.append(\'File: "%s", line %d, in %s\' % (filename, lineno, name))\n            if line:\n                code.append("  %s" % (line.strip()))\n    worker.log.debug("\\n".join(code))'

In [20]:
for doc in docs:
    page_content = doc.page_content
    metadata = doc.metadata["source"]
    print("Source : " + metadata + "\n\n" + page_content +"\n\n")

Source : test_repo\rg-llm\llm\api\__init__.py

from llm.api.factory import init_app


app = init_app()


Source : test_repo\rg-llm\llm\core\platform\openai\entity.py

from llm.config import OPENAI_CONFIG


ORG_KEYS = {
    "ruangguru": OPENAI_CONFIG.ORG_KEY_1,
    "schoters": OPENAI_CONFIG.ORG_KEY_2,
    "skillacademy": OPENAI_CONFIG.ORG_KEY_3,
    "brainacademy": OPENAI_CONFIG.ORG_KEY_4
}

MODEL_VARIANT = {
    "gpt-3.5-turbo": "gpt-3-5",
    "gpt-3.5-turbo-16k": "gpt-3-5",
    "gpt-3.5-turbo-1106": "gpt-3-5",
    "gpt-3.5-turbo-0125": "gpt-3-5",
    "gpt-4": "gpt-4",
    "gpt-4-turbo": "gpt-4-turbo",
    "gpt-4-0125-preview": "gpt-4-turbo",
    "gpt-4-1106-preview": "gpt-4-turbo",
    "gpt-4o": "gpt-4o"

}


Source : test_repo\rg-llm\llm\client\common\async_http.py

_async_http_client = AsyncHttp()
async_http_open = _async_http_client.open
async_http_close = _async_http_client.close
async_get = _async_http_client.get
async_get_params = _async_http_client.get_params
async_post = _asyn

In [33]:
docs

[Document(page_content='}', metadata={'source': 'test_repo\\rg-llm\\llm\\core\\platform\\openai\\entity.py'}),
 Document(page_content='return variant_org_key', metadata={'source': 'test_repo\\rg-llm\\llm\\core\\platform\\openai\\allocation.py'}),
 Document(page_content='contents.extend(images)', metadata={'source': 'test_repo\\rg-llm\\llm\\client\\vertexai\\client.py'}),
 Document(page_content='self.__models = self.__load_model()', metadata={'source': 'test_repo\\rg-llm\\llm\\core\\service\\chat_completion.py'}),
 Document(page_content='code.append(\'File: "%s", line %d, in %s\' % (filename, lineno, name))\n            if line:\n                code.append("  %s" % (line.strip()))\n    worker.log.debug("\\n".join(code))', metadata={'source': 'test_repo\\rg-llm\\gunicorn.config.py'})]

## Single Files

In [16]:
import os
file_path = r"test_file\test_file2.py"

with open(file_path, 'r') as rp:
    code = rp.read()

print(code)

import math
import numpy  
from random import randint, choice  

def calculate_area(radius):
    if radius < 0: 
        return "Invalid radius"
    area = math.pi * (radius ** 2
    return area 

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return "Invalid triangle sides"
    perimeter = side1 + side2 + side3
    return perimetr  

class Circle:
    def __init__(self, radius):
        self.radius = radious  

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius



## Multiple Files

In [34]:
import os

directory = "test_repo"
output_file = "python_files_content.txt"

# Initialize an empty list to store Python files
python_files = []

# Walk through all directories and subdirectories
for root, _, files in os.walk(directory):
    for file in files:
        if file.endswith('.py'):
            file_path = os.path.join(root, file)
            python_files.append(file_path)

# Open the output file in write mode
with open(output_file, 'w') as out_file:
    # Read and write the content of each Python file found
    for file_path in python_files:
        with open(file_path, 'r') as rp:
            content = rp.read()
            out_file.write(f"Content of {file_path}:\n{content}\n\n")

print(f"Content of Python files saved to {output_file}")


Content of Python files saved to python_files_content.txt


In [35]:
import os

directory = "test_repo"
output_file = "python_files_content.md"

# Initialize an empty list to store Python files
python_files = []

# Walk through all directories and subdirectories
for root, _, files in os.walk(directory):
    for file in files:
        if file.endswith('.py'):
            file_path = os.path.join(root, file)
            python_files.append(file_path)

# Open the output file in write mode
with open(output_file, 'w') as out_file:
    # Write Markdown headers and content for each Python file found
    for file_path in python_files:
        with open(file_path, 'r') as rp:
            content = rp.read()
            out_file.write(f"## Content of `{file_path}`:\n\n")
            out_file.write("```python\n")
            out_file.write(content)
            out_file.write("\n```\n\n")

print(f"Content of Python files saved to {output_file}")


Content of Python files saved to python_files_content.md


In [18]:
from dotenv import load_dotenv
import os

In [19]:
load_dotenv()

# Access variables from the environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

In [20]:
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

In [21]:
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

## Debugger using RAG

In [27]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI


# Data model
class Localizer(BaseModel):

    localizer: str = Field(
        description="Explain the predicted the bug"
    )


# LLM with function call
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm_localizer = llm.with_structured_output(Localizer)

# Prompt
system = """**Role**: As a localizer, your task is to carefully identify any bug or issues in the provided code. If no issues are found, simply state that the code is bug-free. If there are issues, explain them clearly.
**Requirement**
{requirement}
**predicted bug and its sources** :
{code}

Predicted Bug and Sources:
Provide a detailed explanation of the bug, including why the code is incorrect or where the issue lies.
"""

localizer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Explain the bug from predicted code: \n\n {code} \n\n The requirement: {requirement}"),
    ]
)

localizer_agent = localizer_prompt | structured_llm_localizer
requirement = "detect bug or optimize for the code"
docs = retriever.invoke(requirement)

predicted_code = ""

for doc in docs:
    page_content = doc.page_content
    metadata = doc.metadata["source"]
    predicted_code += "Source : " + metadata + "\n\n" + page_content + "\n\n"
    
# doc_txt = docs[1].requirement
localizer = localizer_agent.invoke({"code": predicted_code, "requirement": requirement})
# print(localizer_agent.invoke({"code": predicted_code, "requirement": requirement}))

In [30]:
print(localizer)

localizer='The bug in the provided code is in the gunicorn.config.py file. The issue lies in the import statement for GUNICORN_CONFIG. The code imports GUNICORN_CONFIG from a file named config, but the actual import should be from GUNICORN_CONFIG. This discrepancy in the import statement can lead to a ModuleNotFoundError as the correct path is not specified.'


In [29]:
from pprint import pprint 
pprint(localizer.localizer)

('The bug in the provided code is in the gunicorn.config.py file. The issue '
 'lies in the import statement for GUNICORN_CONFIG. The code imports '
 'GUNICORN_CONFIG from a file named config, but the actual import should be '
 'from GUNICORN_CONFIG. This discrepancy in the import statement can lead to a '
 'ModuleNotFoundError as the correct path is not specified.')


## Debugger using RAG with create_structured_output_runnable

In [22]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI


# Data model
class Localizer(BaseModel):

    localizer: str = Field(
        description="Explain the predicted the bug"
    )


# LLM with function call
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

# Prompt
localization_gen_prompt = ChatPromptTemplate.from_template("""**Role**: As a localizer, your task is to carefully identify any bug or issues in the provided code. If no issues are found, simply state that the code is bug-free. If there are issues, explain them clearly.
**Requirement**
{requirement}
**predicted bug and its sources** :
{code}

Provide a detailed explanation of the bug, including why the code is incorrect or where the issue lies.""")


localizer_agent = create_structured_output_runnable(
    Localizer, llm, localization_gen_prompt
)

requirement = "detect bug or optimize for the code"
docs = retriever.invoke(requirement)

predicted_code = ""

for doc in docs:
    page_content = doc.page_content
    metadata = doc.metadata["source"]
    predicted_code += "Source : " + metadata + "\n\n" + page_content + "\n\n"

localizer = localizer_agent.invoke({'requirement':requirement,'code':predicted_code})
localizer

  warn_deprecated(


Localizer(localizer='The code provided is bug-free.')

In [23]:
from pprint import pprint
pprint(localizer.localizer)

'The code provided is bug-free.'


## Debugger using create_structure_output_runnable

In [22]:
class Localizer(BaseModel):
    """Plan to follow in future"""

    Description: str = Field(
        description="The explanation of the bug"
    )


localization_gen_prompt = ChatPromptTemplate.from_template(
    '''**Role**: As a localizer, your task is to carefully identify any bug or issues in the provided code. If no issues are found, simply state that the code is bug-free. If there are issues, explain them clearly.
**Requirement**
{requirement}
**Code**
{code}
'''
)
localizer_agent = create_structured_output_runnable(
    Localizer, llm, localization_gen_prompt
)


  warn_deprecated(


In [23]:
localizer = localizer_agent.invoke({'requirement':'Be careful to analyze the code','code':code})
localizer

Localizer(Description='The code provided has the following issues:\n1. In the `calculate_area` function, there is a missing closing parenthesis in the calculation of the area.\n2. In the `calculate_perimeter` function, there is a typo in the return statement where `perimeter` is misspelled as `perimetr`.\n3. In the `Circle` class, there are typos in the `__init__` method where `radius` is misspelled as `radious` and in the `area` method where `calculate_area` is not correctly called.')

In [24]:
import tiktoken
enc = tiktoken.encoding_for_model("gpt-3.5")  # Change the model name as per your use case
response_tokens = enc.encode(localizer.Description)

# Count the tokens
token_count = len(response_tokens)

In [25]:
token_count

116

In [26]:
from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    localizer1 = localizer_agent.invoke({'requirement':'Be careful to analyze the code','code':code})

print(cb)

Tokens Used: 430
	Prompt Tokens: 304
	Completion Tokens: 126
Successful Requests: 1
Total Cost (USD): $0.00034100000000000005


In [27]:
type(cb.total_cost)

float

In [28]:
cb.completion_tokens

126

In [29]:
cb.total_cost

0.00034100000000000005

In [30]:
print(localizer1)

Description='The code provided has the following issues:\n1. In the `calculate_area` function, there is a missing closing parenthesis in the calculation of the area.\n2. In the `calculate_perimeter` function, there is a typo in the return statement where `perimeter` is misspelled as `perimetr`.\n3. In the `Circle` class, there are typos in the `__init__` method where `radius` is misspelled as `radious` and in the `area` method where `calculate_area` is not correctly called.'


In [31]:
localizer.Description

'The code provided has the following issues:\n1. In the `calculate_area` function, there is a missing closing parenthesis in the calculation of the area.\n2. In the `calculate_perimeter` function, there is a typo in the return statement where `perimeter` is misspelled as `perimetr`.\n3. In the `Circle` class, there are typos in the `__init__` method where `radius` is misspelled as `radious` and in the `area` method where `calculate_area` is not correctly called.'

In [32]:
import pprint
pprint.pprint(localizer.Description)

('The code provided has the following issues:\n'
 '1. In the `calculate_area` function, there is a missing closing parenthesis '
 'in the calculation of the area.\n'
 '2. In the `calculate_perimeter` function, there is a typo in the return '
 'statement where `perimeter` is misspelled as `perimetr`.\n'
 '3. In the `Circle` class, there are typos in the `__init__` method where '
 '`radius` is misspelled as `radious` and in the `area` method where '
 '`calculate_area` is not correctly called.')


## Debugger using with_structured_output

In [33]:
class Localizer(BaseModel):
    """Plan to follow in future"""

    Description: str = Field(
        description="The explanation of the bug"
    )

structured_llm_grader = llm.with_structured_output(Localizer)

system = '''**Role**: As a localizer, your task is to identify any issues in the provided code. If no issues are found, simply state that the code is bug-free. If there are issues, explain them clearly.

**requirement**
{requirement}
**Code**
{code}
'''

test_gen_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Explain the bug"), ## Tergantung prompt, jika prompt ("As debugger, identify the code" --> Gagal)
    ]
)
retrieval_debug_agent = test_gen_prompt | structured_llm_grader


In [34]:
retrieval_debug = retrieval_debug_agent.invoke({'requirement':'Be careful to analyze the code','code':code})
retrieval_debug

Localizer(Description="The code has the following issues:\n1. In the calculate_area function, there is a syntax error due to a missing closing parenthesis in the calculation of the area.\n2. In the calculate_perimeter function, there is a typo in the return statement where 'perimetr' is misspelled instead of 'perimeter'.\n3. In the Circle class, there are typos in the __init__ method where 'radius' is misspelled as 'radious' and in the area method where the calculate_area function is called with the incorrect parameter name.\n4. The import statement for numpy is present but not used in the code, which is unnecessary.")

In [35]:
import pprint
pprint.pprint(retrieval_debug.Description)

('The code has the following issues:\n'
 '1. In the calculate_area function, there is a syntax error due to a missing '
 'closing parenthesis in the calculation of the area.\n'
 '2. In the calculate_perimeter function, there is a typo in the return '
 "statement where 'perimetr' is misspelled instead of 'perimeter'.\n"
 '3. In the Circle class, there are typos in the __init__ method where '
 "'radius' is misspelled as 'radious' and in the area method where the "
 'calculate_area function is called with the incorrect parameter name.\n'
 '4. The import statement for numpy is present but not used in the code, which '
 'is unnecessary.')


## Python Tester and Executor

In [36]:
class ExecutableCode(BaseModel):
    """Plan to follow in future"""

    code: str = Field(
        description="Detailed optmized error-free Python code with test cases assertion"
    )

python_execution_gen = ChatPromptTemplate.from_template(
    """You make a testing layer for the *Python Code* that can help to execute the code. You need to pass only Input as argument and validate if the Given Output is matched.
*Instruction*:
- Make sure to return the error if the assertion fails
- Generate the code that can be execute
Python Code to excecute:
*Python Code*:{code}

Input and Output For Code:
*Input*: generate input to test
*Expected Output*:generate output expected from input

Explain the hyphotesis of error description of each test case.
"""



)
execution_agent = create_structured_output_runnable(
    ExecutableCode, llm, python_execution_gen
)

In [37]:
code_execute = execution_agent.invoke({"code":code})

In [38]:
print(code_execute.code)

import math
import numpy
from random import randint, choice

def calculate_area(radius):
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter

class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius

# Test Cases

def test_calculate_area():
    # Test case 1: Negative radius
    assert calculate_area(-5) == 'Invalid radius', 'Test case 1 failed'
    
    # Test case 2: Valid radius
    assert calculate_area(5) == math.pi * 25, 'Test case 2 failed'


def test_calculate_perimeter():
    # Test case 1: Invalid triangle sides
    assert calculate_perimeter(1, 2, 5) == '

In [39]:
code_to_execute = code + "\n" + code_execute.code

In [40]:
print(code + "\n" +code_execute.code)

import math
import numpy  
from random import randint, choice  

def calculate_area(radius):
    if radius < 0: 
        return "Invalid radius"
    area = math.pi * (radius ** 2
    return area 

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return "Invalid triangle sides"
    perimeter = side1 + side2 + side3
    return perimetr  

class Circle:
    def __init__(self, radius):
        self.radius = radious  

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius

import math
import numpy
from random import randint, choice

def calculate_area(radius):
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
  

## Test Eksekusi Kode

In [41]:
error = None
try:
    exec(code_to_execute)
except Exception as e:
    error = f'Exception : {e}'
error

"Exception : '(' was never closed (<string>, line 8)"

## Debugger

In [42]:
class DebugCode(BaseModel):

    code: str = Field(
        description="Optimized and Refined Python code to resolve the error"
    )
    

python_refine_gen = ChatPromptTemplate.from_template(
    """You are expert in Python Debugging. You have to analysis Given Code and Bug and generate code that handles the Bug
    *Instructions*:
    - Make sure to generate error free code
    - Generated code is able to handle the bug
    
    *Code*: {code}
    *Bug analyzed*:  {bug}
    *Error detected*: {error}
    """
)
debug_code_agent = create_structured_output_runnable(
    DebugCode, llm, python_refine_gen
)

In [43]:
dummy_json = {
    "code" : code,
    "bug" : localizer.Description,
    "error": error   
}

In [44]:
debug_code = debug_code_agent.invoke(dummy_json)
debug_code

DebugCode(code="import math\nimport numpy\nfrom random import randint, choice\n\ndef calculate_area(radius):\n    if radius < 0:\n        return 'Invalid radius'\n    area = math.pi * (radius ** 2)\n    return area\n\ndef calculate_perimeter(side1, side2, side3):\n    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:\n        return 'Invalid triangle sides'\n    perimeter = side1 + side2 + side3\n    return perimeter\n\nclass Circle:\n    def __init__(self, radius):\n        self.radius = radius\n\n    def area(self):\n        return calculate_area(self.radius)\n\n    def perimeter(self):\n        return 2 * math.pi * self.radius")

In [45]:
print(debug_code.code)

import math
import numpy
from random import randint, choice

def calculate_area(radius):
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter

class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius


In [46]:
print(debug_code.code)

import math
import numpy
from random import randint, choice

def calculate_area(radius):
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter

class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius


In [47]:
code_to_execute = debug_code.code + "\n" + code_execute.code

In [48]:
error = None
try:
    print("All test case passed")
    exec(code_to_execute)
except Exception as e:
    error = f'Exception : {e}'
error

All test case passed
All test cases passed!


In [49]:
code = debug_code.code

## Developer

In [73]:
class RefactorCode(BaseModel):

    code: str = Field(
        description="Refactored Python code"
    )

# python_refactored_code = ChatPromptTemplate.from_template("""You are an experienced developer tasked with programming for coding interview problems. Your task is to write correct, well-functioned {lang} code based on given specifications, also adhering to the required input/output format"""
# )


python_refactored_code = ChatPromptTemplate.from_template("""Your task as an expert developer,  you are given the code that needs to be refactored and optimized for better performance, readability, and maintainability. Below are the details and the code snippet. Please refactor the code, provide comments where necessary/
Details:

- The code should follow {lang} best practices.
- Aim for improved performance without sacrificing readability.
- Add comments to explain complex sections of the code.
- Optimize any redundant or inefficient parts of the code.
- Follow naming conventions for variables and functions.

Code Snippet:
{code}

Please provide the refactored and optimized version of the code along with any comment of explanations or suggestions for improvements."""
)


refactor_code_agent = create_structured_output_runnable(
    RefactorCode, llm, python_refactored_code
)

In [74]:
refactor_code = refactor_code_agent.invoke({"lang":"python", "code":code})

In [75]:
print(refactor_code.code)

import math


def calculate_area(radius):
    '''
    Calculate the area of a circle given the radius.
    Args:
        radius (float): The radius of the circle.
    Returns:
        float: The area of the circle.
    '''
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area


def calculate_perimeter(side1, side2, side3):
    '''
    Calculate the perimeter of a triangle given its sides.
    Args:
        side1 (float): Length of side 1.
        side2 (float): Length of side 2.
        side3 (float): Length of side 3.
    Returns:
        float: The perimeter of the triangle.
    '''
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter


class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        ret

In [None]:
code_to_execute = refactor_code.code + "\n" + code_execute.code

In [None]:
error = None
try:
    exec(code_to_execute)
except Exception as e:
    error = f'Exception : {e}'
error

## Graph Design

In [76]:
class AgentCoder(TypedDict):
    requirement: str
    code: str 
    testing_code: str
    refactored_code:str
    bug : Optional[str]
    errors: Optional[str]
    final_result: Optional[str]

In [77]:
def upload_code(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Upload Code------')
    code = state['code']
    print(f"Code : \n{code}")
    return{'code':code }

def localizer(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Localizer------')
    requirement = state['requirement']
    code = state['code']
    localizer = localizer_agent.invoke({'code':code,'requirement':requirement})
    print(f"Bug analyzed : {localizer.Description}")
    return {'bug':localizer.Description}

def executer(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Executer------')
    code = state['code']
    code_execute = execution_agent.invoke({"code":code})
    code_to_execute = code + "\n" + code_execute.code
    print(f"Testing code : \n{code_execute.code}")
    bug=state['bug']
    error = None
    try:
        exec(code_to_execute)
        bug = None
        print("Code Execution Successful")
    except Exception as e:
        print('Found Error While Running')
        error = f"Execution Error : {e}"
        print(error)
    return {'testing_code':code_execute.code,'errors':error, 'bug': bug}

def debugger(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Debugger------')
    errors = state['errors']
    code = state['code']
    execution_code = state['testing_code']
    bug = state['bug']
    debug_code = debug_code_agent.invoke({'code':code,'bug':bug,'error':errors, 'execution_code':execution_code})
    print("The code after debug : ",debug_code.code )
    return {'code':debug_code.code,'errors':None}

def decide_to_developer(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Decide to Developer------')
    if state['errors'] and state['bug']:
        print(f'''There is error and bug. 
                The error : {state["errors"]}
                The bug : {state["bug"]}'''
            )
        return 'debugger'
    else:
        print("There is no error, the code will be refactored")
        return 'developer'

def developer(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Developer------')
    code = state['code']
    testing_code = state['testing_code']
    refactored_code = refactor_code_agent.invoke({"lang":"Python", "code":code})
    code_to_execute = refactored_code.code + "\n" + testing_code
    print(f"The refactored code : \n{refactored_code.code}")
    error = None
    try:
        exec(code_to_execute)
        print("Developer Code Execution Successful")
    except Exception as e:
        print('Found Error While Running')
        error = f"Execution Error : {e}"
    return {'refactored_code':refactored_code.code,'errors':error}
    
def decide_to_end(state):
    print('\n'+'----------'*20)
    print(f'\n------Entering in Decide to End------')
    if state['errors']:
        print("This code error when refactored, it will be return the debugger code")
        return 'the_end'
    else:
        print("----The code can be refactored----")
        return 'end'
    
def the_end(state):
    print('\n'+'----------'*20)
    print("The code is failed to be refactored")
    code = state["code"]
    return {"code":code,"final_result":"Can not use code refactor"}



In [78]:
from langgraph.graph import END, StateGraph

workflow = StateGraph(AgentCoder)

# Define the nodes
workflow.add_node("upload_code",upload_code)
workflow.add_node("localizer", localizer)  
workflow.add_node("executer", executer) 
workflow.add_node("debugger", debugger) 
workflow.add_node("developer",developer )
workflow.add_node("the_end",the_end)

# Build graph
workflow.set_entry_point("upload_code")
workflow.add_edge("upload_code", "localizer")
workflow.add_edge("localizer","executer")
workflow.add_edge("debugger", "executer")
# workflow.add_edge("developer", "decide_to_end")
workflow.add_edge("the_end",END)
#workflow.add_edge("executer", "decide_to_end")

workflow.add_conditional_edges(
    'executer',
    decide_to_developer,
    {
        "debugger": "debugger",
        "developer": "developer"
        
    }
)

workflow.add_conditional_edges(
    "developer",
    decide_to_end,
    {
        "end" : END,
        'the_end':'the_end'
        
    }
    
)

# Compile
app = workflow.compile()

In [60]:
from IPython.display import Image

image_content = app.get_graph().draw_png()

# Save the image to a file
with open("graph_image.png", "wb") as f:
    f.write(image_content)

In [79]:
requirement = "Be careful to analyze the code"


In [80]:
import os
file_path = r"test_file\test_file2.py"

with open(file_path, 'r') as rp:
    code = rp.read()

print(code)

import math
import numpy  
from random import randint, choice  

def calculate_area(radius):
    if radius < 0: 
        return "Invalid radius"
    area = math.pi * (radius ** 2
    return area 

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return "Invalid triangle sides"
    perimeter = side1 + side2 + side3
    return perimetr  

class Circle:
    def __init__(self, radius):
        self.radius = radious  

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius



In [81]:
from langchain_core.messages import HumanMessage

config = {"recursion_limit": 15}
inputs = {"requirement": requirement,"code" : code}
running_dict = {}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        running_dict[k] = v
        if k != "__end__":
            print(v)
            print('----------'*20+'\n')


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

------Entering in Upload Code------
Code : 
import math
import numpy  
from random import randint, choice  

def calculate_area(radius):
    if radius < 0: 
        return "Invalid radius"
    area = math.pi * (radius ** 2
    return area 

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return "Invalid triangle sides"
    perimeter = side1 + side2 + side3
    return perimetr  

class Circle:
    def __init__(self, radius):
        self.radius = radious  

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius

{'code': 'import math\nimport numpy  \nfrom random import randint, choice  \n\ndef calculate_area(radius):\n    if r

In [82]:
import json
json_data = json.dumps(v, indent=4)
print(json_data)

{
    "refactored_code": "import math\n\n\ndef calculate_area(radius):\n    '''\n    Calculate the area of a circle given the radius.\n    Args:\n        radius (float): The radius of the circle.\n    Returns:\n        float: The area of the circle.\n    '''\n    if radius < 0:\n        return 'Invalid radius'\n    area = math.pi * (radius ** 2)\n    return area\n\n\ndef calculate_perimeter(side1, side2, side3):\n    '''\n    Calculate the perimeter of a triangle given its sides.\n    Args:\n        side1 (float): Length of side 1.\n        side2 (float): Length of side 2.\n        side3 (float): Length of side 3.\n    Returns:\n        float: The perimeter of the triangle.\n    '''\n    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:\n        return 'Invalid triangle sides'\n    perimeter = side1 + side2 + side3\n    return perimeter\n\n\nclass Circle:\n    def __init__(self, radius):\n        self.radius = radius\n\n    def area(self):\n        '''\n   

In [83]:
print(v["refactored_code"])

import math


def calculate_area(radius):
    '''
    Calculate the area of a circle given the radius.
    Args:
        radius (float): The radius of the circle.
    Returns:
        float: The area of the circle.
    '''
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area


def calculate_perimeter(side1, side2, side3):
    '''
    Calculate the perimeter of a triangle given its sides.
    Args:
        side1 (float): Length of side 1.
        side2 (float): Length of side 2.
        side3 (float): Length of side 3.
    Returns:
        float: The perimeter of the triangle.
    '''
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter


class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        '''
        Calculate the area of the circle using the calculate_area fu

In [84]:
running_dict

{'upload_code': {'code': 'import math\nimport numpy  \nfrom random import randint, choice  \n\ndef calculate_area(radius):\n    if radius < 0: \n        return "Invalid radius"\n    area = math.pi * (radius ** 2\n    return area \n\ndef calculate_perimeter(side1, side2, side3):\n    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:\n        return "Invalid triangle sides"\n    perimeter = side1 + side2 + side3\n    return perimetr  \n\nclass Circle:\n    def __init__(self, radius):\n        self.radius = radious  \n\n    def area(self):\n        return calculate_area(self.radius)\n\n    def perimeter(self):\n        return 2 * math.pi * self.radius\n'},
 'localizer': {'bug': 'The code provided has the following issues:\n1. In the `calculate_area` function, there is a missing closing parenthesis in the calculation of the area.\n2. In the `calculate_perimeter` function, there is a typo in the return statement where `perimeter` is misspelled as `perimetr`.\n3.

In [87]:
list(running_dict.items())[-1][1]['refactored_code']

"import math\n\n\ndef calculate_area(radius):\n    '''\n    Calculate the area of a circle given the radius.\n    Args:\n        radius (float): The radius of the circle.\n    Returns:\n        float: The area of the circle.\n    '''\n    if radius < 0:\n        return 'Invalid radius'\n    area = math.pi * (radius ** 2)\n    return area\n\n\ndef calculate_perimeter(side1, side2, side3):\n    '''\n    Calculate the perimeter of a triangle given its sides.\n    Args:\n        side1 (float): Length of side 1.\n        side2 (float): Length of side 2.\n        side3 (float): Length of side 3.\n    Returns:\n        float: The perimeter of the triangle.\n    '''\n    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:\n        return 'Invalid triangle sides'\n    perimeter = side1 + side2 + side3\n    return perimeter\n\n\nclass Circle:\n    def __init__(self, radius):\n        self.radius = radius\n\n    def area(self):\n        '''\n        Calculate the area o

In [None]:
print(running_dict["debugger"]["code"])

import math
from random import randint, choice

def calculate_area(radius):
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter

class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius



In [88]:
import math
import numpy
from random import randint, choice

def calculate_area(radius):
    if radius < 0:
        return 'Invalid radius'
    area = math.pi * (radius ** 2)
    return area

def calculate_perimeter(side1, side2, side3):
    if side1 + side2 <= side3 or side1 + side3 <= side2 or side2 + side3 <= side1:
        return 'Invalid triangle sides'
    perimeter = side1 + side2 + side3
    return perimeter

class Circle:
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return calculate_area(self.radius)

    def perimeter(self):
        return 2 * math.pi * self.radius

def test_calculate_area():
    assert calculate_area(5) == 78.53981633974483, 'Test Case 1 Failed'
    assert calculate_area(0) == 0.0, 'Test Case 2 Failed'
    assert calculate_area(-5) == 'Invalid radius', 'Test Case 3 Failed'
    print('All test cases for calculate_area passed successfully!')

def test_calculate_perimeter():
    assert calculate_perimeter(3, 4, 5) == 12, 'Test Case 1 Failed'
    assert calculate_perimeter(2, 2, 5) == 'Invalid triangle sides', 'Test Case 2 Failed'
    assert calculate_perimeter(1, 1, 1) == 3, 'Test Case 3 Failed'
    print('All test cases for calculate_perimeter passed successfully!')
    
test_calculate_area()
test_calculate_perimeter()

All test cases for calculate_area passed successfully!
All test cases for calculate_perimeter passed successfully!
