In [1]:
import numpy as np
import gym
env = gym.make("CartPole-v0")

In [2]:
# Try running environment with random actions
env.reset()
reward_sum = 0
num_games = 10
num_game = 0
while num_game < num_games:
    env.render()
    observation, reward, done, _ = env.step(env.action_space.sample())
    reward_sum += reward
    if done:
        print("Reward for this episode was: {}".format(reward_sum))
        reward_sum = 0
        num_game += 1
        env.reset()

Reward for this episode was: 15.0
Reward for this episode was: 12.0
Reward for this episode was: 20.0
Reward for this episode was: 15.0
Reward for this episode was: 17.0
Reward for this episode was: 18.0
Reward for this episode was: 50.0
Reward for this episode was: 15.0
Reward for this episode was: 39.0
Reward for this episode was: 17.0


In [3]:
def create_population(env, size=1, mean=0, std=1):
    """ Creates a population """
    params = env.observation_space.shape[0]  # Parameters in our environment state
    action_space = env.action_space.n # Possible actions
    if mean is None:
        # No mean specified, assume mean of zero
        mean = np.zeros((params, action_space)) 
    if std is None:
        # No standard deviation specified, assume standard deviation of 1
        std = np.ones((params,action_space))
    
    # Create a population based on a normal distribution given the mean and std provided
    pop = np.random.normal(mean,std,size=[size,params,action_space])
    
    return pop

def mutate(population, prob_mutate, std):
    """ Mutates a population based on normal distribution """
    # Create a mask of 0s and 1s that are used to determine whether a mutation will take place or not 
    # on the attribute level for each member 
    mutation_mask = np.random.choice([0,1], size=population.shape, p=[1-prob_mutate, prob_mutate])
    
    # Create a mutation based on a normal distribution
    mutation = np.random.normal(0, std, size=population.shape)
    
    # Apply the mutation mask
    mutation *= mutation_mask
    
    return population + mutation # Add the mutation to the population

def breed(population):
    """ Breeds a population with itself. Each individual is paired up with another individual from the same
        population and their values are chosen with a 50 / 50 chance of the offspring acquiring a value from
        either parent.
    """
    parent_1 = population.copy()
    parent_2 = population.copy()
    
    np.random.shuffle(parent_2) # Mix up one of the parent's ordering so we can just align them randomly
    
    # Used to determine if parent one attribute will be inherited
    parent_1_mask = np.random.choice([0,1],size=population.shape)
    
    # If parent two's attribute will be inherited
    parent_2_mask = (parent_1_mask + 1) % 2
    
    return parent_1 * parent_1_mask + parent_2 * parent_2_mask

def normalize(env,state):
    """ Normalizes state to range from 0 to 1 """
    if env.observation_space.low == float("inf"):
        # Some observation spaces are infinite, in which case we won't normalize
        lo = 0
        hi = 1
    else:
        lo = env.observation_space.low
        hi = env.observation_space.high
    return (state - lo) / (hi - lo)

In [4]:
def score(env, ind, trials=1, render=False):
    """ Scores an individual bot in the environment. Returns mean score """
    rewards = 0
    for trial in range(trials):
        state = env.reset()
        done = False
        while not done:
            if render:
                env.render()
            num_moves =+ 1
            out = np.dot(state,ind)
            action = np.argmax(out)
            state, reward, done, _ = env.step(action)
            rewards += reward
    env.close()
    return rewards / float(trials)

In [5]:
survival_rate = 0.01
prob_mutate = 0.25
std = 1
std_decay = 0.9
num_episodes = 30
print_every = 100
pop_size = 25
trials_per_individual = 10

In [13]:
def train(env, pop_size, trials_per_individual, survival_rate, prob_mutate, std, num_episodes, std_decay=0.9, curiousity=False, verbose=False, goal=200):
    """ Trains a bot based on an envolutionary model """
    params = env.observation_space.shape[0] # Determine the number of parameters our environment has
    pop = create_population(env, pop_size) # Create the population
    pop_survive = max(1,int(pop_size * survival_rate)) # Set the number of individuals that will survive after each episode
    best_score = float("-inf") # Track best score
    for episode in range(num_episodes):
        # Score individuals
        scores = [score(env, ind, trials=trials_per_individual, curiousity=curiousity) for ind in pop]
        
        # Convert to list
        pop_scores = zip(pop,scores)
        
        # Sort by how well each individual did
        pop_scores = sorted(pop_scores, key=lambda pop_score: pop_score[1], reverse=True)
        
        pop_scores = list(zip(*pop_scores)) # Apply scores to individuals
        pop, scores = pop_scores # Break apart population and scores
        pop = pop[:pop_survive] # Only keep the best
        pop = np.array(pop) 
        pop = np.vstack([pop, breed(pop)]) # Breed the population and append to existing population
        pop = np.vstack([pop, mutate(pop, prob_mutate, std)]) # Mutate the population and append to existing population
        
        # Determine how many new individuals to introduct to keep population number constant
        remaining_pop = max(0, pop_size - len(pop)) 
        
        # Determine the mean of the population parameters
        mean = np.mean(create_population(env,size=10,mean=0, std=1),axis=0)
        
        # Add new members to population
        new_pop = create_population(env, size=remaining_pop,mean=mean, std=std)
        pop = np.vstack([pop, new_pop])

        if verbose:
            print("episode: {} best score: {:0.2f}".format(episode, scores[0]))

        if scores[0] > best_score:
            # If best score is better than prior best score, decay the standard deviation since we're
            # likely getting to an optimal individual and we want variant to decrease
            std *= std_decay
            best_score = scores[0]
        else:
            # Prior best score not reached, increase standard deviation to add more variety and hopefully
            # break through any plateaus
            std /= std_decay
            
        if best_score >= goal:
            if verbose:
                print("training complete in {} episodes".format(episode))
            break
    return pop[0] # Return best invidual after training

In [7]:
def score(env, ind, trials=1, curiousity=False, render=False):
    rewards = 0
    for trial in range(trials):
        state = env.reset()
        min_state = state
        max_state = state
        
        done = False
        while not done:
            if render:
                env.render()
            num_moves =+ 1
            out = np.dot(state,ind)
            action = np.argmax(out)
            state, reward, done, _ = env.step(action)
            min_state = np.min([min_state, state],axis=0)
            max_state = np.max([max_state, state],axis=0)
            rewards += reward
        if curiousity:
            # Apply some value to exploration. This will nudge the algorithm in favor of bots that explore
            # more of the environment space.
            rewards += np.sum(max_state - min_state)
    env.close()
    return rewards / float(trials)

In [8]:
bot = train(env, pop_size, trials_per_individual, survival_rate, prob_mutate, std, num_episodes, curiousity=True, verbose=True, goal=200)

episode: 0 best score: 201.99562157873646
training complete in 0 episodes


In [10]:
env = gym.make("MountainCar-v0")

In [11]:
# No curiousity
bot = train(env, pop_size, trials_per_individual, survival_rate, prob_mutate, std, num_episodes, verbose=True, goal=-110)

episode: 0 best score: -200.0
episode: 1 best score: -200.0
episode: 2 best score: -200.0
episode: 3 best score: -200.0
episode: 4 best score: -200.0
episode: 5 best score: -200.0
episode: 6 best score: -200.0
episode: 7 best score: -200.0
episode: 8 best score: -200.0
episode: 9 best score: -130.0
episode: 10 best score: -136.8
episode: 11 best score: -120.3
episode: 12 best score: -138.1
episode: 13 best score: -122.4
episode: 14 best score: -138.4
episode: 15 best score: -122.0
episode: 16 best score: -120.1
episode: 17 best score: -128.2
episode: 18 best score: -127.7
episode: 19 best score: -137.2
episode: 20 best score: -139.6
episode: 21 best score: -118.5
episode: 22 best score: -119.3
episode: 23 best score: -118.5
episode: 24 best score: -119.5
episode: 25 best score: -121.4
episode: 26 best score: -127.5
episode: 27 best score: -120.3
episode: 28 best score: -122.2
episode: 29 best score: -120.6


In [14]:
bot = train(env, pop_size, trials_per_individual, survival_rate, prob_mutate, std, num_episodes, curiousity=True, verbose=True, goal=-110)

episode: 0 best score: -199.11
episode: 1 best score: -198.94
episode: 2 best score: -198.65
episode: 3 best score: -198.80
episode: 4 best score: -198.79
episode: 5 best score: -199.09
episode: 6 best score: -198.79
episode: 7 best score: -161.71
episode: 8 best score: -152.69
episode: 9 best score: -135.37
episode: 10 best score: -137.90
episode: 11 best score: -152.23
episode: 12 best score: -126.30
episode: 13 best score: -121.35
episode: 14 best score: -129.43
episode: 15 best score: -123.07
episode: 16 best score: -126.28
episode: 17 best score: -127.81
episode: 18 best score: -126.19
episode: 19 best score: -122.47
episode: 20 best score: -122.38
episode: 21 best score: -119.14
episode: 22 best score: -118.11
episode: 23 best score: -116.42
episode: 24 best score: -119.36
episode: 25 best score: -121.57
episode: 26 best score: -120.77
episode: 27 best score: -118.93
episode: 28 best score: -120.58
episode: 29 best score: -117.93


In [None]:
score(env,bot,render=True)