forked from utiasDSL/safe-control-gym
-
Notifications
You must be signed in to change notification settings - Fork 3
/
mpsc.py
286 lines (242 loc) · 12 KB
/
mpsc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
'''Abstract Model Predictive Safety Certification (MPSC).
The core idea is that any learning controller input can be either certified as safe or, if not safe, corrected
using an MPC controller based on Tube MPC.
Based on
* K.P. Wabersich and M.N. Zeilinger 'Linear model predictive safety certification for learning-based control' 2019
https://arxiv.org/pdf/1803.08552.pdf
'''
from copy import deepcopy
from abc import ABC, abstractmethod
import numpy as np
from safe_control_gym.safety_filters.base_safety_filter import BaseSafetyFilter
from safe_control_gym.safety_filters.mpsc.mpsc_utils import get_trajectory_on_horizon
from safe_control_gym.safety_filters.mpsc.mpsc_cost_function.one_step_cost import ONE_STEP_COST
from safe_control_gym.safety_filters.mpsc.mpsc_cost_function.constant_cost import CONSTANT_COST
from safe_control_gym.safety_filters.mpsc.mpsc_cost_function.regularized_cost import REGULARIZED_COST
from safe_control_gym.safety_filters.mpsc.mpsc_cost_function.lqr_cost import LQR_COST
from safe_control_gym.safety_filters.mpsc.mpsc_cost_function.precomputed_cost import PRECOMPUTED_COST
from safe_control_gym.safety_filters.mpsc.mpsc_cost_function.learned_cost import LEARNED_COST
from safe_control_gym.controllers.mpc.mpc_utils import get_cost_weight_matrix, reset_constraints
from safe_control_gym.controllers.lqr.lqr_utils import compute_lqr_gain
from safe_control_gym.safety_filters.mpsc.mpsc_utils import Cost_Function
class MPSC(BaseSafetyFilter, ABC):
    '''Abstract Model Predictive Safety Certification Class.'''

    def __init__(self,
                 env_func,
                 horizon: int = 10,
                 q_lin: list = None,
                 r_lin: list = None,
                 integration_algo: str = 'rk4',
                 warmstart: bool = True,
                 additional_constraints: list = None,
                 use_terminal_set: bool = True,
                 cost_function: Cost_Function = Cost_Function.ONE_STEP_COST,
                 mpsc_cost_horizon: int = 5,
                 decay_factor: float = 0.85,
                 **kwargs
                 ):
        '''Initialize the MPSC.

        Args:
            env_func (partial BenchmarkEnv): Environment for the task.
            horizon (int): The MPC horizon.
            q_lin, r_lin (list): Q and R gain matrices for linear controller.
            integration_algo (str): The algorithm used for integrating the dynamics,
                either 'LTI', 'rk4', 'rk', or 'cvodes'.
            warmstart (bool): If the previous MPC soln should be used to warmstart the next mpc step.
            additional_constraints (list): List of additional constraints to consider.
            use_terminal_set (bool): Whether to use a terminal set constraint or not.
            cost_function (Cost_Function): A string (from Cost_Function) representing the cost function to be used.
            mpsc_cost_horizon (int): How many steps forward to check for constraint violations.
            decay_factor (float): How much to discount future costs.
        '''
        # Store every constructor argument as a same-named attribute
        # (skipping 'self', 'kwargs', and any dunder entries in locals()).
        for k, v in locals().items():
            if k != 'self' and k != 'kwargs' and '__' not in k:
                self.__dict__[k] = v
        super().__init__(env_func, **kwargs)
        # NOTE(review): self.seed is presumably set by the base class -- confirm.
        np.random.seed(self.seed)
        # Setup the Environments.
        self.env = env_func(normalized_rl_action_space=False)
        self.training_env = env_func(randomized_init=True,
                                     init_state=None,
                                     cost='quadratic',
                                     normalized_rl_action_space=False,
                                     )
        # Setup attributes. reset() must run first: it creates self.model,
        # which every line below depends on.
        self.reset()
        self.dt = self.model.dt
        self.Q = get_cost_weight_matrix(q_lin, self.model.nx)
        self.R = get_cost_weight_matrix(r_lin, self.model.nu)
        self.X_EQ = np.zeros(self.model.nx)
        self.U_EQ = self.model.U_EQ
        self.set_dynamics()
        # Negated so the backup law in certify_action() is u = lqr_gain @ (x - x_ref).
        self.lqr_gain = -compute_lqr_gain(self.model, self.X_EQ, self.U_EQ, self.Q, self.R, discrete_dynamics=True)
        self.terminal_set = None
        self.prev_action = self.U_EQ
        # NOTE(review): only the LOCAL variable is replaced with [];
        # self.additional_constraints remains None in that case.
        if self.additional_constraints is None:
            additional_constraints = []
        self.constraints, self.state_constraints_sym, self.input_constraints_sym = reset_constraints(
            self.env.constraints.constraints +
            additional_constraints)
        # Instantiate the selected MPSC cost-function implementation.
        if cost_function == Cost_Function.ONE_STEP_COST:
            self.cost_function = ONE_STEP_COST()
        elif cost_function == Cost_Function.CONSTANT_COST:
            self.cost_function = CONSTANT_COST(self.env, mpsc_cost_horizon, decay_factor)
        elif cost_function == Cost_Function.REGULARIZED_COST:
            self.cost_function = REGULARIZED_COST(self.env, mpsc_cost_horizon, decay_factor)
        elif cost_function == Cost_Function.LQR_COST:
            self.cost_function = LQR_COST(self.env, mpsc_cost_horizon, decay_factor)
        elif cost_function == Cost_Function.PRECOMPUTED_COST:
            self.cost_function = PRECOMPUTED_COST(self.env, mpsc_cost_horizon, decay_factor, self.output_dir)
        elif cost_function == Cost_Function.LEARNED_COST:
            self.cost_function = LEARNED_COST(self.env, mpsc_cost_horizon, decay_factor)
        else:
            raise NotImplementedError(f'The MPSC cost function {cost_function} has not been implemented')
@abstractmethod
def set_dynamics(self):
'''Compute the dynamics.'''
raise NotImplementedError
@abstractmethod
def setup_optimizer(self):
'''Setup the certifying MPC problem.'''
raise NotImplementedError
def before_optimization(self, obs):
'''Setup the optimization.
Args:
obs (ndarray): Current state/observation.
'''
return
    def solve_optimization(self,
                           obs,
                           uncertified_action,
                           iteration=None,
                           ):
        '''Solve the MPC optimization problem for a given observation and uncertified input.

        Args:
            obs (ndarray): Current state/observation.
            uncertified_action (ndarray): The uncertified_controller's action.
            iteration (int): The current iteration, used for trajectory tracking.

        Returns:
            action (ndarray): The certified action (None when the solve fails).
            feasible (bool): Whether the safety filtering was feasible or not.
        '''
        # Unpack the solver handles built by setup_optimizer().
        opti_dict = self.opti_dict
        opti = opti_dict['opti']
        z_var = opti_dict['z_var']    # state decision variables over the horizon
        v_var = opti_dict['v_var']    # input decision variables over the horizon
        next_u = opti_dict['next_u']  # certified input to be applied this step
        u_L = opti_dict['u_L']        # uncertified (learning controller) input parameter
        x_init = opti_dict['x_init']  # initial-state parameter
        X_GOAL = opti_dict['X_GOAL']  # reference-trajectory parameter
        # Initial state is expressed in deviation coordinates around X_EQ.
        opti.set_value(x_init, obs - self.X_EQ)
        opti.set_value(u_L, uncertified_action)
        clipped_X_GOAL = get_trajectory_on_horizon(self.env, iteration, self.horizon)
        opti.set_value(X_GOAL, clipped_X_GOAL)
        if isinstance(self.cost_function, REGULARIZED_COST):
            # The regularized cost additionally needs last step's applied action.
            opti_dict['prev_u_val'] = self.prev_action
        self.cost_function.prepare_cost_variables(opti_dict, obs, iteration)
        # Initial guess for optimization problem.
        if (self.warmstart and
                self.z_prev is not None and
                self.v_prev is not None):
            # Shift previous solutions by 1 step.
            # NOTE(review): the final column is left as a duplicate of the
            # second-to-last; it is only a warmstart guess, not a constraint.
            z_guess = deepcopy(self.z_prev)
            v_guess = deepcopy(self.v_prev)
            z_guess[:, :-1] = z_guess[:, 1:]
            v_guess[:-1] = v_guess[1:]
            opti.set_initial(z_var, z_guess)
            opti.set_initial(v_var, v_guess)
        # Solve the optimization problem.
        try:
            sol = opti.solve()
            x_val, u_val, next_u_val = sol.value(z_var), sol.value(v_var), sol.value(next_u)
            # Cache the solution for warmstarting and for the infeasible backup law.
            self.z_prev = x_val
            # NOTE(review): the inner parentheses are NOT a tuple -- this is
            # reshape(nu, horizon) with two separate int arguments.
            self.v_prev = u_val.reshape((self.model.nu), self.horizon)
            self.next_u_prev = next_u_val
            # Take the first one from solved action sequence.
            action = next_u_val
            self.prev_action = next_u_val
            feasible = True
        except Exception as e:
            # Deliberate best-effort: an infeasible/failed solve is reported to
            # the caller rather than raised, so the backup law can take over.
            print('Error Return Status:', opti.debug.return_status())
            print(e)
            feasible = False
            action = None
        return action, feasible
    def certify_action(self,
                       current_state,
                       uncertified_action,
                       info=None,
                       ):
        '''Algorithm 1 from Wabersich 2019.

        Certifies the proposed action when the MPC problem is feasible;
        otherwise falls back to the previous feasible plan (shifted by the
        number of consecutive infeasible steps, k_inf) plus LQR feedback, or
        to a pure LQR backup once that plan is exhausted.

        Args:
            current_state (ndarray): Current state/observation.
            uncertified_action (ndarray): The uncertified_controller's action.
            info (dict): The info at this timestep.

        Returns:
            certified_action (ndarray): The certified action
            success (bool): Whether the safety filtering was successful or not.
        '''
        uncertified_action = np.clip(uncertified_action, self.env.physical_action_bounds[0], self.env.physical_action_bounds[1])
        self.results_dict['uncertified_action'].append(uncertified_action)
        success = True
        self.before_optimization(current_state)
        # NOTE(review): extract_step presumably pulls the timestep index from
        # info -- defined in the base class, confirm there.
        iteration = self.extract_step(info)
        action, feasible = self.solve_optimization(current_state, uncertified_action, iteration)
        self.results_dict['feasible'].append(feasible)
        if feasible:
            # Fresh feasible plan: reset the infeasibility counter.
            self.kinf = 0
            certified_action = action
        else:
            self.kinf += 1
            # While within the horizon of the last feasible plan, replay its
            # kinf-th input with LQR feedback on the state deviation from the
            # planned kinf-th state.
            if (self.kinf <= self.horizon - 1 and
                    self.z_prev is not None and
                    self.v_prev is not None):
                action = np.squeeze(self.v_prev[:, self.kinf]) + \
                    np.squeeze(self.lqr_gain @ (current_state.reshape((self.model.nx, 1)) - self.z_prev[:, self.kinf].reshape((self.model.nx, 1))))
                if self.integration_algo == 'LTI':
                    # LTI dynamics are expressed in deviation coordinates; shift back.
                    action = np.squeeze(action) + np.squeeze(self.U_EQ)
                action = np.squeeze(action)
                clipped_action = np.clip(action, self.constraints.input_constraints[0].lower_bounds, self.constraints.input_constraints[0].upper_bounds)
                # Only count as a failure if clipping changed the action materially.
                if np.linalg.norm(clipped_action - action) >= 0.01:
                    success = False
                certified_action = clipped_action
            else:
                # Plan exhausted (or never existed): pure LQR backup, always
                # reported as a filtering failure.
                action = np.squeeze(self.lqr_gain @ (current_state - self.X_EQ))
                if self.integration_algo == 'LTI':
                    action += np.squeeze(self.U_EQ)
                clipped_action = np.clip(action, self.constraints.input_constraints[0].lower_bounds, self.constraints.input_constraints[0].upper_bounds)
                success = False
                certified_action = clipped_action
        certified_action = np.squeeze(np.array(certified_action))
        self.results_dict['kinf'].append(self.kinf)
        self.results_dict['certified_action'].append(certified_action)
        self.results_dict['correction'].append(np.linalg.norm(certified_action - uncertified_action))
        return certified_action, success
def setup_results_dict(self):
'''Setup the results dictionary to store run information.'''
self.results_dict = {}
self.results_dict['feasible'] = []
self.results_dict['kinf'] = []
self.results_dict['uncertified_action'] = []
self.results_dict['certified_action'] = []
self.results_dict['correction'] = []
def close(self):
'''Cleans up resources.'''
self.env.close()
self.training_env.close()
    def reset(self):
        '''Prepares for training or evaluation.'''
        # Rebuild the prior model first -- later setup (dt, Q, R, dynamics)
        # reads self.model. get_prior/prior_info are presumably provided by
        # the base class -- confirm there.
        self.model = self.get_prior(self.env, self.prior_info)
        self.env.reset()
        self.training_env.reset()
        self.reset_before_run()
def reset_before_run(self, env=None):
'''Reinitialize just the controller before a new run.
Args:
env (BenchmarkEnv): The environment to be used for the new run.
'''
self.z_prev = None
self.v_prev = None
self.kinf = self.horizon - 1
self.setup_results_dict()