In [None]:
#!pip install gym==0.12.5
#!pip install pyglet==1.5.27
#!pip install torch

In [2]:
import gym

import numpy as np

import random

import torch
import torch.nn as nn

from tqdm import tqdm

import typing
from typing import List

# One candidate solution: the ordered list of weight/bias parameters of the network.
Parameters = List[torch.nn.parameter.Parameter]

# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Pytorch will use device {device}")

# The genetic algorithm never back-propagates, so autograd is switched off globally.
torch.set_grad_enabled(False)

env = gym.make('CartPole-v0')
obs = env.reset()

# Network input size is the observation length; output size is the number of discrete actions.
in_dim, out_dim = len(obs), env.action_space.n
print(in_dim, out_dim)

Pytorch will use device cpu
4 2


In [3]:
def get_params(net: torch.nn.Sequential) -> Parameters:
    """Collect the weight and bias parameters of every layer in ``net``.

    Layers are visited in order. For each layer the ``weight`` is appended
    first and the ``bias`` second; an attribute that is missing or ``None``
    (e.g. on an activation layer, or a layer built with ``bias=False``) is
    skipped.

    Args:
        net: the sequential network to read from.

    Returns:
        The network's own parameter objects (not copies), in layer order.
    """
    params = []
    for layer in net:
        for attr in ('weight', 'bias'):
            param = getattr(layer, attr, None)
            # Identity check: `!= None` would be routed through the tensor's
            # overloaded comparison operator instead of testing for absence.
            if param is not None:
                params.append(param)
    return params


def set_params(net: torch.nn.Sequential, params: Parameters) -> torch.nn.Sequential:
    """Install ``params`` into ``net``, slot by slot, in layer order.

    For each layer the ``weight`` slot is filled first and the ``bias`` slot
    second; a slot that is missing or currently ``None`` is skipped and does
    not consume an entry of ``params``. The parameter objects are assigned
    directly (no copy), so the network shares them with ``params``.

    Args:
        net: the sequential network to modify in place.
        params: one parameter per weight/bias slot, in layer order.

    Returns:
        The same ``net`` object, for convenience.

    Raises:
        IndexError: if ``params`` holds fewer entries than the network has slots.
    """
    i = 0
    for layer in net:
        for attr in ('weight', 'bias'):
            # Identity check rather than `!= None`, which would go through the
            # tensor's overloaded comparison operator.
            if getattr(layer, attr, None) is not None:
                setattr(layer, attr, params[i])
                i += 1
    return net


def fitness(solution: Parameters, net: torch.nn.Sequential, render=False) -> float:
    """Score a solution by playing one episode in the global ``env``.

    The solution (a set of weights and biases) is loaded into ``net``; at each
    step the action with the largest network output is taken. The episode's
    summed reward is returned. When ``render`` is true the environment is
    drawn after every step.
    """
    net = set_params(net, solution)

    state = env.reset()
    total = 0
    finished = False
    while not finished:
        # add a batch dimension and move the observation to the compute device
        batch = torch.tensor(state).float().unsqueeze(0).to(device)
        scores = net(batch)

        # greedy action: index of the largest output
        action = torch.argmax(scores.cpu()).item()

        state, step_reward, finished, info = env.step(action)
        total += step_reward

        if render:
            env.render()
    return total


def select(pop: List[Parameters], fitnesses: np.ndarray) -> List[Parameters]:
    """Draw a new population by fitness-proportionate sampling with replacement.

    Each individual is picked with probability ``fitness / sum(fitnesses)``.
    The result has the same length as ``pop`` and contains references to the
    selected individuals (not copies), so one individual may appear several
    times.

    Args:
        pop: the current population.
        fitnesses: one fitness value per individual, in the same order.

    Returns:
        The sampled population; an empty list when ``pop`` is empty.

    Raises:
        ValueError: propagated from ``np.random.choice`` when the fitness
            values do not form valid probabilities (e.g. negative entries or a
            length that differs from ``len(pop)``).
    """
    n = len(pop)
    if n == 0:
        return []

    fitnesses = np.asarray(fitnesses, dtype=float)
    total = fitnesses.sum()
    # A zero total would turn the probabilities into NaN; p=None makes
    # np.random.choice sample uniformly instead.
    probs = fitnesses / total if total > 0 else None

    # Sized by the population actually passed in rather than the global POP_SIZE.
    idx = np.random.choice(n, size=n, replace=True, p=probs)
    return [pop[i] for i in idx]


def crossover(parent1: Parameters, pop: List[Parameters]) -> Parameters:
    """Produce a child from ``parent1`` and, possibly, a random partner from ``pop``.

    With probability ``CROSS_RATE`` a partner is drawn uniformly from ``pop``
    and every parameter tensor is recombined: the leading fraction ``split``
    of its first dimension comes from ``parent1`` and the rest from the
    partner, with one ``split`` value shared by all tensors. Otherwise the
    child is a copy of ``parent1``.

    The child always consists of new parameter objects. Returning ``parent1``
    itself would let an in-place mutation of the child leak into every other
    population slot that references the same individual.

    Args:
        parent1: the individual being bred.
        pop: candidate partners; must be non-empty when crossover happens.

    Returns:
        A new list of new ``Parameter`` objects.
    """
    if np.random.rand() >= CROSS_RATE:
        # No crossover: hand back an independent copy, never an alias.
        return [nn.parameter.Parameter(p.detach().clone()) for p in parent1]

    # Partner index is bounded by the population passed in, not the global POP_SIZE.
    parent2 = pop[np.random.randint(0, len(pop))]
    split = np.random.rand()

    child = []
    for p1l, p2l in zip(parent1, parent2):
        splitpoint = int(len(p1l) * split)
        merged = torch.cat([p1l[:splitpoint], p2l[splitpoint:]])
        # detach() keeps this valid even if autograd has been re-enabled
        child.append(nn.parameter.Parameter(merged.detach()))
    return child



def gen_mutate(shape: torch.Size) -> torch.Tensor:
    """Generate a tensor to use for random mutation of a parameter.

    The result is ``mask * noise * MUTATION_FACTOR`` where ``noise`` is
    standard-normal and ``mask`` is a constant 0.5 tensor passed through
    ``nn.Dropout(MUTATION_RATE)``.

    NOTE(review): ``MUTATION_RATE`` is used as the dropout probability, i.e.
    the chance that an entry is zeroed (left unmutated), and dropout rescales
    the surviving entries. Confirm this is the intended meaning of "mutation
    rate".

    Args:
        shape: shape of the parameter (or slice) to be perturbed.

    Returns:
        A tensor of the given shape on ``device``.
    """
    # Build both factors on the same device; mixing a CPU mask with noise
    # living on `device` fails for non-scalar shapes when device is a GPU.
    mask = nn.Dropout(MUTATION_RATE)(torch.ones(shape, device=device) - 0.5)
    noise = torch.randn(shape, device=device)
    return mask * noise * MUTATION_FACTOR



def mutate(child: Parameters) -> Parameters:
    """Perturb every parameter of ``child`` in place with ``gen_mutate`` noise.

    One noise tensor is drawn per parameter tensor and added to it. ``child``
    is modified in place and also returned, so callers that must keep the
    original should pass a copy.

    Args:
        child: the individual to mutate.

    Returns:
        The same ``child`` list, holding the same (now perturbed) parameters.
    """
    # no_grad() allows the in-place update of leaf parameters regardless of
    # whether autograd has been disabled globally.
    with torch.no_grad():
        for param in child:
            # a single vectorised draw per tensor replaces the row-by-row loop
            param.add_(gen_mutate(param.shape))
    return child

In [4]:
%%time

# hyperparameters for genetic algorithm
# number of candidate solutions (parameter sets) kept in the population
POP_SIZE = 100
# probability that crossover() recombines a parent with a randomly drawn partner
CROSS_RATE = 0.75
# passed to nn.Dropout inside gen_mutate() as its drop probability
# NOTE(review): as written this is the chance an entry is zeroed (NOT mutated) - confirm intent
MUTATION_RATE = 0.25
# scale factor applied to the random noise produced in gen_mutate()
MUTATION_FACTOR = 0.003
# upper bound on the number of generations the training loop runs
N_GENERATIONS = 30
# training stops early once the population's average fitness exceeds this value
FITNESS_EARLY_STOP_THRESH = 196

# the pytorch neural network to train: observation in, one output per action
net = nn.Sequential(
    nn.Linear(in_dim, 32, bias=True),
    nn.ReLU(),
    nn.Linear(32, 16, bias=True),
    nn.ReLU(),
    nn.Linear(16, out_dim, bias=True),
).to(device)

# the shapes every individual's parameters must have, in layer order
base = get_params(net)
shapes = [param.shape for param in base]

# build a population of POP_SIZE randomly initialised individuals
pop = []
while len(pop) < POP_SIZE:
    entity = []
    for shape in shapes:
        blank = torch.empty(shape)
        # Kaiming-uniform initialisation when fan in / fan out can be computed
        # (tensor is at least 2d); otherwise the call raises ValueError before
        # touching the tensor and we fall back to uniform values in [-0.2, 0.2].
        try:
            nn.init.kaiming_uniform_(blank)
        except ValueError:
            nn.init.uniform_(blank, -0.2, 0.2)
        entity.append(torch.nn.parameter.Parameter(blank.to(device)))
    pop.append(entity)

# whether or not to render while training (false runs code a lot faster)
render = False

# the max steps per episode (200 is the environment default)
env._max_episode_steps = 200


# train
for generation in range(N_GENERATIONS):
    # evaluate every individual with one episode each
    fitnesses = np.array([fitness(entity, net, render) for entity in pop])
    # calculate average fitness of population
    avg_fitness = fitnesses.mean()

    # print info of generation
    print(f"Generation {generation}: Average Fitness is {avg_fitness} | Max Fitness is {fitnesses.max()}")

    if avg_fitness > FITNESS_EARLY_STOP_THRESH:
        break

    # select a new population, then overwrite one (shuffled) slot with the
    # best individual so it is guaranteed to be among the parents
    fittest = pop[fitnesses.argmax()]
    pop = select(pop, fitnesses)
    random.shuffle(pop)
    pop[-1] = fittest

    # shallow snapshot of the parents, used as the pool of crossover partners
    pop2 = list(pop)

    # go through the population and crossover and mutate
    # NOTE(review): the re-inserted fittest individual is crossed over and
    # mutated like every other slot, so it is not preserved unchanged -
    # confirm whether strict elitism was intended.
    # The index gets its own name so it no longer shadows the generation counter.
    for slot in range(len(pop)):
        child = crossover(pop[slot], pop2)
        child = mutate(child)
        pop[slot] = child

Generation 0: Average Fitness is 18.72 | Max Fitness is 200.0
Generation 1: Average Fitness is 42.75 | Max Fitness is 200.0
Generation 2: Average Fitness is 59.13 | Max Fitness is 200.0
Generation 3: Average Fitness is 97.07 | Max Fitness is 200.0
Generation 4: Average Fitness is 110.64 | Max Fitness is 200.0
Generation 5: Average Fitness is 155.1 | Max Fitness is 200.0
Generation 6: Average Fitness is 162.22 | Max Fitness is 200.0
Generation 7: Average Fitness is 169.14 | Max Fitness is 200.0
Generation 8: Average Fitness is 154.77 | Max Fitness is 200.0
Generation 9: Average Fitness is 168.59 | Max Fitness is 200.0
Generation 10: Average Fitness is 159.43 | Max Fitness is 200.0
Generation 11: Average Fitness is 166.46 | Max Fitness is 200.0
Generation 12: Average Fitness is 165.48 | Max Fitness is 200.0
Generation 13: Average Fitness is 167.96 | Max Fitness is 200.0
Generation 14: Average Fitness is 165.1 | Max Fitness is 200.0
Generation 15: Average Fitness is 161.65 | Max Fitness i

In [6]:
# Re-score the final population with a longer episode cap and pick the best index.
env._max_episode_steps = 300
fitnesses = np.array([fitness(entity, net, render) for entity in pop])
fittest = fitnesses.argmax()

# Play the chosen individual ten times, rendered, with a much longer episode cap.
env._max_episode_steps = 1000
test_fitnesses = [fitness(pop[fittest], net, True) for _ in range(10)]

print(f"Average fitness of selected entity is {sum(test_fitnesses)/len(test_fitnesses)}")
print(f"Best performance of selected entity is {max(test_fitnesses)}")

Average fitness of selected entity is 244.9
Best performance of selected entity is 550.0
