# Example Agent and Environment

This example of a rational agent enables experimentation with decision strategies based on expected utility. The environment interface is inspired by the Open AI gym framework. The example from chapter 16 of AIMA (4th ed.)

# 1. Definition of the Environment

The code below defines all characteristics of the Markov Decision Process of the AIMA book in chapter 16 with the following characteristics:

- States: The agent is in one of the following positions {(1, 1), (2, 1), (3, 1), (4, 1), (1, 2), (3, 2), (4, 2), (1, 3), (2, 3), (3, 3), (4, 3)}.
- Actions: The agent can choose between {Up, Down, Left, Right}
- Transitions: There is a probability of 0.8/0.1/0.1 that action Up results in an upward/left/right movement, provided that the state exists. Movements 'outside the world' or to (2, 2) result in the state remaining the same. 

An SimpleMDPEnvironment object has the following methods:
- reset() which brings the environment the start state, which is also returned
- step(action) processes the action of the agent and returns the new state, done, reward (and optional debug info)
- render() simple visualisation of the current state of the world

To allow an agent to calculate optimal decisions using model information, these methods are also available:

- get_possible_states() for iterating over all possible states
- is_done(state) for excluding the stop states from the policy
- get_reward(state) simplified version $R(s)$ of the general reward function: $R(s, a, s')$
- get_transition_prob(action, new_state, old_state): $P(s' \mid s, a)$

We will illustrate each of the elements above by simple code examples below.  
All of the theory can be found in AIMA 16.1 and 16.2.1 (4th ed.)

In [None]:
from enum import Enum
from random import randint, choice
from copy import copy

class Action(Enum):
    def __str__(self):
        return self.name
    Left = 1
    Right = 2
    Up = 3
    Down = 4

    
class Drift(Enum):
    def __str__(self):
        return self.name
    Left = 1
    Not = 2
    Right = 3    


class SimpleMDPEnvironment():
    def __init__(self, initial_state, reward_per_step):
        self.__x_min, self.__y_min = (1, 1)
        self.__x_max, self.__y_max = (4, 3)
        self.__possible_states = \
        [(1, 1), (2, 1), (3, 1), (4, 1), 
         (1, 2), (3, 2), (4, 2), (1, 3), 
         (2, 3), (3, 3), (4, 3)]
        if initial_state in self.__possible_states:
            self.__initial_state = initial_state
        else:
            self.__initial_state = (1, 1)
        self.__state = self.__initial_state
        self.__reward_per_step = reward_per_step
    
    def reset(self):
        self.__state = self.__initial_state
        return self.__state

    def __calculate_transition(self, action):
        # helper method for public method step(self, action)
        # determine drift (20% to the left, 80% no drift, 20% to the right)
        n = randint(1, 10)
        if n == 1:
            drift = Drift.Left
        elif n == 10:
            drift = Drift.Right
        else:  # 1 < n < 10
            drift = Drift.Not
        # determine position
        x_old, y_old = self.__state
        if action == Action.Up:
            if drift == Drift.Left:
                x_new, y_new = x_old - 1, y_old
            elif drift == Drift.Not:
                x_new, y_new = x_old, y_old + 1
            elif drift == Drift.Right:
                x_new, y_new = x_old + 1, y_old
        if action == Action.Down:
            if drift == Drift.Left:
                x_new, y_new = x_old + 1, y_old
            elif drift == Drift.Not:
                x_new, y_new = x_old, y_old - 1
            elif drift == Drift.Right:
                x_new, y_new = x_old - 1, y_old
        if action == Action.Left:
            if drift == Drift.Left:
                x_new, y_new = x_old, y_old - 1
            elif drift == Drift.Not:
                x_new, y_new = x_old - 1, y_old
            elif drift == Drift.Right:
                x_new, y_new = x_old, y_old + 1
        if action == Action.Right:
            if drift == Drift.Left:
                x_new, y_new = x_old, y_old + 1
            elif drift == Drift.Not:
                x_new, y_new = x_old + 1, y_old
            elif drift == Drift.Right:
                x_new, y_new = x_old, y_old - 1
        
        if (x_new, y_new) in self.__possible_states:
            new_state = (x_new, y_new)
        else:
            new_state = self.__state  # state does not change
            
        return new_state
      
    def step(self, action):
        old_state = self.__state
        self.__state = self.__calculate_transition(action)  # state after action
        observation = self.__state  # environment is fully observable
        done = self.is_done()
        reward = self.get_reward(self.__state)
        info = {}  # optional debug info
        return observation, done, reward, info

    def render(self):
        BACKGROUND = [
            '  ┌───────┬───────┬───────┬───────┐',
            '  │       │       │       │       │',
            '3 │       │       │       │  + 1  │',
            '  │       │       │       │       │',
            '  ├───────┼───────┼───────┼───────┤',
            '  │       │░░░░░░░│       │       │',
            '2 │       │░░░░░░░│       │  - 1  │',
            '  │       │░░░░░░░│       │       │',
            '  ├───────┼───────┼───────┼───────┤',
            '  │       │       │       │       │',
            '1 │       │       │       │       │',
            '  │       │       │       │       │',
            '  └───────┴───────┴───────┴───────┘',
            '      1       2       3       4'
        ]
        cell_width, cell_height, left_x, bottom_y = 8, 4, -2, 15
        rendering = copy(BACKGROUND)
        # insert '*' at the current state
        x, y = self.__state
        line_nr = bottom_y - cell_height * y
        char_nr = left_x + cell_width * x
        old_line = rendering[line_nr]
        rendering[line_nr] = old_line[:char_nr] + '*' + old_line[char_nr+1:]
        
        for line in rendering:
            print(line)
        print('reward per step:', self.__reward_per_step)    

    #=========================================================
    # public functions for agent to calculate optimal policy
    #=========================================================
    
    def get_possible_states(self):
        return self.__possible_states
    
    def is_done(self, state=None):
        if state is None:
            state = self.__state
        return (state == (4, 2) or state == (4, 3))
    
    def get_reward(self, state):
        # Reward R(s) for every possible state
        if state == (4, 3):
            return 1.0
        if state == (4, 2):
            return -1.0
        return self.__reward_per_step
        
    def get_transition_prob(self, action, new_state, old_state=None):
        if old_state is None:
            old_state = self.__state
        # returns the Transition Probability P(s'| s, a)
        # with s = old_state, a = action and s' = new_state

        # distinction in 4 cases
        
        # case 1: if state is a stop state -> Prob = 0.0
        if self.is_done(old_state):
            return 0.0
        
        # case 2: if new_state not possible -> Prob = 0.0
        # examples of impossible new_state: (2, 2), (1, 0)
        if new_state not in self.__possible_states:
            return 0.0

        # case 3: if new state != old_state and movement not blocked by a 'wall'
        x_old, y_old = old_state
        x_new, y_new = new_state
        if action == Action.Up:
            (dx_s, dy_s), (dx_l, dy_l), (dx_r, dy_r) = ( 0,  1), (-1,  0), ( 1,  0)
        elif action == Action.Down:
            (dx_s, dy_s), (dx_l, dy_l), (dx_r, dy_r) = ( 0, -1), ( 1,  0), (-1,  0)
        elif action == Action.Left:
            (dx_s, dy_s), (dx_l, dy_l), (dx_r, dy_r) = (-1,  0), ( 0, -1), ( 0,  1)
        else:  # action == Action.Right:
            (dx_s, dy_s), (dx_l, dy_l), (dx_r, dy_r) = ( 1,  0), ( 0,  1), ( 0, -1)

        (x_no_drift,    y_no_drift)    = (x_old + dx_s, y_old + dy_s)
        (x_drift_left,  y_drift_left)  = (x_old + dx_l, y_old + dy_l)
        (x_drift_right, y_drift_right) = (x_old + dx_r, y_old + dy_r)

        if (x_new, y_new) == (x_no_drift, y_no_drift):
            return 0.8
        if (x_new, y_new) == (x_drift_left, y_drift_left):
            return 0.1
        if (x_new, y_new) == (x_drift_right, y_drift_right):
            return 0.1
        if (x_new, y_new) != (x_old, y_old):
            return 0.0  # all other cases: Prob = 0.0 (e.g. states are no neighbors)
        
        # case 4: new_state = old_state and movement blocked by one or more 'walls'
        prob = 0.0
        if (x_no_drift, y_no_drift) not in self.__possible_states:
            prob += 0.8
        if (x_drift_left, y_drift_left) not in self.__possible_states:
            prob += 0.1
        if (x_drift_right, y_drift_right) not in self.__possible_states:
            prob += 0.1
        return prob


## Creation of an Environment

The Environment Class allows creation of an Environment with an initial state as parameter s = (1, 1).
Also, method reset() will set the state back to (1, 1)

In [None]:
# example of creation of an environment in the default state
mdp = SimpleMDPEnvironment(initial_state=(1, 1), reward_per_step=-0.04)
mdp.reset()
mdp.render()

## Action Space

We will only deal with environments with a finite number of discrete actions.

In that case the so-called Action Space (set of all possible actions) can be easily represented by an enum:

In [None]:
for nr, action in enumerate(Action, 1):
    print('action', nr, 'is', action)

## Transitions

Transitions can be done by calling method step(action).

Since the environment is stochastic, the new state may vary according to these rules:
 - there is a 80% probability that the action is carried out without 'drift'
 - there is a 20% probability that the movement has a 'drift to the left'  
 (drift to the left means: Left instead of Up, Up instead of Right, Right instead of Down or Down instead of Left)
 - there is a 20% probability that the movement has a 'drift to the right'  
 (drift to the right means: Right instead of Up, Down instead of Right, Left instead of Down or Up instead of Left)
If the new position would be outside the environment ('through a wall') the new state is identical to the old state

Here is an 'experiment' in which the action Up is repeated on an environment in state (3,1). All possible stats $s'$ with a transition probability T((3, 1), Up, $s'$) are reported with the observed relative frequencies: 

In [None]:
new_states = {}
mdp = SimpleMDPEnvironment(initial_state = (3, 1), reward_per_step=-0.04)
N = 100000
for _ in range(N):
    mdp.reset()
    new_state, done, reward, info = mdp.step(Action.Up)
    if new_state not in new_states:
        new_states[new_state] = 1
    else:
        new_states[new_state] = new_states[new_state] + 1
for new_state in new_states:
    print('(3, 1) -> Up ->', new_state, 'has probability:', new_states[new_state] / N)

The transition probability $P(s' \mid s, a)$ can also be returned directly via method get_transition_prob(action, new_state, old_state). This means that the agent has information about the environment model. N.B. this is not always the case in reinforcement learning.

In [None]:
# check with no walls blocking the movement
mdp = SimpleMDPEnvironment(initial_state = (3, 1), reward_per_step=-0.04)
for new_state in [(3,2), (2,1), (4,1), (1, 1), (3, 1)]:
    print('(3, 1) -> Up   ->', new_state, 'has probability:', mdp.get_transition_prob(Action.Up, new_state))

# check in the top left corner
mdp = SimpleMDPEnvironment(initial_state = (1, 3), reward_per_step=-0.04)
for new_state in [(1,2), (1,3), (2,3)]:
    print('(1, 3) -> Left ->', new_state, 'has probability:', mdp.get_transition_prob(Action.Left, new_state))

# 2. Random Agent

The policy function $\pi(s) \to a$ is the concrete implementation of the decision process of the agent (selection of an action $a$). In the cell below, you can see the effect of an agent with a random policy choosing an arbitrary action regardless of the new state.

In [None]:
def policy_random(state):
    # action is random choice from all actions in Action Space
    action = choice([a for a in Action])
    return action

# create a random environment
mdp = SimpleMDPEnvironment(initial_state=(1, 1), reward_per_step=-0.04)
state = mdp.reset()
print('initial state: {}'.format(state))

total_reward = 0.0
done = False
nr_steps = 0
while not done:
    next_action = policy_random(state)
    state, done, reward, info = mdp.step(next_action)
    total_reward += reward
    nr_steps += 1
    print('action: {}\tstate: {}, reward: {:5.2f}'
          .format(next_action, state, reward))
print('Episode done after {} steps. total reward: {:6.2f}'.format(nr_steps, total_reward))

If you run the cell above a number of times, you can observe two things:
- The environment always ends up in state (4, 2) or state (4, 3), this is by definition of the stop criterium (done). 
- The total reward will vary from run to run.

Each run from start state until stop state is called an episode.  
Let's assemble some statistics on the episodes of the random agent:

In [None]:
from statistics import mean, stdev

def run_one_episode(policy):
    mdp = SimpleMDPEnvironment(initial_state=(1, 1), reward_per_step=-0.04)
    state = mdp.reset()

    total_reward = 0.0
    done = False
    while not done:
        next_action = policy(state)
        state, done, reward, info = mdp.step(next_action)
        total_reward += reward
    return total_reward

def measure_performance(policy, nr_episodes=100):
    N = nr_episodes
    print('statistics over', N, 'episodes')
    all_rewards = []
    for _ in range(N):
        episode_reward = run_one_episode(policy)
        all_rewards.append(episode_reward)

    print('mean: {:6.2f}, sigma: {:6.2f}'.format(mean(all_rewards), stdev(all_rewards)))
    print()
    for n, episode_reward in enumerate(all_rewards[:5], 1):
        print('ep: {:2d}, total reward: {:5.2f}'.format(n, episode_reward))
    print('......')
    for n, episode_reward in enumerate(all_rewards[-5:], len(all_rewards)-5):
        print('ep: {:2d}, total reward: {:5.2f}'.format(n, episode_reward))

measure_performance(policy_random)  # in Python a function pointer is simply the name of the function

Running the code above multiple times gives an idea of the typical performance (total reward) of the random policy on this environment (start=(1,1), reward_per_step=-0.04). We get a consistent result if we average over enough episodes. 

Let's see what is the effect of a simple fixed policy: Always choose action Up.

Running the same statistics on function policy_up we see that the the total reward also varies between episodes, this is due to the stochastic nature of the environment. The same action does not always yield the same transition:

In [None]:
def policy_only_up(state):
    return Action.Up

measure_performance(policy_only_up, nr_episodes=5000)

# 3. Optimal decisions based on sums of rewards

Bellman showed in 1957 that the optimal policy $\pi^{*}(s)$ for an MDP is:

(1) $\pi^{*}(s) = \underset{a}{argmax} \space \sum_{s'} P(s' \mid s, a) [R(s, a, s') + \gamma \space U(s')]$,

provided that utility function U(s) satisfies Bellman's equation:

(2) $U(s) = \underset{a}{max} \space \sum_{s'} P(s' \mid s, a) [R(s, a, s') + \gamma \space U(s')]$.

One can show that Bellman's equation can always be solved and with a single solution.

It is useful to define the so-called Q-function:

(3) $Q(s, a) = \sum_{s'} P(s' \mid s, a) [R(s, a, s') + \gamma \space U(s')]$

Which simplifies equations (1) and (2) to:

(4) $\pi^{*}(s) = \underset{a}{argmax} \space Q(s, a)$  
and

(5) $U(s) = \underset{a}{max} \space Q(s, a)$

Thus, finding the optimal policy is reduced to solving Bellman's equation. There are several strategies for this.  

We will demonstrate Value Iteration which is based on the Bellman update:

(6) $U_{i+1}(s) = \underset{a}{max} \sum_{s'} P(s' \mid s, a) \space [ R(s, a, s') + \gamma \space U_i(s') ]$

Using equation (3) this simplifies to:

(7) $U_{i+1}(s) = \underset{a}{max} \space Q_i(s, a)$

One can prove that after enough iterations $U_{i+1}(s) \approx U(s)$, after which Bellman's equation is satisfied.  
Since there is only one solution to Bellman's equation, it does not matter with which $U_0(s)$ you start!

The algorithm below is Value Iteration with one simplification: $\gamma$ the so-called discount factor, is set to 1.

In [None]:
def Q_Value(mdp, s, a, U):
    Q = 0.0
    for s_p in mdp.get_possible_states():
        P = mdp.get_transition_prob(a, s_p, s)
        R = mdp.get_reward(s_p)
        Q += P * (R + U[s_p])
    return Q

def ValueIteration(mdp, error=0.00001):
    # from AIMA 4th edition without discount gamma 
    U = {}
    U_p = get_initial_U(mdp) # U_p = U'
    delta = float('inf')
    while delta > error:
        for s in mdp.get_possible_states():
            U[s] = U_p[s]
        print_U(U)  # to illustrate the iteration process
        delta = 0
        for s in mdp.get_possible_states():
            max_a = float('-inf')
            for a in Action:
                q = Q_Value(mdp, s, a, U) 
                if q > max_a:
                    max_a = q
            U_p[s] = max_a
            if abs(U_p[s] - U[s]) > delta:
                delta = abs(U_p[s] - U[s])
    return U

def get_initial_U(mdp):
    U = {}
    for s in mdp.get_possible_states():
        U[s] = mdp.get_reward(s)
    return U
        
def print_U(U):
    print('Utilities:')
    for y in range (3, 0, -1):
        for x in range(1, 5):
            s = (x, y)
            if s in U:
                print('   {}: {:8.4f}'.format(s, U[s]), end = '')
            else: # preserve alignment
                print('                   ', end = '')
        print()

def print_policy(pi):
    print('Policy:')
    for y in range (3, 0, -1):
        for x in range(1, 5):
            s = (x, y)
            if s in pi:
                print('   {}: {:12}'.format(s, pi[s]), end = '')
            else: # preserve alignment
                print(' '*23, end = '')
        print()

mdp = SimpleMDPEnvironment(initial_state=(1, 1), reward_per_step=-0.04)
U = ValueIteration(mdp)
print_U(U)

pi_star = {}
for s in mdp.get_possible_states():
    if mdp.is_done(s):
        continue # policy is not needed in stop states
    max_a = float('-inf')
    argmax_a = None
    for action in Action:
        q = Q_Value(mdp, s, action, U) 
        if q > max_a:
            max_a = q
            argmax_a = action
    pi_star[s] = argmax_a

print_policy(pi_star)


In [None]:
def optimal_policy(state):
    return pi_star[state]

measure_performance(optimal_policy, nr_episodes = 5000)
