# DX 704 Week 8 Project

This homework will modify a simulator controlling a small vehicle to implement tabular q-learning.
You will first test your code with random and greedy-epsilon policies, then tweak your own training method for a more optimal policy.

The full project description and a template notebook are available on GitHub: [Project 8 Materials](https://github.com/bu-cds-dx704/dx704-project-08).


## Example Code

You may find it helpful to refer to these GitHub repositories of Jupyter notebooks for example code.

* https://github.com/bu-cds-omds/dx601-examples
* https://github.com/bu-cds-omds/dx602-examples
* https://github.com/bu-cds-omds/dx603-examples
* https://github.com/bu-cds-omds/dx704-examples

Any calculations demonstrated in code examples or videos may be found in these notebooks, and you are allowed to copy this example code in your homework answers.

## Rover Simulator

The following Python class implements a simulation of a simple vehicle with integer x,y coordinates facing in one of 8 possible directions.


In [5]:
# DO NOT CHANGE

import random

class RoverSimulator(object):
    DIRECTIONS = ((0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1), (-1, 0), (-1, 1))

    def __init__(self, resolution):
        self.resolution = resolution
        self.terminal_state = self.construct_state(resolution // 2, resolution // 2, 0)

        self.initial_states = []
        for initial_x in (0, resolution // 2, resolution - 1):
            for initial_y in (0, resolution // 2, resolution - 1):
                for initial_direction in range(8):
                    initial_state = self.construct_state(initial_x, initial_y, initial_direction)
                    if initial_state != self.terminal_state:
                        self.initial_states.append(initial_state)

    def construct_state(self, x, y, direction):
        assert 0 <= x < self.resolution
        assert 0 <= y < self.resolution
        assert 0 <= direction < 8

        state = (y * self.resolution + x) * 8 + direction
        assert self.decode_state(state) == (x, y, direction)
        return state

    def decode_state(self, state):
        direction = state % 8
        x = (state // 8) % self.resolution
        y = state // (8 * self.resolution)

        return (x, y, direction)

    def get_actions(self, state):
        return [-1, 0, 1]

    def get_next_reward_state(self, curr_state, curr_action):
        if curr_state == self.terminal_state:
            # no rewards or changes from terminal state
            return (0, curr_state)

        (curr_x, curr_y, curr_direction) = self.decode_state(curr_state)
        (curr_dx, curr_dy) = self.DIRECTIONS[curr_direction]

        assert self.construct_state(curr_x, curr_y, curr_direction) == curr_state

        assert curr_action in (-1, 0, 1)

        next_x = min(max(0, curr_x + curr_dx), self.resolution - 1)
        next_y = min(max(0, curr_y + curr_dy), self.resolution - 1)
        next_direction = (curr_direction + curr_action) % 8

        next_state = self.construct_state(next_x, next_y, next_direction)
        next_reward = 1 if next_state == self.terminal_state else 0

        return (next_reward, next_state)

    def rollout_policy(self, policy_func, max_steps=1000):
        curr_state = self.sample_initial_state()
        for i in range(max_steps):
            curr_action = policy_func(curr_state, self.get_actions(curr_state))
            (next_reward, next_state) = self.get_next_reward_state(curr_state, curr_action)
            yield (curr_state, curr_action, next_reward, next_state)
            curr_state = next_state

    def sample_initial_state(self):
        return random.choice(self.initial_states)

In [6]:
simulator = RoverSimulator(16)
initial_sample = simulator.sample_initial_state()
print("INITIAL SAMPLE", initial_sample)

INITIAL SAMPLE 1985


## Part 1: Implement a Random Policy

Random policies are often used to test simulators and start initial exploration.
Implement a random policy for these simulators.

In [7]:
# YOUR CHANGES HERE

def random_policy(state, actions):
    return random.choice(actions)

Use the code below to test your random policy.
Then modify it to save the results in "log-random.tsv" with the columns curr_state, curr_action, next_reward and next_state.

In [10]:
# YOUR CHANGES HERE
import csv

#print the output
print("Testing random policy:")
for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=32):
    print("CURR STATE", curr_state, "ACTION", curr_action, "NEXT REWARD", next_reward, "NEXT STATE", next_state)

# Now save to TSV file
with open('log-random.tsv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter='\t')
    # Write header
    writer.writerow(['curr_state', 'curr_action', 'next_reward', 'next_state'])
    
    # Write data
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=32):
        writer.writerow([curr_state, curr_action, next_reward, next_state])

print("\nResults saved to log-random.tsv")

Testing random policy:
CURR STATE 2041 ACTION 1 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION 0 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION 0 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION 0 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION -1 NEXT REWARD 0 NEXT STATE 2041
CURR STATE 2041 ACTION 1 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION -1 NEXT REWARD 0 NEXT STATE 2041
CURR STATE 2041 ACTION -1 NEXT REWARD 0 NEXT STATE 2040
CURR STATE 2040 ACTION 1 NEXT REWARD 0 NEXT STATE 2041
CURR STATE 2041 ACTION 1 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION 1 NEXT REWARD 0 NEXT STATE 2043
CURR STATE 2043 ACTION -1 NEXT REWARD 0 NEXT STATE 1914
CURR STATE 1914 ACTION -1 NEXT REWARD 0 NEXT STATE 1913
CURR STATE 1913 ACTION -1 NEXT REWARD 0 NEXT STATE 2040
CURR STATE 2040 ACTION 1 NEXT REWARD 0 NEXT STATE 2041
CURR STATE 2041 ACTION 1 NEXT REWARD 0 NEXT STATE 2042
CURR STATE 2042 ACTION -1 NEXT REWARD 0 NEXT STATE 2041
CURR STATE 2041 ACTION 1 NEXT REWAR

Submit "log-random.tsv" in Gradescope.

## Part 2: Implement Q-Learning with Random Policy

The code below runs 32 random rollouts of 1024 steps using your random policy.
Modify the rollout code to implement Q-Learning.
Just implement one learning update for each sampled state-action in the simulation.
Use $\alpha=1$ and $\gamma=0.9$ since the simulator is deterministic and there is a sink where the rewards stop.




In [20]:
# YOUR CHANGES HERE

# Initialize Q-table as a dictionary
# Key: (state, action), Value: Q-value
Q = {}

alpha = 1.0  # Learning rate
gamma = 0.9  # Discount factor

for episode in range(32):
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=1024):
        #print("CURR STATE", curr_state, "ACTION", curr_action, "NEXT REWARD", next_reward, "NEXT STATE", next_state)
        # pass
        # Get current Q-value (initialize to 0 if not seen before)
        old_value = Q.get((curr_state, curr_action), 0.0)
        
        # Get max Q-value for next state over all possible actions
        next_actions = simulator.get_actions(next_state)
        max_next_q = max([Q.get((next_state, a), 0.0) for a in next_actions])
        
        # Q-Learning update: Q(s,a) = Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
        # With alpha=1, this simplifies to: Q(s,a) = r + gamma * max_a' Q(s',a')
        new_value = old_value + alpha * (next_reward + gamma * max_next_q - old_value)
        
        # Update Q-table
        Q[(curr_state, curr_action)] = new_value

# Output results
print(f"Total state-action pairs learned: {len(Q)}")
print(f"Maximum Q-value: {max(Q.values()):.4f}")
print(f"Minimum Q-value: {min(Q.values()):.4f}")
print(f"Average Q-value: {sum(Q.values())/len(Q):.4f}")

Total state-action pairs learned: 5314
Maximum Q-value: 1.0000
Minimum Q-value: 0.0000
Average Q-value: 0.0202


Save each step in the simulator in a file "q-random.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

In [21]:
# YOUR CHANGES HERE

# Open file to save results
with open('q-random.tsv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter='\t')
    # Write header
    writer.writerow(['curr_state', 'curr_action', 'next_reward', 'next_state', 'old_value', 'new_value'])
    
    # Re-initialize Q-table for this logging run
    Q_log = {}
    
    for episode in range(32):
        for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=1024):
            # Get current Q-value (initialize to 0 if not seen before)
            old_value = Q_log.get((curr_state, curr_action), 0.0)
            
            # Get max Q-value for next state over all possible actions
            next_actions = simulator.get_actions(next_state)
            max_next_q = max([Q_log.get((next_state, a), 0.0) for a in next_actions])
            
            # Q-Learning update
            new_value = old_value + alpha * (next_reward + gamma * max_next_q - old_value)
            
            # Update Q-table
            Q_log[(curr_state, curr_action)] = new_value
            
            # Write to file
            writer.writerow([curr_state, curr_action, next_reward, next_state, old_value, new_value])

print("Results saved to q-random.tsv")

Results saved to q-random.tsv


Submit "q-random.tsv" in Gradescope.

## Part 3: Implement Epsilon-Greedy Policy

Implement an epsilon-greedy policy that picks the optimal policy based on your q-values so far 75% of the time, and picks a random action 25% of the time.
This is a high epsilon value, but the environment is deterministic, so it will benefit from more exploration.

In [35]:
# YOUR CHANGES HERE

# hard-code epsilon=0.25. this is high but the environment is deterministic.
def epsilon_greedy_policy(state, actions):
    epsilon = 0.25 # hard-coded value
    if random.random() < epsilon:
        # Explore: choose random action
        return random.choice(actions)
    else:
        # Exploit: choose action with highest Q-value
        q_values = [(action, Q.get((state, action), 0.0)) for action in actions]
        # Find the action with maximum Q-value
        best_action = max(q_values, key=lambda x: x[1])[0]
        return best_action

Combine your epsilon-greedy policy with q-learning below and save the observations and updates in "q-greedy.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

Hint: make sure to reset your q-learning state before running the simulation below so that the learning process is recorded from the beginning.

In [37]:
# YOUR CHANGES HERE

# Reset Q-learning state (reinitialize Q-table)
Q = {}
alpha = 1.0  # Learning rate
gamma = 0.9  # Discount factor

with open('q-greedy.tsv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter='\t')
    # Write header
    writer.writerow(['curr_state', 'curr_action', 'next_reward', 'next_state', 'old_value', 'new_value'])
    
    for episode in range(32):
        for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(epsilon_greedy_policy, max_steps=1024):
            #print("CURR STATE", curr_state, "ACTION", curr_action, "NEXT REWARD", next_reward, "NEXT STATE", next_state)
            
            # Get current Q-value (initialize to 0 if not seen before)
            old_value = Q.get((curr_state, curr_action), 0.0)
            
            # Get max Q-value for next state over all possible actions
            next_actions = simulator.get_actions(next_state)
            max_next_q = max([Q.get((next_state, a), 0.0) for a in next_actions])
            
            # Q-Learning update
            new_value = old_value + alpha * (next_reward + gamma * max_next_q - old_value)
            
            # Update Q-table
            Q[(curr_state, curr_action)] = new_value
            
            # Write to file
            writer.writerow([curr_state, curr_action, next_reward, next_state, old_value, new_value])
            
            # if next_reward > 0:
            #     # moving to terminal state
            #     break

# Report results
print(f"Total state-action pairs learned: {len(Q)}")
print(f"Maximum Q-value: {max(Q.values()):.4f}")
print(f"Minimum Q-value: {min(Q.values()):.4f}")
print(f"Average Q-value: {sum(Q.values())/len(Q):.4f}")
print("\nResults saved to q-greedy.tsv")

Total state-action pairs learned: 3410
Maximum Q-value: 1.0000
Minimum Q-value: 0.0000
Average Q-value: 0.0075

Results saved to q-greedy.tsv


Submit "q-greedy.tsv" in Gradescope.

## Part 4: Extract Policy from Q-Values

Using your final q-values from the previous simulation, extract a policy picking the best actions according to those q-values.
Save the policy in a file "policy-greedy.tsv" with columns state and action.

In [38]:
# YOUR CHANGES HERE
# Extract policy: for each state -> choose the action with the highest Q-value
policy = {}

# Get all unique states from the Q-table
states = set(state for (state, action) in Q.keys())

for state in states:
    # Get all actions available for this state
    actions = simulator.get_actions(state)
    
    # Find the action with the highest Q-value for this state
    best_action = None
    best_q_value = float('-inf')
    
    for action in actions:
        q_value = Q.get((state, action), 0.0)
        if q_value > best_q_value:
            best_q_value = q_value
            best_action = action
    
    policy[state] = best_action

# Save policy to file
with open('policy-greedy.tsv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter='\t')
    # Write column header
    writer.writerow(['state', 'action'])
    
    # Write policy (sorted by state for readability)
    for state in sorted(policy.keys()):
        writer.writerow([state, policy[state]])

# Output results
print(f"Policy extracted for {len(policy)} states")
print("Policy saved to policy-greedy.tsv")

# Show some example policy entries
print("\nSample policy entries (first 10):")
for i, (state, action) in enumerate(sorted(policy.items())[:10]):
    print(f"  State {state}: Action {action}")

Policy extracted for 1717 states
Policy saved to policy-greedy.tsv

Sample policy entries (first 10):
  State 3: Action -1
  State 4: Action -1
  State 5: Action -1
  State 6: Action -1
  State 7: Action -1
  State 10: Action -1
  State 11: Action -1
  State 13: Action -1
  State 17: Action -1
  State 18: Action -1


Submit "policy-greedy.tsv" in Gradescope.

## Part 5: Implement Large Policy

Train a more optimal policy using q-learning.
Save the policy in a file "policy-optimal.tsv" with columns state and action.

Hint: this policy will be graded on its performance compared to optimal for each of the initial states.
**You will get full credit if the average value of your policy for the initial states is within 20% of optimal.**
Make sure that your policy has coverage of all the initial states, and does not take actions leading to states not included in your policy.
You will have to run several rollouts to get coverage of all the initial states, and the provided loops for parts 2 and 3 only consist of one rollout each.

Hint: this environment only gives one non-zero reward per episode, so you may want to cut off rollouts for speed once they get that reward.
But make sure you update the q-values first!

In [44]:
# YOUR CHANGES HERE
# Reset Q-learning state (reinitialize Q-table)
Q = {}
alpha = 1.0  # Learning rate
gamma = 0.9  # Discount factor

# Use epsilon-greedy with decreasing epsilon for better convergence
def adaptive_epsilon_greedy_policy(state, actions, episode, total_episodes):
    # Start with higher exploration, decrease over time
    epsilon_start = 0.5
    epsilon_end = 0.1
    epsilon = epsilon_start - (epsilon_start - epsilon_end) * (episode / total_episodes)
    
    if random.random() < epsilon:
        # Explore: choose random action
        return random.choice(actions)
    else:
        # Exploit: choose action with highest Q-value
        q_values = [(action, Q.get((state, action), 0.0)) for action in actions]
        best_action = max(q_values, key=lambda x: x[1])[0]
        return best_action

# Run many more episodes to get better coverage and convergence
num_episodes = 1000
max_steps = 2048

print("Training optimal policy:")
episodes_with_reward = 0

for episode in range(num_episodes):
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(
        lambda s, a: adaptive_epsilon_greedy_policy(s, a, episode, num_episodes), 
        max_steps = max_steps
    ):
        # Get current Q-value
        old_value = Q.get((curr_state, curr_action), 0.0)
        
        # Get max Q-value for next state
        next_actions = simulator.get_actions(next_state)
        max_next_q = max([Q.get((next_state, a), 0.0) for a in next_actions])
        
        # Q-Learning update
        new_value = old_value + alpha * (next_reward + gamma * max_next_q - old_value)
        
        # Update Q-table
        Q[(curr_state, curr_action)] = new_value
        
        # if next_reward > 0:
        #     episodes_with_reward += 1
        #     # Got reward, end episode
        #     break
    
    # Print progress every 200 episodes
    if (episode + 1) % 200 == 0:
        print(f"Episode {episode + 1}/{num_episodes} complete. Episodes with reward: {episodes_with_reward}")

print(f"Total state-action pairs learned: {len(Q)}")
print(f"Episodes that reached goal: {episodes_with_reward}/{num_episodes}")
print(f"Maximum Q-value: {max(Q.values()):.4f}")
print(f"Minimum Q-value: {min(Q.values()):.4f}")
print(f"Average Q-value: {sum(Q.values())/len(Q):.4f}")

# Extract optimal policy
policy = {}
states = set(state for (state, action) in Q.keys())

for state in states:
    actions = simulator.get_actions(state)
    best_action = max(actions, key=lambda a: Q.get((state, a), 0.0))
    policy[state] = best_action

# Verify closure: ensure all states reachable from initial states are in policy
print("\nVerifying policy closure:")
states_to_check = set(simulator.initial_states)
visited = set()
missing_states = []

while states_to_check:
    state = states_to_check.pop()
    if state in visited:
        continue
    visited.add(state)
    
    if state == simulator.terminal_state:
        continue
    
    if state not in policy:
        missing_states.append(state)
        # Add a default action for missing states
        actions = simulator.get_actions(state)
        policy[state] = max(actions, key=lambda a: Q.get((state, a), 0.0))
    
    # Follow the policy to find next state
    action = policy[state]
    (next_reward, next_state) = simulator.get_next_reward_state(state, action)
    
    if next_state not in visited:
        states_to_check.add(next_state)

if missing_states:
    print(f"Added {len(missing_states)} missing states to ensure closure")
else:
    print("Policy is closed. All reachable states are covered")

# Check coverage of initial states
initial_states_covered = sum(1 for init_state in simulator.initial_states if init_state in policy)
print(f"\nInitial states covered: {initial_states_covered}/{len(simulator.initial_states)}")

Training optimal policy:
Episode 200/1000 complete. Episodes with reward: 0
Episode 400/1000 complete. Episodes with reward: 0
Episode 600/1000 complete. Episodes with reward: 0
Episode 800/1000 complete. Episodes with reward: 0
Episode 1000/1000 complete. Episodes with reward: 0
Total state-action pairs learned: 5328
Episodes that reached goal: 0/1000
Maximum Q-value: 1.0000
Minimum Q-value: 0.0000
Average Q-value: 0.2392

Verifying policy closure:
Policy is closed. All reachable states are covered

Initial states covered: 71/71


In [45]:
# Save the optimal policy to file
with open('policy-optimal.tsv', 'w', newline='') as f:
    writer = csv.writer(f, delimiter='\t')
    writer.writerow(['state', 'action'])
    
    for state in sorted(policy.keys()):
        writer.writerow([state, policy[state]])

print("Optimal policy saved to policy-optimal.tsv")

# Calculate average Q-value for initial states
initial_state_values = []
for init_state in simulator.initial_states:
    if init_state in policy:
        action = policy[init_state]
        value = Q.get((init_state, action), 0.0)
        initial_state_values.append(value)
    else:
        initial_state_values.append(0.0)

avg_initial_value = sum(initial_state_values) / len(initial_state_values)
print(f"\nAverage Q-value for initial states: {avg_initial_value:.4f}")


Optimal policy saved to policy-optimal.tsv

Average Q-value for initial states: 0.3232


Submit "policy-optimal.tsv" in Gradescope.

## Part 6: Code

Please submit a Jupyter notebook that can reproduce all your calculations and recreate the previously submitted files.
You do not need to provide code for data collection if you did that by manually.

## Part 7: Acknowledgements

If you discussed this assignment with anyone, please acknowledge them here.
If you did this assignment completely on your own, simply write none below.

If you used any libraries not mentioned in this module's content, please list them with a brief explanation what you used them for. If you did not use any other libraries, simply write none below.

If you used any generative AI tools, please add links to your transcripts below, and any other information that you feel is necessary to comply with the generative AI policy. If you did not use any generative AI tools, simply write none below.

In [41]:
# Create acknowledgements.txt file
acknowledgements_content = """Discussed assignment with:
No one

Libraries used:
- random: For implementing random and epsilon-greedy policies
- csv: For reading and writing TSV files

Additional resources:
I used the lecture materials provided in the course, including the project files and lecture videos on Q-learning algorithms.
"""

with open('acknowledgements.txt', 'w') as f:
    f.write(acknowledgements_content)