# Phase 0 — Sanity & Determinism

This notebook establishes a deterministic symbolic environment
and verifies:

- Reachability of symbolic goals
- Correct execution of a minimal symbolic plan
- Terminal reward correctness

Verified outputs:
Final Plan: [('goto', 'green', 'goal')]
Total Reward: 1


In [5]:
# Step 1: mount Drive and create project directories for the experiment
from pathlib import Path
from google.colab import drive
import sys

# Mount Drive (you will be prompted to authenticate)
# Everything below writes under /content/drive, so the mount must succeed first.
drive.mount('/content/drive')

# Create canonical project folder inside MyDrive
# `base` is the project root; it is reused further down when the layout is printed.
base = Path('/content/drive/MyDrive/Continual_RL_Phi2')
# Sub-directories to create directly under `base`.
dirs = [
    "notebooks",
    "src",
    "experiments",
    "configs",
    "logs",
    "data",
    "models",
    "checkpoints",
    "utils"
]

# parents=True also creates `base` itself when it is missing; exist_ok=True
# makes re-running this cell harmless once the folders exist.
for d in dirs:
    p = base / d
    p.mkdir(parents=True, exist_ok=True)

# Print resulting layout for verification
def tree(root, depth=2):
    """Return a text rendering of the directory tree below ``root``.

    The first line is ``str(root)``. Every entry found underneath follows on
    its own line, indented by two spaces per nesting level and prefixed with
    a folder or file icon. Within a directory, sub-directories are listed
    before files, each group sorted case-insensitively by name.

    Args:
        root: Directory to render; anything accepted by ``pathlib.Path``.
        depth: Number of levels below ``root`` to list. ``1`` lists only the
            direct children, ``2`` also lists the grandchildren, and so on.

    Returns:
        The rendered tree as a single newline-joined string.
    """
    # Unicode escapes (U+1F4C1 file folder, U+1F4C4 page facing up) instead of
    # literal emoji: the literals had been corrupted by a wrong-encoding
    # round trip, and escapes cannot be mangled that way again.
    folder_icon = "\U0001F4C1 "
    file_icon = "\U0001F4C4 "

    root = Path(root)
    out = []

    def _walk(p, lvl):
        # `lvl` is the nesting level of the entries about to be listed.
        if lvl > depth:
            return
        try:
            # Directories first (False sorts before True), then by lowercase name.
            items = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))
        except PermissionError:
            # Unreadable directories are shown by name only, without contents.
            return
        for it in items:
            is_directory = it.is_dir()
            out.append("  " * lvl + (folder_icon if is_directory else file_icon) + it.name)
            if is_directory:
                _walk(it, lvl + 1)

    out.append(str(root))
    _walk(root, 1)
    return "\n".join(out)

# Render the project root two levels deep so the freshly created layout can
# be checked by eye, then echo the root path itself.
print(tree(base, depth=2))
print("\n\nCreated directory:", base)


Mounted at /content/drive
/content/drive/MyDrive/Continual_RL_Phi2
  📁 checkpoints
  📁 configs
  📁 data
  📁 experiments
  📁 logs
  📁 models
  📁 notebooks
  📁 src
  📁 utils


Created directory: /content/drive/MyDrive/Continual_RL_Phi2


In [6]:
# Step 2: install core dependencies and verify versions (run this exact cell)
import sys
print("Starting Step 2 verification cell...")

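# Install packages (one pip call). This may take a couple minutes.
# NOTE: no versions are pinned here, so pip resolves whatever is current; the
# versions actually in use are printed by the verification further down. For a
# reproducible rerun, pin them explicitly (e.g. gymnasium==1.2.2).
# Colab already ships with many of these packages; pip will upgrade/install where necessary.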
!pip install -q gymnasium stable-baselines3 torch torchvision torchaudio matplotlib pandas seaborn

# Verify imports and print versions / GPU state
import importlib
import torch
import gymnasium as gym
import stable_baselines3 as sb3
import numpy as np
import matplotlib
import pandas as pd
import seaborn as sns
import os, traceback

def try_device_info():
    """Probe CUDA availability without ever raising.

    Returns:
        A ``(cuda_available, device_name)`` pair. ``device_name`` is the name
        of CUDA device 0 when CUDA is available, ``"N/A"`` when it is not,
        and ``"error: <message>"`` (with ``cuda_available`` forced to
        ``False``) when the probe itself raised.
    """
    try:
        has_cuda = torch.cuda.is_available()
        if has_cuda:
            name = torch.cuda.get_device_name(0)
        else:
            name = "N/A"
    except Exception as err:
        # A broken driver/runtime should not abort the verification cell.
        return False, f"error: {err}"
    return has_cuda, name

cuda_avail, device_name = try_device_info()

# One tuple per output line; each tuple is handed to print() as positional
# arguments, so fields are joined with single spaces exactly as before.
report_lines = (
    ("\n--- Verification ---",),
    ("Python:", sys.version.splitlines()[0]),
    ("torch:", getattr(torch, "__version__", "missing"), " | cuda_available:", cuda_avail),
    ("torch device name:", device_name),
    ("gymnasium:", getattr(gym, "__version__", "missing")),
    ("stable-baselines3:", getattr(sb3, "__version__", "missing")),
    ("numpy:", np.__version__),
    ("matplotlib:", matplotlib.__version__),
    ("pandas:", pd.__version__),
    ("seaborn:", sns.__version__),
)
for fields in report_lines:
    print(*fields)

# Quick smoke test: create a simple env and step one action.
# Both the environment and the action space are seeded so that re-running the
# cell prints the same action and transition every time.
try:
    env = gym.make("CartPole-v1")
    try:
        obs, info = env.reset(seed=0)
        env.action_space.seed(0)
        sample_action = env.action_space.sample()
        obs2, reward, terminated, truncated, info2 = env.step(sample_action)
        print("\nSmoke test: CartPole-v1 step OK. sample_action:", sample_action, "reward:", reward,
              "terminated:", terminated, "truncated:", truncated)
    finally:
        # Release the environment even when reset()/step() raised.
        env.close()
except Exception:
    # Best-effort check: report the failure but let the rest of the cell run.
    print("\nSmoke test: CARTPOLE ERROR")
    traceback.print_exc()

print("\nInstalled packages (pip freeze subset):")
# Show the installed distribution version of each package of interest.
# importlib.metadata replaces the deprecated pkg_resources API, and a missing
# package is reported per name instead of silently skipping the whole listing.
from importlib import metadata as _metadata

for name in ("gymnasium", "stable-baselines3", "torch", "matplotlib", "pandas", "seaborn"):
    try:
        installed_version = _metadata.version(name)
    except _metadata.PackageNotFoundError:
        installed_version = "not found"
    print(f"  {name}: {installed_version}")

print("\nStep 2 complete. Reply with the full printed output (or the error traceback).")


Starting Step 2 verification cell...
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m188.0/188.0 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)



--- Verification ---
Python: 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
torch: 2.9.0+cu126  | cuda_available: True
torch device name: Tesla T4
gymnasium: 1.2.2
stable-baselines3: 2.7.1
numpy: 2.0.2
matplotlib: 3.10.0
pandas: 2.2.2
seaborn: 0.13.2

Smoke test: CartPole-v1 step OK. sample_action: 0 reward: 1.0 terminated: False truncated: False

Installed packages (pip freeze subset):
  gymnasium: 1.2.2
  stable-baselines3: 2.7.1
  torch: 2.9.0+cu126
  matplotlib: 3.10.0
  pandas: 2.2.2
  seaborn: 0.13.2

Step 2 complete. Reply with the full printed output (or the error traceback).


  import pkg_resources
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
  return datetime.utcnow().replace(tzinfo=utc)


In [7]:
# Step 3: Minimal deterministic GridWorld environment (Phase-0 sanity)

import numpy as np

class SimpleGridEnv:
    """
    Minimal deterministic grid environment.
    - Agent starts at fixed position (0, 0)
    - One green goal at (width - 1, height - 1)
    - Reward = 1 on reaching goal, 0 otherwise
    - Episode terminates immediately on success

    Coordinates are (x, y) with y growing downward: "down" increases y and
    "up" decreases it. Moves that would leave the grid keep the agent on the
    border cell.

    Note: reset() never marks the episode as done, so on a 1x1 grid (start
    equals goal) the reward is only granted by the first step().
    """

    # Action name -> (dx, dy) displacement applied before clamping to the grid.
    _MOVES = {
        "up": (0, -1),
        "down": (0, 1),
        "left": (-1, 0),
        "right": (1, 0),
    }

    def __init__(self, width=5, height=5):
        """Create a width x height grid and reset the agent to the start.

        Raises:
            ValueError: if either dimension is smaller than 1. Such a grid
                would place the goal at a negative coordinate that no
                sequence of actions can ever reach.
        """
        if width < 1 or height < 1:
            raise ValueError(
                f"Grid dimensions must be >= 1, got width={width}, height={height}"
            )
        self.width = width
        self.height = height
        self.goal_pos = (width - 1, height - 1)  # fixed goal
        self.reset()

    def reset(self):
        """Put the agent back at (0, 0), clear the done flag, return the observation."""
        self.agent_pos = (0, 0)  # fixed start
        self.done = False
        return self._obs()

    def _obs(self):
        """Return the current observation as a dict of agent and goal positions."""
        return {
            "agent_pos": self.agent_pos,
            "goal_pos": self.goal_pos
        }

    def step(self, action):
        """Apply one action and return ``(observation, reward, done)``.

        Args:
            action: one of "up", "down", "left", "right".

        Raises:
            RuntimeError: if the episode has already terminated.
            ValueError: if ``action`` is not a known action name; the agent
                position is left untouched in that case.
        """
        if self.done:
            raise RuntimeError("Episode already terminated")

        if action not in self._MOVES:
            raise ValueError(f"Invalid action: {action}")

        dx, dy = self._MOVES[action]
        x, y = self.agent_pos
        # Clamp to the grid so bumping into a wall leaves the agent in place.
        x = min(max(x + dx, 0), self.width - 1)
        y = min(max(y + dy, 0), self.height - 1)
        self.agent_pos = (x, y)

        reward = 0
        if self.agent_pos == self.goal_pos:
            reward = 1
            self.done = True

        return self._obs(), reward, self.done

# ---- Sanity test ----
env = SimpleGridEnv(width=4, height=4)
obs = env.reset()
print("Initial obs:", obs)
assert obs == {"agent_pos": (0, 0), "goal_pos": (3, 3)}, "unexpected initial observation"

# Move to goal deterministically
actions = ["right", "right", "right", "down", "down", "down"]
total_reward = 0

for a in actions:
    obs, r, done = env.step(a)
    total_reward += r
    if done:
        break

print("Final obs:", obs)
print("Total reward:", total_reward)

# Fail loudly instead of relying on someone reading the printed values.
assert done, "episode should terminate once the goal is reached"
assert obs["agent_pos"] == obs["goal_pos"] == (3, 3), "agent did not end on the goal"
assert total_reward == 1, "terminal reward must be exactly 1"


Initial obs: {'agent_pos': (0, 0), 'goal_pos': (3, 3)}
Final obs: {'agent_pos': (3, 3), 'goal_pos': (3, 3)}
Total reward: 1


In [8]:
# Step 4: Minimal symbolic planner + executor (Phase-0 milestone)

# High-level symbolic planner
def symbolic_plan(obs):
    """
    Phase-0 planner.

    The observation is ignored: the plan is always a fresh one-element list
    holding the single symbolic step ("goto", "green", "goal").
    """
    goal_step = ("goto", "green", "goal")
    return [goal_step]

# Low-level executor mapping symbolic plan to actions
def execute_plan(env, plan):
    """Ground a symbolic plan into primitive moves and run it on ``env``.

    Every plan step must be ("goto", "green", "goal"). For each step the
    agent is driven greedily towards ``env.goal_pos``: the x coordinate is
    aligned first ("right"/"left"), then the y coordinate ("down"/"up").
    Driving stops when the agent stands on the goal or ``env.step`` reports
    the episode as done.

    Args:
        env: object exposing ``agent_pos``, ``goal_pos`` and
            ``step(action) -> (obs, reward, done)``.
        plan: iterable of symbolic steps.

    Returns:
        Sum of the rewards returned by ``env.step`` over the whole plan.
    """
    collected = 0

    for symbolic_step in plan:
        assert symbolic_step == ("goto", "green", "goal")

        arrived = False
        while not arrived:
            ax, ay = env.agent_pos
            gx, gy = env.goal_pos

            if (ax, ay) == (gx, gy):
                arrived = True
                continue

            # Horizontal alignment takes priority over vertical alignment.
            if ax != gx:
                move = "right" if ax < gx else "left"
            else:
                move = "down" if ay < gy else "up"

            _obs, step_reward, finished = env.step(move)
            collected += step_reward

            if finished:
                arrived = True

    return collected


# ---- Phase-0 end-to-end test ----
env = SimpleGridEnv(width=4, height=4)
obs = env.reset()

plan = symbolic_plan(obs)
print("Final Plan:", plan)

reward = execute_plan(env, plan)
print("Total Reward:", reward)

# Pin the verified Phase-0 outputs so a regression fails the cell instead of
# only changing the printed text.
assert plan == [("goto", "green", "goal")], "planner must emit the single green-goal step"
assert reward == 1, "executing the plan must collect exactly the terminal reward"
assert env.agent_pos == env.goal_pos, "agent must finish on the goal cell"


Final Plan: [('goto', 'green', 'goal')]
Total Reward: 1


In [9]:
# Step 5: Formal reachability check (BFS, deterministic)

from collections import deque

def reachable(env, start, goal):
    """
    Decide by breadth-first search whether ``goal`` can be reached from ``start``.

    Movement is 4-connected; a neighbouring cell is explored only when it lies
    inside ``0 <= x < env.width`` and ``0 <= y < env.height``. Nothing else of
    ``env`` is consulted (no rewards, no stepping).

    Returns True as soon as a dequeued cell equals ``goal``, False once the
    search space is exhausted.
    """
    offsets = ((-1, 0), (1, 0), (0, -1), (0, 1))
    seen = {start}
    frontier = deque((start,))

    while frontier:
        cx, cy = frontier.popleft()
        if (cx, cy) == goal:
            return True

        for ox, oy in offsets:
            cell = (cx + ox, cy + oy)
            inside = 0 <= cell[0] < env.width and 0 <= cell[1] < env.height
            if inside and cell not in seen:
                seen.add(cell)
                frontier.append(cell)

    return False


# ---- Sanity check ----
env = SimpleGridEnv(width=4, height=4)
env.reset()

start = env.agent_pos
goal = env.goal_pos

is_reachable = reachable(env, start, goal)
print("Reachable:", is_reachable)

# The goal of an obstacle-free grid must always be reachable; fail the cell otherwise.
assert is_reachable, f"goal {goal} is not reachable from start {start}"


Reachable: True


In [10]:
import random
from collections import Counter

# assumptions: SimpleGridEnv, execute_plan, symbolic_plan exist from Phase-0

def eval_random_lowlevel(env_cls, env_kwargs, seq_len, trials, seed=None):
    """Estimate how often a uniformly random action sequence ends the episode.

    For each trial a fresh ``env_cls(**env_kwargs)`` is created and reset,
    then up to ``seq_len`` actions are drawn uniformly from
    up/down/left/right. A trial counts as a success as soon as ``env.step``
    reports ``done``.

    Args:
        env_cls: environment class; instances must provide ``reset()`` and
            ``step(action) -> (obs, reward, done)``.
        env_kwargs: keyword arguments forwarded to ``env_cls``.
        seq_len: maximum number of random actions per trial.
        trials: number of independent trials; must be positive.
        seed: optional seed. When given, a private ``random.Random(seed)`` is
            used so the estimate is reproducible; when ``None`` the shared
            module-level generator is used, as before.

    Returns:
        Fraction of successful trials, in ``[0, 1]``.

    Raises:
        ValueError: if ``trials`` is not positive.
    """
    if trials <= 0:
        raise ValueError(f"trials must be a positive integer, got {trials}")

    rng = random if seed is None else random.Random(seed)
    actions = ["up", "down", "left", "right"]

    succ = 0
    for _ in range(trials):
        env = env_cls(**env_kwargs)
        env.reset()
        for _ in range(seq_len):
            _, _, done = env.step(rng.choice(actions))
            if done:
                succ += 1
                break
    return succ / trials

def eval_random_symbolic(env_cls, env_kwargs, trials, seed=None):
    """Estimate the success rate of randomly sampled symbolic plans.

    Each trial samples a plan of 1 to 3 symbolic operations, drawn
    independently (with replacement) from a three-element vocabulary. The
    grounding is deliberately naive: if the plan contains
    ("goto", "green", "goal") at least once, the greedy executor is run for
    that single step on a fresh environment; otherwise nothing is executed
    and the trial counts as a failure.

    Args:
        env_cls: environment class compatible with ``execute_plan``.
        env_kwargs: keyword arguments forwarded to ``env_cls``.
        trials: number of independent trials; must be positive.
        seed: optional seed. When given, a private ``random.Random(seed)`` is
            used so the estimate is reproducible; when ``None`` the shared
            module-level generator is used, as before.

    Returns:
        Fraction of trials in which the executor collected a reward >= 1.

    Raises:
        ValueError: if ``trials`` is not positive.
    """
    if trials <= 0:
        raise ValueError(f"trials must be a positive integer, got {trials}")

    rng = random if seed is None else random.Random(seed)
    green_goal = ("goto", "green", "goal")
    sym_ops = [green_goal, ("goto", "red", "goal"), ("noop",)]

    succ = 0
    for _ in range(trials):
        env = env_cls(**env_kwargs)
        env.reset()
        # Plan length is drawn first, then that many operations.
        plan = [rng.choice(sym_ops) for _ in range(rng.randint(1, 3))]
        if green_goal in plan:
            # Simulates a correct grounding of the green-goal step only.
            succ_reward = execute_plan(env, [green_goal])
            if succ_reward >= 1:
                succ += 1
    return succ / trials

# Run small diagnostics
# Seed the shared generator first so the two reported rates are identical on
# every rerun of this cell.
random.seed(0)

env_kwargs = {"width":4,"height":4}
print("Random low-level sequences success rate (seq_len=6):",
      eval_random_lowlevel(SimpleGridEnv, env_kwargs, seq_len=6, trials=5000))
print("Random symbolic plans success rate (naive grounding):",
      eval_random_symbolic(SimpleGridEnv, env_kwargs, trials=2000))


Random low-level sequences success rate (seq_len=6): 0.0036
Random symbolic plans success rate (naive grounding): 0.5355
