In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time
import seaborn as sns
%matplotlib inline

In [None]:
# Import Gym and create the CartPole-v1 environment shared by the cells below.
import gym
env = gym.make('CartPole-v1')

In [None]:
# Take one initial observation only to learn how many values it contains;
# that count is kept as the number of parameters for the solvers below.
# NOTE(review): assumes env.reset() returns the observation itself (older Gym
# API) -- confirm; if it returns an (observation, info) tuple this would be 2.
obs = env.reset()
num_params = len(obs)

In [None]:
def evaluate(W, max_steps=200):
    """Run one episode with a linear policy and return how long it lasted.

    The policy picks action 0 (go left) when ``W @ X < 0`` and action 1
    (go right) otherwise, where ``X`` is the current observation.

    Parameters
    ----------
    W : array-like
        Policy weights, one per observation component.
    max_steps : int, optional
        Upper bound on the number of simulated steps. Defaults to 200, the
        horizon that was previously hard-coded.

    Returns
    -------
    int
        Number of steps taken before the episode ended, at most ``max_steps``
        (0 if ``max_steps`` is less than 1).
    """
    # reset the environment, i.e. going back to beginning
    X = env.reset()
    # Newer Gym releases return (observation, info) from reset().
    if isinstance(X, tuple):
        X = X[0]

    steps = 0
    for steps in range(1, max_steps + 1):
        # 0 means go left, 1 means go right
        action = 0 if W @ X < 0 else 1
        outcome = env.step(action)
        X = outcome[0]
        # Old API: (obs, reward, done, info).
        # New API: (obs, reward, terminated, truncated, info).
        if len(outcome) == 4:
            done = outcome[2]
        else:
            done = outcome[2] or outcome[3]
        if done:
            break
    return steps

In [None]:
class SimpleES():
    """Simple Evolution strategies

    Each generation samples ``popsize`` candidates from a normal distribution
    centred on ``mu`` and then moves ``mu`` to the best candidate of that
    generation.

    Parameters
    ----------
    popsize : int, optional
        Number of candidates sampled per generation.
    num_params : int, optional
        Length of each candidate vector. Defaults to 4, the value that was
        previously hard-coded.
    """
    def __init__(self, popsize=256, num_params=4):
        self.popsize = popsize
        self.num_params = num_params
        self.mu = np.random.normal(0, 1, num_params)
        # Diagonal covariance: each parameter is perturbed independently.
        # A matrix filled with a single value has rank 1 and would shift
        # every parameter by the same amount in each sample.
        self.cov = 0.5 * np.eye(num_params)
        self.best_s = np.copy(self.mu)
        self.best_r = 0
        # Defined up front so result() can be called before the first tell().
        self.best_r_ = 0
        self.first_gen = True

    def ask(self):
        """Sample and return a ``(popsize, num_params)`` array of candidates."""
        self.sols = np.random.multivariate_normal(self.mu, self.cov, self.popsize)
        return self.sols

    def tell(self, fit_list):
        """Record the fitness of the candidates returned by the last ask().

        ``fit_list[i]`` is the fitness of ``self.sols[i]``; higher is better.
        """
        self.fit = fit_list
        self.best_i = np.argmax(self.fit)
        generation_best = np.copy(self.sols[self.best_i])
        self.best_r_ = self.fit[self.best_i]

        # Only replace the all-time best when this generation beats it, so
        # best_s and best_r always describe the same candidate.
        if self.first_gen or (self.best_r < self.best_r_):
            self.first_gen = False
            self.best_r = self.best_r_
            self.best_s = np.copy(generation_best)

        # The next generation is sampled around this generation's best.
        self.mu = generation_best

    def result(self):
        """Return (best solution ever, best fitness ever, best fitness of the last generation, None)."""
        return self.best_s, self.best_r, self.best_r_, None
        
        

In [None]:
class SimpleGA:
    """Simple Genetic Algorithm

    Keeps an elite set of the best solutions seen. Each generation is built
    by crossing two randomly chosen elites parameter-by-parameter and adding
    Gaussian noise whose standard deviation decays over time.
    """
    def __init__(self, num_params, # Number of parameters per solution
                 popsize=256,      # Number of solutions generated per generation
                 sig_init=0.1,     # Initial std deviation of the mutation noise
                 sig_decay=0.999,  # Multiplicative decay applied to the std deviation
                 sig_lim=0.01,     # The decay stops once the std deviation is at or below this
                 elite_ratio=0.1,  # Fraction of the population kept as elites
                 w_decay=0.1,      # Stored on the instance; not used by any method here
                 forget_best=False): # If True, elites are chosen from the current generation only

        self.num_params = num_params
        self.popsize = popsize
        self.sig_init = sig_init
        self.sig_decay = sig_decay
        self.sig_lim = sig_lim
        self.elite_ratio = elite_ratio
        self.w_decay = w_decay
        self.first_gen = True
        self.forget_best = forget_best
        self.sig = self.sig_init

        # Size of the elite population (total best solutions to keep).
        # Clamped to at least 1: with zero elites ask() has no parents to
        # choose from and np.random.choice(0) raises.
        self.elite_popsize = max(1, int(self.popsize*self.elite_ratio))
        # Parameters of the elite solutions
        self.elite_w = np.zeros((self.elite_popsize, self.num_params))
        # Fitness of the elite solutions
        self.elite_r = np.zeros(self.elite_popsize)
        # Parameters of the best solution seen so far
        self.best_s = np.zeros(self.num_params)
        # Best reward seen so far
        self.best_r = 0
        # Best reward of the latest tell(); defined up front so result()
        # can be called before the first tell().
        self.best_r_ = 0

    def ask(self):
        """Create and return a ``(popsize, num_params)`` array of new solutions."""
        # Gaussian noise added after random recombination of the elites (mating)
        self.noise = np.random.randn(self.popsize, self.num_params)*self.sig

        def mate(a, b):
            # Start from parent a and take each parameter from parent b
            # with probability 0.5.
            c = np.copy(a)
            from_b = np.random.rand(c.size) > 0.5
            c[from_b] = b[from_b]
            return c

        solutions = []
        for i in range(self.popsize):
            idx_a = np.random.choice(self.elite_popsize) # random parent index
            idx_b = np.random.choice(self.elite_popsize) # random parent index
            child = mate(self.elite_w[idx_a], self.elite_w[idx_b])
            solutions.append(child + self.noise[i]) # mutate the child

        self.solutions = np.array(solutions)
        return self.solutions

    def tell(self, reward_list):
        """Update the elites from the rewards of the last ask().

        ``reward_list[i]`` is the reward of ``self.solutions[i]``; any
        sequence of length ``popsize`` is accepted. Higher is better.
        """
        # assert that we have reward for every solution
        assert (len(reward_list) == self.popsize), "Incosistant reward size"
        # Accept plain lists as well as arrays (argsort and fancy indexing
        # below need an ndarray).
        r_list = np.asarray(reward_list)

        if self.forget_best or self.first_gen:
            r = r_list
            soln = self.solutions
        else:
            # Let the new solutions compete with the elites of earlier generations.
            r = np.concatenate([r_list, self.elite_r])
            soln = np.concatenate([self.solutions, self.elite_w])

        # Indices of the highest rewards, best first (elite population)
        idx = r.argsort()[::-1][0:self.elite_popsize]
        self.elite_r = r[idx]
        self.elite_w = soln[idx]

        # Best reward among the current elites
        self.best_r_ = self.elite_r[0]

        if self.first_gen or (self.best_r_ > self.best_r):
            self.first_gen = False
            self.best_s = np.copy(self.elite_w[0])
            self.best_r = self.elite_r[0]

        if self.sig > self.sig_lim:
            self.sig *= self.sig_decay

    def result(self):
        """Return (best solution ever, best reward ever, best reward among current elites, current sigma)."""
        return self.best_s, self.best_r, self.best_r_, self.sig
        
        

In [None]:
class SimpleNES():
    """Simple Natural Evolution Strategies

    Holds a ``(popsize, num_params)`` matrix of solutions. ``ask`` jitters
    every row with Gaussian noise; ``tell`` then shifts all rows by one
    shared step: the noise weighted by the standardised rewards.

    NOTE(review): the jittered matrix is stored back into ``self.solutions``,
    so the noise accumulates row by row rather than being applied around a
    single mean vector -- confirm this is the intended variant.
    """
    def __init__(self, num_params,
                 popsize=256,
                 sig_init=0.1,
                 sig_decay=0.999,
                 sig_lim=0.01,
                 alpha = 0.1):
        """
        Parameters
        ----------
        num_params : int
            Number of parameters per solution.
        popsize : int
            Number of solutions per generation.
        sig_init : float
            Initial standard deviation of the noise.
        sig_decay : float
            Multiplicative decay applied to the standard deviation per tell().
        sig_lim : float
            The decay stops once the standard deviation is at or below this.
        alpha : float
            Step size of the update in tell().
        """
        self.num_params = num_params
        self.popsize = popsize
        self.sig_init = sig_init
        self.sig_decay = sig_decay
        self.sig_lim = sig_lim
        self.first_gen = True
        self.sig = self.sig_init
        self.alpha = alpha

        # Initialise the memory for solutions and best solution
        self.solutions = np.random.randn(self.popsize, self.num_params)
        self.best_s = np.zeros(self.num_params)
        self.best_r = 0
        # Best reward of the latest generation; defined up front so result()
        # can be called before the first tell().
        self.best_r_ = 0

    def ask(self):
        """Jitter the stored solutions and return them as a ``(popsize, num_params)`` array."""
        # Noise drawn from a standard normal and scaled by the current sigma
        self.noise = np.random.randn(self.popsize, self.num_params)*self.sig
        # Jitter the solutions with gaussian noise
        solutions = self.solutions + self.noise
        self.solutions = solutions
        return solutions

    def tell(self, reward_list):
        """Apply one update from the rewards of the last ask().

        ``reward_list[i]`` is the reward of ``self.solutions[i]``; any
        sequence of length ``popsize`` is accepted. Higher is better.
        """
        assert (len(reward_list) == self.popsize), "Inconsistant reward size"
        rewards = np.asarray(reward_list)

        idx = np.argmax(rewards)
        self.best_r_ = rewards[idx]
        # Taken before the update below replaces self.solutions.
        generation_best = self.solutions[idx]

        # Standardise the rewards (zero mean, unit std). When every reward is
        # identical the std is 0; use zeros instead of dividing so the
        # population is left unchanged rather than turned into NaN.
        centred = rewards - np.mean(rewards)
        spread = np.std(rewards)
        if spread > 0:
            self.r = centred / spread
        else:
            self.r = np.zeros_like(centred)

        # Parameter update:
        # ---np.dot(self.noise.T, self.r) : the noise weighted by reward, shape (num_params,)
        # ---self.alpha / (self.popsize*self.sig) : step size averaged over the population
        # The resulting vector is broadcast onto every row of self.solutions.
        self.solutions = self.solutions + self.alpha / (self.popsize*self.sig) * np.dot(self.noise.T, self.r)

        # Keep best_s and best_r describing the same (all-time best) solution.
        if self.first_gen or (self.best_r_ > self.best_r):
            self.first_gen = False
            self.best_r = self.best_r_
            self.best_s = np.copy(generation_best)

        if self.sig > self.sig_lim:
            self.sig *= self.sig_decay

    def result(self):
        """Return (best solution ever, best reward ever, best reward of the last generation, current sigma)."""
        return self.best_s, self.best_r, self.best_r_, self.sig
        

In [None]:
# The training loop stops early once the best fitness seen so far exceeds this value.
MY_REQUIRED_FITNESS = 199

In [None]:
# Upper bound on the number of generations before giving up.
MAX_GENERATIONS = 100

# SimpleES takes the population size as its first argument, so it is passed
# by keyword here; passing num_params positionally would set the population
# size to the observation length instead.
solver = SimpleES(popsize=256)

for e in range(1, MAX_GENERATIONS + 1):
    # ask the ES to give us a set of candidate solutions
    solutions = solver.ask()

    # evaluate the fitness of each candidate solution
    fitness_list = np.zeros(solver.popsize)
    for i in range(solver.popsize):
        fitness_list[i] = evaluate(solutions[i])

    # give list of fitness results back to ES
    solver.tell(fitness_list)

    # get best parameter, fitness from ES
    best_solution, best_fitness_ever, best_fitness_current, sigma = solver.result()
    print(e, best_fitness_ever, best_fitness_current)

    # stop as soon as the required fitness has been reached
    if best_fitness_ever > MY_REQUIRED_FITNESS:
        break

In [None]:
# Show the solution reported by the solver when the training loop above ended.
print(best_solution)

In [None]:
def render(W, max_steps=200):
    """Run one episode with a linear policy while drawing every step.

    The policy picks action 0 (go left) when ``W @ X < 0`` and action 1
    (go right) otherwise, where ``X`` is the current observation.
    ``env.render()`` is called before each action.

    Parameters
    ----------
    W : array-like
        Policy weights, one per observation component.
    max_steps : int, optional
        Upper bound on the number of simulated steps. Defaults to 200, the
        horizon that was previously hard-coded.

    Returns
    -------
    int
        Number of steps taken before the episode ended, at most ``max_steps``
        (0 if ``max_steps`` is less than 1).
    """
    # reset the environment, i.e. going back to beginning
    X = env.reset()
    # Newer Gym releases return (observation, info) from reset().
    if isinstance(X, tuple):
        X = X[0]

    steps = 0
    for steps in range(1, max_steps + 1):
        env.render()
        # 0 means go left, 1 means go right
        action = 0 if W @ X < 0 else 1
        outcome = env.step(action)
        X = outcome[0]
        # Old API: (obs, reward, done, info).
        # New API: (obs, reward, terminated, truncated, info).
        if len(outcome) == 4:
            done = outcome[2]
        else:
            done = outcome[2] or outcome[3]
        if done:
            break
    return steps

In [None]:
# Replay one episode on screen with a fixed weight vector.
# NOTE(review): presumably these weights were copied from an earlier training run -- confirm.
render([8.81054195, 5.82105525, 7.1435554, 7.99971522])