# Evolving a Lunar Lander with differentiable Genetic Programming

## Installation
To install the required libraries run the command:

In [None]:
#!pip install -r requirements.txt

## Imports
Imports from the standard genepro-multi library are done here. Any adjustments (e.g. different operators) should be made in the notebook. For example:

```
class SmoothOperator(Node):
  """Illustrative template for adding a custom operator node in the notebook.

  The `smoothOperation` calls below are placeholder names, not real NumPy or
  PyTorch functions; substitute the actual operation you want to apply.
  """

  def __init__(self):
    super(SmoothOperator,self).__init__()
    # Unary operator: consumes the output of exactly one child node.
    self.arity = 1
    # Symbol stored on the node (used as its name).
    self.symb = "SmoothOperator"

  def _get_args_repr(self, args):
    # Delegates to a helper of the base class; presumably 'before' places the
    # symbol in front of its argument -- TODO confirm against genepro's Node.
    return self._get_typical_repr(args,'before')

  def get_output(self, X):
    # NumPy evaluation path: apply the operation to the first child's output.
    c_outs = self._get_child_outputs(X)
    return np.smoothOperation(c_outs[0])

  def get_output_pt(self, X):
    # PyTorch evaluation path, mirroring get_output.
    c_outs = self._get_child_outputs_pt(X)
    return torch.smoothOperation(c_outs[0])
```

In [None]:
import gymnasium as gym

from genepro.node_impl import *
from genepro.evo import Evolution
from genepro.node_impl import Constant

import torch
import torch.optim as optim

import random
import os
import copy
from collections import namedtuple, deque

import matplotlib.pyplot as plt
from matplotlib import animation

## Reinforcement Learning Setup
Here we first set up the Gymnasium environment. Please see https://gymnasium.farama.org/environments/box2d/lunar_lander/ for more information on the environment. 

Then a memory buffer is made. This is a buffer in which state transitions are stored. When the buffer reaches its maximum capacity old transitions are replaced by new ones.

A frame buffer is also initialised; it is used later to store animation frames of the environment.

In [None]:
# Shared environment used by the fitness function and the rendering cells below.
# render_mode="rgb_array" is requested so that env.render() yields image frames
# that can be collected and turned into an animation.
# NOTE(review): newer Gymnasium releases deprecate "LunarLander-v2" in favour of
# "LunarLander-v3" -- confirm against the installed version.
env = gym.make("LunarLander-v2", render_mode="rgb_array")

In [None]:
# One environment step: the state acted upon, the action taken, the state that
# followed, and the reward received.
Transition = namedtuple("Transition", ("state", "action", "next_state", "reward"))


class ReplayMemory(object):
    """Fixed-capacity buffer of Transition records.

    Backed by a deque with ``maxlen=capacity``: once the buffer is full, each
    newly appended transition evicts the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition; ``args`` are the Transition fields in order."""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        """Return ``batch_size`` distinct transitions chosen at random.

        Raises ValueError (from ``random.sample``) when ``batch_size`` exceeds
        the number of stored transitions.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def __iadd__(self, other):
        """Append ``other``'s transitions to this buffer in place (``a += b``)."""
        self.memory += other.memory
        return self

    def __add__(self, other):
        """Return a NEW buffer with this buffer's transitions followed by ``other``'s.

        Neither operand is modified. The result keeps this buffer's capacity,
        so the oldest transitions are dropped if the combined length exceeds it.
        """
        merged = ReplayMemory(self.memory.maxlen)
        merged.memory.extend(self.memory)
        merged.memory.extend(other.memory)
        return merged

## Fitness Function

Here you get to be creative. The default setup evaluates 10 episodes of at most 300 frames each. Think of what action to pick and what fitness function to use. The Multi-tree takes an input of $n \times d$ where $n$ is a batch of size 1.

In [None]:
def fitness_function_pt(multitree, num_episodes=10, episode_duration=300, ignore_done=False, render=False):
    """Roll out ``multitree`` as a policy in the global ``env`` and score it.

    At every step the observation is reshaped into a single-row float tensor,
    passed through ``multitree.get_output_pt`` and the index of the largest
    output is used as the action.

    Args:
        multitree: individual exposing ``get_output_pt``.
        num_episodes: number of episodes to play.
        episode_duration: maximum number of steps per episode.
        ignore_done: when True, keep stepping even after the environment
            reports termination or truncation.
        render: when True, collect ``env.render()`` output before every step.

    Returns:
        ``(fitness, memory)`` or, when ``render`` is True,
        ``(fitness, memory, frames)``. ``fitness`` is the sum of all rewards
        divided by ``num_episodes``; ``memory`` is a ReplayMemory (capacity
        10000) containing every transition that was played.
    """
    memory = ReplayMemory(10000)
    rewards = []
    frames = [] if render else None

    for _episode in range(num_episodes):
        # reset() returns a tuple; its first element is the initial observation
        observation = env.reset()[0]

        for _step in range(episode_duration):
            if render:
                frames.append(env.render())

            state = torch.from_numpy(observation.reshape((1, -1))).float()
            action_index = torch.argmax(multitree.get_output_pt(state)).detach().item()

            observation, reward, terminated, truncated, _info = env.step(action_index)
            rewards.append(reward)

            next_state = torch.from_numpy(observation.reshape((1, -1))).float()
            memory.push(state, torch.tensor([[action_index]]), next_state, torch.tensor([reward]))

            episode_over = terminated or truncated
            if episode_over and not ignore_done:
                break

    # Average reward per episode
    fitness = np.sum(rewards) / num_episodes
    if not render:
        return fitness, memory
    return fitness, memory, frames

In [None]:
### USED TO STORE THE EXPERIMENT DICTIONARY
import inspect
def serialize_functions_in_dict(dictionary):
    """Replace functions, methods and nodes in ``dictionary`` by their names, in place.

    Rules applied to each value:
      * function or method -> its ``__name__``
      * dict               -> processed recursively
      * list               -> each item that is a dict is processed
        recursively, each function/method becomes its ``__name__`` and each
        ``Node`` instance becomes its ``symb``; other items are left alone
      * anything else      -> left untouched

    The input dictionary (and any nested list/dict) is mutated; pass a copy
    if the original must be preserved. The same dictionary is returned.
    """

    def _is_function_like(obj):
        return inspect.isfunction(obj) or inspect.ismethod(obj)

    # Only existing keys are reassigned, so iterating while mutating is safe.
    for key in dictionary:
        value = dictionary[key]

        if _is_function_like(value):
            dictionary[key] = value.__name__
            continue

        if isinstance(value, dict):
            dictionary[key] = serialize_functions_in_dict(value)
            continue

        if not isinstance(value, list):
            continue

        for index, item in enumerate(value):
            if isinstance(item, dict):
                value[index] = serialize_functions_in_dict(item)
            elif _is_function_like(item):
                value[index] = item.__name__
            elif isinstance(item, Node):
                value[index] = item.symb

    return dictionary


## Evolution Setup
Here the leaf and internal nodes are defined. Think about the odds of sampling a constant in this default configuration. Also think about any operators that could be useful and add them here. 

Adjust the population size (multiple of 8 if you want to use the standard tournament selection), max generations and max tree size to taste. Be aware that each of these settings can increase the runtime.

#### BASELINE

In [None]:
from copy import deepcopy
import json
from genepro.selection import tournament_selection
from genepro.variation import coeff_mutation, subtree_crossover, subtree_mutation

# Name of this run; used as the sub-folder of ./experiments/ for its artefacts.
experiment_name = "baseline"
# Number of features = length of the environment's observation vector.
num_features = env.observation_space.shape[0]
# Keyword arguments handed to Evolution(...) below.
evo_settings = {
    "fitness_function": fitness_function_pt,
    "internal_nodes": [Plus(), Minus(), Times(), Div()],
    # One Feature leaf per observation dimension plus a single Constant leaf.
    "leaf_nodes": [Feature(i) for i in range(num_features)] + [Constant()],
    "n_trees": 4,
    # 64 is a multiple of the tournament size (8) configured under "selection".
    "pop_size": 64,
    "max_gens": 50,
    "init_max_depth": 4,
    "max_tree_size": 32,
    "crossovers": [{"fun": subtree_crossover, "rate": 0.5}],
    "mutations": [{"fun": subtree_mutation, "rate": 0.5}],
    "coeff_opts": [{"fun": coeff_mutation, "rate": 0.5}],
    "selection": {"fun": tournament_selection, "kwargs": {"tournament_size": 8}},
    "n_jobs": 8,
    "verbose": True
}

# Persist the settings next to the results. Functions and nodes are not JSON
# serialisable, so they are replaced by their names on a deep copy first
# (the copy keeps evo_settings itself usable for Evolution below).
os.makedirs(f"./experiments/{experiment_name}", exist_ok=True)
with open(f"./experiments/{experiment_name}/evo_settings.json", "w") as f:
    serialized_dict = serialize_functions_in_dict(deepcopy(evo_settings))
    json.dump(serialized_dict, f)

# Run the baseline evolution with the settings above.
evo_baseline = Evolution(**evo_settings)
evo_baseline.evolve()

#### SAVE BASELINE RESULTS

In [None]:
# Save the gen as a pickle file in the gens folder
import pickle
def save_and_evaluate_evo_generations(evo, fitness_function, experiment=""):
    """Re-evaluate and pickle the best individual of every generation.

    Each entry of ``evo.best_of_gens`` is scored with
    ``fitness_function(gen, num_episodes=50)`` and written to
    ``./experiments/<experiment>/gen_<i>_<rounded test fitness>.pickle``.

    Args:
        evo: object exposing ``best_of_gens``, an iterable of individuals that
            each carry a ``fitness`` attribute.
        fitness_function: callable returning ``(fitness, memory)``.
        experiment: name of the sub-folder under ``./experiments``. When left
            empty, the notebook-level ``experiment_name`` is used, which keeps
            the behaviour of callers that never passed this argument.
    """
    # Honour the argument; only fall back to the global when it is not given.
    target = experiment if experiment else experiment_name
    out_dir = os.path.join(".", "experiments", target)
    # create the experiment folder if it doesn't exist
    os.makedirs(out_dir, exist_ok=True)

    for i, gen in enumerate(evo.best_of_gens):
        avg_fitness, _ = fitness_function(gen, num_episodes=50)
        print("Best of Generation", i, ": fitness", round(gen.fitness, 3), "test_fitness:", round(avg_fitness, 3))
        file_path = os.path.join(out_dir, f"gen_{i}_{round(avg_fitness)}.pickle")
        with open(file_path, "wb") as f:
            pickle.dump(gen, f)
# Re-evaluate and pickle the best individual of every baseline generation.
save_and_evaluate_evo_generations(evo_baseline, fitness_function_pt, experiment="baseline")

## Evolve
Running this cell will use all the settings above as parameters

# Test

## Make an animation
Here the best evolved individual is selected and one episode is rendered. Make sure to save your lunar landers over time to track progress and make comparisons.

In [None]:
# gist to save gif from https://gist.github.com/botforge/64cbb71780e6208172bbf03cd9293553
def save_frames_as_gif(frames, path="./", filename="evolved_lander.gif"):
    """Write a sequence of image frames to an animated GIF.

    Args:
        frames: non-empty sequence of image arrays; the first frame's
            ``shape[0]`` / ``shape[1]`` determine the figure size.
        path: directory to write into (with or without a trailing separator).
        filename: name of the GIF file.

    Raises:
        IndexError: if ``frames`` is empty.
    """
    # 72 dpi with a size of pixels/72 inches gives one figure pixel per frame pixel.
    fig = plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis("off")

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)
    try:
        # os.path.join keeps the path valid even when `path` has no trailing slash.
        anim.save(os.path.join(path, filename), writer="imagemagick", fps=60)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)


# Render the last best-of-generation individual and save the run as a GIF.
# NOTE(review): `get_test_score` and `evo` are not defined anywhere in the
# visible notebook (the evolution above is stored in `evo_baseline`, and the
# only evaluation function shown is `fitness_function_pt`) -- confirm these
# names exist before running this cell.
frames = []
avg_fitness, frames = get_test_score(evo.best_of_gens[-1], num_episodes=5, episode_duration=300, seed=5, render=True)
print("Average fitness of the render is: ", avg_fitness)
# Release the environment's rendering resources before writing the GIF.
env.close()
save_frames_as_gif(frames)

## Play animation

<img src="evolved_lander.gif" width="750">

## Optimisation
The coefficients in the multi-tree aren't optimised. Here Q-learning (taken from https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html) is used to optimise the weights further. Incorporate coefficient optimisation in training your agent(s). Coefficient Optimisation can be expensive. Think about how often you want to optimise, when, which individuals etc.

In [None]:
# Q-learning fine-tuning of the constants of the individual `best`.
# NOTE(review): `best`, `evo` and `get_test_score` are not defined in the
# visible part of the notebook -- confirm they exist before running this cell.
batch_size = 128  # transitions sampled per optimisation step
GAMMA = 0.99      # discount factor applied to the next state's value

# Trainable constants of the multi-tree; these are what the optimiser updates.
constants = best.get_subtrees_consts()

if len(constants) > 0:
    optimizer = optim.AdamW(constants, lr=1e-3, amsgrad=True)

# Built once (loop-invariant). Referenced through `torch.nn` because `nn` is
# never imported by name in the notebook's import cell.
criterion = torch.nn.SmoothL1Loss()

for _ in range(500):
    # Nothing to optimise without constants, and sampling needs enough transitions.
    if len(constants) > 0 and len(evo.memory) > batch_size:
        # Frozen copy of the current individual used to compute the TD target.
        target_tree = copy.deepcopy(best)

        transitions = evo.memory.sample(batch_size)
        # Turn a list of Transitions into one Transition of tuples (one per field).
        batch = Transition(*zip(*transitions))

        # True for samples whose next state exists (None marks a final state).
        non_final_mask = torch.tensor(
            tuple(map(lambda s: s is not None, batch.next_state)), dtype=torch.bool
        )

        non_final_next_states = torch.cat(
            [s for s in batch.next_state if s is not None]
        )
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        # Rewards may be stored as float64 (e.g. when built from NumPy scalars);
        # cast so the TD target has the same dtype as the predicted Q-values.
        reward_batch = torch.cat(batch.reward).float()

        # Q(s, a): output of the tree selected by the action that was taken.
        state_action_values = best.get_output_pt(state_batch).gather(1, action_batch)
        # max_a' Q_target(s', a'); stays 0 for final states.
        next_state_values = torch.zeros(batch_size, dtype=torch.float)
        with torch.no_grad():
            next_state_values[non_final_mask] = (
                target_tree.get_output_pt(non_final_next_states).max(1)[0].float()
            )

        expected_state_action_values = (next_state_values * GAMMA) + reward_batch

        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        # Optimize the model
        optimizer.zero_grad()
        loss.backward()
        # Clip gradients element-wise to keep the update bounded.
        torch.nn.utils.clip_grad_value_(constants, 100)
        optimizer.step()

print(best.get_readable_repr())
print(get_test_score(best))

In [None]:
# Render one episode of the optimised individual and save it as a GIF.
# fitness_function_pt collects frames in its own local list and returns them
# as the third element when render=True, so they must be taken from the
# return value; a separately created empty list would never be filled.
_, _, frames = fitness_function_pt(
    best, num_episodes=1, episode_duration=500, render=True, ignore_done=False
)
env.close()
save_frames_as_gif(frames, filename="evolved_lander_RL.gif")

<img src="evolved_lander_RL.gif" width="750">