Dev #13 (Merged)

162 changes: 152 additions & 10 deletions lambda_agent/agent.py
@@ -1,6 +1,28 @@
from dataclasses import dataclass
from . import config
from .tools import TOOL_EXECUTORS, TOOL_FUNCTIONS, get_workspace_summary
from .spinner import Spinner
from .context import Transcript, trim_chat_history
from .spinner import Spinner, console

from rich.text import Text
from rich.panel import Panel
from rich import box


@dataclass
class TokenUsage:
prompt: int = 0
completion: int = 0

@property
def total(self) -> int:
return self.prompt + self.completion

def __add__(self, other: "TokenUsage") -> "TokenUsage":
return TokenUsage(
self.prompt + other.prompt, self.completion + other.completion
)


try:
from google import genai
@@ -20,7 +42,13 @@ def __init__(self):
self.workspace_context = get_workspace_summary()
self.is_first_message = True

system_instruction = (
# Cumulative token usage for this session
self.token_usage: TokenUsage = TokenUsage()

# Full transcript — append-only log that is never truncated
self.transcript = Transcript()

self.system_instruction = (
"You are Lambda, a minimal and highly efficient AI coding agent. "
"Your primary goal is to help the user by writing code, executing commands, "
"and managing files. You have access to tools that let you read files, "
@@ -30,21 +58,74 @@ def __init__(self):
"CRITICAL: Do not guess the user's intent. Guessing is bad. "
"If there is any confusion or ambiguity, you MUST use the ask_user tool "
"to clarify the job with the human. You can ask multiple questions. "
"Be concise and professional."
"Be concise and professional.\n\n"
"## Error Handling\n"
"If you encounter an error when executing a tool or command, DO NOT immediately guess "
"and try to fix it in a fast loop. First, take a moment to fully understand the error. "
"Investigate the specific context (e.g., read the file, check the directory) to figure "
"out why it failed before trying a new command.\n\n"
"## Scratchpad\n"
"You have a persistent scratchpad file (.agent/scratchpad.md) available "
"in the working directory. Use it for complex or multi-step tasks:\n"
"1. **Planning**: Before starting a large task, use write_scratchpad to "
"outline your plan with sections like '## Plan', '## Implementation Steps', "
"'## Open Questions'.\n"
"2. **Progress tracking**: As you complete steps, use update_scratchpad to "
"log your progress under a '## Progress' section.\n"
"3. **Context persistence**: If a task spans many turns, read_scratchpad "
"at the start of each turn to recall your plan.\n"
"4. **Cleanup**: Use clear_scratchpad when a task is fully complete.\n"
"The scratchpad is stored in a hidden .agent/ directory — it is for your "
"internal use only and is not shown to the user."
)

# Initialize the chat session with the built tools and system instructions
self.chat_session = self.client.chats.create(
model=self.model_name,
config=types.GenerateContentConfig(
system_instruction=system_instruction,
system_instruction=self.system_instruction,
tools=TOOL_FUNCTIONS,
automatic_function_calling=types.AutomaticFunctionCallingConfig(
disable=True
),
),
)

def chat(self, user_input: str) -> str:
def switch_model(self, new_model: str) -> str:
"""Switch to a different model mid-session. Returns confirmation message."""
old_model = self.model_name
self.model_name = new_model

# Re-create the chat session with the new model
self.chat_session = self.client.chats.create(
model=self.model_name,
config=types.GenerateContentConfig(
system_instruction=self.system_instruction,
tools=TOOL_FUNCTIONS,
automatic_function_calling=types.AutomaticFunctionCallingConfig(
disable=True
),
),
)
self.is_first_message = True
return f"Switched model from [cyan]{old_model}[/cyan] → [bold cyan]{new_model}[/bold cyan]"

def _accumulate(self, response) -> TokenUsage:
"""Extract token counts from a response and add them to the session total."""
usage = getattr(response, "usage_metadata", None)
if usage is None:
return TokenUsage()
delta = TokenUsage(
prompt=getattr(usage, "prompt_token_count", 0) or 0,
completion=getattr(usage, "candidates_token_count", 0) or 0,
)
self.token_usage = self.token_usage + delta
return delta

def chat(self, user_input: str) -> tuple[str, TokenUsage]:
"""
Takes user input, sends it to Gemini, and runs a manual loop observing ToolCalls.
Returns (response_text, turn_token_usage).
"""
if self.is_first_message:
payload = (
@@ -57,12 +138,28 @@ def chat(self, user_input: str) -> str:
else:
payload = user_input

# Track tokens for this turn
turn_usage = TokenUsage()

# Log the user message to the full transcript
self.transcript.log("user", user_input)

# Send the initial user message
with Spinner():
response = self.chat_session.send_message(payload)
turn_usage = turn_usage + self._accumulate(response)

max_tool_iterations = 10
iterations = 0

# The loop will continue as long as Gemini decides to call tools
while True:
iterations += 1
if iterations > max_tool_iterations:
error_msg = f"Error: Maximum tool call limit ({max_tool_iterations}) reached to prevent infinite loops."
self.transcript.log("assistant", error_msg)
return error_msg, turn_usage

try:
# 1. Check if the model returned a function_call
tool_calls = response.function_calls if response.function_calls else []
@@ -80,7 +177,33 @@ def chat(self, user_input: str) -> str:
arguments = {key: value for key, value in arguments.items()}
elif not isinstance(arguments, dict):
arguments = dict(arguments) if arguments else {}
print(f"\\n[Lambda is executing: {function_name}({arguments})]")
# Pretty-print the tool call with rich
# Hide scratchpad operations from the user
_HIDDEN_TOOLS = {
"read_scratchpad",
"write_scratchpad",
"update_scratchpad",
"clear_scratchpad",
}
if function_name not in _HIDDEN_TOOLS:
tool_label = Text.assemble(
(" ⚙ TOOL ", "bold black on magenta"),
(f" {function_name}", "bold magenta"),
)
args_str = ", ".join(
f"[dim]{k}[/dim]=[yellow]{repr(v)}[/yellow]"
for k, v in arguments.items()
)
console.print()
console.print(tool_label)
console.print(
Panel(
args_str or "[dim](no arguments)[/dim]",
border_style="magenta",
box=box.SIMPLE,
padding=(0, 2),
)
)

# 3. Execute the tool locally
if function_name in TOOL_EXECUTORS:
@@ -90,6 +213,18 @@ def chat(self, user_input: str) -> str:
else:
tool_result = f"Error: Tool {function_name} not found."

# Log full tool call + result to the untruncated transcript
self.transcript.log(
"tool_call",
function_name,
meta={"args": {k: str(v) for k, v in arguments.items()}},
)
self.transcript.log(
"tool_result",
str(tool_result),
meta={"tool": function_name},
)

# Format the result back into Gemini's expected Response format
tool_responses.append(
types.Part.from_function_response(
@@ -100,12 +235,19 @@ def chat(self, user_input: str) -> str:

# 4. Send ALL the tool responses back to the model
# so it can continue reasoning based on the new information
tool_content = types.Content(role="tool", parts=tool_responses)
with Spinner():
response = self.chat_session.send_message(tool_content)
response = self.chat_session.send_message(tool_responses)
turn_usage = turn_usage + self._accumulate(response)
continue # Start the loop over to see if it calls more tools
else:
# No more tool calls; the LLM has generated a final text response.
return response.text
# Trim older tool responses in the chat history (sliding window)
try:
trim_chat_history(self.chat_session._curated_history)
except Exception:
pass # Never let trimming crash the agent

self.transcript.log("assistant", response.text or "")
return response.text, turn_usage
except Exception as e:
return f"An error occurred in the agent loop: {str(e)}"
return f"An error occurred in the agent loop: {str(e)}", turn_usage
10 changes: 10 additions & 0 deletions lambda_agent/config.py
@@ -18,3 +18,13 @@

API_KEY = os.getenv("API_KEY")
MODEL_NAME = os.getenv("MODEL_NAME", "gemini-3.1-flash-lite-preview")

# Models available for /models switching
AVAILABLE_MODELS = [
"gemini-3.1-flash-lite-preview",
"gemini-2.5-flash",
"gemini-3.1-pro-preview",
"gemini-2.5-pro-preview-05-06",
"gemini-2.0-flash",
"gemini-2.0-flash-lite",
]
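
As a small illustration, a sketch of how a /models listing could render this list while marking the agent's current model; the formatting helper below is hypothetical and not part of this diff.

# Sketch only: list the switchable models for a hypothetical /models command.
from lambda_agent.config import AVAILABLE_MODELS, MODEL_NAME

def format_model_list(current: str = MODEL_NAME) -> str:
    # Mark the currently active model with an asterisk.
    lines = []
    for name in AVAILABLE_MODELS:
        marker = "*" if name == current else " "
        lines.append(f" {marker} {name}")
    return "\n".join(lines)

print(format_model_list())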
140 changes: 140 additions & 0 deletions lambda_agent/context.py
@@ -0,0 +1,140 @@
"""
Context Management Module
=========================
Keeps the agent's context window lean using two complementary strategies:

1. **Full Transcript** (``.agent/transcript.jsonl``)
Append-only log of every tool call and response at full length.
This is the ground-truth record and is never truncated.

2. **Sliding-window trimmer** (``trim_chat_history``)
After each turn, older tool-call responses in the live chat history
are truncated so the model's prompt stays within budget.

Window tiers (counted from most-recent tool response):
Tier 1 — last 4 responses → up to 500 chars each
Tier 2 — next 8 responses → up to 180 chars each
Tier 3 — anything older → up to 80 chars each
"""

import json
import os
from datetime import datetime

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

AGENT_DIR = ".agent"
TRANSCRIPT_FILE = os.path.join(AGENT_DIR, "transcript.jsonl")


def clip(text: str, max_chars: int) -> str:
"""Truncate *text* to *max_chars*.

If the text is clipped, a notice is appended so the model knows
the response was shortened.
"""
text = str(text)
if len(text) <= max_chars:
return text
return text[:max_chars] + f"\n...[TRUNCATED — original {len(text)} chars]"


# ---------------------------------------------------------------------------
# Full transcript (append-only log — never truncated)
# ---------------------------------------------------------------------------


class Transcript:
"""Append-only JSONL log of every exchange in the session."""

def __init__(self):
os.makedirs(AGENT_DIR, exist_ok=True)
self._path = os.path.abspath(TRANSCRIPT_FILE)

def log(self, role: str, content: str, meta: dict | None = None):
"""Append a single entry to the transcript file.

Args:
role: One of 'user', 'assistant', 'tool_call', 'tool_result'.
content: The full, untruncated payload.
meta: Optional dict of extra metadata (tool name, args, etc.).
"""
entry: dict = {
"ts": datetime.now().isoformat(),
"role": role,
"content": content,
}
if meta:
entry["meta"] = meta
try:
with open(self._path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
except Exception:
pass # Transcript logging must never crash the agent


# ---------------------------------------------------------------------------
# Sliding-window trimmer
# ---------------------------------------------------------------------------

# Default tier settings
TIER1_COUNT = 4 # most recent N tool responses
TIER1_LIMIT = 500 # chars to keep

TIER2_COUNT = 8 # next N tool responses
TIER2_LIMIT = 180

TIER3_LIMIT = 80 # everything older


def trim_chat_history(
history: list,
tier1_count: int = TIER1_COUNT,
tier1_limit: int = TIER1_LIMIT,
tier2_count: int = TIER2_COUNT,
tier2_limit: int = TIER2_LIMIT,
tier3_limit: int = TIER3_LIMIT,
) -> None:
"""Mutate *history* in-place, truncating function-response payloads.

Works directly on the Gemini SDK's ``_curated_history`` list
(a list of ``Content`` objects whose ``parts`` may contain
``FunctionResponse`` items).

The most recent *tier1_count* function responses are kept at
*tier1_limit* chars; the next *tier2_count* at *tier2_limit*;
anything older is clipped to *tier3_limit*.
"""
# Collect every (content_index, part_index) that holds a function_response
fr_locations: list[tuple[int, int]] = []

for ci, content in enumerate(history):
parts = getattr(content, "parts", None) or []
for pi, part in enumerate(parts):
fn_resp = getattr(part, "function_response", None)
if fn_resp is not None:
fr_locations.append((ci, pi))

if not fr_locations:
return

# Walk from most-recent → oldest and apply the right tier limit
for rank, (ci, pi) in enumerate(reversed(fr_locations)):
part = history[ci].parts[pi]
resp = part.function_response.response

if resp is None or "result" not in resp:
continue

original = str(resp["result"])

if rank < tier1_count:
limit = tier1_limit
elif rank < tier1_count + tier2_count:
limit = tier2_limit
else:
limit = tier3_limit

resp["result"] = clip(original, limit)