-
Notifications
You must be signed in to change notification settings - Fork 8
/
differentiation.py
256 lines (196 loc) · 9.18 KB
/
differentiation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import abc
import numpy as np
from .utils import _memoize_arrays
methods = {}
default = ["finite_difference", {"k": 1}]
def register(name=""):
def inner(f):
n = name or f.__name__
methods[n] = f
return f
return inner
@_memoize_arrays()
def _gen_method(x, t, kind, axis, **kwargs):
return methods.get(kind)(**kwargs)
def dxdt(x, t, kind=None, axis=1, **kwargs):
"""
Compute the derivative of x with respect to t along axis using the numerical derivative specified by "kind". This is
the functional interface of the Derivative class.
This function requires that X and t have equal length along axis. An empty X results in an empty derivative. If
X.shape[axis] == 1, then the derivative cannot be computed in a reasonable way and X is returned.
The implementation 'kind', an instance of the Derivative class, is responsbile for determining the behavior.
Args:
x (:obj:`ndarray` of float): Ordered measurement values.
t (:obj:`ndarray` of float): Ordered measurement times.
kind (string): Derivative method name (see available kinds).
axis ({0,1}): Axis of x along which to differentiate. Default 1.
**kwargs: Keyword arguments for the derivative method "kind".
Available kinds
- finite_difference. Required kwargs: k (symmetric window size as index).
- savitzky_golay. Required kwargs: order (of a fit polynomial), left, right (window size).
- spectral. Required kwargs: None.
- spline. Required kwargs: s (smoothing).
- trend_filtered. Required kwargs: order (of a fit polynomial), alpha (regularization).
Returns:
:obj:`ndarray` of float: Returns dx/dt along axis.
"""
if kind is None:
method = _gen_method(x, t, default[0], axis, **default[1])
return method.d(x, t, axis=axis)
else:
method = _gen_method(x, t, kind, axis, **kwargs)
return method.d(x, t, axis=axis)
def smooth_x(x, t, kind=None, axis=1, **kwargs):
"""
Compute the smoothed version of x given t along axis using the numerical
derivative specified by "kind". This is the functional interface of
the Derivative class's x() method.
This function requires that X and t have equal length along axis. If
X.shape[axis] == 1, then the smoothing cannot be computed in a reasonable way and X is returned.
The implementation 'kind', an instance of the Derivative class, is responsbile for determining the behavior.
Args:
x (:obj:`ndarray` of float): Ordered measurement values.
t (:obj:`ndarray` of float): Ordered measurement times.
kind (string): Derivative method name (see available kinds).
axis ({0,1}): Axis of x along which to differentiate. Default 1.
**kwargs: Keyword arguments for the derivative method "kind".
Available kinds
- finite_difference. Required kwargs: k (symmetric window size as index).
- savitzky_golay. Required kwargs: order (of a fit polynomial), left, right (window size).
- spectral. Required kwargs: None.
- spline. Required kwargs: s (smoothing).
- trend_filtered. Required kwargs: order (of a fit polynomial), alpha (regularization).
Returns:
:obj:`ndarray` of float: Returns dx/dt along axis.
"""
if kind is None:
method = _gen_method(x, t, default[0], axis, **default[1])
return method.x(x, t, axis=axis)
else:
method = _gen_method(x, t, kind, axis, **kwargs)
return method.x(x, t, axis=axis)
class Derivative(abc.ABC):
""" Interface for computing numerical derivatives. """
@abc.abstractmethod
def compute(self, t, x, i):
"""
Compute the derivative of one-dimensional data x with respect to t at the index i of x, (dx/dt)[i].
Computation of a derivative should fail explicitely if the implementation is unable to compute a derivative at
the desired index. Used for global differentiation methods, for example.
This requires that x and t have equal lengths >= 2, and that the index i is a valid index.
For each implementation, any exceptions raised by a valid input should either be handled or denoted in the
implementation docstring. For example, some implementations may raise an exception when x and t have length 2.
Args:
t (:obj:`ndarray` of float): Ordered measurement times.
x (:obj:`ndarray` of float): Ordered measurement values.
i (int): Index i at which to compute (dx/dt)[i]
Returns:
float: (dx/dt)[i]
"""
def compute_for(self, t, x, indices):
"""
Compute derivative (dx/dt)[i] for i in indices. Overload this if desiring a more efficient computation over a
list of indices.
This function requires that x and t have equal length along axis, and that all of the indicies are valid.
Args:
t (:obj:`ndarray` of float): Ordered measurement times.
x (:obj:`ndarray` of float): Ordered measurement values.
indices (:obj:`ndarray` of int): Indices i at which to compute (dx/dt)[i]
Returns:
Generator[float]: yields (dx/dt)[i] for i in indices
"""
for i in indices:
yield self.compute(t, x, i)
def compute_x(self, t, x, i):
"""
Compute smoothed values of one-dimensional data x at the index i of x.
Overload this if subclass actually smooths values.
This requires that x and t have equal lengths >= 2, and that the index i is a valid index.
For each implementation, any exceptions raised by a valid input should either be handled or denoted in the
implementation docstring. For example, some implementations may raise an exception when x and t have length 2.
Args:
t (:obj:`ndarray` of float): Ordered measurement times.
x (:obj:`ndarray` of float): Ordered measurement values.
i (int): Index i at which to returned smoothed values
Returns:
float
"""
return x[i]
def compute_x_for(self, t, x, indices):
"""
Compute smoothed values of x at each i in indices. Overload
this if desiring a more efficient computation over a list of
indices.
This function requires that x and t have equal length along axis, and that all of the indicies are valid.
Args:
t (:obj:`ndarray` of float): Ordered measurement times.
x (:obj:`ndarray` of float): Ordered measurement values.
indices (:obj:`ndarray` of int): Indices i at which to compute (dx/dt)[i]
Returns:
Generator[float]: yields (dx/dt)[i] for i in indices
"""
for i in indices:
yield self.compute_x(t, x, i)
def d(self, X, t, axis=1):
"""
Compute the derivative of measurements X taken at times t.
An empty X results in an empty derivative. If X.shape[axis] == 1, then the derivative cannot be computed in a
reasonable way and X is returned.
Args:
X (:obj:`ndarray` of float): Ordered measurements values. Multiple measurements allowed.
t (:obj:`ndarray` of float): Ordered measurement times.
axis ({0,1}). axis of X along which to differentiate. default 1.
Returns:
:obj:`ndarray` of float: Returns dX/dt along axis.
Raises:
ValueError: Requires that X.shape[axis] equals len(t). If X is flat, requires that len(X) equals len(t).
"""
X, flat = _align_axes(X, t, axis)
if X.shape[1] == 1:
dX = X
else:
dX = np.array([list(self.compute_for(t, x, np.arange(len(t)))) for x in X])
return _restore_axes(dX, axis, flat)
def x(self, X, t, axis=1):
"""
Compute the smoothed X values from measurements X taken at times t.
Not all methods perform smoothing when calculating derivatives. In
these cases, X is returned unmodified
Args:
X (:obj:`ndarray` of float): Ordered measurements values. Multiple measurements allowed.
t (:obj:`ndarray` of float): Ordered measurement times.
axis ({0,1}). axis of X along which to smooth. default 1.
Returns:
:obj:`ndarray` of float: Returns dX/dt along axis.
"""
X, flat = _align_axes(X, t, axis)
if X.shape[1] == 1:
dX = X
else:
dX = np.array([list(self.compute_x_for(t, x, np.arange(len(t)))) for x in X])
return _restore_axes(dX, axis, flat)
def _align_axes(X, t, axis):
# Cast
X = np.array(X)
flat = False
# Check shape and axis
if len(X.shape) == 1:
X = X.reshape(1, -1)
flat = True
elif len(X.shape) == 2:
if axis == 0:
X = X.T
elif axis == 1:
pass
else:
raise ValueError("Invalid axis.")
else:
raise ValueError("Invalid shape of X.")
if X.shape[1] != len(t):
raise ValueError("Desired X axis size does not match t size.")
return X, flat
def _restore_axes(dX, axis, flat):
if flat:
return dX.flatten()
else:
return dX if axis == 1 else dX.T