In [1]:
import gym 
import pickle
import random
import numpy as np 
from time import time

  logger.warn(


In [3]:
# --- Genetic-algorithm settings ---
POP = 64  # default population size (number of networks per generation)
GENS = 8  # default number of generations run by GA.train
KEEP_PERCENTAGE = 0.5  # fraction of the fitness-sorted population kept as parents

# --- Evaluation settings ---
EPS = 8  # default number of episodes played per network evaluation
MAX_EP_LEN = 2000  # default value written to the environment's `_max_episode_steps`
GOAL = 300  # fitness value compared against to stop evaluation/training early

# Spread of the Gaussian mutation noise. Despite the name, it is passed as the
# `scale` argument of np.random.normal, i.e. it acts as a standard deviation.
MUT_VAR = 0.08

MODEL_PATH = "model.bin"  # default pickle file used by GA.save / GA.load

# Layer sizes, input layer first and output layer last.
ARCH = (24, 256, 64, 4)

In [2]:
# Shared generator with a fixed seed, used for weight/bias initialisation.
rng = np.random.default_rng(314)


def generate_random(size):
    """Return an array of shape `size` with random magnitudes and random signs.

    Magnitudes are drawn from [0, 1) and each one is independently multiplied
    by -1 or +1, so every entry lies strictly inside (-1, 1).
    """
    magnitudes = rng.random(size)
    signs = rng.choice([-1, 1], size)
    signed = magnitudes * signs
    return signed


def relu(a):
    """Rectified linear unit: keep strictly positive entries, replace the rest with 0."""
    is_positive = a > 0
    return np.where(is_positive, a, 0)


def softmax(a):
    """Return the softmax of `a`, normalised over all of its elements.

    The maximum is subtracted before exponentiating. This leaves the result
    mathematically unchanged but keeps np.exp from overflowing to inf (and the
    division from producing NaN) when the inputs are large.

    Args:
        a: array-like of scores.

    Returns:
        Array of the same shape whose entries sum to 1. An empty input yields
        an empty float array.
    """
    a = np.asarray(a)
    if a.size == 0:
        # np.max would raise on an empty array; mirror the plain-formula result.
        return np.exp(a)

    # Largest exponent becomes exp(0) == 1, so nothing can overflow.
    exps = np.exp(a - np.max(a))
    return exps / np.sum(exps)


def sigmoid(a):
    """Logistic function 1 / (1 + e^(-a)), evaluated with NumPy's exp."""
    decay = np.exp(-a)
    denominator = 1 + decay
    return 1 / denominator


# Element-by-element wrapper around `sigmoid`, built with np.vectorize.
sigmoid_v = np.vectorize(sigmoid)


def time_elapsed(prev, cur, round_digit=8):
    """Format the interval between two timestamps as a tab-prefixed message.

    Args:
        prev: earlier timestamp.
        cur: later timestamp.
        round_digit: number of decimal places passed to `round`.

    Returns:
        A string of the form "<tab>Time elapsed: <seconds> seconds".
    """
    seconds = round(cur - prev, round_digit)
    return f"\tTime elapsed: {seconds} seconds"


In [8]:
class NN:
    """Feed-forward network that maps an observation to a discrete action index.

    Each layer computes relu(W @ a + b); ReLU is applied after every layer,
    including the last one. The chosen action is the index of the largest
    final activation.

    Attributes:
        env_name: environment id handed to gym.make in `evaluate`.
        weights: list of arrays, weights[i] has shape (arch[i + 1], arch[i]).
        biases: list of arrays, biases[i] has shape (arch[i + 1],).
    """

    def __init__(self, arch, env_name, net_to_copy=None, mut_var=MUT_VAR):
        """Create a random network, or a mutated copy of `net_to_copy`.

        Args:
            arch: sequence of layer sizes, input first. Only used when
                `net_to_copy` is None.
            env_name: environment id used later by `evaluate`.
            net_to_copy: optional parent network. When given, its weights and
                biases are copied with Gaussian noise added; the parent's
                arrays are not modified.
            mut_var: scale (standard deviation) of the mutation noise.
        """
        self.env_name = env_name

        if net_to_copy is None:
            # All weight matrices are drawn first, then all bias vectors.
            weights = [
                generate_random((n_out, n_in))
                for n_in, n_out in zip(arch[:-1], arch[1:])
            ]
            biases = [generate_random((n_out,)) for n_out in arch[1:]]
        else:
            # Draw the noise from the notebook's seeded generator (the same one
            # generate_random uses) instead of the global np.random state, so
            # mutation is covered by the same seed as initialisation.
            weights = [
                weight + rng.normal(0.0, mut_var, weight.shape)
                for weight in net_to_copy.weights
            ]
            biases = [
                bias + rng.normal(0.0, mut_var, bias.shape)
                for bias in net_to_copy.biases
            ]

        self.weights = weights
        self.biases = biases

    def forward(self, _obs):
        """Run the network on one observation and return the chosen action index.

        Args:
            _obs: observation vector; assumed to have length arch[0] so the
                first matrix product is well-formed.

        Returns:
            Index of the largest output activation (first index on ties).
        """
        a = np.array(_obs)

        for weight, bias in zip(self.weights, self.biases):
            a = relu(weight @ a + bias)

        # Softmax is strictly increasing, so the arg-max of the activations is
        # the arg-max of the probabilities. Skipping the exponentiation avoids
        # overflow to inf/NaN on large activations, which made argmax return
        # the index of the first NaN instead of the largest activation.
        return np.argmax(a)

    def evaluate(
        self, episodes=EPS, max_ep_len=MAX_EP_LEN, render_env=False, record=False
    ):
        """Play the network in its environment and return its best episode score.

        Args:
            episodes: number of episodes to play (must be at least 1).
            max_ep_len: value written to the environment's `_max_episode_steps`.
            render_env: when True, call env.render() before every step.
            record: when True, wrap the environment in gym.wrappers.Monitor
                writing to the "recording" directory.

        Returns:
            The maximum total reward over the played episodes (not the mean).

        Raises:
            ValueError: if `episodes` is smaller than 1.
        """
        if episodes < 1:
            raise ValueError("episodes must be at least 1")

        env = gym.make(self.env_name)

        try:
            # Set the step limit on the object gym.make returned, before any
            # Monitor wrapping. Assigning it on an outer wrapper would only
            # create an attribute on that wrapper and never reach the inner
            # environment (presumably a TimeLimit wrapper) that reads it.
            env._max_episode_steps = max_ep_len

            if record:
                env = gym.wrappers.Monitor(env, "recording")

            ep_rewards = []

            for _ in range(episodes):
                _obs = env.reset()
                done = False
                score = 0

                while not done:
                    if render_env:
                        env.render()

                    action = self.forward(_obs)

                    _obs, reward, done, _ = env.step(action)
                    score += reward

                ep_rewards.append(score)
        finally:
            # Release the environment (and any render window) even when an
            # episode raises.
            env.close()

        return np.array(ep_rewards).max()


In [13]:
class GA:
    """Mutation-only genetic algorithm over a population of NN policies.

    Every generation each network is evaluated, the population is sorted by
    fitness, the best fraction is kept as parents, and the remaining slots are
    filled with mutated copies of randomly chosen parents.

    Attributes:
        nets: current population (list of NN).
        best_net: best network of the most recently trained generation, or
            None until `train` has completed at least one generation.
    """

    def __init__(
        self,
        env_name,
        arch=ARCH,
        pop=POP,
        mut_var=MUT_VAR,
        verbose=False,
        eps=EPS,
        max_ep_len=MAX_EP_LEN,
        goal=GOAL,
        render_env=False,
        record=False,
        keep_percentage=KEEP_PERCENTAGE,
    ):
        """Store the settings and create `pop` randomly initialised networks.

        Args:
            env_name: environment id passed on to every NN.
            arch: layer sizes for the networks, input first.
            pop: population size.
            mut_var: scale of the Gaussian mutation noise.
            verbose: print per-network and per-generation progress.
            eps: episodes per network evaluation.
            max_ep_len: step limit passed to NN.evaluate.
            goal: fitness at or above which evaluation and training stop early.
            render_env: render the environment while evaluating.
            record: record evaluations (see NN.evaluate).
            keep_percentage: fraction of the sorted population kept as parents.
        """
        self.env_name = env_name
        self.arch = arch
        self.pop = pop
        self.mut_var = mut_var
        self.verbose = verbose
        self.eps = eps
        self.max_ep_len = max_ep_len
        self.goal = goal
        self.render_env = render_env
        self.record = record
        self.keep_percentage = keep_percentage

        # Defined up front so the attribute exists before train() has run.
        self.best_net = None

        self.nets = [NN(arch, self.env_name) for _ in range(pop)]

    def get_best_fit(self):
        """Evaluate the population and return it sorted by ascending fitness.

        Evaluation stops early as soon as one network reaches the goal; in
        that case only the networks evaluated so far are returned.

        Returns:
            (sorted_nets, fitnesses): the evaluated networks, worst first, and
            a NumPy array with their fitness values in the same order.
        """
        # TODO: skip re-evaluating the parents carried over from the previous
        # generation to reduce evaluation time.
        scored = []

        for i, net in enumerate(self.nets):
            prev_time = time()

            fitness = self.evaluate(net)
            if self.verbose:
                print(
                    "Network:",
                    i + 1,
                    "\tFitness:",
                    round(fitness),
                    time_elapsed(prev_time, time()),
                )

            scored.append((fitness, net))

            # `>=` rather than `==`: rewards are floats, so a score that
            # overshoots the goal must count as reaching it too.
            if fitness >= self.goal:
                break

        # list.sort is stable, so networks with equal fitness keep their
        # evaluation order; the key avoids ever comparing NN objects.
        scored.sort(key=lambda pair: pair[0])

        sorted_nets = [net for _, net in scored]
        fitnesses = np.array([fitness for fitness, _ in scored])

        return sorted_nets, fitnesses

    def mutate_nets(self, parents):
        """Create mutated children to refill the population to `self.pop`.

        Args:
            parents: networks to copy from; each child picks one at random.

        Returns:
            List of `self.pop - len(parents)` new networks (empty when that
            difference is not positive).
        """
        n_children = self.pop - len(parents)
        # NOTE(review): parent selection uses the stdlib `random` module, which
        # is not seeded anywhere in the visible notebook.
        return [
            NN(self.arch, self.env_name, random.choice(parents), self.mut_var)
            for _ in range(n_children)
        ]

    def train(self, gens=GENS):
        """Run the evolutionary loop for up to `gens` generations.

        Stops early when the best fitness of a generation reaches the goal.
        After every generation `self.best_net` holds that generation's best
        network.
        """
        for i in range(gens):
            prev_time = time()

            nets, fitnesses = self.get_best_fit()

            if self.verbose:
                print(
                    "=====",
                    "Generation:",
                    i + 1,
                    "\tMaximum Reward:",
                    round(fitnesses.max()),
                    "\tAverage Reward:",
                    round(fitnesses.mean()),
                    time_elapsed(prev_time, time(), 3),
                    "=====",
                    "\n",
                )

            # nets is sorted ascending, so the last entry is the best.
            self.best_net = nets[-1]

            if fitnesses[-1] >= self.goal:
                print("Callback reached: Max score obtained.")
                break

            # Size the cut from the evaluated population and this instance's
            # keep fraction, not from the module-level POP/KEEP_PERCENTAGE,
            # so a non-default `pop` is sliced correctly.
            n_dropped = int(len(nets) * (1 - self.keep_percentage))
            parents = nets[n_dropped:]
            children = self.mutate_nets(parents)

            self.nets = parents + children

    def load(self, path=MODEL_PATH):
        """Replace the population with the one pickled at `path`.

        `best_net` is left untouched; it is only assigned by `train`.
        """
        # SECURITY: pickle.load can execute arbitrary code. Only load model
        # files that come from a trusted source.
        with open(path, "rb") as f:
            self.nets = pickle.load(f)

    def save(self, path=MODEL_PATH):
        """Pickle the current population to `path`."""
        with open(path, "wb") as f:
            pickle.dump(self.nets, f)

    def evaluate(self, net):
        """Return the fitness of `net` using this instance's evaluation settings.

        The fitness is whatever NN.evaluate returns, i.e. the best (maximum)
        episode score over `self.eps` episodes.
        """
        fitness = net.evaluate(
            self.eps, self.max_ep_len, self.render_env, self.record
        )
        return fitness


In [14]:
# Build the genetic algorithm with progress printing enabled.
# NOTE(review): GA.__init__ takes `env_name` as a required first argument, so this
# call raises TypeError as written — pass the Gym environment id the notebook targets.
ga = GA(verbose=True)
model_path = MODEL_PATH

In [None]:
# Run the evolutionary loop for the default number of generations (GENS).
ga.train()

In [None]:
# Pickle the current population to model_path.
ga.save(model_path)

In [81]:
# Replace the in-memory population with the one pickled at model_path.
ga.load(model_path)

record = False

if record:
    # NOTE(review): `best_net` is only assigned by GA.train, not by GA.load, so
    # this branch needs a completed ga.train() in the same kernel session.
    ga.best_net.evaluate(1, 200, True, True) # Record only records 200 steps
else:
    # Replay a single rendered episode with one specific network.
    # NOTE(review): index 55 is hard-coded and assumes the loaded population has
    # at least 56 networks — confirm which network is meant to be shown.
    ga.nets[55].evaluate(1, 10000, True)