In [1]:
from amalearn.reward import GaussianReward, RewardBase
from amalearn.agent import AgentBase
from amalearn.environment import EnvironmentBase
import numpy as np
from math import *
import matplotlib.pyplot as plt
import scipy.stats as stats
import gym
from random import randrange

* Class rerward:
    * get_reward: returns the difference of curent value and value before update
    * update: updates the value based on defined probabilties for possible values and save last value.
* NArmedBanditEnvironment:
    * calculate_reward: updates each company and for each selected company in the selected action, if any, calculates the reward and returns it
    * init_p_table: calls claculate_p_tabel: for each possible s,sp,r and a.
    * claculate_p_tabel: calculates P(sp,r| s,a)
    * claculate_p returns P(sp,r| s,a)
* Agent:
    * initilizes V(s) =0 for each state and Policy to a random policy.
    * policy_evaluation and policy_iteration: implementation of policy iteration steps

In [2]:
class Reward(RewardBase):
    def __init__(self, stay_prob, base, p, min_value, max_value):
        super(Reward, self).__init__()
        self.inc = p[0]
        self.dec = p[1]
        self.value = base
        self.min = min_value
        self.max = max_value
        self.stay_prob = stay_prob
        self.past_value = 0
    
    def get_reward(self):
        return self.value - self.past_value
    
    def update(self, base_change):
        self.past_value = self.value
        rand = random.uniform(0, 1)
        if self.value < self.min + 1:
            if rand > self.stay_prob:
                self.value = self.value + self.base_change
        elif self.value > self.max - 1:
            if rand  > self.stay_prob:
                self.value = self.value - self.base_change
        else:
            if rand < self.inc:
                self.value = self.value + self.base_change
            elif rand < self.inc + self.dec:
                self.value = self.value - self.base_change

class NArmedBanditEnvironment(EnvironmentBase):
    def __init__(self, action_count, state_count, id, initial_money, goal_money, base_change, base_value, change_prob, stay_prob, min_value, max_value, terminal, actions, container=None):
        state_space = gym.spaces.Discrete(state_count)
        action_space = gym.spaces.Discrete(action_count)
        super(NArmedBanditEnvironment, self).__init__(action_space, state_space, id, container)
        self.state_count = state_count
        self.action_count = action_count
        self.money = initial_money
        self.initial_money = initial_money
        self.goal_money = goal_money
        self.base_change = base_change
        self.stay_prob = stay_prob
        self.min_value = min_value
        self.max_value = max_value
        self.base_value = base_value
        self.change_prob = change_prob
        self.rewards = [Reward(b, stay_prob, cp, min_value, max_value) for (b, cp) in zip(base_value, change_prob)]
        self.actions = actions
        self.terminal = terminal
        self.state = 0
        self.p_table = []
    
    def calculate_reward(self, action):
        for r in self.rewards:
            r.update(self.base_change)
        change = 0
        if action != 0:
            for a in self.actions[action]:
                change = change + self.rewards[a].get_reward()
        return change

    def claculate_p_tabel(self, sp, r, s, a):
        if abs(sp - s) > 2:
            return 0
        if abs(r) != self.base_change and r != 0 and abs(r) != 2*self.base_change:
            return 0
        if sp == s:
            if r == 0:
                p = 1
                for ai in a:
                    p= p *(1-self.rewards[ai].inc - self.rewards[ai].dec)
                return p
            else:
                return 0
        elif sp == s - 1:
            if r == -1*self.base_change:
                if len(a) == 0:
                    return 0
                if len(a)>1:
                    return self.rewards[a[0]].dec*(1-self.rewards[a[1]].dec-self.rewards[a[1]].inc)+self.rewards[a[1]].dec*(1-self.rewards[a[0]].dec-self.rewards[a[0]].inc)
                else:
                    return self.rewards[a[0]].dec
            else:
                return 0
        elif sp == s + 1:           
            if r == self.base_change:
                if len(a) == 0:
                    return 0
                if len(a)>1:
                    return self.rewards[a[0]].inc*(1-self.rewards[a[1]].dec-self.rewards[a[1]].inc)+self.rewards[a[1]].inc*(1-self.rewards[a[0]].dec-self.rewards[a[0]].inc)
                else:
                    return self.rewards[a[0]].inc
            else:
                return 0
        elif sp == s - 2:
            if r == -2 * self.base_change:
                if len(a) == 2:
                    return self.rewards[a[0]].dec*self.rewards[a[1]].dec
                return 0
            else:
                return 0            
        else:
            if r == 2 * self.base_change:
                if len(a) == 2:
                    return self.rewards[a[0]].inc*self.rewards[a[1]].inc
                return 0
            else:
                return 0
    def init_p_table(self):
        for a in self.actions:
            temp = []
            for i in range(self.state_count):
                p0 = []
                p1 = []
                p2 = []
                p3 = []
                p4 = []
                for j in range(self.state_count):
                    p0.append(self.claculate_p_tabel(j, -10 , i, a))
                    p1.append(self.claculate_p_tabel(j, -5 , i, a))
                    p2.append(self.claculate_p_tabel(j, 0 , i, a))
                    p3.append(self.claculate_p_tabel(j, 5 , i, a))
                    p4.append(self.claculate_p_tabel(j, 10 , i, a))
                temp.append([p0,p1,p2,p3,p4])
            self.p_table.append(temp)
    
    def claculate_p(self, sp, r, s, a):
        action_index = self.actions.index(a)
        return self.p_table[action_index][s][int((r+10)/5)][sp]
    
    def terminated(self):
        if self.state == self.terminal:
            return False
        return True

    def observe(self):
        return 

    def available_actions(self):
        return self.action_space.n

    def next_state(self, action):
        v = self.money + self.rewards[action].get_reward()
        if v < 0:
            self.state = 0
            return
        if v == self.goal_money:
            self.state = self.terminal
            return
        self.state = int(v/50) 
        return #(0-50)(50-100)...(950-1000)+Terminal

    def reset(self):
        self.rewards = [Reward(b, self.stay_prob, cp, self.min_value, self.max_value) for (b, cp) in zip(self.base_value, self.change_prob)]
        self.money = self.initial_money
        return

    def render(self, mode='human'):
        #print('{}:\taction={}'.format(self.state['length'], self.state['last_action']))
        return 

    def close(self):
        return
    
class Agent(AgentBase):
    def __init__(self, id, environment, discount, theta):
        #initialize a random policy and V(s) = 0 for each state
        self.V = np.zeros(environment.state_count)
        self.policy = [randrange(environment.action_count) for i in range(environment.state_count)]
        super(Agent, self).__init__(id, environment)
        self.discount = discount
        self.theta = theta
        self.environment.init_p_table()
    def policy_evaluation(self):
        delta = 0
        while True:
            for s in range(self.environment.state_count):
                vp = self.V[s]
                x1 = 0
                for sp in range(self.environment.state_count):
                    #comment for faster learning with theta <5
                    y = s * 5
                    x1 = x1 + self.environment.claculate_p(sp, 0, s, self.environment.actions[self.policy[s]])*(0 + y + self.discount*self.V[sp])
                    x1 = x1 + self.environment.claculate_p(sp, self.environment.base_change, s, self.environment.actions[self.policy[s]])*(self.environment.base_change + y + self.discount*self.V[sp])
                    x1 = x1 + self.environment.claculate_p(sp, -1*self.environment.base_change, s, self.environment.actions[self.policy[s]])*(-1*self.environment.base_change + y + self.discount*self.V[sp])
                    x1 = x1 + self.environment.claculate_p(sp, 2*self.environment.base_change, s, self.environment.actions[self.policy[s]])*(2*self.environment.base_change + y + self.discount*self.V[sp])
                    x1 = x1 + self.environment.claculate_p(sp, -2*self.environment.base_change, s, self.environment.actions[self.policy[s]])*(-2*self.environment.base_change + y + self.discount*self.V[sp])
                    
            self.V[s] = x1
            delta = max(delta, abs(vp - self.V[s]))            
            #print(delta)
            if delta < self.theta:
                break
        return
    def policy_iteration(self):
        stable = True
        for s in range(self.environment.state_count):
            old_action = self.policy[s]
            temp = []
            #comment for faster learning with theta <5
            y = s * 5
            self.policy[s] = np.argmax([np.sum([self.environment.claculate_p(sp, 0, s, self.environment.actions[i])*(0 + y+ self.discount*self.V[sp]) + self.environment.claculate_p(sp, self.environment.base_change, s, self.environment.actions[i])*(self.environment.base_change + y + self.discount*self.V[sp])+ self.environment.claculate_p(sp, -1*self.environment.base_change, s, self.environment.actions[i])*(-1*self.environment.base_change + y + self.discount*self.V[sp]) + self.environment.claculate_p(sp, 2*self.environment.base_change, s, self.environment.actions[i])*(2*self.environment.base_change + y + self.discount*self.V[sp])+ self.environment.claculate_p(sp, -2*self.environment.base_change, s, self.environment.actions[i])*(-2*self.environment.base_change + y + self.discount*self.V[sp])for sp in range(self.environment.state_count)]) for i in range(self.environment.action_count)])
            if old_action != self.policy[s]:
                stable = False
        return stable
    def take_action(self) -> (object, float, bool, object):
        index_selected_arm = 0
        obs, Ri, d, i = self.environment.step(index_selected_arm)
        self.environment.money = self.environment.money + Ri
        #self.environment.update_selected_arm(index_selected_arm, Ri)
        self.environment.render()
        return t

In [126]:
goal_money = 100
initial_money = 20
base_change = 5
base_value = [5, 15, 5] # 5+2, 10+5, 5+2
change_prob = [[0.39, 0.31], [0.15, 0.15], [0.23, 0.64]]
action_company = ['nothing','B', 'C', 'D', 'B and C', 'B and D', 'C and D']
actions = [[], [0], [1], [2], [0,1], [0,2], [1,2]]
stay_prob = 0.25
min_value = 5
max_value = 50
delta = 10 # change to bigger number for faster
env = NArmedBanditEnvironment(len(actions), int(goal_money/5), 1, initial_money, goal_money, base_change, base_value, change_prob, stay_prob, min_value, max_value, int(goal_money/5), actions)
agent = Agent('1', env, 0.9, delta)
print(np.shape(env.p_table))
it = 0
while True:
    agent.policy_evaluation()
    if agent.policy_iteration():
        break
    it = it + 1

print("discount factor of 0.9")
print("recommended policy:")
print([action_company[i] for i in agent.policy])

for d in [0.8,0.7,0.4,0.1]:
    agent = Agent('1', env, d, delta)
    it = 0
    while True:
        agent.policy_evaluation()
        if agent.policy_iteration():
            break
        it = it + 1
    print("discount factor of "+str(d))
    print("recommended policy:")
    print([action_company[i] for i in agent.policy])

(7, 20, 5, 20)
discount factor of 0.9
recommended policy:
['B and C', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B and C', 'B', 'nothing']
discount factor of 0.8
recommended policy:
['B and C', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'nothing']
discount factor of 0.7
recommended policy:
['B and C', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'nothing']
discount factor of 0.4
recommended policy:
['B and C', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'nothing']
discount factor of 0.1
recommended policy:
['B and C', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'nothing']
