-
Notifications
You must be signed in to change notification settings - Fork 240
/
complex_step.py
241 lines (196 loc) · 7.12 KB
/
complex_step.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
"""Complex Step derivative approximations."""
from collections import defaultdict
import numpy as np
from openmdao.utils.om_warnings import issue_warning, DerivativesWarning
from openmdao.approximation_schemes.approximation_scheme import ApproximationScheme
class ComplexStep(ApproximationScheme):
r"""
Approximation scheme using complex step to calculate derivatives.
For example, using a step size of 'h' will approximate the derivative in
the following way:
.. math::
f'(x) = \Im{\frac{f(x+ih)}{h}}.
Attributes
----------
_fd : <FiniteDifference>
When nested complex step is detected, we switch to Finite Difference.
"""
DEFAULT_OPTIONS = {
'step': 1e-40,
'directional': False,
}
def __init__(self):
"""
Initialize the ApproximationScheme.
"""
super().__init__()
# Only used when nested under complex step.
self._fd = None
def add_approximation(self, abs_key, system, kwargs, vector=None):
"""
Use this approximation scheme to approximate the derivative d(of)/d(wrt).
Parameters
----------
abs_key : tuple(str,str)
Absolute name pairing of (of, wrt) for the derivative.
system : System
Containing System.
kwargs : dict
Additional keyword arguments, to be interpreted by sub-classes.
vector : ndarray or None
Direction for difference when using directional derivatives.
"""
options = self.DEFAULT_OPTIONS.copy()
options.update(kwargs)
options['vector'] = vector
wrt = abs_key[1]
if wrt in self._wrt_meta:
self._wrt_meta[wrt].update(options)
else:
self._wrt_meta[wrt] = options
self._reset() # force later regen of approx_groups
def _get_approx_data(self, system, wrt, meta):
"""
Given approximation metadata, compute necessary delta for complex step.
Parameters
----------
system : System
System whose derivatives are being approximated.
wrt : str
Name of wrt variable.
meta : dict
Metadata dict.
Returns
-------
float
Delta needed for complex step perturbation.
"""
step = meta['step']
step *= 1j
return step
def compute_approx_col_iter(self, system, under_cs=False):
"""
Execute the system to compute the approximate sub-Jacobians.
Parameters
----------
system : System
System on which the execution is run.
under_cs : bool
True if we're currently under complex step at a higher level.
Yields
------
int
column index
ndarray
solution array corresponding to the jacobian column at the given column index
"""
if not self._wrt_meta:
return
if system.under_complex_step:
# If we are nested under another complex step, then warn and swap to FD.
if not self._fd:
from openmdao.approximation_schemes.finite_difference import FiniteDifference
issue_warning("Nested complex step detected. Finite difference will be used.",
prefix=system.pathname, category=DerivativesWarning)
fd = self._fd = FiniteDifference()
empty = {}
for wrt in self._wrt_meta:
fd.add_approximation(wrt, system, empty)
yield from self._fd.compute_approx_col_iter(system)
return
saved_inputs = system._inputs._get_data().copy()
system._inputs._data.imag[:] = 0.0
saved_outputs = system._outputs.asarray(copy=True)
system._outputs._data.imag[:] = 0.0
saved_resids = system._residuals.asarray(copy=True)
system._residuals._data.imag[:] = 0.0
# Turn on complex step.
system._set_complex_step_mode(True)
try:
for tup in self._compute_approx_col_iter(system, under_cs=True):
yield tup
# this was needed after adding relevance to the NL solve in order to clean
# out old results left over in the output array from a previous solve.
system._outputs.set_val(saved_outputs)
finally:
# Turn off complex step.
system._set_complex_step_mode(False)
system._inputs.set_val(saved_inputs)
system._outputs.set_val(saved_outputs)
system._residuals.set_val(saved_resids)
def _get_multiplier(self, delta):
"""
Return a multiplier to be applied to the jacobian.
Parameters
----------
delta : complex
Complex number used to compute the multiplier.
Returns
-------
float
multiplier to apply to the jacobian.
"""
return (1.0 / delta * 1j).real
def _transform_result(self, array):
"""
Return the imaginary part of the given array.
Parameters
----------
array : ndarray of complex
Result array after doing a complex step.
Returns
-------
ndarray
Imaginary part of the result array.
"""
return array.imag
def _run_point(self, system, idx_info, delta, result_array, total, idx_start=0):
"""
Perturb the system inputs with a complex step, run, and return the results.
Parameters
----------
system : System
The system having its derivs approximated.
idx_info : tuple of (Vector, ndarray of int)
Tuple of wrt indices and corresponding data vector to perturb.
delta : complex
Perturbation amount.
result_array : ndarray
An array used to store the results.
total : bool
If True total derivatives are being approximated, else partials.
idx_start : int
Vector index of the first element of this wrt variable.
Returns
-------
ndarray
Copy of the outputs or residuals array after running the perturbed system.
"""
for vec, idxs in idx_info:
if vec is not None and idxs is not None:
vec.iadd(delta, idxs)
if total:
system.run_solve_nonlinear()
result_array[:] = system._outputs.asarray()
else:
system.run_apply_nonlinear()
result_array[:] = system._residuals.asarray()
for vec, idxs in idx_info:
if vec is not None and idxs is not None:
vec.isub(delta, idxs)
return result_array
def apply_directional(self, data, direction):
"""
Apply stepsize to direction and embed into approximation data.
Parameters
----------
data : float
Step size for complex step.
direction : ndarray
Vector containing derivative direction.
Returns
-------
ndarray
New step direction.
"""
return data * direction