# **Minimal Gridworld + Q-Learning**

This notebook has two parts:
1. **Small Interactive Grid**: Let a user move around manually (via text input) in a tiny grid.
2. **Q-Learning on a Larger Grid**: An environment with special tiles (ice, bumpers, pits, etc.) and Q-learning to find a policy.

In [2]:
# Install packages in the current Jupyter kernel
import sys
!{sys.executable} -m pip install numpy
!{sys.executable} -m pip install ipywidgets
!{sys.executable} -m pip install ipycanvas
!{sys.executable} -m pip install ipyevents



In [1]:
# libraries for displaying and user input
import ipywidgets as widgets
from IPython.display import display, clear_output
import ipycanvas
import ipyevents
import time

# libraries for RL
import numpy as np
import math

## Part 1: Small Interactive Environment
We define a 3x3 environment. The user sees only cells they have \"visited\".
- Use buttons `up`, `down`, `left`, `right` to move.
- If you step into the goal, you win!
- This small environment is purely for demonstration and manual play.

In [2]:
class SmallGridEnv:
    def __init__(self, rows=3, cols=3):
        self.rows = rows
        self.cols = cols
        # Define start and goal
        self.start = (0, 0)
        self.goal = (rows-1, cols-1)
        # Current position
        self.agent_pos = self.start
        # Keep track of visited cells
        self.visited = set()
        self.visited.add(self.start)

    def reset(self):
        self.agent_pos = self.start
        self.visited = set([self.start])
        return self.agent_pos

    def step(self, action):
        # action is one of 'up', 'down', 'left', 'right' (or aliases)
        r, c = self.agent_pos
        if action in ['up', 'u']:
            r = max(r-1, 0)
        elif action in ['down', 'd']:
            r = min(r+1, self.rows-1)
        elif action in ['left', 'l']:
            c = max(c-1, 0)
        elif action in ['right', 'r']:
            c = min(c+1, self.cols-1)
        self.agent_pos = (r, c)
        self.visited.add(self.agent_pos)
        done = (self.agent_pos == self.goal)
        return self.agent_pos, done

    def render(self):
        # We show only the visited cells, all others are '?' or '.'
        # 'A' = agent, 'G' = goal, '.' = visited, '?' = unvisited
        grid_display = []
        for r in range(self.rows):
            row_chars = []
            for c in range(self.cols):
                pos = (r, c)
                if pos == self.agent_pos:
                    row_chars.append('A')
                elif pos == self.goal:
                    # Show goal if visited, else hide
                    if pos in self.visited:
                        row_chars.append('G')
                    else:
                        row_chars.append('?')
                else:
                    if pos in self.visited:
                        row_chars.append('.')
                    else:
                        row_chars.append('?')
            grid_display.append(' '.join(row_chars))
        display_str = '\n'.join(grid_display)
        print(display_str)  # text-based print

### Manual Play

#### Using buttons (needs ipywidgets library):

In [5]:
env = SmallGridEnv(rows=3, cols=3)
env.reset()

# Create buttons for each action
button_up = widgets.Button(description="Up")
button_down = widgets.Button(description="Down")
button_left = widgets.Button(description="Left")
button_right = widgets.Button(description="Right")

# Display area for the grid
output = widgets.Output()

# Put them into a GridBox
controls = widgets.GridBox(
    children=[button_up, button_left, button_right, button_down],
    layout=widgets.Layout(
        width='390px',
        grid_template_rows='50px 50px 50px',
        grid_template_columns='100px 100px 100px',
        justify_items='center',
        align_items='center'
    )
)

# Assign each button to the right "cell"
# (row / column) are 1-indexed here
button_up.layout.grid_area = '1 / 2'    # Row 1, Column 2
button_left.layout.grid_area = '2 / 1'  # Row 2, Column 1
button_right.layout.grid_area = '2 / 3' # Row 2, Column 3
button_down.layout.grid_area = '3 / 2'  # Row 3, Column 2

def on_button_click(button):
    with output:
        # Clear previous output
        clear_output(wait=True)

        # Determine action based on which button was clicked
        if button.description == "Up":
            action = "up"
        elif button.description == "Down":
            action = "down"
        elif button.description == "Left":
            action = "left"
        elif button.description == "Right":
            action = "right"

        # Step the environment
        obs, done = env.step(action)

        # Render the environment
        env.render()

        if done:
            clear_output(wait=True)
            print("You reached the goal! Resetting environment...")
            env.reset()
            env.render()

# Attach the same callback to each button
button_up.on_click(on_button_click)
button_down.on_click(on_button_click)
button_left.on_click(on_button_click)
button_right.on_click(on_button_click)

# Display them
display(controls)
display(output)

GridBox(children=(Button(description='Up', layout=Layout(grid_area='1 / 2'), style=ButtonStyle()), Button(desc…

Output()

#### Using arrow keys (needs ipyevents library):

In [6]:
l = widgets.Label('Click or type on me!')
l.layout.border = '2px solid red'

h = widgets.HTML('Event info')
d = ipyevents.Event(source=l, watched_events=['click', 'keydown', 'mouseenter', 'touchmove'])

def handle_event(event):
    lines = ['{}: {}'.format(k, v) for k, v in event.items()]
    content = '<br>'.join(lines)
    h.value = content

d.on_dom_event(handle_event)
display(l, h)

Label(value='Click or type on me!', layout=Layout(border_bottom='2px solid red', border_left='2px solid red', …

HTML(value='Event info')

In [5]:
env = SmallGridEnv(rows=3, cols=3)

output = widgets.Output()

# Track arrow key presses
key_event = ipyevents.Event(
    source=output,
    watched_events=['keydown'],
    prevent_default_action=True
)

@key_event.on_dom_event
def handle_keys(event):
    if event['type'] == 'keydown':
        key_pressed = event.get('key', '')
        if key_pressed == 'ArrowUp':
            obs, done = env.step('up')
        elif key_pressed == 'ArrowDown':
            obs, done = env.step('down')
        elif key_pressed == 'ArrowLeft':
            obs, done = env.step('left')
        elif key_pressed == 'ArrowRight':
            obs, done = env.step('right')
        with output:
            if done:
                print("You reached the goal! Resetting environment...")
                env.reset()
            clear_output()
            env.render()

# Initialize
env.reset()
with output:
    clear_output()
    env.render()

display(key_event)
display(output)

Event(prevent_default_action=True, source=Output(), watched_events=['keydown'])

Output()

# Part 2: Larger Gridworld + Q-Learning

Below is a more interesting environment:
1. **Ice tile**: A 50% chance to slip in a random direction after your chosen move.
2. **Bumper**: If you step on it, you get pushed back 3 cells in the direction you came from.
3. **Pit**: Episode ends with a penalty.
4. **Goal**: Episode ends with a reward.

We’ll keep a Q-table of shape `[rows * cols, number_of_actions]`. The agent is fully observant of its location (row, col). Each time step:
- The agent chooses an action using an epsilon-greedy policy.
- The environment transitions the agent’s state.
- The agent receives a reward, updates Q, etc.

In [9]:
class LargeGridEnv:
    """
    A grid environment with special tiles:
    - Ice (50% chance of random slip)
    - Bumper (pushes agent back 3 cells if possible)
    - Pit (ends episode with negative reward)
    - Goal (ends episode with positive reward)
    """
    def __init__(self, rows=6, cols=6, ice_positions=[(2,2), (3,4)], bumper_positions=[(1,4)], pit_positions=[(4,1)], goal_position=(5,5), reward_goal=10.0, reward_pit=-10.0, reward_step=-0.1):
        self.rows = rows
        self.cols = cols
        self.ice_positions = set(ice_positions)
        self.bumper_positions = set(bumper_positions)
        self.pit_positions = set(pit_positions)
        self.goal_position = goal_position

        self.reward_goal = reward_goal
        self.reward_pit = reward_pit
        self.reward_step = reward_step

        self.action_map = {
            0: (-1, 0), # up
            1: (1, 0),  # down
            2: (0, -1), # left
            3: (0, 1)   # right
        }

        self.rng = np.random.default_rng()

        self.agent_pos = (0, 0)
        self.done = False

    def to_index(self, row, col):
        """
        Convert (row,col) to a single integer index.
        """
        return row * self.cols + col

    def from_index(self, index):
        """
        Inverse of to_index: given an int, return (row,col).
        """
        return (index // self.cols, index % self.cols)

    def reset(self):
        """
        Start always at top-left corner.
        """
        self.agent_pos = (0, 0)
        self.done = False
        return self.to_index(*self.agent_pos)

    def step(self, action):
        """
        Take one step in the environment using the given action (0..3).
        """
        if self.done:
            # Episode is over, return same state
            return self.to_index(*self.agent_pos), 0.0, True

        # Compute position after taking step
        dr, dc = self.action_map[action]
        next_r = np.clip(self.agent_pos[0] + dr, 0, self.rows-1)
        next_c = np.clip(self.agent_pos[1] + dc, 0, self.cols-1)
        proposed_pos = (next_r, next_c)

        # Check if we are on ice -> 50% chance slip
        if proposed_pos in self.ice_positions:
            if self.rng.random() < 0.5:
                # slip to a random direction
                slip_action = self.rng.integers(0, 4)
                dr_slip, dc_slip = self.action_map[slip_action]
                next_r = np.clip(next_r + dr_slip, 0, self.rows-1)
                next_c = np.clip(next_c + dc_slip, 0, self.cols-1)
                proposed_pos = (next_r, next_c)

        # Check if the proposed position is a bumper
        # If so, push the agent back 3 cells in the opposite direction, possibly stopping at border
        if proposed_pos in self.bumper_positions:
            push_r = np.clip(next_r - 3*dr, 0, self.rows-1)
            push_c = np.clip(next_c - 3*dc, 0, self.cols-1)
            new_pos = (push_r, push_c)
            self.agent_pos = new_pos
            reward = self.reward_step
            return self.to_index(*self.agent_pos), reward, False

        # If not a bumper, we can move
        self.agent_pos = proposed_pos

        # Now check pit or goal
        if self.agent_pos in self.pit_positions:
            reward = self.reward_pit
            self.done = True
        elif self.agent_pos == self.goal_position:
            reward = self.reward_goal
            self.done = True
        else:
            reward = self.reward_step

        return self.to_index(*self.agent_pos), reward, self.done

    def render(self):
        """
        Simple ASCII rendering: G = goal, X = pit, B = bumper, I = ice, A = agent
        """
        grid = np.full((self.rows, self.cols), fill_value='.')
        # Mark special tiles
        for (r, c) in self.ice_positions:
            grid[r, c] = 'I'
        for (r, c) in self.bumper_positions:
            grid[r, c] = 'B'
        for (r, c) in self.pit_positions:
            grid[r, c] = 'X'
        gr, gc = self.goal_position
        grid[gr, gc] = 'G'
        ar, ac = self.agent_pos
        grid[ar, ac] = 'A'

        # Print
        for r in range(self.rows):
            print(' '.join(grid[r]))
        print()  # blank

## Q-Learning Implementation
We'll create a Q-table of shape `(rows * cols, num_actions)`. The basic steps:

1. **Initialize Q** to zeros (or small random numbers).
2. **For each episode**:
    - Reset environment.
    - While not done:
        - Choose `action` with epsilon-greedy strategy.
        - Observe `next_state`, `reward`, `done`.
        - Update Q: \( Q[state, action] &larr; Q[state, action] + $\alpha$ [reward + $\gamma$ $\max_a$ Q[next\_state, a] - Q[state, action]] \)
        - `state = next_state`


In [10]:
def q_learning(env,
               num_episodes=200,
               alpha=0.1,       # learning rate
               gamma=0.95,      # discount rate
               epsilon=0.1,     # exploration rate (epsilon-greedy strategy)
               max_steps=100):  # limit on steps per episode to prevent infinite wandering
    """
    Train a tabular Q-learning agent on the given environment.
    """

    num_states = env.rows * env.cols
    num_actions = 4  # up/down/left/right

    # Initialize Q-table (num_states x num_actions)
    Q = np.zeros((num_states, num_actions), dtype=np.float32)

    rewards_history = []

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0.0

        for t in range(max_steps):
            # Epsilon-greedy action selection
            if np.random.rand() < epsilon:
                action = np.random.randint(num_actions)
            else:
                action = np.argmax(Q[state])

            next_state, reward, done = env.step(action)

            # Q-learning update
            best_next_q = np.max(Q[next_state])
            Q[state, action] += alpha * (reward + gamma * best_next_q - Q[state, action])

            state = next_state
            total_reward += reward

            if done:
                break

        rewards_history.append(total_reward)

    return Q, rewards_history

In [14]:
large_env = LargeGridEnv(rows=6, cols=6, ice_positions=[(2,2), (3,4)], bumper_positions=[(1,4)], pit_positions=[(4,1)], goal_position=(5,5), reward_goal=10.0, reward_pit=-10.0, reward_step=-0.1)

# Train tabular Q-learning
Q, rewards = q_learning(large_env, num_episodes=200, alpha=0.1, gamma=0.95, epsilon=0.1, max_steps=100)

# Check final average reward
avg_reward = np.mean(rewards[-50:])
print(f"Average reward over last 50 episodes: {avg_reward:.2f}")

# Watch an episode with the learned Q
state = large_env.reset()
done = False
print(f"\\n=== Test Episode ===")
while not done:
    # Choose best action from Q
    action = np.argmax(Q[state])
    next_state, reward, done = large_env.step(action)
    large_env.render()
    time.sleep(0.3)
    state = next_state
print("Finished testing.")

Average reward over last 50 episodes: 9.00
\n=== Test Episode ===
. A . . . .
. . . . B .
. . I . . .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. A . . B .
. . I . . .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. . A . B .
. . I . . .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. . A . B .
. . I . . .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. . . . B .
. . A . . .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. . . . B .
. . I A . .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. . . . B .
. . I . A .
. . . . I .
. X . . . .
. . . . . G

. . . . . .
. . . . B .
. . I . . .
. . . . I .
. X . . A .
. . . . . G

. . . . . .
. . . . B .
. . I . . .
. . . . I .
. X . . . A
. . . . . G

. . . . . .
. . . . B .
. . I . . .
. . . . I .
. X . . . .
. . . . . A



In [17]:
# Create the environment
large_env = LargeGridEnv(rows=6, cols=6,
                         ice_positions=[(1,1), (1,2), (2,1), (3,3), (3,4), (3,5), (4,3), (5,3)],
                         bumper_positions=[(1,3), (3,1), (4,4)],
                         pit_positions=[(0,2), (2,0)],
                         goal_position=(5,5),
                         reward_goal=10.0,
                         reward_pit=-10.0,
                         reward_step=-0.1)

# Train tabular Q-learning
Q, rewards = q_learning(large_env, num_episodes=200, alpha=0.1, gamma=0.95, epsilon=0.1, max_steps=100)

# Check final average reward
avg_reward = np.mean(rewards[-50:])
print(f"Average reward over last 50 episodes: {avg_reward:.2f}")

Average reward over last 50 episodes: 8.01


In [18]:
# Watch some test episodes with the learned Q
num_test_episodes = 5
for i in range(num_test_episodes):
    state = large_env.reset()
    done = False
    print(f"\n=== Test Episode {i+1} ===")
    while not done:
        # Choose best action from Q
        action = np.argmax(Q[state])
        next_state, reward, done = large_env.step(action)
        large_env.render()
        time.sleep(0.3)
        state = next_state
print("Finished testing.")


=== Test Episode 1 ===
. . X . . .
A I I B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . G

. . X . . .
. A I B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . G

. . X . . .
A I I B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . G

. . X . . .
A I I B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . G

. . X . . .
. I A B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . G

. . X . . .
. I I B . .
X I A . . .
. B . I I I
. . . I B .
. . . I . G

. . X . . .
. I I B . .
X I . . . .
. B A I I I
. . . I B .
. . . I . G

. . X . . .
. I I B . .
X I . . . .
. B . I A I
. . . I B .
. . . I . G

. . X . . .
. I I B . .
X I . . . .
. B . I A I
. . . I B .
. . . I . G

. . X . . .
. I I B . .
X I . . . .
. B . I I A
. . . I B .
. . . I . G

. . X . . .
. I I B . .
X I . . . .
. B . I I I
. . . I B A
. . . I . G

. . X . . .
. I I B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . A


=== Test Episode 2 ===
. . X . . .
A I I B . .
X I . . . .
. B . I I I
. . . I B .
. . . I . G

. .