# Instructions

In Part 2 of HW2, we will be implementing neural Q-learning, a deep learning approach to the Q-learning algorithm.

**Deep Q-Network (DQN)** is a reinforcement learning algorithm that combines Q-learning with deep neural networks, enabling it to learn optimal policies in complex high-dimensional environments. DQNs employ two neural networks:

- The Q-network (main network): Trained frequently during the learning process to approximate the action-value function.
- The target network: Provides stable target Q-values for the Q-network updates. It is periodically copied from the Q-network.

During training, the Q-network is updated by minimizing the difference between its predicted Q-values and the target Q-values (which are computed by the target network). This separation reduces instability and divergence, which are common challenges in reinforcement learning with function approximation.
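The update described above can be sketched in a few lines of PyTorch. This is a minimal illustration, not the assignment solution: the two `nn.Linear` layers are stand-ins for the real networks, the tensors are made-up data, and the mean-squared-error loss and the `(1 - done)` masking are assumptions chosen for the example.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
state_dim, num_actions, gamma = 4, 3, 0.9  # illustrative sizes, not from the assignment

q_net = nn.Linear(state_dim, num_actions)       # stand-in for the Q-network
target_net = nn.Linear(state_dim, num_actions)  # stand-in for the target network
target_net.load_state_dict(q_net.state_dict())  # the periodic copy

# A made-up mini-batch of five transitions (s, a, r, s', done)
states = torch.randn(5, state_dim)
next_states = torch.randn(5, state_dim)
actions = torch.tensor([0, 2, 1, 1, 0])
rewards = torch.tensor([0.0, 0.0, 0.5, 0.0, 1.0])
dones = torch.tensor([0.0, 0.0, 0.0, 0.0, 1.0])

# Targets come from the target network and carry no gradient
with torch.no_grad():
    max_next_q = target_net(next_states).max(dim=1).values
    targets = rewards + gamma * (1.0 - dones) * max_next_q

# Predictions come from the Q-network, for the actions actually taken
predicted = q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
loss = nn.functional.mse_loss(predicted, targets)
loss.backward()  # gradients flow into q_net only
```

Because the targets are computed under `torch.no_grad()`, only the Q-network receives gradients; the target network changes only when its weights are copied over.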

The **DQN agent** interacts with the environment, collecting experiences in the form of transitions (s,a,r,s′) and storing them in the *replay buffer*. During training, mini-batches of experiences are sampled and used to train the Q-network.

In Part 2, you will implement a **DQN** and a **DQN agent** that operate effectively on the TextWorld environment. This assignment is divided into the following steps:

- Step 1: Implement `DQN`, a neural network used for approximating the Q-function. In this assignment, you will work with an RNN-based text encoder to obtain state representations.
- Step 2: Implement `DQNAgent`, an agent that contains a `DQN`, interacts with the environment, saves experiences to the replay buffer, and trains the Q-network.
- Step 3: Implement the `run_policy` function. This is similar to Part 1; update your code to work with `DQNAgent`.

After completing steps 1 through 3, you will test your DQN agent on the same environment and testing suite as in Part 1.

**Notes:**
- We encourage you to finish Part 1 first before starting Part 2.
- All the test configurations of the environment are the same as in Part 1.
- Training and testing a neural network is computationally expensive and may require purchasing compute credits. Here are some tips to minimize costs:  
    - Implementation and initial testing can be done on CPUs. You can use your local machine or free Colab credits.
    - For parameter tuning, you may need more computational power, especially GPUs. Instead of an exhaustive search, try making educated guesses to optimize your parameters efficiently.
    - [lightning.ai](https://lightning.ai) also offers free GPU credits.  
- ***DO NOT REMOVE ANY COMMENTS THAT HAVE `# EXPORT` IN THEM. THE GRADING SCRIPT USES THESE COMMENTS TO EVALUATE YOUR FUNCTIONS. WE WILL NOT AUDIT SUBMISSIONS TO ADD THESE. IF THE AUTOGRADER FAILS TO RUN DUE TO YOUR MODIFICATION OF THESE COMMENTS, YOU WILL NOT RECEIVE CREDIT.***

# Install

Install `gymnasium` and the `TextWorld-Express` engine, `graphviz` and `pydot` for visualization, and `torch` and `transformers` for the neural network implementation and the tokenizer.

In [71]:
!pip install gymnasium
!pip install textworld-express
!pip install graphviz
!pip install pydot
!pip install torch
!pip install transformers



# Imports

In [72]:
# export
# imports for environment
from textworld_express import TextWorldExpressEnv
import gymnasium
import copy
import random

# imports for DQN
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import numpy as np
from transformers import AutoTokenizer

# Load a Game

Set the random seed for repeatability.

In [73]:
# export
SEED = 3

Initialize the game environment. `ENV` is a global that encapsulates the environment.

In [74]:
ENV = TextWorldExpressEnv(envStepLimit=100)

Set the game generator to generate a particular game (coin game or map reader)

In [75]:
GAME_TYPE = "coin"
GAME_PARAMS = "numLocations=5,includeDoors=1,numDistractorItems=0"
ENV.load(gameName=GAME_TYPE, gameParams=GAME_PARAMS)
obs, infos = ENV.reset(seed=SEED, gameFold="train", generateGoldPath=True)

# Utility Functions

This section defines utility functions for Part 2. Most of these functions do not require modification.

Environment Interaction Functions (see the description in Part 1)

In [76]:
# export
def reset_mdp(env):
  obs, infos = env.reset(seed=SEED, gameFold="train", generateGoldPath=True)
  valids = infos['validActions']
  valids.remove('inventory')
  valids.remove('look around')
  inv = infos['inventory']
  modified_obs = obs_with_inventory(infos['look'], inv)
  # return make_state_mdp(infos['look'], parse_inventory(infos['inventory'])), valids
  return {'observation': infos['look'],
          'inventory': infos['inventory'],
          'valid actions': valids,
          'modified_obs': modified_obs}


def do_action_mdp(action, env):
  obs, reward, done, infos = env.step(action)
  #obs_look, reward_look, done_look, infos_look = env.step('look around')
  valid_actions = infos['validActions']
  valid_actions.remove('inventory')
  valid_actions.remove('look around')
  # return make_state_mdp(infos['look'], parse_inventory(infos['inventory'])), reward, done, valid_actions
  return infos['look'], reward, done, {'observation': infos['look'],
                                       'inventory': infos['inventory'],
                                       'valid actions': valid_actions}

Define `pad_sequences`, which is used in Step 2 to pad text sequences to the same length for batching.

In [77]:
# export
def pad_sequences(sequences, maxlen=None, dtype='int32', value=0.):
    '''
    Partially borrowed from Keras
    # Arguments
        sequences: list of lists where each element is a sequence
        maxlen: int, maximum length
        dtype: type to cast the resulting sequence.
        value: float, value to pad the sequences to the desired value.
    # Returns
        x: numpy array with dimensions (number_of_sequences, maxlen)
    '''
    lengths = [len(s) for s in sequences]
    nb_samples = len(sequences)
    if maxlen is None:
        maxlen = np.max(lengths)
    # take the sample shape from the first non empty sequence
    # checking for consistency in the main loop below.
    sample_shape = tuple()
    for s in sequences:
        if len(s) > 0:
            sample_shape = np.asarray(s).shape[1:]
            break
    x = (np.ones((nb_samples, maxlen) + sample_shape) * value).astype(dtype)
    for idx, s in enumerate(sequences):
        if len(s) == 0:
            continue  # empty list was found
        # pre truncating
        trunc = s[-maxlen:]
        # check `trunc` has expected shape
        trunc = np.asarray(trunc, dtype=dtype)
        if trunc.shape[1:] != sample_shape:
            raise ValueError('Shape of sample %s of sequence at position %s is different from expected shape %s' %
                             (trunc.shape[1:], idx, sample_shape))
        # post padding
        x[idx, :len(trunc)] = trunc
    return x
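To make the padding convention concrete, here is a simplified stand-in for `pad_sequences`. The helper name `pad_right` is hypothetical and it handles only flat lists of token ids, but it follows the same two rules as the function above: sequences longer than `maxlen` keep their last `maxlen` tokens, and shorter ones are padded on the right.

```python
import numpy as np

def pad_right(sequences, maxlen, value=0):
    """Hypothetical, simplified version of pad_sequences for flat token lists."""
    x = np.full((len(sequences), maxlen), value, dtype='int32')
    for i, seq in enumerate(sequences):
        trunc = seq[-maxlen:]       # pre-truncation: keep the last maxlen tokens
        x[i, :len(trunc)] = trunc   # post-padding: real tokens first, padding after
    return x

batch = pad_right([[5, 6], [1, 2, 3, 4, 5, 6]], maxlen=4)
print(batch.tolist())  # [[5, 6, 0, 0], [3, 4, 5, 6]]
```

Padding on the right with 0 matters later: the encoder in Step 1 infers each sequence length from the position of the last non-zero token.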

The action set used in Part 2. This mapping table is used when we initialize DQN to set the number of final outputs (action space) and to map action strings to numerical IDs and vice versa.

In [78]:
# export
action_set = {
  'look around': 0,
  'close door to west': 1,
  'close door to east': 2,
  'close door to south': 3,
  'close door to north': 4,
  'move west': 5,
  'move east': 6,
  'move south': 7,
  'move north': 8,
  'open door to west': 9,
  'open door to east': 10,
  'open door to south': 11,
  'open door to north': 12,
  'inventory': 13,
  'take coin': 14,
  'read map': 15,
  'put map in box': 16,
  'task': 17,
  'take map': 18,
  'put coin in box': 19
}

In Part 2, we need to encode textual states, which is more computationally expensive than using a simple Q-table. To improve efficiency, we use some tricks to reduce the length of state representations. We recommend using this `obs_with_inventory` function for this assignment.

In [79]:
# export
def obs_with_inventory(obs, inv):
  # some tricks to reduce the length of state
  if 'Your inventory is currently empty' in inv:
    inv = 'Inventory: empty'

  if '(maximum capacity is 2 items)' in inv:
    inv = inv.replace("(maximum capacity is 2 items)", "")

  return obs + '\n' + inv

# Important Notes for this Assignment


*   A successful episode from the MDP will give a reward of 1.0
*   A partially successful episode from an MDP environment will give a reward of 0.5
*   If you set NUM_EPISODES too high, training will take too long in the autograder.
*   We will be checking for hard coded values / outputs, so please don't take any shortcuts.



# Step 1. Implement `DQN`

In this step, we will define how the neural network will encode observations and calculate Q-values to approximate the Q-function.
We implement the `DQN` class that estimates the expected Q-values for each possible action in a given state.
Since the neural networks in the DQN take tensors as input, we need to encode each state to obtain a state representation. We adopt a simple RNN-based state network, following the paper [Interactive Fiction Games: A Colossal Adventure](https://arxiv.org/pdf/1909.05398), to encode textual states from TextWorld-Express.

To help you with this task, we have provided the following three classes:
- `PackedEncoderRNN`: This class is a recurrent neural network (RNN) for processing sequential data like text. You don't need to modify this class in this assignment.
- `StateNetwork`: This class encodes the observations and inventory information from the TextWorld game, creating a compact representation of the game state. While the current implementation uses one RNN to encode the state, you can optionally explore using more RNNs (as suggested in the [paper](https://arxiv.org/pdf/1909.05398)) to encode observations and inventory separately, concatenating them for the final state representations.
- `DQN`: This is the core of the deep Q-Network, containing the `StateNetwork`. **Your main task in this step is to complete the `DQN` class.**
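The following sketch shows the idea behind encoding a padded batch with a packed GRU, using tiny illustrative sizes. It is not the provided `PackedEncoderRNN`: the vocabulary size, hidden size, and token ids are made up, and lengths are computed by counting non-zero tokens, which assumes that id 0 appears only as padding.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
vocab_size, hidden_size = 20, 8  # illustrative sizes
embedding = nn.Embedding(vocab_size, hidden_size)
gru = nn.GRU(hidden_size, hidden_size)  # expects (T, B, H) input by default

# Two token sequences, right-padded with 0 to length 5
tokens = torch.tensor([[3, 7, 2, 0, 0],
                       [5, 1, 4, 9, 6]])
lengths = (tokens != 0).sum(dim=1)  # number of real tokens per sequence

embedded = embedding(tokens).permute(1, 0, 2)  # T x B x H
packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths, enforce_sorted=False)
packed_out, hidden = gru(packed)
output, _ = nn.utils.rnn.pad_packed_sequence(packed_out)  # T x B x H

# Select the output at the last real timestep of each sequence
idx = (lengths - 1).view(1, -1, 1).expand(1, tokens.size(0), hidden_size)
last_output = output.gather(0, idx).squeeze(0)  # B x H

# For a single-layer, unidirectional GRU this matches the final hidden state
print(torch.allclose(last_output, hidden[0], atol=1e-6))
```

Packing lets the GRU skip the padded positions, so the representation of a short observation does not depend on how much padding the batch needed.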

In [80]:
# export

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

class PackedEncoderRNN(nn.Module):
  """
    No need to change, but feel free to improve if needed.
  """
  def __init__(self, input_size, hidden_size):
    super(PackedEncoderRNN, self).__init__()
    self.hidden_size = hidden_size
    self.embedding = nn.Embedding(input_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size)

  def forward(self, input, hidden=None):
    embedded = self.embedding(input).permute(1, 0, 2) # T x Batch x EmbDim
    if hidden is None:
        hidden = self.initHidden(input.size(0))

    # Pack the padded batch of sequences
    lengths = torch.tensor([torch.nonzero(n)[-1] + 1 for n in input], dtype=torch.long).cpu()
    packed = nn.utils.rnn.pack_padded_sequence(embedded, lengths, enforce_sorted=False)
    output, hidden = self.gru(packed, hidden)

    # Unpack the padded sequence
    output, _ = nn.utils.rnn.pad_packed_sequence(output)

    # Return only the last timestep of output for each sequence
    lengths = lengths.to(device)
    idx = (lengths-1).view(-1, 1).expand(len(lengths), output.size(2)).unsqueeze(0)
    output = output.gather(0, idx).squeeze(0)
    return output, hidden

  def initHidden(self, batch_size):
    return torch.zeros(1, batch_size, self.hidden_size).to(device)


class StateNetwork(nn.Module):
  """
    No need to change, but feel free to improve if needed.
  """
  def __init__(self, config):
    super(StateNetwork, self).__init__()
    self.config = config
    self.enc_state = PackedEncoderRNN(config.vocab_size, config.hidden_size)
    self.fcx = nn.Linear(config.hidden_size, config.hidden_size)
    self.fch = nn.Linear(config.hidden_size, config.hidden_size)

  def forward(self, inputs):
    batch_size = inputs.shape[0]
    x_o, h_o = self.enc_state(inputs, self.enc_state.initHidden(batch_size))

    x = F.relu(self.fcx(x_o))
    h = F.relu(self.fch(h_o))

    return x, h


class DQN(nn.Module):
  def __init__(self, config):
    super(DQN, self).__init__()
    self.state_network = StateNetwork(config)
    self.act_scorer = nn.Linear(config.hidden_size, config.act_size)

  def forward(self, state):
    """
      the output should be (BATCH_SIZE, ACTION_SIZE): the estimated Q-values for each action given a state
    """
    encoding_x, encoding_h = self.state_network(state)
    return self.act_scorer(encoding_x)
    

Test your DQN implementation with a simple input example. This is a sanity check and does not guarantee the correctness of your code. You will test your implementation after Step 2 and Step 3 on the actual environment.

In [81]:
# Test your DQN implementation

class DQNConfig:
  vocab_size = 50257 # vocab size of the GPT2 tokenizer. Change only if you want to try a different tokenizer.
  act_size = len(action_set)
  embedding_size = 64
  hidden_size = 256

config = DQNConfig()
dqn = DQN(config).to(device)
x = torch.tensor([0, 1, 2, 3, 4, 5]).to(device).unsqueeze(0) # example input token ids
print("******** Q-values ******** (not trained)")
q_values = dqn(x)
for act, actid in action_set.items():
  print(f"{act:20}: {q_values[0][actid]:0.6f}")

******** Q-values ******** (not trained)
look around         : -0.089469
close door to west  : 0.109600
close door to east  : 0.034183
close door to south : 0.075443
close door to north : -0.021584
move west           : 0.020146
move east           : -0.232662
move south          : -0.076713
move north          : 0.101618
open door to west   : 0.034378
open door to east   : -0.118141
open door to south  : 0.016514
open door to north  : 0.151836
inventory           : 0.066870
take coin           : -0.101632
read map            : 0.076312
put map in box      : -0.055058
task                : 0.067009
take map            : 0.085617
put coin in box     : -0.001476


# Step 2: Implement `DQNAgent`

In this step, you will implement the `DQNAgent` class, which encapsulates the `DQN` model and the core logic for interacting with the environment, storing experiences, and training the neural network. The `DQNAgent` is responsible for:

* Interacting with the environment: Using `reset_mdp()` to initialize an episode and `do_action_mdp()` to take actions and observe the consequences.
* Storing experiences: Saving transitions (state, action, reward, next state, done) in a replay buffer for experience replay.
* Estimating Q-values: Using the `DQN` model to predict the expected Q-values for each action in a given state.
* Training the neural network: Updating the `DQN` model's parameters based on the experiences stored in the replay buffer.
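As an illustration of the experience-storage step, a replay buffer can be as simple as a bounded `deque`. The sketch below uses made-up string states and a tiny capacity so that eviction is visible; the tuple layout `(state, action, reward, next_state, done)` mirrors the list above, and everything else is an assumption for the example.

```python
import random
from collections import deque

random.seed(0)
replay_buffer = deque(maxlen=3)  # tiny capacity so that eviction is visible

# Store five made-up transitions; the last one ends the episode
for t in range(5):
    replay_buffer.append((f"s{t}", "move west", 0.0, f"s{t + 1}", t == 4))

# Only the three most recent transitions remain
print([entry[0] for entry in replay_buffer])  # ['s2', 's3', 's4']

# Sample a mini-batch, never asking for more than the buffer holds
batch_size = 2
batch = random.sample(list(replay_buffer), min(batch_size, len(replay_buffer)))
states, actions, rewards, next_states, dones = zip(*batch)
```

Sampling uniformly from the buffer breaks the correlation between consecutive transitions, which is the main reason experience replay helps stabilize training.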

Note that the `train` function will serve as the entry point for the training process. For example, we will use this code to create and train your agent:

```
agent = DQNAgent(action_set, DQNConfig(), gamma=GAMMA, epsilon=EPSILON)
agent.train(ENV, NUM_EPISODES, THRESHOLD)
```

The `train` function takes the following arguments:

* `env`: The TextWorldExpress environment (`ENV`).
* `num_episodes`: The total number of episodes to train for.
* `threshold`: The maximum number of steps allowed in a single episode.

**Design Considerations:**

Apart from the `train` function, you have flexibility in designing the internal structure and methods of the `DQNAgent` class. Here are some recommendations to guide your implementation:

* Epsilon Decay: Gradually decrease the exploration rate (epsilon) over time to shift from exploration to exploitation. (use `epsilon_decay` and `epsilon_min`)
* Target Network Update: Periodically update the target network with the weights of the main Q-network (e.g., every 1000 steps) to stabilize training. (use `update_freq_target`)
* Q-Network Update Frequency: Update the Q-network every few steps (e.g., every 4 steps) rather than after every single step to improve efficiency and stability. (use `update_freq` )
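The three schedules above can be sketched without any neural network. The helper names `decay_epsilon` and `should_update` are hypothetical, and decaying once per episode is one reasonable choice rather than a requirement; the default values match the `DQNAgent` constructor defaults.

```python
def decay_epsilon(epsilon, epsilon_decay=0.96, epsilon_min=0.01):
    """Multiplicative decay with a floor at epsilon_min."""
    return max(epsilon_min, epsilon * epsilon_decay)

def should_update(step, freq):
    """True on every freq-th step."""
    return step % freq == 0

# Epsilon at the start of each of 200 episodes
epsilon = 1.0
schedule = []
for episode in range(200):
    schedule.append(epsilon)
    epsilon = decay_epsilon(epsilon)

# Steps (counted from 1) on which the Q-network would be trained with update_freq=4
q_updates = [s for s in range(1, 21) if should_update(s, 4)]
print(q_updates)  # [4, 8, 12, 16, 20]

# Steps on which the target network would be synced with update_freq_target=1000
target_syncs = [s for s in range(1, 3001) if should_update(s, 1000)]
print(target_syncs)  # [1000, 2000, 3000]
```

With these defaults, epsilon reaches its floor after a little over 110 episodes, so a run much shorter than that never becomes fully greedy during training.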

Note that your function will interact with the environment through `reset_mdp()` and `do_action_mdp()`. Be sure to reset the environment before running, and terminate the episode if `do_action_mdp()` indicates the episode has terminated.

In [None]:
# export
import sys


class DQNAgent:
  def __init__(self,
               action_set,
               dqn_config,
               gamma,
               epsilon,
               learning_rate=0.0005,
               epsilon_decay=0.96,
               epsilon_min=0.01,
               batch_size=64,
               memory_size=100000,
               update_freq=4,
               update_freq_target=1000):
    self.act2id = {a: i for i, a in enumerate(action_set)}
    self.id2act = {i: a for i, a in enumerate(action_set)}

    self.update_freq = update_freq
    self.update_freq_target = update_freq_target
    self.max_seq_len = 256  # DO NOT CHANGE `max_seq_len`
    self.tokenizer =  AutoTokenizer.from_pretrained('gpt2')

    self.gamma = gamma
    self.epsilon = epsilon
    self.epsilon_decay = epsilon_decay
    self.epsilon_min = epsilon_min
    self.batch_size = batch_size
    self.replay_buffer = deque(maxlen=memory_size)
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.model = DQN(dqn_config).to(self.device)
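    self.target_model = DQN(dqn_config).to(self.device)
    # Start the target network in sync with the Q-network
    self.target_model.load_state_dict(self.model.state_dict())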
    self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

  def tokenize_and_pad_states(self, states):
    """
    Tokenizes and pads a list of textual states.
    Uses the pre-trained tokenizer to convert textual descriptions of
    the environment into numerical representations (tokens) and then pads
    the sequences to a uniform length.

    Args:
        states: A list of string representations of the environment state.

    Returns:
        A padded NumPy array of tokenized states.
    """
    input_ids = self.tokenizer(states)['input_ids']
    return pad_sequences(input_ids, maxlen=self.max_seq_len)

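  def get_next_action(self, obs):
    # Greedy action selection: tokenize the state, keep at most max_seq_len
    # tokens, and return the action with the highest estimated Q-value.
    input_ids = self.tokenizer(obs)['input_ids'][-self.max_seq_len:]
    state_input = torch.tensor([input_ids], device=self.device)
    with torch.no_grad():  # no gradients are needed for action selection
      q_values = self.model(state_input)[0]
    return self.id2act[int(torch.argmax(q_values))]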
    

  def train(self, env, num_episodes, threshold):
    """Trains the DQN agent in the given environment.

    Args:
      env: The environment to train the agent in.
      num_episodes: The number of episodes to train for.
      threshold: The maximum number of steps to take in each episode.

    Returns:
      - A list of rewards obtained in each episode.
    """
    loss_function = torch.nn.MSELoss()
    all_rewards = []  # Store rewards for each episode
    total_step = 0

    for episode in range(num_episodes):
      eps_reward = 0
      reset_result = reset_mdp(env)
      obs = reset_result["modified_obs"]
      actions = reset_result["valid actions"]
      actions_list = []
      
      for step in range(threshold):
        if np.random.rand() < self.epsilon:
          next_action = random.choice(actions)
        else:
          # Exploit: reuse the greedy selection defined in get_next_action
          next_action = self.get_next_action(obs)
        actions_list.append(next_action)
        new_obs, reward, done, info = do_action_mdp(next_action, env)
        eps_reward+=reward
        new_obs = obs_with_inventory(info['observation'], info['inventory'])
        actions = info['valid actions']
        self.replay_buffer.append((obs, next_action, reward, new_obs, done))
        obs = new_obs
        
        if total_step % self.update_freq == 0:
          # Sample a mini-batch (the whole buffer while it is smaller than batch_size)
          sample = random.sample(self.replay_buffer, min(self.batch_size, len(self.replay_buffer)))

          x = torch.tensor(self.tokenize_and_pad_states([entry[0] for entry in sample]))
          y = []
          action_ids = []  # named to avoid shadowing the global `action_set`
          for entry in sample:
            (entry_obs, entry_next_action, entry_reward, entry_new_obs, entry_done) = entry
            if entry_done:
              # Terminal transition: the target is the reward alone
              y.append(float(entry_reward))
            else:
              # Non-terminal transition: bootstrap from the target network
              next_ids = self.tokenizer(entry_new_obs)['input_ids'][-self.max_seq_len:]
              state_input = torch.tensor([next_ids], device=self.device)
              with torch.no_grad():  # targets must not carry gradients
                max_next_q = self.target_model(state_input)[0].max().item()
              y.append(float(entry_reward) + self.gamma * max_next_q)
            action_ids.append([self.act2id[entry_next_action]])

          x = x.to(self.device)
          y = torch.tensor(y, dtype=torch.float32, device=self.device)
          action_ids = torch.tensor(action_ids, device=self.device)
          # Q-values predicted by the Q-network for the actions actually taken
          y_hat = self.model(x).gather(dim=1, index=action_ids)
          y_hat = torch.squeeze(y_hat, dim=1)
          loss = loss_function(y_hat, y)
          self.optimizer.zero_grad()
          loss.backward()
          self.optimizer.step()
        
        if total_step % self.update_freq_target == 0:
          self.target_model.load_state_dict(self.model.state_dict())
      
        total_step+=1

        if done:
          break
      
      print(f"Episode {episode}, Reward: {eps_reward}, Actions: {actions_list}")
      all_rewards.append(eps_reward)
      # Decay epsilon once per episode, never dropping below epsilon_min
      self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
      
    return all_rewards

Similar to Part 1, you might need to adjust the hyperparameters NUM_EPISODES, THRESHOLD, GAMMA, and EPSILON from their default values. These variables are just for the simple test below. The autograder will use the variables you set in `set_parameters` below.

In [83]:
# set parameters
NUM_EPISODES = 150
THRESHOLD = 100
GAMMA = 0.75
EPSILON = 1.0

Create a `DQNAgent` and train it.

In [84]:
agent = DQNAgent(action_set, DQNConfig(), gamma=GAMMA, epsilon=EPSILON)
print(device)
all_rewards = agent.train(ENV, NUM_EPISODES, THRESHOLD)
print(all_rewards)

cuda
Episode 0, Reward: 1.0, Actions: ['open door to south', 'open door to south', 'move south', 'close door to east', 'close door to east', 'close door to north', 'move north', 'move north', 'close door to east', 'close door to north', 'close door to east', 'close door to east', 'open door to north', 'close door to east', 'close door to north', 'open door to north', 'open door to east', 'move north', 'close door to west', 'open door to west', 'close door to south', 'close door to south', 'close door to west', 'open door to west', 'move west', 'close door to east', 'close door to east', 'move east', 'open door to east', 'close door to east', 'move east', 'open door to east', 'open door to east', 'open door to east', 'close door to east', 'open door to east', 'close door to east', 'close door to east', 'close door to east', 'open door to east', 'open door to east', 'move east', 'close door to south', 'open door to south', 'move south', 'move north', 'open door to south', 'move south', '

KeyboardInterrupt: 

# Step 3. Implement Code to Run a Policy

In this step, you will implement the `run_policy` function to execute the policy learned by the `DQNAgent`. If you have successfully completed Part 1, adapting your existing implementation to work with the `DQNAgent` should be straightforward. 

**Important:** Ensure that your agent uses **greedy action selection** during policy execution, meaning it always chooses the action with the highest estimated Q-value from the `DQN`. You may implement helper functions within the `DQNAgent` class to facilitate policy execution.
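A minimal sketch of greedy selection over a vector of Q-values follows. The four-action mapping and the Q-values are made up for illustration. The optional `valid_actions` argument, which masks out actions the environment does not currently allow, is an assumption added for this example and not something the assignment requires.

```python
import torch

# Made-up action mapping for illustration (the assignment uses `action_set`)
action_to_id = {'move west': 0, 'move east': 1, 'take coin': 2, 'look around': 3}
id_to_action = {i: a for a, i in action_to_id.items()}

def greedy_action(q_values, valid_actions=None):
    """Return the action with the highest Q-value, optionally among valid actions only."""
    q = q_values.clone()
    if valid_actions is not None:
        # Hypothetical refinement: invalid actions get -inf so argmax skips them
        mask = torch.full_like(q, float('-inf'))
        for action in valid_actions:
            mask[action_to_id[action]] = 0.0
        q = q + mask
    return id_to_action[int(torch.argmax(q))]

q_values = torch.tensor([0.9, 0.1, 0.4, 0.2])
print(greedy_action(q_values))                              # move west
print(greedy_action(q_values, ['move east', 'take coin']))  # take coin
```

Without masking, a greedy agent can repeat an action that has no effect in the current room; whether to restrict the choice to valid actions is a design decision left to you.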

The `run_policy` function takes the following arguments:

* `agent`: Your trained `DQNAgent` instance.
* `env`: The TextWorldExpress environment (e.g., `ENV`).
* `threshold`: The maximum number of steps allowed in an episode before termination.

Your function should run a single episode from the initial state and return:
- A list of actions taken during the episode (e.g., `[act_1, act_2, ... act_n]`).
- The total sum reward of all actions taken as a float.

In [87]:
# export
def run_policy(agent, env, threshold=50):
  actions = [] # Store the entire sequence of actions here
  total_reward = 0.0 # Store the total sum reward of all actions executed here
  reset_result = reset_mdp(env)
  obs = reset_result["modified_obs"]
  
  for step in range(threshold):
      
    next_action = agent.get_next_action(obs)
    
    actions.append(next_action)
    obs, reward, done, info = do_action_mdp(next_action, env)
    total_reward+=reward
    obs = obs_with_inventory(info['observation'], info['inventory'])
            
    if done:
      break
  print(f"Policy actions: {actions}")
  return actions, total_reward


Test your `run_policy` function. Set the threshold value for episode length during policy execution (test time threshold).

In [None]:
# export
TEST_THRESHOLD = 50

Run the policy.

In [None]:
plan, total_reward = run_policy(agent, ENV, threshold = TEST_THRESHOLD)
print("plan:", plan)
print("Total reward:", total_reward)

Policy actions: ['move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west']
plan: ['move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', 'move west', '

# New Environments


The following cells are the same as in Part 1. They create two new environments: `StochasticTextWorldExpressEnv` and `PunishmentTextWorldExpressEnv`.

In [86]:
NEVER_PICK_ACTIONS = set(['look around', 'inventory'])
ENV_VERBOSE = False

In [88]:
class StochasticTextWorldExpressEnv(TextWorldExpressEnv):

  def __init__(self, serverPath=None, envStepLimit=100, stochasticity = 0.0):
    # Call the super constructor
    super().__init__(serverPath, envStepLimit)
    # Store the valid actions and stochasticity
    self.valid_actions = []
    self.stochasticity = stochasticity

  def reset(self, seed=None, gameFold=None, gameName=None, gameParams=None, generateGoldPath=False):
    # Call the super method
    observation, infos = super().reset(seed, gameFold, gameName, gameParams, generateGoldPath)
    # Update the valid actions
    self.valid_actions = infos['validActions']
    return observation, infos

  def step(self, action:str):
    # If a random value is less than the stochasticity target, choose a random action
    if random.random() < self.stochasticity:
      temp_valids = copy.deepcopy(self.valid_actions)
      # Remove inventory and look around from valid actions to choose from
      temp_valids = list(set(self.valid_actions).difference(NEVER_PICK_ACTIONS))
      # Pick a random action from whatever remains
      action = random.choice(temp_valids)
    # If debugging flag is on, print the action that will be executed
    if ENV_VERBOSE:
      print("[[action]]:", action)
    # Call the super class with either the action passed in or the randomly chosen one
    observation, reward, isCompleted, infos = super().step(action)
    # Update the valid actions
    self.valid_actions = infos['validActions']
    return observation, reward, isCompleted, infos

class PunishmentTextWorldExpressEnv(TextWorldExpressEnv):

  def __init__(self, serverPath=None, envStepLimit=100, punishment = 0.0):
    # Call the super constructor
    super().__init__(serverPath, envStepLimit)
    # Store the punishment
    self.punishment = punishment
    # Store the previous observation
    self.previous_observation = None

  def step(self, action:str):
    # Call the super method
    observation, reward, isCompleted, infos = super().step(action)
    # If the current look is the same as the previous look, then we have performed an illegal action
    if infos['look'] == self.previous_observation:
      reward = self.punishment
    # Store the previous observation
    self.previous_observation = infos['look']
    return observation, reward, isCompleted, infos

New environments must be registered through the Gymnasium API.

In [89]:
# register new environment
gymnasium.register(id='TextWorldExpress-StochasticTextWorldExpressEnv-v0',
                   entry_point='__main__:StochasticTextWorldExpressEnv')
gymnasium.register(id='TextWorldExpress-PunishmentTextWorldExpressEnv-v0',
                   entry_point='__main__:PunishmentTextWorldExpressEnv')

# Testing Suite

This function will run all environments, all game types, all game parameters, and all seeds.

In [90]:
def run_all(environments, games, seeds):
  global ENV, GAME_TYPE, GAME_PARAMS, SEED
  # Results will contain a key (env type, game type, game params, seed) and values will be plans and total_rewards
  results = {}
  test_id = 0
  total_reward = 0
  # Iterate through all environments given
  for env in environments:
    # set global environment
    ENV = env
    # Iterate through all game types, the keys of the games dict
    for game_type in games:
      # Set the global game type
      GAME_TYPE = game_type
      # Iterate through all game parameters for the given game type in game dict
      for params in games[game_type]:
        # set the global game params
        GAME_PARAMS = params
        # load the environment
        ENV.load(gameName=GAME_TYPE, gameParams=GAME_PARAMS)
        # Iterate through all seeds
        for seed in seeds:
          print(f"TESTING {type(ENV)}, {GAME_TYPE}, {GAME_PARAMS}, {seed}")
          # set the global seed
          SEED = seed

          # Run the DQNAgent and get the policy
          agent = DQNAgent(action_set,
                           DQNConfig(),
                           gamma=GAMMA,
                           epsilon=EPSILON)

          agent.train(ENV, NUM_EPISODES, THRESHOLD)

          # run the policy to get the plan
          plan, reward = run_policy(agent, ENV, threshold = TEST_THRESHOLD)

          test_id += 1
          total_reward += reward

          print(f"TESTING {test_id}: total_reward {total_reward}/{test_id} \t (reward: {reward})")
          # Store the plan in the results
          results[(type(ENV), GAME_TYPE, GAME_PARAMS, SEED)] = (plan, total_reward)
  return results

In [91]:
seeds = list(range(3))
environments = [TextWorldExpressEnv(envStepLimit=100),
                StochasticTextWorldExpressEnv(envStepLimit=100, stochasticity=0.25),
                PunishmentTextWorldExpressEnv(envStepLimit=100, punishment=-1.0)]
games = {'coin':      ['numLocations=5,includeDoors=1,numDistractorItems=0',
                       'numLocations=6,includeDoors=1,numDistractorItems=0',
                       'numLocations=7,includeDoors=1,numDistractorItems=0',
                       'numLocations=10,includeDoors=1,numDistractorItems=0'],
         'mapreader': ['numLocations=5,maxDistanceApart=3,includeDoors=0,maxDistractorItemsPerLocation=0',
                       'numLocations=8,maxDistanceApart=4,includeDoors=0,maxDistractorItemsPerLocation=0',
                       'numLocations=11,maxDistanceApart=5,includeDoors=0,maxDistractorItemsPerLocation=0']}

Set parameters. Do not alter this cell other than changing the numeric values.

**You might need to change these parameters to get a good result on the harder environments**

Please note that increasing `NUM_EPISODES` increases the time needed to run the cell below.


In [102]:
# export
def set_parameters():
    global NUM_EPISODES, THRESHOLD, GAMMA, EPSILON, TEST_THRESHOLD
    NUM_EPISODES = 100
    THRESHOLD = 100
    GAMMA = 0.75
    EPSILON = 1.0
    TEST_THRESHOLD = 50

    return {
      'NUM_EPISODES': NUM_EPISODES,
      'THRESHOLD': THRESHOLD,
      'GAMMA': GAMMA,
      'EPSILON': EPSILON,
      'TEST_THRESHOLD': TEST_THRESHOLD
    }

In [103]:
set_parameters()

{'NUM_EPISODES': 100,
 'THRESHOLD': 100,
 'GAMMA': 0.75,
 'EPSILON': 1.0,
 'TEST_THRESHOLD': 50}

Run all tests

In [104]:
results = run_all(environments, games, seeds)

TESTING <class 'textworld_express.textworld_express.TextWorldExpressEnv'>, coin, numLocations=5,includeDoors=1,numDistractorItems=0, 0
Episode 0, Reward: 1.0, Actions: ['take coin']
Episode 1, Reward: 1.0, Actions: ['move south', 'move west', 'move east', 'move north', 'open door to south', 'move west', 'close door to west', 'move east', 'move south', 'close door to north', 'move north', 'close door to north', 'move north', 'move north', 'open door to north', 'move north', 'close door to north', 'move north', 'close door to north', 'close door to south', 'take coin']
Episode 2, Reward: 1.0, Actions: ['close door to south', 'take coin']
Episode 3, Reward: 1.0, Actions: ['close door to south', 'move north', 'open door to south', 'move south', 'move north', 'close door to north', 'move south', 'take coin', 'move north', 'move south', 'move north', 'close door to north', 'close door to north', 'move south', 'open door to north', 'close door to north', 'move north', 'close door to north', '

KeyboardInterrupt: 

# Grading

Grading will be done in the same way as Part 1. There will be a total of 63 tests: 1 point for each correct plan per algorithm.

**Grading:**

Maximum total points: 50

| Number of correct plans | Score |
|----------|-------|
| >= 50   |  50   |
| 49      |  49   |
| 48      |  48   |
| ...      |  ...  |
| 2      |  2   |
| 1       |   1  |
| 0   |   0   |
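The test count and the score table can be reproduced with a short calculation. This sketch assumes the testing suite defined earlier (three environments, four coin and three mapreader configurations, three seeds) and reads the table as one point per correct plan, capped at 50. The function name `score` is hypothetical.

```python
environments = ['TextWorldExpressEnv',
                'StochasticTextWorldExpressEnv',
                'PunishmentTextWorldExpressEnv']
num_game_configs = 4 + 3  # four coin configurations plus three mapreader configurations
seeds = [0, 1, 2]

num_tests = len(environments) * num_game_configs * len(seeds)
print(num_tests)  # 63

def score(num_correct_plans, max_points=50):
    """One point per correct plan, capped at max_points."""
    return min(num_correct_plans, max_points)

print(score(63), score(48), score(0))  # 50 48 0
```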


# Submission

Upload this notebook to Gradescope under the file name `hw2_part2.ipynb`. Part 2 will be graded on our local GPU machines. Final grades will be uploaded on Gradescope after the submission deadline.

We've added appropriate comments to the top of certain cells for the autograder to export (`# export`). You do NOT have to do anything (e.g. remove print statements) to cells we have provided; anything related to those has been handled for you. You are responsible for ensuring your own code has no syntax errors or unnecessary print statements. You ***CANNOT*** modify the export comments at the top of the cells, or the autograder will fail to run on your submission.

You should ***not*** add any cells to the notebook when submitting. You're welcome to add extra cells with any code you need while testing, but you must remove them before submitting.

If you identify an issue with the autograder, please feel free to reach out to us on Piazza, or email bok004@ucsd.edu.