diff --git a/lambda_agent/agent.py b/lambda_agent/agent.py
index 6083cef..0ab26a6 100644
--- a/lambda_agent/agent.py
+++ b/lambda_agent/agent.py
@@ -1,6 +1,28 @@
+from dataclasses import dataclass
 from . import config
 from .tools import TOOL_EXECUTORS, TOOL_FUNCTIONS, get_workspace_summary
-from .spinner import Spinner
+from .context import Transcript, trim_chat_history
+from .spinner import Spinner, console
+
+from rich.text import Text
+from rich.panel import Panel
+from rich import box
+
+
+@dataclass
+class TokenUsage:
+    prompt: int = 0
+    completion: int = 0
+
+    @property
+    def total(self) -> int:
+        return self.prompt + self.completion
+
+    def __add__(self, other: "TokenUsage") -> "TokenUsage":
+        return TokenUsage(
+            self.prompt + other.prompt, self.completion + other.completion
+        )
+
 
 try:
     from google import genai
@@ -20,7 +42,13 @@
     def __init__(self):
         self.workspace_context = get_workspace_summary()
         self.is_first_message = True
-        system_instruction = (
+
+        # Cumulative token usage for this session
+        self.token_usage: TokenUsage = TokenUsage()
+
+        # Full transcript — append-only log that is never truncated
+        self.transcript = Transcript()
+
+        self.system_instruction = (
             "You are Lambda, a minimal and highly efficient AI coding agent. "
             "Your primary goal is to help the user by writing code, executing commands, "
             "and managing files. You have access to tools that let you read files, "
@@ -30,21 +58,74 @@
             "CRITICAL: Do not guess the user's intent. Guessing is bad. "
             "If there is any confusion or ambiguity, you MUST use the ask_user tool "
             "to clarify the job with the human. You can ask multiple questions. "
-            "Be concise and professional."
+            "Be concise and professional.\n\n"
+            "## Error Handling\n"
+            "If you encounter an error when executing a tool or command, DO NOT immediately guess "
+            "and try to fix it in a fast loop. First, take a moment to fully understand the error. "
+            "Investigate the specific context (e.g., read the file, check the directory) to figure "
+            "out why it failed before trying a new command.\n\n"
+            "## Scratchpad\n"
+            "You have a persistent scratchpad file (.agent/scratchpad.md) available "
+            "in the working directory. Use it for complex or multi-step tasks:\n"
+            "1. **Planning**: Before starting a large task, use write_scratchpad to "
+            "outline your plan with sections like '## Plan', '## Implementation Steps', "
+            "'## Open Questions'.\n"
+            "2. **Progress tracking**: As you complete steps, use update_scratchpad to "
+            "log your progress under a '## Progress' section.\n"
+            "3. **Context persistence**: If a task spans many turns, read_scratchpad "
+            "at the start of each turn to recall your plan.\n"
+            "4. **Cleanup**: Use clear_scratchpad when a task is fully complete.\n"
+            "The scratchpad is stored in a hidden .agent/ directory — it is for your "
+            "internal use only and is not shown to the user."
         )
 
         # Initialize the chat session with the built tools and system instructions
         self.chat_session = self.client.chats.create(
            model=self.model_name,
            config=types.GenerateContentConfig(
-                system_instruction=system_instruction,
+                system_instruction=self.system_instruction,
                 tools=TOOL_FUNCTIONS,
+                automatic_function_calling=types.AutomaticFunctionCallingConfig(
+                    disable=True
+                ),
             ),
         )
 
-    def chat(self, user_input: str) -> str:
+    def switch_model(self, new_model: str) -> str:
+        """Switch to a different model mid-session. Returns confirmation message."""
+        old_model = self.model_name
+        self.model_name = new_model
+
+        # Re-create the chat session with the new model
+        self.chat_session = self.client.chats.create(
+            model=self.model_name,
+            config=types.GenerateContentConfig(
+                system_instruction=self.system_instruction,
+                tools=TOOL_FUNCTIONS,
+                automatic_function_calling=types.AutomaticFunctionCallingConfig(
+                    disable=True
+                ),
+            ),
+        )
+        self.is_first_message = True
+        return f"Switched model from [cyan]{old_model}[/cyan] → [bold cyan]{new_model}[/bold cyan]"
+
+    def _accumulate(self, response) -> TokenUsage:
+        """Extract token counts from a response and add them to the session total."""
+        usage = getattr(response, "usage_metadata", None)
+        if usage is None:
+            return TokenUsage()
+        delta = TokenUsage(
+            prompt=getattr(usage, "prompt_token_count", 0) or 0,
+            completion=getattr(usage, "candidates_token_count", 0) or 0,
+        )
+        self.token_usage = self.token_usage + delta
+        return delta
+
+    def chat(self, user_input: str) -> tuple[str, TokenUsage]:
         """
         Takes user input, sends it to Gemini, and runs a manual loop observing ToolCalls.
+        Returns (response_text, turn_token_usage).
         """
         if self.is_first_message:
             payload = (
@@ -57,12 +138,28 @@
         else:
             payload = user_input
 
+        # Track tokens for this turn
+        turn_usage = TokenUsage()
+
+        # Log the user message to the full transcript
+        self.transcript.log("user", user_input)
+
         # Send the initial user message
         with Spinner():
             response = self.chat_session.send_message(payload)
+        turn_usage = turn_usage + self._accumulate(response)
+
+        max_tool_iterations = 10
+        iterations = 0
 
         # The loop will continue as long as Gemini decides to call tools
         while True:
+            iterations += 1
+            if iterations > max_tool_iterations:
+                error_msg = f"Error: Maximum tool call limit ({max_tool_iterations}) reached to prevent infinite loops."
+                self.transcript.log("assistant", error_msg)
+                return error_msg, turn_usage
+
            try:
                 # 1. Check if the model returned a function_call
                 tool_calls = response.function_calls if response.function_calls else []
@@ -80,7 +177,33 @@
                         arguments = {key: value for key, value in arguments.items()}
                     elif not isinstance(arguments, dict):
                         arguments = dict(arguments) if arguments else {}
-                    print(f"\n[Lambda is executing: {function_name}({arguments})]")
+                    # Pretty-print the tool call with rich
+                    # Hide scratchpad operations from the user
+                    _HIDDEN_TOOLS = {
+                        "read_scratchpad",
+                        "write_scratchpad",
+                        "update_scratchpad",
+                        "clear_scratchpad",
+                    }
+                    if function_name not in _HIDDEN_TOOLS:
+                        tool_label = Text.assemble(
+                            (" ⚙ TOOL ", "bold black on magenta"),
+                            (f" {function_name}", "bold magenta"),
+                        )
+                        args_str = ", ".join(
+                            f"[dim]{k}[/dim]=[yellow]{repr(v)}[/yellow]"
+                            for k, v in arguments.items()
+                        )
+                        console.print()
+                        console.print(tool_label)
+                        console.print(
+                            Panel(
+                                args_str or "[dim](no arguments)[/dim]",
+                                border_style="magenta",
+                                box=box.SIMPLE,
+                                padding=(0, 2),
+                            )
+                        )
 
                     # 3. Execute the tool locally
                     if function_name in TOOL_EXECUTORS:
@@ -90,6 +213,18 @@
                         executor = TOOL_EXECUTORS[function_name]
                         with Spinner():
                             tool_result = executor(**arguments)
                     else:
                         tool_result = f"Error: Tool {function_name} not found."
 
+                    # Log full tool call + result to the untruncated transcript
+                    self.transcript.log(
+                        "tool_call",
+                        function_name,
+                        meta={"args": {k: str(v) for k, v in arguments.items()}},
+                    )
+                    self.transcript.log(
+                        "tool_result",
+                        str(tool_result),
+                        meta={"tool": function_name},
+                    )
+
                     # Format the result back into Gemini's expected Response format
                     tool_responses.append(
                         types.Part.from_function_response(
@@ -100,12 +235,19 @@
                 # 4. Send ALL the tool responses back to the model
                 # so it can continue reasoning based on the new information
-                tool_content = types.Content(role="tool", parts=tool_responses)
                 with Spinner():
-                    response = self.chat_session.send_message(tool_content)
+                    response = self.chat_session.send_message(tool_responses)
+                turn_usage = turn_usage + self._accumulate(response)
                 continue  # Start the loop over to see if it calls more tools
             else:
                 # No more tool calls; the LLM has generated a final text response.
-                return response.text
+                # Trim older tool responses in the chat history (sliding window)
+                try:
+                    trim_chat_history(self.chat_session._curated_history)
+                except Exception:
+                    pass  # Never let trimming crash the agent
+
+                self.transcript.log("assistant", response.text or "")
+                return response.text, turn_usage
 
             except Exception as e:
-                return f"An error occurred in the agent loop: {str(e)}"
+                return f"An error occurred in the agent loop: {str(e)}", turn_usage
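The `TokenUsage` dataclass above is deliberately minimal: per-call deltas compose with `+` and `total` is derived rather than stored. A quick sketch of the arithmetic (illustrative only; assumes the package and `google-genai` are importable, and the numbers are made up):

```python
from lambda_agent.agent import TokenUsage

first = TokenUsage(prompt=120, completion=45)   # e.g. the initial send_message
second = TokenUsage(prompt=30, completion=12)   # e.g. a follow-up tool round-trip
turn = first + second                           # __add__ sums field-wise

print(turn.prompt, turn.completion, turn.total)  # -> 150 57 207
```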
diff --git a/lambda_agent/config.py b/lambda_agent/config.py
index a94c953..1cc8231 100644
--- a/lambda_agent/config.py
+++ b/lambda_agent/config.py
@@ -18,3 +18,13 @@
 
 API_KEY = os.getenv("API_KEY")
 MODEL_NAME = os.getenv("MODEL_NAME", "gemini-3.1-flash-lite-preview")
+
+# Models available for /models switching
+AVAILABLE_MODELS = [
+    "gemini-3.1-flash-lite-preview",
+    "gemini-2.5-flash",
+    "gemini-3.1-pro-preview",
+    "gemini-2.5-pro-preview-05-06",
+    "gemini-2.0-flash",
+    "gemini-2.0-flash-lite",
+]
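`AVAILABLE_MODELS` is a plain list, and the `/models` menu in `main.py` maps the user's 1-based choice onto it. A minimal sketch of that mapping (the choice value is hypothetical):

```python
from lambda_agent import config

choice = "3"               # what the user might type at the menu prompt
idx = int(choice) - 1      # menu numbers start at 1, the list at 0
if 0 <= idx < len(config.AVAILABLE_MODELS):
    print(config.AVAILABLE_MODELS[idx])  # -> "gemini-3.1-pro-preview"
```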
+ """ + text = str(text) + if len(text) <= max_chars: + return text + return text[:max_chars] + f"\n...[TRUNCATED — original {len(text)} chars]" + + +# --------------------------------------------------------------------------- +# Full transcript (append-only log — never truncated) +# --------------------------------------------------------------------------- + + +class Transcript: + """Append-only JSONL log of every exchange in the session.""" + + def __init__(self): + os.makedirs(AGENT_DIR, exist_ok=True) + self._path = os.path.abspath(TRANSCRIPT_FILE) + + def log(self, role: str, content: str, meta: dict | None = None): + """Append a single entry to the transcript file. + + Args: + role: One of 'user', 'assistant', 'tool_call', 'tool_result'. + content: The full, untruncated payload. + meta: Optional dict of extra metadata (tool name, args, etc.). + """ + entry: dict = { + "ts": datetime.now().isoformat(), + "role": role, + "content": content, + } + if meta: + entry["meta"] = meta + try: + with open(self._path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry) + "\n") + except Exception: + pass # Transcript logging must never crash the agent + + +# --------------------------------------------------------------------------- +# Sliding-window trimmer +# --------------------------------------------------------------------------- + +# Default tier settings +TIER1_COUNT = 4 # most recent N tool responses +TIER1_LIMIT = 500 # chars to keep + +TIER2_COUNT = 8 # next N tool responses +TIER2_LIMIT = 180 + +TIER3_LIMIT = 80 # everything older + + +def trim_chat_history( + history: list, + tier1_count: int = TIER1_COUNT, + tier1_limit: int = TIER1_LIMIT, + tier2_count: int = TIER2_COUNT, + tier2_limit: int = TIER2_LIMIT, + tier3_limit: int = TIER3_LIMIT, +) -> None: + """Mutate *history* in-place, truncating function-response payloads. + + Works directly on the Gemini SDK's ``_curated_history`` list + (a list of ``Content`` objects whose ``parts`` may contain + ``FunctionResponse`` items). + + The most recent *tier1_count* function responses are kept at + *tier1_limit* chars; the next *tier2_count* at *tier2_limit*; + anything older is clipped to *tier3_limit*. + """ + # Collect every (content_index, part_index) that holds a function_response + fr_locations: list[tuple[int, int]] = [] + + for ci, content in enumerate(history): + parts = getattr(content, "parts", None) or [] + for pi, part in enumerate(parts): + fn_resp = getattr(part, "function_response", None) + if fn_resp is not None: + fr_locations.append((ci, pi)) + + if not fr_locations: + return + + # Walk from most-recent → oldest and apply the right tier limit + for rank, (ci, pi) in enumerate(reversed(fr_locations)): + part = history[ci].parts[pi] + resp = part.function_response.response + + if resp is None or "result" not in resp: + continue + + original = str(resp["result"]) + + if rank < tier1_count: + limit = tier1_limit + elif rank < tier1_count + tier2_count: + limit = tier2_limit + else: + limit = tier3_limit + + resp["result"] = clip(original, limit) diff --git a/lambda_agent/main.py b/lambda_agent/main.py index 0332c00..5b5aa86 100644 --- a/lambda_agent/main.py +++ b/lambda_agent/main.py @@ -1,21 +1,228 @@ -from .agent import Agent +from .agent import Agent, TokenUsage from . 
diff --git a/lambda_agent/main.py b/lambda_agent/main.py
index 0332c00..5b5aa86 100644
--- a/lambda_agent/main.py
+++ b/lambda_agent/main.py
@@ -1,21 +1,228 @@
-from .agent import Agent
+from .agent import Agent, TokenUsage
 from . import config
+from .spinner import console
 import os
+import getpass
+from pathlib import Path
+from rich.panel import Panel
+from rich.text import Text
+from rich.rule import Rule
+from rich.markdown import Markdown
+from rich.prompt import Prompt
+from rich import box
+from rich.align import Align
+from rich.table import Table
 
-def main():
-    print(r"""
-========================================================
+
+BANNER = r"""
 ██╗      █████╗ ███╗   ███╗██████╗ ██████╗  █████╗
 ██║     ██╔══██╗████╗ ████║██╔══██╗██╔══██╗██╔══██╗
 ██║     ███████║██╔████╔██║██████╔╝██║  ██║███████║
 ██║     ██╔══██║██║╚██╔╝██║██╔══██╗██║  ██║██╔══██║
 ███████╗██║  ██║██║ ╚═╝ ██║██████╔╝██████╔╝██║  ██║
 ╚══════╝╚═╝  ╚═╝╚═╝     ╚═╝╚═════╝ ╚═════╝ ╚═╝  ╚═╝
+"""
+
+
+SLASH_COMMANDS = {
+    "/models": "List available models and switch between them",
+    "/config": "Update API key and save to config",
+    "/help": "Show available slash commands",
+}
+
+
+def print_banner():
+    banner_text = Text(BANNER, style="bold cyan", justify="center")
+    subtitle = Text(
+        " Minimal AI Coding Agent · Type '/help' for commands ",
+        style="dim white",
+        justify="center",
+    )
+
+    panel = Panel(
+        Align.center(Text.assemble(banner_text, "\n", subtitle)),
+        border_style="cyan",
+        box=box.DOUBLE_EDGE,
+        padding=(0, 2),
+    )
+    console.print(panel)
+
+
+def print_user_message(text: str):
+    label = Text(" YOU ", style="bold black on bright_yellow")
+    content = Text(f" {text}", style="bright_white")
+    console.print()
+    console.print(Text.assemble(label, content))
+
+
+def print_lambda_message(text: str):
+    console.print()
+    label = Text(" LAMBDA ", style="bold black on cyan")
+    console.print(label)
+    console.print(
+        Panel(
+            Markdown(text),
+            border_style="cyan",
+            box=box.ROUNDED,
+            padding=(0, 2),
+        )
+    )
+
+
+def print_token_stats(turn: TokenUsage, session: TokenUsage):
+    """Render a compact token usage line under the Lambda response."""
+    console.print(
+        Text.assemble(
+            ("  ▶ tokens  ", "dim"),
+            ("this turn: ", "dim"),
+            (f"↑{turn.prompt:,}", "dim cyan"),
+            (" in ", "dim"),
+            (f"↓{turn.completion:,}", "dim cyan"),
+            (" out  ", "dim"),
+            ("session total: ", "dim"),
+            (f"{session.total:,}", "bold cyan"),
+            (" tokens", "dim"),
+        )
+    )
+
+
+def handle_models_command(agent: Agent):
+    """Display available models and let the user pick one."""
+    table = Table(
+        title="Available Models",
+        title_style="bold cyan",
+        border_style="cyan",
+        box=box.ROUNDED,
+        padding=(0, 2),
+    )
+    table.add_column("#", style="dim", width=4)
+    table.add_column("Model", style="white")
+    table.add_column("Status", justify="center")
+
+    models = config.AVAILABLE_MODELS
+    for i, model in enumerate(models, 1):
+        is_active = model == agent.model_name
+        status = "[bold green]● active[/bold green]" if is_active else "[dim]—[/dim]"
+        name_style = "bold cyan" if is_active else "white"
+        table.add_row(str(i), f"[{name_style}]{model}[/{name_style}]", status)
+
+    console.print()
+    console.print(table)
+    console.print()
+
+    choice = Prompt.ask(
+        "[bold bright_yellow] Select model #[/bold bright_yellow] (or Enter to cancel)",
+        default="",
+        console=console,
+    )
+
+    if not choice.strip():
+        console.print("  [dim]No change.[/dim]")
+        return
+
+    try:
+        idx = int(choice) - 1
+        if 0 <= idx < len(models):
+            selected = models[idx]
+            if selected == agent.model_name:
+                console.print(f"  [dim]Already using[/dim] [cyan]{selected}[/cyan]")
+            else:
+                msg = agent.switch_model(selected)
+                console.print(f"  {msg}")
+        else:
+            console.print("  [red]Invalid selection.[/red]")
+    except ValueError:
+        console.print("  [red]Please enter a number.[/red]")
+
+
+def handle_help_command():
+    """Show available slash commands."""
+    table = Table(
+        title="Slash Commands",
+        title_style="bold cyan",
+        border_style="cyan",
+        box=box.ROUNDED,
+        padding=(0, 1),
+    )
+    table.add_column("Command", style="bold bright_yellow", min_width=12)
+    table.add_column("Description", style="white")
+
+    for cmd, desc in SLASH_COMMANDS.items():
+        table.add_row(cmd, desc)
+
+    # Also list the built-in exit commands
+    table.add_row("exit / quit", "End the session")
+
+    console.print()
+    console.print(table)
+
+
+def handle_config_command(agent: Agent):
+    """Let the user update their API key mid-session."""
+    console.print()
+    console.print(
+        Panel(
+            Text.assemble(
+                ("Current API key: ", "dim"),
+                (f"{config.API_KEY[:8]}...{config.API_KEY[-4:]}", "cyan"),
+            ),
+            border_style="cyan",
+            box=box.ROUNDED,
+            title="[bold cyan]⚙ Configuration[/bold cyan]",
+            title_align="left",
+        )
+    )
+    console.print()
+
+    new_key = getpass.getpass("  Enter new API key (or press Enter to keep current): ")
+
+    if not new_key.strip():
+        console.print("  [dim]No change.[/dim]")
+        return
+
+    # Update in-memory config
+    config.API_KEY = new_key.strip()
+    os.environ["API_KEY"] = config.API_KEY
+
+    # Re-create the API client with the new key
+    from google import genai
+
+    agent.client = genai.Client(api_key=config.API_KEY)
+
+    # Re-create the chat session so the new client is used
+    from google.genai import types
+    from .tools import TOOL_FUNCTIONS
+
+    agent.chat_session = agent.client.chats.create(
+        model=agent.model_name,
+        config=types.GenerateContentConfig(
+            system_instruction=agent.system_instruction,
+            tools=TOOL_FUNCTIONS,
+            automatic_function_calling=types.AutomaticFunctionCallingConfig(
+                disable=True
+            ),
+        ),
+    )
+    agent.is_first_message = True
+
+    # Persist to config file
+    config_file = Path.home() / ".config" / "lambda-agent" / "config.env"
+    try:
+        config_file.parent.mkdir(parents=True, exist_ok=True)
+        with open(config_file, "w") as f:
+            f.write(f"API_KEY={config.API_KEY}\n")
+            f.write(f"MODEL_NAME={config.MODEL_NAME}\n")
+        os.chmod(config_file, 0o600)
+        console.print("  [green]✓[/green] API key updated and saved to config.")
+    except Exception as e:
+        console.print("  [green]✓[/green] API key updated in memory.")
+        console.print(f"  [yellow]⚠[/yellow] Could not save to disk: {e}")
+
+
+def main():
+    print_banner()
 
-    Lambda Coding Agent | Online
-========================================================
-""")
     try:
         if not config.API_KEY:
             from .cli_setup import run_setup
@@ -25,27 +232,86 @@
             os.environ["MODEL_NAME"] = config.MODEL_NAME
 
         agent = Agent()
-        print("Lambda is ready! Type 'exit' or 'quit' to stop.")
-        print("-" * 40)
+
+        console.print(
+            Rule("[bold cyan]Session Started[/bold cyan]", style="cyan"),
+        )
 
         while True:
            try:
-                user_input = input("\nYou: ")
+                # Styled prompt — Prompt.ask keeps the cursor on the same line
+                user_input = Prompt.ask(
+                    "\n[bold bright_yellow] You[/bold bright_yellow]",
+                    console=console,
+                )
+
                 if user_input.lower() in ["exit", "quit"]:
-                    print("Goodbye!")
+                    console.print()
+                    # Show session token summary before quitting
+                    if agent.token_usage.total > 0:
+                        console.print(
+                            Panel(
+                                Text.assemble(
+                                    ("Session token usage\n", "bold white"),
+                                    ("  Prompt (in):      ", "dim"),
+                                    (f"{agent.token_usage.prompt:>10,}\n", "cyan"),
+                                    ("  Completion (out): ", "dim"),
+                                    (f"{agent.token_usage.completion:>10,}\n", "cyan"),
+                                    ("  Total:            ", "dim"),
+                                    (f"{agent.token_usage.total:>10,}", "bold cyan"),
+                                ),
+                                border_style="cyan",
+                                box=box.ROUNDED,
+                                title="[bold cyan]⚡ Token Summary[/bold cyan]",
+                                title_align="left",
+                            )
+                        )
+                    console.print(
+                        Panel(
+                            "[bold cyan]Goodbye! Lambda signing off.[/bold cyan]",
+                            border_style="cyan",
+                            box=box.ROUNDED,
+                        )
+                    )
                     break
 
                 if not user_input.strip():
                     continue
 
-                response = agent.chat(user_input)
-                print(f"\nLambda: {response}")
+                # Handle slash commands
+                if user_input.strip().lower() == "/models":
+                    handle_models_command(agent)
+                    continue
+                elif user_input.strip().lower() == "/config":
+                    handle_config_command(agent)
+                    continue
+                elif user_input.strip().lower() == "/help":
+                    handle_help_command()
+                    continue
+                elif user_input.strip().startswith("/"):
+                    console.print(
+                        f"  [red]Unknown command:[/red] {user_input.strip()} "
+                        "[dim]Type /help for available commands.[/dim]"
+                    )
+                    continue
+
+                response, turn_usage = agent.chat(user_input)
+                print_lambda_message(response)
+                print_token_stats(turn_usage, agent.token_usage)
 
             except KeyboardInterrupt:
-                print("\nGoodbye!")
+                console.print()
+                console.print("[bold cyan]Goodbye![/bold cyan]")
                 break
+
     except Exception as e:
-        print(f"Failed to initialize Lambda: {str(e)}")
+        console.print(
+            Panel(
+                f"[bold red]Failed to initialize Lambda:[/bold red]\n{str(e)}",
+                border_style="red",
+                box=box.ROUNDED,
+            )
+        )
 
 
 if __name__ == "__main__":
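For reference, `/config` persists a two-line env file at `~/.config/lambda-agent/config.env` and tightens its permissions, since the file holds a credential. The same writes, reduced to a sketch (placeholder values, not a real key):

```python
from pathlib import Path

config_file = Path.home() / ".config" / "lambda-agent" / "config.env"
config_file.parent.mkdir(parents=True, exist_ok=True)
config_file.write_text(
    "API_KEY=<placeholder>\n"
    "MODEL_NAME=gemini-3.1-flash-lite-preview\n"
)
config_file.chmod(0o600)  # owner read/write only — the file stores an API key
```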
+ """ + path = _ensure_scratchpad() + try: + with open(path, "r", encoding="utf-8") as f: + return f.read() + except Exception as e: + return f"Error reading scratchpad: {e}" + + +def write_scratchpad(content: str) -> str: + """Overwrites the entire Lambda scratchpad file with the provided content. + + Use this when you need to replace the scratchpad with a fresh plan or when + starting a new major task. For incremental updates, prefer update_scratchpad. + + Args: + content: The full markdown content to write to the scratchpad. + """ + path = _ensure_scratchpad() + try: + with open(path, "w", encoding="utf-8") as f: + f.write(_HEADER_TEMPLATE + content) + return f"Scratchpad written successfully → {path}" + except Exception as e: + return f"Error writing scratchpad: {e}" + + +def update_scratchpad(note: str, section: str = "Notes") -> str: + """Appends a timestamped note to a specific section in the scratchpad. + + This is ideal for incrementally logging progress, decisions, and discoveries + without replacing existing content. + + Args: + note: The text to append (supports markdown). + section: The section heading to append under (e.g. 'Plan', 'Progress', 'Notes'). + """ + path = _ensure_scratchpad() + try: + with open(path, "r", encoding="utf-8") as f: + existing = f.read() + + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M") + entry = f"\n- **[{timestamp}]** {note}" + + section_heading = f"## {section}" + if section_heading in existing: + # Append under the existing section + parts = existing.split(section_heading, 1) + # Find the next section heading (##) or end of file + rest = parts[1] + next_section = rest.find("\n## ") + if next_section == -1: + # No next section — just append at the end + updated = existing + entry + else: + # Insert before the next section + insert_pos = len(parts[0]) + len(section_heading) + next_section + updated = existing[:insert_pos] + entry + "\n" + existing[insert_pos:] + else: + # Create the section at the end + updated = existing.rstrip() + f"\n\n{section_heading}\n{entry}\n" + + with open(path, "w", encoding="utf-8") as f: + f.write(updated) + + return f"Scratchpad updated (section: {section}) → {path}" + except Exception as e: + return f"Error updating scratchpad: {e}" + + +def clear_scratchpad() -> str: + """Clears the scratchpad, resetting it to a blank state. + + Use this when a major task is fully complete and the scratchpad is no longer needed. 
+ """ + path = _ensure_scratchpad() + try: + with open(path, "w", encoding="utf-8") as f: + f.write(_HEADER_TEMPLATE) + return f"Scratchpad cleared → {path}" + except Exception as e: + return f"Error clearing scratchpad: {e}" + + +# Tool registrations for the agent +SCRATCHPAD_EXECUTORS = { + "read_scratchpad": read_scratchpad, + "write_scratchpad": write_scratchpad, + "update_scratchpad": update_scratchpad, + "clear_scratchpad": clear_scratchpad, +} + +SCRATCHPAD_FUNCTIONS = [ + read_scratchpad, + write_scratchpad, + update_scratchpad, + clear_scratchpad, +] diff --git a/lambda_agent/spinner.py b/lambda_agent/spinner.py index 2c32d14..c2f877c 100644 --- a/lambda_agent/spinner.py +++ b/lambda_agent/spinner.py @@ -1,56 +1,50 @@ -import sys -import time -import threading -import itertools import random +from rich.console import Console +from rich.spinner import Spinner as RichSpinner +from rich.live import Live +from rich.text import Text +# Shared console instance used across the whole app +console = Console() QUOTES = [ - "Consulting the mainframe… someone competent has to.", - "Synthesizing logic… compensating for yours.", - "Bending the matrix… fixing reality again.", - "Drinking virtual coffee… this code needs patience.", - "Compiling thoughts… wish you did the same.", - "Evaluating your code… this explains a lot.", - "Simulating outcomes… all better than your attempt.", - "Reversing the polarity… like that was the issue.", - "Aligning the vectors… unlike whatever you did.", - "Traversing the graph… avoiding your mistakes.", - "Reading your source code… unfortunate.", - "Assembling the bytes… salvaging what I can.", + "Consulting the mainframe…", + "Synthesizing logic…", + "Bending the matrix…", + "Drinking virtual coffee…", + "Compiling thoughts…", + "Evaluating your code…", + "Simulating outcomes…", + "Reversing the polarity…", + "Aligning the vectors…", + "Traversing the graph…", + "Reading your source code…", + "Assembling the bytes…", ] class Spinner: - def __init__(self, message="Thinking..."): - self.spinner_cycle = itertools.cycle( - ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] - ) - # Randomly choose a quote, or fallback to the provided message - self.message = ( - f"{random.choice(QUOTES)} " if random.random() > 0.3 else f"{message} " - ) - self.running = False - self.spinner_thread = None + """Context manager that shows a rich animated spinner while Lambda is thinking.""" - def spin(self): - while self.running: - sys.stdout.write( - f"\r\033[96m{next(self.spinner_cycle)} {self.message}\033[0m" - ) - sys.stdout.flush() - time.sleep(0.1) - # Clear the spinner line when done - sys.stdout.write("\r\033[K") - sys.stdout.flush() + def __init__(self, message: str = "Thinking"): + self._label = random.choice(QUOTES) if random.random() > 0.3 else message + self._live = None def __enter__(self): - self.running = True - self.spinner_thread = threading.Thread(target=self.spin, daemon=True) - self.spinner_thread.start() + renderable = RichSpinner( + "dots", + text=Text(f" {self._label}", style="dim cyan italic"), + style="bold cyan", + ) + self._live = Live( + renderable, + console=console, + refresh_per_second=15, + transient=True, + ) + self._live.__enter__() return self def __exit__(self, exc_type, exc_val, exc_tb): - self.running = False - if self.spinner_thread: - self.spinner_thread.join() + if self._live: + self._live.__exit__(exc_type, exc_val, exc_tb) diff --git a/lambda_agent/tools.py b/lambda_agent/tools.py index 756e869..99edc11 100644 --- a/lambda_agent/tools.py +++ 
diff --git a/lambda_agent/tools.py b/lambda_agent/tools.py
index 756e869..99edc11 100644
--- a/lambda_agent/tools.py
+++ b/lambda_agent/tools.py
@@ -1,6 +1,20 @@
 import subprocess
 import os
 
+from rich.panel import Panel
+from rich.text import Text
+from rich.prompt import Prompt
+from rich import box
+from rich.console import Console
+
+from .scratchpad import SCRATCHPAD_EXECUTORS, SCRATCHPAD_FUNCTIONS
+
+# Use the same console as the rest of the app if available; else create one
+try:
+    from .spinner import console
+except ImportError:
+    console = Console()
+
 
 def read_file(path: str) -> str:
     """Reads the contents of a file.
@@ -83,6 +97,7 @@ def get_workspace_summary() -> str:
         "README",
         ".cursorrules",
         ".agentrules",
+        ".agent/scratchpad.md",
         "pyproject.toml",
         "package.json",
     ]
@@ -158,8 +173,21 @@ def ask_user(question: str) -> str:
         question: The question to ask the user.
     """
     try:
-        print(f"\n🤔 Agent asks: {question}")
-        answer = input("Your answer: ")
+        console.print()
+        console.print(
+            Panel(
+                Text(question, style="bold white"),
+                border_style="yellow",
+                box=box.ROUNDED,
+                title=Text(" 🤔 Lambda asks ", style="bold black on bright_yellow"),
+                title_align="left",
+                padding=(0, 2),
+            )
+        )
+        answer = Prompt.ask(
+            "[bold bright_yellow] Your answer[/bold bright_yellow]",
+            console=console,
+        )
         return answer
     except Exception as e:
         return f"Error asking user: {str(e)}"
@@ -172,6 +200,7 @@ def ask_user(question: str) -> str:
     "run_command": run_command,
     "search_repo": search_repo,
     "ask_user": ask_user,
+    **SCRATCHPAD_EXECUTORS,
 }
 
 # The list of raw Python functions for the Gemini SDK to auto-generate schemas
@@ -181,4 +210,5 @@ def ask_user(question: str) -> str:
     run_command,
     search_repo,
     ask_user,
+    *SCRATCHPAD_FUNCTIONS,
 ]
diff --git a/requirements.txt b/requirements.txt
index 7a1debf..f1adb43 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -47,3 +47,4 @@ uritemplate==4.2.0
 urllib3==2.6.3
 virtualenv==21.2.0
 websockets==16.0
+rich>=13.7.0
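Taken together, the new `chat()` contract is the main integration point for callers: it now returns a `(text, TokenUsage)` pair instead of a bare string, which is why `main.py` unpacks two values. Illustrative only — assumes `API_KEY` is set in the environment and the prompt is hypothetical:

```python
from lambda_agent.agent import Agent

agent = Agent()
reply, turn_usage = agent.chat("Summarize the repo layout")
print(reply)
print(f"turn: {turn_usage.total:,} tokens, session: {agent.token_usage.total:,}")
```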