In [None]:
import swarmrl.engine.resobee as resobee
import os
# NOTE(review): `infomsg` is not referenced in any of the visible cells — confirm it is still needed.
infomsg = "I "

import flax.linen as nn
import numpy as np
import optax
import yaml
 
import swarmrl as srl
from swarmrl.actions.actions import Action

import matplotlib.pyplot as plt

## RL Configuration

In this code block the task and its parameters are defined. Therefore, this is where the goal of the RL procedure is determined.

In [None]:


class ActoCriticNet(nn.Module):
    """Dense actor-critic network with two separate hidden streams.

    The input is fed into two independent stacks of two 12-unit dense layers
    with ReLU activations. The actor stream ends in a 4-unit head and the
    critic stream ends in a 1-unit head.
    """

    @nn.compact
    def __call__(self, x):
        """Run the network on ``x``.

        Parameters
        ----------
        x
            Input features passed to both streams.

        Returns
        -------
        tuple
            ``(actor_output, critic_output)``: the 4-unit actor head output
            and the 1-unit critic head output.
        """
        # The layers are created in the same order as before so that the
        # automatically assigned submodule names are not shuffled.
        y = nn.Dense(features=12)(x)
        x = nn.Dense(features=12)(x)
        x = nn.relu(x)
        y = nn.relu(y)

        y = nn.Dense(features=12)(y)
        x = nn.Dense(features=12)(x)
        x = nn.relu(x)
        y = nn.relu(y)

        # The critic head must read the critic stream `y`. Previously it was
        # fed `x`, which discarded the critic stream and left its hidden
        # layers as unused parameters.
        y = nn.Dense(features=1)(y)  # Critic
        x = nn.Dense(features=4)(x)  # Actor
        return x, y

# Exploration policy: random exploration configured with probability 0.1.
exploration_policy = srl.exploration_policies.RandomExploration(probability=0.1)

# Sampling strategy handed to the network wrapper below.
sampling_strategy = srl.sampling_strategies.GumbelDistribution()

# Value function; it is passed to the policy-gradient loss further down.
value_function = srl.value_functions.ExpectedReturns(gamma=0.1, standardize=True)

# Wrap the Flax module together with its optimizer, sampling strategy and
# exploration policy.
actor_critic = ActoCriticNet()
network = srl.networks.FlaxModel(
    flax_model=actor_critic,
    optimizer=optax.adam(learning_rate=0.01),
    input_shape=(2,),
    sampling_strategy=sampling_strategy,
    exploration_policy=exploration_policy,
)

def scale_function(distance: float):
    """
    Decay function handed to the task: maps a distance to ``1 - distance``.

    Parameters
    ----------
    distance : float
        Distance value supplied by the task.

    Returns
    -------
    The value ``1 - distance``.
    """
    scaled = 1 - distance
    return scaled

# Task: gradient sensing with the source at (10, 10) in a 20 x 20 box.
task = srl.tasks.searching.GradientSensing(
    source=np.array([10.0, 10.0]),
    decay_function=scale_function,
    reward_scale_factor=100,
    box_length=np.array([20.0, 20.0]),
)

# Observable built from the same box dimensions.
observable = srl.observables.PositionObservable(np.array([20.0, 20.0]))

# Policy-gradient loss using the value function defined earlier.
loss = srl.losses.PolicyGradientLoss(value_function=value_function)

# The four translation actions available to the agent.
actions = {
    "TranslateLeft": Action(force=10.0, new_direction=np.array([-10.0, 0.0])),
    "TranslateUp": Action(force=10.0, new_direction=np.array([0.0, 10.0])),
    "TranslateRight": Action(force=10.0, new_direction=np.array([10.0, 0.0])),
    "TranslateDown": Action(force=10.0, new_direction=np.array([0.0, -10.0])),
}

# Agent tying together network, task, observable, actions and loss.
protocol = srl.agents.ActorCriticAgent(
    particle_type=0,
    network=network,
    task=task,
    observable=observable,
    actions=actions,
    loss=loss,
)

# Episodic trainer for the single agent and the number of episodes to run.
rl_trainer = srl.trainers.EpisodicTrainer([protocol])
n_episodes = 100


The path to the ResoBee root directory needs to be specified.

In [None]:
# Root of the ResoBee checkout; all other paths are derived from it.
resobee_root_path = ""

# Name of the simulation binary.
target = "many_body_simulation"

# Build directory and the executable inside it.
build_path = os.path.join(resobee_root_path, "build")
resobee_executable = os.path.join(resobee_root_path, "build/src", target)

# Directory holding the configuration of the run.
config_dir = os.path.join(
    resobee_root_path,
    "workflow/projects/debug/parameter-combination-0/seed-0",
)


In [None]:
# Engine wrapper pointing at the ResoBee executable and its config directory.
system_runner = resobee.ResoBee(resobee_executable=resobee_executable, config_dir=config_dir)

In [None]:

def get_engine(system):
    """Return ``system`` unchanged; passed to the trainer as ``get_engine``."""
    engine = system
    return engine
# Run the training; the system runner is handed through `get_engine`.
reward = rl_trainer.perform_rl_training(
    get_engine=get_engine,
    system=system_runner,
    n_episodes=n_episodes,
    episode_length=1,
)

# Plot the values returned by the trainer against their index.
# The x-axis label previously read "epsiodes" (typo).
plt.plot(reward)
plt.xlabel("episodes")
plt.ylabel("reward")
plt.show()