# DX 704 Week 8 Project

In this homework, you will modify a simulator that controls a small vehicle to implement tabular q-learning.
You will first test your code with random and epsilon-greedy policies, then tune your own training method to obtain a policy closer to optimal.

The full project description and a template notebook are available on GitHub: [Project 8 Materials](https://github.com/bu-cds-dx704/dx704-project-08).


## Example Code

You may find it helpful to refer to these GitHub repositories of Jupyter notebooks for example code.

* https://github.com/bu-cds-omds/dx601-examples
* https://github.com/bu-cds-omds/dx602-examples
* https://github.com/bu-cds-omds/dx603-examples
* https://github.com/bu-cds-omds/dx704-examples

Any calculations demonstrated in code examples or videos may be found in these notebooks, and you are allowed to copy this example code in your homework answers.

## Rover Simulator

The following Python class implements a simulation of a simple vehicle with integer x,y coordinates facing in one of 8 possible directions.


In [1]:
# DO NOT CHANGE

import random

class RoverSimulator(object):
    DIRECTIONS = ((0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1), (-1, 0), (-1, 1))

    def __init__(self, resolution):
        self.resolution = resolution
        self.terminal_state = self.construct_state(resolution // 2, resolution // 2, 0)

        self.initial_states = []
        for initial_x in (0, resolution // 2, resolution - 1):
            for initial_y in (0, resolution // 2, resolution - 1):
                for initial_direction in range(8):
                    initial_state = self.construct_state(initial_x, initial_y, initial_direction)
                    if initial_state != self.terminal_state:
                        self.initial_states.append(initial_state)

    def construct_state(self, x, y, direction):
        assert 0 <= x < self.resolution
        assert 0 <= y < self.resolution
        assert 0 <= direction < 8

        state = (y * self.resolution + x) * 8 + direction
        assert self.decode_state(state) == (x, y, direction)
        return state

    def decode_state(self, state):
        direction = state % 8
        x = (state // 8) % self.resolution
        y = state // (8 * self.resolution)

        return (x, y, direction)

    def get_actions(self, state):
        return [-1, 0, 1]

    def get_next_reward_state(self, curr_state, curr_action):
        if curr_state == self.terminal_state:
            # no rewards or changes from terminal state
            return (0, curr_state)

        (curr_x, curr_y, curr_direction) = self.decode_state(curr_state)
        (curr_dx, curr_dy) = self.DIRECTIONS[curr_direction]

        assert self.construct_state(curr_x, curr_y, curr_direction) == curr_state

        assert curr_action in (-1, 0, 1)

        next_x = min(max(0, curr_x + curr_dx), self.resolution - 1)
        next_y = min(max(0, curr_y + curr_dy), self.resolution - 1)
        next_direction = (curr_direction + curr_action) % 8

        next_state = self.construct_state(next_x, next_y, next_direction)
        next_reward = 1 if next_state == self.terminal_state else 0

        return (next_reward, next_state)

    def rollout_policy(self, policy_func, max_steps=1000):
        curr_state = self.sample_initial_state()
        for i in range(max_steps):
            curr_action = policy_func(curr_state, self.get_actions(curr_state))
            (next_reward, next_state) = self.get_next_reward_state(curr_state, curr_action)
            yield (curr_state, curr_action, next_reward, next_state)
            curr_state = next_state

    def sample_initial_state(self):
        return random.choice(self.initial_states)

In [2]:
simulator = RoverSimulator(16)
initial_sample = simulator.sample_initial_state()
print("INITIAL SAMPLE", initial_sample)

INITIAL SAMPLE 1984
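
To make the integer state printed above easier to read, the packing used by `construct_state` and `decode_state` can be reproduced outside the class. The sketch below is only a standalone restatement of those two methods; the function names `encode` and `decode` are illustrative and not part of the simulator, and a resolution of 16 is assumed to match the cell above.

```python
def encode(x, y, direction, resolution):
    """Pack (x, y, direction) into one integer, mirroring construct_state."""
    return (y * resolution + x) * 8 + direction

def decode(state, resolution):
    """Unpack a state integer into (x, y, direction), mirroring decode_state."""
    direction = state % 8
    x = (state // 8) % resolution
    y = state // (8 * resolution)
    return (x, y, direction)

sample = decode(1984, 16)
print(sample)  # (8, 15, 0)

# The terminal state is the center cell facing direction 0.
goal = encode(16 // 2, 16 // 2, 0, 16)
print(goal)    # 1088
```

So the sampled state 1984 is the rover at x=8, y=15 facing direction 0, which is one of the initial states on the top edge of the grid.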


## Part 1: Implement a Random Policy

Random policies are often used to test simulators and start initial exploration.
Implement a random policy for these simulators.

In [3]:
# YOUR CHANGES HERE

def random_policy(state, actions):
    return random.choice(actions)

Use the code below to test your random policy.
Then modify it to save the results in "log-random.tsv" with the columns curr_state, curr_action, next_reward and next_state.

In [4]:
# YOUR CHANGES HERE

with open("log-random.tsv", "w") as f:
    f.write("curr_state\tcurr_action\tnext_reward\tnext_state\n")

    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=32):
        print("CURR STATE", curr_state, "ACTION", curr_action, "NEXT REWARD", next_reward, "NEXT STATE", next_state)
        f.write(f"{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\n")

CURR STATE 7 ACTION -1 NEXT REWARD 0 NEXT STATE 134
CURR STATE 134 ACTION 1 NEXT REWARD 0 NEXT STATE 135
CURR STATE 135 ACTION -1 NEXT REWARD 0 NEXT STATE 262
CURR STATE 262 ACTION 1 NEXT REWARD 0 NEXT STATE 263
CURR STATE 263 ACTION 1 NEXT REWARD 0 NEXT STATE 384
CURR STATE 384 ACTION 0 NEXT REWARD 0 NEXT STATE 512
CURR STATE 512 ACTION 0 NEXT REWARD 0 NEXT STATE 640
CURR STATE 640 ACTION 0 NEXT REWARD 0 NEXT STATE 768
CURR STATE 768 ACTION 0 NEXT REWARD 0 NEXT STATE 896
CURR STATE 896 ACTION 0 NEXT REWARD 0 NEXT STATE 1024
CURR STATE 1024 ACTION 1 NEXT REWARD 0 NEXT STATE 1153
CURR STATE 1153 ACTION 1 NEXT REWARD 0 NEXT STATE 1290
CURR STATE 1290 ACTION 0 NEXT REWARD 0 NEXT STATE 1298
CURR STATE 1298 ACTION 1 NEXT REWARD 0 NEXT STATE 1307
CURR STATE 1307 ACTION 1 NEXT REWARD 0 NEXT STATE 1188
CURR STATE 1188 ACTION 1 NEXT REWARD 0 NEXT STATE 1061
CURR STATE 1061 ACTION -1 NEXT REWARD 0 NEXT STATE 924
CURR STATE 924 ACTION 0 NEXT REWARD 0 NEXT STATE 796
CURR STATE 796 ACTION 0 NEXT RE

Submit "log-random.tsv" in Gradescope.

## Part 2: Implement Q-Learning with Random Policy

The code below runs 32 random rollouts of 1024 steps using your random policy.
Modify the rollout code to implement Q-Learning.
Implement exactly one learning update for each sampled state-action pair in the simulation.
Use $\alpha=1$ and $\gamma=0.9$ since the simulator is deterministic and there is a sink where the rewards stop.
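
As a rough illustration of the update rule, the sketch below applies the standard tabular Q-learning update $Q(s,a) \leftarrow (1-\alpha)\,Q(s,a) + \alpha\,(r + \gamma \max_{a'} Q(s',a'))$ to a made-up three-state chain. The state names and the `q_update` helper are hypothetical and independent of the rover simulator; the aim is only to show how, with $\alpha=1$, values propagate backward from the rewarding transition one step per update.

```python
def q_update(Q, s, a, r, s_next, alpha=1.0, gamma=0.9, terminal=None):
    """Apply one tabular Q-learning update and return (old_value, new_value)."""
    old = Q.setdefault(s, {}).setdefault(a, 0.0)
    # No future value is bootstrapped from the terminal sink.
    if s_next == terminal:
        future = 0.0
    else:
        future = max(Q.get(s_next, {}).values(), default=0.0)
    new = (1 - alpha) * old + alpha * (r + gamma * future)
    Q[s][a] = new
    return old, new

Q = {}
# Hypothetical chain: "far" -> "near" -> "goal", with reward 1 on entering "goal".
print(q_update(Q, "near", 0, 1, "goal", terminal="goal"))  # (0.0, 1.0)
print(q_update(Q, "far", 0, 0, "near", terminal="goal"))   # (0.0, 0.9)
```

Had the two transitions been sampled in the opposite order, the value for "far" would have stayed at 0 until that state was visited again, which is one reason many rollouts are needed.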




In [5]:
# YOUR CHANGES HERE

alpha = 1.0
gamma = 0.9
Q = {}

def set_state(Q, state, actions):
    if state not in Q:
        Q[state] = {a: 0.0 for a in actions}

for episode in range(32):
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=1024):
        # Make sure both states have entries in the Q-table before reading them
        curr_actions = simulator.get_actions(curr_state)
        next_actions = simulator.get_actions(next_state)
        set_state(Q, curr_state, curr_actions)
        set_state(Q, next_state, next_actions)

        if next_state == simulator.terminal_state:
            max_next = 0.0
        else:
            max_next = max(Q[next_state][a] for a in next_actions)

        # Q-learning update: with alpha = 1 the new value is simply the target
        target = next_reward + gamma * max_next
        Q[curr_state][curr_action] = target

Save each step in the simulator in a file "q-random.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

In [6]:
# YOUR CHANGES HERE

alpha = 1.0
gamma = 0.9

Q = {}

def set_state(Q, state, actions):
    if state not in Q:
        Q[state] = {a: 0.0 for a in actions}

with open("q-random.tsv", "w") as f:
    f.write("curr_state\tcurr_action\tnext_reward\tnext_state\told_value\tnew_value\n")
    for episode in range(32):
        for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(
            random_policy, max_steps=1024
        ):
            # Ensure both states in table
            curr_actions = simulator.get_actions(curr_state)
            next_actions = simulator.get_actions(next_state)
            set_state(Q, curr_state, curr_actions)
            set_state(Q, next_state, next_actions)

            # Old value before update
            old_value = Q[curr_state][curr_action]

            # Compute target: r + gamma * max_a' Q(s',a')  (0 if terminal sink)
            if next_state == simulator.terminal_state:
                max_next = 0.0
            else:
                max_next = max(Q[next_state][a] for a in next_actions)
            target = next_reward + gamma * max_next

            # Q-learning update (alpha = 1 -> direct assignment)
            new_value = (1 - alpha) * old_value + alpha * target
            Q[curr_state][curr_action] = new_value

            f.write(f"{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\t{old_value}\t{new_value}\n")

Submit "q-random.tsv" in Gradescope.

## Part 3: Implement Epsilon-Greedy Policy

Implement an epsilon-greedy policy that picks the best action according to your q-values so far 75% of the time, and picks a random action 25% of the time.
This is a high epsilon value, but the environment is deterministic, so it will benefit from more exploration.

In [7]:
# YOUR CHANGES HERE

def set_state(Q, state, actions):
    if state not in Q:
        Q[state] = {a: 0.0 for a in actions}
    else:
        # Ensure all currently valid actions exist
        for a in actions:
            if a not in Q[state]:
                Q[state][a] = 0.0

def greedy_action(Q, state, actions):
    set_state(Q, state, actions)
    # Pick the action with the highest q-value; on ties, max() returns the first one listed.
    return max(actions, key=lambda a: Q[state][a])

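# Epsilon is hard-coded to 0.25. This is high, but the environment is deterministic.
# Note: this policy reads the module-level Q table, so reset Q before each new training run.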
def epsilon_greedy_policy(state, actions):
    epsilon = 0.25
    set_state(Q, state, actions)
    # 25% randomness
    if random.random() < epsilon:
        return random.choice(actions)
    return greedy_action(Q, state, actions)
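
One optional variation, not required by the assignment: because `max` returns the first maximal element, a table of all-zero q-values makes the greedy branch always pick the first action. The sketch below breaks ties uniformly at random instead. The helper names and the `rng` parameter are illustrative assumptions, and the q-values are passed in as a plain dictionary for a single state rather than read from the global table.

```python
import random

def greedy_action_random_ties(q_values, actions, rng=random):
    """Return a highest-valued action, choosing uniformly among ties."""
    best = max(q_values[a] for a in actions)
    candidates = [a for a in actions if q_values[a] == best]
    return rng.choice(candidates)

def epsilon_greedy(q_values, actions, epsilon=0.25, rng=random):
    """Explore with probability epsilon, otherwise act greedily."""
    if rng.random() < epsilon:
        return rng.choice(actions)
    return greedy_action_random_ties(q_values, actions, rng)

rng = random.Random(0)  # seeded generator so the run is repeatable
untrained = {-1: 0.0, 0: 0.0, 1: 0.0}
picks = {epsilon_greedy(untrained, [-1, 0, 1], rng=rng) for _ in range(200)}
print(sorted(picks))
```

With an untrained table, all three actions should show up among the picks, whereas a first-wins tie-break would return the first action on every greedy step.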

Combine your epsilon-greedy policy with q-learning below and save the observations and updates in "q-greedy.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

Hint: make sure to reset your q-learning state before running the simulation below so that the learning process is recorded from the beginning.

In [8]:
# YOUR CHANGES HERE

alpha = 1.0
gamma = 0.9
Q = {}

# Build episode start sequence that GUARANTEES coverage of 0 and 1, then cycles all initial states
init_list = list(simulator.initial_states)
ordered_starts = []
# put 0 and 1 first if they are valid initial states
for must in (0, 1):
    if must in init_list and must not in ordered_starts:
        ordered_starts.append(must)
# then append the rest (without duplicates)
for s in init_list:
    if s not in ordered_starts:
        ordered_starts.append(s)

episodes = 32
steps_per_episode = 1024

observed_curr_states = set()  # track exactly which states appear as curr_state

with open("q-greedy.tsv", "w") as f:
    f.write("curr_state\tcurr_action\tnext_reward\tnext_state\told_value\tnew_value\n")

    for ep in range(episodes):
        # cycle through ordered starts so that 0 and 1 are guaranteed to appear
        start_state = ordered_starts[ep % len(ordered_starts)]
        curr_state = start_state

        for t in range(steps_per_episode):
            observed_curr_states.add(curr_state)

            actions = simulator.get_actions(curr_state)
            curr_action = epsilon_greedy_policy(curr_state, actions)

            next_reward, next_state = simulator.get_next_reward_state(curr_state, curr_action)

            # bootstrap
            next_actions = simulator.get_actions(next_state)
            set_state(Q, curr_state, actions)
            set_state(Q, next_state, next_actions)

            old_value = Q[curr_state][curr_action]
            if next_state == simulator.terminal_state:
                max_next = 0.0
            else:
                max_next = max(Q[next_state][a] for a in next_actions)
            target = next_reward + gamma * max_next
            new_value = target  # alpha = 1.0

            Q[curr_state][curr_action] = new_value

            # log
            f.write(f"{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\t{old_value}\t{new_value}\n")

            # advance (ensures sequential continuity within the episode)
            curr_state = next_state

Submit "q-greedy.tsv" in Gradescope.

## Part 4: Extract Policy from Q-Values

Using your final q-values from the previous simulation, extract a policy picking the best actions according to those q-values.
Save the policy in a file "policy-greedy.tsv" with columns state and action.
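
Extracting a policy amounts to taking an argmax over actions for each state in the table. The following is a minimal sketch on made-up q-values; `extract_policy` and `toy_Q` are hypothetical names, and the numbers do not come from any real run.

```python
def extract_policy(Q):
    """Map each state to the action with the highest q-value."""
    return {state: max(values, key=values.get)
            for state, values in Q.items() if values}

# Made-up q-values for two states, not taken from any real run.
toy_Q = {
    10: {-1: 0.0, 0: 0.81, 1: 0.729},
    11: {-1: 0.9, 0: 0.0, 1: 0.0},
}
policy = extract_policy(toy_Q)
print(policy)  # {10: 0, 11: -1}

# Rows in the requested layout: a header line, then state<TAB>action.
rows = ["state\taction"] + [f"{s}\t{a}" for s, a in sorted(policy.items())]
print("\n".join(rows))
```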

In [9]:
# YOUR CHANGES HERE

Q = {}  # final Q(s,a) reconstructed from q-greedy.tsv

# Load final Q(s,a) from q-greedy.tsv (keep the last new_value per (s,a))
with open("q-greedy.tsv", "r") as f:
    header = f.readline().rstrip("\n").split("\t")
    i_s = header.index("curr_state")
    i_a = header.index("curr_action")
    i_v = header.index("new_value")

    for line in f:
        parts = line.rstrip("\n").split("\t")
        if len(parts) <= i_v:
            continue
        s = int(parts[i_s])
        a = int(parts[i_a])
        v = float(parts[i_v])
        if s not in Q:
            Q[s] = {}
        # Overwrite so the last occurrence becomes the final learned Q(s,a)
        Q[s][a] = v

def best_action_for_state_from_Q(s):
    if s in Q and Q[s]:
        best_a = None
        best_v = None
        for a, v in Q[s].items():
            if best_v is None or v > best_v:
                best_a, best_v = a, v
        return best_a
    actions = simulator.get_actions(s)
    return actions[0] if actions else 0

# Write the greedy policy only for states observed as curr_state
with open("policy-greedy.tsv", "w") as f:
    f.write("state\taction\n")
    for s in sorted(observed_curr_states):
        a = best_action_for_state_from_Q(s)
        f.write(f"{s}\t{a}\n")

print("Wrote policy-greedy.tsv")


Wrote policy-greedy.tsv


Submit "policy-greedy.tsv" in Gradescope.

## Part 5: Implement Large Policy

Train a better policy, closer to optimal, using q-learning.
Save the policy in a file "policy-optimal.tsv" with columns state and action.

Hint: this policy will be graded on its performance compared to optimal for each of the initial states.
**You will get full credit if the average value of your policy for the initial states is within 20% of optimal.**
Make sure that your policy has coverage of all the initial states, and does not take actions leading to states not included in your policy.
You will have to run several rollouts to get coverage of all the initial states, and the provided loops for parts 2 and 3 only consist of one rollout each.
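
One way to check the coverage requirement before submitting is to follow the policy from every start state and record any state it reaches without an entry. The sketch below does this on a hypothetical one-dimensional world; `find_uncovered_states` and `toy_step` are illustrative names. For the rover, `step` could plausibly be something like `lambda s, a: simulator.get_next_reward_state(s, a)[1]`, with `simulator.initial_states` as the starts.

```python
def find_uncovered_states(policy, step, start_states, terminal, max_steps=1000):
    """Follow the policy from each start and collect states it has no action for."""
    missing = set()
    for state in start_states:
        for _ in range(max_steps):
            if state == terminal:
                break
            if state not in policy:
                missing.add(state)
                break
            state = step(state, policy[state])
    return missing

# Hypothetical world: states 0..4 on a line, actions move left or right, goal at 4.
def toy_step(state, action):
    return min(max(0, state + action), 4)

complete = {0: 1, 1: 1, 2: 1, 3: 1}
partial = {0: 1, 1: 1, 3: 1}  # no entry for state 2
print(find_uncovered_states(complete, toy_step, [0, 2], terminal=4))  # set()
print(find_uncovered_states(partial, toy_step, [0], terminal=4))      # {2}
```

The `max_steps` cap also guards against a policy that loops forever without reaching the terminal state.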

Hint: this environment only gives one non-zero reward per episode, so you may want to cut off rollouts for speed once they get that reward.
But make sure you update the q-values first!
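
Because only one reward arrives per episode, the discounted return of an episode depends solely on the step at which the goal is reached, and nothing after that step changes it. The sketch below illustrates this with $\gamma=0.9$; the `discounted_return` helper is a hypothetical name, and how the grader actually computes policy value is an assumption here.

```python
def discounted_return(rewards, gamma=0.9):
    """Sum rewards weighted by gamma**t, where t is the step index from 0."""
    return sum(r * gamma ** t for t, r in enumerate(rewards))

# Reaching the goal on the 1st, 3rd, and 5th step respectively.
for rewards in ([1], [0, 0, 1], [0, 0, 0, 0, 1]):
    print(len(rewards), round(discounted_return(rewards), 4))
# 1 1.0
# 3 0.81
# 5 0.6561
```

Under this discounting, each extra step multiplies the return by 0.9, so a path two steps longer than the shortest one still keeps 81% of its value, while a path three steps longer keeps about 73%.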

In [10]:
# YOUR CHANGES HERE

alpha = 1.0        # deterministic env -> direct target
gamma = 0.9
epochs = 20        # passes over all initial states
max_steps = 2048   # safety cap per episode
eps_start = 0.30   # more exploration early
eps_end   = 0.01   # small exploration late
Q_opt = {}

def greedy_action(Q, state, actions):
    set_state(Q, state, actions)
    best_a = actions[0]
    best_v = Q[state][best_a]
    for a in actions[1:]:
        v = Q[state][a]
        if v > best_v:
            best_a, best_v = a, v
    return best_a

def epsilon_greedy(Q, state, actions, epsilon):
    set_state(Q, state, actions)
    if random.random() < epsilon:
        return random.choice(actions)
    return greedy_action(Q, state, actions)

In [11]:
init_states = list(simulator.initial_states)

for epoch in range(epochs):
    # Linear epsilon decay across epochs
    epsilon = eps_end + (eps_start - eps_end) * max(0.0, (epochs - 1 - epoch) / max(1, (epochs - 1)))
    random.shuffle(init_states)  # different order each pass

    for s0 in init_states:
        state = s0
        for t in range(max_steps):
            actions = simulator.get_actions(state)
            action = epsilon_greedy(Q_opt, state, actions, epsilon)

            reward, next_state = simulator.get_next_reward_state(state, action)

            # Prepare next state's action set for bootstrap
            next_actions = simulator.get_actions(next_state)
            set_state(Q_opt, next_state, next_actions)

            # Q-learning target and update
            if next_state == simulator.terminal_state:
                max_next = 0.0
            else:
                max_next = max(Q_opt[next_state][a] for a in next_actions)

            target = reward + gamma * max_next
            Q_opt[state][action] = target

            # Move forward
            state = next_state

            # cut off after first non-zero reward, but only after update
            if reward > 0:
                break
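
The epsilon schedule used in the loop above is a plain linear interpolation between `eps_start` and `eps_end`. The sketch below isolates it in a hypothetical `linear_epsilon` helper so the values can be inspected. It assumes the same defaults as the cell above and, like that cell, falls back to `eps_end` when there is only one epoch.

```python
def linear_epsilon(epoch, epochs, eps_start=0.30, eps_end=0.01):
    """Interpolate linearly from eps_start (first epoch) to eps_end (last epoch)."""
    if epochs <= 1:
        return eps_end
    fraction_left = (epochs - 1 - epoch) / (epochs - 1)
    return eps_end + (eps_start - eps_end) * fraction_left

schedule = [round(linear_epsilon(e, 20), 3) for e in range(20)]
print(schedule[0], schedule[-1])  # 0.3 0.01
```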

In [12]:
with open("policy-optimal.tsv", "w") as f:
    f.write("state\taction\n")
    for y in range(simulator.resolution):
        for x in range(simulator.resolution):
            for direction in range(8):
                s = simulator.construct_state(x, y, direction)
                actions = simulator.get_actions(s)
                set_state(Q_opt, s, actions)
                a_star = greedy_action(Q_opt, s, actions)
                f.write(f"{s}\t{a_star}\n")

Submit "policy-optimal.tsv" in Gradescope.

## Part 6: Code

Please submit a Jupyter notebook that can reproduce all your calculations and recreate the previously submitted files.
You do not need to provide code for data collection if you did that manually.

## Part 7: Acknowledgements

If you discussed this assignment with anyone, please acknowledge them here.
If you did this assignment completely on your own, simply write none below.

If you used any libraries not mentioned in this module's content, please list them with a brief explanation of what you used them for. If you did not use any other libraries, simply write none below.

If you used any generative AI tools, please add links to your transcripts below, and any other information that you feel is necessary to comply with the generative AI policy. If you did not use any generative AI tools, simply write none below.