In [493]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()

import gym
from amalearn.environment import EnvironmentBase
from amalearn.reward import RewardBase
from amalearn.agent import AgentBase

sns.set_style("dark")

## Environment

In [494]:
class Environment(EnvironmentBase):
    def __init__(self, obstacles = None ,id = 0, action_count=9, actionPrice=-1, goalReward=100, punish=-10, j_limit=15, i_limit=15, p=0.8, container=None):

        # Define state_space and action_space required by the amalearn environment class
        state_space = gym.spaces.MultiDiscrete([i_limit, j_limit])
        action_space = gym.spaces.MultiDiscrete([3, 3])
        super(Environment, self).__init__(action_space, state_space, id, container)

        # Get the environment variables (:D)
        self.action_price = actionPrice
        self.goal_reward = goalReward
        self.punish = punish
        self.p = p
        self.i_limit = i_limit
        self.j_limit = j_limit

        # Pass data between the environment and the agent
        self.is_terminated = False
        self.optimal_policy = None
        self.optimal_V = None

        # Agent's Position at the start
        self.state = np.array([15,15])

        # Agent's next position
        self.state_p = None
        
        # Get Obstacle positions
        self.obstacles = np.array([
            # Bottom block
            [12,6],
            [13,6],
            [14,6],
            [15,6],
            [12,7],
            [13,7],
            [14,7],
            [15,7],

            # Top block
            [1,7],
            [2,7],
            [3,7],
            [4,7],
            [1,8],
            [2,8],
            [3,8],
            [4,8],

            # Right Block
            [8,13],
            [8,14],
            [8,15],
            [9,13],
            [9,14],
            [9,15]
        ])

        # Override if obstacles is provided
        if obstacles is not None:
            self.obstacles = obstacles

    # Done ✅✅
    def isStatePossible(self, state):
        '''If the given state is possible (not out of the grid and not obstacle) return true'''

        # Vertical Bound Check
        if state[0] < 1 or state[0] > self.i_limit:
            return False
        
        # Horizontal Bound Check
        if state[1] < 1 or state[1] > self.j_limit:
            return False
        
        # Obstacle Check(bitwise 'and' of first and second components' comparison)
        if np.any((self.obstacles[:, 0] == state[0]) & (self.obstacles[:, 1] == state[1])):
            return False
        
        return True
    
    # Done ✅✅
    def isAccessible(self, state, state_p):
        '''If the given state is accessible (we can reach state_p by doing an action from state) return true'''
        # TODO: Optimization idea = norm_1(state_p - state)<1

        for action in self.available_actions():
            # print(state + action)
            if np.all(np.array(state) + action == np.array(state_p)):
                return True
        return False

    # TODO 💛 ✅✅
    def getTransitionStatesAndProbs(self, state, action):
        '''return the available states from 'state' by taking 'action' and their probabilities'''
        accessible_states = []
        probs = []
        number_of_accessible_states = 0 #used to divide the probability of each slippage
        action = np.array(action)
        
        # 1. The actual action:
        state_p = np.array(state) + action
        # print("next_state: ", state_p)
        accessible_states.append(state_p)
        probs.append(self.p) # 80% chance of going in the desired direction

        # 2. Slipping actions:
        # Loop over every action
        for action_new in self.available_actions():
            # Get the next state after taking the action 'action_new'
            state_p = self.get_next_state(state, action_new)

            # print(action_new, action, not np.all(action_new == action))
            if not np.all(action_new == action):
                number_of_accessible_states += 1
                # TODO: Do we count obstacles? or just ignore them? if no then add a check here( check if accessible then increment the thing)
                # 20% chance of going in the other directions
                accessible_states.append(state_p)
                probs.append(1.0 - self.p)
        
        # 3. Normalize the probabilities
        # Apply divison on every item except the first one which is the correct action
        probs = [probs[0]] + list(map(lambda x: x / number_of_accessible_states, probs[1:]))

        return accessible_states, probs
    
    # Done ✅✅✅
    def getReward(self, state, state_p):
        '''return reward of transition'''

        # Reward for reaching the goal
        if np.all(np.array(state_p) == np.array([1,1])):
            return self.goal_reward

        # Reward for state_p which is accessible from state
        elif self.isAccessible(state, state_p) and self.isStatePossible(state_p):
            return self.action_price

        # Punishment for state_p which is not accessible from state
        else:
            return self.punish
    
    # TODO 💔
    def sample_all_rewards(self):
        return 
    
    # TODO 💔
    def calculate_reward(self, action):
        return 

    # Done ✅✅
    def terminated(self):
        '''Returns true if the episode is terminated'''
        if self.state == [1,1]:
            return True
        return False

    # Done ✅✅
    def observe(self):
        '''Returns the current state'''
        return self.state

    # Done ✅✅
    def available_actions(self):
        '''returns a list of all actions.
        0 : stay
        1 : N
        2 : NE
        3 : E
        4 : SE
        5 : S
        6 : SW
        7 : W
        8 : NW
        '''
        actions = np.array([
            [0,0],   # stay
            [0,1],   # N
            [1,1],   # NE
            [1,0],   # E
            [1,-1],  # SE
            [0,-1],  # S
            [-1,-1], # SW
            [-1,0],  # W
            [-1,1]   # NW
        ])

        return actions

    # Done ✅✅
    def get_next_state(self, state, action):
        '''Return the state after taking the action.
        Return the current state if the resulting state is not valid.
        '''

        state_p = np.array(state) + np.array(action)
        # print(f"state_p: {state_p}")
        if self.isAccessible(state, state_p) and self.isStatePossible(state_p):
            return np.array(state_p)
        else:
            return np.array(state)
    
    def next_state(self, action):
        '''Return the state after taking the action.
        Return the current state if the resulting state is not valid.
        '''

        state_p = np.array(self.state) + np.array(action)
        if self.isAccessible(self.state, state_p) and self.isStatePossible(state_p):
            return np.array(state_p)
        else:
            return np.array(self.state)

    # Done ✅
    def reset(self):
        '''Reset the environment to the initial state'''
        self.state = np.array([15,15])

    # TODO 💔
    def render(self, mode='human'):
        #print('{}:\taction={}'.format(self.state['length'], self.state['last_action']))
        return 

    # TODO 💔
    def close(self):
        return

## Agent

In [495]:
import numpy as np

class Agent(AgentBase):
    def __init__(self, id=0, environment=None, discount=0.9, theta=0.1, algorithm='policy_iteration'):
        
        super(Agent, self).__init__(id, environment)

        self.environment = environment
        self.discount = discount
        self.theta = theta
        self.algorithm = algorithm

        # mapp states to its ids TODO: Unneccessary?
        self.mapp = {}

        m,n = self.environment.i_limit, self.environment.j_limit

        # Initialize V with zeros
        self.V = np.zeros((m,n))

        # Initialize policy with random actions
        random_indices = np.random.randint(self.environment.available_actions().shape[0], size=m*n)
        self.policy = self.environment.available_actions()[random_indices].reshape((m, n, 2))
        # print(self.policy.shape)

    # Done ✅
    def sum_states(self, state, action, debug=False):
        '''The sigma line in the policy iteration algorithm.
        sums the reward of every accessible state' from current state
        via action 'action'.
        '''
        
        # Which states can we go from here via action 'action' and what are their probabilities? (Slipping)
        accessible_states, probs = self.environment.getTransitionStatesAndProbs(state, action)

        if debug:
            print("# probs")
            print(np.array(accessible_states))
            print(probs)
        
        # The Sigma
        summ = 0
        for i in range(len(accessible_states)):
            # Reward
            r = self.environment.getReward(state, accessible_states[i])
            # -1 because V is a 0-based numpy array but our world is 1-based
            summ += (r + self.discount * self.V[state[0]-1, state[1]-1]) * probs[i]
        
        return summ

        
    # Done ✅
    def policy_evaluation(self):
        '''Policy Evaluation step.
        Iterates until ΔV is less than 'theta'.
        '''
        import time

        counter = 1
        # Iterate until Convergence
        while True:
            delta = 0
            start_time = time.perf_counter_ns()
            # For every state s, calculate V[s]
            for i in range(self.environment.i_limit):
                for j in range(self.environment.j_limit):
                    old_v = self.V[i,j]
                    # +1 because our world is 1-based but i,j are 0-based
                    state = [i+1,j+1]
                    # Get the current action
                    action = self.policy[i,j]
                    # Calculate the new value
                    self.V[i,j] = self.sum_states(state, action)
                    # Calculate the difference
                    delta = max(delta, abs(self.V[i,j] - old_v))
            

            # print(f"Iteration: {counter} done in {(time.perf_counter_ns() - start_time)/1000000:.3f} ms delta:{delta}")
            counter += 1

            if delta < self.theta:
                break
                
    # Done ✅
    def policy_improvement(self):
        '''Policy Improvment step.
        Updates the policy based on the V that we get from 'policy_evaluation' step.

        Returns
        -------
        is_policy_stable: bool
            True if the policy is stable, False otherwise.
        '''
        is_policy_stable = True

        # For every state s, calculate π[s] based on V[s]
        for i in range(self.environment.i_limit):
            for j in range(self.environment.j_limit):
                old_action = self.policy[i,j]
                state = [i+1,j+1]

                # Loop over all actions and find the best one
                best_action = old_action.copy()
                for action in self.environment.available_actions():
                    # print(self.sum_states(state, action))
                    if self.sum_states(state, action) > self.sum_states(state, best_action):
                        best_action = action.copy()
                
                # print(old_action, best_action)
                # print()
                # Check for stability of policy
                if np.all(best_action != old_action):
                    is_policy_stable = False
                
        return is_policy_stable
    
    # TODO 💔
    def policy_iteration(self):
        # in a loop
        is_policy_stable = False
        while not is_policy_stable:
            self.policy_evaluation()
            is_policy_stable = self.policy_improvement()
    
    # TODO 💔
    def value_iteration(self):
        pass
    
    # TODO 💔
    def take_action(self) -> tuple[object, float, bool, object]:
        # One step of policy iteration
        self.policy_evaluation()
        is_policy_stable = self.policy_improvement()
        
        # Pass data from agent to environment
        self.environment.optimal_policy = self.policy
        self.environment.optimal_value = self.V
        self.environment.is_terminated = is_policy_stable

        # Step method of the environment
        observation, reward, done, info = self.environment.step(action)
        
        # Render the environment after every action
        # self.environment.render()

        return observation, reward, done, info

    def test(self):
        is_policy_stable = False
        while not is_policy_stable:
            self.policy_evaluation()
            is_policy_stable = self.policy_improvement()
            print(is_policy_stable)
        
        return self.V, self.policy


## Main (Test)

In [496]:
environment = Environment()
agent = Agent(environment=environment, theta=80)
# agent.take_action()

In [497]:
environment.getTransitionStatesAndProbs([2,1], [0,-1])

([array([2, 0]),
  array([2, 1]),
  array([2, 2]),
  array([3, 2]),
  array([3, 1]),
  array([2, 1]),
  array([2, 1]),
  array([1, 1]),
  array([1, 2])],
 [0.8,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994])

In [513]:
agent.sum_states([15,15], [1,1], debug=True)

# probs
[[16 16]
 [15 15]
 [15 15]
 [15 15]
 [15 15]
 [15 14]
 [14 14]
 [14 15]
 [15 15]]
[0.8, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994]


-8.200000000000003

In [499]:
for i in range(0,15):
    print(agent.sum_states(np.array([i,9]), np.array([-1,0])))

# probs
[[-1  9]
 [ 0  9]
 [ 0  9]
 [ 1 10]
 [ 1  9]
 [ 0  9]
 [ 0  9]
 [ 0  9]
 [ 0  9]]
[0.8, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994]
-9.55
# probs
[[ 0  9]
 [ 1  9]
 [ 1 10]
 [ 2 10]
 [ 2  9]
 [ 1  9]
 [ 1  9]
 [ 1  9]
 [ 1  9]]
[0.8, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994]
-8.200000000000003
# probs
[[ 1  9]
 [ 2  9]
 [ 2 10]
 [ 3 10]
 [ 3  9]
 [ 2  9]
 [ 2  9]
 [ 2  9]
 [ 1 10]]
[0.8, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994, 0.024999999999999994]
-1.0000000000000002
# probs
[[ 2  9]
 [ 3  9]
 [ 3 10]
 [ 4 10]
 [ 4  9]
 [ 3  9]
 [ 3  9]
 [ 3  9]
 [ 2 10]]
[0.8, 0.024999999999999994, 0.024999999999999994,

In [500]:
# agent.sum_states([15,15], [1,0])

In [501]:
environment.getReward([15,15], [16,16])
environment.getReward([15,15], [1,1])

100

In [502]:
environment.get_next_state([15,15], [-1,-1])

array([14, 14])

In [503]:
environment.isStatePossible([14,15])

True

In [504]:
environment.isAccessible([15,15], [14,15])

True

In [505]:
environment.getTransitionStatesAndProbs([15,15], [-1,0])

([array([14, 15]),
  array([15, 15]),
  array([15, 15]),
  array([15, 15]),
  array([15, 15]),
  array([15, 15]),
  array([15, 14]),
  array([14, 14]),
  array([15, 15])],
 [0.8,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994,
  0.024999999999999994])

In [506]:
environment.isAccessible([15,15], [14,1])


False

In [507]:
%%time
# a,b = agent.test();

CPU times: total: 0 ns
Wall time: 0 ns
