In [2]:
!pip install -r requirements.txt

Collecting mesa
  Downloading Mesa-0.9.0-py3-none-any.whl (691 kB)
Installing collected packages: mesa
Successfully installed mesa-0.9.0


# Introduction

Dans ce TP, nous allons étudier un système inspiré du \textsl{Mars Explorer} de Luc Steels un peu modifié. La problématique est la suivante: un endroit impénétrable aux ondes a été miné, et le but est de le déminer. Pour cela, on emploie une équipe de robots capables de détecter les mines et de les désarmer. Puisqu'il est impossible de communiquer avec l'intérieur de l'endroit, les machines doivent constituer un système autonome et efficace. Les agents décrits dans le cadre de ce TP sont basés sur les principes des agents réactifs et permettent de résoudre ce problème.


In [3]:
import enum
import math
import random
import uuid
from enum import Enum

import mesa
import numpy as np
from collections import defaultdict

import mesa.space
from mesa import Agent, Model
from mesa.datacollection import DataCollector
from mesa.time import RandomActivation
from mesa.visualization.ModularVisualization import VisualizationElement, ModularServer
from mesa.visualization.modules import ChartModule

MAX_ITERATION = 100
PROBA_CHGT_ANGLE = 0.01


L'environnement est composé de trois éléments principaux:

- les mines sont les objets que le robot doit détecter. Une fois une mine détectée, le robot se place aux mêmes coordonnées qu'elle et le détruit,
- les obstacles sont des éléments qui ne doivent pas être détruits. De plus, un robot ne doit jamais se retrouver sur un obstacle, ce qui comprend son emplacement et un rayon de 2 unités autour de lui.
- des sables mouvants, qui ralentissent les agents (ils vont alors à la moitié de leur vitesse normale)

In [4]:
class Obstacle:  # Environnement: obstacle
    def __init__(self, x, y, r):
        self.x = x
        self.y = y
        self.r = r

    def portrayal_method(self):
        portrayal = {"Shape": "circle",
                     "Filled": "true",
                     "Layer": 1,
                     "Color": "black",
                     "r": self.r}
        return portrayal


class Quicksand:  # Environnement: ralentissement
    def __init__(self, x, y, r):
        self.x = x
        self.y = y
        self.r = r

    def portrayal_method(self):
        portrayal = {"Shape": "circle",
                     "Filled": "true",
                     "Layer": 1,
                     "Color": "olive",
                     "r": self.r}
        return portrayal


class Mine:  # Environnement: élément à ramasser
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def portrayal_method(self):
        portrayal = {"Shape": "circle",
                     "Filled": "true",
                     "Layer": 2,
                     "Color": "black",
                     "r": 2}
        return portrayal



Enfin, les robots doivent s'éviter. On supposera qu'ils ne doivent jamais se croiser. Voici le code concernant le Canvas:

In [5]:
class ContinuousCanvas(VisualizationElement):
    local_includes = [
        "./js/simple_continuous_canvas.js",
    ]

    def __init__(self, canvas_height=500,
                 canvas_width=500, instantiate=True):
        VisualizationElement.__init__(self)
        self.canvas_height = canvas_height
        self.canvas_width = canvas_width
        self.identifier = "space-canvas"
        if (instantiate):
            new_element = ("new Simple_Continuous_Module({}, {},'{}')".
                           format(self.canvas_width, self.canvas_height, self.identifier))
            self.js_code = "elements.push(" + new_element + ");"

    def portrayal_method(self, obj):
        return obj.portrayal_method()

    def render(self, model):
        representation = defaultdict(list)
        for obj in model.schedule.agents:
            portrayal = self.portrayal_method(obj)
            if portrayal:
                portrayal["x"] = ((obj.x - model.space.x_min) /
                                  (model.space.x_max - model.space.x_min))
                portrayal["y"] = ((obj.y - model.space.y_min) /
                                  (model.space.y_max - model.space.y_min))
            representation[portrayal["Layer"]].append(portrayal)
        for obj in model.mines:
            portrayal = self.portrayal_method(obj)
            if portrayal:
                portrayal["x"] = ((obj.x - model.space.x_min) /
                                  (model.space.x_max - model.space.x_min))
                portrayal["y"] = ((obj.y - model.space.y_min) /
                                  (model.space.y_max - model.space.y_min))
            representation[portrayal["Layer"]].append(portrayal)
        for obj in model.markers:
            portrayal = self.portrayal_method(obj)
            if portrayal:
                portrayal["x"] = ((obj.x - model.space.x_min) /
                                  (model.space.x_max - model.space.x_min))
                portrayal["y"] = ((obj.y - model.space.y_min) /
                                  (model.space.y_max - model.space.y_min))
            representation[portrayal["Layer"]].append(portrayal)
        for obj in model.obstacles:
            portrayal = self.portrayal_method(obj)
            if portrayal:
                portrayal["x"] = ((obj.x - model.space.x_min) /
                                  (model.space.x_max - model.space.x_min))
                portrayal["y"] = ((obj.y - model.space.y_min) /
                                  (model.space.y_max - model.space.y_min))
            representation[portrayal["Layer"]].append(portrayal)
        for obj in model.quicksands:
            portrayal = self.portrayal_method(obj)
            if portrayal:
                portrayal["x"] = ((obj.x - model.space.x_min) /
                                  (model.space.x_max - model.space.x_min))
                portrayal["y"] = ((obj.y - model.space.y_min) /
                                  (model.space.y_max - model.space.y_min))
            representation[portrayal["Layer"]].append(portrayal)
        return representation


# Description basique du robot
Le robot est doté d'un certain nombre de capacités primaires: il peut se déplacer, détecter un obstacle ou un autre robot à éviter, détecter une mine et la détruire. Voici les comportements attendus par le robot:
- **Se déplacer** lorsque le robot est à la recherche de mines, il se déplace de manière rectiligne, à une vitesse `speed` et selon un angle `angle`. Selon une probabilité fixée par avance (`PROBA_CHGT_ANGLE`), le robot peut changer de direction. Dans ce cas, son nouvel angle est un angle aléatoire choisi.
- **Éviter un autre robot** Lorsqu'un ou plusieurs autres robots se trouve à portée de détection, le robot calcule son futur mouvement et vérifie si les robots à portée pourraient entrer en collision avec lui. Puisqu'il ne connaît que la position des autres robots et non leur angle, il considère qu'une collision est possible si sa position au tour suivant ou n'importe quelle position intermédiaire entre sa position actuelle et sa position future se trouvent à une distance inférieure à la vitesse maximale de l'autre robot. Si c'est le cas, il modifie son angle jusqu'à trouver un angle où il n'y ait pas de collision.
- **Éviter un obstacle / les bords de l'environnement** Si un robot détecte un ou plusieurs obstacles, il procède au même calcul, mais il considère qu'une collision est possible si sa future position est dans l'obstacle.  Pour simplifier, on ne prendra pas en compte le fait qu'un robot puisse traverser un obstacle.
- **Détruire une mine** Lorsqu'un agent se trouve sur une mine, il la désamorce (dans mesa, il la retire du `model`). Détruire une mine est une action qui peut être menée en plus du déplacement.
- **Se diriger vers une mine** Lorsqu'un agent détecte une mine, il se dirige vers la mine à sa vitesse, sans prendre en compte ni modifier son angle.

In [6]:
class Robot(Agent):  # La classe des agents
    def __init__(self, unique_id: int, model: Model, x, y, speed, sight_distance, angle=0.0):
        super().__init__(unique_id, model)
        self.x = x
        self.y = y
        self.speed = speed
        self.sight_distance = sight_distance
        self.angle = angle
        self.counter = 0

    def step(self):
        # Se deplacer et/ou aller vers une mine
        detect_mine = False
        mins = [mine for mine in model.mines if np.sqrt((self.x - mine.x)**2 + (self.y - mine.y)**2) <= self.sight_distance]
        pick = 0
        target = mins[0]
        
        if len(mins) != 0:
            pick = random.randint(0, len(mins)-1)
            target = mins[pick]
            dest_x, dest_y, ang = go_to(self.x, self.y, self.speed, target.x, target.y)
            detect_mine = True
            #self.x = dest_x
            #self.y = dest_y
            #model.schedule.remove(target)
        else:
            prob = random.random()
            speed = self.speed
            for sand in model.quicksands:
                if self.x == sand.x and self.y == self.y:
                    speed = self.speed / 2
                    
            if prob < PROBA_CHGT_ANGLE:
                ang = np.radians(random.random())
                dest_x, dest_y = move(self.x, self.y, speed, ang)
            else:
                dest_x, dest_y = move(self.x, self.y, speed, self.angle)
                
            obsts = [obst for obst in model.obstacles if np.sqrt((self.x - obst.x)**2 + (self.y - obst.y)**2) <= self.sight_distance]
            target_obsts = [obst for obst in obsts if np.sqrt((dest_x - obst.x)**2 + (dest_y - obst.y)**2) <= 2.0]
            
            robs = [rob for rob in model.schedule.agents if np.sqrt((self.x - rob.x)**2 + (self.y - rob.y)**2) <= self.sight_distance]
            a = (dest_y - self.y)/(dest_x - self.x)
            b = (self.y*dest_x - dest_y*self.x)/(dest_x - self.x)
            x0 = (dest_x - self.x)/2
            y0 = (dest_y - self.y)/2
            rad = (1/2)*np.sqrt((self.x - dest_x)**2 + (self.y - dest_y)**2)
            target_robs = [rob for rob in robs if np.abs(a*rob.x - rob.y + b)/np.sqrt(a**2 + b**2) <= rob.speed and np.sqrt((rob.x - x0)**2 + (rob.y - y0)**2) <= rad]
            
            while len(target_robs) != 0 or len(target_obsts) != 0:
                if detect_mine and len(mins) > 0:
                    del mins[pick]
                    pick = random.randint(0, len(mins)-1)
                    target = mins[pick]
                    dest_x, dest_y, ang = go_to(self.x, self.y, self.speed, target.x, target.y)
                else:
                    detect_mine = False
                    ang = np.radians(random.random())
                    dest_x, dest_y = move(self.x, self.y, speed, ang)
                a = (dest_y - self.y)/(dest_x - self.x)
                b = (self.y*dest_x - dest_y*self.x)/(dest_x - self.x)
                x0 = (dest_x - self.x)/2
                y0 = (dest_y - self.y)/2
                rad = (1/2)*np.sqrt((self.x - dest_x)**2 + (self.y - dest_y)**2)
                target_robs = [rob for rob in robs if np.abs(a*rob.x - rob.y + b)/np.sqrt(a**2 + b**2) <= rob.speed and np.sqrt((rob.x - x0)**2 + (rob.y - y0)**2) <= rad]
                target_obsts = [obst for obst in obsts if np.sqrt((dest_x - obst.x)**2 + (dest_y - obst.y)**2) <= 2.0]
            self.x = dest_x
            self.y = dest_y
            
            if detect_mine:
                model.schedule.remove(target)
            
        
        pass  # TODO L'intégralité du code du TP peut être ajoutée ici.

    def portrayal_method(self):
        portrayal = {"Shape": "arrowHead", "s": 1, "Filled": "true", "Color": "Red", "Layer": 3, 'x': self.x,
                     'y': self.y, "angle": self.angle}
        return portrayal


Vous pouvez vous appuyer sur ces deux fonctions: 

In [7]:
def move(x, y, speed, angle):  # se déplacer de speed selon l'angle angle depuis x, y
    return x + speed * math.cos(angle), y + speed * math.sin(angle)


def go_to(x, y, speed, dest_x, dest_y):  # se déplacer de speed vers (dest_x, dest_y) ou si la speed est trop importante s'y arrêter.
    if np.linalg.norm((x - dest_x, y - dest_y)) < speed:
        return (dest_x, dest_y), 2 * math.pi * random.random()
    else:
        angle = math.acos((dest_x - x)/np.linalg.norm((x - dest_x, y - dest_y)))
        if dest_y < y:
            angle = - angle
        return move(x, y, speed, angle), angle


Enfin, le modèle et le lancement du serveur:

In [8]:
class MinedZone(Model):
    collector = DataCollector(
        model_reporters={"Mines": lambda model: len(model.mines),
                         "Danger markers": lambda model: len([m for m in model.markers if
                                                          m.purpose == MarkerPurpose.DANGER]),
                         "Indication markers": lambda model: len([m for m in model.markers if
                                                          m.purpose == MarkerPurpose.INDICATION]),
                         },
        agent_reporters={})

    def __init__(self, n_robots, n_obstacles, n_quicksand, n_mines, speed):
        Model.__init__(self)
        self.space = mesa.space.ContinuousSpace(600, 600, False)
        self.schedule = RandomActivation(self)
        self.mines = []
        self.markers = []
        self.obstacles = []
        self.quicksands = []
        for _ in range(n_obstacles):
            self.obstacles.append(Obstacle(random.random() * 500, random.random() * 500, 10 + 20 * random.random()))
        for _ in range(n_quicksand):
            self.quicksands.append(Quicksand(random.random() * 500, random.random() * 500, 10 + 20 * random.random()))
        for _ in range(n_robots):
            x, y = random.random() * 500, random.random() * 500
            while [o for o in self.obstacles if np.linalg.norm((o.x - x, o.y - y)) < o.r] or \
                    [o for o in self.quicksands if np.linalg.norm((o.x - x, o.y - y)) < o.r]:
                x, y = random.random() * 500, random.random() * 500
            self.schedule.add(
                Robot(int(uuid.uuid1()), self, x, y, speed,
                      2 * speed, random.random() * 2 * math.pi))
        for _ in range(n_mines):
            x, y = random.random() * 500, random.random() * 500
            while [o for o in self.obstacles if np.linalg.norm((o.x - x, o.y - y)) < o.r] or \
                    [o for o in self.quicksands if np.linalg.norm((o.x - x, o.y - y)) < o.r]:
                x, y = random.random() * 500, random.random() * 500
            self.mines.append(Mine(x, y))
        self.datacollector = self.collector

    def step(self):
        self.datacollector.collect(self)
        self.schedule.step()
        if not self.mines:
            self.running = False


def run_single_server():
    chart = ChartModule([{"Label": "Mines",
                          "Color": "Orange"},
                         {"Label": "Danger markers",
                          "Color": "Red"},
                         {"Label": "Indication markers",
                          "Color": "Green"}
                         ],
                        data_collector_name='datacollector')
    server = ModularServer(MinedZone,
                           [ContinuousCanvas(),
                            chart],
                           "Deminer robots",
                           {"n_robots": mesa.visualization.
                            ModularVisualization.UserSettableParameter('slider', "Number of robots", 7, 3,
                                                                       15, 1),
                            "n_obstacles": mesa.visualization.
                            ModularVisualization.UserSettableParameter('slider', "Number of obstacles", 5, 2, 10, 1),
                            "n_quicksand": mesa.visualization.
                            ModularVisualization.UserSettableParameter('slider', "Number of quicksand", 5, 2, 10, 1),
                            "speed": mesa.visualization.
                            ModularVisualization.UserSettableParameter('slider', "Robot speed", 15, 5, 40, 5),
                            "n_mines": mesa.visualization.
                            ModularVisualization.UserSettableParameter('slider', "Number of mines", 15, 5, 30, 1)})
    server.port = 8521
    server.launch()


if __name__ == "__main__":
    run_single_server()



Interface starting at http://127.0.0.1:8521


RuntimeError: This event loop is already running

Socket opened!
{"type":"reset"}
{"type":"get_step","step":1}


**Question 1** – Quelle architecture vous paraît la plus à même de traiter ce problème ?

L'architecture la plus adaptée est celle du subsomption.

**Question 2** –  Il est capital que les robots ne rentrent pas en collision les uns avec les autres. Cela prime sur chacun de leurs déplacement. Proposer un ordre de priorité pour les comportements décrits ci-dessus qui respecte cette contrainte et permette aux robots d'accomplir leur mission (NB, il est possible que plusieurs comportements doivent être fusionnés)

Pour cela on fait déjà les tests qui concernent la position actuelle comme les sables mouvants ou la détection de mines à supprimer. Ensuite on cherche la position suivante en fonction de potentielles mines ou alors d'un déplacement aléatoire et pour celle-ci on modifie l'angle tant qu'un angle qui ne satisfait pas toutes les conditions d'évitement, à savoir collision avec un autre agent ou rencontre d'un obstacle ou d'un bord, n'est pas trouvé.

Implémentez l'ordre que vous proposé précédemment. Ajoutez une métrique représentant le cumul des mines désamorcées à chaque tour. Enregistrez ce graphe et joignez le à votre TP. Lancez la simulation une dizaine de fois et donnez le temps moyen de désamorçage de toutes les mines.

*Voir graph_1*

**Question 3** – Quels principes des agents réactifs sont ici respectés? Lesquels ne le sont pas? Justifiez.

La simplicité et l'absence de modèle. En effet, pour chaque perception des agents on a une action qui est effectuée et les données brutes en l'occurence la position des capteurs ne sont pas prétraitées avant d'être exploitées. Par contre la modularité n'est pas respectée car on retrouve des tâches qui s'imbriquent entre elles.

# Communication indirecte

Pour faire communiquer nos robots, nous allons utiliser l'environnement. Nous allons utiliser des balises (`marker`), comme Steels le décrit pour le *Mars Explorer*. Les robots ont deux types balises. Un agent dépose une balise `DANGER` lorsqu'il sort de sables mouvants, afin que les autres agents n'y pénètrent pas, et une balise `INDICATION`, déposée lorsque l'agent démine, et qui indique dans quelle direction il est allé. Un agent qui détecte une balise `DANGER` fera demi-tour. Un agent détectant une balise `INDICATION` modifie son angle afin de se déplacer à 90° dans une direction ou l'autre par rapport à l'angle indiqué sur la balise. Un robot détectant une balise se déplace jusqu'à elle, la ramasse et fait demi-tour.

Plus formellement, voici la description des comportements attendus pour le robot:
- **Faire demi-tour** Lorsqu'un agent détecte une balise `DANGER`, il fait demi-tour.
- **Tourner à 90°** Lorsqu'un agent détecte une balise `INDICATION`, il se dirige à 90° de la direction indiquée par la balise
- **Déposer une balise** Lorsqu'un agent vient de sortir des sables mouvants, un agent dépose une balise `DANGER`; lorsqu'un agent démine, il dépose une balise `INDICATION` dont la `direction` indique la direction dans laquelle se dirige l'agent. Attention, pour éviter que le robot ne soit influencé par la balise qu'il vient de déposer, un compteur de tour est initialisé à `speed/2` durant lequel le robot ignore les balises qu'il voit.

La classe de balise a déjà été implémentée:

In [None]:
class MarkerPurpose(Enum):  # Enum pour les types de balises
    DANGER = enum.auto(),
    INDICATION = enum.auto()

    
class Marker:  # La classe pour les balises
    def __init__(self, x, y, purpose, direction=None):
        self.x = x
        self.y = y
        self.purpose = purpose
        if purpose == MarkerPurpose.INDICATION:
            if direction is not None:
                self.direction = direction
            else:
                raise ValueError("Direction should not be none for indication marker")

    def portrayal_method(self):
        portrayal = {"Shape": "circle",
                     "Filled": "true",
                     "Layer": 2,
                     "Color": "red" if self.purpose == MarkerPurpose.DANGER else "green",
                     "r": 2}
        return portrayal

Socket opened!
{"type":"reset"}


**Question 4** – Commentez ce dernier point au regard des principes de l'architecture réactive.

Ces nouvelles fonctionalités nous rapprochent gardent toujours dans le cadre de l'architecture du subsomption puisque la communication et les interactions entre les agents ne sont pas directes mais plutôt indirectes en passant par l'intermédiaire de balises posées sur le chemin.

**Question 5** – Proposez une nouvelle manière d'organiser ces comportements et justifiez.

Une nouvelle organisation consiste à garder la même que précédemment, juste introduire parmi les tests de detection, la detection des balises, qui modifient l'angle des agents et dont la prochaine destination possible.

**Question 6** – Relancez 10 simulations et incluez un graphe dans le rendu du TP. Quel est le temps moyen de désamorçage des mines? Commentez.

*Voir graph_é et temps_deminage_10* 

Le temps moyen de déminage est d'environ une centaine d'étapes. 

**Question 7** – Ajoutez un reporter permettant de suivre le nombre de tours passés dans les sables mouvants. Comment les balises `DANGER` influencent-elles le temps moyen passé dans les sables mouvants ?

Elles réduisent le temps passé dans les sables mouvants car empêchent les agents de venir ralentir à l'intérieur à chaque fois.

**Question Bonus** – Ajouter le mécanisme suivant: on suppose désormais que les agents sont capables de transmettre un signal leur permettant de savoir où se trouvent les autres. Lorsqu'un agent modifie son angle aléatoirement, faites en sorte qu'il se tourne de manière à maximiser l'angle entre sa nouvelle direction et la direction envers chacun de ses 2 pluss proches voixins. Relancez 10 expérimentations. Qu'observez-vous ?

*Insérez votre réponse ici*