In [2]:
from Solver import Particle, Perceptron, PerceptronModel, VicsekModel, NeuralNetwork, PerceptronMode, Mode, NeuralSwarmModel

import tensorflow as tf
from tensorflow.keras import layers
import numpy as np

# Use tensorflow agents for reinforcement learning
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts
from tf_agents.agents.ddpg import ddpg_agent


In [None]:
# Hyperparameters
# :::::::::::::::::::::::::::::::::::::::

# Simulation presets. Every entry is a list in the order [N, L, v, noise, r],
# i.e. the first five positional arguments handed to NeuralSwarmModel below.
settings = {
    #              N,     L,    v,     noise, r
    "XXsmall":    [5,     4,    0.03,  0.1,   1],
    "Xsmall":     [20,    6,    0.03,  0.1,   1],
    "small":      [100,   30,   0.03,  0.1,   1],
    "a":          [300,   7,    0.03,  2.0,   1],
    "b":          [300,   25,   0.03,  0.5,   1],
    "d":          [300,   5,    0.03,  0.1,   1],
    "plot1_N40":  [40,    3.1,  0.03,  0.1,   1],
    "large":      [2000,  60,   0.03,  0.3,   1],
}

# Neighbour mode; available choices are RADIUS, FIXED and FIXEDRADIUS
mode = Mode.FIXEDRADIUS

# Flag selecting between a 2D and a 3D simulation
ZDimension = False

# Duration of the simulation
timesteps = 5000

# Pick one preset and unpack it into the individual parameters
chosen_settings = settings["small"]
N, L, v, noise, r = chosen_settings

# The environment observation has k_neighbors + 1 entries
k_neighbors = 5

In [None]:
# # Neural network model
# # ::::::::::::::::::::::::::::::::::::::::::::::::
# class ReinforcementModel(tf.keras.Model):
#     def __init__(self):
#         super(ReinforcementModel, self).__init__()
#         self.input_layer = tf.keras.Input(shape=(10,))  # 5 pairs of (sin, cos) values
#         # Input and output layer at the same time
#         self.dense1 = tf.keras.layers.Dense(5, activation='linear')
#         self.dense2 = tf.keras.layers.Dense(5, activation='linear')

#     def call(self, inputsCos, inputsSin):
#         x = self.dense1(inputsCos)
#         y = self.dense2(inputsSin)
#         return x, y

#     def get_angle(self, out_sin, out_cos):
#         # Calculate angle from sin and cos output of the model (in radians) and return it in the range [0, 2pi]
#         angle = np.arctan2(out_sin, out_cos)
#         angle = (angle + 2 * np.pi) % (2 * np.pi)
#         return angle

# # ::::::::::::::::::::::::::::::::::::::::::::::::

# NN = ReinforcementModel()

## Create simulation model
#model = NeuralSwarmModel(N, L, v, noise, r, mode, k_neighbors, ZDimension, seed=True)

# # Contains all neighbor lists for all particles
# AllNeighborsParticles = model.get_all_neighbors()

# print(len(AllNeighborsParticles))


In [None]:
# Create a custom environment
class SimulationEnvironment(py_environment.PyEnvironment):
    """TF-Agents Python environment that steers one particle of a NeuralSwarmModel.

    The agent controls the particle at ``self.index`` (currently always 0).

    * Action: a single float32 scalar in ``[0, 2*pi]``, applied through
      ``simulation.update_angle``.
    * Observation: a float32 vector of length ``k_neighbors + 1`` with values in
      ``[0, 2*pi]``, read from ``simulation.get_angles(index)`` (presumably the
      angles of the particle and its neighbours -- TODO confirm against Solver).
    * Reward: change of ``simulation.get_local_order_parameter(index)`` caused
      by the step, so an increase is rewarded and a decrease is punished.

    The module-level hyperparameters ``N, L, v, noise, r, mode, k_neighbors`` and
    ``ZDimension`` are read when the simulation is (re)created.
    """

    def __init__(self):
        # The base class initialiser creates ``_current_time_step``; without it
        # ``step()`` fails with AttributeError when called before ``reset()``.
        super().__init__()
        # A particle can only choose an angle, which is a float scalar.
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.float32, minimum=0, maximum=2*np.pi, name='action')
        # One angle per entry, k_neighbors + 1 entries because the particle
        # itself is included. Positions are not part of the observation.
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(k_neighbors + 1,), dtype=np.float32, minimum=0, maximum=2*np.pi, name='observation')
        # Last local order parameter seen for the controlled particle; this is
        # the quantity the reward is derived from.
        self._state = 0
        self._episode_ended = False
        self.simulation = NeuralSwarmModel(N, L, v, noise, r, mode, k_neighbors, ZDimension, seed=True)
        # Only for testing: we observe and control the first particle.
        self.index = 0

    def observation_spec(self):
        """Return observation_spec."""
        return self._observation_spec

    def action_spec(self):
        """Return action_spec."""
        return self._action_spec

    def reset(self):
        """Start a new episode and return the initial time step."""
        self._current_time_step = self._reset()
        return self._current_time_step

    def step(self, action):
        """Apply ``action`` and return the new time step.

        If no episode has been started yet, the action is ignored and the
        environment is reset instead.
        """
        if self._current_time_step is None:
            return self.reset()
        self._current_time_step = self._step(action)
        return self._current_time_step

    def current_time_step(self):
        """Return the most recently produced time step (None before the first reset)."""
        return self._current_time_step

    def time_step_spec(self):
        """Return time_step_spec."""
        # The original override had no return statement and therefore yielded
        # None; delegate to the base class, which builds the spec from
        # observation_spec().
        return super().time_step_spec()

    def _reset(self):
        """Recreate the simulation and return the initial time step."""
        self.simulation = NeuralSwarmModel(N, L, v, noise, r, mode, k_neighbors, ZDimension, seed=True)
        self._state = 0
        self._episode_ended = False
        # NOTE(review): the first observation is an all-zero placeholder, not
        # the angles of the freshly created simulation.
        return ts.restart(np.zeros(shape=(k_neighbors + 1,), dtype=np.float32))

    def _step(self, action):
        """Advance the simulation by one step and return the resulting time step.

        Args:
            action: new angle for the controlled particle. A negative value is
                treated as a stop action that ends the episode without
                advancing the simulation.

        Returns:
            A termination time step after a stop action, otherwise a transition
            time step with discount 1.0. Both carry the current observation and
            the change in the local order parameter as reward (0 for a stop).

        Raises:
            ValueError: if ``action`` is neither negative nor non-negative
                (e.g. NaN).
        """
        if self._episode_ended:
            # The last action ended the episode. Ignore the current action and
            # start a new episode.
            return self.reset()

        previous_state = self._state

        # NOTE(review): negative actions lie outside action_spec (minimum 0), so
        # an agent that respects the spec never ends an episode this way.
        # Consider bounding episodes externally, e.g. with wrappers.TimeLimit
        # -- TODO confirm.
        if action < 0.:
            self._episode_ended = True
        elif action >= 0.:
            # Do one step in the simulation.
            self.simulation.update_angle(self.index, action)
            self.simulation.update()
            self._state = self.simulation.get_local_order_parameter(self.index)
        else:
            raise ValueError('What did you do?')

        # Computed for BOTH outcomes. Previously these were only assigned in the
        # termination branch, so every regular step raised UnboundLocalError.
        reward = self._state - previous_state
        observation = np.array(self.simulation.get_angles(self.index), dtype=np.float32)

        if self._episode_ended:
            return ts.termination(observation, reward)
        return ts.transition(observation, reward=reward, discount=1.0)

In [None]:
class Actor(tf.keras.Model):
    """Policy network mapping a state to a bounded continuous action.

    Two 64-unit ReLU layers feed a tanh output layer with ``action_dim`` units.
    The tanh output in ``[-1, 1]`` is rescaled linearly to
    ``[action_low, action_high]``. With the default bounds the result equals the
    raw tanh output, so existing ``Actor(action_dim)`` callers are unaffected.

    NOTE(review): the environment in this notebook declares actions in
    ``[0, 2*pi]`` and treats negative actions as "stop"; pass
    ``action_low=0.0, action_high=2*np.pi`` to match it.

    Args:
        action_dim: number of action components produced per state.
        action_low: smallest value an action component can take.
        action_high: largest value an action component can take.

    Raises:
        ValueError: if ``action_high`` is not strictly greater than ``action_low``.
    """

    def __init__(self, action_dim, action_low=-1.0, action_high=1.0):
        super(Actor, self).__init__()
        if not action_high > action_low:
            raise ValueError('action_high must be greater than action_low')
        self.dense1 = layers.Dense(64, activation='relu')
        self.dense2 = layers.Dense(64, activation='relu')
        self.out = layers.Dense(action_dim, activation='tanh')
        # Affine map from the tanh range [-1, 1] onto [action_low, action_high].
        self._action_center = 0.5 * (action_high + action_low)
        self._action_half_range = 0.5 * (action_high - action_low)

    def call(self, state):
        """Return the action(s) for ``state``, scaled into the configured bounds."""
        x = self.dense1(state)
        x = self.dense2(x)
        action = self.out(x)
        return self._action_center + self._action_half_range * action


class Critic(tf.keras.Model):
    """Value network scoring a (state, action) pair with a single output.

    The state and action are concatenated along the last axis and passed
    through two 64-unit ReLU layers followed by a one-unit linear layer.
    """

    def __init__(self):
        super().__init__()
        self.dense1 = layers.Dense(64, activation='relu')
        self.dense2 = layers.Dense(64, activation='relu')
        self.out = layers.Dense(1)

    def call(self, state, action):
        """Return the value estimate for ``action`` taken in ``state``."""
        joint_input = tf.concat([state, action], axis=-1)
        hidden = self.dense2(self.dense1(joint_input))
        return self.out(hidden)


class DDPGAgent:
    """Container holding one Actor and one Critic network.

    Args:
        state_dim: accepted for interface purposes; not used by this class yet.
        action_dim: forwarded to the Actor as its output size.
    """

    def __init__(self, state_dim, action_dim):
        self.actor = Actor(action_dim)
        self.critic = Critic()

    def select_action(self, state):
        """Return the actor's action for a single, unbatched ``state``."""
        # The network is called on a batch, so add a leading axis of size 1 ...
        batched_state = tf.expand_dims(tf.convert_to_tensor(state), 0)
        batched_action = self.actor(batched_state)
        # ... and drop it again from the result.
        return batched_action[0]

    # Other methods for training, updating the target networks, etc.
