In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from collections import defaultdict
from copy import deepcopy

In [2]:
def log_arguments(fn): # note that function as input
    def new_function(*args,**kwargs): # we've seen these arguments before
        print ('positional arguments:')
        print (args)
        print ('keyword arguments:')
        print (kwargs)
        return fn(*args,**kwargs) # return a function
    return new_function

In [3]:
class FrozenLake:
    '''
    A class written to solve Question 1 in HW2, ECE276C at UCSD
    For more on frozenlake, check out - https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py
    
    The action have the following meanings:
        LEFT = 0
        DOWN = 1
        RIGHT = 2
        UP = 3
    '''
    def __init__(self):
        '''
        Initialize the frozen-lake environment
        '''
        self.env = gym.make ("FrozenLake-v0")
        self.numStates = self.env.observation_space.n
        self.numActions = self.env.action_space.n
        self.dt = 1
        
        self.initial_state = self.env.reset() #reset env and return initial state
        print('Environment map:')
        self.printMap()

    def printMap(self):
        print(self.env.desc)
    
    def generateRollout(self, policy=None, maxT = 100, initial_state = 0):
        '''
        A rollout is a series of (state, action) pairs which goes on until time maxT or we reach a terminal state.
        A terminal state is reached when done = True in the following statement:
        
        >>> obs,r,done = env.step(action)
        '''
        assert isinstance(maxT, (int, float)), 'maxT needs to be int or float'
        assert isinstance(initial_state, int)
        assert initial_state in range(self.numStates)
        
        states = [initial_state]
        actions = []
        
        t = 0
        while(t < maxT):
            if policy == None:
                a = self.env.action_space.sample()
            else:
                a = policy(self, states[-1])
            # Take a step using action a
            next_state, r, done, info = env.step(a)
            # Save next state and the action that led to it
            states.append(next_state)
            actions.append(a)
            if done:
                break
            t = t + self.dt

        assert len(states) - len(actions) == 1, 'Number of actions should be 1 less than number of states'
        rollout = {
            'state': states,
            'action': actions,
        }
        
        return rollout
    
#     @log_arguments
    def TestPolicy(self, policy, num_trials=100, timeLimit = False, maxT=5000):
        '''
        Returns the average rate of successful episodes over 100 trials for a deterministic policy
        '''
        assert isinstance(policy, np.ndarray) and len(policy) == self.numStates
        assert isinstance(num_trials, int) and num_trials>0
        assert isinstance(timeLimit, bool)
        assert isinstance(maxT, int) and maxT>0

        
        success_count = 0
        for i in range(num_trials):
            t = 0
            state = int(self.env.reset()) #resetting state to initial position
            while(True):
                a = policy[state] #getting action from policy
                next_state, r, done, info = self.env.step(a) #taking a step using action
#                 print('next_state, r = ', next_state, r)
                # Check if we reached goal, i.e. check for success
                if (done and r == 1.0):
                    success_count += 1
                    break
                
                # Checking if we fell in a hole
                if done:
                    break
                
                if timeLimit and t>maxT:
                    print('Max time exceeded. Breaking out of loop')
                    break
                
                state = next_state
                t += self.dt
        return success_count/num_trials

    def LearnModel(self, num_samples = 100000):
        '''
        Returns transition probabilities and reward function
        
        p(s'|a, s) is accessed by typing p[s][a][s']
        r(s,a,s') is accessed by typing r[s][a][s']
        '''
        assert isinstance(num_samples, int) and num_samples > 0
        
        self.env.reset()
        p = np.zeros((self.numStates, self.numActions, self.numStates))
        r = np.zeros((self.numStates, self.numActions, self.numStates))
        counter = np.zeros((self.numStates, self.numActions))
        self.count_s = np.zeros(self.numStates)
        self.count_a = np.zeros(self.numActions)
        
        for i in range(num_samples):
            s = np.random.randint(low = 0, high = self.numStates, dtype = int)
            a = np.random.randint(low = 0, high = self.numActions, dtype = int)
            self.count_s[s] += 1; self.count_a[a] += 1
            
            self.env.unwrapped.s = s #setting current state to randomly chosen state
            s_prime, reward, _, _ = self.env.step(a)
            
            p[s][a][s_prime] += 1
            r[s][a][s_prime] += reward
            counter[s][a] += 1
        
        #use itertools instead
        for s in range(self.numStates):
            for a in range(self.numActions):
                assert counter[s][a] != 0, 'Zero occurences of state-action pair. Cannot divide by 0'
                p[s][a][:] = p[s][a][:]/counter[s][a]
                r[s][a][:] = r[s][a][:]/counter[s][a]
        
        # Checking that probabilities sum to 1        
        for s in range(self.numStates):
            for a in range(self.numActions):
                assert abs(sum(p[s,a,:])-1.0) < 1e-4, 'Probabilities dont sum to 1 --> %f' % sum(p[s,a,:])
        
        return p, r, counter
    
    def initializeValueAndPolicyFunction(self, initial_value = 0):
        '''
        Initializes Value and Policy function and returns them
        All initial values for Value function are set to param initial_value
        Initial values for Policy function are chosen randomly between [0, number of actions)
        '''
        assert isinstance(initial_value, (int, float))
        
        V = np.zeros((self.numStates))
        policy = np.zeros((self.numStates))
        
        for s in range(self.numStates):
            a = np.random.randint(low = 0, high = self.numActions, dtype = int)
            V[s] = initial_value
            policy[s] = a
        
        return V, policy
    
    def evaluatePolicy(self, V, policy, gamma = 0.9):
        '''
        Evaluates policy for 1 iteration
        '''
        assert isinstance(V, np.ndarray) and len(V) == self.numStates
        assert isinstance(gamma, float) and 0.0<gamma<1.0
        
        V_new = np.zeros_like(V)
        for s in range(self.numStates):
            a = policy[s]
            pf_s = self.env.P[s][a] #prob distribution over states for taking action a at state s
            # Calculating expected value of Value function coz of policy
            exp_V = 0.0
            for possible_sa_pairs in pf_s:
                prob, next_state, r, _ = possible_sa_pairs
                exp_V += prob*( r + gamma*V[next_state] ) 
            V_new[s] = exp_V
        return V_new
                
    def checkPolicyConvergence(self, V_old, V_new, threshold = 1e-3):
        '''
        Checks for convergence by checking if the maximum difference between value of all states 
        is less than some threshold
        '''
        assert isinstance(threshold, float) and 0 < threshold < 1
        assert isinstance(V_old, np.ndarray) and len(V_old) == self.numStates
        assert isinstance(V_new, np.ndarray) and len(V_new) == self.numStates
        
        max_diff = np.max(np.abs(V_old - V_new))
        success = max_diff<threshold
        return success, max_diff
    
    def doValueIteration(self, threshold = 1e-3, gamma = 0.9, verbose = False):
        ''' Implement value iteration '''
        # 1. Initialize V
        V, _ = self.initializeValueAndPolicyFunction()
        # 2. Initialize delta
        delta = np.inf
        # 3. Improve V over 1 iteration and then repeat until delta is less than some threshold
        i = 0
        while(delta > threshold):
            delta = 0
            for s in range(self.numStates):
                v = deepcopy(V[s])
                allActionsResult = self.getExpectationOverAction(V, s, gamma)
                assert allActionsResult.shape == (self.numActions, ), 'Needs to be a 1D ndarray'
                V[s] = np.max(allActionsResult)
                delta = max(delta, np.abs(v - V[s]))
            
            i += 1
            # Getting policy from value function
            policy = self.getPolicyFromValueFunction(V, gamma)
            avg_success_rate = self.TestPolicy(policy, num_trials = 100, maxT = 5000)
            print('avg_success_rate = ', avg_success_rate)
        
        if verbose:
            print('Value iteration converged in {0} iterations with final delta = {1}'.format(i, delta))
        
        return V
        
    def getExpectationOverAction(self, V, s, gamma):
        '''
        Calculate the expected value of all actions at state s
        '''
        assert isinstance(s, int) and s in range(self.numStates)
        assert isinstance(gamma, (int, float)) and 0<gamma<1
        
        allActionsResult = np.zeros((self.numActions))
        for a in range(self.numActions):
            result_for_sa = self.env.P[s][a]
            for sa in result_for_sa:
                prob, next_state, r, _ = sa
                allActionsResult[a] += prob*( r + gamma*V[next_state] )
        return allActionsResult
    
    # POSSIBLE BUG -----> different gamma values can be used while calculating value function and deriving policy
    
    def getPolicyFromValueFunction(self, V, gamma):
        '''
        Deriving optimal policy using value function
        '''
        assert isinstance(gamma, (int, float)) and 0<gamma<1
        
        # 1. Initialize policy
        _, policy = self.initializeValueAndPolicyFunction()
        # 2. Get optimal action for each state
        for s in range(self.numActions):
            allActionsResult = self.getExpectationOverAction(V, s, gamma)
            assert allActionsResult.shape == (self.numActions, ), 'Needs to be a 1D ndarray'
            policy[s] = np.argmax(allActionsResult)
        return policy
    
#     def plotPolicy(self, policy):
        

In [4]:
fl = FrozenLake()
# Forming test policy
policyTest = np.zeros((fl.numStates))
for s in range(fl.numStates):
    policyTest[s] = (s+1)%4

success_rate = fl.TestPolicy(policyTest, num_trials = 100, maxT = 5000)
print('\nSuccess Rate for policy is: %.3f' % (success_rate))
# p, r, counter = fl.LearnModel()

Environment map:
[[b'S' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'H']
 [b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'G']]

Success Rate for policy is: 0.020


In [5]:
gamma = 0.9
V = fl.doValueIteration(gamma = gamma, verbose=True)

print(V.reshape(4,4))

avg_success_rate =  0.0
avg_success_rate =  0.19
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.04
avg_success_rate =  0.01
avg_success_rate =  0.01
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.04
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.0
avg_success_rate =  0.03
avg_success_rate =  0.0
avg_success_rate =  0.0
Value iteration converged in 23 iterations with final delta = 0.0009297665029441571
[[0.06428821 0.05807365 0.07231299 0.05356057]
 [0.08830336 0.         0.11127288 0.        ]
 [0.14298808 0.24613328 0.29877497 0.        ]
 [0.         0.37905097 0.63860174 0.        ]]


In [6]:
policy = fl.getPolicyFromValueFunction(V, gamma = gamma)
print(policy.reshape(4,4))

# LEFT = 0
# DOWN = 1
# RIGHT = 2
# UP = 3

[[0. 3. 0. 3.]
 [0. 0. 3. 1.]
 [1. 0. 3. 3.]
 [2. 1. 1. 2.]]


In [7]:
fl.TestPolicy(policy, num_trials = 100, maxT = 5000)

0.12

In [8]:
## Plotting transition probabilites for a certain action a
# fig, axs = plt.subplots(figsize = (6,6))
# axs = plt.imshow(p[:,0,:], cmap='gray')
# plt.colorbar(axs,fraction=0.046, pad=0.04)
## Checking if states and actions were sampled uniformly
# fig, axs = plt.subplots(1,2, figsize = (15,5))
# axs[0].bar(range(fl.numStates), fl.count_s)
# axs[1].bar(range(fl.numActions), fl.count_a)

In [9]:
# dir(gym.envs.toy_text.discrete.DiscreteEnv)
# import inspect
# inspect.getmembers(gym.envs.toy_text.discrete.DiscreteEnv, lambda a: not(inspect.isroutine(a)))