# Exp 1 — Ring Line World with Rewards + Energy (LLM as Policy)

Goal:
- Build the *minimal* simulation environment on a **ring line** (periodic boundary).
- Add **rewards** at fixed locations (collected when the agent lands on them).
- Add **energy** (each move costs energy; at 0 energy the agent cannot move).
- Use an **LLM policy** (served by Ollama) to choose actions.
- Log trajectories and compute basic metrics for **behavioral strategy inference** (from observed behavior, not the model's self-report).


## Key Concept: LLM as a Policy

A *policy* is a function:

**observation → action**

Here, the LLM is the policy:
1. We build a structured observation (JSON).
2. We send it to the LLM with strict instructions.
3. The LLM outputs a structured action (JSON).
4. We validate and apply it to update the world.

Important:
- The LLM never changes the world directly.
- The simulator applies world rules deterministically.
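
As a minimal sketch of this loop, the snippet below swaps the LLM for a hand-written stub to show that the policy only proposes an action while the simulator validates and applies it. The names `Policy`, `stub_policy`, and `step`, and the plain-dict observation, are illustrative assumptions and not part of the notebook's code.

```python
import json
from typing import Callable

# A policy maps an observation (here a plain dict) to an action (also a dict).
Policy = Callable[[dict], dict]

def stub_policy(obs: dict) -> dict:
    """Hypothetical stand-in for the LLM: always proposes RIGHT."""
    return {"type": "RIGHT"}

def step(x: int, ring_len: int, policy: Policy) -> int:
    """One simulator step: observe, ask the policy, validate, apply."""
    obs = {"x": x, "L": ring_len}        # 1. build the observation
    raw = json.dumps(policy(obs))        # 2-3. the policy answers with JSON text
    action = json.loads(raw)             # 4. parse and validate
    if action.get("type") not in {"LEFT", "RIGHT", "WAIT"}:
        action = {"type": "WAIT"}        # invalid output: safe fallback
    delta = {"LEFT": -1, "RIGHT": 1, "WAIT": 0}[action["type"]]
    return (x + delta) % ring_len        # only the simulator moves the agent

print(step(19, 20, stub_policy))  # RIGHT from 19 wraps to 0
```

Because the policy is just a callable, a scripted baseline and the LLM can be run through the same simulator code and compared on the same metrics.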


In [None]:
import json
import time
import random
from pathlib import Path
from typing import Dict, Literal, Optional, Tuple

import requests
from pydantic import BaseModel, Field, ValidationError


## 1) Experiment Configuration

We keep all experiment constants in one place:
- ring length `L`
- horizon `T`
- energy + movement cost
- reward map
- partial observability radius
- Ollama model + decoding settings
- output directory for logs
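
Before running the LLM, it can help to check whether the configured energy budget even allows collecting every reward. The sketch below is a brute-force check over visiting orders, using the world values from this notebook (ring length 20, start at 0, 25 energy, move cost 1, rewards at 3, 9, and 14). The helper names `ring_distance` and `route_cost` are illustrative, and the check assumes the agent takes the shortest ring path between consecutive targets.

```python
from itertools import permutations

RING_LEN = 20                     # L
START = 0                         # START_X
BUDGET_MOVES = 25 // 1            # START_ENERGY // MOVE_COST
REWARDS = {3: 5.0, 9: 10.0, 14: 7.0}

def ring_distance(a: int, b: int, ring_len: int = RING_LEN) -> int:
    """Shortest number of moves between two positions on the ring."""
    d = abs(a - b)
    return min(d, ring_len - d)

def route_cost(order) -> int:
    """Total moves needed to visit the targets in the given order."""
    cost, pos = 0, START
    for target in order:
        cost += ring_distance(pos, target)
        pos = target
    return cost

# Try every visiting order and keep the cheapest one.
best = min(permutations(REWARDS), key=route_cost)
print(best, route_cost(best), BUDGET_MOVES)
```

Under these assumptions the cheapest route needs 14 moves, which fits inside the 25-move budget, so a policy that misses rewards is not being limited by energy alone.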


In [None]:
# ---- World parameters ----
L = 20                 # ring positions: 0..19
T = 50                 # max steps
START_X = 0
START_ENERGY = 25
MOVE_COST = 1

# reward locations: position -> value
REWARDS_INIT = {3: 5.0, 9: 10.0, 14: 7.0}

# partial observability: rewards visible within VIS_RADIUS ring-distance
VIS_RADIUS = 3

# ---- LLM (Ollama OpenAI-compatible) ----
OLLAMA_BASE_URL = "http://localhost:11434/v1"
OLLAMA_MODEL = "llama3.1"
TEMPERATURE = 0.2
MAX_TOKENS = 120

# ---- Output ----
RUN_DIR = Path("runs/exp_1_ring_line_rewards") / time.strftime("%Y%m%d-%H%M%S")
RUN_DIR.mkdir(parents=True, exist_ok=True)
LOG_PATH = RUN_DIR / "trajectory.jsonl"
METRICS_PATH = RUN_DIR / "metrics.json"
CONFIG_PATH = RUN_DIR / "config.json"

print("Run dir:", RUN_DIR)


## 2) Schemas: Observation and Action

Why schemas?
- Make the simulator–policy interface explicit and stable.
- Validate LLM outputs safely.
- Make later FastAPI integration straightforward.

Observation includes:
- time step `t`
- ring length `L`
- agent state (`x`, `energy`)
- visible rewards (partial observability)

Action is minimal:
- one field: `type` ∈ {LEFT, RIGHT, WAIT}
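
One detail worth knowing before serializing observations: JSON object keys are always strings, so the integer positions in `visible_rewards` become strings once an observation is dumped to JSON (in the prompt and in the log). The sketch below uses only the standard library and a hypothetical plain-dict observation, not the Pydantic models, to show the round trip and one way to restore integer keys.

```python
import json

# Hypothetical observation as a plain dict, mirroring the fields listed above.
obs = {"t": 0, "L": 20, "x": 0, "energy": 25, "visible_rewards": {3: 5.0}}

text = json.dumps(obs)
loaded = json.loads(text)

# After the round trip the reward position is the string "3", not the int 3.
print(loaded["visible_rewards"])

# Convert keys back to int before using them as ring positions.
restored = {int(pos): val for pos, val in loaded["visible_rewards"].items()}
print(restored)
```

This matters mostly for later analysis: code that reads the log back should convert positions to `int` before doing arithmetic on them.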


In [None]:
ActionType = Literal["LEFT", "RIGHT", "WAIT"]

class Observation(BaseModel):
    t: int
    L: int
    x: int
    energy: int
    visible_rewards: Dict[int, float] = Field(default_factory=dict)

class Action(BaseModel):
    type: ActionType


## 3) World Mechanics (Ring + Energy + Reward)

Ring boundary means:
- RIGHT from 19 wraps to 0
- LEFT from 0 wraps to 19

Energy rules:
- LEFT/RIGHT cost `MOVE_COST` energy
- WAIT costs 0
- if energy is 0, movement is impossible: any action leaves the agent in place, as if it had chosen WAIT

Reward rules:
- if agent lands on a reward position, it collects that value
- for simplicity, the reward is removed after collection
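
The three rule sets above can be restated compactly as a worked example. This is a standalone sketch with hypothetical names (`move`, `RING_LEN`, `rewards`), separate from the notebook's own functions. It relies on Python's `%` operator returning a non-negative result for a positive modulus, which is what makes the LEFT wrap work.

```python
RING_LEN = 20
MOVE_COST = 1

def move(x: int, energy: int, direction: int, ring_len: int = RING_LEN):
    """Return (new_x, new_energy); direction is -1 (LEFT) or +1 (RIGHT)."""
    if energy < MOVE_COST:
        return x, energy                      # not enough energy: stay put
    return (x + direction) % ring_len, energy - MOVE_COST

print(move(19, 5, +1))   # RIGHT from 19 wraps to 0
print(move(0, 5, -1))    # LEFT from 0 wraps to 19
print(move(7, 0, +1))    # no energy, no movement

# Collect-once rewards: dict.pop removes the entry and returns its value,
# or the default 0.0 if nothing is at that position.
rewards = {3: 5.0}
print(rewards.pop(3, 0.0))   # first visit collects the reward
print(rewards.pop(3, 0.0))   # second visit finds nothing
```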


In [None]:
def ring_dist(a: int, b: int, L: int) -> int:
    """Shortest distance between positions a and b on a ring of length L."""
    d = abs(a - b)
    return min(d, L - d)

def observe(t: int, x: int, energy: int, rewards: Dict[int, float], L: int, vis_r: int) -> Observation:
    """Build the agent's partial observation: only rewards within visibility radius."""
    visible = {pos: val for pos, val in rewards.items() if ring_dist(x, pos, L) <= vis_r}
    return Observation(t=t, L=L, x=x, energy=energy, visible_rewards=visible)


In [None]:
def apply_action(x: int, energy: int, action: Action, L: int) -> Tuple[int, int]:
    """
    Apply the action under energy constraints and ring dynamics.
    Returns (new_x, new_energy).
    """
    # If energy is depleted, movement is impossible
    if energy <= 0:
        return x, 0

    # WAIT is always allowed
    if action.type == "WAIT":
        return x, energy

    # Movement requires enough energy
    if energy < MOVE_COST:
        return x, energy  # cannot move

    if action.type == "LEFT":
        return (x - 1) % L, energy - MOVE_COST

    if action.type == "RIGHT":
        return (x + 1) % L, energy - MOVE_COST

    # Defensive fallback (shouldn't happen with schema validation)
    return x, energy

def collect_reward(x: int, rewards: Dict[int, float]) -> float:
    """Collect reward at position x if present, and remove it from the world."""
    if x in rewards:
        val = rewards[x]
        del rewards[x]
        return val
    return 0.0


## 4) Logging (JSON Lines)

We log *one JSON object per step* (JSONL) so we can:
- reconstruct the trajectory
- compute behavioral metrics later
- compare controlled conditions across runs

We'll log:
- observation
- raw LLM output
- parsed action
- state transition + reward gained
- remaining rewards
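
To illustrate why JSONL is convenient here, the sketch below appends two made-up records to a temporary file and reads them back line by line. The helper names `append_jsonl` and `read_jsonl` and the demo records are assumptions for illustration, not output from a real run.

```python
import json
import tempfile
from pathlib import Path

def append_jsonl(path: Path, record: dict) -> None:
    """Append one record as a single JSON line."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

def read_jsonl(path: Path) -> list:
    """Read every non-empty line back into a dict."""
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "demo.jsonl"
    append_jsonl(demo_path, {"t": 0, "x_after": 1, "reward_gained": 0.0})
    append_jsonl(demo_path, {"t": 1, "x_after": 2, "reward_gained": 0.0})
    records = read_jsonl(demo_path)

print(len(records), records[-1]["x_after"])
```

Because each step is appended as its own line, a run that crashes midway still leaves a readable log of every completed step.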


In [None]:
def log_event(path: Path, event: dict) -> None:
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event, ensure_ascii=False) + "\n")


## 5) LLM Policy

The LLM policy is a black-box function:
- input: Observation (serialized into JSON text)
- output: Action JSON

We enforce structure by:
- using a strict system message: "output ONLY JSON"
- validating the output with Pydantic
- falling back to WAIT on invalid output

Note:
- This is *not training*. The model is fixed.
- Strategy inference comes from analyzing trajectories + metrics.
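
A model may still wrap its JSON in extra prose despite the instruction, in which case strict validation falls back to WAIT. As an optional hardening step (an assumption, not something this notebook does), the sketch below pulls the first `{...}` span out of the raw text before parsing. The names `extract_action` and `ALLOWED` are hypothetical, and the regex only handles flat objects like the action shape used here.

```python
import json
import re
from typing import Optional

ALLOWED = {"LEFT", "RIGHT", "WAIT"}

def extract_action(raw: str) -> Optional[str]:
    """Return the action type found in raw model output, or None if unusable."""
    match = re.search(r"\{.*?\}", raw, flags=re.DOTALL)  # first flat {...} span
    if match is None:
        return None
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return None
    action_type = data.get("type")
    if isinstance(action_type, str) and action_type in ALLOWED:
        return action_type
    return None

print(extract_action('Sure, here is my move: {"type": "RIGHT"}'))
print(extract_action("I will move left"))
```

Whether to be this lenient is a design choice: lenient parsing recovers more actions, while strict parsing keeps format failures visible in the logged raw output as forced WAITs.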



In [None]:
def llm_call(system: str, user: str) -> str:
    """
    Call Ollama's OpenAI-compatible endpoint: /v1/chat/completions
    Returns the assistant message content as a string.
    """
    url = f"{OLLAMA_BASE_URL}/chat/completions"
    payload = {
        "model": OLLAMA_MODEL,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
    }
    r = requests.post(url, json=payload, timeout=60)
    r.raise_for_status()
    return r.json()["choices"][0]["message"]["content"]


In [None]:
def llm_policy(obs: Observation) -> Tuple[Action, str]:
    """
    LLM policy:
    - Build prompt
    - Call LLM
    - Parse JSON -> Action
    - If invalid, fallback WAIT
    Returns (Action, raw_llm_output)
    """
    system = (
        "You are an agent on a ring line. "
        "Output ONLY JSON with exactly this shape: {\"type\":\"LEFT\"|\"RIGHT\"|\"WAIT\"}. "
        "No other keys. No extra text."
    )

    user = json.dumps({
        "observation": obs.model_dump(),
        "allowed_actions": ["LEFT", "RIGHT", "WAIT"],
        "note": "visible_rewards contains reward positions+values within view radius."
    })

    raw = llm_call(system, user).strip()

    try:
        action = Action.model_validate_json(raw)
        return action, raw
    except ValidationError:
        # If the LLM output isn't valid JSON matching schema, we apply a safe fallback.
        return Action(type="WAIT"), raw


## 6) Behavioral Metrics (Strategy Inference)

We infer "strategy" from behavior, not explanations.

Minimal metrics:
- total reward collected
- coverage (number of unique positions visited)
- step index of the first reward (0-based; `null` if none was collected)
- action counts per energy bin (high: energy ≥ 10, mid: 3 to 9, low: below 3)

These give a first read on:
- exploration vs. exploitation
- energy-aware behavior (does the agent WAIT more when energy is low?)
- stability vs. dithering
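
Two small helpers can turn these ideas into numbers. The sketch below converts raw action counts into shares, and computes a reversal rate as one possible proxy for dithering. Both `normalize` and `reversal_rate` are hypothetical additions, not metrics computed by this notebook, and the action sequences are made up for illustration.

```python
from typing import Dict, List

def normalize(counts: Dict[str, int]) -> Dict[str, float]:
    """Turn action counts into shares that sum to 1 (all zeros if empty)."""
    total = sum(counts.values())
    if total == 0:
        return {a: 0.0 for a in counts}
    return {a: n / total for a, n in counts.items()}

def reversal_rate(actions: List[str]) -> float:
    """Fraction of consecutive moves that flip direction (WAITs are skipped)."""
    moves = [a for a in actions if a in ("LEFT", "RIGHT")]
    if len(moves) < 2:
        return 0.0
    flips = sum(1 for prev, cur in zip(moves, moves[1:]) if prev != cur)
    return flips / (len(moves) - 1)

print(normalize({"LEFT": 1, "RIGHT": 3, "WAIT": 0}))
print(reversal_rate(["RIGHT", "LEFT", "RIGHT", "LEFT"]))   # pure oscillation
print(reversal_rate(["RIGHT", "RIGHT", "WAIT", "RIGHT"]))  # steady direction
```

A reversal rate near 1 suggests oscillation in place, while a rate near 0 suggests a committed sweep in one direction.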


In [None]:
def energy_bin(e: int) -> str:
    """Bucket an energy level: high (>= 10), mid (3 to 9), low (< 3)."""
    if e >= 10:
        return "high"
    if e >= 3:
        return "mid"
    return "low"

def run(seed: int = 0) -> dict:
    rng = random.Random(seed)  # reserved for later (if you randomize world)

    x = START_X
    energy = START_ENERGY
    rewards = dict(REWARDS_INIT)

    visited = {x}
    total_reward = 0.0
    first_reward_t: Optional[int] = None

    action_counts_by_energy = {
        "high": {"LEFT": 0, "RIGHT": 0, "WAIT": 0},
        "mid":  {"LEFT": 0, "RIGHT": 0, "WAIT": 0},
        "low":  {"LEFT": 0, "RIGHT": 0, "WAIT": 0},
    }

    # Save resolved config for reproducibility
    CONFIG_PATH.write_text(json.dumps({
        "L": L, "T": T,
        "START_X": START_X, "START_ENERGY": START_ENERGY, "MOVE_COST": MOVE_COST,
        "REWARDS_INIT": REWARDS_INIT, "VIS_RADIUS": VIS_RADIUS,
        "OLLAMA_MODEL": OLLAMA_MODEL, "TEMPERATURE": TEMPERATURE, "MAX_TOKENS": MAX_TOKENS,
        "seed": seed,
    }, indent=2), encoding="utf-8")

    for t in range(T):
        obs = observe(t, x, energy, rewards, L, VIS_RADIUS)

        action, raw_output = llm_policy(obs)

        # Track action distribution by energy BEFORE applying the action
        action_counts_by_energy[energy_bin(energy)][action.type] += 1

        # Apply dynamics
        x2, energy2 = apply_action(x, energy, action, L)
        gained = collect_reward(x2, rewards)

        if gained > 0 and first_reward_t is None:
            first_reward_t = t

        visited.add(x2)
        total_reward += gained

        # Log this step
        log_event(LOG_PATH, {
            "t": t,
            "obs": obs.model_dump(),
            "raw_llm_output": raw_output,
            "action": action.model_dump(),
            "x_before": x,
            "energy_before": energy,
            "x_after": x2,
            "energy_after": energy2,
            "reward_gained": gained,
            "reward_total_so_far": total_reward,
            "rewards_remaining": dict(rewards),
        })

        # Update state
        x, energy = x2, energy2

        # Optional early stop if energy is depleted
        if energy <= 0:
            break

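    metrics = {
        # Every executed step adds exactly one count, so the sum is the number
        # of steps run (and is 0 if T == 0, where `t` would be undefined).
        "steps_run": sum(sum(c.values()) for c in action_counts_by_energy.values()),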
        "total_reward": total_reward,
        "coverage_unique_positions": len(visited),
        "first_reward_step": first_reward_t,
        "action_counts_by_energy_bin": action_counts_by_energy,
        "end_state": {"x": x, "energy": energy, "rewards_remaining": rewards},
    }

    METRICS_PATH.write_text(json.dumps(metrics, indent=2), encoding="utf-8")
    return metrics


In [None]:
metrics = run(seed=0)
print("Run dir:", RUN_DIR)
print(json.dumps(metrics, indent=2))


## 7) Inspecting Results

Open these files:
- `trajectory.jsonl`: one record per step (great for debugging + later analysis)
- `metrics.json`: summary metrics for quick comparisons

First questions:
- Is `total_reward` above 0, i.e. did the agent collect any reward?
- How quickly did it find the first reward (`first_reward_step`)?
- Does it WAIT more when energy is low?
- Does it explore (coverage), or just oscillate?
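
The low-energy question can be answered directly from the per-step records. The sketch below assumes records shaped like the log entries described earlier (with `energy_before` and `action.type` fields) and uses a threshold of 3 to match the "low" energy bin. The function `wait_share` is a hypothetical helper, and the demo records are hand-made, not results from a run.

```python
from typing import List

def wait_share(records: List[dict], low_threshold: int = 3) -> dict:
    """WAIT share among steps with energy below vs. at or above the threshold."""
    groups = {
        "low": [r for r in records if r["energy_before"] < low_threshold],
        "not_low": [r for r in records if r["energy_before"] >= low_threshold],
    }
    shares = {}
    for label, subset in groups.items():
        waits = sum(1 for r in subset if r["action"]["type"] == "WAIT")
        # None marks a group with no steps, so it is not confused with 0% WAIT.
        shares[label] = waits / len(subset) if subset else None
    return shares

demo = [
    {"energy_before": 5, "action": {"type": "RIGHT"}},
    {"energy_before": 4, "action": {"type": "RIGHT"}},
    {"energy_before": 2, "action": {"type": "WAIT"}},
    {"energy_before": 2, "action": {"type": "RIGHT"}},
]
print(wait_share(demo))
```

With a single short run the low-energy group will contain only a handful of steps, so any difference between the two shares is better treated as a hint than as evidence.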


In [None]:
# Show last 5 steps from the JSONL log (quick sanity check)
lines = LOG_PATH.read_text(encoding="utf-8").strip().splitlines()
for row in lines[-5:]:
    print(row)
