# Nonlinear example for gym environment with FMU with HER

In [None]:
import os, sys
import time
from collections import OrderedDict

import numpy as np
from matplotlib import pyplot as plt

from fmpy import read_model_description, extract

from fmugym import FMUGym, FMUGymConfig, VarSpace, State2Out, TargetValue

import gymnasium as gym
from stable_baselines3 import SAC, HerReplayBuffer

### Parameters of the simulation

In [None]:
class FMUEnv(FMUGym):
    def __init__(self, config):
        super().__init__(config)

    def _get_info(self):
        return {'info_time':time.time()}

    # possibility to do processing of FMU outputs to observations of the environment here
    def _get_obs(self):
        
        self._get_fmu_output()

        obs = np.array(list(self.observation.values())).flatten()
        
        noisy_observation = obs + self._get_output_noise()
        
        setpoint = self.setpoint_trajectory(self.y_start, self.y_stop, self.time)

        obs_dict = OrderedDict([
            ('observation', np.array(noisy_observation)),
            ('achieved_goal', np.array(noisy_observation)),
            ('desired_goal', setpoint.astype(np.float32))
        ])
        return obs_dict
    
    def _get_input_noise(self):
        input_noise = []
        for inp_name in self.input_dict:
            input_noise.append(self.input_noise[inp_name].sample()[0])
        return np.array(input_noise)

    def _get_output_noise(self):
        output_noise = []
        for out_name in self.output_dict:
                output_noise.append(self.output_noise[out_name].sample()[0])
        return np.array(output_noise)

    def _get_terminated(self):
        if self.time > self.stop_time:
            self.reset()
            return True, False

        for termination in self.terminations:
            min_value = self.terminations[termination].low[0]
            max_value = self.terminations[termination].high[0]
            if self.observation[termination] < min_value or self.observation[termination] > max_value:
                self.reset()
                return False, True

        return False, False

    def _create_action_space(self, inputs):
        lows = []
        highs = []
        for inp in inputs:
            lows.append(inputs[inp].low[0])
            highs.append(inputs[inp].high[0])
        action_space = gym.spaces.Box(low=np.array(lows), high=np.array(highs), dtype=np.float32)
        return action_space
    
    def _create_observation_space(self, outputs):
        lows = []
        highs = []
        for out in outputs:
            lows.append(outputs[out].low[0])
            highs.append(outputs[out].high[0])
        observation_space = gym.spaces.Dict({
            'observation': gym.spaces.Box(low=np.array(lows), high=np.array(highs), dtype=np.float32),
            'achieved_goal': gym.spaces.Box(low=np.array(lows), high=np.array(highs), dtype=np.float32),
            'desired_goal': gym.spaces.Box(low=np.array(lows), high=np.array(highs), dtype=np.float32)
        })
        return observation_space

    def _noisy_init(self):
        # add noise to setpoint goals
        for ye in self.y_stop:
            self.y_stop[ye] = self.y_stop_range[ye].sample()[0]
        
        # add noise to initial system state
        init_states = {}
        for var in self.random_vars_refs:
            var_ref = self.random_vars_refs[var][0]
            uniform_value = self.random_vars_refs[var][1].sample()[0]
            init_states[var_ref] = uniform_value

            # domain randomization with noisy initial y_start
            if var in self.rand_starts.keys():
                input_string = self.rand_starts[var]
                self.y_start[input_string] = float(uniform_value)
        
        return init_states

    def _process_action(self, action):
        processed_action = action + self._get_input_noise()
        return processed_action

    def setpoint_trajectory(self, y_start, y_stop, time):
        y = []
        for y0, ye in zip(y_start.values(), y_stop.values()):
            y.append((ye - y0)/(self.stop_time-self.start_time)*(time-self.start_time) + y0)
        return np.array(y)

    def _process_reward(self, obs, acts, info):
        achieved_goal = obs["achieved_goal"]
        desired_goal = obs["desired_goal"]
        reward = self.compute_reward(achieved_goal, desired_goal, info)
        return reward

    
    def compute_reward(self, achieved_goal, desired_goal, info):
        """
        compute reward for HER
            achieved_goal: outputs of FMU
            desired_goal: current setpoint of trajectory
            info: [NOT USED]
        Returns:
            float: environment reward
        """

        # Deceptive reward: it is positive (0) only when the goal is achieved
        # Here we are using a vectorized version
        boundary_accuracy = 0.1
        control_error = achieved_goal - desired_goal
        if len(control_error) == self.observation_space["observation"].shape[0]:
                boundary_violated = False
                for err in control_error:
                    boundary_violated = boundary_violated or (abs(err)>boundary_accuracy)            
                reward = -(boundary_violated.astype(np.float32))
                
        else:
            reward = []
            for ctrl_err in control_error:
                boundary_violated = False
                for err in ctrl_err:
                    boundary_violated = boundary_violated or (abs(err)>boundary_accuracy)
                reward.append(-(boundary_violated.astype(np.float32)))
        
        return np.array(reward)

In [None]:
inputs = VarSpace("inputs")
inputs.add_var_box("input1", -10.0, 10.0)
inputs.add_var_box("input2", -10.0, 10.0)

input_noise = VarSpace("input_noise")
input_noise.add_var_box("input1", -5.0, 5.0)
input_noise.add_var_box("input2", -5.0, 5.0)

outputs = VarSpace("outputs")
outputs.add_var_box("output1", -1e6, 1e6)
outputs.add_var_box("output2", -1e6, 1e6)

output_noise = VarSpace("output_noise")
output_noise.add_var_box("output1", -0.05, 0.05)
output_noise.add_var_box("output2", -0.05, 0.05)

# dynamics and domain randomization
random_vars = VarSpace("random_vars")
random_vars.add_var_box("firstOrder.k", 4, 6)
random_vars.add_var_box("firstOrder1.k", 4, 6)
random_vars.add_var_box("firstOrder.y_start", -0.5, 0.5)
random_vars.add_var_box("firstOrder1.y_start", -0.5, 0.5)

# map state variables to corresponding outputs of Modelica Model for init values
set_point_map = State2Out("set_point_map")
set_point_map.add_map("firstOrder.y_start", "output1")
set_point_map.add_map("firstOrder1.y_start", "output2")

set_point_nominal_start = TargetValue("set_point_nominal_start")
set_point_nominal_start.add_target("output1", 0.0)
set_point_nominal_start.add_target("output2", 0.0)

set_point_stop = VarSpace("set_point_stop")
set_point_stop.add_var_box("output1", 1.0, 2.5)
set_point_stop.add_var_box("output2", 1.2, 3.0)

terminations = VarSpace("terminations")

config = FMUGymConfig(fmu_path=os.path.abspath('FMUs/NonlinearMIMO.fmu'),
                      start_time=0.0,
                      stop_time=10.0,
                      sim_step_size=0.01,
                      action_step_size=0.01,
                      inputs=inputs,
                      input_noise=input_noise,
                      outputs=outputs,
                      output_noise=output_noise,
                      random_vars=random_vars,
                      set_point_map=set_point_map,
                      set_point_nominal_start=set_point_nominal_start,
                      set_point_stop=set_point_stop,
                      terminations=terminations
                     )                        

In [None]:
model_description = read_model_description(config.fmu_path)
print(model_description)

### Creation of gym environment

In [None]:
dummyEnv = FMUEnv(config)

### Learning of SB3 agent

In [None]:
model = SAC(
    "MultiInputPolicy",
    dummyEnv,
    replay_buffer_class=HerReplayBuffer,
    replay_buffer_kwargs=dict(
        n_sampled_goal=6,
        goal_selection_strategy="future",
    ),
    ent_coef="auto_1.0",
    learning_starts = int(2*(config.stop_time - config.start_time)/config.sim_step_size),
    gamma=0.99,
    verbose=1,
)

In [None]:
dummyEnv.reset()

In [None]:
import time
timer_start_time = time.time()
model.learn(total_timesteps=3e5)
print("learn duration: ", time.time() - timer_start_time)

In [None]:
model.save("trained_agents/sac_agent_nonlinear_example")

### ... or loading model

In [None]:
model = SAC.load("trained_agents/sac_agent_nonlinear_example.zip", dummyEnv, print_system_info=True)

### Inference

In [None]:
observation, info = dummyEnv.reset()
obs_array = []
act_array = []
obs_array.append(observation["observation"])

num_steps = int((config.stop_time - config.start_time) / config.sim_step_size)
for _ in range(num_steps):
    action, _state = model.predict(observation, deterministic=True) 
    observation, reward, terminated, truncated, info = dummyEnv.step(action)

    if terminated or truncated:
        observation, info = dummyEnv.reset()
    obs_array.append(observation["observation"])
    act_array.append(action)

obs_array = np.array(obs_array).flatten()
obs_array = obs_array.reshape(-1, 2)  
act_array = np.array(act_array).flatten()
act_array = act_array.reshape(-1, 2) 

In [None]:
y1_setpoints = []
y2_setpoints = []

for t in np.arange(config.start_time, config.stop_time, config.sim_step_size):
    y1_setpoints.append(dummyEnv.setpoint_trajectory(dummyEnv.y_start, dummyEnv.y_stop, t)[0])
    y2_setpoints.append(dummyEnv.setpoint_trajectory(dummyEnv.y_start, dummyEnv.y_stop, t)[1])

In [None]:
plt.subplot(2, 1, 1)
plt.plot(obs_array[:, 0], label='y1', color="b")
plt.plot(y1_setpoints, label='y1_set', color="b", linestyle='--')
plt.plot(obs_array[:, 1], label='y2', color="r")
plt.plot(y2_setpoints, label='y2_set', color="r", linestyle='--')
plt.legend()

plt.subplot(2, 1, 2)
plt.plot(act_array[:, 0], label='u1', color="b")
plt.plot(act_array[:, 1], label='u2', color="r")
plt.legend()

plt.show()

In [None]:
dummyEnv.close()