# Simple GridWorld Implementation

## Setting up the environment

In [1]:
# importing packages
import numpy as np

# setting the environment using the GridWorld Class
class GridWorld:
    
    """
    Grid environment with following stochastic property:
    | Agent Action | Possible Actions  |  Probability  |
    | :----------: | :---------------: | :-----------: |
    |      UP      |  UP, RIGHT, LEFT  | 0.8, 0.1, 0.1 |
    |     DOWN     | DOWN, RIGHT, LEFT | 0.8, 0.1, 0.1 |
    |     LEFT     |  LEFT, UP, DOWN   | 0.8, 0.1, 0.1 |
    |    RIGHT     |  RIGHT, UP, DOWN  | 0.8, 0.1, 0.1 |
    
    """

    # listing out all the possible actions...
    POSSIBLE_ACTIONS = ['U', 'D', 'L', 'R']

    
    def __init__(self, size, rewards, actions):
        
        """
        Initialize the GridWorld object
        
        Parameters
        ----------
        size : tuple (row,col)
        
        rewards: dict {(row,col):int}
            A dictionary with reward values for each state in the grid
            
        actions: dict ({row,col}:list)
            A dictionary that associates all possible actions for each state
            
        """

        self.height, self.width = size

        self.rewards = rewards
        self.actions = actions

        self.num_states = np.prod(size)
        self.num_actions = len(GridWorld.POSSIBLE_ACTIONS)

        
    
    def _limit_coordinates(self, state):
        
        """
        Limits the coordinates if/after collision with grid wall
        
        Parameters
        ----------
        state : tuple (row, col)
        
        Returns
        -------
        state: tuple(row, col)
        
        """

        # pretty straightforward...
        i, j = state
        
        if i < 0:
            i = 0
            
        elif i > self.height - 1:
            i = self.height - 1
            
        if j < 0:
            j = 0
            
        elif j > self.width - 1:
            j = self.width - 1
            
        return (i, j)

    
    def _new_state_reward(self, action, state):
        
        """
        Returns the coordinates of a resultant state and its rewards
        
        Parameters
        ----------
        action: char
            The character representing the action taken
            
        state: tuple (row, col)
        
        Returns
        -------
        state: tuple (row, col)
            The new state reached by taking the action
            
        reward: int
            The reward for the state
            
        """

        # again, we are updating the agent's location...
        i, j = state
        
        if action == 'U':
            i, j = i - 1, j
            
        elif action == 'D':
            i, j = i + 1, j
            
        elif action == 'R':
            i, j = i, j + 1
            
        elif action == 'L':
            i, j = i, j - 1

        # make sure the new state is not out of grid
        new_state = self._limit_coordinates((i, j))

        
        return new_state, self.rewards.get(new_state)
    

    def transition(self, action, state, choose=False):
        
        """
        The stochastic transition model of the grid
        
        Parameters
        ----------
        action : char
            The character representing the action taken
            
        state : tuple (row, col)
            The current state from where transition is occuring
            
        choose : boolean
            If True: environment takes stochastic action and returns resultant
                     state and reward
                     
            If False: environment returns a list of all possible actions with
                      corresponding reward and probabilties
                      
        Returns
        -------
        If choose == True
        
            state : tuple (row, col)
            
            reward: int
            
        If choose == Flase
            A list with following tuple:
            
                prob: float
                    Probabilty with which environment selects the transition
                    
                state: tuple (row, col)
                
                reward: int
                
        """

        # as the blueprint above suggests, we create the function body...
        def stochastic_transition(possible_actions, prob):
            
            if not choose:
                # create and return a list of all possible actions
                result = []
                
                for i, a in enumerate(possible_actions):
                    coord, reward = self._new_state_reward(a, state)
                    result.append((prob[i], coord, reward))
                    
                return result
            
            else:
                # choose a random action with given probabilities
                a = np.random.choice(possible_actions, 1, p = prob)
                coord, reward = self._new_state_reward(a, state)
                
                return coord, reward

        
        # we already have the state transition probabilities, they are initialised in the next function
        # going up the grif
        if action == 'U':
            return stochastic_transition(['U', 'R', 'L'], [0.8, 0.1, 0.1])
        
        # going down the grid
        elif action == 'D':
            return stochastic_transition(['D', 'R', 'L'], [0.8, 0.1, 0.1])
        
        # going right
        elif action == 'R':
            return stochastic_transition(['R', 'U', 'D'], [0.8, 0.1, 0.1])
        
        # going left
        elif action == 'L':
            return stochastic_transition(['L', 'U', 'D'], [0.8, 0.1, 0.1])


        
def grid():
    """Utility function, returns 4x4 GridWorld object with rewards and actions
    """

    # dict with rewards for states of the grid
    rewards = {
        (0, 0): -1, (0, 1): -1, (0, 2): -1, (0, 3): -1,
        (1, 0): -1, (1, 1): -1, (1, 2): -1, (1, 3): -1,  # start state is 1x0
        (2, 0): -1, (2, 1): -70, (2, 2): -1, (2, 3): -1,  # bad state is 2x1
        (3, 0): -1, (3, 1): -1, (3, 2): -1, (3, 3): 100  # goal state is 3x3
    }

    # dict with actions allowed for the grid states
    actions = {
        (0, 0): ['R', 'D'], (0, 1): ['R', 'L', 'D'],
        (0, 2): ['R', 'L', 'D'], (0, 3): ['L', 'D'],
        (1, 0): ['R', 'U', 'D'], (1, 1): ['R', 'L', 'U', 'D'],
        (1, 2): ['R', 'L', 'U', 'D'], (1, 3): ['L', 'U', 'D'],
        (2, 0): ['R', 'U', 'D'], (2, 1): ['R', 'L', 'U', 'D'],
        (2, 2): ['R', 'L', 'U', 'D'], (2, 3): ['L', 'U', 'D'],
        (3, 0): ['R', 'U'], (3, 1): ['R', 'L', 'U'],
        (3, 2): ['R', 'L', 'U'], (3, 3): []
    }

    return GridWorld(size=(4, 4), rewards=rewards, actions=actions)


def print_grid(env, content_dict):
    
    """ 
    Utility function that prints the grid environment with given content
    
    Parameters
    ---------
    env: GridWorld
    
    content_dict: dict {(row,col):object}
    
    """
    # getting even coordinates in the grid..
    grid = np.arange(env.num_states, dtype=object).reshape(env.height, env.width)
    
    # content_dict maps the coordinate to the content(policy, value function, etc)...
    for coord, content in content_dict.items():
        grid[coord[0], coord[1]] = content
    
    # printing out the np matrix
    print(grid)


def play_game(env, start, end, policy):
    
    """
    Utility function that follows given policy from start to end and
    returns a list of action, state and reward for each step
    
    Parameters
    ----------
    env : GridWorld
    
    start : (row, col)
    
    end : (row, col)
    
    policy : list of optimal actions for each state
        Policy list in the stats array returned by the algorithm
        
    Returns
    -------
    steps: list of [action,state,reward]
    
    """
    steps = []
    state = start
    
    while state != end:
        # get 1D index from state tuple
        state_idx = state[0] * env.height + state[1]
        action = policy[state_idx]
        
        # make a transition with choose=True
        new_state, reward = env.transition(action, state, choose=True)
        steps.append([action, list(state), reward])
        state = new_state
        
    # append the goal state for visualization
    steps.append(['G', list(end), 0])
    
    return steps

## Value Iteration

In [2]:
def value_iteration(env, start, stop, discount_factor=0.99, threshold=0.001):
    
    """
    Performs Value Iteartion algorithm on given environment
    
    Parameters
    ----------
    env : GridWorld
        The GridWorld environment object
        
    start : (row, col)
        The start state of the grid
        
    stop : (row, col)
        The end state of the grid
        
    discount_factor : float
        Factor that represents care for future rewards
        
    threshold: float
        Value that represents cutoff for iterations
        
    Returns
    -------
    policy: dict {(row, col): char}
        The optimal policy dict with optimal action for each state
        
    V: dict {(row, col): float}
        The optimal value dict with optimal values for each state
        
    stats: list of {policy:list,score:list}
        The policy and V values for each iteration, used for visualization
        
    """

    # improving the value function
    def calculate_v(V, state, actions):
        
        """ 
        V[s] = max[a]{ sum[s',r] { p(s',r|s,a)[r + gamma*V[s']] } }
        
        """
        new_V = {'U': 0, 'D': 0, 'L': 0, 'R': 0}

        for action in actions:
            
            # get all possible transitions list
            possible_transitions = env.transition(action, state)
            
            for prob, next_state, reward in possible_transitions:
                
                new_V[action] += prob * \
                    (reward + discount_factor * V[next_state])

        # key with max value
        best_a = max(new_V, key=new_V.get)
        
        # max value in the dict
        best_V = new_V[best_a]

        return best_V, best_a

    # getting all the states possible
    all_states = set(env.rewards.keys())
    
    # Initialize V to 0
    V = {}
    for state in all_states:
        V[state] = 0

        
    episode = 0
    stats = []

    # run until convergence
    while True:

        stats.append({})
        
        # delta is the improvement on the value function...
        delta = 0

        # iterating over all the states...
        for state in all_states:
            
            # getting the best output from value function
            best_V, _ = calculate_v(V, state, env.actions[state])
            delta = max(delta, np.abs(best_V - V[state]))
            V[state] = best_V

        # collect stats for visuaization
        policy_list, v_list = [], []
        
        # iterate on sorted states as policy and v are a list with index representing the states
        for state in sorted(all_states):
            
            best_v, best_a = calculate_v(V, state, env.actions[state])
            
            policy_list.append(best_a)
            
            v_list.append(best_v)

        # stats for this iteration
        stats[episode] = {'policy': policy_list, 'score': v_list}

        # we wanna calculate the number of episodes needed for convergence(optimal value function)
        episode += 1

        # check if converged
        if delta < threshold:
            break

    # get optimal policy
    
    policy = {}
    
    # we wanna calculate the best policy by putting in the best action for each state...(optimal policy)
    for state in all_states:
        _, best_a = calculate_v(V, state, env.actions[state])
        policy[state] = best_a
    
    return policy, V, stats

## Policy Evaluation

In [3]:
# im using the monte-carlo definition of Value Function... 

def policy_evaluation(env, policy, stop = (3,3), discount_factor = 0.99):
    
    """
    Performs policy evaluation on a given policy
    
    Parameters
    ----------
    env : GridWorld
        The GridWorld environment object
        
    start : (row, col)
        The start state of the grid
        
    stop : (row, col)
        The end state of the grid
        
    discount_factor : float
        Factor that represents care for future rewards
        
    policy: dict
        The action state relation that we wanna evaluate
        
    Returns
    -------
    V: dict(float)
        The value for each state using the policy.
    """
    # getting all the states(state space)
    all_states = set(env.rewards.keys())
    
    # Initialize V to 0
    V = {}
    for state in all_states:
        V[state] = 0
        
        
    for state in sorted(all_states):
        
        # we create a dummy in order to preserve state after while
        state_dummy = state
        
        # the exponent on the discount factor
        i = 0
        
        while state_dummy != stop:
            
            # getting the reward for the next state
            state_new, reward = env._new_state_reward(policy[state_dummy], state_dummy)
            
            # calculating the value function
            V[state] += reward * (discount_factor ** i) 
            
            # incrementing the state
            state_dummy = state_new
            
            i += 1
    
    # return the value function dict
    return V

## Q learning

In [12]:
# listing out all the actions
all_actions = ['U', 'D', 'L', 'R']

# the greedy nature of policy iteration
def _epsilon_greedy(action, epsilon):
    
    """
    Returns action according to epsilon greedy exploration scheme
    """
    # getting a random array of 0.0 to 1.0
    p = np.random.random()
    
    # condition for generating a random action
    if p < (1 - epsilon):
        return action
    
    else:
        return np.random.choice(all_actions, 1)[0]



def q_learning(env, num_episodes, epsilon, alpha, start = (0,0), stop = (3,3), discount_factor=0.99):
    
    """Performs q learning algorithm on given environment
    Parameters
    ----------
    env : GridWorld
        The GridWorld environment object
        
    num_episodes: int
        The number of episodes to run
        
    epsilon: float
        Epsilon value for exploration
        
    alpha: float
        Learning rate
        
    start : (row, col)
        The start state of the grid
        
    stop : (row, col)
        The end state of the grid
        
    discount_factor : float
        Factor that represents care for future rewards
        
    Returns
    -------
    policy: dict {(row, col): char}
        The optimal policy dict with optimal action for each state
        
    V: dict {(row, col): float}
        The optimal value dict with optimal values for each state
        
    stats: list of {policy:list,score:list}
        The policy and V values for each iteration, used for visualization
    """

    # getting all possible states...
    all_states = set(env.rewards.keys())

    # q is the action value function...
    # initialize Q[s][a]
    Q = {}
    
    for state in all_states:
        Q[state] = {}
        
        for action in all_actions:
            Q[state][action] = 0
            

            
    stats = []
    
    # iterating over the episodes
    for episode in range(num_episodes):
        
        # starting from (0,0)
        state = start
        stats.append({})
        
        # until the stop state is reached
        while state != stop:
            
            # taking best action for current state and use epsilon greedy...
            action = max(Q[state], key=Q[state].get)
            action = _epsilon_greedy(action, epsilon)

            # transitioning to the next state and maximising the action..
            new_state, reward = env.transition(action, state, choose=True)
            best_next_action = max(Q[new_state], key=Q[new_state].get)

            # calculating the temporal difference target
            td_target = reward + discount_factor * \
                Q[new_state][best_next_action]
            
            # updating the Q-value
            Q[state][action] += alpha * (td_target - Q[state][action])

            # going to the next state
            state = new_state

        # collecting stats
        policy_list, q_list = [], []
        
        # iterate on sorted states as policy and v are a list with index representing the states
        for state in sorted(all_states):
            
            # again, getting the best action
            best_a = max(Q[state], key=Q[state].get)
            
            # getting the best q value for that state
            best_q = Q[state][best_a]
            
            # putting the best action into the policy list(optimal policy)
            if state != stop:
                policy_list.append(best_a)
                
            # goal state reached!
            else:
                policy_list.append('G')
                
             # the best q value is appended   
            q_list.append(best_q)

        # add steps according to current policy to visualize agent's actions
        stats[episode] = {
            'policy': policy_list,
            'score': q_list,
            'steps': play_game(
                env,
                start,
                stop,
                policy_list)}

    # optimal policy and Value function
    policy, V, = {}, {}
    
    # iterating over all states
    for state in all_states:
        
        # getting the best action
        best_a = max(Q[state], key=Q[state].get)
        
        # again storing the best q value
        best_q = Q[state][best_a]
        
        # putting the best action into the optimal policy
        policy[state] = best_a if state != stop else 'G'
        
        # putting in the best action-value function
        V[state] = best_q

    return policy, V, stats

## Evaluating our Agent/Algorithms

In [13]:
# creating the grid object...
env = grid()

In [14]:
# applying value iteration and getting the return objects...
policy_val_iter, V_val_iter, stats_val_iter = value_iteration(env,start=(1,0),stop=(3,3))

In [15]:
# printing out the optimal policy obtained from value iteration
print_grid(env, policy_val_iter)

[['R' 'R' 'R' 'D']
 ['U' 'U' 'R' 'D']
 ['D' 'R' 'R' 'D']
 ['R' 'R' 'R' 'U']]


In [16]:
# printing out the optimal value function obtained from value iteration
print_grid(env, V_val_iter)

[[86.36413640238297 88.96553043345659 91.58232574832843 93.69969985383423]
 [84.37867721159373 87.1196669488651 93.99527972321908 96.4100771203664]
 [76.335435930807 92.97504454302948 96.68547553197288 99.19185503857933]
 [85.1940856411934 88.63980510086662 99.19185503857932 0]]


In [17]:
# applying policy evaluation on the policy obtained from value iteration
value_function = policy_evaluation(env, policy_val_iter)

In [18]:
# printing out the value function obtained
print_grid(env, value_function)

[[90.19800998 92.119202 94.0598 96.02]
 [88.2960298802 90.19800998 96.02 98.0]
 [94.0598 96.02 98.0 100.0]
 [96.02 98.0 100.0 0]]


In [20]:
# applying q-learning and obtaining the optimal policy and value function...
num_episodes = 500
epsilon = 0.05
alpha = 0.5

policy_q_learn, V_q_learn, stats_q_learn = q_learning(env, num_episodes, epsilon, alpha)

In [21]:
# printing out the optimal policy obtained from policy iteration
print_grid(env, policy_q_learn)

[['R' 'R' 'R' 'D']
 ['R' 'U' 'D' 'D']
 ['L' 'L' 'R' 'D']
 ['R' 'D' 'R' 'G']]


In [22]:
# printing out the optimal value function obtained from policy iteration
print_grid(env, V_q_learn)

[[83.16472463096417 87.53554182030925 90.54961017031259 92.44684233253933]
 [80.02526838810203 85.124420271248 90.42659757010276 90.64016647273385]
 [65.90783251886765 13.131151083349256 96.31536439872912
  95.28971566668275]
 [81.0674489698741 92.04150327557775 99.45606707874634 0]]


In [None]:
# applying policy evaluation on the policy obtained from q learning
value_function = policy_evaluation(env, policy_q_learn)