<h3> Training an AI agent to traverse an environment and collect materials in order using value iteration <h3>
<h5> By Ivan Ovcharov & Veronika Valeva <h5>

### Table of Contents

* Introduction
* Why value iteration?
* Environment description
* Python environment


### Introduction

Over the period of 9 weeks, we have been tasked to train an agent to learn over given constraints in a python environment. The project we chose to tackle is something that closely resembles the infamous <strong> frozen lake </strong> environment. With every environment, there are different ways of approaching how an agent's rules may be defined or what strategy may be used for it to <strong> "learn" </strong>.


After delving a bit deeper into what <strong> reinforcement learning </strong> really is, we made the decision that <strong> <i> Value Iteration </i> </strong> would be best suited for our environment and the given conditions/rules we have defined. But why?

### Why value iteration?

For any given state, we first calculate the state-action values for all the possible <strong>actions</strong> from that given state. We then update the value function of that state with the greatest state-action value. The reason we decided not to utilize <i>policy iteration</i> instead, as we thought unnecessary to have calculations of the expected/mean state-action value. For an environment like ours, where no "predictions" must be made, value iteration was the best option at hand.

With value iteration, we'd be able to terminate when the difference between all the new state values and the old state values is a relatively small value. Furthermore, for a grid-like environment, where all of the possible actions and <strong>"reward"</strong> positions are pre-defined, we'd be better off with iterating over all possible states.

 ![Value iteration algorithm](images/value_iteration.png)

### Environment description

We start with defining what an <strong>environment</strong> really stands for. An environment in AI is what is surrounding the agent. The agent can take input from the environment and deliver output.

As mentioned previously, the environment we have chosen to define is something that very closely resembles the <strong> frozen lake </strong> one by OpenGymAI. The rules are as follows:
* The environment is a grid (initially a 5x5 but scaled down to a 2x2/2x3) where there are "ingredients" that the agent must collect.
* The agent is only able to move up, down, left or right, depending on his position on the grid.
* The agent may not go (for example) left, if left is outside of the grid bounds
* The agent starts in a <i> starting state </i> and finishes in an <i> ending state</i>

Once the agent collects all of the ingredients (must be done in the correct order, otherwise => agent restarts), the agent must "leave" by going to the end state. 

The reward system that is utilized is based on getting items in correct order and finishing the game. There are also slight penalties: when agent steps on an empty cell, he loses a total of 0.2 points and when he steps on a cell, containing an ingredient => +1 points. The reason behind this is that the agent can not only learn how to collect the ingredients in the right order, but the -0.2 points serves as a bound that "pushes" the agent to finish the game in less moves.

 ![Value iteration algorithm](images/env.png)

### Python environment


We first start by defnining what kind of states there will be in our environment. In here, we define the empty state, E. Furthermore, we have one representing our imaginary lettuce and cheese, L and C respectively. Lastly, a state indicating the start and end.

In [434]:
from enum import Enum
import random
from random import randint, choice
from copy import copy

E, L, C, START, END = ' ', 'L', 'C','START','END'

## Action space
We start by defining the action space for our world. The action space will look as follows:

Actions:
*  Up
*  Down
*  Left
*  Right

Actions `Up`, `Down`, `Left`, and `Right` all move the actor to a new position. 

In [435]:
class Inventory(Enum):
    Empty = ""
    C = "C"
    L = "L"
    CL = "CL"
    LC = "LC"

    def __str__(self):
        return self.name

In [436]:
class Action(Enum):
    Up = 1
    Down = 2
    Left = 3
    Right = 4

    def __str__(self):
        return self.name

We then define the rule set of our environment. This is done within a python class that holds all of the methods needed.

In [437]:
class State():
    def __init__(self, state,inventory, action, is_done):
        self.state = state
        self.inventory = inventory
        self.action = action
        self.is_done = is_done

In [438]:
class RestaurantEnvironment():
    def __init__(self, initial_state=None):
        if initial_state is None:
            self.__initial_state = [E for n in range(6)]
            self.__initial_state[0] = START
            self.__initial_state[2] = C 
            self.__initial_state[4] = L
            self.__initial_state[5] = END
            self.playerState = [0, ""]
            self.reward = 0
        else:
            self.__initial_state = copy(initial_state)
            self.__state = self.__initial_state


    def reset(self):
        self.__state = self.__initial_state
        return self.__state

    # Based on playerposition (0 to 6), add a letter to the currentWord
    def calculate_curr_word(self, playerPosition):
        if playerPosition == 2 and self.playerState[1] == "":
            return "C"
        elif playerPosition == 4 and self.playerState[1] == "C":
            return "L"
        else:
            return ""

    def step(self, playerPosition):
        if (self.calculate_curr_word(self.calculate_transition(playerPosition))) != "":
            self.playerState[1] += self.calculate_curr_word(self.calculate_transition(playerPosition))
            self.reward += 1
        else:
            self.reward -= 0.1
        newPlayerPosition = self.calculate_transition(playerPosition)
        self.playerState[0] = newPlayerPosition
        observation = self.__state  # environment is fully observable
        done = self.is_done()
        if done:
            self.reset()
        return observation, done, self.reward, self.playerState[0], self.playerState[1]

    def get_reward(self):
        return self.reward;
    
    def get_reward_according_to_location(self, location):
        if location == 2 or location == 4:
            return 1
        else:
            return -0.1

    def render(self):
        BACKGROUND = [
            ' S │   │ C │',
            '───┼───┼───┼',
            '   │ L │ E │',
            '───┼───┼───┼',
        ]
        rendering = copy(BACKGROUND)
        for n, S_n in enumerate(self.__state):
            if S_n != E:
                row = 2 * (n // 5)
                col = 4 * (n % 5) + 1
                line = rendering[row]
                rendering[row] = line[:col] + S_n + line[col + 1:]

        for line in rendering:
            print(line)

    # =========================================================
    # public functions for agent to calculate optimal policy
    # =========================================================

    def calculate_transition(self, action: Action):
        current_location = self.playerState[0]
        next_location = None
        if self.playerState[0] == 0:
            if action == Action.Right:
                next_location = current_location + 1
            elif action == Action.Down:
                next_location = current_location + 3
        elif self.playerState[0] == 1:
            if action == Action.Right:
                next_location = current_location + 1
            elif action == Action.Left:
                next_location = current_location - 1
            elif action == Action.Down:
                next_location = current_location + 3
        elif self.playerState[0] == 2:
            if action == Action.Left:
                next_location = current_location - 1
            elif action == Action.Down:
                next_location = current_location + 3
        elif self.playerState[0] == 3:
            if action == Action.Right:
                next_location = current_location + 1
            elif action == Action.Up:
                next_location = current_location - 3
        elif self.playerState[0] == 4:
            if action == Action.Right:
                next_location = current_location + 1
            elif action == Action.Up:
                next_location = current_location - 3
            elif action == Action.Left:
                next_location = current_location - 1
        elif self.playerState[0] == 5:
            if action == Action.Up:
                next_location = current_location - 3
            elif action == Action.Left:
                next_location = current_location - 1

        if next_location == None:
            return current_location
        else:
            return next_location

    def get_transition_probability(self, action: Action, new_inventory):
        next_location = self.calculate_transition(action)
        current_inventory = self.playerState[1]
        if next_location == 2: # State 2 contains C
            if new_inventory == current_inventory + "C":
                    return 1
            else:
                    return 0
        elif next_location == 4: # State 4 contains C
            if new_inventory == current_inventory + "L":
                    return 1
            else:
                    return 0
        elif current_inventory == new_inventory:
            return 1        
        else:
            return 0

    def get_transition_prob(self, action: Action, currentLocation):
        next_location = self.calculate_transition(action)
        current_location = currentLocation
        if self.is_done(): # If the current state is an end state
            return 0.0
        if next_location == current_location:
            return 0.0
        else: 
            return 1

    def is_done(self):
        if self.playerState[1] == "CL" and self.playerState[0] == 5:
            return True
        else:
            return False

    def is_done_givenState(self, state):
        if self.playerState[1] == "CL" and state == 5:
            return True
        else:
            return False


    def get_possible_states(self):
        return [
         State(0, "", Action.Down, False), State(0, "", Action.Right, False),
         State(0, "C", Action.Down, False), State(0, "C", Action.Right, False),
         State(0, "CL", Action.Down, False), State(0, "CL", Action.Right, False),
         
         State(1,"", Action.Left, False), State(1, "", Action.Right, False), State(1, "", Action.Down, False),
         State(1,"C", Action.Left, False), State(1,"C", Action.Right, False), State(1,"C", Action.Down, False),
         State(1,"CL", Action.Left, False), State(1,"CL", Action.Right, False), State(1,"CL", Action.Down, False),

         State(2, "", Action.Left, False), State(2,"", Action.Down, False),
         State(2, "C", Action.Left, False), State(2,"C", Action.Down, False),
         State(2, "CL", Action.Left, False), State(2,"CL", Action.Down, True), 

         State(3,"", Action.Up, False), State(3, "", Action. Right, False),
         State(3,"C", Action.Up, False), State(3, "C", Action.Right, False),
         State(3,"CL", Action.Up, False), State(3, "CL", Action.Right, False),

         State (4,"", Action.Up, False), State(4, "", Action.Right, False),
         State (4,"C", Action.Up, False), State(4, "C", Action.Right, False),
         State (4,"CL", Action.Up, False), State(4, "CL", Action.Right, True),
         State(4, "C",Action.Left, False),  State(4, "CL",Action.Left, False),
         State (5,"C", Action.Left, False),State(5,"C", Action.Up, False)]


## Defining the world
After we have defined the environment, we are going to construct the world the actor will be acting in.

In [439]:
def generate_environment() -> RestaurantEnvironment:
    environment = RestaurantEnvironment()
    return environment

restEnvironment = generate_environment()

In [440]:
class Game():
    # example of creation of an environment in the default state
    mdp = RestaurantEnvironment()
    mdp.reset()
    mdp.render()
    state = ""
    reward = 0.0
    done = False
    playerPosition = 0
    inventory = ""
    
    # state, done, reward = mdp.step(Action.Right)
    # print(state, done, reward)
    # state, done, reward = mdp.step(Action.Right)
    # print(state, done, reward)
    # state, done, reward = mdp.step(Action.Left)
    # print(state, done, reward)
    # state, done, reward = mdp.step(Action.Down)
    # print(state, done, reward)
    # state, done, reward = mdp.step(Action.Right)
    # print(state, done, reward)
    # print(mdp.playerState[0], mdp.playerState[1])
    # print('possible (internal) game states:')


game = Game()


 START │ C │ C │L
───┼───┼───┼
 END │ L │ E │
───┼───┼───┼


## Measuring performance
In order to get an accurate idea of the performance of a function we define a set of helper functions which will run number of episodes with the given policy, and print some statistics such as the `mean` of the running time as well as the `standard deviation`.

In [441]:
from statistics import mean, stdev

def run_one_episode(policy, environment, max_iteration_timeout=1000):
  environment.reset()
  state = environment
  total_reward = 0.0
  done = False
  nextState = 0
  inventory = ""
  
  iteration = 0
  while not done or iteration >= max_iteration_timeout:
    next_action = policy(state)
    state, done, reward, nextState, inventory = environment.step(next_action)
    total_reward += reward
    iteration += 1
  return total_reward

def measure_performance(policy, environment, nrof_episodes=100):
  N = nrof_episodes
  print("statistics over {} episodes".format(N))
  all_rewards = []
  for _ in range(N):
    episode_reward = run_one_episode(policy, environment)
    all_rewards.append(episode_reward)
  print("mean: {:6.2f}, sigma: {:6.2f}".format(mean(all_rewards), stdev(all_rewards)))
  print()
  for n, episode_reward in enumerate(all_rewards[:5], 1):
    print("ep: {:2d}, total reward: {:5.2f}".format(n, episode_reward))
  print(".....")
  for n, episode_reward in enumerate(all_rewards[-5:], len(all_rewards) - 5):
    print("ep: {:2d}, total reward: {:5.2f}".format(n, episode_reward))

## Random agent
Here we test the performance of an agent who's policy is to pick a random action at each state. 

In [442]:
def policy_random(state: RestaurantEnvironment) -> Action:
  action = random.choice([a for a in Action])
  return action

measure_performance(policy_random, restEnvironment)

statistics over 100 episodes
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at 0x0000027FBE8B23B0>
<__main__.RestaurantEnvironment object at

### VALUE ITERATION

In [443]:
# Optimal decisions based on sums of rewards
# state, done, reward, nextState, inventory = environment.step(next_action)
# Function that computes the Q-value
def Q_Value(restEnvironment, a, U, inventory):
    Q = 0.0
    for s in restEnvironment.get_possible_states():
        P = restEnvironment.get_transition_probability(a, inventory)
        R = restEnvironment.get_reward_according_to_location(getattr(s, 'state'))
        Q += P * (R + U[str(s.__dict__)])
    return Q

# Creating a grid with initial values for our utility functions equal the reward of each state
def get_initial_U(environment):
  U = {}
  for s in environment.get_possible_states():
    print(s.__dict__)
    U[str(s.__dict__)] = 0.0
  
  return U

# Decreasing the error between utility functions results in a more precise policy 
def value_iteration(restEnvironment, error=0.0000000001):
    U = {}
    U_p = get_initial_U(restEnvironment)
    delta = float('inf')
    possible_states = restEnvironment.get_possible_states()
    while delta > error:
        for s in possible_states:
            U[str(s.__dict__)] = U_p[str(s.__dict__)]
        delta = 0
        for s in possible_states:
            max_a = float('-inf')
            for a in Action:
                for i in Inventory:
                    q = Q_Value(restEnvironment, a, U, i) 
                    if q > max_a:
                        max_a = q
            U_p[str(s.__dict__)] = max_a
            if abs(U_p[str(s.__dict__)] - U[str(s.__dict__)]) > delta:
                delta = abs(U_p[str(s.__dict__)] - U[str(s.__dict__)])
    return U

def print_U(U):
    print('Utilities:')
    for y in restEnvironment.get_possible_states():
        s = str(y.__dict__)
        if s in U:
            print(f'[{U[s]:7.4f}]', end = ' ')
        else:
            print('              ', end = '') # Used to preserve alignment when state is not present
        print()

def print_policy(pi):
    arrows_unicode = {Action.Up: u"\U0001F879", Action.Down: u"\U0001F87B", Action.Left: u"\U0001F878", Action.Right: u"\U0001F87A"}
    print('Policy:')
    for y in restEnvironment.get_possible_states():
        s = str(y.__dict__)
        if s in pi:
            print(f'{s}: {arrows_unicode[pi[s]]}', end = ' ')
        else:
            print(' '*11, end = '') # Used to preserve alignment when state is not present
        print()

restEnvironment = generate_environment()
restEnvironment.reset()
U = value_iteration(restEnvironment)
print_U(U)

# Optimal policy (p*)
pi_star = {}

for s in restEnvironment.get_possible_states():
    if s.is_done:
      continue
    max_a = float('-inf')
    argmax_a = None
    for i in Inventory:
        for action in Action:
            q = Q_Value(restEnvironment, action, U, i)  
            if q > max_a:
                max_a = q
                argmax_a = action
    pi_star[str(s.__dict__)] = argmax_a
    print(pi_star[str(s.__dict__)])

print_policy(pi_star)

{'state': 0, 'inventory': '', 'action': <Action.Down: 2>, 'is_done': False}
{'state': 0, 'inventory': '', 'action': <Action.Right: 4>, 'is_done': False}
{'state': 0, 'inventory': 'C', 'action': <Action.Down: 2>, 'is_done': False}
{'state': 0, 'inventory': 'C', 'action': <Action.Right: 4>, 'is_done': False}
{'state': 0, 'inventory': 'CL', 'action': <Action.Down: 2>, 'is_done': False}
{'state': 0, 'inventory': 'CL', 'action': <Action.Right: 4>, 'is_done': False}
{'state': 1, 'inventory': '', 'action': <Action.Left: 3>, 'is_done': False}
{'state': 1, 'inventory': '', 'action': <Action.Right: 4>, 'is_done': False}
{'state': 1, 'inventory': '', 'action': <Action.Down: 2>, 'is_done': False}
{'state': 1, 'inventory': 'C', 'action': <Action.Left: 3>, 'is_done': False}
{'state': 1, 'inventory': 'C', 'action': <Action.Right: 4>, 'is_done': False}
{'state': 1, 'inventory': 'C', 'action': <Action.Down: 2>, 'is_done': False}
{'state': 1, 'inventory': 'CL', 'action': <Action.Left: 3>, 'is_done': Fal

In [444]:
from statistics import mean, stdev

def run_one_episode_optimal_policy(policy, environment, max_iteration_timeout=1000):
  environment.reset()
  state = environment
  total_reward = 0.0
  done = False
  nextState = 0
  inventory = ""
  
  iteration = 0
  while not done or iteration >= max_iteration_timeout:
    next_action = policy(state)
    state, done, reward, nextState, inventory = environment.step(next_action)
    total_reward += reward
    iteration += 1
  return total_reward

def measure_performance_optimal_policy(policy, environment, nrof_episodes=100):
  N = nrof_episodes
  print("statistics over {} episodes".format(N))
  all_rewards = []
  for _ in range(N):
    episode_reward = run_one_episode_optimal_policy(policy, environment)
    all_rewards.append(episode_reward)
  print("mean: {:6.2f}, sigma: {:6.2f}".format(mean(all_rewards), stdev(all_rewards)))
  print()
  for n, episode_reward in enumerate(all_rewards[:5], 1):
    print("ep: {:2d}, total reward: {:5.2f}".format(n, episode_reward))
  print(".....")
  for n, episode_reward in enumerate(all_rewards[-5:], len(all_rewards) - 5):
    print("ep: {:2d}, total reward: {:5.2f}".format(n, episode_reward))

In [445]:
def optimal_policy(state) -> Action:
    print(pi_star[state])
    return pi_star[state]

measure_performance_optimal_policy(optimal_policy, restEnvironment)

statistics over 100 episodes


TypeError: optimal_policy() missing 1 required positional argument: 'state'