# Reinforce the Ramparts – A Q-learning demonstration on a text-based role-playing game

## Introduction
This code defines a text-based RPG with elements of procedural generation and role-playing game mechanics. The game is built around a concept of the player character progressing through different stages, each of which consists of various randomly generated events. These events include encounters with monsters of varying sizes, finding treasures, resting at bonfires, and more.

The game is simple and largely inspired by the popular roguelike *Slay the Spire*.

## Features
**Character progression**: The player character has a set of stats (Health, Attack, and Defense) which can be improved over the course of the game. These improvements can be gained by finding treasure or defeating monsters.

**Procedural generation**: Each stage of the game is procedurally generated. A stage consists of three events, and each event can be one of six types: Small Monster, Big Monster, Elite Monster, Treasure, Bonfire, or Random.

**Combat**: Encounters with monsters lead to combat situations where the player's and monster's stats determine the outcome. The combat system is turn-based, and the outcome depends on the HP, Attack and Defense stats of the player and the monster.

## Events

**Small Monster, Big Monster and Elite Monster**: These events initiate a combat scenario. Defeating a monster also results in a stat increase for the player proportional to the difficulty of the monster.

**Treasure**: The player finds a treasure chest which can be empty or contain equipment that increases the player's stats.

**Bonfire**: Allows the player to rest and restore health.

**Random**: This event will randomly trigger one of the other events (Small Monster, Big Monster, Treasure, Bonfire), or can result in nothing happening.

**Boss**: The final stage consists of a difficult boss fight.

## Game flow
The game flow consists of the player choosing an event from the current stage to engage with. The event resolution will result in changes to the player's stats and can even lead to the end of the game if the player's health drops to zero.

The game continues until the player either defeats the boss at the final stage or the player character dies during a combat event.

Finally, the game environment provides functions for getting the current state of the game (observations) and the possible actions at each stage. This setup makes the game suitable for training and testing reinforcement learning algorithms, as the agent can receive feedback (reward) based on the actions it takes in the game.

In [1]:
import random

class RampartsEnvironment:
    
    def __init__(self, verbose=False):
        '''
        Initializes the game environment.
        
        events: List of events in the game
        SMALL_EXPERIENCE = 1: Experience for small events (small treasure)
        MEDIUM_EXPERIENCE = 3: Reward for medium events (small monster)
        LARGE_EXPERIENCE = 5: Reward for large events (big monster)
        GREAT_EXPERIENCE = 10: Reward for great events (elite monster)
        N_EVENTS_PER_STAGE: Number of events per stage
        N_STAGES: Number of stages in the game before encountering the boss
        history: A dictionary containing the observation and action taken at each stage
        verbose: If a human is playing this will toggle the text output for the game
        '''
        self.default_player_stats = [100, 20, 10]
        self.events = ["Small Monster", "Big Monster", "Elite Monster", "Treasure", "Bonfire", "Random", "Boss"]
        self.SMALL_EXPERIENCE = 1
        self.MEDIUM_EXPERIENCE = 3
        self.LARGE_EXPERIENCE = 5
        self.GREAT_EXPERIENCE = 10
        self.N_EVENTS_PER_STAGE = 3
        self.N_STAGES = 15 - 1 # Final stage reserved for boss
        self.verbose = verbose
    
    # Game methods
    def create_stage(self):
        '''
        Generates a new stage in the game.
        
        Returns: a list of random events for the current stage (boss not included)
        '''
        return [random.choice(self.events[:-1]) for _ in range(self.N_EVENTS_PER_STAGE)]
    
    def level_up(self, experience):
        '''
        Increases the stats of the player up to a limit.
        
        Params:
        - experience: The amount of experience to increase levels by
        '''
        STAT_MAX = 50
        
        self.round_exp = experience
        
        for _ in range(experience):
            
            attack = self.player['Att']
            defence = self.player['Def']
            
            # If maximum values for stats exceeded
            if (attack >= STAT_MAX) and (defence >= STAT_MAX):
                break
            
            # If max attack exceeded give to defence
            if attack >= STAT_MAX:
                self.player['Def'] += 1
                
            # If max defence exceeded give to attack
            elif defence >= STAT_MAX:
                self.player['Att'] += 1
            
            stat = random.randint(1, 2)

            if stat == 0:
                key = 'HP'
            elif stat == 1:
                key = 'Att'
            elif stat == 2:
                key = 'Def'
                
            self.player[key] += 1
        
    def combat(self, enemy):
        '''
        Simulates a combat between player and enemy reducing health from player as combat unfolds.
        
        Params:
        - enemy: The enemy to fight against
        '''
        def hit(attacker, defender):
            damage = max(1, attacker['Att'] - defender['Def'])
            return damage

        player_turn = True
        while True:
            if self.player['HP'] <= 0:
                self.is_dead = True
                return None
            elif enemy['HP'] <= 0:
                return None
            
            if player_turn:
                damage = hit(self.player, enemy)
                enemy['HP'] -= damage
            else:
                damage = hit(enemy, self.player)
                self.player['HP'] -= damage
            player_turn = not player_turn
            
    def event_small_monster(self):
        def create_small_monster():
            '''
            Generates a small monster with a random variant.

            Returns: a dictionary representing the monster
            '''
            monster = {'HP': 35, 'Att': 16, 'Def': 10}
            variant = random.randint(0, 3)
            if variant == 0:
                if self.verbose: print("Goblin!")
            elif variant == 1:
                if self.verbose: print("Wraith!")
                monster['Att'] += 4
                monster['Def'] -= 1
            elif variant == 2:
                if self.verbose: print("Crab!")
                monster['Att'] -= 1
                monster['Def'] += 5
            elif variant == 3:
                if self.verbose: print("Slime!")
                monster['HP'] += 20
            return monster
        
        self.combat(create_small_monster())
        if self.is_dead is not True:
            self.level_up(self.MEDIUM_EXPERIENCE)

    def event_big_monster(self):
        def create_big_monster():
            '''
            Generates a big monster with a random variant.

            Returns: a dictionary representing the monster
            '''
            monster = {'HP': 80, 'Att': 22, 'Def': 13}
            variant = random.randint(0,3)
            if variant == 0:
                if self.verbose: print("Maruauder!")
            elif variant == 1:
                if self.verbose: print("Assassin!")
                monster['HP'] -= 10
                monster['Att'] += 8
                monster['Def'] -= 3
            elif variant == 2:
                if self.verbose: print("Golem!")
                monster['Att'] -= 3
                monster['Def'] += 4
            elif variant == 3:
                if self.verbose: print("Ogre!")
                monster['Att'] -= 2
                monster['Def'] -= 2
                monster['HP'] += 25
            return monster
        self.combat(create_big_monster())
        if self.is_dead is not True:
            self.level_up(self.LARGE_EXPERIENCE)
            
    def event_elite_monster(self):
        def create_elite_monster():
            '''
            Generates an elite monster with a random variant.

            Returns: a dictionary representing the monster
            '''
            monster = {'HP': 120, 'Att': 25, 'Def': 18}
            variant = random.randint(0,3)
            if variant == 0:
                if self.verbose: print("Minotaur!")
            elif variant == 1:
                if self.verbose: print("Lich!")
                monster['HP'] -= 30
                monster['Att'] += 8
                monster['Def'] -= 5
            elif variant == 2:
                if self.verbose: print("Stoneguard!")
                monster['Att'] -= 5
                monster['Def'] += 5
            elif variant == 3:
                if self.verbose: print("Juggernaut!")
                monster['Att'] -= 3
                monster['Def'] -= 3
                monster['HP'] += 80
            return monster
        self.combat(create_elite_monster())
        if self.is_dead is not True:
            self.level_up(self.GREAT_EXPERIENCE)
            
    def event_boss(self):
        def create_boss():
            '''
            Generates the boss with predefined stats.

            Returns: a dictionary representing the boss
            '''
            if self.verbose: print("Your path ends here, before you stands a towering dragon!")
            boss = {'HP': 500, 'Att': 30, 'Def': 25}
            return boss
        self.combat(create_boss())
        if self.is_dead is not True:
            self.boss_killed = True

    def event_treasure(self):
        if self.verbose: print("You stumble upon a treasure chest!")
        quality = random.randint(1, 10)
        if quality <= 2:
            if self.verbose: print("It's empty!")
        elif quality <= 7:
            if self.verbose: print("You find some usable equipment!")
            self.level_up(self.SMALL_EXPERIENCE)
        elif quality <= 9:
            if self.verbose: print("You find some great equipment!")
            self.level_up(self.MEDIUM_EXPERIENCE)
        elif quality == 10:
            if self.verbose: print("You find some excellent equipment!")
            self.level_up(self.LARGE_EXPERIENCE)

    def event_bonfire(self):
        if self.verbose: print("You rest at the bonfire restoring health.")
        if self.player['HP'] < 60:
            self.big_heal = True
        self.player['HP'] = min(self.player['HP'] + 50, 100)

    def event_random(self):
        if random.randint(0, 2) == 0:
            if self.verbose: print("Nothing happens!")
        else:
            event = random.choice(self.events[:-1])
            if self.verbose: print(event)
            self.event_manager(event)
            
    def event_manager(self, event):
        event_func = {
            'Small Monster': self.event_small_monster,
            'Big Monster': self.event_big_monster,
            'Elite Monster': self.event_elite_monster,
            'Treasure': self.event_treasure,
            'Bonfire': self.event_bonfire,
            'Random': self.event_random,
            'Boss': self.event_boss,
        }
        event_func[event]()
    
    
    # Auxiliary methods
    def print_game_state(self):
        '''
        Prints the current game state for a human playing the game.
        '''
        print("Stage", self.current_stage_index + 1)
        print("Current:", self.current_stage)
        print(self.player)    
        
    def int_action_to_str(self, action):
        '''
        Converts numeric action to string equivalent, essentially functions as a wrapper.

        Params:
        - action: The numeric action to be converted. It could be any of 0, 1, 2, 3, 4, 5 representing "Small Monster", "Big Monster", "Treasure", "Bonfire", "Random", "Boss" respectively.

        Raises an exception if the action is invalid.
        '''
        try:
            return self.events[action]
        except IndexError:
            raise Exception("Invalid choice.")
            
    def str_action_to_int(self, action):
        '''
        Converts string action to integer equivalent, essentially functioning as a wrapper.

        Params:
        - action: The string action to be converted. It could be any of "Small Monster", "Big Monster", "Treasure", "Bonfire", "Random", "Boss" representing 0, 1, 2, 3, 4, 5 respectively.

        Raises an exception if the action is invalid.
        '''
        try:
            return self.events.index(action)
        except ValueError:
            raise Exception("Invalid choice.")

    def stage_names_to_indices(self, stage):
        '''
        Converts stage event names to numeric indices, functions as a wrapper for the environment.

        Params:
        - stage: A list containing the names of the events in the current stage

        Returns: A list of indices corresponding to the event names
        '''
        try:
            return [self.events.index(event) for event in stage]
        except ValueError:
            raise Exception("Invalid event entered. Not present in current stage.")
    
    def play_human(self):
        '''
        Run a game loop where a human player can play the game, using standard input/output.
        '''
        self.reset()

        while not self.is_done():
            self.print_game_state()
            choice = input("Enter your choice:")
            try:
                choice = self.str_action_to_int(choice)
                self.action(choice)
            except Exception as e:
                print(e)
                

    # RL methods
    def calculate_reward(self):
        '''
        Calculates the reward based on the player's stats and progress in the game.
        
        Returns: The calculated reward
        '''
        
        # Reward bonuses/penalties
        victory_bonus = 500
        death_penalty = -3
        
        # Negative reward if our agent dies
        if self.is_dead is not True:
            death_penalty = 0
            
        if self.big_heal is True:
            heal_bonus = ((self.player['Def']-10)+(self.player['Att']-20))/3
        else:
            heal_bonus = 0
        
        # Reward bonuses for stat increases and penalty for losing HP
        health_bonus = (self.player['HP'] - self.default_player_stats[0])/50
        stat_bonus = (self.round_exp)**1.5
        
        # Large bonus for defeating the boss 
        if self.boss_killed == True:
            boss_killed_bonus = victory_bonus
        else:
            boss_killed_bonus = 0

        reward = health_bonus + stat_bonus + death_penalty + heal_bonus + boss_killed_bonus
        return reward    
    
    def reset(self):
        '''
        Resets the game state for a new game.
        
        Returns: Initial game observations
        '''
        # Reset history
        self.history = {}
        
        # Reward helpers
        self.round_exp = 0
        self.big_heal = False
        
        # Reset flags
        self.boss_killed = False
        self.is_dead = False
        
        # Reset player
        self.player = {'HP': self.default_player_stats[0],
                       'Att': self.default_player_stats[1],
                       'Def': self.default_player_stats[2]}
        
        # Generate new stages
        self.stages = [self.create_stage() for _ in range(self.N_STAGES)]
        self.stages.append(["Boss", "Boss", "Boss"])
        
        # Get current stage
        self.current_stage_index = 0
        self.current_stage = self.stages[self.current_stage_index]
        
        return self.get_observations()
    
    def get_observations(self) -> list:
        '''
        Returns the current game state as a list of numeric values.
        
        Format:
        [choice 1, choice 2, choice 3, (next choice 1, next choice 2, next choice 3), 
         remaining stages, current hp, current att, current def]
         Next choices 1, 2 and 3 will not be returned unless using the look ahead config.
        '''
        current_stage_events = self.stage_names_to_indices(self.current_stage)
        current_stage_number = [self.current_stage_index + 1]
        current_stats = [i for i in self.player.values()]
        
        return current_stage_events + current_stage_number + current_stats
        
    def get_actions(self) -> list:
        '''
        Returns the possible actions in the current stage.
        
        Returns: A list of numeric values representing actions
        '''
        
        return self.stage_names_to_indices(self.current_stage)
    
    def is_done(self) -> bool:
        '''
        Checks if the game has ended.
        
        Returns: True if the game has ended, else False
        '''
        return self.is_dead or self.boss_killed
    
    def action(self, action: int) -> float:
        '''
        Executes an action and returns the reward.

        Params:
        - action: The action to execute

        Returns: The reward after executing the action

        Raises an exception if the game is over or the action is invalid.
        '''
        if self.is_done():
            raise Exception("Game is over.")

        # Get observations before action for history
        obs = self.get_observations()
            
        # Converts numeric action to text choice
        choice = self.int_action_to_str(action)
        
        if choice not in self.current_stage:
            raise Exception(f"{choice} is an Invalid selection in the stage: {self.current_stage}")
            
        # Perform our action in the environment
        self.event_manager(choice)
        
        # Consider the reward as the difference between the current and previous game state
        # otherwise reward will implicitly accumulate, obfuscating good decision making
        reward = self.calculate_reward()
        
        # Print game information
        if (self.current_stage_index == self.N_STAGES) and (self.is_dead is not True):
            if self.verbose: print("You win!")
        elif self.is_dead == True:
            if self.verbose: print("You died!")
        
        # Update game information
        self.current_stage_index += 1
        if self.current_stage_index <= self.N_STAGES: 
            self.current_stage = self.stages[self.current_stage_index]
        self.round_exp = 0
        self.big_heal = False
        
        # Update history regardless of the game's end status
        self.history[self.current_stage_index] = (obs, action, reward)
        return reward

    def action_sample(self) -> int:
        '''
        Returns: a random action from the action space.
        '''
        return random.choice(env.get_actions())

# You can play the game:
Run this code to play the game yourself.

To play, type the name of the path you want to take:

In [2]:
# env = RampartsEnvironment(verbose=True)
# env.play_human()

# The Random Agent
Let's make an AI agent that plays the game by randomly choosing one of the three available actions.

In [3]:
class RandomAgent:
    def __init__(self):
        '''
        Initializes a random agent. This agent takes actions randomly and doesn't learn from the environment.
        '''
        self.total_reward = 0.0
        self.cleared_stages = 0
        
    def step(self, env: RampartsEnvironment):
        '''
        Take a step in the environment.
        
        This function gets the current state and available actions from the environment, 
        selects a random action, and calculates the reward for this action.
        
        Parameters:
        - env: The environment in which the agent is taking a step.
        '''
        # Get observations from environment to make decision with (not used with random agent)
        current_obs = env.get_observations()
        
        # Get available actions and choose a random one 
        actions = env.get_actions()
        action = random.choice(actions)
        
        # Calculate reward for this action and add to agent's total
        reward = env.action(action)
        self.total_reward += reward
        
        # Add 1 to number of cleared stages if agent doesn't die
        if (env.is_dead == False):
            self.cleared_stages += 1
            
    def reset(self):
        self.total_reward = 0.0
        self.cleared_stages = 0


env = RampartsEnvironment()
env.reset()

agent = RandomAgent()

while not env.is_done():
    agent.step(env)


### History
The `history` dictionary keeps track of the episode just played, using the stage number as the key. Each key's value is a list that consists of the following elements:

- Key (int): Stage number

- Value (list): A list containing three elements:

    - Element 1 (list): Observations. This is a list containing the state observations at the respective stage.
    - Element 2 (int): Action taken. This is an integer representing the action that was chosen during the respective stage.
    - Element 3 (float): Reward. This is a float value that represents the reward gained from the action taken at the respective stage.


In [4]:
print("Total reward obtained by random agent:", agent.total_reward)
env.history

Total reward obtained by random agent: -5.1


{1: ([1, 4, 4, 1, 100, 20, 10], 4, 0.0),
 2: ([3, 5, 2, 2, 100, 20, 10], 2, -5.1)}

## 10,000 random agents
Let's run 10,000 episodes with the random agent to observe its performance. We will do so by looking at the number of stages cleared by each agent on a histogram.

In [5]:
def agent_batch(env, agent_, n_episodes):
    '''
    Runs multiple episodes with the given agent and environment and returns the number of stages cleared in each episode.
    
    Parameters:
    - env: The environment in which the agent is acting.
    - agent: The agent that's interacting with the environment.
    - n_episodes: The number of episodes to run.
    
    Returns:
    - stages_cleared_list: A list containing the number of stages cleared in each episode.
    '''
    stages_cleared_list = []
    
    for _ in range(n_episodes):
        env.reset()
        agent.reset()
        while not env.is_done():
            agent.step(env)
        stages_cleared_list.append(agent.cleared_stages)

    return stages_cleared_list


import plotly.graph_objects as go
import numpy as np
from plotly.offline import init_notebook_mode, iplot
init_notebook_mode(connected=True)

def plot_performance(stages_cleared, agent_name="Agent"):
    '''
    Plots the performance of an agent by showing a histogram of the number of stages cleared.
    
    Parameters:
    - stages_cleared: A list containing the number of stages cleared in each episode.
    - agent_name: The name of the agent (used in the plot title).
    '''
    
    # Colors used in plotting
    gray = "#d4dce3"
    green = "#69d6b3"

    # Create histogram frequencies and bin edges
    frequencies, bin_edges = np.histogram(stages_cleared, bins=len(set(stages_cleared)))

    # Create histogram trace for all but the last bin
    trace1 = go.Histogram(
        x=stages_cleared,
        nbinsx=len(set(stages_cleared)),
        marker=dict(color=gray, line=dict(width=1, color='black'), opacity=0.7),
        name='Failure'
    )

    # Create histogram trace for the last bin
    trace2 = go.Histogram(
        x=[bin_edges[-1]]*frequencies[-1],
        nbinsx=1,
        marker=dict(color=green, line=dict(width=1, color='black'),  opacity=0.7),
        name='Success'
    )

    # Create figure
    fig = go.Figure(data=[trace1, trace2])

    fig.update_layout(
        template="plotly",
        barmode="overlay",
        title=agent_name + " Performance",
        xaxis_title="Stages Cleared",
        yaxis_title="Count",
        showlegend=False
    )
    fig.show()
    
env = RampartsEnvironment()
n_episodes = 10000
random_agent = RandomAgent()

stages_cleared_list_random = agent_batch(env, random_agent, n_episodes)
plot_performance(stages_cleared_list_random, "Random Agent")

#### Observations:
- Unsurprisingly this agent performs poorly.
- It frequently gets itself killed too early and likely only wins the game due to dumb luck.
- It beats the game ~2% of the time.

# The Cautious Agent
This agent is not going to get itself killed by trying to fight big monsters before stage 5, and will not fight elite monsters before stage 10.

In [6]:
class CautiousAgent:
    def __init__(self):
        self.total_reward = 0.0
        self.cleared_stages = 0
        
    def select_action(self, env):
        '''
        Selects an action for the agent to take in the environment, given the current observations.
        The agent follows a simple rule-based policy:
            - If on stage 10 or less and there is an option to fight an elite monster, discard this option unless it's the only action.
            - If on stage 5 or less and there is an option to fight a big monster, discard this option unless it's the only action.
        If neither condition applies, choose a random action.

        Parameters:
        - env: The environment in which the agent is acting.

        Returns:
        - The selected action (as an integer).
        '''
        # Get available actions
        actions = env.get_actions()
        
        # Get observations from environment to make decision with
        observations = env.get_observations()
        
        # Use current stage number as it's more readable than number of remaining stages
        current_stage_number = observations[3]
        
        # If on stage 10 or less and we have the option to fight an elite monster,
        # discard the action. If it's the only action however, fight it.
        elite_monster_action_index = env.str_action_to_int("Elite Monster")
        if (current_stage_number < 11) and (elite_monster_action_index in actions):
            for action in actions:
                if action == elite_monster_action_index:
                    actions.remove(elite_monster_action_index)
            if actions == []:
                return elite_monster_action_index
        
        # If on stage 5 or less and we have the option to fight a big monster,
        # discard the action. If it's the only action however, fight it.
        big_monster_action_index = env.str_action_to_int("Big Monster")
        if (current_stage_number < 6) and (big_monster_action_index in actions):
            for action in actions:
                if action == big_monster_action_index:
                    actions.remove(big_monster_action_index)
            if actions == []:
                return big_monster_action_index
            
        # Else return a random choice
        return random.choice(actions)
        
        
    def step(self, env: RampartsEnvironment):
        '''
        Take a step in the environment.
        '''
        # Get available actions and choose a random one 
        action = self.select_action(env)
        
        # Calculate reward for this action and add to agent's total
        reward = env.action(action)
        self.total_reward += reward
        
        # Add 1 to number of cleared stages if agent doesn't die
        if (env.is_dead == False):
            self.cleared_stages += 1
            
    def reset(self):
        self.total_reward = 0.0
        self.cleared_stages = 0
    

env = RampartsEnvironment()
env.reset()

agent = CautiousAgent()

while not env.is_done():
    agent.step(env)
    
print("Total reward obtained by cautious agent:", agent.total_reward)
env.history

Total reward obtained by cautious agent: 27.032984620411163


{1: ([4, 2, 1, 1, 100, 20, 10], 4, 0.0),
 2: ([0, 1, 5, 2, 100, 20, 10], 0, 4.836152422706632),
 3: ([5, 3, 1, 3, 82, 22, 11], 3, 0.64),
 4: ([3, 4, 4, 4, 82, 23, 11], 3, 0.64),
 5: ([1, 4, 0, 5, 82, 23, 12], 0, 4.596152422706632),
 6: ([4, 2, 5, 6, 70, 24, 14], 4, 0.0),
 7: ([2, 3, 2, 7, 100, 24, 14], 3, 11.180339887498949),
 8: ([1, 1, 3, 8, 100, 26, 17], 1, 10.14033988749895),
 9: ([0, 2, 2, 9, 48, 29, 19], 2, -5.0)}

## 10,000 cautious agents
Let's also observe the performance of the cautious agents to see how it compares to random behaviour.

In [7]:
env = RampartsEnvironment()
n_episodes = 10000
cautious_agent = CautiousAgent()

stages_cleared_list_cautious = agent_batch(env, cautious_agent, n_episodes)
    
plot_performance(stages_cleared_list_cautious, "Cautious Agent")

#### Observations:
- The cautious agents performed significantly better than the random ones.
- Many more cautious agents reached the final two stages, but a large number couldn't defeat the final boss.
- This may be because their risk averse behaviour sometimes starves them of much needed exp.

# The Bold Agent
Since the cautious agent had a far better performance than the random agent, let's retain its cautious behaviour, and also make it always fight big and elite monsters when they're available after stages 6 and 11 respectively. This may give our agent the exp boost needed to push on through to the final stage.

In [8]:
class BoldAgent:
    def __init__(self):
        self.total_reward = 0.0
        self.cleared_stages = 0
        
    def select_action(self, env):
        '''
        Selects an action for the agent to take in the environment.
        '''
        # Get available actions
        actions = env.get_actions()
        
        # Get observations from environment to make decision with
        observations = env.get_observations()
        
        # Use current stage number as it's more readable than number of remaining stages
        current_stage_number = observations[3]
        
        # If on stage 11 or less and we have the option to fight an elite monster,
        # discard the action. If it's the only action however, fight it.
        elite_monster_action_index = env.str_action_to_int("Elite Monster")
        if (current_stage_number < 11) and (elite_monster_action_index in actions):
            for action in actions:
                if action == elite_monster_action_index:
                    actions.remove(elite_monster_action_index)
            if actions == []:
                return elite_monster_action_index
        
        # If on stage 5 or less and we have the option to fight a big monster,
        # discard the action. If it's the only action however, fight it.
        big_monster_action_index = env.str_action_to_int("Big Monster")
        if (current_stage_number < 6) and (big_monster_action_index in actions):
            for action in actions:
                if action == big_monster_action_index:
                    actions.remove(big_monster_action_index)
            if actions == []:
                return big_monster_action_index
            
        # If on stages 11, 12 or 13 and there's an elite monster available to fight, fight it.
        if (current_stage_number >= 11) and (current_stage_number < 14) and (elite_monster_action_index in actions):
            return elite_monster_action_index
        
        # If on stage 6 or later and there's a big monster available to fight, fight it.
        if (current_stage_number >= 6) and (big_monster_action_index in actions):
            return big_monster_action_index
        
        # Else return a random choice
        return random.choice(actions)
        
        
    def step(self, env: RampartsEnvironment):
        '''
        Take a step in the environment.
        '''
        # Get available actions and choose a random one 
        action = self.select_action(env)
        
        # Calculate reward for this action and add to agent's total
        reward = env.action(action)
        self.total_reward += reward
        
        # Add 1 to number of cleared stages if agent doesn't die
        if (env.is_dead == False):
            self.cleared_stages += 1
    
    def reset(self):
        self.total_reward = 0.0
        self.cleared_stages = 0

env = RampartsEnvironment()
env.reset()

agent = BoldAgent()

while not env.is_done():
    agent.step(env)
    
print("Total reward obtained by bold agent:", agent.total_reward)
env.history

Total reward obtained by bold agent: -0.4838475772933677


{1: ([5, 0, 2, 1, 100, 20, 10], 0, 4.596152422706632),
 2: ([0, 1, 5, 2, 70, 21, 12], 5, 0.0),
 3: ([5, 3, 2, 3, 100, 21, 12], 5, 0.0),
 4: ([4, 0, 5, 4, 100, 21, 12], 5, -5.08)}

## 10,000 bold agents

In [9]:
env = RampartsEnvironment()
n_episodes = 10000
bold_agent = BoldAgent()

stages_cleared_list_bold = agent_batch(env, bold_agent, n_episodes)
plot_performance(stages_cleared_list_bold, "Bold Agent")

#### Observations:
- Fewer agents reach the final two stages, but more agents are clearing the final boss now, indicating they have enough experience to clear the final boss now.
- We can see spikes of deaths at the stage 5 mark where agents are attempting big monster fights and also a spike at the stage 10 mark where the agents are attempting the elite monsters.

# The Sensible Agent:
Let's introduce an agent that will act similarly to the bold agent, only he will always rest at a bonfire if he's got under 60 health and will avoid choosing the 'random' event unless he has no choice. This agent uses a fairly sensible strategy and will make decisions similarly to most humans playing the game.

In [10]:
class SensibleAgent:
    def __init__(self):
        self.total_reward = 0.0
        self.cleared_stages = 0
        
    def select_action(self, env):
        '''
        Selects an action for the agent to take in the environment.
        '''
        # Observations format for reference
        obs_key = {'choice 1': 0, 'choice 2': 1, 'choice 3': 2, 
                            'current stage': 3, 
                            'current hp': 4, 'current att': 5, 'current def': 6}
        
        
        # Get available actions
        actions = env.get_actions()
        
        # Get observations from environment to make decision with
        observations = env.get_observations()
        
        # Use current stage number as it's more readable than number of remaining stages
        current_stage_number = observations[3]
        
        # Getting hp from observations for bonfire
        hp = observations[4]
        
        # If health is below 60, rest at bonfire.
        bonfire_action_index = env.str_action_to_int("Bonfire")
        if (hp < 60) and (bonfire_action_index in actions):
            return bonfire_action_index
        
        # If on stage 11 or less and we have the option to fight an elite monster,
        # discard the action. If it's the only action however, fight it.
        elite_monster_action_index = env.str_action_to_int("Elite Monster")
        if (current_stage_number < 11) and (elite_monster_action_index in actions):
            for action in actions:
                if action == elite_monster_action_index:
                    actions.remove(elite_monster_action_index)
            if actions == []:
                return elite_monster_action_index
        
        # If on stage 5 or less and we have the option to fight a big monster,
        # discard the action. If it's the only action however, fight it.
        big_monster_action_index = env.str_action_to_int("Big Monster")
        if (current_stage_number < 6) and (big_monster_action_index in actions):
            for action in actions:
                if action == big_monster_action_index:
                    actions.remove(big_monster_action_index)
            if actions == []:
                return big_monster_action_index
            
        # If on stages 11, 12 or 13 and there's an elite monster available to fight, fight it.
        if (current_stage_number >= 11) and (current_stage_number < 14) and (elite_monster_action_index in actions):
            return elite_monster_action_index
        
        # If on stage 6 or later and there's a big monster available to fight, fight it.
        if (current_stage_number >= 6) and (big_monster_action_index in actions):
            return big_monster_action_index
        
        # If on stage 10 or less, don't choose the 'Random' action unless it's our only choice.
        random_action_index = env.str_action_to_int("Random")
        if (current_stage_number < 11) and (random_action_index in actions):
            for action in actions:
                if action == random_action_index:
                    actions.remove(random_action_index)
            if actions == []:
                return random_action_index
        
        # Else return a random choice
        return random.choice(actions)
        
        
    def step(self, env: RampartsEnvironment):
        '''
        Takes a step in the environment.
        '''
        # Get available actions and choose a random one 
        action = self.select_action(env)
        
        # Calculate reward for this action and add to agent's total
        reward = env.action(action)
        self.total_reward += reward
        
        # Add 1 to number of cleared stages if agent doesn't die
        if (env.is_dead == False):
            self.cleared_stages += 1
            
    def reset(self):
        self.total_reward = 0.0
        self.cleared_stages = 0

env = RampartsEnvironment()
env.reset()

agent = SensibleAgent()

while not env.is_done():
    agent.step(env)
    
print("Total reward obtained by sensible agent:", agent.total_reward)
env.history

Total reward obtained by sensible agent: 71.16242788876163


{1: ([4, 0, 0, 1, 100, 20, 10], 0, 4.596152422706632),
 2: ([3, 4, 2, 2, 70, 21, 12], 4, 0.0),
 3: ([1, 5, 5, 3, 100, 21, 12], 5, 0.0),
 4: ([4, 4, 2, 4, 100, 21, 12], 4, 0.0),
 5: ([1, 0, 0, 5, 100, 21, 12], 0, 4.956152422706632),
 6: ([1, 3, 0, 6, 88, 23, 13], 1, 9.82033988749895),
 7: ([4, 2, 5, 7, 32, 28, 13], 4, 3.3066666666666666),
 8: ([2, 4, 4, 8, 82, 28, 13], 4, 0.0),
 9: ([2, 5, 5, 9, 100, 28, 13], 5, 0.0),
 10: ([0, 5, 5, 10, 100, 28, 13], 5, 10.16033988749895),
 11: ([2, 1, 4, 11, 49, 31, 15], 4, 5.3133333333333335),
 12: ([1, 5, 2, 12, 99, 31, 15], 2, 30.202776601683794),
 13: ([2, 4, 1, 13, 29, 36, 20], 4, 8.246666666666666),
 14: ([5, 5, 0, 14, 79, 36, 20], 5, -0.42),
 15: ([6, 6, 6, 15, 79, 36, 20], 6, -5.02)}

## 10,000 sensible agents

In [11]:
env = RampartsEnvironment()
n_episodes = 10000
sensible_agent = SensibleAgent()

stages_cleared_list_sensible = agent_batch(env, sensible_agent, n_episodes)
    
plot_performance(stages_cleared_list_sensible, "Sensible Agent")

#### Observations:
- A massive improvement, over 20% of agents are now able to clear the game.
- Utilizing bonfires and avoiding randomness have given the agents a massive boost in performance.
- We can still see small death spikes at the 5th and 10th rounds as before.

# The Q-Agent 
The Q-agent will utilize Q-learning to find an optimal policy by which he can make the best action. This approach should be able to outperform any human in the long run.

## Q-table size
First let's consider the size of our observation space:

- Observations (list): A list containing 7 elements:

    - Choice 1 (int): One of 7 possible events, though the boss only appears at the end.
    - Choice 2 (int): One of 7 possible events, though the boss only appears at the end.
    - Choice 3 (int): One of 7 possible events, though the boss only appears at the end.
    - Current stage (int): Stage number, ranging between 1 and 15 
    - Current hp (int): One of 100 possible values 
    - Current att (int): One of 41 possible values
    - Current def (int): One of 51 possible values
    
This is a total of: $7 * 7 * 7 * 15 * 100 * 41 * 51 = 1,075,819,500$ possible states.

This is a little large. Since currently assume we have $7^3$ possible action choices at the start, which is a gross overestimate, let's try and reduce this number a little:

In [12]:
from itertools import combinations_with_replacement

actions = ["Small Monster", "Big Monster", "Elite Monster", "Treasure", "Bonfire", "Random"]

# Generate all combinations with replacement, then turns each into a set
possible_stages = list(combinations_with_replacement(actions, 3))
reduced_stages = [sorted(list(set(stage))) for stage in possible_stages]

def str_stage_to_int(stage):
    stage = sorted(list(stage))
    return reduced_stages.index(stage)

def int_stage_to_str(i):
    return reduced_stages[i]

# Test the functions
print("Converting actions to integer:", str_stage_to_int(['Big Monster', 'Bonfire', 'Small Monster'])) # => some integer
print("Converting integer to actions:", int_stage_to_str(6)) # => ('Small Monster', 'Small Monster', 'Small Monster')
print(len(reduced_stages) + 1, "possible stages")

Converting actions to integer: 9
Converting integer to actions: ['Big Monster', 'Small Monster']
57 possible stages


#### Thoughts:
This method will allow us to reduce the first 3 observations (space size of $7^3 = 343$) down to $57$. 

This allows for an 84% reduction in the size of our Q-table.

## State aggregation
The purpose of state aggregation is to group similar states together. Decisions made by our agent with 90 hp are unlikely to differ much from decisions made with 91 hp, yet we are considering thousands of possible variations for each one.

### Current stage index
We can almost infer the current stage index just by looking at our player's stats, so this number is not particularly useful until we reach the later stages and want to know when the boss is coming right up.

The encoding will be as follows:
1 to 12 will be encoded as 0 (early)
13 to 15 will be encoded as 1 (boss imminent)

### Stats
We will perform slightly different aggregating for each stat.
- **HP** we will divide by 5 and round to the nearest number (minimum 1, cardinality of 20)
- **Attack** we will subtract 20, divide by 2 and round to the nearest number (minimum 1, cardinality of 15)
- **Defence** we will subtract 10, divide by 2 and round to the nearest number (minimum 1, cardinality of 20)

### Reduced observation list:
- Observations (list): A list containing 5 elements:

    - Actions index (int) a number between 0 and 55 representing the current possible actions.
    - Current stage (int): 1 for boss imminent, 0 otherwise
    - Current hp (int): One of 20 possible values 
    - Current att (int): One of 15 possible values
    - Current def (int): One of 20 possible values
    
This is a total of: $56 * 2 * 20 * 15 * 20 = 672,000$ possible states.

We've reduced the size of our observation space from 1,075,819,500 down to 672,000; over a 99.9% size reduction.

In [13]:
class QAgent:
    def __init__(self):
        '''
        Initialize the QAgent class.
        The agent has a list of possible actions and generates all possible combinations of 
        these actions, which represent the possible stages the agent can be in.
        '''
        self.actions = ["Small Monster", "Big Monster", "Elite Monster", "Treasure", "Bonfire", "Random"]

        # Generate all combinations with replacement, then turn each into a set.
        # These represent the possible stages the agent can be in.
        possible_stages = list(combinations_with_replacement(actions, 3))
        self.reduced_stages = [sorted(list(set(stage))) for stage in possible_stages]
    
    def int_stage_to_str(self, stages):
        '''
        Converts integer representations of stages back to their string counterparts.

        Parameters:
        - stages: List of integer representations of stages.

        Returns:
        - List of string representations of stages.
        '''
        if stages[0] == len(self.actions):
            return ["Boss"]*3
        
        strings = []
        for stage in stages:
            strings.append(self.actions[stage])
            
        return strings
    
    def str_stage_to_int(self, stage):
        '''
        Converts string representations of stages to their integer counterparts.

        Parameters:
        - stage: List of string representations of stages.

        Returns:
        - Integer representation of the stage.
        '''
        stage = sorted(list(set(stage)))
        if stage == ["Boss"]:
            return len(self.reduced_stages) # The 'boss' index is the final element
        return self.reduced_stages.index(stage)
    
    def observation_wrapper(self, obs):
        '''
        Wraps the observations given by the environment for use in the agent's decision-making.

        Parameters:
        - obs: The raw observations from the environment.

        Returns:
        - Wrapped observations that can be used in the agent's decision-making.
        '''
        # Convert choices from integer to string representation
        choices = self.int_stage_to_str([obs[0], obs[1], obs[2]])
        
        # Convert the choices back to integer form after grouping similar ones
        choices = self.str_stage_to_int(choices)              # |choices| = 57
        
        # Define if the stage is late (stage number is 13 or more)
        late_stage = 0 if obs[3]<13 else 1                    # |stage| = 2
        
        # Aggregate stat values
        hp = max(1, round(obs[4]/5))                          # |hp| = 20
        attack = round((obs[5] - 20)/2)                       # |attack| = 15
        defence = round((obs[6] - 10)/2)                      # |defence| = 20

        # Bundle observations together
        observations = (choices, late_stage, hp, attack, defence)
        return observations
    
    
env = RampartsEnvironment()
env.reset()

agent = QAgent()

obs = env.get_observations()
print("Raw observations:", obs)
print("Wrapped observations:", agent.observation_wrapper(obs))

Raw observations: [4, 1, 0, 1, 100, 20, 10]
Wrapped observations: (9, 0, 20, 0, 0)


Our wrapper is formatting the observations as desired, let's built this into a complete Q-Agent class.

We're going to use nested dictionaries to store our Q-values, where each key-value pair represents a state-action pair and its corresponding Q-value.

An example; to retrieve the Q-value for performing action `1` in the state `[28, 0, 20, 0, 0]` would be:

`q_table[[28, 0, 20, 0, 0]][1] = Q value`

We also use a `collections` `defaultdict` here so we can initialize every Q-value as 0.

In [14]:
from collections import defaultdict
from itertools import combinations_with_replacement
import numpy as np

class QAgent:
    def __init__(self, actions, alpha=0.5, gamma=0.99, epsilon=0.1):
        '''
        Initializes the Q-Learning Agent
        
        Parameters:
        - actions: A list of all possible actions the agent can perform
        - alpha: The learning rate for Q-learning algorithm
        - gamma: The discount factor for future rewards
        - epsilon: The exploration rate (for epsilon-greedy action selection)
        '''
        
        # Evaluation metrics
        self.total_reward = 0.0
        self.cleared_stages = 0
        
        # Used to build Q-table
        self.actions = actions
        self.n_actions = len(actions)
        self.actions = self.actions[:-1]

        # Generate all combinations with replacement, then turns each into a set
        possible_stages = list(combinations_with_replacement(self.actions, 3))
        self.reduced_stages = [sorted(list(set(stage))) for stage in possible_stages]
        
        # Q-learning parameters
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.q_table = defaultdict(lambda: np.zeros(self.n_actions))
    
    def int_stage_to_str(self, stages):
        if stages[0] == len(self.actions):
            return ["Boss"]*3
        
        strings = []
        for stage in stages:
            strings.append(self.actions[stage])
            
        return strings
    
    def str_stage_to_int(self, stage):
        stage = sorted(list(set(stage)))
        if stage == ["Boss"]:
            return len(self.reduced_stages) # The 'boss' index is the final element
        return self.reduced_stages.index(stage)
    
    def observation_wrapper(self, obs):
        choices = self.int_stage_to_str([obs[0], obs[1], obs[2]])
        choices = self.str_stage_to_int(choices)     # |choices| = 57
        late_stage = 0 if obs[3]<13 else 1           # |stage| = 2
        hp = max(1, round(obs[4]/5))                 # |hp| = 20
        attack = round((obs[5] - 20)/2)              # |attack| = 15
        defence = round((obs[6] - 10)/2)             # |defence| = 20

        observations = (choices, late_stage, hp, attack, defence)
        return observations

    def choose_action(self, state, available_actions):
        '''
        Choose an action to perform based on the current state.

        This method implements an epsilon-greedy policy: with probability epsilon,
        it randomly selects an action, and otherwise it selects the action with
        the highest Q-value.

        Parameters:
        - state: The current state.
        - available_actions: A list of actions available in the current state.

        Returns:
        - The chosen action.
        '''
        # Epsilon-greedy policy
        if np.random.uniform() < self.epsilon:
            # Randomly choose an action
            return np.random.choice(available_actions)
        else:
            # Choose the action with highest Q-value among available actions
            q_values = [self.q_table[state][action] for action in available_actions]
            return available_actions[np.argmax(q_values)]

    def learn(self, state, action, reward, next_state):
        '''
        Learn from a experience tuple (state, action, reward, next_state).

        This method performs the Q-learning update rule using the experience tuple.

        Parameters:
        - state: The initial state.
        - action: The action performed in the state.
        - reward: The reward received after performing the action.
        - next_state: The state transitioned to after performing the action.
        '''
        
        # If agent is in terminal state, don't attempt to calculate reward for a
        # non-existent future state.
        if env.is_done():
            td_target = reward
        else:
            # Bellman update
            td_target = reward + self.gamma * np.max(self.q_table[next_state])
        
        # Calculate temporal difference error
        td_error = td_target - self.q_table[state][action]
        # Update Q-value
        self.q_table[state][action] += self.alpha * td_error
    
    def step(self, env):
        """
        Take a step using the agent. 
        
        This includes choosing an action, performing it in the environment,
        and learning from the resulting reward and next state.
        
        Parameters:
        - env: The environment in which the agent is acting.
        """
        # Get current environment state and actions
        state = self.observation_wrapper(env.get_observations())
        available_actions = env.get_actions()
        
        # Select action
        action = self.choose_action(state, available_actions)
        
        # Perform action and obtain reward, adding it to total
        reward = env.action(action)
        self.total_reward += reward
        if (env.is_dead == False):
            self.cleared_stages += 1
        
        # Perform Q-update
        next_state = self.observation_wrapper(env.get_observations())
        self.learn(state, action, reward, next_state)
        
    def reset(self):
        """
        Reset the agent's total reward and cleared stages count to zero.
        This method is used only for performance evaluation.
        """
        self.total_reward = 0.0
        self.cleared_stages = 0

#### Observations:
- Our Q-learning agent is superb at surviving until the final round, but is outperformed by the sensible agent when it comes to actually beating the game.

Let's try training our Q-agent on 1 million episodes to see if it's able to learn further, or if we'll need to use alternative methods to further refine its performance.

In [15]:
def plot_reward_history(rewards):
    """
    Plots the history of rewards during training. Averages rewards in bins of a specified size.
    
    Parameters:
    - rewards: A list or numpy array of reward values.
    """
    
    bin_size = round(len(rewards)/100)
    
    # Calculate mean reward for each bin
    mean_rewards = [np.mean(rewards[i:i + bin_size]) for i in range(0, len(rewards), bin_size)]
    
    # Generate x values corresponding to each bin
    x_values = list(range(len(mean_rewards)))
    
    # Create the plot
    fig = go.Figure(data=go.Scatter(x=x_values, y=mean_rewards, mode='lines', line=dict(color='#636efa')))
    
    # Add titles and labels
    fig.update_layout(template='plotly',
                      title='Training Rewards',
                      xaxis_title=f'Training Batches (each batch represents {bin_size} steps)',
                      yaxis_title='Mean Reward')
    
    fig.show()
    

def train_q_agent(env, agent, n_episodes, alpha=0.5, gamma=0.99, epsilon=0.1, late_stage=None, message="", verbose=False):
    agent.alpha = alpha
    agent.gamma = gamma
    agent.epsilon = epsilon
    
    stages_cleared_list = []
    reward_list = []
    
    for i in range(n_episodes):
        env.reset()
        agent.reset()
        while not env.is_done():
            if late_stage is not None:
                if (agent.cleared_stages > late_stage):
                    agent.epsilon = 0.6
                else:
                    agent.epsilon = epsilon
            agent.step(env)
        stages_cleared_list.append(agent.cleared_stages)
        reward_list.append(agent.total_reward)
        
        if (verbose == True) and ((i+1) % 100000 == 0):
            print(f"Completed {i+1} episodes out of {n_episodes} ({message}) - {(100*(i+1)/n_episodes):.0f}%")
    
    return stages_cleared_list, reward_list


# Initialize environment
env = RampartsEnvironment()
env.reset()
events = env.events

# Initialize agent
agent = QAgent(events)

n_training_episodes = 200000
n_testing_episodes = 10000

# Perform training
_, reward_list_training = train_q_agent(env, agent, n_training_episodes)
plot_reward_history(reward_list_training)

# Measure performance with no epsilon-greedy
stages_cleared_testing, _ = train_q_agent(env, agent, n_testing_episodes, message="default", epsilon=0)
plot_performance(stages_cleared_testing, "QAgent Test Performance")

#### Observations:
- Our Q-agent is performing well, but could do with more training episodes.
- The agent is very good at surviving until the final round but doesn't win as often as it should.
- It's possible that the Q-agent is more focused on not dying that beating the game.

# Exploration and Learning Rate Decay
To optimize our Q-agent's performance, we will adopt a step decay strategy for both epsilon (exploration rate) and alpha (learning rate).

The training process will initially commence with larger values for both epsilon and alpha. This approach will result in robust exploration of the state-action space and accelerate learning. However, over the course of millions of training episodes, we will systematically reduce these values in discrete steps down to zero. This gradual reduction allows the agent to shift from exploration to exploitation, making optimal use of its accumulated knowledge about the environment.

A million episodes takes approximately 15 minutes to run and we're running 20 million; so training is rather time consuming.

In [16]:
# Initialize environment
env = RampartsEnvironment()
env.reset()
events = env.events

# Initialize agent
agent = QAgent(events)

n_training_episodes = 5000000
_, reward_list_training1 = train_q_agent(env, agent, n_training_episodes, alpha=0.5, epsilon=0.15, message="medium epsilon")

_, reward_list_training2 = train_q_agent(env, agent, n_training_episodes, alpha=0.3, epsilon=0.1, message="low epsilon")

_, reward_list_training3 = train_q_agent(env, agent, n_training_episodes, alpha=0.2, epsilon=0.05, message="very low epsilon")

_, reward_list_training4 = train_q_agent(env, agent, n_training_episodes, alpha=0.1, epsilon=0, message="zero epsilon")

reward_list_training_seq = reward_list_training1 + reward_list_training2 + reward_list_training3 + reward_list_training4

plot_reward_history(reward_list_training_seq)

n_testing_episodes = 10000
# Measure performance with no epsilon-greedy
stages_cleared_testing_seq, _ = train_q_agent(env, agent, n_testing_episodes, epsilon=0)
plot_performance(stages_cleared_testing_seq, "QAgent Test Performance")

#### Observations:
- Our Q-agent has surpassed all agents before it, and its performance is now approximating that of a skilled human player.