# Example Agent and Environment

This example of a rational agent enables experimentation with decision strategies based on expected utility. 

The environment interface is inspired by the Open AI gym framework. 

The example from chapter 16 of AIMA (4th ed.)

# 1. Definition of the Environment

The code below defines all characteristics of a Markov Decision Process that models the game of TicTacToe with an (unknown) opponent. The agent plays with X and the opponent plays with O.

- States: The Environment is in one of the following states $S = (S_0, S_1, ..., S_8)$ with $S_i \in \{E, O, X\}$ .
- Actions: The set of available actions is $a \in \{n \in \mathbb{N}: S_n = E\}$ (the agent can place the X on any empty place)
- Transitions: The transition depends on the strategy of the opponent. Let's assume the opponent plays in a random fashion. Then the probability that the next O will be on any empty square is equal to 1 / (# free squares left for the opponent)

An TicTacToeMDPEnvironment object has the following methods:
- reset() which brings the environment the start state, which is also returned
- step(action) processes the action of the agent and returns the new state, done, reward (and optional debug info)
- render() simple visualisation of the current state of the world

To allow an agent to calculate optimal decisions using model information, these methods are also available:

- get_possible_states() for iterating over all possible states
- is_done(state) for excluding the stop states from the policy
- get_reward(state) simplified version $R(s)$ of the general reward function: $R(s, a, s')$
- get_transition_prob(action, new_state, old_state): $P(s' \mid s, a)$

We will illustrate each of the elements above by simple code examples below.  
All of the theory can be found in AIMA 16.1 and 16.2.1 (4th ed.)

In [32]:
from enum import Enum
from random import randint, choice
from copy import copy

E, X, O = ' ', 'X', 'O'

class TicTacToeMDPEnvironment():
    def __init__(self, initial_state=None):
        if initial_state == None:
            self.__initial_state = ([], [n for n in range(9)]) # start with empty board and 9 possible actions
        else:
            self.__initial_state = copy(initial_state)  # copy to prevent aliassing
        self.__state = self.__initial_state
        self.__possible_states = []
        self.__calculate_possible_states(self.__initial_state)
        
    def __calculate_possible_states(self, state):
        possible_actions = self.__state[1]
        for action in possible_actions:
            new_state = copy(state)
            if state.count(X) == state.count(O):
                new_state[action] = X 
            else: 
                new_state[action] = O
            self.__possible_states.append(new_state)
            if not self.is_done(new_state):
                self.__calculate_possible_states(new_state)

    def reset(self):
        self.__state = self.__initial_state
        return self.__state
    
    def __calculate_transition(self, action):
        # change the state to reflect the move by the agent
        self.__state[0].append(action)
        self.__state[1].remove(action)
        if self.is_done(): # check if agent has won
            return self.__state
        # let the opponent make a random move
        opponent_action = choice(self.state[1])
        self.__state.append(opponent_action)
        self.__state.remove(opponent_action)
        return self.__state
      
    def step(self, action):
        old_state = self.__state
        self.__state = self.__calculate_transition(action)  # state after action
        observation = self.__state  # environment is fully observable
        done = self.is_done()
        reward = self.get_reward(self.__state)
        info = {}  # optional debug info
        return observation, done, reward, info

    def render(self):
        BACKGROUND = [
            '   │   │   ',
            '───┼───┼───',
            '   │   │   ',
            '───┼───┼───',
            '   │   │   '
        ]
        rendering = copy(BACKGROUND)
        for move_nr, position in enumerate(self.__state[0]):
            row = 2 * (position // 3)
            col = 4 * (position % 3) + 1
            line = rendering[row]
            if move_nr % 2 == 0:
                rendering[row] = line[:col] + X + line[col + 1:]
            else:
                rendering[row] = line[:col] + O + line[col + 1:]

        for line in rendering:
            print(line)
        
    #=========================================================
    # public functions for agent to calculate optimal policy
    #=========================================================
    
    def get_possible_states(self):
        return self.__possible_states
    
    def get_possible_actions(self, state=None):
        if state is None:
            state = self.__state
        return [n for n in range(9) if state[n] == E]

    def is_done(self, state=None):
        if state is None:
            state = self.__state
        if state[1] == []:
            return True
        if state[0] == state[1] == state[2] != E:
            return True
        if state[3] == state[4] == state[5] != E:
            return True
        if state[6] == state[7] == state[8] != E:
            return True
        if state[0] == state[3] == state[6] != E:
            return True
        if state[1] == state[4] == state[7] != E:
            return True
        if state[2] == state[5] == state[8] != E:
            return True
        if state[0] == state[4] == state[8] != E:
            return True
        if state[2] == state[4] == state[6] != E:
            return True
        return False
    
    def get_reward(self, state):
        # Reward R(s) for every possible state
        if state[0] == state[1] == state[2] != E:
            return 1 if state[0] == X else -1
        if state[3] == state[4] == state[5] != E:
            return 1 if state[0] == X else -1
        if state[6] == state[7] == state[8] != E:
            return 1 if state[0] == X else -1
        if state[0] == state[3] == state[6] != E:
            return 1 if state[0] == X else -1
        if state[1] == state[4] == state[7] != E:
            return 1 if state[0] == X else -1
        if state[2] == state[5] == state[8] != E:
            return 1 if state[0] == X else -1
        if state[0] == state[4] == state[8] != E:
            return 1 if state[0] == X else -1
        if state[2] == state[4] == state[6] != E:
            return 1 if state[0] == X else -1
        return 0
        
    def get_transition_prob(self, action, new_state, old_state=None):
        if old_state is None:
            old_state = self.__state
        # returns the Transition Probability P(s'| s, a)
        # with s = old_state, a = action and s' = new_state

        # if the game is over, no transition can take place
        if self.is_done(old_state):
            return 0.0
        
        # the position of the action must be empty
        if old_state[action] != E:
            return 0.0
        
        # state after placing X
        state_after_X = copy(old_state)  # avoid unwanted changed by reference
        state_after_X[action] = X

        # check if game is done
        if self.is_done(state_after_X) and state_after_X == new_state:
            return 1.0

        # game is not done: calculate all possible states of the opponent
        possible_new_states = []
        possible_opponent_actions = self.get_possible_actions(state_after_X)
        for action in possible_opponent_actions:
            possible_new_state = copy(state_after_X)
            possible_new_state[action] = O
            possible_new_states.append(possible_new_state)
        if new_state not in possible_new_states:
            return 0.0
        
        # transition is possible, apply strategy:
        # random opponent, probability is 1 / (# of E before placing the new O)
        prob = 1 / (len(possible_new_states))
        return prob

## Creation of an Environment

The Environment Class allows creation of an Environment with an initial state as parameter s = (1, 1).
Also, method reset() will set the state back to (1, 1)

In [33]:
# example of creation of an environment in the default state
mdp = TicTacToeMDPEnvironment()
mdp.reset()
mdp.render()
state, done, reward, info = mdp.step(4)
print('state =', state, ', reward =', reward, ', done =', done)
mdp.render()
print('possible (internal) game states:')
mdp.get_possible_states()

IndexError: list assignment index out of range

## Action Space and Transitions

We will only deal with environments with a finite number of discrete actions.

The Action Space (set of all possible actions) can be gotten from the environment.

Transitions can be done by calling method step(action).

Here is an experiment with a random move by the agent to show the effect.

In [19]:
mdp = TicTacToeMDPEnvironment(['X', ' ', ' ', 
                               'O', 'X', ' ', 
                               ' ', ' ', 'O'])
mdp.render()
possible_actions = mdp.get_possible_actions()
print('possible actions: ', possible_actions)
random_agent_action = choice(possible_actions)
new_state, done, reward, info = mdp.step(random_agent_action)
mdp.render()
possible_actions = mdp.get_possible_actions()
print('possible actions: ', possible_actions)

TypeError: unsupported operand type(s) for //: 'str' and 'int'

The transition probability $P(s' \mid s, a)$ can also be returned directly via method get_transition_prob(action, new_state, old_state).  
This means that the agent has information about the environment model. N.B. this is not always the case in reinforcement learning.

In [8]:
S_0 = ['X', ' ', ' ', 
       'O', 'X', ' ', 
       ' ', ' ', 'O']

S_1 = ['X', 'X', 'O', 
       'O', 'X', ' ', 
       ' ', ' ', 'O']

S_2 = ['X', 'X', ' ', 
       'O', 'X', 'O', 
       ' ', ' ', 'O']

S_3 = ['X', 'X', ' ', 
       'O', 'X', ' ', 
       'O', ' ', 'O']

S_4 = ['X', 'X', ' ', 
       'O', 'X', ' ', 
       ' ', 'O', 'O']

S_5 = ['X', 'X', ' ', 
       'O', 'X', ' ', 
       'X', ' ', 'O']

mdp = TicTacToeMDPEnvironment(S_0)
mdp.render()

print('possible actions:', mdp.get_possible_actions())

for n, S_p in enumerate([S_1, S_2, S_3, S_4, S_5], 1):
    print('S_0 -> action 1 -> S_' + str(n), 'has probability:', mdp.get_transition_prob(1, new_state=S_p))

TypeError: list indices must be integers or slices, not str

# 2. Random Agent

The policy function $\pi(s) \to a$ is the concrete implementation of the decision process of the agent (selection of an action $a$). In the cell below, you can see the effect of an agent with a random policy choosing an arbitrary action regardless of the new state.

In [None]:
def policy_random(mdp, state):
    # action is random choice from all actions in Action Space
    action = choice([a for a in mdp.get_possible_actions(state)])
    return action

# create a random environment
mdp = TicTacToeMDPEnvironment()
state = mdp.reset()
print('initial state: {}'.format(state))

total_reward = 0.0
done = False
nr_steps = 0
while not done:
    next_action = policy_random(mdp, state)
    state, done, reward, info = mdp.step(next_action)
    total_reward += reward
    nr_steps += 1
    print('action: {}\tstate: {}, reward: {:5.2f}'
          .format(next_action, state, reward))
print('Episode done after {} steps. total reward: {:6.2f}'.format(nr_steps, total_reward))
mdp.render()

initial state: [' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ']
action: 6	state: [' ', ' ', ' ', ' ', ' ', 'O', 'X', ' ', ' '], reward:  0.00
action: 8	state: [' ', ' ', ' ', 'O', ' ', 'O', 'X', ' ', 'X'], reward:  0.00
action: 7	state: [' ', ' ', ' ', 'O', ' ', 'O', 'X', 'X', 'X'], reward: -1.00
Episode done after 3 steps. total reward:  -1.00
   │   │   
───┼───┼───
 O │   │ O 
───┼───┼───
 X │ X │ X 


Each run from start state until stop state is called an episode.  
Let's assemble some statistics on the episodes of the random agent:

In [None]:
from statistics import mean, stdev

def run_one_episode(policy):
    mdp = TicTacToeMDPEnvironment()
    state = mdp.reset()

    total_reward = 0.0
    done = False
    while not done:
        next_action = policy(mdp, state)
        state, done, reward, info = mdp.step(next_action)
        print(state, done, reward, info)
        total_reward += reward
    return total_reward

def measure_performance(policy, nr_episodes=10):
    N = nr_episodes
    print('statistics over', N, 'episodes')
    all_rewards = []
    for n in range(1, N+1):
        episode_reward = run_one_episode(policy)
        print('episode:', n, 'reward:', episode_reward)
        all_rewards.append(episode_reward)

    print('mean: {:6.2f}, sigma: {:6.2f}'.format(mean(all_rewards), stdev(all_rewards)))
    print()
    for n, episode_reward in enumerate(all_rewards[:5], 1):
        print('ep: {:2d}, total reward: {:5.2f}'.format(n, episode_reward))
    print('......')
    for n, episode_reward in enumerate(all_rewards[-5:], len(all_rewards)-5):
        print('ep: {:2d}, total reward: {:5.2f}'.format(n, episode_reward))

measure_performance(policy_random)  # in Python a function pointer is simply the name of the function

statistics over 10 episodes
[' ', ' ', ' ', 'O', ' ', ' ', 'X', ' ', ' '] False 0 {}
[' ', ' ', ' ', 'O', 'X', ' ', 'X', 'O', ' '] False 0 {}
['X', 'O', ' ', 'O', 'X', ' ', 'X', 'O', ' '] False 0 {}
['X', 'O', 'O', 'O', 'X', 'X', 'X', 'O', ' '] False 0 {}
['X', 'O', 'O', 'O', 'X', 'X', 'X', 'O', 'X'] True 1 {}
episode: 1 reward: 1.0
[' ', ' ', 'X', ' ', ' ', ' ', 'O', ' ', ' '] False 0 {}
[' ', 'O', 'X', ' ', 'X', ' ', 'O', ' ', ' '] False 0 {}
[' ', 'O', 'X', ' ', 'X', 'O', 'O', ' ', 'X'] False 0 {}
['X', 'O', 'X', ' ', 'X', 'O', 'O', ' ', 'X'] True 1 {}
episode: 2 reward: 1.0
[' ', ' ', ' ', ' ', 'O', ' ', ' ', 'X', ' '] False 0 {}
[' ', ' ', ' ', 'O', 'O', 'X', ' ', 'X', ' '] False 0 {}
['X', ' ', ' ', 'O', 'O', 'X', ' ', 'X', 'O'] False 0 {}
['X', 'O', ' ', 'O', 'O', 'X', 'X', 'X', 'O'] False 0 {}
['X', 'O', 'X', 'O', 'O', 'X', 'X', 'X', 'O'] True 0 {}
episode: 3 reward: 0.0
[' ', 'O', 'X', ' ', ' ', ' ', ' ', ' ', ' '] False 0 {}
[' ', 'O', 'X', ' ', 'O', 'X', ' ', ' ', ' '] False

Running the code above multiple times gives an idea of the typical performance (total reward) of the random policy on this environment. We get a consistent result if we average over enough episodes. 

# 3. Optimal decisions based on sums of rewards

Bellman showed in 1957 that the optimal policy $\pi^{*}(s)$ for an MDP is:

(1) $\pi^{*}(s) = \underset{a}{argmax} \space \sum_{s'} P(s' \mid s, a) [R(s, a, s') + \gamma \space U(s')]$,

provided that utility function U(s) satisfies Bellman's equation:

(2) $U(s) = \underset{a}{max} \space \sum_{s'} P(s' \mid s, a) [R(s, a, s') + \gamma \space U(s')]$.

One can show that Bellman's equation can always be solved and with a single solution.

It is useful to define the so-called Q-function:

(3) $Q(s, a) = \sum_{s'} P(s' \mid s, a) [R(s, a, s') + \gamma \space U(s')]$

Which simplifies equations (1) and (2) to:

(4) $\pi^{*}(s) = \underset{a}{argmax} \space Q(s, a)$  
and

(5) $U(s) = \underset{a}{max} \space Q(s, a)$

Thus, finding the optimal policy is reduced to solving Bellman's equation. There are several strategies for this.

# 4. Solving the Bellman Equation: Value Iteration

Value Iteration is based on the Bellman update:

(6) $U_{i+1}(s) = \underset{a}{max} \sum_{s'} P(s' \mid s, a) \space [ R(s, a, s') + \gamma \space U_i(s') ]$

Using equation (3) this simplifies to:

(7) $U_{i+1}(s) = \underset{a}{max} \space Q_i(s, a)$

One can prove that after enough iterations $U_{i+1}(s) \approx U(s)$, after which Bellman's equation is satisfied.  
Since there is only one solution to Bellman's equation, it does not matter with which $U_0(s)$ you start!

The algorithm below is Value Iteration with one simplification: $\gamma$ the so-called discount factor, is set to 1.

In [None]:
def get_initial_U(mdp):
    U = {}
    for s in mdp.get_possible_states():
        U[tuple(s)] = mdp.get_reward(s)
    return U
    
def Q_Value(mdp, s, a, U):
    Q = 0.0
    for s_p in mdp.get_possible_states():
        P = mdp.get_transition_prob(a, s_p, s)
        R = mdp.get_reward(s_p)
        Q += P * (R + U[tuple(s_p)])
    return Q

def ValueIteration(mdp, error=0.00001):
    # from AIMA 4th edition without discount gamma 
    U_p = get_initial_U(mdp) # U_p = U'
    delta = float('inf')
    while delta > error:
        U = {}
        for s in mdp.get_possible_states():
            U[tuple(s)] = U_p[tuple(s)]
        #print_U(U)  # to illustrate the iteration process
        delta = 0
        for s in mdp.get_possible_states():
            max_a = float('-inf')
            for a in mdp.get_possible_actions(s):
                q = Q_Value(mdp, s, a, U) 
                if q > max_a:
                    max_a = q
            U_p[tuple(s)] = max_a
            if abs(U_p[tuple(s)] - U[tuple(s)]) > delta:
                delta = abs(U_p[tuple(s)] - U[tuple(s)])
    return U

def print_U(U):
    print('Utilities:')
    for y in range (3, 0, -1):
        for x in range(1, 5):
            s = (x, y)
            if s in U:
                print('   {}: {:8.4f}'.format(s, U[s]), end = '')
            else: # preserve alignment
                print('                   ', end = '')
        print()

def print_policy(pi):
    print('Policy:')
    for y in range (3, 0, -1):
        for x in range(1, 5):
            s = (x, y)
            if s in pi:
                print('   {}: {:12}'.format(s, pi[s]), end = '')
            else: # preserve alignment
                print(' '*23, end = '')
        print()

mdp = TicTacToeMDPEnvironment()
U = ValueIteration(mdp)

pi_star = {}
for s in mdp.get_possible_states():
    if mdp.is_done(s):
        continue # policy is not needed in stop states
    max_a = float('-inf')
    argmax_a = None
    for action in Action:
        q = Q_Value(mdp, s, action, U) 
        if q > max_a:
            max_a = q
            argmax_a = action
    pi_star[s] = argmax_a


NameError: name 'TicTacToeMDPEnvironment' is not defined

In [None]:
def optimal_policy(mdp, state):
    return pi_star[state]

measure_performance(optimal_policy, nr_episodes = 10)


# 4. Solving the Bellman Equation: Policy Iteration

In [None]:
def policy_evaluation(pi, U_i, mdp):
    U_i = {}
    return U

def policy_iteration(mdp):
    # initialise U(s) (arbitrary value 0) and policy pi (arbitrary action Up)
    U = {}
    for s in mdp.get_possible_states():
        U[s] = 0
    pi = {}
    for s in mdp.get_possible_states():
        if not mdp.is_done(s):
            pi[s] = choice([a for a in Action])

    changed = True
    while changed:
        print_policy(pi) # to vilualise the iterations
        changed = False
        U = policy_evaluation(pi, U, mdp)
        for s in mdp.get_possible_states():
            if not mdp.is_done(s):
                # determine action a that yields the highest Q-value
                max_a, max_q = None, float('-inf')
                for a in Action:
                    q = Q_Value(mdp, s, a, U) 
                    if q > max_q:
                        max_a, max_q = a, q
                if max_q > Q_Value(mdp, s, pi[s], U):
                    pi[s], changed = max_a, True
    return pi

mdp = SimpleMDPEnvironment(initial_state=(1, 1), reward_per_step=-0.04)
print('optimal policy:')
pi_star = policy_iteration(mdp)