In [1]:
import gym
from gym import spaces
import numpy as np

In [2]:
class TwoArmBandit(gym.Env):
    """Two-armed Bernoulli bandit expressed as a three-state episodic MDP.

    State 0 is the start state; states 1 and 2 only have zero-reward
    self-loops. Every transition is flagged ``done=True``, so an episode
    lasts a single step.

    From state 0:
      * LEFT  -> state 1, reward 0 with probability ``alpha``;
                 state 2, reward 1 with probability ``1 - alpha``.
      * RIGHT -> state 2, reward 1 with probability ``beta``;
                 state 1, reward 0 with probability ``1 - beta``.

    Parameters
    ----------
    alpha : float
        Probability used for the first LEFT outcome, must lie in [0, 1].
    beta : float
        Probability used for the first RIGHT outcome, must lie in [0, 1].

    Raises
    ------
    ValueError
        If ``alpha`` or ``beta`` is not a valid probability.
    """

    # metadata = {'render.modes':['human']}

    # Action identifiers.
    LEFT = 0
    RIGHT = 1

    def __init__(self, alpha, beta):
        super(TwoArmBandit, self).__init__()
        N_DISCRETE_ACTIONS = 2
        N_DISCRETE_STATES = 3
        LEFT, RIGHT = self.LEFT, self.RIGHT

        # The transition table is now sampled with these values as real
        # probabilities, so reject anything that cannot be one up front.
        for name, value in (('alpha', alpha), ('beta', beta)):
            if not 0 <= value <= 1:
                raise ValueError(
                    '{} must be a probability in [0, 1], got {!r}'.format(name, value))

        self.alpha = alpha
        self.beta = beta
        self.action_space = spaces.Discrete(N_DISCRETE_ACTIONS)
        self.observation_space = spaces.Discrete(N_DISCRETE_STATES)

        # MDP dynamics: P[state][action] is a list of
        # [probability, next_state, reward, done] entries.
        self.P = {}
        self.P[0] = {
                        LEFT: [[self.alpha, 1, 0, True], [1-self.alpha, 2, 1, True]],
                        RIGHT: [[self.beta, 2, 1, True], [1-self.beta, 1, 0, True]]
                    }
        self.P[1] = {
                        LEFT: [[1,1,0,True]],
                        RIGHT: [[1,1,0,True]]
                    }
        self.P[2] = {
                        LEFT: [[1,2,0,True]],
                        RIGHT: [[1,2,0,True]]
                    }
        self.agent_position = self.reset()

    def step(self, action):
        """Sample one transition for ``action`` from the current state.

        Returns
        -------
        tuple
            ``(observation, reward, done, info)`` where ``observation`` is the
            next state index and ``info`` is an empty dict.
        """
        transitions = self.P[self.agent_position][action]
        probabilities = [transition[0] for transition in transitions]

        # The probabilities must go through the ``p=`` keyword: the third
        # positional parameter of np.random.choice is ``replace``, so passing
        # them positionally sampled the outcomes uniformly instead.
        j = int(np.random.choice(len(transitions), p=probabilities))

        _, observation, reward, done = transitions[j]

        # Move the agent to the sampled next state.
        self.agent_position = observation
        info = {}
        return observation, reward, done, info

    def reset(self):
        """Put the agent back in state 0 and return that state."""
        self.agent_position = 0
        return self.agent_position  # reward, done, info can't be included

    def render(self, mode='human'):
        """Rendering is not implemented for this environment."""
        raise NotImplementedError

    def close (self):
        """Not implemented; calling it raises NotImplementedError."""
        raise NotImplementedError

In [43]:
class TenArmGaussianBandit(gym.Env):
    """Ten-armed Gaussian bandit expressed as an eleven-state episodic MDP.

    State 0 is the start state. Pulling arm ``i`` from state 0 moves the agent
    to state ``i + 1`` and ends the episode. The arm means ``q_value`` are
    drawn once at construction from ``N(mu, sigma_square)``; the per-step
    ``rewards`` are drawn from ``N(q_value, sigma_square)`` and redrawn on
    every step taken from state 0.

    Parameters
    ----------
    mu : float
        Mean of the distribution the arm means are drawn from.
    sigma_square : float
        Variance used both for the arm means and for the reward noise.
    seed : int
        Seed handed to :meth:`seed` (seeds NumPy's global generator).
    """

    # metadata = {'render.modes':['human']}

    def __init__(self, mu = 0, sigma_square=1, seed=0):
        super(TenArmGaussianBandit, self).__init__()
        N_DISCRETE_ACTIONS = 10
        N_DISCRETE_STATES = 11

        # Seed before any sampling so q_value / rewards are reproducible.
        self.seed(seed)

        self.action_space = spaces.Discrete(N_DISCRETE_ACTIONS)
        self.observation_space = spaces.Discrete(N_DISCRETE_STATES)

        # Draw one mean per arm, then one noisy reward per arm around it.
        self.mu = mu
        self.sigma_square = sigma_square
        self.q_value = np.random.normal(self.mu, np.sqrt(self.sigma_square), self.action_space.n)
        self.rewards = np.random.normal(self.q_value, np.sqrt(self.sigma_square), self.action_space.n)

        self.P = self.set_MDP()
        self.agent_position = self.reset()

    def set_MDP(self):
        """Build and return the transition table for the current ``rewards``.

        ``P[state][action]`` is a list holding a single
        ``(probability, next_state, reward, done)`` tuple. The table is
        returned, not stored; callers assign it to ``self.P``.

        NOTE(review): the current rewards are printed on every call, which
        includes every step taken from state 0.
        """
        print(self.rewards)
        n_states = self.observation_space.n
        n_actions = self.action_space.n

        P = {state: {} for state in range(n_states)}

        # From the start state, arm i leads to state i+1 with its sampled reward.
        for arm in range(n_actions):
            P[0][arm] = [(1, arm + 1, self.rewards[arm], True)]

        # Every other state only self-loops with zero reward.
        for state in range(1, n_states):
            for arm in range(n_actions):
                P[state][arm] = [(1, state, 0, True)]
        return P

    def step(self, action):
        """Pull arm ``action`` and return ``(observation, reward, done, info)``.

        If the agent has already left state 0, the call returns the current
        state with zero reward and ``done=True`` without sampling anything.
        """
        if self.agent_position != 0:
            return self.agent_position, 0, True, {}

        # Redraw the noisy rewards for this pull.
        self.rewards = np.random.normal(self.q_value, np.sqrt(self.sigma_square), self.action_space.n)
        # Store the rebuilt table; previously the return value was discarded,
        # which left self.P describing the rewards from an earlier draw.
        self.P = self.set_MDP()
        self.agent_position = action+1
        reward = self.rewards[action]
        done = True
        info = {}
        return self.agent_position, reward, done, info

    def reset(self):
        """Put the agent back in state 0 and return that state."""
        self.agent_position = 0
        # self.rewards = np.random.normal(self.q_value, np.sqrt(self.sigma_square), self.action_space.n)
        # self.set_MDP()
        return self.agent_position  # reward, done, info can't be included

    def seed(self, seed=0):
        """Seed NumPy's global random generator (shared by all np.random users)."""
        np.random.seed(seed)

    def render(self, mode='human'):
        """Rendering is not implemented for this environment."""
        raise NotImplementedError

    def close (self):
        """Not implemented; calling it raises NotImplementedError."""
        raise NotImplementedError

In [4]:
class RandomWalk(gym.Env):
    """Seven-state random walk with terminal states at both ends.

    States are numbered 0..6; 0 and 6 are terminal. Reaching state 6 yields
    reward 1, every other transition yields reward 0.

    * Action 0 moves left with probability ``alpha``, otherwise right.
    * Action 1 moves right with probability ``beta``, otherwise left.

    Parameters
    ----------
    alpha : float
        Probability that action 0 moves the agent left.
    beta : float
        Probability that action 1 moves the agent right.
    seed : int
        Seed handed to :meth:`seed` (seeds NumPy's global generator).
    """

    # Action identifiers.
    LEFT = 0
    RIGHT = 1

    # Terminal states at the two ends of the walk.
    LEFT_TERMINAL = 0
    RIGHT_TERMINAL = 6

    def __init__(self, alpha=0.5, beta=0.5, seed=0):
        super(RandomWalk, self).__init__()
        # Seed first: reset() below draws the start state from np.random.
        self.seed(seed)
        N_DISCRETE_ACTIONS = 2
        N_DISCRETE_STATES = 7
        self.alpha = alpha
        self.beta = beta
        self.action_space = spaces.Discrete(N_DISCRETE_ACTIONS)
        self.observation_space = spaces.Discrete(N_DISCRETE_STATES)
        self.agent_position = self.reset()

    def step(self, action):
        """Advance the walk by one move.

        Returns
        -------
        tuple
            ``(observation, reward, done, info)``. Stepping from a terminal
            state returns that state with zero reward and ``done=True``.

        Raises
        ------
        ValueError
            If ``action`` is neither 0 nor 1.
        """
        terminals = (self.LEFT_TERMINAL, self.RIGHT_TERMINAL)
        if self.agent_position in terminals:
            return self.agent_position, 0, True, {}

        if action == self.LEFT:
            intended, success_probability = -1, self.alpha
        elif action == self.RIGHT:
            intended, success_probability = +1, self.beta
        else:
            raise ValueError('invalid action {!r}; expected 0 or 1'.format(action))

        # The move goes the intended way with the action's probability and
        # the opposite way otherwise (one uniform draw per step).
        if np.random.uniform() < success_probability:
            self.agent_position += intended
        else:
            self.agent_position -= intended

        # Check the terminal condition against the state actually reached,
        # whichever direction was taken. The action-1 branch used to test the
        # wrong end, so reaching state 6 gave no reward and no done flag.
        reward = 1 if self.agent_position == self.RIGHT_TERMINAL else 0
        done = self.agent_position in terminals
        info = {}
        return self.agent_position, reward, done, info

    def reset(self):
        """Draw a start state from the non-terminal states 1..5 and return it."""
        self.agent_position = int(np.random.randint(1,6,1)[0])
        return self.agent_position

    def seed(self, seed=0):
        """Seed NumPy's global random generator (shared by all np.random users)."""
        np.random.seed(seed)

    def render(self, mode='human'):
        """Rendering is not implemented for this environment."""
        raise NotImplementedError

    def close (self):
        """Not implemented; calling it raises NotImplementedError."""
        raise NotImplementedError

In [6]:
# Import at the top of the cell so a missing dependency fails before any
# sampling output is produced.
from stable_baselines.common.env_checker import check_env

# Smoke test: pull arm 0 ten times and print the resulting state, the arm's
# mean, the sampled reward and the done flag for each pull.
env = TenArmGaussianBandit(seed=0)
for i in range(10):
    env.reset()
    action = 0
    obs, reward, done, _ = env.step(action)
    print(obs, env.q_value[action], reward, done)

# Run the stable-baselines environment checker on the same instance.
check_env(env)

1 1.764052345967664 0.7154993809005714 True
1 1.764052345967664 1.091591898191713 True
1 1.764052345967664 0.5989025051843075 True
1 1.764052345967664 3.6472030430239184 True
1 1.764052345967664 2.1404778771232933 True
1 1.764052345967664 0.2727947532620585 True
1 1.764052345967664 1.2660198952753592 True
1 1.764052345967664 1.41005843471418 True
1 1.764052345967664 1.3948705080252204 True
1 1.764052345967664 0.808107345474887 True


In [44]:

class policyEvaluation:
    """Iterative policy evaluation for a tabular MDP.

    Parameters
    ----------
    policy : sequence of float
        Action probabilities, indexed by action. The same distribution is
        applied in every state.
    gamma : float
        Discount factor applied to the value of non-terminal next states.
    theta : float
        Convergence threshold on the largest absolute change in any state
        value between two sweeps.
    max_iterations : int
        Upper bound on the number of sweeps.
    """

    def __init__(self, policy, gamma=0.99, theta=1e-10, max_iterations=500):
        # Policy distribution over actions.
        self.pi = policy

        # Use the caller's values; these used to be overwritten by the
        # hard-coded constants 0.99 and 1e-3.
        self.gamma = gamma
        self.theta = theta

        self.max_iterations = max_iterations

    def evaluate(self, env):
        """Estimate state values of ``self.pi`` on ``env`` and print them.

        ``env`` must expose ``observation_space.n``, ``action_space.n`` and a
        table ``env.P[state][action]`` of
        ``(probability, next_state, reward, done)`` entries.

        Returns
        -------
        numpy.ndarray
            One value per state. With ``max_iterations == 0`` the initial
            all-zero estimate is returned.
        """
        n_states = env.observation_space.n
        n_actions = env.action_space.n

        # Start from an all-zero estimate.
        values = np.zeros(n_states)

        for _ in range(self.max_iterations):
            updated = np.zeros(n_states)
            for s in range(n_states):
                for a in range(n_actions):
                    expected_return = 0
                    for p, s_, r, d in env.P[s][a]:
                        # Bootstrap from the next state only when the
                        # transition does not end the episode.
                        if d:
                            expected_return += p*r
                        else:
                            expected_return += p*(r+self.gamma*values[s_])
                    # Weight the action's expected return by the policy.
                    updated[s] += self.pi[a]*expected_return

            delta = np.max(np.abs(updated-values))
            values = updated
            if delta < self.theta:
                break

        # The labels treat state 0 as the initial state and all others as terminal.
        for i in range(len(values)):
            if i==0:
                print(f'    Value of initial state {i} is {np.round(values[i],2)}')
            else:
                print(f'    Value of terminal state {i} is {np.round(values[i],2)}')
        return values

    def __repr__(self):
        # The policy is stored as self.pi; there is no self.policy attribute.
        return 'policyEvaluation(policy={}, gamma={}, theta={}, max_iterations={})'.format(self.pi, self.gamma, self.theta, self.max_iterations)


In [46]:
if __name__ == '__main__':

    # Build a bandit and evaluate the deterministic policy that always pulls
    # the arm with the largest mean.
    env = TenArmGaussianBandit(sigma_square=1, seed=1)
    env.reset()

    a = np.argmax(env.q_value)
    print(a)

    # One-hot distribution over the arms: all probability on arm `a`.
    some_policy = np.eye(env.action_space.n)[a]
    print(some_policy)

    policyEvluator = policyEvaluation(some_policy)
    policyEvluator.evaluate(env)

[ 3.0864533  -2.67189712 -0.85058896 -1.45702298  1.99917707 -3.40142996
  1.57238356 -1.63906532  0.36125284  0.33344484]
6
[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
    Value of initial state 0 is 1.57
    Value of terminal state 1 is 0.0
    Value of terminal state 2 is 0.0
    Value of terminal state 3 is 0.0
    Value of terminal state 4 is 0.0
    Value of terminal state 5 is 0.0
    Value of terminal state 6 is 0.0
    Value of terminal state 7 is 0.0
    Value of terminal state 8 is 0.0
    Value of terminal state 9 is 0.0
    Value of terminal state 10 is 0.0
