# Marshall Wace x ICHack 2026 - Build Your Own LLM Coding Agent Using Recursive Language Models Workshop

In this workshop, you'll implement the core agent loop of an RLM - a language model that can programmatically explore and analyze codebases.

- [RLM Paper](https://arxiv.org/abs/2512.24601)
- [RLM Blog Post](https://alexzhang13.github.io/blog/2025/rlm/)

## Why Recursive Language Models?

### The Problem: Context Rot

Modern language models have limited context windows, and even within these limits, they exhibit **context rot** - performance degrades as context gets longer. This becomes critical for long-horizon tasks that require processing millions of tokens.

Traditional approaches:
- **Context compaction** (summarizing) loses information
- **RAG** works for needle-in-haystack but struggles when answers depend on many parts of the prompt

### The Solution: Treat Prompts as Environment

Instead of feeding long prompts, for example those containing entire codebases or documents, into the neural network as-is, RLMs treat the prompt as part of an **external environment** that the LLM can programmatically interact with and explore.

The RLM exposes the same interface as an LLM (string in, string out), but internally:

1. Loads the prompt as a variable in a REPL environment (an interactive **R**ead-**E**valuate-**P**rint-**L**oop system common in many programming languages)
2. Lets the LLM write code to peek into, decompose, and search the prompt
3. Allows the LLM to recursively call itself on smaller chunks
4. Observes execution results and iterates until it finds an answer

This enables handling inputs **orders of magnitude beyond model context windows**.
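
The four steps above can be sketched as a toy loop. This is only an illustration under stated assumptions, not the workshop solution: `fake_model` and `fake_llm_query` are hypothetical stand-ins for real LLM calls, `toy_rlm` is an invented name, and the code runs without a permission check, so only trusted code should ever be fed to it.

```python
import contextlib
import io
import re
from typing import List, Optional

FENCE = "`" * 3  # three backticks, built in code so the example nests cleanly


def fake_llm_query(chunk: str) -> str:
    """Hypothetical stand-in for a recursive LLM call on a small chunk."""
    return "yes" if "secret" in chunk else "no"


def fake_model(observations: List[str]) -> str:
    """Hypothetical stand-in for the root LLM: explore first, then answer."""
    if not observations:
        code = (
            "chunks = [context[i:i + 50] for i in range(0, len(context), 50)]\n"
            "hits = [i for i, c in enumerate(chunks) if llm_query(c) == 'yes']\n"
            "print(hits)"
        )
        return f"Let me scan the prompt in chunks.\n{FENCE}repl\n{code}\n{FENCE}"
    return f"FINAL(secret found in chunk indices {observations[-1].strip()})"


def toy_rlm(context: str, max_iterations: int = 5) -> Optional[str]:
    # Step 1: the long prompt lives in the REPL namespace, not in the model input
    namespace = {"context": context, "llm_query": fake_llm_query}
    observations: List[str] = []
    for _ in range(max_iterations):
        reply = fake_model(observations)
        final = re.search(r"FINAL\((.*)\)", reply, re.DOTALL)
        if final:
            return final.group(1)
        # Steps 2 and 3: run the model's code, which may call llm_query on chunks
        for code in re.findall(FENCE + r"repl\s*\n(.*?)\n" + FENCE, reply, re.DOTALL):
            buffer = io.StringIO()
            with contextlib.redirect_stdout(buffer):
                exec(code, namespace)
            # Step 4: the captured output becomes the next observation
            observations.append(buffer.getvalue())
    return None


long_prompt = "a" * 100 + "secret" + "b" * 100
answer = toy_rlm(long_prompt)
print(answer)
```

Note that the root "model" never sees `long_prompt` directly: it only sees the short printed output of the code it asked to run.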

## Section 1: Understanding the task and setup

In this task you will put together the main components of an RLM inference framework to create your very own code analysis agent! You will then use your completed implementation to compete against others to see who can find the secret codes in a large, noisy repository first!

Section 2 contains the code for the main components of the framework as well as an explanation of what you need to know and how they work.

Section 3 contains the actual task and guidance on what components may be useful.

You can approach the task in whatever way you prefer - the details in Section 2 are included to give you a self-contained solution to take home.

Good luck and please reach out to a volunteer if you have any questions!

**Important disclaimer**: This example solution allows an LLM to execute arbitrary code - **you must** run the notebook in a sandboxed environment such as Google Colab and closely read all code the LLM asks to run before approving it.

Furthermore, do not include any sensitive information or confidential code in the isolated environment. The environment may help protect your host machine but it does not protect against leaking data out.

You have **full control** over the rest of the implementation, but **you must not** disable any safety checks in our helper functions, such as the ability to review the code before execution.

Please talk to one of the volunteers if you have any questions on the specific implementation.

### How to gain access to your AI tokens


### Dependencies, imports and data structures

In [None]:
# Install dependencies
try:
    import google.colab
    !pip install -q litellm rich python-dotenv
except ImportError:
    print("Not in a Google Colab environment")

In [None]:
import re
import os
import sys
import io
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict, Any
from litellm import completion

**Important**: Ensure your Google Gemini API key is properly set up.

If running in Google Colab, create a secret by clicking the key icon (the secrets tab) on the left-hand side of the screen, selecting "Add new secret", naming the secret "ICHACK_GEMINI_KEY" and pasting the API key into the value field.
The cell below will create an environment variable called "GEMINI_API_KEY" that litellm will pick up automatically.

If running in another sandbox environment, ensure that an environment variable called "GEMINI_API_KEY" is defined with your API key.

In [None]:
SECRET_NAME = "ICHACK_GEMINI_KEY"
GEMINI_ENV_VAR = "GEMINI_API_KEY"

try:
    from google.colab import userdata
    os.environ[GEMINI_ENV_VAR] = userdata.get(SECRET_NAME)
except ImportError:
    print("Not in a Colab environment")

if GEMINI_ENV_VAR not in os.environ:
    raise EnvironmentError(
        f"{GEMINI_ENV_VAR} not found. Set it as an environment variable."
    )

In [None]:
@dataclass
class REPLResult:
    stdout: str
    stderr: str
    locals: dict
    # Seconds taken to run the code; None when it was not measured
    execution_time: Optional[float] = None

    def __str__(self):
        return f"REPLResult(stdout={self.stdout}, stderr={self.stderr}, locals={self.locals}, execution_time={self.execution_time})"


@dataclass
class Iteration:
    response: str
    code_blocks: list[str]
    results: list[REPLResult]
    final_answer: str | None = None

## Section 2: Understanding the Core Components

Before implementing the RLM, let's understand the helper modules provided.

### 2.1 Utilities
This subsection defines functions that simplify the definitions of our core components. We recommend prioritising understanding the core components and completing the task over studying these implementations.

#### Parsing utility functions

In [None]:
def strip_quotes_and_whitespace(s: str) -> str:
    return s.strip().strip('"').strip("'").strip('\n').strip('\r')


def _extract_balanced_parens(text: str, start_pos: int) -> Optional[str]:
    """
    Extract content from balanced parentheses starting at start_pos.

    Args:
        text: The full text
        start_pos: Position of the opening '('

    Returns:
        Content between balanced parentheses, or None if not found
    """
    if start_pos >= len(text) or text[start_pos] != "(":
        return None

    depth = 0
    for i in range(start_pos, len(text)):
        if text[i] == "(":
            depth += 1
        elif text[i] == ")":
            depth -= 1
            if depth == 0:
                return text[start_pos + 1 : i].strip()
    return None


def strip_code_blocks(text: str) -> str:
    """
    Remove all code blocks from text, leaving only the prose/explanation.

    Example:
    >>> strip_code_blocks("I'll check the files:\\n```repl\\nos.listdir()\\n```\\nDone!")
    "I'll check the files:\\n\\nDone!"
    """
    pattern = r'```\w*\s*\n.*?\n```'
    result = re.sub(pattern, '', text, flags=re.DOTALL)
    # Clean up multiple blank lines
    result = re.sub(r'\n{3,}', '\n\n', result)
    return result.strip()

#### Loggers

In [None]:
from rich.console import Console
from rich.syntax import Syntax
from rich.panel import Panel
from rich.text import Text
from rich import box
from rich.rule import Rule
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class CodeExecution:
    code: str
    stdout: str
    stderr: str
    execution_number: int
    execution_time: Optional[float] = None

class REPLEnvLogger:
    def __init__(self, max_output_length: int = 2000, enabled: bool = True):
        self.enabled = enabled
        self.console = Console()
        self.executions: List[CodeExecution] = []
        self.execution_count = 0
        self.max_output_length = max_output_length
    
    def _truncate_output(self, text: str) -> str:
        """Truncate text output to prevent overwhelming console output."""
        if len(text) <= self.max_output_length:
            return text
        
        # Show first half, then ellipsis, then last half
        half_length = self.max_output_length // 2
        first_part = text[:half_length]
        last_part = text[-half_length:]
        truncated_chars = len(text) - self.max_output_length
        
        return f"{first_part}\n\n... [TRUNCATED {truncated_chars} characters] ...\n\n{last_part}"
    
    def log_execution(self, code: str, stdout: str, stderr: str = "", execution_time: Optional[float] = None) -> None:
        """Log a code execution with its output"""
        self.execution_count += 1
        execution = CodeExecution(
            code=code,
            stdout=stdout,
            stderr=stderr,
            execution_number=self.execution_count,
            execution_time=execution_time
        )
        self.executions.append(execution)
    
    def display_last(self, show_input: bool = True) -> None:
        """Display the last logged execution."""
        if not self.enabled:
            return
        if self.executions:
            self._display_single_execution(self.executions[-1], show_input=show_input)
    
    def display_all(self) -> None:
        """Display all logged executions in Jupyter-like format"""
        if not self.enabled:
            return
        for i, execution in enumerate(self.executions):
            self._display_single_execution(execution)
            # Add divider between cells (but not after the last one)
            if i < len(self.executions) - 1:
                self.console.print(Rule(style="dim", characters="─"))
                self.console.print()
    
    def _display_single_execution(self, execution: CodeExecution, show_input: bool = True) -> None:
        """Display a single code execution like a Jupyter cell."""
        if not self.enabled:
            return

        timing_panel = None

        # Input cell (code) - skip if already shown in permission request
        if show_input:
            display_code = self._truncate_output(execution.code)
            input_panel = Panel(
                Syntax(display_code, "python", theme="monokai", line_numbers=True),
                title=f"[bold blue]In [{execution.execution_number}]:[/bold blue]",
                border_style="blue",
                box=box.ROUNDED
            )
            self.console.print(input_panel)
        
        # Output cell
        if execution.stderr:
            # Error output
            display_stderr = self._truncate_output(execution.stderr)
            error_text = Text(display_stderr, style="bold red")
            output_panel = Panel(
                error_text,
                title=f"[bold red]Error in [{execution.execution_number}]:[/bold red]",
                border_style="red",
                box=box.ROUNDED
            )
        elif execution.stdout:
            # Normal output with separate timing panel if available
            display_stdout = self._truncate_output(execution.stdout)
            output_text = Text(display_stdout, style="white")
            
            output_panel = Panel(
                output_text,
                title=f"[bold green]Out [{execution.execution_number}]:[/bold green]",
                border_style="green",
                box=box.ROUNDED
            )
            # Show timing as a separate panel for reliable rendering
            if execution.execution_time is not None:
                timing_panel = Panel(
                    Text(f"Execution time: {execution.execution_time:.4f}s", style="bright_black"),
                    border_style="grey37",
                    box=box.ROUNDED,
                    title=f"[bold grey37]Timing [{execution.execution_number}]:[/bold grey37]"
                )
        else:
            # No output: show a placeholder, plus a single timing panel if available
            output_panel = Panel(
                Text("No output", style="dim"),
                title=f"[bold dim]Out [{execution.execution_number}]:[/bold dim]",
                border_style="dim",
                box=box.ROUNDED
            )
            if execution.execution_time is not None:
                timing_panel = Panel(
                    Text(f"Execution time: {execution.execution_time:.4f}s", style="bright_black"),
                    border_style="grey37",
                    box=box.ROUNDED,
                    title=f"[bold grey37]Timing [{execution.execution_number}]:[/bold grey37]"
                )
        
        self.console.print(output_panel)
        if timing_panel:
            self.console.print(timing_panel)
    
    def clear(self) -> None:
        """Clear all logged executions"""
        self.executions.clear()
        self.execution_count = 0

    def display_permission_request(self, code: str) -> None:
        """Display a code block for permission approval."""
        if not self.enabled:
            return
        code_panel = Panel(
            Syntax(code, "python", theme="monokai", line_numbers=True),
            title="[bold yellow]Code Execution Request[/bold yellow]",
            border_style="yellow",
            box=box.ROUNDED,
        )
        self.console.print()
        self.console.print(code_panel)

    def log_llm_query(self, prompt: str) -> None:
        """Log an outgoing LLM query."""
        if not self.enabled:
            return
        preview = prompt[:100] + "..." if len(prompt) > 100 else prompt
        preview = preview.replace("\n", " ")
        self.console.print(f"[cyan]→ LLM Query:[/cyan] [dim]{preview}[/dim]")

    def log_llm_response(self, response: str) -> None:
        """Log an incoming LLM response."""
        if not self.enabled:
            return
        preview = response[:100] + "..." if len(response) > 100 else response
        preview = preview.replace("\n", " ")
        self.console.print(f"[green]← LLM Response:[/green] [dim]{preview}[/dim]")

In [None]:
"""Main RLM loop logger with colored output using rich."""

from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from rich.text import Text

class ColorfulLogger:
    """Logger for the main RLM loop with colored output."""

    def __init__(self, enabled: bool = True, max_response_preview: int = 500):
        self.enabled = enabled
        self.console = Console()
        self.max_response_preview = max_response_preview
        self.iteration = 0

    def log_query_start(self, query: str) -> None:
        """Log the start of a new query."""
        if not self.enabled:
            return
        self.iteration = 0
        self.console.print()
        self.console.print(Rule("[bold green]RLM Query[/bold green]", style="green"))
        self.console.print(f"[bold]Query:[/bold] {query}")
        self.console.print()

    def log_iteration_start(self, iteration: int) -> None:
        """Log the start of an iteration."""
        if not self.enabled:
            return
        self.iteration = iteration
        self.console.print(
            Rule(f"[bold cyan]Iteration {iteration}[/bold cyan]", style="cyan")
        )

    def log_model_response(self, response: str, has_code: bool) -> None:
        """Log the model's response (prose only, code shown separately)."""
        if not self.enabled:
            return

        # Strip code blocks to show only the prose/explanation
        prose = strip_code_blocks(response)

        if prose:
            self.console.print(f"[dim]{prose}[/dim]")

        self.console.print()

    def log_final_response(self, answer: str) -> None:
        """Log the final answer with highlighting."""
        if not self.enabled:
            return

        self.console.print()
        self.console.print(
            Rule("[bold green]Final Answer[/bold green]", style="green")
        )
        panel = Panel(
            Text(answer),
            border_style="green",
            box=box.DOUBLE,
        )
        self.console.print(panel)
        self.console.print()

    def log_max_iterations(self, max_iter: int) -> None:
        """Log when max iterations is reached."""
        if not self.enabled:
            return
        self.console.print(
            f"[bold yellow]Max iterations ({max_iter}) reached, forcing final answer...[/bold yellow]"
        )

#### Prompt utility functions

In [None]:
MAX_OUTPUT_LENGTH = 20000

def format_execution_result(
    stdout: str,
    stderr: str,
    locals_dict: Dict[str, Any],
    truncate_length: int = 100
) -> str:
    """
    Format the execution result as a string for display.
    
    Args:
        stdout: Standard output from execution
        stderr: Standard error from execution
        locals_dict: Local variables after execution
        truncate_length: Maximum length of the string to display per var
    """
    result_parts = []
    
    if stdout:
        result_parts.append(f"\n{stdout}")
    
    if stderr:
        result_parts.append(f"\n{stderr}")
    
    # Collect user-defined variables; names starting with '_' (including
    # dunders such as __builtins__) are treated as internal and skipped
    important_vars = {}
    for key, value in locals_dict.items():
        if not key.startswith('_'):
            try:
                # Only show simple types or short representations
                if isinstance(value, (str, int, float, bool, list, dict, tuple)):
                    if isinstance(value, str) and len(value) > truncate_length:
                        important_vars[key] = f"'{value[:truncate_length]}...'"
                    else:
                        important_vars[key] = repr(value)
            except Exception:
                # repr() can fail on unusual objects; fall back to the type name
                important_vars[key] = f"<{type(value).__name__}>"
    
    if important_vars:
        result_parts.append(f"REPL variables: {list(important_vars.keys())}\n")
    
    return "\n\n".join(result_parts) if result_parts else "No output"


def execution_result_message(code: str, result: REPLResult) -> Dict[str, str]:
    output = format_execution_result(result.stdout, result.stderr, result.locals)

    # Truncate for LLM context (full output still in REPL variables)
    if len(output) > MAX_OUTPUT_LENGTH:
        truncated_chars = len(output) - MAX_OUTPUT_LENGTH
        output = output[:MAX_OUTPUT_LENGTH] + f"\n\n... [{truncated_chars} chars truncated] ..."

    return {
        "role": "user",
        "content": f"Code executed:\n```python\n{code}\n```\n\nREPL output:\n{output}"
    }

def force_final_answer_message() -> Dict[str, str]:
    return {
        "role": "user",
        "content": "You must provide a final answer now. Based on what you've learned, use FINAL(your answer) immediately.",
    }

### 2.2 Parsing
The LLM needs to be able to specify whether it wants to execute code to explore the environment or has come up with an answer.

These functions extract those instructions from LLM responses by looking for the syntax specified in the system prompt.

- `find_code_blocks(text)` - Extracts code from ` ```repl ``` ` blocks
- `find_final(text)` - Detects if response contains `FINAL(...)` answer

#### Examples
##### Response containing code to execute in the REPL environment

> I'll explore the project structure first.
>
> ````markdown
> ```repl
> import os
> for f in os.listdir(project_path):
>     print(f)
> ```
> ````
> 
> Let me also check the README.
>
> ````markdown
> ```repl
> with open(os.path.join(project_path, "README.md")) as f:
>     print(f.read()[:500])
> ```
> ````

##### Response containing the final answer
> Based on my analysis, FINAL(The main entry point is cli.py)
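
To make the two response shapes above concrete, here is a minimal, self-contained sketch of parsing them. The names `extract_repl_blocks` and `extract_final_answer` are hypothetical and exist only for this illustration. Unlike the workshop's `find_final`, which returns the whole response, `extract_final_answer` pulls out just the text inside `FINAL(...)` by counting balanced parentheses, which is an assumption about what you might want rather than what the provided helpers do.

```python
import re
from typing import List, Optional

FENCE = "`" * 3  # three backticks, built in code so the example nests cleanly


def extract_repl_blocks(text: str) -> List[str]:
    """Return the contents of every repl-fenced block, in order."""
    pattern = FENCE + r"repl\s*\n(.*?)\n" + FENCE
    return [match.strip() for match in re.findall(pattern, text, re.DOTALL)]


def extract_final_answer(text: str) -> Optional[str]:
    """Return the text inside FINAL(...), allowing nested parentheses."""
    start = text.find("FINAL(")
    if start == -1:
        return None
    open_pos = start + len("FINAL")  # index of the opening '('
    depth = 0
    for i in range(open_pos, len(text)):
        if text[i] == "(":
            depth += 1
        elif text[i] == ")":
            depth -= 1
            if depth == 0:
                return text[open_pos + 1 : i].strip()
    return None  # parentheses never balanced


code_response = (
    "I'll explore the project structure first.\n"
    f"{FENCE}repl\nimport os\nprint(os.getcwd())\n{FENCE}\n"
)
final_response = "Based on my analysis, FINAL(The main entry point is cli.py (see main()))"

blocks = extract_repl_blocks(code_response)
answer = extract_final_answer(final_response)
print(blocks)
print(answer)
```

A plain regex such as `FINAL\((.*?)\)` would stop at the first closing parenthesis and cut the answer short, which is why the sketch counts depth instead.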

In [None]:
def find_code_blocks(text: str) -> List[str]:
    """
    Extract the contents of all repl code blocks

    Example:
    >>> find_code_blocks('''Multiple blocks:
        ... ```repl
        ... first()
        ... ```
        ... Some text between.
        ... ```repl
        ... second()
        ... ```
        ... ''')
        ['first()', 'second()']
    """
    pattern = r'```repl\s*\n(.*?)\n```'
    matches = re.findall(pattern, text, re.DOTALL)
    return [match.strip() for match in matches]


def find_final(text: str) -> Optional[str]:
    """
    If response contains FINAL(, return the whole response as the final answer.
    """
    if "FINAL(" in text:
        return text
    return None

### 2.3 The REPL Environment

The REPL provides a Python execution environment where the LLM's code runs:

- `execute_code(code)` - Runs Python code and returns stdout, stderr, and local variables
- `request_permission(code)` - Asks user before executing code (safety feature)
- Provides `project_path` variable pointing to the codebase being explored
- Provides `llm_query()` for recursive LLM calls
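
The core idea behind such an environment can be sketched in a few lines: run each snippet with `exec` against one shared namespace so variables persist between snippets, and capture anything printed. This is a simplified illustration, not the `REPLEnv` implementation; `run_snippet` and the example path are hypothetical, and there is no permission check or built-in restriction here.

```python
import contextlib
import io
from typing import Tuple


def run_snippet(code: str, namespace: dict) -> Tuple[str, str]:
    """Run code in the shared namespace; return (stdout, error message or '')."""
    buffer = io.StringIO()
    error = ""
    with contextlib.redirect_stdout(buffer):
        try:
            exec(code, namespace)
        except Exception as exc:
            # Report the failure as text so the caller can show it to the model
            error = f"{type(exc).__name__}: {exc}"
    return buffer.getvalue(), error


# Hypothetical starting namespace: one pre-loaded variable, as in the bullets above
namespace = {"project_path": "/tmp/example_project"}

out1, err1 = run_snippet("n = len(project_path)\nprint(n)", namespace)
out2, err2 = run_snippet("print(n + 1)", namespace)  # n persists from the first call
out3, err3 = run_snippet("print(undefined_name)", namespace)
```

Returning errors as strings instead of raising keeps the loop alive: the model can read the error message and try a corrected snippet on its next turn.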

In [None]:
STATEMENT_PATTERNS = (
    "import ", "from ", "def ", "class ", "if ", "elif ", "else:",
    "for ", "while ", "try:", "except", "finally:", "with ", "raise ",
    "return ", "yield ", "break", "continue", "pass", "assert ",
)

class REPLEnv:
    def __init__(self, llm_client, project_path: Path | str | None = None):
        self.llm_client = llm_client
        self.project_path = Path(project_path) if project_path else Path.cwd()
        self.logger = REPLEnvLogger(enabled=True)

        # Create .rlm workspace inside project root
        self.workspace_dir = self.project_path / ".rlm"
        self.workspace_dir.mkdir(exist_ok=True)

        self.globals: dict = {}
        self.locals: dict = {}
        self._setup_namespace()

    def _setup_namespace(self):
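        # Restrict built-ins to an allowlist. Note that __import__ and open are
        # allowed, so this is NOT a security boundary: rely on the sandbox and
        # the permission prompt for safety.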
        self.globals = {
            '__builtins__': {
                # Safe built-ins for string manipulation
                'print': print, 'len': len, 'str': str, 'int': int, 'float': float,
                'list': list, 'dict': dict, 'set': set, 'tuple': tuple, 'bool': bool,
                'type': type, 'isinstance': isinstance, 'enumerate': enumerate,
                'zip': zip, 'map': map, 'filter': filter, 'sorted': sorted,
                'min': min, 'max': max, 'sum': sum, 'abs': abs, 'round': round,
                'chr': chr, 'ord': ord, 'hex': hex, 'bin': bin, 'oct': oct,
                'repr': repr, 'ascii': ascii, 'format': format,
                '__import__': __import__,  # Allow imports
                'open': open,  # Allow file access

                # Add commonly used built-ins that were missing
                'any': any, 'all': all, 'hasattr': hasattr, 'getattr': getattr,
                'setattr': setattr, 'delattr': delattr, 'dir': dir, 'vars': vars,
                'range': range,  # Add range function
                'reversed': reversed,  # Add reversed function
                'slice': slice,  # Add slice function
                'iter': iter,  # Add iter function
                'next': next,  # Add next function
                'pow': pow,  # Add pow function
                'divmod': divmod,  # Add divmod function
                'complex': complex,  # Add complex function
                'bytes': bytes,  # Add bytes function
                'bytearray': bytearray,  # Add bytearray function
                'memoryview': memoryview,  # Add memoryview function
                'hash': hash,  # Add hash function
                'id': id,  # Add id function
                'callable': callable,  # Add callable function
                'issubclass': issubclass,  # Add issubclass function
                'super': super,  # Add super function
                'property': property,  # Add property function
                'staticmethod': staticmethod,  # Add staticmethod function
                'classmethod': classmethod,  # Add classmethod function
                'object': object,  # Add object class
                'BaseException': BaseException,  # Add BaseException class
                'ArithmeticError': ArithmeticError,  # Add ArithmeticError class
                'LookupError': LookupError,  # Add LookupError class
                'EnvironmentError': EnvironmentError,  # Add EnvironmentError class
                'AssertionError': AssertionError,  # Add AssertionError class
                'NotImplementedError': NotImplementedError,  # Add NotImplementedError class
                'UnicodeError': UnicodeError,  # Add UnicodeError class
                'Warning': Warning,  # Add Warning class
                'UserWarning': UserWarning,  # Add UserWarning class
                'DeprecationWarning': DeprecationWarning,  # Add DeprecationWarning class
                'PendingDeprecationWarning': PendingDeprecationWarning,  # Add PendingDeprecationWarning class
                'SyntaxWarning': SyntaxWarning,  # Add SyntaxWarning class
                'RuntimeWarning': RuntimeWarning,  # Add RuntimeWarning class
                'FutureWarning': FutureWarning,  # Add FutureWarning class
                'ImportWarning': ImportWarning,  # Add ImportWarning class
                'UnicodeWarning': UnicodeWarning,  # Add UnicodeWarning class
                'BytesWarning': BytesWarning,  # Add BytesWarning class
                'ResourceWarning': ResourceWarning,  # Add ResourceWarning class

                # Add exception classes
                'Exception': Exception, 'ValueError': ValueError, 'TypeError': TypeError,
                'KeyError': KeyError, 'IndexError': IndexError, 'AttributeError': AttributeError,
                'FileNotFoundError': FileNotFoundError, 'OSError': OSError, 'IOError': IOError,
                'RuntimeError': RuntimeError, 'NameError': NameError, 'ImportError': ImportError,
                'StopIteration': StopIteration, 'GeneratorExit': GeneratorExit,
                'SystemExit': SystemExit, 'KeyboardInterrupt': KeyboardInterrupt,

                # Disallow the following built-ins
                'input': None,  # Block input
                'eval': None,  # Block eval
                'exec': None,  # Block exec
                'compile': None,  # Block compile
                'globals': None,  # Block globals access
                'locals': None,  # Block locals access
            },
            "llm_query": self._llm_query,
            "llm_query_batched": self._llm_query_batched,
            "FINAL_VAR": self._final_var,
            "project_path": str(self.project_path),
        }


    def _final_var(self, variable_name: str) -> str:
        """Return value of a variable from REPL locals as final answer."""
        variable_name = strip_quotes_and_whitespace(variable_name)
        if variable_name in self.locals:
            return str(self.locals[variable_name])
        return f"Error: Variable '{variable_name}' not found"

    def _llm_query(self, prompt: str) -> str:
        """Make a recursive LLM call from within executed code."""
        self.logger.log_llm_query(prompt)
        try:
            messages = [{"role": "user", "content": prompt}]
            response = self.llm_client.completion(messages)
            self.logger.log_llm_response(response)
            return response
        except Exception as e:
            error = f"Error in llm_query: {e}"
            self.logger.log_llm_response(error)
            return error

    def _llm_query_batched(self, prompts: list[str]) -> list[str]:
        """Make multiple LLM calls in parallel, returning responses in prompt order."""
        if not prompts:
            return []  # ThreadPoolExecutor rejects max_workers=0
        with ThreadPoolExecutor(max_workers=min(len(prompts), 10)) as executor:
            future_to_idx = {executor.submit(self._llm_query, p): i for i, p in enumerate(prompts)}
            results = {}
            for future in as_completed(future_to_idx):
                results[future_to_idx[future]] = future.result()
        return [results[i] for i in range(len(prompts))]

    # =========================================================================
    # Code execution
    # =========================================================================

    @contextmanager
    def _capture_output(self):
        """Context manager to capture stdout and stderr."""
        old_stdout, old_stderr = sys.stdout, sys.stderr
        stdout_buf, stderr_buf = io.StringIO(), io.StringIO()
        try:
            sys.stdout, sys.stderr = stdout_buf, stderr_buf
            yield stdout_buf, stderr_buf
        finally:
            sys.stdout, sys.stderr = old_stdout, old_stderr

    def _is_statement(self, line: str) -> bool:
        """Check if a line is a statement (not an expression)."""
        stripped = line.strip()
        if any(stripped.startswith(p) for p in STATEMENT_PATTERNS):
            return True
        # Check for assignment (but not comparison ==, !=, <=, >=)
        return bool(re.match(r"^[^=]*[^!<>=]=[^=]", stripped))

    def _get_executable_lines(self, code: str) -> list[str]:
        """Get non-empty, non-comment lines from code."""
        return [
            line for line in code.split("\n")
            if line.strip() and not line.strip().startswith("#")
        ]

    def _run_code(self, code: str, namespace: dict) -> None:
        """Execute code with notebook-style auto-print of last expression."""
        import ast  # local import: the notebook's import cell does not include ast

        if not self._get_executable_lines(code):
            return

        # Parse once so multi-line expressions and trailing comments are handled
        # correctly. A SyntaxError here propagates to execute_code, which
        # reports it in stderr before anything has run.
        tree = ast.parse(code)
        if not tree.body:
            return
        last_node = tree.body[-1]

        # Only auto-print when the final top-level node is a bare expression
        if not isinstance(last_node, ast.Expr):
            exec(code, namespace, namespace)
            return

        # Execute everything before the final expression exactly once
        if len(tree.body) > 1:
            preceding = ast.Module(body=tree.body[:-1], type_ignores=[])
            exec(compile(preceding, "<repl>", "exec"), namespace, namespace)

        # Eval and print the final expression
        expression = ast.Expression(body=last_node.value)
        result = eval(compile(expression, "<repl>", "eval"), namespace, namespace)
        if result is not None:
            print(repr(result))

    def _update_locals(self, namespace: dict) -> None:
        """Update self.locals with new variables from namespace."""
        for key, value in namespace.items():
            if key not in self.globals and not key.startswith("_"):
                self.locals[key] = value

    def execute_code(self, code: str) -> REPLResult:
        """Execute code in the REPL environment."""
        with self._capture_output() as (stdout_buf, stderr_buf):
            try:
                namespace = {**self.globals, **self.locals}
                self._run_code(code, namespace)
                self._update_locals(namespace)
                stdout, stderr = stdout_buf.getvalue(), stderr_buf.getvalue()
            except Exception as e:
                stdout = stdout_buf.getvalue()
                stderr = f"{type(e).__name__}: {e}"

        # Store output in special variables for LLM access when truncated
        self.locals["_last_stdout"] = stdout
        self.locals["_last_stderr"] = stderr
        self.locals["_last_output_len"] = len(stdout) + len(stderr)

        # Log and display the execution (input already shown in permission request)
        self.logger.log_execution(code, stdout, stderr)
        self.logger.display_last(show_input=False)

        return REPLResult(stdout=stdout, stderr=stderr, locals=self.locals.copy())

    def request_permission(self, code: str) -> bool:
        """Prompt user for permission to execute code."""
        self.logger.display_permission_request(code)

        print("  1) Allow    - Execute this code block")
        print("  2) Deny     - Skip this code block")
        print()

        while True:
            try:
                choice = input("  Select [1/2]: ").strip()
            except (KeyboardInterrupt, EOFError):
                return False
            if choice == "1":
                return True
            elif choice == "2":
                return False
            else:
                print("  Invalid choice. Enter 1 or 2.")
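
The core idea behind `execute_code` can be tried in isolation. The following is a minimal sketch, not the workshop's `REPLEnv`: the helper name `run_snippet` and the shared `namespace` dict are assumptions made for illustration. It shows how reusing one namespace across `exec` calls keeps variables alive between code blocks, while `contextlib.redirect_stdout` captures anything printed.

```python
import contextlib
import io


def run_snippet(code: str, namespace: dict) -> tuple[str, str]:
    """Run code in a shared namespace and return (stdout, stderr) as strings."""
    stdout_buf = io.StringIO()
    stderr = ""
    with contextlib.redirect_stdout(stdout_buf):
        try:
            exec(code, namespace, namespace)
        except Exception as e:
            # Same "ErrorType: message" format that execute_code uses
            stderr = f"{type(e).__name__}: {e}"
    return stdout_buf.getvalue(), stderr


namespace: dict = {}
run_snippet("counter = 41", namespace)

# The second snippet still sees `counter` because the namespace is reused
out, err = run_snippet("counter += 1\nprint(counter)", namespace)

# Errors are reported as text instead of crashing the caller
bad_out, bad_err = run_snippet("print(undefined_name)", namespace)

print(repr(out), repr(err))
print(repr(bad_err))
```

Returning errors as strings rather than raising them is what lets the LLM read a traceback summary and correct its own code on the next iteration.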

### 2.4 The System Prompt
The system prompt teaches the LLM how to use the REPL environment. Key points:

- Write code in ` ```repl ``` ` blocks
- Use `os.walk(project_path)` to explore files
- Use `open()` to read files
- Use `llm_query(prompt)` for recursive analysis
- Signal completion with `FINAL(your answer)`
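
One detail worth noticing when reading the prompt: `SYSTEM_PROMPT` is defined as an f-string, so literal braces must be doubled. This is why the chunking example inside it is written with `{{chunk}}`. A small sketch of that escaping behaviour (the `demo_prompt` name is illustrative only):

```python
# Doubled braces in an f-string render as single literal braces,
# so the model ends up seeing an ordinary f-string in the example code.
demo_prompt = f"""Example for the model:
answers = llm_query_batched([f"Summarize this chunk: {{chunk}}" for chunk in chunks])
"""

print(demo_prompt)
```

If the braces were not doubled, Python would try to interpolate a variable named `chunk` at the moment the prompt is defined and fail with a `NameError`.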

In [None]:
SYSTEM_PROMPT = f"""You are a code exploration assistant with access to a Python REPL environment.

## Available Variables

- `project_path`: Path to the project/codebase you're exploring

You can also import any other module using `import`:
```repl
import collections
from pathlib import Path
```

## Available Functions

- `open()`: Read/write files
- `llm_query(prompt)`: Make a recursive LLM call to analyze text/code
- `FINAL_VAR(variable_name)`: Return a variable's value as the final answer

## Exploration Workflow

1. **Explore Structure**: Use `os.walk(project_path)` to understand the directory layout
2. **Find Relevant Files**: Based on the question, identify which files to examine
3. **Read Files**: Use `open()` to read file contents
4. **Analyze Code**: For complex code, use `llm_query()` to get explanations
5. **Save Notes**: Write intermediate findings to workspace_dir
6. **Cite Sources**: Always reference specific files and line numbers

## Code Block Syntax

Write code in ```repl blocks:
```repl
for root, dirs, files in os.walk(project_path):
    for f in files:
        if f.endswith('.py'):
            print(os.path.join(root, f))
```

IMPORTANT: Only write code blocks. Do NOT generate fake output or ```text blocks.
You will receive actual execution results after each code block runs.
Never assume or predict what files exist - explore first.

## Finishing Up

You should explore the codebase first using ```repl blocks before providing an answer.
Do NOT immediately return a FINAL answer - first read files and understand the code.

When you have gathered enough information, provide your final answer on its own line:

FINAL(Your detailed answer here based on what you discovered from the code)

The FINAL() must be at the start of a line. Your answer should reference specific files you examined.

## Handling Large Outputs

Output from code execution is truncated at 20,000 characters. If you see
"[X chars truncated]", the full data is still available in your REPL variables.

**Strategies for large data:**
1. **Store in variables, don't just print**: `data = f.read()` keeps full content
2. **Slice to examine parts**: `print(data[:1000])`, `print(data[-1000:])`
3. **Use regex to search**: `matches = re.findall(pattern, data)`
4. **Chunk and analyze with llm_query_batched**:
```repl
chunks = [data[i:i+10000] for i in range(0, len(data), 10000)]
answers = llm_query_batched([f"Summarize this chunk: {{chunk}}" for chunk in chunks])
print(answers)
```

The truncation only affects what you see in execution results - variables retain full data.

## Tips

- Use `os.walk(project_path)` to explore the codebase
- Save important findings to workspace_dir for later reference
- Use `llm_query()` when code is too complex to analyze directly
- Use `llm_query_batched()` for multiple independent queries (much faster)
- Include specific file paths and line numbers in your answer
"""

### 2.5 The LLM Client

A simple wrapper around LiteLLM:

- `completion(messages)` calls the LLM with a list of messages and returns the response text
- The model comes from the `RLM_MODEL` environment variable, with a built-in default if it is unset
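
The `messages` argument follows the chat format used throughout this notebook: a list of dicts with `role` and `content` keys. The sketch below uses a hypothetical `EchoClient` stand-in (not part of the workshop code) so that it runs without an API key. It only illustrates the shape of the data passed to `completion(messages)`.

```python
class EchoClient:
    """Hypothetical offline stand-in with the same completion(messages) shape."""

    def completion(self, messages: list[dict]) -> str:
        # A real client would send the messages to a model; this one just
        # reports what it received so the structure is easy to inspect.
        last = messages[-1]
        return f"Received {len(messages)} messages; last {last['role']} said: {last['content']}"


messages = [
    {"role": "system", "content": "You are a code exploration assistant."},
    {"role": "user", "content": "What files are in this project?"},
]

client = EchoClient()
reply = client.completion(messages)
print(reply)
```

A stand-in like this can also be handy for testing the agent loop deterministically before spending tokens on a real model.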

In [None]:
DEFAULT_MODEL = os.getenv("RLM_MODEL", "gemini/gemini-3-pro-preview")

class LLMClient:
    def __init__(self, model: str = DEFAULT_MODEL):
        self.model = model

    def completion(self, messages: list[dict]) -> str:
        """Call the LLM with a list of messages and return the response text."""
        response = completion(model=self.model, messages=messages)
        return response.choices[0].message.content

## Section 3: Implement the RLM Agent

Now it's your turn! Implement the two TODO methods in the RLM class below.

The agent loop works like this:

1. User provides a query
2. LLM responds, often with code in ` ```repl ``` ` blocks
3. Code is executed in the REPL, results fed back to LLM
4. Repeat until LLM provides a `FINAL(...)` answer
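
The four steps above can be sketched as a toy loop that is independent of the `RLM` class. Everything here is an assumption for illustration: the model is replaced by two scripted replies, the parsing uses simple regular expressions rather than the notebook's helpers, and code runs without a permission prompt.

```python
import contextlib
import io
import re
from typing import Optional

FENCE = "`" * 3  # built programmatically to avoid nesting code fences here

# Scripted replies standing in for a real model (hypothetical, for illustration).
SCRIPTED_REPLIES = [
    f"Let me compute it.\n{FENCE}repl\ntotal = sum(range(5))\nprint(total)\n{FENCE}",
    "FINAL(The total is 10)",
]

REPL_BLOCK = re.compile(FENCE + r"repl\n(.*?)" + FENCE, re.DOTALL)
FINAL_LINE = re.compile(r"^FINAL\((.*)\)\s*$", re.MULTILINE)


def toy_agent(query: str, max_iterations: int = 5) -> Optional[str]:
    history = [{"role": "user", "content": query}]
    namespace: dict = {}
    replies = iter(SCRIPTED_REPLIES)

    for _ in range(max_iterations):
        reply = next(replies)  # a real agent would call the LLM with `history`
        history.append({"role": "assistant", "content": reply})

        # Stop as soon as a line starts with FINAL(...)
        final = FINAL_LINE.search(reply)
        if final:
            return final.group(1)

        # Otherwise run each repl block and feed the printed output back
        for code in REPL_BLOCK.findall(reply):
            buf = io.StringIO()
            with contextlib.redirect_stdout(buf):
                exec(code, namespace, namespace)
            history.append({"role": "user", "content": f"Output:\n{buf.getvalue()}"})

    return None


answer = toy_agent("What is the sum of 0..4?")
print(answer)
```

The tasks below ask for the same control flow, but built from the notebook's own client, parser, and REPL helpers.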

### Task 1: Implement `completion()`

This is the main entry point. You need to:

1. **Add the user's prompt to the message history** as a user message
   - Messages are dicts: `{"role": "user", "content": prompt}`
   - Append to `self.messages`

2. **Inside the loop, call `_run_iteration()`** and check its return value
   - If it returns a non-None value, that's the final answer - return it
   - If it returns None, continue to the next iteration

### Task 2: Implement `_run_iteration()`

This handles a single iteration of the agent loop. You need to:

1. **Call the LLM** to get a response
   - Hint: See Section 2.5: The LLM Client

2. **Extract code blocks** from the response
   - Hint: See Section 2.2: Parsing

3. **Add the assistant's response** to the message history
   - Hint: Recall the message structure: a list of dicts with `role` and `content` keys.

4. **If there are code blocks, execute them**
   - Hint: Are there any helper functions in the RLM class that can be used?
   - If this returns a non-None value, return it

In [None]:
class RLM:
    """Recursive Language Model agent for code exploration."""

    def __init__(self, model: str | None = None, codebase_path: str | None = None, verbose: bool = True):
        self.llm_client = LLMClient(model=model) if model else LLMClient()
        self.repl = REPLEnv(self.llm_client, codebase_path)
        self.logger = ColorfulLogger(enabled=verbose)
        self.messages = [{"role": "system", "content": SYSTEM_PROMPT}]

    # =========================================================================
    # TODO 1: Implement the main completion loop
    # =========================================================================
    def completion(self, prompt: str, max_iterations: int = 30) -> str:
        """
        Main RLM completion loop.

        Args:
            prompt: The user's question/query
            max_iterations: Maximum number of agent iterations before forcing an answer

        Returns:
            The agent's final answer
        """
        self.logger.log_query_start(prompt)

        # TODO1.1: Add the user's query to the LLM's context


        for i in range(max_iterations):
            self.logger.log_iteration_start(i + 1)

            # TODO1.2: Run an iteration of the algorithm
            pass

        # Fallback when max iterations reached
        self.logger.log_max_iterations(max_iterations)
        return self._force_final_answer()

    # =========================================================================
    # TODO 2: Implement a single iteration
    # =========================================================================
    def _run_iteration(self) -> str | None:
        """
        Run a single iteration of the RLM loop.

        Returns:
            The final answer string if found, or None to continue iterating
        """
        # TODO2.1: Call LLM to get response
        response = ...

        # Check for final answer (provided - don't modify)
        final = find_final(response) if response else None
        if final:
            self.logger.log_final_response(final)
            return final

        # TODO2.2: Extract code blocks from response
        code_blocks = []

        self.logger.log_model_response(response or "", has_code=bool(code_blocks))

        # TODO2.3: Add assistant message to self.messages


        # TODO2.4: Execute code blocks if present
        if code_blocks:
            ...

        return None

    # =========================================================================
    # Helpers (provided - don't modify)
    # =========================================================================

    def _execute_and_add_results(self, code_blocks: list[str]) -> str | None:
        """Execute code blocks with permission, add results to messages."""
        for code in code_blocks:
            if self.repl.request_permission(code):
                result = self.repl.execute_code(code)
                self.messages.append(execution_result_message(code, result))
            else:
                return "[Awaiting user input]"
        return None

    def _force_final_answer(self) -> str:
        """Force the LLM to give a final answer when max iterations reached."""
        self.messages.append(force_final_answer_message())
        response = self.llm_client.completion(self.messages)
        return response

### Test Your Implementation

Run the cell below to test your RLM implementation with a simple query.

In [None]:
# Test instantiation
rlm = RLM(codebase_path=".")
print("RLM instantiated successfully!")
print(f"Model: {rlm.llm_client.model}")
print(f"Codebase: {rlm.repl.project_path}")

In [None]:
# Test with a simple query
# This will ask the agent to explore the codebase structure
result = rlm.completion("What files are in this project? Just list them briefly.")
print("\n" + "="*50)
print("FINAL RESULT:")
print("="*50)
print(result)

## Section 4: Break the Vault Challenge

Now that your RLM is working, put it to the test!

**The Mission:** You've been recruited by **The Architects** to recover lost data hidden within QuantumVault Corp's security system. Their AI, SENTINEL, has locked away critical files behind a 4-layer security system.

Each stage requires your RLM to analyze large datasets and find hidden codes:

| Stage | Challenge | Data Size |
|-------|-----------|----------|
| 1 | Find unauthorized access in visitor logs | 50,000 entries |
| 2 | Collect code fragments from Python files | 25 files |
| 3 | Detect anomalies in system logs | 10,000 entries |
| 4 | Decrypt Caesar-ciphered messages | 1,000 messages |

**Flag Format:** `FLAG{stage1_stage2_stage3_stage4}`

### Setup
#### Running in Colab without the full workshop repository
If your Colab instance does not have access to the full repository, uncomment the code block below to clone the CTF repository, which will be available during the hackathon. The command should read `!git clone ....`

#### Running in another sandboxed environment with the entire workshop repository
If you are running the workshop in a different sandboxed environment with the entire repository cloned, you have access to a git submodule pointing to the CTF for the duration of the hackathon. Run `git submodule update --init` to fetch it.

In [None]:
try:
    import google.colab
    !git clone https://github.com/MarshallWace/ichack26-rlm-workshop-ctf.git vault_competition
except ImportError:
    print("Not running in a Colab environment, you might want to run `git submodule update --init` in your sandbox environment")

In [None]:
VAULT_COMPETITION_PATH = "vault_competition"

### Stage 1: The Visitor Log

> *"The vault's outer door uses a keypad. SENTINEL logs every access attempt - historically there have been 50,000 of them. The security team maintains a list of 100 authorized user IDs. Despite their best efforts, an individual without the proper clearance gained access. Find the ONLY entry where access was granted to an unauthorized user. Their access code is your key."*

**Files:** `stage1/access_logs.jsonl`, `stage1/authorized_users.txt`
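
The kind of code an agent might write for this stage is a set-membership filter over the JSONL file. The sketch below runs on a tiny in-memory sample. The field names `user_id`, `status`, and `access_code`, and all sample values, are assumptions for illustration; the real schema has to be discovered by inspecting the files.

```python
import json

# Hypothetical sample data; the real files and field names may differ.
authorized_users = {"U001", "U002"}
sample_jsonl = "\n".join([
    json.dumps({"user_id": "U001", "status": "GRANTED", "access_code": "1111"}),
    json.dumps({"user_id": "U999", "status": "DENIED", "access_code": "2222"}),
    json.dumps({"user_id": "U777", "status": "GRANTED", "access_code": "3333"}),
])

suspicious = []
for line in sample_jsonl.splitlines():
    entry = json.loads(line)
    # Keep entries that were granted access without being on the allow list
    if entry["status"] == "GRANTED" and entry["user_id"] not in authorized_users:
        suspicious.append(entry)

print(suspicious)
```

Using a `set` for the authorized IDs keeps each lookup constant-time, which matters once the log has tens of thousands of entries.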

In [None]:
# Create a new RLM instance pointing at the vault competition
vault_rlm1 = RLM(codebase_path=VAULT_COMPETITION_PATH)

In [None]:
# Stage 1
stage1_result = vault_rlm1.completion("""
Stage 1: The Visitor Log

In stage1/, there's access_logs.jsonl with 50,000 access attempts and
authorized_users.txt with 100 authorized user IDs.

Find the ONLY entry where access was GRANTED to an UNAUTHORIZED user.
Return their access_code as the answer.
""")

print("\nStage 1 Result:")
print(stage1_result)

### Stage 2: The Code Fragments

> *"The second lock requires a 6-digit code. SENTINEL fragmented it across 25 Python files, hiding each digit in a comment. Collect all the `VAULT_SHARD` markers, sort by position, and reconstruct the code."*

**Files:** `stage2/` (25 Python files in nested directories)
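
A plausible approach here is a recursive file walk combined with a regular expression. The sketch below builds two throwaway files in a temporary directory so it is runnable anywhere. The comment format `# VAULT_SHARD_02: 7` is an assumption; the real marker layout should be checked by printing a few matching lines first.

```python
import re
import tempfile
from pathlib import Path

# Assumed comment format, e.g. "# VAULT_SHARD_02: 7"; verify against the real files.
SHARD_PATTERN = re.compile(r"#.*VAULT_SHARD_(\d+)\D+(\d)")

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "pkg" / "sub").mkdir(parents=True)
    (root / "pkg" / "a.py").write_text("x = 1  # VAULT_SHARD_02: 7\n")
    (root / "pkg" / "sub" / "b.py").write_text("# VAULT_SHARD_01: 4\ny = 2\n")

    shards = {}
    # rglob descends into nested directories
    for path in root.rglob("*.py"):
        for position, digit in SHARD_PATTERN.findall(path.read_text()):
            shards[int(position)] = digit

# Order the digits by their numeric position before joining
code = "".join(shards[pos] for pos in sorted(shards))
print(code)
```

Converting the position to `int` before sorting avoids surprises if positions are not zero-padded.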

In [None]:
# Create fresh instance for Stage 2
vault_rlm2 = RLM(codebase_path=VAULT_COMPETITION_PATH)

stage2_result = vault_rlm2.completion("""
Stage 2: The Code Fragments

In stage2/, there are 25 Python files in nested directories.
Each file contains a comment with VAULT_SHARD_XX where XX is a position number.
Each shard contains a digit.

Find all VAULT_SHARD comments, extract the digits, sort by position,
and return the 6-digit code.
""")

print("\nStage 2 Result:")
print(stage2_result)

### Stage 3: The Anomaly Report

> *"SENTINEL's system logs contain 10,000 entries. But 5 of them are anomalous - impossible dates, wrong formats, broken hashes. Each anomaly contains a NATO phonetic codeword. Find all 5, sort them alphabetically, and concatenate them."*

**File:** `stage3/system_logs.json`

In [None]:
# Create fresh instance for Stage 3
vault_rlm3 = RLM(codebase_path=VAULT_COMPETITION_PATH)

stage3_result = vault_rlm3.completion("""
Stage 3: The Anomaly Report

In stage3/system_logs.json, there are 10,000 log entries.
5 of them are anomalous - they have impossible dates, wrong formats, or broken hashes.
Each anomaly contains a NATO phonetic codeword.

Find all 5 anomalies, extract the NATO codewords, sort them alphabetically,
and concatenate them as the answer.
""")

print("\nStage 3 Result:")
print(stage3_result)

### Stage 4: The Master Algorithm

> *"The final lock guards 1,000 encrypted messages. Each was encrypted with a Caesar cipher using a prime shift. Only 12 messages contain hidden keywords in ALL CAPS. Decrypt them, extract the keywords in order, and speak the passphrase."*

**File:** `stage4/encrypted_messages.json`
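
For reference, a Caesar cipher shifts each letter a fixed number of positions around the alphabet, and decryption applies the opposite shift. The sketch below is a generic implementation with a made-up message. How the real file records each message's shift, or whether the shift has to be inferred, is not specified here, so the helper names and the brute-force over prime shifts are assumptions.

```python
import re

PRIME_SHIFTS = [2, 3, 5, 7, 11, 13, 17, 19, 23]  # primes below 26


def caesar_shift(text: str, shift: int) -> str:
    """Shift ASCII letters by `shift` positions, preserving case."""
    result = []
    for ch in text:
        if ch.isascii() and ch.isalpha():
            base = ord("A") if ch.isupper() else ord("a")
            result.append(chr((ord(ch) - base + shift) % 26 + base))
        else:
            result.append(ch)  # leave spaces, digits, punctuation untouched
    return "".join(result)


def all_caps_words(text: str) -> list[str]:
    """Return words made of two or more consecutive capital letters."""
    return re.findall(r"\b[A-Z]{2,}\b", text)


plaintext = "the key is VAULT today"
ciphertext = caesar_shift(plaintext, 7)

# Try every prime shift; a negative shift undoes the encryption
candidates = {s: caesar_shift(ciphertext, -s) for s in PRIME_SHIFTS}
print(ciphertext)
print(candidates[7], all_caps_words(candidates[7]))
```

Because this implementation preserves case, capitalised words stay capitalised under every candidate shift, so the agent still needs some way to decide which decryption is readable English.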

In [None]:
# Create fresh instance for Stage 4
vault_rlm4 = RLM(codebase_path=VAULT_COMPETITION_PATH)

stage4_result = vault_rlm4.completion("""
Stage 4: The Master Algorithm

In stage4/encrypted_messages.json, there are 1,000 messages.
Each is encrypted with a Caesar cipher using a prime number shift.
Only 12 messages contain keywords in ALL CAPS after decryption.

Decrypt all messages, find the 12 with ALL CAPS keywords,
extract the keywords in message order, and concatenate as the answer.
""")

print("\nStage 4 Result:")
print(stage4_result)

### Assemble the Flag

Extract the codes from each stage result and combine them!

In [None]:
# TODO: Fill in your answers from each stage
stage1_code = ""
stage2_code = ""
stage3_code = ""
stage4_code = ""

flag = f"FLAG{{{stage1_code}_{stage2_code}_{stage3_code}_{stage4_code}}}"
print(f"Your flag: {flag}")

## Congratulations!

You've built a working RLM agent and used it to crack SENTINEL's vault.

### Key Takeaways

1. **RLMs solve context rot** by treating prompts as an external environment
2. **The agent loop is simple**: call LLM, execute code, feed results back
3. **Code execution enables massive scale**: the agent can handle 50K+ entries
4. **Recursive calls unlock complexity**: the agent can break problems into chunks