Planning

In [24]:
def adapt_reasoning_modules(reasoning_modules, task_example):
    """
    Step 1: Tailor the chosen reasoning modules to the concrete task.

    A prompt is assembled from `reasoning_modules` and `task_example` and
    handed to `query_openai`; whatever that call returns is passed back
    to the caller unchanged.
    """
    instruction = "Without working out the full solution, adapt the following reasoning modules to be specific to our task:"
    request = f"{instruction}\n{reasoning_modules}\n\nOur task:\n{task_example}\n"
    return query_openai(request)

def implement_reasoning_structure(adapted_modules, task_description):
    """
    Step 2: Turn the adapted reasoning modules into an actionable structure.

    The prompt combines `adapted_modules` with `task_description`; the
    reply of `query_openai` is returned as-is.
    """
    request = (
        "Without working out the full solution, create an actionable reasoning structure "
        "for the task using these adapted reasoning modules:\n"
        f"{adapted_modules}\n\nTask Description:\n{task_description}"
    )
    return query_openai(request)

def execute_reasoning_structure(reasoning_structure, task_example):
    """
    Step 3: Apply the reasoning structure to one concrete task instance.

    The prompt embeds `reasoning_structure` and `task_example` followed by
    a fixed four-step instruction list; the reply of `query_openai` is
    returned as-is.
    """
    request = (
        f"Using the following reasoning structure: {reasoning_structure}\n\n"
        f"Solve this problem {task_example} and develop a Python solution to this problem "
        "that obeys the constraints and passes the example test case. "
        "You only need to complete the following steps for now: "
        "1. Break down the solution into modules with clear function names and input/output specifications "
        "2. Outline the required code modules, including function headers and signatures. "
        "3. Ensure modularity and consider potential edge cases and failures. "
        "4. Once the structure is ready, write the actual code for each module."
    )
    return query_openai(request)

def generate_code_with_reasoning(task_description, task_example, reasoning_modules):
    """
    Run the three planning steps in sequence: adapt -> implement -> execute.

    The final result is printed and also returned.
    """
    structure = implement_reasoning_structure(
        adapt_reasoning_modules(reasoning_modules, task_example),
        task_description,
    )
    solution = execute_reasoning_structure(structure, task_example)
    print(solution)
    return solution


Iterative Improvement

In [23]:
#Fusion Retriever
# Passed as the "k" search kwarg when the similarity retriever is created.
TOP_K = 16
# Upper bound on how many fused documents reciprocal_rank_fusion returns.
MAX_DOCS_FOR_CONTEXT = 4

def reciprocal_rank_fusion(results: list[list], k=60):
    """Fuse several ranked result lists with Reciprocal Rank Fusion.

    Every item contributes 1 / (rank + k) for each list it appears in,
    where rank is its zero-based position in that list. Items are keyed
    by their `dumps` serialisation, so the same item appearing in several
    lists accumulates one score.

    Parameters:
    results (list[list]): retrieved function blocks, one ranked list per query
    k (int, optional): RRF smoothing parameter. Default value is 60.

    Returns:
    The MAX_DOCS_FOR_CONTEXT best-scoring items, highest score first.
    """
    score_by_key = {}
    for ranking in results:
        for position, item in enumerate(ranking):
            key = dumps(item)
            score_by_key[key] = score_by_key.get(key, 0) + 1 / (position + k)

    ordered = sorted(score_by_key.items(), key=lambda pair: pair[1], reverse=True)
    # Deserialise everything first, then truncate (same order of work as before).
    return [loads(key) for key, _ in ordered][:MAX_DOCS_FOR_CONTEXT]

def create_retriever_2(search_type: str, kwargs: dict) -> BaseRetriever:
    """Build a retriever over the Chroma store in "Chroma_databases".

    Parameters:
    search_type (str): search type forwarded to `as_retriever`
    kwargs (dict): forwarded to `as_retriever` as `search_kwargs`

    Returns:
    BaseRetriever: function block retriever
    """
    store = Chroma(
        embedding_function=embedding_function,
        persist_directory="Chroma_databases",
    )
    return store.as_retriever(search_kwargs=kwargs, search_type=search_type)

def query_generator(original_query: dict) -> list[str]:
    """Expand a raw query into several search queries via the LLM.

    Parameters:
    original_query (dict): mapping whose "query" entry holds the raw query text

    Returns:
    list[str]: "0. <raw query>" followed by the non-empty lines of the LLM
    reply, each stripped of surrounding whitespace
    """
    query = original_query.get("query")
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant that can extract natural language parts from the input and generate multiple search queries based on them."),
        ("user", "Generate multiple search queries based on: {original_query}. When creating queries, refine or add closely related contextual information without significantly changing the meaning of the original query, in English."),
        ("user", "Output (5 queries):")
    ])
    query_generator_chain = (
        prompt
        | llm
        | StrOutputParser()
        # Drop blank lines: LLM replies often contain empty separator lines,
        # and an empty string would otherwise be run as a search query.
        | (lambda text: [line.strip() for line in text.splitlines() if line.strip()])
    )
    queries = query_generator_chain.invoke({"original_query": query})
    # The raw query always goes first so it is searched alongside the variants.
    queries.insert(0, "0. " + query)

    return queries

def rrf_retriever_2(query: str) -> list[Document]:
    """Retrieve function blocks through query expansion and rank fusion.

    Pipeline: generate query variants, run the similarity retriever on each
    variant, then fuse the per-variant rankings with reciprocal_rank_fusion.
    The fused result is printed and returned.

    Parameters:
    query (str): query string

    Returns:
    list[Document]: retrieved function blocks
    """
    base_retriever = create_retriever_2(search_type="similarity", kwargs={"k": TOP_K})
    pipeline = (
        {"query": itemgetter("query")}
        | RunnableLambda(query_generator)
        | base_retriever.map()
        | reciprocal_rank_fusion
    )
    fused = pipeline.invoke({"query": query})
    print(fused)
    return fused


In [25]:
def extract_representative_submodules(submodules, n_representatives=5):
    """
    Ask the LLM to pick the most representative submodules.

    submodules: List[str] Generated submodules.
    n_representatives: int Number of representatives the LLM is asked for.

    Returns the submodules whose 1-based index numbers appear in the LLM
    reply, in the order mentioned and without duplicates.

    Raises ValueError when `submodules` is empty.
    """
    import re  # local import keeps this block independent of file-level imports

    if not submodules:
        logging.error("No submodules provided for evaluation.")
        raise ValueError("No submodules provided for evaluation.")

    formatted_submodules = "\n".join(
        f"{number}. {submodule}" for number, submodule in enumerate(submodules, start=1)
    )

    prompt_template = """
    You are given several code submodules extracted from a larger program. Your task is to evaluate these submodules and select the ones that are most representative of the overall structure and logic of the code. Choose up to {n_representatives} submodules that you believe would best represent the core functionality.

    Submodules:
    {formatted_submodules}

    Please provide the index numbers of the most representative submodules.
    """

    rendered_prompt = prompt_template.format(
        n_representatives=n_representatives,
        formatted_submodules=formatted_submodules
    )

    response = llm.invoke(rendered_prompt)
    response_content = response.content if hasattr(response, 'content') else str(response)

    # Pull out standalone integers so punctuated answers such as "1, 3." or
    # "[2]" are understood; whitespace splitting plus isdigit() missed them.
    picked = []
    for token in re.findall(r"\b\d+\b", response_content):
        position = int(token) - 1
        if 0 <= position < len(submodules) and position not in picked:
            picked.append(position)

    return [submodules[position] for position in picked]

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def extract_representative_submodules(submodules, n_representatives=5):
    """
    Select representative submodules using a cosine-similarity matrix.

    The submodule with the largest total similarity to the others is picked
    greedily; its row and column are then zeroed so the next pick is scored
    without it.

    submodules: List[str] Generated submodules.
    n_representatives: int Number of selected representative submodules.

    Returns at most `n_representatives` distinct submodules (fewer when
    fewer submodules were supplied).

    Raises ValueError when `submodules` is empty.
    """
    if not submodules:
        logging.error("No submodules provided for evaluation.")
        raise ValueError("No submodules provided for evaluation.")

    embeddings = embed_samples(submodules)
    similarity_matrix = cosine_similarity(embeddings)

    # Never ask for more representatives than there are submodules; the old
    # loop kept going and re-selected indices it had already taken.
    n_to_pick = min(n_representatives, len(submodules))

    representative_indices = []
    for _ in range(n_to_pick):
        row_totals = np.sum(similarity_matrix, axis=1)
        if representative_indices:
            # Zeroed rows sum to 0, which can still beat negative totals, so
            # exclude already chosen rows explicitly.
            row_totals[representative_indices] = -np.inf
        best = int(np.argmax(row_totals))
        representative_indices.append(best)
        similarity_matrix[best, :] = 0
        similarity_matrix[:, best] = 0

    return [submodules[idx] for idx in representative_indices]


import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def extract_representative_submodules(submodules, n_representatives=5, lambda_=0.5):
    """
    Combine similarity-based selection with LLM evaluation to select representative submodules.

    An MMR pass first shortlists `n_representatives` submodules; the LLM is
    then asked which of the shortlisted ones are most representative.

    submodules: List[str] Generated submodules.
    n_representatives: int Number of representative submodules to select.
    lambda_: float Balancing parameter for MMR (0 <= lambda_ <= 1).

    Returns the shortlisted submodules named by the LLM (order of mention,
    no duplicates). If the reply contains no usable index, the whole MMR
    shortlist is returned. An empty `submodules` yields [].
    """
    import re  # local import keeps this block independent of file-level imports

    if not submodules:
        logging.error("No submodules provided for evaluation.")
        return []

    if len(submodules) < n_representatives:
        logging.warning("Number of submodules is less than n_representatives. Adjusting n_representatives.")
        n_representatives = len(submodules)

    # Step 1: Generate embeddings for submodules
    embeddings = embed_samples(submodules)

    # Step 2: Compute similarity matrix
    similarity_matrix = cosine_similarity(embeddings)

    # Step 3: MMR-based selection
    # Relevance of a submodule = its average similarity to all submodules.
    average_similarity = np.mean(similarity_matrix, axis=1)

    # Seed the shortlist with the most central submodule.
    first_idx = int(np.argmax(average_similarity))
    selected = [first_idx]
    candidates = [idx for idx in range(len(submodules)) if idx != first_idx]

    while candidates and len(selected) < n_representatives:
        # MMR: reward relevance, penalise similarity to what is already chosen.
        mmr_scores = [
            lambda_ * average_similarity[idx]
            - (1 - lambda_) * np.mean(similarity_matrix[idx, selected])
            for idx in candidates
        ]
        selected.append(candidates.pop(int(np.argmax(mmr_scores))))

    # Step 4: LLM evaluation to select final representatives
    formatted_submodules = "\n".join(
        f"{number}. {submodules[idx]}" for number, idx in enumerate(selected, start=1)
    )

    prompt_template = """
    You are given several code submodules extracted from a larger program. Your task is to evaluate these submodules and select the ones that are most representative of the overall structure and logic of the code. Choose up to {n_representatives} submodules that you believe would best represent the core functionality.

    Submodules:
    {formatted_submodules}

    Please provide the index numbers of the most representative submodules.
    """

    rendered_prompt = prompt_template.format(
        n_representatives=n_representatives,
        formatted_submodules=formatted_submodules
    )

    response = llm.invoke(rendered_prompt)
    response_content = response.content if hasattr(response, 'content') else str(response)

    # Pull out standalone integers so punctuated answers such as "1, 3." or
    # "[2]" are understood; whitespace splitting plus isdigit() missed them.
    chosen = []
    for token in re.findall(r"\b\d+\b", response_content):
        position = int(token) - 1
        if 0 <= position < len(selected) and position not in chosen:
            chosen.append(position)

    if not chosen:
        # Do not discard the MMR work just because the reply was unparseable.
        logging.warning("LLM reply contained no usable indices; falling back to the MMR shortlist.")
        chosen = list(range(len(selected)))

    return [submodules[selected[position]] for position in chosen]

In [26]:
# Prompt used by `cot` for the first round. Placeholders: {question},
# {generated_code} (the reasoning solution) and {context} (retrieved code).
# The model is told to wrap each submodule in <<start_submodule>> /
# <<end_submodule>>, the markers extract_submodules_from_code looks for.
cot_prompt_template = """
*Task*
In the current task, you will be given [code block], [problem], and [reasoning solution]. For the given problem [problem], develop a well-structured Python solution, ensure modularity, and consider potential edge cases and failures.

*Instructions*
1. Give a reasoning solution based on [problem], with code for you to use
2. Give a code block [code block] that may be related to [problem]
3. You can try to use or adapt [code block] in completing or improving each module in [reasoning solution], or abandon [code block] and complete or improve each module in [reasoning solution] by yourself
4. Complete or improve each module in [reasoning solution], write actual code for each module to complete the solution. Output submodules need to be packaged. Please wrap the code answer of each submodule with <<start_submodule>> and <<end_submodule>>

*Output*
Generated submodules: <<start_submodule>> submodule1<<end_submodule>><<start_submodule>> submodule2<<end_submodule>>, etc.

*Input*
Question: {question}
Inference solution: {generated_code}
Code block: {context}
"""
# Prompt used by `cot` for later rounds. Placeholders: {question} and
# {submodules} (text carrying the representative submodules to reuse).
cot_prompt_template_2 ="""
*Task*
Develop a well-structured Python solution to the given question that adheres to the constraints. Ensure modularity and consider potential edge cases and failures.

*Instructions*
1. Create a clean, organized Python solution to the given question [question]. Break it down into several submodules with clear function names and input/output specifications
2. Given a set of related useful Python functions [representing submodules], try to reuse or adapt them as much as possible to fit your solution in the next step (create new unique functions if needed).
3. Once the structure is ready, write the actual code for each module to complete the solution.
4. Output submodules need to be wrapped. Please wrap your code answer with <<start_submodule>> and <<end_submodule>>.

*Output*
Generated submodules: <<start_submodule>> submodule1 <<end_submodule>><<start_submodule>> submodule2 <<end_submodule>> etc.

*Input*
question: {question}
Represented submodules: {submodules}
"""

In [27]:
# Embedding model loaded from `embedding_path` (not defined in the visible code).
embedding_function = HuggingFaceEmbeddings(model_name=embedding_path)
# Chroma store created without a persist directory — presumably in-memory only; TODO confirm.
vectorstore = Chroma(persist_directory=None, embedding_function=embedding_function)
# Retriever built on top of the vector store above.
retriever = VectorStoreRetriever(vectorstore=vectorstore)


def split_task_into_submodules(task_result):
    """
    Split the task result into multiple submodule tasks.

    The LLM is asked to list the subtasks separated by '### split ###'
    markers; its reply is then cut at those markers.

    task_result: str The final result of the task.

    Return the list of submodule tasks after splitting, each stripped of
    surrounding whitespace. Empty chunks are dropped. The text before the
    first marker is kept only when it contains a '####' heading, i.e. when
    it looks like a subtask rather than an empty or introductory chunk.
    """
    prompt = f"Analyze the provided content \n{task_result}\n\n and identify subtasks under 'code module'. Extract each subtask starting with '#### N. module name' and separate the output with markers '### split ###'. Each subtask should include functional details and description."
    submodules_text = query_openai(prompt)
    print(f"Split Submodules:\n{submodules_text}")

    # Accept any whitespace around the marker: requiring exactly "\n\n" after
    # it meant a reply with a single newline was not split at all.
    chunks = [chunk.strip() for chunk in re.split(r'\s*### split ###\s*', submodules_text)]
    leading, rest = chunks[0], chunks[1:]

    submodules = [chunk for chunk in rest if chunk]
    # The first chunk used to be discarded unconditionally, which lost the
    # first subtask whenever the reply did not begin with a marker.
    if "####" in leading:
        submodules.insert(0, leading)

    # Print each submodule
    for number, submodule in enumerate(submodules, start=1):
        print(f"Submodule {number}:\n{submodule}\n")

    return submodules

def refine_submodule(submodule, retriever, num_rounds=2, max_attempts=5):
    """
    Iteratively refine one submodule by delegating to `cot`.

    The submodule text is passed to `cot` both as the query and as the
    generated code.

    submodule: str Submodule task.
    retriever: VectorStoreRetriever Retriever used for query.
    num_rounds: int Number of cot rounds.
    max_attempts: int Maximum number of generation attempts.

    Return whatever `cot` returns for this submodule.
    """
    return cot(
        query=submodule,
        generated_code=submodule,
        retriever=retriever,
        num_rounds=num_rounds,
        max_attempts=max_attempts,
    )

def merge_submodules(submodules):
    """
    Merge all submodule tasks together and remove duplicate submodules.

    submodules: List[str] Submodule task list (items are converted with str()).

    Return the final merged code: the distinct submodules, in order of first
    appearance, joined by blank lines.
    """
    # dict.fromkeys de-duplicates while keeping insertion order; a set made
    # the order of the merged code vary between runs.
    unique_submodules = dict.fromkeys(str(submodule) for submodule in submodules)
    return "\n\n".join(unique_submodules)

def embed_samples(samples):
    """
    Embed `samples` and also add them to the module-level vector store.

    samples: List[str] Texts to embed.

    Returns the embeddings as a NumPy array.

    Raises ValueError when `samples` is empty.
    """
    if samples:
        vectors = embedding_function.embed_documents(samples)
        # Side effect: every call also stores the texts in `vectorstore`.
        vectorstore.add_texts(samples)
        return np.array(vectors)

    logging.error("No samples provided for embedding.")
    raise ValueError("No samples provided for embedding.")

def extract_submodules_from_code(generated_code):
    """
    Extract submodules from generated code.

    A submodule is the stripped text between a "<<start_submodule>>" marker
    and the next "<<end_submodule>>" marker.

    generated_code: str Generated code string.

    Returns a list of the distinct extracted submodules in order of first
    appearance. Returns [] (and logs a warning) when either marker is
    missing; an unterminated trailing submodule is ignored.
    """
    start_token = "<<start_submodule>>"
    end_token = "<<end_submodule>>"

    if start_token not in generated_code or end_token not in generated_code:
        logging.warning("Start or end token not found in generated code.")
        return []

    # Dict keys give de-duplication with stable order; a set scrambled the
    # order of the submodules between runs.
    found = {}
    cursor = 0
    while True:
        start_idx = generated_code.find(start_token, cursor)
        if start_idx == -1:
            break
        body_start = start_idx + len(start_token)
        end_idx = generated_code.find(end_token, body_start)
        if end_idx == -1:
            break
        found.setdefault(generated_code[body_start:end_idx].strip(), None)
        cursor = end_idx + len(end_token)

    return list(found)






def update_cot_prompt_with_submodules(cot_prompt, representative_submodules):
    """
    Add representative submodules to the original thought chain prompt.

    cot_prompt: str Original thought chain prompt.
    representative_submodules: List[str] Extracted representative submodules.

    Return the updated prompt: `cot_prompt`, a "# Reuse or adapt ..." header,
    then each submodule on its own line surrounded by newlines. When no
    submodules are given, `cot_prompt` is returned unchanged.
    """
    if not representative_submodules:
        logging.warning("No representative submodules provided for updating prompt.")
        return cot_prompt

    # Plain concatenation on purpose: `cot_prompt` is free-form text (often
    # generated code), and compiling it as template source broke on, or
    # mis-rendered, any "{{", "{%" or "{#" it contained.
    header = "\n\n# Reuse or adapt the following submodules:\n"
    listing = "".join(f"\n{submodule}\n" for submodule in representative_submodules)
    return cot_prompt + header + listing

def update_code_by_all_past_results(current_results, past_results):
    """
    Fill gaps in the current results from historical results.

    A historical entry is copied over when it has at least one truthy
    element and the current round either lacks that problem id or holds an
    entry with no truthy element.

    current_results: dict The results of the current round (modified in place).
    past_results: dict The results of the historical rounds.

    Return `current_results` after the update.
    """
    for problem_id, earlier in past_results.items():
        if problem_id not in current_results:
            take_earlier = any(earlier)
        else:
            take_earlier = not any(current_results[problem_id]) and any(earlier)

        if take_earlier:
            current_results[problem_id] = earlier

    return current_results

def cot(query: str, generated_code: str, retriever: VectorStoreRetriever, num_rounds: int = 2, max_attempts: int = 5) -> list[str]:
    """
    Use the vector database to query and perform multiple rounds of cot generation.

    Round 0 runs a retrieval-augmented chain built on `cot_prompt_template`;
    later rounds run a chain built on `cot_prompt_template_2`, fed with the
    previous response plus the previous round's representative submodules.

    query: str The query problem.
    generated_code: str The code module for reasoning.
    retriever: VectorStoreRetriever The retriever used for the query.
    num_rounds: int The number of rounds of cot generation.
    max_attempts: int The maximum number of generation attempts per round.

    Return the representative submodules of all rounds, concatenated in
    round order.

    Raises ValueError when a round yields no submodules after
    `max_attempts` attempts.
    """
    prompt1 = PromptTemplate(template=cot_prompt_template, input_variables=["context", "question", "generated_code"])
    prompt2 = PromptTemplate(template=cot_prompt_template_2, input_variables=["question", "submodules"])

    # First-round chain: retrieve context for the question, then prompt the LLM.
    cot_chain_1 = (
        {"context": itemgetter("question") | retriever, "question": itemgetter("question"), "generated_code": itemgetter("generated_code")}
        | RunnablePassthrough.assign(context=itemgetter("context"), generated_code=itemgetter("generated_code"))
        | {"response": prompt1 | llm2 | StrOutputParser(), "context": itemgetter("context"), "generated_code": itemgetter("generated_code")}
    )

    # Later-round chain: no retrieval, only the question and submodule text.
    cot_chain_2 = (
        {"question": itemgetter("question"), "submodules": itemgetter("submodules")}
        | RunnablePassthrough.assign(question=itemgetter("question"), submodules=itemgetter("submodules"))
        | {"response": prompt2 | llm2 | StrOutputParser()}
    )

    past_results = {}
    representative_submodules_list = []

    for round_idx in tqdm(range(num_rounds), desc="Processing rounds"):
        logging.info(f"Starting round {round_idx}")

        responses = []
        attempts = 0

        # Keep generating until at least 3 submodules were collected or the
        # attempt budget is used up.
        while len(responses) < 3 and attempts < max_attempts:
            if round_idx == 0:
                result = cot_chain_1.invoke({"question": query, "generated_code": generated_code})
            else:
                # `result` and `representative_submodules` carry over from the
                # previous round on the first attempt; on later attempts
                # `result` is this round's latest response.
                cot_prompt_with_submodules = update_cot_prompt_with_submodules(result["response"], representative_submodules)
                result = cot_chain_2.invoke({"question": query, "submodules": cot_prompt_with_submodules})

            submodules = extract_submodules_from_code(result["response"])
            responses.extend(submodules)
            attempts += 1

        if not responses:
            logging.error(f"No submodules extracted in round {round_idx} after {max_attempts} attempts.")
            raise ValueError(f"No submodules extracted in round {round_idx} after {max_attempts} attempts.")

        embedded_samples = embed_samples(responses)
        representative_submodules = extract_representative_submodules(responses)
        print(f"Representative submodules of round {round_idx} extraction: {representative_submodules}")

        past_results[round_idx] = {"submodules": representative_submodules, "embedded_samples": embedded_samples}
        # NOTE(review): both arguments are the same dict, so this call appears
        # unable to add or replace any entry — confirm whether it is needed.
        past_results = update_code_by_all_past_results(past_results, past_results)
        representative_submodules_list.extend(representative_submodules)

    return representative_submodules_list


Quality Evaluation

In [None]:
#CFG_part1
import ast
import astor
import graphviz as gv


class Block(object):
    """
    Basic block of a control flow graph.

    Holds a run of statements that execute without any control jump. Control
    leaves the block through one of its exits, each of which is a Link
    describing a jump to another block.
    """

    __slots__ = ["id", "statements", "func_calls", "predecessors", "exits"]

    def __init__(self, id):
        # Identifier of the block.
        self.id = id
        # Statements contained in the block.
        self.statements = []
        # Names of functions called inside the block (context switches to
        # those functions' CFGs).
        self.func_calls = []
        # Links coming from predecessor blocks.
        self.predecessors = []
        # Links going to successor blocks.
        self.exits = []

    def __str__(self):
        if not self.statements:
            return f"empty block:{self.id}"
        return f"block:{self.id}@{self.at()}"

    def __repr__(self):
        summary = f"{self} with {len(self.exits)} exits"
        if not self.statements:
            return summary
        dumped = ", ".join(ast.dump(stmt) for stmt in self.statements)
        return f"{summary}, body=[{dumped}]"

    def at(self):
        """
        Get the line number of the first statement of the block in the program.

        Returns None for an empty block or a negative line number.
        """
        if not self.statements:
            return None
        line = self.statements[0].lineno
        return line if line >= 0 else None

    def end(self):
        """
        Get the line number of the last statement of the block in the program.

        Returns None for an empty block or a negative line number.
        """
        if not self.statements:
            return None
        line = self.statements[-1].lineno
        return line if line >= 0 else None

    def is_empty(self):
        """
        Check if the block is empty.

        Returns:
            True when the block holds no statements, False otherwise.
        """
        return len(self.statements) == 0

    def get_source(self):
        """
        Render the statements of the block as Python source code.

        If/For/While statements contribute only their header line; function
        definitions contribute their header line followed by "...".

        Returns:
            A string containing the source code of the statements.
        """
        pieces = []
        for stmt in self.statements:
            kind = type(stmt)
            if kind in (ast.If, ast.For, ast.While):
                header = astor.to_source(stmt).split('\n')[0]
                pieces.append(header + "\n")
            elif kind in (ast.FunctionDef, ast.AsyncFunctionDef):
                header = astor.to_source(stmt).split('\n')[0]
                pieces.append(header + "...\n")
            else:
                pieces.append(astor.to_source(stmt))
        return "".join(pieces)

    def get_calls(self):
        """
        List the functions called inside the block.

        Returns:
            A string with one called function name per line.
        """
        return "".join(func_name + '\n' for func_name in self.func_calls)


class Link(object):
    """
    Directed link between two blocks of a control flow graph.

    Models one control flow jump from `source` to `target`. The optional
    `exitcase` is an expression describing the condition under which the
    jump is taken.
    """

    __slots__ = ["source", "target", "exitcase"]

    def __init__(self, source, target, exitcase=None):
        assert type(source) == Block, "Source of a link must be a block"
        assert type(target) == Block, "Target of a link must be a block"
        # source: block the jump leaves; target: block it enters;
        # exitcase: 'case' leading to the jump (None when unconditional).
        self.source, self.target, self.exitcase = source, target, exitcase

    def __str__(self):
        return f"link from {self.source} to {self.target}"

    def __repr__(self):
        if self.exitcase is None:
            return str(self)
        return f"{self}, with exitcase {ast.dump(self.exitcase)}"

    def get_exitcase(self):
        """
        Render the exitcase of the Link as Python source code.

        Returns:
            The source code of the exitcase, or "" when there is none.
        """
        return astor.to_source(self.exitcase) if self.exitcase else ""


class CFG(object):
    """
    Control flow graph (CFG).

    A control flow graph is composed of basic blocks and links between them
    representing control flow jumps. It has a unique entry block and several
    possible 'final' blocks (blocks with no exits representing the end of the
    CFG).
    """

    def __init__(self, name, asynchr=False):
        assert type(name) == str, "Name of a CFG must be a string"
        assert type(asynchr) == bool, "Async must be a boolean value"
        # Name of the function or module being represented.
        self.name = name
        # Type of function represented by the CFG (sync or async). A Python
        # program is considered as a synchronous function (main).
        self.asynchr = asynchr
        # Entry block of the CFG.
        self.entryblock = None
        # Final blocks of the CFG.
        self.finalblocks = []
        # Sub-CFGs for functions defined inside the current CFG.
        self.functioncfgs = {}

    def __str__(self):
        return "CFG for {}".format(self.name)

    def _visit_blocks(self, graph, block, visited=None, calls=True):
        """
        Add `block` and everything reachable from it to the graphviz `graph`.

        Args:
            graph: The graphviz graph being filled.
            block: The block to start from.
            visited: List of block ids already drawn; extended in place. A
                     fresh list is used when omitted.
            calls: Whether to draw a node listing the block's function calls.
        """
        # A literal [] default would be shared by every call that omits the
        # argument, so create the list per call instead.
        if visited is None:
            visited = []

        # Don't visit blocks twice.
        if block.id in visited:
            return

        nodelabel = block.get_source()

        graph.node(str(block.id), label=nodelabel)
        visited.append(block.id)

        # Show the block's function calls in a node.
        if calls and block.func_calls:
            calls_node = str(block.id)+"_calls"
            calls_label = block.get_calls().strip()
            graph.node(calls_node, label=calls_label,
                       _attributes={'shape': 'box'})
            graph.edge(str(block.id), calls_node, label="calls",
                       _attributes={'style': 'dashed'})

        # Recursively visit all the blocks of the CFG.
        for exit in block.exits:
            self._visit_blocks(graph, exit.target, visited, calls=calls)
            edgelabel = exit.get_exitcase().strip()
            graph.edge(str(block.id), str(exit.target.id), label=edgelabel)

    def _build_visual(self, format='pdf', calls=True):
        """
        Build the graphviz Digraph of this CFG, sub-CFGs included as subgraphs.
        """
        graph = gv.Digraph(name='cluster'+self.name, format=format,
                           graph_attr={'label': self.name})
        # A CFG that was never built has no entry block and nothing to draw.
        if self.entryblock is not None:
            self._visit_blocks(graph, self.entryblock, visited=[], calls=calls)

        # Build the subgraphs for the function definitions in the CFG and add
        # them to the graph.
        for subcfg in self.functioncfgs.values():
            graph.subgraph(subcfg._build_visual(format=format, calls=calls))

        return graph

    def build_visual(self, filepath, format, calls=True, show=True):
        """
        Build a visualisation of the CFG with graphviz and output it in a DOT
        file.

        Args:
            filepath: The path of the output file in which the visualisation
                      must be saved.
            format: The format to use for the output file (PDF, ...).
            calls: A boolean indicating whether function calls are drawn as
                   separate nodes.
            show: A boolean indicating whether to automatically open the output
                  file after building the visualisation.
        """
        graph = self._build_visual(format, calls)
        graph.render(filepath, view=show)

    def __iter__(self):
        """
        Generator that yields all the blocks in the current graph in
        breadth-first order, then recursively yields from any sub graphs.
        A CFG without an entry block yields only the blocks of its sub graphs.
        """
        from collections import deque

        if self.entryblock is not None:
            # `queued` holds every block that is waiting or already yielded,
            # so each block is enqueued exactly once.
            queued = {self.entryblock}
            to_visit = deque([self.entryblock])

            while to_visit:
                block = to_visit.popleft()
                for exit_ in block.exits:
                    if exit_.target not in queued:
                        queued.add(exit_.target)
                        to_visit.append(exit_.target)
                yield block

        for subcfg in self.functioncfgs.values():
            yield from subcfg

In [None]:
#CFG_part2
import ast
import sys


def is_py38_or_higher():
    """
    Tell whether the running interpreter is Python 3.8 or newer.

    Returns:
        True for version 3.8 and above (including any later major version),
        False otherwise.
    """
    # Tuple comparison also covers major versions above 3, which the former
    # "major == 3 and minor >= 8" test reported as too old.
    return sys.version_info >= (3, 8)


# AST node class that `invert` checks for True/False literals:
# ast.Constant on Python 3.8+, ast.NameConstant on older versions.
NAMECONSTANT_TYPE = ast.Constant if is_py38_or_higher() else ast.NameConstant


def invert(node):
    """
    Invert the operation in an ast node object (get its negation).

    A comparison with a single operator is inverted by swapping the operator
    for its opposite. A True/False constant is flipped. Everything else,
    chained comparisons such as `a < b < c` included, is wrapped in `not`.

    Args:
        node: An ast node object.

    Returns:
        An ast node object containing the inverse (negation) of the input node.
    """
    inverse = {ast.Eq: ast.NotEq,
               ast.NotEq: ast.Eq,
               ast.Lt: ast.GtE,
               ast.LtE: ast.Gt,
               ast.Gt: ast.LtE,
               ast.GtE: ast.Lt,
               ast.Is: ast.IsNot,
               ast.IsNot: ast.Is,
               ast.In: ast.NotIn,
               ast.NotIn: ast.In}

    if type(node) == ast.Compare:
        if len(node.ops) == 1:
            op = type(node.ops[0])
            return ast.Compare(left=node.left, ops=[inverse[op]()],
                               comparators=node.comparators)
        # Chained comparison: swapping only the first operator while keeping
        # every comparator gives a malformed node, and not(a < b < c) is not
        # expressible by swapping operators anyway. Negate it as a whole.
        return ast.UnaryOp(op=ast.Not(), operand=node)

    if isinstance(node, ast.BinOp) and type(node.op) in inverse:
        # Kept for hand-built trees; the parser never puts a comparison
        # operator inside a BinOp.
        op = type(node.op)
        return ast.BinOp(node.left, inverse[op](), node.right)

    if type(node) == NAMECONSTANT_TYPE and node.value in [True, False]:
        return NAMECONSTANT_TYPE(value=not node.value)

    return ast.UnaryOp(op=ast.Not(), operand=node)


def merge_exitcases(exit1, exit2):
    """
    Combine the exitcases of two Links into one.

    Args:
        exit1: The exitcase of a Link object.
        exit2: Another exitcase to merge with exit1.

    Returns:
        `exit1 and exit2` as an ast.BoolOp when both are set; otherwise
        exit1 if it is set, else exit2.
    """
    if not exit1:
        return exit2
    if not exit2:
        return exit1
    return ast.BoolOp(ast.And(), values=[exit1, exit2])


class CFGBuilder(ast.NodeVisitor):
    """
    Control flow graph builder.

    A control flow graph builder is an ast.NodeVisitor that can walk through
    a program's AST and iteratively build the corresponding CFG.
    """

    def __init__(self, separate=False):
        super().__init__()
        self.after_loop_block_stack = []
        self.curr_loop_guard_stack = []
        self.current_block = None
        self.separate_node_blocks = separate

    # ---------- CFG building methods ---------- #
    def build(self, name, tree, asynchr=False, entry_id=0):
        """
        Build a CFG from an AST.

        Args:
            name: The name of the CFG being built.
            tree: The root of the AST from which the CFG must be built.
            asynchr: Boolean passed to the CFG constructor indicating whether
                     the CFG being built represents an asynchronous function.
            entry_id: Starting value of the block id counter. new_block()
                      increments the counter before using it, so the entry
                      block receives the id ``entry_id + 1``.

        Returns:
            The CFG produced from the AST.
        """
        self.cfg = CFG(name, asynchr=asynchr)
        self.current_id = entry_id

        # The first block created becomes both the entry block and the block
        # currently being filled.
        entry = self.new_block()
        self.current_block = entry
        self.cfg.entryblock = entry

        # Walk the tree to populate the graph, then drop the empty blocks.
        self.visit(tree)
        self.clean_cfg(self.cfg.entryblock)
        return self.cfg

    def build_from_src(self, name, src):
        """
        Build a CFG from some Python source code.

        Args:
            name: The name of the CFG being built.
            src: A string containing the source code to build the CFG from.

        Returns:
            The CFG produced from the source code.
        """
        tree = ast.parse(src, mode='exec')
        return self.build(name, tree)

    def build_from_file(self, name, filepath):
        """
        Build a CFG from some Python source file.

        Args:
            name: The name of the CFG being built.
            filepath: The path to the file containing the Python source code
                      to build the CFG from.

        Returns:
            The CFG produced from the source file.
        """
        with open(filepath, 'r') as src_file:
            src = src_file.read()
            return self.build_from_src(name, src)

    # ---------- Graph management methods ---------- #
    def new_block(self):
        """
        Allocate the next block id and wrap it in a new Block.

        Returns:
            A Block object whose id is the incremented id counter.
        """
        next_id = self.current_id + 1
        self.current_id = next_id
        return Block(next_id)

    def add_statement(self, block, statement):
        """
        Add a statement to a block.

        Args:
            block: A Block object to which a statement must be added.
            statement: An AST node representing the statement that must be
                       added to the current block.
        """
        block.statements.append(statement)

    def add_exit(self, block, nextblock, exitcase=None):
        """
        Connect two blocks with a new Link.

        The link is recorded both as an exit of ``block`` and as a predecessor
        of ``nextblock``.

        Args:
            block: A block to which an exit must be added.
            nextblock: The block to which control jumps from the new exit.
            exitcase: An AST node representing the 'case' (or condition)
                      leading to the exit from the block in the program.
        """
        edge = Link(block, nextblock, exitcase)
        block.exits.append(edge)
        nextblock.predecessors.append(edge)

    def new_loopguard(self):
        """
        Create a new block for a loop's guard if the current block is not
        empty. Links the current block to the new loop guard.

        Returns:
            The block to be used as new loop guard.
        """
        if (self.current_block.is_empty() and
                len(self.current_block.exits) == 0):
            # If the current block is empty and has no exits, it is used as
            # entry block (condition test) for the loop.
            loopguard = self.current_block
        else:
            # Jump to a new block for the loop's guard if the current block
            # isn't empty or has exits.
            loopguard = self.new_block()
            self.add_exit(self.current_block, loopguard)
        return loopguard

    def new_functionCFG(self, node, asynchr=False):
        """
        Create a new sub-CFG for a function definition and add it to the
        function CFGs of the CFG being built.

        The function body is built by a separate CFGBuilder that inherits this
        builder's ``separate_node_blocks`` setting, so nested functions are
        split into blocks the same way as the enclosing code.

        Args:
            node: The AST node containing the function definition.
            asynchr: Boolean indicating whether the function for which the CFG
                     is being built is asynchronous or not.
        """
        self.current_id += 1
        # A new sub-CFG is created for the body of the function definition and
        # added to the function CFGs of the current CFG.
        func_body = ast.Module(body=node.body)
        func_builder = CFGBuilder(separate=self.separate_node_blocks)
        self.cfg.functioncfgs[node.name] = func_builder.build(node.name,
                                                              func_body,
                                                              asynchr,
                                                              self.current_id)
        # Continue numbering after the ids consumed by the sub-CFG so block
        # ids stay unique across the whole graph.
        self.current_id = func_builder.current_id + 1

    def clean_cfg(self, block, visited=[]):
        """
        Remove the useless (empty) blocks from a CFG.

        Args:
            block: The block from which to start traversing the CFG to clean
                   it.
            visited: A list of blocks that already have been visited by
                     clean_cfg (recursive function).
        """
        # Don't visit blocks twice.
        if block in visited:
            return
        visited.append(block)

        # Empty blocks are removed from the CFG.
        if block.is_empty():
            for pred in block.predecessors:
                for exit in block.exits:
                    self.add_exit(pred.source, exit.target,
                                  merge_exitcases(pred.exitcase,
                                                  exit.exitcase))
                    # Check if the exit hasn't yet been removed from
                    # the predecessors of the target block.
                    if exit in exit.target.predecessors:
                        exit.target.predecessors.remove(exit)
                # Check if the predecessor hasn't yet been removed from
                # the exits of the source block.
                if pred in pred.source.exits:
                    pred.source.exits.remove(pred)

            block.predecessors = []
            # as the exits may be modified during the recursive call, it is unsafe to iterate on block.exits
            # Created a copy of block.exits before calling clean cfg , and iterate over it instead.
            for exit in block.exits[:]:
                self.clean_cfg(exit.target, visited)
            block.exits = []
        else:
            for exit in block.exits[:]:
                self.clean_cfg(exit.target, visited)

    # ---------- AST---------- #
    def goto_new_block(self, node):
        if self.separate_node_blocks:
            newblock = self.new_block()
            self.add_exit(self.current_block, newblock)
            self.current_block = newblock
        self.generic_visit(node)

    def visit_Expr(self, node):
        self.add_statement(self.current_block, node)
        self.goto_new_block(node)

    def visit_Call(self, node):
        def visit_func(node):
            if isinstance(node, ast.Name):
                return node.id
            elif isinstance(node, ast.Attribute):
                func_name = visit_func(node.value)
                return func_name + "." + node.attr
            elif isinstance(node, ast.Str):
                return node.s
            elif isinstance(node, ast.Subscript):
                if isinstance(node.value, ast.Name):
                    return node.value.id
                return "<unknown_subscript>"
            else:
                return type(node).__name__
        """
        def visit_func(node):
            if type(node) == ast.Name:
                return node.id
            elif type(node) == ast.Attribute:
                # Recursion on series of calls to attributes.
                func_name = visit_func(node.value)
                func_name += "." + node.attr
                return func_name
            elif type(node) == ast.Str:
                return node.s
            elif type(node) == ast.Subscript:
                return node.value.id
            else:
                return type(node).__name__"""
        func = node.func
        func_name = visit_func(func)
        self.current_block.func_calls.append(func_name)

    def visit_Assign(self, node):
        self.add_statement(self.current_block, node)
        self.goto_new_block(node)

    def visit_AnnAssign(self, node):
        self.add_statement(self.current_block, node)
        self.goto_new_block(node)

    def visit_AugAssign(self, node):
        self.add_statement(self.current_block, node)
        self.goto_new_block(node)

    def visit_Raise(self, node):
        # TODO
        pass

    def visit_Assert(self, node):
        """
        Split control flow on an assert statement.

        The assert is recorded in the current block, which then gets two
        exits: one guarded by the negated test leading to a new final block,
        and one guarded by the test itself leading to the block where
        building continues.

        Args:
            node: The ast.Assert node being visited.
        """
        origin = self.current_block
        self.add_statement(origin, node)

        # Failing branch: the flow ends there, so the block is registered as
        # a final block of the CFG.
        fail_block = self.new_block()
        self.add_exit(origin, fail_block, invert(node.test))
        self.cfg.finalblocks.append(fail_block)

        # Passing branch: the program carries on from here.
        pass_block = self.new_block()
        self.add_exit(origin, pass_block, node.test)
        self.current_block = pass_block
        self.goto_new_block(node)

    def visit_If(self, node):
        """
        Build the blocks and edges for an if / else statement.

        The If node is recorded in the current block. A block is created for
        the body, one for the code following the statement and, when an else
        clause exists, one for the else body. A branch that ends without any
        exit (no break/continue was met) is linked to the after-if block.

        Args:
            node: The ast.If node being visited.
        """
        branch_point = self.current_block
        self.add_statement(branch_point, node)

        # Block for the body, reached when the test holds.
        then_block = self.new_block()
        self.add_exit(branch_point, then_block, node.test)

        # Block where both branches meet again.
        join_block = self.new_block()

        if node.orelse:
            # Separate block for the else clause, reached when the test fails.
            else_block = self.new_block()
            self.add_exit(branch_point, else_block, invert(node.test))
            self.current_block = else_block
            for stmt in node.orelse:
                self.visit(stmt)
            # A break/continue in the branch has already added an exit.
            if not self.current_block.exits:
                self.add_exit(self.current_block, join_block)
        else:
            # No else clause: a failing test jumps straight past the if.
            self.add_exit(branch_point, join_block, invert(node.test))

        # Populate the body of the if.
        self.current_block = then_block
        for stmt in node.body:
            self.visit(stmt)
        if not self.current_block.exits:
            self.add_exit(self.current_block, join_block)

        # Building continues after the if statement.
        self.current_block = join_block

    def visit_While(self, node):
        """
        Build the blocks and edges for a while loop.

        The loop guard holds the While node and exits to the body block when
        the test holds. It also exits to the after-loop block under the
        negated test, unless that negation is the literal False (as for
        ``while True``). The guard and after-loop block are pushed on the loop
        stacks while the body is visited so that continue/break can target
        them.

        Args:
            node: The ast.While node being visited.
        """
        guard = self.new_loopguard()
        self.current_block = guard
        self.add_statement(guard, node)
        self.curr_loop_guard_stack.append(guard)

        # Block entered when the loop test is true.
        body_block = self.new_block()
        self.add_exit(guard, body_block, node.test)

        # Block entered when the loop test is false.
        exit_block = self.new_block()
        self.after_loop_block_stack.append(exit_block)
        negated_test = invert(node.test)
        never_false = (isinstance(negated_test, NAMECONSTANT_TYPE) and
                       negated_test.value is False)
        if not never_false:
            self.add_exit(guard, exit_block, negated_test)

        # Populate the loop body.
        self.current_block = body_block
        for stmt in node.body:
            self.visit(stmt)
        if not self.current_block.exits:
            # No break/continue closed the body: loop back to the guard.
            self.add_exit(self.current_block, guard)

        # Building continues after the loop.
        self.current_block = exit_block
        self.after_loop_block_stack.pop()
        self.curr_loop_guard_stack.pop()

    def visit_For(self, node):
        loop_guard = self.new_loopguard()
        self.current_block = loop_guard
        self.add_statement(self.current_block, node)
        self.curr_loop_guard_stack.append(loop_guard)
        # New block for the body of the for-loop.
        for_block = self.new_block()
        self.add_exit(self.current_block, for_block, node.iter)

        # Block of code after the for loop.
        afterfor_block = self.new_block()
        self.add_exit(self.current_block, afterfor_block)
        self.after_loop_block_stack.append(afterfor_block)
        self.current_block = for_block

        # Populate the body of the for loop.
        for child in node.body:
            self.visit(child)
        if not self.current_block.exits:
            # Did not encounter a break
            self.add_exit(self.current_block, loop_guard)

        # Continue building the CFG in the after-for block.
        self.current_block = afterfor_block
        # Popping the current after loop stack,taking care of errors in case of nested for loops
        self.after_loop_block_stack.pop()
        self.curr_loop_guard_stack.pop()

    def visit_Break(self, node):
        assert len(self.after_loop_block_stack), "Found break not inside loop"
        self.add_exit(self.current_block, self.after_loop_block_stack[-1])

    def visit_Continue(self, node):
        assert len(self.curr_loop_guard_stack), "Found continue outside loop"
        self.add_exit(self.current_block, self.curr_loop_guard_stack[-1])

    def visit_Import(self, node):
        self.add_statement(self.current_block, node)

    def visit_ImportFrom(self, node):
        self.add_statement(self.current_block, node)

    def visit_FunctionDef(self, node):
        self.add_statement(self.current_block, node)
        self.new_functionCFG(node, asynchr=False)

    def visit_AsyncFunctionDef(self, node):
        self.add_statement(self.current_block, node)
        self.new_functionCFG(node, asynchr=True)

    def visit_Await(self, node):
        afterawait_block = self.new_block()
        self.add_exit(self.current_block, afterawait_block)
        self.goto_new_block(node)
        self.current_block = afterawait_block

    def visit_Return(self, node):
        self.add_statement(self.current_block, node)
        self.cfg.finalblocks.append(self.current_block)
        # Continue in a new block but without any jump to it -> all code after
        # the return statement will not be included in the CFG.
        self.current_block = self.new_block()

    def visit_Yield(self, node):
        self.cfg.asynchr = True
        afteryield_block = self.new_block()
        self.add_exit(self.current_block, afteryield_block)
        self.current_block = afteryield_block

In [None]:
#CFG_part3
def generate_cfg_from_code(source_code):
    """
    Build a control flow graph for a piece of Python source and return it as
    plain data.

    Args:
        source_code: Python source text to analyse.

    Returns:
        The dictionary produced by extract_cfg_data(), or None when parsing
        or CFG construction raises; the error is printed in that case.
    """
    try:
        tree = ast.parse(source_code)
        builder = CFGBuilder()
        graph = builder.build(name='<string>', tree=tree, asynchr=False, entry_id=0)
        return extract_cfg_data(graph)
    except Exception as e:
        # Best effort: callers treat None as "no CFG available".
        print(f"Error while generating CFG: {e}")
        return None
def extract_cfg_data(cfg):
    """
    Flatten a CFG into dictionaries of blocks and edges.

    Args:
        cfg: An iterable of blocks. Each block is read for ``id``,
             ``statements`` (AST nodes), ``func_calls`` and ``exits``; each
             exit is read for ``target`` and, when present, ``get_exitcase()``.

    Returns:
        A dict with three keys: "blocks" (one entry per block with its id,
        the ast.dump() of each statement and its function calls), "edges"
        (one entry per exit with source id, target id and condition string,
        '' when the exit has no get_exitcase method) and "func_calls", which
        is always an empty list here.
    """
    blocks = []
    edges = []

    for block in cfg:
        blocks.append({
            "block_id": block.id,
            "statements": [ast.dump(stmt) for stmt in block.statements],
            "func_calls": block.func_calls
        })

        for link in block.exits:
            # String form of the exit condition, when the link can provide it.
            if hasattr(link, 'get_exitcase'):
                condition = link.get_exitcase()
            else:
                condition = ''
            edges.append({
                "from": block.id,
                "to": link.target.id,
                "condition": condition
            })

    return {
        "blocks": blocks,
        "edges": edges,
        "func_calls": []
    }


In [30]:
# Prompt text for the evaluator model. The {task_example}, {starter_code} and
# {generated_code} placeholders are filled in by evaluate_answers(); the
# doubled braces are presumably escapes for literal braces in the template
# engine (verify against the prompt template class in use).
# evaluate_answers() parses the reply with a "Feedback: ... Result: <int>"
# regex, so item 3 below must stay in sync with that pattern.
EVALUATION_PROMPT = """
### Task Description:
Evaluate the quality of the following generated code. Please provide detailed feedback and score according to the scoring rules.
1. Write detailed feedback and strictly follow the given scoring criteria to evaluate the quality of the answer, focusing on the following aspects:
- Functionality: Does the code correctly implement the required functions?
- Parameter handling: Does the code accurately handle and utilize the specified parameters (e.g., "thickness", "angle")?
- Output format: Does the code output the results in the correct format (e.g., JSON with specific keys)?
- Code structure and readability: Is the code clearly structured and easy to read and understand?
- Format: Does the code follow the starter code? (If the starter code is empty, ignore this item)
2. After writing the feedback, give a score between 1 and 10 according to the detailed scoring criteria.
3. The output should be in the following format: "Feedback: {{Write feedback for the criteria}} Result: {{Integer between 1 and 10}}"
4. Please do not generate any additional descriptions. Always include the result in the output.

### Tasks for generating code:
{task_example}

### Starting code:
{starter_code}

### Generated code:
{generated_code}

### Scoring criteria:
[Evaluate the code based on the following aspects:
- 1-2 points: The code is completely incorrect and does not meet any requirements. There are serious problems in functionality, parameter handling, output format, code structure and readability, format, etc.
- 3-4 points: Most of the code is wrong, and there are major problems in functionality, parameter handling or output format. The code structure and readability are poor, and the format does not meet the requirements.
- 5-6 points: The code is partially correct, some aspects are correctly implemented, but there are obvious problems. There are some problems in functionality, parameter handling, output format, code structure and readability, format, etc.
- 7-8 points: Most of the code is correct, implements most of the functions required for the task, parameter handling is reasonable, and the output format is correct. The code structure and readability are good, and the format basically meets the requirements. But there may be some minor problems with the overall code.
- 9-10points: The code is completely correct, all functions required by the task are accurately implemented, the parameters are processed correctly, and the output is in line with expectations. The code structure is clear, easy to read and understand, and the format fully meets the requirements. ]
"""


# Two-message chat template: a fixed system message followed by a human
# message rendered from EVALUATION_PROMPT.
evaluation_prompt_template = ChatPromptTemplate.from_messages(
    [
        SystemMessage(content="You are a fair assessor of language models."),
        HumanMessagePromptTemplate.from_template(EVALUATION_PROMPT),
    ]
)

def balanced_position_calibration(scores, k):
    """
    Compute a position-weighted average of ``scores``.

    The i-th score (0-based) gets the weight ``(k - i) / k``, so earlier
    scores count more. The result is normalised by the weights that were
    actually paired with a score, so passing fewer than ``k`` scores still
    yields a true weighted average instead of a deflated one. Scores beyond
    the k-th are ignored.

    Args:
        scores: Sequence of numeric scores, in sampling order.
        k: Number of weights to generate.

    Returns:
        The weighted average as a float.

    Raises:
        ZeroDivisionError: If ``k`` is 0 or negative, or ``scores`` is empty.
    """
    weights = [(k - i) / k for i in range(k)]
    paired = list(zip(scores, weights))
    weighted_total = sum(score * weight for score, weight in paired)
    # Only the weights that have a matching score take part in the average.
    weight_total = sum(weight for _, weight in paired)
    return weighted_total / weight_total

def summarize_feedback(feedbacks):
    """
    Ask the evaluator model to condense several feedback texts into key points.

    Relies on the module-level ``eval_chat_model`` (defined elsewhere in the
    notebook) rather than on a parameter.

    Args:
        feedbacks: Iterable of feedback strings; they are joined with newlines.

    Returns:
        The stripped text content of the model's reply.
    """
    combined_feedback = "\n".join(feedbacks)
    prompt = f"""
    Summarize the following feedback into key points that highlight the necessary changes and improvements:
    {combined_feedback}
    """
    request = HumanMessage(content=prompt.strip())
    reply = eval_chat_model.invoke([request])
    return reply.content.strip()


def evaluate_answers(
    generated_code_file: str,
    starter_code,
    eval_chat_model,
    evaluation_prompt_template,
    task_example: str,
    k: int = 5,  # How many times the model is asked to score the code
    results_file: str = "evaluation_results.json"
) -> None:
    """
    Score the generated code k times and append the outcome to a JSON file.

    Each round formats the evaluation prompt, invokes the model and parses
    "Feedback: ... Result: <int>" from the reply; a reply that cannot be
    parsed counts as score 0 with a placeholder feedback. The scores are
    combined with balanced_position_calibration(), the feedback texts with
    summarize_feedback(), and the result is appended to the list stored in
    ``results_file`` and printed.

    Args:
        generated_code_file: Path of the file holding the code to evaluate.
        starter_code: Starter code inserted into the prompt.
        eval_chat_model: Model object exposing ``invoke(messages)``.
        evaluation_prompt_template: Template exposing ``format_messages(...)``.
        task_example: Task description inserted into the prompt.
        k: Number of scoring rounds.
        results_file: JSON file holding the list of evaluation results.

    Raises:
        FileNotFoundError: If ``generated_code_file`` is not an existing file.
    """
    # Load the generated code.
    if not os.path.isfile(generated_code_file):
        raise FileNotFoundError(f"file not found: {generated_code_file}")
    with open(generated_code_file, "r") as code_handle:
        generated_code = code_handle.read()

    response_pattern = r"Feedback:\s*(.*?)\s*Result:\s*(\d+)"
    scores = []
    feedbacks = []

    # Query the model k times and collect one (feedback, score) pair per round.
    for _ in range(k):
        messages = evaluation_prompt_template.format_messages(
            task_example=task_example,
            starter_code=starter_code,
            generated_code=generated_code
        )
        reply = eval_chat_model.invoke(messages)
        parsed = re.search(response_pattern, reply.content, re.DOTALL | re.IGNORECASE)

        try:
            if parsed is None:
                raise ValueError("Unable to parse model response, please ensure the response is well formatted.")
            feedback = parsed.group(1).strip()
            score = int(parsed.group(2).strip())
        except ValueError as e:
            print(e)
            feedback = "Unable to extract feedback"
            score = 0
        feedbacks.append(feedback)
        scores.append(score)

    # Combine the rounds into one score and one feedback summary.
    final_score = balanced_position_calibration(scores, k)
    final_feedback = summarize_feedback(feedbacks)

    # Start from the results already on disk, if any.
    if os.path.isfile(results_file):
        with open(results_file, "r") as f:
            history = json.load(f)
    else:
        history = []

    history.append({
        "task_example": task_example,
        "starter_code": starter_code,
        "generated_code": generated_code,
        "feedback": final_feedback,
        "score": final_score
    })

    with open(results_file, "w") as f:
        json.dump(history, f, ensure_ascii=False, indent=4)

    print(f"Final Rating: {final_score}")
    print(f"Final Feedback: {final_feedback}")


In [31]:
def load_code_from_file(file_path: str) -> str:
    """
    Return the text content of a code file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The whole content of the file as a string.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing regular file.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"file not found: {file_path}")
    with open(file_path, "r") as handle:
        return handle.read()

def generate_code_with_feedback(output: str, feedback: str = "", cfg: str = "") -> str:
    """
    Ask the model to revise code using evaluation feedback and a CFG.

    The three inputs are inserted into a fixed prompt, which is piped through
    the module-level ``llm2`` model (defined elsewhere in the notebook) and a
    string output parser.

    Args:
        output: The code to improve.
        feedback: Feedback the revision should address.
        cfg: Control flow graph information inserted into the prompt as-is.

    Returns:
        The model's response text.
    """
    revision_template = """
    *Task*
    Thoroughly review and improve the provided code based on feedback and control flow graph (CFG). You must address each feedback point in detail and ensure that the code meets the highest quality standards.
    *Output*
    Code: Revised code, including import statements for necessary modules and functions. And add error handling for each submodule.
    (Your output should strictly follow the code file format, not contain other additional natural language, and should be pure code without any code block markers (such as ```python```))
    *Input*
    Original code: {Output}
    Feedback: {feedback}
    Control flow graph (CFG): {cfg}
    """

    revision_prompt = PromptTemplate(template=revision_template, input_variables=["Output", "feedback", "cfg"])

    # Stage 1: pick the three inputs out of the invocation payload.
    select_inputs = {"Output": itemgetter("Output"), "feedback": itemgetter("feedback"), "cfg": itemgetter("cfg")}
    # Stage 2: pass them through, additionally exposing the code under "code".
    add_fields = RunnablePassthrough.assign(code=itemgetter("Output"), feedback=itemgetter("feedback"), cfg=itemgetter("cfg"))
    # Stage 3: render the prompt, call the model and keep the text reply.
    respond = {"response": revision_prompt | llm2 | StrOutputParser()}
    revision_chain = select_inputs | add_fields | respond

    payload = {"Output": output, "feedback": feedback, "cfg": cfg}
    return revision_chain.invoke(payload)['response']

def evaluate_and_regenerate(
    file_path: str,
    eval_chat_model,
    evaluation_prompt_template,
    starter_code: str,
    task_example: str,
    score_threshold: int = 8,
    max_retries: int = 2,
    results_file: str = "evaluation_results.json"
) -> str:
    """
    Evaluate the code in ``file_path`` and regenerate it until it scores well.

    Each round reloads the file, scores it with evaluate_answers() and reads
    the newest entry of ``results_file``. When the score is below the
    threshold, the code is regenerated from the feedback and a CFG of the
    current code, and written back to ``file_path``. At most ``max_retries``
    regenerations are performed and every regenerated version is evaluated,
    so the code is never rewritten without being scored afterwards. The loop
    also stops early on any evaluation, result-reading, generation or
    extraction error.

    Args:
        file_path: Path of the code file to evaluate and overwrite.
        eval_chat_model: Model passed through to evaluate_answers().
        evaluation_prompt_template: Template passed through to
            evaluate_answers().
        starter_code: Starter code passed through to evaluate_answers().
        task_example: Task description passed through to evaluate_answers().
        score_threshold: Minimum score at which the code is accepted.
        max_retries: Maximum number of regenerations.
        results_file: JSON file evaluate_answers() appends its results to.

    Returns:
        The most recent code text held by the function when it stops.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.
    """

    retry_count = 0
    feedback = ""
    output = load_code_from_file(file_path)

    logging.basicConfig(level=logging.INFO)

    while retry_count <= max_retries:
        # Reload the code
        output = load_code_from_file(file_path)
        logging.info(f"Reload the code: \n{output}")

        try:
            evaluate_answers(
                starter_code=starter_code,
                generated_code_file=file_path,
                eval_chat_model=eval_chat_model,
                evaluation_prompt_template=evaluation_prompt_template,
                task_example=task_example,
                results_file=results_file,
                k=5
            )
        except Exception as e:
            logging.error(f"An error occurred while evaluating the code: {e}")
            break

        # Read the latest score and feedback from the evaluation result file.
        # json.JSONDecodeError is a ValueError; KeyError/TypeError cover a
        # results entry that does not have the expected shape.
        try:
            with open(results_file, "r") as f:
                existing_results = json.load(f)
            latest_result = existing_results[-1]
            score = int(latest_result["score"])
            feedback = latest_result["feedback"]
            logging.info(f"Latest Assessment Results - Rating: {score}, feedback: {feedback}")
        except (FileNotFoundError, ValueError, IndexError, KeyError, TypeError) as e:
            logging.error(f"An error occurred while reading the evaluation results file: {e}")
            break

        if score >= score_threshold:
            logging.info(f"Code rating is{score}The threshold has been reached.")
            return output

        # Stop before regenerating once the retry budget is spent; another
        # rewrite would overwrite the file with code that is never evaluated.
        if retry_count >= max_retries:
            logging.info(f"The maximum number of rewrites has been reached {max_retries}")
            break

        logging.info(f"Code rating is{score}Below the threshold, an attempt will be made to regenerate code based on the feedback and the CFG.")
        retry_count += 1

        # CFG
        try:
            cfg_code_content = generate_cfg_from_code(output)
            if cfg_code_content:
                logging.info("Generated CFG content: \n%s", cfg_code_content)
            else:
                logging.warning("CFG generation failed, use normal generation process.")
        except Exception as e:
            logging.error(f"Error generating CFG: {e}")
            cfg_code_content = None

        # Regenerate code based on feedback and CFG
        try:
            solution = generate_code_with_feedback(output, feedback, cfg_code_content)
            output = solution
            logging.info(f"Regenerated code: \n{output}")
        except Exception as e:
            logging.error(f"An error occurred while generating code: {e}")
            break

        # Extract the code content and overwrite the original file
        code = extract_code(solution)
        if not code:
            logging.error("Unable to extract code content, skipping this retry.")
            break
        with open(file_path, "w") as f:
            f.write(code)
        output = code
        logging.info(f"The overwritten code has been written to the file: {file_path}")

    return output