In [None]:
# Standard imports
import sys
import numpy as np
import matplotlib.pyplot as plt

# Add parent directory to path for imports
sys.path.insert(0, '..')

# Import rlgrid modules
from rlgrid import GridWorldEnv
from rlgrid import (
    uniform_policy,
    random_policy,
    random_deterministic_policy,
    policy_induced_transition_matrix,
)
from rlgrid import (
    simulate,
    estimate_average_reward,
    visualize_grid,
)
from rlgrid.utils import (
    print_transition_info,
    print_policy_info,
    visualize_trajectory,
    estimate_average_reward_running,
    compute_average_reward_exact,
)

# Set random seed for reproducibility
rng = np.random.default_rng(42)

# Plot settings
plt.rcParams['figure.figsize'] = (10, 8)
plt.rcParams['font.size'] = 12

print("Imports successful!")

## 1. Loading and Visualizing Grid Environments

Grid environments are defined in `.txt` files using a simple format:
- `.` = free cell (traversable)
- `#` = wall (non-traversable)
- `G` = goal cell (unique in the grid)

In [None]:
# Load the simple 5x5 environment
env = GridWorldEnv.from_txt("../envs/simple_5x5.txt", rng=rng)

print("Grid layout:")
print(env)
print()
print(f"Environment: {env}")
print(f"Number of states: {env.n_states}")
print(f"Number of actions: {env.n_actions}")
print(f"Goal state: {env.goal_state}")
print(f"Goal position: {env.goal_pos}")
print(f"Grid size: {env.height} x {env.width}")

In [None]:
# Visualize the grid
fig, ax = plt.subplots(figsize=(8, 8))
visualize_grid(env, ax=ax, show_state_indices=True, title="Simple 5x5 GridWorld")
plt.tight_layout()
plt.show()

In [None]:
# Load and visualize a more complex environment
env_maze = GridWorldEnv.from_txt("../envs/larger_maze.txt", rng=rng)

print("Maze layout:")
print(env_maze)
print()
print(f"Number of states: {env_maze.n_states}")
print(f"Goal state: {env_maze.goal_state} at position {env_maze.goal_pos}")

fig, ax = plt.subplots(figsize=(10, 10))
visualize_grid(env_maze, ax=ax, show_state_indices=True, title="Larger Maze Environment")
plt.tight_layout()
plt.show()

In [None]:
# State-to-position mappings
print("State to Position mapping (simple 5x5):")
for state, pos in env.state_to_pos.items():
    goal_marker = " <- GOAL" if state == env.goal_state else ""
    print(f"  State {state}: position {pos}{goal_marker}")

## 2. Inspecting the Transition Kernel

The transition kernel `P[s, a, s']` gives the probability of transitioning to state `s'` when taking action `a` in state `s`.

Since our environment is **deterministic**, each `P[s, a, :]` has exactly one entry equal to 1.

In [None]:
# Get the transition kernel
P = env.get_transition_kernel()
print(f"Transition kernel shape: {P.shape}")
print(f"Expected: ({env.n_states}, {env.n_actions}, {env.n_states})")

In [None]:
# Verify deterministic transitions: each (s, a) should have exactly one next state
print("Verifying deterministic transitions...")
is_deterministic = True
for s in range(env.n_states):
    for a in range(env.n_actions):
        n_nonzero = np.count_nonzero(P[s, a, :])
        if n_nonzero != 1:
            print(f"  State {s}, Action {a}: {n_nonzero} next states (should be 1)")
            is_deterministic = False

if is_deterministic:
    print("✓ All transitions are deterministic (exactly one next state per (s, a) pair)")

In [None]:
# Inspect transitions from a specific state
state_to_inspect = 0
print_transition_info(env, state_to_inspect)
print()

# Also inspect transitions from the goal state
print_transition_info(env, env.goal_state)

In [None]:
# Visualize transition probabilities for a specific state and action
state = 4  # Center state in simple 5x5

fig, axes = plt.subplots(1, 4, figsize=(16, 4))
fig.suptitle(f"Transition probabilities from state {state} ({env.state_to_pos[state]})")

for a, ax in enumerate(axes):
    # Reshape for visualization
    trans_probs = P[state, a, :]
    
    # Create a grid visualization
    grid_probs = np.zeros((env.height, env.width))
    for s_next, pos in env.state_to_pos.items():
        grid_probs[pos[0], pos[1]] = trans_probs[s_next]
    
    # Mark walls as -1 for visualization
    for r in range(env.height):
        for c in range(env.width):
            if env.grid[r][c] == '#':
                grid_probs[r, c] = -0.2
    
    im = ax.imshow(grid_probs, cmap='Blues', vmin=-0.2, vmax=1)
    ax.set_title(f"Action: {env.ACTIONS[a]}")
    
    # Mark the current state
    row, col = env.state_to_pos[state]
    ax.plot(col, row, 'ro', markersize=10)

plt.tight_layout()
plt.show()

## 3. Defining Policies

A policy π is a matrix of shape `(n_states, n_actions)` where `π[s, a]` is the probability of taking action `a` in state `s`.

We have three policy constructors:
1. **Uniform policy**: Equal probability for all actions
2. **Random policy**: Random stochastic policy
3. **Random deterministic policy**: One action per state, chosen randomly

In [None]:
# Create different policies
pi_uniform = uniform_policy(env)
pi_random = random_policy(env, rng=rng)
pi_det = random_deterministic_policy(env, rng=rng)

print("Uniform Policy (all actions equally likely):")
print(pi_uniform)
print()

print("Random Stochastic Policy:")
print(pi_random.round(3))
print()

print("Random Deterministic Policy:")
print(pi_det)

In [None]:
# Verify policies are valid (rows sum to 1)
print("Verifying policy validity...")
for name, pi in [("Uniform", pi_uniform), ("Random", pi_random), ("Deterministic", pi_det)]:
    row_sums = pi.sum(axis=1)
    all_valid = np.allclose(row_sums, 1.0)
    print(f"  {name}: rows sum to 1? {all_valid}")

In [None]:
# Display detailed policy info
print_policy_info(env, pi_det)

In [None]:
# Compute policy-induced transition matrix
P_pi = policy_induced_transition_matrix(env, pi_uniform)

print(f"Policy-induced transition matrix shape: {P_pi.shape}")
print(f"Expected: ({env.n_states}, {env.n_states})")
print()

# Verify rows sum to 1
row_sums = P_pi.sum(axis=1)
print(f"Row sums: {row_sums.round(6)}")
print(f"All rows sum to 1? {np.allclose(row_sums, 1.0)}")

In [None]:
# Visualize policy-induced transition matrix
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

policies = [
    ("Uniform Policy", pi_uniform),
    ("Random Stochastic Policy", pi_random),
    ("Random Deterministic Policy", pi_det),
]

for ax, (name, pi) in zip(axes, policies):
    P_pi = policy_induced_transition_matrix(env, pi)
    im = ax.imshow(P_pi, cmap='Blues', vmin=0, vmax=1)
    ax.set_title(name)
    ax.set_xlabel("Next State s'")
    ax.set_ylabel("Current State s")
    plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)

plt.tight_layout()
plt.show()

## 4. Simulating Trajectories

Let's run simulations in the environment under different policies.

In [None]:
# Short simulation for demonstration
T_short = 20
states, actions, rewards = simulate(env, pi_uniform, T=T_short, start_state=0, rng=rng)

print(f"Simulation for {T_short} steps under uniform policy:")
print(f"  States visited: {states}")
print(f"  Actions taken: {actions}")
print(f"  Rewards received: {rewards}")
print()
print(f"  Total reward: {rewards.sum()}")
print(f"  Average reward: {rewards.mean():.4f}")

# Check for goal visits
goal_visits = np.sum(rewards == 1)
print(f"  Goal reached {goal_visits} times")

In [None]:
# Demonstrate teleportation from goal
print("Demonstrating teleportation from goal:")
print()

# Find when we hit the goal
for t in range(len(rewards)):
    if rewards[t] == 1:
        s = states[t]
        a = actions[t]
        s_next = states[t + 1]
        
        print(f"  Step {t}: State {s} -> Action {env.ACTIONS[a]} -> "
              f"Goal reached! (reward +1) -> Teleported to state {s_next}")
        print(f"           Position {env.state_to_pos[s]} -> {env.state_to_pos[s_next]}")

In [None]:
# Visualize a short trajectory
visualize_trajectory(env, states, max_steps=15)
plt.show()

In [None]:
# Longer simulation to estimate average reward
T_long = 10000

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

for idx, (name, pi) in enumerate([("Uniform", pi_uniform), ("Random Det.", pi_det)]):
    states, actions, rewards = simulate(env, pi, T=T_long, rng=rng)
    
    # Running average
    running_avg = estimate_average_reward_running(rewards)
    
    # Plot running average
    ax = axes[0, idx]
    ax.plot(running_avg)
    ax.axhline(y=running_avg[-1], color='r', linestyle='--', 
               label=f'Final avg: {running_avg[-1]:.4f}')
    ax.set_xlabel('Time step')
    ax.set_ylabel('Running Average Reward')
    ax.set_title(f'{name} Policy - Running Average Reward')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # State visitation histogram
    ax = axes[1, idx]
    ax.hist(states[:-1], bins=np.arange(env.n_states + 1) - 0.5, 
            edgecolor='black', alpha=0.7)
    ax.axvline(x=env.goal_state, color='g', linestyle='--', 
               linewidth=2, label=f'Goal state ({env.goal_state})')
    ax.set_xlabel('State')
    ax.set_ylabel('Visit count')
    ax.set_title(f'{name} Policy - State Visitation')
    ax.legend()

plt.tight_layout()
plt.show()

## 5. Average Reward Analysis

In the average-reward setting, we care about the long-run average reward:

$$\rho(\pi) = \lim_{T \to \infty} \frac{1}{T} \sum_{t=0}^{T-1} r_t$$

For an ergodic MDP, this can be computed exactly using the stationary distribution.

In [None]:
# Compare simulation estimates with exact computation
T = 100000
burn_in = 1000

print("Average Reward Comparison:")
print("=" * 60)
print(f"{'Policy':<25} {'Simulated':>15} {'Exact':>15}")
print("-" * 60)

for name, pi in [("Uniform", pi_uniform), 
                  ("Random Stochastic", pi_random),
                  ("Random Deterministic", pi_det)]:
    # Simulation estimate
    _, _, rewards = simulate(env, pi, T=T, rng=rng)
    sim_avg = estimate_average_reward(rewards, burn_in=burn_in)
    
    # Exact computation
    exact_avg = compute_average_reward_exact(env, pi)
    
    print(f"{name:<25} {sim_avg:>15.6f} {exact_avg:>15.6f}")

print("=" * 60)

In [None]:
# Analyze how average reward changes with different policies
n_policies = 50
avg_rewards = []

for _ in range(n_policies):
    pi = random_policy(env, rng=rng)
    avg_r = compute_average_reward_exact(env, pi)
    avg_rewards.append(avg_r)

plt.figure(figsize=(10, 5))
plt.hist(avg_rewards, bins=20, edgecolor='black', alpha=0.7)
plt.axvline(x=np.mean(avg_rewards), color='r', linestyle='--', 
            label=f'Mean: {np.mean(avg_rewards):.4f}')
plt.xlabel('Average Reward')
plt.ylabel('Count')
plt.title(f'Distribution of Average Reward over {n_policies} Random Policies')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print(f"Average reward statistics over {n_policies} random policies:")
print(f"  Mean: {np.mean(avg_rewards):.6f}")
print(f"  Std:  {np.std(avg_rewards):.6f}")
print(f"  Min:  {np.min(avg_rewards):.6f}")
print(f"  Max:  {np.max(avg_rewards):.6f}")

## 6. Exploring Other Environments

In [None]:
# Load and analyze the maze environment
env_maze = GridWorldEnv.from_txt("../envs/maze_8x5.txt", rng=rng)

print("Maze 8x5:")
print(env_maze)
print()
print(f"States: {env_maze.n_states}, Goal: {env_maze.goal_state}")

fig, ax = plt.subplots(figsize=(10, 6))
visualize_grid(env_maze, ax=ax, title="Maze 8x5 Environment")
plt.tight_layout()
plt.show()

In [None]:
# Compare average reward in different environments
environments = [
    ("Simple 5x5", GridWorldEnv.from_txt("../envs/simple_5x5.txt", rng=rng)),
    ("Maze 8x5", GridWorldEnv.from_txt("../envs/maze_8x5.txt", rng=rng)),
    ("Larger Maze", GridWorldEnv.from_txt("../envs/larger_maze.txt", rng=rng)),
]

print("Average Reward under Uniform Policy:")
print("=" * 50)
print(f"{'Environment':<20} {'States':>10} {'Avg Reward':>15}")
print("-" * 50)

for name, e in environments:
    pi = uniform_policy(e)
    avg_r = compute_average_reward_exact(e, pi)
    print(f"{name:<20} {e.n_states:>10} {avg_r:>15.6f}")

print("=" * 50)

In [None]:
# Simulate in the larger maze
env_large = GridWorldEnv.from_txt("../envs/larger_maze.txt", rng=rng)
pi = uniform_policy(env_large)

T = 50000
states, actions, rewards = simulate(env_large, pi, T=T, rng=rng)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Running average
running_avg = estimate_average_reward_running(rewards)
axes[0].plot(running_avg)
axes[0].axhline(y=compute_average_reward_exact(env_large, pi), 
                color='r', linestyle='--', label='Exact')
axes[0].set_xlabel('Time step')
axes[0].set_ylabel('Running Average Reward')
axes[0].set_title('Larger Maze - Running Average Reward')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# State visitation
axes[1].hist(states[:-1], bins=np.arange(env_large.n_states + 1) - 0.5, 
             edgecolor='black', alpha=0.7)
axes[1].set_xlabel('State')
axes[1].set_ylabel('Visit count')
axes[1].set_title('Larger Maze - State Visitation')

plt.tight_layout()
plt.show()

## Summary

This notebook demonstrated:

1. **Environment Loading**: Grid environments are easily defined in `.txt` files and loaded with `GridWorldEnv.from_txt()`

2. **Transition Kernel**: The deterministic transition kernel `P[s, a, s']` is correctly built with exactly one next state per (state, action) pair

3. **Policies**: We can define uniform, random stochastic, and random deterministic policies

4. **Policy-Induced Transitions**: The matrix `P_π` correctly captures state-to-state transition probabilities under a policy

5. **Simulation**: Trajectories can be simulated with teleportation from goal working correctly

6. **Average Reward**: Both simulation-based and exact computation of average reward are available

This provides a solid foundation for testing theoretical bounds in average-reward reinforcement learning.