In [1]:
import os
from typing import List, Dict, Tuple
import pytest
import tempfile
from llama_index.llms.groq import Groq
from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step
from llama_index.core.workflow import draw_all_possible_flows
from llama_index.utils.workflow import draw_most_recent_execution

In [2]:
# Read the Groq API key from the environment so the secret never lives in the
# notebook (or in version control). The key that used to be pasted here has been
# exposed and should be revoked and rotated.
groq_api_key = os.environ.get("GROQ_API_KEY")
if not groq_api_key:
    # Warn early: without a key the first LLM request is expected to fail.
    print("WARNING: GROQ_API_KEY is not set; export it before running the workflow.")

In [3]:
# Module-level Groq client; the workflow steps defined later in this notebook
# call `llm.acomplete(...)` on this object.
llm = Groq(model="llama-3.3-70b-versatile", api_key=groq_api_key)

In [4]:

# Define custom events
class ModifiedEvent(Event):
    """Carries a raw task description into the `redesign_task` step.

    Emitted by `start_workflow` (the user's original request) and by
    `incorporate_feedback` when the user chooses to enter a new task.
    """

    # Task text exactly as supplied by the user.
    task: str

class TestCasesEvent(Event):
    """Requests test-case generation for a piece of generated code.

    Emitted by `generate_code` and `incorporate_feedback` when the user answers
    "test"; consumed by `generate_test_cases`.
    """

    # Refined task description the code was generated for.
    modified_task: str
    # LLM response text containing the generated solution.
    code: str
    # Disabled field kept from an earlier design:
    # test_cases: List[Dict]

class CodeEvent(Event):
    """Triggers code generation; emitted by `redesign_task`, consumed by `generate_code`."""

    # Refined task description produced by the LLM in `redesign_task`.
    modified_task: str

class TestResultEvent(Event):
    """Hands generated code and its test cases to the `run_tests` step.

    Despite the name, this event carries the inputs of a test run rather than
    its outcome; it is emitted by `generate_test_cases` when the user answers "run".
    """

    # LLM response text containing the generated solution.
    code: str
    # Raw LLM response text listing the test cases (not parsed into Python objects).
    test_cases: str

class FeedbackEvent(Event):
    """Asks `incorporate_feedback` to revise the current solution.

    Emitted by `generate_code` (feedback typed by the user) and by `run_tests`
    (a canned message when the simulated test run fails).
    """

    # Free-text instructions describing what should change.
    feedback: str
    # Refined task description the code is meant to solve.
    modified_task: str
    # Current solution text to be revised.
    code: str

In [32]:
import random
from llama_index.core.workflow.context import Context


class CodingWorkflow(Workflow):
    """Interactive, human-in-the-loop code generation workflow.

    Event flow:
        StartEvent -> start_workflow -> ModifiedEvent
        ModifiedEvent -> redesign_task -> CodeEvent
        CodeEvent -> generate_code -> FeedbackEvent | TestCasesEvent | StopEvent
        FeedbackEvent -> incorporate_feedback -> ModifiedEvent | TestCasesEvent | StopEvent
        TestCasesEvent -> generate_test_cases -> TestResultEvent | StopEvent
        TestResultEvent -> run_tests -> FeedbackEvent | StopEvent

    All LLM calls go through the module-level `llm` client. Branching decisions
    are read from the user with `input()`, which is a synchronous call and
    therefore blocks the event loop while waiting for an answer.
    """

    # Retry budget kept so `workflow.max_retries` stays readable; no step enforces it yet.
    max_retries = 3

    @staticmethod
    def _ask_choice(prompt: str) -> str:
        """Prompt the user and return the answer trimmed and lower-cased.

        Normalising the answer means "Test", " test " and "TEST" all select the
        same branch instead of silently falling through to "stop".
        """
        return input(prompt).strip().lower()

    @step
    async def start_workflow(self, ev: StartEvent) -> ModifiedEvent:
        """Entry point: echo the task passed as `request=` to `run()` and forward it.

        Args:
            ev: Start event; its `request` attribute holds the user's task text.

        Returns:
            A `ModifiedEvent` wrapping the unmodified task.
        """
        task = ev.request

        print("========= Original Task ===========")
        print("Task:", task)
        print("===============================")
        return ModifiedEvent(task=task)

    @step
    async def redesign_task(self, ctx: Context, ev: ModifiedEvent) -> CodeEvent:
        """Ask the LLM to rewrite the task into a concise, code-generation-friendly form.

        The refined text is also stored in the workflow context under the key
        "modified_task" so that `run_tests` can read it back later.

        Args:
            ctx: Workflow context used as shared storage between steps.
            ev: Event carrying the raw task text.

        Returns:
            A `CodeEvent` carrying the refined task text.
        """
        task = ev.task
        prompt = f"""
        You are an AI assistant created to refine and simplify {task} provided by users, making them more readable and usable for code generation models, such as AI coding assistants like Copilot. Your task involves the following steps:
        Understand the User's Task:
        Comprehend the user's input and determine the primary objective of the task.
        Identify any important details, requirements, or constraints associated with the task.
        
        Refine and Simplify:
        Rewrite the task description in a way that is concise, well-structured, and easy to understand.
        Use terminology and formatting that aligns with common programming concepts to ensure it is clear for code generation models.
        Retain all critical details necessary for implementing the solution, while removing any redundant or overly complex language.
        
        Output the Refined Task:
        Provide the rewritten task in a format that is ready for a code generation assistant to interpret and use.
        If applicable, include examples, expected input/output formats, or additional context to make the task even clearer.
        
        Provide a clear and concise description of the task.
        DONT RETURN ANYTHING ELSE.
        """
        response = await llm.acomplete(prompt)
        print("========= Redesigned Task =========")
        print(response.text)
        print("===================================")

        # Persist the refined task so later steps can read it from the context.
        await ctx.set("modified_task", response.text)

        return CodeEvent(modified_task=response.text)

    @step
    async def generate_code(self, ev: CodeEvent) -> FeedbackEvent | TestCasesEvent | StopEvent:
        """Generate a `solution` function for the refined task, then ask the user what to do next.

        Args:
            ev: Event carrying the refined task text.

        Returns:
            `FeedbackEvent` if the user answers "feedback", `TestCasesEvent` if
            the user answers "test", otherwise a `StopEvent` with result "Success".
        """
        task = ev.modified_task

        prompt = f"""Write Python code for the following task:
        {task}
        
        Write a function named 'solution' 
        First explain the approach stepwise in comments and then write then return the function only.
        DONT PREFIX THE CODE WITH ```python or any other code block.
        DIRECTLY RETURN THE FUNCTION IMPLEMENTATION.
        The function should contain one liner comments whereever necessary.
        """

        response = await llm.acomplete(prompt)
        print("========= Code Generation Response =========")
        print(response.text)
        print("============================================")

        choice = self._ask_choice(
            "Do you want to give feedback or produce test cases or stop event? (feedback/test/stop): "
        )
        if choice == "feedback":
            feedback = input("Please provide feedback: ")
            return FeedbackEvent(feedback=feedback, modified_task=task, code=response.text)
        if choice == "test":
            return TestCasesEvent(modified_task=task, code=response.text)
        # Any other answer (including "stop") ends the workflow.
        return StopEvent(result="Success")

    @step
    async def incorporate_feedback(self, ev: FeedbackEvent) -> ModifiedEvent | StopEvent | TestCasesEvent:
        """Ask the LLM to revise the current solution according to the feedback.

        Args:
            ev: Event carrying the feedback, the refined task and the current code.

        Returns:
            `ModifiedEvent` with a brand-new task if the user answers "new",
            `TestCasesEvent` for the revised code if the user answers "test",
            otherwise a `StopEvent` with result "Success".
        """
        feedback = ev.feedback
        task = ev.modified_task
        code = ev.code

        prompt = f"""Modify the solution for this task incorporating the feedback:
        Task: {task}
        
        Current Code:
        {code}
        
        Feedback: {feedback}
        
        Write a function named 'solution' 
        First explain the approach stepwise in comments and then write then return the function only.
        DONT PREFIX THE CODE WITH ```python or any other code block.
        DIRECTLY RETURN THE FUNCTION IMPLEMENTATION.
        The function should contain one liner comments whereever necessary.
        """

        response = await llm.acomplete(prompt)
        print("========= Feedback Incorporation Response =========")
        print(response.text)
        print("================================================")

        choice = self._ask_choice(
            "Do you want to give a new task or stop event or produce test cases? (new/stop/test): "
        )
        if choice == "new":
            new_task = input("Please provide the new task: ")
            return ModifiedEvent(task=new_task)
        if choice == "test":
            return TestCasesEvent(modified_task=task, code=response.text)
        # Any other answer (including "stop") ends the workflow.
        return StopEvent(result="Success")

    @step
    async def generate_test_cases(self, ev: TestCasesEvent) -> StopEvent | TestResultEvent:
        """Ask the LLM for test cases covering the task and its generated solution.

        The LLM response is forwarded as plain text; it is not parsed here.

        Args:
            ev: Event carrying the refined task and the generated code.

        Returns:
            `TestResultEvent` if the user answers "run", otherwise a `StopEvent`
            with result "Success".
        """
        task = ev.modified_task
        code = ev.code
        prompt = f"""Generate test cases for the following programming task:
        {task} 
        Whose solution is as follows:
        {code}
        Return the test cases in this format:
        [
            {{"input": input_value, "expected": expected_output}},
        ]
        Provide at least 3 test cases covering edge cases.
        DONT RETURN ANYTHING ELSE."""

        response = await llm.acomplete(prompt)
        print("========= Test Case Response =========")
        print(response.text)
        print("======================================")

        choice = self._ask_choice(
            "Do you want to run the solution code for the test cases or stop event? (run/stop): "
        )
        if choice == "run":
            return TestResultEvent(code=code, test_cases=response.text)
        return StopEvent(result="Success")

    @step
    async def run_tests(self, ctx: Context, ev: TestResultEvent) -> StopEvent | FeedbackEvent:
        """Simulate a test run (placeholder: neither the code nor the test cases are executed).

        A random integer in 0..5 is drawn; 0 or 1 is treated as a failure, which
        sends a canned `FeedbackEvent` back into the feedback loop. Any other
        value ends the workflow successfully.

        Args:
            ctx: Workflow context holding the refined task under "modified_task".
            ev: Event carrying the generated code and the test-case text.

        Returns:
            `FeedbackEvent` on a simulated failure, otherwise a `StopEvent` with
            result "Success".
        """
        code = ev.code
        num = random.randint(0, 5)
        task = await ctx.get("modified_task")

        if num in (0, 1):
            print("Error: Code generation failed.")
            feedback = "There is an error while running the code regenerate the code."
            return FeedbackEvent(feedback=feedback, modified_task=task, code=code)
        return StopEvent(result="Success")
        
        
        

if __name__ == "__main__":
    groq_api_key = "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"
    task = input("Enter your programming task: ")
    
    w = CodingWorkflow(timeout= 300, verbose=True)
    result = await w.run(request = task)
    print(str(result))

Running step start_workflow
Task: Find the lcm of 3 numbers and multiply it with their product.
Step start_workflow produced event ModifiedEvent
Running step redesign_task
Refined Task: Calculate the Least Common Multiple (LCM) of three numbers and then multiply it by their product.

Given three integers, find their LCM and then compute the result of multiplying this LCM by the product of the three numbers.

Input: Three integers.
Output: The result of (LCM of the three numbers) * (product of the three numbers).

Example: For input numbers 2, 3, and 4, first find their LCM, which is 12. Then, calculate their product, which is 2*3*4 = 24. Finally, multiply the LCM by the product: 12 * 24 = 288.
Step redesign_task produced event CodeEvent
Running step generate_code
# Approach: 
# Step 1: Define a helper function to calculate the Greatest Common Divisor (GCD) of two numbers using the Euclidean algorithm
# Step 2: Use the GCD function to calculate the LCM of two numbers using the formula: 

In [31]:
# Write a diagram of the workflow class's possible step/event flows to an HTML
# file (behaviour as suggested by the helper's name and the filename echoed below).
draw_all_possible_flows(CodingWorkflow, filename="trivial_workflow.html")

trivial_workflow.html


  draw_all_possible_flows(CodingWorkflow, filename="trivial_workflow.html")


In [16]:
# Write a diagram of the latest run of `w` to an HTML file. `w` is the workflow
# instance created in the driver cell, so that cell must have been executed first.
draw_most_recent_execution(w, filename="trivial_workflow_execution.html")

trivial_workflow_execution.html
