# Chromosome Class

In [545]:
import json
import numpy as np

def string2array(string):
    """Parse text in the layout written by ``np.array2string`` into an integer array.

    All square brackets are removed, the remaining text is split on newlines and
    every line is read as one row of whitespace-separated integers.

    Parameters:
        string: textual form of an array, one row per line.

    Returns:
        A numpy array built from the parsed rows.
    """
    without_brackets = string.translate(str.maketrans("", "", "[]"))
    parsed_rows = [np.fromstring(line, dtype=int, sep=' ')
                   for line in without_brackets.split("\n")]
    return np.array(parsed_rows)

class Chromosome():
    """A gene grid plus the list of edit actions that led to it.

    The four callables passed to the constructor define the problem:
      * ``init(width, height)`` returns the initial gene array,
      * ``mutate(genes, target)`` returns a list of action dicts (the mutators in
        this file also edit ``genes`` in place),
      * ``fitness(genes, actions)`` returns the fitness value,
      * ``behaviors(genes, actions)`` returns the behavior descriptor list.
    """
    def __init__(self, width, height, init, mutate, fitness, behaviors):
        self._width = width
        self._height = height
        
        self._init_fn = init
        self._mutate_fn = mutate
        self._fitness_fn = fitness
        self._behaviors_fn = behaviors
        
        self._genes = self._init_fn(self._width, self._height)
        # snapshot of the grid the recorded actions start from
        self._start_genes = self._genes.copy()
        self._actions = []
        # -1 and [] act as "not computed yet" markers for the two caches below
        self._fitness = -1
        self._behaviors = []
        
    def clone(self):
        """Return a new chromosome with copies of the genes, history and cached values."""
        c = Chromosome(self._width, self._height, self._init_fn, 
                       self._mutate_fn, self._fitness_fn, self._behaviors_fn)
        c._genes = self._genes.copy()
        c._start_genes = self._start_genes.copy()
        # shallow copy: the action dicts themselves are shared with the original
        c._actions = self._actions.copy()
        c._fitness = self._fitness
        c._behaviors = self._behaviors.copy()
        return c
    
    def erase_history(self):
        """Return a clone whose action list is empty and whose start grid is the current genes."""
        c = self.clone()
        c._actions = []
        c._start_genes = self._genes.copy()
        return c
    
    def mutate(self, target):
        """Return a mutated clone; this chromosome is left untouched.

        The clone's cached fitness/behaviors are reset, ``mutate(genes, target)`` is
        applied to the clone's genes, and every returned action is tagged with the
        clone's behaviors before being appended to its history.
        """
        c = self.clone()
        c._fitness = -1
        c._behaviors = []
        
        actions = self._mutate_fn(c._genes, target)
        for act in actions:
            act["behaviors"] = c.behaviors()
        c._actions.extend(actions)
        
        return c
    
    def behaviors(self):
        """Return the behavior descriptor, computing it only while the cache is empty."""
        if len(self._behaviors) == 0:
            self._behaviors = self._behaviors_fn(self._genes, self._actions)
        return self._behaviors
    
    def fitness(self):
        """Return the fitness, computing it only while the cached value is negative."""
        if self._fitness < 0:
            self._fitness = self._fitness_fn(self._genes, self._actions)
        return self._fitness
    
    def save(self, file_name):
        """Write the chromosome state to ``file_name`` as a single JSON object.

        The two grids are stored as ``np.array2string`` text. numpy's default print
        options abbreviate arrays above 1000 elements with ``...`` and wrap rows
        longer than 75 characters; either would make the stored text impossible to
        parse back row by row, so both limits are lifted here. Small grids produce
        exactly the same text as before.
        """
        import sys  # local import: only needed for the print-limit sentinel

        def grid_text(array):
            return np.array2string(array, threshold=sys.maxsize,
                                   max_line_width=sys.maxsize)

        temp = {
            "width": self._width,
            "height": self._height,
            "genes": grid_text(self._genes),
            "start": grid_text(self._start_genes),
            "actions": self._actions,
            "fitness": self._fitness,
            "behaviors": self._behaviors,
        }
        with open(file_name, 'w') as f:
            f.write(json.dumps(temp))
    
    def load(self, file_name):
        """Overwrite this chromosome's state with the JSON written by ``save``."""
        with open(file_name, 'r') as f:
            temp = json.load(f)
            self._width = temp["width"]
            self._height = temp["height"]
            self._genes = string2array(temp["genes"])
            self._start_genes = string2array(temp["start"])
            self._actions = temp["actions"]
            self._fitness = temp["fitness"]
            self._behaviors = temp["behaviors"]

# Conditional Mutator Network

In [562]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from tqdm.notebook import trange, tqdm

class CMNN(nn.Module):
    """Small convolutional network mapping a square window (plus an optional
    conditioning vector) to 3 raw output scores.

    Parameters:
        size: side length of the square input window (at least 4).
        length: number of conditioning values concatenated to the flattened
            convolutional features; 0 disables conditioning.

    Raises:
        ValueError: if ``size`` is too small to survive the two 2x2 poolings.
    """
    def __init__(self,size,length):
        super(CMNN, self).__init__()
        
        self._nocond = length == 0
        
        self._conv1 = nn.Conv2d(1, 32, 3, padding='same')
        self._max1 = nn.MaxPool2d(2)
        self._conv2 = nn.Conv2d(32, 64, 3, padding='same')
        self._max2 = nn.MaxPool2d(2)
        self._conv3 = nn.Conv2d(64, 128, 3, padding='same')
        # Each MaxPool2d(2) halves the side with floor rounding, so the flattened
        # feature count is (size // 2 // 2) ** 2 * 128. Using size / 4 instead only
        # agrees with that when size is a multiple of 4.
        pooled_side = int(size) // 2 // 2
        if pooled_side < 1:
            raise ValueError(f"size must be at least 4 to survive two 2x2 poolings, got {size}")
        input_values = int(pooled_side * pooled_side * 128 + length)
        self._linear1 = nn.Linear(input_values, 256)
        self._linear2 = nn.Linear(256, 3)
        
    def forward(self, x, t):
        """Return the 3 output scores for each item in the batch.

        ``x`` goes through the convolution/pooling stack and is flattened per batch
        item; ``t`` is concatenated along dim 1 unless the network was built with
        ``length == 0``, in which case ``t`` is ignored. No softmax is applied.
        """
        x = F.relu(self._max1(self._conv1(x)))
        x = F.relu(self._max2(self._conv2(x)))
        x = F.relu(self._conv3(x))
        x = x.view(x.shape[0],-1)
        if not self._nocond:
            x = torch.cat([x, t], 1)
        x = F.relu(self._linear1(x))
        x = self._linear2(x)
        return x
    
    def reset_parameters(self):
        """Re-initialise the weights of every convolutional and linear layer."""
        self._conv1.reset_parameters()
        self._conv2.reset_parameters()
        self._conv3.reset_parameters()
        self._linear1.reset_parameters()
        self._linear2.reset_parameters()

def transform_input(level, position, size):
    """Cut a ``size`` x ``size`` window out of ``level`` around ``position``.

    The level is zero-padded by ``size // 2`` on every side (``np.pad`` default),
    then the window whose top-left corner in the padded array is
    ``(position["y"], position["x"])`` is returned.

    Parameters:
        level: 2D array to crop from.
        position: mapping with integer ``"x"`` (column) and ``"y"`` (row) entries.
        size: side length of the returned window.
    """
    col, row = position["x"], position["y"]
    padded = np.pad(level, size // 2)
    return padded[row:row + size, col:col + size]
    
def cmnn_mutate(model, level, size, target, epsilon, times=1):
    """Apply ``times`` single-tile edits to ``level`` in place and return them.

    For every edit a random tile is picked. With probability ``epsilon`` the action
    is drawn uniformly from {0, 1, 2}; otherwise it is sampled from the softmax of
    the model's output for the window around that tile. Action 0 leaves the tile
    unchanged, action ``v > 0`` writes ``v - 1`` into it.

    Parameters:
        model: callable taking (window tensor, target tensor) and returning 3 scores.
        level: 2D numpy array, modified in place.
        size: side length of the window handed to the model.
        target: numpy array with the conditioning values.
        epsilon: probability of taking a uniformly random action.
        times: number of edits to perform.

    Returns:
        List of dicts ``{"x", "y", "action"}`` holding plain Python ints.
    """
    actions = []
    with torch.no_grad():
        for _ in range(times):
            x = int(np.random.randint(level.shape[1]))
            y = int(np.random.randint(level.shape[0]))
            c_lvl = transform_input(level, {"x": x, "y": y}, size)
            if np.random.random() < epsilon:
                value = int(np.random.randint(3))
            else:
                scores = model(torch.tensor(c_lvl.copy().reshape(1, 1, size, size)).float(),
                               torch.tensor(target.copy().reshape(1, -1)).float())
                probs = F.softmax(scores, dim=1).numpy()
                # np.random.choice hands back a numpy integer; convert it so the
                # recorded action stays serialisable with json.dumps
                value = int(np.random.choice([0, 1, 2], p=probs.flatten()))
            if value > 0:
                level[y][x] = value - 1
            actions.append({"x": x, "y": y, "action": value})
    return actions

def extract_init_data(archive, init, size, repeats = 1):
    """Build training samples that turn freshly initialised levels into archived ones.

    For every archive entry, ``repeats`` levels are created with ``init``. Each one
    is walked tile by tile in random order; for every visited tile a sample
    (window before the edit, target, action) is recorded and the tile is then set
    to the archived value. Action 0 means the tile already matched, otherwise the
    action is the archived tile value plus one. The walk stops as soon as the level
    equals the archived genes.

    The target is the archive key divided by 40.0, applied exactly once (the value
    used to be divided a second time when appended, which made training targets
    40 times smaller than the conditions produced by ``binary_target``).

    Parameters:
        archive: object exposing ``keys()`` and ``get(key)``.
        init: callable ``init(width, height)`` returning a starting level.
        size: window side length passed to ``transform_input``.
        repeats: number of starting levels generated per archive entry.

    Returns:
        Tuple of numpy arrays ``(levels, targets, actions)``.
    """
    levels = []
    targets = []
    actions = []
    for k in archive.keys():
        c = archive.get(k)
        target = np.array(k) / 40.0
        for _ in range(repeats):
            level = init(c._width, c._height)
            pos = [{"x": x, "y": y} for x in range(c._width) for y in range(c._height)]
            np.random.shuffle(pos)
            for p in pos:
                x, y = p["x"], p["y"]
                wanted = c._genes[y][x]
                levels.append(transform_input(level, p, size))
                targets.append(target)
                actions.append(0 if wanted == level[y][x] else wanted + 1)
                level[y][x] = wanted
                if abs(level - c._genes).sum() == 0:
                    break
    return np.array(levels), np.array(targets), np.array(actions)

def extract_inbet_data(archive, init, size, repeats = 1):
    """Build training samples that turn each chromosome's start level into its genes.

    For every archive entry, ``repeats`` copies of its ``_start_genes`` are walked
    tile by tile in random order; for every visited tile a sample (window before
    the edit, target, action) is recorded and the tile is then set to the value in
    ``_genes``. Action 0 means the tile already matched, otherwise the action is
    the final tile value plus one. The walk stops as soon as the level equals the
    final genes.

    The target is the archive key divided by 40.0, applied exactly once (the value
    used to be divided a second time when appended, which made training targets
    40 times smaller than the conditions produced by ``binary_target``).

    Parameters:
        archive: object exposing ``keys()`` and ``get(key)``.
        init: unused here; kept so all extractors share one signature.
        size: window side length passed to ``transform_input``.
        repeats: number of random walks generated per archive entry.

    Returns:
        Tuple of numpy arrays ``(levels, targets, actions)``.
    """
    levels = []
    targets = []
    actions = []
    for k in archive.keys():
        c = archive.get(k)
        target = np.array(k) / 40.0
        for _ in range(repeats):
            level = c._start_genes.copy()
            pos = [{"x": x, "y": y} for x in range(c._width) for y in range(c._height)]
            np.random.shuffle(pos)
            for p in pos:
                x, y = p["x"], p["y"]
                wanted = c._genes[y][x]
                levels.append(transform_input(level, p, size))
                targets.append(target)
                actions.append(0 if wanted == level[y][x] else wanted + 1)
                level[y][x] = wanted
                if abs(level - c._genes).sum() == 0:
                    break
    return np.array(levels), np.array(targets), np.array(actions)

def extract_traj_data(archive, init, size, subsets = 1):
    """Build training samples by replaying each chromosome's recorded actions.

    The action list of every archive entry is cut at ``subsets`` (roughly) evenly
    spaced end points; for each end point the prefix of actions up to it is
    replayed from a fresh copy of ``_start_genes``. Every replayed action yields a
    sample: the window around the action's tile as it looked *before* the action,
    the behaviors stored on the action divided by 40.0, and the action value.

    The level is updated after each recorded action (action ``v > 0`` writes
    ``v - 1``, mirroring ``cmnn_mutate``). Previously the level was never changed
    during the replay, so every sample showed the start level rather than the
    state the action was actually taken in.

    Parameters:
        archive: object exposing ``keys()`` and ``get(key)``.
        init: unused here; kept so all extractors share one signature.
        size: window side length passed to ``transform_input``.
        subsets: number of prefix end points per chromosome.

    Returns:
        Tuple of numpy arrays ``(levels, targets, actions)``. Chromosomes without
        recorded actions contribute nothing.
    """
    levels = []
    targets = []
    actions = []
    for k in archive.keys():
        c = archive.get(k)
        if len(c._actions) == 0:
            continue
        interval = max(1,int(len(c._actions)/subsets+0.5))
        for i in range(0,len(c._actions),interval):
            targ_index = i + interval
            # every prefix is replayed from the original starting grid
            level = c._start_genes.copy()
            for act in c._actions[0:targ_index]:
                levels.append(transform_input(level, act, size))
                targets.append(np.array(act["behaviors"]) / 40.0)
                actions.append(act["action"])
                if act["action"] > 0:
                    level[act["y"]][act["x"]] = act["action"] - 1
    return np.array(levels), np.array(targets), np.array(actions)
        
def cmnn_train(model, optimizer, archive, init, extract, size, extract_value, epochs, batch,
               t_lvls, t_ts, t_acts):
    """Train ``model`` on data extracted from ``archive`` plus previously kept data.

    New samples come from ``extract(archive, init, size, extract_value)`` and are
    placed in front of ``t_lvls`` / ``t_ts`` / ``t_acts``. Each epoch shuffles the
    combined set and minimises cross-entropy between the model output and the
    action labels. Training stops early when an epoch's summed loss is exactly 0
    (which is also the case when no step was run).

    NOTE(review): batches are overlapping windows ``[i, i + batch)`` advanced one
    sample at a time, so an epoch runs ``len(data) - batch`` optimiser steps.
    Confirm whether a stride of ``batch`` was intended.

    Returns:
        The combined ``(levels, targets, actions)`` arrays that were trained on.
    """
    criterion = nn.CrossEntropyLoss()
    new_lvls, new_ts, new_acts = extract(archive, init, size, extract_value)
    levels = np.concatenate((new_lvls, t_lvls))
    targets = np.concatenate((new_ts, t_ts))
    actions = np.concatenate((new_acts, t_acts))

    epoch_bar = trange(epochs, leave=False)
    for _ in epoch_bar:
        order = np.arange(len(actions))
        np.random.shuffle(order)
        shuf_lvls = levels[order]
        shuf_ts = targets[order]
        shuf_acts = actions[order]

        step_bar = trange(len(shuf_acts) - batch, leave=False)
        running_loss = 0
        for first in step_bar:
            last = first + batch
            cond_chunk = shuf_ts[first:last]
            lvl_tensor = torch.tensor(shuf_lvls[first:last].reshape(batch, 1, size, size)).float()
            cond_tensor = torch.tensor(cond_chunk.reshape(batch, cond_chunk.shape[1])).float()
            label_tensor = torch.tensor(shuf_acts[first:last]).long()

            optimizer.zero_grad()
            loss = criterion(model(lvl_tensor, cond_tensor), label_tensor)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            step_bar.set_postfix_str(f"Loss: {running_loss / (first + 1.0)}")
        if running_loss == 0:
            epoch_bar.close()
            break
    return levels, targets, actions

# Generate Problem

In [563]:
import numpy as np
from helper import get_horz_symmetry, get_longest_path, get_number_regions, get_num_actions, get_range_reward

def binary_discrete(value):
    """Map ``value`` to an integer bin index.

    The value is clipped to [0, 1], multiplied by 40, lowered by a tiny offset and
    truncated with ``int``; an input of exactly 1 therefore lands in bin 39.
    """
    scaled = 40 * np.clip(value, 0, 1)
    return int(scaled - 1e-8)

def binary_init(width, height):
    """Return a random ``height`` x ``width`` integer array of zeros and ones."""
    shape = (height, width)
    return np.random.randint(2, size=shape)

def binary_target(archive):
    """Return a random 2-element target; each entry is ``randint(40) / 40.0``.

    ``archive`` is accepted for interface compatibility and is not used.
    """
    components = [np.random.randint(40) / 40.0 for _ in range(2)]
    return np.array(components)

def get_binary_mutate(model, size, epsilon, times):
    """Return a ``mutate(level, target)`` callable that forwards to ``cmnn_mutate``.

    ``model``, ``size``, ``epsilon`` and ``times`` are bound when this function is
    called; the returned callable keeps referring to that same ``model`` object.
    """
    def mutate(level, target):
        return cmnn_mutate(model, level, size, target, epsilon, times)
    return mutate

def binary_fitness(genes, actions):
    """Return the range reward of the region count of ``genes``.

    The count comes from ``get_number_regions(genes, [1])`` and is passed to
    ``get_range_reward`` with the arguments ``1, 1, 1`` and one tenth of the grid
    area. ``actions`` is not used.
    """
    area = genes.shape[0] * genes.shape[1]
    region_count = get_number_regions(genes, [1])
    return get_range_reward(region_count, 1, 1, 1, area / 10)

def binary_behaviors(genes, actions):
    """Return ``[longest, symmetry]`` as discretised behavior values.

    Both entries are range rewards (with half the grid area as the two bounds)
    passed through ``binary_discrete``: the first is based on
    ``get_longest_path(genes, [1])``, the second on ``get_horz_symmetry(genes)``.
    ``actions`` is not used.
    """
    half_area = genes.shape[0] * genes.shape[1] / 2
    path_reward = get_range_reward(get_longest_path(genes, [1]), half_area, half_area)
    longest = binary_discrete(path_reward)
    mirror_reward = get_range_reward(get_horz_symmetry(genes), half_area, half_area)
    symmetry = binary_discrete(mirror_reward)
    return [longest, symmetry]

# Archive Class

In [564]:
import numpy as np
import os

class Archive:
    """Map from a behavior descriptor to the best chromosome seen for it.

    Entries are stored in a dict keyed by the descriptor values joined with
    commas (for example ``"3,17"``).
    """
    def __init__(self):
        self._map = {}
        
    def __len__(self):
        return len(self._map)
    
    def __str__(self):
        return f"Arhcive Size: {len(self._map)}\nValues:\n{str(self.keys())}"
        
    def keys(self, dim=-1, value=-1):
        """Return the stored keys as an integer array, one row per entry.

        When ``dim >= 0`` only the rows whose component ``dim`` equals ``value``
        are kept. An empty archive yields an empty array.
        """
        if len(self._map) == 0:
            return np.array([])
        result = np.array([[int(v) for v in key.split(",")] for key in self._map])
        if dim >= 0:
            result = np.array([k for k in result if k[dim] == value])
        return result
    
    def clone(self):
        """Return a new archive with a shallow copy of the map (chromosomes are shared)."""
        archive = Archive()
        archive._map = self._map.copy()
        return archive
    
    def add(self, chromosome):
        """Store ``chromosome`` if its cell is empty or it is strictly fitter than the occupant."""
        key = ",".join([str(temp) for temp in chromosome.behaviors()])
        if key not in self._map or chromosome.fitness() > self._map[key].fitness():
            self._map[key] = chromosome
    
    def random(self):
        """Return a uniformly random stored chromosome."""
        keys = list(self._map.keys())
        index = np.random.randint(len(keys))
        return self._map[keys[index]]
        
    def get(self, dimension):
        """Return the chromosome stored for the descriptor ``dimension``, or None."""
        key = ",".join([str(temp) for temp in dimension])
        if key in self._map:
            return self._map[key]
        return None
    
    def get_all(self, dimensions):
        """Return ``get(d)`` for every descriptor ``d`` in ``dimensions``.

        Each descriptor is passed to ``get`` as one argument; unpacking it into
        several positional arguments does not match ``get``'s signature.
        """
        return [self.get(dim) for dim in dimensions]
    
    def save(self, folder):
        """Create ``folder`` and write every chromosome to ``<key>.json`` inside it."""
        os.makedirs(folder)
        for key in self._map.keys():
            self._map[key].save(os.path.join(folder, f"{key}.json"))
    
    def load(self, folder, width, height, init, mutate, fitness, behaviors):
        """Replace the contents with the chromosomes stored as ``*.json`` in ``folder``.

        A chromosome is first built with the given problem functions and then
        overwritten from its file; the file name without ``.json`` is the key.
        """
        self._map = {}
        suffix = ".json"
        for fn in os.listdir(folder):
            # match the extension exactly so names that merely contain ".json" are skipped
            if not fn.endswith(suffix):
                continue
            key = fn[:-len(suffix)]
            self._map[key] = Chromosome(width, height, init, mutate, fitness, behaviors)
            self._map[key].load(os.path.join(folder, fn))

# Surrogate Mutation MAP-Elites

In [565]:
import numpy as np

class SMMAPElites:
    """MAP-Elites loop over an ``Archive`` of ``Chromosome`` objects.

    The problem is defined by the callables ``init``, ``mutate``, ``fitness`` and
    ``behaviors`` (handed to every chromosome) and by ``target``, which is called
    with the archive to produce the target given to each mutation.
    """
    def __init__(self, start_size, width, height, fitness_threshold, init, mutate, fitness, behaviors, target):
        self._width, self._height = width, height
        self._fitness_threshold = fitness_threshold
        
        self._init_fn, self._mutate_fn = init, mutate
        self._fitness_fn, self._behaviors_fn = fitness, behaviors
        self._target_fn = target
        
        # seed the archive; only chromosomes whose fitness is below the threshold are added
        self._map = Archive()
        for _ in range(start_size):
            candidate = self._spawn()
            if candidate.fitness() < self._fitness_threshold:
                self._map.add(candidate)
    
    def _spawn(self):
        """Create a fresh chromosome from the stored problem functions."""
        return Chromosome(self._width, self._height, self._init_fn,
                          self._mutate_fn, self._fitness_fn, self._behaviors_fn)
    
    def update(self, new_prob = 0.1, history_prob = 0.1):
        """Run one step: pick a random parent, mutate it and offer the child to the archive.

        With probability ``new_prob`` a fresh chromosome is created and replaces
        the parent when the parent's fitness is below the threshold
        (NOTE(review): the check reads the selected parent's fitness, not the fresh
        chromosome's; confirm that is intended). With probability ``history_prob``
        the parent's history is erased, again only below the threshold.
        """
        parent = self._map.random()
        if np.random.random() < new_prob:
            fresh = self._spawn()
            if parent.fitness() < self._fitness_threshold:
                parent = fresh
        if np.random.random() < history_prob and parent.fitness() < self._fitness_threshold:
            parent = parent.erase_history()
        
        child = parent.mutate(self._target_fn(self._map))
        self._map.add(child)
    
    def get_map(self):
        """Return a clone of the archive."""
        return self._map.clone()
    
    def __len__(self):
        return len(self._map)
    
    def save(self, folder):
        """Save the archive into ``folder``."""
        self._map.save(folder)
    
    def load(self, folder):
        """Reload the archive from ``folder`` using the stored problem functions."""
        self._map.load(folder, self._width, self._height, self._init_fn,
                       self._mutate_fn, self._fitness_fn, self._behaviors_fn)

# Testing Surrogate Mutation MAP-Elites

### Old Parameters (DON'T TOUCH)

In [566]:
# These three values are forwarded unchanged to SMMAPElites (constructor / update).
new_prob = 0.0            # update(): chance of creating a fresh chromosome to replace the selected parent
history_prob = 0.0        # update(): chance of erasing the selected parent's action history
fitness_threshold = 1.0   # both of the above, and the initial seeding, only apply below this fitness

### Hyper Parameters

In [572]:
# Level Size
width = 14                                        # width of the generated level
height = 14                                       # height of the generated level
conditional = False                               # specify if we want conditional input or ignore it

# Evolution Parameters
start_size = 100                                  # initial number of chromosomes to start MAP-Elites
iterations = 100000                               # number of fitness evaluations for MAP-Elites
mutation_length = 1                               # how many tiles to mutate
epsilon = 0.25                                    # probability of doing random mutation not from model

# Training Parameters
train_period = 10000                              # frequency of training the network
train_epochs = 2                                  # number of epochs used in the middle of evolution
final_epochs = 2                                  # number of epochs before finishing evolution
batch_size = 32                                   # minibatch size during training
learning_rate = 0.0001                            # optimizer learning rate
reset_model = True                                # reset the model weights

# Data Creation Parameter
window_size = 8                                   # cropped view of the observation (can be any value)
increase_data_value = 1                           # increase data size by multiplying by that value
data_creation = ["traj", "inbet", "init"][2]      # method of creating the data
append_data = False                               # new data is generated beside old ones

### Functions (DON'T TOUCH)

In [573]:
from tqdm.notebook import trange, tqdm
import torch.optim as optim

# Problem definition used by every chromosome
init = binary_init
fitness = binary_fitness
behaviors = binary_behaviors
target = binary_target
extract = {"traj": extract_traj_data, "inbet": extract_inbet_data, "init": extract_init_data}[data_creation]

# The network has to exist before the mutation function is built: get_binary_mutate
# binds the model object it is given, so creating the model afterwards would either
# raise NameError (fresh kernel) or leave mutation using a stale, untrained model.
cond = 2 if conditional else 0
model = CMNN(window_size, cond)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
mutate = get_binary_mutate(model, window_size, epsilon, mutation_length)

evolver = SMMAPElites(start_size, width, height, fitness_threshold, init, mutate, fitness, behaviors, target)

### Evolution

In [None]:
# Training data carried over between training rounds (stays empty unless append_data is set)
total_levels, total_targets, total_actions = np.array([]).reshape((0,window_size,window_size)),\
                                             np.array([]).reshape((0,2)), np.array([]).reshape((0))
pbar = trange(iterations)
for i in pbar:
    evolver.update(new_prob, history_prob)
    pbar.set_postfix_str(f"Map Size: {len(evolver)}")
    last_iteration = i == iterations-1
    # train every train_period iterations and once more on the very last iteration
    if (i > 0 and i % train_period == 0) or last_iteration:
        if reset_model:
            model.reset_parameters()
        epochs = final_epochs if last_iteration else train_epochs
        levels, targets, actions = cmnn_train(model, optimizer, evolver.get_map(), init, extract,\
                                             window_size, increase_data_value, epochs, batch_size,\
                                             total_levels, total_targets, total_actions)
        if append_data:
            # cmnn_train returns the new samples already merged with the totals passed
            # in, so keep its result as is; concatenating it onto the totals again
            # would duplicate all earlier samples on every round
            total_levels, total_targets, total_actions = levels, targets, actions

  0%|          | 0/100000 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/37204 [00:00<?, ?it/s]

  0%|          | 0/37204 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/47303 [00:00<?, ?it/s]

  0%|          | 0/47303 [00:00<?, ?it/s]

# Save Model and Archive

In [422]:
import os
import shutil

model_path = "first_model"

# Start from an empty directory: remove any previous save, then recreate it.
# (os.path.exists is the correct name; "exits" raises AttributeError.)
if os.path.exists(model_path):
    shutil.rmtree(model_path)
os.mkdir(model_path)
# the whole module object is stored, not just its state dict
torch.save(model, os.path.join(model_path, "model"))
evolver.save(os.path.join(model_path, "archive"))

# Load Model and Archive

In [450]:
# Restore the module object written by torch.save and reload the archive from disk.
# NOTE(review): newer PyTorch releases default torch.load to weights_only=True, which
# may refuse a fully pickled module -- confirm against the installed version.
# NOTE(review): this rebinds the name `model`; a mutate function created earlier with
# get_binary_mutate presumably still refers to the previous model object -- verify
# before continuing evolution after a load.
model = torch.load(os.path.join(model_path, "model"))
evolver.load(os.path.join(model_path, "archive"))

# Test Trained Model

In [494]:
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# Conditioning vector handed to the model as its second input on every step.
# NOTE(review): the training targets in this file are scaled by 1/40 -- confirm
# the scale intended here.
condition = [1, 1]
# number of full sweeps over the level
repeatitions = 5

# Random starting level; show it (with a 1-tile border) and print its scores.
start = binary_init(width, height)
fig = plt.figure()
plt.axis('off')
im = plt.imshow(np.pad(start,1))
print(fitness(start, None))
print(behaviors(start, None))

# Sweep the level tile by tile, applying the model's highest-scoring action at each
# tile and recording a snapshot of the level after every tile visit.
frames = []
level = start.copy()
for i in range(repeatitions):
    for y in range(height):
        for x in range(width):
            obs = transform_input(level, {"x":x,"y":y}, window_size)
            targets = np.array(condition)
            # greedy choice (argmax), unlike the sampling done in cmnn_mutate
            action = model(torch.tensor(obs.reshape(1,1,window_size,window_size)).float(),\
                           torch.tensor(targets.reshape(1,-1)).float()).argmax().item()
            # action 0 keeps the tile, action v > 0 writes v - 1
            if action > 0:
                level[y][x] = action - 1
            
            frames.append(level.copy())
# Scores of the level after all sweeps
print(fitness(level, None))
print(behaviors(level, None))
    
def init_display():
    """Reset the image to the starting level for the animation."""
    im.set_data(np.pad(start,1))
    return [im]

def animate_display(i):
    """Show the i-th recorded snapshot."""
    im.set_data(np.pad(frames[i],1))
    return [im]

anim = animation.FuncAnimation(fig, animate_display, init_func=init_display,\
                               frames=len(frames), interval=20, blit=True)
# close the figure so only the rendered video is displayed
plt.close(anim._fig)

# Call function to display the animation
HTML(anim.to_html5_video())

0.4387755102040817
[5, 22]
1.0
[10, 35]
