In [None]:
import deap
import cma
from deap import cma

from deap import creator , base , tools, algorithms
import numpy as np
import random

# Introduction a DEAP : 
DEAP est une librairie qui permet de prototyper rapidement des algorithmes évolutionnaires et des algorithmes génétiques, elle contient des modes de perturbations, de sélection et de croisement plus ou moins sophistiqués qui sont mis a disposition sous forme d'outils.
Dans cette partie nous allons essayer d'aller en compliquant les choses tout en présentant les différents outils implémentés.

## Création de types : 
Dans deap nous pouvons créer dynamiquement des types en utilisant le créateur. \
Généralement nous n'avons pas beaucoup de types a utiliser sauf pour la programmation génétique, on utilisera également "base" pour manipuler plusieurs classes de bases proposées par DEAP.

In [None]:
"""
La fonction create permet de créer un type, elle prend en paramètres un nom, une classe de base et des attributs
chaque
"""

"""
Les premiers type que l'on va créer serrons des fonctions de fitness, pour se faire nous utiliserons la classe 
Fitness présente dans base et comme attributs additionnels nous ajouterons l'attribut weight qui permet de : 
    - Préciser les poids des différents attributs pour les départager si besoin est
    - Préciser le sens dans lequel on optimise i.e si on souhaite les maximiser ou les minimiser
"""
creator.create("Fitness_Function", base.Fitness, weights=(-1.0,))
creator.create("Multi_Objectif", base.Fitness, weights=(-1.0, 1.0))
# Pour utiliser un type crée on ecris par exemple creator.Fitness_Function

"""
A présent on va créer des individus, pour ce faire nous utiliserons généralement comme classe de base 
la classe liste que nous enrichirons par un attribut fitness qui aura un des types crées précédements.
Dans la partie sur la programmation génétique on verra d'autres types plus exotiques.
"""
creator.create("Individual", list, fitness=creator.Fitness_Function)
#On aurait pu utiliser la classe ndarray dispo sur numpy comme classe de base
creator.create("Individual2", np.ndarray , fitness=creator.Fitness_Function)


## Création et remplissage de la Toolbox : 
La toolbox est l'élément le plus important d'un algorithme génétique, elle permet de spécifier les différentes fonctions que l'on va utiliser pour perturber les solutions, pour les faire muter etc...
Son remplissage nécessite d'utiliser la fonction register, et la encore pour la remplire nous pourrons nous appuyer sur les outils mis a disposition par deap.\
La toolbox sert également a préciser la structure de la population et comment l'initialiser.

In [None]:
"""
D'abord on crée la population et on l'ajoute a la ToolBox 
"""
#Taille de l'individu 
IND_SIZE = 10
#Initialisation de la toolbox
toolbox = base.Toolbox()
#On enregistre la fonction qui sert a initialiser les individus 
toolbox.register("initialisation_random", random.random)
#On ajoute les individus en précisant le mode d'initialisation, le type la fonction d'initialisation et leur taille 
toolbox.register("population", tools.initRepeat, creator.Individual,toolbox.initialisation_random, n=IND_SIZE)

In [None]:
"""
Maintenant je vais présenter chaque outils
"""

"""
1/Fonction d'évaluation : 
"""
#On crée un individu pour tester en utilisant "individual"

individu1 = toolbox.population()
print(individu1)
print(individu1.fitness) #Fitness pas encore calculée
print(individu1.fitness.valid) #Quand la fitness n'est pas calculée on dit qu'elle est invalide

#D'abord comment enregistrer une fonction d'évaluation


def evaluate(individual):
    #Votre fonction d'évaluation qui retourne une ou plusieurs valeurs (autant qu'il n'y en a dans weights)
    return 2,

individu1.fitness.values = evaluate(individu1)
#Aprés évaluation 
print (individu1.fitness.valid) #ça passe a valid parce qu'on a rempli le champ values
print (individu1.fitness.values)

#Maintenant on enregistre dans la toolbox comme ça

In [None]:
"""
2/Opérateur de mutation (perturbation) : 
"""
mutant = toolbox.clone(individu1)
individu2, = tools.mutGaussian(mutant, mu=0.0, sigma=0.2, indpb=0.7)

del mutant.fitness.values
print(individu1)
print(individu2)
print(individu1.fitness.valid)
print(individu2.fitness.valid)

In [None]:
"""
3/Opérateur de croisement (intensification) : 
"""
child1, child2 = [toolbox.clone(ind) for ind in (individu1, individu2)]
tools.cxBlend(child1, child2, 0.8)
del child1.fitness.values
del child2.fitness.values
print(child1)
print(child2)

In [None]:
"""
4/Opérateur de selection (intensification) : 
"""
Lambd = 3
MU = 10
selected = tools.selBest([child1, child2], 1)
print(selected)

Maintenant on assemble le tout dans une seule bulle ce qui nous fait notre premier algorithme évolutionnaire, nous considérons le design suivant : 
- Une stratégie de perturbation qui consiste a inverser un bit.
- Une sélection par tournoi
- Un croisement Binaire qui consiste en l'inversion de deux points.

Le probléme que nous allons résoudre pour voire si ça marche est le problème de maximum binaire, la fonction d'évaluation serra donc naturellement la somme des éléments de la liste et on s'attend a la fin a trouver un vecteur de 1.

In [None]:
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

# Attribute generator 
#                      define 'attr_bool' to be an attribute ('gene')
#                      which corresponds to integers sampled uniformly
#                      from the range [0,1] (i.e. 0 or 1 with equal
#                      probability)
toolbox.register("attr_bool", random.randint, 0, 1)

# Structure initializers
#                         define 'individual' to be an individual
#                         consisting of 100 'attr_bool' elements ('genes')
toolbox.register("individual", tools.initRepeat, creator.Individual,
                 toolbox.attr_bool, 100)

# define the population to be a list of individuals
toolbox.register("population", tools.initRepeat, list, toolbox.individual)


# the goal ('fitness') function to be maximized
def evaluation_func(individual):
    return sum(individual),


toolbox.register("evaluate", evaluation_func)


# ----------
# Ajout des opérateurs 
# ----------
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)


# ----------

random.seed(64)
# create an initial population of 300 individuals (where
# each individual is a list of integers)
pop = toolbox.population(n=300)
"""
Supposons qu'on adopte un mécanisme connu consistant a assigner des probabilités : 
    -CXPB : Probabilités de croisements.
    -MUTPB : Probabilités de mutations.
Ne pas oublier que les mutation diversifient la recherche donc souvent il ne faut pas en abuser pour une bonne
convergence
"""
CXPB, MUTPB = 0.5, 0.2

print("Start of evolution")
# Evaluer la population
fitnesses = list(map(toolbox.evaluate, pop))
for ind, fit in zip(pop, fitnesses):
    ind.fitness.values = fit
    

print("  Evaluated %i individuals" % len(pop))
# Met les fitnesses dans une liste
fits = [ind.fitness.values[0] for ind in pop]
gen = 0
# Begin the evolution
while max(fits) < 100 and g < 1000:
    # A new generation
    gen = gen + 1
    print("---- Iteration : %i ----" % gen)
    
    # On sélectionne des individus (supposons que tous les éléments parents participent a la génération de l'offspring)
    offspring = toolbox.select(pop, len(pop))
    
    # On les clone etant donné que les oppérateurs aggissent in place et on récupére une liste de références
    offspring = list(map(toolbox.clone, offspring))
    
    # on croise les éléments paires avec les éléments impaires suivant une probabilité CXPB
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CXPB:
            toolbox.mate(child1, child2)
            #On déléte les fitness, vu que les éléments ont changés elle doit être recalculée
            del child1.fitness.values
            del child2.fitness.values
            
    
    for mutant in offspring:
        #Mutation avec probabilité MUTPB
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values
    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit
    print("  Evaluated %i individuals" % len(invalid_ind))
    # The population is entirely replaced by the offspring
    pop[:] = offspring
    # Gather all the fitnesses in one list and print the stats
    fits = [ind.fitness.values[0] for ind in pop]
    length = len(pop)
    mean = sum(fits) / length
    sum2 = sum(x * x for x in fits)
    std = abs(sum2 / length - mean ** 2) ** 0.5
    print("  Min %s" % min(fits))
    print("  Max %s" % max(fits))
    print("  Avg %s" % mean)
    print("  Std %s" % std)
print("-- End of (successful) evolution --")
best_ind = tools.selBest(pop, 1)[0]
print("Best individual is %s, %s" % (best_ind, best_ind.fitness.values))

In [None]:
"""
Un peu trop long ? 
Il existe un autre moyen de coder ces comportements basiques (tellement basiques que même moi j'ai copié des
partie de ce code du code de mon prof qui l'a copié du code de la librairie) et c'est en utilisant le package
algorithms.
"""
pop = toolbox.population(n=300)
log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.1, ngen=1000)

In [None]:
"""
Un peu triste comme display...
On peut le customiser en demandant a l'algorithme de calculer des statistiques grace au package stats.
"""
pop = toolbox.population(n=300)
#Initialiser l'objet stats en lui précisant une fonction lui permettant de savoir pour chaque individu sur quoi
#il est sensé calculer ces stats.
stats = tools.Statistics(key=lambda ind: ind.fitness.values)
#On ajoute des statistiques que l'on veut avoir 
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)
#Pour l'executer sur une population on pourrait faire compile
#Mais la le but est de le donner a l'algorithme
log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.3, ngen=100,stats=stats)

In [None]:
"""
On retrouve également les autres algorithmes évolutionnaires connus.
"""
pop = toolbox.population(n=300)
#Décomenter pour tester si vous voulez tester
#l= algorithms.eaMuPlusLambda(pop, toolbox, mu=300, lambda_=100, cxpb=0.5, mutpb=0.3, ngen=200, stats=stats)
#l= algorithms.eaMuCommaLambda(pop, toolbox, mu=300, lambda_=400, cxpb=0.5, mutpb=0.3, ngen=200, stats=stats)

On peut également manipuler des stratégies, par exemple CMA-ES qu'on a vu précédemment a été développé en utilisant deap et par conséquent on peut utiliser cette stratégie :
- generate permet de générer une population a partir d'un élément.
- update permet d'actualiser a partir d'une population évaluée les paramètres permettant de generer des éléments

In [None]:

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual,
                 toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
def evaluation_func(individual):
    return sum(individual),
toolbox.register("evaluate", evaluation_func)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

pop = toolbox.population(n=300)

S = cma.Strategy(centroid = pop[0],sigma = 0.01)
toolbox.register("generate", S.generate, creator.Individual)
toolbox.register("update",S.update)

stats = tools.Statistics(key=lambda ind: ind.fitness.values)
#On ajoute des statistiques que l'on veut avoir 
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("max", np.max)

logbook = tools.Logbook()



for g in range(100):
    pop = toolbox.generate()
    evaluate(pop)
    fitnesses = list(map(toolbox.evaluate, pop))
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit
    toolbox.update(pop)
    record = stats.compile(pop)
    logbook.record(gen=g, **record)
    logbook.header = "gen", "avg", "max"
    print(logbook.stream)

"""
Quelqu'un peut expliquer pourquoi ça marche pas ??
"""


### A Nous de jouer : 
On va essayer cette fois ci de prototyper rapidement un algorithme génétique de notre choix sur le dataset boston en utilisant les mêmes fonctions d'évaluations que précédément et le même réseau de neurones.
Vous choisissez l'algorithme que vous voulez et on voit qui a le meilleur résultat en 1000 itérations.

In [None]:
#La fonction d'évaluation : 
from sklearn.metrics import mean_squared_error
from sklearn.datasets import load_boston
def evaluation_function(w):
    X, y = load_boston(return_X_y=True)
    Network = SimpleNeuralControllerNumpy(13, 1, n_hidden_layers=0, n_neurons_per_hidden=0, params=None)
    Network.set_parameters(w)
    y_pred = Network.predict(X)
    return mean_squared_error(y,y_pred),

#Je vous remet la classe NeuralNetwork pour rappel (et pour éviter de l'importer)
def sigmoid(x):
    return 1./(1 + np.exp(-x))

def tanh(x):
    return np.tanh(x)

class SimpleNeuralControllerNumpy():
    def __init__(self, n_in, n_out, n_hidden_layers=2, n_neurons_per_hidden=5, params=None):
        self.dim_in = n_in
        self.dim_out = n_out
        # if params is provided, we look for the number of hidden layers and neuron per layer into that parameter (a dicttionary)
        if (not params==None):
            if ("n_hidden_layers" in params.keys()):
                n_hidden_layers=params["n_hidden_layers"]
            if ("n_neurons_per_hidden" in params.keys()):
                n_neurons_per_hidden=params["n_neurons_per_hidden"]
        self.n_per_hidden = n_neurons_per_hidden
        self.n_hidden_layers = n_hidden_layers
        self.weights = None 
        self.n_weights = None
        self.init_random_params()
        self.out = np.zeros(n_out)
        #print("Creating a simple mlp with %d inputs, %d outputs, %d hidden layers and %d neurons per layer"%(n_in, n_out,n_hidden_layers, n_neurons_per_hidden))
    def init_random_params(self):
        if(self.n_hidden_layers > 0):
            self.weights = [np.random.random((self.dim_in,self.n_per_hidden))] # In -> first hidden
            self.bias = [np.random.random(self.n_per_hidden)] # In -> first hidden
            for i in range(self.n_hidden_layers-1): # Hidden -> hidden
                self.weights.append(np.random.random((self.n_per_hidden,self.n_per_hidden)))
                self.bias.append(np.random.random(self.n_per_hidden))
            self.weights.append(np.random.random((self.n_per_hidden,self.dim_out))) # -> last hidden -> out
            self.bias.append(np.random.random(self.dim_out))
        else:
            self.weights = [np.random.random((self.dim_in,self.dim_out))] # Single-layer perceptron
            self.bias = [np.random.random(self.dim_out)]
        self.n_weights = np.sum([np.product(w.shape) for w in self.weights]) + np.sum([np.product(b.shape) for b in self.bias])

    def get_parameters(self):
        """
        Returns all network parameters as a single array
        """
        flat_weights = np.hstack([arr.flatten() for arr in (self.weights+self.bias)])
        return flat_weights

    def set_parameters(self, flat_parameters):
        """
        Set all network parameters from a single array
        """
        i = 0 # index
        to_set = []
        self.weights = list()
        self.bias = list()
        if(self.n_hidden_layers > 0):
            # In -> first hidden
            w0 = np.array(flat_parameters[i:(i+self.dim_in*self.n_per_hidden)])
            self.weights.append(w0.reshape(self.dim_in,self.n_per_hidden))
            i += self.dim_in*self.n_per_hidden
            for l in range(self.n_hidden_layers-1): # Hidden -> hidden
                w = np.array(flat_parameters[i:(i+self.n_per_hidden*self.n_per_hidden)])
                self.weights.append(w.reshape((self.n_per_hidden,self.n_per_hidden)))
                i += self.n_per_hidden*self.n_per_hidden
            # -> last hidden -> out
            wN = np.array(flat_parameters[i:(i+self.n_per_hidden*self.dim_out)])
            self.weights.append(wN.reshape((self.n_per_hidden,self.dim_out)))
            i += self.n_per_hidden*self.dim_out
            # Samefor bias now
            # In -> first hidden
            b0 = np.array(flat_parameters[i:(i+self.n_per_hidden)])
            self.bias.append(b0)
            i += self.n_per_hidden
            for l in range(self.n_hidden_layers-1): # Hidden -> hidden
                b = np.array(flat_parameters[i:(i+self.n_per_hidden)])
                self.bias.append(b)
                i += self.n_per_hidden
            # -> last hidden -> out
            bN = np.array(flat_parameters[i:(i+self.dim_out)])
            self.bias.append(bN)
            i += self.dim_out
        else:
            n_w = self.dim_in*self.dim_out
            w = np.array(flat_parameters[:n_w])
            self.weights = [w.reshape((self.dim_in,self.dim_out))]
            self.bias = [np.array(flat_parameters[n_w:])]
        self.n_weights = np.sum([np.product(w.shape) for w in self.weights]) + np.sum([np.product(b.shape) for b in self.bias])
    
    def predict(self,x):
        if(self.n_hidden_layers > 0):
            #Input
            a = np.matmul(x,self.weights[0]) + self.bias[0]
            #y = sigmoid(a)
            y = a
            # hidden -> hidden
            for i in range(1,self.n_hidden_layers-1):
                a = np.matmul(y, self.weights[i]) + self.bias[i]
                y = sigmoid(a)
            # Out
            a = np.matmul(y, self.weights[-1]) + self.bias[-1]
            #out = tanh(a)
            out = a
            return out
        else: # Simple monolayer perceptron
            #return tanh(np.matmul(x,self.weights[0]) + self.bias[0])
            return np.matmul(x,self.weights[0]) + self.bias[0]
        
#Et voila pour rappel comment on initialise les paramètres aléatoirement
Network = SimpleNeuralControllerNumpy(13, 1, n_hidden_layers=0, n_neurons_per_hidden=0, params=None)
Network.init_random_params()
print(Network.get_parameters())

In [None]:
creator.create("FitnessMax", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)


toolbox.register("attr_bool", random.random)

toolbox.register("individual", tools.initRepeat, creator.Individual,
                 toolbox.attr_bool, len(Network.get_parameters()))

toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("evaluate", evaluation_function)
toolbox.register("mate", tools.cxSimulatedBinary,eta=15)
toolbox.register("mutate", tools.mutGaussian,mu = 0, sigma=0.01, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

pop = toolbox.population(n=300)

l= algorithms.eaMuPlusLambda(pop, toolbox, mu=300, lambda_=100, cxpb=0.5, mutpb=0.3, ngen=200, stats=stats)


In [None]:
S = cma.Strategy(centroid = pop[0],sigma = 0.01)
toolbox.register("generate", S.generate, creator.Individual)
toolbox.register("update",S.update)

stats = tools.Statistics(key=lambda ind: ind.fitness.values)
#On ajoute des statistiques que l'on veut avoir 
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)

logbook = tools.Logbook()



for g in range(1000):
    pop = toolbox.generate()
    evaluate(pop)
    fitnesses = list(map(toolbox.evaluate, pop))
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit
    toolbox.update(pop)
    record = stats.compile(pop)
    logbook.record(gen=g, **record)
    logbook.header = "gen", "avg", "min"
    print(logbook.stream)
