Skip to content
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
262 lines (187 sloc) 8.35 KB
import numpy as np
import theano
import theano.tensor as tt
import scipy
THEANO_FLAGS = 'optimizer=fast_compile'
THEANO_FLAG = 'compute_test_value=ignore'
class DifferentialEquation(theano.Op):
Specify an ordinary differential equation
.. math::
\dfrac{dy}{dt} = f(y,t,p) \quad y(t_0) = y_0
func : callable
Function specifying the differential equation
t0 : float
Time corresponding to the initial condition
times : array
Array of times at which to evaluate the solution of the differential equation.
n_states : int
Dimension of the differential equation. For scalar differential equations, n_states =1.
For vector valued differential equations, n_states = number of differential equations iun the system.
n_odeparams : int
Number of parameters in the differential equation.
.. code-block:: python
def odefunc(y,t,p):
#Logistic differential equation
return p[0]*y[0]*(1-y[0])
times = np.arange(0.5, 5, 0.5)
ode_model = DifferentialEquation(func = odefunc, t0 = 0, times = times, n_states = 1, n_odeparams = 1)
__props__ = ()
def __init__(self, func, t0, times, n_states, n_odeparams):
if not callable(func):
raise ValueError("Argument func must be callable.")
if np.any(np.diff(times)<0):
raise ValueError("The values in times must be monotonically increasing or monotonically decreasing; repeated values are allowed.")
if n_states<1:
raise ValueError('Argument n_states must be at least 1.')
if n_odeparams<0:
raise ValueError('Argument n_states must be non-negative.')
self.func = func
self.t0 = t0
self.times = times
self.n_states = n_states
self.n_odeparams = n_odeparams
self._n = n_states
self._m = n_odeparams + n_states
self._augmented_times = np.insert(times, t0, 0)
self._augmented_func = _augment_system(func, self._n, self._m)
self._sens_ic = self._make_sens_ic()
self._cached_y = None
self._cached_sens = None
self._cached_parameters = None
def _make_sens_ic(self):
# The sensitivity matrix will always have consistent form.
# If the first n_odeparams entries of the parameters vector in the simulate call
# correspond to ode paramaters, then the first n_odeparams columns in
# the sensitivity matrix will be 0
sens_matrix = np.zeros((self._n, self._m))
# If the last n_states entrues of the paramters vector in the simulate call
# correspond to initial conditions of the system,
# then the last n_states columns of the sensitivity matrix should form
# an identity matrix
sens_matrix[:, -self.n_states:] = np.eye(self.n_states)
# We need the sensitivity matrix to be a vector (see augmented_function)
# Ravel and return
dydp = sens_matrix.ravel()
return dydp
def _system(self, Y, t, p):
This is the function that will be passed to odeint.
Solves both ODE and sensitivities
Y (vector): current state and current gradient state
t (scalar): current time
p (vector): parameters
derivatives (vector): derivatives of state and gradient
dydt, ddt_dydp = self._augmented_func(Y[:self._n], t, p, Y[self._n:])
derivatives = np.concatenate([dydt, ddt_dydp])
return derivatives
def _simulate(self, parameters):
# Initial condition comprised of state initial conditions and raveled
# sensitivity matrix
y0 = np.concatenate([ parameters[self.n_odeparams:] , self._sens_ic])
# perform the integration
sol = scipy.integrate.odeint(func=self._system,
# The solution
y = sol[1:, :self.n_states]
# The sensitivities, reshaped to be a sequence of matrices
sens = sol[1:, self.n_states:].reshape(len(self.times), self._n, self._m)
return y, sens
def _cached_simulate(self, parameters):
if np.array_equal(np.array(parameters), self._cached_parameters):
return self._cached_y, self._cached_sens
return self._simulate(np.array(parameters))
def state(self, x):
y, sens = self._cached_simulate(np.array(x, dtype=np.float64))
self._cached_y, self._cached_sens, self._cached_parameters = y, sens, x
return y.ravel()
def numpy_vsp(self, x, g):
numpy_sens = self._cached_simulate(np.array(x, dtype=np.float64))[1].reshape((self.n_states * len(self.times), len(x)))
def make_node(self, odeparams, y0):
if len(odeparams)!=self.n_odeparams:
raise ValueError('odeparams has too many or too few parameters. Expected {a} paramteres but got {b}'.format(a = self.n_odeparams, b = len(odeparams)))
if len(y0)!=self.n_states:
raise ValueError('y0 has too many or too few parameters. Expected {a} paramteres but got {b}'.format(a = self.n_states, b = len(y0)))
if np.ndim(odeparams) > 1:
odeparams = np.ravel(odeparams)
if np.ndim(y0) > 1:
y0 = np.ravel(y0)
odeparams = tt.as_tensor_variable(odeparams)
y0 = tt.as_tensor_variable(y0)
x = tt.concatenate([odeparams, y0])
return theano.Apply(self, [x], [x.type()])
def perform(self, node, inputs_storage, output_storage):
x = inputs_storage[0]
out = output_storage[0]
# get the numerical solution of ODE states
out[0] = self.state(x)
def grad(self, inputs, output_grads):
x = inputs[0]
g = output_grads[0]
# pass the VSP when asked for gradient
grad_op = ODEGradop(self.numpy_vsp)
grad_op_apply = grad_op(x, g)
return [grad_op_apply]
class ODEGradop(theano.Op):
def __init__(self, numpy_vsp):
self._numpy_vsp = numpy_vsp
def make_node(self, x, g):
x = theano.tensor.as_tensor_variable(x)
g = theano.tensor.as_tensor_variable(g)
node = theano.Apply(self, [x, g], [g.type()])
return node
def perform(self, node, inputs_storage, output_storage):
x = inputs_storage[0]
g = inputs_storage[1]
out = output_storage[0]
out[0] = self._numpy_vsp(x, g) # get the numerical VSP
def _augment_system(ode_func, n, m):
'''Function to create augmented system.
Take a function which specifies a set of differential equations and return
a compiled function which allows for computation of gradients of the
differential equation's solition with repsect to the parameters.
ode_func (function): Differential equation. Returns array-like
n: Number of rows of the sensitivity matrix
m: Number of columns of the sensitivity matrix
system (function): Augemted system of differential equations.
# Present state of the system
t_y = tt.vector('y', dtype=theano.config.floatX)
# Parameter(s). Should be vector to allow for generaliztion to multiparameter
# systems of ODEs
t_p = tt.vector('p', dtype=theano.config.floatX)
# Time. Allow for non-automonous systems of ODEs to be analyzed
t_t = tt.scalar('t', dtype=theano.config.floatX)
# Present state of the gradients:
# Will always be 0 unless the parameter is the inital condition
# Entry i,j is partial of y[i] wrt to p[j]
dydp_vec = tt.vector('dydp', dtype=theano.config.floatX)
dydp = dydp_vec.reshape((n, m))
# Stack the results of the ode_func
# TODO: Does this behave the same of ODE is scalar?
f_tensor = tt.stack(ode_func(t_y, t_t, t_p))
# Now compute gradients
J = tt.jacobian(f_tensor, t_y)
Jdfdy =, dydp)
grad_f = tt.jacobian(f_tensor, t_p)
# This is the time derivative of dydp
ddt_dydp = (Jdfdy + grad_f).flatten()
system = theano.function(
inputs=[t_y, t_t, t_p, dydp_vec],
outputs=[f_tensor, ddt_dydp],
return system
You can’t perform that action at this time.