# Instructions

In HW2 part 2, we will be implementing neural Q-learning, a deep learning approach to the Q-learning algorithm.

**Deep Q-Network (DQN)** is a reinforcement learning algorithm that combines Q-learning with deep neural networks, enabling it to learn optimal policies in complex high-dimensional environments. DQNs employ two neural networks:

- The Q-network (main network): Trained frequently during the learning process to approximate the action-value function.
- The target network: Computes the target Q-values for the Q-network updates. Its weights are copied from the Q-network only periodically, which keeps the targets stable between copies.

During training, the Q-network is updated by minimizing the difference between its predicted Q-values and the target Q-values (which are computed by the target network). This separation reduces instability and divergence, which are common challenges in reinforcement learning with function approximation.
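
As a rough illustration of the update described above, the sketch below computes target Q-values for a toy batch in plain Python. The function name `td_targets` and all numbers are made up for illustration; the lists in `next_q_values` stand in for the output of the target network.

```python
def td_targets(rewards, next_q_values, dones, gamma=0.9):
    """Compute r + gamma * max_a' Q_target(s', a') for each transition.

    next_q_values[i] holds the target-network Q-values for the next state of
    transition i. Terminal transitions (done=True) get no bootstrap term.
    """
    targets = []
    for r, q_next, done in zip(rewards, next_q_values, dones):
        bootstrap = 0.0 if done else gamma * max(q_next)
        targets.append(r + bootstrap)
    return targets


# Toy batch of two transitions (hypothetical values).
rewards = [0.0, 1.0]
next_q_values = [[0.2, 0.5], [0.3, 0.1]]
dones = [False, True]
targets = td_targets(rewards, next_q_values, dones)
# The Q-network is then trained to bring its prediction Q(s, a) closer to
# these targets, for example by minimizing the mean squared error.
```

The first transition bootstraps from the best next-state value, while the second is terminal and keeps only its reward.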

The **DQN agent** interacts with the environment, collecting experiences in the form of transitions (s,a,r,s′) and storing them in the *replay buffer*. During training, mini-batches of experiences are sampled and used to train the Q-network.
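
A minimal sketch of a replay buffer, assuming a fixed-capacity `deque` and uniform sampling. The `Transition` tuple and the tiny capacity are hypothetical choices made only to keep the example readable.

```python
import random
from collections import deque, namedtuple

# Hypothetical container for one transition (s, a, r, s', done).
Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])

buffer = deque(maxlen=3)  # tiny capacity so that eviction is visible

for step in range(5):
    buffer.append(Transition(f"s{step}", step % 2, 0.0, f"s{step + 1}", False))

# Only the 3 most recent transitions remain; older ones were evicted.
oldest_kept = buffer[0].state

# Sample a mini-batch uniformly at random, without replacement.
random.seed(0)
batch = random.sample(list(buffer), 2)
states, actions, rewards, next_states, dones = zip(*batch)
```

Sampling at random breaks the correlation between consecutive transitions, which is one reason the buffer is used instead of training on experiences in the order they were collected.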

In Part 2, you will implement a **DQN** and a **DQN agent** that operate effectively in the TextWorld environment. This assignment is divided into the following steps:

- Step 1: Implement `DQN`, a neural network used for approximating the Q-function. In this assignment, you will work with an RNN-based text encoder to obtain state representations.
- Step 2: Implement `DQNAgent`, an agent that contains a `DQN`, interacts with the environment, saves experience to the replay buffer, and trains the neural network.
- Step 3: Implement the `run_policy` function. This is similar to Part 1; update the code to work with `DQNAgent`.

After finishing step 1 through step 3, you will test your DQN agent on the same environment and testing suite as Part 1.

**Notes:**
- We encourage you to finish Part 1 first before starting Part 2.
- All the test configurations of the environment are the same as in Part 1.
- ***DO NOT REMOVE ANY COMMENTS THAT HAVE `# EXPORT` IN THEM. THE GRADING SCRIPT USES THESE COMMENTS TO EVALUATE YOUR FUNCTIONS. WE WILL NOT AUDIT SUBMISSIONS TO ADD THESE. IF THE AUTOGRADER FAILS TO RUN DUE TO YOUR MODIFICATION OF THESE COMMENTS, YOU WILL NOT RECEIVE CREDIT.***

# Install

Install the `TextWorld-Express` engine, `gymnasium` for environment registration, `graphviz` and `pydot` for visualization, `torch` for neural network implementations, and `transformers` for the tokenizer.

In [28]:
!pip install gymnasium
!pip install textworld-express
!pip install graphviz
!pip install pydot
!pip install torch
!pip install transformers



# Imports

In [29]:
# export
# imports for environment
from textworld_express import TextWorldExpressEnv
import gymnasium
import graphviz
import pydot
import matplotlib.pyplot as plt
from collections import namedtuple
import re
import os
import copy
import math
import random

# imports for DQN
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import numpy as np
from transformers import AutoTokenizer

# Load a game and utils

Set the random seed for repeatability.

In [30]:
SEED = 3

Initialize the game environment. `ENV` is a global that encapsulates the environment.

In [31]:
ENV = TextWorldExpressEnv(envStepLimit=100)

Set the game generator to generate a particular game (coin game or map reader)

In [32]:
GAME_TYPE = "coin"
GAME_PARAMS = "numLocations=5,includeDoors=1,numDistractorItems=0"
ENV.load(gameName=GAME_TYPE, gameParams=GAME_PARAMS)
obs, infos = ENV.reset(seed=SEED, gameFold="train", generateGoldPath=True)

Define Environment Interaction Functions (see the description in Part 1)

In [33]:
# export
def reset_mdp(env):
  obs, infos = env.reset(seed=SEED, gameFold="train", generateGoldPath=True)
  valids = infos['validActions']
  valids.remove('inventory')
  valids.remove('look around')
  inv = infos['inventory']
  modified_obs = obs_with_inventory(infos['look'], inv)
  # return make_state_mdp(infos['look'], parse_inventory(infos['inventory'])), valids
  return {'observation': infos['look'],
          'inventory': infos['inventory'],
          'valid actions': valids}


def do_action_mdp(action, env):
  obs, reward, done, infos = env.step(action)
  #obs_look, reward_look, done_look, infos_look = env.step('look around')
  valid_actions = infos['validActions']
  valid_actions.remove('inventory')
  valid_actions.remove('look around')
  # return make_state_mdp(infos['look'], parse_inventory(infos['inventory'])), reward, done, valid_actions
  return infos['look'], reward, done, {'observation': infos['look'],
                                       'inventory': infos['inventory'],
                                       'valid actions': valid_actions}

Define the function `pad_sequences`, which is used in Step 2 to pad text sequences into the same size for batching.

In [34]:
def pad_sequences(sequences, maxlen=None, dtype='int32', value=0.):
    '''
    Partially borrowed from Keras
    # Arguments
        sequences: list of lists where each element is a sequence
        maxlen: int, maximum length
        dtype: type to cast the resulting sequence.
        value: float, value to pad the sequences to the desired value.
    # Returns
        x: numpy array with dimensions (number_of_sequences, maxlen)
    '''
    lengths = [len(s) for s in sequences]
    nb_samples = len(sequences)
    if maxlen is None:
        maxlen = np.max(lengths)
    # take the sample shape from the first non empty sequence
    # checking for consistency in the main loop below.
    sample_shape = tuple()
    for s in sequences:
        if len(s) > 0:
            sample_shape = np.asarray(s).shape[1:]
            break
    x = (np.ones((nb_samples, maxlen) + sample_shape) * value).astype(dtype)
    for idx, s in enumerate(sequences):
        if len(s) == 0:
            continue  # empty list was found
        # pre truncating
        trunc = s[-maxlen:]
        # check `trunc` has expected shape
        trunc = np.asarray(trunc, dtype=dtype)
        if trunc.shape[1:] != sample_shape:
            raise ValueError('Shape of sample %s of sequence at position %s is different from expected shape %s' %
                             (trunc.shape[1:], idx, sample_shape))
        # post padding
        x[idx, :len(trunc)] = trunc
    return x
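
To make the truncation and padding behavior concrete, here is a simplified pure-Python stand-in for 1-D token lists. The helper name `pad_token_ids` is hypothetical and is not part of the assignment code. Like `pad_sequences`, it keeps the last `maxlen` tokens of a long sequence and appends the padding value to a short one.

```python
def pad_token_ids(sequences, maxlen, value=0):
    """Truncate each sequence from the front and pad it at the end."""
    padded = []
    for seq in sequences:
        kept = seq[-maxlen:]  # keep only the last `maxlen` tokens
        padded.append(kept + [value] * (maxlen - len(kept)))  # pad on the right
    return padded


rows = pad_token_ids([[5, 6], [1, 2, 3, 4, 5, 6]], maxlen=4)
# rows == [[5, 6, 0, 0], [3, 4, 5, 6]]
```

The padding value matters here: `PackedEncoderRNN` recovers each sequence length from the position of the last nonzero token, so it relies on the padding being 0.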

The action set used in Part 2 is as follows:

In [35]:
action_set = {
  'look around': 0,
  'close door to west': 1,
  'close door to east': 2,
  'close door to south': 3,
  'close door to north': 4,
  'move west': 5,
  'move east': 6,
  'move south': 7,
  'move north': 8,
  'open door to west': 9,
  'open door to east': 10,
  'open door to south': 11,
  'open door to north': 12,
  'inventory': 13,
  'take coin': 14,
  'read map': 15,
  'put map in box': 16,
  'task': 17,
  'take map': 18,
  'put coin in box': 19
}

In Part 2, we need to encode textual states, so we can use some tricks to shorten the state's length. We recommend using this `obs_with_inventory` function for this assignment.

In [36]:
def obs_with_inventory(obs, inv):
  # some tricks to reduce the length of state
  if 'Your inventory is currently empty' in inv:
    inv = 'Inventory: empty'

  if '(maximum capacity is 2 items)' in inv:
    inv = inv.replace("(maximum capacity is 2 items)", "")

  return obs + '\n' + inv


# Important Notes for this Assignment


*   A successful episode from the MDP will give a reward of 1.0
*   A partially successful episode from an MDP environment will give a reward of 0.5
*   If you increase NUM_EPISODES too high, it will take too long in the autograder.
*   We will be checking for hard coded values / outputs, so please don't take any shortcuts.



# Step 1. Implement `DQN`

In Step 1, we decide how the network will encode observations and calculate Q-values. We define a class `DQN`, a neural network used for approximating the Q-function. This network estimates the expected future rewards for each possible action in a given state. Since the neural networks in the DQN take inputs in the form of tensors, we need to encode each textual state into a state representation. We adopt a simple RNN-based state network, following the paper [Interactive Fiction Games: A Colossal Adventure](https://arxiv.org/pdf/1909.05398), to encode textual states from TextWorld-Express.

The RNN and state network are already defined below. You need to complete the `DQN` class, and optionally change the `StateNetwork` class if needed. The three classes defined below are:
- `PackedEncoderRNN`: A recurrent neural network (RNN) that processes sequential data, such as text. You don't need to change this class in this assignment.
- `StateNetwork`: Encodes the observation and inventory information from the TextWorld game, creating a compact representation of the game state. (Optional) The current implementation uses one RNN to encode the state. You can explore using more RNNs, following [this](https://arxiv.org/pdf/1909.05398) paper, to encode the observation and inventory separately and concatenate them into a final state representation.
- `DQN`: The core of the deep Q-network, containing the `StateNetwork`.
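
For the optional two-encoder variant, the following is one possible sketch, not the paper's exact architecture. The class names `SimpleTextEncoder` and `TwoEncoderStateNetwork` are hypothetical, the sizes are arbitrary, and the encoder is a simplified stand-in that does not pack padded sequences the way `PackedEncoderRNN` does. It also assumes the observation and inventory are tokenized separately.

```python
import torch
import torch.nn as nn


class SimpleTextEncoder(nn.Module):
    """Hypothetical stand-in encoder: embedding + GRU, returns the final hidden state."""

    def __init__(self, vocab_size, hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)

    def forward(self, token_ids):
        embedded = self.embedding(token_ids)  # (batch, seq_len, hidden)
        _, hidden = self.gru(embedded)        # hidden: (1, batch, hidden)
        return hidden.squeeze(0)              # (batch, hidden)


class TwoEncoderStateNetwork(nn.Module):
    """Encodes observation and inventory separately, then fuses them."""

    def __init__(self, vocab_size, hidden_size):
        super().__init__()
        self.enc_obs = SimpleTextEncoder(vocab_size, hidden_size)
        self.enc_inv = SimpleTextEncoder(vocab_size, hidden_size)
        self.fuse = nn.Linear(2 * hidden_size, hidden_size)

    def forward(self, obs_ids, inv_ids):
        # Concatenate the two encodings along the feature dimension.
        joint = torch.cat([self.enc_obs(obs_ids), self.enc_inv(inv_ids)], dim=-1)
        return torch.relu(self.fuse(joint))   # (batch, hidden)


net = TwoEncoderStateNetwork(vocab_size=100, hidden_size=16)
obs_ids = torch.randint(1, 100, (2, 7))  # batch of 2 observations, 7 tokens each
inv_ids = torch.randint(1, 100, (2, 3))  # batch of 2 inventories, 3 tokens each
state_repr = net(obs_ids, inv_ids)
```

Because the fused output keeps the shape `(batch, hidden_size)`, a linear action scorer on top of it would not need to change.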

In [37]:
class PackedEncoderRNN(nn.Module):

  def __init__(self, input_size, hidden_size):
    super(PackedEncoderRNN, self).__init__()
    self.hidden_size = hidden_size
    self.embedding = nn.Embedding(input_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size)

  def forward(self, input, hidden=None):
    embedded = self.embedding(input).permute(1, 0, 2) # T x Batch x EmbDim
    if hidden is None:
        hidden = self.initHidden(input.size(0))

    # Pack the padded batch of sequences
    lengths = torch.tensor([torch.nonzero(n)[-1] + 1 for n in input], dtype=torch.long).cpu()
    packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths, enforce_sorted=False)
    output, hidden = self.gru(packed, hidden)

    # Unpack the padded sequence
    output, _ = nn.utils.rnn.pad_packed_sequence(output)

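    # Return only the last timestep of output for each sequence
    lengths = lengths.to(output.device)  # follow the data's device instead of assuming CUDA
    idx = (lengths-1).view(-1, 1).expand(len(lengths), output.size(2)).unsqueeze(0)
    output = output.gather(0, idx).squeeze(0)
    return output, hidden

  def initHidden(self, batch_size):
    # Allocate the initial hidden state on the same device as the model parameters
    return torch.zeros(1, batch_size, self.hidden_size, device=self.embedding.weight.device)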


class StateNetwork(nn.Module):
  """
    No need to change, but feel free to improve if needed.
  """
  def __init__(self, config):
    super(StateNetwork, self).__init__()
    self.config = config
    self.enc_state = PackedEncoderRNN(config.vocab_size, config.hidden_size)
    self.fcx = nn.Linear(config.hidden_size, config.hidden_size)
    self.fch = nn.Linear(config.hidden_size, config.hidden_size)

  def forward(self, inputs):
    batch_size = inputs.shape[0]
    x_o, h_o = self.enc_state(inputs, self.enc_state.initHidden(batch_size))

    x = F.relu(self.fcx(x_o))
    h = F.relu(self.fch(h_o))

    return x, h


class DQN(nn.Module):
  def __init__(self, config):
    super(DQN, self).__init__()
    self.state_network = StateNetwork(config)
    self.act_scorer = nn.Linear(config.hidden_size, config.act_size)

  def forward(self, state):
    """
      the output should be (BATCH_SIZE, ACTION_SIZE): the estimated Q-values for each action
    """
    # raise NotImplementedError
    x, _ = self.state_network(state)
    return self.act_scorer(x)



Test your DQN with a simple input. Note that this is mainly a sanity check for the RNN implementation and does not guarantee that your implementation is correct. You will test your implementation on the actual environment after Step 2 and Step 3.

In [38]:
# Test DQN

class DQNConfig:
  vocab_size = 50257
  act_size = len(action_set)
  embedding_size = 64
  hidden_size = 256

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
config = DQNConfig()
dqn = DQN(config).to(device)
x = torch.tensor([0, 1, 2, 3, 4, 5]).to(device).unsqueeze(0)
print("****** Q-values ****** (not trained)")
q_values = dqn(x)
for act, actid in action_set.items():
  print(f"{act:20}: {q_values[0][actid]:0.6f}")

****** Q-values ****** (not trained)
look around         : -0.041797
close door to west  : 0.027684
close door to east  : -0.017741
close door to south : -0.024807
close door to north : -0.047225
move west           : -0.074349
move east           : 0.101021
move south          : -0.009119
move north          : 0.101794
open door to west   : 0.046262
open door to east   : -0.048911
open door to south  : -0.050215
open door to north  : 0.084541
inventory           : -0.124426
take coin           : -0.112793
read map            : 0.014005
put map in box      : -0.007356
task                : 0.010428
take map            : -0.097566
put coin in box     : 0.011432


# Step 2: Implement `DQNAgent`

`DQNAgent` encapsulates the `DQN` model and the agent's main logic, such as interacting with environments, storing experience in the replay buffer, and estimating Q-values using `DQN`. `DQNAgent` also initializes the agent's hyperparameters, including the state and action sizes, learning rate, discount factor (gamma), exploration rate (epsilon), and others that the Q-learning algorithm needs.

Make the `train` function the entry point for the whole training procedure. At test time, we use the following code to run your agent:
```
agent = DQNAgent(action_set,
                 DQNConfig(),
                 learning_rate=LEARNING_RATE,
                 gamma=GAMMA,
                 epsilon=EPSILON)

agent.train(ENV, NUM_EPISODES, THRESHOLD)
```

The `train` function takes the following parameters:

- env: a pointer to the environment (`ENV`).
- num_episodes: the number of episodes to run before termination of the entire algorithm.
- threshold: the number of steps in an episode before terminating a single episode.

Apart from the `train` function, the implementation is of your own design. Here are some tips for Step 2 (feel free to explore your own approach as well):
- Use an epsilon decay technique.
- Use 1000 steps as the period for updating the target network.
- Do not update the Q-network at every step. Updating the network every 4 steps should be enough.
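
The tips above can be sketched in plain Python. The helper names `epsilon_after` and `update_steps` are hypothetical, and the decay rate 0.995 and floor 0.01 are assumed example settings rather than required values.

```python
def epsilon_after(num_episodes, epsilon=1.0, decay=0.995, epsilon_min=0.01):
    """Apply multiplicative decay once per episode, clipped at epsilon_min."""
    for _ in range(num_episodes):
        epsilon = max(epsilon * decay, epsilon_min)
    return epsilon


def update_steps(total_steps, update_freq=4, target_update_freq=1000):
    """Return the global step numbers at which each network would be updated."""
    q_updates = [t for t in range(1, total_steps + 1) if t % update_freq == 0]
    target_syncs = [t for t in range(1, total_steps + 1) if t % target_update_freq == 0]
    return q_updates, target_syncs


eps_100 = epsilon_after(100)  # roughly 0.61 with these assumed settings
q_updates, target_syncs = update_steps(2000)
# q_updates starts 4, 8, 12, ...; target_syncs is [1000, 2000]
```

With a per-episode decay of 0.995, epsilon is still around 0.6 after 100 episodes, so the agent acts mostly at random for the whole run. A faster decay, or decaying per step rather than per episode, may be worth trying when the episode budget is small.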

Note that your function will interact with the environment through `reset_mdp()` and `do_action_mdp()`. Be sure to reset the environment before running, and terminate the episode if `do_action_mdp()` indicates the episode has terminated.
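
The reset-then-step pattern described above can be sketched as a generic episode loop. The functions `fake_reset` and `fake_step` are hypothetical stand-ins for `reset_mdp()` and `do_action_mdp()` so that the example runs without the game engine; they only mimic the shape of the return values.

```python
def run_episode(reset_fn, step_fn, choose_action, threshold):
    """Generic episode loop: reset first, stop on `done` or after `threshold` steps."""
    state_info = reset_fn()
    total_reward, steps = 0.0, 0
    for _ in range(threshold):
        action = choose_action(state_info)
        _, reward, done, state_info = step_fn(action)
        total_reward += reward
        steps += 1
        if done:
            break
    return total_reward, steps


# Hypothetical stand-in environment: the episode ends, with reward 1.0,
# on the third step.
counter = {"t": 0}


def fake_reset():
    counter["t"] = 0
    return {"valid actions": ["move north"]}


def fake_step(action):
    counter["t"] += 1
    done = counter["t"] >= 3
    info = {"valid actions": ["move north"]}
    return "obs", (1.0 if done else 0.0), done, info


total_reward, steps = run_episode(
    fake_reset, fake_step, lambda info: info["valid actions"][0], threshold=50
)
```

Here the loop stops after 3 steps because the stand-in environment reports `done`, well before the threshold of 50 is reached.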

In [39]:
class DQNAgent:
  def __init__(self,
               action_set,
               dqn_config,
               learning_rate=0.0005,
               gamma=0.9,
               epsilon=1.0,
               epsilon_decay=0.995,
               epsilon_min=0.01,
               batch_size=64,
               memory_size=100000,
               update_freq=4,
               update_freq_target=1000):
    self.act2id = {a: i for i, a in enumerate(action_set)}
    self.id2act = {i: a for i, a in enumerate(action_set)}

    self.update_freq = update_freq
    self.update_freq_target = update_freq_target
    self.max_seq_len = 256 # DO NOT CHANGE `max_seq_len`
    self.tokenizer =  AutoTokenizer.from_pretrained('gpt2')

    self.action_size = len(action_set)
    self.learning_rate = learning_rate
    self.gamma = gamma
    self.epsilon = epsilon
    self.epsilon_decay = epsilon_decay
    self.epsilon_min = epsilon_min
    self.batch_size = batch_size
    self.replay_buffer = deque(maxlen=memory_size)
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.model = DQN(dqn_config).to(self.device)
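    self.target_model = DQN(dqn_config).to(self.device)
    self.target_model.load_state_dict(self.model.state_dict())  # start the target network in sync with the Q-network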
    self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

  def tokenize_and_pad_states(self, states):
    """
    Tokenizes and pads a list of textual states.
    Uses the pre-trained tokenizer to convert textual descriptions of
    the environment into numerical representations (tokens) and then pads
    the sequences to a uniform length.

    Args:
        states: A list of string representations of the environment state.

    Returns:
        A padded NumPy array of tokenized states.
    """
    input_ids = self.tokenizer(states)['input_ids']
    return pad_sequences(input_ids, maxlen=self.max_seq_len)

  def remember(self, state, action, reward, next_state, done):
    """Stores experiences in the agent's replay buffer."""
    self.replay_buffer.append((state, action, reward, next_state, done))

  def act(self, state, valid_actions=[]) -> int:
    """Selects an action using an epsilon-greedy policy.
    The agent balances exploration (choosing random actions) and exploitation
    (choosing the action with the highest estimated Q-value) using epsilon.

    Args:
        state (str): The current state of the environment.
        valid_actions (list): A list of valid actions in the current state.

    Returns:
        int: The ID of the selected action.
    """
    if random.uniform(0, 1) < self.epsilon:
      return random.choice([self.act2id[a] for a in valid_actions])
    else:
      if not isinstance(state, list):
        state = [state]

      state_tokens = self.tokenize_and_pad_states(state)
      state = torch.tensor(state_tokens, dtype=torch.int64, device=self.device)
      with torch.no_grad():
        q_values = self.model(state)
      return q_values.argmax().item()

  def get_action_str(self, act_id):
    return self.id2act[act_id]

  def replay(self):
    """Performs experience replay and updates the model's parameters."""

    # Check if enough experiences are available
    if len(self.replay_buffer) < self.batch_size:
      return

    minibatch = random.sample(self.replay_buffer, self.batch_size)
    states, actions, rewards, next_states, dones = zip(*minibatch)

    states = self.tokenize_and_pad_states(states)
    next_states = self.tokenize_and_pad_states(next_states)

    actions_tensor = torch.tensor(list(actions), dtype=torch.int64, device=self.device).unsqueeze(1)
    rewards_tensor = torch.tensor(list(rewards), dtype=torch.float32, device=self.device).unsqueeze(1)
    dones_tensor = torch.tensor(list(dones), dtype=torch.bool, device=self.device).unsqueeze(1)

    # Compute Q-values for current states
    states = torch.tensor(np.array(np.stack(states)), dtype=torch.int32, device=self.device)
    states = states.squeeze(1)
    q_values = self.model(states).gather(1, actions_tensor)

    # Compute target Q-values using target network
    with torch.no_grad():
      next_states = torch.tensor(np.array(np.stack(next_states)),  dtype=torch.int32, device=self.device)
      next_states = next_states.squeeze(1)
      next_q_values = self.target_model(next_states).max(1, keepdim=True)[0]  # Get max Q-values for next states
      target_q_values = rewards_tensor + (self.gamma * next_q_values * ~dones_tensor)  # Bellman equation

    loss = F.mse_loss(q_values, target_q_values)
    self.optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_norm_(self.model.parameters(), 40)
    self.optimizer.step()

  def train(self, env, num_episodes, threshold):
    """Trains the DQN agent in the given environment.

    Args:
      env: The environment to train the agent in.
      num_episodes: The number of episodes to train for.
      threshold: The maximum number of steps to take in each episode.

    Returns:
      A list of the total reward obtained in each episode.
    """
    all_rewards = []  # Store rewards for each episode
    total_step = 0
    for episode in range(num_episodes):
      state_info = reset_mdp(env)
      state = obs_with_inventory(state_info['observation'], state_info['inventory'])

      total_reward = 0
      for step in range(threshold):
        total_step += 1
        valid_actions = state_info['valid actions']

        action_id = self.act(state, valid_actions)  # Choose action using epsilon-greedy policy
        action_str = self.get_action_str(action_id)

        next_obs, reward, done, next_state_info = do_action_mdp(action_str, env)
        next_state = obs_with_inventory(next_state_info['observation'], next_state_info['inventory'])

        self.remember(state, action_id, reward, next_state, done) # Store experience in replay buffer

        if total_step % self.update_freq == 0:
          self.replay()

        # Update the target model periodically
        if total_step % self.update_freq_target == 0:
          self.target_model.load_state_dict(self.model.state_dict())

        total_reward += reward

        if done:
          break

        state = next_state
        state_info = next_state_info

      all_rewards.append(total_reward)

      # Print episode info
      print(f"Episode: {episode + 1}/{num_episodes}, Total Reward: {total_reward}, Epsilon: {self.epsilon:.4f}, Total Step: {step}")

      # Decay epsilon
      self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

    return all_rewards


Set the parameters for your DQNAgent. DQNAgent requires additional parameters for training neural networks such as learning rate, batch_size, optimizer, etc.

**As in Part 1, the hyperparameters `NUM_EPISODES`, `THRESHOLD`, `GAMMA`, and `EPSILON` might need to be changed from their default values. The variables in this cell are just for the simple test here. The autograder will use the values you set in `set_parameters` below.**

In [27]:
# set parameters
class DQNConfig:
  vocab_size = 50257 # vocab_size = tokenizer.vocab_size
  act_size = len(action_set)
  embedding_size = 64
  hidden_size = 256

NUM_EPISODES = 100
THRESHOLD = 50
LEARNING_RATE = 0.0005
GAMMA = 0.9
EPSILON = 1.0

# Create the DQN agent
agent = DQNAgent(
    action_set,
    DQNConfig(),
    learning_rate=LEARNING_RATE,
    gamma=GAMMA,
    epsilon=EPSILON,)

all_rewards = agent.train(ENV, NUM_EPISODES, THRESHOLD)
print(all_rewards)

Episode: 1/100, Total Reward: 1.0, Epsilon: 1.0000, Total Step: 41
Episode: 2/100, Total Reward: 0.0, Epsilon: 0.9950, Total Step: 49
Episode: 3/100, Total Reward: 0.0, Epsilon: 0.9900, Total Step: 49
Episode: 4/100, Total Reward: 1.0, Epsilon: 0.9851, Total Step: 37
Episode: 5/100, Total Reward: 0.0, Epsilon: 0.9801, Total Step: 49
Episode: 6/100, Total Reward: 1.0, Epsilon: 0.9752, Total Step: 5
Episode: 7/100, Total Reward: 0.0, Epsilon: 0.9704, Total Step: 49
Episode: 8/100, Total Reward: 0.0, Epsilon: 0.9655, Total Step: 49
Episode: 9/100, Total Reward: 0.0, Epsilon: 0.9607, Total Step: 49
Episode: 10/100, Total Reward: 0.0, Epsilon: 0.9559, Total Step: 49
Episode: 11/100, Total Reward: 0.0, Epsilon: 0.9511, Total Step: 49
Episode: 12/100, Total Reward: 0.0, Epsilon: 0.9464, Total Step: 49
Episode: 13/100, Total Reward: 0.0, Epsilon: 0.9416, Total Step: 49
Episode: 14/100, Total Reward: 0.0, Epsilon: 0.9369, Total Step: 49
Episode: 15/100, Total Reward: 1.0, Epsilon: 0.9322, Total

# Step 3. Implement Code to Run a Policy

Step 3-1: Implement code to run the policy. This function takes the following parameters:
- agent: your `DQNAgent`, as specified in Step 2.
- env: pointer to the environment (e.g., `ENV`).
- threshold: the maximum number of steps to take before terminating.

Your function should run a single episode from the initial state and return:
- A list of actions taken during the episode (e.g., `[act_1, act_2, ... act_n]`).
- The total sum reward of all actions taken as a float.

If you finished Part 1 correctly, this will be a simple adjustment to work with `DQNAgent`. Make sure your agent uses **greedy action selection**, choosing the action with the maximum Q-value from the `DQN`. You may implement a helper method for running the policy in the `DQNAgent` class.
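
One optional refinement, assuming the list of valid actions for the current state is available (as returned by `reset_mdp()` and `do_action_mdp()`), is to restrict the greedy choice to valid actions only. The sketch below uses plain Python lists and a hypothetical helper name `greedy_valid_action`; it illustrates the idea and is not a required part of the assignment.

```python
def greedy_valid_action(q_values, valid_action_ids):
    """Return the valid action id with the highest Q-value."""
    return max(valid_action_ids, key=lambda a: q_values[a])


q_values = [0.1, 0.9, -0.2, 0.4]  # hypothetical Q-values for 4 actions

# Unrestricted argmax picks action 1, even if it is not valid in this state.
best_overall = max(range(len(q_values)), key=lambda a: q_values[a])

# Restricting the choice to valid actions 0 and 3 picks action 3 instead.
best_valid = greedy_valid_action(q_values, valid_action_ids=[0, 3])
```

Without such a restriction, a greedy agent can repeatedly pick an action that has no effect in the current state, which wastes steps within the episode threshold.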

In [40]:
# export
def run_policy(agent, env, threshold=50):
  """Runs the policy learned by the agent in the given environment.

  Args:
    agent: The agent whose policy to run.
    env: The environment to run the policy in.
    threshold: The maximum number of steps to take before terminating the episode.

  Returns:
    A tuple containing:
      - A list of actions taken during the episode.
      - The total reward obtained during the episode.
  """
  # Use greedy action selection
  agent.epsilon = 0.0

  actions = [] # Store the entire sequence of actions here
  total_reward = 0.0 # Store the total sum reward of all actions executed here
  ### YOUR CODE BELOW HERE
  state_info = reset_mdp(env)
  state = obs_with_inventory(state_info['observation'], state_info['inventory'])

  for _ in range(threshold):
    action_id = agent.act(state)  # Greedy choice, since epsilon is 0
    action_str = agent.get_action_str(action_id)

    actions.append(action_str)

    # Take action and observe next state and reward
    next_obs, reward, done, next_state_info = do_action_mdp(action_str, env)
    next_state = obs_with_inventory(next_state_info['observation'], next_state_info['inventory'])

    total_reward += reward

    if done:
      break

    state = next_state
    state_info = next_state_info
  ### YOUR CODE ABOVE HERE
  return actions, total_reward

Step 3-2: Test your `run_policy` function.

Set the threshold value for episode length during policy execution (test time threshold).

In [41]:
# export
TEST_THRESHOLD = 25

Run the policy.

In [42]:
plan, total_reward = run_policy(agent, ENV, threshold = TEST_THRESHOLD)
print("plan:", plan)
print("Total reward:", total_reward)

plan: ['open door to south', 'move south', 'open door to east', 'move east', 'take coin']
Total reward: 1.0


# New Environments


The following cells are the same as in Part 1. They create two new environments: `StochasticTextWorldExpressEnv` and `PunishmentTextWorldExpressEnv`.

In [43]:
NEVER_PICK_ACTIONS = set(['look around', 'inventory'])
ENV_VERBOSE = False

In [44]:
class StochasticTextWorldExpressEnv(TextWorldExpressEnv):

  def __init__(self, serverPath=None, envStepLimit=100, stochasticity = 0.0):
    # Call the super constructor
    super().__init__(serverPath, envStepLimit)
    # Store the valid actions and stochasticity
    self.valid_actions = []
    self.stochasticity = stochasticity

  def reset(self, seed=None, gameFold=None, gameName=None, gameParams=None, generateGoldPath=False):
    # Call the super method
    observation, infos = super().reset(seed, gameFold, gameName, gameParams, generateGoldPath)
    # Update the valid actions
    self.valid_actions = infos['validActions']
    return observation, infos

  def step(self, action:str):
    # If a random value is less than the stochasticity target, choose a random action
    if random.random() < self.stochasticity:
      temp_valids = copy.deepcopy(self.valid_actions)
      # Remove inventory and look around from valid actions to choose from
      temp_valids = list(set(self.valid_actions).difference(NEVER_PICK_ACTIONS))
      # Pick a random action from whatever remains
      action = random.choice(temp_valids)
    # If debugging flag is on, print the action that will be executed
    if ENV_VERBOSE:
      print("[[action]]:", action)
    # Call the super class with either the action passed in or the randomly chosen one
    observation, reward, isCompleted, infos = super().step(action)
    # Update the valid actions
    self.valid_actions = infos['validActions']
    return observation, reward, isCompleted, infos

class PunishmentTextWorldExpressEnv(TextWorldExpressEnv):

  def __init__(self, serverPath=None, envStepLimit=100, punishment = 0.0):
    # Call the super constructor
    super().__init__(serverPath, envStepLimit)
    # Store the punishment
    self.punishment = punishment
    # Store the previous observation
    self.previous_observation = None

  def step(self, action:str):
    # Call the super method
    observation, reward, isCompleted, infos = super().step(action)
    # If the current look is the same as the previous look, then we have performed an illegal action
    if infos['look'] == self.previous_observation:
      reward = self.punishment
    # Store the previous observation
    self.previous_observation = infos['look']
    return observation, reward, isCompleted, infos

New environments must be registered through the Gymnasium API.

In [45]:
# register new environment
gymnasium.register(id='TextWorldExpress-StochasticTextWorldExpressEnv-v0',
                   entry_point='__main__:StochasticTextWorldExpressEnv')
gymnasium.register(id='TextWorldExpress-PunishmentTextWorldExpressEnv-v0',
                   entry_point='__main__:PunishmentTextWorldExpressEnv')

# Testing Suite

This function will run all environments, all game types, all game parameters, and all seeds.

In [47]:
def run_all(environments, games, seeds):
  global ENV, GAME_TYPE, GAME_PARAMS, SEED
  # Results will contain a key (env type, game type, game params, seed) and values will be plans and total_rewards
  results = {}
  test_id = 0
  total_reward = 0
  # Iterate through all environments given
  for env in environments:
    # set global environment
    ENV = env
    # Iterate through all game types, the keys of the games dict
    for game_type in games:
      # Set the global game type
      GAME_TYPE = game_type
      # Iterate through all game parameters for the given game type in game dict
      for params in games[game_type]:
        # set the global game params
        GAME_PARAMS = params
        # load the environment
        ENV.load(gameName=GAME_TYPE, gameParams=GAME_PARAMS)
        # Iterate through all seeds
        for seed in seeds:
          print(f"TESTING {type(ENV)}, {GAME_TYPE}, {GAME_PARAMS}, {seed}")
          # set the global seed
          SEED = seed

          # Run the DQNAgent and get the policy
          agent = DQNAgent(action_set,
                           DQNConfig(),
                           learning_rate=LEARNING_RATE,
                           gamma=GAMMA,
                           epsilon=EPSILON)

          _ = agent.train(ENV, NUM_EPISODES, THRESHOLD)

          # run the policy to get the plan
          plan, reward = run_policy(agent, ENV, threshold = TEST_THRESHOLD)

          test_id += 1
          total_reward += reward

          print(f"TESTING {test_id}: total_reward {total_reward}/{test_id} \t (reward: {reward})")
          # Store the plan and this test's reward in the results
          results[(type(ENV), GAME_TYPE, GAME_PARAMS, SEED)] = (plan, reward)
  return results

In [48]:
seeds = list(range(5))
environments = [TextWorldExpressEnv(envStepLimit=100),
                StochasticTextWorldExpressEnv(envStepLimit=100, stochasticity=0.25),
                PunishmentTextWorldExpressEnv(envStepLimit=100, punishment=-1.0)]
games = {'coin':      ['numLocations=5,includeDoors=1,numDistractorItems=0',
                       'numLocations=6,includeDoors=1,numDistractorItems=0',
                       'numLocations=7,includeDoors=1,numDistractorItems=0',
                       'numLocations=10,includeDoors=1,numDistractorItems=0'],
         'mapreader': ['numLocations=5,maxDistanceApart=3,includeDoors=0,maxDistractorItemsPerLocation=0',
                       'numLocations=8,maxDistanceApart=4,includeDoors=0,maxDistractorItemsPerLocation=0',
                       'numLocations=11,maxDistanceApart=5,includeDoors=0,maxDistractorItemsPerLocation=0',
                       'numLocations=15,maxDistanceApart=8,includeDoors=0,maxDistractorItemsPerLocation=0']}

Set parameters. Do not alter this cell other than changing the numeric values.

**You might need to change these parameters to get a good result on the harder environments**

Please note that increasing `NUM_EPISODES` will increase the time needed to run the test cell below.
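
When choosing `NUM_EPISODES`, it can help to see how much exploration is left at the end of training. The sketch below is only an illustration: it assumes epsilon is multiplied by a fixed factor after every episode, and the factor 0.995 is inferred from the logged values in the sample run (1.0000, 0.9950, 0.9900, 0.9851), not taken from the `DQNAgent` code. The helper `epsilon_schedule` and its arguments are hypothetical names.

```python
# Assumption: epsilon decays multiplicatively once per episode.
# The decay factor 0.995 is inferred from the sample log, not from DQNAgent itself.
def epsilon_schedule(num_episodes, epsilon_start=1.0, decay=0.995, epsilon_min=0.0):
    """Return the epsilon value used in each episode under multiplicative decay."""
    values = []
    epsilon = epsilon_start
    for _ in range(num_episodes):
        values.append(epsilon)
        # Shrink epsilon for the next episode, never going below the floor
        epsilon = max(epsilon_min, epsilon * decay)
    return values


for n in (100, 300):
    schedule = epsilon_schedule(n)
    print(f"{n} episodes: final epsilon = {schedule[-1]:.4f}")
```

Under this assumed schedule, a 100-episode run would still be acting randomly most of the time in its last episode, while a 300-episode run would end with much less exploration. If your agent decays epsilon differently, adjust the sketch accordingly.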


In [49]:
# export
def set_parameters():
    global NUM_EPISODES, THRESHOLD, LEARNING_RATE, GAMMA, EPSILON, TEST_THRESHOLD
    NUM_EPISODES = 100
    THRESHOLD = 50
    LEARNING_RATE = 0.0005
    GAMMA = 0.9
    EPSILON = 1.0
    TEST_THRESHOLD = 50


In [50]:
# export
set_parameters()

Run all tests

In [51]:
results = run_all(environments, games, seeds)
points = 0
for k, v in results.items():
  points += v[-1]
  if v[-1] < 1:
    print(v[-1], k)
print(f"{points}/{len(results)}")

TESTING <class 'textworld_express.textworld_express.TextWorldExpressEnv'>, coin, numLocations=5,includeDoors=1,numDistractorItems=0, 0
Episode: 1/100, Total Reward: 1.0, Epsilon: 1.0000, Total Step: 11
Episode: 2/100, Total Reward: 1.0, Epsilon: 0.9950, Total Step: 38
Episode: 3/100, Total Reward: 1.0, Epsilon: 0.9900, Total Step: 0
Episode: 4/100, Total Reward: 1.0, Epsilon: 0.9851, Total Step: 3
Episode: 5/100, Total Reward: 1.0, Epsilon: 0.9801, Total Step: 6
Episode: 6/100, Total Reward: 1.0, Epsilon: 0.9752, Total Step: 2
Episode: 7/100, Total Reward: 1.0, Epsilon: 0.9704, Total Step: 13
Episode: 8/100, Total Reward: 1.0, Epsilon: 0.9655, Total Step: 28
Episode: 9/100, Total Reward: 1.0, Epsilon: 0.9607, Total Step: 24
Episode: 10/100, Total Reward: 1.0, Epsilon: 0.9559, Total Step: 0
Episode: 11/100, Total Reward: 1.0, Epsilon: 0.9511, Total Step: 1
Episode: 12/100, Total Reward: 1.0, Epsilon: 0.9464, Total Step: 6
Episode: 13/100, Total Reward: 1.0, Epsilon: 0.9416, Total Step: 

KeyboardInterrupt: 

# Grading

Grading will consist of testing all three environments (regular, stochastic, punishment), two games per environment (coin, mapreader), four sets of parameters per game, and five seeds. There will be a total of 120 tests.
There will also be five additional hidden seeds.
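
The count of 120 public tests follows directly from the size of the test grid. A minimal sketch of that arithmetic is shown below; the string labels are placeholders for illustration only, standing in for the environment objects, game parameter strings, and seeds defined in the testing suite.

```python
from itertools import product

# Placeholder labels; the real values are defined in the testing suite cells.
environment_names = ["regular", "stochastic", "punishment"]
game_names = ["coin", "mapreader"]
param_sets_per_game = 4
public_seeds = range(5)

# One test per (environment, game, parameter set, seed) combination
tests = list(product(environment_names, game_names,
                     range(param_sets_per_game), public_seeds))
print(len(tests))  # 3 * 2 * 4 * 5 = 120
```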

**Grading:**
- 1 point for each correct plan in the public game configurations, per algorithm (120 points)
- 1 point for each correct plan in the hidden game configurations, per algorithm (120 points)

Please note that since these environments can be stochastic, and we can only run for so many policy iterations, we will give some leeway with the results for plans and reward values. We will have a benchmark average score across many seeds for an environment, and compare your outputs to our benchmark.

Maximum total points: 240

Grading will be conducted by visual inspection (and/or the use of a private autograder) of cell outputs under the "Testing Suite" heading. We will compare your plans and rewards to our rubric/reference implementations. We will add cells to your notebook at grading time to load and test our hidden world configuration files.

We will visually inspect the entire notebook to check if your algorithm implementations include details that are inconsistent with the assignment (e.g., hard-coding values or actions to pass tests) and to make sure no cells were altered to provide unearned grading results.

# Submission

Upload this notebook to Gradescope as a file named `submission.ipynb`. The autograder will only run successfully if your file is named this way. You must ensure that you have removed all print statements from **your** code, or the autograder may fail to run.

We've added appropriate comments to the top of certain cells for the autograder to export (`# export`). You do NOT have to do anything (e.g. remove print statements) to the cells we have provided; anything related to those has been handled for you. You are responsible for ensuring your own code has no syntax errors or unnecessary print statements. You ***CANNOT*** modify the export comments at the top of the cells, or the autograder will fail to run on your submission.

You should ***not*** add any cells to the notebook when submitting. You're welcome to add extra cells with any code you need while testing, but you must remove them before submitting.

If you identify an issue with the autograder, please feel free to reach out to us on Ed Discussion, or email rsudhakar9@gatech.edu, with a subject line including "CS 3600".