In [1]:
import torch
import torch.nn as nn
import gym
from itertools import count
import numpy as np
import random
import torch.nn.functional as F

# Environments this notebook can be pointed at; change the index below to switch.
envs = ['CartPole-v1','Acrobot-v1','MountainCar-v0','Pendulum-v0','BipedalWalker-v2']
env = gym.make(envs[0]).unwrapped

# Derive the action type from the environment itself instead of hard-coding it,
# so selecting a continuous-action environment needs no manual flag flip.
discrete_actions = isinstance(env.action_space, gym.spaces.Discrete)


#TODO
#parallel fitness measuring


[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [2]:
# Run on the first CUDA device when one is available, otherwise on the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


class Creature(nn.Module):
    """Fully connected policy network: five ReLU hidden layers plus a linear head.

    The input width is ``env.observation_space.shape[0]``. The output width is
    ``env.action_space.n`` when the module-level ``discrete_actions`` flag is
    true, and ``env.action_space.shape[0]`` otherwise.

    Args:
        hidden_num: width of every hidden layer.
    """

    def __init__(self, hidden_num = 20):
        super(Creature, self).__init__()

        n_inputs = env.observation_space.shape[0]

        # Layers are created in order 1..6 so parameter names stay layer1..layer6.
        self.layer1 = nn.Linear(n_inputs, hidden_num)
        self.layer2 = nn.Linear(hidden_num, hidden_num)
        self.layer3 = nn.Linear(hidden_num, hidden_num)
        self.layer4 = nn.Linear(hidden_num, hidden_num)
        self.layer5 = nn.Linear(hidden_num, hidden_num)

        n_outputs = env.action_space.n if discrete_actions else env.action_space.shape[0]
        self.layer6 = nn.Linear(hidden_num, n_outputs)

    def forward(self, x):
        """Return the raw (un-activated) output of the final layer for ``x``."""
        hidden = x
        for layer in (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5):
            hidden = F.relu(layer(hidden))
        return self.layer6(hidden)


In [3]:
def measure_fitness(creature,render = False,max_steps = 1000):
    """Run one episode in the global ``env`` and return the cumulative reward.

    Args:
        creature: network mapping an observation tensor to action scores
            (discrete case) or an action vector (continuous case).
        render: when true, ``env.render()`` is called before every step.
        max_steps: upper bound on the number of environment steps.

    Returns:
        Sum of the rewards collected until the environment reports ``done``
        or ``max_steps`` steps have been taken.
    """
    observation = env.reset()
    #creature fitness is cumulative reward in simulation
    total_reward = 0
    # Only forward passes are needed, so skip autograd bookkeeping.
    with torch.no_grad():
        for _ in range(max_steps):
            if render:
                env.render()
            # Cast to float32 on whichever device was selected; the previous
            # hard-coded 'torch.cuda.FloatTensor' failed on CPU-only machines.
            obs = torch.from_numpy(observation).float().to(device)

            #get action
            if discrete_actions:
                # index of the highest-scoring action
                action = creature(obs).max(-1)[1].item()
            else:
                action = creature(obs).detach().cpu().numpy()
            observation, reward, done, _ = env.step(action)

            total_reward += reward

            if done:
                break
    return total_reward

In [4]:

def measure_population_fitness(population,max_steps = 1000):
    """Score every creature with ``measure_fitness`` and return the scores.

    Args:
        population: iterable of creatures to evaluate, in order.
        max_steps: step limit forwarded to ``measure_fitness``.

    Returns:
        NumPy array with one fitness value per creature, in population order.
    """
    return np.array([measure_fitness(member, max_steps = max_steps) for member in population])

In [5]:
def mutate(creature,mutation_rate=0.1):
    """Return a copy of ``creature`` with Gaussian noise added to some weights.

    A fresh ``Creature`` is created on ``device`` and loaded with the source
    creature's state dict; the source itself is not modified. For every
    parameter tensor, noise with standard deviation 0.07 is drawn per element
    and kept with probability ``mutation_rate`` (zeroed otherwise).

    Args:
        creature: network whose weights are copied.
        mutation_rate: per-weight probability of receiving noise.

    Returns:
        The mutated copy.
    """
    mutant = Creature().to(device)
    mutant.load_state_dict(creature.state_dict())
    keep_or_zero = [mutation_rate, 1 - mutation_rate]
    for weights in mutant.parameters():
        shape = weights.data.shape
        # draw the noise first, then the mask that decides which entries survive
        noise = np.random.normal(scale = 0.07, size = shape)
        mask = np.random.choice([1, 0], shape, p = keep_or_zero)
        delta = torch.from_numpy(noise * mask).type('torch.FloatTensor').to(device)
        weights.data += delta
    return mutant


def mate(mom,dad,apply_mutation = True,dominance = 0.5):
    """Create a child whose weights are an element-wise mix of two parents.

    The child starts as a copy of ``mom`` (built on the CPU). Each individual
    weight is then replaced by ``dad``'s value with probability ``dominance``.

    Args:
        mom: parent whose weights the child starts from.
        dad: parent whose weights are mixed in.
        apply_mutation: when true, the result is passed through ``mutate``.
        dominance: per-weight probability of taking ``dad``'s value.

    Returns:
        The child creature (the output of ``mutate`` when mutation is applied).
    """
    offspring = Creature()
    offspring.load_state_dict(mom.state_dict())
    gene_odds = [dominance, 1 - dominance]

    for mom_param, dad_param, kid_param in zip(mom.parameters(), dad.parameters(), offspring.parameters()):
        # work on flat views so one boolean mask can address every weight
        dad_genes = dad_param.data.view(dad_param.numel()).cpu().numpy()
        kid_flat = kid_param.data.view(kid_param.numel())

        # True marks the positions inherited from dad
        from_dad = np.random.choice([True, False], mom_param.numel(), p = gene_odds)

        # .numpy() shares memory with the tensor, so this writes into kid_flat
        kid_flat.numpy()[from_dad] = dad_genes[from_dad]

        # restore the original shape and hand the weights back to the child
        kid_param.data = kid_flat.data.view(kid_param.shape)

    return mutate(offspring) if apply_mutation else offspring

     

def evolve(population,mutate = True,max_steps = 500):
    """Produce the next generation from ``population``.

    Every creature is scored with ``measure_population_fitness``. Parents are
    then sampled with probability proportional to the square root of their
    fitness above the worst creature, and consecutive sampled parents are
    mated. One extra child of the first and last creature pads the new
    population back to the original size.

    Args:
        population: list of creatures forming the current generation.
        mutate: forwarded to ``mate`` as its ``apply_mutation`` flag.
        max_steps: per-episode step limit used when measuring fitness.

    Returns:
        ``(new_population, p_fitness)`` where ``p_fitness`` holds the scores
        of the *input* population (not of ``new_population``).
    """
    n_creatures = len(population)
    if n_creatures == 0:
        # nothing to score or breed
        return [], np.array([])

    #probability of picking creature based on performance
    p_fitness = measure_population_fitness(population,max_steps =max_steps)
    # shift so the worst creature scores 1; keeps the ratios below finite and positive
    p_fitness_positive = p_fitness - np.min(p_fitness) + 1

    # square root of the distance above the worst score flattens large gaps
    normed = np.power(p_fitness - np.min(p_fitness), 0.5)
    total = np.sum(normed)
    if total > 0:
        pick_probabilities = normed/total
    else:
        # every creature scored the same: fall back to uniform selection
        # instead of dividing by zero
        pick_probabilities = np.full(n_creatures, 1.0/n_creatures)

    #pick creature
    choice = np.random.choice(n_creatures, n_creatures, p = pick_probabilities)
    new_population = []

    for p in range(n_creatures-1):
        first_idx, second_idx = choice[p], choice[p+1]
        first_choice = population[first_idx]
        second_choice = population[second_idx]
        # compare the fitness of the creatures actually selected, not of
        # whichever creatures happen to sit at positions p and p+1
        first_fit = p_fitness_positive[first_idx]
        second_fit = p_fitness_positive[second_idx]

        #more successful(healthier?) creature has greater genetic dominance
        if first_fit >= second_fit:
            dominance = (second_fit/first_fit)*0.7
            child = mate(first_choice,second_choice, mutate,dominance).to(device)
        else:
            dominance = (first_fit/second_fit)*0.7
            child = mate(second_choice,first_choice, mutate,dominance).to(device)

        new_population.append(child)

    # pad back to the original size; honour the caller's mutation flag here too
    child = mate(population[0],population[n_creatures-1], mutate).to(device)
    new_population.append(child)

    return new_population, p_fitness



In [6]:
#randomly initialise starting population
population_size = 50
population = [Creature().to(device) for _ in range(population_size)]

print("starting training")
n_generations = 1000000
render_every = 1  # render the best creature every this many generations
for i in range(n_generations):
    # evolve() returns the scores of the generation it was given, so keep a
    # handle on that generation to look up the best creature afterwards
    parents = population
    population, p_fitness = evolve(parents,True)
    if i % render_every == 0:
        fitness = measure_fitness(parents[np.argmax(p_fitness)],render = True,max_steps = 200)
    print("Generation {}  fitness : {}".format(i+1,np.max(p_fitness)))
    


starting training
Generation 1  fitness : 50.0
Generation 2  fitness : 10.0
Generation 3  fitness : 82.0
Generation 4  fitness : 19.0
Generation 5  fitness : 11.0
Generation 6  fitness : 11.0
Generation 7  fitness : 18.0
Generation 8  fitness : 14.0
Generation 9  fitness : 12.0
Generation 10  fitness : 14.0
Generation 11  fitness : 11.0
Generation 12  fitness : 13.0
Generation 13  fitness : 10.0


KeyboardInterrupt: 

In [None]:
# p_fitness scores the generation passed INTO evolve(), so the best creature
# must be looked up in that generation rather than in the returned children.
parents = population
population, p_fitness = evolve(parents,True)
fitness = measure_fitness(parents[np.argmax(p_fitness)],render = True,max_steps = 2000)