<a href="https://colab.research.google.com/github/ITU-Business-Analytics-Team/Business_Analytics_for_Professionals/blob/main/Part%20I%20%3A%20Methods%20%26%20Technologies%20for%20Business%20Analytics/Chapter%203%3A%20Prediction%20Modelling/3_9_Reinforcement_Learning_Policy_Iteration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Prediction Modelling: Machine Learning**
## **Reinforcement Learning**
Policy Iteration

### **Problem Definition**
The well-known maze problem is used to demonstrate the Policy Iteration algorithm. The maze is an ***n x n*** grid in which the agent can move in four directions: north, south, east, and west (`Up`, `Down`, `Right`, and `Left` in the code). Certain cells contain obstacles. The agent begins in a start cell and works its way to the target cell. The objective is to determine the sequence of moves that brings the agent to the goal cell with the greatest cumulative reward.
Policy Iteration is employed to solve this problem. The algorithm is built in the following steps:
1. Define the environment as a class of objects in which the agent operates.
2. Derive the transition probabilities and rewards for every state-action pair from the environment.
3. Starting from a random policy, alternate policy evaluation and policy improvement until the policy no longer changes.
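
For reference, the two alternating steps of Policy Iteration can be written as below. This is the standard textbook form rather than notation taken from this notebook: $P(s' \mid s, a)$ denotes the transition probability, $R(s, a)$ the immediate reward, $\gamma$ the discount factor, and $\pi(s)$ the action the current policy picks in state $s$.

Policy evaluation (repeated until the values stop changing):

$$V(s) \leftarrow \sum_{s'} P(s' \mid s, \pi(s)) \left[ R(s, \pi(s)) + \gamma V(s') \right]$$

Policy improvement (applied once per evaluation):

$$\pi(s) \leftarrow \arg\max_{a} \sum_{s'} P(s' \mid s, a) \left[ R(s, a) + \gamma V(s') \right]$$

In a deterministic maze each sum has a single non-zero term, because every state-action pair leads to exactly one next state.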

In [1]:
import numpy as np
import operator
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#First define gridmaze environment as a class
class Environment():
  def __init__(self, height, width, blocked_cell, start_cell, end_cell, current_location):
    #Define the dimensions of grid and cell features
    self.height = height
    self.width = width
    self.blocked_cell = blocked_cell
    self.end_cell = end_cell
    self.start_cell = start_cell
    self.current_location = current_location
    self.state_space=[]
    self.create_state_space()

    #Define the immediate reward the agent receives when it moves into each cell
    self.cell_reward = np.zeros((self.height, self.width))-1 #Every ordinary move gives a reward of -1
    self.cell_reward[self.end_cell[0], self.end_cell[1]] = 10 #Reaching the end cell gives a reward of 10
    self.cell_reward[self.blocked_cell[0], self.blocked_cell[1]] = -10 #The blocked cell is marked -10; step() never lets the agent enter it, so this value is not collected

    self.actions = ['Left', 'Right', 'Up','Down'] # There are 4 possible actions of the agent.

  def get_actions(self):
      return self.actions
  def create_state_space(self):    
      for i in range(self.height):
        for j in range(self.width):
          if (i,j)!=self.blocked_cell:
            self.state_space.append((i,j))    

  def return_reward(self, next_location):
     return self.cell_reward[next_location[0], next_location[1]]

  def check_end_cell(self):
      # True if the agent is on the end cell, False otherwise
      return self.current_location == self.end_cell
  def check_blocked_cell(self):
      # True if the agent is on the blocked cell, False otherwise
      return self.current_location == self.blocked_cell
  def reset(self):
    self.current_location = self.start_cell
    
  def step(self, action):
        """Moves the agent one cell in the given direction. If the move would leave the grid or enter the blocked cell,
        the agent stays where it is and receives the reward of its current cell. The reward is returned."""
              
        last_location = self.current_location
        
        # UP
        if action == 'Up':
            # If agent is at the top, stay still, collect reward
            if last_location[0] == 0:
                reward = self.return_reward(last_location)
            elif (last_location[0]-1, last_location[1]) == self.blocked_cell:
                reward = self.return_reward(last_location)
            else:
                self.current_location = ( self.current_location[0] - 1, self.current_location[1])
                reward = self.return_reward(self.current_location)
        
        # DOWN
        elif action == 'Down':
            # If agent is at bottom, stay still, collect reward
            if last_location[0] == self.height - 1:
                reward = self.return_reward(last_location)
            elif (last_location[0]+1, last_location[1]) == self.blocked_cell:
                reward = self.return_reward(last_location)
            else:
                self.current_location = ( self.current_location[0] + 1, self.current_location[1])
                reward = self.return_reward(self.current_location)
            
        # LEFT
        elif action == 'Left':
            # If agent is at the left, stay still, collect reward
            if last_location[1] == 0:
                reward = self.return_reward(last_location)
            elif (last_location[0], last_location[1]-1) == self.blocked_cell:
                reward = self.return_reward(last_location)
            else:
                self.current_location = ( self.current_location[0], self.current_location[1] - 1)
                reward = self.return_reward(self.current_location)

        # RIGHT
        elif action == 'Right':
            # If agent is at the right, stay still, collect reward
            if last_location[1] == self.width - 1:
                reward = self.return_reward(last_location)
            elif (last_location[0], last_location[1]+1) == self.blocked_cell:
                reward = self.return_reward(last_location)
            else:
                self.current_location = ( self.current_location[0], self.current_location[1] + 1)
                reward = self.return_reward(self.current_location)
                
        return reward
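
The four branches of `step` share one pattern: compute the target cell, and stay put if it lies off the grid or on the blocked cell. A minimal sketch of that pattern with a lookup table of row/column offsets follows. The names `MOVES` and `next_cell` are hypothetical and are not part of the notebook's `Environment` class.

```python
# Hypothetical helper: a table-driven version of the movement rule used in step().
MOVES = {'Up': (-1, 0), 'Down': (1, 0), 'Left': (0, -1), 'Right': (0, 1)}

def next_cell(location, action, height, width, blocked_cell):
    """Return the cell reached from `location`, or `location` itself if the move is not allowed."""
    d_row, d_col = MOVES[action]
    target = (location[0] + d_row, location[1] + d_col)
    inside = 0 <= target[0] < height and 0 <= target[1] < width
    if not inside or target == blocked_cell:
        return location
    return target

print(next_cell((0, 0), 'Up', 4, 4, (2, 2)))     # off the grid, stays at (0, 0)
print(next_cell((2, 1), 'Right', 4, 4, (2, 2)))  # blocked, stays at (2, 1)
print(next_cell((0, 0), 'Right', 4, 4, (2, 2)))  # moves to (0, 1)
```

Keeping the offsets in one dictionary means a new direction or a second obstacle only changes data, not control flow.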

In [3]:
environment = Environment(height=4, width=4, blocked_cell= (2,2), start_cell= (0,0), end_cell = (3,3), current_location = (0,0))

In [4]:
### define transition probabilities and rewards
transition_probs = {}
rewards = {}
for s in environment.state_space:
  for a in environment.actions:
    environment.current_location=s
    if not environment.check_end_cell():
      if not environment.check_blocked_cell():
        Reward = environment.step(a)
        s2=environment.current_location
        transition_probs[(s, a, s2)] = 1 #Note that the process is deterministic hence the probabilities are 1
        rewards[(s, a)]=Reward

In [5]:
#Generate a random policy
policy={}
for s in environment.state_space:
  environment.current_location=s
  if not environment.check_end_cell():
    if not environment.check_blocked_cell():
      action = environment.actions[np.random.randint(0,len(environment.actions))] #randint excludes the upper bound, so this covers all four actions
      policy[s]=action
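
A quick way to confirm that a random policy can contain every action is to sample many indices and inspect which actions appear. The sketch below uses NumPy's `Generator.integers`, whose upper bound is exclusive by default (as it is for `np.random.randint`). The seed and the sample size are arbitrary choices for illustration.

```python
import numpy as np

actions = ['Left', 'Right', 'Up', 'Down']
rng = np.random.default_rng(0)  # arbitrary seed, for repeatability only

# integers(low, high) draws from low..high-1, so high must be len(actions)
all_indices = rng.integers(0, len(actions), size=1000)
short_indices = rng.integers(0, len(actions) - 1, size=1000)

seen_all = {actions[i] for i in all_indices}
seen_short = {actions[i] for i in short_indices}

print(sorted(seen_all))    # all four actions appear
print(sorted(seen_short))  # 'Down' can never appear
```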

In [6]:
policy

{(0, 0): 'Up',
 (0, 1): 'Left',
 (0, 2): 'Right',
 (0, 3): 'Up',
 (1, 0): 'Left',
 (1, 1): 'Up',
 (1, 2): 'Left',
 (1, 3): 'Right',
 (2, 0): 'Right',
 (2, 1): 'Left',
 (2, 3): 'Right',
 (3, 0): 'Up',
 (3, 1): 'Right',
 (3, 2): 'Right'}

In [7]:
#Evaluates and computes the value of a given policy
def policy_evaluation(policy):
    # initialize V(s) = 0
    V = {}
    for s in environment.state_space:
        V[s] = 0
    SMALL_ENOUGH = 1e-3 # threshold for convergence
    gamma = 0.9 # discount factor
    # repeat until convergence
    it = 0
    while True:
        biggest_change = 0
        for s in environment.state_space:
            environment.current_location=s
            if environment.check_end_cell() or environment.check_blocked_cell():
                V[s]=0
            else:
                old_v=V[s]
                new_v = 0 # we will accumulate the answer
                for a in environment.actions:
                    for s2 in environment.state_space:
                    # action probability is deterministic
                        if policy.get(s)==a:
                            action_prob=1
                        else:
                            action_prob=0
                    # reward is a function of (s, a)
                        r = rewards.get((s, a),0)
                        new_v += action_prob * transition_probs.get((s, a, s2),0) * (r + gamma * V[s2])
                # after done getting the new value, update the value table
                V[s] = new_v
                biggest_change = max(biggest_change, np.abs(old_v - V[s]))
        print("iter:", it, "biggest_change:", biggest_change)
        it += 1
        if biggest_change < SMALL_ENOUGH:
            break
    print("\n\n")
    return(V)
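
To see the update rule at work on something small enough to check by hand, the sketch below evaluates a fixed policy on a hypothetical three-cell corridor (not the 4x4 maze). The agent always moves right, an ordinary move yields -1, entering the last cell yields 10, and the last cell is terminal with value 0. The values to expect are V(1) = 10 and V(0) = -1 + 0.9 * 10 = 8.

```python
# Hypothetical 3-cell corridor: states 0, 1, 2; state 2 is terminal.
gamma = 0.9
states = [0, 1, 2]
terminal = 2
next_state = {0: 1, 1: 2}   # the fixed policy "always move right"
reward = {0: -1, 1: 10}     # reward for the move taken from each state

V = {s: 0.0 for s in states}
while True:
    biggest_change = 0.0
    for s in states:
        if s == terminal:
            continue  # the terminal value stays at 0
        old_v = V[s]
        V[s] = reward[s] + gamma * V[next_state[s]]
        biggest_change = max(biggest_change, abs(old_v - V[s]))
    if biggest_change < 1e-3:
        break

print(V)
```

Because this policy never revisits a state, the values become exact after two sweeps and the third sweep only confirms that nothing changes.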

In [8]:
#Policy Iteration Algorithm
while True:
    gamma = 0.9
    # policy evaluation step - we already know how to do this!
    V = policy_evaluation(policy)
    # policy improvement step
    is_policy_converged = True
    for s in environment.state_space:
        environment.current_location=s
        if not environment.check_end_cell():
            old_a = policy[s]
            new_a = None
            best_value = float('-inf')
            # loop through all possible actions to find the best current action
            for a in environment.actions:
                v = 0
                for s2 in environment.state_space:
                # reward is a function of (s, a), 0 if not specified
                    r = rewards.get((s, a), 0)
                    v += transition_probs.get((s, a, s2), 0) * (r + gamma * V[s2])
                if v > best_value:
                    best_value = v
                    new_a = a
            # new_a now represents the best action in this state
            policy[s] = new_a
            if new_a != old_a:
                is_policy_converged = False
    if is_policy_converged:
        break

iter: 0 biggest_change: 10.0
iter: 1 biggest_change: 9.0
iter: 2 biggest_change: 1.3851000000000004
iter: 3 biggest_change: 1.121931
iter: 4 biggest_change: 0.9087641099999999
iter: 5 biggest_change: 0.7360989291000006
iter: 6 biggest_change: 0.5962401325709994
iter: 7 biggest_change: 0.48295450738250967
iter: 8 biggest_change: 0.43046720999999977
iter: 9 biggest_change: 0.38742048900000015
iter: 10 biggest_change: 0.3486784401000005
iter: 11 biggest_change: 0.3138105960899997
iter: 12 biggest_change: 0.28242953648099967
iter: 13 biggest_change: 0.25418658283289997
iter: 14 biggest_change: 0.2287679245496097
iter: 15 biggest_change: 0.20589113209465015
iter: 16 biggest_change: 0.18530201888518505
iter: 17 biggest_change: 0.16677181699666477
iter: 18 biggest_change: 0.1500946352969983
iter: 19 biggest_change: 0.13508517176729917
iter: 20 biggest_change: 0.12157665459056943
iter: 21 biggest_change: 0.10941898913151249
iter: 22 biggest_change: 0.0984770902183616
iter: 23 biggest_change: 0
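
As an independent cross-check, the following self-contained sketch runs policy iteration on the same 4x4 layout (blocked cell (2,2), goal (3,3), reward -1 per move and 10 on reaching the goal, discount 0.9) without using the `Environment` class. The helper names `move`, `q_value`, and `evaluate` are hypothetical, and the initial policy is fixed to `'Left'` everywhere rather than random, so intermediate iterations will differ from the log above.

```python
GAMMA = 0.9
HEIGHT, WIDTH = 4, 4
BLOCKED, GOAL = (2, 2), (3, 3)
MOVES = {'Left': (0, -1), 'Right': (0, 1), 'Up': (-1, 0), 'Down': (1, 0)}
STATES = [(i, j) for i in range(HEIGHT) for j in range(WIDTH) if (i, j) != BLOCKED]

def move(s, a):
    """Deterministic transition: stay in place if the move is off-grid or blocked."""
    t = (s[0] + MOVES[a][0], s[1] + MOVES[a][1])
    if not (0 <= t[0] < HEIGHT and 0 <= t[1] < WIDTH) or t == BLOCKED:
        return s
    return t

def q_value(s, a, V):
    """One-step lookahead: reward of the cell reached plus its discounted value."""
    s2 = move(s, a)
    r = 10 if s2 == GOAL else -1
    return r + GAMMA * V[s2]

def evaluate(policy, tol=1e-3):
    """Iterative policy evaluation with in-place sweeps; the goal keeps value 0."""
    V = {s: 0.0 for s in STATES}
    while True:
        delta = 0.0
        for s in STATES:
            if s == GOAL:
                continue
            new_v = q_value(s, policy[s], V)
            delta = max(delta, abs(new_v - V[s]))
            V[s] = new_v
        if delta < tol:
            return V

policy = {s: 'Left' for s in STATES if s != GOAL}
while True:
    V = evaluate(policy)              # policy evaluation
    stable = True
    for s in policy:                  # greedy policy improvement
        best = max(MOVES, key=lambda a: q_value(s, a, V))
        if best != policy[s]:
            policy[s], stable = best, False
    if stable:
        break

print(round(V[(0, 0)], 2), policy[(3, 2)])  # prints: 1.81 Right
```

Ties between equally good actions are broken by the order of `MOVES`, so the resulting policy may differ from another optimal policy in cells where two directions are equally short.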

In [9]:
#Function to print values on a grid
def print_values(V, rows,columns):
    for i in range(rows):
        print("---------------------------")
        for j in range(columns):
            v = V.get((i,j), 0)
            if v >= 0:
                print(" %.2f|" % v, end="")
            else:
                print("%.2f|" % v, end="")
        print("")

In [10]:
print_values(V, 4,4)

---------------------------
 1.81| 3.12| 4.58| 6.20|
---------------------------
 3.12| 4.58| 6.20| 8.00|
---------------------------
 4.58| 6.20| 0.00| 10.00|
---------------------------
 6.20| 8.00| 10.00| 0.00|
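
The value 1.81 in the top-left cell can be reproduced by hand. From (0,0) the shortest route to the goal takes six moves: five ordinary moves at -1 each, then a final move worth 10, with each step discounted by 0.9. A minimal sketch of that arithmetic (the variable names are illustrative):

```python
gamma = 0.9
rewards_along_path = [-1, -1, -1, -1, -1, 10]  # six moves from (0,0) to (3,3)

# Discounted return: sum of gamma**t * r_t over the path
value = sum(gamma**t * r for t, r in enumerate(rewards_along_path))
print(round(value, 2))  # 1.81
```

The same calculation with fewer leading -1 entries gives the other grid values, for example 8.00 for a cell two moves from the goal.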


In [11]:
#Function to print policy on a grid
def print_policy(policy,rows,columns):
    for i in range(rows):
        print("---------------------------------------")
        for j in range(columns):
              a = policy.get((i,j), ' ')
              print("  %s  |" % a, end="")
        print("")

**Optimal Policy**

In [12]:
print_policy(policy,4,4)

---------------------------------------
  Right  |  Right  |  Right  |  Down  |
---------------------------------------
  Right  |  Right  |  Right  |  Down  |
---------------------------------------
  Right  |  Down  |     |  Down  |
---------------------------------------
  Right  |  Right  |  Right  |     |
