<a href="https://colab.research.google.com/github/emil-freme/rflearnign-autobot/blob/main/Reinforcement_Learning_OOP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gymnasium

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

# Definição do ambiente
> https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/#sphx-glr-tutorials-gymnasium-basics-environment-creation-py


In [None]:
class AutoBot(gym.Env):
    """Deterministic grid world in which an agent walks towards targets.

    Positions are ``(row, column)`` pairs on a ``size`` x ``size`` grid.
    Four actions are available: 0 = left, 1 = up, 2 = right, 3 = down.
    A move that would leave the grid keeps the agent on the border cell.

    Rewards are granted for the cell the agent *enters*:

    * ``+1``   when the cell is a target (the episode terminates),
    * ``-1``   when the cell is an obstacle (obstacles do not block movement
      and do not end the episode),
    * ``-0.1`` otherwise (living penalty).

    Besides the Gymnasium interface the class carries two tabular learners:
    a state-value table (``v_table`` / ``v_policy``) updated by value
    iteration, and a ``q_table`` updated by Q-learning.
    """

    # (row, column) displacement of every discrete action.
    _ACTION_DELTAS = {
        0: (0, -1),   # Left
        1: (-1, 0),   # Up
        2: (0, 1),    # Right
        3: (1, 0),    # Down
    }

    # Arrow used to display each action in the policy printers.
    _ACTION_ICONS = {
        0: '⇐',
        1: '⇑',
        2: '⇒',
        3: '⇓',
    }

    TARGET_REWARD = 1
    OBSTACLE_REWARD = -1
    STEP_REWARD = -0.1

    def __init__(self,
                 render_mode=None,
                 size=3,
                 obstacles_n=1,
                 targets_n=1,
                 targets=None,
                 start_position=None,
                 obstacles=None,
                 gamma=0.9):
        """
        Build the grid and place the agent, the targets and the obstacles.

        Parameters:
        - render_mode (None): Rendering mode; stored but not used by render().
        - size (int): Side length of the square grid.
        - obstacles_n (int): Minimum number of obstacles; missing ones are
          sampled at random on free cells.
        - targets_n (int): Number of targets; missing ones are sampled at
          random on free cells, surplus supplied ones are dropped.
        - targets (sequence of (row, col) | None): Targets chosen by the caller.
        - start_position ((row, col) | None): Start cell; random when None.
        - obstacles (sequence of (row, col) | None): Obstacles chosen by the
          caller.
        - gamma (float): Discount factor used by the value-iteration methods.

        Raises:
        - ValueError: If a supplied position is not an in-grid (row, col)
          pair, or if there is no free cell left for a random placement.
        """
        self.size = size
        self.window_size = 400
        self.render_mode = render_mode
        self.gamma = gamma
        self.v_table = np.zeros((size, size))
        # One character per cell: the best action's digit, or "T" on a target.
        self.v_policy = np.zeros((size, size), dtype=str)

        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "targets": spaces.Box(0, size - 1, shape=(targets_n, 2), dtype=int),
                "obstacles": spaces.Box(0, size - 1, shape=(obstacles_n, 2), dtype=int)
            }
        )

        # Four possible actions, see action_to_direction()
        self.action_space = spaces.Discrete(4)

        self.q_table = np.zeros((size, size, self.action_space.n))

        # Place the agent. `is not None` (rather than `!= None`) keeps NumPy
        # arrays usable as arguments.
        if start_position is not None:
            self.start_position = self._as_cell(start_position)
        else:
            self.start_position = self.observation_space["agent"].sample()
        self.agent_position = self.start_position.copy()

        # Cells that random placement must avoid.
        occupied = {self._cell_key(self.start_position)}

        # Place the targets. Caller-supplied positions are copied so that the
        # caller's list is never mutated.
        self.targets = []
        if targets is not None:
            self.targets = [self._as_cell(t) for t in targets][:targets_n]
        occupied.update(self._cell_key(t) for t in self.targets)
        while len(self.targets) < targets_n:
            cell = self._sample_free_cell(occupied)
            self.targets.append(cell)
            occupied.add(self._cell_key(cell))

        # Place the obstacles, never on the agent, a target or another obstacle.
        self.obstacles = []
        if obstacles is not None:
            self.obstacles = [self._as_cell(o) for o in obstacles]
        occupied.update(self._cell_key(o) for o in self.obstacles)
        while len(self.obstacles) < obstacles_n:
            cell = self._sample_free_cell(occupied)
            self.obstacles.append(cell)
            occupied.add(self._cell_key(cell))

    @staticmethod
    def _cell_key(position):
        """Return `position` as a hashable (row, col) tuple of Python ints."""
        return (int(position[0]), int(position[1]))

    def _as_cell(self, position):
        """Return `position` as a fresh integer array after validating it."""
        cell = np.array(position, dtype=int)
        if cell.shape != (2,) or not self.check_bounds(cell[0], cell[1]):
            raise ValueError(
                f"position {position!r} is not a (row, col) pair inside a "
                f"{self.size}x{self.size} grid")
        return cell

    def _sample_free_cell(self, occupied):
        """Sample a random cell whose key is not in the `occupied` set."""
        if len(occupied) >= self.size * self.size:
            raise ValueError("no free cell left on the grid")
        while True:
            candidate = self.observation_space["agent"].sample()
            if self._cell_key(candidate) not in occupied:
                return candidate

    def _is_target(self, position):
        """Tell whether `position` is one of the target cells."""
        return any(np.array_equal(position, target) for target in self.targets)

    def _is_obstacle(self, position):
        """Tell whether `position` is one of the obstacle cells."""
        return any(np.array_equal(position, obs) for obs in self.obstacles)

    def action_to_direction(self, action):
        """Return the (row, col) displacement of `action` as a new array."""
        return np.array(self._ACTION_DELTAS[action])

    def _get_obs(self):
        # NOTE(review): only the agent position is returned, not the Dict
        # declared in observation_space; run_training relies on this shape.
        return self.agent_position

    def _get_info(self):
        """Return the positions plus the L1 distance from the agent to each target."""
        return {
            "agent" : self.agent_position,
            "targets" : self.targets,
            "target distances": [ np.linalg.norm(self.agent_position - target, ord=1) for target in self.targets ],
            "obstacles" : self.obstacles,
        }

    def reset(self, seed=None, options=None):
        """
        Restart the environment for a new episode.

        Parameters:
        - seed (int): Seed for the environment's random number generator.
        - options (dict): Additional reset options (unused).

        Returns:
        - tuple: The initial observation and the info dictionary.
        """
        super().reset(seed=seed)

        # Copy so that a caller mutating the observation cannot move the start.
        self.agent_position = self.start_position.copy()

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

    def step(self, action):
        """
        Apply an action and return the outcome.

        Parameters:
        - action (int): The action taken by the agent.

        Returns:
        - tuple: The new observation, the reward, whether the episode
                 terminated, whether it was truncated (always False) and
                 the info dictionary.
        """
        # The transition is deterministic: move one cell, clipped to the grid.
        direction = self.action_to_direction(action)
        self.agent_position = np.clip(
            self.agent_position + direction, 0, self.size - 1)

        reward, terminated = self.calculate_reward(self.agent_position)

        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, False, info

    def calculate_reward(self, position):
        """
        Return ``(reward, terminated)`` for an agent entering `position`.

        Every target and every obstacle is taken into account; a target takes
        precedence when a cell is both.
        """
        terminated = self._is_target(position)
        if terminated:
            reward = self.TARGET_REWARD
        elif self._is_obstacle(position):
            reward = self.OBSTACLE_REWARD
        else:
            reward = self.STEP_REWARD
        return reward, terminated

    def render(self, mode='human'):
        """Print the grid: "-" free, "#" obstacle, "?" target, "X" agent."""
        grid = np.full((self.size, self.size), "-", dtype="<U1")
        for obs in self.obstacles:
            grid[obs[0], obs[1]] = "#"  # obstacles
        for target in self.targets:
            grid[target[0], target[1]] = "?"  # delivery points
        # Drawn last so the agent stays visible on top of any other marker.
        grid[self.agent_position[0], self.agent_position[1]] = "X"
        print("\n".join(" ".join(row) for row in grid))

    def check_bounds(self, i, j):
        """Tell whether (i, j) lies inside the grid."""
        return (0 <= i < self.size and
                0 <= j < self.size )

    def _action_values(self, i, j):
        """
        Return ``[(action, value), ...]`` for the moves that stay in the grid.

        The value of a move is the reward of the cell it enters plus the
        discounted current estimate of that cell, which is the same reward
        model step() uses.
        """
        values = []
        for action in range(self.action_space.n):
            mov_i, mov_j = self.action_to_direction(action)
            next_i = i + mov_i
            next_j = j + mov_j
            if not self.check_bounds(next_i, next_j):
                continue
            reward = self.calculate_reward((next_i, next_j))[0]
            v_future = self.v_table[next_i, next_j]
            values.append((action, reward + self.gamma * v_future))
        return values

    def update_v_table(self):
        """Run one synchronous value-iteration sweep over every non-target cell."""
        new_v = np.copy(self.v_table)

        for i in range(self.size):
            for j in range(self.size):
                # Targets are terminal: their value stays at 0.
                if self._is_target((i, j)):
                    continue
                values = self._action_values(i, j)
                if values:
                    new_v[i, j] = max(value for _, value in values)
        self.v_table = new_v

    def update_v_policy(self):
        """Store in v_policy the greedy action of each cell ("T" on targets)."""
        for i in range(self.size):
            for j in range(self.size):
                if self._is_target((i, j)):
                    self.v_policy[i, j] = "T"
                    continue
                values = self._action_values(i, j)
                if not values:
                    continue
                # max() keeps the first maximum, i.e. the lowest action index.
                best_action, _ = max(values, key=lambda pair: pair[1])
                self.v_policy[i, j] = str(best_action)

    def show_v_policy(self):
        """Print v_policy, one row per line, with arrows in place of action digits."""
        icons = {str(action): icon for action, icon in self._ACTION_ICONS.items()}

        for row in self.v_policy:
            for data in row:
                cell = str(data)
                print(icons.get(cell, cell), end="\t")
            print()

    def choose_action(self, state, epsilon):
        """
        Pick an action with an epsilon-greedy rule over the Q-table.

        Parameters:
        - state (tuple): The current (row, col) of the agent.
        - epsilon (float): Probability of picking a random action (exploration).

        Returns:
        - int: The chosen action.
        """
        # The environment's own generator is used so that reset(seed=...)
        # makes the exploration reproducible.
        if self.np_random.random() < epsilon:
            return int(self.np_random.integers(self.action_space.n))
        return int(np.argmax(self.q_table[state[0], state[1]]))

    def update_q_table(self, state, action, reward, next_state, alpha, gamma):
        """
        Apply one Q-learning update for the transition that was just observed.

        Parameters:
        - state (tuple): The (row, col) the agent was in.
        - action (int): The action the agent took.
        - reward (float): The reward received for that action.
        - next_state (tuple): The (row, col) the agent reached.
        - alpha (float): Learning rate.
        - gamma (float): Discount factor.
        """
        # Best value achievable from the next state.
        max_next_q = np.max(self.q_table[next_state[0], next_state[1]])

        # Blend the old estimate with the bootstrapped target.
        old_q = self.q_table[state[0], state[1], action]
        target = reward + gamma * max_next_q
        self.q_table[state[0], state[1], action] = (1 - alpha) * old_q + alpha * target

    def show_q_policy(self):
        """Print the greedy action of every cell according to the Q-table."""
        for row in np.argmax(self.q_table, axis=2):
            for data in row:
                print(self._ACTION_ICONS.get(int(data), data), end="\t")
            print()


In [None]:
def run_training(env, seed=42, epsilon=0.5, alpha=0.1, gamma=0.9, max_steps=1000):
    """
    Run one Q-learning episode on `env`, printing every step.

    Parameters:
    - env: Environment exposing reset(), step(), render(), choose_action()
      and update_q_table().
    - seed (int): Seed forwarded to env.reset().
    - epsilon (float): Initial exploration probability; multiplied by 0.99
      after every step, never below 0.01.
    - alpha (float): Learning rate forwarded to env.update_q_table().
    - gamma (float): Discount factor forwarded to env.update_q_table().
    - max_steps (int): Upper bound on the number of steps in the episode.
    """
    state, _ = env.reset(seed=seed)
    for _ in range(max_steps):
        action = env.choose_action(state, epsilon)
        next_state, reward, terminated, truncated, info = env.step(action)
        print(f"""Ação: {action}\tObs: {next_state},\tReward: {reward},\tTerminated: {terminated},\tTruncated: {truncated},\tInfo: {info}
        """)
        env.render()
        env.update_q_table(state, action, reward, next_state, alpha, gamma)
        state = next_state
        epsilon = max(epsilon * 0.99, 0.01)
        # Stop as soon as the environment reports the end of the episode.
        if terminated or truncated:
            break

In [None]:
# Train for one episode on the default environment (3x3 grid, one target, one obstacle).
env = AutoBot()
run_training(env)

Ação: 0	Obs: [0 1],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([0, 1]), 'targets': [array([1, 2])], 'target distances': [2.0], 'obstacles': [array([1, 0])]}
        
[[b'-' b'X' b'-']
 [b'#' b'-' b'?']
 [b'-' b'-' b'-']]
Ação: 1	Obs: [0 1],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([0, 1]), 'targets': [array([1, 2])], 'target distances': [2.0], 'obstacles': [array([1, 0])]}
        
[[b'-' b'X' b'-']
 [b'#' b'-' b'?']
 [b'-' b'-' b'-']]
Ação: 2	Obs: [0 2],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([0, 2]), 'targets': [array([1, 2])], 'target distances': [1.0], 'obstacles': [array([1, 0])]}
        
[[b'-' b'-' b'X']
 [b'#' b'-' b'?']
 [b'-' b'-' b'-']]
Ação: 3	Obs: [1 2],	Reward: 1,	Terminated: True,	Truncated: False,	Info: {'agent': array([1, 2]), 'targets': [array([1, 2])], 'target distances': [0.0], 'obstacles': [array([1, 0])]}
        
[[b'-' b'-' b'-']
 [b'#' b'-' b'X']
 [b'-' b'-' b'-']]


In [None]:
# Same experiment on a larger 10x10 grid, with the default exploration rate.
env = AutoBot(size=10)
run_training(env)

Ação: 1	Obs: [6 7],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([6, 7]), 'targets': [array([8, 7])], 'target distances': [2.0], 'obstacles': [array([9, 1])]}
        
[[b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'X' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'?' b'-' b'-']
 [b'-' b'#' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']]
Ação: 2	Obs: [6 8],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([6, 8]), 'targets': [array([8, 7])], 'target distances': [3.0], 'obstacles': [array([9, 1])]}
        
[[b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'

In [None]:
# 5x5 grid crowded with eight obstacles.
env = AutoBot(size=5, obstacles_n=8)
run_training(env)

Ação: 0	Obs: [4 0],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([4, 0]), 'targets': [array([1, 2])], 'target distances': [5.0], 'obstacles': [array([2, 0]), array([3, 1]), array([2, 0]), array([1, 1]), array([3, 4]), array([3, 2]), array([3, 3]), array([0, 2])]}
        
[[b'-' b'-' b'#' b'-' b'-']
 [b'-' b'#' b'?' b'-' b'-']
 [b'#' b'-' b'-' b'-' b'-']
 [b'-' b'#' b'#' b'#' b'#']
 [b'X' b'-' b'-' b'-' b'-']]
Ação: 0	Obs: [4 0],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([4, 0]), 'targets': [array([1, 2])], 'target distances': [5.0], 'obstacles': [array([2, 0]), array([3, 1]), array([2, 0]), array([1, 1]), array([3, 4]), array([3, 2]), array([3, 3]), array([0, 2])]}
        
[[b'-' b'-' b'#' b'-' b'-']
 [b'-' b'#' b'?' b'-' b'-']
 [b'#' b'-' b'-' b'-' b'-']
 [b'-' b'#' b'#' b'#' b'#']
 [b'X' b'-' b'-' b'-' b'-']]
Ação: 2	Obs: [4 1],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([4, 1]), 'targets': [ar

In [None]:
# 10x10 grid with a high initial exploration rate (epsilon=0.9): mostly random moves at first.
env = AutoBot(size=10)
run_training(env, epsilon=0.9)

Ação: 2	Obs: [6 7],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([6, 7]), 'targets': [array([3, 0])], 'target distances': [10.0], 'obstacles': [array([7, 2])]}
        
[[b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'?' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'X' b'-' b'-']
 [b'-' b'-' b'#' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']]
Ação: 1	Obs: [5 7],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([5, 7]), 'targets': [array([3, 0])], 'target distances': [9.0], 'obstacles': [array([7, 2])]}
        
[[b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b

In [None]:
# 10x10 grid with a low initial exploration rate (epsilon=0.1): mostly greedy moves.
env = AutoBot(size=10)
run_training(env, epsilon=0.1)

Ação: 0	Obs: [4 2],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([4, 2]), 'targets': [array([8, 3])], 'target distances': [5.0], 'obstacles': [array([9, 4])]}
        
[[b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'X' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'?' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'#' b'-' b'-' b'-' b'-' b'-']]
Ação: 0	Obs: [4 1],	Reward: -0.1,	Terminated: False,	Truncated: False,	Info: {'agent': array([4, 1]), 'targets': [array([8, 3])], 'target distances': [6.0], 'obstacles': [array([9, 4])]}
        
[[b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-' b'-']
 [b'-' b'-' b'-' b'-' b'