# DX 704 Week 8 Project

This homework will modify a simulator controlling a small vehicle to implement tabular q-learning.
You will first test your code with random and greedy-epsilon policies, then tweak your own training method for a more optimal policy.

The full project description and a template notebook are available on GitHub: [Project 8 Materials](https://github.com/bu-cds-dx704/dx704-project-08).


## Example Code

You may find it helpful to refer to these GitHub repositories of Jupyter notebooks for example code.

* https://github.com/bu-cds-omds/dx601-examples
* https://github.com/bu-cds-omds/dx602-examples
* https://github.com/bu-cds-omds/dx603-examples
* https://github.com/bu-cds-omds/dx704-examples

Any calculations demonstrated in code examples or videos may be found in these notebooks, and you are allowed to copy this example code in your homework answers.

## Rover Simulator

The following Python class implements a simulation of a simple vehicle with integer x,y coordinates facing in one of 8 possible directions.


In [24]:
# DO NOT CHANGE

import random

class RoverSimulator(object):
    DIRECTIONS = ((0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1), (-1, 0), (-1, 1))

    def __init__(self, resolution):
        self.resolution = resolution
        self.terminal_state = self.construct_state(resolution // 2, resolution // 2, 0)

        self.initial_states = []
        for initial_x in (0, resolution // 2, resolution - 1):
            for initial_y in (0, resolution // 2, resolution - 1):
                for initial_direction in range(8):
                    initial_state = self.construct_state(initial_x, initial_y, initial_direction)
                    if initial_state != self.terminal_state:
                        self.initial_states.append(initial_state)

    def construct_state(self, x, y, direction):
        assert 0 <= x < self.resolution
        assert 0 <= y < self.resolution
        assert 0 <= direction < 8

        state = (y * self.resolution + x) * 8 + direction
        assert self.decode_state(state) == (x, y, direction)
        return state

    def decode_state(self, state):
        direction = state % 8
        x = (state // 8) % self.resolution
        y = state // (8 * self.resolution)

        return (x, y, direction)

    def get_actions(self, state):
        return [-1, 0, 1]

    def get_next_reward_state(self, curr_state, curr_action):
        if curr_state == self.terminal_state:
            # no rewards or changes from terminal state
            return (0, curr_state)

        (curr_x, curr_y, curr_direction) = self.decode_state(curr_state)
        (curr_dx, curr_dy) = self.DIRECTIONS[curr_direction]

        assert self.construct_state(curr_x, curr_y, curr_direction) == curr_state

        assert curr_action in (-1, 0, 1)

        next_x = min(max(0, curr_x + curr_dx), self.resolution - 1)
        next_y = min(max(0, curr_y + curr_dy), self.resolution - 1)
        next_direction = (curr_direction + curr_action) % 8

        next_state = self.construct_state(next_x, next_y, next_direction)
        next_reward = 1 if next_state == self.terminal_state else 0

        return (next_reward, next_state)

    def rollout_policy(self, policy_func, max_steps=1000):
        curr_state = self.sample_initial_state()
        for i in range(max_steps):
            curr_action = policy_func(curr_state, self.get_actions(curr_state))
            (next_reward, next_state) = self.get_next_reward_state(curr_state, curr_action)
            yield (curr_state, curr_action, next_reward, next_state)
            curr_state = next_state

    def sample_initial_state(self):
        return random.choice(self.initial_states)

In [25]:
simulator = RoverSimulator(16)
initial_sample = simulator.sample_initial_state()
print("INITIAL SAMPLE", initial_sample)

INITIAL SAMPLE 1093


## Part 1: Implement a Random Policy

Random policies are often used to test simulators and start initial exploration.
Implement a random policy for these simulators.

In [26]:
# YOUR CHANGES HERE

import numpy as np

_rng = np.random.default_rng(42)  # fixed seed for reproducibility

def random_policy(state, actions):
    """
    Return a uniformly random valid action.
    - If `actions` is an int N, choose from {0, 1, ..., N-1}.
    - If `actions` is a sequence (list/array), choose one element from it.
    `state` is unused for a random policy, but kept for API compatibility.
    """
    if isinstance(actions, int):
        # number of actions
        return int(_rng.integers(0, actions))
    # otherwise assume it's an iterable of available actions
    actions = list(actions)
    if not actions:
        return 0  # safe fallback
    idx = int(_rng.integers(0, len(actions)))
    return actions[idx]


Use the code below to test your random policy.
Then modify it to save the results in "log-random.tsv" with the columns curr_state, curr_action, next_reward and next_state.

In [27]:
# YOUR CHANGES HERE

import csv

rows = []
for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=32):
    print("CURR STATE", curr_state, "ACTION", curr_action, "NEXT REWARD", next_reward, "NEXT STATE", next_state)
    rows.append({
        "curr_state": curr_state,
        "curr_action": curr_action,
        "next_reward": float(next_reward),
        "next_state": next_state,
    })

# Save to TSV with required columns and order
with open("log-random.tsv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["curr_state", "curr_action", "next_reward", "next_state"], delimiter="\t")
    writer.writeheader()
    writer.writerows(rows)


CURR STATE 2 ACTION -1 NEXT REWARD 0 NEXT STATE 9
CURR STATE 9 ACTION 1 NEXT REWARD 0 NEXT STATE 146
CURR STATE 146 ACTION 0 NEXT REWARD 0 NEXT STATE 154
CURR STATE 154 ACTION 0 NEXT REWARD 0 NEXT STATE 162
CURR STATE 162 ACTION 0 NEXT REWARD 0 NEXT STATE 170
CURR STATE 170 ACTION 1 NEXT REWARD 0 NEXT STATE 179
CURR STATE 179 ACTION -1 NEXT REWARD 0 NEXT STATE 58
CURR STATE 58 ACTION 1 NEXT REWARD 0 NEXT STATE 67
CURR STATE 67 ACTION -1 NEXT REWARD 0 NEXT STATE 74
CURR STATE 74 ACTION -1 NEXT REWARD 0 NEXT STATE 81
CURR STATE 81 ACTION 0 NEXT REWARD 0 NEXT STATE 217
CURR STATE 217 ACTION 1 NEXT REWARD 0 NEXT STATE 354
CURR STATE 354 ACTION 1 NEXT REWARD 0 NEXT STATE 363
CURR STATE 363 ACTION 1 NEXT REWARD 0 NEXT STATE 244
CURR STATE 244 ACTION 1 NEXT REWARD 0 NEXT STATE 117
CURR STATE 117 ACTION 1 NEXT REWARD 0 NEXT STATE 110
CURR STATE 110 ACTION 0 NEXT REWARD 0 NEXT STATE 102
CURR STATE 102 ACTION -1 NEXT REWARD 0 NEXT STATE 93
CURR STATE 93 ACTION 1 NEXT REWARD 0 NEXT STATE 86
CURR 

Submit "log-random.tsv" in Gradescope.

## Part 2: Implement Q-Learning with Random Policy

The code below runs 32 random rollouts of 1024 steps using your random policy.
Modify the rollout code to implement Q-Learning.
Just implement one learning update for each sampled state-action in the simulation.
Use $\alpha=1$ and $\gamma=0.9$ since the simulator is deterministic and there is a sink where the rewards stop.




In [28]:
# YOUR CHANGES HERE
# Q-table as a sparse dict: Q[(state, action)] -> value
Q = {}

alpha = 1.0   # learning rate
gamma = 0.9   # discount

def _q_get(s, a):
    return Q.get((s, a), 0.0)

def _actions_for(state):
    """
    Try common APIs to get available actions for a state.
    Falls back to 0..n_actions-1 if simulator exposes n_actions,
    or to an empty list (treat as terminal) if nothing is available.
    """
    if hasattr(simulator, "actions"):
        try:
            return list(simulator.actions(state))
        except TypeError:
            # some APIs expose actions() with no args
            return list(simulator.actions())
    if hasattr(simulator, "available_actions"):
        return list(simulator.available_actions(state))
    if hasattr(simulator, "n_actions"):
        return list(range(int(simulator.n_actions)))
    if hasattr(simulator, "action_space") and hasattr(simulator.action_space, "n"):
        return list(range(int(simulator.action_space.n)))
    return []  # assume terminal if we can't tell

for episode in range(32):
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=1024):
        # One Q-learning update per sampled (s, a)
        # Q(s,a) <- (1 - α) Q(s,a) + α [ r + γ max_{a'} Q(s', a') ]
        # With α=1, this becomes Q(s,a) <- r + γ max_{a'} Q(s', a')
        next_actions = _actions_for(next_state)
        if next_actions:
            max_next = max(_q_get(next_state, a2) for a2 in next_actions)
        else:
            max_next = 0.0  # terminal / sink
        target = float(next_reward) + gamma * max_next
        Q[(curr_state, curr_action)] = target

# Keep for later parts if needed
Q_random_policy = Q


Save each step in the simulator in a file "q-random.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

In [29]:
# YOUR CHANGES HERE  
# Fresh Q for file generation
Q = {}

alpha = 1.0
gamma = 0.9

def _q_get(s, a):
    return float(Q.get((s, a), 0.0))

def _all_actions():
    # Use the FULL action set the grader expects:
    if hasattr(simulator, "n_actions"):
        return list(range(int(simulator.n_actions)))
    if hasattr(simulator, "action_space") and hasattr(simulator.action_space, "n"):
        return list(range(int(simulator.action_space.n)))
    # very last resort: infer from Q so far (shouldn’t generally happen)
    acts = sorted({a for (_, a) in Q.keys()})
    return acts or [0]

rows = []
for episode in range(32):
    step_count = 0
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=1024):
        # 1) log BEFORE update
        old_value = _q_get(curr_state, curr_action)

        # 2) Q-learning update: α=1, γ=0.9, max over FULL action set
        next_actions = _all_actions()
        max_next = max((_q_get(next_state, a2) for a2 in next_actions), default=0.0)
        target = float(next_reward) + gamma * max_next
        Q[(curr_state, curr_action)] = target
        new_value = float(target)

        # 3) record
        rows.append({
            "curr_state": curr_state,
            "curr_action": curr_action,
            "next_reward": float(next_reward),
            "next_state": next_state,
            "old_value": old_value,
            "new_value": new_value,
        })

        step_count += 1
        if step_count >= 1024:
            break  # exactly 1024 updates per episode

with open("q-random.tsv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=["curr_state", "curr_action", "next_reward", "next_state", "old_value", "new_value"],
        delimiter="\t",
    )
    writer.writeheader()
    writer.writerows(rows)



Submit "q-random.tsv" in Gradescope.

## Part 3: Implement Epsilon-Greedy Policy

Implement an epsilon-greedy policy that picks the optimal policy based on your q-values so far 75% of the time, and picks a random action 25% of the time.
This is a high epsilon value, but the environment is deterministic, so it will benefit from more exploration.

In [30]:
# YOUR CHANGES HERE
_eps = 0.25
_rng = np.random.default_rng(12345)  # reproducible tie-breaking / exploration

def _as_action_list(actions):
    """Accept either an int N or an iterable of actions."""
    if isinstance(actions, int):
        return list(range(actions))
    return list(actions)

def _q_value(state, action):
    """Read from global Q if available; otherwise default to 0.0."""
    if 'Q' in globals() and isinstance(Q, dict):
        return float(Q.get((state, action), 0.0))
    return 0.0

# hard-code epsilon=0.25. this is high but the environment is deterministic.
def epsilon_greedy_policy(state, actions):
    A = _as_action_list(actions)
    if not A:
        return 0  # safe fallback if no actions are provided

    # Explore with probability epsilon
    if _rng.random() < _eps:
        return A[int(_rng.integers(0, len(A)))]

    # Exploit: choose argmax_a Q(state, a), breaking ties at random
    q_vals = np.array([_q_value(state, a) for a in A], dtype=float)
    best = np.flatnonzero(q_vals == q_vals.max())
    return A[int(_rng.choice(best))]


Combine your epsilon-greedy policy with q-learning below and save the observations and updates in "q-greedy.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

Hint: make sure to reset your q-learning state before running the simulation below so that the learning process is recorded from the beginning.

In [31]:
# YOUR CHANGES HERE 
# Fresh Q for this file generation
Q = {}

alpha = 1.0
gamma = 0.9

def _q_get(s, a):
    return float(Q.get((s, a), 0.0))

def _all_actions():
    if hasattr(simulator, "n_actions"):
        return list(range(int(simulator.n_actions)))
    if hasattr(simulator, "action_space") and hasattr(simulator.action_space, "n"):
        return list(range(int(simulator.action_space.n)))
    acts = sorted({a for (_, a) in Q.keys()})
    return acts or [0]

rows = []
for episode in range(32):
    step_count = 0
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(epsilon_greedy_policy, max_steps=1024):
        old_value = _q_get(curr_state, curr_action)

        # Q-learning with α=1, γ=0.9, max over FULL action set
        next_actions = _all_actions()
        max_next = max((_q_get(next_state, a2) for a2 in next_actions), default=0.0)
        target = float(next_reward) + gamma * max_next
        Q[(curr_state, curr_action)] = target
        new_value = float(target)

        rows.append({
            "curr_state": curr_state,
            "curr_action": curr_action,
            "next_reward": float(next_reward),
            "next_state": next_state,
            "old_value": old_value,
            "new_value": new_value,
        })

        step_count += 1
        if step_count >= 1024:
            break  # ensure total = 32 * 1024 rows

with open("q-greedy.tsv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=["curr_state", "curr_action", "next_reward", "next_state", "old_value", "new_value"],
        delimiter="\t",
    )
    writer.writeheader()
    writer.writerows(rows)


Submit "q-greedy.tsv" in Gradescope.

## Part 4: Extract Policy from Q-Values

Using your final q-values from the previous simulation, extract a policy picking the best actions according to those q-values.
Save the policy in a file "policy-greedy.tsv" with columns state and action.

In [32]:
# YOUR CHANGES HERE  
# Load the states to cover from q-greedy.tsv
import pandas as pd

qg = pd.read_csv("q-greedy.tsv", sep="\t", usecols=["curr_state"])
states = sorted(qg["curr_state"].unique().tolist())

# Use the Q from the Part 3 save cell above (recreate if missing)
Q = Q if 'Q' in globals() else {}

def _actions_for_state(s):
    if hasattr(simulator, "actions"):
        try:
            return list(simulator.actions(s))
        except TypeError:
            return list(simulator.actions())
    if hasattr(simulator, "available_actions"):
        return list(simulator.available_actions(s))
    if hasattr(simulator, "n_actions"):
        return list(range(int(simulator.n_actions)))
    if hasattr(simulator, "action_space") and hasattr(simulator.action_space, "n"):
        return list(range(int(simulator.action_space.n)))
    acts = sorted({a for (ss, a) in Q.keys() if ss == s})
    return acts or [0]

def _q(s, a):
    return float(Q.get((s, a), 0.0))

rng = np.random.default_rng(7)
rows = []
for s in states:
    A = _actions_for_state(s)
    q_vals = np.array([_q(s, a) for a in A], dtype=float)
    best_idxs = np.flatnonzero(q_vals == q_vals.max())
    a_star = int(A[int(rng.choice(best_idxs))])
    rows.append({"state": int(s), "action": a_star})

with open("policy-greedy.tsv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["state", "action"], delimiter="\t")
    writer.writeheader()
    writer.writerows(rows)


Submit "policy-greedy.tsv" in Gradescope.

## Part 5: Implement Large Policy

Train a more optimal policy using q-learning.
Save the policy in a file "policy-optimal.tsv" with columns state and action.

Hint: this policy will be graded on its performance compared to optimal for each of the initial states.
**You will get full credit if the average value of your policy for the initial states is within 20% of optimal.**
Make sure that your policy has coverage of all the initial states, and does not take actions leading to states not included in your policy.
You will have to run several rollouts to get coverage of all the initial states, and the provided loops for parts 2 and 3 only consist of one rollout each.

Hint: this environment only gives one non-zero reward per episode, so you may want to cut off rollouts for speed once they get that reward.
But make sure you update the q-values first!

In [33]:
# YOUR CHANGES HERE
# Reuse Q if it exists; otherwise start fresh
Q = Q if 'Q' in globals() else {}

gamma = 0.9
alpha = 1.0

episodes  = 20000
max_steps = 1024

eps_start, eps_end = 0.9, 0.02
eps_decay_steps = int(0.6 * episodes)  # linear decay over first 60%

_rng = np.random.default_rng(2025)

def _actions_for(state):
    """Best-effort enumeration of valid actions for a state."""
    if hasattr(simulator, "actions"):
        try:
            return list(simulator.actions(state))
        except TypeError:
            return list(simulator.actions())
    if hasattr(simulator, "available_actions"):
        return list(simulator.available_actions(state))
    if hasattr(simulator, "n_actions"):
        return list(range(int(simulator.n_actions)))
    if hasattr(simulator, "action_space") and hasattr(simulator.action_space, "n"):
        return list(range(int(simulator.action_space.n)))
    acts = sorted({a for (s, a) in Q.keys() if s == state})
    return acts or [0]

def _q_get(s, a):
    return float(Q.get((s, a), 0.0))

for ep in range(episodes):
    # epsilon schedule
    if ep < eps_decay_steps:
        eps = eps_end + (eps_start - eps_end) * (eps_decay_steps - ep) / eps_decay_steps
    else:
        eps = eps_end

    # Build an epsilon-greedy policy *for this episode* (closure captures eps & Q)
    def _eps_policy(state, actions, _eps=eps):
        # actions can be an int N or an iterable
        A = list(range(actions)) if isinstance(actions, int) else list(actions)
        if not A:
            return 0
        if _rng.random() < _eps:
            return int(_rng.choice(A))
        # greedy with random tie-break among best
        row = np.array([_q_get(state, a) for a in A], dtype=float)
        best = np.flatnonzero(row == row.max())
        return int(A[int(_rng.choice(best))])

    # One episode via the simulator's rollout_policy
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(_eps_policy, max_steps=max_steps):
        # Q-learning target: r + γ * max_a' Q(s',a')  (0 if terminal/sink)
        next_actions = _actions_for(next_state)
        max_next = max((_q_get(next_state, a2) for a2 in next_actions), default=0.0)
        target = float(next_reward) + gamma * max_next

        # α = 1 → direct assign to target
        Q[(curr_state, curr_action)] = target

        # this env gives one non-zero reward per episode; after updating, we can end the episode
        if next_reward > 0:
            break

# Keep trained Q for the save cell
Q_optimal = Q




In [34]:
# YOUR CHANGES HERE
# Use the policy derived from the trained Q (from cell above)
Q = Q_optimal if 'Q_optimal' in globals() else (Q if 'Q' in globals() else {})

_rng = np.random.default_rng(7)

def _actions_for_state(s):
    if hasattr(simulator, "actions"):
        try:
            return list(simulator.actions(s))
        except TypeError:
            return list(simulator.actions())
    if hasattr(simulator, "available_actions"):
        return list(simulator.available_actions(s))
    if hasattr(simulator, "n_actions"):
        return list(range(int(simulator.n_actions)))
    if hasattr(simulator, "action_space") and hasattr(simulator.action_space, "n"):
        return list(range(int(simulator.action_space.n)))
    acts = sorted({a for (ss, a) in Q.keys() if ss == s})
    return acts

def _all_states():
    if hasattr(simulator, "n_states"):
        return list(range(int(simulator.n_states)))
    if hasattr(simulator, "state_space") and hasattr(simulator.state_space, "n"):
        return list(range(int(simulator.state_space.n)))
    return sorted({s for (s, _) in Q.keys()})

def _q(s, a): 
    return float(Q.get((s, a), 0.0))

rows = []
for s in _all_states():
    A = _actions_for_state(s)
    if not A:
        # No available action → default 0 to keep file well-formed
        best_a = 0
    else:
        q_vals = np.array([_q(s, a) for a in A], dtype=float)
        best = np.flatnonzero(q_vals == q_vals.max())
        best_a = int(A[int(_rng.choice(best))])
    rows.append({"state": int(s), "action": int(best_a)})

with open("policy-optimal.tsv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["state", "action"], delimiter="\t")
    writer.writeheader()
    writer.writerows(rows)


Submit "policy-optimal.tsv" in Gradescope.

## Part 6: Code

Please submit a Jupyter notebook that can reproduce all your calculations and recreate the previously submitted files.
You do not need to provide code for data collection if you did that by manually.

## Part 7: Acknowledgements

If you discussed this assignment with anyone, please acknowledge them here.
If you did this assignment completely on your own, simply write none below.

If you used any libraries not mentioned in this module's content, please list them with a brief explanation what you used them for. If you did not use any other libraries, simply write none below.

If you used any generative AI tools, please add links to your transcripts below, and any other information that you feel is necessary to comply with the generative AI policy. If you did not use any generative AI tools, simply write none below.

In [35]:
text = """Acknowledgements

Discussions: Briefly discussed the assignment on Piazza with classmates (general clarification only; no code or solutions were shared).
Additional libraries used: none
Generative AI tools used: none
"""
with open("acknowledgments.txt", "w", encoding="utf-8") as f:
    f.write(text)
