In [10]:
import numpy as np

In [26]:
class GridWorldEnv():
    """Deterministic square grid world with ditches and terminal (goal) cells.

    A state is the two-character string ``str(row) + str(col)``; the grid side
    is capped at 9 so that each coordinate stays a single digit.  The four
    actions are ``0`` UP (row - 1), ``1`` DOWN (row + 1), ``2`` LEFT (col - 1)
    and ``3`` RIGHT (col + 1); a move into a wall leaves the agent in place.

    The reward for entering a state is ``turnPenalty``, plus ``ditchPenalty``
    if the state is a ditch, plus ``winReward`` if the state is terminal.
    """

    def __init__(self, gridsize=4, startState='00', terminalStates=None, ditches=None,
                 ditchPenalty=-10, turnPenalty=-1, winReward=100, mode='prod'):
        """Build the grid and initialise the episode bookkeeping.

        Args:
            gridsize: Side length of the grid; values above 9 are clamped to 9.
            startState: State every episode starts from.
            terminalStates: Collection of goal states; ``None`` means ``['33']``.
            ditches: Collection of penalised states; ``None`` means ``['12']``.
            ditchPenalty: Reward added when a ditch is entered.
            turnPenalty: Reward added on every move.
            winReward: Reward added when a terminal state is entered.
            mode: ``'debug'`` enables diagnostic printing.
        """
        self.mode = mode
        self.gridSize = min(gridsize, 9)
        self.create_stateSpace()
        self.actionSpace = [0, 1, 2, 3]
        self.actionDict = {0: 'UP', 1: 'DOWN', 2: 'LEFT', 3: 'RIGHT'}
        self.startState = startState
        # None sentinels give every instance its own list instead of one
        # default list object shared (and mutable) across all environments.
        self.terminalStates = ['33'] if terminalStates is None else terminalStates
        self.ditches = ['12'] if ditches is None else ditches
        self.winReward = winReward
        self.ditchPenalty = ditchPenalty
        self.turnPenalty = turnPenalty
        self.stateCount = self.get_stateSpace_len()
        self.actionCount = self.get_actionSpace_len()
        self.stateDict = {k: v for k, v in zip(self.stateSpace, range(self.stateCount))}
        self.currentState = self.startState
        # Episode bookkeeping exists from construction on, so step() is usable
        # even when the caller has not called reset() first.
        self.isGameEnd = False
        self.totalAccumulatedReward = 0
        self.totalTurns = 0

        if self.mode == 'debug':
            print("State Space", self.stateSpace)
            print("State Dict", self.stateDict)
            print("Action Space", self.actionSpace)
            print("Action Dict", self.actionDict)
            print("Start State", self.startState)
            print("Terminal States", self.terminalStates)
            print("Ditches", self.ditches)
            print("WinReward:{}, TurnPenalty:{}, DitchPenalty:{}".format(self.winReward, self.turnPenalty, self.ditchPenalty))

    def create_stateSpace(self):
        """Populate ``self.stateSpace`` with every cell in row-major order."""
        self.stateSpace = [
            str(row) + str(col)
            for row in range(self.gridSize)
            for col in range(self.gridSize)
        ]

    def set_mode(self, mode):
        """Switch the printing mode (``'debug'`` enables diagnostics)."""
        self.mode = mode

    def get_stateSpace(self):
        """Return the list of all states."""
        return self.stateSpace

    def get_actionSpace(self):
        """Return the list of valid action ids."""
        return self.actionSpace

    def get_actionDict(self):
        """Return the mapping from action id to action name."""
        return self.actionDict

    def get_stateSpace_len(self):
        """Return the number of states."""
        return len(self.stateSpace)

    def get_actionSpace_len(self):
        """Return the number of actions."""
        return len(self.actionSpace)

    def next_state(self, current_state, action):
        """Return the state reached by taking ``action`` in ``current_state``.

        This is a pure transition query: it does not touch the episode state,
        so planners may call it freely without ending the running game.
        Moves are clamped at the grid border.  If the result is not a known
        state, ``current_state`` is returned unchanged.
        """
        next_row = int(current_state[0])
        next_col = int(current_state[1])
        last = self.gridSize - 1

        if action == 0:
            next_row = max(0, next_row - 1)
        elif action == 1:
            next_row = min(last, next_row + 1)
        elif action == 2:
            next_col = max(0, next_col - 1)
        elif action == 3:
            next_col = min(last, next_col + 1)

        new_state = str(next_row) + str(next_col)
        if new_state not in self.stateSpace:
            return current_state
        if self.mode == 'debug':
            print("CurrentState:{}, Action:{}, NextState:{}".format(current_state, action, new_state))
        return new_state

    def compute_reward(self, state):
        """Return the reward collected for entering ``state``."""
        reward = self.turnPenalty
        if state in self.ditches:
            reward += self.ditchPenalty
        if state in self.terminalStates:
            reward += self.winReward
        return reward

    def reset(self):
        """Start a new episode and return the start state."""
        self.isGameEnd = False
        self.totalAccumulatedReward = 0
        self.totalTurns = 0
        self.currentState = self.startState
        return self.currentState

    def step(self, action):
        """Apply ``action`` to the running episode.

        Returns:
            Tuple ``(obs, reward, done, totalTurns)`` where ``totalTurns`` is
            the number of moves taken in the current episode, this one
            included.

        Raises:
            RuntimeError: If the episode has already ended.
            ValueError: If ``action`` is not in the action space.
        """
        if self.isGameEnd:
            raise RuntimeError("Game is Over Exception")
        if action not in self.actionSpace:
            raise ValueError("Invalid Action Exception: {!r}".format(action))
        self.currentState = self.next_state(self.currentState, action)
        obs = self.currentState
        reward = self.compute_reward(obs)
        # The episode ends here, in step(), rather than inside next_state(),
        # so that transition look-ups never alter the live game.
        if obs in self.terminalStates:
            self.isGameEnd = True
        self.totalTurns += 1
        self.totalAccumulatedReward += reward
        done = self.isGameEnd
        if self.mode == 'debug':
            print("Obs:{}, Reward:{}, Done:{}, TotalTurns:{}".format(obs, reward, done, self.totalTurns))
        return obs, reward, done, self.totalTurns
    
    
    
            

In [27]:
class ValueIteration:
    """Plan on a grid-world environment with value iteration and score the result.

    The environment must expose ``stateSpace``, ``actionSpace``, ``stateDict``,
    ``actionDict``, ``get_stateSpace_len()``, ``get_actionSpace_len()``,
    ``next_state(state, action)``, ``compute_reward(state)``, ``reset()`` and
    ``step(action)``.  Action ids are used directly as column indices of ``Q``.

    NOTE(review): the backup in ``iterate_value`` is applied to every state,
    terminal ones included, so planning values terminal states as if the game
    went on after reaching them -- confirm this is the intended model.
    """

    def __init__(self, env=None, discountingFactor=0.9, convergenceThreshold=1e-4, iterationThreshold=30,
                 mode='prod'):
        """Set up the solver.

        Args:
            env: Environment to plan on; ``None`` creates a new ``GridWorldEnv()``.
            discountingFactor: Discount factor applied to next-state values.
            convergenceThreshold: Stop once the summed absolute change of ``V``
                over a full sweep is at most this value.
            iterationThreshold: Maximum number of sweeps.
            mode: ``'debug'`` enables diagnostic printing.
        """
        # Build the default lazily: an instance in the signature would be
        # created once at definition time and shared by every solver.
        self.env = GridWorldEnv() if env is None else env
        self.gamma = discountingFactor
        self.th = convergenceThreshold
        self.maxIter = iterationThreshold
        self.stateCount = self.env.get_stateSpace_len()
        self.actionCount = self.env.get_actionSpace_len()
        self.uniformActionProbability = 1.0 / self.actionCount
        self.stateDict = self.env.stateDict
        self.actionDict = self.env.actionDict
        self.mode = mode
        self.V = np.zeros(self.stateCount)
        self.Q = np.zeros((self.stateCount, self.actionCount))
        self.Policy = np.zeros(self.stateCount)
        self.totalReward = 0
        # Same attribute name that reset_episode() and run_episode() use.
        self.totalSteps = 0

    def reset_episode(self):
        """Clear the per-episode reward and step counters."""
        self.totalReward = 0
        self.totalSteps = 0

    def iterate_value(self):
        """Run synchronous value-iteration sweeps until convergence or ``maxIter``.

        Every sweep recomputes ``Q[s][a] = reward(s') + gamma * V_prev[s']``
        for all states and actions and sets ``V[s]`` to the row maximum.
        """
        self.V = np.zeros(self.stateCount)
        for i in range(self.maxIter):
            last_V = np.copy(self.V)
            for state_index, current_state in enumerate(self.env.stateSpace):
                for action in self.env.actionSpace:
                    next_state = self.env.next_state(current_state, action)
                    reward = self.env.compute_reward(next_state)
                    next_state_index = self.env.stateDict[next_state]
                    self.Q[state_index][action] = reward + self.gamma * last_V[next_state_index]

                if self.mode == 'debug':
                    print("Q(s={}):{}".format(current_state, self.Q[state_index]))
                self.V[state_index] = np.max(self.Q[state_index])

            # Convergence is judged on a complete sweep.  Testing it after each
            # single state would abort the sweep as soon as the first state
            # stops changing and would never leave the outer loop.
            if np.sum(np.fabs(last_V - self.V)) <= self.th:
                print("Convergence Achieved in {}th iteration. Breaking V_Iteration loop!".format(i))
                break

    def extract_optimal_policy(self):
        """Set ``Policy`` to the greedy action (row-wise argmax of ``Q``) per state."""
        self.Policy = np.argmax(self.Q, axis=1)
        if self.mode == 'debug':
            print("Optimal Policy:", self.Policy)

    def run_episode(self):
        """Play one episode with the current policy and return its total reward."""
        self.reset_episode()
        obs = self.env.reset()
        while True:
            action = self.Policy[self.env.stateDict[obs]]
            new_obs, reward, done, _ = self.env.step(action)
            if self.mode == 'debug':
                print("PrevObs:{}, Obs:{}, Reward:{}, Done:{}".format(obs, new_obs, reward, done))
            self.totalReward += reward
            self.totalSteps += 1
            if done:
                break
            obs = new_obs
        return self.totalReward

    def evaluate_policy(self, n_episodes=100):
        """Return the mean total reward over ``n_episodes`` episodes."""
        episode_scores = []
        if self.mode == 'debug':
            print("Running {} episodes!".format(n_episodes))
        for e in range(n_episodes):
            score = self.run_episode()
            episode_scores.append(score)
            if self.mode == 'debug':
                print("Score in {} episode = {}".format(e, score))
        return np.mean(episode_scores)

    def solve_mdp(self, n_episode=100):
        """Iterate values, extract the greedy policy and return its mean score."""
        if self.mode == 'debug':
            print("Iteration Values..")
        self.iterate_value()
        if self.mode == 'debug':
            print("Extracting Optimal Policy..")
        self.extract_optimal_policy()
        if self.mode == 'debug':
            print("Scoring Policy..")
        return self.evaluate_policy(n_episode)
    
        

In [28]:
# Create the environment with every constructor default, debug printing off.
env = GridWorldEnv(mode='prod')

In [29]:
# Attach a value-iteration solver to the environment created above.
valueIteration = ValueIteration(env=env, mode='prod')

In [30]:
# Plan, extract the greedy policy, and report its mean episode reward.
policy_score = valueIteration.solve_mdp()
print("Policy Evaluation Score = ", policy_score)

Policy Evaluation Score =  94.0
