# Install Libraries

In [None]:
!pip install gym

In [None]:
!git clone https://github.com/zhpinkman/armed-bandit.git

In [None]:
!pip install ./armed-bandit

# Import Libraries

In [1]:
from amalearn.reward import RewardBase
from amalearn.agent import AgentBase

In [2]:
from amalearn.environment import EnvironmentBase
import gym




In [5]:
import random

In [237]:
list(range(1,10))

[1, 2, 3, 4, 5, 6, 7, 8, 9]

# Environment

In [290]:
# Scratch check: an integer Box space bounded by 1 and 15 with shape (1, 2).
space = gym.spaces.Box(low=1, high=15, shape=(1,2), dtype=int)

In [293]:
type(space.sample()[0])

numpy.ndarray

In [266]:
# Scratch check: a Discrete space constructed with n=3.
space = gym.spaces.Discrete(3)

In [268]:
space.sample()

1

In [289]:
from gym.spaces import Discrete

Discrete(5)

AssertionError: n (counts) have to be positive

In [297]:
from gym.spaces import Discrete, Box

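# Action layout: action a moves the agent by (dx, dy) = (a % 3 - 1, a // 3 - 1),
# so action 4 (the centre) leaves the position unchanged.
# 0 1 2
# 3 4 5
# 6 7 8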

class Environment(EnvironmentBase):
    """Stochastic grid world with obstacle cells and a goal at cell (1, 1).

    A state is an ``(x, y)`` pair with ``1 <= x <= i_limit`` and
    ``1 <= y <= j_limit``. Actions are the integers ``0 .. action_count - 1``
    arranged like a 3x3 keypad::

        0 1 2
        3 4 5
        6 7 8

    Action ``a`` moves by ``(a % 3 - 1, a // 3 - 1)``; action 4 stays in place.
    The requested action is executed with probability ``p``; the remaining
    ``1 - p`` is shared evenly between the actions available from the state.
    """

    def __init__(self, obstacle=None, id=0, action_count=9, actionPrice=-1, goalReward=100,
                 punish=-10, j_limit=10, i_limit=10, p=0.8, container=None):
        """Store the grid configuration and register the gym spaces.

        Args:
            obstacle: iterable of blocked ``(x, y)`` cells; ``None`` means no
                obstacles.
            id: identifier forwarded to the base class.
            action_count: number of discrete actions.
            actionPrice: reward component added on every transition.
            goalReward: extra reward when the goal cell is reached.
            punish: extra reward when the target cell is not a possible state.
            j_limit: largest valid y coordinate.
            i_limit: largest valid x coordinate.
            p: probability that the requested action is the one executed.
            container: forwarded to the base class.
        """
        # A fresh list per instance avoids sharing one default list between
        # every environment created without an explicit obstacle argument.
        self.obstacle = [] if obstacle is None else obstacle

        self.x_min = 1
        self.x_max = i_limit

        self.y_min = 1
        self.y_max = j_limit

        # reset() reads x_max / y_max, so it must run after they are set.
        self.reset()

        self.action_count = action_count
        self.actionPrice = actionPrice
        self.goalReward = goalReward
        self.punish = punish
        self.p = p
        self.id = id

        action_space = Discrete(action_count)
        state_space = Box(low=1, high=max(i_limit, j_limit), shape=(1, 2), dtype=int)

        super().__init__(action_space=action_space, state_space=state_space,
                         id=id, container=container)

    def isStatePossible(self, state):
        """Return True if ``state`` is inside the grid and is not an obstacle."""
        x, y = state[0], state[1]
        inside = self.x_min <= x <= self.x_max and self.y_min <= y <= self.y_max
        if not inside:
            return False
        # Compare coordinates one by one: ``state in self.obstacle`` raises
        # ValueError when ``state`` is a NumPy array, because the element-wise
        # ``==`` result has no single truth value.
        return not any(x == cell[0] and y == cell[1] for cell in self.obstacle)

    def isAccessible(self, state, state_p):
        """Return True if ``state_p`` is a possible cell at most one step from ``state``."""
        adjacent = abs(state[0] - state_p[0]) <= 1 and abs(state[1] - state_p[1]) <= 1
        return bool(adjacent and self.isStatePossible(state_p))

    def getTransitionStatesAndProbs(self, state, action, state_p):
        """Return the transition probability T(state_p | state, action).

        The cell targeted by ``action`` receives ``p``; every accessible cell
        additionally receives an equal share of ``1 - p``.
        """
        actions = self.available_actions_state(state)

        # np.array_equal yields one bool; ``==`` on arrays is element-wise and
        # cannot be used directly in an ``if``.
        is_intended = np.array_equal(self.calculate_next_state(state, action), state_p)
        accessible = self.isAccessible(state, state_p)

        slip_share = (1 - self.p) / len(actions) if accessible and actions else 0
        if is_intended:
            return self.p + slip_share
        return slip_share

    def getReward(self, state, action, state_p):
        """Return the reward for arriving at ``state_p``.

        ``state`` and ``action`` are accepted for interface symmetry but are
        not used by the computation.
        """
        if self.terminated_state(state_p):
            return self.actionPrice + self.goalReward

        if self.isStatePossible(state_p):
            return self.actionPrice
        return self.actionPrice + self.punish

    def sample_all_rewards(self):
        """Not implemented; returns None."""
        return

    def calculate_reward(self, action):
        """Not implemented; returns None."""
        return

    def terminated(self):
        """Return True if the current state is the goal cell."""
        return self.terminated_state(self.current_state)

    def terminated_state(self, state):
        """Return True if ``state`` is the goal cell (1, 1)."""
        return bool(state[0] == 1 and state[1] == 1)

    def observe(self):
        """Return the current state."""
        return self.current_state

    def available_actions(self):
        """Return the actions whose target cell is accessible from the current state."""
        return self.available_actions_state(self.current_state)

    def available_actions_state(self, state):
        """Return the actions whose target cell is accessible from ``state``."""
        return [
            action
            for action in range(self.action_count)
            if self.isAccessible(state, self.calculate_next_state(state, action))
        ]

    def calculate_next_state(self, state, action):
        """Return the cell ``action`` points at from ``state``, without validity checks."""
        row, col = divmod(action, 3)
        return np.array([state[0] + (col - 1), state[1] + (row - 1)])

    def next_state(self, action):
        """Sample the executed action and move the agent.

        The requested action is always a candidate, even when its target cell
        is blocked; in that case the agent stays where it is. ``last_action``
        records the requested action and ``sliped`` whether a different action
        was executed instead.
        """
        candidates = self.available_actions()
        if action not in candidates:
            candidates.append(action)

        weights = [
            self.getTransitionStatesAndProbs(
                self.current_state,
                action,
                self.calculate_next_state(self.current_state, candidate),
            )
            for candidate in candidates
        ]

        final_action = random.choices(population=candidates, weights=weights, k=1)[0]

        real_next_state = self.calculate_next_state(self.current_state, final_action)
        if not self.isStatePossible(real_next_state):
            # Bumping into a wall or obstacle leaves the position unchanged.
            real_next_state = self.current_state

        self.last_action = action
        self.sliped = final_action != action
        self.current_state = real_next_state

        return

    def reset(self):
        """Put the agent back on the far corner ``(x_max, y_max)`` and clear step info."""
        # Derived from the grid limits so the start cell is inside the grid
        # for any size; for a 15x15 grid this is the cell (15, 15).
        self.current_state = np.array([self.x_max, self.y_max])

        self.last_action = None
        self.sliped = None

    def render(self, mode='human'):
        """Print the current state, the last requested action and the slip flag."""
        print(f"{self.current_state} \t {self.last_action} \t {self.sliped}")
        return

    def close(self):
        """Nothing to release; returns None."""
        return

In [298]:
# Obstacle cells, generated from three rectangles given as (x range, y range).
# Cells are emitted row by row (y outer, x inner) within each rectangle.
grid_states = [
    (x, y)
    for xs, ys in (
        (range(7, 9), range(1, 5)),
        (range(13, 16), range(8, 10)),
        (range(6, 8), range(12, 16)),
    )
    for y in ys
    for x in xs
]

In [299]:
# 15x15 grid world using grid_states as obstacle cells.
base_environment = Environment(
    id=0,
    container=None,
    obstacle=grid_states,
    i_limit=15,
    j_limit=15,
    action_count=9,
    p=0.8,
    actionPrice=-1,
    goalReward=100,
    punish=-10,
)

In [301]:
base_environment.render()

[15 15] 	 None 	 None


# Agent

In [286]:
import numpy as np

class Agent(AgentBase):
    """Skeleton agent holding value and policy tables for an environment.

    The planning methods are placeholders; only ``take_action`` does any work.
    """

    def __init__(self, id, environment, discount, theta):
        """Create empty tables and store the planning parameters."""
        self.environment = environment

        # mapp: state -> id, V: state values, policy: chosen action per state.
        # All three start empty.
        self.mapp, self.V, self.policy = {}, {}, {}

        super().__init__(id, environment)

        self.discount, self.theta = discount, theta

    def policy_evaluation(self):
        """Placeholder; does nothing and returns None."""
        return None

    def policy_improvement(self):
        """Placeholder; does nothing and returns None."""
        return None

    def value_iteration(self):
        """Placeholder; does nothing and returns None."""
        return None

    def take_action(self) -> (object, float, bool, object):
        """Call the environment's ``step`` with action 1 and return its result.

        The result is expected to be ``(observation, reward, done, info)``.
        """
        fixed_action = 1
        outcome = self.environment.step(fixed_action)
        return outcome

In [287]:
# Wire an agent to base_environment with discount 0.9; theta is left as None.
agent = Agent(id=0, environment=base_environment, discount=0.9, theta=None)

In [288]:
agent.take_action()

(None, None, False, {})