## Create coin-collector game ##
Create a coin-collector game with 5 rooms.

In [1]:
# !tw-make tw-coin_collector --level 5 -f -v --seed 1996 --save-overview --output "tw_games/single_games/l5_easy_2/coin_collector_l5_easy_2.ulx"

Global seed: 1996
Game generated: /home/cs19s028/IIT Madras/Research/coin_collector_game_custom/coin_collector_game_tests/1.vanilla/tw_games/single_games/l5_easy_2/coin_collector_l5_easy_2.ulx

Objective:
I hope you're ready to go into rooms and interact with objects, because you've just entered TextWorld! Here is how to play! First off, move east. With that accomplished, attempt to venture north. Okay, and then, make an effort to move east. With that accomplished, make an attempt to head north. After that, pick-up the coin from the floor of the chamber. Once that's all handled, you can stop!

Walkthrough:
go east > go north > go east > go north > take coin

-= Stats =-
Nb. locations: 5
Nb. objects: 1
Overview image: /home/cs19s028/IIT Madras/Research/coin_collector_game_custom/coin_collector_game_tests/1.vanilla/tw_games/single_games/l5_easy_2/coin_collector_l5_easy_2.png


In [2]:
import re
from typing import List, Mapping, Any, Optional
from collections import defaultdict, namedtuple, deque

import time, datetime

import yaml
import os
from glob import glob
from tqdm import tqdm

import random
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

import gym 
import textworld.gym
from textworld import EnvInfos

In [3]:
class TextWorldGame:
    """Register one or more TextWorld games as a gym environment.

    ``game_config`` must provide the keys ``"name"``, ``"path"`` and
    ``"max_steps"``. ``"path"`` is either a single game file or a directory;
    for a directory every ``*.ulx`` file inside it is registered.
    """

    def __init__(self, game_config):
        self.current_date_time = datetime.datetime.now().strftime("%m-%d-%Y_%H-%M-%S")
        self.name = f"{game_config['name']}_" + f"created_on-{self.current_date_time}"
        self.path = game_config["path"]
        self.game_files = [self.path]
        if os.path.isdir(self.path):
            # Sorted so the registration order does not depend on the order
            # in which the file system happens to list the directory.
            self.game_files = sorted(glob(os.path.join(self.path, "*.ulx")))

        self.max_steps = game_config["max_steps"]

        self.env_id = textworld.gym.register_games(
            self.game_files,
            request_infos=self.request_infos,
            max_episode_steps=self.max_steps
        )

        self.env = gym.make(self.env_id)

        # Vocabulary from which "<verb> <noun>" commands are composed.
        self.verbs = ["go", "take"]
        self.nouns = ["north", "east", "south", "west", "coin"]

        self.game_info = {
            "env" : self.env,
            "name" : self.name,
            "max_steps" : self.max_steps,
            "created_on" : self.current_date_time
        }

        self.game_controls = {
            "verbs" : self.verbs,
            "nouns" : self.nouns,
        }

    def __str__(self):
        """Return a short human-readable summary of the game and its controls."""
        msg = f"GAME: {self.game_info['name']}\n" + \
            "====================================\n" + \
            f"MAX. STEP: {self.game_info['max_steps']}\n\n" + \
            f"VERBS: {self.game_controls['verbs']}\n\n" + \
            f"NOUNS: {self.game_controls['nouns']}\n"

        return msg

    def get_game_info(self):
        """Return the dict with the gym env, game name, max steps and creation time."""
        return self.game_info

    def get_game_controls(self):
        """Return the dict with the verbs and nouns available for commands."""
        return self.game_controls

    @property
    def request_infos(self) -> "EnvInfos":
        """Build the ``EnvInfos`` request: room description, win flag and last command."""
        # The annotation is a string so that it is not evaluated when the
        # class body is executed.
        request_infos = EnvInfos()
        request_infos.description = True
        request_infos.won = True
        request_infos.last_command = True

        return request_infos

## Building the random agent baseline ##
We build an agent that plays the game by selecting random commands. This agent will be treated as the baseline agent against which the performance of the RL agent will be measured.

In [4]:
class RandomAgent:
    """Baseline agent that plays by issuing random ``"<verb> <noun>"`` commands.

    The agent never learns: ``play`` only evaluates it and reports the
    per-epoch averages of reward, steps and wins.
    """

    def __init__(self, config):
        """Read the agent settings from ``config``.

        Uses ``config["init"]["general"]`` (``agent_name``, ``seed``,
        ``epochs``, ``log_dir``, ``logging_frequency``, ``verbose``) and
        ``config["testing"]["scheduling"]`` (``max_episodes``, ``max_steps``).
        """
        init_config     = config["init"]
        testing_config  = config["testing"]

        self.agent_name              = init_config["general"]["agent_name"]

        self.seed                    = init_config["general"]["seed"]
        self.epochs                  = init_config["general"]["epochs"]

        self.log_dir                 = init_config["general"]["log_dir"]
        self.logging_frequency       = init_config["general"]["logging_frequency"]

        self.verbose                 = init_config["general"]["verbose"]

        self.verbs                   = ["go", "take"]
        self.nouns                   = ["north", "east", "south", "west", "coin"]
        self.noun_map                = dict()
        self.verb_map                = dict()

        self._build_general_modules()
        self._build_testing_modules(testing_config)

    def _build_general_modules(self):
        """Create the seeded random generator and the index -> word maps."""
        self.rng = np.random.RandomState(self.seed)

        # Map each position in the verb / noun list to its word.
        self.verb_map.update(enumerate(self.verbs))
        self.noun_map.update(enumerate(self.nouns))

        return

    def _build_testing_modules(self, testing_config):
        """Store the number of test episodes per epoch and the per-episode step cap."""
        self.max_episodes_testing = testing_config["scheduling"]["max_episodes"]
        self.max_steps_testing    = testing_config["scheduling"]["max_steps"]

        return

    def _choose_command(self):
        """Choose any command at random.

        Returns the verb index, the noun index and the command string.
        """
        v_idx = self.rng.randint(0, len(self.verb_map))
        n_idx = self.rng.randint(0, len(self.noun_map))

        command_verb = self.verb_map[v_idx]
        command_noun = self.noun_map[n_idx]
        command      = f"{command_verb} {command_noun}"

        return v_idx, n_idx, command

    def play(self, testing_game_info, training_game_info=None, training_enabled=False):
        """Evaluate the agent for ``self.epochs`` epochs and return the collected stats.

        ``testing_game_info`` must contain the keys ``"name"`` and ``"env"``.
        ``training_game_info`` and ``training_enabled`` are accepted but not
        used by this agent.

        Returns a dict with the agent name, the game name, the number of
        epochs and one list per metric (``epochs_avg_reward``,
        ``epochs_avg_steps``, ``epochs_avg_wins``) with one entry per epoch.
        """
        testing_game_name  = testing_game_info["name"]
        testing_game_env   = testing_game_info["env"]

        # We will only return the stats corresponding to the testing phase of the agent.
        stats = {
            "agent_name"         : self.agent_name,
            "testing_game_name"  : testing_game_name,
            "epochs"             : self.epochs,
            "epochs_avg_reward"  : [],
            "epochs_avg_steps"   : [],
            "epochs_avg_wins"    : []
        }

        # Create log dir., if not present.
        log_dir = f"{self.log_dir}_{testing_game_name}"
        os.makedirs(log_dir, exist_ok=True)

        # Create summary writer for tensorboard.
        self.summary_writer = tf.summary.create_file_writer(logdir=log_dir)

        for epoch in tqdm(range(self.epochs), ascii=True, unit=" epoch"):
            epoch_testing_stats = self._test(testing_game_env)

            if self.verbose:
                # Log the first epoch and then every `logging_frequency`-th epoch.
                if (not ((epoch+1) % self.logging_frequency)) or (epoch == 0):
                    print("")
                    print(f"Epoch: {epoch}")
                    print("=============================================")

                    print("Test stats")
                    print("--------------------------------------------")
                    # Each label is paired with its own metric.
                    print(f"Avg. reward: {epoch_testing_stats['avg_reward']:>5.3f}")
                    print(f"Avg. steps: {epoch_testing_stats['avg_steps']:>5.3f}")
                    print(f"Avg. wins: {epoch_testing_stats['avg_wins']:>5.3f}")

            with self.summary_writer.as_default():
                tf.summary.scalar(
                  f"Avg. reward",
                  epoch_testing_stats['avg_reward'],
                  step=epoch
                )
                tf.summary.scalar(
                  f"Avg. steps",
                  epoch_testing_stats['avg_steps'],
                  step=epoch
                )
                tf.summary.scalar(
                  f"Avg. wins",
                  epoch_testing_stats['avg_wins'],
                  step=epoch
                )

            stats["epochs_avg_reward"].append(epoch_testing_stats['avg_reward'])
            stats["epochs_avg_steps"].append(epoch_testing_stats['avg_steps'])
            stats["epochs_avg_wins"].append(epoch_testing_stats['avg_wins'])

        return stats

    def _test(self, testing_game_env):
        """Play ``self.max_episodes_testing`` episodes and return their averages.

        Returns a dict with ``avg_reward``, ``avg_steps`` and ``avg_wins``
        (fraction of episodes whose final ``infos["won"]`` was truthy).
        """
        self.mode = "testing"

        testing_stats = {
            "avg_reward" : 0,
            "avg_steps" : 0,
            "avg_wins" : 0
        }

        # Per-episode results, collected afresh for every epoch.
        episodes_rewards = []
        episodes_steps   = []
        episodes_won     = []

        for episode in range(self.max_episodes_testing):
            # Reset environment at the start of every episode.
            obs, infos        =  testing_game_env.reset()

            cumulative_reward = 0
            nb_moves          = 0
            done              = False
            won               = False

            while not done:

                v_idx, n_idx, command = self._choose_command()

                obs, score, done, infos = testing_game_env.step(command)

                # Reward is equal to the score returned after each move.
                # NOTE(review): if the env reports a running total rather than
                # a per-step score, this sum over-counts - confirm.
                reward = score

                cumulative_reward += reward # Update cumulative reward.
                nb_moves          += 1 # Update no. of moves (steps) played.
                won               = infos["won"]

                # Break out from while loop (i.e., end episode) if reached max. steps allowed.
                if nb_moves >= self.max_steps_testing:
                    break

            episodes_rewards.append(cumulative_reward) # Rewards obtained every episode.
            episodes_steps.append(nb_moves) # Steps played every episode.
            episodes_won.append(won) # Info. regarding whether won episode or not.

        if not episodes_won:
            # No episode was scheduled: keep the zero-initialised stats
            # instead of dividing by zero below.
            return testing_stats

        testing_stats["avg_reward"] = np.mean(episodes_rewards)
        testing_stats["avg_steps"]  = np.mean(episodes_steps)
        testing_stats["avg_wins"]   = np.count_nonzero(episodes_won)/len(episodes_won)

        return testing_stats

## Building the reinforcement learning agent ##
Now, we build an RL agent which uses an LSTM-DQN model to issue the commands. This agent has to be trained before it is tested, so that its performance can be measured.

### Build the LSTM-DQN model ###
First, create the LSTM-DQN model used by the RL agent.

In [5]:
class RepresentationGenerator(tf.keras.layers.Layer):
    """Embedding -> LSTM -> average-pooling stack used as the state encoder.

    ``config`` must provide ``seed``, ``embedding_layer_input_dim``,
    ``embedding_layer_hidden_dim`` and ``lstm_layer_hidden_dim``.
    """

    def __init__(self, config):
        super().__init__()
        self.seed = config["seed"]
        # Sets TensorFlow's global seed as a side effect of building the layer.
        tf.random.set_seed(self.seed)
        self.embedding_layer_input_dim = config["embedding_layer_input_dim"]
        self.embedding_layer_hidden_dim = config["embedding_layer_hidden_dim"]
        self.lstm_layer_hidden_dim = config["lstm_layer_hidden_dim"]
        self._build_layers()

    def _build_layers(self):
        """Instantiate the three sub-layers from the stored dimensions."""
        embedding = tf.keras.layers.Embedding(
            self.embedding_layer_input_dim,
            self.embedding_layer_hidden_dim,
            name="embedding_layer",
        )
        # return_sequences=True: the LSTM emits one output per time step.
        lstm = tf.keras.layers.LSTM(
            self.lstm_layer_hidden_dim,
            return_sequences=True,
            name="lstm_layer",
        )
        # NOTE(review): AveragePooling1D is created with its default pool
        # size, so it pools over short windows rather than the whole
        # sequence - confirm this (and not GlobalAveragePooling1D) is intended.
        pooling = tf.keras.layers.AveragePooling1D(name="mean_pooling_layer")

        self.embedding_layer = embedding
        self.lstm_layer = lstm
        self.mean_pooling_layer = pooling

    def call(self, x):
        """Apply embedding, LSTM and pooling in that order and return the result."""
        embedded = self.embedding_layer(x)
        encoded = self.lstm_layer(embedded)
        return self.mean_pooling_layer(encoded)

In [6]:
class ActionScorer(tf.keras.layers.Layer):
    """Shared dense layer followed by separate verb and noun scoring heads.

    ``config`` must provide ``seed``, ``shared_mlp_layer_hidden_dim``,
    ``verb_scorer_perceptron_hidden_dim``, ``noun_scorer_perceptron_hidden_dim``,
    ``verb_scorer_output_dim`` and ``noun_scorer_output_dim``.
    """

    def __init__(self, config):
        super().__init__()
        self.seed = config["seed"]
        # Sets TensorFlow's global seed as a side effect of building the layer.
        tf.random.set_seed(self.seed)
        self.shared_mlp_layer_hidden_dim = config["shared_mlp_layer_hidden_dim"]
        self.verb_scorer_perceptron_hidden_dim = config["verb_scorer_perceptron_hidden_dim"]
        self.noun_scorer_perceptron_hidden_dim = config["noun_scorer_perceptron_hidden_dim"]
        self.verb_scorer_output_dim = config["verb_scorer_output_dim"]
        self.noun_scorer_output_dim = config["noun_scorer_output_dim"]
        self._build_layers()

    def _build_layers(self):
        """Instantiate the shared layer and the two two-layer scoring heads."""
        dense = tf.keras.layers.Dense

        # Trunk shared by both heads.
        self.shared_mlp_layer = dense(
            self.shared_mlp_layer_hidden_dim, activation="relu", name="shared_mlp_layer"
        )
        # Hidden layer of each head.
        self.verb_scorer_perceptron_layer = dense(
            self.verb_scorer_perceptron_hidden_dim,
            activation="relu",
            name="verb_scorer_perceptron_layer",
        )
        self.noun_scorer_perceptron_layer = dense(
            self.noun_scorer_perceptron_hidden_dim,
            activation="relu",
            name="noun_scorer_perceptron_layer",
        )
        # Output layer of each head; no activation is applied.
        self.verb_scorer_layer = dense(self.verb_scorer_output_dim, name="verb_scores")
        self.noun_scorer_layer = dense(self.noun_scorer_output_dim, name="noun_scores")

    def call(self, x):
        """Return ``[verb_scores, noun_scores]`` computed from the shared features."""
        shared = self.shared_mlp_layer(x)

        verb_scores = self.verb_scorer_layer(self.verb_scorer_perceptron_layer(shared))
        noun_scores = self.noun_scorer_layer(self.noun_scorer_perceptron_layer(shared))

        return [verb_scores, noun_scores]

In [7]:
class LSTM_DQN(tf.keras.Model):
    """Model that chains a RepresentationGenerator with an ActionScorer.

    ``config`` must provide ``config["general"]["seed"]`` and, under
    ``config["layers"]``, the embedding, LSTM, shared-MLP and scorer
    dimensions read below.
    """

    def __init__(self, config):
        super().__init__()

        seed = config["general"]["seed"]
        layers = config["layers"]

        # Settings handed to the state encoder.
        self.representation_generator_config = {
            "seed": seed,
            "embedding_layer_input_dim": layers["embedding_input_dim"],
            "embedding_layer_hidden_dim": layers["embedding_hidden_dim"],
            "lstm_layer_hidden_dim": layers["lstm_hidden_dim"],
        }

        # Settings handed to the verb / noun scoring heads.
        self.action_scorer_config = {
            "seed": seed,
            "shared_mlp_layer_hidden_dim": layers["shared_mlp_hidden_dim"],
            "verb_scorer_perceptron_hidden_dim": layers["verb_scorer_perceptron_hidden_dim"],
            "noun_scorer_perceptron_hidden_dim": layers["noun_scorer_perceptron_hidden_dim"],
            "verb_scorer_output_dim": layers["verb_scorer_output_dim"],
            "noun_scorer_output_dim": layers["noun_scorer_output_dim"],
        }

        self._build_layers()

    def _build_layers(self):
        """Create the two sub-modules from their config dicts."""
        self.representation_generator = RepresentationGenerator(
            self.representation_generator_config
        )
        self.action_scorer = ActionScorer(self.action_scorer_config)

    def call(self, x):
        """Encode ``x`` and return the ``(verb_scores, noun_scores)`` pair."""
        representation = self.representation_generator(x)
        verb_scores, noun_scores = self.action_scorer(representation)
        return verb_scores, noun_scores

### Transition Tuple ###
Create a _namedtuple_ called *Transition* to store the experiences in the replay memory.

In [9]:
# One replay-memory record. The field order is the positional order expected
# by every ``Transition(*args)`` call.
Transition = namedtuple(
    "Transition",
    [
        "state",
        "v_idx",
        "n_idx",
        "reward",
        "discovery_bonus",
        "next_state",
        "done",
    ],
)

### Replay Memory ###
This is a vanilla replay memory used to store the experiences such that the stored experiences can be used later on for training the agent.

In [10]:
class ReplayMemory(object):
    """Fixed-capacity ring buffer of ``Transition`` records with uniform sampling."""

    def __init__(self, seed, capacity=100_000):
        self.seed = seed
        # Seeds the module-level ``random`` generator that ``sample`` draws from.
        random.seed(self.seed)
        self.capacity = capacity
        self.memory = []
        # Index of the slot that the next ``push`` writes to.
        self.position = 0

    def push(self, *args):
        """Store ``Transition(*args)``, overwriting the oldest entry once full."""
        slot = self.position
        still_growing = len(self.memory) < self.capacity
        if still_growing:
            # Reserve the slot before filling it.
            self.memory.append(None)
        self.memory[slot] = Transition(*args)
        self.position = (slot + 1) % self.capacity

    def sample(self, minibatch_size):
        """Return ``minibatch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, minibatch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

### Prioritized Replay Memory Class ###
We now create a prioritized replay memory to store the experiences so that they can be used later on for training the agent. Using the vanilla replay memory proved to be somewhat disadvantageous due to the problem of **catastrophic forgetting**.

In [11]:
class PrioritizedReplayMemory(object):
    """Replay memory split into a prioritised (alpha) and a regular (beta) ring buffer.

    ``priority_fraction`` is the share of ``capacity`` reserved for the alpha
    buffer and also the share of each sampled minibatch drawn from it.
    """

    def __init__(self, seed, capacity=100_000, priority_fraction=0.0):
        self.seed = seed
        # Seeds the module-level ``random`` generator that ``sample`` draws from.
        random.seed(self.seed)
        self.priority_fraction = priority_fraction
        self.alpha_capacity = int(capacity * priority_fraction)
        self.beta_capacity = capacity - self.alpha_capacity
        self.alpha_memory = []
        self.beta_memory = []
        # Index of the slot that the next push into each buffer writes to.
        self.alpha_position = 0
        self.beta_position = 0

    def push(self, is_prior=False, *args):
        """Save ``Transition(*args)`` in the alpha buffer if ``is_prior`` else in beta.

        When the requested buffer has zero capacity (for example the alpha
        buffer under the default ``priority_fraction=0.0``) the transition is
        stored in the other buffer instead of failing with an IndexError.
        """
        use_alpha = bool(is_prior)
        if use_alpha and self.alpha_capacity == 0:
            use_alpha = False
        elif not use_alpha and self.beta_capacity == 0:
            use_alpha = True

        if use_alpha:
            if len(self.alpha_memory) < self.alpha_capacity:
                self.alpha_memory.append(None)
            self.alpha_memory[self.alpha_position] = Transition(*args)
            self.alpha_position = (self.alpha_position + 1) % self.alpha_capacity
        else:
            if len(self.beta_memory) < self.beta_capacity:
                self.beta_memory.append(None)
            self.beta_memory[self.beta_position] = Transition(*args)
            self.beta_position = (self.beta_position + 1) % self.beta_capacity

    def sample(self, minibatch_size):
        """Return a shuffled minibatch mixing alpha and beta transitions.

        Each share is capped by what its buffer currently holds, so the
        result can be shorter than ``minibatch_size``.
        """
        alpha_share = int(self.priority_fraction * minibatch_size)
        from_alpha = min(alpha_share, len(self.alpha_memory))
        from_beta = min(minibatch_size - alpha_share, len(self.beta_memory))
        res = random.sample(self.alpha_memory, from_alpha) + random.sample(self.beta_memory, from_beta)
        random.shuffle(res)
        return res

    def __len__(self):
        """Total number of transitions stored across both buffers."""
        return len(self.alpha_memory) + len(self.beta_memory)

### Reinforcement learning agent ###
Now, we finally create the reinforcement learning agent.

In [12]:
class RLAgent: 
    
    def __init__(self, config):
        
        init_config     = config["init"]
        training_config = config["training"]
        testing_config  = config["testing"] 
        
        self.agent_name              = init_config["general"]["agent_name"]
        
        self.seed                    = init_config["general"]["seed"]
        self.max_vocab_size          = init_config["general"]["max_vocab_size"]
        self.epochs                  = init_config["general"]["epochs"]
        self.log_dir                 = init_config["general"]["log_dir"]
        self.logging_frequency       = init_config["general"]["logging_frequency"]
        
        self.model_dir               = init_config["general"]["model_dir"]
        self.model_saving_frequency  = init_config["general"]["model_saving_frequency"]
        
        self.verbose                 = init_config["general"]["verbose"]
        self.model_config            = init_config["model"]["lstm_dqn"]
        self.learning_rate           = self.model_config["optimizer"]["learning_rate"]
        
        self.id2word                 = ["<PAD>", "<UNK>"]
        self.word2id                 = {w: i for i, w in enumerate(self.id2word)}
        
        self.verbs                   = ["go", "take"]        
        self.nouns                   = ["north", "east", "south", "west", "coin"]
        self.noun_map                = dict()
        self.verb_map                = dict()
        
        self._build_general_modules()
        self._build_training_modules(training_config)
        self._build_testing_modules(testing_config)
        
    
    def _build_general_modules(self): 
        
        self.rng = np.random.RandomState(self.seed)
        
        # Build verb map.
        for i, v in enumerate(self.verbs):
            self.verb_map[i] = v
            
        # Build noun map.
        for i, n in enumerate(self.nouns):
            self.noun_map[i] = n
        
        # Add the information abot number of verbs and nouns to
        # model config dicitionary.
        self.model_config["layers"]["verb_scorer_output_dim"] = len(self.verb_map)
        self.model_config["layers"]["noun_scorer_output_dim"] = len(self.noun_map)        
        
        # Create new main lstm-dqn model (if loading saved model is set to false).
        if self.model_config["general"]["load_saved_model"] == True:
            self.main_model = tf.keras.models.load_model(self.model_config["general"]["saved_model_path"])
        else:
            self.main_model = self._create_model(self.model_config)
        
        return
        
        
    def _create_model(self, model_name):
        """Method to create the LSTM-DQN model."""
        # Create the model.
        lstm_dqn = LSTM_DQN(self.model_config)
        
        # Compile the model.
        lstm_dqn.compile(
            optimizer=tf.keras.optimizers.Adam(self.learning_rate),
            loss=tf.keras.losses.MSE,
            metrics="accuracy"
        )
    
        return lstm_dqn
    
    
    def _build_training_modules(self, training_config):        
        self.max_episodes_training    = training_config["scheduling"]["max_episodes"]
        self.max_steps_training       = training_config["scheduling"]["max_steps"]
        self.bonus_coefficient        = 0 # No bonus coefficient for vanilla agents.
        self.discount                 = training_config["scheduling"]["discount"]
        
        self.replay_frequency         = training_config["scheduling"]["replay_frequency"]
        self.replay_counter           = 0
        self.minibatch_size           = training_config["scheduling"]["minibatch_size"]
        self.target_update_frequency  = training_config["scheduling"]["target_update_frequency"]
        self.target_update_counter    = 0
        self.min_replay_memory_size   = training_config["scheduling"]["min_replay_memory_size"]
        
        self.replay_memory_capacity   = training_config["replay_memory"]["capacity"]
        self.replay_priority_fraction = training_config["replay_memory"]["priority_fraction"]
        
        self.epsilon_init_training    = training_config["exploration"]["epsilon_init"]
        self.epsilon_min_training     = training_config["exploration"]["epsilon_min"]
        self.epsilon_decay_training   = training_config["exploration"]["epsilon_decay_rate"]
        self.epsilon_training         = self.epsilon_init_training
        
        # Target neural n/w model.
        self.target_model = self._create_model(self.model_config)
        self.target_model.set_weights(self.main_model.get_weights())
        
        # Replay memory.
        self.replay_memory = PrioritizedReplayMemory(
            seed=self.seed, 
            capacity=self.replay_memory_capacity, 
            priority_fraction=self.replay_priority_fraction
        )
        
        return
    
    
    def _build_testing_modules(self, testing_config):
        self.max_episodes_testing = testing_config["scheduling"]["max_episodes"]
        self.max_steps_testing    = testing_config["scheduling"]["max_steps"]
        self.epsilon_testing      = testing_config["exploration"]["epsilon_init"]
            
        return
        
        
    def _choose_command(self, state, eps):
        """Choose command using e-greedy strategy."""
        if np.random.random() >= eps:
            # Exploit.
            v_idx, n_idx = self._get_max_qs_idx(self.main_model, [state])
            v_idx = v_idx[0]
            n_idx = n_idx[0]
        else:
            # Explore.
            v_idx = self.rng.randint(0, len(self.verb_map))
            n_idx = self.rng.randint(0, len(self.noun_map))
                
        command_verb = self.verb_map[v_idx]
        command_noun = self.noun_map[n_idx]
        command      = f"{command_verb} {command_noun}"

        return v_idx, n_idx, command
    
    
    def play(self, testing_game_info, training_game_info=None, training_enabled=False):
        """Play the game in the selected mode."""
        
        if training_enabled == True:
            training_game_name = training_game_info["name"]
            training_game_env  = training_game_info["env"]
        
        testing_game_name  = testing_game_info["name"]
        testing_game_env   = testing_game_info["env"]
        
         # Create log dir., if not present.
        if not os.path.isdir(f"{self.log_dir}_{testing_game_name}"):
            os.makedirs(f"{self.log_dir}_{testing_game_name}")
            
        # Create summary writer for tensorboard.
        self.summary_writer = \
            tf.summary.create_file_writer(logdir=f"{self.log_dir}_{testing_game_name}")
        
        # Create model dir., if not present.
        if not os.path.isdir(f"{self.model_dir}_{testing_game_name}"):
            os.makedirs(f"{self.model_dir}_{testing_game_name}")
        
        # We will only return the stats c,orresponding to the testing phase of the agent.
        stats = {
            "agent_name"         : self.agent_name,
            "testing_game_name"  : testing_game_name,
            "epochs"             : self.epochs,
            "epochs_avg_reward"  : [],
            "epochs_avg_steps"   : [],
            "epochs_avg_wins"    : []
        }
        
        # Create log dir if not present.
        if not os.path.isdir(f"{self.log_dir}_{testing_game_name}"):
            os.makedirs(f"{self.log_dir}_{testing_game_name}")
        
        for epoch in tqdm(range(self.epochs), ascii=True, unit=" epoch"):
            epoch_training_stats = dict()
            epoch_testing_stats = dict()
            
            if training_enabled:
                epoch_training_stats = self._train(training_game_env)

            epoch_testing_stats = self._test(testing_game_env)
            
            if self.verbose:
                if (not ((epoch+1) % self.logging_frequency)) or (epoch == 0):
                    print("")
                    print(f"Epoch: {epoch}")
                    print("=============================================")
                    
                    if training_enabled:
                        print("Train stats")
                        print("--------------------------------------------")
                        print(f"Avg. reward: {epoch_training_stats['avg_reward']:>5.3f}")
                        print(f"Avg. steps: {epoch_training_stats['avg_steps']:>5.3f}")
                        print(f"Avg. wins: {epoch_training_stats['avg_wins']:>5.3f}")
                        print(f"Epsilon: {self.epsilon_training:>5.3f}")
                    
                    print("")
                    print("Test stats")
                    print("--------------------------------------------")
                    print(f"Avg. reward: {epoch_testing_stats['avg_reward']:>5.3f}")
                    print(f"Avg. steps: {epoch_testing_stats['avg_steps']:>5.3f}")
                    print(f"Avg. wins: {epoch_testing_stats['avg_wins']:>5.3f}")
                    print(f"Epsilon: {self.epsilon_testing:>5.3f}")
                    
            
            with self.summary_writer.as_default():
                tf.summary.scalar(
                  f"Avg. reward", 
                  epoch_testing_stats['avg_reward'],
                  step=epoch
                )
                tf.summary.scalar(
                  f"Avg. steps", 
                  epoch_testing_stats['avg_steps'],
                  step=epoch
                )
                tf.summary.scalar(
                  f"Avg. wins", 
                  epoch_testing_stats['avg_wins'],
                  step=epoch
                )
                
            if (not ((epoch+1) % self.model_saving_frequency)):
                model_name = \
                    f"epoch-{epoch}_" + f"avg_reward-{epoch_testing_stats['avg_reward']}_" + \
                    f"avg_steps-{epoch_testing_stats['avg_steps']}_" + \
                    f"avg_wins-{epoch_testing_stats['avg_wins']}_" + \
                    f"training_epsilon-{self.epsilon_training}"
                model_out = os.path.join(f"{self.model_dir}_{testing_game_name}_mode-{self.mode}", model_name)
                self.main_model.save(model_out)
                
            stats["epochs_avg_reward"].append(epoch_testing_stats['avg_reward'])
            stats["epochs_avg_steps"].append(epoch_testing_stats['avg_steps'])
            stats["epochs_avg_wins"].append(epoch_testing_stats['avg_wins'])
            
        return stats
    
    
    def _train(self, training_game_env):
        self.mode = "training"
        
        training_stats = {
            "avg_reward" : 0,
            "avg_steps" : 0,
            "avg_wins" : 0
        }        
        
        # Initialize empty list for storing episode info at the start of evry epoch.
        episodes_rewards = []
        episodes_steps   = []
        episodes_won     = [] 
        
        for episode in range(self.max_episodes_training):
        
            # Reset environment at the start of evry episode.
            obs, infos        = training_game_env.reset()

            state             = f"{obs}\n{infos['description']}\n{infos['last_command']}"            
            state_processed   = self._preprocess([state])
            cumulative_reward = 0
            nb_moves          = 0  
            done              = False
            won               = False
            
            while not done:

                v_idx, n_idx, command = self._choose_command(state_processed, self.epsilon_training)

                obs, score, done, infos = training_game_env.step(command)

                # Calculate reward based on score.
                reward = score # Reward is equal to the score obtained after each move. 
                
                # No discovery bonus for vanilla agent.    
                discovery_bonus = 0

                # Preprocess the next state.
                next_state = f"{obs}\n{infos['description']}\n{infos['last_command']}"
                next_state_processed = self._preprocess([next_state])                

                # In training mode, we need to remember experiences and replay every few steps.
                if reward >= 0.0:
                    is_prior = True
                else:
                    is_prior = False
                    
                self._remember(
                    is_prior, state_processed, v_idx, n_idx, reward, 
                    discovery_bonus, next_state_processed, done
                )

                #  After evry move, increment replay_counter.
                self.replay_counter += 1
                if self.replay_counter > self.replay_frequency: 
                    self._replay()
                    self.replay_counter = 0
                    
                

                state             = next_state
                state_processed   = next_state_processed # Update current state.
                cumulative_reward += reward # Update cumulative reward.
                nb_moves          += 1 # Update no. of moves (steps) played.
                won               = infos["won"]
                
                self.epsilon_training = max(self.epsilon_min_training, self.epsilon_decay_training * self.epsilon_training)
                
                # Break out from while loop (i.e., end episode) if reached max. steps allowed.
                if nb_moves >= self.max_steps_training:
                    break
                    
            episodes_rewards.append(cumulative_reward) # Rewards obtained every episode.
            episodes_steps.append(nb_moves) # Steps played every episode.
            episodes_won.append(won) # Info. regarding whether won episode or not.

            if self.mode == "training":
                # If in training mode, increment target_update_counter at end of every episode.
                self.target_update_counter += 1
                if self.target_update_counter > self.target_update_frequency:
                    self.target_model.set_weights(self.main_model.get_weights())
                    self.target_update_counter = 0
                
        training_stats["avg_reward"] = np.mean(episodes_rewards)
        training_stats["avg_steps"]  = np.mean(episodes_steps)
        training_stats["avg_wins"]   = np.count_nonzero(episodes_won)/len(episodes_won)
        
        return training_stats
    
    
    def _test(self, testing_game_env):
        self.mode = "testing"
        
        testing_stats = {
            "avg_reward" : 0,
            "avg_steps" : 0,
            "avg_wins" : 0
        }        
        
        # Initialize empty list for storing episode info at the start of evry epoch.
        episodes_rewards = []
        episodes_steps   = []
        episodes_won     = []
        
        for episode in range(self.max_episodes_testing):
            # Reset environment at the start of evry episode.
            obs, infos        =  testing_game_env.reset()

            state             = f"{obs}\n{infos['description']}\n{infos['last_command']}"
            state_processed   = self._preprocess([state])
            cumulative_reward = 0
            nb_moves          = 0  
            done              = False
            won               = False
            
            while not done:

                v_idx, n_idx, command = self._choose_command(state_processed, self.epsilon_testing)

                obs, score, done, infos = testing_game_env.step(command)

                # Calculate reward based on score.
                reward = score # Reward is equal to the score obtained after each move.               

                # Preprocess the next state.
                next_state = f"{obs}\n{infos['description']}\n{infos['last_command']}"
                next_state_processed = self._preprocess([next_state])

                state             = next_state
                state_processed   = next_state_processed # Update current state.
                cumulative_reward += reward # Update cumulative reward.
                nb_moves          += 1 # Update no. of moves (steps) played.
                won               = infos["won"]
                
                # Break out from while loop (i.e., end episode) if reached max. steps allowed.
                if nb_moves >= self.max_steps_testing:
                    break
                    
            episodes_rewards.append(cumulative_reward) # Rewards obtained every episode.
            episodes_steps.append(nb_moves) # Steps played every episode.
            episodes_won.append(won) # Info. regarding whether won episode or not.
                
        testing_stats["avg_reward"] = np.mean(episodes_rewards)
        testing_stats["avg_steps"]  = np.mean(episodes_steps)
        testing_stats["avg_wins"]   = np.count_nonzero(episodes_won)/len(episodes_won)
        
        return testing_stats
    
    
    def _remember(self, is_prior, state, v_idx, n_idx, reward, discovery_bonus, next_state, done):
        """Update replay memory."""
        self.replay_memory.push(is_prior, state, v_idx, n_idx, reward, discovery_bonus, next_state, done)
        return
        
        
    def _get_max_qs_idx(self, nn_model, state_batch):
        """Get indexes corresponding to the max. verb and noun scores."""
        
        # Add padding to the batch.
        state_batch = self._add_padding(state_batch)
        
        verb_scores_batch, noun_scores_batch = \
            nn_model.predict(state_batch)
        
        max_verb_score_idx_batch = np.argmax(verb_scores_batch, axis=2)
        max_verb_score_idx_batch = max_verb_score_idx_batch[:, 0]
        
        max_noun_score_idx_batch = np.argmax(noun_scores_batch, axis=2)
        max_noun_score_idx_batch = max_noun_score_idx_batch[:, 0]
        
        return max_verb_score_idx_batch, max_noun_score_idx_batch
    
    
    def _get_max_qs(self, nn_model, state_batch):
        """Get max. verb and noun scores."""
        
        # Add padding to the batch.
        state_batch = self._add_padding(state_batch)
        
        verb_scores_batch, noun_scores_batch = \
            nn_model.predict(state_batch)
        
        max_verb_score_batch = np.max(verb_scores_batch, axis=2)

        max_verb_score_batch = max_verb_score_batch[:, 0]
        
        max_noun_score_batch = np.max(noun_scores_batch, axis=2)
        max_noun_score_batch = max_noun_score_batch[:, 0]
        
        return max_verb_score_batch, max_noun_score_batch
    
    
    def _get_qs(self, nn_model, state_batch):
        """Get list of verb scores and list of noun scores."""
        
        # Add padding to the batch.
        state_batch = self._add_padding(state_batch)
        
        verb_scores_batch, noun_scores_batch = \
            nn_model.predict(state_batch)
        
        return verb_scores_batch, noun_scores_batch
    
    
    def _get_target_scores(self, idx_batch, scores_batch, target_batch):
        """Build the batch of target scores (verb or noun) used to fit the model.

        For every sample the current scores are copied and the entry at
        ``[0, idx]`` (the action that was taken) is overwritten with the
        matching target value; all other entries keep their current value.

        Returns:
            The patched score arrays wrapped in a single ``tf.constant``.
        """
        def with_target(action_idx, current_scores, target_value):
            # Work on a copy so the caller's score arrays stay untouched.
            patched = np.copy(current_scores)
            patched[0, action_idx] = target_value
            return patched

        patched_batch = [
            with_target(action_idx, current_scores, target_value)
            for action_idx, current_scores, target_value
            in zip(idx_batch, scores_batch, target_batch)
        ]

        return tf.constant(patched_batch)
    
    
    def _replay(self):
        """Creat source-target pairs and perform gradient update. """
        
        # Sample a minibatch of transitions from the 
        # experience replay memory and create req. lists
        # to perform nececessary operations on.
        
        if len(self.replay_memory) < self.min_replay_memory_size:
            return
        
        minibatch = self.replay_memory.sample(self.minibatch_size)
        
        state_batch           = [data.state for data in minibatch]        
        v_idx_batch           = [data.v_idx for data in minibatch]        
        n_idx_batch           = [data.n_idx for data in minibatch]        
        reward_batch          = [data.reward for data in minibatch]
        discovery_bonus_batch = [data.discovery_bonus for data in minibatch]
        next_state_batch      = [data.next_state for data in minibatch]        
        done_batch            = [data.done for data in minibatch]
        
        # Get the batch of max. verb and noun scores corresponding
        # to the next states.
        max_verb_score_batch, max_noun_score_batch = \
            self._get_max_qs(self.target_model, next_state_batch)
        
        # Get the Q-learning targets for verb scores.
        verb_target_batch = []
        for done, reward, discovery_bonus, max_verb_score in \
        zip(done_batch, reward_batch, discovery_bonus_batch, max_verb_score_batch):
            if done:
                verb_target = (reward + discovery_bonus)
            else:
                verb_target = (reward + discovery_bonus) + (self.discount * max_verb_score)
            
            verb_target_batch.append(verb_target)
        
        # Get the Q-learning targets for noun scores.
        noun_target_batch = []
        for done, reward, discovery_bonus, max_noun_score in \
        zip(done_batch, reward_batch, discovery_bonus_batch, max_noun_score_batch):
            if done:
                noun_target = (reward + discovery_bonus)
            else:
                noun_target = (reward + discovery_bonus) + (self.discount * max_noun_score)
            
            noun_target_batch.append(noun_target)
        
        # Get the verb and noun scores corresponding to batch of 
        # current state.
        verb_scores_batch, noun_scores_batch = \
            self._get_qs(self.main_model, state_batch)   
        
        # IMP: Create the batch of target verb scores.
        verb_scores_target_batch = self._get_target_scores(
            v_idx_batch, verb_scores_batch,
            verb_target_batch
        )
        
        # IMP: Create the batch of target verb scores.
        noun_scores_target_batch = self._get_target_scores(
            n_idx_batch, noun_scores_batch,
            noun_target_batch
        )        
            
        # IMP: Create the batch of source input sequence.
        input_seq_data_batch = self._add_padding(state_batch)        

        # Fit the main model.
        self.main_model.fit(
            input_seq_data_batch,
            [verb_scores_target_batch,noun_scores_target_batch],
            batch_size=self.minibatch_size,
            verbose=False,
            shuffle=False
        )
            
        return
    
    
    def _get_word_id(self, word):
        if word not in self.word2id:
            if len(self.word2id) >= self.max_vocab_size:
                return self.word2id["<UNK>"]
            
            self.id2word.append(word)
            self.word2id[word] = len(self.word2id)
            
        return self.word2id[word]
    
    
    def _tokenize(self, text):
        # Simple tokenizer: strip out all non-alphabetic characters.
        text = re.sub("[^a-zA-Z0-9\- ]", " ", text)
        word_ids = list(map(self._get_word_id, text.split()))
        return word_ids

    
    def _add_padding(self, texts):
        """Add padding to the batch of texts."""
        max_len = max(len(text[0]) for text in texts)
        padded = np.ones((len(texts), max_len)) * self.word2id["<PAD>"]

        for i, text in enumerate(texts):
            padded[i, :len(text[0])] = text
            
        padded_texts = np.copy(padded)
        return padded_texts
    
    
    def _preprocess(self, texts):
        """Tokenize and padding (if req.) the text."""
        texts = list(map(self._tokenize, texts))
        texts = np.array(texts)
        
        return texts

## Method to evaluate performance of agent
Next, we define a function that evaluates the performance of a (**random/RL**) agent as it plays any (**text-based**) game in (**train/test**) mode.

In [13]:
def _plot_learning_curve(values, epochs, game_name, title_metric, y_label, out_file):
    """Plot one per-epoch metric, save it to ``out_file``, show it and close the figure."""
    fig = plt.figure(figsize=(10, 6))
    ax = fig.add_subplot(1, 1, 1)
    ax.set_title(
        f"GAME- {game_name}\n"
        "========================================================\n"
        f"{title_metric} over {epochs} epochs"
    )
    ax.set_xlabel("Epochs")
    ax.set_ylabel(y_label)
    ax.plot(range(epochs), values)
    fig.savefig(out_file)
    plt.show()
    # Close this specific figure; a bare plt.close() only closes the current
    # one, which left the earlier figures open.
    plt.close(fig)


def evaluate_performance(agent, training_game_config, testing_game_config, training_enabled=False):
    """Let ``agent`` play the configured game(s) and plot its learning curves.

    Args:
        agent: object exposing ``play(testing_game_info, training_game_info,
            training_enabled)`` that returns a stats dict.
        training_game_config: config passed to ``TextWorldGame`` for the
            training game; only used when ``training_enabled`` is true.
        testing_game_config: config passed to ``TextWorldGame`` for the
            testing game.
        training_enabled: when false, no training game is created and
            ``None`` is passed to ``agent.play`` as training info.

    Returns:
        The stats dict returned by ``agent.play``. It must contain the keys
        ``agent_name``, ``testing_game_name``, ``epochs``,
        ``epochs_avg_reward``, ``epochs_avg_steps`` and ``epochs_avg_wins``.

    Side effects:
        Saves ``avg_rewards.pdf``, ``avg_steps.pdf`` and ``avg_wins.pdf`` under
        ``plots/<agent_name>_<testing_game_name>/`` and shows each figure.
    """
    # Create the TextWorld game objects and get the info required to play.
    text_world_game_testing = TextWorldGame(testing_game_config)
    testing_game_info = text_world_game_testing.get_game_info()

    if training_enabled:
        text_world_game_training = TextWorldGame(training_game_config)
        training_game_info = text_world_game_training.get_game_info()
    else:
        training_game_info = None

    stats = agent.play(testing_game_info, training_game_info, training_enabled)

    # Per-epoch statistics reported by the agent.
    agent_name        = stats["agent_name"]
    testing_game_name = stats["testing_game_name"]
    epochs            = stats["epochs"]

    plt_path = os.path.join("plots", f"{agent_name}_{testing_game_name}")
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(plt_path, exist_ok=True)

    # Learning curves: (stats key, title metric, y-axis label, file name).
    curves = (
        ("epochs_avg_reward", "Avg. rewards", "Avg. reward", "avg_rewards.pdf"),
        ("epochs_avg_steps",  "Avg. steps",   "Avg. steps",  "avg_steps.pdf"),
        ("epochs_avg_wins",   "Avg. wins",    "Avg. wins",   "avg_wins.pdf"),
    )
    for stats_key, title_metric, y_label, file_name in curves:
        _plot_learning_curve(
            stats[stats_key], epochs, testing_game_name,
            title_metric, y_label, f"{plt_path}/{file_name}"
        )

    return stats

## Get games for evaluating agent performance ##
Here, we obtain the training games.

In [14]:
# Settings of the single 5-room coin-collector game used below.
game_config = {
    "name": "coin_collector_l5_easy_2",
    "max_steps": 50,
    "path": "tw_games/single_games/l5_easy_2/coin_collector_l5_easy_2.ulx",
}

## Get the agents to play the game ##
Now, we get the **Random Agent** and the **RL Agent** to play the coin collector game with 5 rooms.

### Get random agent ###

In [15]:
# Read random agent config details from config file.
# The context manager closes the file even if parsing the YAML fails.
with open("configs/random_agent_config.yaml") as config_file:
    random_agent_config = yaml.safe_load(config_file)

random_agent = RandomAgent(random_agent_config)

### Get RL agent ###

In [16]:
# Read RL agent config details from config file.
# The context manager closes the file even if parsing the YAML fails.
with open("configs/rl_agent_config.yaml") as config_file:
    rl_agent_config = yaml.safe_load(config_file)

rl_agent = RLAgent(rl_agent_config)

## Test random agent on single games ##
Test and evaluate the performance of the random agent on single games.

In [17]:
# random_agent_testing_config = random_agent_config["testing"]
# random_agent_testing_stats = evaluate_performance(
#     agent=random_agent, 
#     testing_game_config=game_config,
#     training_game_config=None, 
#     training_enabled=False
# )

## Test RL agent on single games ##
Test and evaluate the performance of the RL agent on single games.

In [18]:
# Keep the "testing" section of the RL agent config at hand.
rl_agent_testing_config = rl_agent_config["testing"]

# Train and test the RL agent on the same single game.
# NOTE(review): despite the "before_training" name, this run has training enabled.
rl_agent_testing_before_training_stats = evaluate_performance(
    rl_agent,
    training_game_config=game_config,
    testing_game_config=game_config,
    training_enabled=True,
)

  result = entry_point.load(False)
  0%|          | 0/100 [00:00<?, ? epoch/s]



  1%|1         | 1/100 [03:32<5:51:05, 212.78s/ epoch]


Epoch: 0
Train stats
--------------------------------------------
Avg. reward: 0.267
Avg. steps: 47.033
Avg. wins: 0.267
Epsilon: 0.978

Test stats
--------------------------------------------
Avg. reward: 0.000
Avg. steps: 50.000
Avg. wins: 0.000
Epsilon: 0.000


  1%|1         | 1/100 [05:49<9:36:07, 349.17s/ epoch]


KeyboardInterrupt: 