# Towards self-organized control
### Train a neural cellular automata that controls a cart-pole agent

We Begin by installing the tools needed: some packages to render the environment in the notebook as well as SelfOrgControl, our implementation of neural CA and the training procedure for the cart-pole task.

In [None]:
!pip install git+https://github.com/aVariengien/self-organized-control.git#subdirectory=code
!apt-get install -y xvfb python-opengl > /dev/null 2>&1
!pip install gym pyvirtualdisplay > /dev/null 2>&1

In [None]:
import tensorflow as tf
import numpy as np
import time
import SelfOrgControl.NeuralCA as NCA
from SelfOrgControl.NCA_DQN import DQNAgent
from SelfOrgControl.NeuralCAVisu import visualize_agent, show_influence_field


#### Initialisation of the model
We first train the model to compute the mean of its 8 inputs.

In [None]:


inp_cell_pos = [(11, 26),(25,20),(5,20),(19,6),(19,26),(5,13),(25,12) ,(11, 6)]
out_cell_pos = [(13,16), (17,16)]

lr = 5e-3
lr_sched = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    [1000, 5000], [lr, lr*0.1, lr*0.001])

nca = NCA.TrainableNeuralCA(input_electrodes = inp_cell_pos,
                            output_electrodes = out_cell_pos,
                            grid_size=32,
                            batch_size=16, channel_n=6,
                            ca_steps_per_sample=(50,60),
                            replace_proba=0.01,
                            task_loss_w=0.5, grid_pool_size=100,
                            learning_rate=lr,
                            repeat_input=1, #there is no redondancy
                            torus_boundaries=False,
                            penalize_overflow=True, overflow_w = 1e2,
                            use_hidden_inputs=True, perturb_io_pos=True,
                            add_noise=False, damage=True,
                            nb_hid_range=(0,0), move_rad=0, proba_move=0.0)
print(nca.neuralCA.dmodel.summary())


inputs_b = (np.random.random((4000,16,8)) - 0.5)*2
targets_b = np.repeat(np.expand_dims(np.mean(inputs_b, axis=-1),-1),2,axis=-1)*4
#we add a factor 4 to get on average a greater amplitude in the output values to
# predict


nca.fit(inputs_b, targets_b, verbose=True, use_batch=True)


### Test of the initiaisation phase
Plot the loss curve to ensure that something has been learned. 
The log10 of the loss should be around -1.

In [None]:
nca.plot_losses() #plot the loss curve

Check the dynamics of the outputs on 5 samples to check that the output
is responding to the inputs values.

In [None]:
_ = nca.plot_io_signals(55,inputs_b[:5,0,:], targets_b[:5,0,:]) 

If the initialisation phase worked as excpected, save the model and go to the Deep-Q learning phase.

In [None]:
nca.neuralCA.dmodel.save_weights("compute_mean_initialisation")

# Deep-Q learning

In [None]:
agent = DQNAgent()
# define the hyperparameters of the deep-q learning algo.
agent.epsilon = 1.
agent.epsilon_decay = 0.999
#the agent batch size is the number of transitions sampled from the memory at each 
#replay. They will be divided in agent.batch_size/agent.model.batch_size batches 
#for the training of the neural CA

agent.batch_size = 128

lr = 5e-3
lr_sched = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    [1000,10000], [lr, lr*0.1, lr*0.001])

# define the NCA model to train
agent.model = NCA.TrainableNeuralCA(input_electrodes = inp_cell_pos,
                            output_electrodes = out_cell_pos,
                            grid_size=32,
                            batch_size=16, channel_n=6,
                            ca_steps_per_sample=(50,60),
                            replace_proba=0.01,
                            task_loss_w=0.5, grid_pool_size=100,
                            learning_rate=lr,
                            repeat_input=2, #Redondancy is used here, each input is linked to 2 input cells
                            torus_boundaries=False,
                            penalize_overflow=True, overflow_w = 1e2,
                            use_hidden_inputs=True, perturb_io_pos=True,
                            add_noise=False, damage=True,
                            nb_hid_range=(0,0), move_rad=0, proba_move=0.0)

# initialize using the previously trained parameters
agent.model.neuralCA.dmodel.load_weights("compute_mean_initialisation")

### Train the agent
Because the deep-q learning algo optimize the model to learn a proxy for the policy, the decreasing of the loss doesn't always means that the score of the agent will increase. It sometimes fail to find a good agent despite the log 10 of loss reaching ~ -1.8.

It will be sometimes necessary to restart the learning process from the begining to find a good performing model (score > 300). The best performing model can be deceptive even after 700 replays when epsilon has decay below 0.05. Around 1 over 2 runs of training lead to agents presenting good performance.

In [None]:
agent.run()

# Test the model


In [None]:
agent_for_test = DQNAgent()

# define the NCA model to test
agent_for_test.model = NCA.TrainableNeuralCA(input_electrodes = inp_cell_pos,
                            output_electrodes = out_cell_pos,
                            grid_size=32,
                            batch_size=1, channel_n=6, #we use batch_size =1 to speed up the computation for the test phase
                            ca_steps_per_sample=(50,60),
                            replace_proba=0.01,
                            task_loss_w=0.5, grid_pool_size=100,
                            learning_rate=lr,
                            repeat_input=2, #Redondancy is used here, each input is linked to 2 input cells
                            torus_boundaries=False,
                            penalize_overflow=True, overflow_w = 1e2,
                            use_hidden_inputs=True, perturb_io_pos=True,
                            add_noise=False, damage=True,
                            nb_hid_range=(0,0), move_rad=0, proba_move=0.0)


In [None]:
agent_for_test.model.neuralCA.dmodel.load_weights(agent.best_model_name)

### Load a pretrained model
You can execute the cells belove to load a model pretrained to solve the cart-pole problem.

In [None]:
!wget https://raw.githubusercontent.com/aVariengien/self-organized-control/main/code/PretrainedModels.zip
!unzip PretrainedModels.zip

In [None]:
#Available models: model 1,2 and 3
agent_for_test.model.neuralCA.dmodel.load_weights("PretrainedModels/model3")

### Visualise the learned strategy

In [None]:
#Start a virtual display to render the cart-pole environment
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
display = Display(visible=0, size=(400, 300))
display.start()

In [1]:
agent_for_test.test(nb_episode=5, verbose=1, render=False, render_for_colab=True)

NameError: name 'agent_for_test' is not defined

### Test the performance

In [None]:
agent_for_test.test(nb_episode=5, verbose=1, render=False)

### Visualize the neural CA

The images will be saved in an automatically created directory called "agent_video_images"

In [None]:
#create a video of the cart-pole agent and the NCA in parallel
agent_for_test.env.auto_reset = True #we make the environment reset automatically if the pole fall
visualize_agent(agent_for_test, 100, output_video=True)

### Show the influence field of the inputs

In [None]:
_,_,_,_,sensors  = agent_for_test.test(return_sensors=True, verbose=False, render=False, nb_episode=1, fix_nb_step=100)
sensors_list = []

for i in range(len(sensors)):
    sensors_list.append(sensors[i])

In [None]:
deviations = []
inp_cell_pos = [(11, 26),(25,20),(5,20),(19,6),(19,26),(5,13),(25,12) ,(11, 6)]
for i in range(8):
    dev = show_influence_field(agent_for_test.model,inputs_to_sample=sensors_list, 
                                nb_rounds=10, perturb_range=(-1,1), 
                                perturb_input=[i],
                                normalize_mean=True)
    deviations.append(dev)
    print("\r"+str(i+1)+ "/8", end="")
no_pertub_dev = show_influence_field(agent_for_test.model,inputs_to_sample=sensors_list, 
                                nb_rounds=10, perturb_range=(1.,1.), 
                                perturb_input=[],
                                normalize_mean=True)
print("\rdone.", end="")

In [None]:
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.pyplot as plt
titles = ["Cart position", "Cart velocity", "Pole angle", "Pole angular velocity", "No perturbation"]
min_val = -1.6
max_val= 0.2

#fig= plt.figure(figsize=(10,10))
#fig, axes = plt.subplots(2,4, figsize=(20,10))

#last_axe = plt.subplot2grid( (25,10), [21,5], 1, 1, fig=fig )

axes = []
fig=plt.figure(figsize=(20,8))
for i in range(4):
    col = []
    for j in range(2):
        col.append(plt.subplot2grid((4,10), [j*2,i*2], 2, 2, fig=fig))
    axes.append(col)
    
axes.append(plt.subplot2grid((4,10), [1,8], 2, 2 , fig=fig))

inp_cell_pos = [(11, 26),(25,20),(5,20),(19,6),(19,26),(5,13),(25,12) ,(11, 6)]
log_dev = np.log10(deviations)
for i in range(0,4):
    for j in range(2):
        axes[i][j].axis("off")
        m  = axes[i][j].matshow(log_dev[2*i+j], vmin=min_val, vmax=max_val)

        axes[i][j].plot([inp_cell_pos[2*i+j][1]], [inp_cell_pos[2*i+j][0]], marker="x", color='red')
        axes[i][j].plot([16,16], [17,13], marker=".",linestyle="none", color="black", alpha=0.5)
        if j == 0:
            axes[i][j].set_title(titles[i],  pad=-200)
        
axes[4].set_title(titles[4])
axes[4].axis("off")
m= axes[4].matshow(np.log10(no_pertub_dev), vmin=min_val, vmax=max_val) 
axes[4].plot([16,16], [17,13], marker=".",linestyle="none", color="black", alpha=0.5)
divider = make_axes_locatable(axes[4])
cax = divider.new_horizontal(size="5%", pad=0.05)
fig.add_axes(cax)
fig.colorbar(m, cax=cax)
    
fig.suptitle("Log 10 of the deviation after perturbation", fontsize=15, y=1.05)
#fig.tight_layout()
fig.show()