In [1]:
import operator
import gym
import random
# from NN import NN 
from itertools import permutations
import numpy as np

In [2]:
# Environment used by the cells below for training, evaluation and rendering.
env = gym.make('CartPole-v1')
# Alternative environment, currently disabled:
# env = gym.make('Acrobot-v1')

In [13]:
class NN:
    """Small feed-forward network whose weights are evolved, not trained.

    ``W`` is a list of 2-D weight matrices, one per layer. Each matrix has one
    more row than the layer has inputs: ``predict`` appends a constant 1 to the
    layer input, so the last row acts as the bias. Hidden layers use ReLU, the
    output layer goes through a softmax, and ``predict`` returns the index of
    the largest output.

    The "DNA" handled by ``crossover`` and ``mutate`` is such a list of
    matrices.
    """

    def __init__(self, w):
        """Wrap an existing list of weight matrices (kept by reference, not copied)."""
        self.W = w

    @classmethod
    def from_params(cls, input_nodes, hidden_layers, output_nodes):
        """Create a network with random weights drawn from N(mean=0, std=0.1).

        Example: ``NN.from_params(3, [3, 4, 5], 1)``.

        Args:
            input_nodes: number of input values.
            hidden_layers: sequence of hidden layer sizes; may be empty, in
                which case the inputs are wired directly to the outputs.
            output_nodes: number of output values.

        Returns:
            A new ``NN`` with ``len(hidden_layers) + 1`` weight matrices, each
            of shape ``(layer_inputs + 1, layer_outputs)``.
        """
        sizes = [input_nodes, *hidden_layers, output_nodes]
        # Matrices are drawn in layer order (input side first), +1 row for the bias.
        w = [
            np.random.normal(0, 0.1, (n_in + 1, n_out))
            for n_in, n_out in zip(sizes[:-1], sizes[1:])
        ]
        return cls(w)

    @classmethod
    def from_weights(cls, w):
        """Create a network from an existing list of weight matrices."""
        return cls(w)

    @classmethod
    def crossover(cls, dna_1, dna_2, algo_maison=True):
        """Combine two parents' weights into a new list of weight matrices.

        Args:
            dna_1: list of weight matrices of the first parent.
            dna_2: list of weight matrices of the second parent; must have the
                same layout as ``dna_1``.
            algo_maison: strategy switch. When True (the default, and the only
                behaviour this method had before the flag became a parameter),
                whole columns are inherited: each column of a matrix, i.e. all
                incoming weights of one node, comes from ``dna_1`` and is
                replaced by the matching column of ``dna_2`` with probability
                0.5. When False, every individual weight is picked from either
                parent with probability 0.5.

        Returns:
            A new list of matrices; the parents are not modified.
        """
        new_w = []
        for m in range(len(dna_1)):
            if not algo_maison:
                # Uniform crossover: each weight is chosen independently.
                new_m = np.zeros(dna_1[m].shape)
                for i in range(len(dna_1[m])):
                    for j in range(len(dna_1[m][i])):
                        new_m[i][j] = dna_1[m][i][j] if random.random() > 0.5 else dna_2[m][i][j]
                new_w.append(new_m)
            else:
                # Work on the transpose so that one row is one column of the
                # weight matrix, then transpose back.
                new_m = np.copy(dna_1[m].T)
                for r in range(new_m.shape[0]):
                    if random.random() > 0.5:
                        new_m[r] = dna_2[m].T[r]
                new_w.append(new_m.T)
        return new_w

    @classmethod
    def mutate(cls, dna, rate):
        """Return a mutated copy of ``dna``.

        Each weight is, with probability ``rate``, replaced by a fresh value
        from ``random.random()``.

        NOTE(review): replacement values are uniform in [0, 1), so mutated
        weights are never negative, unlike the N(0, 0.1) initial weights.
        Confirm this is intended.

        Args:
            dna: list of weight matrices; left untouched.
            rate: per-weight mutation probability.

        Returns:
            A new list of matrices of the same shapes.
        """
        _w = []
        for m in dna:
            new_m = np.copy(m)
            # nditer with 'readwrite' lets us overwrite single elements in place.
            for w in np.nditer(new_m, op_flags=['readwrite']):
                if random.random() < rate:
                    w[...] = random.random()
            _w.append(new_m)
        return _w

    def hidden_activation(self, Z):
        """ReLU: element-wise ``max(Z, 0)``."""
        return np.maximum(Z, 0)

    def softmax_activation(self, Z):
        """Softmax of ``Z``; the maximum is subtracted first to avoid overflow in ``exp``."""
        exp = np.exp(Z - Z.max())
        return np.array(exp / exp.sum())

    def predict(self, inputs):
        """Run a forward pass and return the index of the largest output.

        Args:
            inputs: sequence of input values for the first layer.

        Returns:
            The ``np.argmax`` of the softmax outputs.
        """
        a = np.append(np.array(inputs), 1)  # constant 1 feeds the bias row
        for i in range(len(self.W) - 1):
            Z = np.squeeze(a @ self.W[i])
            a = np.append(self.hidden_activation(Z), 1)
        Z = np.squeeze(a @ self.W[-1])
        a = self.softmax_activation(Z)
        return np.argmax(a)

    def get_weights_copy(self):
        """Return a list holding a copy of every weight matrix."""
        return [np.copy(w) for w in self.W]

In [4]:
class Agent:
    """One individual of the population: a network plus its fitness score."""

    def __init__(self, env, w = None):
        """Build an agent sized for ``env``.

        Args:
            env: environment whose ``action_space.n`` and
                ``observation_space.shape[0]`` give the network's output and
                input sizes.
            w: optional list of weight matrices. When it is truthy the network
                is built from it; otherwise a random network with hidden
                layers of 8 and 4 nodes is created.
        """
        self.fitness = 0
        self.observation_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.NN = (
            NN.from_weights(w)
            if w
            else NN.from_params(self.observation_size, [8, 4], self.action_size)
        )

    def get_action(self, observation):
        """Return the action index the network predicts for ``observation``."""
        chosen = self.NN.predict(observation)
        return chosen

    def set_fitness(self, fitness):
        """Store the fitness score of this agent."""
        self.fitness = fitness

    def get_mutated_copy(self, rate):
        """Return a mutated copy of this agent's weights (a list of matrices, not an Agent)."""
        dna = self.NN.get_weights_copy()
        return NN.mutate(dna, rate)

    def get_dna(self):
        """Return a copy of this agent's weight matrices."""
        return self.NN.get_weights_copy()
        

In [14]:
# Evolutionary training loop: evaluate every agent, pick parents, breed the
# next generation, and remember the best agent seen so far in `best_agent`.
algo_recherche = True  # True: tournament selection + crossover; False: keep the top agents and mutate clones of them
mutation_rate = 0.01  # per-weight mutation probability passed to NN.mutate
total_run = 15  # number of episodes averaged into one agent's fitness
agent_quantity = 100 # must be a perfect square: sqrt(agent_quantity) parents rebuild a population of this size
total_generation = 10
best_agent = None

# Create the first generation
agents = [Agent(env) for i in range(agent_quantity)]

for generation in range(total_generation):
    print("Generation: " + str(generation) + " ----------------------------------------")
    print("Testing " + str(len(agents)) + " agents...")
    
    # Evaluate every agent
    for agent in agents:
        fitness = 0
        # max_pos = -2.0
        # Each agent plays several episodes
        for run in range(total_run):
            state = env.reset()
            # Hard cap of 1000 steps per episode in case `done` never comes
            for t in range(1000):
                action = agent.get_action(state)
                state, reward, done, info = env.step(action)
                # max_pos = max(state[0], max_pos)
                
                # if state[0] > -0.2:
                #     reward = 1
                fitness += reward
                if done:
                    break
        # Fitness is the mean total reward per episode
        agent.set_fitness(fitness/total_run)
        

    best_candidate = max(agents, key=operator.attrgetter("fitness"))
    # Keep the best agent across generations (strictly better replaces it)
    if best_agent is None or best_agent.fitness < best_candidate.fitness:
        best_agent = best_candidate
        
    print("Best candidate fitness: " + str(best_candidate.fitness))
    # NOTE(review): 500.0 is presumably the maximum mean score reachable on
    # CartPole-v1 -- confirm if the environment is changed.
    if best_candidate.fitness == 500.0:
        print("Early stopping")
        best_agent = best_candidate
        break
    
    # New generation
    print("Creating new generation")
    print("Parents:")
    parents = []
    new_agents = []
    
    # Parent selection by tournament selection (TOS)
    parent_quantity = int(agent_quantity ** 0.5)
    if algo_recherche:
        tournament_size = parent_quantity
        for i in range(parent_quantity):
            # Each tournament draws its own random sample, so the same agent
            # can win several times and then appears several times in
            # `parents` and `new_agents`.
            selected_candidates = random.sample(agents, tournament_size)
            champion = max(selected_candidates, key=operator.attrgetter("fitness"))
            parents.append(champion)
            # The champion itself is carried over into the next generation
            new_agents.append(champion)
            print(champion.fitness)
        
        # Every ordered pair of parents produces one child, which is then mutated
        for i, j in permutations(parents, 2):
            child_dna = NN.crossover(i.get_dna(), j.get_dna())
            # NOTE(review): -200 looks like a leftover sentinel fitness from
            # another environment; when both parents score exactly -200 the
            # mutation rate is multiplied by 80 -- confirm it is still wanted.
            if max(i.fitness, j.fitness) == -200:
                child = Agent(env, NN.mutate(child_dna, mutation_rate*80))
            else:
                child = Agent(env, NN.mutate(child_dna, mutation_rate))
            new_agents.append(child)
    else:
        # Truncation selection: the top `parent_quantity` agents survive...
        agents.sort(key=operator.attrgetter("fitness"), reverse=True)
        for agent in agents[:parent_quantity]:
            new_agents.append(agent)
            parents.append(agent)
            print(agent.fitness)
    
        # ...and each of them contributes mutated clones to refill the population
        for parent in parents:
            for i in range(agent_quantity//parent_quantity-1):
                new_agents.append(Agent(env, NN.mutate(parent.get_dna(), mutation_rate)))

    
    agents = new_agents
    print()
    
print("End training")

Generation: 0 ----------------------------------------
Testing 100 agents...
Best candidate fitness: 30.266666666666666
Creating new generation
Parents:
9.733333333333333
9.8
10.133333333333333
9.733333333333333
9.8
9.6
9.6
9.8
14.533333333333333
9.8

Generation: 1 ----------------------------------------
Testing 100 agents...
Best candidate fitness: 28.133333333333333
Creating new generation
Parents:
9.866666666666667
9.533333333333333
14.466666666666667
9.466666666666667
9.733333333333333
9.866666666666667
9.666666666666666
28.133333333333333
9.733333333333333
28.133333333333333

Generation: 2 ----------------------------------------
Testing 100 agents...
Best candidate fitness: 88.33333333333333
Creating new generation
Parents:
24.4
13.933333333333334
10.266666666666667
13.933333333333334
9.666666666666666
28.066666666666666
10.133333333333333
9.866666666666667
10.266666666666667
28.066666666666666

Generation: 3 ----------------------------------------
Testing 100 agents...
Best ca

In [17]:
# Evaluate the best agent from training: average total reward over
# `evaluation_runs` fresh episodes.
evaluation_score = 0
evaluation_runs = 100
print("Best training fitness:", best_agent.fitness)
for run in range(evaluation_runs):
    fitness = 0
    state = env.reset()
    # Hard cap of 1000 steps per episode in case `done` never comes
    for t in range(1000):
        action = best_agent.get_action(state)
        state, reward, done, info = env.step(action)
        fitness += reward
        if done:
            break
    # Add the episode score after the loop so that an episode which reaches
    # the step cap without `done` is still counted instead of contributing 0.
    evaluation_score += fitness
print(evaluation_score/evaluation_runs)


Best training fitness: 500.0
498.53


In [19]:
# Play and render a single episode with the best agent, then print the index
# of the step on which the episode ended.
state = env.reset()
score = 0
try:
    # Hard cap of 1000 steps in case `done` never comes
    for t in range(1000):
        action = best_agent.get_action(state)
        state, reward, done, info = env.step(action)
        score += reward
        env.render()
        if done:
            print(t)
            break
finally:
    # Always release the environment (and its render window), even when the
    # cell is interrupted or a step/render call raises.
    env.close()

499


In [None]:
# Standalone cleanup cell: closes the environment. Presumably kept so the
# environment can be closed by hand after an interrupted run -- confirm.
env.close()