In [1]:
import gym

import numpy as np

import random

import torch
import torch.nn as nn

from tqdm import tqdm

import typing
from typing import List

# Type alias: one candidate solution is a list of parameter tensors.
Parameters = List[torch.nn.parameter.Parameter]

# Use the first CUDA device when one is available, otherwise stay on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Pytorch will use device {device}")

# Autograd is switched off globally for the rest of the notebook.
torch.set_grad_enabled(False)

# Create the environment; the network's input size comes from one observation and
# its output size from the number of discrete actions.
env = gym.make('CartPole-v0')
obs = env.reset()
in_dim, out_dim = len(obs), env.action_space.n

print(in_dim, out_dim)

Pytorch will use device cuda:0
4 2


In [2]:
def get_params(net: torch.nn.Sequential) -> Parameters:
    '''
    Gets the parameters from a PyTorch model stored as an nn.Sequential

    Only attributes named ``weight`` and ``bias`` are collected, layer by layer,
    weight before bias. Layers that lack the attribute, or have it set to None
    (e.g. a layer built with ``bias=False``), contribute nothing for it. The
    returned list holds the layers' own parameter objects, not copies.

    @params
        net (nn.Sequential): A pytorch model
    @returns
        Parameters: the parameters of the model
    '''
    params = []
    for layer in net:
        for name in ('weight', 'bias'):
            param = getattr(layer, name, None)
            # identity check: `!= None` would go through the tensor's comparison operator
            if param is not None:
                params.append(param)
    return params


def set_params(net: torch.nn.Sequential, params: Parameters) -> torch.nn.Sequential:
    '''
    Sets the parameters for an nn.Sequential

    ``params`` is consumed in layer order, weight before bias. A slot is only
    overwritten when the layer currently has a non-None attribute of that name,
    so activation layers and disabled biases consume nothing. The network is
    modified in place and stores the given parameter objects themselves.

    @params
        net (torch.nn.Sequential): A network to change the parameters of
        params (Parameters): Parameters to place into the model
    @returns
        torch.nn.Sequential: The same network, now holding the provided parameters
    '''
    i = 0
    for layer in net:
        for name in ('weight', 'bias'):
            # identity check: `!= None` would go through the tensor's comparison operator
            if getattr(layer, name, None) is not None:
                setattr(layer, name, params[i])
                i += 1
    return net

def fitness(solution: Parameters, net: torch.nn.Sequential, render=False) -> float:
    '''
    Score a solution by running one episode in the global environment

    The solution's weights and biases are loaded into the network, which then acts
    greedily (argmax over its outputs) until the environment reports the episode is done.

    @params
        solution (Parameters): parameters to test the fitness of
        net (torch.nn.Sequential): A network for testing the parameters with
        render (bool): whether or not to draw the agent interacting with the environment
    @returns
        float: The sum of the rewards collected during the episode
    '''
    policy = set_params(net, solution)

    state = env.reset()
    total_reward = 0
    finished = False
    while not finished:
        # add a batch dimension and move the observation to the compute device
        batch = torch.tensor(state).float().unsqueeze(0).to(device)
        scores = policy(batch)

        # greedy action: index of the largest network output
        action = torch.argmax(scores.cpu()).item()

        # step() is unpacked as (observation, reward, done, info)
        state, reward, finished, _ = env.step(action)
        total_reward += reward

        if render:
            env.render()
    return total_reward

def select(pop: List[Parameters], fitnesses: np.ndarray) -> List[Parameters]:
    '''
    Select a new population

    Individuals are drawn with replacement, each with probability proportional to
    its fitness, so the result has the same size as ``pop`` and may contain the
    same individual (the same object) several times. Fitnesses are expected to be
    non-negative; if they sum to zero the draw falls back to uniform sampling.

    @params
        pop (List[Parameters]): The entire population of parameters
        fitnesses (np.ndarray): the fitnesses for each entity in the population
    @returns
        List[Parameters]: A new population made of fitter individuals
    '''
    # size comes from the population actually passed in, not from a global constant
    n = len(pop)
    total = fitnesses.sum()
    # dividing by a zero total would yield NaN probabilities; p=None means uniform
    probs = None if total == 0 else fitnesses / total
    idx = np.random.choice(np.arange(n), size=n, replace=True, p=probs)
    return [pop[i] for i in idx]

def crossover(parent1: Parameters, pop: List[Parameters], cross_rate: typing.Optional[float] = None) -> Parameters:
    '''
    Crossover two individuals and produce a child.

    With probability ``cross_rate`` a second parent is drawn uniformly from ``pop``
    and a single random split fraction is chosen. Every tensor of the child takes
    its leading rows (along the first dimension) from parent1 and the remaining
    rows from parent2, using that same fraction for each tensor.

    Otherwise the child is an independent copy of parent1. A copy is returned
    rather than parent1 itself so that mutating the child in place cannot alter
    other population entries that refer to the same parent object.

    @params
        parent1 (Parameters): A parent that may potentially be crossed over
        pop (List[Parameters]): The population of solutions
        cross_rate (float, optional): crossover probability; defaults to the global CROSS_RATE
    @returns
        Parameters: A child with attributes of both parents, or a copy of parent1
    '''
    rate = CROSS_RATE if cross_rate is None else cross_rate
    if np.random.rand() < rate:
        # partner index is bounded by the population actually passed in
        i = np.random.randint(0, len(pop), size=1)[0]
        parent2 = pop[i]
        # one split fraction shared by every tensor of the child
        split = np.random.rand()

        child = []
        for p1l, p2l in zip(parent1, parent2):
            splitpoint = int(len(p1l) * split)
            # detach so the result does not depend on the global autograd mode
            mixed = torch.cat([p1l.detach()[:splitpoint], p2l.detach()[splitpoint:]])
            child.append(nn.parameter.Parameter(mixed))
        return child

    # no crossover: copy the tensors so the child shares no storage with parent1
    return [nn.parameter.Parameter(p.detach().clone(), requires_grad=p.requires_grad)
            for p in parent1]


def gen_mutate(shape: torch.Size) -> torch.Tensor:
    '''
    Generate a tensor to use for random mutation of a parameter

    The values are standard-normal samples scaled by the global MUTATION_FACTOR.
    The sample is created with torch.randn's default placement and then moved to
    the global ``device``.

    @params
        shape (torch.Size): The shape of the tensor to be created
    @returns
        torch.Tensor: a random tensor of the requested shape
    '''
    noise = torch.randn(shape).to(device)
    return noise * MUTATION_FACTOR
    
def mutate(child: Parameters) -> Parameters:
    '''
    Mutate a child

    Noise produced by gen_mutate is added to every element of every tensor in
    ``child``. The tensors are modified in place and the same list is returned.

    @params
        child (Parameters): The original parameters
    @returns
        Parameters: The mutated child (the same list object that was passed in)
    '''
    # in-place edits of leaf parameters are only permitted while autograd is off,
    # so do not rely on the caller having disabled it globally
    with torch.no_grad():
        for param in child:
            # one noise tensor per parameter rather than one per row
            param.add_(gen_mutate(param.shape))
    return child

In [3]:
%%time

# hyperparameters for genetic algorithm
POP_SIZE = 100                    # individuals per generation
CROSS_RATE = 0.8                  # probability that an individual is crossed with a partner
# NOTE(review): MUTATION_RATE is decayed by the training loop, but no function visible
# in this notebook reads it; the mutation noise is scaled by MUTATION_FACTOR only.
MUTATION_RATE = 0.01
MUTATION_DECAY = 0.99             # per-generation multiplier applied to MUTATION_RATE
MUTATION_FACTOR = 0.001           # scale of the gaussian noise added by mutation
N_GENERATIONS = 30                # upper bound on the number of generations
FITNESS_EARLY_STOP_THRESH = 196   # stop once the average fitness exceeds this

# the pytorch neural network to train
net = nn.Sequential(nn.Linear(in_dim, 16, bias=True),
                    nn.Sigmoid(),
                    nn.Linear(16, 8, bias=True),
                    nn.Sigmoid(),
                    nn.Linear(8, out_dim, bias=True)).to(device)

# get the required parameter shapes
base = get_params(net)
shapes = [param.shape for param in base]

# build a population
pop = []
for i in range(POP_SIZE):
    entity = []
    for shape in shapes:
        # kaiming uniform initialisation needs a fan in and fan out, i.e. a tensor with
        # at least 2 dimensions (the weights); tensors with fewer dimensions (the biases)
        # use uniform initialisation between -0.2 and 0.2 instead
        if len(shape) >= 2:
            rand_tensor = nn.init.kaiming_uniform_(torch.empty(shape)).to(device)
        else:
            rand_tensor = nn.init.uniform_(torch.empty(shape), -0.2, 0.2).to(device)
        entity.append(torch.nn.parameter.Parameter(rand_tensor))
    pop.append(entity)

# whether or not to render while training (false runs code a lot faster)
render = False

# the max episodes (200 is the environment default)
env._max_episode_steps = 200


# train
for generation in range(N_GENERATIONS):
    # get fitnesses (one episode per individual)
    fitnesses = np.array([fitness(entity, net, render) for entity in pop])
    # calculate average fitness of population
    avg_fitness = fitnesses.sum()/len(fitnesses)

    # print info of generation
    print(f"Generation {generation}: Average Fitness is {avg_fitness} | Max Fitness is {fitnesses.max()}")

    # stop early, keeping the population that was just evaluated
    if avg_fitness > FITNESS_EARLY_STOP_THRESH:
        break

    # select a new population
    pop = select(pop, fitnesses)
    # shallow snapshot used as the pool of crossover partners while pop is overwritten
    pop2 = list(pop)

    # go through the population and crossover and mutate
    # (separate index name so the generation counter is not clobbered)
    for idx in range(len(pop)):
        child = crossover(pop[idx], pop2)
        child = mutate(child)
        pop[idx] = child

    # NOTE(review): the decayed value is not read by anything visible in this notebook
    MUTATION_RATE = MUTATION_RATE*MUTATION_DECAY

Generation 0: Average Fitness is 9.635 | Max Fitness is 51.0
Generation 1: Average Fitness is 11.21 | Max Fitness is 63.0
Generation 2: Average Fitness is 14.89 | Max Fitness is 200.0
Generation 3: Average Fitness is 21.885 | Max Fitness is 200.0
Generation 4: Average Fitness is 35.98 | Max Fitness is 200.0
Generation 5: Average Fitness is 71.665 | Max Fitness is 200.0
Generation 6: Average Fitness is 94.22 | Max Fitness is 200.0
Generation 7: Average Fitness is 135.385 | Max Fitness is 200.0
Generation 8: Average Fitness is 179.865 | Max Fitness is 200.0
Generation 9: Average Fitness is 192.315 | Max Fitness is 200.0
Generation 10: Average Fitness is 194.365 | Max Fitness is 200.0
Generation 11: Average Fitness is 198.22 | Max Fitness is 200.0
Wall time: 1min 19s


In [6]:
# Score the whole final population once with a 300-step episode cap and keep the
# index of the best individual.
env._max_episode_steps = 300
fitnesses = np.array([fitness(entity, net, render) for entity in pop])
fittest = np.argmax(fitnesses)

# Replay that individual ten times, rendered, with the cap raised to 1000 steps.
# NOTE(review): the render window is never closed here; consider env.close() afterwards.
env._max_episode_steps = 1000
test_fitnesses = [fitness(pop[fittest], net, True) for _ in range(10)]

print(f"Average fitness of selected entity is {sum(test_fitnesses)/len(test_fitnesses)}")
print(f"Best performance of selected entity is {max(test_fitnesses)}")

Average fitness of selected entity is 1000.0
Best performance of selected entity is 1000.0
