In [1]:
# Most of this code is adapted from https://github.com/berkeleydeeprlcourse/homework/

#The following function generates a sample from a probability distribution. You may choose to ignore the logic. Just see how to use it.

import sys
import random
import numpy as np
def weighted_choice(v, p):
   total = sum(p)
   r = random.uniform(0, total)
   upto = 0
   for c, w in zip(v,p):
      if upto + w >= r:
         return c
      upto += w
   assert False, "Shouldn't get here"

In [2]:
#Using the above function to sample from a distribution

#6-faced die with equal probabilities
Sample_Space=[1,2,3,4,5,6]
Prob_Values=[1/6, 1/6, 1/6, 1/6, 1/6, 1/6]

#Generating a sample
weighted_choice(Sample_Space, Prob_Values)

5

In [3]:
#Creating a class for MDP environment definition that takes Transition Probability Matrix p(s'|s,a) and reward E[R_{t+1} | s,a,s'] as inputs

class MDP:
    def __init__(self, transition_probs, rewards, initial_state=None):
        """
        Defines an MDP. Compatible with gym Env.
        :param transition_probs: transition_probs[s][a][s_next] = P(s_next | s, a)
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> prob]
            For each state and action, probabilities of next states should sum to 1
            If a state has no actions available, it is considered terminal
        :param rewards: rewards[s][a][s_next] = r(s,a,s')
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> reward]
            The reward for anything not mentioned here is zero.
        :param get_initial_state: a state where agent starts or a callable() -> state
            By default, picks initial state at random.

        States and actions can be anything you can use as dict keys, but we recommend that you use strings or integers

        Here's an example from MDP depicted on http://bit.ly/2jrNHNr
        transition_probs = {
              's0':{
                'a0': {'s0': 0.5, 's2': 0.5},
                'a1': {'s2': 1}
              },
              's1':{
                'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
                'a1': {'s1': 0.95, 's2': 0.05}
              },
              's2':{
                'a0': {'s0': 0.4, 's1': 0.6},
                'a1': {'s0': 0.3, 's1': 0.3, 's2':0.4}
              }
            }
        rewards = {
            's1': {'a0': {'s0': +5}},
            's2': {'a1': {'s0': -1}}
        }
        """
        self._check_param_consistency(transition_probs, rewards)
        self._transition_probs = transition_probs
        self._rewards = rewards
        self._initial_state = initial_state
        self.n_states = len(transition_probs)
        self.reset()

    def get_all_states(self):
        """ return a tuple of all possible states """
        return tuple(self._transition_probs.keys())

    def get_possible_actions(self, state):
        """ return a tuple of possible actions in a given state """
        return tuple(self._transition_probs.get(state, {}).keys())

    def is_terminal(self, state):
        """ return True if state is terminal or False if it isn't """
        return len(self.get_possible_actions(state)) == 0

    def get_next_states(self, state, action):
        """ return a dictionary of {next_state1 : P(next_state1 | state, action), next_state2: ...} """
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._transition_probs[state][action]

    def get_transition_prob(self, state, action, next_state):
        """ return P(next_state | state, action) """
        return self.get_next_states(state, action).get(next_state, 0.0)

    def get_reward(self, state, action, next_state):
        """ return the reward you get for taking action in state and landing on next_state"""
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._rewards.get(state, {}).get(action, {}).get(next_state, -0.01)

    def reset(self):
        """ reset the game, return the initial state"""
        if self._initial_state is None:
            self._current_state = random.choice(tuple(self._transition_probs.keys()))
        elif self._initial_state in self._transition_probs:
            self._current_state = self._initial_state
        elif callable(self._initial_state):
            self._current_state = self._initial_state()
        else:
            raise ValueError("initial state %s should be either a state or a function() -> state" % self._initial_state)
        return self._current_state

    def step(self, action):
        """ take action, return next_state, reward, is_done, empty_info """
        possible_states, probs = zip(*self.get_next_states(self._current_state, action).items())
        next_state = weighted_choice(possible_states, p=probs)
        reward = self.get_reward(self._current_state, action, next_state)
        is_done = self.is_terminal(next_state)
        self._current_state = next_state
        return next_state, reward, is_done, {}

    def render(self):
        print("Currently at %s" % self._current_state)

    def _check_param_consistency(self, transition_probs, rewards):
        for state in transition_probs:
            assert isinstance(transition_probs[state], dict), "transition_probs for %s should be a dictionary " \
                                                              "but is instead %s" % (
                                                              state, type(transition_probs[state]))
            for action in transition_probs[state]:
                assert isinstance(transition_probs[state][action], dict), "transition_probs for %s, %s should be a " \
                                                                          "a dictionary but is instead %s" % (
                                                                              state, action,
                                                                              type(transition_probs[state, action]))
                next_state_probs = transition_probs[state][action]
                assert len(next_state_probs) != 0, "from state %s action %s leads to no next states" % (state, action)
                sum_probs = sum(next_state_probs.values())
                assert abs(sum_probs - 1) <= 1e-10, "next state probabilities for state %s action %s " \
                                                    "add up to %f (should be 1)" % (state, action, sum_probs)
        for state in rewards:
            assert isinstance(rewards[state], dict), "rewards for %s should be a dictionary " \
                                                     "but is instead %s" % (state, type(transition_probs[state]))
            for action in rewards[state]:
                assert isinstance(rewards[state][action], dict), "rewards for %s, %s should be a " \
                                                                 "a dictionary but is instead %s" % (
                                                                 state, action, type(transition_probs[state, action]))
        msg = "The Enrichment Center once again reminds you that Android Hell is a real place where" \
              " you will be sent at the first sign of defiance. "
        assert None not in transition_probs, "please do not use None as a state identifier. " + msg
        assert None not in rewards, "please do not use None as an action identifier. " + msg

In [4]:
#Creating a MDP shown in this Figure https://upload.wikimedia.org/wikipedia/commons/2/21/Markov_Decision_Process_example.png

transition_probs = {'s0':{'a0': {'s0': 0.5, 's2': 0.5},'a1': {'s2': 1}},
                    's1':{'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},'a1': {'s1': 0.95, 's2': 0.05}},
                    's2':{'a0': {'s0': 0.4, 's1': 0.6},'a1': {'s0': 0.3, 's1': 0.3, 's2':0.4}}
                   }
rewards = {'s1': {'a0': {'s0': +5}},'s2': {'a1': {'s0': -1}}}

In [5]:
#Intialize an environment
env=MDP(transition_probs, rewards)

In [6]:
#To know the current state
env.render()

Currently at s1


In [7]:
#To take an action and observe the next state, reward, 'whether terminal state reached or not', any other description.
env.step('a0')

('s0', 5, False, {})

## Assignment 2

In [8]:
#Creating MDP given in Assignment 2

transition_probability = {
    's0':{'up':{'s0':0.9, 's1':0.1}, 'down':{'s4':0.8, 's0':0.1, 's1':0.1}, 'left':{'s0':0.9, 's4':0.1}, 'right':{'s1':0.8, 's0':0.1, 's4':0.1}},
    's1':{'up':{'s1':0.8, 's0':0.1, 's2':0.1}, 'down':{'s1':0.8, 's0':0.1, 's2':0.1}, 'left':{'s0':0.8, 's1':0.2}, 'right':{'s2':0.8, 's1':0.2}},
    's2':{'up':{'s2':0.8, 's1':0.1, 's3':0.1}, 'down':{'s6':0.8, 's1':0.1, 's3':0.1}, 'left':{'s1':0.8, 's2':0.1, 's6':0.1}, 'right':{'s3':0.8, 's2':0.1, 's6':0.1}},
    's4':{'up':{'s0':0.8, 's4':0.2}, 'down':{'s8':0.8, 's4':0.2}, 'left':{'s4':0.8, 's0':0.1, 's8':0.1}, 'right':{'s4':0.8, 's0':0.1, 's8':0.1}},
    's6':{'up':{'s2':0.8, 's6':0.1, 's7':0.1}, 'down':{'s10':0.8, 's6':0.1, 's7':0.1}, 'left':{'s6':0.8, 's2':0.1, 's10':0.1}, 'right':{'s7':0.8, 's2':0.1, 's10':0.1}},
    's8':{'up':{'s4':0.8, 's8':0.1, 's9':0.1}, 'down':{'s8':0.9, 's9':0.1}, 'left':{'s8':0.9, 's4':0.1}, 'right':{'s9':0.8, 's8':0.1, 's4':0.1}},
    's9':{'up':{'s9':0.8, 's8':0.1, 's10':0.1}, 'down':{'s9':0.8, 's8':0.1, 's10':0.1}, 'left':{'s8':0.8, 's9':0.2}, 'right':{'s10':0.8, 's9':0.2}},
    's10':{'up':{'s6':0.8, 's9':0.1, 's11':0.1}, 'down':{'s10':0.8, 's9':0.1, 's11':0.1}, 'left':{'s9':0.8, 's10':0.1, 's6':0.1}, 'right':{'s11':0.8, 's10':0.1, 's6':0.1}},
    's11':{'up':{'s7':0.8, 's11':0.1, 's10':0.1}, 'down':{'s11':0.9, 's10':0.1}, 'left':{'s10':0.8, 's11':0.1, 's7':0.1}, 'right':{'s11':0.9, 's7':0.1}}
}
reward = {'s2':{'right':{'s3':+1}, 'down':{'s3':+1}, 'up':{'s3':+1}}, 's6':{'right':{'s7':-1}, 'up':{'s7':-1}, 'down':{'s7':-1}}, 's11':{'right':{'s7':-1}, 'up':{'s7':-1}, 'left':{'s7':-1}}}


In [9]:
env = MDP(transition_probability, reward)

In [10]:
env.get_next_states('s2','left')

{'s1': 0.8, 's2': 0.1, 's6': 0.1}

In [11]:
env.is_terminal('s4')

False

In [12]:
env.get_transition_prob('s2','left','s3')

0.0

In [13]:
env.get_reward('s6','right','s2')

-0.01

In [14]:
env.get_next_states('s1','left')

{'s0': 0.8, 's1': 0.2}

In [15]:
env.reset()

's10'

In [16]:
env.render()

Currently at s10


In [17]:
env.step('right')

('s11', -0.01, False, {})


##### The states are considered to be in this form:

[s0            s1.             s2.              s3]

[s4.           s5.             s6.              s7]

[s8.           s9.             s10              s11]

In [18]:
delta = 0.00001 #threshold
gamma = 1 #Discount factor

### Value Iteration

In [19]:
states = ['s0','s1','s2','s4','s6','s8','s9','s10','s11']
action = ['up','down','left','right']
Vs = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0} #state values
Vs_new = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0}
diff_dict = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0} #diff values

policy = {'s0':'NA','s1':'NA','s2':'NA','s3':'NA','s4':'NA','s5':'NA','s6':'NA','s7':'NA','s8':'NA','s9':'NA','s10':'NA'}
action_val = {'up':0,'down':0,'left':0,'right':0}

for t in range(100):
    Vs_new = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0} #state values
    m=0
    for s in states:
        for a in action:
            nxtst = env.get_next_states(s,a) #next states
            for s_ in nxtst:
                action_val[a] += env.get_transition_prob(s,a,s_)*(env.get_reward(s,a,s_)+gamma*Vs[s_])
        Vs_new[s] = action_val[max(zip(action_val.values(), action_val.keys()))[1]]
        policy[s] = max(zip(action_val.values(), action_val.keys()))[1]
        action_val = {'up':0,'down':0,'left':0,'right':0}

    for key in Vs.keys():
        diff_dict[key] = abs(Vs_new[key] - Vs.get(key, 0))
    m = max(diff_dict.values())
    
    if m < delta:
        print("No. of iterations to converge = ",t,"\n")
        break
    Vs = Vs_new
    
print("State Values: \n",Vs_new,"\n")
print("State policy: \n",policy)
        

No. of iterations to converge =  90 

State Values: 
 {'s0': 0.9597240654337827, 's1': 0.9737866045711732, 's2': 0.9862866319024044, 's3': 0, 's4': 0.9472240244231713, 's5': 0, 's6': 0.8965798684530988, 's7': 0, 's8': 0.9331614656973752, 's9': 0.920661404160804, 's10': 0.9068744893947835, 's11': 0.8067925049710254} 

State policy: 
 {'s0': 'right', 's1': 'right', 's2': 'right', 's3': 'NA', 's4': 'up', 's5': 'NA', 's6': 'left', 's7': 'NA', 's8': 'up', 's9': 'left', 's10': 'left', 's11': 'down'}


### Policy Iteration 

In [26]:
states = ['s0','s1','s2','s4','s6','s8','s9','s10','s11']
action = ['up','down','left','right']
Vs = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0} #state values
Vs_new = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0}
action_val = {'up':0,'down':0,'left':0,'right':0}
policy = {'s0':'right','s1':'right','s2':'right','s3':'NA','s4':'right','s5':'NA','s6':'right','s7':'NA','s8':'right','s9':'right','s10':'right','s11':'right'}
policy_new = {'s0':'right','s1':'right','s2':'right','s3':'NA','s4':'right','s5':'NA','s6':'right','s7':'NA','s8':'right','s9':'right','s10':'right','s11':'right'}
diff_dict = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0} #diff values
action_val1 = 0

for t in range(100):
    value_converge = False
    flag = 0
    
    #Policy Evaluation
    for pe in range(100):
        if value_converge == False:
            Vs_new = {'s0':0,'s1':0,'s2':0,'s3':0,'s4':0,'s5':0,'s6':0,'s7':0,'s8':0,'s9':0,'s10':0,'s11':0}
            m = 0
            for s in states:
                a = policy[s]
                nxtst = env.get_next_states(s,policy[s]) #next states
                for s_ in nxtst:
                    action_val1 += env.get_transition_prob(s,a,s_)*(env.get_reward(s,a,s_)+gamma*Vs[s_])
                Vs_new[s] = action_val1
                action_val1 = 0
            for key in Vs.keys():
                diff_dict[key] = abs(Vs_new[key] - Vs[key])
            m = max(diff_dict.values())
            
            if m < delta:
                value_converge = True
            Vs = Vs_new

    #Policy Improvement
    for s in states:
        for a in action:
            nxtst = env.get_next_states(s,a) #next states
            for s_ in nxtst:
                action_val[a] += env.get_transition_prob(s,a,s_)*(env.get_reward(s,a,s_)+gamma*Vs[s_])
        policy_new[s] = max(zip(action_val.values(), action_val.keys()))[1]
        action_val = {'up':0,'down':0,'left':0,'right':0}
        
    policy = policy_new

print("State values: \n",Vs_new,"\n")
print("Policy: \n",policy_new,"\n")

State values: 
 {'s0': 0.9597242647056706, 's1': 0.9737867647057122, 's2': 0.9862867647057413, 's3': 0, 's4': 0.9472242647056272, 's5': 0, 's6': 0.8965808823518637, 's7': 0, 's8': 0.9331617647055648, 's9': 0.9206617647054994, 's10': 0.9068749999994575, 's11': 0.8068749965191249} 

Policy: 
 {'s0': 'right', 's1': 'right', 's2': 'right', 's3': 'NA', 's4': 'up', 's5': 'NA', 's6': 'left', 's7': 'NA', 's8': 'up', 's9': 'left', 's10': 'left', 's11': 'down'} 



#### 
Both Value iteration and Policy iteration are converging to same state values and policies. Policy iteration in general converges faster.

##### Name: Prajeeth Babu Kodru
##### Roll no: 22104071