# 07. Evolutionary algorithms
- First create a set of random policies (initial population) <br>
- Each parameter of a policy is a gene; all parameters of one policy together form a chromosome<br>
- To select which policies get filtered out, a fitness function is required. Ideally it is a continuous reward that can rank the agents, rather than a +1/0 signal under which all agents might score 0.<br>
- Two agents with a high fitness score are used in a crossover to mix their chromosomes. <br>
- With some low probability a mutation occurs (some genes of the offspring are changed)

In [86]:
import gym
import numpy as np
from collections import deque
# Seed NumPy's global random generator, which the functions below use for
# initialisation, shuffling and mutation.
np.random.seed(42)

# Alternative environment, currently disabled.
#env_name = 'Taxi-v2'
env_name = 'FrozenLake-v0'

env = gym.make(env_name)
# Sizes of the discrete observation and action spaces; together they give
# the shape of every agent's parameter table.
state_space = env.observation_space.n
action_space = env.action_space.n

gamma = 0.999  # NOTE(review): not referenced anywhere in the visible code
population_size = 200  # number of agents in the initial population
max_generations = 2000  # number of iterations of the evolution loop

def initialize_population(population_size):
    """Return a list of `population_size` randomly initialised agents.

    Every agent is a (state_space, action_space) array whose entries are
    drawn from a standard normal distribution.
    """
    return [
        np.random.randn(state_space, action_space)
        for _ in range(population_size)
    ]

def evaluate_agent(q, n_games=100):
    """Estimate an agent's fitness as its mean episode reward.

    The agent acts greedily: in every state it plays the action with the
    largest entry in the corresponding row of `q`.

    Args:
        q: Array indexed as ``q[state, action]``.
        n_games: Number of episodes to average over. Defaults to 100.

    Returns:
        The mean of the (undiscounted) total rewards of the played episodes.
    """
    episode_rewards = []
    for _game in range(n_games):
        state = env.reset()
        done = False
        episode_reward = 0

        while not done:
            action = np.argmax(q[state, :])
            state, reward, done, _ = env.step(action)
            episode_reward += reward

        # The loop above only exits once the episode is over, so the total
        # can be recorded unconditionally here.
        episode_rewards.append(episode_reward)

    return np.mean(episode_rewards)

def choose_n_fittest_agents(fitness_scores, n):
    """Return the indices of the `n` highest fitness scores.

    Args:
        fitness_scores: NumPy array of fitness values, one per agent.
        n: Number of agents to select. Zero or a negative value selects
            nobody; a value larger than the population selects everybody.

    Returns:
        Index array into `fitness_scores`, ordered from the least fit to
        the fittest of the selected agents.
    """
    ranking = fitness_scores.argsort()
    # Compute the slice start explicitly: `ranking[-n:]` would return every
    # index for n == 0, because -0 is just 0.
    first_selected = max(len(ranking) - n, 0)
    return ranking[first_selected:]

def crossover(fittest_agents, split_row=8):
    """Breed one child per parent with a single-point (row-wise) crossover.

    Two independently shuffled copies of the parent list are paired up
    position by position. Each child takes the rows before `split_row`
    from the first partner and the remaining rows from the second one.
    The input list itself is left in its original order.

    Args:
        fittest_agents: List of 2-D parameter arrays of identical shape.
        split_row: Row index at which the two parents are joined. The
            default of 8 is the midpoint of a 16-row table.

    Returns:
        List of child arrays, as many as there are parents.
    """
    males = fittest_agents.copy()
    females = fittest_agents.copy()
    np.random.shuffle(males)
    np.random.shuffle(females)
    return [
        np.vstack([male[:split_row, :], female[split_row:, :]])
        for male, female in zip(males, females)
    ]

def mutation(offspring):
    """Return noisy copies of all agents in `offspring`.

    Gaussian noise with mean -0.01 and standard deviation 0.01 is added to
    every entry of every agent. The input arrays are not modified.
    """
    # NOTE(review): the noise is centred on -0.01 rather than 0 - confirm
    # that this offset is intended.
    return [
        child + np.random.normal(-0.01, 0.01, child.shape)
        for child in offspring
    ]

# Make-up of every new generation: the fittest agents survive unchanged and
# additionally produce one mutated child each; the rest are fresh random agents.
n_parents = int(population_size/2.5)
n_newcomers = int(0.2*population_size)

population = initialize_population(population_size)
for generation in range(max_generations):
    # Score all agents in one go; growing the array with np.append inside a
    # loop would copy it on every call.
    fitness_scores = np.array([evaluate_agent(q) for q in population])

    index_fittest_agents = choose_n_fittest_agents(fitness_scores, n_parents)
    fittest_agents = [population[i] for i in index_fittest_agents]
    offspring = crossover(fittest_agents)
    mutated_offspring = mutation(offspring)
    population = mutated_offspring + fittest_agents + initialize_population(n_newcomers)
    # Report progress every tenth generation.
    if generation%10 == 0:
        print('average fitness of generation ', generation, ': ', np.mean(fitness_scores))

average fitness of generation  0 :  0.014950000000000001
average fitness of generation  10 :  0.5654
average fitness of generation  20 :  0.5656


KeyboardInterrupt: 

In [91]:
import os
import numpy as np
import gym
import time

np.random.seed(1337) # seed NumPy's global random generator
env_name='BipedalWalker-v2'
env = gym.make(env_name)

# Hyperparameters
episode_length=2000 # maximum number of environment steps per evaluation episode
lr = 0.03 # learning rate / how much noise is applied for mutations
frac_mut = 0.1 # fraction of mutations (probability that a single parameter is mutated)
n_policy = 100 # number of policies in each generation
n_generations = 100 # number of generations to evolve

def mkdir(base, name):
    ''' creates the directory `name` inside `base` (if it does not exist yet) and returns its path

    Raises FileExistsError if the path exists but is not a directory.
    '''
    path = os.path.join(base, name)
    # exist_ok makes repeated calls harmless without a separate existence check
    os.makedirs(path, exist_ok=True)
    return path

# used to save the visualization
# (mkdir has to be defined before these calls, otherwise a fresh run fails with a NameError)
videos_dir = mkdir('.', 'videos')
monitor_dir = mkdir(videos_dir, env_name)

def gen_random_policy():
    ''' draws a random policy matrix with one standard-normal weight per (observation, action) pair '''
    n_inputs = env.observation_space.shape[0]
    n_outputs = env.action_space.shape[0]
    return np.random.randn(n_inputs, n_outputs)
             
def crossover(policy1, policy2):
    ''' uniform crossover for evolutionary algorithm

    Every entry of the child is taken from `policy2` with probability 0.5
    and from `policy1` otherwise. The shape is read from `policy1`, so the
    function works for policy matrices of any size. Neither parent is modified.

    policy1, policy2 -- parameter arrays of identical shape
    returns          -- a new array with the shape and dtype of `policy1`
    '''
    new_policy = policy1.copy()
    # one uniform draw per entry decides which parent the entry comes from
    take_from_second = np.random.uniform(size=policy1.shape) > 0.5
    new_policy[take_from_second] = policy2[take_from_second]
    return new_policy

def mutation(policy, p=frac_mut):
    ''' mutation for evolutionary algorithm: returns a copy of `policy` in which each
    entry has, with probability `p`, gaussian noise scaled by `lr` added to it '''
    mutated = policy.copy()
    n_rows = env.observation_space.shape[0]
    n_cols = env.action_space.shape[0]
    for cell in np.ndindex(n_rows, n_cols):
        # a uniform number is drawn for every entry; the gaussian one only
        # when the entry is actually mutated
        if np.random.uniform() < p:
            mutated[cell] += lr*np.random.randn()
    return mutated

def state_to_action(sensor_input, policy):
    ''' linear controller: multiplies the sensor values (state) with the policy matrix
    and returns the first row of the product as the action (motor torques) '''
    observation_rows = np.atleast_2d(sensor_input)
    return observation_rows.dot(policy)[0]

def evaluate_policy( policy ):
    ''' plays a single episode (at most `episode_length` steps) with the given policy
    and returns the collected reward plus a constant offset of 300 '''
    observation = env.reset()
    total_reward = 0.0
    for _step in range(episode_length):
        motor_command = state_to_action(observation, policy)
        observation, reward, finished, _ = env.step(motor_command)
        total_reward += reward
        if finished:
            break
    # shift the score upwards: get positive rewards to get rid of sign errors
    return total_reward + 300

# Recording switch: should_record is handed to the Monitor as video_callable
# and reads the module-level rec_vid each time it is called, so changing
# rec_vid later changes what the callable returns.
rec_vid = False
should_record = lambda i: rec_vid
env = gym.wrappers.Monitor(env, monitor_dir, video_callable=should_record, force=True)
policy_pop = [gen_random_policy() for _ in range(n_policy)] # start with a random population of policies
a = time.time() # start of the timed training run

for i in range(n_generations):
    policy_scores = [evaluate_policy(p) for p in policy_pop ]        
    # evaluate_policy adds 300 to every score; subtract it again for display
    print('Generation %d : max score = %d' %(i+1,  max(policy_scores) - 300.0))
    # population indices ordered from best to worst score
    policy_ranks = list(reversed(np.argsort(policy_scores)))
    # the five best policies are carried over unchanged
    elite_set = [policy_pop[x] for x in policy_ranks[:5]]
    # selection probability proportional to the (offset) score
    select_probs = np.array(policy_scores) / np.sum(policy_scores)
    # every child has two parents that are drawn independently of each other,
    # so the same policy can be picked for both roles
    child_set = [crossover(
        policy_pop[np.random.choice(range(n_policy), p=select_probs)],
        policy_pop[np.random.choice(range(n_policy), p=select_probs)])
        for _ in range(n_policy - 5) ]
    mutated_list = [mutation(p) for p in child_set] 
    # NOTE: policy_pop refers to the same list object as elite_set from here on,
    # so the += below extends elite_set as well
    policy_pop = elite_set
    policy_pop += mutated_list
# score the final population once more and keep its best member
policy_score = [evaluate_policy(p) for p in policy_pop]
optimal_policy = policy_pop[np.argmax(policy_score)]

b = time.time()
print('used time: ', b-a)
# run one more episode with the selected policy while the recording flag is set
rec_vid = True
evaluate_policy(optimal_policy)
rec_vid = False
# NOTE(review): this closes the environment wrapped by the Monitor, not the
# Monitor itself - confirm that the recording is still finalised properly
env.env.close()


[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
Generation 1 : max score = -98
Generation 2 : max score = -99
Generation 3 : max score = -98
Generation 4 : max score = -1
Generation 5 : max score = -98
Generation 6 : max score = -74
Generation 7 : max score = -88
Generation 8 : max score = -77
Generation 9 : max score = -75
Generation 10 : max score = -76
Generation 11 : max score = -69
Generation 12 : max score = -80
Generation 13 : max score = -45
Generation 14 : max score = -35
Generation 15 : max score = -52
Generation 16 : max score = -35
Generation 17 : max score = 97
Generation 18 : max score = 55
Generation 19 : max score = 88


KeyboardInterrupt: 