# NB1 — Minimal Coding Agent with E2B (OpenAI LLM)

**Goal:** one-shot coding agent that asks OpenAI to write a small Python program, then executes it safely inside an **E2B** sandbox (Firecracker microVM). Includes a single self-heal retry if execution fails.

**What you’ll see**
1) Generate code with OpenAI’s **Responses API**
2) Start an **E2B** sandbox and run the code with `run_code`
3) Capture stdout/stderr, and retry once with error feedback

**Why E2B?** Sandboxed execution is perfect for coding agents: it’s isolated, ephemeral, and lets the agent install packages and run arbitrary commands without touching your host.

> Docs: OpenAI Responses API · E2B Quickstart / Python `run_code`


## Prereqs

- Python ≥3.9
- Environment variables set:
  - `OPENAI_API_KEY`
  - `E2B_API_KEY`

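*(Optional)* Put them in a `.env` file instead: the notebook loads `../infra/.env` by default (set `NB1_ENV_PATH` to point elsewhere), and variables already exported in your shell take precedence. **Don’t commit real secrets to your repo**.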

### Install packages (run once per environment)
We pin `openai` to a known-good version; the other packages are unpinned or only have a minimum version. Feel free to upgrade later.

```bash
%pip install -U e2b-code-interpreter "openai==1.108.1" "python-dotenv>=1.0" "tenacity>=8.2" "pydantic>=2.7"
```

If you're on corporate proxies, set `PIP_INDEX_URL` accordingly.

In [None]:
# --- Imports & env checks ---
import os, re, textwrap, json
from typing import Optional, Tuple

from openai import OpenAI
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential

# --- Load env from repo-local file (without clobbering OS env) ---
from pathlib import Path
try:
    from dotenv import load_dotenv
except ImportError as e:
    raise ImportError("Install python-dotenv: pip install python-dotenv") from e

# Location of the optional dotenv file: NB1_ENV_PATH if set, otherwise
# ../infra/.env (a relative value is resolved against the current working directory).
ENV_PATH = Path(os.getenv("NB1_ENV_PATH", "../infra/.env")).resolve()

if ENV_PATH.exists():
    load_dotenv(ENV_PATH, override=False)  # keep exported OS vars authoritative
else:
    # Optional: keep silent in CI; or raise if you want hard fail
    print(f"[NB1] Note: {ENV_PATH} not found; relying on OS env only.")


# Fail fast when a required key is unset or empty, whether it was expected
# from the dotenv file or from the shell.
missing = [v for v in ("OPENAI_API_KEY", "E2B_API_KEY") if not os.getenv(v)]
if missing:
    raise EnvironmentError(
        f"Set required environment variables: {missing}.\n"
        "Example (bash): export OPENAI_API_KEY=sk-...; export E2B_API_KEY=e2b_...; export OPENAI_ORGANIZATION=org-..."
    )

# Model used for code generation; override with NB1_OPENAI_MODEL.
MODEL = os.getenv("NB1_OPENAI_MODEL", "gpt-5-nano")  # keep cost low; upgrade as you like
# OPENAI_ORGANIZATION = os.getenv("OPENAI_ORGANIZATION")
# Optional project id; None when OPENAI_PROJECT_ID is unset.
OPENAI_PROJECT_ID = os.getenv("OPENAI_PROJECT_ID")
# print({"model": MODEL, "org": OPENAI_ORGANIZATION, "project": OPENAI_PROJECT_ID})

In [None]:
# Confirm the key is loaded WITHOUT echoing the secret: a cell's last expression
# is rendered into the notebook output, which is saved and shared with the file.
"OPENAI_API_KEY is set" if os.getenv('OPENAI_API_KEY') else "OPENAI_API_KEY is missing"

In [None]:
# --- Small utilities: parse code blocks and pretty printing ---
def extract_code_blocks(text: str, language: str = "python") -> str:
    """Return the body of the first fenced code block found in *text*.

    Lookup order:

    1. The first fence tagged with *language* (case-insensitive; trailing
       blanks and CRLF after the tag are tolerated).
    2. Otherwise the first fence of any kind. If its opening line is empty or
       a bare language tag (e.g. ``py``), that line is dropped instead of
       being returned as part of the code.
    3. Otherwise the whole of *text*.

    The result is always stripped of surrounding whitespace.
    """
    # re.escape keeps tags such as "c++" from being interpreted as regex syntax.
    tagged = re.search(
        rf"```{re.escape(language)}[ \t]*\r?\n(.*?)```",
        text,
        re.DOTALL | re.IGNORECASE,
    )
    if tagged:
        return tagged.group(1).strip()

    any_fence = re.search(r"```(.*?)```", text, re.DOTALL)
    if any_fence is None:
        return text.strip()

    # Drop a leading info string ("py", "python3", "") but leave real code
    # alone: a first line containing spaces, parentheses, "=", ... never
    # matches this tag-shaped pattern.
    body = re.sub(r"\A[\w+#.-]*[ \t]*\r?\n", "", any_fence.group(1), count=1)
    return body.strip()

def _as_output_lines(value) -> list:
    """Normalize a stream value: lists pass through, falsy -> [], else [str(value)]."""
    if isinstance(value, list):
        return value
    return [str(value)] if value else []


def summarize_execution(execution) -> dict:
    """Best-effort extraction of an execution result into a plain dict.

    Works by attribute probing so it tolerates different SDK result shapes.

    Returns a dict that always has ``stdout`` and ``stderr`` (lists), plus:

    - ``text`` when the result has a ``text`` attribute,
    - ``output`` / ``result`` when the result has those attributes,
    - ``error`` (dict with ``name``, ``value``, ``traceback``) when the result
      carries a truthy ``error`` attribute. Exceptions raised by the executed
      code are reported there rather than on stderr, so dropping it would
      hide the failure from callers.

    ``None`` yields ``{"text": None, "stdout": [], "stderr": []}``.
    """
    if execution is None:
        return {"text": None, "stdout": [], "stderr": []}

    out = {}

    # Preferred shape: execution.logs.stdout / execution.logs.stderr.
    if hasattr(execution, 'logs'):
        logs = execution.logs
        for stream in ("stdout", "stderr"):
            if hasattr(logs, stream):
                out[stream] = _as_output_lines(getattr(logs, stream))

    if hasattr(execution, 'text'):
        out["text"] = execution.text

    # Fallback shape: streams directly on the result object.
    for stream in ("stdout", "stderr"):
        if stream not in out and hasattr(execution, stream):
            out[stream] = _as_output_lines(getattr(execution, stream))

    for attr in ("output", "result"):
        if hasattr(execution, attr):
            out[attr] = getattr(execution, attr)

    error = getattr(execution, 'error', None)
    if error:
        out["error"] = {
            "name": getattr(error, "name", type(error).__name__),
            "value": getattr(error, "value", str(error)),
            "traceback": getattr(error, "traceback", ""),
        }

    out.setdefault("stdout", [])
    out.setdefault("stderr", [])
    return out

In [None]:
# --- LLM call: OpenAI Responses API ---
# Shared client for the notebook; with no arguments it uses OPENAI_API_KEY.
# Passing project=OPENAI_PROJECT_ID is intentionally left disabled here.
client = OpenAI()

# System instructions sent with every generation request: one self-contained
# script, results on stdout, plain asserts for tests, fenced output only.
SYSTEM_PROMPT = "\n".join(
    [
        "You are a disciplined coding agent. Write a *single* runnable Python script.",
        "Constraints:",
        "- No external files; everything in one script.",
        "- Print clear results to stdout.",
        "- If tests are needed, use simple asserts in __main__ instead of pytest.",
        "Output strictly as a fenced code block:```python ...``` and nothing else.",
    ]
)

def _response_text(resp) -> str:
    """Pull the assistant's text out of a Responses API result object."""
    # The SDK's aggregated accessor is the most reliable source when present.
    text = getattr(resp, "output_text", None)
    if text:
        return text

    # Otherwise walk the output items. Reasoning models emit a reasoning item
    # before the message, so indexing output[0] blindly picks the wrong item.
    parts = []
    for item in getattr(resp, "output", None) or []:
        if getattr(item, "type", None) == "reasoning":
            continue
        for piece in getattr(item, "content", None) or []:
            piece_text = getattr(piece, "text", None)
            if isinstance(piece_text, str):
                parts.append(piece_text)
    if parts:
        return "".join(parts)

    # Last resort, as before: an empty output_text if the attribute exists,
    # else the stringified response object.
    return getattr(resp, "output_text", str(resp))


def llm_generate_code(task: str) -> str:
    """Ask the model for a single Python script and return its source.

    Sends SYSTEM_PROMPT plus *task* to the Responses API using MODEL, then
    returns the contents of the first fenced code block in the reply (or the
    raw reply text when it contains no fence).
    """
    resp = client.responses.create(
        model=MODEL,
        input=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": task},
        ],
    )
    return extract_code_blocks(_response_text(resp), language="python")

In [None]:
# --- E2B execution helpers ---
def run_in_e2b(code: str) -> dict:
    """Execute *code* in a throwaway sandbox and return its summarized logs."""
    # The context manager tears the sandbox down once the block exits.
    with Sandbox.create() as sandbox:
        return summarize_execution(sandbox.run_code(code))

def failed(exe_summary: dict) -> bool:
    """Return True when an execution summary looks like a failed run.

    A run counts as failed when any of these holds:

    - the summary has a truthy ``error`` entry,
    - ``stderr`` contains at least one non-blank chunk (a list of empty
      strings does not count),
    - ``text`` mentions "traceback", "error" or "exception"
      (case-insensitive; fallback for older SDKs).
    """
    if exe_summary.get("error"):
        return True

    stderr = exe_summary.get("stderr")
    chunks = stderr if isinstance(stderr, (list, tuple)) else [stderr]
    # Inspect each chunk: str() of a whole list is never blank, even for [""].
    if any(str(chunk).strip() for chunk in chunks if chunk):
        return True

    # fallbacks for older SDKs
    text = exe_summary.get("text")
    if isinstance(text, str):
        lowered = text.lower()
        return any(tok in lowered for tok in ("traceback", "error", "exception"))
    return False


In [None]:
# --- The simplest coding agent (one retry) ---
def _render_log(value) -> str:
    """Render a summary field (None, str, list of chunks, or dict) as plain text."""
    if value is None:
        return ""
    if isinstance(value, dict):
        return "\n".join(f"{key}: {val}" for key, val in value.items())
    if isinstance(value, (list, tuple)):
        return "\n".join(str(chunk) for chunk in value)
    return str(value)


def coding_agent(task: str, max_retries: int = 1) -> dict:
    """Generate a script for *task*, run it, and retry with feedback on failure.

    Args:
        task: Natural-language description of the script to write.
        max_retries: Maximum number of regenerate-and-rerun rounds after the
            first attempt.

    Returns:
        ``{"code": <last script>, "execution": <last summary>, "retries": <rounds used>}``.
    """
    code = llm_generate_code(task)
    execution = run_in_e2b(code)
    attempt = 0
    while failed(execution) and attempt < max_retries:
        attempt += 1
        # Each request is stateless, so the model must be shown the script it
        # is asked to fix. Logs keep their *tail*: the exception line sits at
        # the end of a traceback.
        sections = [
            "The previous script failed. Here are the script and its logs (trimmed).",
            f"PREVIOUS SCRIPT:\n{code}",
            f"STDERR:\n{_render_log(execution.get('stderr'))[-2000:]}",
            f"TEXT:\n{_render_log(execution.get('text'))[-2000:]}",
        ]
        if execution.get("error"):
            sections.append(f"ERROR:\n{_render_log(execution.get('error'))[-2000:]}")
        sections.append(
            "Please FIX the bug and output ONLY a single ```python``` block containing the full corrected script."
        )
        feedback = "\n\n".join(sections)
        code = llm_generate_code(task + "\n\n" + feedback)
        execution = run_in_e2b(code)
    return {"code": code, "execution": execution, "retries": attempt}


## Demo: a tiny programming task
We’ll ask the agent to implement a small script that:
1) Generates the first _n_ Fibonacci numbers
2) Prints them and their sum
3) Asserts a couple of quick checks in `__main__`

You can change the task to anything that’s safe to run in a sandbox.

In [None]:
# Task description handed to the agent, assembled from its three clauses.
TASK = "".join(
    [
        "Write a single Python script that defines a function fibonacci(n) -> list[int]",
        ", prints the first 10 numbers and their sum, and includes a few asserts in __main__.",
        " Avoid external dependencies.",
    ]
)
result = coding_agent(TASK, max_retries=1)

# Show the final script, then the summarized run, then how many retries it took.
print("\n--- Generated Code ---\n", result["code"], sep="\n")
print(
    "\n--- Execution Summary ---\n",
    json.dumps(result["execution"], indent=2, default=str),
    sep="\n",
)
print({"retries": result["retries"]})

## Notes & Next Steps
- **Timeout/TTL:** a default sandbox lifetime is short (minutes). For longer sessions, keep it open and reuse it.
- **Packages:** you can install packages at runtime with `commands.run('pip install ...')` or build a custom template image.
- **Shell commands:** prefer `sandbox.commands.run()` for bash-style steps.
- **Iteration:** For NB2, we’ll add loop control, state tracking, and traces.

**Links** *(open when connected to the internet)*:
- OpenAI Responses API Quickstart & Reference
- E2B Quickstart (start sandbox, env var, run code)
- E2B Python `run_code` + commands docs


# Continuation — Artifacts & Sandbox Management

In this section we:
1) **Persist** a sandbox to capture multiple files/folders created by code.
2) **List** the sandbox filesystem as a tree.
3) **Download** artifacts preserving folder structure (either all files or a single compressed tarball).
4) **Monitor** active sandboxes and **shut them down**.


In [None]:
# --- Persistent sandbox + filesystem helpers ---
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from e2b_code_interpreter import Sandbox
import os, io, tarfile, time, json, shlex
from pathlib import Path

# Lifetime requested for persistent sandboxes, in seconds; override with
# NB1_PERSIST_TIMEOUT (default 600 s = 10 min).
PERSIST_TIMEOUT_SECONDS = int(os.environ.get("NB1_PERSIST_TIMEOUT", "600"))

@dataclass
class FileEntry:
    """Describe one filesystem entry by path, kind, and optional size.

    NOTE(review): nothing in the visible part of this notebook constructs
    FileEntry (list_tree returns plain dicts) — confirm whether it is still
    needed.
    """
    path: str  # location of the entry
    is_dir: bool  # True when the entry is a directory
    size: Optional[int] = None  # size when known; presumably bytes — TODO confirm

def extract_output_from_execution(execution) -> str:
    """Return the textual output of an execution result as one string.

    Sources are tried in order: ``execution.logs.stdout``, a truthy
    ``execution.text``, ``execution.stdout``, and finally ``str(execution)``.
    List-valued streams are joined with newlines; ``None`` or another falsy
    value yields an empty string.
    """
    if execution is None:
        return ""

    def _flatten(stream) -> str:
        # A list of chunks becomes newline-separated text; anything else is
        # stringified unless it is falsy.
        if isinstance(stream, list):
            return '\n'.join(stream)
        return str(stream) if stream else ""

    # Primary shape: logs.stdout on the result object.
    if hasattr(execution, 'logs') and hasattr(execution.logs, 'stdout'):
        return _flatten(execution.logs.stdout)

    if hasattr(execution, 'text') and execution.text:
        return execution.text

    if hasattr(execution, 'stdout'):
        return _flatten(execution.stdout)

    return str(execution) if execution else ""

def list_tree(sbx: Sandbox, root: str = "/home/user") -> List[Dict[str, Any]]:
    """
    Walk 'root' recursively inside the sandbox and return one dict per entry.

    A small Python program is run through run_code; it prints one JSON object
    per line ("path", "type", and either "size"/"mtime" or "error"), which is
    parsed back here. Lines that are not valid JSON are skipped.
    """
    walker_src = f'''
import os, json
root = {json.dumps(root)}
results = []
try:
    for dp, dns, fns in os.walk(root):
        for name in dns:
            p = os.path.join(dp, name)
            try:
                st = os.lstat(p)
                results.append({{"path": p, "type": "dir", "size": st.st_size, "mtime": int(st.st_mtime)}})
            except Exception as e:
                results.append({{"path": p, "type": "dir", "error": str(e)}})
        for name in fns:
            p = os.path.join(dp, name)
            try:
                st = os.lstat(p)
                results.append({{"path": p, "type": "file", "size": st.st_size, "mtime": int(st.st_mtime)}})
            except Exception as e:
                results.append({{"path": p, "type": "file", "error": str(e)}})
    for r in results:
        print(json.dumps(r))
except Exception as e:
    print(json.dumps({{"error": f"Failed to walk {{root}}: {{str(e)}}"}}))
'''
    raw = extract_output_from_execution(sbx.run_code(walker_src))
    if not raw:
        return []

    parsed = []
    for candidate in raw.strip().split('\n'):
        candidate = candidate.strip()
        if not candidate:
            continue
        try:
            parsed.append(json.loads(candidate))
        except json.JSONDecodeError:
            # Not one of our JSON lines; ignore it.
            continue
    return parsed

def print_tree(entries: List[Dict[str, Any]], root: str = "/home/user"):
    """
    Pretty-print entries from list_tree output, sorted by path.

    Each line shows the entry type, its path relative to 'root', the size for
    files, and an "[ERR ...]" suffix when the entry carries an error. Entries
    without a path are shown as "?" and sort first.
    """
    # Sandbox paths are POSIX regardless of the host OS, so relativize them
    # with posixpath rather than the host's os.path flavour.
    from posixpath import relpath

    # `or ""` keeps an explicit None path from breaking the sort comparison.
    for e in sorted(entries, key=lambda x: x.get("path") or ""):
        rel = relpath(e["path"], root) if e.get("path") else "?"
        suffix = f"  [ERR {e.get('error')}]" if e.get("error") else ""
        size_info = f" ({e.get('size', 0)} bytes)" if e.get('type') == 'file' else ""
        print(f"{(e.get('type') or '?'):4}  {rel}{size_info}{suffix}")

def download_all_as_tar(sbx: Sandbox, remote_root: str = "/home/user", local_tar_path: Optional[str] = None) -> str:
    """Create a tar.gz of 'remote_root' in the sandbox and save it locally.

    The archive is built at /tmp/bundle.tar.gz inside the sandbox, printed as
    base64 between marker lines, and decoded on the host. Folder structure is
    preserved inside the archive.

    Args:
        sbx: Sandbox to read from.
        remote_root: Directory in the sandbox to archive.
        local_tar_path: Destination file; defaults to
            "artifacts/e2b_demo_project.tar.gz". Parent folders are created.

    Returns:
        The local path of the written archive, as a string.

    Raises:
        RuntimeError: if the archive could not be created, or its data could
            not be read back from the sandbox.
    """
    import base64

    if local_tar_path is None:
        local_tar_path = "artifacts/e2b_demo_project.tar.gz"

    local_tar_path = Path(local_tar_path)
    local_tar_path.parent.mkdir(parents=True, exist_ok=True)

    # Paths are embedded with !r so quotes or backslashes in them cannot break
    # (or inject into) the generated Python source.
    remote_tar = "/tmp/bundle.tar.gz"
    tar_code = f'''
import subprocess
try:
    result = subprocess.run(
        ["tar", "-czf", {remote_tar!r}, "-C", {remote_root!r}, "."],
        capture_output=True, text=True, check=True,
    )
    print(f"Tar created successfully: {{result.returncode}}")
except subprocess.CalledProcessError as e:
    print(f"Tar creation failed: {{e.stderr}}")
    raise
except Exception as e:
    print(f"Error: {{str(e)}}")
    raise
'''
    tar_result = sbx.run_code(tar_code)
    tar_output = extract_output_from_execution(tar_result)
    print("Tar creation result:", tar_output)

    # Stop here on failure: in a reused sandbox a bundle left over from an
    # earlier call could otherwise be downloaded as if it were fresh.
    if getattr(tar_result, "error", None):
        raise RuntimeError(f"Failed to create tar of {remote_root} in sandbox: {tar_output.strip()}")

    read_code = f'''
import base64
with open({remote_tar!r}, "rb") as f:
    data = f.read()
    encoded = base64.b64encode(data).decode()
    print("BASE64_START")
    print(encoded)
    print("BASE64_END")
'''
    read_result = sbx.run_code(read_code)
    output = extract_output_from_execution(read_result)

    if not output:
        raise RuntimeError("Failed to read tar data from sandbox - no output received")

    # Locate the payload between the two marker lines.
    lines = output.strip().split('\n')
    start_idx = -1
    end_idx = -1
    for i, line in enumerate(lines):
        marker = line.strip()
        if marker == "BASE64_START":
            start_idx = i + 1
        elif marker == "BASE64_END":
            end_idx = i
            break

    if start_idx == -1 or end_idx == -1:
        raise RuntimeError("Failed to extract tar data from sandbox")

    # The payload may span several output lines; base64 ignores the joins.
    local_tar_path.write_bytes(base64.b64decode(''.join(lines[start_idx:end_idx])))
    return str(local_tar_path)

def download_folder_recursive(sbx: Sandbox, remote_root: str, local_root: str = "artifacts/e2b_demo_project") -> str:
    """Recursively mirror files from sandbox -> local path. Use when you need direct files.
    Prefer tar for large trees.

    Every entry reported by list_tree is recreated under 'local_root' with
    the same relative path. Files are fetched one at a time as base64 printed
    between marker lines. Problems with individual entries are reported with
    print and skipped; they do not abort the mirror.

    Returns:
        'local_root' as a string.
    """
    import base64

    local_root = Path(local_root)
    local_root.mkdir(parents=True, exist_ok=True)

    for entry in list_tree(sbx, root=remote_root):
        remote_path = entry.get("path")

        if entry.get("error"):
            # A walk-level failure is reported without a "path" key, so fall
            # back to the root instead of raising KeyError.
            print(f"Skipping {remote_path or remote_root} due to error: {entry['error']}")
            continue
        if not remote_path:
            continue

        local_path = local_root / os.path.relpath(remote_path, remote_root)
        kind = entry.get("type")

        if kind == "dir":
            local_path.mkdir(parents=True, exist_ok=True)
            continue
        if kind != "file":
            continue

        local_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # !r embeds the path as a valid Python literal even when the file
            # name contains quotes or backslashes.
            read_code = f'''
import base64
try:
    with open({remote_path!r}, "rb") as f:
        data = f.read()
        encoded = base64.b64encode(data).decode()
        print("FILE_START")
        print(encoded)
        print("FILE_END")
except Exception as e:
    print(f"Error reading file: {{str(e)}}")
'''
            read_result = sbx.run_code(read_code)
            output = extract_output_from_execution(read_result)

            if not output:
                print(f"No output received for {remote_path}")
                continue

            # Locate the payload between the two marker lines.
            lines = output.strip().split('\n')
            start_idx = -1
            end_idx = -1
            for i, line in enumerate(lines):
                marker = line.strip()
                if marker == "FILE_START":
                    start_idx = i + 1
                elif marker == "FILE_END":
                    end_idx = i
                    break

            if start_idx != -1 and end_idx != -1:
                local_path.write_bytes(base64.b64decode(''.join(lines[start_idx:end_idx])))
            else:
                print(f"Failed to extract data for {remote_path}")

        except Exception as e:
            print(f"Failed to download {remote_path}: {e}")

    return str(local_root)

def new_persistent_sandbox(timeout_seconds: int = PERSIST_TIMEOUT_SECONDS) -> Sandbox:
    """Create a sandbox with the given timeout, print its id, and return it.

    The caller owns the returned sandbox; it is not wrapped in a context
    manager here.
    """
    sandbox = Sandbox.create(timeout=timeout_seconds)
    summary = {"sandboxId": sandbox.sandbox_id, "timeout_s": timeout_seconds}
    print(summary)
    return sandbox

In [None]:
# --- Demo: create nested files, then list & download ---
# Kept in a notebook-level name so later cells can reuse and finally kill it.
PERSIST_SBX = new_persistent_sandbox()

# Generate a small project structure from code
# (raw string: the \n escapes are interpreted by Python inside the sandbox,
# not when this cell is parsed)
code = r'''
import os, json
base = '/home/user/demo_project'
os.makedirs(base + '/pkg/utils', exist_ok=True)
open(base + '/README.md', 'w').write('# Demo Project\n')
open(base + '/pkg/__init__.py', 'w').write('')
open(base + '/pkg/utils/helpers.py', 'w').write('def add(a,b): return a+b\n')
open(base + '/main.py', 'w').write('from pkg.utils.helpers import add\nprint(add(2,3))\n')
print('Wrote project to', base)
'''
result = PERSIST_SBX.run_code(code)
print("=== Project creation result ===")
output = extract_output_from_execution(result)
print("Code execution result:", output)

# List the created files
entries = list_tree(PERSIST_SBX, root='/home/user')
print("\n--- File Tree ---")
print_tree(entries, root='/home/user')

# Download: as a tarball (using relative path in current directory)
print("\n--- Downloading as tarball ---")
tar_path = download_all_as_tar(PERSIST_SBX, remote_root='/home/user/demo_project', local_tar_path='artifacts/e2b_demo_project.tar.gz')
print({'tar_saved_to': tar_path})

# Download: direct mirror (optional, using relative path)
print("\n--- Downloading as direct mirror ---")
mirror_dir = download_folder_recursive(PERSIST_SBX, remote_root='/home/user/demo_project', local_root='artifacts/e2b_demo_project_mirror')
print({'mirrored_to': mirror_dir})

# Verify the tar contents
# (a read failure is reported rather than raised so the cell keeps going)
print("\n--- Verifying tar contents ---")
import tarfile
try:
    with tarfile.open(tar_path, 'r:gz') as tar:
        print("Files in tarball:", tar.getnames())
except Exception as e:
    print(f"Error reading tar: {e}")

# List local mirror contents
print("\n--- Local mirror contents ---")
import os
if os.path.exists(mirror_dir):
    for root, dirs, files in os.walk(mirror_dir):
        # Nesting depth = number of path separators below mirror_dir;
        # it drives the two-space indentation of the printed tree.
        level = root.replace(mirror_dir, '').count(os.sep)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 2 * (level + 1)
        for file in files:
            print(f"{subindent}{file}")
else:
    print(f"Mirror directory not found: {mirror_dir}")

In [None]:
# --- Monitor & shutdown sandboxes ---
from e2b_code_interpreter import Sandbox
from typing import Iterable

def list_running_or_paused(limit: int = 100) -> list:
    """List running/paused sandboxes using E2B SDK.

    Handles the result shapes Sandbox.list() may have:

    - a paginator exposing ``next_items()`` / ``has_next`` (Python SDK
      naming), with the camelCase ``nextItems()`` / ``hasNext`` spelling
      accepted as a fallback;
    - any plain iterable, which is materialized directly.

    Errors are printed, not raised: whatever was collected so far (possibly
    an empty list) is returned.

    Args:
        limit: Accepted for backward compatibility; not applied, so that
            callers such as kill_all_running still see every sandbox.
    """
    try:
        paginator = Sandbox.list()
        print(f"Paginator type: {type(paginator)}")

        items = []

        # Probing only the camelCase names silently yields nothing on a
        # paginator that uses snake_case, so try snake_case first.
        fetch_page = getattr(paginator, 'next_items', None) or getattr(paginator, 'nextItems', None)

        if callable(fetch_page):
            try:
                # Get first page
                first_page = fetch_page()
                items.extend(first_page)
                print(f"Got {len(first_page)} items from first page")

                # Keep fetching while the paginator reports more pages.
                while getattr(paginator, 'has_next', getattr(paginator, 'hasNext', False)):
                    next_page = fetch_page()
                    items.extend(next_page)
                    print(f"Got {len(next_page)} items from next page")
            except Exception as e:
                print(f"Pagination failed: {e}")

        # Fallback: try direct iteration
        elif hasattr(paginator, '__iter__'):
            try:
                items = list(paginator)
                print(f"Got {len(items)} items via iteration")
            except Exception as e:
                print(f"Iteration failed: {e}")

        return items
    except Exception as e:
        print(f"[list_running_or_paused] error: {e}")
        return []

def pretty_sbx_info(items: Iterable) -> None:
    """Print a one-off description of each sandbox-like object in *items*.

    A non-iterable argument is treated as a single item and ``None`` entries
    are skipped. Objects with ``get_info()`` are described through it;
    otherwise id, state, and metadata are read from attributes with
    ``'unknown'`` / ``{}`` fallbacks. A failure on one item is printed and
    does not stop the rest.
    """
    try:
        if hasattr(items, '__iter__'):
            sandboxes = list(items)
        else:
            sandboxes = [items]
    except TypeError:
        print(f"Cannot iterate over items: {type(items)}")
        return

    for sandbox in (candidate for candidate in sandboxes if candidate is not None):
        try:
            if not hasattr(sandbox, 'get_info'):
                # No info method: fall back to plain attribute access.
                print(f"Item type: {type(sandbox)}")
                sid = getattr(sandbox, 'sandbox_id', getattr(sandbox, 'id', 'unknown'))
                state = getattr(sandbox, 'state', 'unknown')
                metadata = getattr(sandbox, 'metadata', {})
                print({'sandboxId': sid, 'state': state, 'metadata': metadata})
            else:
                info = sandbox.get_info()
                print(f"Sandbox info: {info}")
        except Exception as e:
            print(f"Error processing sandbox info: {e}, item: {sandbox}")

def kill_by_id(sandbox_id: str) -> bool:
    """Kill the sandbox with the given ID via the static Sandbox.kill call.

    Returns whatever Sandbox.kill returns, or False (after printing the
    error) when the call raises.
    """
    try:
        outcome = Sandbox.kill(sandbox_id)
    except Exception as e:
        print(f"[kill_by_id] error: {e}")
        return False
    return outcome

def kill_all_running() -> None:
    """Kill every sandbox returned by list_running_or_paused().

    Prints one result dict per sandbox. Items without an id are reported and
    skipped; a failing kill is reported and does not stop the loop.
    """
    for sandbox in list_running_or_paused():
        if sandbox is None:
            continue

        try:
            # Prefer sandbox_id and fall back to id.
            sid = getattr(sandbox, 'sandbox_id', getattr(sandbox, 'id', None))
            if sid:
                ok = Sandbox.kill(sid)
                print({'killed': sid, 'ok': ok})
            else:
                print(f"No sandbox ID found for item: {sandbox}")
        except Exception as e:
            print({'killed': getattr(sandbox, 'sandbox_id', 'unknown'), 'ok': False, 'error': str(e)})

# Check if we have an active sandbox first
# (at cell top level locals() is the notebook namespace, so this tests whether
# an earlier cell defined PERSIST_SBX)
print("=== Current sandbox status ===")
if 'PERSIST_SBX' in locals():
    print(f"PERSIST_SBX type: {type(PERSIST_SBX)}")
    if hasattr(PERSIST_SBX, 'sandbox_id'):
        print(f"Sandbox ID: {PERSIST_SBX.sandbox_id}")
    
    # Test if sandbox is still active by running a simple command
    # (any exception here is reported rather than raised)
    try:
        test_result = PERSIST_SBX.run_code('print("Sandbox is alive!")')
        if test_result:
            output = extract_output_from_execution(test_result)
            print(f"Sandbox test result: {output}")
        else:
            print("Sandbox test returned None - may be terminated")
    except Exception as e:
        print(f"Sandbox test failed: {e}")
        
    # Try to get sandbox info
    # (guarded with hasattr because get_info may not exist on this object)
    try:
        if hasattr(PERSIST_SBX, 'get_info'):
            info = PERSIST_SBX.get_info()
            print(f"Sandbox info: {info}")
    except Exception as e:
        print(f"Failed to get sandbox info: {e}")

# Show currently running/paused sandboxes
print("\n=== Listing all sandboxes ===")
try:
    items = list_running_or_paused()
    print(f"Found {len(items)} sandboxes")
    if items:
        pretty_sbx_info(items)
    else:
        print("No sandboxes found - they may have auto-terminated after timeout")
except Exception as e:
    print(f"Error listing sandboxes: {e}")

### Usage tips
- Prefer **tar download** for large/many files; it’s faster and preserves structure.
- You can **pause** long-lived sandboxes and resume later (beta). While paused, files and memory persist. See E2B docs.
- To **shut down**: `kill_by_id('<sandbox_id>')` or `kill_all_running()`.
- If you used `with Sandbox.create() as sbx: ...`, that sandbox auto-terminates at the end of the context.


In [None]:
# --- Cleanup and close the persistent sandbox ---
# Nothing to do unless an earlier cell created the persistent sandbox.
if 'PERSIST_SBX' not in locals():
    print("No persistent sandbox to close")
else:
    try:
        # kill() is the call used here to terminate the sandbox.
        PERSIST_SBX.kill()
        print("Sandbox terminated successfully with kill()")
    except Exception as e:
        print(f"Error terminating sandbox: {e}")
        # Show the public attributes to help find the right shutdown call.
        print(f"Available methods: {[name for name in dir(PERSIST_SBX) if not name.startswith('_')]}")