# <font color='red'>TME ROBOTIQUE ET APPRENTISSAGE</font>
# <font color='red'>Evolution de structures</font>

<font color="red">Version étudiant 2022-2023</font>

*mise à jour: 12/04/2023*

Ce notebook peut être exécuté dans [Google Colab](colab.research.google.com/)

Pour faciliter la lisibilité du notebook, le code donné, à écrire ou à compléter est dans les cellules en annexe, à la fin du notebook. Les cellules de réponses ne doivent contenir que les quelques instructions permettant d'afficher les résultats (éventuellement des appels permettant de les générer) et les commentaires d'analyse associés.

Vous devez déposer votre travail sur Moodle:
* déposer votre notebook, avec le nom de fichier *obligatoirement* au format suivant: **RA_NOM1_NOM2.ipynb**
* toutes les cellules exécutées
* des graphes et un commentaire sur les résultats obtenus
* affichage limité au nécessaire pour assurer la lisibilité du notebook (pas d'affichage de debug ni de centaines de graphes !)

*Le sujet est à faire en binome.*

# COMPLETEZ LES CHAMPS CI-DESSOUS AVEC NOM/PRENOM/CARTE_ETU:

* Étudiant 1: **_Slimani_ _Nour_ _Ismahane_ _21221230_**
* Étudiant 2: **_Buisson_ _Marc_ _28614429_**

## Introduction

Ce TME est composé de deux parties indépendantes qui s'appuieront toutes deux sur le framework DEAP que vous avez utilisé lors des TME précédents.

Dans la première partie, vous ferez de la regression symbolique avec de la programmation génétique.

Dans la seconde partie, vous testerez l'expérience de Lehman et Stanley sur novelty search.

Installation des dépendances:

In [1]:
!pip install deap
!pip install gym
!pip install scoop
!apt install libgraphviz-dev
!pip install pygraphviz
!apt install poppler-utils
!pip install pdf2image
!pip install plot

Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/135.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m133.1/135.4 kB[0m [31m3.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: deap
Successfully installed deap-1.4.1
Collecting scoop
  Downloading scoop-0.7.2.0.tar.gz (615 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m615.4/615.4 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: scoop
  Building wheel for scoop (setup.py) ... [?25l[?25hdone
  Created wheel for scoop: filename=scoop-0.7.2.0-py3-none-any.whl 

Sur les machines de TME (ou sur votre machine), vous pouvez également utiliser singularity, qui est un outil permettant de gérer des "containers" contenant tout l'environnement logiciel et les dépendances nécessaires, cf https://sylabs.io/guides/3.5/user-guide/index.html.

L'image singularity est disponible sur moodle.

Vous devez la copier en local sur votre machine (elle ne doit pas être dans un répertoire accessible par le réseau). Vous pouvez ensuite lancer un shell de la façon suivante:
<pre>singularity shell TME_RA.sif </pre>
Cela vous donnera accès à un shell dans lequel toutes les dépendances sont disponibles.

Remarque: singularity attache par défaut votre répertoire home à l'image singularity. C'est très pratique, mais cela peut poser des difficultés en python si vous avez des bibliothèques installées en local. Vous pouvez utiliser l'option --no-home pour éviter ce type de problème. Pour accéder à vos fichiers, vous pouvez alors demander à monter un répertoire particulier dans votre image avec l'argument --bind TME_hors_singularity:/TME_dans_singularity.

## 1. Regression symbolique

Vous allez utiliser la programmation génétique pour retrouver des équations à partir de données.
Vous utiliserez pour cela les fonctions proposées par DEAP:
https://deap.readthedocs.io/en/master/tutorials/advanced/gp.html et vous pourrez vous inspirez des exemples de programmation génétique donnés dans la documentation: https://deap.readthedocs.io/en/master/examples/gp_symbreg.html.


**1.1-** Complétez le code qui vous a été fourni (annexe, question 1-3, `symbolic_regression.py`). En vous appuyant sur DEAP, vous implémenterez 3 stratégies:
* une stratégie purement élitiste visant à minimiser l'erreur d'approximation uniquement,
* la stratégie avec double tournoi, le premier tournoi choisissant les individus avec les erreurs les plus faibles et le second tournoi choisissant les individus avec le modèle le plus simple
* une stratégie multi-objectif s'appuyant sur NSGA-2 avec l'erreur d'approximation comme premier objectif et la taille du modèle en deuxième objectif (les deux étant à minimiser)

Vous testerez votre code sur une fonction simple (par exemple f(x,y)=x*y+cos(x)) avec le jeu de fonctions primitives suivant: +, -, *, / (protected_div), cos et sin. Vous pourrez ajouter une constante (1) et une constante éphémère (variable aléatoire uniforme entre -1 et 1).

Vous génèrerez un ensemble de données d'entrainement et un ensemble de validation que vous utiliserez pour vérifier s'il y a eu surapprentissage. Vous pourrez générer, par exemple, 30 valeurs différentes de x et 30 valeurs différentes de y. Vous indiquerez dans votre réponse les opérateurs de mutation et de croisement que vous avez utilisés (remarque: si vous voulez combiner plusieurs opérateurs de mutation ou de croisement, il faut définir un nouvel opérateur qui gère cette combinaison).

Vous regarderez les arbres générés et indiquerez le nombre de fois que la fonction a été retrouvée sur une dizaine d'expériences. Vous comparerez la taille des fonctions générées selon la variante de sélection utilisée.

**Remarque1:** pour rappel, la programmation génétique utilise généralement de grandes populations. Il vous est recommandé d'utiliser des tailles de 400 minimum. En une centaine de générations, vous devriez pouvoir observer de premiers résultats.

**Remarque2:** pour limiter l'impact du "bloat", il vous est recommandé de mettre une taille maximale à l'arbre généré par les opérateurs de mutation et de croisement. Vous pourrez utiliser gp.staticLimit. Sans cela, certaines expériences risquent de prendre un temps et une mémoire considérables.

Complétez le squelette de code donné en annexe. L'exécution de la cellule sauvegardera son contenu que vous pourrez ensuite appeler dans un terminal ou directement depuis le notebook en transmettant les arguments décrivant la variante que vous souhaitez tester (tournoi, nsga2, ...).

Vous pourrez afficher des arbres dans votre notebook en vous inspirant du code fourni ou en affichant directement le PDF dans le notebook avec les commandes suivantes:

In [2]:
filepath="res_dir/hof_tree_genX.pdf"
from pdf2image import convert_from_path

images = convert_from_path(filepath)
images[0]  # first page


PDFPageCountError: ignored

In [3]:
#<ANSWER>
%%time
!python3 symbolic_regression.py

#</ANSWER>


python3: can't open file '/content/symbolic_regression.py': [Errno 2] No such file or directory
CPU times: user 4.97 ms, sys: 0 ns, total: 4.97 ms
Wall time: 107 ms


**1.2-** Ajoutez du bruit à vos fonctions et observez le résultat obtenu (mettez des valeurs qui sont faibles devant les données, par exemple 0.0001).

In [4]:
#<ANSWER>
%%time
!python3 symbolic_regression.py
#</ANSWER>

python3: can't open file '/content/symbolic_regression.py': [Errno 2] No such file or directory
CPU times: user 6.32 ms, sys: 0 ns, total: 6.32 ms
Wall time: 106 ms


## 2. Fitness & Nouveauté

L'environnement `FastsimSimpleNavigation-v0` de gym_fastsim permet de lancer des expériences de navigation avec un robot à roues naviguant dans un labyrinthe. Vous allez dans cette partie reproduire les expériences de Lehman et Stanley sur la recherche de nouveauté. Vous allez faire différentes variantes de cette expérience, certaines étant en mono- d'autres étant en multi-objectif. Pour simplifier, dans tous les cas, vous utiliserez NSGA-2, qui est équivalent à une stratégie élitiste en mono-objectif.

Pour installer l'environnement dans collab ou jupyter, utiliser les commandes suivantes:

In [5]:
!git clone https://github.com/sferes2/libfastsim
!cd libfastsim && ./waf configure build install
!git clone https://github.com/alexendy/pyfastsim
!cd pyfastsim && pip install .
!git clone https://github.com/alexendy/fastsim_gym
!cd fastsim_gym && pip install .

Cloning into 'libfastsim'...
remote: Enumerating objects: 274, done.[K
remote: Counting objects: 100% (22/22), done.[K
remote: Compressing objects: 100% (20/20), done.[K
remote: Total 274 (delta 5), reused 10 (delta 2), pack-reused 252[K
Receiving objects: 100% (274/274), 272.83 KiB | 3.00 MiB/s, done.
Resolving deltas: 100% (161/161), done.
[32m[0mSetting top to                           :[0m [0m[32m[32m/content/libfastsim[0m [0m
[32m[0mSetting out to                           :[0m [0m[32m[32m/content/libfastsim/build[0m [0m
configuring b-optimize
[32m[0mChecking for 'g++' (C++ compiler)        :[0m [0m[32m[32m/usr/bin/g++[0m [0m
[32m[0mChecking for 'gcc' (C compiler)          :[0m [0m[32m[32m/usr/bin/gcc[0m [0m
[32m[0mChecking for SDL (1.2 - sdl-config)      :[0m [0m[32m[01;31msdl-config not found[0m [0m
['-Wall', '-O3', '-msse2', '-fPIC']
[32m'configure' finished successfully (0.255s)[0m
[32mWaf: Entering directory `/content/libfastsim/

Remarque: pour une installation sur les machines de TME, vous n'aurez pas les droits pour installer fastsim dans les répertoires système. Dans ce cas, vous pouvez ajouter l'installer dans votre répertoire en ajoutant un argument 'prefix' au waf configure et ajouter le répertoire des libs ainsi créé à la variable d'environnement LIBRARY_PATH et le répertoire des fichiers headers à la variable d'environnement CPATH. Une fois cela fait, vous pouvez faire appel au pip install de pyfastsim puis de fastsim_gym.

**2.1-**  Lancer une première expérience dans laquelle le robot doit atteindre la sortie du labyrinthe. Vous pourrez essayer avec la reward de l'expérience, qui est une reward binaire (sortie atteinte ou non) et avec une fitness plus continue dans laquelle la récompense est la distance à la sortie (à minimiser donc). Pour observer le comportement de la recherche effectuée, vous pourrez écrire la position du robot à la fin de l'évaluation et ensuite tracer ces positions avec les fonctions fournies dans `maze_plot.py` (vous pouvez aussi tracer les trajectoires, mais comme il y a 2000 positions par évaluation, dans ce cas, vous pourrez n'écrire qu'une position sur 100, par exemple).

Quelles parties de l'espace ont été explorées dans les deux cas ? Est-ce que la sortie est atteinte (vous vous limiterez à 200 générations) ? Si oui, au bout de combien de générations ?

In [6]:
#<ANSWER>
%%time
!python3 gym_fastsim_maze.py
#</ANSWER>

python3: can't open file '/content/gym_fastsim_maze.py': [Errno 2] No such file or directory
CPU times: user 7.63 ms, sys: 59 µs, total: 7.69 ms
Wall time: 138 ms


**2.2-** Lancer la même expérience, mais avec un critère de nouveauté. Vous pourrez pour cela partir du code fourni pour le calcul de nouveauté (`novelty_search.py`) et le compléter.  

In [7]:
#<ANSWER>
%%time
!python3 gym_fastsim_maze.py --variant "NS"
#</ANSWER>

python3: can't open file '/content/gym_fastsim_maze.py': [Errno 2] No such file or directory
CPU times: user 7.65 ms, sys: 3 µs, total: 7.66 ms
Wall time: 318 ms


**2.3-** Utiliser en même temps la fitness et le critère de nouveauté avec NSGA-2. Mesurez le temps moyen pour atteindre la sortie.

In [None]:
#<ANSWER>

#</ANSWER>

## Annexes


In [8]:
from IPython.core.magic import register_cell_magic
@register_cell_magic
def run_and_save(line, cell):
    print("Run and save python code block to file: "+line)
    with open(line, 'wt') as fd:
        fd.write(cell)
    code = compile(cell, line, 'exec')
    exec(code, globals())

### Question 1.1 à 1.3

In [9]:
%%writefile symbolic_regression.py

# cellule à compléter au niveau des balises <ANSWER></ANSWER>

from deap import creator, gp, base, tools, algorithms
import operator
import math
import matplotlib.pyplot as plt
import numpy as np
import random
import argparse
import pickle
import datetime
import sys
import os


def protectedDiv(left, right):
    try:
        return left / right
    except ZeroDivisionError:
        return 1

def ru():
    return random.uniform(-1,1)



def evalSymbReg(individual, input, output, nb_obj=1):
    # Transform the tree expression in a callable function
    func = toolbox.compile(expr=individual)
    sqerrors=[]
    for i in range(len(input)):
        sqerrors.append((func(*input[i])-output[i])**2)
    if (nb_obj==1):
        return math.fsum(sqerrors) / len(sqerrors),
    else:
        return math.fsum(sqerrors) / len(sqerrors), len(individual)




if (__name__ == "__main__"):

    random.seed()


    parser = argparse.ArgumentParser(description='Launch symbolic regression run.')

    parser.add_argument('--nb_gen', type=int, default=200,
                        help='number of generations')
    parser.add_argument('--mu', type=int, default=400,
                        help='population size')
    parser.add_argument('--lambda_', type=int, default=400,
                        help='number of individuals to generate')
    parser.add_argument('--res_dir', type=str, default="res",
                        help='basename of the directory in which to put the results')
    parser.add_argument('--selection', type=str, default="elitist", choices=['elitist', 'double_tournament', 'nsga2'],
                        help='selection scheme')
    parser.add_argument('--problem', type=str, default="f1", choices=['f1', 'f2'],
                        help='function to fit')

    # for question 1.2
    parser.add_argument('--noise', type=float, default="0.0001",
                        help='noise added to the model to fit (gaussian, mean=0, sigma=noise)')

    args = parser.parse_args()
    print("Number of generations: "+str(args.nb_gen))
    ngen=args.nb_gen
    print("Population size: "+str(args.mu))
    mu=args.mu
    print("Number of offspring to generate: "+str(args.lambda_))
    lambda_=args.lambda_
    print("Selection scheme: "+str(args.selection))
    sel=args.selection
    if (sel=="nsga2"):
        nb_obj=2
    else:
        nb_obj=1
    print("Basename of the results dir: "+str(args.res_dir))
    name=args.res_dir

    if (nb_obj==1):
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    elif (nb_obj==2):
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,-1.0))

    creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)

    noise=args.noise
    problem=args.problem

    d=datetime.datetime.today()
    if(name!=""):
        sep="_"
    else:
        sep=""
    run_name=name+"_"+sel+"_"+d.strftime(name+sep+"%Y_%m_%d-%H-%M-%S")
    try:
        os.makedirs(run_name)
    except OSError:
        pass

    print("Putting the results in : "+run_name)

    stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
    stats_size = tools.Statistics(len)
    stats_height = tools.Statistics(lambda ind: ind.height)
    mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size, height=stats_height)
    mstats.register("avg", np.mean)
    mstats.register("std", np.std)
    mstats.register("min", np.min)
    mstats.register("max", np.max)


    if (problem=="f1"):
        nb_dim=2
        input_training=[]
        output_training=[]
        input_testing=[]
        output_testing=[]
        name_vars={"ARG0": "x1", "ARG1": "x2"}

        # Complétez pour générer l'ensemble d'entrainement et de validation avec une fonction choisie à 2 dimensions
        #<ANSWER>
        for i in range(30):
            x, y=(random.uniform(-100., 100.), random.uniform(-100., 100.))
            input_training.append((x,y))
            output_training.append(x*y+math.cos(x))
            x, y=(random.uniform(-100., 100.), random.uniform(-100., 100.))
            input_testing.append((x,y))
            output_testing.append(x*y+math.cos(x))
        #</ANSWER>
        print(input_training)
        print(output_training)
        print(input_testing)
        print(output_testing)
    # en OPTION: vous pouvez faire des tests sur d'autres fonctions
    #elif (problem=="f2"):
        #<ANSWER>

        #</ANSWER>

    pset = gp.PrimitiveSet("MAIN", nb_dim)

    # Complétez pour constituer l'ensemble de primitives qui pourront être utilisées
    #<ANSWER>
    pset.addPrimitive(operator.add, 2)
    pset.addPrimitive(operator.sub, 2)
    pset.addPrimitive(operator.mul, 2)
    pset.addPrimitive(protectedDiv, 2)
    pset.addPrimitive(math.cos, 1)
    pset.addPrimitive(math.sin, 1)
    #</ANSWER>

    pset.addTerminal(1)
    pset.addEphemeralConstant("cst", ru )
    pset.renameArguments(**name_vars)

    cxpb=0.5
    mutpb=0.1


    toolbox = base.Toolbox()

    # En vous inspirant des exemples de programmation génétique dans DEAP,
    # enregistrez les différents opérateurs que vous utiliserez dans la suite.
    # Vous choisirez l'opérateur de sélection en fonction de la variable sel
    # (voir valeurs possibles dans le parser d'arguments)
    #<ANSWER>
    toolbox.register("expr", gp.genFull, pset=pset, min_=1, max_=2)
    toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)

    toolbox.register("evaluate", evalSymbReg, input=input_training, output=output_training)
    toolbox.register("select", tools.selTournament, tournsize=3)
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

    toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))
    toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))
    #</ANSWER>

    pop = toolbox.population(n=400)

    if (nb_obj==1):
        print("Hall-of-fame: best solution")
        hof = tools.HallOfFame(1)
    else:
        print("Hall-of-fame: Pareto front")
        hof=tools.ParetoFront()

    # Pour simplifier, plutôt que d'écrire la boucle, vous pourrez utiliser un algorithme tout intégré,
    # par exemple eaMuPlusLambda (cf https://deap.readthedocs.io/en/master/api/algo.html).
    # Cela ne permettra pas de générer un NSGA-II complet, mais cela vous permettra de faire de premiers tests.
    # En option, si vous avez le temps, vous pourrez tester un NSGA-II complet pour voir si cela change les résultats.
    #<ANSWER>
    pop, log = algorithms.eaMuPlusLambda(pop, toolbox, mu, lambda_, cxpb, mutpb, ngen, stats=mstats, halloffame=hof)
    #</ANSWER>

    # Affichage des résultats. Tout est dans le répertoire run_name
    avg,dmin,dmax=log.chapters['fitness'].select("avg", "min", "max")
    gen=log.select("gen")

    plt.figure()
    plt.yscale("log")
    plt.plot(gen[1:],dmin[1:])
    plt.title("Minimum error")
    plt.savefig(run_name+"/min_error_gen%d.pdf"%(ngen))

    plt.figure()
    plt.yscale("log")
    plt.fill_between(gen[1:], dmin[1:], dmax[1:], alpha=0.25, linewidth=0)
    plt.plot(gen[1:],avg[1:])
    plt.title("Average error")
    plt.savefig(run_name+"/avg_error_gen%d.pdf"%(ngen))

    avg,dmin,dmax=log.chapters['size'].select("avg", "min", "max")
    gen=log.select("gen")
    plt.figure()
    plt.yscale("log")
    plt.fill_between(gen[1:], dmin[1:], dmax[1:], alpha=0.25, linewidth=0)
    plt.plot(gen[1:],avg[1:])
    plt.title("Average size")
    plt.savefig(run_name+"/avg_size_gen%d.pdf"%(ngen))

    avg,dmin,dmax=log.chapters['height'].select("avg", "min", "max")
    gen=log.select("gen")
    plt.figure()
    plt.yscale("log")
    plt.fill_between(gen[1:], dmin[1:], dmax[1:], alpha=0.25, linewidth=0)
    plt.plot(gen[1:],avg[1:])
    plt.title("Average height")
    plt.savefig(run_name+"/avg_height_gen%d.pdf"%(ngen))

    with open(run_name+"/pset_gen%d.npz"%(ngen), 'wb') as f:
        pickle.dump(pset, f)


    for i,ind in enumerate(hof):
        print("=========")
        print("HOF %d, len=%d"%(i,len(ind)))
        print("Error on the training dataset: %f"%(evalSymbReg(ind, input_training, output_training, nb_obj=1)))
        print("Error on the testing dataset: %f"%(evalSymbReg(ind, input_testing, output_testing, nb_obj=1)))
        with open(run_name+"/hof%d_gen%d.npz"%(i, ngen), 'wb') as f:
            pickle.dump(ind, f)

        nodes, edges, labels = gp.graph(ind)

        ### Graphviz Section ###
        import pygraphviz as pgv

        plt.figure()

        g = pgv.AGraph()
        g.add_nodes_from(nodes)
        g.add_edges_from(edges)
        g.layout(prog="dot")

        for ni in nodes:
            n = g.get_node(ni)
            n.attr["label"] = labels[ni]


        g.draw(run_name+"/hof%d_tree_gen%d.pdf"%(i,ngen))

    print("Results saved in "+run_name)

Writing symbolic_regression.py


## Code de la question 2

In [10]:
%%run_and_save maze_plot.py
#!/usr/bin/python -w

# NE PAS MODIFIER LE CONTENU DE CETTE CELLULE
# Cette cellule contient le code de fonctions permettant de tracer les points atteints
# par les politiques de navigation générées.
# Ce code n'a pas à être modifié ni complété, il suffit d'exécuter la cellule telle quelle.

import matplotlib.pyplot as plt


def plot_points(points, bg="maze_hard.pbm", title=None):
    x,y = zip(*points)
    fig1, ax1 = plt.subplots()
    ax1.set_xlim(0,600)
    ax1.set_ylim(600,0) # Decreasing
    ax1.set_aspect('equal')
    if(bg):
        img = plt.imread(bg)
        ax1.imshow(img, extent=[0, 600, 600, 0])
    if(title):
        ax1.set_title(title)
    ax1.scatter(x, y, s=2)
    plt.show()

def plot_points_lists(lpoints, bg="maze_hard.pbm", title=None):
    fig1, ax1 = plt.subplots()
    ax1.set_xlim(0,600)
    ax1.set_ylim(600,0) # Decreasing
    ax1.set_aspect('equal')
    if(bg):
        img = plt.imread(bg)
        ax1.imshow(img, extent=[0, 600, 600, 0])
    if(title):
        ax1.set_title(title)
    for points in lpoints:
        x,y = zip(*points)
        ax1.scatter(x, y, s=2)
    plt.show()


def plot_points_file(filename, bg="maze_hard.pbm", title=None):
    try:
        with open(filename) as f:
            points=[]
            for l in f.readlines():
                pos=list(map(float, l.split(" ")))
                points.append(pos)
            f.close()
            plot_points(points, bg, title)
    except IOError:
        print("Could not read file: "+f)


Run and save python code block to file: maze_plot.py


In [11]:
%%run_and_save nn.py
# NE PAS MODIFIER LE CONTENU DE CETTE CELLULE
# Cette cellule contient le code de gestion d'une politique au travers d'un réseau de neurones de structure fixe.
# Ce code n'a pas à être modifié ni complété, il suffit d'exécuter la cellule telle quelle.

# coding: utf-8

import numpy as np

## Suppress TF info messages

import os

def sigmoid(x):
    return 1./(1 + np.exp(-x))

def tanh(x):
    return np.tanh(x)


def gen_simplemlp(n_in, n_out, n_hidden_layers=2, n_neurons_per_hidden=5):
    n_neurons = [n_neurons_per_hidden]*n_hidden_layers if np.isscalar(n_neurons_per_hidden) else n_neurons_per_hidden
    i = Input(shape=(n_in,))
    x = i
    for n in n_neurons:
        x = Dense(n, activation='sigmoid')(x)
    o = Dense(n_out, activation='tanh')(x)
    m = Model(inputs=i, outputs=o)
    return m


class SimpleNeuralControllerNumpy():
    def __init__(self, n_in, n_out, n_hidden_layers=2, n_neurons_per_hidden=5, params=None):
        self.dim_in = n_in
        self.dim_out = n_out
        # if params is provided, we look for the number of hidden layers and neuron per layer into that parameter (a dicttionary)
        if (not params==None):
            if ("n_hidden_layers" in params.keys()):
                n_hidden_layers=params["n_hidden_layers"]
            if ("n_neurons_per_hidden" in params.keys()):
                n_neurons_per_hidden=params["n_neurons_per_hidden"]
        self.n_per_hidden = n_neurons_per_hidden
        self.n_hidden_layers = n_hidden_layers
        self.weights = None
        self.n_weights = None
        self.init_random_params()
        self.out = np.zeros(n_out)
        #print("Creating a simple mlp with %d inputs, %d outputs, %d hidden layers and %d neurons per layer"%(n_in, n_out,n_hidden_layers, n_neurons_per_hidden))


    def init_random_params(self):
        if(self.n_hidden_layers > 0):
            self.weights = [np.random.random((self.dim_in,self.n_per_hidden))] # In -> first hidden
            self.bias = [np.random.random(self.n_per_hidden)] # In -> first hidden
            for i in range(self.n_hidden_layers-1): # Hidden -> hidden
                self.weights.append(np.random.random((self.n_per_hidden,self.n_per_hidden)))
                self.bias.append(np.random.random(self.n_per_hidden))
            self.weights.append(np.random.random((self.n_per_hidden,self.dim_out))) # -> last hidden -> out
            self.bias.append(np.random.random(self.dim_out))
        else:
            self.weights = [np.random.random((self.dim_in,self.dim_out))] # Single-layer perceptron
            self.bias = [np.random.random(self.dim_out)]
        self.n_weights = np.sum([np.product(w.shape) for w in self.weights]) + np.sum([np.product(b.shape) for b in self.bias])

    def get_parameters(self):
        """
        Returns all network parameters as a single array
        """
        flat_weights = np.hstack([arr.flatten() for arr in (self.weights+self.bias)])
        return flat_weights

    def set_parameters(self, flat_parameters):
        """
        Set all network parameters from a single array
        """
        i = 0 # index
        to_set = []
        self.weights = list()
        self.bias = list()
        if(self.n_hidden_layers > 0):
            # In -> first hidden
            w0 = np.array(flat_parameters[i:(i+self.dim_in*self.n_per_hidden)])
            self.weights.append(w0.reshape(self.dim_in,self.n_per_hidden))
            i += self.dim_in*self.n_per_hidden
            for l in range(self.n_hidden_layers-1): # Hidden -> hidden
                w = np.array(flat_parameters[i:(i+self.n_per_hidden*self.n_per_hidden)])
                self.weights.append(w.reshape((self.n_per_hidden,self.n_per_hidden)))
                i += self.n_per_hidden*self.n_per_hidden
            # -> last hidden -> out
            wN = np.array(flat_parameters[i:(i+self.n_per_hidden*self.dim_out)])
            self.weights.append(wN.reshape((self.n_per_hidden,self.dim_out)))
            i += self.n_per_hidden*self.dim_out
            # Samefor bias now
            # In -> first hidden
            b0 = np.array(flat_parameters[i:(i+self.n_per_hidden)])
            self.bias.append(b0)
            i += self.n_per_hidden
            for l in range(self.n_hidden_layers-1): # Hidden -> hidden
                b = np.array(flat_parameters[i:(i+self.n_per_hidden)])
                self.bias.append(b)
                i += self.n_per_hidden
            # -> last hidden -> out
            bN = np.array(flat_parameters[i:(i+self.dim_out)])
            self.bias.append(bN)
            i += self.dim_out
        else:
            n_w = self.dim_in*self.dim_out
            w = np.array(flat_parameters[:n_w])
            self.weights = [w.reshape((self.dim_in,self.dim_out))]
            self.bias = [np.array(flat_parameters[n_w:])]
        self.n_weights = np.sum([np.product(w.shape) for w in self.weights]) + np.sum([np.product(b.shape) for b in self.bias])

    def predict(self,x):
        """
        Propagage
        """
        if(self.n_hidden_layers > 0):
            #Input
            a = np.matmul(x,self.weights[0]) + self.bias[0]
            y = sigmoid(a)
            # hidden -> hidden
            for i in range(1,self.n_hidden_layers-1):
                a = np.matmul(y, self.weights[i]) + self.bias[i]
                y = sigmoid(a)
            # Out
            a = np.matmul(y, self.weights[-1]) + self.bias[-1]
            out = tanh(a)
            return out
        else: # Simple monolayer perceptron
            return tanh(np.matmul(x,self.weights[0]) + self.bias[0])


Run and save python code block to file: nn.py


### Question 2.1

In [12]:
%%run_and_save novelty_search.py

# Cette cellule contient le code permettant de gérer novelty search.
# Complétez les trous dans les balises <ANSWER></ANSWER>

from scipy.spatial import KDTree
import random
import numpy as np


class NovArchive:
    """Archive used to compute novelty scores."""
    def __init__(self, lbd, k=15):
        self.all_bd=lbd
        self.kdtree=KDTree(self.all_bd)
        self.k=k
        #print("Archive constructor. size = %d"%(len(self.all_bd)))

    def update(self,new_bd):
        oldsize=len(self.all_bd)
        self.all_bd=self.all_bd + new_bd
        self.kdtree=KDTree(self.all_bd)
        #print("Archive updated, old size = %d, new size = %d"%(oldsize,len(self.all_bd)))

    def get_nov(self,bd, population=[]):

        # A compléter pour calculer la nouveauté
        # C'est la distance moyenne au self.k plus proches voisins parmi la population
        # et l'archive, représentée ici par un kdtree (cf https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.KDTree.html)
        #<ANSWER>

        dist_pop=[]
        for i in population:
            dist_pop.append(np.linalg.norm(np.array(bd)-np.array(ind.bd)))
        archive,i=self.kdtree.query(np.array(bd),self.k)
        dist=dist_pop+list(archive)
        dist.sort()
        sum=0
        for i in range(k):
            sum+=dist[i+1]
        sum = sum/self.k
        return sum
        #</ANSWER>

    def size(self):
        return len(self.all_bd)

def updateNovelty(population, offspring, archive, k=15, add_strategy="random", _lambda=6, verbose=False):
   """Update the novelty criterion (including archive update)

   Implementation of novelty search following (Gomes, J., Mariano, P., & Christensen, A. L. (2015, July). Devising effective novelty search algorithms: A comprehensive empirical study. In Proceedings of GECCO 2015 (pp. 943-950). ACM.).
   :param population: is the set of indiv for which novelty needs to be computed
   :param offspring: is the set of new individuals that need to be taken into account to update the archive
   (may be the same as population, but it may also be different as population may contain the set of parents)
   :param k: is the number of nearest neighbors taken into account
   :param add_strategy: is either "random" (a random set of indiv is added to the archive) or "novel" (only the most novel individuals are added to the archive).
   :param _lambda: is the number of individuals added to the archive for each generation
   The default values correspond to the one giving the better results in the above mentionned paper.

   The function returns the new archive
   """

   # Novelty scores updates
   if (archive) and (archive.size()>=k):
       if (verbose):
           print("Update Novelty. Archive size=%d"%(archive.size()))
       for ind in population:
           ind.novelty=archive.get_nov(ind.bd, population)
   else:
       if (verbose):
           print("Update Novelty. Initial step...")
       for ind in population:
           ind.novelty=0.

   if (verbose):
       print("Fitness (novelty): ",end="")
       for ind in population:
           print("%.2f, "%(ind.novelty),end="")
       print("")
   if (len(offspring)<_lambda):
       print("ERROR: updateNovelty, lambda(%d)<offspring size (%d)"%(_lambda, len(offspring)))
       return None

   lbd=[]
   # Update of the archive
   if(add_strategy=="random"):
       l=list(range(len(offspring)))
       random.shuffle(l)
       if (verbose):
           print("Random archive update. Adding offspring: "+str(l[:_lambda]))
       lbd=[offspring[l[i]].bd for i in range(_lambda)]
   elif(add_strategy=="novel"):
       soff=sorted(offspring,lambda x:x.novelty)
       ilast=len(offspring)-_lambda
       lbd=[soff[i].bd for i in range(ilast,len(soff))]
       if (verbose):
           print("Novel archive update. Adding offspring: ")
           for offs in soff[iLast:len(soff)]:
               print("    nov="+str(offs.novelty)+" fit="+str(offs.fitness.values)+" bd="+str(offs.bd))
   else:
       print("ERROR: updateNovelty: unknown add strategy(%s), valid alternatives are \"random\" and \"novel\""%(add_strategy))
       return None

   if(archive==None):
       archive=NovArchive(lbd,k)
   else:
       archive.update(lbd)

   return archive


Run and save python code block to file: novelty_search.py


In [13]:
%%writefile gym_fastsim_maze.py

# Cette cellule contient le code complet de l'expérience de navigation dans le maze.
# Complétez les trous dans les balises <ANSWER></ANSWER>


import gym, gym_fastsim

from deap import *
import numpy as np
from nn import SimpleNeuralControllerNumpy
from scipy.spatial import KDTree

from deap import algorithms
from deap import base
from deap import benchmarks
from deap import creator
from deap import tools
import argparse

import array
import random
import operator
import math

from plot import *

from scoop import futures

from novelty_search import *

weights=(-1.0,1.0)

if (hasattr(creator, "MyFitness")):
    # Deleting any previous definition (to avoid warning message)
    print("Main: destroying myfitness")
    del creator.MyFitness
creator.create("MyFitness", base.Fitness, weights=(weights))

if (hasattr(creator, "Individual")):
    # Deleting any previous definition (to avoid warning message)
    del creator.Individual
creator.create("Individual", list, fitness=creator.MyFitness)

# Evaluation d'un réseau de neurones en utilisant gym
def eval_nn(genotype, nbstep=2000, render=False, name=""):
    """Evaluation d'une politique parametrée par le génotype

    Evaluation d'une politique parametrée par le génotype
    :param genotype: le paramètre de politique à évaluer
    :param nbstep: le nombre maximum de pas de temps
    :param render: affichage/sauvegarde de la trajectoire du robot
    :param name: nom à donner au fichier de log
    """

    # Cette fonction fait l'évaluation d'un genotype utilisé pour
    # paraméter un réseau de neurones de structure fixe.
    # En vue de l'expérience avec novelty_search, elle renvoie:
    # - la fitness: distance à la sortie à minimiser (qui peut se récupérer dans la 4eme
    #valeur de retour de la méthode step, qui est un dictionnaire dans lequel la clé "dist_obj"
    #donne accès à l'opposé de la distance à la sortie)
    # - le descripteur comportemental, qui est la position finale qui est accessible depuis
    #le même dictionnaire (clé "robot_pos", dont il ne faut garder que les 2 premières composantes)

    nn=SimpleNeuralControllerNumpy(5,2,2,10)
    nn.set_parameters(genotype)
    observation = env.reset()
    old_pos=None
    total_dist=0
    if (render):
        f=open("traj"+name+".log","w")

    for t in range(nbstep):
        if render:
            env.render()
        action=nn.predict(observation)
        observation, reward, done, info = env.step(action)
        pos=info["robot_pos"][:2]
        if(render):
            f.write(" ".join(map(str,pos))+"\n")
        if (old_pos is not None):
            d=math.sqrt((pos[0]-old_pos[0])**2+(pos[1]-old_pos[1])**2)
            total_dist+=d
        old_pos=list(pos)
        if(done):
            break
    if (render):
        f.close()
    dist_obj=info["dist_obj"] # c'est l'opposé de la distance à la sortie
    #print("End of eval, total_dist=%f"%(total_dist))
    # Remarque: les positions et distances sont arrondis à 2 décimales pour éviter le surapprentissage et le maintien dans le front de pareto de solutions ne différant que
    # de décimales plus éloignées (variante FIT+NS)
    rpos=[round(x,2) for x in pos]
    return round(dist_obj,2), rpos


def nsga2_NS(n=100, nbgen=200, IND_SIZE=10, variant="FIT", MIN_V=-30, MAX_V=30, CXPB=0.6, MUTPB=0.3, verbose=False):

    # votre code contiendra donc des tests comme suit pour gérer la différence entre ces variantes:
    if (hasattr(creator, "MyFitness")):
        # Deleting any previous definition (to avoid warning message)
        print("nsga2_NS: destroying myfitness")
        del creator.MyFitness

    # Attention: le signe du poids associé à la fitness doit être adapté aux valeurs renvoyées par eval_nn (négatif si valeur à minimiser et positif si à maximiser)
    if (variant=="FIT+NS"):
        #print("Creator: FIT+NS")
        creator.create("MyFitness", base.Fitness, weights=(-1.0,1.0))
    elif (variant=="FIT"):
        #print("Creator: FIT")
        creator.create("MyFitness", base.Fitness, weights=(-1.0,))
    elif (variant=="NS"):
        #print("Creator: NS")
        creator.create("MyFitness", base.Fitness, weights=(1.0,))
    else:
        print("Variante inconnue: "+variant)

    if (hasattr(creator, "Individual")):
        # Deleting any previous definition (to avoid warning message)
        del creator.Individual
    creator.create("Individual", list, fitness=creator.MyFitness)


    toolbox = base.Toolbox()
    toolbox.register("attribute", random.uniform, MIN_V, MAX_V)
    toolbox.register("individual", tools.initRepeat, creator.Individual,
            toolbox.attribute, n=IND_SIZE)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("mate", tools.cxSimulatedBinaryBounded, eta=15, low=MIN_V, up=MAX_V)
    toolbox.register("mutate", tools.mutPolynomialBounded, eta=15, low=MIN_V, up=MAX_V, indpb=1/IND_SIZE)
    toolbox.register("select", tools.selNSGA2, k=n)
    toolbox.register("select_dcd", tools.selTournamentDCD, k=n)
    toolbox.register("evaluate", eval_nn, render=True)

    toolbox.register("map",futures.map)


    population = toolbox.population(n=n)
    paretofront = tools.ParetoFront()

    # Permet de sauvegarder les descripteurs comportementaux de tous les individus rencontrés.
    # vous pouvez tracer les descripteurs générés depuis une cellule de jupyter avec la commande:
    # plot_points_file("bd.log")
    fbd=open("bd.log","w")

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    fitnesses_bds = toolbox.map(toolbox.evaluate, invalid_ind)
    for ind, (fit, bd) in zip(invalid_ind, fitnesses_bds):
        #print("Fit: "+str(fit))
        #print("BD: "+str(bd))
        # Complétez pour positionner fitness.values selon la variante choisie
        #<ANSWER>
        if (variant=="FIT+NS"):
          #print("Creator: FIT+NS")
          ind.fitness.values = (fit, 0)
        elif (variant=="FIT"):
          #print("Creator: FIT")
          ind.fitness.values = (fit,)
        elif (variant=="NS"):
          #print("Creator: NS")
          ind.fitness.values = (0,)
        else:
          print("Variante inconnue: "+variant)
        #</ANSWER>

        fbd.write(" ".join(map(str,bd))+"\n")
        fbd.flush()
        ind.fit=fit
        ind.bd=bd

    if paretofront is not None:
        paretofront.update(population)

    #print("Pareto Front: "+str(paretofront))

    k=15
    add_strategy="random"
    lambdaNov=6

    # Crée l'archive et mets à jour les champs ind.novelty de chaque individu
    archive=updateNovelty(population,population,None,k,add_strategy,lambdaNov)

    # Selon la variante, complétez ind.fitness.values pour ajouter la valeur de ind.novelty qui vient d'être calculée
    #<ANSWER>
    for ind in population:
        if (variant=="FIT+NS"):
            #print("Creator: FIT+NS")
            fit, _ = ind.fitness.values
            ind.fitness.values = (fit, ind.novelty)
        #elif (variant=="FIT"):
            #print("Creator: FIT")
        elif (variant=="NS"):
            #print("Creator: NS")
            ind.fitness.values = (ind.novelty,)
        else:
            print("Variante inconnue: "+variant)

    #</ANSWER>


    indexmin, valuemin = max(enumerate([i.fit for i in population]), key=operator.itemgetter(1))

    # Pour le calcul des crowdingDistances si utilisation de selTournamentDCD
    population[:] = toolbox.select(population)

    pq = population
    # Begin the generational process
    for gen in range(1, nbgen + 1):
        if (gen%10==0):
            print("+",end="", flush=True)
        else:
            print(".",end="", flush=True)

        # Complétez avec un NSGA-2 en prenant soin de mettre à jour les calculs de nouveauté et d'ajouter les
        # nouveautés à la fitness des individus.

        # ATTENTION: dans la mise à jour de la nouveauté (updateNovelty), vérifiez bien la signification du premier et du 2eme argument.
        # l'appel fait plus haut doit être adapté: la nouveauté de TOUS les individus doit être recalculée (parents+enfants)
        # et les individus susceptibles d'être ajoutés à l'archive ne doivent être pris que parmi les enfants.

        #<ANSWER>
        tools.selNSGA2(population, len(population))
        pop = toolbox.select_dcd(population)

        offspring=list(map(toolbox.clone, pop))
        #Mutation et apparimment
        offspring=algorithms.varAnd(offspring, toolbox, CXPB, MUTPB)

        #La rajouter à la population sélectionnée
        population=offspring

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in population if not ind.fitness.valid]
        fitnesses_bds = toolbox.map(toolbox.evaluate, invalid_ind)
        for ind, (fit, bd) in zip(invalid_ind, fitnesses_bds):
            #print("Fit: "+str(fit))
            #print("BD: "+str(bd))
            # Complétez pour positionner fitness.values selon la variante choisie
            if (variant=="FIT+NS"):
                ind.fitness.values = (fit, 0)
            elif (variant=="FIT"):
                ind.fitness.values = (fit,)
            elif (variant=="NS"):
                ind.fitness.values = (0,)
            ind.fit=fit
            ind.bd=bd

        # Update the hall of fame with the generated individuals
        if paretofront is not None:
            paretofront.update(population)

        # Crée l'archive et mets à jour les champs ind.novelty de chaque individu
        archive=updateNovelty(population,population,None,k,add_strategy,lambdaNov)

        # Selon la variante, complétez ind.fitness.values pour ajouter la valeur de ind.novelty qui vient d'être calculée
        for ind in population:
            if (variant=="FIT+NS"):
                fit, _ = ind.fitness.values
                ind.fitness.values = (fit, ind.novelty)
            elif (variant=="NS"):
                ind.fitness.values = (ind.novelty,)


        # used to track the max value (useful in particular if using only novelty)
        indexmin, newvaluemin = min(enumerate([i.fit for i in pq]), key=operator.itemgetter(1))
        if (newvaluemin<valuemin):
            valuemin=newvaluemin
            print("Gen "+str(gen)+", new min ! min fit="+str(valuemin)+" index="+str(indexmin))
            eval_nn(pq[indexmin],True,"gen%04d"%(gen))
    fbd.close()
    return population, None, paretofront


env = gym.make('FastsimSimpleNavigation-v0')

if (__name__ == "__main__"):

    parser = argparse.ArgumentParser(description='Launch maze navigation experiment.')
    parser.add_argument('--nbgen', type=int, default=200,
                        help='number of generations')
    parser.add_argument('--popsize', type=int, default=100,
                        help='population size')
    parser.add_argument('--res_dir', type=str, default="res",
                        help='basename of the directory in which to put the results')
    parser.add_argument('--variant', type=str, default="FIT", choices=['FIT', 'NS', 'FIT+NS'],
                        help='variant to consider')

    # Il vous est recommandé de gérer les différentes variantes avec cette variable. Les 3 valeurs possibles seront:
    # "FIT+NS": expérience multiobjectif avec la fitness et la nouveauté (NSGA-2)
    # "NS": nouveauté seule
    # "FIT": fitness seule
    # pour les variantes avec un seul objectif, il vous est cependant recommandé d'utiliser NSGA-2 car cela limitera la différence entre les variantes et cela
    # vous fera gagner du temps pour la suite.

    args = parser.parse_args()
    print("Number of generations: "+str(args.nbgen))
    nbgen=args.nbgen
    print("Population size: "+str(args.popsize))
    popsize=args.popsize
    print("Variant: "+args.variant)
    variant=args.variant

    nn=SimpleNeuralControllerNumpy(5,2,2,10)
    IND_SIZE=len(nn.get_parameters())

    pop, logbook, paretofront= nsga2_NS(n=popsize, variant=variant, nbgen=nbgen, IND_SIZE=IND_SIZE)
    #plot_pareto_front(paretofront, "Final pareto front")
    for i,p in enumerate(paretofront):
        print("Visualizing indiv "+str(i)+", fit="+str(p.fitness.values))
        eval_nn(p,True)

    env.close()


Writing gym_fastsim_maze.py
