In [1]:
# Dynamic programming for stochastic planning

In [2]:
from abc import ABC, abstractmethod
import random
import sys

In [3]:
# Python way to say that BaseEnvironment is an abstract class:
# BaseEnvironment inherits from ABC
class BaseEnvironment(ABC):
    # Python way to describe an abstract method
    # Override thim method to return the set of states
    @abstractmethod
    def get_states(self):
        pass
    
    # Override this method to return the set of actions from a state
    @abstractmethod
    def get_actions(self):
        pass
    
    # Override this method to return all the actions
    @abstractmethod
    def get_all_actions(self):
        pass
    
    # Override this method to implement p(s'|s, a)
    # The environment is meant to be used for stochastic planning
    # so I need to know the state transition probabilities
    @abstractmethod
    def p(self,surrent_state, action, next_state):
        pass
    
    # Override this method to implement r(s, a, s')
    @abstractmethod
    def r(self, current_state, action, next_state):
        pass
    

Let's introduce a concrete Environment implementing a GridWorld:
<ul>
    <li> 4x4 grid </li>
    <li> the agent can move up and down, left and right (no diagonal moves)
</li>
    <li> transitions are deterministic </li>
    <li> cell(3,3) is the goal </li>
    <li> reward is -1 for moving and 10 for reaching the target </li>

In [4]:
class GridWorld(BaseEnvironment):
    
    # Define actions as pairs implementing movements
    UP = (-1,0) # going up means you subtract one from the current row
    DOWN = (1,0)
    RIGHT = (0, 1) # going right means you add one to the current column
    LEFT = (0,-1)
    
    # Define the startign position and the goal
    START=(0,0)
    GOAL=(3,3)
    
    # A dictionary giving labels to actions
    label = {(-1,0) : "UP", (1,0) : "DOWN", (0,1) : "RIGHT", (0,-1) : "LEFT"}
    
    def __init__(self):
        self.states = list()
        # It is a bad coding practice to put "magic constants" in code
        # We create the set of states as a list of pairs:
        # (0,0) (0,1), ..., (1,0), (1,1), ..., (3,2), (3,3)
        for ro in range(4):
            for col in range(4):
                self.states.append((ro,col))
                
        # The set of actions is a dictionary state-actions
        self.actions = dict()
        # I wxplicitly list the mapping
        self.actions[(0,0)] = [GridWorld.RIGHT, GridWorld.DOWN]
        self.actions[(0,1)] = [GridWorld.RIGHT, GridWorld.LEFT, GridWorld.DOWN]
        self.actions[(0,2)] = [GridWorld.RIGHT, GridWorld.LEFT, GridWorld.DOWN]
        self.actions[(0,3)] = [GridWorld.LEFT, GridWorld.DOWN]
        self.actions[(1,0)] = [GridWorld.RIGHT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(1,1)] = [GridWorld.RIGHT, GridWorld.LEFT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(1,2)] = [GridWorld.RIGHT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(1,3)] = [GridWorld.LEFT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(2,0)] = [GridWorld.RIGHT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(2,1)] = [GridWorld.RIGHT, GridWorld.LEFT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(2,2)] = [GridWorld.RIGHT, GridWorld.LEFT,GridWorld.DOWN, GridWorld.UP]
        self.actions[(2,3)] = [GridWorld.LEFT, GridWorld.DOWN, GridWorld.UP]
        self.actions[(3,0)] = [GridWorld.RIGHT, GridWorld.UP]
        self.actions[(3,1)] = [GridWorld.RIGHT, GridWorld.LEFT, GridWorld.UP]
        self.actions[(3,2)] = [GridWorld.RIGHT, GridWorld.LEFT, GridWorld.UP]
        self.actions[(3,3)] = [GridWorld.LEFT, GridWorld.UP, GridWorld.GOAL]
        self.all_actions = [GridWorld.START, GridWorld.LEFT, GridWorld.RIGHT, GridWorld.DOWN, GridWorld.UP]
    
    # Implementation of abstract method get_states
    def get_states(self):
        return self.states
    
        # Implementation of abstract method get_actions
    def get_actions(self, state):
        return self.actions[state]
    
        # Implementation of abstract method get_all_actions
    def get_all_actions(self):
        return self.all_actions
    
    # Introducing an "helper" method to compute a transition:
    # returns the next state given the current one and an action
    # E.g., if current state is (0,0) and action is (0,1) (action RIGHT) 
    # the next state will be (0+0, 0+1) = (0,1)
    def transition(self, current_state, action):
        return (current_state[0]+action[0], current_state[1]+action[1])
    
    # Implementation of p
    def p(self, current_state, action, next_state):
        # If the action is not executable p(s,a,s')=0
        if action not in self.get_actions(current_state):
            return 0
        # Which state do I get if I execute the action?
        new_state = self.transition(current_state, action)
        # The problem is deterministic
        if (new_state==next_state):
            return 1
        else: 
            return 0
        
        # Implementation of r
    def r(self, current_state, action, next_state):
        # Check whether next_state is reachable from the current one
        if (self.p(current_state, action, next_state) > 0):
            if (next_state != GridWorld.GOAL):
                # The action is executable and the next state is not the GOAL
                return -1
            else:
                # the action is executable and the next state is the GOAL
                return 10
        else:
            # The action is not executable: the robot woll not attempt this, 
            # but is added for sake of completeness
            return 0

In [5]:
# Test the code

In [6]:
gw = GridWorld()
gw.get_all_actions()

[(0, 0), (0, -1), (0, 1), (1, 0), (-1, 0)]

In [7]:
gw.get_all_actions()

[(0, 0), (0, -1), (0, 1), (1, 0), (-1, 0)]

In [8]:
gw.get_actions((0,0))

[(0, 1), (1, 0)]

In [9]:
# What is the probability of ending in (0,1) if in (0,0) and going RIGHT?
gw.p((0,0), GridWorld.RIGHT, (0,1))

1

In [10]:
#what is the roward for taht?
gw.r((0,0),GridWorld.RIGHT, (0,1))

-1

In [11]:
#introduce some code to handle policies : a policy can be either applied as it is or updated

In [12]:
class BasePolicy(ABC):
    # returns the action to apply in the state
    
    @abstractmethod
    def apply(self, state):
        pass
    # change the action to apply in the state
    @abstractmethod
    def update(self,state,action):
        pass
    

In [13]:
class CustomPolicy(BasePolicy):
    #environment should be an instance of BaseEnvironment
    def __init__(self, environment):
        #keep trace of the environment internally
        self.environment=environment
        #map states to actions: initially no mapping
        self.state_action_table=dict()
        for states in environment.get_states():
            self.state_action_table[states]= None
    def apply(self,state):
        if self.state_action_table[state]==None:
        #if no action is specified for that state, choose at random
            actions=self.environment.get_actions(state)
            return random.choice(actions)
    #if an action is available, rteturtn that one
        return self.state_action_table[state]
    def update(self, state, action):
        self.state_action_table[state]= action

In [14]:
#let's custom policy
policy=CustomPolicy(gw)

In [15]:
action=policy.apply((0,0))
GridWorld.label[action]

'DOWN'

In [16]:
class PolicyIteration:
    
    #gamma is the discount factor
    # theta is the threshold for the convergence in policy evaluation
    def __init__(self,environment, gamma, theta):
        self.environment = environment
        self.gamma = gamma
        self.theta = theta
        #store the current policy (in the end should be the optimal one)
        self.policy = CustomPolicy(environment)
        #store the value of a state a k.th step
        self.old_value = dict()
        #store the value of a state at k+1.th step
        self.new_value = dict()
        # initialize the value of each state to 0
        for state in environment.get_states():
            self.old_value[state] = 0

    def copy_values(self, from_value, to_value):
        for state in self.environment.get_states():
            to_value[state]=from_value[state]

    def evaluate_policy(self):
        #repeat until delta<theta (slides)
        while(True):
            delta=0
            for state in self.environment.get_states():
                #the policy is deterministic!!
                #otherwise need another for loop for all the possible policy actions
                action=self.policy.apply(state)
                self.new_value[state]=0
                #check all potential next states
                for next_state in self.environment.get_states():
                    self.new_value[state]+=self.environment.p(state, action, next_state)*\
                    (self.environment.r(state,action,next_state)+self.gamma*self.old_value[next_state])
                delta=max(delta,abs(self.old_value[state]-self.new_value[state]))
            #copy the values into old values
            self.copy_values(self.new_value,self.old_value)
            if delta<self.theta:
                return
    
    #return true if policy is improved and false otherwise
    def policy_improvement(self):
        policy_improved=False
        for state in self.environment.get_states():
            old_action=self.policy.apply(state)
            #old action is the action suggested by the current policy
            #we must compute the action yielding the max return from the state
            max_value_of_action=-1*sys.float_info.max #smallest float
            best_action=old_action
            for action in self.environment.get_actions(state):
                value_of_action=0
            for next_state in self.environment.get_states():
                value_of_action+=self.environment.p(state, action, next_state)*\
                    (self.environment.r(state,action,next_state)+self.gamma*self.old_value[next_state])
            if value_of_action>max_value_of_action:
                max_value_of_action=value_of_action
                best_action=action
        #when i'm done checking, i know the best
        if old_action!=best_action:
            self.policy.update(state,best_action)
            policy_improved=True
        return policy_improved
    #compute policy iteration
    def apply(self):
        while True:
            self.evaluate_policy()
            if not self.policy_improvement():
                return 

In [17]:
gw=GridWorld()
#initialize policy iteration function object with gamma=0.8 and delta=10^-5
policy_iteration=PolicyIteration(gw,0.8,1/100000)

In [18]:
policy_iteration.apply()

In [19]:
#print out the value function (should be the optimal one)
for ro in range(4):
    for co in range(4):
        print(policy_iteration.old_value[(ro,co)],end=' ')
    print()
    
#controlla gli errori

-4.999999951148707 -4.999999999999973 -4.999999951148707 -4.999999999999973 
-4.999999999999973 -4.999999951148707 -4.999999999999973 -4.999999951148707 
-4.999999951148707 -4.999999999999973 -4.999999951148707 -4.999999999999973 
-4.999999999999973 -4.999999951148707 -4.999999999999973 0.0 


In [20]:
#print out the value function (should be the optimal one)
for ro in range(4):
    for co in range(4):
        print(GridWorld.label[policy_iteration.policy.apply((ro,co))],end=' ')
    print()
    #????

RIGHT DOWN LEFT DOWN 
RIGHT LEFT UP UP 
RIGHT UP DOWN DOWN 
UP LEFT LEFT LEFT 


In [21]:
class SlipperyGridWorld(GridWorld):
    
    #gamma is the discount factor
    # theta is the threshold for the convergence in policy evaluation
    def __init__(self):
        super(SlipperyGridWorld, self).__init__()
    def p(self, current_state,action,next_state):
        if action not in self.get_actions(current_state):
            return 0
        new_state=self.transition(current_state,action)
        if (new_state==next_state):
            return 0.9
        elif (new_state==current_state):
            return 0.1
        else:
            return 0
        

In [22]:
sgw=SlipperyGridWorld()

In [23]:
pi=PolicyIteration(sgw,0.8,1/100000)

In [24]:
pi.apply()

In [25]:
for ro in range(4):
    for co in range(4):
        print(pi.old_value[(ro,co)],end=' ')
    print()

-3.214285714285695 -3.214283981168631 -3.214285714285695 -3.214283981168631 
-3.214283981168631 -3.214285714285695 -3.214283981168631 -3.214285714285695 
-3.214285714285695 -3.214283981168631 -3.214285714285695 -3.214283981168631 
-3.214283981168631 -3.214285714285695 -3.214283981168631 0.0 


In [26]:
for ro in range(4):
    for co in range(4):
        print(GridWorld.label[pi.policy.apply((ro,co))],end=' ')
    print()

DOWN RIGHT DOWN LEFT 
UP UP UP LEFT 
UP RIGHT UP LEFT 
UP UP UP 

KeyError: (3, 3)

In [None]:
# VALUE ITERATION ALGORITHM

In [None]:
#base attributes and the copy function could be factored out in a base class for both policies
class ValueIteration:
    
    #gamma is the discount factor
    # theta is the threshold for the convergence in policy evaluation
    def __init__(self,environment, gamma, theta):
        self.environment = environment
        self.gamma = gamma
        self.theta = theta
        #store the current policy (in the end should be the optimal one)
        self.policy = CustomPolicy(environment)
        #store the value of a state a k.th step
        self.old_value = dict()
        #store the value of a state at k+1.th step
        self.new_value = dict()
        # initialize the value of each state to 0
        for state in environment.get_states():
            self.old_value[state] = 0

    def copy_values(self, from_value, to_value):
        for state in self.environment.get_states():
            to_value[state]=from_value[state]

    def compute_values(self):
        #repeat until delta<theta (slides)
        while(True):
            delta=0
            for state in self.environment.get_states():
                #compute new value of the state
                max_value=-1*sys.float_info.max #smallest float
                for action in self.environment.get_actions(state):
                    value=0
                for next_state in self.environment.get_states():
                    value+=self.environment.p(state, action, next_state)*\
                        (self.environment.r(state,action,next_state)+self.gamma*self.old_value[next_state])
                if value>max_value:
                    max_value=value
                    
                #the policy is deterministic!!
                #otherwise need another for loop for all the possible policy actions
                
                self.new_value[state]=max_value
                delta=max(delta,abs(self.old_value[state]-self.new_value[state]))
            #copy the values into old values
            self.copy_values(self.new_value,self.old_value)
            if delta<self.theta:
                return
    
    #compute policy
    def compute_policy(self):
        for state in self.environment.get_states():
            # we must compute the action yielding the max return state
            max_value_of_action=-1*sys.float_info.max #smallest float
            best_action=None
            for action in self.environment.get_actions(state):
                value_of_action=0
            for next_state in self.environment.get_states():
                value_of_action+=self.environment.p(state, action, next_state)*\
                    (self.environment.r(state,action,next_state)+self.gamma*self.old_value[next_state])
            if value_of_action>max_value_of_action:
                max_value_of_action=value_of_action
                best_action=action
        #when i'm done checking, i know the best
        self.policy.update(state,best_action)
        
    #compute policy according to value iteration
    def apply(self):
        self.compute_values()
        self.compute_policy()

In [None]:
vi=ValueIteration(gw,0.8,1/100000)

In [None]:
vi.apply()

In [None]:
for ro in range(4):
    for co in range(4):
        print(GridWorld.label[vi.policy.apply((ro,co))],end=' ')
    print()

In [None]:
vis=ValueIteration(sgw,0.8,1/100000)

In [None]:
vis.apply()