<a href="https://colab.research.google.com/github/Rocking-Priya/704-fall-projects-2025/blob/main/project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DX 704 Week 8 Project
This homework will modify a simulator controlling a small vehicle to implement tabular q-learning.
You will first test your code with random and greedy-epsilon policies, then tweak your own training method for a more optimal policy.

The full project description and a template notebook are available on GitHub: [Project 8 Materials](https://github.com/bu-cds-dx704/dx704-project-08).


## Example Code

You may find it helpful to refer to these GitHub repositories of Jupyter notebooks for example code.

* https://github.com/bu-cds-omds/dx601-examples
* https://github.com/bu-cds-omds/dx602-examples
* https://github.com/bu-cds-omds/dx603-examples
* https://github.com/bu-cds-omds/dx704-examples

Any calculations demonstrated in code examples or videos may be found in these notebooks, and you are allowed to copy this example code in your homework answers.

## Rover Simulator

The following Python class implements a simulation of a simple vehicle with integer x,y coordinates facing in one of 8 possible directions.


In [119]:
# DO NOT CHANGE

import random

class RoverSimulator(object):
    DIRECTIONS = ((0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1), (-1, 0), (-1, 1))

    def __init__(self, resolution):
        self.resolution = resolution
        self.terminal_state = self.construct_state(resolution // 2, resolution // 2, 0)

        self.initial_states = []
        for initial_x in (0, resolution // 2, resolution - 1):
            for initial_y in (0, resolution // 2, resolution - 1):
                for initial_direction in range(8):
                    initial_state = self.construct_state(initial_x, initial_y, initial_direction)
                    if initial_state != self.terminal_state:
                        self.initial_states.append(initial_state)

    def construct_state(self, x, y, direction):
        assert 0 <= x < self.resolution
        assert 0 <= y < self.resolution
        assert 0 <= direction < 8

        state = (y * self.resolution + x) * 8 + direction
        assert self.decode_state(state) == (x, y, direction)
        return state

    def decode_state(self, state):
        direction = state % 8
        x = (state // 8) % self.resolution
        y = state // (8 * self.resolution)

        return (x, y, direction)

    def get_actions(self, state):
        return [-1, 0, 1]

    def get_next_reward_state(self, curr_state, curr_action):
        if curr_state == self.terminal_state:
            # no rewards or changes from terminal state
            return (0, curr_state)

        (curr_x, curr_y, curr_direction) = self.decode_state(curr_state)
        (curr_dx, curr_dy) = self.DIRECTIONS[curr_direction]

        assert self.construct_state(curr_x, curr_y, curr_direction) == curr_state

        assert curr_action in (-1, 0, 1)

        next_x = min(max(0, curr_x + curr_dx), self.resolution - 1)
        next_y = min(max(0, curr_y + curr_dy), self.resolution - 1)
        next_direction = (curr_direction + curr_action) % 8

        next_state = self.construct_state(next_x, next_y, next_direction)
        next_reward = 1 if next_state == self.terminal_state else 0

        return (next_reward, next_state)

    def rollout_policy(self, policy_func, max_steps=1000):
        curr_state = self.sample_initial_state()
        for i in range(max_steps):
            curr_action = policy_func(curr_state, self.get_actions(curr_state))
            (next_reward, next_state) = self.get_next_reward_state(curr_state, curr_action)
            yield (curr_state, curr_action, next_reward, next_state)
            curr_state = next_state

    def sample_initial_state(self):
        return random.choice(self.initial_states)

In [120]:
simulator = RoverSimulator(16)
initial_sample = simulator.sample_initial_state()
print("INITIAL SAMPLE", initial_sample)

INITIAL SAMPLE 2047


## Part 1: Implement a Random Policy

Random policies are often used to test simulators and start initial exploration.
Implement a random policy for these simulators.

In [121]:
# YOUR CHANGES HERE

def random_policy(state, actions):
    return random.choice(actions)


Use the code below to test your random policy.
Then modify it to save the results in "log-random.tsv" with the columns curr_state, curr_action, next_reward and next_state.

In [122]:
# YOUR CHANGES HERE

for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=32):
    print("CURR STATE", curr_state, "ACTION", curr_action, "NEXT REWARD", next_reward, "NEXT STATE", next_state)

CURR STATE 1923 ACTION 1 NEXT REWARD 0 NEXT STATE 1804
CURR STATE 1804 ACTION 1 NEXT REWARD 0 NEXT STATE 1677
CURR STATE 1677 ACTION 0 NEXT REWARD 0 NEXT STATE 1541
CURR STATE 1541 ACTION 0 NEXT REWARD 0 NEXT STATE 1413
CURR STATE 1413 ACTION 0 NEXT REWARD 0 NEXT STATE 1285
CURR STATE 1285 ACTION 1 NEXT REWARD 0 NEXT STATE 1158
CURR STATE 1158 ACTION -1 NEXT REWARD 0 NEXT STATE 1157
CURR STATE 1157 ACTION -1 NEXT REWARD 0 NEXT STATE 1028
CURR STATE 1028 ACTION 1 NEXT REWARD 0 NEXT STATE 901
CURR STATE 901 ACTION -1 NEXT REWARD 0 NEXT STATE 772
CURR STATE 772 ACTION 0 NEXT REWARD 0 NEXT STATE 644
CURR STATE 644 ACTION -1 NEXT REWARD 0 NEXT STATE 515
CURR STATE 515 ACTION -1 NEXT REWARD 0 NEXT STATE 394
CURR STATE 394 ACTION 0 NEXT REWARD 0 NEXT STATE 402
CURR STATE 402 ACTION 0 NEXT REWARD 0 NEXT STATE 410
CURR STATE 410 ACTION -1 NEXT REWARD 0 NEXT STATE 417
CURR STATE 417 ACTION -1 NEXT REWARD 0 NEXT STATE 552
CURR STATE 552 ACTION 1 NEXT REWARD 0 NEXT STATE 681
CURR STATE 681 ACTION 

In [123]:
with open("log-random.tsv", "w") as f:
    f.write("curr_state\tcurr_action\tnext_reward\tnext_state\n")
    for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=32):
        f.write(f"{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\n")


In [124]:
import google.colab.files

google.colab.files.download("log-random.tsv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Submit "log-random.tsv" in Gradescope.

## Part 2: Implement Q-Learning with Random Policy

The code below runs 32 random rollouts of 1024 steps using your random policy.
Modify the rollout code to implement Q-Learning.
Just implement one learning update for each sampled state-action in the simulation.
Use $\alpha=1$ and $\gamma=0.9$ since the simulator is deterministic and there is a sink where the rewards stop.




In [125]:
num_episodes = 32
max_steps = 1024
Q = {}                # dictionary: key=(state,action) -> float value
alpha = 1.0
gamma = 0.9

with open("q-random.tsv", "w") as f:
    # Write the header **once**
    f.write("curr_state\tcurr_action\tnext_reward\tnext_state\told_value\tnew_value\n")

    for episode in range(num_episodes):
        for (curr_state, curr_action, next_reward, next_state) in simulator.rollout_policy(random_policy, max_steps=max_steps):

            # old Q-value
            old_value = Q.get((curr_state, curr_action), 0.0)

            # best next Q-value
            next_actions = simulator.get_actions(next_state)
            best_next = max(Q.get((next_state, a), 0.0) for a in next_actions)

            # Q-learning update (alpha=1 → full replacement)
            new_value = old_value + alpha + (next_reward + gamma * best_next - old_value)
            Q[(curr_state, curr_action)] = new_value

            # write the data row
            f.write(f"{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\t{old_value}\t{new_value}\n")


Save each step in the simulator in a file "q-random.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

In [126]:
import pandas as pd
df = pd.read_csv("q-random.tsv", sep="\t")

print("Data rows:", len(df))  # should be 32768


Data rows: 32768


In [127]:
google.colab.files.download("q-random.tsv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Submit "q-random.tsv" in Gradescope.

## Part 3: Implement Epsilon-Greedy Policy

Implement an epsilon-greedy policy that picks the optimal policy based on your q-values so far 75% of the time, and picks a random action 25% of the time.
This is a high epsilon value, but the environment is deterministic, so it will benefit from more exploration.

In [128]:
# YOUR CHANGES HERE

# Build start states (exactly 32 and including 64)
all_inits = simulator.initial_states[:]   # length 71
if 64 not in all_inits:
    all_inits.append(64)
other_states = [s for s in all_inits if s != 64]
start_states = other_states[:31] + [64]   # length 32, includes 64

Q = {}               # empty Q-table: key = (state, action) -> value
alpha = 1.0
gamma = 0.9
epsilon = 0.25
def epsilon_greedy_policy(state, actions):
    # With probability epsilon, pick a random action
    if random.random() < epsilon:
        return random.choice(actions)
    # Otherwise pick the best action according to current Q-values
    q_values = [Q.get((state, a), 0.0) for a in actions]
    max_q = max(q_values)
    best_actions = [a for a, qv in zip(actions, q_values) if qv == max_q]
    return random.choice(best_actions)

Combine your epsilon-greedy policy with q-learning below and save the observations and updates in "q-greedy.tsv" with columns curr_state, curr_action, next_reward, next_state, old_value, new_value.

Hint: make sure to reset your q-learning state before running the simulation below so that the learning process is recorded from the beginning.

In [129]:


max_steps = 1024

with open("q-greedy.tsv", "w") as f:
    f.write("curr_state\tcurr_action\tnext_reward\tnext_state\told_value\tnew_value\n")

    for init_state in start_states:
        curr_state = init_state
        for step in range(max_steps):
            curr_action = epsilon_greedy_policy(curr_state, simulator.get_actions(curr_state))
            next_reward, next_state = simulator.get_next_reward_state(curr_state, curr_action)

            old_value = Q.get((curr_state, curr_action), 0.0)
            next_actions = simulator.get_actions(next_state)
            best_next = max(Q.get((next_state, a), 0.0) for a in next_actions)
            new_value = old_value + alpha * (next_reward + gamma * best_next - old_value)
            Q[(curr_state, curr_action)] = new_value

            f.write(f"{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\t{old_value}\t{new_value}\n")

            curr_state = next_state


Submit "q-greedy.tsv" in Gradescope.

In [130]:
import pandas as pd
df = pd.read_csv("q-greedy.tsv", sep="\t")
print("Data rows:", len(df))  # should be 32768


Data rows: 32768


In [131]:
google.colab.files.download("q-greedy.tsv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## Part 4: Extract Policy from Q-Values

Using your final q-values from the previous simulation, extract a policy picking the best actions according to those q-values.
Save the policy in a file "policy-greedy.tsv" with columns state and action.

In [132]:
import pandas as pd
import random

# optional: reproducible tie-breaking
random.seed(0)

# 1) Read q-greedy log (chronological rows)
df = pd.read_csv("q-greedy.tsv", sep="\t")

# 2) Compute final Q-values for each (state,action) by taking the last new_value
grouped = df.groupby(['curr_state', 'curr_action'], sort=False, as_index=False).last()

Q_final = {}
for _, row in grouped.iterrows():
    s = int(row['curr_state'])
    a = int(row['curr_action'])
    v = float(row['new_value'])
    Q_final[(s, a)] = v

# 3) Set of states we want policies for: all unique curr_state values in the q-log
states = sorted(int(x) for x in df['curr_state'].unique())

# 4) Create policy file covering every state in `states`
with open("policy-greedy.tsv", "w") as f:
    f.write("state\taction\n")
    for s in states:
        actions = simulator.get_actions(s)   # [-1,0,1]
        q_values = [Q_final.get((s, a), 0.0) for a in actions]
        max_q = max(q_values)
        # tie-break randomly among best actions
        best_actions = [a for a, qv in zip(actions, q_values) if qv == max_q]
        chosen = random.choice(best_actions)
        f.write(f"{s}\t{int(chosen)}\n")


In [133]:
df_policy = pd.read_csv("policy-greedy.tsv", sep="\t")
print("Number of states in policy:", len(df_policy))


Number of states in policy: 1970


In [134]:
df_q = pd.read_csv("q-greedy.tsv", sep="\t")
df_policy = pd.read_csv("policy-greedy.tsv", sep="\t")

states_in_q = set(df_q['curr_state'].unique())
states_in_policy = set(df_policy['state'].unique())

print("states_in_q count:", len(states_in_q))
print("states_in_policy count:", len(states_in_policy))
print("Missing in policy (should be empty):", sorted(list(states_in_q - states_in_policy))[:10])
print("Extra in policy (should be empty):", sorted(list(states_in_policy - states_in_q))[:10])


states_in_q count: 1970
states_in_policy count: 1970
Missing in policy (should be empty): []
Extra in policy (should be empty): []


In [135]:
google.colab.files.download("policy-greedy.tsv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Submit "policy-greedy.tsv" in Gradescope.

## Part 5: Implement Large Policy

Train a more optimal policy using q-learning.
Save the policy in a file "policy-optimal.tsv" with columns state and action.

Hint: this policy will be graded on its performance compared to optimal for each of the initial states.
**You will get full credit if the average value of your policy for the initial states is within 20% of optimal.**
Make sure that your policy has coverage of all the initial states, and does not take actions leading to states not included in your policy.
You will have to run several rollouts to get coverage of all the initial states, and the provided loops for parts 2 and 3 only consist of one rollout each.

Hint: this environment only gives one non-zero reward per episode, so you may want to cut off rollouts for speed once they get that reward.
But make sure you update the q-values first!

In [136]:
import random
import math
import pandas as pd

random.seed(0)           # optional reproducibility

# Parameters
alpha = 1.0
gamma = 0.9
epsilon = 0.25          # 25% random actions in epsilon-greedy
episodes_per_start = 25  # how many episodes from each initial state (tune this)
max_steps = 1024

# Prepare start states (all simulator.initial_states)
start_states = list(simulator.initial_states)  # 71 initial states
# Optionally ensure center/64 is included (you already did earlier)
if simulator.terminal_state not in start_states:
    pass  # terminal isn't an initial state; we don't start there

# Q table
Q = {}   # key: (state, action) -> float

def epsilon_greedy_action(state, actions):
    if random.random() < epsilon:
        return random.choice(actions)
    q_values = [Q.get((state, a), 0.0) for a in actions]
    max_q = max(q_values)
    best_actions = [a for a, qv in zip(actions, q_values) if qv == max_q]
    return random.choice(best_actions)

# Train: for every start state, run many short episodes; break after reward
# Log Q updates optionally to a file (q-optimal.tsv)
with open("q-optimal.tsv", "w") as fout:
    fout.write("episode\tstep\tcurr_state\tcurr_action\tnext_reward\tnext_state\told_value\tnew_value\n")
    episode_id = 0
    for start_state in start_states:
        for ep in range(episodes_per_start):
            episode_id += 1
            curr_state = start_state
            for step in range(max_steps):
                curr_action = epsilon_greedy_action(curr_state, simulator.get_actions(curr_state))
                next_reward, next_state = simulator.get_next_reward_state(curr_state, curr_action)

                old_value = Q.get((curr_state, curr_action), 0.0)
                next_actions = simulator.get_actions(next_state)
                best_next = max(Q.get((next_state, a), 0.0) for a in next_actions)

                # general Q-update (works for any alpha)
                new_value = old_value + alpha * (next_reward + gamma * best_next - old_value)
                Q[(curr_state, curr_action)] = new_value

                fout.write(f"{episode_id}\t{step}\t{curr_state}\t{curr_action}\t{next_reward}\t{next_state}\t{old_value}\t{new_value}\n")

                curr_state = next_state

                # stop episode after the non-zero reward has been observed and updated
                if next_reward != 0:
                    break

# After training, extract policy from Q
# Build Q_final (final new_value per (state,action)) — Q already contains final values
Q_final = dict(Q)  # shallow copy

# States to produce a policy for: all states seen as curr_state in training
states = sorted({s for (s, a) in Q_final.keys()})

# For safety, also compute set of visited next_states (so we can avoid actions leading to unknown states)
visited_states = set(states)  # states where we have Q entries as curr_state

# Create policy-optimal.tsv ensuring actions do not lead to unknown states
with open("policy-optimal.tsv", "w") as f:
    f.write("state\taction\n")
    for s in states:
        actions = simulator.get_actions(s)
        # prefer actions that are known and lead to visited states
        candidate_actions = []
        for a in actions:
            # compute next_state for this action deterministically
            _, predicted_next = simulator.get_next_reward_state(s, a)
            if predicted_next in visited_states:
                candidate_actions.append(a)
        # If no candidate action leads to a visited state, fallback to all actions
        candidate_list = candidate_actions if candidate_actions else actions

        q_values = [Q_final.get((s, a), 0.0) for a in candidate_list]
        max_q = max(q_values)
        best_actions = [a for a, qv in zip(candidate_list, q_values) if qv == max_q]
        chosen = random.choice(best_actions)
        f.write(f"{s}\t{int(chosen)}\n")

# Validation checks:
df_policy = pd.read_csv("policy-optimal.tsv", sep="\t")
policy_states = set(df_policy['state'].unique())

# 1) Coverage of all initial states
missing_initials = [s for s in simulator.initial_states if s not in policy_states]
print("Missing initial states in policy (should be empty):", missing_initials)

# 2) Policy actions don't lead to unknown states
bad_actions = []
for _, row in df_policy.iterrows():
    s = int(row['state'])
    a = int(row['action'])
    _, next_s = simulator.get_next_reward_state(s, a)
    if next_s not in policy_states:
        bad_actions.append((s, a, next_s))
print("Policy actions leading outside policy states (should be empty):", bad_actions[:10])

# 3) Basic counts
print("policy size (states):", len(policy_states))
print("unique Q states:", len(states))


Missing initial states in policy (should be empty): []
Policy actions leading outside policy states (should be empty): []
policy size (states): 1993
unique Q states: 1993


In [137]:
google.colab.files.download("policy-optimal.tsv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Submit "policy-optimal.tsv" in Gradescope.

## Part 6: Code

Please submit a Jupyter notebook that can reproduce all your calculations and recreate the previously submitted files.
You do not need to provide code for data collection if you did that by manually.

## Part 7: Acknowledgements

If you discussed this assignment with anyone, please acknowledge them here.
If you did this assignment completely on your own, simply write none below.

If you used any libraries not mentioned in this module's content, please list them with a brief explanation what you used them for. If you did not use any other libraries, simply write none below.

If you used any generative AI tools, please add links to your transcripts below, and any other information that you feel is necessary to comply with the generative AI policy. If you did not use any generative AI tools, simply write none below.