# AGENT7 (En construction)

Ce notebook utilise un simulateur égocentré.

# L'environnement Small Loop

Implémentons une version de l'environnement qui nous permet d'afficher le simulateur interne de l'agent.

In [1]:
# Directory that receives the PNG frames saved during the run
save_dir = "sav"

# Action codes used by the environment, the simulator and the agent
(FORWARD, FEEL_FRONT, FEEL_LEFT,
 FEEL_RIGHT, TURN_LEFT, TURN_RIGHT) = range(6)

# Grid dimensions, in cells, of the environment and of the simulator
ENV_HIGHT = ENV_WIDTH = 6
SIM_HIGHT = SIM_WIDTH = 3

In [171]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
from ipywidgets import Button, HBox,VBox, Output
from IPython.display import display

# Absolute headings of the agent in the environment.
# SmallLoop uses these values as indices into its table of displacement vectors.
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

# Cell states stored in the environment and simulator grids
EMPTY = 0
WALL = 1
FEELING_EMPTY = 2
FEELING_WALL = 3
BUMPING = 4
UNKNOWN = 5

# Alternative palette that hides the environment (empty cells and walls share the same grey).
# It was previously assigned to `colors` and immediately overwritten; it is kept under its
# own name so that it can be selected explicitly.
HIDDEN_ENVIRONMENT_COLORS = ["#b0b0b0", '#b0b0b0', '#ffffff', '#535865', "#F93943"]

# Palette showing both grids. Entries 0-4 color the environment's cell states.
# SmallLoop.display adds 5 to the simulator's cell states, so entries 5-10 color
# the simulator's cells (entry 10 is UNKNOWN + 5).
SIMULATOR_COLORS = ["#D6D6D6", '#5C946E', '#FAE2DB', '#535865', "#F93943",
                    "#EEEEEE", "#85A48F", '#FAE2DB', '#535865', "#F93943", '#BAC9E1']

# The palette actually used to build the colormap
colors = SIMULATOR_COLORS

agent_color = "#1976D2"
prediction_error_color = "#f62dae"
agent_size = 200

class SmallLoop():
    """The Small Loop grid world, drawn next to the agent's internal simulator.

    ``display_grid`` has ENV_HIGHT rows and ENV_WIDTH + SIM_WIDTH columns: the
    environment occupies the left part and ``display`` copies the simulator's
    grid into the top-right part.

    ``display`` reads two notebook-level globals: ``out`` (the output widget it
    draws into) and ``step`` (the counter written on the picture).
    """

    # Added to the simulator's cell values so that they select colormap
    # entries 5 and above instead of the environment's colors.
    SIM_COLOR_OFFSET = 5

    def __init__(self, position, direction, grid):
        """Create the environment.

        :param position: [row, column] of the agent in ``grid``
        :param direction: initial heading of the agent (LEFT, DOWN, RIGHT or UP)
        :param grid: 2D list of EMPTY / WALL cells
        """
        self.environment_grid = np.array(grid)
        self.display_grid = np.full((ENV_HIGHT, ENV_WIDTH + SIM_WIDTH), WALL, dtype=int)
        self._reset_environment_display()
        self.position = np.array(position)
        self.direction = direction
        self.cmap = ListedColormap(colors)
        # One color bin per integer cell value
        self.norm = BoundaryNorm(np.arange(-0.5, len(colors) + 0.5, 1.0), self.cmap.N)
        self.marker_size = agent_size
        self.marker_map = {LEFT: '<', DOWN: 'v', RIGHT: '>', UP: '^'}
        self.marker_color = agent_color
        # Displacement [row, column] for each heading, indexed by LEFT, DOWN, RIGHT, UP
        self.directions = np.array([
            [0, -1],  # Left
            [1, 0],   # Down
            [0, 1],   # Right
            [-1, 0]   # Up
            ])

    def _reset_environment_display(self):
        """Erase the transient marks (feeling, bumping) from the environment part of the display"""
        rows, cols = self.environment_grid.shape
        self.display_grid[0:rows, 0:cols] = self.environment_grid

    def _feel(self, heading):
        """Mark the cell next to the agent in the absolute direction `heading`. Return 0 if it is EMPTY, else 1."""
        feeling_position = tuple(self.position + self.directions[heading])
        if self.environment_grid[feeling_position] == EMPTY:
            self.display_grid[feeling_position] = FEELING_EMPTY
            return 0
        self.display_grid[feeling_position] = FEELING_WALL
        return 1

    def outcome(self, action):
        """Enact the action and update the display grid.

        :param action: one of FORWARD, FEEL_FRONT, FEEL_LEFT, FEEL_RIGHT, TURN_LEFT, TURN_RIGHT
        :return: 1 if the agent bumped into or felt a non-empty cell, otherwise 0
        """
        result = 0
        self._reset_environment_display()

        if action == FORWARD:
            target_position = self.position + self.directions[self.direction]
            if self.environment_grid[tuple(target_position)] == EMPTY:
                self.position[:] = target_position
            else:
                result = 1
                self.display_grid[tuple(target_position)] = BUMPING

        elif action == TURN_RIGHT:
            self.direction = {LEFT: UP, DOWN: LEFT, RIGHT: DOWN, UP: RIGHT}[self.direction]

        elif action == TURN_LEFT:
            self.direction = {LEFT: DOWN, DOWN: RIGHT, RIGHT: UP, UP: LEFT}[self.direction]

        elif action == FEEL_FRONT:
            result = self._feel(self.direction)

        elif action == FEEL_LEFT:
            # The heading on the agent's left is the next one in the LEFT, DOWN, RIGHT, UP cycle
            result = self._feel((self.direction + 1) % 4)

        elif action == FEEL_RIGHT:
            # The heading on the agent's right is the previous one in the cycle
            result = self._feel((self.direction - 1) % 4)

        # print(f"Line: {self.position[0]}, Column: {self.position[1]}, direction: {self.direction}")
        return result

    def display(self, simulator=None):
        """Display the grid in the notebook.

        Uses the notebook globals ``out`` and ``step``.

        :param simulator: if given, its display grid and agent marker are drawn to the right of the environment
        """
        out.clear_output(wait=True)
        with out:
            fig, ax = plt.subplots()
            if simulator is not None:
                # The simulator's agent is always drawn facing up, shifted right past the environment
                plt.scatter(simulator.position[1] + ENV_WIDTH, simulator.position[0], s=self.marker_size,
                            marker=self.marker_map[UP], c="#aaaaaa")
                self.display_grid[0:SIM_HIGHT, ENV_WIDTH:ENV_WIDTH + SIM_WIDTH] = (
                    simulator.display_grid[0:SIM_HIGHT, 0:SIM_WIDTH] + self.SIM_COLOR_OFFSET)
            ax.imshow(self.display_grid, cmap=self.cmap, norm=self.norm)
            plt.scatter(self.position[1], self.position[0], s=self.marker_size, marker=self.marker_map[self.direction], c=self.marker_color)
            ax.text(4.5, 0, f"{step:>3}", fontsize=12, color='White')
            plt.show()

    def save(self, step, img_nb, simulator):
        """Save the display as a PNG file named after `img_nb` in the `save_dir` directory.

        The grid is saved as it currently is: the simulator's cells are those copied by
        the last call to ``display``.

        :param step: step counter written on the picture
        :param img_nb: image number, used as the zero-padded file name
        :param simulator: simulator whose agent marker is drawn to the right of the environment
        """
        fig, ax = plt.subplots()
        ax.set_xticks([])
        ax.set_yticks([])
        ax.axis('off')
        ax.imshow(self.display_grid, cmap=self.cmap, norm=self.norm)
        plt.scatter(self.position[1], self.position[0], s=self.marker_size, marker=self.marker_map[self.direction], c=self.marker_color)
        plt.scatter(simulator.position[1] + ENV_WIDTH, simulator.position[0], s=simulator.marker_size, marker=self.marker_map[UP],
                    c=simulator.marker_color)
        ax.text(4.5, 0, f"{step:>4}", fontsize=10, color='White')
        plt.savefig(f"{save_dir}/{img_nb:04}.png", bbox_inches='tight', pad_inches=0, transparent=True)
        # Close the figure so that saved frames do not accumulate in memory
        plt.close(fig)

    def clear(self, clear):
        """Clear the transient marks from the grid display when `clear` is true"""
        if clear:
            self._reset_environment_display()
       

# Le simulateur

Le simulateur implémente plusieurs opérateurs :
* `spawn(position)` : crée un mur à cette position
* `kill(position)` : vide la cellule à cette position
* `translate()` : translate la mémoire du simulateur
* `rotate_clockwise()` et `rotate_counterclockwise()` : font pivoter la mémoire du simulateur


In [345]:
class Simulator():
    """Egocentric internal model of the cells around the agent.

    The agent stays at ``position`` and its front is the row above it: moving or
    turning transforms the grid rather than the agent.

    ``simulation_grid`` holds the believed cell states (EMPTY, WALL or UNKNOWN).
    ``display_grid`` is the same grid plus the transient marks of the last
    interaction (feeling, bumping).
    """

    # [row, column] offsets of the neighbouring cells relative to the agent
    _FRONT = (-1, 0)
    _LEFT = (0, -1)
    _RIGHT = (0, 1)

    def __init__(self, grid):
        """Create a simulator whose initial and current states are copies of `grid`"""
        self.init_grid = np.array(grid)
        self.simulation_grid = np.array(grid)
        self.display_grid = np.array(grid)
        self.position = np.array([1, 1])
        self.marker_size = agent_size
        self.marker_color = agent_color

    def translate(self):
        """Translate the grid when the agent moves forward"""
        # Shift every row down by one and fill the new front row with UNKNOWN
        unknown_row = np.full((1, self.simulation_grid.shape[1]), UNKNOWN)
        self.simulation_grid[:SIM_HIGHT, :SIM_WIDTH] = np.vstack((unknown_row, self.simulation_grid[:-1]))
        self.display_grid[:SIM_HIGHT, :SIM_WIDTH] = self.simulation_grid

    def rotate_clockwise(self):
        """Rotate the grid clockwise when the agent rotates left"""
        # np.rot90 returns a view: copy it before writing back into the same array
        self.simulation_grid[:SIM_HIGHT, :SIM_WIDTH] = np.rot90(self.simulation_grid, axes=(1, 0)).copy()
        self.display_grid[:SIM_HIGHT, :SIM_WIDTH] = self.simulation_grid

    def rotate_counterclockwise(self):
        """Rotate the grid counter-clockwise when the agent rotates right"""
        # np.rot90 returns a view: copy it before writing back into the same array
        self.simulation_grid[:SIM_HIGHT, :SIM_WIDTH] = np.rot90(self.simulation_grid).copy()
        self.display_grid[:SIM_HIGHT, :SIM_WIDTH] = self.simulation_grid

    def bump(self):
        """Create a wall in front of the agent and display the bump"""
        target_position = self.position + self._FRONT
        self.spawn(target_position)
        self.display_grid[tuple(target_position)] = BUMPING

    def kill_front(self):
        """Delete the wall in front and display feeling empty"""
        self._kill_at(self._FRONT)

    def kill_left(self):
        """Delete the wall on the left and display feeling empty"""
        self._kill_at(self._LEFT)

    def kill_right(self):
        """Delete the wall on the right and display feeling empty"""
        self._kill_at(self._RIGHT)

    def spawn_front(self):
        """Create a wall in front and display feeling wall"""
        self._spawn_at(self._FRONT)

    def spawn_left(self):
        """Create a wall on the left and display feeling wall"""
        self._spawn_at(self._LEFT)

    def spawn_right(self):
        """Create a wall on the right and display feeling wall"""
        self._spawn_at(self._RIGHT)

    def spawn(self, position):
        """Spawn a wall at the position """
        self.simulation_grid[tuple(position)] = WALL

    def kill(self, position):
        """Remove a wall at the position """
        self.simulation_grid[tuple(position)] = EMPTY

    def _kill_at(self, offset):
        """Empty the cell at `offset` from the agent and mark it as felt empty"""
        feeling_position = self.position + offset
        self.kill(feeling_position)
        self.display_grid[tuple(feeling_position)] = FEELING_EMPTY

    def _spawn_at(self, offset):
        """Put a wall in the cell at `offset` from the agent and mark it as felt wall"""
        feeling_position = self.position + offset
        self.spawn(feeling_position)
        self.display_grid[tuple(feeling_position)] = FEELING_WALL

    def _feel(self, offset):
        """Mark the cell at `offset` from the agent as felt. Return EMPTY, WALL or UNKNOWN."""
        feeling_position = tuple(self.position + offset)
        state = self.simulation_grid[feeling_position]
        if state == EMPTY:
            self.display_grid[feeling_position] = FEELING_EMPTY
            return EMPTY
        if state == UNKNOWN:
            # An unknown cell is displayed as empty but reported as UNKNOWN
            self.display_grid[feeling_position] = FEELING_EMPTY
            return UNKNOWN
        self.display_grid[feeling_position] = FEELING_WALL
        return WALL

    def outcome(self, action):
        """Simulate the action and update the grids.

        :param action: one of FORWARD, FEEL_FRONT, FEEL_LEFT, FEEL_RIGHT, TURN_LEFT, TURN_RIGHT
        :return: the predicted outcome: EMPTY, WALL, or UNKNOWN when the cell involved is unknown
        """
        result = EMPTY
        # Erase the transient marks of the previous interaction
        self.display_grid[:, :] = self.simulation_grid

        if action == FORWARD:
            target_position = tuple(self.position + self._FRONT)
            if self.simulation_grid[target_position] == EMPTY:
                self.translate()
            elif self.simulation_grid[target_position] == WALL:
                result = WALL
                self.display_grid[target_position] = BUMPING
            else:
                # Reset the simulation
                result = UNKNOWN
                self.simulation_grid[:SIM_HIGHT, :SIM_WIDTH] = self.init_grid
                self.display_grid[:, :] = self.init_grid

        elif action == TURN_RIGHT:
            self.rotate_counterclockwise()

        elif action == TURN_LEFT:
            self.rotate_clockwise()

        elif action == FEEL_FRONT:
            result = self._feel(self._FRONT)

        elif action == FEEL_LEFT:
            result = self._feel(self._LEFT)

        elif action == FEEL_RIGHT:
            result = self._feel(self._RIGHT)

        return result

    def clone(self):
        """Return a copy of the simulator in its current state.

        The copy's initial grid (the one restored by a reset) is the current
        simulation grid, and its position is the current position.
        """
        twin = Simulator(self.simulation_grid)
        twin.position[:] = self.position
        return twin

# L'agent

In [346]:
class Interaction:
    """An interaction is a tuple (action, outcome) with a valence"""

    def __init__(self, action, outcome, valence):
        """
        :param action: code of the action
        :param outcome: code of the outcome
        :param valence: numeric value of enacting this interaction
        """
        self.action = action
        self.outcome = outcome
        self.valence = valence

    def key(self):
        """ The key to find this interaction in the dictionary is the string '<action><outcome>'. """
        return f"{self.action}{self.outcome}"

    def __str__(self):
        """ Print interaction in the form '<action><outcome>:<valence>' for debug."""
        return f"{self.action}{self.outcome}:{self.valence}"

    def __repr__(self):
        """ Unambiguous representation for debug."""
        return f"Interaction({self.action!r}, {self.outcome!r}, {self.valence!r})"

    def __eq__(self, other):
        """ Interactions are equal if they have the same key """
        if not isinstance(other, Interaction):
            # Let Python try the reflected comparison instead of failing on a missing key()
            return NotImplemented
        return self.key() == other.key()

    def __hash__(self):
        """ Hash on the key so that equal interactions hash alike.

        Defining __eq__ alone would make the instances unhashable.
        """
        return hash(self.key())

In [347]:
import pandas as pd
import itertools

class Agent:
    """Agent that simulates pairs of actions in its internal simulator to select its next action.

    Each call to ``simulate`` runs one simulation; ``action`` then selects the first
    action of the pair with the highest valence, and ``assimilate`` updates the
    simulator with the outcome actually received.
    """

    def __init__(self, interactions, simulator):
        """ Initialize the dictionary of interactions

        :param interactions: list of Interaction; it must contain the interaction FORWARD/EMPTY,
            which is the initial intended interaction
        :param simulator: the Simulator that holds the agent's model of its surroundings
        """
        self._interactions = {interaction.key(): interaction for interaction in interactions}
        self._intended_interaction = self._interactions[f"{FORWARD}{EMPTY}"]
        # There is one interaction with outcome EMPTY per action
        self._actions = [i.action for i in interactions if i.outcome == EMPTY]
        self._simulator = simulator
        # Initialize the dataframe with all the combinations of two actions
        pairs = list(itertools.product(self._actions, repeat=2))
        self._default_df = pd.DataFrame(pairs, columns=['action1', 'action2'])
        self._default_df[['outcome1', 'step', 'outcome2', 'valence']] = 0
        self._simulation_step = 0
        self._s1 = None
        self.action_df = self._default_df.copy()
        self.selection_df = None
        self.simulation = simulator  # Used to display the current state of the simulation
        # The dataframe of all possible simulations
        simulations = list(itertools.product(range(1, 5), range(1, 5), range(4), self._actions))
        self._simulation_df = pd.DataFrame(simulations, columns=['pre_x', 'pre_y', 'pre_d', 'action'])
        self._simulation_df[['outcome', 'post_x', 'post_y', 'post_d']] = -1
        self.history_df = pd.DataFrame({'x': [simulator.position[0]], 'y': [simulator.position[1]],
                                        'd': [UP], 'action': [FORWARD], 'predicted': [EMPTY], 'outcome': [EMPTY]})
        # The simulator operator to apply for each enacted interaction, by interaction key
        self.operators = {f"{FORWARD}{EMPTY}": self._simulator.translate,
                          f"{FORWARD}{WALL}": self._simulator.bump,
                          f"{TURN_LEFT}{EMPTY}": self._simulator.rotate_clockwise,
                          f"{TURN_RIGHT}{EMPTY}": self._simulator.rotate_counterclockwise,
                          f"{FEEL_FRONT}{EMPTY}": self._simulator.kill_front,
                          f"{FEEL_FRONT}{WALL}": self._simulator.spawn_front,
                          f"{FEEL_LEFT}{EMPTY}": self._simulator.kill_left,
                          f"{FEEL_LEFT}{WALL}": self._simulator.spawn_left,
                          f"{FEEL_RIGHT}{EMPTY}": self._simulator.kill_right,
                          f"{FEEL_RIGHT}{WALL}": self._simulator.spawn_right}

    def _record_simulation(self, before, action, outcome, after):
        """Store in the simulation dataframe the outcome of `action` simulated from `before`, and the position in `after`"""
        rows = ((self._simulation_df['pre_x'] == before.position[0]) &
                (self._simulation_df['pre_y'] == before.position[1]) &
                (self._simulation_df['pre_d'] == UP) &
                (self._simulation_df['action'] == action))
        self._simulation_df.loc[rows, ['outcome', 'post_x', 'post_y', 'post_d']] = [
            outcome, after.position[0], after.position[1], UP]

    def simulate(self):
        """Compute the next row in action_df. Return True during the simulation and False when the simulation is over"""
        a1, step, a2, o1, valence = self.action_df.loc[self._simulation_step, ['action1', 'step', 'action2', 'outcome1', 'valence']]
        if step == 0:
            # Simulate the first action
            self._s1 = self._simulator.clone()
            o1 = self._s1.outcome(a1)
            self.simulation = self._s1
            valence = self._interactions[f"{a1}{o1}"].valence
            # Mark all the rows of this action as 'simulated'.
            # The row index is not advanced: the next call simulates the second action of this row.
            first_action_rows = self.action_df['action1'] == a1
            self.action_df.loc[first_action_rows, 'step'] = 1
            self.action_df.loc[first_action_rows, 'outcome1'] = o1
            self.action_df.loc[first_action_rows, 'valence'] = valence
            # Record this simulation
            self._record_simulation(self._simulator, a1, o1, self._s1)
        elif o1 != UNKNOWN:
            # Simulate the second action
            s2 = self._s1.clone()
            o2 = s2.outcome(a2)
            valence += self._interactions[f"{a2}{o2}"].valence
            pair_rows = (self.action_df['action1'] == a1) & (self.action_df['action2'] == a2)
            self.action_df.loc[pair_rows, 'outcome2'] = o2
            self.action_df.loc[pair_rows, 'valence'] = valence
            self.simulation = s2
            self._simulation_step += 1
            # Record this simulation
            self._record_simulation(self._s1, a2, o2, s2)
        else:
            # Skip the second simulation
            self._simulation_step += 1

        # When all the pairs of action have been simulated or a positive valence has been found, return False to stop the simulation
        return self._simulation_step < len(self.action_df) and valence <= 0

    def assimilate(self, outcome):
        """ Process the received outcome and prepare for the next simulation

        :param outcome: the outcome returned by the environment for the last selected action
        """
        # Trace the previous cycle
        previous_interaction = self._interactions[f"{self._intended_interaction.action}{outcome}"]
        prediction_is_correct = self._intended_interaction.outcome == outcome
        print(f"Action: {self._intended_interaction.action}, Predicted: {self._intended_interaction.outcome}, Outcome: {outcome}, "
              f"Prediction: {prediction_is_correct}, Valence: {previous_interaction.valence})")
        # Trace the history, reading the agent's own simulator rather than a notebook global
        new_history = pd.DataFrame({'x': [self._simulator.position[0]], 'y': [self._simulator.position[1]], 'd': [UP],
                                    'action': [self._intended_interaction.action],
                                    'predicted': [self._intended_interaction.outcome],
                                    'outcome': [outcome]})
        self.history_df = pd.concat([self.history_df, new_history], ignore_index=True)

        if self._intended_interaction.outcome == UNKNOWN:
            # Update the simulator by applying the operator corresponding to the enacted interaction
            self.operators[previous_interaction.key()]()
        elif not prediction_is_correct:
            # Other unexpected outcome: adjust the simulator
            self.adjust(outcome)
        # Update the simulator
        self._simulator.outcome(self._intended_interaction.action)
        self.simulation = self._simulator
        # Prepare the next simulation
        self._simulation_step = 0
        self.action_df = self._default_df.copy()

    def adjust(self, outcome):
        """Adjust the simulator to a recorded state in which the intended action produced `outcome`"""
        # look for a compatible state in the simulations
        filtered_df = self._simulation_df.loc[(self._simulation_df['action'] == self._intended_interaction.action) &
                                              (self._simulation_df['outcome'] == outcome)].reset_index(drop=True)
        if filtered_df.empty:
            print("Adjustement failed to find a compatible position")
        else:
            # Place the simulator in a compatible state
            x, y, direction = filtered_df.loc[0, ['pre_x', 'pre_y', 'pre_d']]
            self._simulator.position[:] = [x, y]
            self._simulator.direction = direction
            print(filtered_df)
            print(f"Simulator adjusted to [{x},{y}], {direction}")

    def action(self):
        """ Select the next interaction to try to enact and return its action """
        # Sort by descending valence
        self.selection_df = self.action_df.sort_values(by=['valence'], ascending=[False]).reset_index(drop=True)
        # Return the action that yields the highest valence
        intended_action, intended_outcome = self.selection_df.loc[0, ['action1', 'outcome1']]
        self._intended_interaction = self._interactions[f"{intended_action}{intended_outcome}"]
        return self._intended_interaction.action


# Run the experiment

In [348]:
# Lay out the Small Loop world: '#' is a wall cell and '.' an empty cell
grid = [[WALL if cell == "#" else EMPTY for cell in row]
        for row in ("######",
                    "#...##",
                    "#.#..#",
                    "#.##.#",
                    "#....#",
                    "######")]
e = SmallLoop([1, 1], LEFT, grid)

# The interactions available to the agent, as (action, outcome, valence)
interactions = [
    Interaction(act, result, valence)
    for act, result, valence in (
        (FORWARD, EMPTY, 5),
        (FORWARD, WALL, -10),
        (FORWARD, UNKNOWN, -5),
        (TURN_LEFT, EMPTY, -3),
        (TURN_RIGHT, EMPTY, -3),
        (FEEL_FRONT, EMPTY, -1),
        (FEEL_FRONT, WALL, -1),
        (FEEL_FRONT, UNKNOWN, 1),
        (FEEL_LEFT, EMPTY, -1),
        (FEEL_LEFT, WALL, -1),
        (FEEL_LEFT, UNKNOWN, 1),
        (FEEL_RIGHT, EMPTY, -1),
        (FEEL_RIGHT, WALL, -1),
        (FEEL_RIGHT, UNKNOWN, 1),
    )
]
# The simulator starts with every cell UNKNOWN
simulator_grid = np.full((SIM_HIGHT, SIM_WIDTH), UNKNOWN, dtype=int)
simulator = Simulator(simulator_grid)
agent = Agent(interactions, simulator)

# Counters of the interaction loop
step = outcome = img_nb = 0

# Draw the initial state into a fresh output widget and show the widget
out = Output()
e.display(agent.simulation)
display(out)

Output()

In [350]:
# Run one interaction cycle
step += 1
img_nb += 1
print(f"Step {step}")
# Show and save one frame per simulation pass until the agent stops simulating
while agent.simulate():
    e.display(agent.simulation)
    e.save(step, img_nb, agent.simulation)  # Save the image file
    img_nb += 1
# Select the action, enact it in the environment and give the outcome back to the agent
action = agent.action()
outcome = e.outcome(action)
# print(agent.action_df)
agent.assimilate(outcome)
# Show and save the state of the simulator after the outcome was assimilated
e.display(agent.simulation)
e.save(step, img_nb, agent.simulation)  # Save the image file

Step 2
Action: 2, Predicted: 5, Outcome: 0, Prediction: False, Valence: -1)


La partie gauche représente l'environnement "réel" et la partie droite le simulateur interne de l'agent. 

Lorsque l'agent arrive dans des endroits où les cellules du simulateur sont UNKNOWN, il sélectionne les interactions FEEL car elles ont une valence positive. 

# Créons le film gif

In [203]:
import imageio.v2 as imageio
import os

# Assemble the saved PNG frames into an animated GIF
img_dir = f"./{save_dir}"
# os.listdir returns the entries in arbitrary order: sort them so that the frames
# follow the order of their zero-padded image numbers
all_files = sorted(os.path.join(img_dir, f) for f in os.listdir(img_dir) if f.endswith('.png'))
images = [imageio.imread(f) for f in all_files]
imageio.mimsave("movie.gif", images)

![Agent8 demo](agent8.gif)

# Analyse

Nous voudrions que l'agent apprenne par lui-même le dictionnaire `operators` qui mémorise la correspondance entre les interactions et les opérateurs du simulateur.

L'ensemble de toutes les configurations possibles du dictionnaire `operators` correspond à toutes les permutations possibles des 10 interactions, soit 10! = 3 628 800 permutations.


