In [None]:
!!pip install litellm



In [None]:
import os
from google.colab import userdata
import json
import time
import traceback
from litellm import completion
from dataclasses import dataclass, field
from typing import List, Callable, Dict, Any

GROQ_API_KEY = "API KEY"


@dataclass
class Prompt:
    messages: List[Dict] = field(default_factory=list)
    tools: List[Dict] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

def generate_response(prompt: Prompt) -> str:
    """Call LLM to get response"""

    messages = prompt.messages
    tools = prompt.tools

    result = None

    if not tools:
        response = completion(
            model="meta-llama/llama-4-scout-17b-16e-instruct",
            messages=messages,
            max_tokens=1024,
            api_base="https://api.groq.com/openai/v1",
            api_key=GROQ_API_KEY1
        )
        result = response.choices[0].message.content
    else:
        response = completion(
            model="meta-llama/llama-4-scout-17b-16e-instruct",
            messages=messages,
            tools=tools,
            max_tokens=1024,
            api_base="https://api.groq.com/openai/v1",
            api_key=GROQ_API_KEY1
        )

        if response.choices[0].message.tool_calls:
            tool = response.choices[0].message.tool_calls[0]
            result = {
                "tool": tool.function.name,
                "args": json.loads(tool.function.arguments),
            }
            result = json.dumps(result)
        else:
            result = response.choices[0].message.content

    return result

@dataclass(frozen=True)
class Goal:
    priority: int
    name: str
    description: str

class Action:
    def __init__(self, name: str, function: Callable, description: str, parameters: Dict, terminal: bool = False):
        self.name = name
        self.function = function
        self.description = description
        self.terminal = terminal
        self.parameters = parameters

    def execute(self, **args) -> Any:
        return self.function(**args)

class ActionRegistry:
    def __init__(self):
        self.actions = {}

    def register(self, action: Action):
        self.actions[action.name] = action

    def get_action(self, name: str) -> [Action, None]:
        return self.actions.get(name, None)

    def get_actions(self) -> List[Action]:
        return list(self.actions.values())

class Memory:
    def __init__(self):
        self.items = []

    def add_memory(self, memory: dict):
        self.items.append(memory)

    def get_memories(self, limit: int = None) -> List[Dict]:
        return self.items[:limit]

    def copy_without_system_memories(self):
        filtered_items = [m for m in self.items if m["type"] != "system"]
        memory = Memory()
        memory.items = filtered_items
        return memory

class Environment:
    def execute_action(self, action: Action, args: dict) -> dict:
        try:
            result = action.execute(**args)
            return self.format_result(result)
        except Exception as e:
            return {
                "tool_executed": False,
                "error": str(e),
                "traceback": traceback.format_exc()
            }

    def format_result(self, result: Any) -> dict:
        return {
            "tool_executed": True,
            "result": result,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z")
        }

class AgentLanguage:
    def __init__(self):
        pass

    def construct_prompt(self, actions: List[Action], environment: Environment, goals: List[Goal], memory: Memory) -> Prompt:
        raise NotImplementedError("Subclasses must implement this method")

    def parse_response(self, response: str) -> dict:
        raise NotImplementedError("Subclasses must implement this method")

class AgentFunctionCallingActionLanguage(AgentLanguage):
    def __init__(self):
        super().__init__()

    def format_goals(self, goals: List[Goal]) -> List:
        sep = "\n-------------------\n"
        goal_instructions = "\n\n".join([f"{goal.name}:{sep}{goal.description}{sep}" for goal in goals])
        return [{"role": "system", "content": goal_instructions}]

    def format_memory(self, memory: Memory) -> List:
        items = memory.get_memories()
        mapped_items = []
        for item in items:
            content = item.get("content", None)
            if not content:
                content = json.dumps(item, indent=4)
            if item["type"] == "assistant":
                mapped_items.append({"role": "assistant", "content": content})
            elif item["type"] == "environment":
                mapped_items.append({"role": "assistant", "content": content})
            else:
                mapped_items.append({"role": "user", "content": content})
        return mapped_items

    def format_actions(self, actions: List[Action]) -> [List, List]:
        tools = [
            {
                "type": "function",
                "function": {
                    "name": action.name,
                    "description": action.description[:1024],
                    "parameters": action.parameters,
                },
            } for action in actions
        ]
        return tools

    def construct_prompt(self, actions: List[Action], environment: Environment, goals: List[Goal], memory: Memory) -> Prompt:
        prompt = []
        prompt += self.format_goals(goals)
        prompt += self.format_memory(memory)
        tools = self.format_actions(actions)
        return Prompt(messages=prompt, tools=tools)

    def adapt_prompt_after_parsing_error(self, prompt: Prompt, response: str, traceback: str, error: Any, retries_left: int) -> Prompt:
        return prompt

    def parse_response(self, response: str) -> dict:
        try:
            return json.loads(response)
        except Exception as e:
            return {"tool": "terminate", "args": {"message": response}}
    # def parse_response(self, response: str) -> dict:
    # #Parse LLM response into structured format by extracting JSON block
    #   try:
    #       return json.loads(response)
    #   except Exception:
    #       if not response.strip():
    #           # Ask LLM to think again, don't terminate
    #           return {"tool": "noop", "args": {}}
    #       return {
    #           "tool": "terminate",
    #           "args": {"message": response}
    #       }


class Agent:
    def __init__(self, goals: List[Goal], agent_language: AgentLanguage, action_registry: ActionRegistry, generate_response: Callable[[Prompt], str], environment: Environment):
        self.goals = goals
        self.generate_response = generate_response
        self.agent_language = agent_language
        self.actions = action_registry
        self.environment = environment

    def construct_prompt(self, goals: List[Goal], memory: Memory, actions: ActionRegistry) -> Prompt:
        return self.agent_language.construct_prompt(actions.get_actions(), self.environment, goals, memory)

    def get_action(self, response):
        invocation = self.agent_language.parse_response(response)
        action = self.actions.get_action(invocation["tool"])
        return action, invocation

    def should_terminate(self, response: str) -> bool:
        action_def, _ = self.get_action(response)
        return action_def.terminal

    def set_current_task(self, memory: Memory, task: str):
        memory.add_memory({"type": "user", "content": task})

    def update_memory(self, memory: Memory, response: str, result: dict):
        new_memories = [
            {"type": "assistant", "content": response},
            {"type": "environment", "content": json.dumps(result)}
        ]
        for m in new_memories:
            memory.add_memory(m)

    def prompt_llm_for_action(self, full_prompt: Prompt) -> str:
        return self.generate_response(full_prompt)

    def run(self, user_input: str, memory=None, max_iterations: int = 50) -> Memory:
        memory = memory or Memory()
        self.set_current_task(memory, user_input)

        for _ in range(max_iterations):
            prompt = self.construct_prompt(self.goals, memory, self.actions)

            print("Agent thinking...")
            response = self.prompt_llm_for_action(prompt)
            print(f"Agent Decision: {response}")

            action, invocation = self.get_action(response)
            result = self.environment.execute_action(action, invocation["args"])
            print(f"Action Result: {result}")

            self.update_memory(memory, response, result)

            if self.should_terminate(response):
                break

        return memory

# Setup code
# goals = [
#     Goal(priority=1, name="Gather Information", description="Read each file in the project"),
#     Goal(priority=1, name="Terminate", description="Call terminate after reading files and show README")
# ]

goals = [
    Goal(priority=1, name="List Project Files", description="List all project files ending with .py"),
    Goal(priority=1, name="Read All Files", description="Read the content of each project file to gather information."),
    Goal(priority=1, name="Generate README", description="For each file, briefly summarize its purpose, main functions/classes, key logic, and any dependencies between files. Organize your findings clearly for quick codebase understanding and write the summary of python files into README (1).md file"),
    Goal(priority=1, name="Terminate", description="Once README.md is created, terminate the session.")
]

agent_language = AgentFunctionCallingActionLanguage()

def read_project_file(name: str) -> str:
    with open(name, "r") as f:
        return f.read()

def list_project_files() -> List[str]:
    return sorted([file for file in os.listdir(".") if file.endswith(".py")])

def write_readme_file(content: str) -> str:
    """Writes the provided content to a README.md file in the current directory."""
    try:
        with open("README (1).md", "w") as file:
            file.write(content)
        return "README.md file created successfully."
    except Exception as e:
        return f"Error writing README.md file: {str(e)}"


action_registry = ActionRegistry()
action_registry.register(Action("list_project_files", list_project_files, "Lists project files.", {}, terminal=False))
action_registry.register(Action("read_project_file", read_project_file, "Reads project file.", {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}, terminal=False))
action_registry.register(Action("terminate", lambda message: f"{message}\nTerminating...", "Terminates session.", {"type": "object", "properties": {"message": {"type": "string"}}, "required": []}, terminal=True))
action_registry.register(Action(
    name="write_readme",
    function=write_readme_file,
    description="Generates README content based on project files.",
    parameters={
        "type": "object",
        "properties": {
            "content": {"type": "string"}
        },
        "required": ["content"]
    },
    terminal=True
))


environment = Environment()
agent = Agent(goals, agent_language, action_registry, generate_response, environment)

user_input = "Write readme.md file, which includes the summary of all the python files"
final_memory = agent.run(user_input)
print(final_memory.get_memories())

Agent thinking...
Agent Decision: {"tool": "write_readme", "args": {"content": "README.md"}}
Action Result: {'tool_executed': True, 'result': 'README.md file created successfully.', 'timestamp': '2025-07-25T11:40:25+0000'}
[{'type': 'user', 'content': 'Write readme.md file, which includes the summary of all the python files'}, {'type': 'assistant', 'content': '{"tool": "write_readme", "args": {"content": "README.md"}}'}, {'type': 'environment', 'content': '{"tool_executed": true, "result": "README.md file created successfully.", "timestamp": "2025-07-25T11:40:25+0000"}'}]
