In [40]:
#Imports
import gym
from gym import spaces
import numpy as np
import io
from IPython.core.debugger import set_trace
import sys

In [52]:
class env(gym.Env):
    """Deterministic grid world exposed as a tabular MDP model.

    States are numbered row by row from ``0`` to ``num_states - 1``. The first
    and the last state are terminal. Every action taken from a non-terminal
    state yields a reward of -1.0; an action that would leave the grid keeps
    the agent where it is. Terminal states loop onto themselves with reward 0.

    Action map:
        UP    = 0
        RIGHT = 1
        DOWN  = 2
        LEFT  = 3

    Attributes:
        shape: grid dimensions as ``(rows, columns)``.
        num_states: total number of cells in the grid.
        num_actions: number of actions (always 4).
        P: transition model, ``P[s][a]`` is a list of
            ``(probability, next_state, reward, is_done)`` tuples.
        isd: uniform initial state distribution over all states.
        s: current state, drawn from ``isd`` at construction time.
        action_space / observation_space: ``gym.spaces.Discrete`` descriptions
            of the action and state sets.

    Only the model ``P`` and ``render`` are provided here; ``step`` and
    ``reset`` are not implemented by this class.
    """

    metadata = {'render_modes': ['human', 'ansi']}

    def __init__(self, shape = [4,4]):
        """Build the transition model for a grid of the given shape.

        Args:
            shape: list or tuple ``(rows, columns)`` with positive entries.

        Raises:
            ValueError: if ``shape`` is not a 2-element list/tuple or has a
                non-positive dimension.
        """
        # The model below is strictly two-dimensional, so reject longer shapes
        # up front instead of failing later on tuple unpacking.
        if not isinstance(shape, (list, tuple)) or len(shape) != 2:
            raise ValueError('shape must be a list or a tuple of length 2: (rows, columns)')

        MAX_Y, MAX_X = shape
        if MAX_Y < 1 or MAX_X < 1:
            raise ValueError('shape dimensions must be positive')

        super().__init__()

        # Copy so that mutating an instance's shape can never alter the
        # shared default argument (slicing preserves list/tuple type).
        self.shape = shape[:]
        self.num_states = int(np.prod(shape))
        self.num_actions = 4

        self.action_space = spaces.Discrete(self.num_actions)
        self.observation_space = spaces.Discrete(self.num_states)

        # P[s][a] = [(probability, next_state, reward, is_done)]
        P = {}
        for state in range(self.num_states):
            y, x = divmod(state, MAX_X)  # row-major numbering

            if self._is_terminal(state):
                # Stuck in a terminal state: every action is a free self-loop.
                P[state] = {
                    action: [(1.0, state, 0.0, True)]
                    for action in range(self.num_actions)
                }
                continue

            # Indexed by action id; moves off the grid leave the state as is.
            next_states = (
                state if y == 0 else state - MAX_X,          # UP
                state if x == MAX_X - 1 else state + 1,      # RIGHT
                state if y == MAX_Y - 1 else state + MAX_X,  # DOWN
                state if x == 0 else state - 1,              # LEFT
            )
            P[state] = {
                action: [(1.0, nxt, -1.0, self._is_terminal(nxt))]
                for action, nxt in enumerate(next_states)
            }

        self.P = P

        # The initial state distribution is uniform over all states; the
        # starting state is sampled from it so that render() has a position.
        self.isd = np.ones(self.num_states) / self.num_states
        self.s = int(np.random.choice(self.num_states, p=self.isd))

    def _is_terminal(self, state):
        """Return True for the first and the last state of the grid."""
        return state == 0 or state == (self.num_states - 1)

    def render(self, mode:str = 'human', close:bool = False):
        """Draw the grid: ``x`` = current state, ``T`` = terminal, ``o`` = other.

        Args:
            mode: ``'ansi'`` returns the drawing as a string; any other value
                writes it to ``sys.stdout``.
            close: if true, do nothing.

        Returns:
            The rendered text when ``mode == 'ansi'``, otherwise ``None``.
        """
        if close:
            return None

        outfile = io.StringIO() if mode == 'ansi' else sys.stdout

        n_rows, n_cols = self.shape
        for y in range(n_rows):
            cells = []
            for x in range(n_cols):
                s = y * n_cols + x
                if self.s == s:
                    cells.append("x")  # current position/state
                elif self._is_terminal(s):
                    cells.append("T")
                else:
                    cells.append("o")
            outfile.write("".join(cells))
            outfile.write("\n")

        if mode == 'ansi':
            return outfile.getvalue()
        return None

In [53]:
# Instantiate the grid world with the default 4x4 layout, spelled out explicitly.
Env = env(shape=[4, 4])


In [54]:
def eval_policy(policy, env, discount_factor = 1.0, theta = 0.00001):
    """Evaluate a policy by iterative (in-place) policy evaluation.

    Repeatedly sweeps over all states applying the Bellman expectation
    backup until the largest change of any state value within one full
    sweep falls below ``theta``.

    Args:
        policy: indexable by state; ``policy[s]`` is a sequence with the
            probability of taking each action in state ``s``.
        env: object exposing ``num_states`` and a transition model ``P``
            where ``P[s][a]`` is an iterable of
            ``(probability, next_state, reward, done)`` tuples.
        discount_factor: discount applied to the value of the next state.
        theta: convergence threshold on the largest per-sweep value change.

    Returns:
        A numpy array of length ``env.num_states`` with the state values.
    """
    V = np.zeros(env.num_states)

    while True:
        delta = 0.0
        for s in range(env.num_states):
            v = 0.0
            for action, action_prob in enumerate(policy[s]):
                for prob, next_state, reward, _done in env.P[s][action]:
                    v += action_prob * prob * (reward + discount_factor * V[next_state])

            delta = max(delta, abs(v - V[s]))
            V[s] = v

        # Convergence is judged once per complete sweep: checking inside the
        # state loop would abort after the first (unchanged) state.
        if delta < theta:
            break

    return np.array(V)

In [61]:

# Equiprobable random policy: every action is equally likely in every state.
random_policy = np.ones(shape=(Env.num_states, Env.num_actions)) / Env.num_actions
# State values of the random policy on the grid world.
v = eval_policy(policy=random_policy, env=Env)